#!/usr/bin/python3

# Run this with --help to see available options for tracing and debugging
# See https://github.com/cockpit-project/cockpit/blob/master/test/common/testlib.py
# "class Browser" and "class MachineCase" for the available API.

import os
import sys
import time
import json
import configparser

# import Cockpit's machinery for test VMs and its browser test API
TEST_DIR = os.path.dirname(__file__)
sys.path.append(os.path.join(TEST_DIR, "common"))
sys.path.append(os.path.join(os.path.dirname(TEST_DIR), "bots/machine"))

from testlib import *


# Test with pre-recorded journal with tlog UID 981
class TestApplication(MachineCase):
    """Browser tests for the "/session-recording" page.

    Covers the recording player controls, the tlog/SSSD configuration forms
    and the recording list filters.
    """

    def _login(self):
        """Log in, open "/session-recording" and wait for "#app" to appear.

        Returns:
            The ``(browser, machine)`` pair of this test case.
        """
        self.login_and_go("/session-recording")
        b = self.browser
        m = self.machine
        b.wait_present("#app")
        return b, m

    def _sel_rec(self, recording):
        """Navigate the browser to one of the pre-recorded sessions.

        ``recording`` is one of the keys below; an unknown key raises
        ``KeyError``. Commands contained in each recording:

        rec1:
            whoami
            id
            echo thisisatest123
            sleep 16
            echo thisisanothertest456
            exit

        rec2:
            echo "Extra Commands"
            sudo systemctl daemon-reload
            sudo ssh root@localhost
            exit

        binaryrec:
            cat /usr/bin/gzip
        """
        recordings = {
            'rec1': '0f25700a28c44b599869745e5fda8b0c-7106-121e79',
            'rec2': '0f25700a28c44b599869745e5fda8b0c-7623-135541',
            'binaryrec': '6c652ee938b3485894dbacbb8c7c2c61-5a4-38a5',
        }
        page = recordings[recording]
        self.browser.go(f"/session-recording#/{page}")

    def _term_line(self, lineno):
        """Return the CSS selector of terminal row ``lineno`` (1-based nth-child)."""
        return f".xterm-accessibility-tree div:nth-child({lineno})"

    @staticmethod
    def _load_ini(path):
        """Parse the INI file at ``path`` into a fresh ``ConfigParser``.

        A new parser is used for every file so that values from a previously
        read file cannot leak into the result, and the file handle is closed
        as soon as parsing is done.
        """
        parser = configparser.ConfigParser()
        with open(path, "r") as ini_file:
            parser.read_file(ini_file)
        return parser

    def testPlay(self):
        """Playing rec1 shows "localhost" on the first terminal line."""
        b, _ = self._login()
        self._sel_rec('rec1')
        b.click("#player-play-pause")
        b.wait_in_text(self._term_line(1), "localhost")

    def testPlayBinary(self):
        """Playing the binary recording renders its output and reaches "exit"."""
        b, _ = self._login()
        self._sel_rec('binaryrec')
        b.click("#player-play-pause")
        time.sleep(5)
        b.wait_in_text(self._term_line(21), "�")
        b.wait_in_text(self._term_line(25), "exit")

    def testFastforwardControls(self):
        """Fast-forward jumps to the end; restart resets the terminal."""
        progress = ".pf-c-progress__indicator"

        b, _ = self._login()
        self._sel_rec('rec1')
        # fast forward
        b.click("#player-fast-forward")
        b.wait_in_text(self._term_line(12), "exit")
        b.wait_attr(progress, "style", "width: 100%;")
        # test restart playback
        b.click("#player-restart")
        b.wait_text(self._term_line(1), "Blank line")
        # NOTE(review): the indicator is still expected at 100% after the
        # restart - confirm this is the intended player behaviour.
        b.wait_attr(progress, "style", "width: 100%;")

    def testSpeedControls(self):
        """Step the playback speed up to x16, down to /16, then reset it."""
        b, _ = self._login()
        self._sel_rec('rec1')
        # increase speed
        b.wait_present("#player-speed-up")
        for label in ("x2", "x4", "x8", "x16"):
            b.click("#player-speed-up")
            b.wait_text("#player-speed", label)
        # decrease speed
        for label in ("x8", "x4", "x2"):
            b.click("#player-speed-down")
            b.wait_text("#player-speed", label)
        # one more step down from x2 has no label that is waited for here;
        # the following step is the first fractional speed
        b.click("#player-speed-down")
        for label in ("/2", "/4", "/8", "/16"):
            b.click("#player-speed-down")
            b.wait_text("#player-speed", label)
        # restore speed
        b.click(".pf-c-chip .pf-c-button")
        b.click("#player-speed-down")
        b.wait_text("#player-speed", "/2")

    def testZoomControls(self):
        """Zoom in three steps, out one step, then fit the terminal to screen."""
        default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
        zoom_one_scale_sel = '.console-ct[style^="transform: scale(1.1)"]'
        zoom_two_scale_sel = '.console-ct[style^="transform: scale(1.2)"]'
        zoom_three_scale_sel = '.console-ct[style^="transform: scale(1.3)"]'
        zoom_fit_to = (
            '.console-ct[style*="translate(-50%, -50%)"]'
            '[style*="top: 50%"]'
            '[style*="left: 50%"]'
        )

        b, _ = self._login()
        self._sel_rec('rec1')
        # Wait for terminal with scale(1)
        b.wait_present(default_scale_sel)
        # Zoom in x3
        b.click("#player-zoom-in")
        b.wait_present(zoom_one_scale_sel)
        b.click("#player-zoom-in")
        b.wait_present(zoom_two_scale_sel)
        b.click("#player-zoom-in")
        b.wait_present(zoom_three_scale_sel)
        # Zoom Out
        b.click("#player-zoom-out")
        b.wait_present(zoom_two_scale_sel)
        # Fit zoom to screen
        b.click("#player-fit-to")
        b.wait_present(zoom_fit_to)

    def testSkipFrame(self):
        """Skip frames until "localhost" shows up on the first terminal line."""
        b, _ = self._login()
        self._sel_rec('rec1')
        b.wait_present(self._term_line(1))
        # loop until 3 valid frames have passed
        while "localhost" not in b.text(self._term_line(1)):
            b.click("#player-skip-frame")
        b.wait_in_text(self._term_line(1), "localhost")

    def testPlaybackPause(self):
        """A paused player does not advance and can be resumed."""
        b, _ = self._login()
        self._sel_rec('rec1')
        # Start and pause the player
        b.click("#player-restart")
        b.click("#player-play-pause")
        b.click("#player-play-pause")
        time.sleep(10)
        # Make sure it didn't keep playing
        b.wait_not_in_text(self._term_line(1), "whoami")
        # Test if it can start playing again
        b.click("#player-play-pause")

    def testSessionRecordingConf(self):
        """Saving the tlog and SSSD forms writes the expected config files.

        For each form the current config is saved and downloaded first, the
        changed config is downloaded after saving, and the original file is
        uploaded back before the downloaded copies are checked.
        """
        b, m = self._login()
        b.click("#btn-config")

        # TLOG config
        conf_file_path = "/etc/tlog/"
        conf_file = f"{conf_file_path}tlog-rec-session.conf"
        save_file = "/tmp/tlog-rec-session.conf"
        test_file = "/tmp/test-tlog-rec-session.conf"
        # Save the existing config
        b.click("#btn-save-tlog-conf")
        m.download(conf_file, save_file)
        # Change all of the fields
        b.set_input_text("#shell", "/test/path/shell")
        b.set_input_text("#notice", "Test Notice")
        b.set_input_text("#latency", "1")
        b.set_input_text("#payload", "2")
        b.set_checked("#log_input", True)
        b.set_checked("#log_output", False)
        b.set_checked("#log_window", False)
        b.set_input_text("#limit_rate", "3")
        b.set_input_text("#limit_burst", "4")
        b.set_val("#limit_action", "drop")
        b.set_input_text("#file_path", "/test/path/file")
        b.set_input_text("#syslog_facility", "testfac")
        b.set_val("#syslog_priority", "info")
        b.set_val("#journal_priority", "info")
        b.set_checked("#journal_augment", False)
        b.set_val("#writer", "file")
        b.click("#btn-save-tlog-conf")
        time.sleep(1)
        m.download(conf_file, test_file)
        # Revert to the previous config before testing to ensure test continuity
        m.upload([save_file], conf_file_path)
        # Check that the config reflects the changes
        with open(test_file, "r") as tlog_conf:
            conf = json.load(tlog_conf)
        # Compared as serialized JSON, so key order and int/bool types matter
        self.assertEqual(
            json.dumps(conf),
            json.dumps(
                {
                    "shell": "/test/path/shell",
                    "notice": "Test Notice",
                    "latency": 1,
                    "payload": 2,
                    "log": {"input": True, "output": False, "window": False},
                    "limit": {"rate": 3, "burst": 4, "action": "drop"},
                    "file": {"path": "/test/path/file"},
                    "syslog": {"facility": "testfac", "priority": "info"},
                    "journal": {"priority": "info", "augment": False},
                    "writer": "file",
                }
            ),
        )

        # SSSD config
        conf_file_path = "/etc/sssd/conf.d/"
        conf_file = f"{conf_file_path}sssd-session-recording.conf"
        save_file = "/tmp/sssd-session-recording.conf"
        test_none_file = "/tmp/test-none-sssd-session-recording.conf"
        test_some_file = "/tmp/test-some-sssd-session-recording.conf"
        test_all_file = "/tmp/test-all-sssd-session-recording.conf"
        # Save the existing config
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, save_file)
        # Download test with scope 'None'
        b.set_val("#scope", "none")
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, test_none_file)
        # Download test with scope 'Some'
        b.set_val("#scope", "some")
        b.set_input_text("#users", "test users")
        b.set_input_text("#groups", "test groups")
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, test_some_file)
        # Download test with scope 'All'
        b.set_val("#scope", "all")
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, test_all_file)
        # Revert to the previous config before testing to ensure test continuity
        m.upload([save_file], conf_file_path)
        # Check that the configs reflected the changes
        conf = self._load_ini(test_none_file)
        self.assertEqual(conf["session_recording"]["scope"], "none")
        conf = self._load_ini(test_some_file)
        self.assertEqual(conf["session_recording"]["scope"], "some")
        self.assertEqual(conf["session_recording"]["users"], "test users")
        self.assertEqual(conf["session_recording"]["groups"], "test groups")
        conf = self._load_ini(test_all_file)
        self.assertEqual(conf["session_recording"]["scope"], "all")

    def testDisplayDrag(self):
        """Dragging a zoomed-in, paused terminal changes its scroll offsets."""
        b, _ = self._login()
        self._sel_rec('rec1')
        # start playback and pause in middle
        b.click("#player-play-pause")
        b.wait_in_text(self._term_line(1), "localhost")
        b.click("#player-play-pause")
        # zoom in so that the whole screen is no longer visible
        b.click("#player-zoom-in")
        b.click("#player-zoom-in")
        # select and ensure drag'n'pan mode
        b.click("#player-drag-pan")
        # scroll and check for screen movement
        b.mouse(".dragnpan", "mousedown", 200, 200)
        b.mouse(".dragnpan", "mousemove", 10, 10)
        self.assertNotEqual(b.attr(".dragnpan", "scrollTop"), 0)
        self.assertNotEqual(b.attr(".dragnpan", "scrollLeft"), 0)

    def testLogCorrelation(self):
        """The logs view of rec2 lists the 'authentication failure' entry."""
        b, _ = self._login()
        # select the recording with the extra logs
        self._sel_rec('rec2')
        b.click("#btn-logs-view .pf-c-expandable-section__toggle")
        # fast forward until the end
        while "exit" not in b.text(self._term_line(22)):
            b.click("#player-skip-frame")
        # check for extra log entries
        b.wait_present(".pf-c-data-list:contains('authentication failure')")

    def testZoomSpeedControls(self):
        """Zooming in and out keeps working while playing at x16 speed."""
        default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
        play_scale_sel = '.console-ct[style^="transform: scale(0.4"]'
        zoom_one_scale_sel = '.console-ct[style^="transform: scale(0.5"]'

        b, _ = self._login()
        self._sel_rec('rec1')
        # set speed x16 and begin playing, expecting a size adjustment
        for _ in range(4):
            b.click("#player-speed-up")
        b.wait_present(default_scale_sel)
        b.click("#player-play-pause")
        b.wait_present(play_scale_sel)
        # wait until sleeping and zoom in
        b.wait_in_text(self._term_line(8), "sleep")
        b.click("#player-zoom-in")
        b.wait_present(zoom_one_scale_sel)
        # zoom out while typing fast
        b.wait_in_text(self._term_line(9), "localhost")
        b.click("#player-zoom-out")
        b.wait_present(play_scale_sel)

    def _filter(self, inp, occ_dict):
        """Type search terms into a filter input and count the listed matches.

        Args:
            inp: CSS selector of the filter input to type into.
            occ_dict: maps an expected number of occurrences of "contractor"
                in the ".pf-c-table" text to the collection of terms that
                should produce that number.
        """
        # ignore errors from half-entered timestamps due to searches occurring
        # before `set_input_text` is complete
        self.allow_journal_messages(".*timestamp.*")
        # login and test inputs
        b, _ = self._login()
        for expected_count, terms in occ_dict.items():
            for term in terms:
                # enter the search term and wait for the results to return
                b.set_input_text(inp, term)
                time.sleep(5)
                self.assertEqual(
                    b.text(".pf-c-table").count("contractor"), expected_count
                )

    def testSearch(self):
        """Free-text search narrows the recording list as expected."""
        self._filter(
            "#filter-search",
            {
                0: {
                    "this should return nothing",
                    "this should also return nothing",
                    "0123456789",
                },
                1: {
                    "extra commands",
                    "whoami",
                    "ssh",
                    "thisisatest123",
                    "thisisanothertest456",
                },
                2: {
                    "id",
                    "localhost",
                    "exit",
                    "actor",
                    "contractor",
                    "contractor1@localhost",
                },
            },
        )

    def testFilterUsername(self):
        """The username filter only matches the exact user name."""
        self._filter(
            "#filter-username",
            {
                0: {"test", "contact", "contractor", "contractor11", "contractor4"},
                2: {"contractor1"},
            },
        )

    def testFilterSince(self):
        """The "since" filter narrows the recording list by start time."""
        self._filter(
            "#filter-since",
            {
                0: {"2020-06-02", "2020-06-01 12:31:00"},
                1: {"2020-06-01 12:17:01", "2020-06-01 12:30:50"},
                2: {"2020-06-01", "2020-06-01 12:17:00"},
            },
        )

    def testFilterUntil(self):
        """The "until" filter narrows the recording list by end time."""
        self._filter(
            "#filter-until",
            {
                0: {"2020-06-01", "2020-06-01 12:16"},
                1: {"2020-06-01 12:17", "2020-06-01 12:29"},
                2: {"2020-06-02", "2020-06-01 12:31:00"},
            },
        )


if __name__ == "__main__":
    test_main()