411 lines
15 KiB
Python
Executable file
411 lines
15 KiB
Python
Executable file
#!/usr/bin/python3
|
||
# Run this with --help to see available options for tracing and debugging
|
||
# See https://github.com/cockpit-project/cockpit/blob/master/test/common/testlib.py
|
||
# "class Browser" and "class MachineCase" for the available API.
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import json
|
||
import configparser
|
||
|
||
# import Cockpit's machinery for test VMs and its browser test API
|
||
TEST_DIR = os.path.dirname(__file__)
|
||
sys.path.append(os.path.join(TEST_DIR, "common"))
|
||
sys.path.append(os.path.join(os.path.dirname(TEST_DIR), "bots/machine"))
|
||
import testlib
|
||
|
||
# Nondestructive tests all run in the same running VM. This allows them to run in Packit, Fedora, and RHEL dist-git gating
|
||
# They must not permanently change any file or configuration on the system in a way that influences other tests.
|
||
@testlib.nondestructive
|
||
class TestApplication(testlib.MachineCase):
|
||
def _login(self, loc="/session-recording", wait="#app"):
|
||
self.login_and_go(loc)
|
||
b = self.browser
|
||
m = self.machine
|
||
b.wait_visible(wait)
|
||
self.allow_journal_messages('.*type=1400.*avc: denied .* comm="systemctl".*')
|
||
return b, m
|
||
|
||
def _sel_rec(self, recording):
|
||
'''
|
||
rec1:
|
||
whoami
|
||
id
|
||
echo thisisatest123
|
||
sleep 16
|
||
echo thisisanothertest456
|
||
exit
|
||
|
||
rec2:
|
||
echo "Extra Commands"
|
||
sudo systemctl daemon-reload
|
||
sudo ssh root@localhost
|
||
exit
|
||
|
||
binaryrec:
|
||
cat /usr/bin/gzip
|
||
'''
|
||
recordings = {'rec1': '0f25700a28c44b599869745e5fda8b0c-7106-121e79',
|
||
'rec2': '0f25700a28c44b599869745e5fda8b0c-7623-135541',
|
||
'binaryrec': '6c652ee938b3485894dbacbb8c7c2c61-5a4-38a5'}
|
||
|
||
page = recordings[recording]
|
||
|
||
self.browser.go(f"/session-recording#/{page}")
|
||
|
||
def _term_line(self, lineno):
|
||
return f".xterm-accessibility-tree div:nth-child({lineno})"
|
||
|
||
def testPlay(self):
|
||
b, _ = self._login()
|
||
self._sel_rec('rec1')
|
||
b.click("#player-play-pause")
|
||
b.wait_in_text(self._term_line(1), "localhost")
|
||
|
||
def testPlayBinary(self):
|
||
b, _ = self._login()
|
||
self._sel_rec('binaryrec')
|
||
b.click("#player-play-pause")
|
||
time.sleep(5)
|
||
b.wait_in_text(self._term_line(21), "<EFBFBD>")
|
||
b.wait_in_text(self._term_line(25), "exit")
|
||
|
||
def testFastforwardControls(self):
|
||
progress = ".pf-c-progress__indicator"
|
||
|
||
b, _ = self._login()
|
||
self._sel_rec('rec1')
|
||
# fast forward
|
||
b.click("#player-fast-forward")
|
||
b.wait_in_text(self._term_line(12), "exit")
|
||
b.wait_attr(progress, "style", "width: 100%;")
|
||
# test restart playback
|
||
b.click("#player-restart")
|
||
b.click("#player-play-pause")
|
||
b.wait_text(self._term_line(7), "thisisatest123")
|
||
with b.wait_timeout(100):
|
||
b.wait_attr(progress, "style", "width: 100%;")
|
||
|
||
def testSpeedControls(self):
|
||
b, _ = self._login()
|
||
self._sel_rec('rec1')
|
||
# increase speed
|
||
b.wait_visible("#player-speed-up")
|
||
b.click("#player-speed-up")
|
||
b.wait_text("#player-speed", "x2")
|
||
b.click("#player-speed-up")
|
||
b.wait_text("#player-speed", "x4")
|
||
b.click("#player-speed-up")
|
||
b.wait_text("#player-speed", "x8")
|
||
b.click("#player-speed-up")
|
||
b.wait_text("#player-speed", "x16")
|
||
# decrease speed
|
||
b.click("#player-speed-down")
|
||
b.wait_text("#player-speed", "x8")
|
||
b.click("#player-speed-down")
|
||
b.wait_text("#player-speed", "x4")
|
||
b.click("#player-speed-down")
|
||
b.wait_text("#player-speed", "x2")
|
||
b.click("#player-speed-down")
|
||
b.click("#player-speed-down")
|
||
b.wait_text("#player-speed", "/2")
|
||
b.click("#player-speed-down")
|
||
b.wait_text("#player-speed", "/4")
|
||
b.click("#player-speed-down")
|
||
b.wait_text("#player-speed", "/8")
|
||
b.click("#player-speed-down")
|
||
b.wait_text("#player-speed", "/16")
|
||
# restore speed
|
||
b.click(".pf-c-chip .pf-c-button")
|
||
b.click("#player-speed-down")
|
||
b.wait_text("#player-speed", "/2")
|
||
|
||
def testZoomControls(self):
|
||
default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
|
||
zoom_one_scale_sel = '.console-ct[style^="transform: scale(1.1)"]'
|
||
zoom_two_scale_sel = '.console-ct[style^="transform: scale(1.2)"]'
|
||
zoom_three_scale_sel = '.console-ct[style^="transform: scale(1.3)"]'
|
||
zoom_fit_to = (
|
||
'.console-ct[style*="translate(-50%, -50%)"]'
|
||
'[style*="top: 50%"]'
|
||
'[style*="left: 50%"]'
|
||
)
|
||
|
||
b, _ = self._login()
|
||
self._sel_rec('rec1')
|
||
# Wait for terminal with scale(1)
|
||
b.wait_visible(default_scale_sel)
|
||
# Zoom in x3
|
||
b.click("#player-zoom-in")
|
||
b.wait_visible(zoom_one_scale_sel)
|
||
b.click("#player-zoom-in")
|
||
b.wait_visible(zoom_two_scale_sel)
|
||
b.click("#player-zoom-in")
|
||
b.wait_visible(zoom_three_scale_sel)
|
||
# Zoom Out
|
||
b.click("#player-zoom-out")
|
||
b.wait_visible(zoom_two_scale_sel)
|
||
# Fit zoom to screen
|
||
b.click("#player-fit-to")
|
||
b.wait_visible(zoom_fit_to)
|
||
|
||
def testSkipFrame(self):
|
||
b, _ = self._login()
|
||
self._sel_rec('rec1')
|
||
b.wait_visible(self._term_line(1))
|
||
# loop until 3 valid frames have passed
|
||
while "localhost" not in b.text(self._term_line(1)):
|
||
b.click("#player-skip-frame")
|
||
b.wait_in_text(self._term_line(1), "localhost")
|
||
|
||
def testPlaybackPause(self):
|
||
b, _ = self._login()
|
||
self._sel_rec('rec1')
|
||
# Start and pause the player
|
||
b.click("#player-restart")
|
||
b.click("#player-play-pause")
|
||
b.click("#player-play-pause")
|
||
time.sleep(10)
|
||
# Make sure it didn't keep playing
|
||
b.wait_not_in_text(self._term_line(6), "thisisatest123")
|
||
# Test if it can start playing again
|
||
b.click("#player-play-pause")
|
||
|
||
def testSessionRecordingConf(self):
|
||
b, m = self._login()
|
||
b.click("#btn-config")
|
||
|
||
# TLOG config
|
||
conf_file_path = "/etc/tlog/"
|
||
conf_file = f"{conf_file_path}tlog-rec-session.conf"
|
||
save_file = "/tmp/tlog-rec-session.conf"
|
||
test_file = "/tmp/test-tlog-rec-session.conf"
|
||
|
||
# Save the existing config
|
||
b.click("#btn-save-tlog-conf")
|
||
m.download(conf_file, save_file)
|
||
# Change all of the fields
|
||
b.set_input_text("#shell", "/test/path/shell")
|
||
b.set_input_text("#notice", "Test Notice")
|
||
b.set_input_text("#latency", "1")
|
||
b.set_input_text("#payload", "2")
|
||
b.set_checked("#log_input", True)
|
||
b.set_checked("#log_output", False)
|
||
b.set_checked("#log_window", False)
|
||
b.set_input_text("#limit_rate", "3")
|
||
b.set_input_text("#limit_burst", "4")
|
||
b.set_val("#limit_action", "drop")
|
||
b.set_input_text("#file_path", "/test/path/file")
|
||
b.set_input_text("#syslog_facility", "testfac")
|
||
b.set_val("#syslog_priority", "info")
|
||
b.set_val("#journal_priority", "info")
|
||
b.set_checked("#journal_augment", False)
|
||
b.set_val("#writer", "file")
|
||
b.click("#btn-save-tlog-conf")
|
||
time.sleep(1)
|
||
m.download(conf_file, test_file)
|
||
# Revert to the previous config before testing to ensure test continuity
|
||
m.upload([save_file], conf_file_path)
|
||
# Check that the config reflects the changes
|
||
conf = json.load(open(test_file, "r"))
|
||
self.assertEqual(
|
||
json.dumps(conf),
|
||
json.dumps(
|
||
{
|
||
"shell": "/test/path/shell",
|
||
"notice": "Test Notice",
|
||
"latency": 1,
|
||
"payload": 2,
|
||
"log": {"input": True, "output": False, "window": False},
|
||
"limit": {"rate": 3, "burst": 4, "action": "drop"},
|
||
"file": {"path": "/test/path/file"},
|
||
"syslog": {"facility": "testfac", "priority": "info"},
|
||
"journal": {"priority": "info", "augment": False},
|
||
"writer": "file",
|
||
}
|
||
),
|
||
)
|
||
|
||
# SSSD config
|
||
conf_file_path = "/etc/sssd/conf.d/"
|
||
conf_file = f"{conf_file_path}sssd-session-recording.conf"
|
||
save_file = "/tmp/sssd-session-recording.conf"
|
||
test_none_file = "/tmp/test-none-sssd-session-recording.conf"
|
||
test_some_file = "/tmp/test-some-sssd-session-recording.conf"
|
||
test_all_file = "/tmp/test-all-sssd-session-recording.conf"
|
||
|
||
# Save the existing config
|
||
b.click("#btn-save-sssd-conf")
|
||
time.sleep(1)
|
||
m.download(conf_file, save_file)
|
||
# Download test with scope 'Some'
|
||
b.set_val("#scope", "some")
|
||
b.set_input_text("#users", "test users")
|
||
b.set_input_text("#groups", "test groups")
|
||
b.click("#btn-save-sssd-conf")
|
||
time.sleep(1)
|
||
m.download(conf_file, test_some_file)
|
||
# Download test with scope 'All'
|
||
b.set_val("#scope", "all")
|
||
b.set_input_text("#exclude_users", "testuser1")
|
||
b.set_input_text("#exclude_groups", "testgroup1")
|
||
b.click("#btn-save-sssd-conf")
|
||
time.sleep(1)
|
||
m.download(conf_file, test_all_file)
|
||
# Download test with scope 'None'
|
||
b.set_val("#scope", "none")
|
||
b.click("#btn-save-sssd-conf")
|
||
time.sleep(1)
|
||
m.download(conf_file, test_none_file)
|
||
# Revert to the previous config before testing to ensure test continuity
|
||
m.upload([save_file], conf_file_path)
|
||
# Check that the configs reflected the changes
|
||
conf = configparser.ConfigParser()
|
||
conf.read_file(open(test_some_file, "r"))
|
||
self.assertEqual(conf["session_recording"]["scope"], "some")
|
||
self.assertEqual(conf["session_recording"]["users"], "test users")
|
||
self.assertEqual(conf["session_recording"]["groups"], "test groups")
|
||
conf.read_file(open(test_all_file, "r"))
|
||
self.assertEqual(conf["session_recording"]["scope"], "all")
|
||
self.assertEqual(conf["session_recording"]["exclude_users"], "testuser1")
|
||
self.assertEqual(conf["session_recording"]["exclude_groups"], "testgroup1")
|
||
conf.read_file(open(test_none_file, "r"))
|
||
self.assertEqual(conf["session_recording"]["scope"], "none")
|
||
|
||
def testDisplayDrag(self):
|
||
b, _ = self._login()
|
||
self._sel_rec('rec1')
|
||
# start playback and pause in middle
|
||
b.click("#player-play-pause")
|
||
b.wait_in_text(self._term_line(1), "localhost")
|
||
b.click("#player-play-pause")
|
||
# zoom in so that the whole screen is no longer visible
|
||
b.click("#player-zoom-in")
|
||
b.click("#player-zoom-in")
|
||
# select and ensure drag'n'pan mode
|
||
b.click("#player-drag-pan")
|
||
# scroll and check for screen movement
|
||
b.mouse(".dragnpan", "mousedown", 200, 200)
|
||
b.mouse(".dragnpan", "mousemove", 10, 10)
|
||
self.assertNotEqual(b.attr(".dragnpan", "scrollTop"), 0)
|
||
self.assertNotEqual(b.attr(".dragnpan", "scrollLeft"), 0)
|
||
|
||
def testLogCorrelation(self):
|
||
b, m = self._login()
|
||
# make sure system is on expected timezone EST
|
||
m.execute("timedatectl set-timezone America/New_York")
|
||
# select the recording with the extra logs
|
||
self._sel_rec('rec2')
|
||
b.click("#btn-logs-view .pf-c-expandable-section__toggle")
|
||
# fast forward until the end
|
||
while "exit" not in b.text(self._term_line(22)):
|
||
b.click("#player-skip-frame")
|
||
# check for extra log entries
|
||
b.wait_visible(".pf-c-data-list:contains('authentication failure')")
|
||
|
||
def testZoomSpeedControls(self):
|
||
b, m = self._login()
|
||
default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
|
||
self._sel_rec('rec1')
|
||
# set speed x16 and begin playing
|
||
for _ in range(4):
|
||
b.click("#player-speed-up")
|
||
b.wait_visible(default_scale_sel)
|
||
b.click("#player-play-pause")
|
||
# wait until sleeping and zoom in
|
||
b.wait_in_text(self._term_line(8), "sleep")
|
||
b.click("#player-zoom-in")
|
||
b.wait_not_present(default_scale_sel)
|
||
# zoom out while typing fast
|
||
b.wait_in_text(self._term_line(9), "localhost")
|
||
b.click("#player-zoom-out")
|
||
b.wait_not_present(default_scale_sel)
|
||
|
||
def _filter(self, inp, occ_dict):
|
||
m = self.machine
|
||
|
||
m.execute("timedatectl set-timezone America/New_York")
|
||
# ignore errors from half-entered timestamps due to searches occuring
|
||
# before `set_input_text` is complete
|
||
self.allow_journal_messages(".*timestamp.*")
|
||
# login and test inputs
|
||
b, _ = self._login()
|
||
time.sleep(5)
|
||
for occ in occ_dict:
|
||
for term in occ_dict[occ]:
|
||
# enter the search term and wait for the results to return
|
||
b.set_input_text(inp, term)
|
||
time.sleep(5)
|
||
self.assertEqual(b.text(".pf-c-table").count("contractor"), occ)
|
||
|
||
def testSearch(self):
|
||
self._filter(
|
||
"#filter-search",
|
||
{
|
||
0: {
|
||
"this should return nothing",
|
||
"this should also return nothing",
|
||
"0123456789",
|
||
},
|
||
1: {
|
||
"extra commands",
|
||
"whoami",
|
||
"ssh",
|
||
"thisisatest123",
|
||
"thisisanothertest456",
|
||
},
|
||
2: {
|
||
"id",
|
||
"localhost",
|
||
"exit",
|
||
"actor",
|
||
"contractor",
|
||
"contractor1@localhost",
|
||
},
|
||
},
|
||
)
|
||
|
||
def testFilterUsername(self):
|
||
self._filter(
|
||
"#filter-username",
|
||
{
|
||
0: {"test", "contact", "contractor", "contractor11", "contractor4"},
|
||
2: {"contractor1"},
|
||
},
|
||
)
|
||
|
||
def testFilterSince(self):
|
||
self._filter(
|
||
"#filter-since",
|
||
{
|
||
0: {"2020-06-02", "2020-06-01 12:31:00"},
|
||
1: {"2020-06-01 12:17:01", "2020-06-01 12:30:50"},
|
||
2: {"2020-06-01", "2020-06-01 12:17:00"},
|
||
},
|
||
)
|
||
|
||
def testFilterUntil(self):
|
||
self._filter(
|
||
"#filter-until",
|
||
{
|
||
0: {"2020-06-01", "2020-06-01 12:16"},
|
||
1: {"2020-06-01 12:17", "2020-06-01 12:29"},
|
||
2: {"2020-06-02", "2020-06-01 12:31:00"},
|
||
},
|
||
)
|
||
|
||
def testAppMenu(self):
|
||
srrow = ".app-list .pf-c-data-list__item-row:" \
|
||
"contains('Session Recording')"
|
||
srbut = "{} button:contains('Session Recording')" \
|
||
"".format(srrow)
|
||
b, _ = self._login("/apps", srrow)
|
||
self.allow_journal_messages(".*chromium-browser.appdata.xml.*",
|
||
".*xml.etree.ElementTree.ParseError:.*")
|
||
b.click(srbut)
|
||
b.enter_page("/session-recording")
|
||
b.wait_visible("#app")
|
||
|
||
if __name__ == "__main__":
|
||
testlib.test_main()
|