Add test helpers

This makes waiting for text robust.

script.addPreloadScript() doesn't export the declared functions, so we
need to attach them to `window`. We also don't need all of them.
This commit is contained in:
Martin Pitt 2024-07-24 08:12:12 +02:00
parent ecb9420e8e
commit 91be31b648
2 changed files with 102 additions and 12 deletions

39
bidi.py
View file

@ -69,6 +69,10 @@ def pick_ports(count: int) -> list[int]:
return ports
def jsquote(js: object) -> str:
return json.dumps(js)
class WebdriverBidi:
def __init__(self, headless=False) -> None:
self.headless = headless
@ -117,6 +121,9 @@ class WebdriverBidi:
"log.entryAdded", "browsingContext.domContentLoaded",
])
test_functions = Path("test-functions.js").read_text()
await self.bidi("script.addPreloadScript", functionDeclaration=f"() => {{ {test_functions} }}")
# wait for browser to initialize default context
for _ in range(10):
realms = (await self.bidi("script.getRealms"))["realms"]
@ -147,7 +154,11 @@ class WebdriverBidi:
if "id" in data and data["id"] in self.pending_commands:
log_proto.debug("ws_reader: resolving pending command %i", data["id"])
if data["type"] == "success":
self.pending_commands[data["id"]].set_result(data["result"])
result = data["result"]
if result.get("type") == "exception":
self.pending_commands[data["id"]].set_exception(RuntimeError(result["exceptionDetails"]))
else:
self.pending_commands[data["id"]].set_result(result)
else:
self.pending_commands[data["id"]].set_exception(
WebdriverError(f"{data['type']}: {data['message']}"))
@ -185,8 +196,9 @@ class WebdriverBidi:
future = asyncio.get_event_loop().create_future()
self.pending_commands[self.last_id] = future
self.last_id += 1
# we really expect this to be fast, otherwise the browser crashed; in particular, TIMEOUT is too long
return await asyncio.wait_for(future, timeout=5)
# some calls can take very long (wait for condition);
# safe-guard timeout for avoiding eternally hanging tests
return await asyncio.wait_for(future, timeout=60)
#
# BiDi state tracking
@ -249,6 +261,12 @@ class WebdriverBidi:
else:
raise ValueError(f"timed out waiting for {selector}: {last_error}")
async def wait_js_cond(self, cond: str, error_description: str = "null") -> None:
log_command.debug("wait_js_cond(%s)", cond)
await self.bidi("script.evaluate",
expression=f"window.ph_wait_cond(() => {cond}, {TIMEOUT * 1000}, {error_description})",
awaitPromise=True, target={"context": self.context})
async def text(self, selector: str) -> str:
# there is no BiDi way of evaluating a script on a particular element or getting
# the text of an element, so just use the standard `.textContent` web platform property
@ -258,6 +276,11 @@ class WebdriverBidi:
awaitPromise=False, target={"context": self.context})
return r['result']['value']
async def wait_text(self, selector: str, text: str) -> None:
await self.wait(selector)
await self.wait_js_cond(f"window.ph_text({jsquote(selector)}) === {jsquote(text)}",
error_description=f"() => 'actual text: ' + window.ph_text({jsquote(selector)})")
async def mouse(self, selector: str, button: int = 0, click_count: int = 1) -> None:
element = await self.locate(selector)
@ -438,19 +461,11 @@ async def main():
print("\n\nSTEP: super-user-indicator")
try:
await d.wait("#super-user-indicator")
await d.wait_text("#super-user-indicator", "Limited access")
except ValueError:
s = await d.bidi("browsingContext.captureScreenshot", context=d.top_context, origin="document")
Path("screenshot.png").write_bytes(base64.b64decode(s["data"]))
raise
# FIXME: wait for text helper
for _ in range(5):
t = await d.text("#super-user-indicator")
if t == "Limited access":
break
await asyncio.sleep(0.5)
else:
raise ValueError("timed out waiting for #super-user-indicator text")
print("\n\nSTEP: wait/switch frame")
await d.switch_to_frame('cockpit1:localhost/system')

75
test-functions.js Normal file
View file

@ -0,0 +1,75 @@
window.ph_select = function(sel) {
if (!window.Sizzle) {
return Array.from(document.querySelectorAll(sel));
}
if (sel.includes(":contains(")) {
if (!window.Sizzle) {
throw new Error("Using ':contains' when window.Sizzle is not available.");
}
return window.Sizzle(sel);
} else {
return Array.from(document.querySelectorAll(sel));
}
};
window.ph_find = function(sel) {
const els = window.ph_select(sel);
if (els.length === 0)
throw new Error(sel + " not found");
if (els.length > 1)
throw new Error(sel + " is ambiguous");
return els[0];
};
window.ph_text = function(sel) {
const el = window.ph_find(sel);
if (el.textContent === undefined)
throw new Error(sel + " can not have text");
// 0xa0 is a non-breakable space, which is a rendering detail of Chromium
// and awkward to handle in tests; turn it into normal spaces
return el.textContent.replaceAll("\xa0", " ");
};
window.ph_is_visible = function(sel) {
const el = window.ph_find(sel);
return el.tagName === "svg" || ((el.offsetWidth > 0 || el.offsetHeight > 0) && !(el.style.visibility === "hidden" || el.style.display === "none"));
};
class PhWaitCondTimeout extends Error {
constructor(description) {
if (description && description.apply)
description = description.apply();
if (description)
super(description);
else
super("condition did not become true");
}
}
window.ph_wait_cond = function (cond, timeout, error_description) {
return new Promise((resolve, reject) => {
// poll every 100 ms for now; FIXME: poll less often and re-check on mutations using
// https://developer.mozilla.org/en-US/docs/Web/API/MutationObserver
let stepTimer = null;
let last_err = null;
const tm = window.setTimeout(() => {
if (stepTimer)
window.clearTimeout(stepTimer);
reject(last_err || new PhWaitCondTimeout(error_description));
}, timeout);
function step() {
try {
if (cond()) {
window.clearTimeout(tm);
resolve();
return;
}
} catch (err) {
last_err = err;
}
stepTimer = window.setTimeout(step, 100);
}
step();
});
};