* Removed unused css files * Converted all UI elements to patternfly 4 * Implemented config page under same app * Replaced slider with patternfly `Progress` component
363 lines
13 KiB
Python
Executable file
363 lines
13 KiB
Python
Executable file
#!/usr/bin/python3
|
|
# Run this with --help to see available options for tracing and debugging
|
|
# See https://github.com/cockpit-project/cockpit/blob/master/test/common/testlib.py
|
|
# "class Browser" and "class MachineCase" for the available API.
|
|
|
|
import os
|
|
import sys
|
|
|
|
# import Cockpit's machinery for test VMs and its browser test API
|
|
TEST_DIR = os.path.dirname(__file__)
|
|
sys.path.append(os.path.join(TEST_DIR, "common"))
|
|
sys.path.append(os.path.join(os.path.dirname(TEST_DIR), "bots/machine"))
|
|
from testlib import *
|
|
|
|
# Test with pre-recorded journal with tlog UID 981
|
|
class TestApplication(MachineCase):
|
|
def _login(self):
|
|
self.login_and_go("/session-recording")
|
|
b = self.browser
|
|
m = self.machine
|
|
b.wait_present("#app")
|
|
return b, m
|
|
|
|
def _sel_rec(self, index=0):
|
|
page = (
|
|
"0f25700a28c44b599869745e5fda8b0c-7106-121e79"
|
|
if not index
|
|
else "0f25700a28c44b599869745e5fda8b0c-7623-135541"
|
|
)
|
|
self.browser.go(f"/session-recording#/{page}")
|
|
|
|
def _term_line(self, lineno):
|
|
return f".xterm-accessibility-tree div:nth-child({lineno})"
|
|
|
|
def testPlay(self):
|
|
b, _ = self._login()
|
|
self._sel_rec()
|
|
b.click("#player-play-pause")
|
|
b.wait_in_text(self._term_line(1), "localhost")
|
|
|
|
def testFastforwardControls(self):
|
|
progress = ".pf-c-progress__indicator"
|
|
|
|
b, _ = self._login()
|
|
self._sel_rec()
|
|
# fast forward
|
|
b.click("#player-fast-forward")
|
|
b.wait_in_text(self._term_line(12), "exit")
|
|
b.wait_attr(progress, "style", "width: 100%;")
|
|
# test restart playback
|
|
b.click("#player-restart")
|
|
b.wait_text(self._term_line(1), "Blank line")
|
|
b.wait_attr(progress, "style", "width: 100%;")
|
|
|
|
def testSpeedControls(self):
|
|
b, _ = self._login()
|
|
self._sel_rec()
|
|
# increase speed
|
|
b.wait_present("#player-speed-up")
|
|
b.click("#player-speed-up")
|
|
b.wait_text("#player-speed", "x2")
|
|
b.click("#player-speed-up")
|
|
b.wait_text("#player-speed", "x4")
|
|
b.click("#player-speed-up")
|
|
b.wait_text("#player-speed", "x8")
|
|
b.click("#player-speed-up")
|
|
b.wait_text("#player-speed", "x16")
|
|
# decrease speed
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "x8")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "x4")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "x2")
|
|
b.click("#player-speed-down")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/2")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/4")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/8")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/16")
|
|
# restore speed
|
|
b.click(".pf-c-chip .pf-c-button")
|
|
b.click("#player-speed-down")
|
|
b.wait_text("#player-speed", "/2")
|
|
|
|
def testZoomControls(self):
|
|
default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
|
|
zoom_one_scale_sel = '.console-ct[style^="transform: scale(1.1)"]'
|
|
zoom_two_scale_sel = '.console-ct[style^="transform: scale(1.2)"]'
|
|
zoom_three_scale_sel = '.console-ct[style^="transform: scale(1.3)"]'
|
|
zoom_fit_to = (
|
|
'.console-ct[style*="translate(-50%, -50%)"]'
|
|
'[style*="top: 50%"]'
|
|
'[style*="left: 50%"]'
|
|
)
|
|
|
|
b, _ = self._login()
|
|
self._sel_rec()
|
|
# Wait for terminal with scale(1)
|
|
b.wait_present(default_scale_sel)
|
|
# Zoom in x3
|
|
b.click("#player-zoom-in")
|
|
b.wait_present(zoom_one_scale_sel)
|
|
b.click("#player-zoom-in")
|
|
b.wait_present(zoom_two_scale_sel)
|
|
b.click("#player-zoom-in")
|
|
b.wait_present(zoom_three_scale_sel)
|
|
# Zoom Out
|
|
b.click("#player-zoom-out")
|
|
b.wait_present(zoom_two_scale_sel)
|
|
# Fit zoom to screen
|
|
b.click("#player-fit-to")
|
|
b.wait_present(zoom_fit_to)
|
|
|
|
def testSkipFrame(self):
|
|
b, _ = self._login()
|
|
self._sel_rec()
|
|
b.wait_present(self._term_line(1))
|
|
# loop until 3 valid frames have passed
|
|
while "localhost" not in b.text(self._term_line(1)):
|
|
b.click("#player-skip-frame")
|
|
b.wait_in_text(self._term_line(1), "localhost")
|
|
|
|
def testPlaybackPause(self):
|
|
import time
|
|
|
|
b, _ = self._login()
|
|
self._sel_rec()
|
|
# Start and pause the player
|
|
b.click("#player-restart")
|
|
b.click("#player-play-pause")
|
|
b.click("#player-play-pause")
|
|
time.sleep(10)
|
|
# Make sure it didn't keep playing
|
|
b.wait_not_in_text(self._term_line(1), "whoami")
|
|
# Test if it can start playing again
|
|
b.click("#player-play-pause")
|
|
|
|
def testSessionRecordingConf(self):
|
|
import time, json, configparser
|
|
|
|
b, m = self._login()
|
|
b.click("#btn-config")
|
|
|
|
# TLOG config
|
|
conf_file_path = "/etc/tlog/"
|
|
conf_file = f"{conf_file_path}tlog-rec-session.conf"
|
|
save_file = "/tmp/tlog-rec-session.conf"
|
|
test_file = "/tmp/test-tlog-rec-session.conf"
|
|
|
|
# Save the existing config
|
|
b.click("#btn-save-tlog-conf")
|
|
m.download(conf_file, save_file)
|
|
# Change all of the fields
|
|
b.set_input_text("#shell", "/test/path/shell")
|
|
b.set_input_text("#notice", "Test Notice")
|
|
b.set_input_text("#latency", "1")
|
|
b.set_input_text("#payload", "2")
|
|
b.set_checked("#log_input", True)
|
|
b.set_checked("#log_output", False)
|
|
b.set_checked("#log_window", False)
|
|
b.set_input_text("#limit_rate", "3")
|
|
b.set_input_text("#limit_burst", "4")
|
|
b.set_val("#limit_action", "drop")
|
|
b.set_input_text("#file_path", "/test/path/file")
|
|
b.set_input_text("#syslog_facility", "testfac")
|
|
b.set_val("#syslog_priority", "info")
|
|
b.set_val("#journal_priority", "info")
|
|
b.set_checked("#journal_augment", False)
|
|
b.set_val("#writer", "file")
|
|
b.click("#btn-save-tlog-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, test_file)
|
|
# Revert to the previous config before testing to ensure test continuity
|
|
m.upload([save_file], conf_file_path)
|
|
# Check that the config reflects the changes
|
|
conf = json.load(open(test_file, "r"))
|
|
self.assertEqual(
|
|
json.dumps(conf),
|
|
json.dumps(
|
|
{
|
|
"shell": "/test/path/shell",
|
|
"notice": "Test Notice",
|
|
"latency": 1,
|
|
"payload": 2,
|
|
"log": {"input": True, "output": False, "window": False},
|
|
"limit": {"rate": 3, "burst": 4, "action": "drop"},
|
|
"file": {"path": "/test/path/file"},
|
|
"syslog": {"facility": "testfac", "priority": "info"},
|
|
"journal": {"priority": "info", "augment": False},
|
|
"writer": "file",
|
|
}
|
|
),
|
|
)
|
|
|
|
# SSSD config
|
|
conf_file_path = "/etc/sssd/conf.d/"
|
|
conf_file = f"{conf_file_path}sssd-session-recording.conf"
|
|
save_file = "/tmp/sssd-session-recording.conf"
|
|
test_none_file = "/tmp/test-none-sssd-session-recording.conf"
|
|
test_some_file = "/tmp/test-some-sssd-session-recording.conf"
|
|
test_all_file = "/tmp/test-all-sssd-session-recording.conf"
|
|
|
|
# Save the existing config
|
|
b.click("#btn-save-sssd-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, save_file)
|
|
# Download test with scope 'None'
|
|
b.set_val("#scope", "none")
|
|
b.click("#btn-save-sssd-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, test_none_file)
|
|
# Download test with scope 'Some'
|
|
b.set_val("#scope", "some")
|
|
b.set_input_text("#users", "test users")
|
|
b.set_input_text("#groups", "test groups")
|
|
b.click("#btn-save-sssd-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, test_some_file)
|
|
# Download test with scope 'All'
|
|
b.set_val("#scope", "all")
|
|
b.click("#btn-save-sssd-conf")
|
|
time.sleep(1)
|
|
m.download(conf_file, test_all_file)
|
|
# Revert to the previous config before testing to ensure test continuity
|
|
m.upload([save_file], conf_file_path)
|
|
# Check that the configs reflected the changes
|
|
conf = configparser.ConfigParser()
|
|
conf.read_file(open(test_none_file, "r"))
|
|
self.assertEqual(conf["session_recording"]["scope"], "none")
|
|
conf.read_file(open(test_some_file, "r"))
|
|
self.assertEqual(conf["session_recording"]["scope"], "some")
|
|
self.assertEqual(conf["session_recording"]["users"], "test users")
|
|
self.assertEqual(conf["session_recording"]["groups"], "test groups")
|
|
conf.read_file(open(test_all_file, "r"))
|
|
self.assertEqual(conf["session_recording"]["scope"], "all")
|
|
|
|
def testDisplayDrag(self):
|
|
b, _ = self._login()
|
|
self._sel_rec()
|
|
# start playback and pause in middle
|
|
b.click("#player-play-pause")
|
|
b.wait_in_text(self._term_line(1), "localhost")
|
|
b.click("#player-play-pause")
|
|
# zoom in so that the whole screen is no longer visible
|
|
b.click("#player-zoom-in")
|
|
b.click("#player-zoom-in")
|
|
# select and ensure drag'n'pan mode
|
|
b.click("#player-drag-pan")
|
|
# scroll and check for screen movement
|
|
b.mouse(".dragnpan", "mousedown", 200, 200)
|
|
b.mouse(".dragnpan", "mousemove", 10, 10)
|
|
self.assertNotEqual(b.attr(".dragnpan", "scrollTop"), 0)
|
|
self.assertNotEqual(b.attr(".dragnpan", "scrollLeft"), 0)
|
|
|
|
def testLogCorrelation(self):
|
|
b, _ = self._login()
|
|
# select the recording with the extra logs
|
|
self._sel_rec(1)
|
|
b.click("#btn-logs-view")
|
|
# fast forward until the end
|
|
while "exit" not in b.text(self._term_line(22)):
|
|
b.click("#player-skip-frame")
|
|
# check for extra log entries
|
|
b.wait_present(".pf-c-data-list:contains('authentication failure')")
|
|
|
|
def testZoomSpeedControls(self):
|
|
default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
|
|
play_scale_sel = '.console-ct[style^="transform: scale(0.4"]'
|
|
zoom_one_scale_sel = '.console-ct[style^="transform: scale(0.5"]'
|
|
b, _ = self._login()
|
|
self._sel_rec()
|
|
# set speed x16 and begin playing, expecting a size adjustment
|
|
for _ in range(4):
|
|
b.click("#player-speed-up")
|
|
b.wait_present(default_scale_sel)
|
|
b.click("#player-play-pause")
|
|
b.wait_present(play_scale_sel)
|
|
# wait until sleeping and zoom in
|
|
b.wait_in_text(self._term_line(8), "sleep")
|
|
b.click("#player-zoom-in")
|
|
b.wait_present(zoom_one_scale_sel)
|
|
# zoom out while typing fast
|
|
b.wait_in_text(self._term_line(9), "localhost")
|
|
b.click("#player-zoom-out")
|
|
b.wait_present(play_scale_sel)
|
|
|
|
def _filter(self, inp, occ_dict):
|
|
import time
|
|
|
|
# allow temporary timestamp failures
|
|
self.allow_journal_messages(".*timestamp.*")
|
|
# login and test inputs
|
|
b, _ = self._login()
|
|
for occ in occ_dict:
|
|
for term in occ_dict[occ]:
|
|
# enter the search term and wait for the results to return
|
|
b.set_input_text(inp, term)
|
|
time.sleep(2)
|
|
self.assertEqual(b.text(".pf-c-table").count("contractor"), occ)
|
|
|
|
def testSearch(self):
|
|
self._filter(
|
|
"#filter-search",
|
|
{
|
|
0: {
|
|
"this should return nothing",
|
|
"this should also return nothing",
|
|
"0123456789",
|
|
},
|
|
1: {
|
|
"extra commands",
|
|
"whoami",
|
|
"ssh",
|
|
"thisisatest123",
|
|
"thisisanothertest456",
|
|
},
|
|
2: {
|
|
"id",
|
|
"localhost",
|
|
"exit",
|
|
"actor",
|
|
"contractor",
|
|
"contractor1@localhost",
|
|
},
|
|
},
|
|
)
|
|
|
|
def testFilterUsername(self):
|
|
self._filter(
|
|
"#filter-username",
|
|
{
|
|
0: {"test", "contact", "contractor", "contractor11", "contractor4"},
|
|
2: {"contractor1"},
|
|
},
|
|
)
|
|
|
|
def testFilterSince(self):
|
|
self._filter(
|
|
"#filter-since",
|
|
{
|
|
0: {"2020-06-02", "2020-06-01 12:31:00"},
|
|
1: {"2020-06-01 12:17:01", "2020-06-01 12:30:50"},
|
|
2: {"2020-06-01", "2020-06-01 12:17:00"},
|
|
},
|
|
)
|
|
|
|
def testFilterUntil(self):
|
|
self._filter(
|
|
"#filter-until",
|
|
{
|
|
0: {"2020-06-01", "2020-06-01 12:16"},
|
|
1: {"2020-06-01 12:17", "2020-06-01 12:29"},
|
|
2: {"2020-06-02", "2020-06-01 12:31:00"},
|
|
},
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
test_main()
|