"""Flask + Socket.IO server for stereo (left/right) camera images.

The server receives image pairs over HTTP, stores them on disk and in a
SQLite database, pushes each new pair to connected browsers over Socket.IO
and exposes pages/APIs for listing, annotating, exporting and deleting the
stored images.
"""

import base64
import hmac
import io
import json
import logging
import math
import os
import sqlite3
import tempfile
import threading
import time
import zipfile
from contextlib import closing
from datetime import datetime
from typing import Optional

import cv2
import numpy as np
from flask import Flask, jsonify, render_template, request, send_file
from flask_httpauth import HTTPBasicAuth
from flask_socketio import SocketIO, emit

# The vision-processing client is currently disabled:
# from llm_req import VisionAPIClient, DetectionResult
from cap_trigger import ImageClient

# --- Configuration ---
SAVE_PATH_LEFT = "./static/received/left"
SAVE_PATH_RIGHT = "./static/received/right"
SAVE_PATH_LEFT_MARKED = "./static/received/left_marked"
SAVE_PATH_RIGHT_MARKED = "./static/received/right_marked"
FLASK_HOST = "0.0.0.0"
FLASK_PORT = 5000
MAX_LIVE_FRAMES = 2
DATABASE_PATH = "received_images.db"

# --- Credentials ---
# NOTE(review): the fallback password is hard-coded in the source; set
# BASIC_AUTH_USERNAME / BASIC_AUTH_PASSWORD in the environment for any
# deployment that is reachable by others.
USERNAME = os.getenv('BASIC_AUTH_USERNAME', 'admin')
PASSWORD = os.getenv('BASIC_AUTH_PASSWORD', '19260817')

# --- Annotation schema (shared by validation and drawing) ---
# Bounding-box coordinates are integers in [0, BBOX_COORD_MAX] and are scaled
# to pixel coordinates when drawn.
BBOX_COORD_MAX = 999
VALID_DETECTION_IDS = (1, 2, 3, 4)
VALID_DETECTION_LABELS = ('caisson', 'soldier', 'gun', 'number')
VALID_SIDES = ('left', 'right')
# Box colour per detection id, passed unchanged to the OpenCV drawing calls.
DETECTION_COLORS = {1: (0, 255, 0), 2: (255, 0, 0), 3: (0, 0, 255), 4: (255, 255, 0)}
DEFAULT_DETECTION_COLOR = (255, 255, 255)

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
# The placeholder default is kept for backward compatibility; override it with
# the FLASK_SECRET_KEY environment variable.
app.config['SECRET_KEY'] = os.getenv('FLASK_SECRET_KEY', 'your-secret-key-change-this')
# NOTE(review): every origin is allowed to open a Socket.IO connection.
socketio = SocketIO(app, cors_allowed_origins="*")
auth = HTTPBasicAuth()


@auth.verify_password
def verify_password(username, password):
    """Check HTTP Basic credentials against the configured user.

    Returns the username on success and ``None`` on failure, which is the
    contract expected by ``HTTPBasicAuth.verify_password``.
    """
    # Compare as bytes with a constant-time function so the comparison does
    # not leak how many leading characters matched; both checks always run.
    user_ok = hmac.compare_digest((username or '').encode('utf-8'), USERNAME.encode('utf-8'))
    password_ok = hmac.compare_digest((password or '').encode('utf-8'), PASSWORD.encode('utf-8'))
    if user_ok and password_ok:
        logger.info(f"Successful BasicAuth login for user: {username}")
        return username
    logger.warning(f"Failed BasicAuth attempt for user: {username}")
    return None


# --- Small shared helpers ---
def _connect_db():
    """Return a context manager yielding a SQLite connection that is always closed."""
    return closing(sqlite3.connect(DATABASE_PATH))


