starter-kit/test/check-application
Matej Marusak d2708cf533 test: Enter session-recording page
Cockpit can have multiple pages open at the same time. This is
handled through iframes. When switching between pages, we need to tell
the tests that we will now be working with a different iframe.

Before this change, the test was checking `b.wait_present("#app")` in the
`/apps` page and not in the `/session-recording` page.
2021-08-12 09:27:57 -04:00

405 lines
15 KiB
Python
Executable file
Raw Blame History

#!/usr/bin/python3
# Run this with --help to see available options for tracing and debugging
# See https://github.com/cockpit-project/cockpit/blob/master/test/common/testlib.py
# "class Browser" and "class MachineCase" for the available API.
import os
import sys
import time
import json
import configparser
# Import Cockpit's machinery for test VMs and its browser test API.
#
# The script location is made absolute before deriving directories from it:
# on Python < 3.9 the ``__file__`` of a directly executed script may be a
# relative path (even a bare file name).  In that case ``TEST_DIR`` would be
# the empty string, ``os.path.dirname(TEST_DIR)`` would be empty as well, and
# "bots/machine" would be looked up in the current working directory instead
# of in the parent of the test directory.
TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.join(TEST_DIR, "common"))
sys.path.append(os.path.join(os.path.dirname(TEST_DIR), "bots/machine"))
from testlib import *
# Test with pre-recorded journal with tlog UID 981
class TestApplication(MachineCase):
    """Browser tests for the Cockpit session-recording page."""

    def _login(self, loc="/session-recording", wait="#app"):
        """Log in, open page *loc* and wait for selector *wait* to be present.

        Returns the ``(browser, machine)`` pair of this test case.
        """
        self.login_and_go(loc)
        b = self.browser
        m = self.machine
        b.wait_present(wait)
        self.allow_journal_messages('.*type=1400.*avc: denied .* comm="systemctl".*')
        return b, m

    def _sel_rec(self, recording):
        '''Point the browser at the player page of a known recording.

        *recording* must be one of the keys of ``recordings`` below
        (a ``KeyError`` is raised otherwise).  The original notes list the
        following commands for each recording:

        rec1:
            whoami
            id
            echo thisisatest123
            sleep 16
            echo thisisanothertest456
            exit
        rec2:
            echo "Extra Commands"
            sudo systemctl daemon-reload
            sudo ssh root@localhost
            exit
        binaryrec:
            cat /usr/bin/gzip
        '''
        recordings = {
            'rec1': '0f25700a28c44b599869745e5fda8b0c-7106-121e79',
            'rec2': '0f25700a28c44b599869745e5fda8b0c-7623-135541',
            'binaryrec': '6c652ee938b3485894dbacbb8c7c2c61-5a4-38a5',
        }
        page = recordings[recording]
        self.browser.go(f"/session-recording#/{page}")

    def _term_line(self, lineno):
        """Return the CSS selector for child number *lineno* of the xterm accessibility tree."""
        return f".xterm-accessibility-tree div:nth-child({lineno})"

    def testPlay(self):
        """Start playback of rec1 and wait for "localhost" on the first line."""
        b, _ = self._login()
        self._sel_rec('rec1')
        b.click("#player-play-pause")
        b.wait_in_text(self._term_line(1), "localhost")

    def testPlayBinary(self):
        """Play the recording that dumps a binary file to the terminal."""
        b, _ = self._login()
        self._sel_rec('binaryrec')
        b.click("#player-play-pause")
        time.sleep(5)
        # U+FFFD REPLACEMENT CHARACTER, written as an escape so the literal
        # survives re-encoding of this file (it had been mangled into the
        # text "<EFBFBD>", i.e. the hex dump of its UTF-8 bytes).
        b.wait_in_text(self._term_line(21), "\ufffd")
        b.wait_in_text(self._term_line(25), "exit")

    def testFastforwardControls(self):
        """Exercise the fast-forward and restart buttons of the player."""
        progress = ".pf-c-progress__indicator"
        b, _ = self._login()
        self._sel_rec('rec1')
        # fast forward
        b.click("#player-fast-forward")
        b.wait_in_text(self._term_line(12), "exit")
        b.wait_attr(progress, "style", "width: 100%;")
        # test restart playback
        b.click("#player-restart")
        b.wait_text(self._term_line(1), "Blank line")
        b.wait_attr(progress, "style", "width: 100%;")

    def testSpeedControls(self):
        """Step the playback speed up and down and check the speed label."""
        b, _ = self._login()
        self._sel_rec('rec1')
        # increase speed
        b.wait_present("#player-speed-up")
        for label in ("x2", "x4", "x8", "x16"):
            b.click("#player-speed-up")
            b.wait_text("#player-speed", label)
        # decrease speed
        for label in ("x8", "x4", "x2"):
            b.click("#player-speed-down")
            b.wait_text("#player-speed", label)
        # two steps down from x2; the intermediate state is not checked
        b.click("#player-speed-down")
        b.click("#player-speed-down")
        b.wait_text("#player-speed", "/2")
        for label in ("/4", "/8", "/16"):
            b.click("#player-speed-down")
            b.wait_text("#player-speed", label)
        # restore speed
        b.click(".pf-c-chip .pf-c-button")
        b.click("#player-speed-down")
        b.wait_text("#player-speed", "/2")

    def testZoomControls(self):
        """Zoom in, zoom out and fit-to-screen, checking the terminal's inline style."""
        default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
        zoom_one_scale_sel = '.console-ct[style^="transform: scale(1.1)"]'
        zoom_two_scale_sel = '.console-ct[style^="transform: scale(1.2)"]'
        zoom_three_scale_sel = '.console-ct[style^="transform: scale(1.3)"]'
        zoom_fit_to = (
            '.console-ct[style*="translate(-50%, -50%)"]'
            '[style*="top: 50%"]'
            '[style*="left: 50%"]'
        )
        b, _ = self._login()
        self._sel_rec('rec1')
        # Wait for terminal with scale(1)
        b.wait_present(default_scale_sel)
        # Zoom in x3
        b.click("#player-zoom-in")
        b.wait_present(zoom_one_scale_sel)
        b.click("#player-zoom-in")
        b.wait_present(zoom_two_scale_sel)
        b.click("#player-zoom-in")
        b.wait_present(zoom_three_scale_sel)
        # Zoom Out
        b.click("#player-zoom-out")
        b.wait_present(zoom_two_scale_sel)
        # Fit zoom to screen
        b.click("#player-fit-to")
        b.wait_present(zoom_fit_to)

    def testSkipFrame(self):
        """Skip frames until "localhost" shows up on the first terminal line."""
        b, _ = self._login()
        self._sel_rec('rec1')
        b.wait_present(self._term_line(1))
        # loop until 3 valid frames have passed
        while "localhost" not in b.text(self._term_line(1)):
            b.click("#player-skip-frame")
        b.wait_in_text(self._term_line(1), "localhost")

    def testPlaybackPause(self):
        """Check that a paused player does not keep playing."""
        b, _ = self._login()
        self._sel_rec('rec1')
        # Start and pause the player
        b.click("#player-restart")
        b.click("#player-play-pause")
        b.click("#player-play-pause")
        time.sleep(10)
        # Make sure it didn't keep playing
        b.wait_not_in_text(self._term_line(1), "whoami")
        # Test if it can start playing again
        b.click("#player-play-pause")

    def testSessionRecordingConf(self):
        """Change the tlog and SSSD settings in the UI and verify the written files.

        For both configurations the existing file is downloaded first and
        uploaded again before the assertions run, so a failing assertion
        does not leave the modified configuration on the machine.
        """
        b, m = self._login()
        b.click("#btn-config")

        # TLOG config
        conf_file_path = "/etc/tlog/"
        conf_file = f"{conf_file_path}tlog-rec-session.conf"
        save_file = "/tmp/tlog-rec-session.conf"
        test_file = "/tmp/test-tlog-rec-session.conf"
        # Save the existing config
        b.click("#btn-save-tlog-conf")
        m.download(conf_file, save_file)
        # Change all of the fields
        b.set_input_text("#shell", "/test/path/shell")
        b.set_input_text("#notice", "Test Notice")
        b.set_input_text("#latency", "1")
        b.set_input_text("#payload", "2")
        b.set_checked("#log_input", True)
        b.set_checked("#log_output", False)
        b.set_checked("#log_window", False)
        b.set_input_text("#limit_rate", "3")
        b.set_input_text("#limit_burst", "4")
        b.set_val("#limit_action", "drop")
        b.set_input_text("#file_path", "/test/path/file")
        b.set_input_text("#syslog_facility", "testfac")
        b.set_val("#syslog_priority", "info")
        b.set_val("#journal_priority", "info")
        b.set_checked("#journal_augment", False)
        b.set_val("#writer", "file")
        b.click("#btn-save-tlog-conf")
        time.sleep(1)
        m.download(conf_file, test_file)
        # Revert to the previous config before testing to ensure test continuity
        m.upload([save_file], conf_file_path)
        # Check that the config reflects the changes
        with open(test_file, "r") as tlog_fp:
            conf = json.load(tlog_fp)
        expected = {
            "shell": "/test/path/shell",
            "notice": "Test Notice",
            "latency": 1,
            "payload": 2,
            "log": {"input": True, "output": False, "window": False},
            "limit": {"rate": 3, "burst": 4, "action": "drop"},
            "file": {"path": "/test/path/file"},
            "syslog": {"facility": "testfac", "priority": "info"},
            "journal": {"priority": "info", "augment": False},
            "writer": "file",
        }
        # Compared in serialized form, as before: this also checks key order
        # and keeps JSON booleans distinct from the numbers 0 and 1.
        self.assertEqual(json.dumps(conf), json.dumps(expected))

        # SSSD config
        conf_file_path = "/etc/sssd/conf.d/"
        conf_file = f"{conf_file_path}sssd-session-recording.conf"
        save_file = "/tmp/sssd-session-recording.conf"
        test_none_file = "/tmp/test-none-sssd-session-recording.conf"
        test_some_file = "/tmp/test-some-sssd-session-recording.conf"
        test_all_file = "/tmp/test-all-sssd-session-recording.conf"
        # Save the existing config
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, save_file)
        # Download test with scope 'None'
        b.set_val("#scope", "none")
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, test_none_file)
        # Download test with scope 'Some'
        b.set_val("#scope", "some")
        b.set_input_text("#users", "test users")
        b.set_input_text("#groups", "test groups")
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, test_some_file)
        # Download test with scope 'All'
        b.set_val("#scope", "all")
        b.click("#btn-save-sssd-conf")
        time.sleep(1)
        m.download(conf_file, test_all_file)
        # Revert to the previous config before testing to ensure test continuity
        m.upload([save_file], conf_file_path)

        def read_session_recording(path):
            # A fresh parser per file, so that keys read from an earlier
            # file cannot leak into the assertions for a later one; the
            # file handle is closed as soon as parsing is done.
            parser = configparser.ConfigParser()
            with open(path, "r") as ini_fp:
                parser.read_file(ini_fp)
            return parser["session_recording"]

        # Check that the configs reflected the changes
        section = read_session_recording(test_none_file)
        self.assertEqual(section["scope"], "none")
        section = read_session_recording(test_some_file)
        self.assertEqual(section["scope"], "some")
        self.assertEqual(section["users"], "test users")
        self.assertEqual(section["groups"], "test groups")
        section = read_session_recording(test_all_file)
        self.assertEqual(section["scope"], "all")

    def testDisplayDrag(self):
        """Zoom into a paused recording and drag the display around."""
        b, _ = self._login()
        self._sel_rec('rec1')
        # start playback and pause in middle
        b.click("#player-play-pause")
        b.wait_in_text(self._term_line(1), "localhost")
        b.click("#player-play-pause")
        # zoom in so that the whole screen is no longer visible
        b.click("#player-zoom-in")
        b.click("#player-zoom-in")
        # select and ensure drag'n'pan mode
        b.click("#player-drag-pan")
        # scroll and check for screen movement
        b.mouse(".dragnpan", "mousedown", 200, 200)
        b.mouse(".dragnpan", "mousemove", 10, 10)
        # NOTE(review): scrollTop/scrollLeft are DOM properties; if
        # ``Browser.attr`` reads HTML attributes it would return None here
        # and these checks would always pass - confirm against testlib.
        self.assertNotEqual(b.attr(".dragnpan", "scrollTop"), 0)
        self.assertNotEqual(b.attr(".dragnpan", "scrollLeft"), 0)

    def testLogCorrelation(self):
        """Check that the log view of rec2 shows the expected journal entry."""
        b, m = self._login()
        # make sure system is on expected timezone EST
        m.execute("timedatectl set-timezone America/New_York")
        # select the recording with the extra logs
        self._sel_rec('rec2')
        b.click("#btn-logs-view .pf-c-expandable-section__toggle")
        # fast forward until the end
        while "exit" not in b.text(self._term_line(22)):
            b.click("#player-skip-frame")
        # check for extra log entries
        b.wait_present(".pf-c-data-list:contains('authentication failure')")

    def testZoomSpeedControls(self):
        """Combine speed and zoom changes while the recording is playing."""
        b, m = self._login()
        default_scale_sel = '.console-ct[style^="transform: scale(1)"]'
        # the expected scale factors depend on the test image
        if m.image.startswith(('rhel', 'centos')):
            play_scale_sel = '.console-ct[style^="transform: scale(0.3"]'
            zoom_one_scale_sel = '.console-ct[style^="transform: scale(0.4"]'
        else:
            play_scale_sel = '.console-ct[style^="transform: scale(0.4"]'
            zoom_one_scale_sel = '.console-ct[style^="transform: scale(0.5"]'
        self._sel_rec('rec1')
        # set speed x16 and begin playing, expecting a size adjustment
        for _ in range(4):
            b.click("#player-speed-up")
        b.wait_present(default_scale_sel)
        b.click("#player-play-pause")
        b.wait_present(play_scale_sel)
        # wait until sleeping and zoom in
        b.wait_in_text(self._term_line(8), "sleep")
        b.click("#player-zoom-in")
        b.wait_present(zoom_one_scale_sel)
        # zoom out while typing fast
        b.wait_in_text(self._term_line(9), "localhost")
        b.click("#player-zoom-out")
        b.wait_present(play_scale_sel)

    def _filter(self, inp, occ_dict):
        """Type search terms into input *inp* and count the listed recordings.

        *occ_dict* maps an expected number of occurrences of "contractor"
        in the text of ``.pf-c-table`` to the collection of terms that
        should produce exactly that count.
        """
        # ignore errors from half-entered timestamps due to searches occurring
        # before `set_input_text` is complete
        self.allow_journal_messages(".*timestamp.*")
        # login and test inputs
        b, _ = self._login()
        time.sleep(5)
        for occ, terms in occ_dict.items():
            for term in terms:
                # enter the search term and wait for the results to return
                b.set_input_text(inp, term)
                time.sleep(5)
                self.assertEqual(b.text(".pf-c-table").count("contractor"), occ)

    def testSearch(self):
        """Filter the recording list with the free-text search field."""
        self._filter(
            "#filter-search",
            {
                0: {
                    "this should return nothing",
                    "this should also return nothing",
                    "0123456789",
                },
                1: {
                    "extra commands",
                    "whoami",
                    "ssh",
                    "thisisatest123",
                    "thisisanothertest456",
                },
                2: {
                    "id",
                    "localhost",
                    "exit",
                    "actor",
                    "contractor",
                    "contractor1@localhost",
                },
            },
        )

    def testFilterUsername(self):
        """Filter the recording list by user name."""
        self._filter(
            "#filter-username",
            {
                0: {"test", "contact", "contractor", "contractor11", "contractor4"},
                2: {"contractor1"},
            },
        )

    def testFilterSince(self):
        """Filter the recording list with the "since" timestamp field."""
        self._filter(
            "#filter-since",
            {
                0: {"2020-06-02", "2020-06-01 12:31:00"},
                1: {"2020-06-01 12:17:01", "2020-06-01 12:30:50"},
                2: {"2020-06-01", "2020-06-01 12:17:00"},
            },
        )

    def testFilterUntil(self):
        """Filter the recording list with the "until" timestamp field."""
        self._filter(
            "#filter-until",
            {
                0: {"2020-06-01", "2020-06-01 12:16"},
                1: {"2020-06-01 12:17", "2020-06-01 12:29"},
                2: {"2020-06-02", "2020-06-01 12:31:00"},
            },
        )

    def testAppMenu(self):
        """Open Session Recording from the /apps page and wait for it to load."""
        srrow = ".app-list .pf-c-data-list__item-row:" \
                "contains('Session Recording')"
        srbut = f"{srrow} button:contains('Session Recording')"
        b, _ = self._login("/apps", srrow)
        b.click(srbut)
        # switch to the session-recording page before looking for "#app" there
        b.enter_page("/session-recording")
        b.wait_present("#app")
# Run the tests only when this file is executed as a script.
if __name__ == "__main__":
    test_main()