diff --git a/bidi.py b/bidi.py index 3295c4b..1e39015 100755 --- a/bidi.py +++ b/bidi.py @@ -12,7 +12,8 @@ from typing import Any import aiohttp -logger = logging.getLogger(__name__) +log_proto = logging.getLogger("bidi.proto") +log_command = logging.getLogger("bidi.command") class WebdriverError(RuntimeError): @@ -75,7 +76,8 @@ class WebdriverBidi: self.logs: list[LogMessage] = [] self.bidi_session: BidiSession | None = None self.future_wait_page_load = None - self.context: str | None = None # default browsingContext + self.top_context: str | None = None # top-level browsingContext + self.context: str | None # currently selected context (top or iframe) async def start_bidi_session(self) -> None: raise NotImplementedError('must be implemented by concrete subclass') @@ -85,7 +87,7 @@ class WebdriverBidi: async def close(self): assert self.bidi_session is not None - logger.debug("cleaning up webdriver") + log_proto.debug("cleaning up webdriver") self.task_reader.cancel() del self.task_reader @@ -111,13 +113,15 @@ class WebdriverBidi: for _ in range(10): realms = (await self.bidi("script.getRealms"))["realms"] if len(realms) > 0: - self.context = realms[0]["context"] + self.top_context = realms[0]["context"] + self.context = self.top_context break else: raise WebdriverError("timed out waiting for default realm") # avoid not seeing elements due to too small window - await self.bidi("browsingContext.setViewport", context=self.context, viewport={"width": 1024, "height": 5000}) + # await self.bidi("browsingContext.setViewport", context=self.top_context, + # viewport={"width": 1024, "height": 5000}) async def __aenter__(self): await self.start_session() @@ -131,9 +135,9 @@ class WebdriverBidi: async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: data = json.loads(msg.data) - logger.debug("ws TEXT → %r", data) + log_proto.debug("ws TEXT → %r", data) if "id" in data and data["id"] in self.pending_commands: - logger.debug("ws_reader: resolving pending command %i", data["id"]) + log_proto.debug("ws_reader: resolving pending command %i", data["id"]) if data["type"] == "success": self.pending_commands[data["id"]].set_result(data["result"]) else: @@ -144,31 +148,41 @@ class WebdriverBidi: if data["type"] == "event": if data["method"] == "log.entryAdded": - self.logs.append(LogMessage(data["params"])) + log = LogMessage(data["params"]) + self.logs.append(log) + log_command.info(str(log)) continue if data["method"] == "browsingContext.domContentLoaded": if self.future_wait_page_load: - logger.debug("page loaded: %r, resolving wait page load future", data["params"]) + log_command.debug("page loaded: %r, resolving wait page load future", data["params"]) self.future_wait_page_load.set_result(data["params"]["url"]) else: - logger.debug("page loaded: %r (not awaited)", data["params"]) + log_command.debug("page loaded: %r (not awaited)", data["params"]) continue + # if data["method"] == "script.realmCreated": + # realms = await self.bidi("script.getRealms") + # log_command.warning("XXX script.realmCreated new: %r", realms) - logger.warning("ws_reader: unhandled message %r", data) + log_proto.warning("ws_reader: unhandled message %r", data) elif msg.type == aiohttp.WSMsgType.ERROR: - logger.error("BiDi failure: %s", msg) + log_proto.error("BiDi failure: %s", msg) break async def bidi(self, method, **params) -> dict[str, Any]: """Send a Webdriver BiDi command and return the JSON response""" payload = json.dumps({"id": self.last_id, "method": method, "params": params}) - logger.debug("ws ← %r", payload) + log_proto.debug("ws ← %r", payload) await self.ws.send_str(payload) future = asyncio.get_event_loop().create_future() self.pending_commands[self.last_id] = future self.last_id += 1 - return await future + # we really expect this to be fast, otherwise the browser crashed; in particular, TIMEOUT is too long + return await asyncio.wait_for(future, timeout=5) + + # + # BiDi state tracking + # def arm_page_load(self): assert self.future_wait_page_load is None, "already waiting for page load" @@ -183,6 +197,20 @@ class WebdriverBidi: except asyncio.TimeoutError as e: raise ValueError("timed out waiting for page load") from e + async def switch_to_frame(self, name: str) -> None: + frame = await self.locate(f"iframe[name='{name}']") + cw = await self.bidi("script.callFunction", + functionDeclaration="f => f.contentWindow", + arguments=[frame], + awaitPromise=False, + target={"context": self.top_context}) + self.context = cw["result"]["value"]["context"] + log_command.debug("switch_to_frame(%s)", name) + + def switch_to_top(self) -> None: + self.context = self.top_context + log_command.debug("switch_to_top") + # # High-level helpers # @@ -195,8 +223,24 @@ class WebdriverBidi: raise ValueError(f"no element found for {selector}") if len(nodes) > 1: raise ValueError(f"selector {selector} is ambiguous: {nodes}") + log_command.debug("locate(%s) = %r", selector, nodes[0]) return nodes[0] + async def wait(self, selector: str, timeout: int = TIMEOUT) -> None: + log_command.debug("wait(%s)", selector) + # FIXME: this is very inefficient; use our JS page helper with promise await + last_error = None + for _ in range(timeout * 10): + try: + n = await self.locate(selector) + log_command.debug("wait(%s) success: %r", selector, n) + return + except (WebdriverError, ValueError, TimeoutError) as e: + last_error = e + await asyncio.sleep(0.1) + else: + raise ValueError(f"timed out waiting for {selector}: {last_error}") + async def text(self, selector: str) -> str: # there is no BiDi way of evaluating a script on a particular element or getting # the text of an element, so just use the standard `.textContent` web platform property @@ -223,6 +267,32 @@ class WebdriverBidi: } ]) + async def key(self, value: str) -> None: + await self.bidi("input.performActions", context=self.context, actions=[{ + "type": "key", "id": "key-0", "actions": [ + {"type": "keyDown", "value": value}, + {"type": "keyUp", "value": value}, + ]}]) + + async def input_text(self, text: str) -> None: + actions = [] + for c in text: + actions.append({"type": "keyDown", "value": c}) + actions.append({"type": "keyUp", "value": c}) + await self.bidi("input.performActions", context=self.context, actions=[ + {"type": "key", "id": "key-0", "actions": actions}]) + + async def focus(self, selector: str) -> None: + await self.locate(selector) + await self.bidi("script.evaluate", expression=f"document.querySelector('{selector}').focus()", + awaitPromise=False, target={"context": self.context}) + + async def set_input_text(self, selector: str, text: str) -> None: + await self.focus(selector) + await self.input_text(text) + # TODO: wait for text + await asyncio.sleep(0.2) + class ChromiumBidi(WebdriverBidi): async def start_bidi_session(self) -> None: @@ -248,10 +318,10 @@ class ChromiumBidi(WebdriverBidi): async with self.http_session.post(f"{wd_url}/session", data=json.dumps(session_args).encode()) as resp: session_info = json.loads(await resp.text())["value"] - logger.debug("webdriver session request: %r %r", resp, session_info) + log_proto.debug("webdriver session request: %r %r", resp, session_info) break except (IOError, aiohttp.client.ClientResponseError) as e: - logger.debug("waiting for webdriver: %s", e) + log_proto.debug("waiting for webdriver: %s", e) await asyncio.sleep(0.1 * retry) else: raise WebdriverError("could not connect to chromedriver") @@ -260,7 +330,7 @@ class ChromiumBidi(WebdriverBidi): session_url=f"{wd_url}/session/{session_info['sessionId']}", ws_url=session_info["capabilities"]["webSocketUrl"], process=driver) - logger.debug("Established chromium session %r", self.bidi_session) + log_proto.debug("Established chromium session %r", self.bidi_session) async def close_bidi_session(self): await self.http_session.delete(self.bidi_session.session_url) @@ -305,7 +375,7 @@ class FirefoxBidi(WebdriverBidi): reader, self.writer_marionette = await asyncio.open_connection("127.0.0.1", marionette_port) break except ConnectionRefusedError as e: - logger.debug("waiting for firefox marionette: %s", e) + log_proto.debug("waiting for firefox marionette: %s", e) await asyncio.sleep(1) else: raise WebdriverError("could not connect to firefox marionette") @@ -321,11 +391,11 @@ class FirefoxBidi(WebdriverBidi): reply = json.loads(reply[reply.index(b":") + 1:].decode()) if not isinstance(reply, list) or len(reply) != 4 or not isinstance(reply[3], dict): raise WebdriverError(f"unexpected marionette session request reply: {reply!r}") - logger.debug("marionette session request reply: %s", reply) + log_proto.debug("marionette session request reply: %s", reply) url = reply[3]["capabilities"]["webSocketUrl"] self.bidi_session = BidiSession(session_url=url, ws_url=url, process=driver) - logger.debug("Established firefox session %r", self.bidi_session) + log_proto.debug("Established firefox session %r", self.bidi_session) async def close_bidi_session(self): self.writer_marionette.close() @@ -334,6 +404,8 @@ class FirefoxBidi(WebdriverBidi): async def main(): logging.basicConfig(level=logging.DEBUG) + log_proto.setLevel(logging.DEBUG) + log_command.setLevel(logging.DEBUG) headless = True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False cls = FirefoxBidi if len(sys.argv) > 1 and sys.argv[1] == "firefox" else ChromiumBidi @@ -342,22 +414,42 @@ async def main(): await d.bidi("script.evaluate", expression="console.log('Hello BiDi')", awaitPromise=False, target={"context": d.context}) await d.bidi("browsingContext.navigate", context=d.context, - url="https://piware.de", wait="complete") + url="http://127.0.0.2:9091", wait="complete") - assert 'Addicted to Free Software Development' in await d.text("#menu-content") + print("\n\nSTEP: logging in") + await d.set_input_text("#login-user-input", "admin") + await d.set_input_text("#login-password-input", "foobar") - # click first social link - d.arm_page_load() - await d.mouse("a[rel='me']:first-child") - url = await d.wait_page_load() - assert url == "https://github.com/martinpitt/", url + # d.arm_page_load() + # await d.key("Enter") # FIXME: this doesn't work: Neither with Return nor \n + await d.mouse("#login-button") - logger.info("Collected debug messages:") + # this is optional: wait() can wait across page loads + # print("\n\nSTEP: waiting for page load") + # await d.wait_page_load() + + print("\n\nSTEP: super-user-indicator") + await d.wait("#super-user-indicator") + # FIXME: wait for text helper + for _ in range(5): + t = await d.text("#super-user-indicator") + if t == "Limited access": + break + await asyncio.sleep(0.5) + else: + raise ValueError("timed out waiting for #super-user-indicator text") + + print("\n\nSTEP: wait/switch frame") + await d.switch_to_frame('cockpit1:localhost/system') + + print("\n\nSTEP: inspect system frame") + await d.wait(".system-configuration") + t = await d.text(".system-configuration") + assert "Join domain" in t, t + + log_command.info("Collected debug messages:") for log in d.logs: - logger.info(log) - - if not headless: - await asyncio.sleep(3) + log_command.info(log) asyncio.run(main())