Files
project_main/app.py
bmy f3bb720bed feat: 增加base64传入动作指令
feat: 换用 deepseek 大模型
pref: 修改应急避险2停车条件
2024-08-04 10:04:27 +08:00

214 lines
7.7 KiB
Python

from flask import Flask, render_template, request
from flask_socketio import SocketIO
import toml
from loguru import logger
import logging
from multiprocessing import Process, Queue
import threading
import multiprocessing
import os
import time
import subprocess
import signal
import base64
import json
from main_upper import main_func
# Helper programs started alongside this web app. Each entry gives the
# working directory ("path") and the program to start there ("script").
server_command = [
{"path": "/home/evan/Workplace/project_capture/build/", "script": "./capture"},
{"path": "/home/evan/Workplace/project_infer/lane_server/", "script": "lane_infer_server.py"},
{"path": "/home/evan/Workplace/project_infer/yolo_server/", "script": "yolo_infer_server.py"},
{"path": "/home/evan/Workplace/project_infer/ocr_server/", "script": "ocr_infer_server.py"},
]
# subprocess.Popen handles of the helper programs that have been started.
processes = []
# time.perf_counter() value taken when a task is started; None until then.
time_record = None
# True while a task is considered running; sent to clients when they connect.
task_run_flag = False
# Log queue: handed to main_func, and drained by thread_function, which
# forwards every item to clients as a 'log' event.
queue = Queue()
# Skip-task queue: lets the UI intervene in task scheduling. Handed to
# main_func; the 'skip_task' operation puts 1 on it.
skip_task_queue = Queue()
app = Flask(__name__)
# Use [( ... )] as the Jinja variable delimiters instead of the default {{ ... }}.
app.jinja_env.variable_start_string = '[('
app.jinja_env.variable_end_string = ')]'
# NOTE(review): hard-coded secret key — consider loading it from configuration.
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, allow_unsafe_werkzeug=True)
# Only ever checked at shutdown in the code visible here; nothing assigns it.
server_process = None
# multiprocessing.Process running main_func, or None if no task was started.
task_process = None
class WebSocketHandler(logging.Handler):
    """Logging handler that pushes every record to the web clients.

    Each record is sent as a Socket.IO ``log`` event whose payload holds the
    lower-cased level name and the formatted message.
    """

    def emit(self, record):
        """Format ``record`` and send it as a ``log`` event."""
        message = self.format(record)
        level = record.levelname.lower()
        socketio.emit('log', dict(level=level, content=message))
