# Source file: project_infer/ocr_server/ocr_infer_server.py
# (Repository file-viewer residue: 122 lines, 3.8 KiB, Python; first commit 2024-06-07 20:19:04 +08:00.)
import toml
from loguru import logger
import zmq
import cv2
import numpy as np
import requests
import base64
def get_access_token():
    """Request an OAuth access token from the Baidu AI platform.

    Performs a ``client_credentials`` grant against the Baidu token endpoint.
    The credentials are taken from the ``BAIDU_OCR_CLIENT_ID`` and
    ``BAIDU_OCR_CLIENT_SECRET`` environment variables when they are set, and
    fall back to the values built into this function otherwise.

    Returns:
        The access token string, or ``None`` when the HTTP request fails or
        times out, the response body is not valid JSON, or the response does
        not contain an ``access_token`` field.
    """
    # Local import: this function is the only user of ``os`` in the block.
    import os

    # NOTE(review): SECURITY - real credentials are committed in source.
    # They are kept only as a fallback so existing deployments keep working;
    # rotate them and supply the values through the environment instead.
    client_id = os.environ.get("BAIDU_OCR_CLIENT_ID", "MDCGplPqK0kteOgbXwt5cyn0")
    client_secret = os.environ.get(
        "BAIDU_OCR_CLIENT_SECRET", "yIHJQUUiMkkw53nlQqHpiLvRFsLGcqgn"
    )

    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
    }
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }

    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.post(url, params=params, headers=headers, timeout=5)
        response_json = response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding failures on every requests version.
        logger.error("Failed to get access_token: {}", exc)
        return None

    if isinstance(response_json, dict) and 'access_token' in response_json:
        return response_json['access_token']

    description = (
        response_json.get('error_description')
        if isinstance(response_json, dict)
        else response_json
    )
    logger.error("Failed to get access_token: {}", description)
    return None
def ocr_api_request(image_base64):
    """Send a base64-encoded image to the Baidu "accurate" OCR endpoint.

    Args:
        image_base64: Base64 text of the encoded image.

    Returns:
        The decoded JSON response, or ``None`` when no access token could be
        obtained, the HTTP request failed or timed out, or the response body
        was not valid JSON.
    """
    # Alternative endpoints:
    # url = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate_basic"  # high accuracy
    # url = "https://aip.baidubce.com/rest/2.0/ocr/v1/general"  # standard accuracy, with positions
    url = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate"  # high accuracy, with positions

    # The body is sent form-encoded (``data=``), so the declared content type
    # has to say so rather than claim JSON.
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded',
    }

    try:
        # Fetched inside the ``try`` so a network error while obtaining the
        # token is reported as a failed OCR request instead of propagating.
        access_token = get_access_token()
        if access_token is None:
            logger.error("ocr request skipped: no access token available")
            return None

        response = requests.post(
            url,
            params={'access_token': access_token},
            headers=headers,
            data={'image': image_base64, 'probability': 'true'},
            timeout=5,
        )
        return response.json()
    except requests.exceptions.Timeout:
        logger.warning("ocr request timed out")
        return None
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding failures on every requests version.
        logger.error("ocr request failed: {}", exc)
        return None
2024-06-07 20:19:04 +08:00
if __name__ == "__main__":
    # Entry point: open the camera, bind a ZeroMQ REP socket and answer every
    # client request with the OCR result of one freshly captured frame.
    import signal
    import sys

    cfg = toml.load('/home/evan/Workplace/project_infer/cfg_infer_server.toml')

    # Configure log output.
    logger.add(cfg['debug']['logger_filename'], format=cfg['debug']['logger_format'], retention=5, level="INFO")

    # Disabled alternative: request frames from a camera server over ZeroMQ
    # instead of opening the capture device directly.
    # context1 = zmq.Context()
    # camera_socket = context1.socket(zmq.REQ)
    # camera_socket.connect(f"tcp://localhost:{cfg['camera']['camera2_port']}")
    # logger.info("connect camera success")

    cap = cv2.VideoCapture(20)
    cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc('M', 'J', 'P', 'G'))
    cap.set(cv2.CAP_PROP_FPS, 20)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540)

    # Initialise and start the server.
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(f"tcp://*:{cfg['server']['ocr_infer_port']}")

    def release_resources():
        """Release the camera and shut the ZeroMQ socket and context down."""
        cap.release()
        # linger=0 drops unsent messages so context.term() cannot block.
        socket.close(linger=0)
        context.term()

    def signal_handler(signum, frame):
        """Clean up and exit when SIGTERM or SIGINT is received."""
        logger.info("Received signal, exiting...")
        release_resources()
        sys.exit(0)

    # Register the signal handlers.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    def build_reply():
        """Capture one frame, run OCR on it and return the reply dict."""
        ret, frame = cap.read()
        if not ret:
            return {'code': -1, 'content': "ocr 摄像头读取出错"}

        # Keep the left 480 columns, then rotate 90 degrees counter-clockwise.
        frame = frame[:, 0:480]
        frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)

        # Snapshot of the last processed frame; failure is logged, not fatal.
        output_file_path = 'rotate.jpg'
        if not cv2.imwrite(output_file_path, frame):
            logger.warning("failed to write {}", output_file_path)

        encoded_ok, jpeg = cv2.imencode('.jpg', frame)
        if not encoded_ok:
            return {'code': -1, 'content': "ocr 图像编码出错"}

        encoded_image = base64.b64encode(jpeg).decode('utf-8')
        result = ocr_api_request(encoded_image)
        logger.debug("ocr result: {}", result)

        # An API error response is JSON without 'words_result'; report it as a
        # failure instead of a success whose content is None.
        if isinstance(result, dict) and 'words_result' in result:
            return {'code': 0, 'content': result['words_result']}
        return {'code': -1, 'content': " ocr 没找到文字"}

    while True:
        socket.recv_string()
        logger.info("recv client request")

        # A REP socket must answer every request before it can receive the
        # next one, so any unexpected failure is turned into an error reply.
        try:
            reply = build_reply()
        except Exception:
            logger.exception("ocr request handling failed")
            reply = {'code': -1, 'content': "ocr 服务内部错误"}
        socket.send_pyobj(reply)

        # NOTE(review): no HighGUI window is created in this block, so ESC is
        # presumably never delivered and the loop ends via the signal handler.
        if cv2.waitKey(1) == 27:
            break

    release_resources()
    logger.info("ocr infer server exit")