412 lines
15 KiB
Python
Executable file
412 lines
15 KiB
Python
Executable file
#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/common/pywrap", sys.argv)
|
|
|
|
# Run this with --help to see available options for tracing and debugging
|
|
# See https://github.com/cockpit-project/cockpit/blob/master/test/common/testlib.py
|
|
# "class Browser" and "class MachineCase" for the available API.
|
|
|
|
import testlib
|
|
|
|
import time
|
|
import json
|
|
import configparser
|
|
|
|
# Nondestructive tests all run in the same running VM. This allows them to run in Packit, Fedora, and RHEL dist-git gating
|
|
# They must not permanently change any file or configuration on the system in a way that influences other tests.
|
|
@testlib.nondestructive
|
|
class TestApplication(testlib.MachineCase):
|
|
def _login(self, loc="/session-recording", wait="#app"):
|
|
self.login_and_go(loc)
|
|
b = self.browser
|
|
m = self.machine
|
|
b.wait_visible(wait)
|
|
self.allow_journal_messages('.*type=1400.*avc: denied .* comm="systemctl".*')
|
|
self.allow_journal_messages('.*invalid non-UTF8.*web_socket_connection_send.*')
|
|
self.allow_journal_messages('.*Locale charset.*ANSI.*')
|
|
self.allow_journal_messages('.*Assuming locale environment.*UTF-8.*')
|
|
return b, m
|
|
|
|
def _sel_rec(self, recording):
|
|
'''
|
|
rec1:
|
|
whoami
|
|
id
|
|
echo thisisatest123
|
|
sleep 16
|
|
echo thisisanothertest456
|
|
exit
|
|
|
|
rec2:
|
|
echo "Extra Commands"
|
|
sudo systemctl daemon-reload
|
|
sudo ssh root@localhost
|
|
exit
|
|
|
|
binaryrec:
|
|
mc
|
|
exit
|
|
'''
|
|
recordings = {'rec1': '0f25700a28c44b599869745e5fda8b0c-7106-121e79',
|
|
'rec2': '0f25700a28c44b599869745e5fda8b0c-7623-135541',
|
|
'binaryrec': '976e4ef1d66741848ed35f7600b94c5c-1a0f-c1ae'}
|
|
|
|
page = recordings[recording]
|
|
|
|
self.browser.go(f"/session-recording#/{page}")
|
|
|
|
def _term_line(self, lineno):
|
|
return f".xterm-accessibility-tree div:nth-child({lineno})"
|
|
|
|
def testPlay(self):
|
|
b, _ = self._login()
|
|
self._sel_rec('rec1')
|
|
b.click("#player-play-pause")
|
|
b.wait_in_text(self._term_line(1), "localhost")
|
|
|
|
def testPlayBinary(self):
|
|
b, _ = self._login()
|
|
self._sel_rec('binaryrec')
|
|
b.click("#player-play-pause")
|
|
time.sleep(5)
|
|
b.wait_in_text(self._term_line(4), "exit")
|
|
|
|
def testFastforwardControls(self):
|
|
progress = ".pf-v5-c-progress__indicator"
|
|
|
|
b, _ = self._login()
|
|
self._sel_rec('rec1')
|
|
# fast forward
|
|
b.click("#player-fast-forward")
|
|
b.wait_in_text(self._term_line(12), "exit")
|
|
b.wait_attr(progress, "style", "width: 100%;")
|
|
# test restart playback
|
|
b.click("#player-restart")
|
|
b.click("#player-play-pause")
|
|
b.wait_text(self._term_line(7), "thisisatest123")
|
|
with b.wait_timeout(100):
|
|
b.wait_attr(progress, "style", "width: 100%;")
|
|
|
|
def testSpeedControls(self):
|
|
b, _ = self._login()
|
|
self._sel_rec('rec1')
|
|
# increase speed
|
|
b.wait_visible("#player-speed-up")
|
|
b.click("#player-speed-up")
|
|
b.wait_text("#player-speed", "x2")
|
|
b.click("#player-speed-up")
|
|
b.wait_text("#player-speed", "x4")
|
|
b.click("#player-speed-up")
|
|
b.wait_text("#player-speed", "x8")
|
|
b.click("#player-speed-up")
|
|
b.wait_text("#player-speed", "x16")
|
|
# decrease speed
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "x8")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "x4")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "x2")
|
|
b.click("#player-speed-down")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/2")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/4")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/8")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/16")
|
|
# restore speed
|
|
b.click(".pf-v5-c-chip .pf-v5-c-button")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/2")
|
|
|
|
def testZoomControls(self):
|
|
default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
|
|
zoom_one_scale_sel = '.console-ct[style^="transform: scale(1.1)"]'
|
|
zoom_two_scale_sel = '.console-ct[style^="transform: scale(1.2)"]'
|
|
zoom_three_scale_sel = '.console-ct[style^="transform: scale(1.3)"]'
|
|
zoom_fit_to = (
|
|
'.console-ct[style*="translate(-50%, -50%)"]'
|
|
'[style*="top: 50%"]'
|
|
'[style*="left: 50%"]'
|
|
)
|
|
|
|
b, _ = self._login()
|
|
self._sel_rec('rec1')
|
|
# Wait for terminal with scale(1)
|
|
b.wait_visible(default_scale_sel)
|
|
# Zoom in x3
|
|
b.click("#player-zoom-in")
|
|
b.wait_visible(zoom_one_scale_sel)
|
|
b.click("#player-zoom-in")
|
|
b.wait_visible(zoom_two_scale_sel)
|
|
b.click("#player-zoom-in")
|
|
b.wait_visible(zoom_three_scale_sel)
|
|
# Zoom Out
|
|
b.click("#player-zoom-out")
|
|
b.wait_visible(zoom_two_scale_sel)
|
|
# Fit zoom to screen
|
|
b.click("#player-fit-to")
|
|
b.wait_visible(zoom_fit_to)
|
|
|
|
def testSkipFrame(self):
|
|
b, _ = self._login()
|
|
self._sel_rec('rec1')
|
|
b.wait_visible(self._term_line(1))
|
|
# loop until 3 valid frames have passed
|
|
while "localhost" not in b.text(self._term_line(1)):
|
|
b.click("#player-skip-frame")
|
|
b.wait_in_text(self._term_line(1), "localhost")
|
|
|
|
def testPlaybackPause(self):
|
|
b, _ = self._login()
|
|
self._sel_rec('rec1')
|
|
# Start and pause the player
|
|
b.click("#player-restart")
|
|
b.click("#player-play-pause")
|
|
b.click("#player-play-pause")
|
|
time.sleep(10)
|
|
# Make sure it didn't keep playing
|
|
b.wait_not_in_text(self._term_line(6), "thisisatest123")
|
|
# Test if it can start playing again
|
|
b.click("#player-play-pause")
|
|
|
|
def testSessionRecordingConf(self):
|
|
b, m = self._login()
|
|
b.click("#btn-config")
|
|
|
|
# TLOG config
|
|
conf_file_path = "/etc/tlog/"
|
|
conf_file = f"{conf_file_path}tlog-rec-session.conf"
|
|
save_file = "/tmp/tlog-rec-session.conf"
|
|
test_file = "/tmp/test-tlog-rec-session.conf"
|
|
|
|
# Save the existing config
|
|
b.click("#btn-save-tlog-conf")
|
|
m.download(conf_file, save_file)
|
|
# Change all of the fields
|
|
b.set_input_text("#shell", "/test/path/shell")
|
|
b.set_input_text("#notice", "Test Notice")
|
|
b.set_input_text("#latency", "1")
|
|
b.set_input_text("#payload", "2")
|
|
b.set_checked("#log_input", True)
|
|
b.set_checked("#log_output", False)
|
|
b.set_checked("#log_window", False)
|
|
b.set_input_text("#limit_rate", "3")
|
|
b.set_input_text("#limit_burst", "4")
|
|
b.set_val("#limit_action", "drop")
|
|
b.set_input_text("#file_path", "/test/path/file")
|
|
b.set_input_text("#syslog_facility", "testfac")
|
|
b.set_val("#syslog_priority", "info")
|
|
b.set_val("#journal_priority", "info")
|
|
b.set_checked("#journal_augment", False)
|
|
b.set_val("#writer", "file")
|
|
b.click("#btn-save-tlog-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, test_file)
|
|
# Revert to the previous config before testing to ensure test continuity
|
|
m.upload([save_file], conf_file_path)
|
|
# Check that the config reflects the changes
|
|
conf = json.load(open(test_file, "r"))
|
|
self.assertEqual(
|
|
json.dumps(conf),
|
|
json.dumps(
|
|
{
|
|
"shell": "/test/path/shell",
|
|
"notice": "Test Notice",
|
|
"latency": 1,
|
|
"payload": 2,
|
|
"log": {"input": True, "output": False, "window": False},
|
|
"limit": {"rate": 3, "burst": 4, "action": "drop"},
|
|
"file": {"path": "/test/path/file"},
|
|
"syslog": {"facility": "testfac", "priority": "info"},
|
|
"journal": {"priority": "info", "augment": False},
|
|
"writer": "file",
|
|
}
|
|
),
|
|
)
|
|
|
|
# SSSD config
|
|
conf_file_path = "/etc/sssd/conf.d/"
|
|
conf_file = f"{conf_file_path}sssd-session-recording.conf"
|
|
save_file = "/tmp/sssd-session-recording.conf"
|
|
test_none_file = "/tmp/test-none-sssd-session-recording.conf"
|
|
test_some_file = "/tmp/test-some-sssd-session-recording.conf"
|
|
test_all_file = "/tmp/test-all-sssd-session-recording.conf"
|
|
|
|
# Save the existing config
|
|
b.click("#btn-save-sssd-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, save_file)
|
|
# Download test with scope 'Some'
|
|
b.set_val("#scope", "some")
|
|
b.set_input_text("#users", "test users")
|
|
b.set_input_text("#groups", "test groups")
|
|
b.click("#btn-save-sssd-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, test_some_file)
|
|
# Download test with scope 'All'
|
|
b.set_val("#scope", "all")
|
|
b.set_input_text("#exclude_users", "testuser1")
|
|
b.set_input_text("#exclude_groups", "testgroup1")
|
|
b.click("#btn-save-sssd-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, test_all_file)
|
|
# Download test with scope 'None'
|
|
b.set_val("#scope", "none")
|
|
b.click("#btn-save-sssd-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, test_none_file)
|
|
# Revert to the previous config before testing to ensure test continuity
|
|
m.upload([save_file], conf_file_path)
|
|
# Check that the configs reflected the changes
|
|
conf = configparser.ConfigParser()
|
|
conf.read_file(open(test_some_file, "r"))
|
|
self.assertEqual(conf["session_recording"]["scope"], "some")
|
|
self.assertEqual(conf["session_recording"]["users"], "test users")
|
|
self.assertEqual(conf["session_recording"]["groups"], "test groups")
|
|
conf.read_file(open(test_all_file, "r"))
|
|
self.assertEqual(conf["session_recording"]["scope"], "all")
|
|
self.assertEqual(conf["session_recording"]["exclude_users"], "testuser1")
|
|
self.assertEqual(conf["session_recording"]["exclude_groups"], "testgroup1")
|
|
self.assertEqual(conf["domain/nssfiles"]["id_provider"], "proxy")
|
|
self.assertEqual(conf["domain/nssfiles"]["proxy_lib_name"], "files")
|
|
self.assertEqual(conf["domain/nssfiles"]["proxy_pam_target"], "sssd-shadowutils")
|
|
conf.read_file(open(test_none_file, "r"))
|
|
self.assertEqual(conf["session_recording"]["scope"], "none")
|
|
|
|
def testDisplayDrag(self):
|
|
b, _ = self._login()
|
|
self._sel_rec('rec1')
|
|
# start playback and pause in middle
|
|
b.click("#player-play-pause")
|
|
b.wait_in_text(self._term_line(1), "localhost")
|
|
b.click("#player-play-pause")
|
|
# zoom in so that the whole screen is no longer visible
|
|
b.click("#player-zoom-in")
|
|
b.click("#player-zoom-in")
|
|
# select and ensure drag'n'pan mode
|
|
b.click("#player-drag-pan")
|
|
# scroll and check for screen movement
|
|
b.mouse(".dragnpan", "mousedown", 200, 200)
|
|
b.mouse(".dragnpan", "mousemove", 10, 10)
|
|
self.assertNotEqual(b.attr(".dragnpan", "scrollTop"), 0)
|
|
self.assertNotEqual(b.attr(".dragnpan", "scrollLeft"), 0)
|
|
|
|
def testLogCorrelation(self):
|
|
b, m = self._login()
|
|
# make sure system is on expected timezone EST
|
|
m.execute("timedatectl set-timezone America/New_York")
|
|
# select the recording with the extra logs
|
|
self._sel_rec('rec2')
|
|
b.click("#btn-logs-view .pf-v5-c-expandable-section__toggle")
|
|
# fast forward until the end
|
|
while "exit" not in b.text(self._term_line(22)):
|
|
b.click("#player-skip-frame")
|
|
# check for extra log entries
|
|
b.wait_visible(".pf-v5-c-data-list:contains('authentication failure')")
|
|
|
|
def testZoomSpeedControls(self):
|
|
b, m = self._login()
|
|
default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
|
|
self._sel_rec('rec1')
|
|
# set speed x16 and begin playing
|
|
for _ in range(4):
|
|
b.click("#player-speed-up")
|
|
b.wait_visible(default_scale_sel)
|
|
b.click("#player-play-pause")
|
|
# wait until sleeping and zoom in
|
|
b.wait_in_text(self._term_line(8), "sleep")
|
|
b.click("#player-zoom-in")
|
|
b.wait_not_present(default_scale_sel)
|
|
# zoom out while typing fast
|
|
b.wait_in_text(self._term_line(9), "localhost")
|
|
b.click("#player-zoom-out")
|
|
b.wait_not_present(default_scale_sel)
|
|
|
|
def _filter(self, inp, occ_dict):
|
|
m = self.machine
|
|
|
|
m.execute("timedatectl set-timezone America/New_York")
|
|
# ignore errors from half-entered timestamps due to searches occuring
|
|
# before `set_input_text` is complete
|
|
self.allow_journal_messages(".*timestamp.*")
|
|
# login and test inputs
|
|
b, _ = self._login()
|
|
time.sleep(5)
|
|
for occ in occ_dict:
|
|
for term in occ_dict[occ]:
|
|
# enter the search term and wait for the results to return
|
|
b.set_input_text(inp, term)
|
|
time.sleep(5)
|
|
self.assertEqual(b.text(".pf-v5-c-table").count("contractor"), occ)
|
|
|
|
def testSearch(self):
|
|
self._filter(
|
|
"#filter-search",
|
|
{
|
|
0: {
|
|
"this should return nothing",
|
|
"this should also return nothing",
|
|
"0123456789",
|
|
},
|
|
1: {
|
|
"extra commands",
|
|
"whoami",
|
|
"ssh",
|
|
"thisisatest123",
|
|
"thisisanothertest456",
|
|
},
|
|
2: {
|
|
"id",
|
|
"localhost",
|
|
"exit",
|
|
"actor",
|
|
"contractor",
|
|
"contractor1@localhost",
|
|
},
|
|
},
|
|
)
|
|
|
|
def testFilterUsername(self):
|
|
self._filter(
|
|
"#filter-username",
|
|
{
|
|
0: {"test", "contact", "contractor", "contractor11", "contractor4"},
|
|
2: {"contractor1"},
|
|
},
|
|
)
|
|
|
|
def testFilterSince(self):
|
|
self._filter(
|
|
"#filter-since",
|
|
{
|
|
0: {"2020-06-02", "2020-06-01 12:31:00"},
|
|
1: {"2020-06-01 12:17:01", "2020-06-01 12:30:50"},
|
|
2: {"2020-06-01", "2020-06-01 12:17:00"},
|
|
},
|
|
)
|
|
|
|
def testFilterUntil(self):
|
|
self._filter(
|
|
"#filter-until",
|
|
{
|
|
0: {"2020-06-01", "2020-06-01 12:16"},
|
|
1: {"2020-06-01 12:17", "2020-06-01 12:29"},
|
|
2: {"2020-06-02", "2020-06-01 12:31:00"},
|
|
},
|
|
)
|
|
|
|
def testAppMenu(self):
|
|
srrow = ".app-list .pf-v5-c-data-list__item-row:" \
|
|
"contains('Session Recording')"
|
|
srbut = "{} button:contains('Session Recording')" \
|
|
"".format(srrow)
|
|
b, _ = self._login("/apps", srrow)
|
|
self.allow_journal_messages(".*chromium-browser.appdata.xml.*",
|
|
".*xml.etree.ElementTree.ParseError:.*")
|
|
b.click(srbut)
|
|
b.enter_page("/session-recording")
|
|
b.wait_visible("#app")
|
|
|
|
if __name__ == "__main__":
|
|
testlib.test_main()
|