编辑
2025-03-01
移动端
00
请注意,本文编写于 62 天前,最后修改于 62 天前,其中某些信息可能已经过时。

目录

背景
代码

背景

1、需要通过mjpeg高速静态截图传流的方式获取视频流;

2、需要实现远程控制 iOS 设备,抽象出来需要实现 up、down、move 三种触控操作。

代码

1、需要阿里开源的 tidevice 库(目前最高只支持到 iOS 16,iOS 17 及之后的版本暂不支持,需要考虑兼容);

2、需要 WDA(WebDriverAgent)。

python
"""Bridge a WDA MJPEG screen stream and touch/key control over a WebSocket.

Each WebSocket client receives the JPEG frames read from the device's WDA
MJPEG server as binary messages, and may send JSON control messages
(``touch`` / ``keyEvent`` / ``text``) that are applied through a WDA session.

Environment variables read by this script:
    UDID       passed to ``wda.USBClient`` and ``tidevice.Device``.
    FREE_PORT  TCP port the WebSocket server listens on (required).
"""
import asyncio
import contextlib
import json
import logging
import os
import socket
import threading  # noqa: F401  (import kept from the original script; no longer used)
import time

import tidevice
import websockets
import wda

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Set the level of the root logger
logging.getLogger().setLevel(logging.INFO)

# Key-event numbers sent by the front end in ``keyEvent`` messages. The labels
# come from the comments of the original script (menu / home / back / wake up
# display). NOTE(review): these look like Android key codes reused by a shared
# front end - confirm against the client.
_KEY_MENU = 187
_KEY_HOME = 3
_KEY_BACK = 4
_KEY_WAKE_UP = 224

# Default port of the WDA MJPEG server on the device.
_WDA_MJPEG_PORT = 9100


class SocketBuffer:
    """Buffered asyncio-friendly reader/writer on top of a blocking socket.

    Blocking ``recv``/``sendall`` calls are pushed to the default executor so
    the event loop is never blocked. Received bytes accumulate in an internal
    buffer that ``read_until`` and ``read_bytes`` consume from the front.
    """

    def __init__(self, sock: socket.socket):
        self._sock = sock
        self._buf = bytearray()

    async def _drain(self) -> int:
        """Receive up to 4096 bytes into the buffer and return the count.

        Raises:
            IOError: if the peer closed the connection (``recv`` returned
                no data).
        """
        loop = asyncio.get_running_loop()
        data = await loop.run_in_executor(None, self._sock.recv, 4096)
        if not data:
            raise IOError('socket closed')
        self._buf.extend(data)
        return len(data)

    async def read_until(self, delimiter: bytes) -> bytes:
        """Return the buffered bytes up to ``delimiter``, without the delimiter.

        The delimiter itself is consumed. More data is received as needed.
        """
        while True:
            index = self._buf.find(delimiter)
            if index != -1:
                # Copy to an immutable ``bytes`` (as annotated) and drop the
                # consumed prefix in place instead of rebuilding the buffer.
                chunk = bytes(self._buf[:index])
                del self._buf[:index + len(delimiter)]
                return chunk
            await self._drain()

    async def read_bytes(self, length: int) -> bytes:
        """Return exactly ``length`` bytes, receiving more data as needed."""
        while length > len(self._buf):
            await self._drain()
        chunk = bytes(self._buf[:length])
        del self._buf[:length]
        return chunk

    async def write(self, data: bytes) -> None:
        """Send all of ``data`` and log how long the send took."""
        loop = asyncio.get_running_loop()
        start_time = time.perf_counter()
        await loop.run_in_executor(None, self._sock.sendall, data)
        end_time = time.perf_counter()
        logging.info(f'write: Sent {len(data)} bytes in {end_time - start_time:.4f} seconds')


async def _read_frame(buf: SocketBuffer) -> bytes:
    """Read one part of the MJPEG stream and return its payload.

    Header lines are skipped until a ``Content-Length`` header is found, the
    remaining header lines are consumed up to the blank line, and then exactly
    that many payload bytes are returned.
    """
    length = None
    while length is None:
        line = await buf.read_until(b'\r\n')
        name, _, value = line.partition(b':')
        # Header names are case-insensitive and the space after ':' is
        # optional, so do not depend on the exact spelling.
        if name.strip().lower() == b'content-length':
            length = int(value.strip())
    # Skip the rest of the part headers; an empty line ends them.
    while await buf.read_until(b'\r\n') != b'':
        pass
    return await buf.read_bytes(length)


async def _forward_frames(buf: SocketBuffer, websocket) -> None:
    """Forward MJPEG frames to the WebSocket until an error occurs.

    Any exception (stream closed, WebSocket closed, malformed header) is
    logged and ends the loop. Task cancellation is not swallowed.
    """
    while True:
        try:
            imdata = await _read_frame(buf)
            await websocket.send(imdata)
        except Exception as e:
            logging.warning(f'Error in _drain: {e}')
            break


async def _handle_touch(session, data: dict) -> None:
    """Apply a ``touch`` message to the WDA session."""
    action = data.get('action')
    x = data.get('x')
    y = data.get('y')
    if action == 'normal':
        logging.info(f'Touch action at ({x}, {y})')
        await asyncio.to_thread(session.click, x, y)
    elif action == 'long':
        # TODO: long press is only logged, not performed yet.
        logging.info(f'Long touch action at ({x}, {y})')
    elif action == 'double':
        # TODO: double tap is only logged, not performed yet.
        logging.info(f'Double touch action at ({x}, {y})')
    elif action == 'move':
        x1 = data.get('x1')
        y1 = data.get('y1')
        x2 = data.get('x2')
        y2 = data.get('y2')
        logging.info(f'Move action from ({x1}, {y1}) to ({x2}, {y2})')
        await asyncio.to_thread(session.swipe, x1, y1, x2, y2)
    else:
        logging.info(f'Not supported action {action}')


async def _handle_key_event(session, data: dict) -> None:
    """Apply a ``keyEvent`` message to the WDA session.

    Only home and wake-up are acted upon; menu and back are accepted but
    ignored, as in the original script.
    """
    event_number = data['eventNumber']
    logging.info(f'Key event: {event_number}')
    if event_number == _KEY_HOME:
        await asyncio.to_thread(session.home)
    elif event_number == _KEY_WAKE_UP:
        await asyncio.to_thread(session.unlock)
    elif event_number in (_KEY_MENU, _KEY_BACK):
        # No action is performed for these keys yet.
        pass


async def _dispatch_message(session, message) -> None:
    """Decode one WebSocket control message and route it by its ``type``.

    Errors caused by a single message are logged and do not end the
    connection.
    """
    try:
        data = json.loads(message)
        message_type = data.get('type')
        if message_type == 'touch':
            await _handle_touch(session, data)
        elif message_type == 'keyEvent':
            await _handle_key_event(session, data)
        elif message_type == 'text':
            # TODO: text input is only logged, not typed on the device yet.
            detail = data.get('detail')
            logging.info(f'Text input: {detail}')
    except json.JSONDecodeError as e:
        logging.warning(f"Error decoding JSON: {e}")
    except Exception as e:
        logging.warning(f"Error processing message: {e}")


async def _handle_messages(session, websocket) -> None:
    """Process control messages until the WebSocket is closed."""
    try:
        async for message in websocket:
            logging.info(f'Received message: {message}')
            await _dispatch_message(session, message)
    except websockets.ConnectionClosed:
        logging.info('WebSocket connection closed')
    except Exception as e:
        logging.error(f'Unexpected error: {e}')


@contextlib.asynccontextmanager
async def make_screenrecord(c: wda.Client, t: tidevice.Device, websocket):
    """Stream the device screen to ``websocket`` while the context is active.

    On entry the MJPEG frame rate is raised to 60, a connection to the WDA
    MJPEG server is opened through ``t``, and two background tasks are
    started: one forwarding frames to ``websocket`` and one applying the
    control messages received from it. On exit both tasks are cancelled, the
    MJPEG socket is closed and the previous frame rate is restored, even if
    setup failed part-way.

    Args:
        c: WDA client used for settings and for creating the control session.
        t: tidevice device used to reach the MJPEG port on the device.
        websocket: the client connection that receives the frames.

    Yields:
        None.
    """
    logging.info(f'New WebSocket connection from {websocket.remote_address}')

    # NOTE: the wda calls below are blocking and run on the event loop thread,
    # exactly as in the original script.
    _old_fps = c.appium_settings()['mjpegServerFramerate']
    _fps = 60  # raise the frame rate
    c.appium_settings({'mjpegServerFramerate': _fps})

    sock = None
    tasks = []
    try:
        # Read images from the WDA MJPEG server. ``pconn`` stays referenced
        # for the whole lifetime of the stream so the socket it owns stays
        # alive.
        pconn = t.create_inner_connection(_WDA_MJPEG_PORT)
        sock = pconn.get_socket()
        buf = SocketBuffer(sock)
        await buf.write(b'GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
        await buf.read_until(b'\r\n\r\n')
        logging.info('screenrecord is ready to begin')

        s = c.session()
        logging.info('session is ready to begin')

        tasks = [
            asyncio.create_task(_forward_frames(buf, websocket)),
            asyncio.create_task(_handle_messages(s, websocket)),
        ]
        yield
    finally:
        try:
            # Cancel instead of signalling through threading.Event: waiting on
            # a threading.Event here would block the event loop and deadlock.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            if sock is not None:
                # shutdown() wakes an executor thread that may still be
                # blocked in recv(); close() then releases the descriptor.
                with contextlib.suppress(OSError):
                    sock.shutdown(socket.SHUT_RDWR)
                with contextlib.suppress(OSError):
                    sock.close()
            c.appium_settings({'mjpegServerFramerate': _old_fps})
            logging.info('Screen recording stopped')


async def mjpeg_stream(websocket):
    """WebSocket handler: stream the screen until the client disconnects."""
    # Read the UDID from the environment
    udid = os.environ.get('UDID')
    logging.info(f'udid: {udid}')
    c = wda.USBClient(udid)
    c.unlock()
    t = tidevice.Device(udid)

    try:
        async with make_screenrecord(c, t, websocket):
            # Leave the context once the client is gone so the device-side
            # resources are released; waiting on a bare Future never returned.
            await websocket.wait_closed()
    except websockets.ConnectionClosed:
        logging.info('WebSocket connection closed')
    except Exception as e:
        logging.error(f'Unexpected error: {e}')


async def main():
    """Start the WebSocket server on ``FREE_PORT`` and serve until cancelled.

    Raises:
        RuntimeError: if the ``FREE_PORT`` environment variable is not set.
        ValueError: if ``FREE_PORT`` is not an integer.
    """
    # Read the free port from the environment
    raw_port = os.environ.get('FREE_PORT')
    if raw_port is None:
        raise RuntimeError('FREE_PORT environment variable is not set')
    free_port = int(raw_port)
    logging.info(f'free_port: {free_port}')

    # NOTE(review): the server listens on all interfaces without any
    # authentication, so anyone who can reach the port can control the device.
    async with websockets.serve(mjpeg_stream, '0.0.0.0', free_port) as server:
        host, port = server.sockets[0].getsockname()[:2]
        logging.info(f'WebSocket server started at ws://{host}:{port}')
        await asyncio.Future()  # run forever


if __name__ == '__main__':
    asyncio.run(main())

本文作者:lixf6

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!