frame tracking and cockpit testing
Hint from https://github.com/w3c/webdriver-bidi/issues/418
This commit is contained in:
parent
5b31add62f
commit
6cae2c652a
1 changed files with 124 additions and 32 deletions
156
bidi.py
156
bidi.py
|
|
@ -12,7 +12,8 @@ from typing import Any
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
log_proto = logging.getLogger("bidi.proto")
|
||||||
|
log_command = logging.getLogger("bidi.command")
|
||||||
|
|
||||||
|
|
||||||
class WebdriverError(RuntimeError):
|
class WebdriverError(RuntimeError):
|
||||||
|
|
@ -75,7 +76,8 @@ class WebdriverBidi:
|
||||||
self.logs: list[LogMessage] = []
|
self.logs: list[LogMessage] = []
|
||||||
self.bidi_session: BidiSession | None = None
|
self.bidi_session: BidiSession | None = None
|
||||||
self.future_wait_page_load = None
|
self.future_wait_page_load = None
|
||||||
self.context: str | None = None # default browsingContext
|
self.top_context: str | None = None # top-level browsingContext
|
||||||
|
self.context: str | None # currently selected context (top or iframe)
|
||||||
|
|
||||||
async def start_bidi_session(self) -> None:
|
async def start_bidi_session(self) -> None:
|
||||||
raise NotImplementedError('must be implemented by concrete subclass')
|
raise NotImplementedError('must be implemented by concrete subclass')
|
||||||
|
|
@ -85,7 +87,7 @@ class WebdriverBidi:
|
||||||
|
|
||||||
async def close(self):
|
async def close(self):
|
||||||
assert self.bidi_session is not None
|
assert self.bidi_session is not None
|
||||||
logger.debug("cleaning up webdriver")
|
log_proto.debug("cleaning up webdriver")
|
||||||
|
|
||||||
self.task_reader.cancel()
|
self.task_reader.cancel()
|
||||||
del self.task_reader
|
del self.task_reader
|
||||||
|
|
@ -111,13 +113,15 @@ class WebdriverBidi:
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
realms = (await self.bidi("script.getRealms"))["realms"]
|
realms = (await self.bidi("script.getRealms"))["realms"]
|
||||||
if len(realms) > 0:
|
if len(realms) > 0:
|
||||||
self.context = realms[0]["context"]
|
self.top_context = realms[0]["context"]
|
||||||
|
self.context = self.top_context
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
raise WebdriverError("timed out waiting for default realm")
|
raise WebdriverError("timed out waiting for default realm")
|
||||||
|
|
||||||
# avoid not seeing elements due to too small window
|
# avoid not seeing elements due to too small window
|
||||||
await self.bidi("browsingContext.setViewport", context=self.context, viewport={"width": 1024, "height": 5000})
|
# await self.bidi("browsingContext.setViewport", context=self.top_context,
|
||||||
|
# viewport={"width": 1024, "height": 5000})
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
await self.start_session()
|
await self.start_session()
|
||||||
|
|
@ -131,9 +135,9 @@ class WebdriverBidi:
|
||||||
async for msg in ws:
|
async for msg in ws:
|
||||||
if msg.type == aiohttp.WSMsgType.TEXT:
|
if msg.type == aiohttp.WSMsgType.TEXT:
|
||||||
data = json.loads(msg.data)
|
data = json.loads(msg.data)
|
||||||
logger.debug("ws TEXT → %r", data)
|
log_proto.debug("ws TEXT → %r", data)
|
||||||
if "id" in data and data["id"] in self.pending_commands:
|
if "id" in data and data["id"] in self.pending_commands:
|
||||||
logger.debug("ws_reader: resolving pending command %i", data["id"])
|
log_proto.debug("ws_reader: resolving pending command %i", data["id"])
|
||||||
if data["type"] == "success":
|
if data["type"] == "success":
|
||||||
self.pending_commands[data["id"]].set_result(data["result"])
|
self.pending_commands[data["id"]].set_result(data["result"])
|
||||||
else:
|
else:
|
||||||
|
|
@ -144,31 +148,41 @@ class WebdriverBidi:
|
||||||
|
|
||||||
if data["type"] == "event":
|
if data["type"] == "event":
|
||||||
if data["method"] == "log.entryAdded":
|
if data["method"] == "log.entryAdded":
|
||||||
self.logs.append(LogMessage(data["params"]))
|
log = LogMessage(data["params"])
|
||||||
|
self.logs.append(log)
|
||||||
|
log_command.info(str(log))
|
||||||
continue
|
continue
|
||||||
if data["method"] == "browsingContext.domContentLoaded":
|
if data["method"] == "browsingContext.domContentLoaded":
|
||||||
if self.future_wait_page_load:
|
if self.future_wait_page_load:
|
||||||
logger.debug("page loaded: %r, resolving wait page load future", data["params"])
|
log_command.debug("page loaded: %r, resolving wait page load future", data["params"])
|
||||||
self.future_wait_page_load.set_result(data["params"]["url"])
|
self.future_wait_page_load.set_result(data["params"]["url"])
|
||||||
else:
|
else:
|
||||||
logger.debug("page loaded: %r (not awaited)", data["params"])
|
log_command.debug("page loaded: %r (not awaited)", data["params"])
|
||||||
continue
|
continue
|
||||||
|
# if data["method"] == "script.realmCreated":
|
||||||
|
# realms = await self.bidi("script.getRealms")
|
||||||
|
# log_command.warning("XXX script.realmCreated new: %r", realms)
|
||||||
|
|
||||||
logger.warning("ws_reader: unhandled message %r", data)
|
log_proto.warning("ws_reader: unhandled message %r", data)
|
||||||
elif msg.type == aiohttp.WSMsgType.ERROR:
|
elif msg.type == aiohttp.WSMsgType.ERROR:
|
||||||
logger.error("BiDi failure: %s", msg)
|
log_proto.error("BiDi failure: %s", msg)
|
||||||
break
|
break
|
||||||
|
|
||||||
async def bidi(self, method, **params) -> dict[str, Any]:
|
async def bidi(self, method, **params) -> dict[str, Any]:
|
||||||
"""Send a Webdriver BiDi command and return the JSON response"""
|
"""Send a Webdriver BiDi command and return the JSON response"""
|
||||||
|
|
||||||
payload = json.dumps({"id": self.last_id, "method": method, "params": params})
|
payload = json.dumps({"id": self.last_id, "method": method, "params": params})
|
||||||
logger.debug("ws ← %r", payload)
|
log_proto.debug("ws ← %r", payload)
|
||||||
await self.ws.send_str(payload)
|
await self.ws.send_str(payload)
|
||||||
future = asyncio.get_event_loop().create_future()
|
future = asyncio.get_event_loop().create_future()
|
||||||
self.pending_commands[self.last_id] = future
|
self.pending_commands[self.last_id] = future
|
||||||
self.last_id += 1
|
self.last_id += 1
|
||||||
return await future
|
# we really expect this to be fast, otherwise the browser crashed; in particular, TIMEOUT is too long
|
||||||
|
return await asyncio.wait_for(future, timeout=5)
|
||||||
|
|
||||||
|
#
|
||||||
|
# BiDi state tracking
|
||||||
|
#
|
||||||
|
|
||||||
def arm_page_load(self):
|
def arm_page_load(self):
|
||||||
assert self.future_wait_page_load is None, "already waiting for page load"
|
assert self.future_wait_page_load is None, "already waiting for page load"
|
||||||
|
|
@ -183,6 +197,20 @@ class WebdriverBidi:
|
||||||
except asyncio.TimeoutError as e:
|
except asyncio.TimeoutError as e:
|
||||||
raise ValueError("timed out waiting for page load") from e
|
raise ValueError("timed out waiting for page load") from e
|
||||||
|
|
||||||
|
async def switch_to_frame(self, name: str) -> None:
|
||||||
|
frame = await self.locate(f"iframe[name='{name}']")
|
||||||
|
cw = await self.bidi("script.callFunction",
|
||||||
|
functionDeclaration="f => f.contentWindow",
|
||||||
|
arguments=[frame],
|
||||||
|
awaitPromise=False,
|
||||||
|
target={"context": self.top_context})
|
||||||
|
self.context = cw["result"]["value"]["context"]
|
||||||
|
log_command.debug("switch_to_frame(%s)", name)
|
||||||
|
|
||||||
|
def switch_to_top(self) -> None:
|
||||||
|
self.context = self.top_context
|
||||||
|
log_command.debug("switch_to_top")
|
||||||
|
|
||||||
#
|
#
|
||||||
# High-level helpers
|
# High-level helpers
|
||||||
#
|
#
|
||||||
|
|
@ -195,8 +223,24 @@ class WebdriverBidi:
|
||||||
raise ValueError(f"no element found for {selector}")
|
raise ValueError(f"no element found for {selector}")
|
||||||
if len(nodes) > 1:
|
if len(nodes) > 1:
|
||||||
raise ValueError(f"selector {selector} is ambiguous: {nodes}")
|
raise ValueError(f"selector {selector} is ambiguous: {nodes}")
|
||||||
|
log_command.debug("locate(%s) = %r", selector, nodes[0])
|
||||||
return nodes[0]
|
return nodes[0]
|
||||||
|
|
||||||
|
async def wait(self, selector: str, timeout: int = TIMEOUT) -> None:
|
||||||
|
log_command.debug("wait(%s)", selector)
|
||||||
|
# FIXME: this is very inefficient; use our JS page helper with promise await
|
||||||
|
last_error = None
|
||||||
|
for _ in range(timeout * 10):
|
||||||
|
try:
|
||||||
|
n = await self.locate(selector)
|
||||||
|
log_command.debug("wait(%s) success: %r", selector, n)
|
||||||
|
return
|
||||||
|
except (WebdriverError, ValueError, TimeoutError) as e:
|
||||||
|
last_error = e
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"timed out waiting for {selector}: {last_error}")
|
||||||
|
|
||||||
async def text(self, selector: str) -> str:
|
async def text(self, selector: str) -> str:
|
||||||
# there is no BiDi way of evaluating a script on a particular element or getting
|
# there is no BiDi way of evaluating a script on a particular element or getting
|
||||||
# the text of an element, so just use the standard `.textContent` web platform property
|
# the text of an element, so just use the standard `.textContent` web platform property
|
||||||
|
|
@ -223,6 +267,32 @@ class WebdriverBidi:
|
||||||
}
|
}
|
||||||
])
|
])
|
||||||
|
|
||||||
|
async def key(self, value: str) -> None:
|
||||||
|
await self.bidi("input.performActions", context=self.context, actions=[{
|
||||||
|
"type": "key", "id": "key-0", "actions": [
|
||||||
|
{"type": "keyDown", "value": value},
|
||||||
|
{"type": "keyUp", "value": value},
|
||||||
|
]}])
|
||||||
|
|
||||||
|
async def input_text(self, text: str) -> None:
|
||||||
|
actions = []
|
||||||
|
for c in text:
|
||||||
|
actions.append({"type": "keyDown", "value": c})
|
||||||
|
actions.append({"type": "keyUp", "value": c})
|
||||||
|
await self.bidi("input.performActions", context=self.context, actions=[
|
||||||
|
{"type": "key", "id": "key-0", "actions": actions}])
|
||||||
|
|
||||||
|
async def focus(self, selector: str) -> None:
|
||||||
|
await self.locate(selector)
|
||||||
|
await self.bidi("script.evaluate", expression=f"document.querySelector('{selector}').focus()",
|
||||||
|
awaitPromise=False, target={"context": self.context})
|
||||||
|
|
||||||
|
async def set_input_text(self, selector: str, text: str) -> None:
|
||||||
|
await self.focus(selector)
|
||||||
|
await self.input_text(text)
|
||||||
|
# TODO: wait for text
|
||||||
|
await asyncio.sleep(0.2)
|
||||||
|
|
||||||
|
|
||||||
class ChromiumBidi(WebdriverBidi):
|
class ChromiumBidi(WebdriverBidi):
|
||||||
async def start_bidi_session(self) -> None:
|
async def start_bidi_session(self) -> None:
|
||||||
|
|
@ -248,10 +318,10 @@ class ChromiumBidi(WebdriverBidi):
|
||||||
async with self.http_session.post(f"{wd_url}/session",
|
async with self.http_session.post(f"{wd_url}/session",
|
||||||
data=json.dumps(session_args).encode()) as resp:
|
data=json.dumps(session_args).encode()) as resp:
|
||||||
session_info = json.loads(await resp.text())["value"]
|
session_info = json.loads(await resp.text())["value"]
|
||||||
logger.debug("webdriver session request: %r %r", resp, session_info)
|
log_proto.debug("webdriver session request: %r %r", resp, session_info)
|
||||||
break
|
break
|
||||||
except (IOError, aiohttp.client.ClientResponseError) as e:
|
except (IOError, aiohttp.client.ClientResponseError) as e:
|
||||||
logger.debug("waiting for webdriver: %s", e)
|
log_proto.debug("waiting for webdriver: %s", e)
|
||||||
await asyncio.sleep(0.1 * retry)
|
await asyncio.sleep(0.1 * retry)
|
||||||
else:
|
else:
|
||||||
raise WebdriverError("could not connect to chromedriver")
|
raise WebdriverError("could not connect to chromedriver")
|
||||||
|
|
@ -260,7 +330,7 @@ class ChromiumBidi(WebdriverBidi):
|
||||||
session_url=f"{wd_url}/session/{session_info['sessionId']}",
|
session_url=f"{wd_url}/session/{session_info['sessionId']}",
|
||||||
ws_url=session_info["capabilities"]["webSocketUrl"],
|
ws_url=session_info["capabilities"]["webSocketUrl"],
|
||||||
process=driver)
|
process=driver)
|
||||||
logger.debug("Established chromium session %r", self.bidi_session)
|
log_proto.debug("Established chromium session %r", self.bidi_session)
|
||||||
|
|
||||||
async def close_bidi_session(self):
|
async def close_bidi_session(self):
|
||||||
await self.http_session.delete(self.bidi_session.session_url)
|
await self.http_session.delete(self.bidi_session.session_url)
|
||||||
|
|
@ -305,7 +375,7 @@ class FirefoxBidi(WebdriverBidi):
|
||||||
reader, self.writer_marionette = await asyncio.open_connection("127.0.0.1", marionette_port)
|
reader, self.writer_marionette = await asyncio.open_connection("127.0.0.1", marionette_port)
|
||||||
break
|
break
|
||||||
except ConnectionRefusedError as e:
|
except ConnectionRefusedError as e:
|
||||||
logger.debug("waiting for firefox marionette: %s", e)
|
log_proto.debug("waiting for firefox marionette: %s", e)
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
else:
|
else:
|
||||||
raise WebdriverError("could not connect to firefox marionette")
|
raise WebdriverError("could not connect to firefox marionette")
|
||||||
|
|
@ -321,11 +391,11 @@ class FirefoxBidi(WebdriverBidi):
|
||||||
reply = json.loads(reply[reply.index(b":") + 1:].decode())
|
reply = json.loads(reply[reply.index(b":") + 1:].decode())
|
||||||
if not isinstance(reply, list) or len(reply) != 4 or not isinstance(reply[3], dict):
|
if not isinstance(reply, list) or len(reply) != 4 or not isinstance(reply[3], dict):
|
||||||
raise WebdriverError(f"unexpected marionette session request reply: {reply!r}")
|
raise WebdriverError(f"unexpected marionette session request reply: {reply!r}")
|
||||||
logger.debug("marionette session request reply: %s", reply)
|
log_proto.debug("marionette session request reply: %s", reply)
|
||||||
|
|
||||||
url = reply[3]["capabilities"]["webSocketUrl"]
|
url = reply[3]["capabilities"]["webSocketUrl"]
|
||||||
self.bidi_session = BidiSession(session_url=url, ws_url=url, process=driver)
|
self.bidi_session = BidiSession(session_url=url, ws_url=url, process=driver)
|
||||||
logger.debug("Established firefox session %r", self.bidi_session)
|
log_proto.debug("Established firefox session %r", self.bidi_session)
|
||||||
|
|
||||||
async def close_bidi_session(self):
|
async def close_bidi_session(self):
|
||||||
self.writer_marionette.close()
|
self.writer_marionette.close()
|
||||||
|
|
@ -334,6 +404,8 @@ class FirefoxBidi(WebdriverBidi):
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
logging.basicConfig(level=logging.DEBUG)
|
logging.basicConfig(level=logging.DEBUG)
|
||||||
|
log_proto.setLevel(logging.DEBUG)
|
||||||
|
log_command.setLevel(logging.DEBUG)
|
||||||
|
|
||||||
headless = True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False
|
headless = True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False
|
||||||
cls = FirefoxBidi if len(sys.argv) > 1 and sys.argv[1] == "firefox" else ChromiumBidi
|
cls = FirefoxBidi if len(sys.argv) > 1 and sys.argv[1] == "firefox" else ChromiumBidi
|
||||||
|
|
@ -342,22 +414,42 @@ async def main():
|
||||||
await d.bidi("script.evaluate", expression="console.log('Hello BiDi')",
|
await d.bidi("script.evaluate", expression="console.log('Hello BiDi')",
|
||||||
awaitPromise=False, target={"context": d.context})
|
awaitPromise=False, target={"context": d.context})
|
||||||
await d.bidi("browsingContext.navigate", context=d.context,
|
await d.bidi("browsingContext.navigate", context=d.context,
|
||||||
url="https://piware.de", wait="complete")
|
url="http://127.0.0.2:9091", wait="complete")
|
||||||
|
|
||||||
assert 'Addicted to Free Software Development' in await d.text("#menu-content")
|
print("\n\nSTEP: logging in")
|
||||||
|
await d.set_input_text("#login-user-input", "admin")
|
||||||
|
await d.set_input_text("#login-password-input", "foobar")
|
||||||
|
|
||||||
# click first social link
|
# d.arm_page_load()
|
||||||
d.arm_page_load()
|
# await d.key("Enter") # FIXME: this doesn't work: Neither with Return nor \n
|
||||||
await d.mouse("a[rel='me']:first-child")
|
await d.mouse("#login-button")
|
||||||
url = await d.wait_page_load()
|
|
||||||
assert url == "https://github.com/martinpitt/", url
|
|
||||||
|
|
||||||
logger.info("Collected debug messages:")
|
# this is optional: wait() can wait across page loads
|
||||||
|
# print("\n\nSTEP: waiting for page load")
|
||||||
|
# await d.wait_page_load()
|
||||||
|
|
||||||
|
print("\n\nSTEP: super-user-indicator")
|
||||||
|
await d.wait("#super-user-indicator")
|
||||||
|
# FIXME: wait for text helper
|
||||||
|
for _ in range(5):
|
||||||
|
t = await d.text("#super-user-indicator")
|
||||||
|
if t == "Limited access":
|
||||||
|
break
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
else:
|
||||||
|
raise ValueError("timed out waiting for #super-user-indicator text")
|
||||||
|
|
||||||
|
print("\n\nSTEP: wait/switch frame")
|
||||||
|
await d.switch_to_frame('cockpit1:localhost/system')
|
||||||
|
|
||||||
|
print("\n\nSTEP: inspect system frame")
|
||||||
|
await d.wait(".system-configuration")
|
||||||
|
t = await d.text(".system-configuration")
|
||||||
|
assert "Join domain" in t, t
|
||||||
|
|
||||||
|
log_command.info("Collected debug messages:")
|
||||||
for log in d.logs:
|
for log in d.logs:
|
||||||
logger.info(log)
|
log_command.info(log)
|
||||||
|
|
||||||
if not headless:
|
|
||||||
await asyncio.sleep(3)
|
|
||||||
|
|
||||||
|
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue