1、需要通过mjpeg高速静态截图传流的方式获取视频流;
2、需要实现远控iOS设备,抽象出来需要实现up、down、move三种触控操作。
1、需要阿里开源的tidevice库(目前最高只支持到iOS 16,iOS 17及之后的版本暂不支持,需要考虑兼容);
2、需要wda(WebDriverAgent)。
import contextlib
import logging
import os
import socket
import threading
import asyncio
import websockets
import tidevice
import wda
import time
import json
# Configure logging: every record carries a timestamp, the logger name,
# the level and the message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Set the root logger's level to INFO explicitly as well.
logging.getLogger().setLevel(logging.INFO)
class SocketBuffer:
    """Buffered asynchronous reader/writer on top of a blocking socket.

    The blocking ``recv``/``sendall`` calls are handed to the event loop's
    default executor so that coroutines awaiting this object do not stall
    the loop.
    """

    def __init__(self, sock: socket.socket):
        self._sock = sock
        # Bytes already received from the socket but not yet consumed.
        self._buf = bytearray()

    async def _drain(self) -> int:
        """Receive one chunk (at most 4096 bytes) and append it to the buffer.

        Returns:
            The number of bytes received.

        Raises:
            IOError: if ``recv`` returned no data, i.e. the peer closed
                the connection.
        """
        loop = asyncio.get_running_loop()
        chunk = await loop.run_in_executor(None, self._sock.recv, 4096)
        if not chunk:
            raise IOError('socket closed')
        self._buf.extend(chunk)
        return len(chunk)

    def _take(self, length: int, skip: int = 0) -> bytes:
        """Pop ``length`` bytes off the buffer front, discarding ``skip`` more.

        The result is an immutable ``bytes`` copy; the consumed prefix is
        deleted in place so the remainder is not re-allocated.
        """
        data = bytes(self._buf[:length])
        del self._buf[:length + skip]
        return data

    async def read_until(self, delimiter: bytes) -> bytes:
        """Return everything before the next *delimiter*.

        The delimiter is consumed from the buffer but not included in the
        returned data. More data is received until the delimiter shows up.
        """
        while True:
            index = self._buf.find(delimiter)
            if index != -1:
                return self._take(index, skip=len(delimiter))
            await self._drain()

    async def read_bytes(self, length: int) -> bytes:
        """Return exactly *length* bytes, receiving more data as needed."""
        while length > len(self._buf):
            await self._drain()
        return self._take(length)

    async def write(self, data: bytes):
        """Send all of *data* and log how long the send took."""
        loop = asyncio.get_running_loop()
        start_time = time.perf_counter()
        await loop.run_in_executor(None, self._sock.sendall, data)
        end_time = time.perf_counter()
        logging.info(f'write: Sent {len(data)} bytes in {end_time - start_time:.4f} seconds')
async def _read_mjpeg_frame(buf: 'SocketBuffer') -> bytes:
    """Read one image from the multipart MJPEG stream behind *buf*."""
    # Scan the part headers until Content-Length tells us the image size.
    length = None
    while length is None:
        line = await buf.read_until(b'\r\n')
        if line.lower().startswith(b'content-length'):
            length = int(line.split(b':', 1)[1].decode('ascii').strip())
    # Skip any remaining headers up to the blank line that ends them.
    while await buf.read_until(b'\r\n') != b'':
        pass
    return await buf.read_bytes(length)


async def _dispatch_control_message(s, data) -> None:
    """Apply one decoded control message to the WDA session *s*.

    Blocking WDA calls are run in a worker thread so that the video stream
    keeps flowing while the device executes the gesture.
    """
    kind = data.get('type')
    if kind == 'touch':
        action = data.get('action')
        x = data.get('x')
        y = data.get('y')
        if action == 'normal':
            # Plain tap at the given coordinates.
            logging.info(f'Touch action at ({x}, {y})')
            await asyncio.to_thread(s.click, x, y)
        elif action == 'long':
            # TODO: long press is only logged, not performed yet.
            logging.info(f'Long touch action at ({x}, {y})')
        elif action == 'double':
            # TODO: double tap is only logged, not performed yet.
            logging.info(f'Double touch action at ({x}, {y})')
        elif action == 'move':
            x1 = data.get('x1')
            y1 = data.get('y1')
            x2 = data.get('x2')
            y2 = data.get('y2')
            logging.info(f'Move action from ({x1}, {y1}) to ({x2}, {y2})')
            await asyncio.to_thread(s.swipe, x1, y1, x2, y2)
        else:
            logging.info(f'Not supported action {action}')
    elif kind == 'keyEvent':
        event_number = data['eventNumber']
        logging.info(f'Key event: {event_number}')
        # TODO: move these numeric key codes into a dedicated class/enum.
        if event_number == 187:
            # Menu key: not handled yet.
            pass
        elif event_number == 3:
            # Home
            await asyncio.to_thread(s.home)
        elif event_number == 4:
            # Back: not handled yet.
            pass
        elif event_number == 224:
            # Wake up the display
            await asyncio.to_thread(s.unlock)
    elif kind == 'text':
        # TODO: text input is only logged, not typed on the device yet.
        detail = data.get('detail')
        logging.info(f'Text input: {detail}')


