Files
m20_core_web/cam_cap.py

208 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import pynng
import struct
import json
import time
import asyncio
import aiohttp
from threading import Thread, Lock
from queue import Queue, Empty
import logging
# Endpoint that send_image_to_web_server() POSTs the stereo JPEG pair and metadata to.
HTTP_SERVER_URL = "http://c1.sgc.brisky.space/upload"
# Configure logging: INFO level on the root logger, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ImageBuffer:
    """Bounded, lock-guarded buffer holding the most recent left/right frames.

    The two sides are stored in parallel queues. Every mutation performed by
    this class happens while ``self.lock`` is held and always touches both
    queues, so they stay the same length as long as callers only go through
    ``update_frames`` / ``get_latest_frames``.
    """

    def __init__(self, max_size=2):
        """Create an empty buffer.

        Args:
            max_size: Maximum number of frames kept per side. Passed straight
                to ``queue.Queue(maxsize=...)``, so a value <= 0 means
                unbounded.
        """
        self.left_queue = Queue(maxsize=max_size)
        self.right_queue = Queue(maxsize=max_size)
        self.lock = Lock()

    @staticmethod
    def _push_dropping_oldest(frame_queue, frame):
        """Append ``frame``, evicting the oldest entry first if the queue is full."""
        if frame_queue.full():
            try:
                frame_queue.get_nowait()  # discard the oldest frame
            except Empty:
                # Cannot happen for a full queue while the lock is held;
                # tolerated so a racing external consumer is harmless.
                pass
        frame_queue.put_nowait(frame)

    @staticmethod
    def _drain(frame_queue):
        """Remove and return every frame currently queued, oldest first."""
        frames = []
        while True:
            try:
                frames.append(frame_queue.get_nowait())
            except Empty:
                return frames

    def update_frames(self, left_frame, right_frame):
        """Store a new frame pair, dropping the oldest pair when the buffer is full.

        Args:
            left_frame: Frame from the left camera.
            right_frame: Frame from the right camera captured alongside it.
        """
        with self.lock:
            self._push_dropping_oldest(self.left_queue, left_frame)
            self._push_dropping_oldest(self.right_queue, right_frame)

    def get_latest_frames(self):
        """Return the newest ``(left, right)`` pair and empty the buffer.

        All queued frames are consumed by this call, so an immediate second
        call returns ``(None, None)`` until ``update_frames`` runs again.

        Returns:
            ``(left_frame, right_frame)`` for the most recent pair, or
            ``(None, None)`` when either side has no frame available.
        """
        with self.lock:
            left_frames = self._drain(self.left_queue)
            right_frames = self._drain(self.right_queue)
        if left_frames and right_frames:
            return left_frames[-1], right_frames[-1]
        return None, None
# Global buffer: filled by capture_thread(), read by ImageServer.handle_client_request().
image_buffer = ImageBuffer()
def capture_thread():
    """Continuously read both cameras and publish frame pairs to ``image_buffer``.

    Intended to run as the target of a background thread. Opens
    ``/dev/videoL`` and ``/dev/videoR``, requests 640x480 frames, and loops
    forever pushing a copy of each successfully read pair into the global
    buffer. If either device cannot be opened the function reports it and
    returns instead of spinning on failed reads. Both capture handles are
    released on any exit path.
    """
    cam_left = cv2.VideoCapture('/dev/videoL')
    cam_right = cv2.VideoCapture('/dev/videoR')
    try:
        left_open = cam_left.isOpened()
        right_open = cam_right.isOpened()
        if not (left_open and right_open):
            print(f"Error capture thread, failed to open cameras, left: {left_open}, right: {right_open}")
            return

        # Set the camera resolution
        for cam in (cam_left, cam_right):
            cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        while True:
            ret_left, frame_left = cam_left.read()
            ret_right, frame_right = cam_right.read()
            if ret_left and ret_right:
                # Copy so the buffered frames do not alias memory that a
                # later read() may reuse.
                image_buffer.update_frames(frame_left.copy(), frame_right.copy())
            else:
                print(f"Error capture thread, ret_left: {ret_left}, ret_right: {ret_right}")
            time.sleep(0.03)  # ~30fps
    finally:
        # Previously unreachable after the infinite loop; now runs on early
        # return and on any exception.
        cam_left.release()
        cam_right.release()
async def send_image_to_web_server(frame_left, frame_right, metadata):
    """JPEG-encode a frame pair, save it under ``./saved`` and POST it to the web server.

    Args:
        frame_left: Left image, in a form accepted by ``cv2.imencode`` and
            ``cv2.imwrite``.
        frame_right: Right image, same requirements as ``frame_left``.
        metadata: JSON-serialisable dict; must contain a ``"timestamp"`` key,
            which is used in the log line and in the saved file names.

    Returns:
        None. Every ``Exception`` raised while encoding, saving or uploading
        is caught and printed, so this coroutine is safe to run as a
        fire-and-forget task.
    """
    import os  # local import: only needed to make sure the save directory exists

    try:
        # Encode as JPEG; do not upload anything if either encode fails.
        ok_left, jpeg_left = cv2.imencode('.jpg', frame_left)
        ok_right, jpeg_right = cv2.imencode('.jpg', frame_right)
        if not (ok_left and ok_right):
            print(f"Error sending images to web server: JPEG encoding failed (left: {ok_left}, right: {ok_right})")
            return
        jpeg_left = jpeg_left.tobytes()
        jpeg_right = jpeg_right.tobytes()

        print(f"Would send frame {metadata['timestamp']} to web server via HTTP POST")

        # Keep a local copy of every uploaded pair. The directory is created
        # on demand so the save no longer depends on ./saved already existing.
        os.makedirs("./saved", exist_ok=True)
        saved_left = cv2.imwrite(f"./saved/left_{metadata['timestamp']}.jpg", frame_left)
        saved_right = cv2.imwrite(f"./saved/right_{metadata['timestamp']}.jpg", frame_right)
        if not (saved_left and saved_right):
            # The local copy is secondary; report it but still upload.
            print(f"Failed to save images locally (left: {saved_left}, right: {saved_right})")

        # Multipart upload: both JPEGs plus the metadata as a JSON string.
        async with aiohttp.ClientSession() as session:
            data = aiohttp.FormData()
            data.add_field('left_image', jpeg_left, filename='left.jpg', content_type='image/jpeg')
            data.add_field('right_image', jpeg_right, filename='right.jpg', content_type='image/jpeg')
            data.add_field('metadata', json.dumps(metadata))
            async with session.post(HTTP_SERVER_URL, data=data) as response:
                if response.status == 200:
                    print("Images sent successfully")
                else:
                    print(f"Failed to send images: {response.status}")

        # Simulated send latency, kept from the earlier stub implementation.
        await asyncio.sleep(0.01)
    except Exception as e:
        print(f"Error sending images to web server: {e}")
class ImageServer:
    """NNG REP server that uploads the latest frame pair on each client request.

    Each request payload is treated as a client id. The reply is ``b'ACK'``
    when a frame pair was available and an upload was scheduled,
    ``b'NO_FRAMES'`` when the buffer was empty, or ``b'ERROR'`` when handling
    the request failed.
    """

    def __init__(self, address="tcp://0.0.0.0:54321"):
        """Store the listen address; the socket itself is created in ``run``."""
        self.address = address
        self.socket = None
        # Strong references to in-flight upload tasks. The event loop only
        # keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._upload_tasks = set()

    async def handle_client_request(self, client_id):
        """Fetch the newest frame pair and schedule its upload in the background.

        Args:
            client_id: Identifier of the requesting client; used for logging only.

        Returns:
            ``b'ACK'``, ``b'NO_FRAMES'`` or ``b'ERROR'`` (see class docstring).
        """
        try:
            # Take the latest frames from the shared buffer
            frame_left, frame_right = image_buffer.get_latest_frames()
            if frame_left is None or frame_right is None:
                logger.warning(f"No frames available for client {client_id}")
                return b'NO_FRAMES'

            metadata = {"timestamp": time.time()}

            # Upload without blocking the reply; keep the task referenced
            # until it completes.
            task = asyncio.create_task(
                send_image_to_web_server(frame_left, frame_right, metadata)
            )
            self._upload_tasks.add(task)
            task.add_done_callback(self._upload_tasks.discard)

            logger.info(f"Processed request from client {client_id}, frames sent to web server")
            return b'ACK'
        except Exception as e:
            logger.error(f"Error processing request for client {client_id}: {e}")
            return b'ERROR'

    async def run(self):
        """Listen on ``self.address`` and serve requests until a socket error occurs.

        The loop stops after the first exception raised while receiving,
        handling or replying. The socket is closed on every exit path,
        including a failure to start listening (which still propagates).
        """
        self.socket = pynng.Rep0()
        try:
            self.socket.listen(self.address)
            logger.info(f"Image server listening on {self.address}...")
            while True:
                try:
                    # The request payload is the client id.
                    data = await self.socket.arecv()
                    # errors='replace' so a malformed id cannot raise and
                    # take the whole server down.
                    client_id = data.decode('utf-8', errors='replace') if data else "unknown"
                    response = await self.handle_client_request(client_id)
                    await self.socket.asend(response)
                except Exception as e:
                    logger.error(f"Server error: {e}")
                    break
        finally:
            self.socket.close()
            self.socket = None
async def main():
    """Start the camera capture thread, then run the image server until it stops."""
    # Daemon thread: it will not keep the process alive once the server exits.
    grabber = Thread(target=capture_thread, daemon=True)
    grabber.start()
    print("Capture thread started")

    # Serve client requests on the default address.
    await ImageServer().run()


if __name__ == "__main__":
    asyncio.run(main())