# Source file: m20_core_web/llm_req.py
# Listing metadata: 610 lines, 22 KiB, Python
# Revision timestamp: 2025-10-26 14:25:30 +08:00
import asyncio
import base64
import json
import os
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Any, Optional, Callable

import cv2
import numpy as np
from openai import OpenAI
# Configuration
# SECURITY NOTE(review): an API key is committed in this file. Supply the key
# through the DASHSCOPE_API_KEY environment variable instead; the literal below
# is kept only as a backward-compatible fallback and should be rotated and
# removed from source control.
API_KEY = os.environ.get("DASHSCOPE_API_KEY") or "sk-e3a0287ece6a41bb9b79b2c285f10197"
# OpenAI-compatible endpoint and the vision model used for detection requests.
BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
MODEL_NAME = "qwen-vl-plus"
# Category ID -> label name, numbered from 1 in the order the prompt lists them.
CATEGORY_MAPPING = dict(enumerate(("caisson", "soldier", "gun", "number"), start=1))

# Category ID -> drawing colour. The tuples follow OpenCV's BGR channel order,
# which is why (0, 255, 255) renders as yellow and (255, 0, 0) as blue.
CATEGORY_COLORS = {
    1: (0, 255, 0),    # green  - caisson
    2: (0, 255, 255),  # yellow - soldier
    3: (0, 0, 255),    # red    - gun
    4: (255, 0, 0),    # blue   - number
}
@dataclass
class DetectionResult:
    """Outcome of running detection on one image.

    Instances are produced for both successful and failed runs; check
    ``success`` before relying on ``marked_image``.
    """

    # Caller-supplied identifier of the processed image.
    image_id: str
    # The input image exactly as it was handed to the client.
    original_image: np.ndarray
    # Detection dictionaries (each with "id", "label" and "bbox").
    detections: List[Dict[str, Any]]
    # Copy of the input with boxes and labels drawn; stays None on failure.
    marked_image: Optional[np.ndarray] = None
    # True only when the API call and the drawing step both succeeded.
    success: bool = False
    # Human-readable failure reason; None when the run succeeded.
    error_message: Optional[str] = None
    # Epoch seconds recorded when processing of the image started.
    timestamp: float = 0.0
class TaskStatus(Enum):
    """Lifecycle states a queued detection task moves through."""

    PENDING = "pending"        # submitted, not yet picked up by the worker
    PROCESSING = "processing"  # currently being handled by the worker
    COMPLETED = "completed"    # finished with a successful result
    FAILED = "failed"          # finished with an unsuccessful result
@dataclass
class Task:
    """One unit of work placed on the client's task queue."""

    # Identifier used to look up status and result for this task.
    task_id: str
    # Caller-supplied identifier of the image being processed.
    image_id: str
    # The image to analyse, in OpenCV Mat (numpy array) form.
    image: np.ndarray  # OpenCV Mat format
    # Prompt text sent to the vision model together with the image.
    prompt: str
    # Optional function invoked with the DetectionResult once processing ends.
    callback: Optional[Callable[[DetectionResult], None]] = None
    # Epoch seconds at which the task was created.
    timestamp: float = 0.0
class VisionAPIClient:
    """Vision API client for asynchronous processing with OpenCV Mat input/output.

    Wraps an OpenAI-compatible chat-completions client together with a
    queue-fed background worker thread: callers submit images as tasks, the
    worker sends each one to the vision model, and results are cached by
    task ID. The instance can be used as a context manager to start and stop
    the worker.
    """
def __init__(self, api_key: str = API_KEY, base_url: str = BASE_URL,
             model_name: str = MODEL_NAME, max_workers: int = 4):
    """Set up the API client, the task bookkeeping and the default prompt.

    Args:
        api_key: Credential passed to the OpenAI-compatible client.
        base_url: Endpoint of the OpenAI-compatible service.
        model_name: Name of the vision model to request.
        max_workers: Size of the thread pool created on the instance.
    """
    # Connection settings, stored as given.
    self.api_key, self.base_url = api_key, base_url
    self.model_name, self.max_workers = model_name, max_workers

    # Thread pool and API client.
    self.executor = ThreadPoolExecutor(max_workers=max_workers)
    self.client = OpenAI(api_key=api_key, base_url=base_url)

    # Task management: pending work, per-task status and finished results.
    self.task_queue = queue.Queue()
    self.processing_tasks = {}
    self.results_cache = {}

    # Background worker state; the thread is created by start_worker().
    self._running = False
    self._worker_thread = None

    # Prompt used whenever a caller does not provide one.
    self.default_prompt = """Please perform object detection on the image, identifying and localizing the following four types of targets:
- Category 1: Green ammunition box (caisson)
- Category 2: Dummy soldier wearing digital camouflage uniform (soldier)
- Category 3: Gun (gun)
- Category 4: Round blue number plate (number)
Please follow these requirements for the output:
1. The output must be in valid JSON format.
2. The JSON structure should contain a list named "detections".
3. Each element in the list represents a detected target, containing the following fields:
- "id" (integer): Target category ID (1, 2, 3, 4).
- "label" (string): Target category name ("caisson", "soldier", "gun", "number").
- "bbox" (list of int): Bounding box coordinates in format [x_min, y_min, x_max, y_max], where (x_min, y_min) is the top-left coordinate and (x_max, y_max) is the bottom-right coordinate. Coordinate values are integers normalized to the 0-999 range (0,0 represents top-left, 999,999 represents bottom-right).
4. If no targets are detected in the image, "detections" should be an empty list [].
5. Please output only JSON, no other explanatory text.
JSON output example (when targets are detected):
{
"detections": [
{
"id": 1,
"label": "caisson",
"bbox": [x1, y1, x2, y2] // x1, y1, x2, y2 are integers in the 0-999 range
},
{
"id": 2,
"label": "soldier",
"bbox": [x3, y3, x4, y4] // x3, y3, x4, y4 are integers in the 0-999 range
}
]
}
JSON output example (when no targets are detected):
{
"detections": []
}"""
def encode_cv_image(self, image: np.ndarray) -> str:
    """
    Encodes an OpenCV image (Mat) to a base64 string of its JPEG bytes.
    Args:
        image: OpenCV image (Mat) format
    Returns:
        Base64 encoded string of the JPEG-compressed image
    Raises:
        ValueError: If OpenCV reports that JPEG encoding failed.
    """
    # imencode returns a success flag alongside the byte buffer; ignoring the
    # flag would let a failed encode pass on as empty or invalid image data.
    encoded_ok, buffer = cv2.imencode('.jpg', image)
    if not encoded_ok:
        raise ValueError("Failed to encode image as JPEG.")
    return base64.b64encode(buffer).decode('utf-8')
def validate_and_extract_json(self, response_text: str) -> Optional[Dict[str, Any]]:
    """
    Validates and extracts a JSON object from API response text.

    The object is decoded starting at the first '{' and decoding stops at the
    end of that object, so surrounding prose or code fences (even ones that
    contain braces after the object) do not break parsing.
    Args:
        response_text: Raw response text from API
    Returns:
        Parsed JSON dictionary if valid, None otherwise
    """
    # Non-string input (e.g. None) cannot contain JSON text.
    if not isinstance(response_text, str):
        print("No valid JSON structure found in response.")
        return None
    start_idx = response_text.find('{')
    if start_idx == -1:
        print("No valid JSON structure found in response.")
        return None
    try:
        # raw_decode parses exactly one JSON value beginning at start_idx and
        # ignores whatever follows it, unlike json.loads on a first-'{' to
        # last-'}' slice, which fails when trailing text contains a '}'.
        parsed_json, _end = json.JSONDecoder().raw_decode(response_text, start_idx)
        return parsed_json
    except json.JSONDecodeError as e:
        print(f"JSON parsing failed: {e}")
        print(f"Problematic JSON string: {response_text[start_idx:start_idx + 200]}...")  # Show first 200 chars
        return None
def validate_detections_format(self, data: Dict[str, Any]) -> bool:
    """
    Validates the structure and content of the detections JSON.

    Every detection must be a dict with an integer "id" that is a known
    category, a known string "label", and a 4-element "bbox" whose numeric
    coordinates lie in 0-999. An optional "confidence" must be a number in
    0.0-1.0. Booleans are rejected wherever a number is required, because
    bool is a subclass of int and would otherwise pass (True == 1).
    Args:
        data: Parsed JSON data
    Returns:
        True if format is valid, False otherwise
    """
    if not isinstance(data, dict) or "detections" not in data:
        print("Missing 'detections' key in response.")
        return False
    detections = data["detections"]
    if not isinstance(detections, list):
        print("'detections' is not a list.")
        return False
    for i, detection in enumerate(detections):
        if not isinstance(detection, dict):
            print(f"Detection item {i} is not a dictionary.")
            return False
        for key in ("id", "label", "bbox"):
            if key not in detection:
                print(f"Missing required key '{key}' in detection {i}.")
                return False
        # Validate ID (type first, so membership is only tested on real ints)
        det_id = detection["id"]
        if (isinstance(det_id, bool) or not isinstance(det_id, int)
                or det_id not in CATEGORY_MAPPING):
            print(f"Invalid ID in detection {i}: {det_id}")
            return False
        # Validate label
        label = detection["label"]
        if not isinstance(label, str) or label not in CATEGORY_MAPPING.values():
            print(f"Invalid label in detection {i}: {label}")
            return False
        # Validate bbox
        bbox = detection["bbox"]
        if not isinstance(bbox, list) or len(bbox) != 4:
            print(f"Invalid bbox format in detection {i}: {bbox}")
            return False
        for coord in bbox:
            if (isinstance(coord, bool) or not isinstance(coord, (int, float))
                    or not (0 <= coord <= 999)):
                print(f"Invalid bbox coordinate in detection {i}: {coord}")
                return False
        # Validate confidence if present
        if "confidence" in detection:
            conf = detection["confidence"]
            if (isinstance(conf, bool) or not isinstance(conf, (int, float))
                    or not (0.0 <= conf <= 1.0)):
                print(f"Invalid confidence in detection {i}: {conf}")
                return False
    return True
def call_vision_api_sync(self, image: np.ndarray, prompt: str) -> Optional[Dict[str, Any]]:
    """
    Synchronous call to the vision API with OpenCV Mat input.

    The image is resized to 1000x1000 (if it is not already that size),
    JPEG/base64 encoded and sent with the prompt as a single user message.
    The reply is parsed and validated before being returned. Every failure,
    including a failure while preparing the image, is reported by returning
    None rather than by raising.
    Args:
        image: OpenCV image (Mat) format
        prompt: The prompt to send to the API
    Returns:
        Parsed JSON response if successful, None otherwise
    """
    try:
        # Resize image to 1000x1000 if needed
        h, w = image.shape[:2]
        if h != 1000 or w != 1000:
            image = cv2.resize(image, (1000, 1000))
        # Encode the image directly to base64 and wrap it as a data URL
        image_base64 = self.encode_cv_image(image)
        image_url = f"data:image/jpeg;base64,{image_base64}"
    except Exception as e:
        # Preparation used to run outside any handler, so a bad image raised
        # out of a method documented to return None on failure.
        print(f"Image preprocessing failed: {e}")
        return None
    try:
        # Create the completion request
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=[
                {
                    'role': 'user',
                    'content': [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": image_url}}
                    ]
                }
            ],
            stream=False  # Set to False for single response instead of streaming
        )
        # Extract content from response
        if not response.choices:
            print("No choices returned from API.")
            return None
        content = response.choices[0].message.content
        if not content:
            print("No content returned from API.")
            return None
        # Validate and parse JSON from response
        parsed_data = self.validate_and_extract_json(content)
        if parsed_data is None:
            return None
        # Validate the structure of the detections
        if not self.validate_detections_format(parsed_data):
            print("Invalid detections format in response.")
            return None
        return parsed_data
    except Exception as e:
        print(f"API request failed: {e}")
        return None
def draw_detections_on_image(self, image: np.ndarray, detections: List[Dict[str, Any]]) -> np.ndarray:
    """
    Draws bounding boxes and labels on the OpenCV Mat image (without confidence).

    Box coordinates are expected in the normalized 0-999 range and are scaled
    to the size of the given image. The label strip is drawn above the box
    when there is room, otherwise just inside the top of the box so it is
    never placed outside the image.
    Args:
        image: OpenCV image (Mat) format
        detections: List of detection dictionaries
    Returns:
        Image with drawn detections as numpy array
    """
    # Work on a copy to avoid modifying the original
    result_image = image.copy()
    img_h, img_w = result_image.shape[:2]
    # Text style is the same for every detection, so set it up once.
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.6
    thickness = 2
    for detection in detections:
        # Bounding box coordinates (normalized to 0-999 range)
        x1_norm, y1_norm, x2_norm, y2_norm = detection["bbox"]
        # Convert normalized coordinates to pixel coordinates
        x1 = int((x1_norm / 999) * img_w)
        y1 = int((y1_norm / 999) * img_h)
        x2 = int((x2_norm / 999) * img_w)
        y2 = int((y2_norm / 999) * img_h)
        # Ensure coordinates are within image bounds
        x1 = max(0, min(x1, img_w - 1))
        y1 = max(0, min(y1, img_h - 1))
        x2 = max(0, min(x2, img_w - 1))
        y2 = max(0, min(y2, img_h - 1))
        # Make (x1, y1) the top-left corner even if the corners arrive swapped,
        # so the label is anchored to the top-left of the box.
        x1, x2 = min(x1, x2), max(x1, x2)
        y1, y2 = min(y1, y2), max(y1, y2)
        # Get color and label
        category_id = detection["id"]
        label_text = detection["label"]  # label only, no confidence
        color = CATEGORY_COLORS.get(category_id, (255, 255, 255))  # Default white if not found
        # Draw bounding box
        cv2.rectangle(result_image, (x1, y1), (x2, y2), color, 2)
        # Calculate text size and position
        (text_width, text_height), _baseline = cv2.getTextSize(label_text, font, font_scale, thickness)
        label_height = text_height + 10
        label_top = y1 - label_height
        if label_top < 0:
            # No room above the box: draw the strip inside the box instead of
            # at a negative y, where it would be clipped away.
            label_top = y1
        label_bottom = label_top + label_height
        # Draw label background
        cv2.rectangle(result_image, (x1, label_top), (x1 + text_width, label_bottom), color, -1)
        # Draw label text, 5 px above the bottom of the strip
        cv2.putText(result_image, label_text, (x1, label_bottom - 5), font, font_scale, (0, 0, 0), thickness)
    return result_image
def process_single_image(self, image_id: str, image: np.ndarray, prompt: Optional[str] = None) -> DetectionResult:
    """
    Process a single OpenCV Mat image synchronously.

    Validates the image, calls the vision API, then draws the returned
    detections. Failures at any step, including an unexpected exception
    raised by the API call, are reported through a DetectionResult with
    ``success=False`` instead of propagating to the caller.
    Args:
        image_id: Unique identifier for the image
        image: OpenCV image (Mat) format
        prompt: The prompt for the vision API (optional)
    Returns:
        DetectionResult containing the results
    """
    start_time = time.time()

    def _failure(error_msg: str, detections: Optional[List[Dict[str, Any]]] = None) -> DetectionResult:
        # Log the problem and build the unsuccessful result in one place.
        print(error_msg)
        return DetectionResult(
            image_id=image_id,
            original_image=image,
            detections=[] if detections is None else detections,
            success=False,
            error_message=error_msg,
            timestamp=start_time
        )

    # Validate input image
    if image is None or image.size == 0:
        return _failure(f"Invalid image for image_id: {image_id}")
    # Use provided prompt or default
    use_prompt = prompt if prompt is not None else self.default_prompt
    # Call the vision API
    print(f"Calling vision API for image {image_id}...")
    try:
        result = self.call_vision_api_sync(image, use_prompt)
    except Exception as e:
        # Guard against errors escaping the API helper, so callers such as
        # the queue worker always receive a result object.
        return _failure(f"Vision API call raised an error: {e}")
    if result is None:
        return _failure("Failed to get valid response from API.")
    # Extract detections
    detections = result.get("detections", [])
    print(f"Found {len(detections)} detections for image {image_id}.")
    # Draw detections on image
    try:
        marked_image = self.draw_detections_on_image(image, detections)
    except Exception as e:
        # Keep the detections even though they could not be drawn.
        return _failure(f"Error drawing detections on image: {e}", detections)
    # Return successful result
    return DetectionResult(
        image_id=image_id,
        original_image=image,
        detections=detections,
        marked_image=marked_image,
        success=True,
        timestamp=start_time
    )
