Refactor run and page helpers
This commit is contained in:
parent
c1496f15a3
commit
4c18ab08b0
1 changed files with 67 additions and 49 deletions
116
bidi.py
116
bidi.py
|
|
@ -8,6 +8,7 @@ import sys
|
|||
import tempfile
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
|
||||
|
|
@ -70,6 +71,7 @@ class WebdriverBidi:
|
|||
self.logs: list[LogMessage] = []
|
||||
self.bidi_session: BidiSession | None = None
|
||||
self.future_wait_page_load = None
|
||||
self.context: str | None = None # default browsingContext
|
||||
|
||||
async def start_bidi_session(self) -> None:
|
||||
raise NotImplementedError('must be implemented by concrete subclass')
|
||||
|
|
@ -97,6 +99,19 @@ class WebdriverBidi:
|
|||
self.ws = await self.http_session.ws_connect(self.bidi_session.ws_url)
|
||||
self.task_reader = asyncio.create_task(self.ws_reader(self.ws), name="bidi_reader")
|
||||
|
||||
await self.bidi("session.subscribe", events=[
|
||||
"log.entryAdded", "browsingContext.domContentLoaded",
|
||||
])
|
||||
|
||||
# wait for browser to initialize default context
|
||||
for _ in range(10):
|
||||
realms = (await self.bidi("script.getRealms"))["realms"]
|
||||
if len(realms) > 0:
|
||||
self.context = realms[0]["context"]
|
||||
break
|
||||
else:
|
||||
raise WebdriverError("timed out waiting for default realm")
|
||||
|
||||
async def __aenter__(self):
|
||||
await self.start_session()
|
||||
return self
|
||||
|
|
@ -135,7 +150,7 @@ class WebdriverBidi:
|
|||
logger.error("BiDi failure: %s", msg)
|
||||
break
|
||||
|
||||
async def bidi(self, method, **params) -> asyncio.Future:
|
||||
async def bidi(self, method, **params) -> dict[str, Any]:
|
||||
"""Send a Webdriver BiDi command and return the JSON response"""
|
||||
|
||||
payload = json.dumps({"id": self.last_id, "method": method, "params": params})
|
||||
|
|
@ -154,64 +169,46 @@ class WebdriverBidi:
|
|||
assert self.future_wait_page_load is not None, "call arm_page_load() first"
|
||||
return await self.future_wait_page_load
|
||||
|
||||
async def run(self):
|
||||
# wait for browser to initialize default context
|
||||
for _ in range(10):
|
||||
realms = (await self.bidi("script.getRealms"))["realms"]
|
||||
if len(realms) > 0:
|
||||
context = realms[0]["context"]
|
||||
break
|
||||
else:
|
||||
raise WebdriverError("timed out waiting for default realm")
|
||||
#
|
||||
# High-level helpers
|
||||
#
|
||||
|
||||
await self.bidi("session.subscribe", events=[
|
||||
"log.entryAdded", "browsingContext.domContentLoaded",
|
||||
])
|
||||
async def locate(self, selector: str) -> str:
|
||||
r = await self.bidi("browsingContext.locateNodes", context=self.context,
|
||||
locator={"type": "css", "value": selector})
|
||||
nodes = r["nodes"]
|
||||
if len(nodes) == 0:
|
||||
raise ValueError(f"no element found for {selector}")
|
||||
if len(nodes) > 1:
|
||||
raise ValueError(f"selector {selector} is ambiguous: {nodes}")
|
||||
return nodes[0]
|
||||
|
||||
await self.bidi("script.evaluate", expression="console.log('Hello BiDi')",
|
||||
awaitPromise=False, target={"context": context})
|
||||
await self.bidi("browsingContext.navigate", context=context,
|
||||
url="https://piware.de", wait="complete")
|
||||
async def text(self, selector: str) -> str:
|
||||
# there is no BiDi way of evaluating a script on a particular element or getting
|
||||
# the text of an element, so just use the standard `.textContent` web platform property
|
||||
# but first make sure the locator is unique
|
||||
await self.locate(selector)
|
||||
r = await self.bidi("script.evaluate", expression=f"document.querySelector('{selector}').textContent",
|
||||
awaitPromise=False, target={"context": self.context})
|
||||
return r['result']['value']
|
||||
|
||||
r = (await self.bidi("browsingContext.locateNodes", context=context,
|
||||
locator={"type": "css", "value": "#menu-content"}))["nodes"]
|
||||
assert len(r) == 1
|
||||
async def mouse(self, selector: str, button: int = 0, click_count: int = 1) -> None:
|
||||
element = await self.locate(selector)
|
||||
|
||||
r = await self.bidi("script.evaluate", expression="document.querySelector('#menu-content').textContent",
|
||||
awaitPromise=False, target={"context": context})
|
||||
assert 'Addicted to Free Software Development' in r['result']['value']
|
||||
actions = [{"type": "pointerMove", "x": 0, "y": 0, "origin": {"type": "element", "element": element}}]
|
||||
for _ in range(click_count):
|
||||
actions.append({"type": "pointerDown", "button": button})
|
||||
actions.append({"type": "pointerUp", "button": button})
|
||||
|
||||
# locate first social link
|
||||
r = (await self.bidi("browsingContext.locateNodes", context=context,
|
||||
locator={"type": "css", "value": "a[rel='me']:first-child"}))["nodes"]
|
||||
assert len(r) == 1
|
||||
|
||||
self.arm_page_load()
|
||||
|
||||
# click it
|
||||
await self.bidi("input.performActions", context=context, actions=[
|
||||
await self.bidi("input.performActions", context=self.context, actions=[
|
||||
{
|
||||
"id": "pointer-0",
|
||||
"type": "pointer",
|
||||
"parameters": {"pointerType": "mouse"},
|
||||
"actions": [
|
||||
{"type": "pointerMove", "x": 0, "y": 0, "origin": {"type": "element", "element": r[0]}},
|
||||
{"type": "pointerDown", "button": 0},
|
||||
{"type": "pointerUp", "button": 0},
|
||||
],
|
||||
"actions": actions,
|
||||
}
|
||||
])
|
||||
|
||||
url = await self.wait_page_load()
|
||||
assert url == "https://github.com/martinpitt/"
|
||||
|
||||
logger.info("Collected debug messages:")
|
||||
for log in self.logs:
|
||||
logger.info(log)
|
||||
|
||||
if not self.headless:
|
||||
await asyncio.sleep(3)
|
||||
|
||||
|
||||
class ChromiumBidi(WebdriverBidi):
|
||||
async def start_bidi_session(self) -> None:
|
||||
|
|
@ -323,9 +320,30 @@ class FirefoxBidi(WebdriverBidi):
|
|||
|
||||
async def main():
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
headless = True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False
|
||||
cls = FirefoxBidi if len(sys.argv) > 1 and sys.argv[1] == "firefox" else ChromiumBidi
|
||||
async with cls(headless=True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False) as d:
|
||||
await d.run()
|
||||
|
||||
async with cls(headless=headless) as d:
|
||||
await d.bidi("script.evaluate", expression="console.log('Hello BiDi')",
|
||||
awaitPromise=False, target={"context": d.context})
|
||||
await d.bidi("browsingContext.navigate", context=d.context,
|
||||
url="https://piware.de", wait="complete")
|
||||
|
||||
assert 'Addicted to Free Software Development' in await d.text("#menu-content")
|
||||
|
||||
# click first social link
|
||||
d.arm_page_load()
|
||||
await d.mouse("a[rel='me']:first-child")
|
||||
url = await d.wait_page_load()
|
||||
assert url == "https://github.com/martinpitt/", url
|
||||
|
||||
logger.info("Collected debug messages:")
|
||||
for log in d.logs:
|
||||
logger.info(log)
|
||||
|
||||
if not headless:
|
||||
await asyncio.sleep(3)
|
||||
|
||||
|
||||
asyncio.run(main())
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue