move to aiohttp

This commit is contained in:
Martin Pitt 2024-07-19 18:12:41 +02:00
parent e34f982375
commit 577ec26307

70
bidi.py Normal file → Executable file
View file

@ -1,3 +1,5 @@
#!/usr/bin/python3
import asyncio import asyncio
import json import json
import logging import logging
@ -6,7 +8,7 @@ import time
import urllib.request import urllib.request
from dataclasses import dataclass from dataclasses import dataclass
import websockets import aiohttp
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -80,34 +82,35 @@ class WebdriverBidi:
async def ws_reader(self) -> None: async def ws_reader(self) -> None:
assert self.ws assert self.ws
while True: async for msg in self.ws:
try: if msg.type == aiohttp.WSMsgType.TEXT:
msg = json.loads(await self.ws.recv()) data = json.loads(msg.data)
except websockets.exceptions.ConnectionClosedOK: logger.debug("ws TEXT → %r", data)
logger.debug("ws_reader connection closed") if "id" in data and data["id"] in self.pending_commands:
break logger.debug("ws_reader: resolving pending command %i", data["id"])
logger.debug("ws → %r", msg) if data["type"] == "success":
if "id" in msg and msg["id"] in self.pending_commands: self.pending_commands[data["id"]].set_result(data["result"])
logger.debug("ws_reader: resolving pending command %i", msg["id"]) else:
if msg["type"] == "success": self.pending_commands[data["id"]].set_exception(
self.pending_commands[msg["id"]].set_result(msg["result"]) WebdriverError(f"{data['type']}: {data['message']}"))
else: del self.pending_commands[data["id"]]
self.pending_commands[msg["id"]].set_exception(WebdriverError(f"{msg['type']}: {msg['message']}"))
del self.pending_commands[msg["id"]]
continue
if msg["type"] == "event":
if msg["method"] == "log.entryAdded":
self.logs.append(LogMessage(msg["params"]))
continue continue
logger.warning("ws_reader: unhandled message %r", msg) if data["type"] == "event":
if data["method"] == "log.entryAdded":
self.logs.append(LogMessage(data["params"]))
continue
logger.warning("ws_reader: unhandled message %r", data)
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.error("BiDi failure: %s", msg)
break
async def command(self, method, **params) -> asyncio.Future: async def command(self, method, **params) -> asyncio.Future:
assert self.ws assert self.ws
payload = json.dumps({"id": self.last_id, "method": method, "params": params}) payload = json.dumps({"id": self.last_id, "method": method, "params": params})
logger.debug("ws ← %r", payload) logger.debug("ws ← %r", payload)
await self.ws.send(payload) await self.ws.send_str(payload)
future = asyncio.get_event_loop().create_future() future = asyncio.get_event_loop().create_future()
self.pending_commands[self.last_id] = future self.pending_commands[self.last_id] = future
self.last_id += 1 self.last_id += 1
@ -115,19 +118,20 @@ class WebdriverBidi:
async def run(self): async def run(self):
# open bidi websocket for session # open bidi websocket for session
async with websockets.connect(self.session_info["capabilities"]["webSocketUrl"]) as ws: async with aiohttp.ClientSession() as session:
self.ws = ws async with session.ws_connect(self.session_info["capabilities"]["webSocketUrl"]) as ws:
self.task_reader = asyncio.create_task(self.ws_reader(), name="bidi_reader") self.ws = ws
self.task_reader = asyncio.create_task(self.ws_reader(), name="bidi_reader")
await self.command("session.subscribe", events=["log.entryAdded"]) await self.command("session.subscribe", events=["log.entryAdded"])
context = (await self.command("browsingContext.create", type="tab"))["context"] context = (await self.command("browsingContext.create", type="tab"))["context"]
await self.command("script.evaluate", expression="console.log('Hello BiDi')", await self.command("script.evaluate", expression="console.log('Hello BiDi')",
awaitPromise=False, target={"context": context}) awaitPromise=False, target={"context": context})
await self.command("browsingContext.navigate", context=context, url="https://piware.de") await self.command("browsingContext.navigate", context=context, url="https://piware.de")
await asyncio.sleep(5) await asyncio.sleep(5)
self.task_reader.cancel() self.task_reader.cancel()
del self.task_reader del self.task_reader
def __del__(self): def __del__(self):
logger.debug("cleaning up webdriver") logger.debug("cleaning up webdriver")