def start_worker(self):
    """Launch the background thread that drains the task queue.

    Does nothing when the worker is already flagged as running.
    """
    if not self._running:
        # Raise the flag before starting so the loop sees it on its first check.
        self._running = True
        worker = threading.Thread(target=self._worker_loop, daemon=True)
        self._worker_thread = worker
        worker.start()
        print("Vision API worker started.")
def stop_worker(self):
    """Signal the background worker to stop and wait briefly for it to exit."""
    # Clearing the flag makes the worker loop end after its current iteration.
    self._running = False
    worker = self._worker_thread
    if worker:
        worker.join(timeout=5.0)  # Wait up to 5 seconds
    print("Vision API worker stopped.")
def _worker_loop(self):
    """Background worker loop for processing tasks from queue.

    Runs until ``self._running`` is cleared. Each task taken from the queue
    is always acknowledged with ``task_done()``, and always ends with a
    cached result and a COMPLETED or FAILED status, even when processing
    raises unexpectedly, so ``task_queue.join()`` cannot block forever on
    a task that errored.
    """
    while self._running:
        try:
            # Bounded wait so the loop re-checks the running flag regularly.
            task = self.task_queue.get(timeout=1.0)
        except queue.Empty:
            continue  # Timeout, continue loop
        try:
            # Mark as processing
            self.processing_tasks[task.task_id] = TaskStatus.PROCESSING
            # Process the task; turn an unexpected error into a failed result
            try:
                result = self.process_single_image(task.image_id, task.image, task.prompt)
            except Exception as e:
                print(f"Worker error: {e}")
                result = DetectionResult(
                    image_id=task.image_id,
                    original_image=task.image,
                    detections=[],
                    success=False,
                    error_message=f"Worker error: {e}",
                    timestamp=time.time()
                )
            # Store result
            self.results_cache[task.task_id] = result
            # Update task status
            self.processing_tasks[task.task_id] = (
                TaskStatus.COMPLETED if result.success else TaskStatus.FAILED
            )
            # Call callback if provided
            if task.callback:
                try:
                    task.callback(result)
                except Exception as e:
                    print(f"Callback execution failed for task {task.task_id}: {e}")
        except Exception as e:
            print(f"Worker error: {e}")
        finally:
            # Exactly one task_done() per successful get(), whatever happened.
            self.task_queue.task_done()
def submit_task(self, image_id: int, image: np.ndarray, prompt: Optional[str] = None,
                callback: Optional[Callable[[DetectionResult], None]] = None) -> str:
    """
    Submit a task to the processing queue with OpenCV Mat input.

    The task is recorded as PENDING before it is placed on the queue.
    Recording it afterwards would let the worker pick the task up first and
    then have its PROCESSING/COMPLETED status overwritten with PENDING.
    Args:
        image_id: Unique identifier for the image
        image: OpenCV image (Mat) format
        prompt: The prompt for the vision API (optional)
        callback: Callback function to be called when processing is complete (optional)
    Returns:
        Task ID for tracking the task
    """
    # Task ID combines a microsecond timestamp with the image ID.
    task_id = f"task_{int(time.time() * 1000000)}_{image_id}"
    task = Task(
        task_id=task_id,
        image_id=image_id,
        image=image,
        prompt=prompt if prompt is not None else self.default_prompt,
        callback=callback,
        timestamp=time.time()
    )
    # Register the status first, then make the task visible to the worker.
    self.processing_tasks[task_id] = TaskStatus.PENDING
    self.task_queue.put(task)
    return task_id