def _json_payload() -> dict:
    """Return the request's JSON object, or an empty dict if it is absent or not an object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _is_plain_int(value) -> bool:
    """Return True for real integers; JSON booleans are rejected although bool subclasses int."""
    return isinstance(value, int) and not isinstance(value, bool)


def _remove_file(path) -> bool:
    """Delete ``path`` if it exists; log (instead of raising) on failure. Returns True if removed."""
    try:
        os.remove(path)
    except FileNotFoundError:
        return False
    except OSError as e:
        logger.error(f"Error deleting files: {e}")
        return False
    return True


def _write_image(path, image) -> None:
    """Write ``image`` to ``path`` and raise ``OSError`` if OpenCV reports failure."""
    if not cv2.imwrite(path, image):
        raise OSError(f"Failed to write image: {path}")


# --- Database initialisation ---
def init_db():
    """Create the SQLite ``images`` table if it does not exist yet."""
    with _connect_db() as conn:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS images (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                left_filename TEXT NOT NULL,
                right_filename TEXT NOT NULL,
                left_marked_filename TEXT,
                right_marked_filename TEXT,
                timestamp REAL NOT NULL,
                metadata TEXT,
                comment TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                manual_detections_left TEXT,
                manual_detections_right TEXT,
                is_manual_labeled_left INTEGER DEFAULT 0,
                is_manual_labeled_right INTEGER DEFAULT 0,
                left_position INTEGER DEFAULT 0,
                right_position INTEGER DEFAULT 0
            )
        ''')
        conn.commit()
    logger.info(f"Database {DATABASE_PATH} initialized.")


# --- Global state ---
# Most recently uploaded frame pair; guarded by frame_lock.
latest_left_frame = None
latest_right_frame = None
latest_timestamp = None
frame_lock = threading.Lock()

# Initialise the database at import time.
init_db()

# NOTE(review): the client is created at import time; what it does on
# construction is defined in cap_trigger and not visible here.
image_client = ImageClient("tcp://175.24.228.220:7701", client_id="local")


def draw_detections_on_image(image: np.ndarray, detections: list,
                             left_position: Optional[int] = None,
                             right_position: Optional[int] = None) -> np.ndarray:
    """Return a copy of ``image`` with detection boxes and a position banner drawn on it.

    Args:
        image: Source image; it is not modified.
        detections: Dicts with ``id``, ``label`` and ``bbox`` keys. ``bbox``
            holds four values in [0, BBOX_COORD_MAX] (x_min, y_min, x_max,
            y_max) that are scaled to the image size. Entries whose bbox does
            not have exactly four values are skipped.
        left_position: If not None, a "POS/CAM_L" banner is drawn (0 is shown
            as "NAN"). Takes precedence over ``right_position``.
        right_position: If not None (and ``left_position`` is None), a
            "POS/CAM_R" banner is drawn.

    Returns:
        The annotated copy of the image.
    """
    marked_image = image.copy()
    h, w = image.shape[:2]

    position_text = ""
    if left_position is not None:
        position_text = f"POS/CAM_L: {left_position if left_position != 0 else 'NAN'}"
    elif right_position is not None:
        position_text = f"POS/CAM_R: {right_position if right_position != 0 else 'NAN'}"

    if position_text:
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 1.0
        font_thickness = 2
        text_size = cv2.getTextSize(position_text, font, font_scale, font_thickness)[0]
        text_x, text_y = 10, 30
        # Filled white rectangle behind the text keeps it readable on any image.
        cv2.rectangle(marked_image,
                      (text_x - 5, text_y - text_size[1] - 5),
                      (text_x + text_size[0] + 5, text_y + 5),
                      (255, 255, 255), -1)
        cv2.putText(marked_image, position_text, (text_x, text_y),
                    font, font_scale, (0, 0, 0), font_thickness)

    for detection in detections:
        obj_id = detection.get("id", 0)
        label = detection.get("label", "")
        bbox = detection.get("bbox", [])
        if len(bbox) != 4:
            continue
        x_min = int(bbox[0] * w / BBOX_COORD_MAX)
        y_min = int(bbox[1] * h / BBOX_COORD_MAX)
        x_max = int(bbox[2] * w / BBOX_COORD_MAX)
        y_max = int(bbox[3] * h / BBOX_COORD_MAX)
        color = DETECTION_COLORS.get(obj_id, DEFAULT_DETECTION_COLOR)
        cv2.rectangle(marked_image, (x_min, y_min), (x_max, y_max), color, 2)
        cv2.putText(marked_image, f"{label} ({obj_id})", (x_min, y_min + 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return marked_image


# --- Flask routes ---
# Protected routes carry the @auth.login_required decorator.
@app.route('/')
@auth.login_required
def index():
    """Main page showing the live images."""
    logger.info(f"User {auth.current_user()} accessed the main page.")
    return render_template('index.html')


@app.route('/list')
@auth.login_required
def list_images():
    """Image list page."""
    logger.info(f"User {auth.current_user()} accessed the image list page.")
    return render_template('list_images.html')


@app.route('/api/images', methods=['GET'])
@auth.login_required
def get_images_api():
    """API: return all image records as a JSON list, newest timestamp first."""
    logger.info(f"User {auth.current_user()} requested image list API.")
    with _connect_db() as conn:
        rows = conn.execute("""
            SELECT id, left_filename, right_filename, left_marked_filename, right_marked_filename,
                   timestamp, metadata, comment, created_at,
                   manual_detections_left, manual_detections_right,
                   is_manual_labeled_left, is_manual_labeled_right,
                   left_position, right_position
            FROM images ORDER BY timestamp DESC
        """).fetchall()

    images = [
        {
            "id": row[0],
            "left_filename": row[1],
            "right_filename": row[2],
            "left_marked_filename": row[3] or "",
            "right_marked_filename": row[4] or "",
            "timestamp": row[5],
            "metadata": row[6],
            "comment": row[7] or "",
            "created_at": row[8],
            # Detections are stored as JSON text; NULL is reported as an empty list.
            "manual_detections_left": row[9] or "[]",
            "manual_detections_right": row[10] or "[]",
            "is_manual_labeled_left": bool(row[11]) if row[11] is not None else False,
            "is_manual_labeled_right": bool(row[12]) if row[12] is not None else False,
            "left_position": row[13],
            "right_position": row[14],
        }
        for row in rows
    ]
    return jsonify(images)


@app.route('/api/images', methods=['DELETE'])
@auth.login_required
def delete_image_api():
    """API: delete one image record and its files on disk.

    Expects a JSON object with ``id``. Responds 400 without an id and 404 when
    the record does not exist. File removal is best effort: a file that cannot
    be removed is logged and the remaining files are still processed.
    """
    logger.info(f"User {auth.current_user()} requested to delete an image.")
    image_id = _json_payload().get('id')
    if not image_id:
        return jsonify({"error": "Image ID is required"}), 400

    with _connect_db() as conn:
        row = conn.execute(
            "SELECT left_filename, right_filename, left_marked_filename, right_marked_filename "
            "FROM images WHERE id = ?",
            (image_id,),
        ).fetchone()
        if not row:
            return jsonify({"error": "Image not found"}), 404
        conn.execute("DELETE FROM images WHERE id = ?", (image_id,))
        conn.commit()

    left_filename, right_filename, left_marked_filename, right_marked_filename = row
    candidates = [
        os.path.join(SAVE_PATH_LEFT, left_filename),
        os.path.join(SAVE_PATH_RIGHT, right_filename),
        os.path.join(SAVE_PATH_LEFT_MARKED, left_marked_filename) if left_marked_filename else None,
        os.path.join(SAVE_PATH_RIGHT_MARKED, right_marked_filename) if right_marked_filename else None,
    ]
    for path in candidates:
        if path and _remove_file(path):
            logger.info(f"Deleted file: {path}")

    return jsonify({"message": f"Image {image_id} deleted successfully"})


@app.route('/api/images/export', methods=['POST'])
@auth.login_required
def export_images_api():
    """API: download the selected images as a ZIP, preferring annotated versions.

    Expects a JSON object with a non-empty ``ids`` list. For each record the
    marked image is exported when the record has one, otherwise the original;
    files missing on disk are skipped. The temporary ZIP is removed once the
    response has been closed.
    """
    logger.info(f"User {auth.current_user()} requested to export images.")
    selected_ids = _json_payload().get('ids', [])
    if not selected_ids or not isinstance(selected_ids, list):
        return jsonify({"error": "No image IDs selected"}), 400

    # Only the "?" placeholders are interpolated; the ids themselves are bound.
    placeholders = ','.join('?' * len(selected_ids))
    with _connect_db() as conn:
        rows = conn.execute(
            "SELECT left_filename, right_filename, left_marked_filename, right_marked_filename "
            f"FROM images WHERE id IN ({placeholders})",
            selected_ids,
        ).fetchall()
    if not rows:
        return jsonify({"error": "No matching images found"}), 404

    temp_zip_fd, temp_zip_path = tempfile.mkstemp(suffix='.zip')
    os.close(temp_zip_fd)
    try:
        with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for left_fn, right_fn, left_marked_fn, right_marked_fn in rows:
                exports = (
                    ('left', SAVE_PATH_LEFT, left_fn, SAVE_PATH_LEFT_MARKED, left_marked_fn),
                    ('right', SAVE_PATH_RIGHT, right_fn, SAVE_PATH_RIGHT_MARKED, right_marked_fn),
                )
                for arc_dir, plain_dir, plain_fn, marked_dir, marked_fn in exports:
                    export_fn = marked_fn if marked_fn else plain_fn
                    export_path = os.path.join(marked_dir if marked_fn else plain_dir, export_fn)
                    if os.path.exists(export_path):
                        zipf.write(export_path, os.path.join(arc_dir, export_fn))
        logger.info(f"Exported {len(rows)} image pairs to {temp_zip_path}")
        response = send_file(temp_zip_path, as_attachment=True,
                             download_name='exported_images.zip')
    except Exception as e:
        logger.exception(f"Error creating export ZIP: {e}")
        _remove_file(temp_zip_path)
        return jsonify({"error": str(e)}), 500

    # The file must outlive this function because it is streamed afterwards;
    # delete it when the response is closed so exports do not pile up on disk.
    response.call_on_close(lambda: _remove_file(temp_zip_path))
    return response


def _parse_upload_metadata(metadata_str) -> dict:
    """Parse the optional ``metadata`` form field; fall back to ``{}`` if unusable."""
    if not metadata_str:
        return {}
    try:
        metadata = json.loads(metadata_str)
    except json.JSONDecodeError as e:
        logger.warning(f"Could not parse metadata: {e}")
        return {}
    if not isinstance(metadata, dict):
        logger.warning("Could not parse metadata: expected a JSON object")
        return {}
    return metadata


@app.route('/upload', methods=['POST'])
def upload_images():
    """Receive a left/right image pair, store it and push it to Socket.IO clients.

    Form fields: files ``left_image`` and ``right_image`` (required),
    ``metadata`` (optional JSON object; its ``timestamp`` names the files and
    defaults to the current time) and ``comment`` (optional).

    Responds 400 when a file is missing, an image cannot be decoded or the
    timestamp is not a finite number, and 500 on any other failure.

    NOTE(review): this endpoint is not protected by authentication.
    """
    global latest_left_frame, latest_right_frame, latest_timestamp
    try:
        left_file = request.files.get('left_image')
        right_file = request.files.get('right_image')
        metadata_str = request.form.get('metadata')
        comment = request.form.get('comment', '')

        if not left_file or not right_file:
            logger.warning("Received request without required image files.")
            return jsonify({"error": "Missing left_image or right_image"}), 400

        img_left = cv2.imdecode(np.frombuffer(left_file.read(), np.uint8), cv2.IMREAD_COLOR)
        img_right = cv2.imdecode(np.frombuffer(right_file.read(), np.uint8), cv2.IMREAD_COLOR)
        if img_left is None or img_right is None:
            logger.error("Failed to decode received images.")
            return jsonify({"error": "Could not decode images"}), 400

        metadata = _parse_upload_metadata(metadata_str)

        # Validate the timestamp BEFORE touching the disk: it ends up in file
        # names and in a REAL NOT NULL column, so it must be a finite number.
        timestamp_str = str(metadata.get("timestamp", int(time.time()))).strip()
        try:
            timestamp_value = float(timestamp_str)
        except ValueError:
            timestamp_value = math.nan
        if not math.isfinite(timestamp_value):
            logger.warning(f"Rejected upload with invalid timestamp: {timestamp_str!r}")
            return jsonify({"error": "Invalid timestamp in metadata"}), 400

        timestamp_str_safe = timestamp_str.replace(".", "_")
        left_filename = f"left_{timestamp_str_safe}.jpg"
        right_filename = f"right_{timestamp_str_safe}.jpg"
        left_marked_filename = f"left_marked_{timestamp_str_safe}.jpg"
        right_marked_filename = f"right_marked_{timestamp_str_safe}.jpg"

        for directory in (SAVE_PATH_LEFT, SAVE_PATH_RIGHT,
                          SAVE_PATH_LEFT_MARKED, SAVE_PATH_RIGHT_MARKED):
            os.makedirs(directory, exist_ok=True)

        left_path = os.path.join(SAVE_PATH_LEFT, left_filename)
        right_path = os.path.join(SAVE_PATH_RIGHT, right_filename)
        _write_image(left_path, img_left)
        _write_image(right_path, img_right)
        logger.info(f"Saved original images: {left_path}, {right_path}")

        # Debug behaviour: the "marked" images start as plain copies of the
        # originals; they are redrawn when manual detections are saved.
        left_marked_path = os.path.join(SAVE_PATH_LEFT_MARKED, left_marked_filename)
        _write_image(left_marked_path, img_left)
        logger.info(f"Saved marked left image: {left_marked_path}")
        right_marked_path = os.path.join(SAVE_PATH_RIGHT_MARKED, right_marked_filename)
        _write_image(right_marked_path, img_right)
        logger.info(f"Saved marked right image: {right_marked_path}")

        with _connect_db() as conn:
            cursor = conn.execute('''
                INSERT INTO images (left_filename, right_filename, left_marked_filename,
                                    right_marked_filename, timestamp, metadata, comment)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (left_filename, right_filename, left_marked_filename,
                  right_marked_filename, timestamp_value, json.dumps(metadata), comment))
            conn.commit()
            image_id = cursor.lastrowid
        logger.info(f"Recorded image pair (ID: {image_id}) in database.")

        _, left_encoded = cv2.imencode('.jpg', img_left)
        _, right_encoded = cv2.imencode('.jpg', img_right)
        left_b64 = base64.b64encode(left_encoded).decode('utf-8')
        right_b64 = base64.b64encode(right_encoded).decode('utf-8')

        with frame_lock:
            latest_left_frame = img_left
            latest_right_frame = img_right
            latest_timestamp = timestamp_str

        socketio.emit('update_image', {
            'left_image': left_b64,
            'right_image': right_b64,
            'timestamp': timestamp_str
        })

        return jsonify({"message": "Images received, saved, and pushed via WebSocket",
                        "timestamp": timestamp_str, "id": image_id})
    except Exception as e:
        logger.exception(f"Error processing upload: {e}")
        return jsonify({"error": str(e)}), 500


@app.route('/api/images/comment', methods=['PUT'])
@auth.login_required
def update_image_comment():
    """API: set the comment of an image. Expects JSON ``{"id": ..., "comment": ...}``.

    Responds 400 without an id and 404 when no record has that id.
    """
    logger.info(f"User {auth.current_user()} requested to update image comment.")
    data = _json_payload()
    image_id = data.get('id')
    comment = data.get('comment', '')
    if not image_id:
        return jsonify({"error": "Image ID is required"}), 400

    with _connect_db() as conn:
        cursor = conn.execute("UPDATE images SET comment = ? WHERE id = ?", (comment, image_id))
        conn.commit()
        updated = cursor.rowcount
    if not updated:
        return jsonify({"error": "Image not found"}), 404
    return jsonify({"message": f"Comment for image {image_id} updated successfully"})


@app.route('/api/images/position', methods=['PUT'])
@auth.login_required
def update_image_position():
    """API: set the position numbers of an image.

    Expects JSON with ``id`` and optional ``left_position`` / ``right_position``.
    Both columns are always written; an omitted value is stored as 0.
    Responds 400 without an id and 404 when no record has that id.
    """
    logger.info(f"User {auth.current_user()} requested to update image position.")
    data = _json_payload()
    image_id = data.get('id')
    left_position = data.get('left_position', 0)
    right_position = data.get('right_position', 0)
    if not image_id:
        return jsonify({"error": "Image ID is required"}), 400

    logger.info(f"Updating position for image {image_id}: {left_position}, {right_position}")
    with _connect_db() as conn:
        cursor = conn.execute(
            "UPDATE images SET left_position = ?, right_position = ? WHERE id = ?",
            (left_position, right_position, image_id),
        )
        conn.commit()
        updated = cursor.rowcount
    if not updated:
        return jsonify({"error": "Image not found"}), 404
    return jsonify({"message": f"Position for image {image_id} updated successfully"})


@app.route('/status')
def status():
    """Report whether a frame pair has been received; intentionally left unauthenticated."""
    with frame_lock:
        has_frames = latest_left_frame is not None and latest_right_frame is not None
        timestamp = latest_timestamp if has_frames else "N/A"
    return jsonify({"has_frames": has_frames, "latest_timestamp": timestamp})


def _validate_detections(detections) -> Optional[str]:
    """Return an error message for the first invalid detection, or None if all are valid."""
    if not isinstance(detections, list):
        return "Detections must be a list"
    for detection in detections:
        if not isinstance(detection, dict):
            return "Each detection must be a dictionary"
        for key in ('id', 'label', 'bbox'):
            if key not in detection:
                return f"Missing required key '{key}' in detection"
        if not _is_plain_int(detection['id']) or detection['id'] not in VALID_DETECTION_IDS:
            return f"Invalid ID in detection: {detection['id']}"
        if detection['label'] not in VALID_DETECTION_LABELS:
            return f"Invalid label in detection: {detection['label']}"
        bbox = detection['bbox']
        if not isinstance(bbox, list) or len(bbox) != 4:
            return "Invalid bbox format in detection"
        for coord in bbox:
            if not _is_plain_int(coord) or not (0 <= coord <= BBOX_COORD_MAX):
                return f"Invalid bbox coordinate: {coord}"
    return None


@app.route('/api/images/manual-detections', methods=['PUT'])
@auth.login_required
def update_manual_detections():
    """API: store manually annotated boxes for one side of an image pair.

    Expects JSON with ``id``, ``detections`` (list, see ``_validate_detections``)
    and optional ``side`` ("left" by default, or "right"). After saving, the
    marked image of that side is redrawn.

    Responds 400 on invalid input, 404 for an unknown image and 500 when the
    detections were saved but the marked image could not be regenerated.
    """
    data = _json_payload()
    image_id = data.get('id')
    side = data.get('side', 'left')
    detections = data.get('detections')

    if not image_id or detections is None:
        return jsonify({"error": "Image ID and detections are required"}), 400
    # Any value other than "left" used to be written to the right-hand column.
    if side not in VALID_SIDES:
        return jsonify({"error": f"Invalid side: {side}"}), 400
    error = _validate_detections(detections)
    if error:
        return jsonify({"error": error}), 400

    # Column names come from the fixed VALID_SIDES set, never from raw input.
    detections_column = f"manual_detections_{side}"
    labeled_column = f"is_manual_labeled_{side}"
    with _connect_db() as conn:
        if not conn.execute("SELECT id FROM images WHERE id = ?", (image_id,)).fetchone():
            return jsonify({"error": "Image not found"}), 404
        conn.execute(
            f"UPDATE images SET {detections_column} = ?, {labeled_column} = 1 WHERE id = ?",
            (json.dumps(detections), image_id),
        )
        conn.commit()

    try:
        regenerate_marked_images(image_id, detections, side)
    except Exception as e:
        logger.exception(f"Failed to regenerate marked images for image {image_id}: {e}")
        return jsonify({"message": f"Manual detections for image {image_id} ({side}) updated successfully but failed to regenerate marked images: {str(e)}"}), 500
    return jsonify({"message": f"Manual detections for image {image_id} ({side}) updated successfully and marked images regenerated"})


def regenerate_marked_images(image_id, detections, side):
    """Redraw the marked image of one side from its original image and ``detections``.

    Nothing is written (a warning is logged) when ``side`` is neither "left"
    nor "right", the record has no marked filename for that side, or the
    original image is missing or unreadable.

    Raises:
        LookupError: If no record with ``image_id`` exists.
    """
    with _connect_db() as conn:
        row = conn.execute("""
            SELECT left_filename, right_filename, left_marked_filename, right_marked_filename,
                   left_position, right_position
            FROM images WHERE id = ?
        """, (image_id,)).fetchone()
    if not row:
        raise LookupError("Image not found")

    (left_filename, right_filename, left_marked_filename,
     right_marked_filename, left_position, right_position) = row

    if side == 'left':
        source_path = os.path.join(SAVE_PATH_LEFT, left_filename)
        marked_dir, marked_filename = SAVE_PATH_LEFT_MARKED, left_marked_filename
        positions = {"left_position": left_position, "right_position": None}
    elif side == 'right':
        source_path = os.path.join(SAVE_PATH_RIGHT, right_filename)
        marked_dir, marked_filename = SAVE_PATH_RIGHT_MARKED, right_marked_filename
        positions = {"left_position": None, "right_position": right_position}
    else:
        logger.warning(f"Skipping marked image regeneration for unknown side: {side}")
        return

    if not marked_filename:
        logger.warning(f"Image {image_id} has no marked filename for side {side}")
        return
    if not os.path.exists(source_path):
        logger.warning(f"Original image not found: {source_path}")
        return
    source_image = cv2.imread(source_path)
    if source_image is None:
        logger.warning(f"Could not read original image: {source_path}")
        return

    marked_image = draw_detections_on_image(source_image, detections, **positions)
    cv2.imwrite(os.path.join(marked_dir, marked_filename), marked_image)


@app.route('/manual-annotation')
@auth.login_required
def manual_annotation():
    """Manual annotation page."""
    logger.info(f"User {auth.current_user()} accessed the manual annotation page.")
    return render_template('manual_annotation.html')


@app.route('/view')
def simple_view():
    """Image viewing page.

    NOTE(review): unlike the other pages this route is not protected by
    authentication; confirm that this is intentional.
    """
    return render_template('view.html')


# --- Socket.IO event handlers ---
@socketio.event
def connect():
    """Log a client connection."""
    logger.info("Client connected")


@socketio.event
def disconnect():
    """Log a client disconnection."""
    logger.info("Client disconnected")


@socketio.event
def capture_button(data):
    """Handle the 'capture_button' event by calling ``image_client.request_sync()``.

    ``data`` is ignored. Errors are logged and not propagated to the client.
    """
    try:
        image_client.request_sync()
        logger.info("Request sent to server.")
    except Exception as e:
        logger.error(f"Error sending request: {e}")


if __name__ == '__main__':
    logger.info(f"Starting Flask-SocketIO server on {FLASK_HOST}:{FLASK_PORT}")
    os.environ['BASIC_AUTH_USERNAME'] = USERNAME
    os.environ['BASIC_AUTH_PASSWORD'] = PASSWORD
    socketio.run(app, host=FLASK_HOST, port=FLASK_PORT, debug=False, allow_unsafe_werkzeug=True)