内部开发者Java版本官方SDK:java版鸿蒙远控底层OS对应SDK
可以通过此,获取稳定的h264流及远控up down move实现
协议底层:设备跟pc的协议是通过grpc协议
Java待实现
hypium版本 参考官方源码实现(可以投屏,但没法实现丝滑的up down move:华为官方hypium文档
python3import logging import os import socket import json import asyncio import websockets from threading import Thread from datetime import datetime from hypium import UiDriver, KeyCode, BY from hmdriver2.driver import Driver # 配置 logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', ) # 设置 root logger 的级别 logging.getLogger().setLevel(logging.INFO) class HmScrcpyServer: """录制屏幕并通过 WebSocket 提供实时投屏流""" def __init__(self, driver): self.driver = driver # self.client = driver._client self.clients = set() self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) # self.ws_port = None self.gesture = driver.gesture def start(self, free_port, timeout=3600): self.is_running = True self.timeout = timeout self.stop_capture_if_running() msg_json = {'api': "startCaptureScreen", 'args': []} full_msg = {"module": "com.ohos.devicetest.hypiumApiHelper", "method": "Captures", "params": msg_json, "request_id": datetime.now().strftime("%Y%m%d%H%M%S%f")} self.driver._client._send_msg(full_msg) reply = self.driver._client._recv_msg(1024) logging.info(f'reply: {reply}') if b"true" in reply: thread_ws = Thread(target=self.run_loop, args=(self._start_websocket_server(free_port),)) thread_ws.start() thread_record = Thread(target=self._record_worker) thread_record.start() else: raise RuntimeError("Fail to start screen capture") def run_loop(self, coro): asyncio.set_event_loop(self.loop) self.loop.run_until_complete(coro) self.loop.run_forever() def stop_capture_if_running(self): msg_json = { 'api': "stopCaptureScreen", 'args': [] } full_msg = {"module": "com.ohos.devicetest.hypiumApiHelper", "method": "Captures", "params": msg_json, "request_id": datetime.now().strftime("%Y%m%d%H%M%S%f")} self.driver._client._send_msg(full_msg) reply = self.driver._client._recv_msg(1024) logging.info(f'stop reply: {reply}') async def _start_websocket_server(self, free_port=8765): # 动态选择一个可用端口 self.ws_port = free_port while True: try: server = await websockets.serve(self._handler, "0.0.0.0", self.ws_port) logging.info(f"WebSocket server started at ws://0.0.0.0:{self.ws_port}") await server.wait_closed() break except OSError as e: if e.errno == 10048: self.ws_port += 1 else: raise e async def _handler(self, websocket, path): self.clients.add(websocket) logging.info(f"New WebSocket connection from {websocket.remote_address}") try: async for message in websocket: logging.info(f"Received message from {websocket.remote_address}: {message}") try: data = json.loads(message) if data.get('type') == 'touch': action = data.get('action') x = data.get('x') y = data.get('y') if action == 'down': logging.info(f'down {x} {y}') self.gesture.start(x, y, 0.1).action() elif action == 'up': logging.info(f'up {x} {y}') self.gesture.pause(0.1).action() elif action == 'move': logging.info(f'move {x} {y}') self.gesture.move(x, y, 0.1).action() elif action == 'normal': # 调用触摸操作函数,传递 x 和 y 坐标 self.driver.touch((x, y)) elif action == 'long': # 处理长按操作 self.driver.touch(target=(x, y), mode='long') elif action == 'double': # 处理双击操作 self.driver.touch(target=(x, y), mode='double') elif action == 'move': x1 = data.get('x1') y1 = data.get('y1') x2 = data.get('x2') y2 = data.get('y2') self.driver.slide(start=(x1, y1), end=(x2, y2), slide_time=0.1) else: logging.info(f'not support action {action}') elif data.get('type') == 'keyEvent': # {"type":"keyEvent","eventNumber":3} # 使用 adbutils 提供的 shell 方法执行按键事件命令 event_number = data['eventNumber'] # tudo 这里应该要把这些数字抽象出来为一个类,这里先跑通再优化 if event_number == 187: # self. driver.press_key(KeyCode.MENU) self.driver.swipe_to_recent_task() elif event_number == 3: # home self.driver.go_home() elif event_number == 4: # home self.driver.go_back() # press_back 也可以用 elif event_number == 224: # wake up display self.driver.wake_up_display() elif data.get('type') == 'text': # input: {"type":"text","detail":"hello"} # 1-借助hdc/shell命令执行输入文本操作 但是执行太慢了 # exam: hdc shell uitest uiInput inputText point_x point_y text # self.driver.hdc(f"shell uitest uiInput inputText {data.get('x')} {data.get('y')} {data.get('detail')}") if data.get('detail') == 'CODE_AC_BACK': self.driver.press_key(KeyCode.DEL) elif data.get('detail') == 'CODE_AC_ENTER': self.driver.press_key(KeyCode.ENTER) else: # 常规字符输入 self.driver.shell( f"uitest uiInput inputText {data.get('x')} {data.get('y')} {data.get('detail')}") # 2-使用input_text方法 # self.driver.input_text(BY.type("TextInput"), data.get('detail')) except json.JSONDecodeError as e: logging.info(f"Error decoding JSON: {e}") except Exception as e: logging.info(f"Error processing message: {e}") except websockets.exceptions.ConnectionClosedError: pass finally: self.clients.remove(websocket) def _record_worker(self): tmp_data = b'' start_flag = b'\xff\xd8' end_flag = b'\xff\xd9' while self.is_running: try: result = self.driver._client._recv_msg(4096 * 1024, print=False) # logging.info(f'len: {len(result)}') tmp_data += result while start_flag in tmp_data and end_flag in tmp_data: start_index = tmp_data.index(start_flag) end_index = tmp_data.index(end_flag) + 2 frame = tmp_data[start_index:end_index] tmp_data = tmp_data[end_index:] # 发送完整的JPEG图像帧给所有客户端 asyncio.run_coroutine_threadsafe(self._broadcast(frame), self.loop) except socket.timeout: logging.info("timeout") continue except Exception as e: logging.info("connection closed", e) self.is_running = False self.driver._client._connect_sock = None break async def _broadcast(self, data): for client in self.clients: try: await client.send(data) except websockets.exceptions.ConnectionClosedError: logging.info("WebSocket connection closed") self.clients.remove(client) except Exception as e: logging.info(f"Error sending data to client: {e}") def stop(self): self.is_running = False if self.driver._client._connect_sock is None: return msg_json = {'api': "stopCaptureScreen", 'args': []} full_msg = {"module": "com.ohos.devicetest.hypiumApiHelper", "method": "Captures", "params": msg_json, "request_id": datetime.now().strftime("%Y%m%d%H%M%S%f")} self.driver._client.sendall(full_msg) reply = self.driver._client._recv_msg(1024) if b"true" not in reply: logging.info("Fail to stop capture") self.driver._client.release() self.driver._client._connect_sock = None self.loop.stop() for client in self.clients: asyncio.run_coroutine_threadsafe(client.close(), self.loop) def main(): # 从环境变量中读取空闲端口 free_port = int(os.environ.get('FREE_PORT')) logging.info(f'free_port: {free_port}') # 从环境变量中读取UDID udid = os.environ.get('UDID') logging.info(f'udid: {udid}') driver = Driver(udid) recorder = HmScrcpyServer(driver) recorder.start(free_port) if __name__ == "__main__": main() # result = input("press to stop:") # recorder.stop()
基于官方python版本SDK实现
python3.11待补充
其他相关: 已开源贡献mjpeg方式的实现: uiautodev
本文作者:lixf6
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!