Drop obsolete bidi.py async code
This better lives in the sync Browser class.
This commit is contained in:
parent
a68e5a0831
commit
5d89be7665
1 changed files with 11 additions and 162 deletions
173
bidi.py
173
bidi.py
|
|
@ -1,11 +1,9 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
|
@ -208,22 +206,17 @@ class WebdriverBidi:
|
|||
# safe-guard timeout for avoiding eternally hanging tests
|
||||
return await asyncio.wait_for(future, timeout=60)
|
||||
|
||||
#
|
||||
# BiDi state tracking
|
||||
#
|
||||
|
||||
def arm_page_load(self):
|
||||
assert self.future_wait_page_load is None, "already waiting for page load"
|
||||
self.future_wait_page_load = asyncio.get_event_loop().create_future()
|
||||
|
||||
async def wait_page_load(self, timeout: int = TIMEOUT) -> str:
|
||||
assert self.future_wait_page_load is not None, "call arm_page_load() first"
|
||||
try:
|
||||
url = await asyncio.wait_for(self.future_wait_page_load, timeout=timeout)
|
||||
self.future_wait_page_load = None
|
||||
return url
|
||||
except asyncio.TimeoutError as e:
|
||||
raise Error("timed out waiting for page load") from e
|
||||
# this is mostly unused; testlib uses ph_find() due to sizzle
|
||||
async def locate(self, selector: str) -> str:
|
||||
r = await self.bidi("browsingContext.locateNodes", context=self.context,
|
||||
locator={"type": "css", "value": selector})
|
||||
nodes = r["nodes"]
|
||||
if len(nodes) == 0:
|
||||
raise Error(f"no element found for {selector}")
|
||||
if len(nodes) > 1:
|
||||
raise Error(f"selector {selector} is ambiguous: {nodes}")
|
||||
log_command.debug("locate(%s) = %r", selector, nodes[0])
|
||||
return nodes[0]
|
||||
|
||||
async def switch_to_frame(self, name: str) -> None:
|
||||
frame = await self.locate(f"iframe[name='{name}']")
|
||||
|
|
@ -239,99 +232,6 @@ class WebdriverBidi:
|
|||
self.context = self.top_context
|
||||
log_command.debug("switch_to_top")
|
||||
|
||||
#
|
||||
# High-level helpers
|
||||
#
|
||||
|
||||
async def locate(self, selector: str) -> str:
|
||||
r = await self.bidi("browsingContext.locateNodes", context=self.context,
|
||||
locator={"type": "css", "value": selector})
|
||||
nodes = r["nodes"]
|
||||
if len(nodes) == 0:
|
||||
raise Error(f"no element found for {selector}")
|
||||
if len(nodes) > 1:
|
||||
raise Error(f"selector {selector} is ambiguous: {nodes}")
|
||||
log_command.debug("locate(%s) = %r", selector, nodes[0])
|
||||
return nodes[0]
|
||||
|
||||
async def wait(self, selector: str, timeout: int = TIMEOUT) -> None:
|
||||
log_command.debug("wait(%s)", selector)
|
||||
# FIXME: this is very inefficient; use our JS page helper with promise await
|
||||
last_error = None
|
||||
for _ in range(timeout * 10):
|
||||
try:
|
||||
n = await self.locate(selector)
|
||||
log_command.debug("wait(%s) success: %r", selector, n)
|
||||
return
|
||||
except (WebdriverError, Error, TimeoutError) as e:
|
||||
last_error = e
|
||||
await asyncio.sleep(0.1)
|
||||
else:
|
||||
raise Error(f"timed out waiting for {selector}: {last_error}")
|
||||
|
||||
async def wait_js_cond(self, cond: str, error_description: str = "null") -> None:
|
||||
log_command.debug("wait_js_cond(%s)", cond)
|
||||
await self.bidi("script.evaluate",
|
||||
expression=f"window.ph_wait_cond(() => {cond}, {TIMEOUT * 1000}, {error_description})",
|
||||
awaitPromise=True, target={"context": self.context})
|
||||
|
||||
async def text(self, selector: str) -> str:
|
||||
# there is no BiDi way of evaluating a script on a particular element or getting
|
||||
# the text of an element, so just use the standard `.textContent` web platform property
|
||||
# but first make sure the locator is unique
|
||||
await self.locate(selector)
|
||||
r = await self.bidi("script.evaluate", expression=f"document.querySelector('{selector}').textContent",
|
||||
awaitPromise=False, target={"context": self.context})
|
||||
return r['result']['value']
|
||||
|
||||
async def wait_text(self, selector: str, text: str) -> None:
|
||||
await self.wait(selector)
|
||||
await self.wait_js_cond(f"window.ph_text({jsquote(selector)}) === {jsquote(text)}",
|
||||
error_description=f"() => 'actual text: ' + window.ph_text({jsquote(selector)})")
|
||||
|
||||
async def mouse(self, selector: str, button: int = 0, click_count: int = 1) -> None:
|
||||
element = await self.locate(selector)
|
||||
|
||||
actions = [{"type": "pointerMove", "x": 0, "y": 0, "origin": {"type": "element", "element": element}}]
|
||||
for _ in range(click_count):
|
||||
actions.append({"type": "pointerDown", "button": button})
|
||||
actions.append({"type": "pointerUp", "button": button})
|
||||
|
||||
await self.bidi("input.performActions", context=self.context, actions=[
|
||||
{
|
||||
"id": "pointer-0",
|
||||
"type": "pointer",
|
||||
"parameters": {"pointerType": "mouse"},
|
||||
"actions": actions,
|
||||
}
|
||||
])
|
||||
|
||||
async def key(self, value: str) -> None:
|
||||
await self.bidi("input.performActions", context=self.context, actions=[{
|
||||
"type": "key", "id": "key-0", "actions": [
|
||||
{"type": "keyDown", "value": value},
|
||||
{"type": "keyUp", "value": value},
|
||||
]}])
|
||||
|
||||
async def input_text(self, text: str) -> None:
|
||||
actions = []
|
||||
for c in text:
|
||||
actions.append({"type": "keyDown", "value": c})
|
||||
actions.append({"type": "keyUp", "value": c})
|
||||
await self.bidi("input.performActions", context=self.context, actions=[
|
||||
{"type": "key", "id": "key-0", "actions": actions}])
|
||||
|
||||
async def focus(self, selector: str) -> None:
|
||||
await self.locate(selector)
|
||||
await self.bidi("script.evaluate", expression=f"document.querySelector('{selector}').focus()",
|
||||
awaitPromise=False, target={"context": self.context})
|
||||
|
||||
async def set_input_text(self, selector: str, text: str) -> None:
|
||||
await self.focus(selector)
|
||||
await self.input_text(text)
|
||||
# TODO: wait for text
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
|
||||
class ChromiumBidi(WebdriverBidi):
|
||||
async def start_bidi_session(self) -> None:
|
||||
|
|
@ -439,54 +339,3 @@ class FirefoxBidi(WebdriverBidi):
|
|||
async def close_bidi_session(self):
|
||||
self.writer_marionette.close()
|
||||
await self.writer_marionette.wait_closed()
|
||||
|
||||
|
||||
async def main():
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
log_proto.setLevel(logging.DEBUG)
|
||||
log_command.setLevel(logging.DEBUG)
|
||||
|
||||
headless = True if len(sys.argv) > 2 and sys.argv[2] == 'headless' else False
|
||||
cls = FirefoxBidi if len(sys.argv) > 1 and sys.argv[1] == "firefox" else ChromiumBidi
|
||||
|
||||
async with cls(headless=headless) as d:
|
||||
await d.bidi("script.evaluate", expression="console.log('Hello BiDi')",
|
||||
awaitPromise=False, target={"context": d.context})
|
||||
await d.bidi("browsingContext.navigate", context=d.context,
|
||||
url="http://127.0.0.2:9091", wait="complete")
|
||||
|
||||
print("\n\nSTEP: logging in")
|
||||
await d.set_input_text("#login-user-input", "admin")
|
||||
await d.set_input_text("#login-password-input", "foobar")
|
||||
|
||||
# d.arm_page_load()
|
||||
# await d.key("Enter") # FIXME: this doesn't work: Neither with Return nor \n
|
||||
await d.mouse("#login-button")
|
||||
|
||||
# this is optional: wait() can wait across page loads
|
||||
# print("\n\nSTEP: waiting for page load")
|
||||
# await d.wait_page_load()
|
||||
|
||||
print("\n\nSTEP: super-user-indicator")
|
||||
try:
|
||||
await d.wait_text("#super-user-indicator", "Limited access")
|
||||
except Error:
|
||||
s = await d.bidi("browsingContext.captureScreenshot", context=d.top_context, origin="document")
|
||||
Path("screenshot.png").write_bytes(base64.b64decode(s["data"]))
|
||||
raise
|
||||
|
||||
print("\n\nSTEP: wait/switch frame")
|
||||
await d.switch_to_frame('cockpit1:localhost/system')
|
||||
|
||||
print("\n\nSTEP: inspect system frame")
|
||||
await d.wait(".system-configuration")
|
||||
t = await d.text(".system-configuration")
|
||||
assert "Join domain" in t, t
|
||||
|
||||
log_command.info("Collected debug messages:")
|
||||
for log in d.logs:
|
||||
log_command.info(log)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue