diff --git a/bidi.py b/bidi.py index 7519fb1..7e46e49 100755 --- a/bidi.py +++ b/bidi.py @@ -1,11 +1,9 @@ #!/usr/bin/python3 import asyncio -import base64 import json import logging import socket -import sys import tempfile from dataclasses import dataclass from pathlib import Path @@ -208,22 +206,17 @@ class WebdriverBidi: # safe-guard timeout for avoiding eternally hanging tests return await asyncio.wait_for(future, timeout=60) - # - # BiDi state tracking - # - - def arm_page_load(self): - assert self.future_wait_page_load is None, "already waiting for page load" - self.future_wait_page_load = asyncio.get_event_loop().create_future() - - async def wait_page_load(self, timeout: int = TIMEOUT) -> str: - assert self.future_wait_page_load is not None, "call arm_page_load() first" - try: - url = await asyncio.wait_for(self.future_wait_page_load, timeout=timeout) - self.future_wait_page_load = None - return url - except asyncio.TimeoutError as e: - raise Error("timed out waiting for page load") from e + # this is mostly unused; testlib uses ph_find() due to sizzle + async def locate(self, selector: str) -> str: + r = await self.bidi("browsingContext.locateNodes", context=self.context, + locator={"type": "css", "value": selector}) + nodes = r["nodes"] + if len(nodes) == 0: + raise Error(f"no element found for {selector}") + if len(nodes) > 1: + raise Error(f"selector {selector} is ambiguous: {nodes}") + log_command.debug("locate(%s) = %r", selector, nodes[0]) + return nodes[0] async def switch_to_frame(self, name: str) -> None: frame = await self.locate(f"iframe[name='{name}']") @@ -239,99 +232,6 @@ class WebdriverBidi: self.context = self.top_context log_command.debug("switch_to_top") - # - # High-level helpers - # - - async def locate(self, selector: str) -> str: - r = await self.bidi("browsingContext.locateNodes", context=self.context, - locator={"type": "css", "value": selector}) - nodes = r["nodes"] - if len(nodes) == 0: - raise Error(f"no element found for {selector}") - if len(nodes) > 1: - raise Error(f"selector {selector} is ambiguous: {nodes}") - log_command.debug("locate(%s) = %r", selector, nodes[0]) - return nodes[0] - - async def wait(self, selector: str, timeout: int = TIMEOUT) -> None: - log_command.debug("wait(%s)", selector) - # FIXME: this is very inefficient; use our JS page helper with promise await - last_error = None - for _ in range(timeout * 10): - try: - n = await self.locate(selector) - log_command.debug("wait(%s) success: %r", selector, n) - return - except (WebdriverError, Error, TimeoutError) as e: - last_error = e - await asyncio.sleep(0.1) - else: - raise Error(f"timed out waiting for {selector}: {last_error}") - - async def wait_js_cond(self, cond: str, error_description: str = "null") -> None: - log_command.debug("wait_js_cond(%s)", cond) - await self.bidi("script.evaluate", - expression=f"window.ph_wait_cond(() => {cond}, {TIMEOUT * 1000}, {error_description})", - awaitPromise=True, target={"context": self.context}) - - async def text(self, selector: str) -> str: - # there is no BiDi way of evaluating a script on a particular element or getting - # the text of an element, so just use the standard `.textContent` web platform property - # but first make sure the locator is unique - await self.locate(selector) - r = await self.bidi("script.evaluate", expression=f"document.querySelector('{selector}').textContent", - awaitPromise=False, target={"context": self.context}) - return r['result']['value'] - - async def wait_text(self, selector: str, text: str) -> None: - await self.wait(selector) - await self.wait_js_cond(f"window.ph_text({jsquote(selector)}) === {jsquote(text)}", - error_description=f"() => 'actual text: ' + window.ph_text({jsquote(selector)})") - - async def mouse(self, selector: str, button: int = 0, click_count: int = 1) -> None: - element = await self.locate(selector) - - actions = [{"type": "pointerMove", "x": 0, "y": 0, "origin": {"type": "element", "element": element}}] - for _ in range(click_count): - actions.append({"type": "pointerDown", "button": button}) - actions.append({"type": "pointerUp", "button": button}) - - await self.bidi("input.performActions", context=self.context, actions=[ - { - "id": "pointer-0", - "type": "pointer", - "parameters": {"pointerType": "mouse"}, - "actions": actions, - } - ]) - - async def key(self, value: str) -> None: - await self.bidi("input.performActions", context=self.context, actions=[{ - "type": "key", "id": "key-0", "actions": [ - {"type": "keyDown", "value": value}, - {"type": "keyUp", "value": value}, - ]}]) - - async def input_text(self, text: str) -> None: - actions = [] - for c in text: - actions.append({"type": "keyDown", "value": c}) - actions.append({"type": "keyUp", "value": c}) - await self.bidi("input.performActions", context=self.context, actions=[ - {"type": "key", "id": "key-0", "actions": actions}]) - - async def focus(self, selector: str) -> None: - await self.locate(selector) - await self.bidi("script.evaluate", expression=f"document.querySelector('{selector}').focus()", - awaitPromise=False, target={"context": self.context}) - - async def set_input_text(self, selector: str, text: str) -> None: - await self.focus(selector) - await self.input_text(text) - # TODO: wait for text - await asyncio.sleep(0.2) - class ChromiumBidi(WebdriverBidi): async def start_bidi_session(self) -> None: @@ -439,54 +339,3 @@ class FirefoxBidi(WebdriverBidi): async def close_bidi_session(self): self.writer_marionette.close() await self.writer_marionette.wait_closed() - - -async def main(): - logging.basicConfig(level=logging.DEBUG) - log_proto.setLevel(logging.DEBUG) - log_command.setLevel(logging.DEBUG) - - headless = True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False - cls = FirefoxBidi if len(sys.argv) > 1 and sys.argv[1] == "firefox" else ChromiumBidi - - async with cls(headless=headless) as d: - await d.bidi("script.evaluate", expression="console.log('Hello BiDi')", - awaitPromise=False, target={"context": d.context}) - await d.bidi("browsingContext.navigate", context=d.context, - url="http://127.0.0.2:9091", wait="complete") - - print("\n\nSTEP: logging in") - await d.set_input_text("#login-user-input", "admin") - await d.set_input_text("#login-password-input", "foobar") - - # d.arm_page_load() - # await d.key("Enter") # FIXME: this doesn't work: Neither with Return nor \n - await d.mouse("#login-button") - - # this is optional: wait() can wait across page loads - # print("\n\nSTEP: waiting for page load") - # await d.wait_page_load() - - print("\n\nSTEP: super-user-indicator") - try: - await d.wait_text("#super-user-indicator", "Limited access") - except Error: - s = await d.bidi("browsingContext.captureScreenshot", context=d.top_context, origin="document") - Path("screenshot.png").write_bytes(base64.b64decode(s["data"])) - raise - - print("\n\nSTEP: wait/switch frame") - await d.switch_to_frame('cockpit1:localhost/system') - - print("\n\nSTEP: inspect system frame") - await d.wait(".system-configuration") - t = await d.text(".system-configuration") - assert "Join domain" in t, t - - log_command.info("Collected debug messages:") - for log in d.logs: - log_command.info(log) - - -if __name__ == "__main__": - asyncio.run(main())