import base64
import json
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Any, Optional, Callable

import cv2
import numpy as np
from openai import OpenAI

# Configuration
API_KEY = "sk-e3a0287ece6a41bb9b79b2c285f10197"
BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
MODEL_NAME = "qwen-vl-plus"

# Category mapping
CATEGORY_MAPPING = {
    1: "caisson",
    2: "soldier",
    3: "gun",
    4: "number"
}

CATEGORY_COLORS = {
    1: (0, 255, 0),    # Green for caisson
    2: (0, 255, 255),  # Yellow for soldier
    3: (0, 0, 255),    # Red for gun
    4: (255, 0, 0)     # Blue for number
}


@dataclass
class DetectionResult:
    """Detection result data class"""
    image_id: str
    original_image: np.ndarray
    detections: List[Dict[str, Any]]
    marked_image: Optional[np.ndarray] = None
    success: bool = False
    error_message: Optional[str] = None
    timestamp: float = 0.0


class TaskStatus(Enum):
    """Task status enumeration"""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


@dataclass
class Task:
    """Task data class for queue"""
    task_id: str
    image_id: str
    image: np.ndarray  # OpenCV Mat format
    prompt: str
    callback: Optional[Callable[[DetectionResult], None]] = None
    timestamp: float = 0.0
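
# Illustrative standalone helper (a sketch added for clarity, not used by the
# client below): shows how the 0-999 normalized bbox convention from the prompt
# maps back to pixel coordinates. VisionAPIClient.draw_detections_on_image
# applies this same mapping inline; the clamping mirrors its bounds checks.
def bbox_norm_to_pixels(bbox: List[int], img_w: int, img_h: int) -> tuple:
    """Convert a [x_min, y_min, x_max, y_max] bbox in 0-999 space to pixels."""
    x1 = max(0, min(int((bbox[0] / 999) * img_w), img_w - 1))
    y1 = max(0, min(int((bbox[1] / 999) * img_h), img_h - 1))
    x2 = max(0, min(int((bbox[2] / 999) * img_w), img_w - 1))
    y2 = max(0, min(int((bbox[3] / 999) * img_h), img_h - 1))
    return (x1, y1, x2, y2)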

class VisionAPIClient:
    """Vision API client for asynchronous processing with OpenCV Mat input/output"""

    def __init__(self, api_key: str = API_KEY, base_url: str = BASE_URL,
                 model_name: str = MODEL_NAME, max_workers: int = 4):
        self.api_key = api_key
        self.base_url = base_url
        self.model_name = model_name
        self.max_workers = max_workers
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.client = OpenAI(api_key=api_key, base_url=base_url)

        # Task management
        self.task_queue = queue.Queue()
        self.processing_tasks = {}
        self.results_cache = {}
        self._running = False
        self._worker_thread = None

        self.default_prompt = """Please perform object detection on the image, identifying and localizing the following four types of targets:
- Category 1: Green ammunition box (caisson)
- Category 2: Dummy soldier wearing digital camouflage uniform (soldier)
- Category 3: Gun (gun)
- Category 4: Round blue number plate (number)

Please follow these requirements for the output:
1. The output must be in valid JSON format.
2. The JSON structure should contain a list named "detections".
3. Each element in the list represents a detected target, containing the following fields:
   - "id" (integer): Target category ID (1, 2, 3, 4).
   - "label" (string): Target category name ("caisson", "soldier", "gun", "number").
   - "bbox" (list of int): Bounding box coordinates in format [x_min, y_min, x_max, y_max], where (x_min, y_min) is the top-left coordinate and (x_max, y_max) is the bottom-right coordinate. Coordinate values are integers normalized to the 0-999 range (0,0 represents top-left, 999,999 represents bottom-right).
4. If no targets are detected in the image, "detections" should be an empty list [].
5. Please output only JSON, no other explanatory text.

JSON output example (when targets are detected; x1, y1, x2, y2 are integers in the 0-999 range):
{
  "detections": [
    {
      "id": 1,
      "label": "caisson",
      "bbox": [x1, y1, x2, y2]
    },
    {
      "id": 2,
      "label": "soldier",
      "bbox": [x3, y3, x4, y4]
    }
  ]
}

JSON output example (when no targets are detected):
{
  "detections": []
}"""

    def encode_cv_image(self, image: np.ndarray) -> str:
        """
        Encode an OpenCV image (Mat) to a base64 string.

        Args:
            image: OpenCV image (Mat) format

        Returns:
            Base64 encoded string of the image
        """
        # Encode the image to JPEG format; imencode returns (success, buffer)
        success, buffer = cv2.imencode('.jpg', image)
        if not success:
            raise ValueError("cv2.imencode failed to encode image as JPEG")
        return base64.b64encode(buffer).decode('utf-8')

    def validate_and_extract_json(self, response_text: str) -> Optional[Dict[str, Any]]:
        """
        Validate and extract JSON from API response text.

        Args:
            response_text: Raw response text from API

        Returns:
            Parsed JSON dictionary if valid, None otherwise
        """
        # Try to find JSON within the response text (in case of additional text)
        start_idx = response_text.find('{')
        end_idx = response_text.rfind('}')

        if start_idx == -1 or end_idx == -1 or start_idx > end_idx:
            print("No valid JSON structure found in response.")
            return None

        json_str = response_text[start_idx:end_idx + 1]

        try:
            parsed_json = json.loads(json_str)
            return parsed_json
        except json.JSONDecodeError as e:
            print(f"JSON parsing failed: {e}")
            print(f"Problematic JSON string: {json_str[:200]}...")  # Show first 200 chars
            return None

    def validate_detections_format(self, data: Dict[str, Any]) -> bool:
        """
        Validate the structure and content of the detections JSON.

        Args:
            data: Parsed JSON data

        Returns:
            True if format is valid, False otherwise
        """
        if not isinstance(data, dict) or "detections" not in data:
            print("Missing 'detections' key in response.")
            return False

        detections = data["detections"]
        if not isinstance(detections, list):
            print("'detections' is not a list.")
            return False

        for i, detection in enumerate(detections):
            if not isinstance(detection, dict):
                print(f"Detection item {i} is not a dictionary.")
                return False

            required_keys = ["id", "label", "bbox"]
            for key in required_keys:
                if key not in detection:
                    print(f"Missing required key '{key}' in detection {i}.")
                    return False

            # Validate ID
            if not isinstance(detection["id"], int) or detection["id"] not in [1, 2, 3, 4]:
                print(f"Invalid ID in detection {i}: {detection['id']}")
                return False

            # Validate label
            if not isinstance(detection["label"], str) or detection["label"] not in CATEGORY_MAPPING.values():
                print(f"Invalid label in detection {i}: {detection['label']}")
                return False

            # Validate bbox
            bbox = detection["bbox"]
            if not isinstance(bbox, list) or len(bbox) != 4:
                print(f"Invalid bbox format in detection {i}: {bbox}")
                return False
            for coord in bbox:
                if not isinstance(coord, (int, float)) or not (0 <= coord <= 999):
                    print(f"Invalid bbox coordinate in detection {i}: {coord}")
                    return False

            # Validate confidence if present (the prompt does not request it,
            # but check it defensively in case the model adds one)
            if "confidence" in detection:
                conf = detection["confidence"]
                if not isinstance(conf, (int, float)) or not (0.0 <= conf <= 1.0):
                    print(f"Invalid confidence in detection {i}: {conf}")
                    return False

        return True
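
    # Illustrative convenience wrapper (a sketch, not part of the original
    # client): chains JSON extraction and schema validation in one call. The
    # method name and return shape are assumptions made for this example.
    def parse_detection_response(self, response_text: str) -> Optional[List[Dict[str, Any]]]:
        """Return the validated detection list from raw model output, or None."""
        data = self.validate_and_extract_json(response_text)
        if data is None or not self.validate_detections_format(data):
            return None
        return data["detections"]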

    def call_vision_api_sync(self, image: np.ndarray, prompt: str) -> Optional[Dict[str, Any]]:
        """
        Synchronous call to the vision API with OpenCV Mat input.

        Args:
            image: OpenCV image (Mat) format
            prompt: The prompt to send to the API

        Returns:
            Parsed JSON response if successful, None otherwise
        """
        # Resize to 1000x1000 before sending. This may distort the aspect ratio,
        # but the model returns coordinates normalized to 0-999, so they still
        # map back onto the original image (each axis scales independently).
        h, w = image.shape[:2]
        if h != 1000 or w != 1000:
            image = cv2.resize(image, (1000, 1000))

        # Encode the image to base64 and embed it as a data URL
        image_base64 = self.encode_cv_image(image)
        image_url = f"data:image/jpeg;base64,{image_base64}"

        try:
            # Create the completion request
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {
                        'role': 'user',
                        'content': [
                            {"type": "text", "text": prompt},
                            {"type": "image_url", "image_url": {"url": image_url}}
                        ]
                    }
                ],
                stream=False  # Single response instead of streaming
            )

            # Extract content from response
            if response.choices and len(response.choices) > 0:
                content = response.choices[0].message.content
            else:
                print("No choices returned from API.")
                return None

            if not content:
                print("No content returned from API.")
                return None

            # Validate and parse JSON from response
            parsed_data = self.validate_and_extract_json(content)
            if parsed_data is None:
                return None

            # Validate the structure of the detections
            if not self.validate_detections_format(parsed_data):
                print("Invalid detections format in response.")
                return None

            return parsed_data

        except Exception as e:
            print(f"API request failed: {e}")
            return None
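
    # Illustrative sketch (not part of the original client): a thin retry
    # wrapper around call_vision_api_sync for transient failures such as
    # timeouts or occasional non-JSON replies. The retry count and backoff
    # values are arbitrary examples.
    def call_vision_api_with_retry(self, image: np.ndarray, prompt: str,
                                   retries: int = 2, backoff: float = 1.0) -> Optional[Dict[str, Any]]:
        """Retry the synchronous API call a few times before giving up."""
        for attempt in range(retries + 1):
            result = self.call_vision_api_sync(image, prompt)
            if result is not None:
                return result
            if attempt < retries:
                time.sleep(backoff * (attempt + 1))  # Linear backoff between attempts
        return None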

    def draw_detections_on_image(self, image: np.ndarray, detections: List[Dict[str, Any]]) -> np.ndarray:
        """
        Draw bounding boxes and labels on the OpenCV Mat image (without confidence).

        Args:
            image: OpenCV image (Mat) format
            detections: List of detection dictionaries

        Returns:
            Image with drawn detections as numpy array
        """
        # Work on a copy to avoid modifying the original
        result_image = image.copy()

        # Get image dimensions
        img_h, img_w = result_image.shape[:2]

        for detection in detections:
            # Get bounding box coordinates (normalized to 0-999 range)
            bbox = detection["bbox"]
            x1_norm, y1_norm, x2_norm, y2_norm = bbox

            # Convert normalized coordinates to pixel coordinates
            x1 = int((x1_norm / 999) * img_w)
            y1 = int((y1_norm / 999) * img_h)
            x2 = int((x2_norm / 999) * img_w)
            y2 = int((y2_norm / 999) * img_h)

            # Ensure coordinates are within image bounds
            x1 = max(0, min(x1, img_w - 1))
            y1 = max(0, min(y1, img_h - 1))
            x2 = max(0, min(x2, img_w - 1))
            y2 = max(0, min(y2, img_h - 1))

            # Get color and label
            category_id = detection["id"]
            label = detection["label"]
            color = CATEGORY_COLORS.get(category_id, (255, 255, 255))  # Default white if not found

            # Draw bounding box
            cv2.rectangle(result_image, (x1, y1), (x2, y2), color, 2)

            # Prepare label text (no confidence)
            label_text = label

            # Calculate text size and position
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.6
            thickness = 2
            (text_width, text_height), baseline = cv2.getTextSize(label_text, font, font_scale, thickness)

            # Draw label background
            cv2.rectangle(result_image, (x1, y1 - text_height - 10), (x1 + text_width, y1), color, -1)

            # Draw label text
            cv2.putText(result_image, label_text, (x1, y1 - 5), font, font_scale, (0, 0, 0), thickness)

        return result_image

    def process_single_image(self, image_id: str, image: np.ndarray,
                             prompt: Optional[str] = None) -> DetectionResult:
        """
        Process a single OpenCV Mat image synchronously.

        Args:
            image_id: Unique identifier for the image
            image: OpenCV image (Mat) format
            prompt: The prompt for the vision API (optional)

        Returns:
            DetectionResult containing the results
        """
        start_time = time.time()

        # Validate input image
        if image is None or image.size == 0:
            error_msg = f"Invalid image for image_id: {image_id}"
            print(error_msg)
            return DetectionResult(
                image_id=image_id,
                original_image=image,
                detections=[],
                success=False,
                error_message=error_msg,
                timestamp=start_time
            )

        # Use provided prompt or default
        use_prompt = prompt if prompt is not None else self.default_prompt

        # Call the vision API
        print(f"Calling vision API for image {image_id}...")
        result = self.call_vision_api_sync(image, use_prompt)

        if result is None:
            error_msg = "Failed to get valid response from API."
            print(error_msg)
            return DetectionResult(
                image_id=image_id,
                original_image=image,
                detections=[],
                success=False,
                error_message=error_msg,
                timestamp=start_time
            )

        # Extract detections
        detections = result.get("detections", [])
        print(f"Found {len(detections)} detections for image {image_id}.")

        # Draw detections on image
        try:
            marked_image = self.draw_detections_on_image(image, detections)
        except Exception as e:
            error_msg = f"Error drawing detections on image: {e}"
            print(error_msg)
            return DetectionResult(
                image_id=image_id,
                original_image=image,
                detections=detections,
                success=False,
                error_message=error_msg,
                timestamp=start_time
            )

        # Return successful result
        return DetectionResult(
            image_id=image_id,
            original_image=image,
            detections=detections,
            marked_image=marked_image,
            success=True,
            timestamp=start_time
        )

    def start_worker(self):
        """Start the background worker thread for processing tasks"""
        if self._running:
            return
        self._running = True
        self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True)
        self._worker_thread.start()
        print("Vision API worker started.")

    def stop_worker(self):
        """Stop the background worker thread"""
        self._running = False
        if self._worker_thread:
            self._worker_thread.join(timeout=5.0)  # Wait up to 5 seconds
        self.executor.shutdown(wait=False)  # Release the thread pool resources
        print("Vision API worker stopped.")

    def _worker_loop(self):
        """Background worker loop for processing tasks from the queue"""
        while self._running:
            try:
                # Get task from queue with a timeout so the loop can re-check _running
                task = self.task_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            try:
                # Mark as processing
                self.processing_tasks[task.task_id] = TaskStatus.PROCESSING

                # Process the task
                result = self.process_single_image(task.image_id, task.image, task.prompt)

                # Store result and update task status
                self.results_cache[task.task_id] = result
                self.processing_tasks[task.task_id] = (
                    TaskStatus.COMPLETED if result.success else TaskStatus.FAILED
                )

                # Call callback if provided
                if task.callback:
                    try:
                        task.callback(result)
                    except Exception as e:
                        print(f"Callback execution failed for task {task.task_id}: {e}")
            except Exception as e:
                # Unexpected failure: record it so status queries do not report
                # the task as stuck in PROCESSING
                print(f"Worker error: {e}")
                self.processing_tasks[task.task_id] = TaskStatus.FAILED
            finally:
                # Always mark the task as done so task_queue.join() can return
                self.task_queue.task_done()
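
    # Illustrative sketch (not in the original client): poll for a task's
    # result with a timeout, since results_cache is filled asynchronously by
    # the worker thread. The timeout and poll interval are example values.
    def wait_for_result(self, task_id: str, timeout: float = 30.0,
                        poll: float = 0.1) -> Optional[DetectionResult]:
        """Block until the given task has a cached result, or return None on timeout."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            result = self.results_cache.get(task_id)
            if result is not None:
                return result
            time.sleep(poll)
        return None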

    def submit_task(self, image_id: str, image: np.ndarray, prompt: Optional[str] = None,
                    callback: Optional[Callable[[DetectionResult], None]] = None) -> str:
        """
        Submit a task to the processing queue with OpenCV Mat input.

        Args:
            image_id: Unique identifier for the image
            image: OpenCV image (Mat) format
            prompt: The prompt for the vision API (optional)
            callback: Callback function called when processing is complete (optional)

        Returns:
            Task ID for tracking the task
        """
        # Generate a unique task ID from a microsecond timestamp and the image ID
        task_id = f"task_{int(time.time() * 1000000)}_{image_id}"

        task = Task(
            task_id=task_id,
            image_id=image_id,
            image=image,
            prompt=prompt if prompt is not None else self.default_prompt,
            callback=callback,
            timestamp=time.time()
        )

        self.task_queue.put(task)
        self.processing_tasks[task_id] = TaskStatus.PENDING
        return task_id

    def get_result(self, task_id: str) -> Optional[DetectionResult]:
        """
        Get the result for a specific task.

        Args:
            task_id: The task ID to retrieve the result for

        Returns:
            DetectionResult if available, None otherwise
        """
        return self.results_cache.get(task_id)

    def get_task_status(self, task_id: str) -> Optional[TaskStatus]:
        """
        Get the status of a specific task.

        Args:
            task_id: The task ID to check the status for

        Returns:
            TaskStatus if the task exists, None otherwise
        """
        return self.processing_tasks.get(task_id)

    def get_queue_size(self) -> int:
        """Get the current size of the task queue"""
        return self.task_queue.qsize()

    def get_processing_count(self) -> int:
        """Get the number of currently processing tasks"""
        return sum(1 for status in self.processing_tasks.values()
                   if status == TaskStatus.PROCESSING)

    def get_completed_count(self) -> int:
        """Get the number of finished tasks (completed or failed)"""
        return sum(1 for status in self.processing_tasks.values()
                   if status in [TaskStatus.COMPLETED, TaskStatus.FAILED])

    def clear_results(self):
        """Clear the results cache"""
        self.results_cache.clear()

    def __enter__(self):
        """Context manager entry"""
        self.start_worker()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit"""
        self.stop_worker()


# Example usage
def example_callback(result: DetectionResult):
    """Example callback function"""
    if result.success:
        print(f"Callback: Processing completed for image {result.image_id}, "
              f"found {len(result.detections)} detections")
        # result.marked_image is the OpenCV Mat with detections drawn,
        # available here for further processing
        marked_image = result.marked_image
    else:
        print(f"Callback: Processing failed for image {result.image_id}: {result.error_message}")


def main():
    original_image = cv2.imread("/home/evan/Desktop/received/left/left_1761388243_7673044.jpg")  # Replace with your image source
    if original_image is None:
        print("Could not load image")
        return

    # Example usage with context manager
    with VisionAPIClient() as client:
        # Submit a task with an OpenCV Mat
        task_id = client.submit_task(
            image_id="1",
            image=original_image,
            callback=example_callback
        )
        print(f"Submitted task {task_id}")

        # Wait for the task to complete
        print("Waiting for task to complete...")
        client.task_queue.join()  # Blocks until all queued tasks are processed

        # Get the result
        result = client.get_result(task_id)
        if result:
            if result.success:
                print(f"Task completed successfully! Found {len(result.detections)} detections.")
                # result.marked_image is the OpenCV Mat with detections drawn
                marked_image = result.marked_image

                # Display the result (optional)
                cv2.imshow("Original Image", original_image)
                cv2.imshow("Marked Image", marked_image)
                print("Press any key to close windows...")
                cv2.waitKey(0)
                cv2.destroyAllWindows()

                # Or save the result
                # cv2.imwrite("marked_image.jpg", marked_image)
            else:
                print(f"Task failed: {result.error_message}")
        else:
            print("No result found")


if __name__ == "__main__":
    main()
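
# Illustrative alternative (a sketch, not invoked by main() above): the worker
# queue can be skipped entirely for one-off scripts by calling
# process_single_image directly. The function name and path argument are
# placeholders for this example.
def detect_image_file_sync(image_path: str) -> Optional[DetectionResult]:
    """One-shot synchronous detection without the background worker."""
    image = cv2.imread(image_path)
    if image is None:
        print(f"Could not load image: {image_path}")
        return None
    client = VisionAPIClient()  # No worker thread needed for direct calls
    return client.process_single_image(image_id=image_path, image=image)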