Skip to content

Commit

Permalink
Merge pull request #1346 from doronz88/refactor/webinspector-applicat…
Browse files Browse the repository at this point in the history
…ion-pages

webinspector: Allow to select debugging from specific app
  • Loading branch information
doronz88 authored Feb 3, 2025
2 parents cc5ae7d + 84570b0 commit 76bfad4
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 24 deletions.
47 changes: 28 additions & 19 deletions pymobiledevice3/cli/webinspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@
from pymobiledevice3.exceptions import InspectorEvaluateError, LaunchingApplicationError, \
RemoteAutomationNotEnabledError, WebInspectorNotEnabledError, WirError
from pymobiledevice3.lockdown import LockdownClient, create_using_usbmux
from pymobiledevice3.lockdown_service_provider import LockdownServiceProvider
from pymobiledevice3.osu.os_utils import get_os_utils
from pymobiledevice3.services.web_protocol.cdp_server import app
from pymobiledevice3.services.web_protocol.driver import By, Cookie, WebDriver
from pymobiledevice3.services.web_protocol.inspector_session import InspectorSession
from pymobiledevice3.services.webinspector import SAFARI, Page, WebinspectorService
from pymobiledevice3.services.webinspector import SAFARI, ApplicationPage, WebinspectorService

SCRIPT = '''
function inspectedPage_evalResult_getCompletions(primitiveType) {{
Expand Down Expand Up @@ -212,9 +213,11 @@ def shell(service_provider: LockdownClient, timeout):
@webinspector.command(cls=Command)
@click.option('-t', '--timeout', default=3, show_default=True, type=float)
@click.option('--automation', is_flag=True, help='Use remote automation')
@click.option('--no-open-safari', is_flag=True, help='Avoid opening the Safari app')
@click.argument('url', required=False, default='')
@catch_errors
def js_shell(service_provider: LockdownClient, timeout, automation, url):
def js_shell(service_provider: LockdownServiceProvider, timeout: float, automation: bool, no_open_safari: bool,
url: str) -> None:
"""
Create a javascript shell. This interpreter runs on your local machine,
but evaluates each expression on the remote
Expand All @@ -229,7 +232,7 @@ def js_shell(service_provider: LockdownClient, timeout, automation, url):
"""

js_shell_class = AutomationJsShell if automation else InspectorJsShell
asyncio.run(run_js_shell(js_shell_class, service_provider, timeout, url))
asyncio.run(run_js_shell(js_shell_class, service_provider, timeout, url, not no_open_safari))


udid = ''
Expand Down Expand Up @@ -297,7 +300,7 @@ def get_completions(


class JsShell(ABC):
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.prompt_session = PromptSession(lexer=PygmentsLexer(lexers.JavascriptLexer),
auto_suggest=AutoSuggestFromHistory(),
Expand All @@ -307,7 +310,7 @@ def __init__(self):

@classmethod
@abstractmethod
def create(cls, lockdown: LockdownClient, timeout: float, app: str):
def create(cls, lockdown: LockdownServiceProvider, timeout: float, open_safari: bool) -> None:
pass

@abstractmethod
Expand Down Expand Up @@ -357,8 +360,8 @@ def __init__(self, driver: WebDriver):

@classmethod
@asynccontextmanager
async def create(cls, lockdown: LockdownClient, timeout: float, app: str) -> 'AutomationJsShell':
inspector, application = create_webinspector_and_launch_app(lockdown, timeout, app)
async def create(cls, lockdown: LockdownClient, timeout: float, open_safari: bool) -> 'AutomationJsShell':
inspector, application = create_webinspector_and_launch_app(lockdown, timeout, SAFARI)
automation_session = inspector.automation_session(application)
driver = WebDriver(automation_session)
driver.start_session()
Expand All @@ -382,13 +385,16 @@ def __init__(self, inspector_session: InspectorSession):

@classmethod
@asynccontextmanager
async def create(cls, lockdown: LockdownClient, timeout: float, app: str) -> 'InspectorJsShell':
inspector, application = create_webinspector_and_launch_app(lockdown, timeout, app)
page = InspectorJsShell.query_page(inspector)
if page is None:
async def create(cls, lockdown: LockdownClient, timeout: float, open_safari: bool) -> 'InspectorJsShell':
inspector = WebinspectorService(lockdown=lockdown)
inspector.connect(timeout)
if open_safari:
_ = inspector.open_app(SAFARI)
application_page = cls.query_page(inspector, bundle_identifier=SAFARI if open_safari else None)
if application_page is None:
raise click.exceptions.Exit()

inspector_session = await inspector.inspector_session(application, page)
inspector_session = await inspector.inspector_session(application_page.application, application_page.page)
await inspector_session.console_enable()
await inspector_session.runtime_enable()

Expand All @@ -404,19 +410,22 @@ async def navigate(self, url: str):
await self.inspector_session.navigate_to_url(url)

@staticmethod
def query_page(inspector: WebinspectorService) -> Optional[Page]:
reload_pages(inspector)
available_pages = list(inspector.get_open_pages().get('Safari', []))
def query_page(inspector: WebinspectorService, bundle_identifier: Optional[str] = None) \
-> Optional[ApplicationPage]:
available_pages = inspector.get_open_application_pages(timeout=1)
if bundle_identifier is not None:
available_pages = [application_page for application_page in available_pages if
application_page.application.bundle == bundle_identifier]
if not available_pages:
logger.error('Unable to find available pages (try to unlock device)')
return
return None

page_query = [inquirer3.List('page', message='choose page', choices=available_pages, carousel=True)]
page = inquirer3.prompt(page_query, theme=GreenPassion(), raise_keyboard_interrupt=True)['page']
return page


async def run_js_shell(js_shell_class: type[JsShell], lockdown: LockdownClient,
timeout: float, url: str):
async with js_shell_class.create(lockdown, timeout, SAFARI) as js_shell_instance:
async def run_js_shell(js_shell_class: type[JsShell], lockdown: LockdownServiceProvider,
timeout: float, url: str, open_safari: bool) -> None:
async with js_shell_class.create(lockdown, timeout, open_safari) as js_shell_instance:
await js_shell_instance.start(url)
35 changes: 33 additions & 2 deletions pymobiledevice3/services/webinspector.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ def from_application_dictionary(cls, app_dict) -> 'Application':
)


@dataclass
class ApplicationPage:
application: Application
page: Page

def __str__(self) -> str:
return f'<{self.application.name}({self.application.pid}) TYPE:{self.page.type_.value} URL:{self.page.web_url}>'


class WebinspectorService:
SERVICE_NAME = 'com.apple.webinspector'
RSD_SERVICE_NAME = 'com.apple.webinspector.shim.remote'
Expand Down Expand Up @@ -185,10 +194,10 @@ def automation_session(self, app: Application) -> AutomationSession:
self.await_(asyncio.sleep(0))
return AutomationSession(SessionProtocol(self, session_id, app, page))

async def inspector_session(self, app: Application, page: Page, wait_target: bool = True) -> InspectorSession:
async def inspector_session(self, app: Application, page: Page) -> InspectorSession:
session_id = str(uuid.uuid4()).upper()
return await InspectorSession.create(SessionProtocol(self, session_id, app, page, method_prefix=''),
wait_target=wait_target)
wait_target=page.type_ != WirTypes.JAVASCRIPT)

def get_open_pages(self) -> dict:
apps = {}
Expand All @@ -198,6 +207,20 @@ def get_open_pages(self) -> dict:
apps[self.connected_application[app].name] = self.application_pages[app].values()
return apps

def get_open_application_pages(self, timeout: float) -> list[ApplicationPage]:
# Query all connected applications
self.await_(self._get_connected_applications())

# Give some time for `webinspectord` to reply with all inspectable applications
self.await_(asyncio.sleep(timeout))

result = []
for app in self.connected_application:
if self.application_pages.get(app, False):
for page in self.application_pages[app].values():
result.append(ApplicationPage(self.connected_application[app], page))
return result

def open_app(self, bundle: str, timeout: Union[float, int] = 3) -> Application:
self.await_(self._request_application_launch(bundle))
self.get_open_pages()
Expand Down Expand Up @@ -235,17 +258,22 @@ def _handle_report_connected_application_list(self, arg):
for key, application in arg['WIRApplicationDictionaryKey'].items():
self.connected_application[key] = Application.from_application_dictionary(application)

# Immediately also query the application pages
self.await_(self._forward_get_listing(application))

def _handle_report_connected_driver_list(self, arg):
pass

def _handle_application_sent_listing(self, arg):
if arg['WIRApplicationIdentifierKey'] in self.application_pages:
# Update existing application pages
for id_, page in arg['WIRListingKey'].items():
if id_ in self.application_pages[arg['WIRApplicationIdentifierKey']]:
self.application_pages[arg['WIRApplicationIdentifierKey']][id_].update(page)
else:
self.application_pages[arg['WIRApplicationIdentifierKey']][id_] = Page.from_page_dictionary(page)
else:
# Add new application pages
pages = {}
for id_, page in arg['WIRListingKey'].items():
pages[id_] = Page.from_page_dictionary(page)
Expand Down Expand Up @@ -281,6 +309,9 @@ async def _forward_get_listing(self, app_id):
async def _request_application_launch(self, bundle: str):
await self._send_message('_rpc_requestApplicationLaunch:', {'WIRApplicationBundleIdentifierKey': bundle})

async def _get_connected_applications(self) -> None:
await self._send_message('_rpc_getConnectedApplications:', {})

async def _forward_automation_session_request(self, session_id: str, app_id: str):
await self._send_message('_rpc_forwardAutomationSessionRequest:', {
'WIRApplicationIdentifierKey': app_id,
Expand Down
6 changes: 3 additions & 3 deletions tests/services/test_web_protocol/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

def test_back(webdriver):
webdriver.get('https://www.google.com')
webdriver.get('https://www.github.com')
webdriver.get('https://github.com')
webdriver.back()
assert webdriver.current_url.rstrip('/') == 'https://www.google.com'

Expand All @@ -18,10 +18,10 @@ def test_current_url(webdriver):

def test_forward(webdriver):
webdriver.get('https://www.google.com')
webdriver.get('https://www.github.com')
webdriver.get('https://github.com')
webdriver.back()
webdriver.forward()
assert webdriver.current_url.rstrip('/') == 'https://www.github.com'
assert webdriver.current_url.rstrip('/') == 'https://github.com'


def test_find_element(webdriver):
Expand Down

0 comments on commit 76bfad4

Please sign in to comment.