Also relax the scale (value) assertion, as the scale value switches between 0.X values in different environments.
406 lines
15 KiB
Python
Executable file
406 lines
15 KiB
Python
Executable file
#!/usr/bin/python3
|
||
# Run this with --help to see available options for tracing and debugging
|
||
# See https://github.com/cockpit-project/cockpit/blob/master/test/common/testlib.py
|
||
# "class Browser" and "class MachineCase" for the available API.
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import json
|
||
import configparser
|
||
|
||
# import Cockpit's machinery for test VMs and its browser test API
|
||
# Directory that contains this test script; helper locations are derived from it.
TEST_DIR = os.path.dirname(__file__)
# Make the helper directories importable (appended in this order) before the
# testlib import that follows.
sys.path.extend(
    [
        os.path.join(TEST_DIR, "common"),
        os.path.join(os.path.dirname(TEST_DIR), "bots/machine"),
    ]
)
|
||
import testlib
|
||
|
||
# Nondestructive tests all run in the same running VM. This allows them to run in Packit, Fedora, and RHEL dist-git gating.
|
||
# They must not permanently change any file or configuration on the system in a way that influences other tests.
|
||
@testlib.nondestructive
|
||
class TestApplication(testlib.MachineCase):
|
||
def _login(self, loc="/session-recording", wait="#app"):
    """Log in, navigate to *loc* and block until *wait* is visible.

    Also whitelists journal messages matching the AVC-denied pattern for
    ``comm="systemctl"`` so they are not treated as unexpected.

    Returns:
        A ``(browser, machine)`` tuple taken from this test case.
    """
    self.login_and_go(loc)
    browser, machine = self.browser, self.machine
    browser.wait_visible(wait)
    self.allow_journal_messages('.*type=1400.*avc: denied .* comm="systemctl".*')
    return browser, machine
|
||
|
||
def _sel_rec(self, recording):
    """Point the browser at the player page of a known recording.

    *recording* must be one of ``'rec1'``, ``'rec2'`` or ``'binaryrec'``;
    any other key raises ``KeyError``.

    Contents of the recordings, as noted by the original author:

    rec1:
        whoami
        id
        echo thisisatest123
        sleep 16
        echo thisisanothertest456
        exit

    rec2:
        echo "Extra Commands"
        sudo systemctl daemon-reload
        sudo ssh root@localhost
        exit

    binaryrec:
        cat /usr/bin/gzip
    """
    page_ids = {
        'rec1': '0f25700a28c44b599869745e5fda8b0c-7106-121e79',
        'rec2': '0f25700a28c44b599869745e5fda8b0c-7623-135541',
        'binaryrec': '6c652ee938b3485894dbacbb8c7c2c61-5a4-38a5',
    }
    # Resolve the id first so an unknown name fails before any navigation.
    target = page_ids[recording]
    self.browser.go(f"/session-recording#/{target}")
|
||
|
||
def _term_line(self, lineno):
|
||
return f".xterm-accessibility-tree div:nth-child({lineno})"
|
||
|
||
def testPlay(self):
    """Start playback of rec1 and wait for "localhost" on the first terminal line."""
    browser, _ = self._login()
    self._sel_rec('rec1')
    first_line = self._term_line(1)
    browser.click("#player-play-pause")
    browser.wait_in_text(first_line, "localhost")
|
||
|
||
def testPlayBinary(self):
    """Play the binary recording and check that it renders and reaches the end.

    The recording is ``binaryrec`` (``cat /usr/bin/gzip`` according to the
    notes in ``_sel_rec``).
    """
    b, _ = self._login()
    self._sel_rec('binaryrec')
    b.click("#player-play-pause")
    # Give the player time to work through the binary output before polling.
    time.sleep(5)
    # U+FFFD REPLACEMENT CHARACTER; its UTF-8 encoding is EF BF BD. It is
    # written as an escape so the literal cannot be mangled into the text
    # "<EFBFBD>" by tools that cannot display the character.
    # NOTE(review): presumably this is how the terminal shows undecodable
    # bytes of the binary — confirm against the recording.
    b.wait_in_text(self._term_line(21), "\ufffd")
    b.wait_in_text(self._term_line(25), "exit")
|
||
|
||
def testFastforwardControls(self):
    """Exercise the fast-forward and restart buttons of the player on rec1."""
    browser, _ = self._login()
    self._sel_rec('rec1')

    progress_bar = ".pf-c-progress__indicator"
    full_width = "width: 100%;"

    # Fast forward: the final "exit" must show up and the progress bar fill.
    browser.click("#player-fast-forward")
    browser.wait_in_text(self._term_line(12), "exit")
    browser.wait_attr(progress_bar, "style", full_width)

    # Restart playback and check the same progress indicator again.
    browser.click("#player-restart")
    browser.wait_text(self._term_line(7), "thisisatest123")
    browser.wait_attr(progress_bar, "style", full_width)
|
||
|
||
def testSpeedControls(self):
    """Step the playback speed up and down and verify the speed label each time."""
    browser, _ = self._login()
    self._sel_rec('rec1')

    speed_up = "#player-speed-up"
    speed_down = "#player-speed-down"
    speed_label = "#player-speed"

    # increase speed
    browser.wait_visible(speed_up)
    for label in ("x2", "x4", "x8", "x16"):
        browser.click(speed_up)
        browser.wait_text(speed_label, label)

    # decrease speed back down to x2
    for label in ("x8", "x4", "x2"):
        browser.click(speed_down)
        browser.wait_text(speed_label, label)

    # One more step down is clicked without checking the label
    # (presumably normal speed, where nothing is asserted).
    browser.click(speed_down)
    for label in ("/2", "/4", "/8", "/16"):
        browser.click(speed_down)
        browser.wait_text(speed_label, label)

    # restore speed via the chip's button, then one step down must read "/2"
    browser.click(".pf-c-chip .pf-c-button")
    browser.click(speed_down)
    browser.wait_text(speed_label, "/2")
|
||
|
||
def testZoomControls(self):
    """Zoom in three times, out once, then use fit-to-screen, checking the style each step."""

    def scale_sel(factor):
        # Selector for a console whose inline style starts with the given scale.
        return f'.console-ct[style^="transform: scale({factor})"]'

    fit_to_sel = (
        '.console-ct[style*="translate(-50%, -50%)"]'
        '[style*="top: 50%"]'
        '[style*="left: 50%"]'
    )

    browser, _ = self._login()
    self._sel_rec('rec1')

    # Wait for terminal with scale(1)
    browser.wait_visible(scale_sel("1"))

    # Zoom in x3
    for factor in ("1.1", "1.2", "1.3"):
        browser.click("#player-zoom-in")
        browser.wait_visible(scale_sel(factor))

    # Zoom out once
    browser.click("#player-zoom-out")
    browser.wait_visible(scale_sel("1.2"))

    # Fit zoom to screen
    browser.click("#player-fit-to")
    browser.wait_visible(fit_to_sel)
|
||
|
||
def testSkipFrame(self):
    """Advance rec1 frame by frame until "localhost" appears on the first line."""
    browser, _ = self._login()
    self._sel_rec('rec1')

    first_line = self._term_line(1)
    browser.wait_visible(first_line)

    # Keep skipping single frames until the first terminal line contains "localhost".
    while True:
        if "localhost" in browser.text(first_line):
            break
        browser.click("#player-skip-frame")

    browser.wait_in_text(first_line, "localhost")
|
||
|
||
def testPlaybackPause(self):
    """Check that pausing stops playback of rec1 and that play can be pressed again."""
    browser, _ = self._login()
    self._sel_rec('rec1')

    toggle = "#player-play-pause"

    # Restart, then press play and pause right after each other.
    browser.click("#player-restart")
    browser.click(toggle)
    browser.click(toggle)

    # While paused for ten seconds the echo output must not show up on line 6.
    time.sleep(10)
    browser.wait_not_in_text(self._term_line(6), "thisisatest123")

    # Pressing play again must still be possible.
    browser.click(toggle)
|
||
|
||
def testSessionRecordingConf(self):
    """Edit the tlog and SSSD settings in the UI and verify the files written.

    For each of the two config files the existing file is downloaded first,
    the form is changed and saved, the result is downloaded for inspection,
    and the original file is uploaded back. The restore runs in a
    ``finally`` block so a failing UI step does not leave a modified config
    behind for the other tests sharing the machine. Downloaded files are
    read through context managers so no file handle is leaked.
    """

    def read_session_recording(path):
        # Parse each downloaded file with a fresh parser so values from a
        # previously read file cannot satisfy a later assertion.
        parser = configparser.ConfigParser()
        with open(path, "r") as conf_fp:
            parser.read_file(conf_fp)
        return parser["session_recording"]

    b, m = self._login()
    b.click("#btn-config")

    # TLOG config
    conf_file_path = "/etc/tlog/"
    conf_file = f"{conf_file_path}tlog-rec-session.conf"
    save_file = "/tmp/tlog-rec-session.conf"
    test_file = "/tmp/test-tlog-rec-session.conf"

    # Save the existing config
    b.click("#btn-save-tlog-conf")
    m.download(conf_file, save_file)
    try:
        # Change all of the fields
        b.set_input_text("#shell", "/test/path/shell")
        b.set_input_text("#notice", "Test Notice")
        b.set_input_text("#latency", "1")
        b.set_input_text("#payload", "2")
        b.set_checked("#log_input", True)
        b.set_checked("#log_output", False)
        b.set_checked("#log_window", False)
        b.set_input_text("#limit_rate", "3")
        b.set_input_text("#limit_burst", "4")
        b.set_val("#limit_action", "drop")
        b.set_input_text("#file_path", "/test/path/file")
        b.set_input_text("#syslog_facility", "testfac")
        b.set_val("#syslog_priority", "info")
        b.set_val("#journal_priority", "info")
        b.set_checked("#journal_augment", False)
        b.set_val("#writer", "file")
        b.click("#btn-save-tlog-conf")
        time.sleep(1)
        m.download(conf_file, test_file)
    finally:
        # Revert to the previous config before testing to ensure test continuity
        m.upload([save_file], conf_file_path)

    # Check that the config reflects the changes
    with open(test_file, "r") as conf_fp:
        conf = json.load(conf_fp)
    self.assertEqual(
        json.dumps(conf),
        json.dumps(
            {
                "shell": "/test/path/shell",
                "notice": "Test Notice",
                "latency": 1,
                "payload": 2,
                "log": {"input": True, "output": False, "window": False},
                "limit": {"rate": 3, "burst": 4, "action": "drop"},
                "file": {"path": "/test/path/file"},
                "syslog": {"facility": "testfac", "priority": "info"},
                "journal": {"priority": "info", "augment": False},
                "writer": "file",
            }
        ),
    )

    # SSSD config
    conf_file_path = "/etc/sssd/conf.d/"
    conf_file = f"{conf_file_path}sssd-session-recording.conf"
    save_file = "/tmp/sssd-session-recording.conf"
    test_none_file = "/tmp/test-none-sssd-session-recording.conf"
    test_some_file = "/tmp/test-some-sssd-session-recording.conf"
    test_all_file = "/tmp/test-all-sssd-session-recording.conf"

    # Save the existing config
    b.click("#btn-save-sssd-conf")
    time.sleep(1)
    m.download(conf_file, save_file)
    try:
        # Download test with scope 'Some'
        b.set_val("#scope", "some")
        b.set_input_text("#users", "test users")
        b.set_input_text("#groups", "test groups")
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, test_some_file)
        # Download test with scope 'All'
        b.set_val("#scope", "all")
        b.set_input_text("#exclude_users", "testuser1")
        b.set_input_text("#exclude_groups", "testgroup1")
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, test_all_file)
        # Download test with scope 'None'
        b.set_val("#scope", "none")
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, test_none_file)
    finally:
        # Revert to the previous config before testing to ensure test continuity
        m.upload([save_file], conf_file_path)

    # Check that the configs reflected the changes
    some_conf = read_session_recording(test_some_file)
    self.assertEqual(some_conf["scope"], "some")
    self.assertEqual(some_conf["users"], "test users")
    self.assertEqual(some_conf["groups"], "test groups")

    all_conf = read_session_recording(test_all_file)
    self.assertEqual(all_conf["scope"], "all")
    self.assertEqual(all_conf["exclude_users"], "testuser1")
    self.assertEqual(all_conf["exclude_groups"], "testgroup1")

    none_conf = read_session_recording(test_none_file)
    self.assertEqual(none_conf["scope"], "none")
|
||
|
||
def testDisplayDrag(self):
    """Zoom into a paused rec1 and check that dragging scrolls the display."""
    browser, _ = self._login()
    self._sel_rec('rec1')

    play_pause = "#player-play-pause"
    pan_area = ".dragnpan"

    # start playback and pause in the middle
    browser.click(play_pause)
    browser.wait_in_text(self._term_line(1), "localhost")
    browser.click(play_pause)

    # zoom in twice so that the whole screen is no longer visible
    for _step in range(2):
        browser.click("#player-zoom-in")

    # switch to drag'n'pan mode
    browser.click("#player-drag-pan")

    # drag and verify that both scroll offsets moved away from zero
    browser.mouse(pan_area, "mousedown", 200, 200)
    browser.mouse(pan_area, "mousemove", 10, 10)
    for prop in ("scrollTop", "scrollLeft"):
        self.assertNotEqual(browser.attr(pan_area, prop), 0)
|
||
|
||
def testLogCorrelation(self):
    """Play rec2 to the end and check that the correlated log entry is listed.

    The test switches the machine to the America/New_York timezone. Because
    this class is marked nondestructive, the previous timezone is recorded
    first and restored through a cleanup, so the change does not leak into
    other tests that share the machine.
    """
    b, m = self._login()
    # Best effort: if the running timedatectl cannot report the timezone the
    # output is empty and no restore is registered.
    # NOTE(review): relies on m.execute() returning the command's stdout as
    # a string — confirm against the machine API.
    previous_tz = m.execute(
        "timedatectl show --property=Timezone --value 2>/dev/null || true"
    ).strip()
    if previous_tz:
        self.addCleanup(m.execute, f"timedatectl set-timezone {previous_tz}")
    # make sure system is on expected timezone EST
    m.execute("timedatectl set-timezone America/New_York")
    # select the recording with the extra logs
    self._sel_rec('rec2')
    b.click("#btn-logs-view .pf-c-expandable-section__toggle")
    # fast forward until the end
    while "exit" not in b.text(self._term_line(22)):
        b.click("#player-skip-frame")
    # check for extra log entries
    b.wait_visible(".pf-c-data-list:contains('authentication failure')")