def get_result(self, task_id: str) -> Optional[DetectionResult]:
    """
    Look up the cached result of a task.
    Args:
        task_id: ID of the task whose result is wanted
    Returns:
        The cached DetectionResult, or None when no result is stored for
        that ID
    """
    cached = self.results_cache.get(task_id)
    return cached
def get_task_status(self, task_id: str) -> Optional[TaskStatus]:
    """
    Look up the recorded status of a task.
    Args:
        task_id: ID of the task to check
    Returns:
        The task's TaskStatus, or None when the ID is not known
    """
    status = self.processing_tasks.get(task_id)
    return status
def get_queue_size(self) -> int:
    """Return how many tasks are currently waiting in the task queue."""
    waiting = self.task_queue.qsize()
    return waiting
def get_processing_count(self) -> int:
    """Return how many tracked tasks are in the PROCESSING state."""
    in_flight = 0
    for state in self.processing_tasks.values():
        if state == TaskStatus.PROCESSING:
            in_flight += 1
    return in_flight
def get_completed_count(self) -> int:
    """Return how many tracked tasks have finished.

    Both COMPLETED and FAILED tasks are counted.
    """
    finished = 0
    for state in self.processing_tasks.values():
        if state == TaskStatus.COMPLETED or state == TaskStatus.FAILED:
            finished += 1
    return finished
def clear_results(self):
    """Empty the results cache in place; task statuses are left untouched."""
    cache = self.results_cache
    cache.clear()
def __enter__(self):
    """Start the background worker on entering a ``with`` block.

    Returns:
        This client, so it can be bound with ``as``.
    """
    self.start_worker()
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Stop the background worker on leaving a ``with`` block.

    Returns None, so an exception raised inside the block is not suppressed.
    """
    self.stop_worker()
# Example usage
def example_callback(result: DetectionResult):
    """Sample completion callback that reports the outcome on stdout."""
    if not result.success:
        print(f"Callback: Processing failed for image {result.image_id}: {result.error_message}")
        return
    print(f"Callback: Processing completed for image {result.image_id}, found {len(result.detections)} detections")
    # result.marked_image is the OpenCV Mat with the detections drawn on it;
    # it is fetched here only to show where further processing would start.
    marked_image = result.marked_image
def main():
    """Demo: run detection on one image file and display the annotated result."""
    original_image = cv2.imread("/home/evan/Desktop/received/left/left_1761388243_7673044.jpg")  # Replace with your image source
    if original_image is None:
        print("Could not load image")
        return

    # The context manager starts the worker on entry and stops it on exit,
    # including when one of the early returns below is taken.
    with VisionAPIClient() as client:
        # Submit a task with OpenCV Mat
        task_id = client.submit_task(
            image_id=1,
            image=original_image,
            callback=example_callback
        )
        print(f"Submitted task {task_id}")

        # Block until every queued task has been processed
        print("Waiting for task to complete...")
        client.task_queue.join()

        # Fetch the outcome and deal with the unhappy paths first
        result = client.get_result(task_id)
        if not result:
            print("No result found")
            return
        if not result.success:
            print(f"Task failed: {result.error_message}")
            return

        print(f"Task completed successfully! Found {len(result.detections)} detections.")
        # result.marked_image is the OpenCV Mat with detections drawn
        marked_image = result.marked_image
        # Show the input and the annotated output side by side (optional)
        cv2.imshow("Original Image", original_image)
        cv2.imshow("Marked Image", marked_image)
        print("Press any key to close windows...")
        cv2.waitKey(0)
        cv2.destroyAllWindows()
        # Or save the result
        # cv2.imwrite("marked_image.jpg", marked_image)
# Run the demo only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()