diff --git a/bidi.py b/bidi.py index 4033a08..7b77489 100755 --- a/bidi.py +++ b/bidi.py @@ -13,6 +13,9 @@ import aiohttp logger = logging.getLogger(__name__) +# https://w3c.github.io/webdriver/#dfn-find-elements +EL_ID = 'element-6066-11e4-a52e-4f735466cecf' + class WebdriverError(RuntimeError): pass @@ -83,6 +86,7 @@ class WebdriverBidi: self.session_info = resp["value"] self.last_id = 0 self.ws = None + self.session = None self.pending_commands: dict[int, asyncio.Future] = {} self.logs: list[LogMessage] = [] @@ -112,7 +116,9 @@ class WebdriverBidi: logger.error("BiDi failure: %s", msg) break - async def command(self, method, **params) -> asyncio.Future: + async def bidi(self, method, **params) -> asyncio.Future: + """Send a Webdriver BiDI command and return the JSON response""" + assert self.ws payload = json.dumps({"id": self.last_id, "method": method, "params": params}) logger.debug("ws ← %r", payload) @@ -122,18 +128,59 @@ class WebdriverBidi: self.last_id += 1 return await future + async def webdriver(self, path: str, data: dict | None = None) -> dict: + """Send a classic Webdriver request and return the JSON response""" + + assert self.session + # asyncio shares the connection + url = f"{self.webdriver_url}/session/{self.session_info['sessionId']}/{path}" + post_data = json.dumps(data).encode() if data is not None else None + method = "POST" if post_data is not None else "GET" + + async with self.session.request(method, url, data=post_data) as resp: + r = await resp.text() + logger.debug("webdriver %s %s %r → %r", method, path, post_data, r) + return json.loads(r) + async def run(self): # open bidi websocket for session - async with aiohttp.ClientSession() as session: + async with aiohttp.ClientSession(raise_for_status=True) as session: + self.session = session async with session.ws_connect(self.session_info["capabilities"]["webSocketUrl"]) as ws: self.ws = ws self.task_reader = asyncio.create_task(self.ws_reader(), name="bidi_reader") - await self.command("session.subscribe", events=["log.entryAdded"]) - context = (await self.command("browsingContext.create", type="tab"))["context"] - await self.command("script.evaluate", expression="console.log('Hello BiDi')", - awaitPromise=False, target={"context": context}) - await self.command("browsingContext.navigate", context=context, url="https://piware.de") + # wait for browser to initialize default context + for _ in range(10): + realms = (await self.bidi("script.getRealms"))["realms"] + if len(realms) > 0: + context = realms[0]["context"] + break + else: + raise WebdriverError("timed out waiting for default realm") + + await self.bidi("session.subscribe", events=["log.entryAdded"]) + + await self.bidi("script.evaluate", expression="console.log('Hello BiDi')", + awaitPromise=False, target={"context": context}) + await self.bidi("browsingContext.navigate", context=context, + url="https://piware.de", wait="complete") + + r = (await self.bidi("browsingContext.locateNodes", context=context, + locator={"type": "css", "value": "#menu-content"}))["nodes"] + assert len(r) == 1 + menu_content_id = r[0]['sharedId'] + + # this doensn't yet have a BiDi command + r = await self.webdriver(f"element/{menu_content_id}/text") + assert 'ADDICTED TO FREE SOFTWARE DEVELOPMENT' in r['value'] + + # locate first social link + r = (await self.bidi("browsingContext.locateNodes", context=context, + locator={"type": "css", "value": "a[rel='me']:first-child"}))["nodes"] + assert len(r) == 1 + # click it (again, no BiDi command) + await self.webdriver(f"element/{r[0]['sharedId']}/click", {}) if not self.headless: await asyncio.sleep(3)