From 4c18ab08b053b1ce2a4e80109b758334d8ef5e27 Mon Sep 17 00:00:00 2001 From: Martin Pitt Date: Tue, 23 Jul 2024 08:21:24 +0200 Subject: [PATCH] Refactor run and page helpers --- bidi.py | 116 ++++++++++++++++++++++++++++++++------------------------ 1 file changed, 67 insertions(+), 49 deletions(-) diff --git a/bidi.py b/bidi.py index e0326a3..7387e6d 100755 --- a/bidi.py +++ b/bidi.py @@ -8,6 +8,7 @@ import sys import tempfile from dataclasses import dataclass from pathlib import Path +from typing import Any import aiohttp @@ -70,6 +71,7 @@ class WebdriverBidi: self.logs: list[LogMessage] = [] self.bidi_session: BidiSession | None = None self.future_wait_page_load = None + self.context: str | None = None # default browsingContext async def start_bidi_session(self) -> None: raise NotImplementedError('must be implemented by concrete subclass') @@ -97,6 +99,19 @@ class WebdriverBidi: self.ws = await self.http_session.ws_connect(self.bidi_session.ws_url) self.task_reader = asyncio.create_task(self.ws_reader(self.ws), name="bidi_reader") + await self.bidi("session.subscribe", events=[ + "log.entryAdded", "browsingContext.domContentLoaded", + ]) + + # wait for browser to initialize default context + for _ in range(10): + realms = (await self.bidi("script.getRealms"))["realms"] + if len(realms) > 0: + self.context = realms[0]["context"] + break + else: + raise WebdriverError("timed out waiting for default realm") + async def __aenter__(self): await self.start_session() return self @@ -135,7 +150,7 @@ class WebdriverBidi: logger.error("BiDi failure: %s", msg) break - async def bidi(self, method, **params) -> asyncio.Future: + async def bidi(self, method, **params) -> dict[str, Any]: """Send a Webdriver BiDi command and return the JSON response""" payload = json.dumps({"id": self.last_id, "method": method, "params": params}) @@ -154,64 +169,46 @@ class WebdriverBidi: assert self.future_wait_page_load is not None, "call arm_page_load() first" return await self.future_wait_page_load - async def run(self): - # wait for browser to initialize default context - for _ in range(10): - realms = (await self.bidi("script.getRealms"))["realms"] - if len(realms) > 0: - context = realms[0]["context"] - break - else: - raise WebdriverError("timed out waiting for default realm") + # + # High-level helpers + # - await self.bidi("session.subscribe", events=[ - "log.entryAdded", "browsingContext.domContentLoaded", - ]) + async def locate(self, selector: str) -> str: + r = await self.bidi("browsingContext.locateNodes", context=self.context, + locator={"type": "css", "value": selector}) + nodes = r["nodes"] + if len(nodes) == 0: + raise ValueError(f"no element found for {selector}") + if len(nodes) > 1: + raise ValueError(f"selector {selector} is ambiguous: {nodes}") + return nodes[0] - await self.bidi("script.evaluate", expression="console.log('Hello BiDi')", - awaitPromise=False, target={"context": context}) - await self.bidi("browsingContext.navigate", context=context, - url="https://piware.de", wait="complete") + async def text(self, selector: str) -> str: + # there is no BiDi way of evaluating a script on a particular element or getting + # the text of an element, so just use the standard `.textContent` web platform property + # but first make sure the locator is unique + await self.locate(selector) + r = await self.bidi("script.evaluate", expression=f"document.querySelector('{selector}').textContent", + awaitPromise=False, target={"context": self.context}) + return r['result']['value'] - r = (await self.bidi("browsingContext.locateNodes", context=context, - locator={"type": "css", "value": "#menu-content"}))["nodes"] - assert len(r) == 1 + async def mouse(self, selector: str, button: int = 0, click_count: int = 1) -> None: + element = await self.locate(selector) - r = await self.bidi("script.evaluate", expression="document.querySelector('#menu-content').textContent", - awaitPromise=False, target={"context": context}) - assert 'Addicted to Free Software Development' in r['result']['value'] + actions = [{"type": "pointerMove", "x": 0, "y": 0, "origin": {"type": "element", "element": element}}] + for _ in range(click_count): + actions.append({"type": "pointerDown", "button": button}) + actions.append({"type": "pointerUp", "button": button}) - # locate first social link - r = (await self.bidi("browsingContext.locateNodes", context=context, - locator={"type": "css", "value": "a[rel='me']:first-child"}))["nodes"] - assert len(r) == 1 - - self.arm_page_load() - - # click it - await self.bidi("input.performActions", context=context, actions=[ + await self.bidi("input.performActions", context=self.context, actions=[ { "id": "pointer-0", "type": "pointer", "parameters": {"pointerType": "mouse"}, - "actions": [ - {"type": "pointerMove", "x": 0, "y": 0, "origin": {"type": "element", "element": r[0]}}, - {"type": "pointerDown", "button": 0}, - {"type": "pointerUp", "button": 0}, - ], + "actions": actions, } ]) - url = await self.wait_page_load() - assert url == "https://github.com/martinpitt/" - - logger.info("Collected debug messages:") - for log in self.logs: - logger.info(log) - - if not self.headless: - await asyncio.sleep(3) - class ChromiumBidi(WebdriverBidi): async def start_bidi_session(self) -> None: @@ -323,9 +320,30 @@ class FirefoxBidi(WebdriverBidi): async def main(): logging.basicConfig(level=logging.DEBUG) + + headless = True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False cls = FirefoxBidi if len(sys.argv) > 1 and sys.argv[1] == "firefox" else ChromiumBidi - async with cls(headless=True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False) as d: - await d.run() + + async with cls(headless=headless) as d: + await d.bidi("script.evaluate", expression="console.log('Hello BiDi')", + awaitPromise=False, target={"context": d.context}) + await d.bidi("browsingContext.navigate", context=d.context, + url="https://piware.de", wait="complete") + + assert 'Addicted to Free Software Development' in await d.text("#menu-content") + + # click first social link + d.arm_page_load() + await d.mouse("a[rel='me']:first-child") + url = await d.wait_page_load() + assert url == "https://github.com/martinpitt/", url + + logger.info("Collected debug messages:") + for log in d.logs: + logger.info(log) + + if not headless: + await asyncio.sleep(3) asyncio.run(main())