diff --git a/bidi.py b/bidi.py index 391c424..9292ca6 100755 --- a/bidi.py +++ b/bidi.py @@ -3,24 +3,15 @@ import asyncio import json import logging -import subprocess import sys -import time +import tempfile from dataclasses import dataclass +from pathlib import Path import aiohttp logger = logging.getLogger(__name__) -# https://w3c.github.io/webdriver/#dfn-find-elements -EL_ID = 'element-6066-11e4-a52e-4f735466cecf' - -DRIVERS = { - "chromium": "chromedriver", - # TODO: not packaged, get from https://github.com/mozilla/geckodriver/releases - "firefox": "/tmp/geckodriver", -} - class WebdriverError(RuntimeError): pass @@ -54,7 +45,7 @@ class Session: class WebdriverBidi: - def __init__(self, browser, headless=False) -> None: + def __init__(self, headless=False) -> None: self.headless = headless self.last_id = 0 self.pending_commands: dict[int, asyncio.Future] = {} @@ -62,55 +53,30 @@ class WebdriverBidi: self.session: Session | None = None self.future_wait_page_load = None - # TODO: make dynamic - self.webdriver_port = 12345 + async def start_session(self) -> None: + raise NotImplementedError('must be implemented by concrete subclass') - chrome_binary = "/usr/lib64/chromium-browser/headless_shell" if self.headless else "/usr/bin/chromium-browser" + async def close_session(self) -> None: + raise NotImplementedError('must be implemented by concrete subclass') - self.session_args = {"capabilities": { - "alwaysMatch": { - "webSocketUrl": True, - "goog:chromeOptions": {"binary": chrome_binary}, - "moz:firefoxOptions": {"args": ["-headless"] if self.headless else []} - } - }} + async def close(self): + assert self.session is not None + logger.debug("cleaning up webdriver") - try: - self.driver = subprocess.Popen([DRIVERS[browser], "--port=" + str(self.webdriver_port)]) - except KeyError as e: - raise ValueError(f"unknown browser {browser}") from e + self.session.task_reader.cancel() + del self.session.task_reader + await self.session.ws.close() + await self.close_session() + await self.session.http.close() + self.session = None - async def ensure_session(self) -> None: + async def __aenter__(self): + await self.start_session() + return self + + async def __aexit__(self, *_excinfo): if self.session is not None: - return - - aiohttp_session = aiohttp.ClientSession(raise_for_status=True) - wd_url = f"http://localhost:{self.webdriver_port}" - - # webdriver needs some time to launch - for retry in range(1, 10): - try: - async with aiohttp_session.post(f"{wd_url}/session", - data=json.dumps(self.session_args).encode()) as resp: - session_info = json.loads(await resp.text())["value"] - logger.debug("webdriver session request: %r %r", resp, session_info) - break - except (IOError, aiohttp.client.ClientResponseError) as e: - logger.debug("waiting for webdriver: %s", e) - time.sleep(0.1 * retry) - else: - raise WebdriverError("could not connect to webdriver") - - ws = await aiohttp_session.ws_connect(session_info["capabilities"]["webSocketUrl"]) - - self.session = Session( - http=aiohttp_session, - ws=ws, - session_url=f"{wd_url}/session/{session_info['sessionId']}", - task_reader=asyncio.create_task(self.ws_reader(ws), name="bidi_reader") - ) - - logger.debug("Established session %r", self.session) + await self.close() async def ws_reader(self, ws: aiohttp.client.ClientWebSocketResponse) -> None: async for msg in ws: @@ -155,20 +121,6 @@ class WebdriverBidi: self.last_id += 1 return await future - async def webdriver(self, path: str, data: dict | None = None, method: str | None = None) -> dict: - """Send a classic Webdriver request and return the JSON response""" - - assert self.session - - # asyncio shares the connection - post_data = json.dumps(data).encode() if data is not None else None - method = method if method else ("POST" if post_data is not None else "GET") - - async with self.session.http.request(method, f"{self.session.session_url}{path}", data=post_data) as resp: - r = await resp.text() - logger.debug("webdriver %s %s %r → %r", method, path, post_data, r) - return json.loads(r) - def arm_page_load(self): assert self.future_wait_page_load is None, "already waiting for page load" self.future_wait_page_load = asyncio.get_event_loop().create_future() @@ -178,8 +130,6 @@ class WebdriverBidi: return await self.future_wait_page_load async def run(self): - await self.ensure_session() - # wait for browser to initialize default context for _ in range(10): realms = (await self.bidi("script.getRealms"))["realms"] @@ -202,9 +152,6 @@ class WebdriverBidi: locator={"type": "css", "value": "#menu-content"}))["nodes"] assert len(r) == 1 - # this doensn't yet have a BiDi command - # r = await self.webdriver(f"/element/{r[0]['sharedId']}/text") - # ... but we don't need it, our CDP driver does this too: r = await self.bidi("script.evaluate", expression="document.querySelector('#menu-content').textContent", awaitPromise=False, target={"context": context}) assert 'Addicted to Free Software Development' in r['result']['value'] @@ -216,10 +163,7 @@ class WebdriverBidi: self.arm_page_load() - # click it: high-level webdriver command: - # await self.webdriver(f"/element/{r[0]['sharedId']}/click", {}) - - # click it: low-level BiDi command: + # click it await self.bidi("input.performActions", context=context, actions=[ { "id": "pointer-0", @@ -236,37 +180,150 @@ class WebdriverBidi: url = await self.wait_page_load() assert url == "https://github.com/martinpitt/" - if not self.headless: - await asyncio.sleep(3) - - async def __aenter__(self): - return self - - async def __aexit__(self, *_excinfo): - if self.session is not None: - logger.debug("cleaning up webdriver") - - self.session.task_reader.cancel() - del self.session.task_reader - - await self.webdriver("", method="DELETE") - await self.session.ws.close() - await self.session.http.close() - - self.session = None - logger.info("Collected debug messages:") for log in self.logs: logger.info(log) + if not self.headless: + await asyncio.sleep(3) + + +class ChromiumBidi(WebdriverBidi): + async def start_session(self) -> None: + assert self.session is None + + # TODO: make dynamic + webdriver_port = 12345 + + chrome_binary = "/usr/lib64/chromium-browser/headless_shell" if self.headless else "/usr/bin/chromium-browser" + + session_args = {"capabilities": { + "alwaysMatch": { + "webSocketUrl": True, + "goog:chromeOptions": {"binary": chrome_binary}, + } + }} + + self.driver = await asyncio.create_subprocess_exec("chromedriver", "--port=" + str(webdriver_port)) + + aiohttp_session = aiohttp.ClientSession(raise_for_status=True) + wd_url = f"http://localhost:{webdriver_port}" + + # webdriver needs some time to launch + for retry in range(1, 10): + try: + async with aiohttp_session.post(f"{wd_url}/session", + data=json.dumps(session_args).encode()) as resp: + session_info = json.loads(await resp.text())["value"] + logger.debug("webdriver session request: %r %r", resp, session_info) + break + except (IOError, aiohttp.client.ClientResponseError) as e: + logger.debug("waiting for webdriver: %s", e) + await asyncio.sleep(0.1 * retry) + else: + raise WebdriverError("could not connect to chromedriver") + + ws = await aiohttp_session.ws_connect(session_info["capabilities"]["webSocketUrl"]) + + self.session = Session( + http=aiohttp_session, + ws=ws, + session_url=f"{wd_url}/session/{session_info['sessionId']}", + task_reader=asyncio.create_task(self.ws_reader(ws), name="bidi_reader") + ) + + logger.debug("Established chromium session %r", self.session) + + async def close_session(self): + await self.session.http.delete(self.session.session_url) + self.driver.terminate() + await self.driver.wait() + + +# We could do this with https://github.com/mozilla/geckodriver/releases with a similar protocol as ChromeBidi +# But let's use https://firefox-source-docs.mozilla.org/testing/marionette/Protocol.html directly, fewer moving parts +class FirefoxBidi(WebdriverBidi): + async def start_session(self) -> None: + # TODO: make dynamic + marionette_port = 12345 + bidi_port = 12346 + + self.homedir = tempfile.TemporaryDirectory(prefix="firefox-home-") + (Path(self.homedir.name) / 'download').mkdir() + self.profiledir = Path(self.homedir.name) / "profile" + self.profiledir.mkdir() + (self.profiledir / "user.js").write_text(f""" + user_pref("remote.enabled", true); + user_pref("remote.frames.enabled", true); + user_pref("app.update.auto", false); + user_pref("datareporting.policy.dataSubmissionEnabled", false); + user_pref("toolkit.telemetry.reportingpolicy.firstRun", false); + user_pref("dom.disable_beforeunload", true); + user_pref("browser.download.dir", "{self.homedir}/download"); + user_pref("browser.download.folderList", 2); + user_pref("signon.rememberSignons", false); + user_pref("dom.navigation.locationChangeRateLimit.count", 9999); + // HACK: https://bugzilla.mozilla.org/show_bug.cgi?id=1746154 + user_pref("fission.webContentIsolationStrategy", 0); + user_pref("fission.bfcacheInParent", false); + user_pref('marionette.port', {marionette_port}); + """) + + self.driver = await asyncio.create_subprocess_exec( + "firefox", "-profile", str(self.profiledir), "--marionette", "--no-remote", + f"--remote-debugging-port={bidi_port}", + *(["-headless"] if self.headless else []), "about:blank") + + # needs some time to launch + for _ in range(1, 30): + try: + # we must keep this socket open throughout the lifetime of that session + reader, self.writer_marionette = await asyncio.open_connection("127.0.0.1", marionette_port) + break + except ConnectionRefusedError as e: + logger.debug("waiting for firefox marionette: %s", e) + await asyncio.sleep(1) + else: + raise WebdriverError("could not connect to firefox marionette") + + reply = await reader.read(1024) + if b'"marionetteProtocol":3' not in reply: + raise WebdriverError(f"unexpected marionette reply: {reply.decode()}") + cmd = '[0,1,"WebDriver:NewSession",{"webSocketUrl":true}]' + self.writer_marionette.write(f"{len(cmd)}:{cmd}".encode()) + await self.writer_marionette.drain() + reply = await reader.read(1024) + # cut off length prefix + reply = json.loads(reply[reply.index(b":") + 1:].decode()) + if not isinstance(reply, list) or len(reply) != 4 or not isinstance(reply[3], dict): + raise WebdriverError(f"unexpected marionette session request reply: {reply!r}") + logger.debug("marionette session request reply: %s", reply) + + aiohttp_session = aiohttp.ClientSession(raise_for_status=True) + ws_url = reply[3]["capabilities"]["webSocketUrl"] + ws = await aiohttp_session.ws_connect(ws_url) + + self.session = Session( + http=aiohttp_session, + ws=ws, + session_url=ws_url, + task_reader=asyncio.create_task(self.ws_reader(ws), name="bidi_reader") + ) + + logger.debug("Established firefox session %r", self.session) + + async def close_session(self): + self.writer_marionette.close() + await self.writer_marionette.wait_closed() self.driver.terminate() self.driver.wait() async def main(): logging.basicConfig(level=logging.DEBUG) - async with WebdriverBidi(sys.argv[1] if len(sys.argv) > 1 else 'chromium', - headless=True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False) as d: + cls = FirefoxBidi if len(sys.argv) > 1 and sys.argv[1] == "firefox" else ChromiumBidi + async with cls(headless=True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False) as d: await d.run() + asyncio.run(main())