编辑
2025-03-01
移动端
00
请注意,本文编写于 62 天前,最后修改于 62 天前,其中某些信息可能已经过时。

目录

背景
代码

背景

1、需要持续不断地获取 H.264 格式的视频流;

2、需要实现对安卓设备的远程控制,抽象出来即需要实现 up、down、move 三种触控操作。

代码

底层协议参考 :scrcpy开发者文档

python
import struct
import socket


class TouchController:
    """Send scrcpy touch control messages over an already-connected socket.

    The original author notes that this targets scrcpy >= 2.2.  Every touch
    message is built with ``struct.pack(format_string, ...)`` from ten values,
    in this order: message type (always 2), action (0 = down, 1 = up,
    2 = move), the constant 1, ``x``, ``y``, ``width``, ``height``,
    ``const_value``, ``unknown1`` and ``unknown2``.

    NOTE(review): the meaning of the constant 1 and of ``const_value`` /
    ``unknown1`` / ``unknown2`` is not visible in this file; they presumably
    map to pointer id, pressure, action button and buttons in scrcpy's
    inject-touch message -- confirm against the scrcpy developer docs.
    """

    # Leading fields shared by every touch message.
    _MESSAGE_TYPE = 2
    _ACTION_DOWN = 0
    _ACTION_UP = 1
    _ACTION_MOVE = 2
    _THIRD_FIELD = 1

    def __init__(self, control_socket: socket.socket, format_string: str,
                 unknown1: int, unknown2: int, const_value: int):
        """Store the socket and the constant parts of the touch message.

        Args:
            control_socket: Connected socket the packed messages are written to.
            format_string: ``struct`` format used to pack every touch message.
            unknown1: Ninth packed value.
            unknown2: Tenth packed value.
            const_value: Eighth packed value.
        """
        self.control_socket = control_socket
        self.format_string = format_string
        self.unknown1 = unknown1
        self.unknown2 = unknown2
        self.const_value = const_value

    def _send_touch(self, action: int, x: int, y: int,
                    width: int, height: int) -> None:
        """Pack one touch message for ``action`` and write all of it."""
        payload = struct.pack(
            self.format_string,
            self._MESSAGE_TYPE, action, self._THIRD_FIELD,
            x, y, width, height,
            self.const_value, self.unknown1, self.unknown2,
        )
        # sendall() keeps writing until the whole message is out; send() may
        # stop after a partial write and leave a truncated control message.
        self.control_socket.sendall(payload)

    def down(self, x: int, y: int, width: int, height: int):
        """Send a touch-down at ``(x, y)`` for a ``width`` x ``height`` screen."""
        self._send_touch(self._ACTION_DOWN, x, y, width, height)

    def up(self, x: int, y: int, width: int, height: int):
        """Send a touch-up at ``(x, y)`` for a ``width`` x ``height`` screen."""
        self._send_touch(self._ACTION_UP, x, y, width, height)

    def move(self, x: int, y: int, width: int, height: int):
        """Send a touch-move at ``(x, y)`` for a ``width`` x ``height`` screen."""
        self._send_touch(self._ACTION_MOVE, x, y, width, height)

    def text(self, text: str):
        """Placeholder for text injection; currently does nothing.

        Text injection is not implemented in this class, so the call is a
        deliberate no-op that returns ``None``.
        """
        return None

含 OSS 上传实现及异常情况处理(已在公司内稳定运行 3 年)

python
import asyncio
import fcntl
import glob
import json
import logging
import os
import re
import shlex
import signal
import struct
import subprocess
import time
from collections import deque
from datetime import datetime

import websockets
from adbutils import adb, AdbError, Network

from exceptions import AppErrors
from touch_controller import TouchController
from utils import init_oos_s3

# Logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logging.getLogger().setLevel(logging.INFO)

# Maximum number of bytes requested from the video socket per recv() call.
VIDEO_RECV_SIZE = 1024 * 1024 * 10
# Maximum number of bytes handed to FFmpeg's stdin per write attempt (64 KB).
FFMPEG_WRITE_CHUNK = 64 * 1024
# Seconds to wait for the FFmpeg writer task to flush its backlog on shutdown.
WRITER_DRAIN_TIMEOUT = 5.0
# How many times (0.1 s apart) to retry opening a scrcpy socket on the device.
SOCKET_CONNECT_ATTEMPTS = 100

# WebSocket paths: the video ("screen") stream and the control stream.
SERIAL_PATH_RE = re.compile(r'^/android/scrcpy/(?:screen/)?([^/]+)$')
SCREEN_PATH_RE = re.compile(r'^/android/scrcpy/screen/([^/]+)$')
CONTROL_PATH_RE = re.compile(r'^/android/scrcpy/([^/]+)$')

# Server shared between the video and the control WebSocket connections.  It
# must exist at module level so a control connection that arrives before any
# video connection sees ``None`` instead of raising NameError.
server = None

# Strong references to fire-and-forget tasks.  The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_background_tasks = set()


def _spawn_background(coro):
    """Schedule ``coro`` as a task and keep it referenced until it finishes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


class ScrcpyServer:
    """Run scrcpy on a device, relay its H.264 stream and optionally record it.

    The raw video stream read from the scrcpy video socket is forwarded to a
    WebSocket client and/or piped into an FFmpeg process that writes HLS
    (m3u8 + ts) files, which are uploaded to object storage on shutdown.
    """

    def __init__(self, scrcpy_jar_path: str = None, record: bool = False):
        """Initialise state; nothing is started until the methods are called.

        Args:
            scrcpy_jar_path: Local path of the scrcpy server jar; defaults to
                ``scrcpy-server.jar`` in the working directory.
            record: Enable recording.  When falsy, the ``RECORD`` environment
                variable decides.
        """
        self.scrcpy_jar_path = scrcpy_jar_path or 'scrcpy-server.jar'
        self.need_run_scrcpy = True
        self.controller = None
        self.video_socket = None
        self.device = None
        self.record = record or self.convert_to_bool(os.environ.get('RECORD', False))
        self.bundle_id = os.environ.get('BUNDLE_ID', '0')
        self.output_dir = 'temp_hls'  # local directory holding the recording files
        self.s3_bucket = 'device-recording'  # remote bucket used for screen recordings
        self.s3_path_prefix = ''  # key prefix inside the bucket: <udid>/<bundle_id>
        self.ffmpeg_process = None
        self.m3u8_path = None  # path of the m3u8 playlist
        self.s3 = None
        self.resolution_width = 0
        self.resolution_height = 0
        self.data_queue = asyncio.Queue()
        self._writer_task = None  # task running write_to_ffmpeg(), if any
        self._cleaning_up = False  # makes clean_up() run at most once

    @staticmethod
    def convert_to_bool(value):
        """Coerce ``value`` to a bool.

        Strings are true only when they equal ``true``, ``1``, ``yes`` or
        ``y`` (case-insensitive); other non-bool values use ``bool(value)``.
        """
        logging.info(f'value: {value}')
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.lower() in ['true', '1', 'yes', 'y']
        return bool(value)

    def start_scrcpy_server(self, serial: str, control: bool = True):
        """Push the scrcpy jar to the device and start the scrcpy server.

        Args:
            serial: adb serial of the target device.
            control: Whether scrcpy should open its control channel.

        Returns:
            The ``subprocess.Popen`` object of the ``adb shell`` command that
            runs the scrcpy server.
        """
        logging.info(f'self.scrcpy_jar_path: {self.scrcpy_jar_path}')
        logging.info(f'serial: {serial}')
        # Push the scrcpy server jar to the device.
        push = subprocess.run(['adb', '-s', serial, 'push', self.scrcpy_jar_path,
                               '/data/local/tmp/scrcpy-server.jar'])
        if push.returncode != 0:
            # Best effort, as before: an older copy of the jar may already be
            # on the device, so only report the failure.
            logging.error(f'adb push of scrcpy server failed with code {push.returncode}')
        # Start the scrcpy server.
        start_command = [
            'adb', '-s', serial, 'shell',
            'CLASSPATH=/data/local/tmp/scrcpy-server.jar',
            'app_process', '/',  # Android tool used to launch a Java entry point
            'com.genymobile.scrcpy.Server',  # scrcpy server entry point
            '2.7',
            'log_level=info',
            # Capped at 1024 (0 would keep the native resolution) because,
            # per the original author, some OPPO phones cannot encode at 1920.
            'max_size=1024',
            'max_fps=30',  # upper frame-rate bound; lower it to save bandwidth
            'video_bit_rate=8000000',  # 8 Mbps
            'tunnel_forward=true',  # use adb forward tunnelling
            'send_frame_meta=false',  # no per-frame metadata header
            'control=true' if control else 'control=false',
            'audio=false',
            'show_touches=false',  # do not show the touch indicator
            'stay_awake=false',  # do not keep the device awake
            'power_off_on_close=false',  # do not turn the screen off on exit
            'clipboard_autosync=false',  # do not synchronise the clipboard
        ]
        process = subprocess.Popen(start_command)
        logging.info('scrcpy server started')
        return process

    def init_recording_config(self):
        """Prepare recording: output paths, the storage client and FFmpeg.

        Does nothing when recording is disabled.  When no device is attached
        yet, the ``AppErrors.ERROR_DEVICE_UDID_IS_NOT_FOUND`` value is returned
        (not raised), as in the original implementation.
        """
        logging.info(f'record: {self.record} type: {type(self.record)}')
        if not self.record:
            return
        if not self.device:
            return AppErrors.ERROR_DEVICE_UDID_IS_NOT_FOUND
        s3 = init_oos_s3()
        # Make sure the output directory exists.
        os.makedirs(self.output_dir, exist_ok=True)
        # Playlist and segment paths.
        self.m3u8_path = os.path.join(self.output_dir, f'{self.device.serial}_{self.bundle_id}.m3u8')
        self.s3_path_prefix = f'{self.device.serial}/{self.bundle_id}'
        ts_path_template = os.path.join(self.output_dir, f'{self.device.serial}_{self.bundle_id}_%03d.ts')
        logging.info(f'm3u8_path: {self.m3u8_path}')
        # FFmpeg turns the raw H.264 stream into an m3u8 playlist + ts segments.
        ffmpeg_process = subprocess.Popen(
            [
                'ffmpeg',
                '-y',  # overwrite existing files
                '-f', 'h264',  # input format is raw H.264
                '-i', '-',  # read the input from stdin
                '-c:v', 'copy',  # copy the video stream without re-encoding
                '-f', 'hls',  # output format is HLS (m3u8)
                '-hls_time', '10',  # 10 seconds per ts segment
                '-hls_list_size', '0',  # keep every segment in the playlist
                '-hls_segment_filename', ts_path_template,  # ts segment path
                self.m3u8_path  # m3u8 playlist path
            ],
            stdin=subprocess.PIPE,  # video data is written to FFmpeg's stdin
            # Nobody reads FFmpeg's output.  With PIPE the child blocks once
            # the pipe buffer fills, which stalls the recording, so discard it.
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        self.ffmpeg_process = ffmpeg_process
        self.s3 = s3
        # Put stdin into non-blocking mode so a slow FFmpeg cannot block the
        # event loop inside write().
        fd = self.ffmpeg_process.stdin.fileno()
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

    async def write_to_ffmpeg(self):
        """Take video chunks from ``data_queue`` and write them to FFmpeg.

        Runs until a ``None`` sentinel is dequeued or a write fails.  When
        FFmpeg's stdin is full, one queued chunk (if any) is discarded per
        retry to bound the backlog, as the original implementation intended.
        """
        logging.info('write_to_ffmpeg process.')
        stop_requested = False
        while not stop_requested:
            data = await self.data_queue.get()
            if data is None:
                break
            try:
                # Write through the file descriptor.  os.write() reports how
                # many bytes were accepted, so a retry resumes after them
                # instead of re-sending (and duplicating) a whole chunk.
                fd = self.ffmpeg_process.stdin.fileno()
                view = memoryview(data)
                offset = 0
                while offset < len(view):
                    try:
                        offset += os.write(fd, view[offset:offset + FFMPEG_WRITE_CHUNK])
                    except BlockingIOError:
                        logging.warning('FFmpeg process stdin buffer is full, discarding old data...')
                        # Shed load without blocking: waiting on an empty
                        # queue here would stall the retry.
                        try:
                            discarded_data = self.data_queue.get_nowait()
                        except asyncio.QueueEmpty:
                            pass
                        else:
                            if discarded_data is None:
                                # The stop sentinel was taken off the queue;
                                # honour it once this chunk is written.
                                stop_requested = True
                            else:
                                logging.info(f'Discarded {len(discarded_data)} bytes of old data.')
                        await asyncio.sleep(0.1)  # wait a moment before retrying
            except BrokenPipeError:
                logging.error('FFmpeg process stdin is closed.')
                break
            except Exception as e:
                logging.error(f'Error writing to FFmpeg process: {e}')
                break

    async def _stop_writer(self):
        """Ask the FFmpeg writer task to finish and wait briefly for it.

        Safe to call more than once; later calls are no-ops.
        """
        task, self._writer_task = self._writer_task, None
        if task is None or task.done():
            return
        await self.data_queue.put(None)  # sentinel: stop after the backlog
        try:
            await asyncio.wait_for(task, timeout=WRITER_DRAIN_TIMEOUT)
        except asyncio.TimeoutError:
            # wait_for() has already cancelled the task.
            logging.warning('FFmpeg writer did not finish in time, cancelled.')

    async def read_video_stream(self, video_socket, websocket):
        """Read the scrcpy video stream and fan it out.

        Each chunk is sent to ``websocket`` (when given) and queued for FFmpeg
        (when recording is active).  Ends when the socket closes, returns no
        data, or an error occurs; the writer task is then stopped and
        ``clean_up`` is called.

        Args:
            video_socket: Connected scrcpy video socket.
            websocket: Client connection to forward to, or ``None`` when the
                stream is only being recorded.
        """
        loop = asyncio.get_running_loop()
        try:
            # Start the writer task and keep a reference to it.
            self._writer_task = asyncio.create_task(self.write_to_ffmpeg())
            while True:
                if video_socket._closed:
                    logging.warning('Video socket is closed.')
                    break
                # recv() blocks, so run it in the default executor.
                data = await loop.run_in_executor(None, video_socket.recv, VIDEO_RECV_SIZE)
                if not data:
                    logging.warning('No data received, breaking the loop.')
                    break
                if websocket:
                    await websocket.send(data)
                if self.record and self.ffmpeg_process:
                    await self.data_queue.put(data)
        except Exception as e:
            logging.error(f'Error reading video stream: {e}')
            if websocket:
                await websocket.close(reason=f'Error reading video stream: {e}')
        finally:
            logging.info('enter finally ...')
            # Stop the writer task.
            await self._stop_writer()
            # Recording is only finalised when there is no external WebSocket,
            # i.e. for the self-initiated recording session.
            await self.clean_up(websocket)

    def run_signal_monitor(self, websocket=None):
        """Install SIGTERM/SIGINT handlers that trigger ``clean_up``."""
        loop = asyncio.get_event_loop()
        # Closures carry the websocket argument into the handler.
        loop.add_signal_handler(signal.SIGTERM, lambda: self.signal_handler(websocket))
        loop.add_signal_handler(signal.SIGINT, lambda: self.signal_handler(websocket))

    def signal_handler(self, websocket):
        """Schedule ``clean_up`` when a termination signal arrives."""
        logging.info('Received termination signal, initiating cleanup...')
        _spawn_background(self.clean_up(websocket))

    async def clean_up(self, websocket):
        """Finalise the recording: stop FFmpeg, upload files, delete them.

        Only acts when ``websocket`` is falsy and a recording is in progress.
        It runs at most once even if both the signal handler and the end of
        the video stream call it.
        """
        logging.info('enter finally ...')
        if websocket or not self.record or not self.ffmpeg_process:
            return
        if self._cleaning_up:
            return
        self._cleaning_up = True

        # Let the writer flush what is queued before stdin is closed.
        await self._stop_writer()
        # Anything still queued (writer failed or timed out) is dropped.
        while not self.data_queue.empty():
            discarded_data = self.data_queue.get_nowait()
            if discarded_data is not None:
                logging.info(f'Discarded {len(discarded_data)} bytes of old data.')

        process = self.ffmpeg_process
        try:
            # Closing stdin signals end-of-input so FFmpeg finalises the files.
            process.stdin.close()
        except OSError as e:
            logging.error(f'Error closing FFmpeg stdin: {e}')
        # Wait for FFmpeg to exit without blocking the event loop.
        await asyncio.get_event_loop().run_in_executor(None, process.wait)
        # Stops read_video_stream() from queueing data nobody will consume.
        self.ffmpeg_process = None
        logging.info('FFmpeg process terminated.')
        # Upload the ts and m3u8 files, then remove the uploaded ones locally.
        ts_files = self.upload_to_oss()
        self.clean_up_files(ts_files)

    def upload_to_oss(self):
        """Upload the recording files to object storage.

        Every ``*.ts`` file in ``output_dir`` plus the m3u8 playlist is
        uploaded to ``<s3_bucket>/<s3_path_prefix>/<file name>`` with a
        ``public-read`` ACL.

        Returns:
            The list of local file paths that were uploaded successfully.
        """
        logging.info('OSS upload start ...')
        # All generated ts files, followed by the playlist.
        ts_files = glob.glob(os.path.join(self.output_dir, '*.ts'))
        ts_files.append(self.m3u8_path)
        # Files that were uploaded successfully.
        uploaded_files = []
        for file_path in ts_files:
            if not file_path or not os.path.exists(file_path):
                logging.error(f'Invalid file path: {file_path}')
                continue
            try:
                # Object keys always use '/', independent of the OS separator.
                key = f'{self.s3_path_prefix}/{os.path.basename(file_path)}'
                self.s3.meta.client.upload_file(file_path, self.s3_bucket, key,
                                                ExtraArgs={'ACL': 'public-read'})
                logging.info(f'Uploaded {file_path} to {self.s3_path_prefix}')
                uploaded_files.append(file_path)
            except Exception as e:
                logging.error(f'Failed to upload {file_path}: {str(e)}')
        logging.info('OSS upload finished ...')
        return uploaded_files

    @staticmethod
    def clean_up_files(uploaded_files):
        """Delete the given local ts/m3u8 files, logging any failure."""
        logging.info('Cleaning up files...')
        if not uploaded_files:
            logging.warning('No files to clean up.')
            return
        for file_path in uploaded_files:
            try:
                os.remove(file_path)
                logging.info(f'Removed {file_path}')
            except Exception as e:
                logging.error(f'Failed to remove {file_path}: {str(e)}')
        logging.info('Clean up finished ...')

    @staticmethod
    def _connect_scrcpy_socket(device, label):
        """Open the device's ``scrcpy`` abstract socket, retrying on AdbError.

        Raises:
            ConnectionError: If every attempt fails.
        """
        for _ in range(SOCKET_CONNECT_ATTEMPTS):
            try:
                sock = device.create_connection(Network.LOCAL_ABSTRACT, 'scrcpy')
            except AdbError:
                time.sleep(0.1)
                continue
            logging.info(f'{label} = {sock}')
            return sock
        raise ConnectionError(f'Could not connect {label} to scrcpy server')

    @staticmethod
    def _recv_exact(sock, size):
        """Read exactly ``size`` bytes; a single recv() may return fewer.

        Raises:
            ConnectionError: If the peer closes before ``size`` bytes arrive.
        """
        received = bytearray()
        while len(received) < size:
            chunk = sock.recv(size - len(received))
            if not chunk:
                raise ConnectionError(
                    f'Socket closed after {len(received)} of {size} bytes')
            received += chunk
        return bytes(received)

    @staticmethod
    def setup_connection(serial: str, control: bool = True):
        """Connect to the scrcpy server running on device ``serial``.

        Args:
            serial: adb serial of the device.
            control: Also open the control socket and read the stream header.

        Returns:
            When ``control`` is false, only the video socket.  Otherwise the
            tuple ``(touch_controller, video_socket, device, screen_width,
            screen_height)``.

        Raises:
            ConnectionError: If a socket cannot be opened, the dummy byte is
                missing, or the stream header is cut short.
        """
        device = adb.device(serial=serial)
        video_socket = ScrcpyServer._connect_scrcpy_socket(device, 'video_socket')
        dummy_byte = video_socket.recv(1)
        if not dummy_byte or dummy_byte != b"\x00":
            raise ConnectionError("Did not receive Dummy Byte!")
        logging.info('Received Dummy Byte!')
        if not control:
            return video_socket

        control_socket = ScrcpyServer._connect_scrcpy_socket(device, 'control_socket')
        # Stream header: 64-byte NUL-padded device name, 4 bytes logged as the
        # codec, then two big-endian 32-bit integers (width, height).
        device_name = ScrcpyServer._recv_exact(video_socket, 64).decode('utf-8').rstrip('\x00')
        logging.info(f'Device name: {device_name}')
        codec = ScrcpyServer._recv_exact(video_socket, 4)
        logging.info(f'codec: {codec}')
        resolution_data = ScrcpyServer._recv_exact(video_socket, 8)
        logging.info(f'resolution_data: {resolution_data}')
        screen_width, screen_height = struct.unpack(">II", resolution_data)
        logging.info(f'Resolution: {screen_width}x{screen_height}')
        format_string = '>BBqiiHHHii'
        const_value = 65535
        unknown1 = 1
        unknown2 = 1
        # Pass by keyword: TouchController declares (unknown1, unknown2,
        # const_value), so the previous positional call shifted the values.
        # NOTE(review): this changes the bytes sent to the device -- confirm
        # on real hardware.
        touch_controller = TouchController(control_socket, format_string,
                                           unknown1=unknown1, unknown2=unknown2,
                                           const_value=const_value)
        return touch_controller, video_socket, device, screen_width, screen_height

    async def handle_control_websocket(self, websocket, serial):
        """Apply JSON control messages from ``websocket`` to the device.

        Handled ``messageType`` values: ``touch`` (``actionType`` 0 = down,
        1 = up, 2 = move), ``keyEvent`` and ``text``.  A message that fails is
        logged and skipped; the connection is closed when the loop ends.
        """
        logging.info(f'New control connection from {websocket.remote_address} for serial: {serial}')
        try:
            async for message in websocket:
                logging.info(f'Received message: {message}')
                try:
                    message = json.loads(message)
                    message_type = message.get('messageType', None)
                    if message_type is None:
                        continue
                    if message_type == 'touch':
                        payload = message['data']
                        action_type = payload['actionType']
                        x, y, width, height = (payload['x'], payload['y'],
                                               payload['width'], payload['height'])
                        if action_type == 0:
                            self.controller.down(x, y, width, height)
                        elif action_type == 1:
                            self.controller.up(x, y, width, height)
                        elif action_type == 2:
                            self.controller.move(x, y, width, height)
                        else:
                            raise Exception(f'not support action_type: {action_type}')
                    elif message_type == 'keyEvent':
                        event_number = message['data']['eventNumber']
                        # Client input ends up in a device shell command, so
                        # quote it to prevent command injection.
                        self.device.shell(f'input keyevent {shlex.quote(str(event_number))}')
                    elif message_type == 'text':
                        text = message['detail']
                        self.device.shell(
                            f'am broadcast -a SONIC_KEYBOARD --es msg {shlex.quote(str(text))}')
                except Exception as e:
                    logging.error(f'Error handling message: {e}')
        except Exception as e:
            logging.error(f'Control connection handler error: {e}')
        finally:
            await websocket.close()

    async def handle_video_websocket(self, websocket=None, serial=''):
        """Stream video to ``websocket`` and answer size requests.

        A JSON message containing the key ``udid`` is answered with a
        ``sizeInfo`` message carrying the stream resolution.  With no
        websocket, the video stream is simply read until it ends.
        """
        if websocket:
            logging.info(f'New video connection from {websocket.remote_address} for serial: {serial}')
        video_task = asyncio.create_task(self.read_video_stream(self.video_socket, websocket))
        try:
            if websocket:
                async for message in websocket:
                    try:
                        data = json.loads(message)
                        if 'udid' in data:
                            response = json.dumps({
                                'messageType': 'sizeInfo',
                                'sizeInfo': {
                                    'width': self.resolution_width,
                                    'height': self.resolution_height,
                                    'rotation': 0,
                                }
                            })
                            await websocket.send(response)
                            logging.info(f'Sent resolution: {response}')
                    except json.JSONDecodeError:
                        logging.error('Failed to decode JSON message')
                    except Exception as e:
                        logging.error(f'Error handling message: {e}')
            await video_task
        except Exception as e:
            logging.error(f'Video connection handler error: {e}')
        finally:
            if websocket:
                await websocket.close()


async def websocket_handler(websocket, path):
    """Route a WebSocket connection by path.

    ``/android/scrcpy/screen/<serial>`` starts scrcpy and streams video;
    ``/android/scrcpy/<serial>`` is the control channel and reuses the server
    created by the video connection.

    Raises:
        Exception: If ``path`` does not contain a serial.
    """
    logging.info(f'New connection from {websocket.remote_address}')
    logging.info(f'path: {path}')
    match = SERIAL_PATH_RE.match(path)
    if match:
        serial = match.group(1)
    else:
        raise Exception('not valid serial !!!')
    global server
    # TODO: the front end must open the video WebSocket first and the control
    # WebSocket afterwards, otherwise the control channel has no server.
    if SCREEN_PATH_RE.match(path):
        logging.info('/android/scrcpy/screen ...')
        server = ScrcpyServer()
        if server.need_run_scrcpy:
            logging.info('need run scrcpy ...')
            server.start_scrcpy_server(serial)
            (server.controller, server.video_socket, server.device,
             server.resolution_width, server.resolution_height) = server.setup_connection(serial)
        # The video stream is always opened first, so scrcpy is only started
        # here; the control channel shares the same server instance.
        await server.handle_video_websocket(websocket, serial)
    elif CONTROL_PATH_RE.match(path):
        logging.debug('/android/scrcpy ...')
        if server:
            await server.handle_control_websocket(websocket, serial)
        else:
            logging.warning('Control connection received before any video connection.')
    else:
        logging.info(f'Invalid WebSocket path: {path}')
        await websocket.close()


def init_record(udid):
    """Start scrcpy on ``udid`` and record its screen in the background.

    Must be called while an event loop is running, because it schedules the
    video reader as a task.
    """
    server = ScrcpyServer(record=True)  # recording is on by default here
    logging.info('scrcpy record init ...')
    server.start_scrcpy_server(udid)
    (server.controller, server.video_socket, server.device,
     server.resolution_width, server.resolution_height) = server.setup_connection(udid)
    # Set up recording now that scrcpy is running.
    server.init_recording_config()
    # Handle termination signals (e.g. the container being stopped); without
    # this the tail of the recording may be lost.
    server.run_signal_monitor()
    _spawn_background(server.read_video_stream(server.video_socket, None))


async def main():
    """Start the recording session and serve WebSocket clients until stopped.

    Reads the listening port from ``FREE_PORT`` and the device serial from
    ``UDID``.
    """
    free_port = os.environ.get('FREE_PORT')
    udid = os.environ.get('UDID')
    # Assumption: every request that reaches this process wants recording.
    init_record(udid)
    ws_server = await websockets.serve(websocket_handler, '0.0.0.0', free_port)
    logging.info(f'WebSocket server started at ws://0.0.0.0:{free_port}')
    try:
        logging.info('Main task is running...')
        await asyncio.Future()  # keep the main task alive
    except Exception as e:
        logging.error(f'Main task encountered an error: {e}')
    finally:
        logging.info('Main task is exiting, closing WebSocket server...')
        # wait_closed() only returns after close() has been requested.
        ws_server.close()
        await ws_server.wait_closed()
        logging.info('WebSocket server closed.')


if __name__ == '__main__':
    asyncio.run(main())

本文作者:lixf6

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!