@contextlib.asynccontextmanager
async def make_screenrecord(c: wda.Client, t: tidevice.Device, websocket):
    """Stream the device screen to *websocket* and apply its control messages.

    While the context is active two background tasks run: one forwards every
    image read from the WDA MJPEG server to the websocket, the other reads
    JSON control messages from the websocket and replays them on the device.
    On exit both tasks are cancelled, the device socket is closed and the
    previous ``mjpegServerFramerate`` setting is restored.

    Args:
        c: WDA client used for settings and for creating the session.
        t: tidevice handle used to open the connection to the MJPEG port.
        websocket: connected websocket that receives frames and sends
            control messages.
    """
    logging.info(f'New WebSocket connection from {websocket.remote_address}')
    # The wda/tidevice calls block, so run them off the event loop.
    settings = await asyncio.to_thread(c.appium_settings)
    _old_fps = settings['mjpegServerFramerate']
    _fps = 60  # raise the frame rate
    await asyncio.to_thread(c.appium_settings, {'mjpegServerFramerate': _fps})

    sock = None
    tasks = []
    try:
        # Read images from the WDA mjpeg server (9100 is its default port).
        # ``pconn`` stays referenced for as long as its socket is in use.
        pconn = await asyncio.to_thread(t.create_inner_connection, 9100)
        sock = pconn.get_socket()
        buf = SocketBuffer(sock)
        await buf.write(b'GET / HTTP/1.0\r\nHost: localhost\r\n\r\n')
        await buf.read_until(b'\r\n\r\n')
        logging.info('screenrecord is ready to begin')
        s = await asyncio.to_thread(c.session)
        logging.info('session is ready to begin')

        async def _drain():
            # Forward frames until cancelled or until either side fails.
            while True:
                try:
                    imdata = await _read_mjpeg_frame(buf)
                    await websocket.send(imdata)
                except Exception as e:
                    logging.info(f'Error in _drain: {e}')
                    break

        async def _handle_messages():
            try:
                async for message in websocket:
                    logging.info(f'Received message: {message}')
                    try:
                        data = json.loads(message)
                        await _dispatch_control_message(s, data)
                    except json.JSONDecodeError as e:
                        logging.info(f"Error decoding JSON: {e}")
                    except Exception as e:
                        # One bad message must not end the control channel.
                        logging.info(f"Error processing message: {e}")
            except websockets.ConnectionClosed:
                logging.info('WebSocket connection closed')
            except Exception as e:
                logging.error(f'Unexpected error: {e}')

        tasks = [
            asyncio.create_task(_drain()),
            asyncio.create_task(_handle_messages()),
        ]
        yield
    finally:
        # Cancel rather than wait on a threading.Event: a blocking wait here
        # would freeze the very event loop the tasks need in order to finish.
        for task in tasks:
            task.cancel()
        try:
            await asyncio.gather(*tasks, return_exceptions=True)
        finally:
            if sock is not None:
                # shutdown() wakes a recv() still pending in the executor.
                with contextlib.suppress(OSError):
                    sock.shutdown(socket.SHUT_RDWR)
                with contextlib.suppress(OSError):
                    sock.close()
            c.appium_settings({'mjpegServerFramerate': _old_fps})
            logging.info('Screen recording stopped')
async def mjpeg_stream(websocket):
    """Handle one websocket client: stream the device screen until it leaves.

    The device is selected through the ``UDID`` environment variable. The
    handler returns once the websocket is closed, so the screen-record
    context is exited and its cleanup runs for every connection.
    """
    # Read the UDID from the environment.
    udid = os.environ.get('UDID')
    logging.info(f'udid: {udid}')
    c = wda.USBClient(udid)
    # unlock() blocks, so keep it off the event loop.
    await asyncio.to_thread(c.unlock)
    t = tidevice.Device(udid)
    try:
        async with make_screenrecord(c, t, websocket):
            # Waiting on a bare Future would never finish and would keep the
            # context (and the device connection) open after a disconnect.
            await websocket.wait_closed()
    except websockets.ConnectionClosed:
        logging.info('WebSocket connection closed')
    except Exception as e:
        logging.error(f'Unexpected error: {e}')
async def main():
    """Start the websocket server on ``FREE_PORT`` and serve until cancelled.

    Raises:
        RuntimeError: if the ``FREE_PORT`` environment variable is not set.
        ValueError: if ``FREE_PORT`` is not an integer.
    """
    # Read the free port from the environment.
    raw_port = os.environ.get('FREE_PORT')
    if not raw_port:
        raise RuntimeError('FREE_PORT environment variable is not set')
    free_port = int(raw_port)
    logging.info(f'free_port: {free_port}')
    # The context manager closes the server when this coroutine is cancelled.
    async with websockets.serve(mjpeg_stream, '0.0.0.0', free_port) as server:
        host, port = server.sockets[0].getsockname()[:2]
        logging.info(f'WebSocket server started at ws://{host}:{port}')
        await asyncio.Future()  # run forever
# Script entry point: run the server coroutine on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
本文作者:lixf6
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 许可协议。转载请注明出处!