From 4c1cf9ceb0d74b85933ed9b24d0a108c04430ba4 Mon Sep 17 00:00:00 2001 From: bmy <2583236812@qq.com> Date: Sun, 2 Jun 2024 17:49:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E9=83=A8=E5=88=86?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cfg_main.toml | 33 ++-- cfg_subtask.toml | 6 + main.py | 36 ++-- majtask.py | 23 +-- subtask.py | 426 ++++++++++++++++++++++++++++++++++++----------- utils.py | 95 ++++++++--- 6 files changed, 458 insertions(+), 161 deletions(-) diff --git a/cfg_main.toml b/cfg_main.toml index d4956b4..57c5d3a 100644 --- a/cfg_main.toml +++ b/cfg_main.toml @@ -5,22 +5,23 @@ logger_format = "{time} {level} {message}" [task] GetBlock_enable = false # 人员施救使能 PutBlock_enable = false # 紧急转移使能 -GetBBall_enable = true # 整装上阵使能 -UpTower_enable = true # 通信抢修使能 -GetRBall_enable = true # 高空排险使能 -PutBBall_enable = true # 派发物资使能 -PutHanoi_enable = true # 物资盘点使能 -MoveArea_enable = true # 应急避险使能 +GetBBall_enable = false # 整装上阵使能 +UpTower_enable = false # 通信抢修使能 +GetRBall_enable = false # 高空排险使能 +PutBBall_enable = false # 派发物资使能 +PutHanoi_enable = false # 物资盘点使能 +MoveArea_enable = false # 应急避险使能 KickAss_enable = true # 扫黑除暴使能 [find_counts] -GetBlock_counts = 5 # 人员施救使能 -PutBlock_counts = 5 # 紧急转移使能 -GetBBall_counts = 5 # 整装上阵使能 -UpTower_counts = 5 # 通信抢修使能 -GetRBall_counts = 5 # 高空排险使能 -PutBBall_counts = 5 # 派发物资使能 -PutHanoi1_counts = 20 # 物资盘点使能 -PutHanoi2_counts = 5 # 物资盘点使能 -MoveArea_counts = 5 # 应急避险使能 -KickAss_counts = 5 # 扫黑除暴使能 +GetBlock_counts = 5 # 人员施救计数 +PutBlock_counts = 5 # 紧急转移计数 +GetBBall_counts = 5 # 整装上阵计数 +UpTower_counts = 5 # 通信抢修计数 +GetRBall_counts = 5 # 高空排险计数 +PutBBall_counts = 5 # 派发物资计数 +PutHanoi1_counts = 10 # 物资盘点计数 +PutHanoi2_counts = 4300 # 物资盘点计数 +MoveArea1_counts = 2 # 应急避险计数 +MoveArea2_counts = 1700 # 应急避险第二阶段计数 +KickAss_counts = 2 # 扫黑除暴计数 diff --git a/cfg_subtask.toml b/cfg_subtask.toml index ffa4435..bc53026 100644 --- a/cfg_subtask.toml +++ b/cfg_subtask.toml @@ -13,7 +13,13 @@ [put_hanoi1] [put_hanoi2] +pos_gap = 160 # 标定值,两个放置位置的标定距离 +pos_lp = 1 # 1\2\3 数字越小越靠近红色置物区 +pos_mp = 3 [move_area] [kick_ass] +pos_gap1 = 150 # 目标牌和第一个 person 之间的距离 +pos_gap2 = 100 # person 之间的距离 +target_person = 3 diff --git a/main.py b/main.py index 67ecf5d..3722d80 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ import subtask as sb import majtask as mj from by_cmd_py import by_cmd_py import time + cmd_py_obj = by_cmd_py() sb.import_obj(cmd_py_obj) @@ -16,17 +17,17 @@ cfg_main = toml.load('cfg_main.toml') logger.add(cfg_main['debug']['logger_filename'], format=cfg_main['debug']['logger_format'], retention = 5, level="INFO") # 向任务队列添加任务 -# TODO 任务关闭相关联 task_queue = queue.Queue() task_queue.put(sb.task(sb.get_block, cfg_main['find_counts']['GetBlock_counts'], cfg_main['task']['GetBlock_enable'])) -task_queue.put(sb.task(sb.put_block, cfg_main['find_counts']['PutBlock_counts'], cfg_main['task']['PutBlock_enable'])) +task_queue.put(sb.task(sb.put_block, cfg_main['find_counts']['PutBlock_counts'], cfg_main['task']['GetBlock_enable'] and cfg_main['task']['PutBlock_enable'])) task_queue.put(sb.task(sb.get_bball, cfg_main['find_counts']['GetBBall_counts'], cfg_main['task']['GetBBall_enable'])) task_queue.put(sb.task(sb.up_tower, cfg_main['find_counts']['UpTower_counts'], cfg_main['task']['UpTower_enable'])) task_queue.put(sb.task(sb.get_rball, cfg_main['find_counts']['GetRBall_counts'], cfg_main['task']['GetRBall_enable'])) -task_queue.put(sb.task(sb.put_bball, cfg_main['find_counts']['PutBBall_counts'], cfg_main['task']['PutBBall_enable'])) -task_queue.put(sb.task(sb.put_hanoi1, cfg_main['find_counts']['PutHanoi1_counts'], True)) # 无论是否进行任务,检测标识并转向都是必须进行的 +task_queue.put(sb.task(sb.put_bball, cfg_main['find_counts']['PutBBall_counts'], cfg_main['task']['GetBBall_enable'] and cfg_main['task']['PutBBall_enable'])) +task_queue.put(sb.task(sb.put_hanoi1, cfg_main['find_counts']['PutHanoi1_counts'], False)) # 无论是否进行任务,检测标识并转向都是必须进行的 task_queue.put(sb.task(sb.put_hanoi2, cfg_main['find_counts']['PutHanoi2_counts'], cfg_main['task']['PutHanoi_enable'])) -task_queue.put(sb.task(sb.move_area, cfg_main['find_counts']['MoveArea_counts'], cfg_main['task']['MoveArea_enable'])) +task_queue.put(sb.task(sb.move_area1, cfg_main['find_counts']['MoveArea1_counts'], cfg_main['task']['MoveArea_enable'])) +task_queue.put(sb.task(sb.move_area2, cfg_main['find_counts']['MoveArea2_counts'], cfg_main['task']['MoveArea_enable'])) task_queue.put(sb.task(sb.kick_ass, cfg_main['find_counts']['KickAss_counts'], cfg_main['task']['KickAss_enable'])) # 将任务队列传入调度模块中 @@ -43,21 +44,26 @@ worker.start() if (cmd_py_obj.send_angle_camera(180) == -1): cmd_py_obj.send_angle_camera(180) time.sleep(2) -# cmd_py_obj.send_speed_x(5) -# cmd_py_obj.send_position_axis_z(10, 100) # 创建主任务 main_task_t = mj.main_task(cmd_py_obj) # 初始化时传入 zmq socket 对象 # 主线程仅在子线程搜索 (SEARCHING) 和 空闲 (IDLE) 状态下进行操作 # while task_queuem_t.busy is True: -while True: - if task_queuem_t.status is sb.task_queuem_status.EXECUTING: - pass - else: - # 模拟执行回归任务 - # logger.info("***** sim huigui task *****") - main_task_t.run() - pass +try: + while True: + if task_queuem_t.status is sb.task_queuem_status.EXECUTING: + pass + else: + main_task_t.run() + pass +except KeyboardInterrupt: + logger.info("Interrupt received, stopping...") + # 停车 + for _ in range(3): + cmd_py_obj.send_speed_x(0) + time.sleep(0.1) + cmd_py_obj.send_speed_omega(0) + time.sleep(0.1) logger.info("Main thread exit") diff --git a/majtask.py b/majtask.py index 656d808..f63dd18 100644 --- a/majtask.py +++ b/majtask.py @@ -2,6 +2,7 @@ from simple_pid import PID import zmq import time from loguru import logger +import utils class PidWrap: def __init__(self, kp, ki, kd, setpoint=0, output_limits=1): @@ -42,17 +43,18 @@ class main_task(): else: pass + + def run(self): - # TODO 请求和解析回归值待完成 - try: - # data = self.queue.get_nowait() - # self.parse_data(data) - pass - except: - pass # 运行巡线任务 self.lane_task() - + + def lane_ntask(self): + # time.sleep(0.002) + # self.socket.send_string("") + # resp = self.socket.recv_pyobj() + # return resp.get('data')[0] - 160 + pass def lane_task(self): # TODO 巡航参数从配置文件中读取 time.sleep(0.002) @@ -60,6 +62,7 @@ class main_task(): self.x = self.x / 3 self.y = self.y / 3 self.lane_error = self.x - 160 + utils.lane_error = self.lane_error # 赋全局变量 self.error_counts = 0 self.x = 0 self.y = 0 @@ -79,9 +82,9 @@ class main_task(): else: self.pid1.set(0.8, 0, 0) self.by_cmd.send_speed_x(11) - # TODO 待引入控制接口 - pid_out = self.pid1.get(self.lane_error*0.65) + # pid_out = self.pid1.get(self.lane_error*0.65) + pid_out = self.pid1.get(self.lane_error*0.75) self.by_cmd.send_speed_omega(pid_out) self.socket.send_string("") resp = self.socket.recv_pyobj() diff --git a/subtask.py b/subtask.py index 4972a95..2fde91d 100644 --- a/subtask.py +++ b/subtask.py @@ -2,6 +2,7 @@ from enum import Enum from loguru import logger from utils import label_filter from utils import tlabel +import utils import toml import zmq import time @@ -9,18 +10,67 @@ import time context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:6667") -logger.info("socket init") +logger.info("subtask yolo client init") +cfg = toml.load('cfg_subtask.toml') # 加载任务配置 +logger.info("load subtask config") by_cmd = None filter = None + def import_obj(_by_cmd): global by_cmd global filter by_cmd = _by_cmd filter = label_filter(socket) +def car_stop(): + pass +# run_on 是否继续向前行进 等待 error<5 后停止 如果不需要前进(在校准前车已经停下) +# 则不需要等待 error < 5 可以直接 aim_near - +def calibrate(label, offset, run_on = True, cali_speed = 10): + logger.info("开始校准") + # go on + if run_on: + for _ in range(3): + by_cmd.send_speed_x(7) + by_cmd.send_speed_omega(0) + time.sleep(0.1) + while True: + ret, error = filter.aim_near(label) + while not ret: + ret, error = filter.aim_near(label) + error += offset + if run_on: + if abs(error) < 5: + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + break + else: + break + + ret, error = filter.aim_near(label) + while not ret: + ret, error = filter.aim_near(label) + error += offset + time.sleep(1) + logger.error(error) + if abs(error) > 5: + logger.info("校准中") + if error > 0: + by_cmd.send_distance_x(-cali_speed, int(error*4)) + else: + by_cmd.send_distance_x(cali_speed, int(-error*4)) + logger.error(error) + time.sleep(1) + # stop + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + pass # 任务类 class task: @@ -36,8 +86,11 @@ class task: # 检查该任执行标志 while True: # if self.func_find(): - if self.task_t.find(): - self.counts += 1 + # 特殊地方可以直接跳出 find + ret = self.task_t.find() + self.counts += ret + # if self.task_t.find(): + # self.counts += 1 if self.counts >= self.find_counts: break # while self.func_find() is False: @@ -47,11 +100,11 @@ class task: # 根据标志位确定是否执行该任务 if self.enable is True: logger.debug(f"[Task ]# Executing task") - # self.func_exec() self.task_t.exec() logger.debug(f"[Task ]# Task completed") else: logger.warning(f"[Task ]# Skip task") + self.task_t.nexec() # 任务队列状态类 class task_queuem_status(Enum): @@ -115,41 +168,40 @@ class get_block(): else: return False def exec(self): - for _ in range(3): - by_cmd.send_speed_x(7) - by_cmd.send_speed_omega(0) - time.sleep(0.1) - logger.info("abcd") - cfg = toml.load('cfg_subtask.toml') # 加载任务配置 - while True: - # logger.info("等待进入准确区域") - ret, error = filter.aim_near(tlabel.RBLOCK) - while not ret: - ret, error = filter.aim_near(tlabel.RBLOCK) - # logger.info(error) - if abs(error) < 5: - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - break - ret, error = filter.aim_near(tlabel.RBLOCK) - while not ret: - ret, error = filter.aim_near(tlabel.RBLOCK) - time.sleep(1) - logger.error(error) - if abs(error) > 5: - logger.info("校准中") - if error > 0: - by_cmd.send_distance_x(-10, int(error*3)) - else: - by_cmd.send_distance_x(10, int(-error*3)) - logger.error(error) - time.sleep(1) - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) + calibrate(tlabel.RBLOCK,15) + # for _ in range(3): + # by_cmd.send_speed_x(7) + # by_cmd.send_speed_omega(0) + # time.sleep(0.1) + # while True: + # # logger.info("等待进入准确区域") + # ret, error = filter.aim_near(tlabel.RBLOCK) + # while not ret: + # ret, error = filter.aim_near(tlabel.RBLOCK) + # # logger.info(error) + # if abs(error) < 5: + # for _ in range(3): + # by_cmd.send_speed_x(0) + # time.sleep(0.2) + # by_cmd.send_speed_omega(0) + # break + # ret, error = filter.aim_near(tlabel.RBLOCK) + # while not ret: + # ret, error = filter.aim_near(tlabel.RBLOCK) + # time.sleep(1) + # logger.error(error) + # if abs(error) > 5: + # logger.info("校准中") + # if error > 0: + # by_cmd.send_distance_x(-10, int(error*3)) + # else: + # by_cmd.send_distance_x(10, int(-error*3)) + # logger.error(error) + # time.sleep(1) + # for _ in range(3): + # by_cmd.send_speed_x(0) + # time.sleep(0.2) + # by_cmd.send_speed_omega(0) time.sleep(2) by_cmd.send_position_axis_z(10, 150) @@ -179,6 +231,7 @@ class get_block(): time.sleep(3) by_cmd.send_position_axis_x(4, 0) def nexec(self): + # TODO 完成不执行任务的空动作 pass @@ -194,13 +247,39 @@ class put_block(): else: return False def exec(self): - cfg = toml.load('cfg_subtask.toml') # 加载任务配置 + # TODO 测试对准效果 logger.info("找到医院") for _ in range(3): by_cmd.send_speed_x(0) time.sleep(0.2) by_cmd.send_speed_omega(0) - + time.sleep(0.1) + while True: + # logger.info("等待进入准确区域") + ret, error = filter.aim_near(tlabel.HOSPITAL) + while not ret: + ret, error = filter.aim_near(tlabel.HOSPITAL) + # logger.info(error) + if abs(error) < 5: + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + break + ret, error = filter.aim_near(tlabel.HOSPITAL) + while not ret: + ret, error = filter.aim_near(tlabel.HOSPITAL) + time.sleep(1) + logger.error(error) + if abs(error) > 5: + logger.info("校准中") + if error > 0: + by_cmd.send_distance_x(-10, int(error*3)) + else: + by_cmd.send_distance_x(10, int(-error*3)) + logger.error(error) + time.sleep(1) + by_cmd.send_position_axis_z(10, 150) time.sleep(3) # TODO 切换爪子方向 @@ -208,6 +287,8 @@ class put_block(): time.sleep(2) by_cmd.send_position_axis_z(10, 170) pass + def nexec(self): + pass # 整装上阵 class get_bball(): @@ -215,7 +296,7 @@ class get_bball(): by_cmd.send_position_axis_x(2, 140) logger.info("整装上阵初始化") time.sleep(0.5) - if (by_cmd.send_angle_camera(90) == -1): + while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) def find(self): # 目标检测黄球 @@ -255,7 +336,7 @@ class get_bball(): by_cmd.send_distance_x(10, int(-error*3)) logger.error(error) time.sleep(1) - if (by_cmd.send_angle_camera(0) == -1): + while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) by_cmd.send_position_axis_z(20, 160) time.sleep(2) @@ -277,6 +358,8 @@ class get_bball(): time.sleep(2) by_cmd.send_angle_claw(90) pass + def nexec(self): + pass # 通信抢修 class up_tower(): @@ -322,12 +405,15 @@ class up_tower(): by_cmd.send_distance_x(10, int(-error*3)) logger.error(error) time.sleep(1) + def nexec(self): + pass + # 高空排险 class get_rball(): def init(self): logger.info("高空排险初始化") - if (by_cmd.send_angle_camera(0) == -1): + while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) def find(self): # 目标检测红球 @@ -338,18 +424,17 @@ class get_rball(): return False def exec(self): logger.info("找到红球") - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - time.sleep(1) + # TODO 高空排险 offset 需要调整 + calibrate(tlabel.RBALL,15, True, 6) + pass + def nexec(self): pass # 派发物资 class put_bball(): def init(self): logger.info("派发物资初始化") - if (by_cmd.send_angle_camera(90) == -1): + while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) def find(self): # 目标检测通信塔 @@ -365,8 +450,10 @@ class put_bball(): time.sleep(0.2) by_cmd.send_speed_omega(0) time.sleep(1) - pass + def nexec(self): + pass + direction = tlabel.RMARK direction_left = 0 direction_right = 0 @@ -386,14 +473,25 @@ class put_hanoi1(): ret2, list2 = filter.get(tlabel.LMARK) if ret1: logger.info("向右拐") - direction_right += 1 - return True + list1 = list1[0] + area = (list1[2] - list1[0]) * (list1[3] - list1[1]) + logger.info(area) + if area > 2500: + direction_right += 1 + return True + return False elif ret2: logger.info("向左拐") - direction_left += 1 - return True + list2 = list2[0] + area = (list2[2] - list2[0]) * (list2[3] - list2[1]) + logger.info(area) + if area > 2500: + direction_left += 1 + return True + return False return False def exec(self): + global direction for _ in range(3): by_cmd.send_speed_x(0) time.sleep(0.2) @@ -401,70 +499,204 @@ class put_hanoi1(): time.sleep(0.2) # if direction == tlabel.RMARK: if direction_right > direction_left: - by_cmd.send_angle_omega(-20,500) + direction = tlabel.RMARK + # by_cmd.send_distance_x(6, 200) + by_cmd.send_angle_omega(-20,380) + time.sleep(2) + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) else: + direction = tlabel.LMARK + # by_cmd.send_distance_x(6, 200) by_cmd.send_angle_omega(20,500) - time.sleep(0.2) - if (by_cmd.send_angle_camera(180) == -1): + time.sleep(2) + while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) - time.sleep(2) + time.sleep(0.5) socket.send_string("1") socket.recv() pass + def nexec(self): + pass + +def sub_put_hanoi2(label,distance_type,run_on,back_flag): + # logger.info("找到大红色柱体") + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + time.sleep(2) + + # 对准大红色柱体 + calibrate(label,15, run_on, 6) + # 抓取大红色柱体 + logger.info("抓取柱体") + # 根据 direction 确定移动方向 + by_cmd.send_distance_x(-8, distance_type) + # 移动 self.distance_lp 距离 + # 放置物块 + logger.info("开始放置柱体") + time.sleep(1) + if back_flag: + pass + # 回移相同距离 + # if direction == tlabel.RMARK: + # by_cmd.send_distance_x(8, distance_type) + # else: + # by_cmd.send_distance_x(-8, distance_type) class put_hanoi2(): + def __init__(self): + self.pos_lp = cfg['put_hanoi2']['pos_lp'] + self.pos_mp = cfg['put_hanoi2']['pos_mp'] + self.pos_sp = 6 - self.pos_lp - self.pos_mp + self.distance_lp = self.pos_lp * cfg['put_hanoi2']['pos_gap'] + self.distance_mp = self.pos_mp * cfg['put_hanoi2']['pos_gap'] + self.distance_sp = self.pos_sp * cfg['put_hanoi2']['pos_gap'] + logger.info(f"setting hanoi pos_lp[{self.pos_lp}] pos_mp[{self.pos_mp}] pos_sp[{self.pos_sp}]") def init(self): logger.info("物资盘点 2 初始化") def find(self): - # 目标检测左右转向标识 - ret1, list1 = filter.get(tlabel.LPILLER) + time.sleep(0.001) + return True + # # 目标检测左右转向标识 + # ret1, list1 = filter.get(tlabel.LPILLER) + # ret, _ = filter.get_mult([tlabel.LPILLER]) + # if ret: + # logger.info("看到了三个 直接停车") + # return 100 + # if ret1 > 0: + # list1 = list1[0] + # area = (list1[2] - list1[0]) * (list1[3] - list1[1]) + # logger.info(area) + # if area > 3300: + # return True + # return False + # else: + # return False + def exec(self): + # TODO 延时需要根据移动的 distance 判断 + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + # 往回走一段 然后向前行进校准 + by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30) + time.sleep(1) + logger.info(f"方向{direction}") + + if utils.lane_error > 0: + by_cmd.send_angle_omega(-20,abs(utils.lane_error)) + else: + by_cmd.send_angle_omega(20,abs(utils.lane_error)) + + time.sleep(1) + sub_put_hanoi2(tlabel.LPILLER, self.distance_lp, True, True) + time.sleep(5) + # 对准中蓝色柱体 + sub_put_hanoi2(tlabel.MPILLER, self.distance_mp, True, True) + time.sleep(5) + # 根据 direction 确定移动方向 + # 移动 self.distance_mp 距离 + # 放置物块 + # 回移相同距离 + sub_put_hanoi2(tlabel.SPILLER, self.distance_sp, True, True) + time.sleep(5) + # 对准小红色柱体 + # 根据 direction 确定移动方向 + # 移动 self.distance_sp 距离 + # 放置物块 + pass + def nexec(self): + pass - if ret1 > 0: +# 应急避险 第一阶段 找目标牌 +class move_area1(): + def init(self): + logger.info("应急避险第一阶段初始化") + while (by_cmd.send_angle_camera(180) == -1): + by_cmd.send_angle_camera(180) + def find(self): + # 目标检测标志牌 + ret, error = filter.aim_near(tlabel.SIGN) + if ret > 0 and abs(error) < 8: return True else: return False def exec(self): - logger.info("找到最大块") + logger.info("找到标示牌") + # for _ in range(3): + # by_cmd.send_speed_x(0) + # time.sleep(0.2) + # by_cmd.send_speed_omega(0) + + pass + def nexec(self): + pass +# 应急避险 第二阶段 找停车区域 +class move_area2(): + def init(self): + logger.info("应急避险第二阶段初始化") + def find(self): + time.sleep(0.001) + return True + def exec(self): + logger.info("开始寻找停车区域") + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + calibrate(tlabel.SHELTER, 15, False, 6) + time.sleep(1) + # 进入停车区域 + by_cmd.send_distance_y(10, 450) + time.sleep(3) + # 离开停车区域 + by_cmd.send_distance_y(-10, 450) + time.sleep(3) + while True: + pass + + pass + def nexec(self): + pass +# 扫黑除暴 +class kick_ass(): + def init(self): + logger.info("扫黑除暴初始化") + self.pos_gap1 = cfg['kick_ass']['pos_gap1'] + self.pos_gap2 = cfg['kick_ass']['pos_gap2'] + self.target_person = cfg['kick_ass']['target_person'] + def find(self): + ret, error = filter.aim_near(tlabel.SIGN) + if ret > 0 and abs(error) < 8: + return True + else: + return False + def exec(self): + logger.info("找到标示牌") for _ in range(3): by_cmd.send_speed_x(0) time.sleep(0.2) by_cmd.send_speed_omega(0) time.sleep(1) - pass - -# 应急避险 -class move_area(): - def init(self): - logger.info("应急避险初始化") - if (by_cmd.send_angle_camera(180) == -1): - by_cmd.send_angle_camera(180) - def find(self): - # 目标检测标志牌 - # TODO 如何确保在都检测标志牌的情况下,和下一个任务进行区分 - ret1, list1 = filter.get(tlabel.SIGN) - - if ret1 > 0: - return True + if utils.lane_error > 0: + by_cmd.send_angle_omega(-20,abs(utils.lane_error)*1.5) else: - return False - def exec(self): - logger.info("找到标示牌") - pass + by_cmd.send_angle_omega(20,abs(utils.lane_error)*1.5) + time.sleep(1) + calibrate(tlabel.SIGN, 8, False, 6) + time.sleep(0.5) + -# 扫黑除暴 -class kick_ass(): - def init(self): - logger.info("扫黑除暴初始化") - def find(self): - # 目标检测标志牌 - # TODO 如何确保在都检测标志牌的情况下,和上一个任务进行区分 - ret1, list1 = filter.get(tlabel.SIGN) - - if ret1 > 0: - return True + if self.target_person == 1: + by_cmd.send_distance_x(10, self.pos_gap1) else: - return False - def exec(self): - logger.info("找到标示牌") + by_cmd.send_distance_x(10, self.pos_gap1 + (self.target_person - 1) * self.pos_gap2) + + time.sleep(2) + time.sleep(5) + pass + def nexec(self): pass diff --git a/utils.py b/utils.py index 420d9f5..d28a063 100644 --- a/utils.py +++ b/utils.py @@ -1,24 +1,9 @@ + from enum import Enum import numpy as np -# 根据标签修改 -# class tlabel(Enum): -# BBLOCK = 5 # 蓝色方块 -# RBLOCK = 2 # 红色方块 -# HOSPITAL = 3 # 医院 -# BBALL = 4 # 蓝球 -# YBALL = 5 # 黄球 -# TOWER = 6 # 通信塔 -# RBALL = 7 # 红球 -# BASKET = 8 # 球筐 -# MARKL = 9 # 指向标 -# MARKR = 10 # 指向标 -# SPILLAR = 11 # 小柱体 (红色) -# MPILLAR = 12 # 中柱体 (蓝色) -# LPILLAR = 13 # 大柱体 (红色) -# SIGN = 14 # 文字标牌 -# TARGET = 15 # 目标靶 -# SHELTER = 16 # 停车区 -# BASE = 17 # 基地 + +lane_error = 0 + class tlabel(Enum): TOWER = 0 SIGN = 1 @@ -36,6 +21,9 @@ class tlabel(Enum): LMARK = 13 BBLOCK = 14 BBALL = 15 +''' +description: label_filter 的测试数据 +''' test_resp = { 'code': 0, 'data': np.array([ @@ -48,6 +36,9 @@ test1_resp = { 'code': 0, 'data': np.array([]) } +''' +description: yolo 目标检测标签过滤器,需要传入连接到 yolo server 的 socket 对象 +''' class label_filter: def __init__(self, socket, threshold=0.6): self.num = 0 @@ -55,16 +46,33 @@ class label_filter: self.socket = socket self.threshold = threshold - self.img_size = (320, 240) + self.img_size = (320, 240) + ''' + description: 向 yolo server 请求目标检测数据 + param {*} self + return {*} + ''' def get_resp(self): self.socket.send_string('') response = self.socket.recv_pyobj() return response + ''' + description: 切换 yolo server 视频源 在分叉路口时目标检测需要使用前摄 + param {*} self + param {*} camera_id 1 或者 2 字符串 + return {*} + ''' def switch_camera(self,camera_id): if camera_id == 1 or camera_id == 2: self.socket.send_string(f'{camera_id}') response = self.socket.recv_pyobj() return response + ''' + description: 对模型推理推理结果使用 threshold 过滤 默认阈值为 0.5 + param {*} self + param {*} data get_resp 返回的数据 + return {bool,array} + ''' def filter_box(self,data): if len(data) > 0: expect_boxes = (data[:, 1] > self.threshold) & (data[:, 0] > -1) @@ -83,10 +91,15 @@ class label_filter: if len(results) > 0: return True, np.array(results) return False, None + ''' + description: 根据传入的标签过滤,返回该标签的个数以及 box + param {*} self + param {*} tlabel + return {int, array} + ''' def get(self, tlabel): # 循环查找匹配的标签值 # 返回对应标签的个数,以及坐标列表 - # TODO self.filter_box none judge response = self.get_resp() if response['code'] == 0: ret, results = self.filter_box(response['data']) @@ -97,8 +110,38 @@ class label_filter: self.pos = boxes[:, 2:] # [[x1 y1 x2 y2]] return self.num, self.pos return 0, [] + ''' + description: + param {*} self + param {*} tlabel_list + return {*} + ''' + def get_mult(self, tlabel_list): + response = self.get_resp() + if response['code'] == 0: + ret, results = self.filter_box(response['data']) + target_counts = len(tlabel_list) + counts = 0 + if ret: + for tlabel in tlabel_list: + expect_boxes = (results[:, 0] == tlabel.value) + has_true = np.any(expect_boxes) + if has_true: + counts += 1 + else: + return False, [] + if counts == target_counts: + return True, counts + return False, [] + return False, [] + return False, [] + ''' + description: 判断传入的标签是否存在,存在返回 True + param {*} self + param {*} tlabel + return {bool} + ''' def find(self, tlabel): - # 遍历返回的列表,有对应标签则返回 True response = self.get_resp() if response['code'] == 0: ret, results = self.filter_box(response['data']) @@ -108,6 +151,12 @@ class label_filter: if len(boxes) != 0: return True return False + ''' + description: 根据传入的标签, + param {*} self + param {*} tlabel + return {*} + ''' def aim_left(self, tlabel): # 如果标签存在,则返回列表中位置最靠左的目标框和中心的偏移值 response = self.get_resp() @@ -151,7 +200,7 @@ class label_filter: center_x_values = np.abs(boxes[:, 2] + boxes[:, 4] - self.img_size[0]) center_x_index = np.argmin(center_x_values) error = (boxes[center_x_index][4] + boxes[center_x_index][2] - self.img_size[0]) / 2 - return (True, error+15) + return (True, error) return (False, 0) # class Calibrate: