From eb0b01c8afcd6f2f632e65454f28696638c2ada9 Mon Sep 17 00:00:00 2001 From: bmy <2583236812@qq.com> Date: Fri, 7 Jun 2024 20:08:41 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=97=A5=E5=B8=B8=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=EF=BC=8C=E6=9B=B4=E6=96=B0=E5=AF=B9=E5=87=86=E6=96=B9=E6=B3=95?= =?UTF-8?q?=EF=BC=8C=E5=AE=8C=E6=88=90hanoi2=E5=AF=B9=E5=87=86=EF=BC=8Cget?= =?UTF-8?q?/put=5Fbball=E5=8A=A8=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cfg_main.toml | 10 +- cfg_subtask.toml | 4 +- main.py | 13 +- majtask.py | 15 +- subtask.py | 482 +++++++++++++++++--------------------------- test/test_filter.py | 51 +++++ test/test_llm.py | 52 +++++ utils.py | 46 +++-- 8 files changed, 337 insertions(+), 336 deletions(-) create mode 100644 test/test_filter.py create mode 100644 test/test_llm.py diff --git a/cfg_main.toml b/cfg_main.toml index c6eed12..f8d7ba3 100644 --- a/cfg_main.toml +++ b/cfg_main.toml @@ -21,11 +21,9 @@ UpTower_counts = 10 # 通信抢修计数 GetRBall_counts = 2 # 高空排险计数 - - PutBBall_counts = 5 # 派发物资计数 -PutHanoi1_counts = 8 # 物资盘点计数 -PutHanoi2_counts = 4200 # 物资盘点计数 -MoveArea1_counts = 2 # 应急避险计数 +PutHanoi1_counts = 12 # 物资盘点计数 +PutHanoi2_counts = 2 # 物资盘点计数 +MoveArea1_counts = 10 # 应急避险计数 MoveArea2_counts = 1700 # 应急避险第二阶段计数 -KickAss_counts = 2 # 扫黑除暴计数 +KickAss_counts = 20 # 扫黑除暴计数 diff --git a/cfg_subtask.toml b/cfg_subtask.toml index f147a7a..e4bbfda 100644 --- a/cfg_subtask.toml +++ b/cfg_subtask.toml @@ -15,12 +15,12 @@ first_block = "blue" [put_hanoi2] pos_gap = 160 # 标定值,两个放置位置的标定距离 pos_lp = 2 # 1\2\3 数字越小越靠近红色置物区 -pos_mp = 1 +pos_mp = 3 [move_area] llm_enable = false # 大模型机器人 [kick_ass] pos_gap1 = 150 # 目标牌和第一个 person 之间的距离 -pos_gap2 = 100 # person 之间的距离 +pos_gap2 = 80 # person 之间的距离 target_person = 3 diff --git a/main.py b/main.py index 278e765..c1cd341 100644 --- a/main.py +++ b/main.py @@ -21,15 +21,15 @@ task_queue = queue.Queue() # task_queue.put(sb.task(sb.get_block, cfg_main['find_counts']['GetBlock_counts'], cfg_main['task']['GetBlock_enable'])) # task_queue.put(sb.task(sb.get_block, cfg_main['find_counts']['GetBlock_counts'], cfg_main['task']['GetBlock_enable'])) # task_queue.put(sb.task(sb.put_block, cfg_main['find_counts']['PutBlock_counts'], cfg_main['task']['GetBlock_enable'] and cfg_main['task']['PutBlock_enable'])) -# task_queue.put(sb.task(sb.get_bball, cfg_main['find_counts']['GetBBall_counts'], cfg_main['task']['GetBBall_enable'])) +task_queue.put(sb.task(sb.get_bball, cfg_main['find_counts']['GetBBall_counts'], cfg_main['task']['GetBBall_enable'])) # task_queue.put(sb.task(sb.up_tower, cfg_main['find_counts']['UpTower_counts'], cfg_main['task']['UpTower_enable'])) # task_queue.put(sb.task(sb.get_rball, cfg_main['find_counts']['GetRBall_counts'], cfg_main['task']['GetRBall_enable'])) task_queue.put(sb.task(sb.put_bball, cfg_main['find_counts']['PutBBall_counts'], cfg_main['task']['GetBBall_enable'] and cfg_main['task']['PutBBall_enable'])) task_queue.put(sb.task(sb.put_hanoi1, cfg_main['find_counts']['PutHanoi1_counts'], True)) # 无论是否进行任务,检测标识并转向都是必须进行的 task_queue.put(sb.task(sb.put_hanoi2, cfg_main['find_counts']['PutHanoi2_counts'], cfg_main['task']['PutHanoi_enable'])) -task_queue.put(sb.task(sb.move_area1, cfg_main['find_counts']['MoveArea1_counts'], cfg_main['task']['MoveArea_enable'])) -task_queue.put(sb.task(sb.move_area2, cfg_main['find_counts']['MoveArea2_counts'], cfg_main['task']['MoveArea_enable'])) -task_queue.put(sb.task(sb.kick_ass, cfg_main['find_counts']['KickAss_counts'], cfg_main['task']['KickAss_enable'])) +# task_queue.put(sb.task(sb.move_area1, cfg_main['find_counts']['MoveArea1_counts'], cfg_main['task']['MoveArea_enable'])) +# task_queue.put(sb.task(sb.move_area2, cfg_main['find_counts']['MoveArea2_counts'], cfg_main['task']['MoveArea_enable'])) +# task_queue.put(sb.task(sb.kick_ass, cfg_main['find_counts']['KickAss_counts'], cfg_main['task']['KickAss_enable'])) # 将任务队列传入调度模块中 task_queuem_t = sb.task_queuem(task_queue) @@ -42,8 +42,9 @@ def worker_thread(): # 启动工作线程 worker = threading.Thread(target=worker_thread, daemon=True) worker.start() -if (cmd_py_obj.send_angle_camera(180) == -1): - cmd_py_obj.send_angle_camera(180) +# if (cmd_py_obj.send_angle_camera(180) == -1): +# cmd_py_obj.send_angle_camera(180) +cmd_py_obj.send_angle_storage(20) time.sleep(2) # 创建主任务 diff --git a/majtask.py b/majtask.py index 099227c..7e69153 100644 --- a/majtask.py +++ b/majtask.py @@ -68,22 +68,25 @@ class main_task(): self.x = 0 self.y = 0 error_abs = abs(self.lane_error) + if error_abs < 10: self.pid1.set(0.7, 0, 0) - self.by_cmd.send_speed_x(13) # 12 + speed = 13 elif error_abs > 45: - self.by_cmd.send_speed_x(6) + speed = 6 self.pid1.set(1.8, 0, 0) elif error_abs > 35: - self.by_cmd.send_speed_x(8) + speed = 8 self.pid1.set(1.5, 0, 0) elif error_abs > 25: - self.by_cmd.send_speed_x(10) + speed = 10 self.pid1.set(1, 0, 0) else: self.pid1.set(0.8, 0, 0) - self.by_cmd.send_speed_x(11) - + speed = 11 + if utils.task_speed != 0: + speed = utils.task_speed + self.by_cmd.send_speed_x(speed) # pid_out = self.pid1.get(self.lane_error*0.65) pid_out = self.pid1.get(self.lane_error*0.7) self.by_cmd.send_speed_omega(pid_out) diff --git a/subtask.py b/subtask.py index a7d53d9..524b31a 100644 --- a/subtask.py +++ b/subtask.py @@ -80,6 +80,8 @@ def calibrate_right_new(label, offset, run = True, run_speed = 3.5): break ''' description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车 +如果停车后 error > 8 则使用 distance 校准 +这个方法仅用于在视野里能找到的情况下进行校准,并不能实现边走边寻找 然后再校准 param {*} label param {*} offset param {*} run @@ -128,118 +130,37 @@ def calibrate_new(label, offset, run = True, run_speed = 3.5): by_cmd.send_distance_x(10, int(-error)) break -''' -description: 与 calibrate 一样 只不过寻找最右侧的 整装上阵时使用 -param {*} label -param {*} offset -param {*} run_on -param {*} cali_speed -return {*} -''' -def calibrate_right(label, offset, run_on = True, cali_speed = 10): - logger.info("开始校准") - # go on - if run_on: - for _ in range(3): - by_cmd.send_speed_x(7) - by_cmd.send_speed_omega(0) - time.sleep(0.1) - while True: - ret, error = filter.aim_right(label) - while not ret: - ret, error = filter.aim_right(label) - error += offset - if run_on: - if abs(error) < 5: - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - break - else: - break - - ret, error = filter.aim_right(label) - while not ret: - ret, error = filter.aim_right(label) - error += offset - time.sleep(1) - logger.error(error) - if abs(error) > 5: - logger.info("校准中") - if error > 0: - by_cmd.send_distance_x(-cali_speed, int(error*4)) - else: - by_cmd.send_distance_x(cali_speed, int(-error*4)) - time.sleep(1) - # stop - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - pass -''' -description: 校准函数 传入对应的标签将车校准到最接近中心的标签的位置 -param {*} label -param {*} offset 摄像头 error offset -param {*} run_on 校准前是否继续向前行进以寻找 label -param {*} cali_speed send_distance_x 校准时移动的速度 -return {*} -''' -def calibrate(label, offset, run_on = True, cali_speed = 10): - logger.info("开始校准") - # go on - if run_on: - for _ in range(3): - by_cmd.send_speed_x(7) - by_cmd.send_speed_omega(0) - time.sleep(0.1) - while True: - # 寻找距离屏幕中心最近的标签 - ret, error = filter.aim_near(label) - while not ret: - # 注意这里一定要保证摄像头内有该目标 否则会无限循环 - ret, error = filter.aim_near(label) - error += offset - if run_on: - if abs(error) < 5: - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - break - else: - break - # 代码运行到这里代表摄像头内一定有物体 label,此时再获取一次 error 用于校准 - ret, error = filter.aim_near(label) - while not ret: - ret, error = filter.aim_near(label) - error += offset - time.sleep(1) - logger.error(error) - if abs(error) > 5: - logger.info("校准中") - if abs(error) < 5: - error = error * 5 - logger.error("校准分段 error * 5") - elif abs(error) < 20: - error = error * 4 - logger.error("校准分段 error * 4") - else: - error = error * 3 - logger.error("校准分段 error * 3") - if error > 0: - by_cmd.send_distance_x(-cali_speed, int(error)) - else: - by_cmd.send_distance_x(cali_speed, int(-error)) - time.sleep(2) - # stop - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - pass +def explore_calibrate_new(label, offset, run_direc ,run_speed = 3.5): + # run_direc == 1 向前 + if run_direc == 1: + by_cmd.send_speed_x(run_speed) + else: + by_cmd.send_speed_x(-run_speed) + while True: + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if ret: + # 校准速度越大 停车的条件越宽泛 + if abs(error) <= 15: + car_stop() + logger.info("可以停车了") + + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + error = (box[0][2] + box[0][0] - 320) / 2 + offset + logger.info(f"停车后像素误差:{error}") + if abs(error) > 8: + error = error * 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + break # 任务类 class task: def __init__(self, task_template, find_counts=10, enable=True): @@ -253,16 +174,10 @@ class task: def find(self): # 检查该任执行标志 while True: - # if self.func_find(): - # 特殊地方可以直接跳出 find ret = self.task_t.find() self.counts += ret - # if self.task_t.find(): - # self.counts += 1 if self.counts >= self.find_counts: break - # while self.func_find() is False: - # pass def exec(self): # 根据标志位确定是否执行该任务 @@ -337,59 +252,13 @@ class get_block(): ret = filter.find(self.target_label) if ret > 0: return True - else: - return False + return False def exec(self): car_stop() calibrate_new(self.target_label, offset = 12) logger.info("抓取块") time.sleep(3) - # 测试新方案 - - - - # 旧方案 - # for _ in range(3): - # by_cmd.send_speed_x(0) - # time.sleep(0.2) - # by_cmd.send_speed_omega(0) - # calibrate(self.target_label, 12, False, 7) - # logger.info("抓取块") - # time.sleep(2) - # by_cmd.send_distance_axis_z(20, 20*7) - # time.sleep(3) - # # 向内退 - # by_cmd.send_distance_axis_x(3, 20*2) - # time.sleep(2) - # # 旋转机械臂到最右侧 - # by_cmd.send_angle_claw_arm(36 + 184) - # time.sleep(1.5) - # # 开爪子 - # by_cmd.send_angle_claw(27.0 + 9 * 3) - # # 下降 - # by_cmd.send_distance_axis_z(20, -20*4) - # time.sleep(1.5) - # # 关爪子 - # by_cmd.send_angle_claw(27.0 - 2) - # time.sleep(1) - # by_cmd.send_distance_axis_z(15, 20*2) - # time.sleep(2) - # while True: - # pass - - # by_cmd.send_distance_x(10, 500) - # counts = 0 - # while True: - # counts += filter.find(self.another_label) - # if counts >= 35: - # break - # for _ in range(3): - # by_cmd.send_speed_x(0) - # time.sleep(0.2) - # by_cmd.send_speed_omega(0) - # calibrate(self.another_label, 12, False, 7) - # logger.info("抓取块") - # time.sleep(2) + pass # 调试 临时注释掉 # calibrate(tlabel.RBLOCK,15) @@ -434,20 +303,18 @@ class put_block(): socket.recv() def find(self): # 目标检测医院 - ret1, list1 = filter.get(tlabel.HOSPITAL) - if ret1 > 0: - width = list1[0][2] - list1[0][0] + ret, box = filter.get(tlabel.HOSPITAL) + if ret > 0: + width = box[0][2] - box[0][0] if width > 120: return True - return False - else: - return False + return False def exec(self): logger.info("找到医院") car_stop() calibrate_new(tlabel.HOSPITAL, offset = 7, run = False) time.sleep(3) - # calibrate(tlabel.HOSPITAL, 7, False, 6) + # by_cmd.send_position_axis_z(10, 150) # time.sleep(3) @@ -462,49 +329,56 @@ class put_block(): # 整装上阵 class get_bball(): def init(self): - # by_cmd.send_position_axis_x(2, 140) + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) + by_cmd.send_position_axis_z(10, 130) + time.sleep(0.5) + by_cmd.send_position_axis_x(1, 0) + time.sleep(0.5) + + by_cmd.send_angle_storage(0) # 调试 临时换源 socket.send_string("1") socket.recv() logger.info("整装上阵初始化") time.sleep(0.5) - while (by_cmd.send_angle_camera(90) == -1): - by_cmd.send_angle_camera(90) + def find(self): # 目标检测蓝球 ret = filter.find(tlabel.BBALL) if ret: return True - else: - return False + return False def exec(self): logger.info("找到蓝色球") car_stop() time.sleep(0.5) for _ in range(3): - calibrate_right_new(tlabel.BBALL, offset = 27, run = True, run_speed = 3.5) + calibrate_right_new(tlabel.BBALL, offset = 27, run = True, run_speed = 4) logger.info("抓蓝色球") - time.sleep(5) - # 原方案 - # time.sleep(1) - # calibrate_right(tlabel.BBALL, 27, False, 5) - # logger.info("抓到了蓝色球") - # time.sleep(5) - # for i in range(2): - # while True: - # by_cmd.send_distance_x(6, 60) - # ret, error = filter.aim_near(tlabel.BBALL) - # if ret and abs(error) < 30: - # break - # else: - # by_cmd.send_distance_x(6, 60) - # for _ in range(3): - # by_cmd.send_speed_x(0) - # time.sleep(0.1) - # by_cmd.send_speed_omega(0) - - # calibrate_right(tlabel.BBALL, 27, False, 5) - # time.sleep(5) + + by_cmd.send_angle_claw_arm(36) + by_cmd.send_angle_claw(54) + by_cmd.send_position_axis_x(1, 140) + time.sleep(1) + by_cmd.send_angle_claw(70) + time.sleep(1) + by_cmd.send_angle_claw(30) + time.sleep(1) + by_cmd.send_distance_axis_z(10, 20) + time.sleep(2) + by_cmd.send_position_axis_x(1, 0) + time.sleep(1) + by_cmd.send_angle_claw_arm(82) + time.sleep(1) + by_cmd.send_angle_claw(54) + time.sleep(1) + by_cmd.send_angle_claw_arm(36) + time.sleep(1) + by_cmd.send_distance_axis_z(10, -20) + + time.sleep(1) + # by_cmd.send_position_axis_z(20, 160) @@ -538,15 +412,15 @@ class up_tower(): by_cmd.send_angle_camera(90) def find(self): # 目标检测通信塔 - ret, error = filter.aim_near(tlabel.TOWER) + ret = filter.find(tlabel.TOWER) if ret: return True - else: - return False + return False def exec(self): logger.info("找到塔") car_stop() calibrate_new(tlabel.TOWER, offset = 27, run = True) + time.sleep(0.5) # calibrate(tlabel.TOWER, 27, False, 6) time.sleep(3) @@ -602,16 +476,22 @@ class put_bball(): def exec(self): logger.info("找到篮筐") car_stop() - calibrate_new(tlabel.BASKET,offset = 12, run = True) + calibrate_new(tlabel.BASKET,offset = -15, run = True, run_speed = 5.5) + by_cmd.send_distance_y(-10, 65) + time.sleep(1) + + by_cmd.send_angle_storage(55) logger.info("把球放篮筐里") - time.sleep(2) + + time.sleep(1) + by_cmd.send_distance_y(10, 65) + time.sleep(1) + pass def nexec(self): pass -direction = tlabel.RMARK -direction_left = 0 -direction_right = 0 + # 物资盘点 class put_hanoi1(): def init(self): @@ -619,24 +499,17 @@ class put_hanoi1(): socket.send_string("2") socket.recv() def find(self): - global direction - global direction_left - global direction_right - # 目标检测左右转向标识 - # TODO 框的大小判断距离 - # ret1, list1 = filter.get(tlabel.RMARK) - # ret2, list2 = filter.get(tlabel.LMARK) ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK]) if label == tlabel.RMARK: if abs(error) < 55: logger.info("向右拐") - direction_right += 1 + utils.direction_right += 1 return True return False elif label == tlabel.LMARK: if abs(error) < 50: logger.info("向左拐") - direction_left += 1 + utils.direction_left += 1 return True return False else: @@ -650,35 +523,44 @@ class put_hanoi1(): time.sleep(0.2) # 校准牌子 - if direction_right > direction_left: + if utils.direction_right > utils.direction_left: ret, error = filter.aim_near(tlabel.RMARK) while not ret: ret, error = filter.aim_near(tlabel.RMARK) - direction = tlabel.RMARK + utils.direction = tlabel.RMARK + logger.info("应该向右转") else: ret, error = filter.aim_near(tlabel.LMARK) while not ret: ret, error = filter.aim_near(tlabel.LMARK) - direction = tlabel.LMARK + utils.direction = tlabel.LMARK + logger.info("应该向左转") # 校准 omega if error > 0: - by_cmd.send_angle_omega(-20,abs(utils.lane_error)*13) + by_cmd.send_angle_omega(-20,abs(utils.lane_error)) else: - by_cmd.send_angle_omega(20,abs(utils.lane_error)*13) + by_cmd.send_angle_omega(20,abs(utils.lane_error)) time.sleep(0.5) - by_cmd.send_speed_omega(0) + car_stop() time.sleep(0.5) - by_cmd.send_distance_x(10, 250) - time.sleep(1) - if direction_right > direction_left: - direction = tlabel.RMARK - by_cmd.send_angle_omega(-20,380) + by_cmd.send_distance_x(10, 200) + time.sleep(0.5) + # 根据岔路口切换爪子的方向 + if utils.direction_right > utils.direction_left: + pass + else: + + pass + time.sleep(1.5) + if utils.direction_right > utils.direction_left: + utils.direction = tlabel.RMARK + by_cmd.send_angle_omega(-25,430) time.sleep(2) while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) else: - direction = tlabel.LMARK - by_cmd.send_angle_omega(20,500) + utils.direction = tlabel.LMARK + by_cmd.send_angle_omega(25,430) time.sleep(2) while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) @@ -689,37 +571,22 @@ class put_hanoi1(): def nexec(self): pass - -def sub_put_hanoi2(label,distance_type,run_on,back_flag): - # logger.info("找到大红色柱体") - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - time.sleep(2) - - # 对准大红色柱体 - calibrate(label,7, run_on, 10) - # 抓取大红色柱体 - logger.info("抓取柱体") - # 根据 direction 确定移动方向 - by_cmd.send_distance_x(-8, distance_type) - # 移动 self.distance_lp 距离 - # 放置物块 - logger.info("开始放置柱体") - time.sleep(1) - if back_flag: - pass - # 回移相同距离 - # if direction == tlabel.RMARK: - # by_cmd.send_distance_x(8, distance_type) - # else: - # by_cmd.send_distance_x(-8, distance_type) class put_hanoi2(): def __init__(self): + + if utils.direction_right > utils.direction_left: + self.offset = 25 + else: + self.offset = 15 self.pos_lp = cfg['put_hanoi2']['pos_lp'] self.pos_mp = cfg['put_hanoi2']['pos_mp'] self.pos_sp = 6 - self.pos_lp - self.pos_mp + if self.pos_lp == 1: + self.target_label = tlabel.LPILLER + elif self.pos_mp == 1: + self.target_label = tlabel.MPILLER + else: + self.target_label = tlabel.SPILLER self.distance_lp = self.pos_lp * cfg['put_hanoi2']['pos_gap'] self.distance_mp = self.pos_mp * cfg['put_hanoi2']['pos_gap'] self.distance_sp = self.pos_sp * cfg['put_hanoi2']['pos_gap'] @@ -727,16 +594,46 @@ class put_hanoi2(): def init(self): logger.info("物资盘点 2 初始化") def find(self): - time.sleep(0.001) - return True + ret, box = filter.get(self.target_label) + if ret: + utils.task_speed = 8.5 + error = (box[0][2] + box[0][0] - 320) / 2 + self.offset + if abs(error) < 40: + return True + return False + def exec(self): - # TODO 延时需要根据移动的 distance 判断 - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) + logger.info(f"offset:{self.offset}") + utils.task_speed = 0 + car_stop() + time.sleep(0.5) + # by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30) + time.sleep(1) + explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = 1, run_speed = 6) + logger.info("抓大平台") + time.sleep(5) + explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 6) + logger.info("放大平台") + time.sleep(5) + + explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = 1, run_speed = 6) + logger.info("抓中平台") + time.sleep(5) + explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 6) + logger.info("放中平台") + time.sleep(5) + + + explore_calibrate_new(tlabel.SPILLER, offset = self.offset, run_direc = 1, run_speed = 6) + logger.info("抓小平台") + time.sleep(5) + explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 6) + logger.info("放小平台") + time.sleep(5) + while True: + pass # 往回走一段 然后向前行进校准 - by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30) + time.sleep(1) logger.info(f"方向{direction}") @@ -772,21 +669,16 @@ class move_area1(): while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) def find(self): - # 目标检测标志牌 - ret, error = filter.aim_near(tlabel.SIGN) - if ret > 0 and abs(error) < 8: + ret = filter.find(tlabel.SIGN) + if ret: return True - else: - return False + return False def exec(self): logger.info("找到标示牌") # 停车 ocr 识别文字 调用大模型 - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) + car_stop() time.sleep(2) - + utils.task_speed = 8 pass def nexec(self): pass @@ -794,16 +686,20 @@ class move_area1(): class move_area2(): def init(self): logger.info("应急避险第二阶段初始化") + self.offset = 15 def find(self): - time.sleep(0.001) - return True + # time.sleep(0.001) + ret, box = filter.get(tlabel.SHELTER) + if ret: + error = (box[0][2] + box[0][0] - 320) / 2 + self.offset + if abs(error) < 10: + return 5000 + return False def exec(self): + utils.task_speed = 0 logger.info("开始寻找停车区域") - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - calibrate(tlabel.SHELTER, 15, False, 6) + car_stop() + calibrate_new(tlabel.SHELTER, offset = 15, run = True) time.sleep(1) # 进入停车区域 by_cmd.send_distance_y(10, 450) @@ -828,32 +724,24 @@ class kick_ass(): self.pos_gap2 = cfg['kick_ass']['pos_gap2'] self.target_person = cfg['kick_ass']['target_person'] def find(self): - ret, error = filter.aim_near(tlabel.SIGN) - if ret > 0 and abs(error) < 16: + ret = filter.find(tlabel.SIGN) + if ret: return True - else: - return False + return False def exec(self): logger.info("找到标示牌") - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) + # 停的晚无需校准 omage + car_stop() time.sleep(1) - if utils.lane_error > 0: - by_cmd.send_angle_omega(-20,abs(utils.lane_error)*1.5) - else: - by_cmd.send_angle_omega(20,abs(utils.lane_error)*1.5) - time.sleep(1) - calibrate(tlabel.SIGN, 8, False, 6) + calibrate_new(tlabel.SIGN, offset = 8, run = True) time.sleep(0.5) if self.target_person == 1: - by_cmd.send_distance_x(10, self.pos_gap1) + target_distance = self.pos_gap1 else: - by_cmd.send_distance_x(10, self.pos_gap1 + (self.target_person - 1) * self.pos_gap2) - + target_distance = self.pos_gap1 + (self.target_person - 1) * self.pos_gap2 + (self.target_person - 1) * 10 + by_cmd.send_distance_x(10, target_distance) time.sleep(2) time.sleep(5) pass diff --git a/test/test_filter.py b/test/test_filter.py new file mode 100644 index 0000000..0915f93 --- /dev/null +++ b/test/test_filter.py @@ -0,0 +1,51 @@ + +import sys +import os +parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(parent_dir) + +from utils import label_filter +from loguru import logger +from utils import tlabel +import zmq +import time + +context = zmq.Context() +socket = context.socket(zmq.REQ) +socket.connect("tcp://localhost:6667") +logger.info("subtask yolo client init") + +filter = label_filter(socket) +filter.switch_camera(1) +offset = 15 +while True: + time.sleep(0.2) + # ret1 = filter.get_mult(tlabel.RMARK) + # ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK]) + # if ret: + # # print(error) + # if abs(error) < 40: + # logger.info('yes') + + label = tlabel.BASKET + ret, box = filter.get(label) + while not ret: + # 注意这里一定要保证摄像头内有该目标 否则会无限循环 + ret, box = filter.get(label) + error = (box[0][2] + box[0][0] - 320) / 2 + offset + # ret, box = filter.get(tlabel.BBLOCK) + + if ret: + print(error) + else: + pass + + + # if ret1: + # print(error1) + # if abs(error1) < 30: + # print("you") + # elif ret2: + # print(error2) + # if abs(error2) < 30: + # print("zuo") diff --git a/test/test_llm.py b/test/test_llm.py new file mode 100644 index 0000000..40becf5 --- /dev/null +++ b/test/test_llm.py @@ -0,0 +1,52 @@ +import sys +import os +parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +sys.path.append(parent_dir) + +import erniebot + +erniebot.api_type = "qianfan" +erniebot.ak = "jReawMtWhPu0wrxN9Rp1MzZX" +erniebot.sk = "eowS1BqsNgD2i0C9xNnHUVOSNuAzVTh6" + +class LLM: + def __init__(self): + + self.model = 'ernie-3.5' + self.prompt = '''你是一个机器人动作规划者,需要把我的话翻译成机器人动作规划并生成对应的 json 结果,机器人工作空间参考右手坐标系。 + 严格按照下面的描述生成给定格式 json,从现在开始你仅仅给我返回 json 数据!''' + self.prompt += '''正确的示例如下: + 向左移 0.1m, 向左转弯 85 度 [{'func': 'move', 'x': 0, 'y': 0.1},{'func': 'turn','angle': -85}], + 向右移 0.2m, 向前 0.1m [{'func': 'move', 'x': 0, 'y': -0.2},{'func': 'move', 'x': 0.1, 'y': 0}], + 向右转 85 度,向右移 0.1m [{'func': 'turn','angle': 85},{'func': 'move', 'x': 0, 'y': -0.1}], + 原地左转 38 度 [{'func': 'turn','angle': -38}], + 蜂鸣器发声 5 秒 [{'func': 'beep', 'time': 5}] + 发光或者照亮 5 秒 [{'func': 'light', 'time': 5}] + ''' + self.messages = [] + self.resp = None + self.reset() + def reset(self): + self.messages = [self.make_message(self.prompt)] + self.resp = erniebot.ChatCompletion.create( + model=self.model, + messages=self.messages, + ) + self.messages.append(self.resp.to_message()) + def make_message(self,content): + return {'role': 'user', 'content': content} + def get_command_json(self,chat): + self.messages.append(self.make_message(chat)) + self.resp = erniebot.ChatCompletion.create( + model=self.model, + messages=self.messages, + ) + self.messages.append(self.resp.to_message()) + return self.resp.get_result() +if __name__ == "__main__": + lmm_bot = LLM() + while True: + chat = input("输入:") + print(lmm_bot.get_command_json(chat)) + + diff --git a/utils.py b/utils.py index e3a9a13..3a891a5 100644 --- a/utils.py +++ b/utils.py @@ -2,26 +2,36 @@ from enum import Enum import numpy as np import erniebot - +# 巡线误差 lane_error = 0 +# 进入任务时可以通过修改 task_speed 控制巡线速度 +task_speed = 0 + class tlabel(Enum): - TOWER = 0 - SIGN = 1 - SHELTER = 2 - HOSPITAL = 3 - BASKET = 4 - BASE = 5 - YBALL = 6 - SPILLER = 7 - RMARK = 8 - RBLOCK = 9 - RBALL = 10 - MPILLER = 11 - LPILLER = 12 - LMARK = 13 - BBLOCK = 14 - BBALL = 15 + TPLATFORM = 0 + TOWER = 1 + SIGN = 2 + SHELTER = 3 + HOSPITAL = 4 + BASKET = 5 + BASE = 6 + YBALL = 7 + SPILLER = 8 + RMARK = 9 + RBLOCK = 10 + RBALL = 11 + MPILLER = 12 + LPILLER = 13 + LMARK = 14 + BBLOCK = 15 + BBALL = 16 + +# 岔路口参数 +direction = tlabel.RMARK +direction_left = 0 +direction_right = 0 + ''' description: label_filter 的测试数据 ''' @@ -134,8 +144,6 @@ class label_filter: break if except_label != None: return True, except_label, error - return False, None, None - return False, None, None return False, None, None def get_near_box(self, tlabel_list): response = self.get_resp()