diff --git a/cfg_args.toml b/cfg_args.toml index 75d2e0a..5b963e1 100644 --- a/cfg_args.toml +++ b/cfg_args.toml @@ -1,5 +1,5 @@ [kick_ass] -target_person = 4 +target_person = 3 [task] Subtask_enable = true diff --git a/cfg_subtask.toml.6_9.bak b/cfg_subtask.toml.6_9.bak new file mode 100644 index 0000000..6ad28f2 --- /dev/null +++ b/cfg_subtask.toml.6_9.bak @@ -0,0 +1,61 @@ +[get_block] +pid_kp = 0.9 +pid_ki = 0 +pid_kd = 0 +first_block = "blue" + +[put_block] +pid_kp = 1.2 +pid_ki = 0 +pid_kd = 0 + +[get_bball] +pid_kp = 1.2 +pid_ki = 0 +pid_kd = 0 + +[up_tower] +pid_kp = 1.3 +pid_ki = 0 +pid_kd = 0 + +[get_rball] +pid_kp = 0.6 +pid_ki = 0 +pid_kd = 0 + +[put_bball] +pid_kp = 1.3 +pid_ki = 0 +pid_kd = 0 + +[put_hanoi1] +pid_kp = 0.7 +pid_ki = 0 +pid_kd = 0 + +[put_hanoi2] +pid_kp = 1 +pid_ki = 0 +pid_kd = 0 +pos_gap = 160 +first_target = "mp" + +[put_hanoi3] +pid_kp = 1.3 +pid_ki = 0 +pid_kd = 0 + +[move_area] +pid_kp = 1.2 +pid_ki = 0 +pid_kd = 0 +llm_enable = false + +[kick_ass] +pid_kp = 0.8 +pid_ki = 0 +pid_kd = 0 +pos_gap1 = 150 +pos_gap2 = 80 +target_person = 1 diff --git a/cfg_subtask.toml.7126.bak b/cfg_subtask.toml.7126.bak new file mode 100644 index 0000000..d8c64ee --- /dev/null +++ b/cfg_subtask.toml.7126.bak @@ -0,0 +1,61 @@ +[get_block] +pid_kp = 1.2 +pid_ki = 0 +pid_kd = 0 +first_block = "blue" + +[put_block] +pid_kp = 1.3 +pid_ki = 0 +pid_kd = 0 + +[get_bball] +pid_kp = 1.2 +pid_ki = 0 +pid_kd = 0 + +[up_tower] +pid_kp = 1.3 +pid_ki = 0 +pid_kd = 0 + +[get_rball] +pid_kp = 1.0 +pid_ki = 0 +pid_kd = 0 + +[put_bball] +pid_kp = 1.3 +pid_ki = 0 +pid_kd = 0 + +[put_hanoi1] +pid_kp = 1.3 +pid_ki = 0 +pid_kd = 0 + +[put_hanoi2] +pid_kp = 1.2 +pid_ki = 0 +pid_kd = 0 +pos_gap = 160 +first_target = "mp" + +[put_hanoi3] +pid_kp = 1.2 +pid_ki = 0 +pid_kd = 0 + +[move_area] +pid_kp = 1.4 +pid_ki = 0 +pid_kd = 0 +llm_enable = false + +[kick_ass] +pid_kp = 1.2 +pid_ki = 0 +pid_kd = 0 +pos_gap1 = 150 +pos_gap2 = 80 +target_person = 1 diff --git a/subtask.py b/subtask.py index 777561e..47a4e9f 100644 --- a/subtask.py +++ b/subtask.py @@ -657,8 +657,19 @@ class up_tower(): # calibrate(tlabel.TOWER, 27, False, 6) by_cmd.send_distance_x(-10, 120) time.sleep(1) - by_cmd.send_distance_y(-10, 50) - time.sleep(3) + # 上古參數 + # by_cmd.send_distance_y(-10, 50) + # 6_9 模型參數 + by_cmd.send_distance_y(-10, 40) + # 7_12_3 模型參數 + # by_cmd.send_distance_y(-10, 50) + # time.sleep(1) + car_stop() + # FIXME 如果下發 distance 後直接 car_stop,則 distance 執行時間僅由指令間處理延時決定 + # time.sleep(3) + # by_cmd.send_speed_y(-10) + # time.sleep(0.15) + by_cmd.send_angle_zhuan(10) time.sleep(12) by_cmd.send_speed_y(10) @@ -702,8 +713,15 @@ class get_rball(): time.sleep(0.5) # 靠近塔 by_cmd.send_angle_scoop(20) - by_cmd.send_distance_y(-15, 45) # 50 + # 上古參數 + # by_cmd.send_distance_y(-15, 45) # 50 + # 6_9 參數 + by_cmd.send_distance_y(-15, 35) time.sleep(2) + # 7_12_3 參數 + # by_cmd.send_distance_y(-15, 45) + # time.sleep(2) + car_stop() calibrate_new(tlabel.RBALL,offset = 44, run = True) time.sleep(1) logger.info("抓红球") @@ -826,7 +844,8 @@ class put_hanoi1(): # time.sleep(0.5) # by_cmd.send_distance_x(10, 200) - by_cmd.send_distance_x(10, 180) + # by_cmd.send_distance_x(10, 180) + by_cmd.send_distance_x(10, 250) time.sleep(1) car_stop() @@ -1358,9 +1377,10 @@ class kick_ass(): car_stop() time.sleep(1) calibrate_new(tlabel.SIGN, offset = 8, run = True) + by_cmd.send_angle_claw(15) time.sleep(0.5) by_cmd.send_position_axis_z(30, 60) - by_cmd.send_position_axis_x(1, 120) + by_cmd.send_position_axis_x(1, 130) if self.target_person == 1: target_distance = self.pos_gap1 @@ -1371,15 +1391,15 @@ class kick_ass(): time.sleep(1.5 + (self.target_person - 1) * 0.7 ) car_stop() - by_cmd.send_angle_claw_arm(220) - by_cmd.send_angle_claw(15) + # by_cmd.send_angle_claw_arm(220) + # time.sleep(0.5) by_cmd.send_position_axis_x(1, 20) time.sleep(3) by_cmd.send_position_axis_x(1, 120) time.sleep(1) logger.debug("結束任務,前進四") - by_cmd.send_speed_x(25) - time.sleep(4) + # by_cmd.send_speed_x(25) + # time.sleep(4) pass def nexec(self): pass diff --git a/subtask_69.py b/subtask_69.py new file mode 100644 index 0000000..47a4e9f --- /dev/null +++ b/subtask_69.py @@ -0,0 +1,1407 @@ +from enum import Enum +from loguru import logger +from utils import label_filter +from utils import tlabel +from utils import LLM +from utils import CountRecord +import utils +import toml +import zmq +import time +import variable as var +import action as act +import re +import threading +import ctypes +cfg = None +cfg_args = None +by_cmd = None +filter = None +llm_bot = None + +# 目标检测 socket 客户端 +context = None +socket = None + +context1 = None +ocr_socket = None + +global_skip_queue = None + +''' +description: main.py 里执行 引入全局变量 +param {*} _by_cmd 控制器对象 +return {*} +''' +def import_obj(_by_cmd, skip_queue): + + global by_cmd + global filter + global llm_bot + + global context + global socket + + global context1 + global ocr_socket + + global cfg + global cfg_args + global global_skip_queue + cfg = toml.load('/home/evan/Workplace/project_main/cfg_subtask.toml') # 加载任务配置 + cfg_args = toml.load('/home/evan/Workplace/project_main/cfg_args.toml') + by_cmd = _by_cmd + global_skip_queue = skip_queue + + # 目标检测 socket 客户端 + context = zmq.Context() + socket = context.socket(zmq.REQ) + socket.connect("tcp://localhost:6667") + logger.info("subtask yolo client init") + + # ocr socket 客户端 + # context1 = zmq.Context() + # ocr_socket = context1.socket(zmq.REQ) + # ocr_socket.connect("tcp://localhost:6668") + # logger.info("subtask ocr client init") + + filter = label_filter(socket) + if cfg['move_area']['llm_enable']: + llm_bot = LLM() +def car_stop(): + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_y(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) +# 蓝色球使用 +def calibrate_right_new(label, offset, run = True, run_speed = 3.5): + + not_found_counts = 0 + ret, error = filter.aim_right(label) + while not ret: + not_found_counts += 1 + if not_found_counts >= 20: + not_found_counts = 0 + error = -320 # error > 0 front run + logger.info("calibrate_right_new:找不到次数超过 20 帧 直接前进寻找") + break + ret, error = filter.aim_right(label) + + error += offset + if abs(error) > 10 and run: + if error > 0: + by_cmd.send_speed_x(-run_speed) + else: + by_cmd.send_speed_x(run_speed) + # 停的位置已经很接近目标,可以直接使用 distance 校准 + else: + return + + while True: + ret, error = filter.aim_right(label) + while not ret: + ret, error = filter.aim_right(label) + error += offset + # if ret: + if abs(error) <= 8: + car_stop() + logger.info("calibrate_right_new:行进时 误差小于 8 直接停车") + ret, error = filter.aim_right(label) + while not ret: + not_found_counts += 1 + if not_found_counts >=30: + return + ret, error = filter.aim_right(label) + error += offset + logger.info(f"calibrate_right_new:停车后的误差是{error}") + if abs(error) > 8: + logger.info(f"calibrate_right_new:停车后的误差大于 8 使用 distance 校准") + error = error * 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + + break +''' +description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车 +如果停车后 error > 8 则使用 distance 校准 +这个方法仅用于在视野里能找到的情况下进行校准,并不能实现边走边寻找 然后再校准 +param {*} label +param {*} offset +param {*} run +param {*} run_speed +return {*} +''' +def calibrate_new(label, offset, run = True, run_speed = 3.5): + not_found_counts = 0 + ret, box = filter.get(label) + while not ret: + not_found_counts += 1 + if not_found_counts >= 20: + not_found_counts = 0 + error = -320 # error > 0 front run + logger.info("calibrate_new:找不到次数超过 20 帧 直接前进寻找") + break + ret, box = filter.get(label) + + # 如果超过二十帧跳出,则此时 box 为空值,需要再校验 ret + if ret is True: + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if abs(error) > 10 and run: + if error > 0: + by_cmd.send_speed_x(-run_speed) + else: + by_cmd.send_speed_x(run_speed) + # 停的位置已经很接近目标,可以直接使用 distance 校准 + else: + if abs(error) > 8: + logger.info(f"calibrate_new:停车后误差{error}大于 8 使用 distance 校准") + # error = error # 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + return + logger.info(f"calibrate_new:停车后误差{error}小于 8 不校准") + return + while True: + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if ret: + if abs(error) <= 10: # 5 + car_stop() + logger.info("calibrate_new:行进时 误差小于 10 直接停车") + + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + logger.info(f"calibrate_new:停车后的误差是{error}") + if abs(error) > 8: + logger.info(f"calibrate_new:停车后的误差大于 8 使用 distance 校准") + error = error * 3 + # logger.error(f"error * 3 {error}") + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + break +# 弃用 distance 校准 +def explore_calibrate_new(label, offset, run_direc ,run_speed = 3.5): + # run_direc == 1 向前 + stop_error = 0 + if run_direc == 1: + by_cmd.send_speed_x(run_speed) + else: + by_cmd.send_speed_x(-run_speed) + + if label == tlabel.TPLATFORM: + stop_error = 8 + else: + stop_error = 15 + while True: + ret, box = filter.get(label) + while not ret: + if not global_skip_queue.empty(): + _ = global_skip_queue.get() + logger.error("跳过 explore_calibrate_new") + return False + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if ret: + # 校准速度越大 停车的条件越宽泛 20 15 + if abs(error) <= stop_error: + car_stop() + logger.info("explore_calibrate_new:行进时 误差小于 15 直接停车") + + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + error = (box[0][2] + box[0][0] - 320) / 2 + offset + logger.info(f"停车后像素误差:{error}") + + # logger.info(f"explore_calibrate_new:停车后的误差大于 8 使用 distance 校准") + # if abs(error) < 8: + # error = error * 3 + # else: + # error = error * 2 + # if error > 0: + # by_cmd.send_distance_x(-10, int(error)) + # else: + # by_cmd.send_distance_x(10, int(-error)) + + break + return True + +# 任务类 +class task: + def __init__(self, name, task_template, find_counts=10, enable=True): + self.enable = enable + self.task_t = task_template() + + self.counts = 0 + self.find_counts = int(find_counts) + self.name = name + + def init(self): + if hasattr(self.task_t, 'init') and callable(getattr(self.task_t, 'init', None)): + self.task_t.init() + else: + logger.warning("[Task ]# 该任务没有 init 方法") + def find(self): + if hasattr(self.task_t, 'find') and callable(getattr(self.task_t, 'find', None)): + return self.task_t.find() + else: + logger.warning("[Task ]# 该任务没有 find 方法") + def exec(self): + # 根据标志位确定是否执行该任务 + if self.enable is True: + if hasattr(self.task_t, 'exec') and callable(getattr(self.task_t, 'exec', None)): + logger.info(f"[Task ]# Executing task") + self.task_t.exec() + else: + logger.warning("[Task ]# 该任务没有 exec 方法") + else: + logger.warning(f"[Task ]# Skip task") + if hasattr(self.task_t, 'nexec') and callable(getattr(self.task_t, 'nexec', None)): + self.task_t.nexec() + else: + logger.warning("[Task ]# 该任务没有 nexec 方法") + def after(self): + if hasattr(self.task_t, 'after') and callable(getattr(self.task_t, 'after', None)): + logger.info(f"[Task ]# {self.name} 正在执行 after") + self.task_t.after() + logger.debug(f"[Task ]# Task completed") + else: + logger.warning("[Task ]# 该任务没有 after 方法") + +# 任务队列状态类 +class task_queuem_status(Enum): + IDEL = 0 + SEARCHING = 1 + EXECUTING = 2 + +# 任务队列类 非 EXECUTEING 时均执行 huigui,注意互斥操作 +class task_queuem(task): + # task_now = task(None, False) + def __init__(self, queue): + super(task_queuem, self) + self.queue = queue + self.status = task_queuem_status.IDEL + self.busy = True + logger.info(f"[TaskM]# Task num {self.queue.qsize()}") + # exec 线程 + self.exec_thread = None + def exec(self, skip_queue): + # 如果空闲状态则将下一个队列任务取出 + if self.status is task_queuem_status.IDEL: + if self.queue.qsize() == 0: + self.busy = False + logger.info(f"[TaskM]# Task queue empty, exit") + return False + self.task_now = self.queue.get() + + # 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息 + if self.task_now.enable is True: + self.status = task_queuem_status.SEARCHING + # 如果使能该任务则执行该任务的初始化动作 + self.task_now.init() + else: + self.status = task_queuem_status.EXECUTING + logger.info(f"[TaskM]# ---------------------->>>>") + # 阻塞搜索任务标志位 + elif self.status is task_queuem_status.SEARCHING: + logger.info(f"[{self.task_now.name}]# Start searching task target") + while True: + if not skip_queue.empty(): + _ = skip_queue.get() + logger.error(f"{self.task_now.name} 任务在 find 中已经被手动跳过") + self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 + self.task_now.after() # 执行任务后处理 + self.queue.task_done() # 弹出已执行的任务 + return True + ret = self.task_now.find() + self.task_now.counts += ret + if self.task_now.counts >= self.task_now.find_counts: + self.status = task_queuem_status.EXECUTING + break + # 执行任务函数 + elif self.status is task_queuem_status.EXECUTING: + if self.task_now.enable is True: + logger.info(f"[TaskM]# Start execute task function") + # self.exec_thread = threading.Thread(target=self.task_now.exec) + # # 启动线程 + # self.exec_thread.start() + # while True: + # if not self.exec_thread.is_alive(): + # break + # else: + # if not skip_queue.empty(): + # car_stop() + # thread_id = self.exec_thread.ident + # res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit)) + # _ = skip_queue.get() + # logger.error(f"{self.task_now.name} 任务在 exec 中已经被手动跳过") + # self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 + # self.task_now.after() # 执行任务后处理 + # self.queue.task_done() # 弹出已执行的任务 + # return True + self.task_now.exec() # 执行当前任务函数 + self.status = task_queuem_status.IDEL # 执行完成后为退出巡线阻塞 + self.task_now.after() # 执行任务后处理 + self.queue.task_done() # 弹出已执行的任务 + logger.info(f"[TaskM]# <<<<----------------------") + else: + logger.info(f"[TaskM]# Start execute task function (nexec)") + self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 + self.task_now.exec() # 执行当前任务函数 + self.task_now.after() # 执行任务后处理 + self.queue.task_done() # 弹出已执行的任务 + logger.info(f"[TaskM]# <<<<----------------------") + return True + +# 人员施救 +class get_block1(): + def init(self): + var.task_speed = 15 + act.cmd.camera(0) + act.cmd.z2(20, 60, 0) + filter.switch_camera(1) + # if cfg['get_block']['first_block'] == "blue": + # self.target_label = tlabel.BBLOCK + # self.another_label = tlabel.RBLOCK + # else: + # self.target_label = tlabel.RBLOCK + # self.another_label = tlabel.BBLOCK + # cfg['get_block']['first_block'] = "red" + self.target_label = [tlabel.RBLOCK, tlabel.BBLOCK] + self.target_counts = [0, 0] + def find(self): + # 目标检测红/蓝方块 + # ret = filter.find(self.target_label) + # if ret > 0: + # return True + # return False + ret = filter.find_mult(self.target_label) + self.target_counts[0] += ret[0] + self.target_counts[1] += ret[1] + if any(ret): + return True + return False + + + def exec(self): + car_stop() + if self.target_counts[0] > self.target_counts[1]: + var.first_block = tlabel.RBLOCK + var.second_block = tlabel.BBLOCK + else: + var.first_block = tlabel.BBLOCK + var.second_block = tlabel.RBLOCK + calibrate_new(var.first_block, offset = 15, run = True, run_speed = 5) + logger.info("抓取块") + + by_cmd.send_position_axis_z(30, 60) + time.sleep(1) + by_cmd.send_angle_claw(63) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1) + by_cmd.send_angle_claw(25) + time.sleep(0.5) + by_cmd.send_position_axis_z(30, 90) + time.sleep(0.5) + + by_cmd.send_angle_claw_arm(175) + time.sleep(0.1) + by_cmd.send_position_axis_x(1, 120) + time.sleep(1) + by_cmd.send_position_axis_z(30, 70) + time.sleep(0.1) + by_cmd.send_angle_claw(63) + time.sleep(0.5) + + by_cmd.send_position_axis_z(30, 130) + time.sleep(1) + by_cmd.send_position_axis_x(1, 140) + by_cmd.send_angle_claw_arm(220) + time.sleep(0.5) + by_cmd.send_angle_storage(55) + time.sleep(1) + + pass + def nexec(self): + # TODO 完成不执行任务的空动作 + pass + def after(self): + var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"]) + pass + +class get_block2(): + def init(self): + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + filter.switch_camera(1) + # if cfg['get_block']['first_block'] == "red": + # self.target_label = tlabel.BBLOCK + # self.another_label = tlabel.RBLOCK + # else: + # self.target_label = tlabel.RBLOCK + # self.another_label = tlabel.BBLOCK + def find(self): + # 目标检测红/蓝方块 + ret = filter.find(var.second_block) + if ret > 0: + return True + return False + def exec(self): + car_stop() + calibrate_new(var.second_block, offset = 15, run = True, run_speed = 5) + logger.info("抓取块") + time.sleep(0.5) + by_cmd.send_angle_claw_arm(220) + by_cmd.send_angle_claw(63) + by_cmd.send_position_axis_z(30, 60) + time.sleep(1) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1) + by_cmd.send_angle_claw(25) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 100) + + # by_cmd.send_distance_x(15, 100) + time.sleep(2) + + pass + def nexec(self): + # TODO 完成不执行任务的空动作 + pass + def after(self): + var.task_speed = 0 + var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"]) + # 任务检查间隔 + time.sleep(7) + + +# 紧急转移 +class put_block(): + def init(self): + var.task_speed = 0 + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + logger.info("紧急转移初始化") + filter.switch_camera(1) + def find(self): + # 目标检测医院 + ret, box = filter.get(tlabel.HOSPITAL) + if ret > 0: + width = box[0][2] - box[0][0] + if width > 130: + return True + return False + def exec(self): + logger.info("找到医院") + car_stop() + calibrate_new(tlabel.HOSPITAL, offset = 7, run = False) + + by_cmd.send_distance_x(10, 100) + by_cmd.send_position_axis_z(30, 0) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1) + by_cmd.send_angle_claw(63) + time.sleep(1) + + # 放置第二個塊 + by_cmd.send_angle_storage(20) + by_cmd.send_position_axis_x(1, 130) + by_cmd.send_position_axis_z(30, 120) + time.sleep(1.5) + by_cmd.send_angle_claw_arm(180) + by_cmd.send_angle_claw(85) + # by_cmd.send_angle_storage(0) + time.sleep(1) + by_cmd.send_position_axis_z(30,70) + time.sleep(1) + by_cmd.send_angle_claw(25) + by_cmd.send_distance_x(-10, 110) + time.sleep(1) + by_cmd.send_position_axis_z(30, 110) + time.sleep(1) + by_cmd.send_angle_claw_arm(220) + time.sleep(1) + by_cmd.send_position_axis_z(30, 0) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1.5) + by_cmd.send_angle_claw(45) + time.sleep(1) + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_block"]["pid_kp"], cfg["put_block"]["pid_ki"], cfg["put_block"]["pid_kd"]) + # 下一动作预备位置 + while by_cmd.send_position_axis_z(30, 130) == -1: + pass + time.sleep(1) + while by_cmd.send_position_axis_x(1, 0) == -1: + pass + while by_cmd.send_angle_claw_arm(36) == -1: + pass + # 任务检查间隔 + # time.sleep(2) + + +# 整装上阵 +class get_bball(): + def init(self): + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + # by_cmd.send_position_axis_z(30, 135) + # time.sleep(0.5) + # by_cmd.send_position_axis_x(1, 0) + # time.sleep(2) + # by_cmd.send_angle_claw_arm(36) + + while (by_cmd.send_angle_storage(0) == -1): + by_cmd.send_angle_storage(0) + + # 调试 临时换源 + filter.switch_camera(1) + logger.info("整装上阵初始化") + # time.sleep(0.5) + + self.record = CountRecord(5) + def find(self): + # 目标检测蓝球 + ret = filter.find(tlabel.BBALL) + # ret = filter.find(tlabel.BBALL) or filter.find(tlabel.YBALL) + # TODO 此处使用连续检出判断感觉不是很好,黄球放置远时停车较晚,可能跟请求速度相关 + # if ret: + # if self.record(tlabel.BBALL): + # return True + if ret: + return True + return False + def exec(self): + logger.info("找到蓝色球") + car_stop() + time.sleep(0.5) + for _ in range(3): + calibrate_right_new(tlabel.BBALL, offset = 16, run = True, run_speed = 5) + logger.info("抓蓝色球") + time.sleep(0.5) + by_cmd.send_angle_claw_arm(36) + by_cmd.send_angle_claw(54) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1.2) + by_cmd.send_angle_claw(25) + time.sleep(1) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(1) + by_cmd.send_position_axis_x(1, 60) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, -40) + time.sleep(0.5) + by_cmd.send_angle_claw_arm(80) + time.sleep(0.5) + by_cmd.send_angle_claw(54) + time.sleep(0.5) + by_cmd.send_angle_claw_arm(36) + time.sleep(1) + by_cmd.send_position_axis_z(30, 135) + # 继续向前走 + # by_cmd.send_speed_x(4) + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["get_bball"]["pid_kp"], cfg["get_bball"]["pid_ki"], cfg["get_bball"]["pid_kd"]) + # 下一动作预备位置 + by_cmd.send_angle_claw(30) + time.sleep(0.5) + while by_cmd.send_position_axis_z(30, 0) == -1: + pass + time.sleep(1) + # # 任务检查间隔 + # time.sleep(1) + + +# 通信抢修 +class up_tower(): + def init(self): + logger.info("通信抢修初始化") + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + filter.switch_camera(1) + def find(self): + # 目标检测通信塔 + ret = filter.find(tlabel.TOWER) + if ret: + return True + return False + def exec(self): + logger.info("找到塔") + car_stop() + time.sleep(1) + calibrate_new(tlabel.TOWER, offset = 20, run = True) + time.sleep(1) + # calibrate(tlabel.TOWER, 27, False, 6) + by_cmd.send_distance_x(-10, 120) + time.sleep(1) + # 上古參數 + # by_cmd.send_distance_y(-10, 50) + # 6_9 模型參數 + by_cmd.send_distance_y(-10, 40) + # 7_12_3 模型參數 + # by_cmd.send_distance_y(-10, 50) + # time.sleep(1) + car_stop() + # FIXME 如果下發 distance 後直接 car_stop,則 distance 執行時間僅由指令間處理延時決定 + # time.sleep(3) + # by_cmd.send_speed_y(-10) + # time.sleep(0.15) + + by_cmd.send_angle_zhuan(10) + time.sleep(12) + by_cmd.send_speed_y(10) + time.sleep(0.3) + car_stop() + by_cmd.send_angle_zhuan(0) + # while True: + # pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["up_tower"]["pid_kp"], cfg["up_tower"]["pid_ki"], cfg["up_tower"]["pid_kd"]) + # 下一动作预备位置 + by_cmd.send_position_axis_z(30, 0) + # 任务检查间隔 + time.sleep(4) + + +# 高空排险 +class get_rball(): + def init(self): + filter.switch_camera(1) + logger.info("高空排险初始化") + while (by_cmd.send_angle_camera(180) == -1): + by_cmd.send_angle_camera(180) + self.record = CountRecord(3) + def find(self): + # 目标检测红球 + ret = filter.find(tlabel.RBALL) + if ret > 0: + # TODO 连续两帧才开始减速 + if self.record(tlabel.RBALL): + var.task_speed = 5 + return True + else: + return False + def exec(self): + logger.info("找到红球") + var.task_speed = 0 + car_stop() + time.sleep(0.5) + # 靠近塔 + by_cmd.send_angle_scoop(20) + # 上古參數 + # by_cmd.send_distance_y(-15, 45) # 50 + # 6_9 參數 + by_cmd.send_distance_y(-15, 35) + time.sleep(2) + # 7_12_3 參數 + # by_cmd.send_distance_y(-15, 45) + # time.sleep(2) + car_stop() + calibrate_new(tlabel.RBALL,offset = 44, run = True) + time.sleep(1) + logger.info("抓红球") + # by_cmd.send_angle_scoop(15) + time.sleep(0.5) + by_cmd.send_position_axis_z(30, 170) + time.sleep(2.5) + by_cmd.send_angle_scoop(7) + time.sleep(0.5) + by_cmd.send_speed_y(15) + time.sleep(0.2) + car_stop() + # by_cmd.send_angle_omega(-55,30) + # while True: + # pass + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["get_rball"]["pid_kp"], cfg["get_rball"]["pid_ki"], cfg["get_rball"]["pid_kd"]) + # 任务检查间隔 + time.sleep(2) + + +# 派发物资 +class put_bball(): + def init(self): + logger.info("派发物资初始化") + filter.switch_camera(1) + while by_cmd.send_position_axis_z(30, 0) == -1: + pass + while by_cmd.send_angle_camera(90) == -1: + pass + def find(self): + ret = filter.find(tlabel.BASKET) + if ret > 0: + return True + else: + return False + def exec(self): + logger.info("找到篮筐") + car_stop() + calibrate_new(tlabel.BASKET,offset = -40, run = True, run_speed = 6) + # by_cmd.send_distance_x(10, 10) + # 向左运动 + # by_cmd.send_distance_y(-10, 35) + # by_cmd.send_angle_storage(10) + # time.sleep(1) + + by_cmd.send_angle_storage(50) + logger.info("把球放篮筐里") + + time.sleep(1) + # by_cmd.send_distance_y(10, 55) + by_cmd.send_angle_storage(20) + # time.sleep(1) + # car_stop() + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_bball"]["pid_kp"], cfg["put_bball"]["pid_ki"], cfg["put_bball"]["pid_kd"]) + # 任务检查间隔 + time.sleep(2) + + +# 物资盘点 +class put_hanoi1(): + def init(self): + logger.info("物资盘点 1 初始化") + filter.switch_camera(2) + def find(self): + ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK]) + if label == tlabel.RMARK: + if abs(error) < 55: + logger.info("向右拐") + utils.direction_right += 1 + return True + return False + elif label == tlabel.LMARK: + if abs(error) < 50: + logger.info("向左拐") + utils.direction_left += 1 + return True + return False + else: + return False + def exec(self): + global direction + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + time.sleep(0.2) + + by_cmd.send_position_axis_z(30, 130) + + # 校准牌子 + if utils.direction_right > utils.direction_left: + ret, error = filter.aim_near(tlabel.RMARK) + while not ret: + ret, error = filter.aim_near(tlabel.RMARK) + utils.direction = tlabel.RMARK + logger.info("应该向右转") + else: + ret, error = filter.aim_near(tlabel.LMARK) + while not ret: + ret, error = filter.aim_near(tlabel.LMARK) + utils.direction = tlabel.LMARK + logger.info("应该向左转") + + + # # 校准 omega + # if error > 0: + # by_cmd.send_angle_omega(-20,abs(var.lane_error)) + # else: + # by_cmd.send_angle_omega(20,abs(var.lane_error)) + # time.sleep(0.5) + # car_stop() + # time.sleep(0.5) + # by_cmd.send_distance_x(10, 200) + + # by_cmd.send_distance_x(10, 180) + by_cmd.send_distance_x(10, 250) + time.sleep(1) + car_stop() + + # 根据方向初始化执行器位置 + if utils.direction is tlabel.RMARK: + # FIXME 右侧的爪子会被 storage 挡住 + by_cmd.send_position_axis_x(1, 0) + by_cmd.send_angle_claw_arm(36) + by_cmd.send_angle_storage(0) + else: + by_cmd.send_position_axis_x(1, 150) + by_cmd.send_angle_claw_arm(220) + by_cmd.send_angle_storage(55) + time.sleep(1) + + by_cmd.send_position_axis_z(30, 10) + + if utils.direction_right > utils.direction_left: + utils.direction = tlabel.RMARK + # by_cmd.send_angle_omega(-25,430) + by_cmd.send_angle_omega(-45,238) + # by_cmd.send_angle_omega(-55,194) + time.sleep(2) + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + else: + utils.direction = tlabel.LMARK + # by_cmd.send_angle_omega(25,430) + by_cmd.send_angle_omega(45,238) + # by_cmd.send_angle_omega(55,194) + time.sleep(2) + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + time.sleep(0.5) + filter.switch_camera(1) + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_hanoi1"]["pid_kp"], cfg["put_hanoi1"]["pid_ki"], cfg["put_hanoi1"]["pid_kd"]) + pass + +class put_hanoi2(): + def __init__(self): + if cfg['put_hanoi2']['first_target'] == "lp": + self.target_label = tlabel.LPILLER + elif cfg['put_hanoi2']['first_target'] == "mp": + self.target_label = tlabel.MPILLER + elif cfg['put_hanoi2']['first_target'] == "sp": + self.target_label = tlabel.SPILLER + def init(self): + logger.info("物资盘点 2 初始化") + if utils.direction == tlabel.RMARK: + # 15 + self.offset = 19 + # self.platform_offset = -25 + self.platform_offset = -10 + else: + self.offset = 10 + #self.platform_offset = -30 + self.platform_offset = -15 + def find(self): + # ret, box = filter.get(self.target_label) + ret, box = filter.get(tlabel.TPLATFORM) + if ret: + var.task_speed = 8.5 + error = (box[0][2] + box[0][0] - 320) / 2 + self.platform_offset + if error > 0: + return True + return False + def exec(self): + logger.info(f"direction:{utils.direction.name}") + var.task_speed = 0 + car_stop() + + # if utils.direction is tlabel.RMARK: + # by_cmd.send_distance_y(10, 50) + # time.sleep(1) + + # time.sleep(0.5) + # by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30) + # time.sleep(1) + ret = explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = 1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("抓大平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 130) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(40) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 40) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(40) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + pass + ret = explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("放大平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_x(1, 130) + time.sleep(1.5) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(1) + by_cmd.send_angle_claw(81) + time.sleep(0.5) + by_cmd.send_angle_claw(63) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_x(1, 40) + time.sleep(1.5) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(1) + by_cmd.send_angle_claw(81) + time.sleep(0.5) + by_cmd.send_angle_claw(63) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + + ret = explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = 1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("抓中平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 130) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(35) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 40) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(35) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + pass + ret = explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = -1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("放中平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 100) + time.sleep(2) + by_cmd.send_position_axis_x(1, 130) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(65) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 100) + time.sleep(2) + by_cmd.send_position_axis_x(1, 40) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(65) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + + ret = explore_calibrate_new(tlabel.SPILLER, offset = self.offset, run_direc = 1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("抓小平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 130) + by_cmd.send_angle_claw(50) + time.sleep(2) + by_cmd.send_angle_claw(27) + time.sleep(1) + by_cmd.send_distance_axis_z(30, 10) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 40) + by_cmd.send_angle_claw(50) + time.sleep(2) + by_cmd.send_angle_claw(27) + time.sleep(1) + by_cmd.send_distance_axis_z(30, 10) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + pass + ret = explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = -1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("放小平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 170) + time.sleep(1.5) + by_cmd.send_position_axis_x(1, 130) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(80) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + by_cmd.send_speed_y(15) + time.sleep(0.1) + car_stop() + pass + else: + by_cmd.send_position_axis_z(30, 170) + time.sleep(1.5) + by_cmd.send_position_axis_x(1, 40) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(80) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1.5) + # while True: + # pass + + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_hanoi2"]["pid_kp"], cfg["put_hanoi2"]["pid_ki"], cfg["put_hanoi2"]["pid_kd"]) + pass + +class put_hanoi3(): + def init(self): + while by_cmd.send_position_axis_z(30, 130) == -1: + pass + time.sleep(3) + logger.info("完成任务,爪回左侧") + while by_cmd.send_angle_claw_arm(128) == -1: + pass + time.sleep(0.5) + while by_cmd.send_position_axis_x(1, 0) == -1: + pass + time.sleep(1) + # while by_cmd.send_angle_claw_arm(220) == -1: + # pass + # while by_cmd.send_angle_claw(90) == -1: + # pass + def find(self): + time.sleep(1) + return True + def exec(self): + while by_cmd.send_position_axis_z(30, 100) == -1: + pass + pass + def nexec(self): + pass + def after(self): + by_cmd.send_position_axis_x(1, 150) + var.pid_turning.set(cfg["put_hanoi3"]["pid_kp"], cfg["put_hanoi3"]["pid_ki"], cfg["put_hanoi3"]["pid_kd"]) + +# 应急避险 第一阶段 找目标牌 +class move_area1(): + def init(self): + logger.info("应急避险第一阶段初始化") + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + def find(self): + ret = filter.find(tlabel.SIGN) + if ret: + return True + return False + def exec(self): + logger.info("找到标示牌") + # 停车 ocr 识别文字 调用大模型 + car_stop() + time.sleep(1) + # calibrate_new(tlabel.SIGN, offset = 8, run = True) + calibrate_new(tlabel.SIGN, offset = -30, run = True, run_speed = 5) + time.sleep(1) + + by_cmd.send_position_axis_x(1, 50) + time.sleep(1) + + # filter_w = (148, 560) + # filter_h = (165, 390) + + counts = 0 + while True: + ocr_socket.send_string("") + resp = ocr_socket.recv_pyobj() + var.llm_text = '' + counts += 1 + if resp.get('code') == 0: + for item in resp.get('content'): + + if item['probability']['average'] < 0.80: + continue + # box = item['location'] + # center_x = box['left'] + box['width'] / 2 + # center_y = box['top'] + box['height'] / 2 + # if center_x < filter_w[0] or center_x > filter_w[1] \ + # or center_y < filter_h[0] or center_y > filter_h[1]: + # continue + var.llm_text += item['words'] + break + if counts >= 2: + var.skip_llm_task_flag = True + return + logger.error(var.llm_text) + if len(var.llm_text) < 3: + var.skip_llm_task_flag = True + return + + var.task_speed = 9 # 12 + + pass + def nexec(self): + pass + def after(self): + # 任务检查间隔 + by_cmd.send_position_axis_x(1, 150) + # time.sleep(1) + by_cmd.send_angle_claw_arm(220) + + pass + +# 应急避险 第二阶段 找停车区域 +class move_area2(): + def init(self): + logger.info("应急避险第二阶段初始化") + self.offset = 15 + self.delta_x = 0 + self.delta_y = 0 + self.delta_omage = 0 + def find(self): + # time.sleep(0.001) + if var.skip_llm_task_flag: + return 5000 + ret, box = filter.get(tlabel.SHELTER) + if ret: + error = (box[0][2] + box[0][0] - 320) / 2 + self.offset + if abs(error) < 20: + return 5000 + return False + def sub_light(self, delay_time): + by_cmd.send_light(1) + time.sleep(delay_time) + by_cmd.send_light(0) + def sub_beep(self,delay_time): + by_cmd.send_beep(1) + time.sleep(delay_time) + by_cmd.send_beep(0) + def sub_move(self, x, y): + # FIXME 如果同時有 xy,是否會造成 delay 不足 + self.delta_x += x + self.delta_y += y + + if x != 0: + delay_time = int(abs(x) * 500) + if x > 0: + by_cmd.send_distance_x(15, delay_time) + else: + by_cmd.send_distance_x(-15, delay_time) + elif y != 0: + delay_time = int(abs(y) * 500) + if y > 0: # 向左 + by_cmd.send_distance_y(-15, delay_time) + else: + by_cmd.send_distance_y(15, delay_time) + time.sleep(delay_time / 500) + car_stop() + pass + def sub_turn(self, angle): + self.delta_omage += angle + delay_time = int(abs(angle) * 400 / 90) + if angle < 0: + # 左转 + by_cmd.send_angle_omega(+55, delay_time) + else: + # 右转 + by_cmd.send_angle_omega(-55, delay_time) + time.sleep(delay_time / 300 * 1.5) + def exec(self): + var.task_speed = 0 + if var.skip_llm_task_flag: + logger.error("ocr 识别出错 直接跳过改任务") + return + logger.info("开始寻找停车区域") + car_stop() + calibrate_new(tlabel.SHELTER, offset = 15, run = True) + time.sleep(0.5) + # 调用大模型 然后执行动作 + try: + resp = llm_bot.get_command_json(var.llm_text) + logger.info(resp) + except: + logger.error("大模型超时,跳过任务") + return + + + try: + resp_commands = eval(re.findall("```json(.*?)```", resp, re.S)[0]) + if len(resp_commands) == 0: + return + # 进入停车区域 + # by_cmd.send_speed_y(15) + by_cmd.send_distance_y(25, 180) + time.sleep(1) + # time.sleep(1.25) + car_stop() + logger.info(resp_commands) + for command in resp_commands: + logger.info(command) + if command['func'] == 'move': + self.sub_move(float(command['x']), float(command['y'])) + elif command['func'] == 'light': + self.sub_light(int(command['time'])) + elif command['func'] == 'beep': + self.sub_beep(int(command['time'])) + elif command['func'] == 'turn': + self.sub_turn(int(command['angle'])) + pass + else: + continue + time.sleep(0.5) + except: + pass + time.sleep(1) + # 回到原位 + + delay_time = int(abs(self.delta_omage) * 400 / 90) + if int(abs(self.delta_omage)) == 360: + delay_time = 0 + if self.delta_omage < 0: + # 左转 + by_cmd.send_angle_omega(-55, delay_time) + else: + # 右转 + by_cmd.send_angle_omega(55, delay_time) + time.sleep(delay_time / 300 * 1.5) + if self.delta_y > 0: + # 向左移动的距离就要比进入的时候少一些 因为 action 已经向左运动了 + delay_time = 180 - (self.delta_y * 500) + else: + delay_time = 180 + (abs(self.delta_y) * 500) + # 离开停车区域 + by_cmd.send_distance_y(-25, delay_time) + + time.sleep(delay_time * 5e-3) + + car_stop() + + # FIXME 移动距离指令下发后未完成,再发送速度指令,将不会清除未完成的速度值 + # by_cmd.send_distance_y(-15, 300) + pass + def nexec(self): + logger.warning("正在跳過大模型任務") + time.sleep(2) + pass + def after(self): + var.pid_turning.set(cfg["move_area"]["pid_kp"], cfg["move_area"]["pid_ki"], cfg["move_area"]["pid_kd"]) + by_cmd.send_position_axis_z(30, 0) + while by_cmd.send_angle_claw(90) == -1: + pass + time.sleep(2) + +# 扫黑除暴 +class kick_ass(): + def init(self): + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + logger.info("扫黑除暴初始化") + self.pos_gap1 = cfg['kick_ass']['pos_gap1'] + self.pos_gap2 = cfg['kick_ass']['pos_gap2'] + self.target_person = cfg_args['kick_ass']['target_person'] + + # by_cmd.send_angle_claw(15) + by_cmd.send_position_axis_x(1, 160) + + def find(self): + ret = filter.find(tlabel.SIGN) + if ret: + return True + return False + def exec(self): + logger.info("找到标示牌") + # 停的晚无需校准 omage + car_stop() + time.sleep(1) + calibrate_new(tlabel.SIGN, offset = 8, run = True) + by_cmd.send_angle_claw(15) + time.sleep(0.5) + by_cmd.send_position_axis_z(30, 60) + by_cmd.send_position_axis_x(1, 130) + + if self.target_person == 1: + target_distance = self.pos_gap1 + else: + target_distance = self.pos_gap1 + (self.target_person - 1) * self.pos_gap2 + (self.target_person - 1) * 10 + by_cmd.send_distance_x(10, target_distance) + logger.info(f"target distance {target_distance}") + time.sleep(1.5 + (self.target_person - 1) * 0.7 ) + car_stop() + + # by_cmd.send_angle_claw_arm(220) + # time.sleep(0.5) + by_cmd.send_position_axis_x(1, 20) + time.sleep(3) + by_cmd.send_position_axis_x(1, 120) + time.sleep(1) + logger.debug("結束任務,前進四") + # by_cmd.send_speed_x(25) + # time.sleep(4) + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["kick_ass"]["pid_kp"], cfg["kick_ass"]["pid_ki"], cfg["kick_ass"]["pid_kd"]) \ No newline at end of file diff --git a/subtask_7123.py b/subtask_7123.py new file mode 100644 index 0000000..2507cec --- /dev/null +++ b/subtask_7123.py @@ -0,0 +1,1412 @@ +from enum import Enum +from loguru import logger +from utils import label_filter +from utils import tlabel +from utils import LLM +from utils import CountRecord +import utils +import toml +import zmq +import time +import variable as var +import action as act +import re +import threading +import ctypes +cfg = None +cfg_args = None +by_cmd = None +filter = None +llm_bot = None + +# 目标检测 socket 客户端 +context = None +socket = None + +context1 = None +ocr_socket = None + +global_skip_queue = None + +''' +description: main.py 里执行 引入全局变量 +param {*} _by_cmd 控制器对象 +return {*} +''' +def import_obj(_by_cmd, skip_queue): + + global by_cmd + global filter + global llm_bot + + global context + global socket + + global context1 + global ocr_socket + + global cfg + global cfg_args + global global_skip_queue + cfg = toml.load('/home/evan/Workplace/project_main/cfg_subtask.toml') # 加载任务配置 + cfg_args = toml.load('/home/evan/Workplace/project_main/cfg_args.toml') + by_cmd = _by_cmd + global_skip_queue = skip_queue + + # 目标检测 socket 客户端 + context = zmq.Context() + socket = context.socket(zmq.REQ) + socket.connect("tcp://localhost:6667") + logger.info("subtask yolo client init") + + # ocr socket 客户端 + # context1 = zmq.Context() + # ocr_socket = context1.socket(zmq.REQ) + # ocr_socket.connect("tcp://localhost:6668") + # logger.info("subtask ocr client init") + + filter = label_filter(socket) + if cfg['move_area']['llm_enable']: + llm_bot = LLM() +def car_stop(): + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_y(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) +# 蓝色球使用 +def calibrate_right_new(label, offset, run = True, run_speed = 3.5): + + not_found_counts = 0 + ret, error = filter.aim_right(label) + while not ret: + not_found_counts += 1 + if not_found_counts >= 20: + not_found_counts = 0 + error = -320 # error > 0 front run + logger.info("calibrate_right_new:找不到次数超过 20 帧 直接前进寻找") + break + ret, error = filter.aim_right(label) + + error += offset + if abs(error) > 10 and run: + if error > 0: + by_cmd.send_speed_x(-run_speed) + else: + by_cmd.send_speed_x(run_speed) + # 停的位置已经很接近目标,可以直接使用 distance 校准 + else: + return + + while True: + ret, error = filter.aim_right(label) + while not ret: + ret, error = filter.aim_right(label) + error += offset + # if ret: + if abs(error) <= 8: + car_stop() + logger.info("calibrate_right_new:行进时 误差小于 8 直接停车") + ret, error = filter.aim_right(label) + while not ret: + not_found_counts += 1 + if not_found_counts >=30: + return + ret, error = filter.aim_right(label) + error += offset + logger.info(f"calibrate_right_new:停车后的误差是{error}") + if abs(error) > 8: + logger.info(f"calibrate_right_new:停车后的误差大于 8 使用 distance 校准") + error = error * 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + + break +''' +description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车 +如果停车后 error > 8 则使用 distance 校准 +这个方法仅用于在视野里能找到的情况下进行校准,并不能实现边走边寻找 然后再校准 +param {*} label +param {*} offset +param {*} run +param {*} run_speed +return {*} +''' +def calibrate_new(label, offset, run = True, run_speed = 3.5): + not_found_counts = 0 + ret, box = filter.get(label) + while not ret: + not_found_counts += 1 + if not_found_counts >= 20: + not_found_counts = 0 + error = -320 # error > 0 front run + logger.info("calibrate_new:找不到次数超过 20 帧 直接前进寻找") + break + ret, box = filter.get(label) + + # 如果超过二十帧跳出,则此时 box 为空值,需要再校验 ret + if ret is True: + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if abs(error) > 10 and run: + if error > 0: + by_cmd.send_speed_x(-run_speed) + else: + by_cmd.send_speed_x(run_speed) + # 停的位置已经很接近目标,可以直接使用 distance 校准 + else: + if abs(error) > 8: + logger.info(f"calibrate_new:停车后误差{error}大于 8 使用 distance 校准") + # error = error # 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + return + logger.info(f"calibrate_new:停车后误差{error}小于 8 不校准") + return + while True: + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if ret: + if abs(error) <= 10: # 5 + car_stop() + logger.info("calibrate_new:行进时 误差小于 10 直接停车") + + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + logger.info(f"calibrate_new:停车后的误差是{error}") + if abs(error) > 8: + logger.info(f"calibrate_new:停车后的误差大于 8 使用 distance 校准") + error = error * 3 + # logger.error(f"error * 3 {error}") + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + break +# 弃用 distance 校准 +def explore_calibrate_new(label, offset, run_direc ,run_speed = 3.5): + # run_direc == 1 向前 + stop_error = 0 + if run_direc == 1: + by_cmd.send_speed_x(run_speed) + else: + by_cmd.send_speed_x(-run_speed) + + if label == tlabel.TPLATFORM: + stop_error = 8 + else: + stop_error = 15 + while True: + ret, box = filter.get(label) + while not ret: + if not global_skip_queue.empty(): + _ = global_skip_queue.get() + logger.error("跳过 explore_calibrate_new") + return False + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if ret: + # 校准速度越大 停车的条件越宽泛 20 15 + if abs(error) <= stop_error: + car_stop() + logger.info("explore_calibrate_new:行进时 误差小于 15 直接停车") + + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + error = (box[0][2] + box[0][0] - 320) / 2 + offset + logger.info(f"停车后像素误差:{error}") + + # logger.info(f"explore_calibrate_new:停车后的误差大于 8 使用 distance 校准") + # if abs(error) < 8: + # error = error * 3 + # else: + # error = error * 2 + # if error > 0: + # by_cmd.send_distance_x(-10, int(error)) + # else: + # by_cmd.send_distance_x(10, int(-error)) + + break + return True + +# 任务类 +class task: + def __init__(self, name, task_template, find_counts=10, enable=True): + self.enable = enable + self.task_t = task_template() + + self.counts = 0 + self.find_counts = int(find_counts) + self.name = name + + def init(self): + if hasattr(self.task_t, 'init') and callable(getattr(self.task_t, 'init', None)): + self.task_t.init() + else: + logger.warning("[Task ]# 该任务没有 init 方法") + def find(self): + if hasattr(self.task_t, 'find') and callable(getattr(self.task_t, 'find', None)): + return self.task_t.find() + else: + logger.warning("[Task ]# 该任务没有 find 方法") + def exec(self): + # 根据标志位确定是否执行该任务 + if self.enable is True: + if hasattr(self.task_t, 'exec') and callable(getattr(self.task_t, 'exec', None)): + logger.info(f"[Task ]# Executing task") + self.task_t.exec() + else: + logger.warning("[Task ]# 该任务没有 exec 方法") + else: + logger.warning(f"[Task ]# Skip task") + if hasattr(self.task_t, 'nexec') and callable(getattr(self.task_t, 'nexec', None)): + self.task_t.nexec() + else: + logger.warning("[Task ]# 该任务没有 nexec 方法") + def after(self): + if hasattr(self.task_t, 'after') and callable(getattr(self.task_t, 'after', None)): + logger.info(f"[Task ]# {self.name} 正在执行 after") + self.task_t.after() + logger.debug(f"[Task ]# Task completed") + else: + logger.warning("[Task ]# 该任务没有 after 方法") + +# 任务队列状态类 +class task_queuem_status(Enum): + IDEL = 0 + SEARCHING = 1 + EXECUTING = 2 + +# 任务队列类 非 EXECUTEING 时均执行 huigui,注意互斥操作 +class task_queuem(task): + # task_now = task(None, False) + def __init__(self, queue): + super(task_queuem, self) + self.queue = queue + self.status = task_queuem_status.IDEL + self.busy = True + logger.info(f"[TaskM]# Task num {self.queue.qsize()}") + # exec 线程 + self.exec_thread = None + def exec(self, skip_queue): + # 如果空闲状态则将下一个队列任务取出 + if self.status is task_queuem_status.IDEL: + if self.queue.qsize() == 0: + self.busy = False + logger.info(f"[TaskM]# Task queue empty, exit") + return False + self.task_now = self.queue.get() + + # 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息 + if self.task_now.enable is True: + self.status = task_queuem_status.SEARCHING + # 如果使能该任务则执行该任务的初始化动作 + self.task_now.init() + else: + self.status = task_queuem_status.EXECUTING + logger.info(f"[TaskM]# ---------------------->>>>") + # 阻塞搜索任务标志位 + elif self.status is task_queuem_status.SEARCHING: + logger.info(f"[{self.task_now.name}]# Start searching task target") + while True: + if not skip_queue.empty(): + _ = skip_queue.get() + logger.error(f"{self.task_now.name} 任务在 find 中已经被手动跳过") + self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 + self.task_now.after() # 执行任务后处理 + self.queue.task_done() # 弹出已执行的任务 + return True + ret = self.task_now.find() + self.task_now.counts += ret + if self.task_now.counts >= self.task_now.find_counts: + self.status = task_queuem_status.EXECUTING + break + # 执行任务函数 + elif self.status is task_queuem_status.EXECUTING: + if self.task_now.enable is True: + logger.info(f"[TaskM]# Start execute task function") + # self.exec_thread = threading.Thread(target=self.task_now.exec) + # # 启动线程 + # self.exec_thread.start() + # while True: + # if not self.exec_thread.is_alive(): + # break + # else: + # if not skip_queue.empty(): + # car_stop() + # thread_id = self.exec_thread.ident + # res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit)) + # _ = skip_queue.get() + # logger.error(f"{self.task_now.name} 任务在 exec 中已经被手动跳过") + # self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 + # self.task_now.after() # 执行任务后处理 + # self.queue.task_done() # 弹出已执行的任务 + # return True + self.task_now.exec() # 执行当前任务函数 + self.status = task_queuem_status.IDEL # 执行完成后为退出巡线阻塞 + self.task_now.after() # 执行任务后处理 + self.queue.task_done() # 弹出已执行的任务 + logger.info(f"[TaskM]# <<<<----------------------") + else: + logger.info(f"[TaskM]# Start execute task function (nexec)") + self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 + self.task_now.exec() # 执行当前任务函数 + self.task_now.after() # 执行任务后处理 + self.queue.task_done() # 弹出已执行的任务 + logger.info(f"[TaskM]# <<<<----------------------") + return True + +# 人员施救 +class get_block1(): + def init(self): + var.task_speed = 15 + act.cmd.camera(0) + act.cmd.z2(20, 60, 0) + filter.switch_camera(1) + # if cfg['get_block']['first_block'] == "blue": + # self.target_label = tlabel.BBLOCK + # self.another_label = tlabel.RBLOCK + # else: + # self.target_label = tlabel.RBLOCK + # self.another_label = tlabel.BBLOCK + # cfg['get_block']['first_block'] = "red" + self.target_label = [tlabel.RBLOCK, tlabel.BBLOCK] + self.target_counts = [0, 0] + def find(self): + # 目标检测红/蓝方块 + # ret = filter.find(self.target_label) + # if ret > 0: + # return True + # return False + ret = filter.find_mult(self.target_label) + self.target_counts[0] += ret[0] + self.target_counts[1] += ret[1] + if any(ret): + return True + return False + + + def exec(self): + car_stop() + if self.target_counts[0] > self.target_counts[1]: + var.first_block = tlabel.RBLOCK + var.second_block = tlabel.BBLOCK + else: + var.first_block = tlabel.BBLOCK + var.second_block = tlabel.RBLOCK + calibrate_new(var.first_block, offset = 15, run = True, run_speed = 5) + logger.info("抓取块") + + by_cmd.send_position_axis_z(30, 60) + time.sleep(1) + by_cmd.send_angle_claw(63) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1) + by_cmd.send_angle_claw(25) + time.sleep(0.5) + by_cmd.send_position_axis_z(30, 90) + time.sleep(0.5) + + by_cmd.send_angle_claw_arm(175) + time.sleep(0.1) + by_cmd.send_position_axis_x(1, 120) + time.sleep(1) + by_cmd.send_position_axis_z(30, 70) + time.sleep(0.1) + by_cmd.send_angle_claw(63) + time.sleep(0.5) + + by_cmd.send_position_axis_z(30, 130) + time.sleep(1) + by_cmd.send_position_axis_x(1, 140) + by_cmd.send_angle_claw_arm(220) + time.sleep(0.5) + by_cmd.send_angle_storage(55) + time.sleep(1) + + pass + def nexec(self): + # TODO 完成不执行任务的空动作 + pass + def after(self): + var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"]) + pass + +class get_block2(): + def init(self): + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + filter.switch_camera(1) + # if cfg['get_block']['first_block'] == "red": + # self.target_label = tlabel.BBLOCK + # self.another_label = tlabel.RBLOCK + # else: + # self.target_label = tlabel.RBLOCK + # self.another_label = tlabel.BBLOCK + def find(self): + # 目标检测红/蓝方块 + ret = filter.find(var.second_block) + if ret > 0: + return True + return False + def exec(self): + car_stop() + calibrate_new(var.second_block, offset = 15, run = True, run_speed = 5) + logger.info("抓取块") + time.sleep(0.5) + by_cmd.send_angle_claw_arm(220) + by_cmd.send_angle_claw(63) + by_cmd.send_position_axis_z(30, 60) + time.sleep(1) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1) + by_cmd.send_angle_claw(25) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 100) + + # by_cmd.send_distance_x(15, 100) + time.sleep(2) + + pass + def nexec(self): + # TODO 完成不执行任务的空动作 + pass + def after(self): + var.task_speed = 0 + var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"]) + # 任务检查间隔 + time.sleep(7) + + +# 紧急转移 +class put_block(): + def init(self): + var.task_speed = 0 + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + logger.info("紧急转移初始化") + filter.switch_camera(1) + def find(self): + # 目标检测医院 + ret, box = filter.get(tlabel.HOSPITAL) + if ret > 0: + width = box[0][2] - box[0][0] + # 130 + if width > 140: + return True + return False + def exec(self): + logger.info("找到医院") + car_stop() + calibrate_new(tlabel.HOSPITAL, offset = 7, run = False) + + by_cmd.send_distance_x(10, 100) + by_cmd.send_position_axis_z(30, 0) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1) + by_cmd.send_angle_claw(63) + time.sleep(1) + + # 放置第二個塊 + by_cmd.send_angle_storage(20) + by_cmd.send_position_axis_x(1, 130) + by_cmd.send_position_axis_z(30, 120) + time.sleep(1.5) + by_cmd.send_angle_claw_arm(180) + by_cmd.send_angle_claw(85) + # by_cmd.send_angle_storage(0) + time.sleep(1) + by_cmd.send_position_axis_z(30,70) + time.sleep(1) + by_cmd.send_angle_claw(25) + by_cmd.send_distance_x(-10, 110) + time.sleep(1) + by_cmd.send_position_axis_z(30, 110) + time.sleep(1) + by_cmd.send_angle_claw_arm(220) + time.sleep(1) + by_cmd.send_position_axis_z(30, 0) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1.5) + by_cmd.send_angle_claw(45) + time.sleep(1) + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_block"]["pid_kp"], cfg["put_block"]["pid_ki"], cfg["put_block"]["pid_kd"]) + # 下一动作预备位置 + while by_cmd.send_position_axis_z(30, 130) == -1: + pass + time.sleep(1) + while by_cmd.send_position_axis_x(1, 0) == -1: + pass + while by_cmd.send_angle_claw_arm(36) == -1: + pass + # 任务检查间隔 + # time.sleep(2) + + +# 整装上阵 +class get_bball(): + def init(self): + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + # by_cmd.send_position_axis_z(30, 135) + # time.sleep(0.5) + # by_cmd.send_position_axis_x(1, 0) + # time.sleep(2) + # by_cmd.send_angle_claw_arm(36) + + while (by_cmd.send_angle_storage(0) == -1): + by_cmd.send_angle_storage(0) + + # 调试 临时换源 + filter.switch_camera(1) + logger.info("整装上阵初始化") + # time.sleep(0.5) + + self.record = CountRecord(5) + def find(self): + # 目标检测蓝球 + ret = filter.find(tlabel.BBALL) + # ret = filter.find(tlabel.BBALL) or filter.find(tlabel.YBALL) + # TODO 此处使用连续检出判断感觉不是很好,黄球放置远时停车较晚,可能跟请求速度相关 + # if ret: + # if self.record(tlabel.BBALL): + # return True + if ret: + return True + return False + def exec(self): + logger.info("找到蓝色球") + car_stop() + time.sleep(0.5) + for _ in range(3): + calibrate_right_new(tlabel.BBALL, offset = 16, run = True, run_speed = 5) + logger.info("抓蓝色球") + time.sleep(0.5) + by_cmd.send_angle_claw_arm(36) + by_cmd.send_angle_claw(54) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1.2) + by_cmd.send_angle_claw(25) + time.sleep(1) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(1) + by_cmd.send_position_axis_x(1, 60) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, -40) + time.sleep(0.5) + by_cmd.send_angle_claw_arm(80) + time.sleep(0.5) + by_cmd.send_angle_claw(54) + time.sleep(0.5) + by_cmd.send_angle_claw_arm(36) + time.sleep(1) + by_cmd.send_position_axis_z(30, 135) + # 继续向前走 + # by_cmd.send_speed_x(4) + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["get_bball"]["pid_kp"], cfg["get_bball"]["pid_ki"], cfg["get_bball"]["pid_kd"]) + # 下一动作预备位置 + by_cmd.send_angle_claw(30) + time.sleep(0.5) + while by_cmd.send_position_axis_z(30, 0) == -1: + pass + time.sleep(1) + # # 任务检查间隔 + # time.sleep(1) + + +# 通信抢修 +class up_tower(): + def init(self): + logger.info("通信抢修初始化") + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + filter.switch_camera(1) + def find(self): + # 目标检测通信塔 + ret = filter.find(tlabel.TOWER) + if ret: + return True + return False + def exec(self): + logger.info("找到塔") + car_stop() + time.sleep(1) + calibrate_new(tlabel.TOWER, offset = 20, run = True) + time.sleep(1) + # calibrate(tlabel.TOWER, 27, False, 6) + by_cmd.send_distance_x(-10, 120) + time.sleep(1) + # 上古參數 + # by_cmd.send_distance_y(-10, 50) + # 6_9 模型參數 + # by_cmd.send_distance_y(-10, 40) + # 7_12_3 模型參數 + by_cmd.send_distance_y(-10, 75) + time.sleep(1) + car_stop() + # FIXME 如果下發 distance 後直接 car_stop,則 distance 執行時間僅由指令間處理延時決定 + # time.sleep(3) + # by_cmd.send_speed_y(-10) + # time.sleep(0.15) + + by_cmd.send_angle_zhuan(10) + time.sleep(12) + by_cmd.send_speed_y(10) + time.sleep(0.3) + car_stop() + by_cmd.send_angle_zhuan(0) + # while True: + # pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["up_tower"]["pid_kp"], cfg["up_tower"]["pid_ki"], cfg["up_tower"]["pid_kd"]) + # 下一动作预备位置 + by_cmd.send_position_axis_z(30, 0) + # 任务检查间隔 + time.sleep(4) + + +# 高空排险 +class get_rball(): + def init(self): + filter.switch_camera(1) + logger.info("高空排险初始化") + while (by_cmd.send_angle_camera(180) == -1): + by_cmd.send_angle_camera(180) + self.record = CountRecord(3) + def find(self): + # 目标检测红球 + ret = filter.find(tlabel.RBALL) + if ret > 0: + # TODO 连续两帧才开始减速 + if self.record(tlabel.RBALL): + var.task_speed = 5 + return True + else: + return False + def exec(self): + logger.info("找到红球") + var.task_speed = 0 + car_stop() + time.sleep(0.5) + # 靠近塔 + by_cmd.send_angle_scoop(20) + # 上古參數 + # by_cmd.send_distance_y(-15, 45) # 50 + # 6_9 參數 + # by_cmd.send_distance_y(-15, 35) + # time.sleep(2) + # 7_12_3 參數 + by_cmd.send_distance_y(-15, 50) + time.sleep(2) + car_stop() + calibrate_new(tlabel.RBALL,offset = 44, run = True) + time.sleep(1) + logger.info("抓红球") + # by_cmd.send_angle_scoop(15) + time.sleep(0.5) + by_cmd.send_position_axis_z(30, 170) + time.sleep(2.5) + by_cmd.send_angle_scoop(7) + time.sleep(0.5) + by_cmd.send_speed_y(15) + time.sleep(0.2) + car_stop() + # by_cmd.send_angle_omega(-55,30) + # while True: + # pass + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["get_rball"]["pid_kp"], cfg["get_rball"]["pid_ki"], cfg["get_rball"]["pid_kd"]) + # 任务检查间隔 + time.sleep(2) + + +# 派发物资 +class put_bball(): + def init(self): + logger.info("派发物资初始化") + filter.switch_camera(1) + while by_cmd.send_position_axis_z(30, 0) == -1: + pass + while by_cmd.send_angle_camera(90) == -1: + pass + def find(self): + ret = filter.find(tlabel.BASKET) + if ret > 0: + return True + else: + return False + def exec(self): + logger.info("找到篮筐") + car_stop() + calibrate_new(tlabel.BASKET,offset = -40, run = True, run_speed = 6) + # by_cmd.send_distance_x(10, 10) + # 向左运动 + # by_cmd.send_distance_y(-10, 35) + + by_cmd.send_speed_y(-10) + time.sleep(0.15) + car_stop() + by_cmd.send_angle_storage(50) + logger.info("把球放篮筐里") + + time.sleep(1) + by_cmd.send_speed_y(10) + time.sleep(0.15) + car_stop() + time.sleep(0.5) + by_cmd.send_angle_storage(20) + # time.sleep(1) + + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_bball"]["pid_kp"], cfg["put_bball"]["pid_ki"], cfg["put_bball"]["pid_kd"]) + # 任务检查间隔 + time.sleep(2) + + +# 物资盘点 +class put_hanoi1(): + def init(self): + logger.info("物资盘点 1 初始化") + filter.switch_camera(2) + def find(self): + ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK]) + if label == tlabel.RMARK: + if abs(error) < 55: + logger.info("向右拐") + utils.direction_right += 1 + return True + return False + elif label == tlabel.LMARK: + if abs(error) < 50: + logger.info("向左拐") + utils.direction_left += 1 + return True + return False + else: + return False + def exec(self): + global direction + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + time.sleep(0.2) + + by_cmd.send_position_axis_z(30, 130) + + # 校准牌子 + if utils.direction_right > utils.direction_left: + ret, error = filter.aim_near(tlabel.RMARK) + while not ret: + ret, error = filter.aim_near(tlabel.RMARK) + utils.direction = tlabel.RMARK + logger.info("应该向右转") + else: + ret, error = filter.aim_near(tlabel.LMARK) + while not ret: + ret, error = filter.aim_near(tlabel.LMARK) + utils.direction = tlabel.LMARK + logger.info("应该向左转") + + + # # 校准 omega + # if error > 0: + # by_cmd.send_angle_omega(-20,abs(var.lane_error)) + # else: + # by_cmd.send_angle_omega(20,abs(var.lane_error)) + # time.sleep(0.5) + # car_stop() + # time.sleep(0.5) + # by_cmd.send_distance_x(10, 200) + + by_cmd.send_distance_x(10, 180) + time.sleep(1) + car_stop() + + # 根据方向初始化执行器位置 + if utils.direction is tlabel.RMARK: + # FIXME 右侧的爪子会被 storage 挡住 + by_cmd.send_position_axis_x(1, 0) + by_cmd.send_angle_claw_arm(36) + by_cmd.send_angle_storage(0) + else: + by_cmd.send_position_axis_x(1, 150) + by_cmd.send_angle_claw_arm(220) + by_cmd.send_angle_storage(55) + time.sleep(1) + + by_cmd.send_position_axis_z(30, 10) + + if utils.direction_right > utils.direction_left: + utils.direction = tlabel.RMARK + # by_cmd.send_angle_omega(-25,430) + by_cmd.send_angle_omega(-45,238) + # by_cmd.send_angle_omega(-55,194) + time.sleep(2) + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + else: + utils.direction = tlabel.LMARK + # by_cmd.send_angle_omega(25,430) + by_cmd.send_angle_omega(45,238) + # by_cmd.send_angle_omega(55,194) + time.sleep(2) + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + time.sleep(0.5) + filter.switch_camera(1) + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_hanoi1"]["pid_kp"], cfg["put_hanoi1"]["pid_ki"], cfg["put_hanoi1"]["pid_kd"]) + pass + +class put_hanoi2(): + def __init__(self): + if cfg['put_hanoi2']['first_target'] == "lp": + self.target_label = tlabel.LPILLER + elif cfg['put_hanoi2']['first_target'] == "mp": + self.target_label = tlabel.MPILLER + elif cfg['put_hanoi2']['first_target'] == "sp": + self.target_label = tlabel.SPILLER + def init(self): + logger.info("物资盘点 2 初始化") + if utils.direction == tlabel.RMARK: + # 15 + self.offset = 19 + # self.platform_offset = -25 + self.platform_offset = -10 + else: + self.offset = 10 + #self.platform_offset = -30 + self.platform_offset = -15 + def find(self): + # ret, box = filter.get(self.target_label) + ret, box = filter.get(tlabel.TPLATFORM) + if ret: + var.task_speed = 8.5 + error = (box[0][2] + box[0][0] - 320) / 2 + self.platform_offset + if error > 0: + return True + return False + def exec(self): + logger.info(f"direction:{utils.direction.name}") + var.task_speed = 0 + car_stop() + + # if utils.direction is tlabel.RMARK: + # by_cmd.send_distance_y(10, 50) + # time.sleep(1) + + # time.sleep(0.5) + # by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30) + # time.sleep(1) + ret = explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = 1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("抓大平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 160) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(40) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 40) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(40) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + pass + ret = explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("放大平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_x(1, 160) + time.sleep(1.5) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(1) + by_cmd.send_angle_claw(81) + time.sleep(0.5) + by_cmd.send_angle_claw(63) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_x(1, 40) + time.sleep(1.5) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(1) + by_cmd.send_angle_claw(81) + time.sleep(0.5) + by_cmd.send_angle_claw(63) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + + ret = explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = 1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("抓中平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 160) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(35) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 40) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(35) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + pass + ret = explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = -1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("放中平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 100) + time.sleep(2) + by_cmd.send_position_axis_x(1, 160) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(65) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 100) + time.sleep(2) + by_cmd.send_position_axis_x(1, 40) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(65) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + + ret = explore_calibrate_new(tlabel.SPILLER, offset = self.offset, run_direc = 1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("抓小平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 160) + by_cmd.send_angle_claw(50) + time.sleep(2) + by_cmd.send_angle_claw(27) + time.sleep(1) + by_cmd.send_distance_axis_z(30, 10) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 40) + by_cmd.send_angle_claw(50) + time.sleep(2) + by_cmd.send_angle_claw(27) + time.sleep(1) + by_cmd.send_distance_axis_z(30, 10) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + pass + ret = explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = -1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("放小平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 170) + time.sleep(1.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(80) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1.5) + # 6_9 參數 + # time.sleep(1) + # by_cmd.send_speed_y(15) + # time.sleep(0.1) + # car_stop() + pass + else: + by_cmd.send_position_axis_z(30, 170) + time.sleep(1.5) + by_cmd.send_position_axis_x(1, 40) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(80) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1.5) + # while True: + # pass + + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_hanoi2"]["pid_kp"], cfg["put_hanoi2"]["pid_ki"], cfg["put_hanoi2"]["pid_kd"]) + pass + +class put_hanoi3(): + def init(self): + while by_cmd.send_position_axis_z(30, 130) == -1: + pass + time.sleep(3) + logger.info("完成任务,爪回左侧") + while by_cmd.send_angle_claw_arm(128) == -1: + pass + time.sleep(0.5) + while by_cmd.send_position_axis_x(1, 0) == -1: + pass + time.sleep(1) + # while by_cmd.send_angle_claw_arm(220) == -1: + # pass + # while by_cmd.send_angle_claw(90) == -1: + # pass + def find(self): + time.sleep(1) + return True + def exec(self): + while by_cmd.send_position_axis_z(30, 100) == -1: + pass + pass + def nexec(self): + pass + def after(self): + by_cmd.send_position_axis_x(1, 150) + var.pid_turning.set(cfg["put_hanoi3"]["pid_kp"], cfg["put_hanoi3"]["pid_ki"], cfg["put_hanoi3"]["pid_kd"]) + +# 应急避险 第一阶段 找目标牌 +class move_area1(): + def init(self): + logger.info("应急避险第一阶段初始化") + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + def find(self): + ret = filter.find(tlabel.SIGN) + if ret: + return True + return False + def exec(self): + logger.info("找到标示牌") + # 停车 ocr 识别文字 调用大模型 + car_stop() + time.sleep(1) + # calibrate_new(tlabel.SIGN, offset = 8, run = True) + calibrate_new(tlabel.SIGN, offset = -30, run = True, run_speed = 5) + time.sleep(1) + + by_cmd.send_position_axis_x(1, 50) + time.sleep(1) + + # filter_w = (148, 560) + # filter_h = (165, 390) + + counts = 0 + while True: + ocr_socket.send_string("") + resp = ocr_socket.recv_pyobj() + var.llm_text = '' + counts += 1 + if resp.get('code') == 0: + for item in resp.get('content'): + + if item['probability']['average'] < 0.80: + continue + # box = item['location'] + # center_x = box['left'] + box['width'] / 2 + # center_y = box['top'] + box['height'] / 2 + # if center_x < filter_w[0] or center_x > filter_w[1] \ + # or center_y < filter_h[0] or center_y > filter_h[1]: + # continue + var.llm_text += item['words'] + break + if counts >= 2: + var.skip_llm_task_flag = True + return + logger.error(var.llm_text) + if len(var.llm_text) < 3: + var.skip_llm_task_flag = True + return + + var.task_speed = 9 # 12 + + pass + def nexec(self): + pass + def after(self): + # 任务检查间隔 + by_cmd.send_position_axis_x(1, 150) + # time.sleep(1) + by_cmd.send_angle_claw_arm(220) + + pass + +# 应急避险 第二阶段 找停车区域 +class move_area2(): + def init(self): + logger.info("应急避险第二阶段初始化") + self.offset = 15 + self.delta_x = 0 + self.delta_y = 0 + self.delta_omage = 0 + def find(self): + # time.sleep(0.001) + if var.skip_llm_task_flag: + return 5000 + ret, box = filter.get(tlabel.SHELTER) + if ret: + error = (box[0][2] + box[0][0] - 320) / 2 + self.offset + if abs(error) < 20: + return 5000 + return False + def sub_light(self, delay_time): + by_cmd.send_light(1) + time.sleep(delay_time) + by_cmd.send_light(0) + def sub_beep(self,delay_time): + by_cmd.send_beep(1) + time.sleep(delay_time) + by_cmd.send_beep(0) + def sub_move(self, x, y): + # FIXME 如果同時有 xy,是否會造成 delay 不足 + self.delta_x += x + self.delta_y += y + + if x != 0: + delay_time = int(abs(x) * 500) + if x > 0: + by_cmd.send_distance_x(15, delay_time) + else: + by_cmd.send_distance_x(-15, delay_time) + elif y != 0: + delay_time = int(abs(y) * 500) + if y > 0: # 向左 + by_cmd.send_distance_y(-15, delay_time) + else: + by_cmd.send_distance_y(15, delay_time) + time.sleep(delay_time / 500) + car_stop() + pass + def sub_turn(self, angle): + self.delta_omage += angle + delay_time = int(abs(angle) * 400 / 90) + if angle < 0: + # 左转 + by_cmd.send_angle_omega(+55, delay_time) + else: + # 右转 + by_cmd.send_angle_omega(-55, delay_time) + time.sleep(delay_time / 300 * 1.5) + def exec(self): + var.task_speed = 0 + if var.skip_llm_task_flag: + logger.error("ocr 识别出错 直接跳过改任务") + return + logger.info("开始寻找停车区域") + car_stop() + calibrate_new(tlabel.SHELTER, offset = 15, run = True) + time.sleep(0.5) + # 调用大模型 然后执行动作 + try: + resp = llm_bot.get_command_json(var.llm_text) + logger.info(resp) + except: + logger.error("大模型超时,跳过任务") + return + + + try: + resp_commands = eval(re.findall("```json(.*?)```", resp, re.S)[0]) + if len(resp_commands) == 0: + return + # 进入停车区域 + # by_cmd.send_speed_y(15) + by_cmd.send_distance_y(25, 180) + time.sleep(1) + # time.sleep(1.25) + car_stop() + logger.info(resp_commands) + for command in resp_commands: + logger.info(command) + if command['func'] == 'move': + self.sub_move(float(command['x']), float(command['y'])) + elif command['func'] == 'light': + self.sub_light(int(command['time'])) + elif command['func'] == 'beep': + self.sub_beep(int(command['time'])) + elif command['func'] == 'turn': + self.sub_turn(int(command['angle'])) + pass + else: + continue + time.sleep(0.5) + except: + pass + time.sleep(1) + # 回到原位 + + delay_time = int(abs(self.delta_omage) * 400 / 90) + if int(abs(self.delta_omage)) == 360: + delay_time = 0 + if self.delta_omage < 0: + # 左转 + by_cmd.send_angle_omega(-55, delay_time) + else: + # 右转 + by_cmd.send_angle_omega(55, delay_time) + time.sleep(delay_time / 300 * 1.5) + if self.delta_y > 0: + # 向左移动的距离就要比进入的时候少一些 因为 action 已经向左运动了 + delay_time = 180 - (self.delta_y * 500) + else: + delay_time = 180 + (abs(self.delta_y) * 500) + # 离开停车区域 + by_cmd.send_distance_y(-25, delay_time) + + time.sleep(delay_time * 5e-3) + + car_stop() + + # FIXME 移动距离指令下发后未完成,再发送速度指令,将不会清除未完成的速度值 + # by_cmd.send_distance_y(-15, 300) + pass + def nexec(self): + logger.warning("正在跳過大模型任務") + time.sleep(2) + pass + def after(self): + var.pid_turning.set(cfg["move_area"]["pid_kp"], cfg["move_area"]["pid_ki"], cfg["move_area"]["pid_kd"]) + by_cmd.send_position_axis_z(30, 0) + while by_cmd.send_angle_claw(90) == -1: + pass + time.sleep(2) + +# 扫黑除暴 +class kick_ass(): + def init(self): + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + logger.info("扫黑除暴初始化") + self.pos_gap1 = cfg['kick_ass']['pos_gap1'] + self.pos_gap2 = cfg['kick_ass']['pos_gap2'] + self.target_person = cfg_args['kick_ass']['target_person'] + + # by_cmd.send_angle_claw(15) + by_cmd.send_position_axis_x(1, 160) + + def find(self): + ret = filter.find(tlabel.SIGN) + if ret: + return True + return False + def exec(self): + logger.info("找到标示牌") + # 停的晚无需校准 omage + car_stop() + time.sleep(1) + calibrate_new(tlabel.SIGN, offset = 8, run = True) + by_cmd.send_angle_claw(15) + time.sleep(0.5) + by_cmd.send_position_axis_z(30, 60) + by_cmd.send_position_axis_x(1, 130) + + if self.target_person == 1: + target_distance = self.pos_gap1 + else: + target_distance = self.pos_gap1 + (self.target_person - 1) * self.pos_gap2 + (self.target_person - 1) * 10 + by_cmd.send_distance_x(10, target_distance) + logger.info(f"target distance {target_distance}") + time.sleep(1.5 + (self.target_person - 1) * 0.7 ) + car_stop() + + # by_cmd.send_angle_claw_arm(220) + # time.sleep(0.5) + by_cmd.send_position_axis_x(1, 20) + time.sleep(3) + by_cmd.send_position_axis_x(1, 120) + time.sleep(1) + logger.debug("結束任務,前進四") + var.task_speed = 25 + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["kick_ass"]["pid_kp"], cfg["kick_ass"]["pid_ki"], cfg["kick_ass"]["pid_kd"]) \ No newline at end of file diff --git a/subtask_7126.py b/subtask_7126.py new file mode 100644 index 0000000..9baa9b0 --- /dev/null +++ b/subtask_7126.py @@ -0,0 +1,1406 @@ +from enum import Enum +from loguru import logger +from utils import label_filter +from utils import tlabel +from utils import LLM +from utils import CountRecord +import utils +import toml +import zmq +import time +import variable as var +import action as act +import re +import threading +import ctypes +cfg = None +cfg_args = None +by_cmd = None +filter = None +llm_bot = None + +# 目标检测 socket 客户端 +context = None +socket = None + +context1 = None +ocr_socket = None + +global_skip_queue = None + +''' +description: main.py 里执行 引入全局变量 +param {*} _by_cmd 控制器对象 +return {*} +''' +def import_obj(_by_cmd, skip_queue): + + global by_cmd + global filter + global llm_bot + + global context + global socket + + global context1 + global ocr_socket + + global cfg + global cfg_args + global global_skip_queue + cfg = toml.load('/home/evan/Workplace/project_main/cfg_subtask.toml') # 加载任务配置 + cfg_args = toml.load('/home/evan/Workplace/project_main/cfg_args.toml') + by_cmd = _by_cmd + global_skip_queue = skip_queue + + # 目标检测 socket 客户端 + context = zmq.Context() + socket = context.socket(zmq.REQ) + socket.connect("tcp://localhost:6667") + logger.info("subtask yolo client init") + + # ocr socket 客户端 + # context1 = zmq.Context() + # ocr_socket = context1.socket(zmq.REQ) + # ocr_socket.connect("tcp://localhost:6668") + # logger.info("subtask ocr client init") + + filter = label_filter(socket) + if cfg['move_area']['llm_enable']: + llm_bot = LLM() +def car_stop(): + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_y(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) +# 蓝色球使用 +def calibrate_right_new(label, offset, run = True, run_speed = 3.5): + + not_found_counts = 0 + ret, error = filter.aim_right(label) + while not ret: + not_found_counts += 1 + if not_found_counts >= 20: + not_found_counts = 0 + error = -320 # error > 0 front run + logger.info("calibrate_right_new:找不到次数超过 20 帧 直接前进寻找") + break + ret, error = filter.aim_right(label) + + error += offset + if abs(error) > 10 and run: + if error > 0: + by_cmd.send_speed_x(-run_speed) + else: + by_cmd.send_speed_x(run_speed) + # 停的位置已经很接近目标,可以直接使用 distance 校准 + else: + return + + while True: + ret, error = filter.aim_right(label) + while not ret: + ret, error = filter.aim_right(label) + error += offset + # if ret: + if abs(error) <= 8: + car_stop() + logger.info("calibrate_right_new:行进时 误差小于 8 直接停车") + ret, error = filter.aim_right(label) + while not ret: + not_found_counts += 1 + if not_found_counts >=30: + return + ret, error = filter.aim_right(label) + error += offset + logger.info(f"calibrate_right_new:停车后的误差是{error}") + if abs(error) > 8: + logger.info(f"calibrate_right_new:停车后的误差大于 8 使用 distance 校准") + error = error * 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + + break +''' +description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车 +如果停车后 error > 8 则使用 distance 校准 +这个方法仅用于在视野里能找到的情况下进行校准,并不能实现边走边寻找 然后再校准 +param {*} label +param {*} offset +param {*} run +param {*} run_speed +return {*} +''' +def calibrate_new(label, offset, run = True, run_speed = 3.5): + not_found_counts = 0 + ret, box = filter.get(label) + while not ret: + not_found_counts += 1 + if not_found_counts >= 20: + not_found_counts = 0 + error = -320 # error > 0 front run + logger.info("calibrate_new:找不到次数超过 20 帧 直接前进寻找") + break + ret, box = filter.get(label) + + # 如果超过二十帧跳出,则此时 box 为空值,需要再校验 ret + if ret is True: + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if abs(error) > 10 and run: + if error > 0: + by_cmd.send_speed_x(-run_speed) + else: + by_cmd.send_speed_x(run_speed) + # 停的位置已经很接近目标,可以直接使用 distance 校准 + else: + if abs(error) > 8: + logger.info(f"calibrate_new:停车后误差{error}大于 8 使用 distance 校准") + # error = error # 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + return + logger.info(f"calibrate_new:停车后误差{error}小于 8 不校准") + return + while True: + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if ret: + if abs(error) <= 10: # 5 + car_stop() + logger.info("calibrate_new:行进时 误差小于 10 直接停车") + + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + logger.info(f"calibrate_new:停车后的误差是{error}") + if abs(error) > 8: + logger.info(f"calibrate_new:停车后的误差大于 8 使用 distance 校准") + error = error * 3 + # logger.error(f"error * 3 {error}") + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + break +# 弃用 distance 校准 +def explore_calibrate_new(label, offset, run_direc ,run_speed = 3.5): + # run_direc == 1 向前 + stop_error = 0 + if run_direc == 1: + by_cmd.send_speed_x(run_speed) + else: + by_cmd.send_speed_x(-run_speed) + + if label == tlabel.TPLATFORM: + stop_error = 8 + else: + stop_error = 15 + while True: + ret, box = filter.get(label) + while not ret: + if not global_skip_queue.empty(): + _ = global_skip_queue.get() + logger.error("跳过 explore_calibrate_new") + return False + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if ret: + # 校准速度越大 停车的条件越宽泛 20 15 + if abs(error) <= stop_error: + car_stop() + logger.info("explore_calibrate_new:行进时 误差小于 15 直接停车") + + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + error = (box[0][2] + box[0][0] - 320) / 2 + offset + logger.info(f"停车后像素误差:{error}") + + # logger.info(f"explore_calibrate_new:停车后的误差大于 8 使用 distance 校准") + # if abs(error) < 8: + # error = error * 3 + # else: + # error = error * 2 + # if error > 0: + # by_cmd.send_distance_x(-10, int(error)) + # else: + # by_cmd.send_distance_x(10, int(-error)) + + break + return True + +# 任务类 +class task: + def __init__(self, name, task_template, find_counts=10, enable=True): + self.enable = enable + self.task_t = task_template() + + self.counts = 0 + self.find_counts = int(find_counts) + self.name = name + + def init(self): + if hasattr(self.task_t, 'init') and callable(getattr(self.task_t, 'init', None)): + self.task_t.init() + else: + logger.warning("[Task ]# 该任务没有 init 方法") + def find(self): + if hasattr(self.task_t, 'find') and callable(getattr(self.task_t, 'find', None)): + return self.task_t.find() + else: + logger.warning("[Task ]# 该任务没有 find 方法") + def exec(self): + # 根据标志位确定是否执行该任务 + if self.enable is True: + if hasattr(self.task_t, 'exec') and callable(getattr(self.task_t, 'exec', None)): + logger.info(f"[Task ]# Executing task") + self.task_t.exec() + else: + logger.warning("[Task ]# 该任务没有 exec 方法") + else: + logger.warning(f"[Task ]# Skip task") + if hasattr(self.task_t, 'nexec') and callable(getattr(self.task_t, 'nexec', None)): + self.task_t.nexec() + else: + logger.warning("[Task ]# 该任务没有 nexec 方法") + def after(self): + if hasattr(self.task_t, 'after') and callable(getattr(self.task_t, 'after', None)): + logger.info(f"[Task ]# {self.name} 正在执行 after") + self.task_t.after() + logger.debug(f"[Task ]# Task completed") + else: + logger.warning("[Task ]# 该任务没有 after 方法") + +# 任务队列状态类 +class task_queuem_status(Enum): + IDEL = 0 + SEARCHING = 1 + EXECUTING = 2 + +# 任务队列类 非 EXECUTEING 时均执行 huigui,注意互斥操作 +class task_queuem(task): + # task_now = task(None, False) + def __init__(self, queue): + super(task_queuem, self) + self.queue = queue + self.status = task_queuem_status.IDEL + self.busy = True + logger.info(f"[TaskM]# Task num {self.queue.qsize()}") + # exec 线程 + self.exec_thread = None + def exec(self, skip_queue): + # 如果空闲状态则将下一个队列任务取出 + if self.status is task_queuem_status.IDEL: + if self.queue.qsize() == 0: + self.busy = False + logger.info(f"[TaskM]# Task queue empty, exit") + return False + self.task_now = self.queue.get() + + # 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息 + if self.task_now.enable is True: + self.status = task_queuem_status.SEARCHING + # 如果使能该任务则执行该任务的初始化动作 + self.task_now.init() + else: + self.status = task_queuem_status.EXECUTING + logger.info(f"[TaskM]# ---------------------->>>>") + # 阻塞搜索任务标志位 + elif self.status is task_queuem_status.SEARCHING: + logger.info(f"[{self.task_now.name}]# Start searching task target") + while True: + if not skip_queue.empty(): + _ = skip_queue.get() + logger.error(f"{self.task_now.name} 任务在 find 中已经被手动跳过") + self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 + self.task_now.after() # 执行任务后处理 + self.queue.task_done() # 弹出已执行的任务 + return True + ret = self.task_now.find() + self.task_now.counts += ret + if self.task_now.counts >= self.task_now.find_counts: + self.status = task_queuem_status.EXECUTING + break + # 执行任务函数 + elif self.status is task_queuem_status.EXECUTING: + if self.task_now.enable is True: + logger.info(f"[TaskM]# Start execute task function") + # self.exec_thread = threading.Thread(target=self.task_now.exec) + # # 启动线程 + # self.exec_thread.start() + # while True: + # if not self.exec_thread.is_alive(): + # break + # else: + # if not skip_queue.empty(): + # car_stop() + # thread_id = self.exec_thread.ident + # res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit)) + # _ = skip_queue.get() + # logger.error(f"{self.task_now.name} 任务在 exec 中已经被手动跳过") + # self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 + # self.task_now.after() # 执行任务后处理 + # self.queue.task_done() # 弹出已执行的任务 + # return True + self.task_now.exec() # 执行当前任务函数 + self.status = task_queuem_status.IDEL # 执行完成后为退出巡线阻塞 + self.task_now.after() # 执行任务后处理 + self.queue.task_done() # 弹出已执行的任务 + logger.info(f"[TaskM]# <<<<----------------------") + else: + logger.info(f"[TaskM]# Start execute task function (nexec)") + self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 + self.task_now.exec() # 执行当前任务函数 + self.task_now.after() # 执行任务后处理 + self.queue.task_done() # 弹出已执行的任务 + logger.info(f"[TaskM]# <<<<----------------------") + return True + +# 人员施救 +class get_block1(): + def init(self): + var.task_speed = 15 + act.cmd.camera(0) + act.cmd.z2(20, 60, 0) + filter.switch_camera(1) + # if cfg['get_block']['first_block'] == "blue": + # self.target_label = tlabel.BBLOCK + # self.another_label = tlabel.RBLOCK + # else: + # self.target_label = tlabel.RBLOCK + # self.another_label = tlabel.BBLOCK + # cfg['get_block']['first_block'] = "red" + self.target_label = [tlabel.RBLOCK, tlabel.BBLOCK] + self.target_counts = [0, 0] + def find(self): + # 目标检测红/蓝方块 + # ret = filter.find(self.target_label) + # if ret > 0: + # return True + # return False + ret = filter.find_mult(self.target_label) + self.target_counts[0] += ret[0] + self.target_counts[1] += ret[1] + if any(ret): + return True + return False + + + def exec(self): + car_stop() + if self.target_counts[0] > self.target_counts[1]: + var.first_block = tlabel.RBLOCK + var.second_block = tlabel.BBLOCK + else: + var.first_block = tlabel.BBLOCK + var.second_block = tlabel.RBLOCK + calibrate_new(var.first_block, offset = 15, run = True, run_speed = 5) + logger.info("抓取块") + + by_cmd.send_position_axis_z(30, 60) + time.sleep(1) + by_cmd.send_angle_claw(63) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1) + by_cmd.send_angle_claw(25) + time.sleep(0.5) + by_cmd.send_position_axis_z(30, 90) + time.sleep(0.5) + + by_cmd.send_angle_claw_arm(175) + time.sleep(0.1) + by_cmd.send_position_axis_x(1, 120) + time.sleep(1) + by_cmd.send_position_axis_z(30, 70) + time.sleep(0.1) + by_cmd.send_angle_claw(63) + time.sleep(0.5) + + by_cmd.send_position_axis_z(30, 130) + time.sleep(1) + by_cmd.send_position_axis_x(1, 140) + by_cmd.send_angle_claw_arm(220) + time.sleep(0.5) + by_cmd.send_angle_storage(55) + time.sleep(1) + + pass + def nexec(self): + # TODO 完成不执行任务的空动作 + pass + def after(self): + var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"]) + pass + +class get_block2(): + def init(self): + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + filter.switch_camera(1) + # if cfg['get_block']['first_block'] == "red": + # self.target_label = tlabel.BBLOCK + # self.another_label = tlabel.RBLOCK + # else: + # self.target_label = tlabel.RBLOCK + # self.another_label = tlabel.BBLOCK + def find(self): + # 目标检测红/蓝方块 + ret = filter.find(var.second_block) + if ret > 0: + return True + return False + def exec(self): + car_stop() + calibrate_new(var.second_block, offset = 15, run = True, run_speed = 5) + logger.info("抓取块") + time.sleep(0.5) + by_cmd.send_angle_claw_arm(220) + by_cmd.send_angle_claw(63) + by_cmd.send_position_axis_z(30, 60) + time.sleep(1) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1) + by_cmd.send_angle_claw(25) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 100) + + # by_cmd.send_distance_x(15, 100) + time.sleep(2) + + pass + def nexec(self): + # TODO 完成不执行任务的空动作 + pass + def after(self): + var.task_speed = 0 + var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"]) + # 任务检查间隔 + time.sleep(7) + + +# 紧急转移 +class put_block(): + def init(self): + var.task_speed = 0 + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + logger.info("紧急转移初始化") + filter.switch_camera(1) + def find(self): + # 目标检测医院 + ret, box = filter.get(tlabel.HOSPITAL) + if ret > 0: + width = box[0][2] - box[0][0] + if width > 130: + return True + return False + def exec(self): + logger.info("找到医院") + car_stop() + calibrate_new(tlabel.HOSPITAL, offset = 7, run = False) + + by_cmd.send_distance_x(10, 100) + by_cmd.send_position_axis_z(30, 0) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1) + by_cmd.send_angle_claw(63) + time.sleep(1) + + # 放置第二個塊 + by_cmd.send_angle_storage(20) + by_cmd.send_position_axis_x(1, 130) + by_cmd.send_position_axis_z(30, 120) + time.sleep(1.5) + by_cmd.send_angle_claw_arm(180) + by_cmd.send_angle_claw(85) + # by_cmd.send_angle_storage(0) + time.sleep(1) + by_cmd.send_position_axis_z(30,70) + time.sleep(1) + by_cmd.send_angle_claw(25) + by_cmd.send_distance_x(-10, 110) + time.sleep(1) + by_cmd.send_position_axis_z(30, 110) + time.sleep(1) + by_cmd.send_angle_claw_arm(220) + time.sleep(1) + by_cmd.send_position_axis_z(30, 0) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 20) + time.sleep(1.5) + by_cmd.send_angle_claw(45) + time.sleep(1) + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_block"]["pid_kp"], cfg["put_block"]["pid_ki"], cfg["put_block"]["pid_kd"]) + # 下一动作预备位置 + while by_cmd.send_position_axis_z(30, 130) == -1: + pass + time.sleep(1) + while by_cmd.send_position_axis_x(1, 0) == -1: + pass + while by_cmd.send_angle_claw_arm(36) == -1: + pass + # 任务检查间隔 + # time.sleep(2) + + +# 整装上阵 +class get_bball(): + def init(self): + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + # by_cmd.send_position_axis_z(30, 135) + # time.sleep(0.5) + # by_cmd.send_position_axis_x(1, 0) + # time.sleep(2) + # by_cmd.send_angle_claw_arm(36) + + while (by_cmd.send_angle_storage(0) == -1): + by_cmd.send_angle_storage(0) + + # 调试 临时换源 + filter.switch_camera(1) + logger.info("整装上阵初始化") + # time.sleep(0.5) + + self.record = CountRecord(5) + def find(self): + # 目标检测蓝球 + ret = filter.find(tlabel.BBALL) + # ret = filter.find(tlabel.BBALL) or filter.find(tlabel.YBALL) + # TODO 此处使用连续检出判断感觉不是很好,黄球放置远时停车较晚,可能跟请求速度相关 + # if ret: + # if self.record(tlabel.BBALL): + # return True + if ret: + return True + return False + def exec(self): + logger.info("找到蓝色球") + car_stop() + time.sleep(0.5) + for _ in range(3): + calibrate_right_new(tlabel.BBALL, offset = 16, run = True, run_speed = 5) + logger.info("抓蓝色球") + time.sleep(0.5) + by_cmd.send_angle_claw_arm(36) + by_cmd.send_angle_claw(54) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1.2) + by_cmd.send_angle_claw(25) + time.sleep(1) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(1) + by_cmd.send_position_axis_x(1, 60) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, -40) + time.sleep(0.5) + by_cmd.send_angle_claw_arm(80) + time.sleep(0.5) + by_cmd.send_angle_claw(54) + time.sleep(0.5) + by_cmd.send_angle_claw_arm(36) + time.sleep(1) + by_cmd.send_position_axis_z(30, 135) + # 继续向前走 + # by_cmd.send_speed_x(4) + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["get_bball"]["pid_kp"], cfg["get_bball"]["pid_ki"], cfg["get_bball"]["pid_kd"]) + # 下一动作预备位置 + by_cmd.send_angle_claw(30) + time.sleep(0.5) + while by_cmd.send_position_axis_z(30, 0) == -1: + pass + time.sleep(1) + # # 任务检查间隔 + # time.sleep(1) + + +# 通信抢修 +class up_tower(): + def init(self): + logger.info("通信抢修初始化") + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + filter.switch_camera(1) + def find(self): + # 目标检测通信塔 + ret = filter.find(tlabel.TOWER) + if ret: + return True + return False + def exec(self): + logger.info("找到塔") + car_stop() + time.sleep(1) + calibrate_new(tlabel.TOWER, offset = 20, run = True) + time.sleep(1) + # calibrate(tlabel.TOWER, 27, False, 6) + by_cmd.send_distance_x(-10, 120) + time.sleep(1) + # 上古參數 + # by_cmd.send_distance_y(-10, 50) + # 6_9 模型參數 + # by_cmd.send_distance_y(-10, 40) + # 7_12_3 模型參數 + by_cmd.send_distance_y(-10, 50) + time.sleep(1) + car_stop() + # FIXME 如果下發 distance 後直接 car_stop,則 distance 執行時間僅由指令間處理延時決定 + # time.sleep(3) + # by_cmd.send_speed_y(-10) + # time.sleep(0.15) + + by_cmd.send_angle_zhuan(10) + time.sleep(12) + by_cmd.send_speed_y(10) + time.sleep(0.3) + car_stop() + by_cmd.send_angle_zhuan(0) + # while True: + # pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["up_tower"]["pid_kp"], cfg["up_tower"]["pid_ki"], cfg["up_tower"]["pid_kd"]) + # 下一动作预备位置 + by_cmd.send_position_axis_z(30, 0) + # 任务检查间隔 + time.sleep(4) + + +# 高空排险 +class get_rball(): + def init(self): + filter.switch_camera(1) + logger.info("高空排险初始化") + while (by_cmd.send_angle_camera(180) == -1): + by_cmd.send_angle_camera(180) + self.record = CountRecord(3) + def find(self): + # 目标检测红球 + ret = filter.find(tlabel.RBALL) + if ret > 0: + # TODO 连续两帧才开始减速 + if self.record(tlabel.RBALL): + var.task_speed = 5 + return True + else: + return False + def exec(self): + logger.info("找到红球") + var.task_speed = 0 + car_stop() + time.sleep(0.5) + # 靠近塔 + by_cmd.send_angle_scoop(20) + # 上古參數 + # by_cmd.send_distance_y(-15, 45) # 50 + # 6_9 參數 + # by_cmd.send_distance_y(-15, 35) + # time.sleep(2) + # 7_12_3 參數 + by_cmd.send_distance_y(-15, 45) + time.sleep(2) + car_stop() + calibrate_new(tlabel.RBALL,offset = 44, run = True) + time.sleep(1) + logger.info("抓红球") + # by_cmd.send_angle_scoop(15) + time.sleep(0.5) + by_cmd.send_position_axis_z(30, 170) + time.sleep(2.5) + by_cmd.send_angle_scoop(7) + time.sleep(0.5) + by_cmd.send_speed_y(15) + time.sleep(0.2) + car_stop() + # by_cmd.send_angle_omega(-55,30) + # while True: + # pass + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["get_rball"]["pid_kp"], cfg["get_rball"]["pid_ki"], cfg["get_rball"]["pid_kd"]) + # 任务检查间隔 + time.sleep(2) + + +# 派发物资 +class put_bball(): + def init(self): + logger.info("派发物资初始化") + filter.switch_camera(1) + while by_cmd.send_position_axis_z(30, 0) == -1: + pass + while by_cmd.send_angle_camera(90) == -1: + pass + def find(self): + ret = filter.find(tlabel.BASKET) + if ret > 0: + return True + else: + return False + def exec(self): + logger.info("找到篮筐") + car_stop() + calibrate_new(tlabel.BASKET,offset = -40, run = True, run_speed = 6) + # by_cmd.send_distance_x(10, 10) + # 向左运动 + # by_cmd.send_distance_y(-10, 35) + # by_cmd.send_angle_storage(10) + # time.sleep(1) + + by_cmd.send_angle_storage(50) + logger.info("把球放篮筐里") + + time.sleep(1) + # by_cmd.send_distance_y(10, 55) + by_cmd.send_angle_storage(20) + # time.sleep(1) + # car_stop() + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_bball"]["pid_kp"], cfg["put_bball"]["pid_ki"], cfg["put_bball"]["pid_kd"]) + # 任务检查间隔 + time.sleep(2) + + +# 物资盘点 +class put_hanoi1(): + def init(self): + logger.info("物资盘点 1 初始化") + filter.switch_camera(2) + def find(self): + ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK]) + if label == tlabel.RMARK: + if abs(error) < 55: + logger.info("向右拐") + utils.direction_right += 1 + return True + return False + elif label == tlabel.LMARK: + if abs(error) < 50: + logger.info("向左拐") + utils.direction_left += 1 + return True + return False + else: + return False + def exec(self): + global direction + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + time.sleep(0.2) + + by_cmd.send_position_axis_z(30, 130) + + # 校准牌子 + if utils.direction_right > utils.direction_left: + ret, error = filter.aim_near(tlabel.RMARK) + while not ret: + ret, error = filter.aim_near(tlabel.RMARK) + utils.direction = tlabel.RMARK + logger.info("应该向右转") + else: + ret, error = filter.aim_near(tlabel.LMARK) + while not ret: + ret, error = filter.aim_near(tlabel.LMARK) + utils.direction = tlabel.LMARK + logger.info("应该向左转") + + + # # 校准 omega + # if error > 0: + # by_cmd.send_angle_omega(-20,abs(var.lane_error)) + # else: + # by_cmd.send_angle_omega(20,abs(var.lane_error)) + # time.sleep(0.5) + # car_stop() + # time.sleep(0.5) + # by_cmd.send_distance_x(10, 200) + + by_cmd.send_distance_x(10, 180) + time.sleep(1) + car_stop() + + # 根据方向初始化执行器位置 + if utils.direction is tlabel.RMARK: + # FIXME 右侧的爪子会被 storage 挡住 + by_cmd.send_position_axis_x(1, 0) + by_cmd.send_angle_claw_arm(36) + by_cmd.send_angle_storage(0) + else: + by_cmd.send_position_axis_x(1, 150) + by_cmd.send_angle_claw_arm(220) + by_cmd.send_angle_storage(55) + time.sleep(1) + + by_cmd.send_position_axis_z(30, 10) + + if utils.direction_right > utils.direction_left: + utils.direction = tlabel.RMARK + # by_cmd.send_angle_omega(-25,430) + by_cmd.send_angle_omega(-45,238) + # by_cmd.send_angle_omega(-55,194) + time.sleep(2) + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + else: + utils.direction = tlabel.LMARK + # by_cmd.send_angle_omega(25,430) + by_cmd.send_angle_omega(45,238) + # by_cmd.send_angle_omega(55,194) + time.sleep(2) + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + time.sleep(0.5) + filter.switch_camera(1) + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_hanoi1"]["pid_kp"], cfg["put_hanoi1"]["pid_ki"], cfg["put_hanoi1"]["pid_kd"]) + pass + +class put_hanoi2(): + def __init__(self): + if cfg['put_hanoi2']['first_target'] == "lp": + self.target_label = tlabel.LPILLER + elif cfg['put_hanoi2']['first_target'] == "mp": + self.target_label = tlabel.MPILLER + elif cfg['put_hanoi2']['first_target'] == "sp": + self.target_label = tlabel.SPILLER + def init(self): + logger.info("物资盘点 2 初始化") + if utils.direction == tlabel.RMARK: + # 15 + self.offset = 19 + # self.platform_offset = -25 + self.platform_offset = -10 + else: + self.offset = 10 + #self.platform_offset = -30 + self.platform_offset = -15 + def find(self): + # ret, box = filter.get(self.target_label) + ret, box = filter.get(tlabel.TPLATFORM) + if ret: + var.task_speed = 8.5 + error = (box[0][2] + box[0][0] - 320) / 2 + self.platform_offset + if error > 0: + return True + return False + def exec(self): + logger.info(f"direction:{utils.direction.name}") + var.task_speed = 0 + car_stop() + + # if utils.direction is tlabel.RMARK: + # by_cmd.send_distance_y(10, 50) + # time.sleep(1) + + # time.sleep(0.5) + # by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30) + # time.sleep(1) + ret = explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = 1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("抓大平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 160) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(40) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 40) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(40) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + pass + ret = explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("放大平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_x(1, 160) + time.sleep(1.5) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(1) + by_cmd.send_angle_claw(81) + time.sleep(0.5) + by_cmd.send_angle_claw(63) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_x(1, 40) + time.sleep(1.5) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(1) + by_cmd.send_angle_claw(81) + time.sleep(0.5) + by_cmd.send_angle_claw(63) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + + ret = explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = 1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("抓中平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 160) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(35) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 40) + by_cmd.send_angle_claw(63) + time.sleep(2) + by_cmd.send_angle_claw(35) + time.sleep(0.5) + by_cmd.send_distance_axis_z(30, 20) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + pass + ret = explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = -1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("放中平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 100) + time.sleep(2) + by_cmd.send_position_axis_x(1, 160) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(65) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 100) + time.sleep(2) + by_cmd.send_position_axis_x(1, 40) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(65) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + + ret = explore_calibrate_new(tlabel.SPILLER, offset = self.offset, run_direc = 1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("抓小平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 160) + by_cmd.send_angle_claw(50) + time.sleep(2) + by_cmd.send_angle_claw(27) + time.sleep(1) + by_cmd.send_distance_axis_z(30, 10) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + pass + else: + by_cmd.send_position_axis_z(30, 10) + by_cmd.send_position_axis_x(1, 40) + by_cmd.send_angle_claw(50) + time.sleep(2) + by_cmd.send_angle_claw(27) + time.sleep(1) + by_cmd.send_distance_axis_z(30, 10) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1) + pass + ret = explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = -1, run_speed = 5) + if not ret: + logger.error("跳过物资盘点 2 exec") + return + time.sleep(0.5) + logger.info("放小平台") + if utils.direction is tlabel.RMARK: + by_cmd.send_position_axis_z(30, 170) + time.sleep(1.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(80) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 10) + time.sleep(1) + by_cmd.send_speed_y(15) + time.sleep(0.1) + car_stop() + pass + else: + by_cmd.send_position_axis_z(30, 170) + time.sleep(1.5) + by_cmd.send_position_axis_x(1, 40) + time.sleep(2) + by_cmd.send_distance_axis_z(30, -20) + time.sleep(0.5) + by_cmd.send_angle_claw(80) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 160) + time.sleep(1.5) + # while True: + # pass + + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["put_hanoi2"]["pid_kp"], cfg["put_hanoi2"]["pid_ki"], cfg["put_hanoi2"]["pid_kd"]) + pass + +class put_hanoi3(): + def init(self): + while by_cmd.send_position_axis_z(30, 130) == -1: + pass + time.sleep(3) + logger.info("完成任务,爪回左侧") + while by_cmd.send_angle_claw_arm(128) == -1: + pass + time.sleep(0.5) + while by_cmd.send_position_axis_x(1, 0) == -1: + pass + time.sleep(1) + # while by_cmd.send_angle_claw_arm(220) == -1: + # pass + # while by_cmd.send_angle_claw(90) == -1: + # pass + def find(self): + time.sleep(1) + return True + def exec(self): + while by_cmd.send_position_axis_z(30, 100) == -1: + pass + pass + def nexec(self): + pass + def after(self): + by_cmd.send_position_axis_x(1, 150) + var.pid_turning.set(cfg["put_hanoi3"]["pid_kp"], cfg["put_hanoi3"]["pid_ki"], cfg["put_hanoi3"]["pid_kd"]) + +# 应急避险 第一阶段 找目标牌 +class move_area1(): + def init(self): + logger.info("应急避险第一阶段初始化") + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + def find(self): + ret = filter.find(tlabel.SIGN) + if ret: + return True + return False + def exec(self): + logger.info("找到标示牌") + # 停车 ocr 识别文字 调用大模型 + car_stop() + time.sleep(1) + # calibrate_new(tlabel.SIGN, offset = 8, run = True) + calibrate_new(tlabel.SIGN, offset = -30, run = True, run_speed = 5) + time.sleep(1) + + by_cmd.send_position_axis_x(1, 50) + time.sleep(1) + + # filter_w = (148, 560) + # filter_h = (165, 390) + + counts = 0 + while True: + ocr_socket.send_string("") + resp = ocr_socket.recv_pyobj() + var.llm_text = '' + counts += 1 + if resp.get('code') == 0: + for item in resp.get('content'): + + if item['probability']['average'] < 0.80: + continue + # box = item['location'] + # center_x = box['left'] + box['width'] / 2 + # center_y = box['top'] + box['height'] / 2 + # if center_x < filter_w[0] or center_x > filter_w[1] \ + # or center_y < filter_h[0] or center_y > filter_h[1]: + # continue + var.llm_text += item['words'] + break + if counts >= 2: + var.skip_llm_task_flag = True + return + logger.error(var.llm_text) + if len(var.llm_text) < 3: + var.skip_llm_task_flag = True + return + + var.task_speed = 9 # 12 + + pass + def nexec(self): + pass + def after(self): + # 任务检查间隔 + by_cmd.send_position_axis_x(1, 150) + # time.sleep(1) + by_cmd.send_angle_claw_arm(220) + + pass + +# 应急避险 第二阶段 找停车区域 +class move_area2(): + def init(self): + logger.info("应急避险第二阶段初始化") + self.offset = 15 + self.delta_x = 0 + self.delta_y = 0 + self.delta_omage = 0 + def find(self): + # time.sleep(0.001) + if var.skip_llm_task_flag: + return 5000 + ret, box = filter.get(tlabel.SHELTER) + if ret: + error = (box[0][2] + box[0][0] - 320) / 2 + self.offset + if abs(error) < 20: + return 5000 + return False + def sub_light(self, delay_time): + by_cmd.send_light(1) + time.sleep(delay_time) + by_cmd.send_light(0) + def sub_beep(self,delay_time): + by_cmd.send_beep(1) + time.sleep(delay_time) + by_cmd.send_beep(0) + def sub_move(self, x, y): + # FIXME 如果同時有 xy,是否會造成 delay 不足 + self.delta_x += x + self.delta_y += y + + if x != 0: + delay_time = int(abs(x) * 500) + if x > 0: + by_cmd.send_distance_x(15, delay_time) + else: + by_cmd.send_distance_x(-15, delay_time) + elif y != 0: + delay_time = int(abs(y) * 500) + if y > 0: # 向左 + by_cmd.send_distance_y(-15, delay_time) + else: + by_cmd.send_distance_y(15, delay_time) + time.sleep(delay_time / 500) + car_stop() + pass + def sub_turn(self, angle): + self.delta_omage += angle + delay_time = int(abs(angle) * 400 / 90) + if angle < 0: + # 左转 + by_cmd.send_angle_omega(+55, delay_time) + else: + # 右转 + by_cmd.send_angle_omega(-55, delay_time) + time.sleep(delay_time / 300 * 1.5) + def exec(self): + var.task_speed = 0 + if var.skip_llm_task_flag: + logger.error("ocr 识别出错 直接跳过改任务") + return + logger.info("开始寻找停车区域") + car_stop() + calibrate_new(tlabel.SHELTER, offset = 15, run = True) + time.sleep(0.5) + # 调用大模型 然后执行动作 + try: + resp = llm_bot.get_command_json(var.llm_text) + logger.info(resp) + except: + logger.error("大模型超时,跳过任务") + return + + + try: + resp_commands = eval(re.findall("```json(.*?)```", resp, re.S)[0]) + if len(resp_commands) == 0: + return + # 进入停车区域 + # by_cmd.send_speed_y(15) + by_cmd.send_distance_y(25, 180) + time.sleep(1) + # time.sleep(1.25) + car_stop() + logger.info(resp_commands) + for command in resp_commands: + logger.info(command) + if command['func'] == 'move': + self.sub_move(float(command['x']), float(command['y'])) + elif command['func'] == 'light': + self.sub_light(int(command['time'])) + elif command['func'] == 'beep': + self.sub_beep(int(command['time'])) + elif command['func'] == 'turn': + self.sub_turn(int(command['angle'])) + pass + else: + continue + time.sleep(0.5) + except: + pass + time.sleep(1) + # 回到原位 + + delay_time = int(abs(self.delta_omage) * 400 / 90) + if int(abs(self.delta_omage)) == 360: + delay_time = 0 + if self.delta_omage < 0: + # 左转 + by_cmd.send_angle_omega(-55, delay_time) + else: + # 右转 + by_cmd.send_angle_omega(55, delay_time) + time.sleep(delay_time / 300 * 1.5) + if self.delta_y > 0: + # 向左移动的距离就要比进入的时候少一些 因为 action 已经向左运动了 + delay_time = 180 - (self.delta_y * 500) + else: + delay_time = 180 + (abs(self.delta_y) * 500) + # 离开停车区域 + by_cmd.send_distance_y(-25, delay_time) + + time.sleep(delay_time * 5e-3) + + car_stop() + + # FIXME 移动距离指令下发后未完成,再发送速度指令,将不会清除未完成的速度值 + # by_cmd.send_distance_y(-15, 300) + pass + def nexec(self): + logger.warning("正在跳過大模型任務") + time.sleep(2) + pass + def after(self): + var.pid_turning.set(cfg["move_area"]["pid_kp"], cfg["move_area"]["pid_ki"], cfg["move_area"]["pid_kd"]) + by_cmd.send_position_axis_z(30, 0) + while by_cmd.send_angle_claw(90) == -1: + pass + time.sleep(2) + +# 扫黑除暴 +class kick_ass(): + def init(self): + while (by_cmd.send_angle_camera(0) == -1): + by_cmd.send_angle_camera(0) + logger.info("扫黑除暴初始化") + self.pos_gap1 = cfg['kick_ass']['pos_gap1'] + self.pos_gap2 = cfg['kick_ass']['pos_gap2'] + self.target_person = cfg_args['kick_ass']['target_person'] + + # by_cmd.send_angle_claw(15) + by_cmd.send_position_axis_x(1, 160) + + def find(self): + ret = filter.find(tlabel.SIGN) + if ret: + return True + return False + def exec(self): + logger.info("找到标示牌") + # 停的晚无需校准 omage + car_stop() + time.sleep(1) + calibrate_new(tlabel.SIGN, offset = 8, run = True) + by_cmd.send_angle_claw(15) + time.sleep(0.5) + by_cmd.send_position_axis_z(30, 60) + by_cmd.send_position_axis_x(1, 130) + + if self.target_person == 1: + target_distance = self.pos_gap1 + else: + target_distance = self.pos_gap1 + (self.target_person - 1) * self.pos_gap2 + (self.target_person - 1) * 10 + by_cmd.send_distance_x(10, target_distance) + logger.info(f"target distance {target_distance}") + time.sleep(1.5 + (self.target_person - 1) * 0.7 ) + car_stop() + + # by_cmd.send_angle_claw_arm(220) + # time.sleep(0.5) + by_cmd.send_position_axis_x(1, 20) + time.sleep(3) + by_cmd.send_position_axis_x(1, 120) + time.sleep(1) + logger.debug("結束任務,前進四") + by_cmd.send_speed_x(25) + time.sleep(4) + pass + def nexec(self): + pass + def after(self): + var.pid_turning.set(cfg["kick_ass"]["pid_kp"], cfg["kick_ass"]["pid_ki"], cfg["kick_ass"]["pid_kd"]) \ No newline at end of file diff --git a/variable.py b/variable.py index 154ee8a..8565a41 100644 --- a/variable.py +++ b/variable.py @@ -6,7 +6,7 @@ lane_error = 0 task_speed = 0 # pid 参数 -pid_argv = {"kp" : 2, "ki" : 0, "kd" : 0} # 1.2 +pid_argv = {"kp" : 1.2, "ki" : 0, "kd" : 0} # 1.2 # 转向 pid 对象 pid_turning = PidWrap(pid_argv["kp"], pid_argv["ki"], pid_argv["kd"], output_limits=50)