"""Web control panel for the capture / inference servers and the main task.

Serves a small Flask site and a Socket.IO channel through which a browser
client can:

* edit the TOML configuration files listed in ``fileOptions_list``;
* start / stop the helper server processes listed in ``server_command``;
* start / stop / restart the main task (``main_upper.main_func``) in a
  child process and ask it to skip the current sub-task;
* receive log records, both from this process (via loguru) and from the
  task process (via ``queue``).
"""
from flask import Flask, render_template, request
from flask_socketio import SocketIO
import toml
from loguru import logger
import logging
from multiprocessing import Process, Queue
import threading
import multiprocessing
import os
import time
import subprocess
import signal
import base64
import json
# The module-level name ``queue`` below is a multiprocessing.Queue instance,
# so the stdlib ``queue`` module's exception is imported by name.
from queue import Empty

from main_upper import main_func

# Helper servers to launch. The first entry is a native executable; the
# remaining entries are Python scripts started with ``python <script>``.
server_command = [
    {"path": "/home/evan/Workplace/project_capture/build/", "script": "./capture"},
    {"path": "/home/evan/Workplace/project_infer/lane_server/", "script": "lane_infer_server.py"},
    {"path": "/home/evan/Workplace/project_infer/yolo_server/", "script": "yolo_infer_server.py"},
    {"path": "/home/evan/Workplace/project_infer/ocr_server/", "script": "ocr_infer_server.py"},
]

# File that receives the combined stdout/stderr of every helper server.
SERVER_LOG_FILE = "server_processes.log"

# subprocess.Popen handles of the currently launched helper servers.
processes = []
# time.perf_counter() value taken when the task was last started, or None.
time_record = None
# True while a task is considered running; reported to clients on connect.
task_run_flag = False

# Log queue: passed to main_func; everything read from it is forwarded to
# the browser as a 'log' event.
queue = Queue()
# Skip-task queue: a value is put here to intervene in task scheduling.
skip_task_queue = Queue()

app = Flask(__name__)
# Non-default Jinja delimiters — presumably so that '{{ }}' in the templates
# is left for a front-end framework; confirm against the templates.
app.jinja_env.variable_start_string = '[('
app.jinja_env.variable_end_string = ')]'
# NOTE(review): hard-coded secret key; load it from the environment before
# exposing this service beyond a trusted network.
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, allow_unsafe_werkzeug=True)

# NOTE: server_process is never assigned anywhere in this file; it is kept
# because the shutdown code checks it.
server_process = None
# multiprocessing.Process running main_func, or None if never started.
task_process = None


class WebSocketHandler(logging.Handler):
    """Logging handler that pushes every record to clients as a 'log' event."""

    def emit(self, record):
        """Format *record* and broadcast it with its lower-cased level name."""
        log_entry = self.format(record)
        socketio.emit('log', {'level': record.levelname.lower(), 'content': log_entry})


# Logging setup: drop loguru's default sink and route everything through
# the Socket.IO handler instead.
logger.remove()
handler = WebSocketHandler()
logger.add(handler, format="{time:MM-DD HH:mm:ss} {message}", level="DEBUG")

fileOptions_path = '/home/evan/Workplace/project_main'
fileOptions_list = ['cfg_args.toml', 'cfg_main.toml', 'cfg_subtask.toml']

cfg_args_path = os.path.join(fileOptions_path, 'cfg_args.toml')
cfg_move_area_path = os.path.join(fileOptions_path, 'cfg_move_area.json')


def _set_lane_mode_index(mode_index):
    """Store *mode_index* under ``lane_mode.mode_index`` in cfg_args.toml."""
    config_args = toml.load(cfg_args_path)
    config_args['lane_mode']['mode_index'] = mode_index
    with open(cfg_args_path, 'w') as config_file:
        toml.dump(config_args, config_file)


def _start_servers():
    """Launch every entry of ``server_command`` and record it in ``processes``.

    All children share one log file (truncated on each call) for stdout and
    stderr. The file object is closed here once the children are spawned;
    each child keeps its own inherited descriptor.
    """
    with open(SERVER_LOG_FILE, "w") as log:
        time.sleep(2)
        # Start all scripts.
        for i, env_info in enumerate(server_command):
            env_path = env_info["path"]
            script = env_info["script"]
            env = os.environ.copy()
            if i == 0:
                # Native executable: run it directly, then pause before
                # starting the Python servers.
                command = [script]
            else:
                command = ['python', script]
            process = subprocess.Popen(command, cwd=env_path, env=env,
                                       stdout=log, stderr=subprocess.STDOUT)
            processes.append(process)
            if i == 0:
                time.sleep(2)


def _stop_servers():
    """Send SIGINT to every launched server that is still alive, then forget them."""
    for process in processes:
        logger.error(process.pid)
        # poll() reaps an already-exited child so we never signal a stale PID.
        if process.poll() is None:
            process.send_signal(signal.SIGINT)
    processes.clear()


def _start_task():
    """(Re)start the task process and reset the run flag and start time."""
    global task_process
    global time_record
    global task_run_flag
    task_run_flag = True
    if task_process is not None:
        task_process.terminate()
    time_record = time.perf_counter()
    task_process = Process(target=main_func, args=(queue, skip_task_queue))
    task_process.start()


@app.route('/')
def index():
    """Serve the main page."""
    return render_template('index.html')


@app.route('/run')
def run():
    """Apply the ``mode`` / ``action`` query parameters and serve index2.html.

    ``mode`` must be an integer and is written to cfg_args.toml. ``action``,
    if present, is base64-encoded UTF-8 JSON and is written to
    cfg_move_area.json; when it is missing or cannot be decoded the file is
    emptied so the task follows its normal flow.

    Returns a 400 response when ``mode`` is missing or not an integer.
    """
    try:
        mode_index = int(request.args.get('mode'))
    except (TypeError, ValueError):
        return "invalid or missing 'mode' query parameter", 400
    _set_lane_mode_index(mode_index)

    action_base64 = request.args.get('action')
    try:
        decoded_str = base64.b64decode(action_base64).decode('utf-8')
        json_data = json.loads(decoded_str)
    except (TypeError, ValueError):
        # TypeError: parameter absent (None). ValueError covers bad base64,
        # bad UTF-8 and bad JSON.
        has_action = False
    else:
        has_action = True

    # Opening with 'w' truncates; with no usable action the file stays empty.
    with open(cfg_move_area_path, 'w') as json_file:
        if has_action:
            json.dump(json_data, json_file)
    return render_template('index2.html')


@socketio.on('operate')
def operate_handle(data):
    """Dispatch an 'operate' Socket.IO message according to ``data['type']``.

    Handled types: ``save_config``, ``operate_server``, ``operate_task``
    (each of the latter two with ``data['content']`` in run/stop/restart),
    ``show_server_log`` and ``skip_task``. Unknown types are ignored.
    """
    global task_run_flag

    if data['type'] == 'save_config':
        file_name = data['file_name']
        # The name comes from the client: only the known config files may be
        # written, which also rules out path traversal.
        if file_name not in fileOptions_list:
            logger.error(f"refusing to save unknown config file: {file_name}")
            return
        # Serialize first so a failure cannot leave a truncated file behind.
        serialized = toml.dumps(data['content'])
        with open(os.path.join(fileOptions_path, file_name), 'w') as f:
            f.write(serialized)
        logger.info(f"保存成功 {data['file_name']}")

    elif data['type'] == 'operate_server':
        logger.info(data)
        if data['content'] == 'run':
            _start_servers()
            logger.info("开启 server")
        elif data['content'] == 'stop':
            _stop_servers()
            logger.info("关闭 server")
        elif data['content'] == 'restart':
            # NOTE: restart is only logged; no process is touched.
            logger.info("重启 server")

    elif data['type'] == 'operate_task':
        # Task function.
        if data['content'] == 'run':
            _start_task()
            logger.info("开启 task")
        elif data['content'] == 'stop':
            task_run_flag = False
            if task_process is not None:
                task_process.terminate()
            if time_record is not None:
                logger.info(f"任务结束 用时{time.perf_counter() - time_record}s")
            logger.info("关闭 task")
        elif data['content'] == 'restart':
            _start_task()

    elif data['type'] == 'show_server_log':
        content = ''
        try:
            with open(SERVER_LOG_FILE, 'r') as file:
                content = file.read()
        except (OSError, UnicodeDecodeError):
            # Best effort: a missing or unreadable log is shown as empty.
            pass
        socketio.emit('server_log', {'type': 'server_log', 'content': content})

    elif data['type'] == 'skip_task':
        logger.info(data)
        skip_task_queue.put(1)


@socketio.on('connect')
def test_connect():
    """Send the config file list, their parsed contents and the task status."""
    logger.info('Client connected')
    socketio.emit('config_data', {'type': 'fileOptions', 'content': fileOptions_list})
    config_data = {
        item: toml.load(os.path.join(fileOptions_path, item))
        for item in fileOptions_list
    }
    socketio.emit('config_data', {'type': 'config_data', 'content': config_data})
    socketio.emit('task_status', {'type': 'task_status', 'content': int(task_run_flag)})


def thread_function():
    """Forward every item read from the log queue to clients, forever."""
    while True:
        try:
            log = queue.get()
        except Empty:
            # A blocking get() is not expected to raise this; kept as a guard.
            continue
        socketio.emit('log', log)


if __name__ == '__main__':
    # Always start in lane mode 1.
    _set_lane_mode_index(1)

    _start_servers()

    thread1 = threading.Thread(target=thread_function, daemon=True)
    thread1.start()

    try:
        socketio.run(app, host='0.0.0.0', port=5001, allow_unsafe_werkzeug=True)
    finally:
        # Tear down children even if the web server exits with an exception.
        if server_process is not None:
            server_process.terminate()
        if task_process is not None:
            task_process.terminate()
        _stop_servers()