|
||
|
||
def testZoomSpeedControls(self):
    """Zoom in and out while rec1 plays at x16 and check the scale leaves its default.

    The machine handle returned by ``_login`` is not needed here, so it is
    discarded like in the other player tests.
    """
    b, _ = self._login()
    default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
    self._sel_rec('rec1')
    # set speed x16 and begin playing
    for _ in range(4):
        b.click("#player-speed-up")
    b.wait_visible(default_scale_sel)
    b.click("#player-play-pause")
    # wait until sleeping and zoom in
    b.wait_in_text(self._term_line(8), "sleep")
    b.click("#player-zoom-in")
    b.wait_not_present(default_scale_sel)
    # zoom out while typing fast; only "not the default scale" is asserted,
    # no exact scale value is checked here
    b.wait_in_text(self._term_line(9), "localhost")
    b.click("#player-zoom-out")
    b.wait_not_present(default_scale_sel)
|
||
|
||
def _filter(self, inp, occ_dict):
    """Type search terms into the input *inp* and count the matching rows.

    *occ_dict* maps an expected number of occurrences of "contractor" in
    the table text to the collection of terms that should produce it.
    """
    # Ignore errors from half-entered timestamps caused by searches occurring
    # before `set_input_text` is complete.
    self.allow_journal_messages(".*timestamp.*")

    # login and test inputs
    browser, _ = self._login()
    time.sleep(5)
    for expected_hits, terms in occ_dict.items():
        for term in terms:
            # enter the search term and give the results time to return
            browser.set_input_text(inp, term)
            time.sleep(5)
            hits = browser.text(".pf-c-table").count("contractor")
            self.assertEqual(hits, expected_hits)
|
||
|
||
def testSearch(self):
    """Run free-text searches and check how many recordings each term matches."""
    # expected number of matches -> search terms producing that number
    expected = {
        0: {
            "this should return nothing",
            "this should also return nothing",
            "0123456789",
        },
        1: {
            "extra commands",
            "whoami",
            "ssh",
            "thisisatest123",
            "thisisanothertest456",
        },
        2: {
            "id",
            "localhost",
            "exit",
            "actor",
            "contractor",
            "contractor1@localhost",
        },
    }
    self._filter("#filter-search", expected)
|
||
|
||
def testFilterUsername(self):
    """Filter by user name and check the number of rows for each entered name."""
    # expected number of matches -> user names producing that number
    expected = {
        0: {"test", "contact", "contractor", "contractor11", "contractor4"},
        2: {"contractor1"},
    }
    self._filter("#filter-username", expected)
|
||
|
||
def testFilterSince(self):
    """Filter by the "since" timestamp and check the number of rows shown."""
    # expected number of matches -> timestamps producing that number
    expected = {
        0: {"2020-06-02", "2020-06-01 12:31:00"},
        1: {"2020-06-01 12:17:01", "2020-06-01 12:30:50"},
        2: {"2020-06-01", "2020-06-01 12:17:00"},
    }
    self._filter("#filter-since", expected)
|
||
|
||
def testFilterUntil(self):
    """Filter by the "until" timestamp and check the number of rows shown."""
    # expected number of matches -> timestamps producing that number
    expected = {
        0: {"2020-06-01", "2020-06-01 12:16"},
        1: {"2020-06-01 12:17", "2020-06-01 12:29"},
        2: {"2020-06-02", "2020-06-01 12:31:00"},
    }
    self._filter("#filter-until", expected)
|
||
|
||
def testAppMenu(self):
    """Open Session Recording from the /apps page and wait for the app to load."""
    row_sel = (
        ".app-list .pf-c-data-list__item-row:"
        "contains('Session Recording')"
    )
    button_sel = f"{row_sel} button:contains('Session Recording')"

    browser, _ = self._login("/apps", row_sel)
    # Journal noise matching these patterns is tolerated for this test.
    self.allow_journal_messages(
        ".*chromium-browser.appdata.xml.*",
        ".*xml.etree.ElementTree.ParseError:.*",
    )
    browser.click(button_sel)
    browser.enter_page("/session-recording")
    browser.wait_visible("#app")
|
||
|
||
# When executed as a script, hand control to testlib's test_main() entry point.
if __name__ == "__main__":
    testlib.test_main()
|