# Configure logging: remove the logger's existing handlers and attach a
# WebSocketHandler so that log records are sent to the web clients.
logger.remove()
handler = WebSocketHandler()
logger.add(handler, format="{time:MM-DD HH:mm:ss} {message}", level="DEBUG")
# Directory that holds the configuration files read and written by this app.
fileOptions_path = '/home/evan/Workplace/project_main'
# Config files that are loaded and sent to each client when it connects.
fileOptions_list = ['cfg_args.toml','cfg_main.toml', 'cfg_subtask.toml']
# cfg_args.toml: its lane_mode.mode_index entry is rewritten by the /run route.
cfg_args_path = os.path.join(fileOptions_path, 'cfg_args.toml')
# JSON file the /run route writes from the decoded `action` query parameter.
cfg_move_area_path = os.path.join(fileOptions_path, 'cfg_move_area.json')
@app.route('/')
def index():
    """Serve the page rendered from the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/run')
def run():
    """Apply the query-string settings, then serve ``index2.html``.

    Query parameters:
        mode: integer stored as ``lane_mode.mode_index`` in cfg_args.toml.
            When it is missing or not an integer the file is left unchanged
            and a warning is logged.
        action: optional base64-encoded UTF-8 JSON document that is written
            to cfg_move_area.json. When it is missing or cannot be decoded,
            cfg_move_area.json is emptied instead.

    Returns:
        The rendered ``index2.html`` template.
    """
    mode_index = request.args.get('mode', type=int)
    if mode_index is None:
        logger.warning("/run called without a valid integer 'mode'; cfg_args.toml left unchanged")
    else:
        config_args = toml.load(cfg_args_path)
        config_args['lane_mode']['mode_index'] = mode_index
        with open(cfg_args_path, 'w') as config_file:
            toml.dump(config_args, config_file)

    action_base64 = request.args.get('action')
    try:
        decoded_str = base64.b64decode(action_base64).decode('utf-8')
        json_data = json.loads(decoded_str)
    except (TypeError, ValueError):
        # TypeError: the parameter is absent (None). ValueError covers bad
        # base64, bad UTF-8 and bad JSON. In every such case the file is
        # emptied so that the task follows its normal flow.
        with open(cfg_move_area_path, 'w'):
            pass
    else:
        with open(cfg_move_area_path, 'w') as json_file:
            json.dump(json_data, json_file)
    return render_template('index2.html')
# @app.route('/csdn')
# def csdn():
# return render_template('csdn.html')
def _start_servers():
    """Start every program listed in ``server_command`` and record its handle."""
    # The children inherit the log file descriptor, so this process can close
    # its own copy as soon as all of them have been started.
    with open("server_processes.log", "w") as log:
        time.sleep(2)
        for i, env_info in enumerate(server_command):
            env_path = env_info["path"]
            script = env_info["script"]
            env = os.environ.copy()
            # The first entry is run directly; every other entry is run
            # through the Python interpreter.
            if i == 0:
                command = [script]
            else:
                command = ['python', script]
            process = subprocess.Popen(command, cwd=env_path, env=env, stdout=log, stderr=subprocess.STDOUT)
            processes.append(process)
            if i == 0:
                # Pause after the first program before starting the rest.
                time.sleep(2)


def _stop_servers():
    """Send SIGINT to every recorded server process, then forget them all."""
    for process in processes:
        logger.error(process.pid)
        # send_signal() does nothing for a process that has already exited.
        process.send_signal(signal.SIGINT)
    # Drop the handles so a later start/stop cycle does not signal them again.
    processes.clear()


@socketio.on('operate')
def operate_handle(data):
    """Handle an ``operate`` event sent by a web client.

    Args:
        data: dict whose ``type`` selects the operation:

            * ``save_config``: dump ``content`` as TOML into ``file_name``
              inside the configuration directory. ``file_name`` must be a
              plain file name; anything containing a directory part is
              refused.
            * ``operate_server``: ``content`` is ``run`` (start the helper
              servers), ``stop`` (send them SIGINT) or ``restart`` (only
              logged; no restart is performed).
            * ``operate_task``: ``content`` is ``run``, ``stop`` or
              ``restart`` and controls the process running ``main_func``.
            * ``show_server_log``: emit the contents of
              server_processes.log as a ``server_log`` event (empty string
              if the file cannot be read).
            * ``skip_task``: put ``1`` on the skip-task queue.
    """
    global task_process
    global time_record
    global task_run_flag

    if data['type'] == 'save_config':
        file_name = data['file_name']
        # The name comes from the client: refuse anything that could point
        # outside the configuration directory.
        if os.path.basename(file_name) != file_name or file_name in ('', '.', '..'):
            logger.error(f"Refusing to save config with invalid file name {file_name!r}")
            return
        with open(os.path.join(fileOptions_path, file_name), 'w') as f:
            toml.dump(data['content'], f)
        logger.info(f"保存成功 {data['file_name']}")
    elif data['type'] == 'operate_server':
        logger.info(data)
        if data['content'] == 'run':
            _start_servers()
            logger.info("开启 server")
        elif data['content'] == 'stop':
            _stop_servers()
            logger.info("关闭 server")
        elif data['content'] == 'restart':
            logger.info("重启 server")
    elif data['type'] == 'operate_task':
        # Task process control.
        if data['content'] == 'run':
            task_run_flag = True
            if task_process is not None:
                task_process.terminate()
            time_record = time.perf_counter()
            task_process = Process(target=main_func, args=(queue, skip_task_queue))
            task_process.start()
            logger.info("开启 task")
        elif data['content'] == 'stop':
            task_run_flag = False
            # A stop request may arrive before any task was ever started.
            if task_process is not None:
                task_process.terminate()
            if time_record is not None:
                logger.info(f"任务结束 用时{time.perf_counter() - time_record}s")
            logger.info("关闭 task")
        elif data['content'] == 'restart':
            if task_process is not None:
                task_process.terminate()
            task_process = Process(target=main_func, args=(queue, skip_task_queue))
            task_process.start()
    elif data['type'] == 'show_server_log':
        content = ''
        try:
            with open("server_processes.log", 'r') as file:
                content = file.read()
        except (OSError, UnicodeDecodeError):
            # Best effort: an unreadable log is reported as empty.
            pass
        socketio.emit('server_log', {'type': 'server_log', 'content': content})
    elif data['type'] == 'skip_task':
        logger.info(data)
        skip_task_queue.put(1)
# elif data['type'] == 'save_target_person':
# config_path = os.path.join(fileOptions_path, 'cfg_args.toml')
# config_args = toml.load(config_path)
# config_args['lane_mode']['mode_index'] = int(data['content'])
# with open(config_path, 'w') as config_file:
# toml.dump(config_args, config_file)
@socketio.on('connect')
def test_connect():
    """Send the initial state to the clients when one connects.

    Emits, in order: the list of config file names, the parsed contents of
    those files keyed by file name, and the current task status as 0 or 1.
    """
    logger.info('Client connected')
    socketio.emit('config_data', dict(type='fileOptions', content=fileOptions_list))
    loaded = {
        name: toml.load(os.path.join(fileOptions_path, name))
        for name in fileOptions_list
    }
    socketio.emit('config_data', dict(type='config_data', content=loaded))
    status = int(task_run_flag)
    socketio.emit('task_status', dict(type='task_status', content=status))
def thread_function():
    """Forward items from the log queue to the web clients, forever.

    Every item taken from the module-level ``queue`` is emitted unchanged as
    a Socket.IO ``log`` event. The function never returns.
    """
    # The module-level name ``queue`` is the multiprocessing queue and hides
    # the standard-library module of the same name, so the exception class
    # is imported directly.
    from queue import Empty

    while True:
        try:
            log = queue.get()
        except Empty:
            continue
        socketio.emit('log', log)
if __name__ == '__main__':
    # Reset lane_mode.mode_index to 1 on every start.
    config_path = os.path.join(fileOptions_path, 'cfg_args.toml')
    config_args = toml.load(config_path)
    config_args['lane_mode']['mode_index'] = 1
    with open(config_path, 'w') as config_file:
        toml.dump(config_args, config_file)

    try:
        # Start every program in server_command. The children inherit the
        # log file descriptor, so this process closes its own copy once all
        # of them are running.
        with open("server_processes.log", "w") as log:
            time.sleep(2)
            for i, env_info in enumerate(server_command):
                env_path = env_info["path"]
                script = env_info["script"]
                env = os.environ.copy()
                # The first entry is run directly; every other entry is run
                # through the Python interpreter.
                if i == 0:
                    command = [script]
                else:
                    command = ['python', script]
                process = subprocess.Popen(command, cwd=env_path, env=env, stdout=log, stderr=subprocess.STDOUT)
                processes.append(process)
                if i == 0:
                    # Pause after the first program before starting the rest.
                    time.sleep(2)

        # Background thread that relays queued log items to the clients.
        thread1 = threading.Thread(target=thread_function, daemon=True)
        thread1.start()
        socketio.run(app, host='0.0.0.0', port=5001, allow_unsafe_werkzeug=True)
    finally:
        # Runs on normal return and on any exception, so the child processes
        # are not left behind.
        if server_process is not None:
            server_process.terminate()
        if task_process is not None:
            task_process.terminate()
        for process in processes:
            logger.error(process.pid)
            # send_signal() does nothing for a process that already exited.
            process.send_signal(signal.SIGINT)