starter-kit/test/check-application
Justin Stephenson 88a167a89a Tests: Assert pause state with a later command in rec1
On some systems, the 'whoami' command may still show in the terminal output
if the pause does not happen fast enough.
2022-04-19 15:53:33 -04:00

411 lines
15 KiB
Python
Executable file
Raw Blame History

#!/usr/bin/python3
# Run this with --help to see available options for tracing and debugging
# See https://github.com/cockpit-project/cockpit/blob/master/test/common/testlib.py
# "class Browser" and "class MachineCase" for the available API.
import os
import sys
import time
import json
import configparser
# import Cockpit's machinery for test VMs and its browser test API
TEST_DIR = os.path.dirname(__file__)
sys.path.append(os.path.join(TEST_DIR, "common"))
sys.path.append(os.path.join(os.path.dirname(TEST_DIR), "bots/machine"))
from testlib import *
# Test with pre-recorded journal with tlog UID 981
class TestApplication(MachineCase):
def _login(self, loc="/session-recording", wait="#app"):
self.login_and_go(loc)
b = self.browser
m = self.machine
b.wait_present(wait)
self.allow_journal_messages('.*type=1400.*avc: denied .* comm="systemctl".*')
return b, m
def _sel_rec(self, recording):
'''
rec1:
whoami
id
echo thisisatest123
sleep 16
echo thisisanothertest456
exit
rec2:
echo "Extra Commands"
sudo systemctl daemon-reload
sudo ssh root@localhost
exit
binaryrec:
cat /usr/bin/gzip
'''
recordings = {'rec1': '0f25700a28c44b599869745e5fda8b0c-7106-121e79',
'rec2': '0f25700a28c44b599869745e5fda8b0c-7623-135541',
'binaryrec': '6c652ee938b3485894dbacbb8c7c2c61-5a4-38a5'}
page = recordings[recording]
self.browser.go(f"/session-recording#/{page}")
def _term_line(self, lineno):
return f".xterm-accessibility-tree div:nth-child({lineno})"
def testPlay(self):
b, _ = self._login()
self._sel_rec('rec1')
b.click("#player-play-pause")
b.wait_in_text(self._term_line(1), "localhost")
def testPlayBinary(self):
b, _ = self._login()
self._sel_rec('binaryrec')
b.click("#player-play-pause")
time.sleep(5)
b.wait_in_text(self._term_line(21), "<EFBFBD>")
b.wait_in_text(self._term_line(25), "exit")
def testFastforwardControls(self):
progress = ".pf-c-progress__indicator"
b, _ = self._login()
self._sel_rec('rec1')
# fast forward
b.click("#player-fast-forward")
b.wait_in_text(self._term_line(12), "exit")
b.wait_attr(progress, "style", "width: 100%;")
# test restart playback
b.click("#player-restart")
b.wait_text(self._term_line(1), "Blank line")
b.wait_attr(progress, "style", "width: 100%;")
def testSpeedControls(self):
b, _ = self._login()
self._sel_rec('rec1')
# increase speed
b.wait_present("#player-speed-up")
b.click("#player-speed-up")
b.wait_text("#player-speed", "x2")
b.click("#player-speed-up")
b.wait_text("#player-speed", "x4")
b.click("#player-speed-up")
b.wait_text("#player-speed", "x8")
b.click("#player-speed-up")
b.wait_text("#player-speed", "x16")
# decrease speed
b.click("#player-speed-down")
b.wait_text("#player-speed", "x8")
b.click("#player-speed-down")
b.wait_text("#player-speed", "x4")
b.click("#player-speed-down")
b.wait_text("#player-speed", "x2")
b.click("#player-speed-down")
b.click("#player-speed-down")
b.wait_text("#player-speed", "/2")
b.click("#player-speed-down")
b.wait_text("#player-speed", "/4")
b.click("#player-speed-down")
b.wait_text("#player-speed", "/8")
b.click("#player-speed-down")
b.wait_text("#player-speed", "/16")
# restore speed
b.click(".pf-c-chip .pf-c-button")
b.click("#player-speed-down")
b.wait_text("#player-speed", "/2")
def testZoomControls(self):
default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
zoom_one_scale_sel = '.console-ct[style^="transform: scale(1.1)"]'
zoom_two_scale_sel = '.console-ct[style^="transform: scale(1.2)"]'
zoom_three_scale_sel = '.console-ct[style^="transform: scale(1.3)"]'
zoom_fit_to = (
'.console-ct[style*="translate(-50%, -50%)"]'
'[style*="top: 50%"]'
'[style*="left: 50%"]'
)
b, _ = self._login()
self._sel_rec('rec1')
# Wait for terminal with scale(1)
b.wait_present(default_scale_sel)
# Zoom in x3
b.click("#player-zoom-in")
b.wait_present(zoom_one_scale_sel)
b.click("#player-zoom-in")
b.wait_present(zoom_two_scale_sel)
b.click("#player-zoom-in")
b.wait_present(zoom_three_scale_sel)
# Zoom Out
b.click("#player-zoom-out")
b.wait_present(zoom_two_scale_sel)
# Fit zoom to screen
b.click("#player-fit-to")
b.wait_present(zoom_fit_to)
def testSkipFrame(self):
b, _ = self._login()
self._sel_rec('rec1')
b.wait_present(self._term_line(1))
# loop until 3 valid frames have passed
while "localhost" not in b.text(self._term_line(1)):
b.click("#player-skip-frame")
b.wait_in_text(self._term_line(1), "localhost")
def testPlaybackPause(self):
b, _ = self._login()
self._sel_rec('rec1')
# Start and pause the player
b.click("#player-restart")
b.click("#player-play-pause")
b.click("#player-play-pause")
time.sleep(10)
# Make sure it didn't keep playing
b.wait_not_in_text(self._term_line(6), "thisisatest123")
# Test if it can start playing again
b.click("#player-play-pause")
def testSessionRecordingConf(self):
b, m = self._login()
b.click("#btn-config")
# TLOG config
conf_file_path = "/etc/tlog/"
conf_file = f"{conf_file_path}tlog-rec-session.conf"
save_file = "/tmp/tlog-rec-session.conf"
test_file = "/tmp/test-tlog-rec-session.conf"
# Save the existing config
b.click("#btn-save-tlog-conf")
m.download(conf_file, save_file)
# Change all of the fields
b.set_input_text("#shell", "/test/path/shell")
b.set_input_text("#notice", "Test Notice")
b.set_input_text("#latency", "1")
b.set_input_text("#payload", "2")
b.set_checked("#log_input", True)
b.set_checked("#log_output", False)
b.set_checked("#log_window", False)
b.set_input_text("#limit_rate", "3")
b.set_input_text("#limit_burst", "4")
b.set_val("#limit_action", "drop")
b.set_input_text("#file_path", "/test/path/file")
b.set_input_text("#syslog_facility", "testfac")
b.set_val("#syslog_priority", "info")
b.set_val("#journal_priority", "info")
b.set_checked("#journal_augment", False)
b.set_val("#writer", "file")
b.click("#btn-save-tlog-conf")
time.sleep(1)
m.download(conf_file, test_file)
# Revert to the previous config before testing to ensure test continuity
m.upload([save_file], conf_file_path)
# Check that the config reflects the changes
conf = json.load(open(test_file, "r"))
self.assertEqual(
json.dumps(conf),
json.dumps(
{
"shell": "/test/path/shell",
"notice": "Test Notice",
"latency": 1,
"payload": 2,
"log": {"input": True, "output": False, "window": False},
"limit": {"rate": 3, "burst": 4, "action": "drop"},
"file": {"path": "/test/path/file"},
"syslog": {"facility": "testfac", "priority": "info"},
"journal": {"priority": "info", "augment": False},
"writer": "file",
}
),
)
# SSSD config
conf_file_path = "/etc/sssd/conf.d/"
conf_file = f"{conf_file_path}sssd-session-recording.conf"
save_file = "/tmp/sssd-session-recording.conf"
test_none_file = "/tmp/test-none-sssd-session-recording.conf"
test_some_file = "/tmp/test-some-sssd-session-recording.conf"
test_all_file = "/tmp/test-all-sssd-session-recording.conf"
# Save the existing config
b.click("#btn-save-sssd-conf")
time.sleep(1)
m.download(conf_file, save_file)
# Download test with scope 'Some'
b.set_val("#scope", "some")
b.set_input_text("#users", "test users")
b.set_input_text("#groups", "test groups")
b.click("#btn-save-sssd-conf")
time.sleep(1)
m.download(conf_file, test_some_file)
# Download test with scope 'All'
b.set_val("#scope", "all")
b.set_input_text("#exclude_users", "testuser1")
b.set_input_text("#exclude_groups", "testgroup1")
b.click("#btn-save-sssd-conf")
time.sleep(1)
m.download(conf_file, test_all_file)
# Download test with scope 'None'
b.set_val("#scope", "none")
b.click("#btn-save-sssd-conf")
time.sleep(1)
m.download(conf_file, test_none_file)
# Revert to the previous config before testing to ensure test continuity
m.upload([save_file], conf_file_path)
# Check that the configs reflected the changes
conf = configparser.ConfigParser()
conf.read_file(open(test_some_file, "r"))
self.assertEqual(conf["session_recording"]["scope"], "some")
self.assertEqual(conf["session_recording"]["users"], "test users")
self.assertEqual(conf["session_recording"]["groups"], "test groups")
conf.read_file(open(test_all_file, "r"))
self.assertEqual(conf["session_recording"]["scope"], "all")
self.assertEqual(conf["session_recording"]["exclude_users"], "testuser1")
self.assertEqual(conf["session_recording"]["exclude_groups"], "testgroup1")
conf.read_file(open(test_none_file, "r"))
self.assertEqual(conf["session_recording"]["scope"], "none")
def testDisplayDrag(self):
b, _ = self._login()
self._sel_rec('rec1')
# start playback and pause in middle
b.click("#player-play-pause")
b.wait_in_text(self._term_line(1), "localhost")
b.click("#player-play-pause")
# zoom in so that the whole screen is no longer visible
b.click("#player-zoom-in")
b.click("#player-zoom-in")
# select and ensure drag'n'pan mode
b.click("#player-drag-pan")
# scroll and check for screen movement
b.mouse(".dragnpan", "mousedown", 200, 200)
b.mouse(".dragnpan", "mousemove", 10, 10)
self.assertNotEqual(b.attr(".dragnpan", "scrollTop"), 0)
self.assertNotEqual(b.attr(".dragnpan", "scrollLeft"), 0)
def testLogCorrelation(self):
b, m = self._login()
# make sure system is on expected timezone EST
m.execute("timedatectl set-timezone America/New_York")
# select the recording with the extra logs
self._sel_rec('rec2')
b.click("#btn-logs-view .pf-c-expandable-section__toggle")
# fast forward until the end
while "exit" not in b.text(self._term_line(22)):
b.click("#player-skip-frame")
# check for extra log entries
b.wait_present(".pf-c-data-list:contains('authentication failure')")
def testZoomSpeedControls(self):
b, m = self._login()
default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
if m.image.startswith('rhel') or m.image.startswith('centos'):
play_scale_sel = '.console-ct[style^="transform: scale(0.3"]'
zoom_one_scale_sel = '.console-ct[style^="transform: scale(0.4"]'
else:
play_scale_sel = '.console-ct[style^="transform: scale(0.4"]'
zoom_one_scale_sel = '.console-ct[style^="transform: scale(0.5"]'
self._sel_rec('rec1')
# set speed x16 and begin playing, expecting a size adjustment
for _ in range(4):
b.click("#player-speed-up")
b.wait_present(default_scale_sel)
b.click("#player-play-pause")
b.wait_present(play_scale_sel)
# wait until sleeping and zoom in
b.wait_in_text(self._term_line(8), "sleep")
b.click("#player-zoom-in")
b.wait_present(zoom_one_scale_sel)
# zoom out while typing fast
b.wait_in_text(self._term_line(9), "localhost")
b.click("#player-zoom-out")
b.wait_present(play_scale_sel)
def _filter(self, inp, occ_dict):
# ignore errors from half-entered timestamps due to searches occuring
# before `set_input_text` is complete
self.allow_journal_messages(".*timestamp.*")
# login and test inputs
b, _ = self._login()
time.sleep(5)
for occ in occ_dict:
for term in occ_dict[occ]:
# enter the search term and wait for the results to return
b.set_input_text(inp, term)
time.sleep(5)
self.assertEqual(b.text(".pf-c-table").count("contractor"), occ)
def testSearch(self):
self._filter(
"#filter-search",
{
0: {
"this should return nothing",
"this should also return nothing",
"0123456789",
},
1: {
"extra commands",
"whoami",
"ssh",
"thisisatest123",
"thisisanothertest456",
},
2: {
"id",
"localhost",
"exit",
"actor",
"contractor",
"contractor1@localhost",
},
},
)
def testFilterUsername(self):
self._filter(
"#filter-username",
{
0: {"test", "contact", "contractor", "contractor11", "contractor4"},
2: {"contractor1"},
},
)
def testFilterSince(self):
self._filter(
"#filter-since",
{
0: {"2020-06-02", "2020-06-01 12:31:00"},
1: {"2020-06-01 12:17:01", "2020-06-01 12:30:50"},
2: {"2020-06-01", "2020-06-01 12:17:00"},
},
)
def testFilterUntil(self):
self._filter(
"#filter-until",
{
0: {"2020-06-01", "2020-06-01 12:16"},
1: {"2020-06-01 12:17", "2020-06-01 12:29"},
2: {"2020-06-02", "2020-06-01 12:31:00"},
},
)
def testAppMenu(self):
srrow = ".app-list .pf-c-data-list__item-row:" \
"contains('Session Recording')"
srbut = "{} button:contains('Session Recording')" \
"".format(srrow)
b, _ = self._login("/apps", srrow)
self.allow_journal_messages(".*chromium-browser.appdata.xml.*",
".*xml.etree.ElementTree.ParseError:.*")
b.click(srbut)
b.enter_page("/session-recording")
b.wait_present("#app")
if __name__ == "__main__":
test_main()