Files
project_main/subtask.py

1012 lines
33 KiB
Python
Raw Normal View History

2024-05-21 23:53:47 +08:00
from enum import Enum
from loguru import logger
2024-05-22 18:50:21 +08:00
from utils import label_filter
from utils import tlabel
2024-06-05 16:07:32 +08:00
from utils import LLM
from utils import CountRecord
2024-06-02 17:49:52 +08:00
import utils
import toml
import zmq
2024-05-21 23:53:47 +08:00
import time
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:6667")
2024-06-02 17:49:52 +08:00
logger.info("subtask yolo client init")
2024-06-02 17:49:52 +08:00
cfg = toml.load('cfg_subtask.toml') # 加载任务配置
logger.info("load subtask config")
by_cmd = None
filter = None
2024-06-05 16:07:32 +08:00
llm_bot = None
'''
description: main.py 里执行 引入全局变量
param {*} _by_cmd 控制器对象
return {*}
'''
def import_obj(_by_cmd):
global by_cmd
global filter
2024-06-05 16:07:32 +08:00
global llm_bot
by_cmd = _by_cmd
filter = label_filter(socket)
2024-06-05 16:07:32 +08:00
if cfg['move_area']['llm_enable']:
llm_bot = LLM()
2024-06-02 17:49:52 +08:00
def car_stop():
2024-06-05 16:07:32 +08:00
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
# 蓝色球使用
2024-06-05 16:07:32 +08:00
def calibrate_right_new(label, offset, run = True, run_speed = 3.5):
2024-06-09 21:54:22 +08:00
not_found_counts = 0
2024-06-05 16:07:32 +08:00
ret, error = filter.aim_right(label)
while not ret:
2024-06-09 21:54:22 +08:00
not_found_counts += 1
if not_found_counts >= 20:
not_found_counts = 0
error = -320 # error > 0 front run
logger.info("calibrate_right_new:找不到次数超过 20 帧 直接前进寻找")
2024-06-09 21:54:22 +08:00
break
2024-06-05 16:07:32 +08:00
ret, error = filter.aim_right(label)
error += offset
if abs(error) > 10 and run:
if error > 0:
by_cmd.send_speed_x(-run_speed)
else:
by_cmd.send_speed_x(run_speed)
2024-06-09 21:54:22 +08:00
pass
2024-06-05 16:07:32 +08:00
# 停的位置已经很接近目标,可以直接使用 distance 校准
else:
# logger.info("停车时误差就小于")
# error = error * 3
# if error > 0:
# by_cmd.send_distance_x(-10, int(error))
# else:
# by_cmd.send_distance_x(10, int(-error))
2024-06-05 16:07:32 +08:00
return
while True:
ret, error = filter.aim_right(label)
while not ret:
ret, error = filter.aim_right(label)
error += offset
if ret:
2024-06-09 21:54:22 +08:00
if abs(error) <= 8:
2024-06-05 16:07:32 +08:00
car_stop()
logger.info("calibrate_right_new:行进时 误差小于 8 直接停车")
2024-06-05 16:07:32 +08:00
ret, error = filter.aim_right(label)
while not ret:
ret, error = filter.aim_right(label)
error += offset
logger.info(f"calibrate_right_new:停车后的误差是{error}")
2024-06-05 16:07:32 +08:00
if abs(error) > 8:
logger.info(f"calibrate_right_new:停车后的误差大于 8 使用 distance 校准")
2024-06-05 16:07:32 +08:00
error = error * 3
if error > 0:
by_cmd.send_distance_x(-10, int(error))
else:
by_cmd.send_distance_x(10, int(-error))
break
'''
description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车
如果停车后 error > 8 则使用 distance 校准
这个方法仅用于在视野里能找到的情况下进行校准并不能实现边走边寻找 然后再校准
2024-06-05 16:07:32 +08:00
param {*} label
param {*} offset
param {*} run
param {*} run_speed
return {*}
'''
def calibrate_new(label, offset, run = True, run_speed = 3.5):
not_found_counts = 0
2024-06-05 16:07:32 +08:00
ret, box = filter.get(label)
while not ret:
not_found_counts += 1
if not_found_counts >= 20:
not_found_counts = 0
error = -320 # error > 0 front run
logger.info("calibrate_new:找不到次数超过 20 帧 直接前进寻找")
2024-06-05 16:07:32 +08:00
ret, box = filter.get(label)
error = (box[0][2] + box[0][0] - 320) / 2 + offset
if abs(error) > 10 and run:
if error > 0:
by_cmd.send_speed_x(-run_speed)
else:
by_cmd.send_speed_x(run_speed)
# 停的位置已经很接近目标,可以直接使用 distance 校准
else:
if abs(error) > 8:
logger.info(f"calibrate_new:停车后误差{error}大于 8 使用 distance 校准")
# error = error # 3
if error > 0:
by_cmd.send_distance_x(-10, int(error))
else:
by_cmd.send_distance_x(10, int(-error))
return
logger.info(f"calibrate_new:停车后误差{error}小于 8 不校准")
return
2024-06-05 16:07:32 +08:00
while True:
ret, box = filter.get(label)
while not ret:
ret, box = filter.get(label)
error = (box[0][2] + box[0][0] - 320) / 2 + offset
if ret:
2024-06-09 16:48:31 +08:00
if abs(error) <= 10: # 5
2024-06-05 16:07:32 +08:00
car_stop()
logger.info("calibrate_new:行进时 误差小于 10 直接停车")
2024-06-05 16:07:32 +08:00
ret, box = filter.get(label)
while not ret:
ret, box = filter.get(label)
2024-06-09 16:48:31 +08:00
2024-06-05 16:07:32 +08:00
error = (box[0][2] + box[0][0] - 320) / 2 + offset
logger.info(f"calibrate_new:停车后的误差是{error}")
2024-06-05 16:07:32 +08:00
if abs(error) > 8:
logger.info(f"calibrate_new:停车后的误差大于 8 使用 distance 校准")
error = error * 3
logger.error(f"error * 3{error}")
2024-06-05 16:07:32 +08:00
if error > 0:
by_cmd.send_distance_x(-10, int(error))
else:
by_cmd.send_distance_x(10, int(-error))
break
def explore_calibrate_new(label, offset, run_direc ,run_speed = 3.5):
# run_direc == 1 向前
if run_direc == 1:
by_cmd.send_speed_x(run_speed)
else:
by_cmd.send_speed_x(-run_speed)
2024-06-05 16:07:32 +08:00
while True:
ret, box = filter.get(label)
2024-06-05 16:07:32 +08:00
while not ret:
ret, box = filter.get(label)
2024-06-05 16:07:32 +08:00
error = (box[0][2] + box[0][0] - 320) / 2 + offset
if ret:
# 校准速度越大 停车的条件越宽泛
if abs(error) <= 10:
car_stop()
logger.info("explore_calibrate_new:行进时 误差小于 10 直接停车")
ret, box = filter.get(label)
while not ret:
ret, box = filter.get(label)
error = (box[0][2] + box[0][0] - 320) / 2 + offset
logger.info(f"停车后像素误差:{error}")
if abs(error) > 8:
logger.info(f"explore_calibrate_new:停车后的误差大于 8 使用 distance 校准")
error = error * 3
if error > 0:
by_cmd.send_distance_x(-10, int(error))
else:
by_cmd.send_distance_x(10, int(-error))
2024-06-02 17:49:52 +08:00
break
2024-05-21 23:53:47 +08:00
# 任务类
class task:
def __init__(self, task_template, find_counts=10, enable=True):
2024-05-21 23:53:47 +08:00
self.enable = enable
self.task_t = task_template()
self.counts = 0
self.find_counts = find_counts
def init(self):
self.task_t.init()
2024-05-22 12:37:15 +08:00
def find(self):
# 检查该任执行标志
while True:
2024-06-02 17:49:52 +08:00
ret = self.task_t.find()
self.counts += ret
if self.counts >= self.find_counts:
break
2024-05-22 12:37:15 +08:00
def exec(self):
2024-05-21 23:53:47 +08:00
# 根据标志位确定是否执行该任务
if self.enable is True:
2024-05-22 12:37:15 +08:00
logger.debug(f"[Task ]# Executing task")
self.task_t.exec()
2024-05-22 12:37:15 +08:00
logger.debug(f"[Task ]# Task completed")
2024-05-21 23:53:47 +08:00
else:
2024-05-22 12:37:15 +08:00
logger.warning(f"[Task ]# Skip task")
2024-06-02 17:49:52 +08:00
self.task_t.nexec()
2024-05-21 23:53:47 +08:00
# 任务队列状态类
class task_queuem_status(Enum):
IDEL = 0
SEARCHING = 1
EXECUTING = 2
# 任务队列类 非 EXECUTEING 时均执行 huigui注意互斥操作
2024-05-21 23:53:47 +08:00
class task_queuem(task):
# task_now = task(None, False)
def __init__(self, queue):
super(task_queuem, self)
self.queue = queue
self.status = task_queuem_status.IDEL
self.busy = True
2024-05-22 12:37:15 +08:00
logger.info(f"[TaskM]# Task num {self.queue.qsize()}")
2024-05-21 23:53:47 +08:00
def exec(self):
# 如果空闲状态则将下一个队列任务取出
if self.status is task_queuem_status.IDEL:
if self.queue.qsize() == 0:
self.busy = False
2024-05-22 12:37:15 +08:00
logger.info(f"[TaskM]# Task queue empty, exit")
2024-05-21 23:53:47 +08:00
return False
self.task_now = self.queue.get()
# 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息
if self.task_now.enable is True:
self.status = task_queuem_status.SEARCHING
# 如果使能该任务则执行该任务的初始化动作
self.task_now.init()
2024-05-21 23:53:47 +08:00
else:
self.status = task_queuem_status.EXECUTING
2024-05-22 12:37:15 +08:00
logger.info(f"[TaskM]# ---------------------->>>>")
2024-05-21 23:53:47 +08:00
# 阻塞搜索任务标志位
elif self.status is task_queuem_status.SEARCHING:
2024-05-22 12:37:15 +08:00
logger.info(f"[TaskM]# Start searching task target")
self.task_now.find()
2024-05-21 23:53:47 +08:00
self.status = task_queuem_status.EXECUTING
# 执行任务函数
elif self.status is task_queuem_status.EXECUTING:
2024-05-22 12:37:15 +08:00
logger.info(f"[TaskM]# Start execute task function")
self.task_now.exec() # 执行当前任务函数
2024-05-21 23:53:47 +08:00
self.queue.task_done() # 弹出已执行的任务
self.status = task_queuem_status.IDEL #
2024-05-22 12:37:15 +08:00
logger.info(f"[TaskM]# <<<<----------------------")
2024-05-21 23:53:47 +08:00
return True
# 人员施救
class get_block1():
def init(self):
logger.info(f"人员施救 1 初始化,第一抓取块为 {cfg['get_block']['first_block']}")
2024-06-09 16:48:31 +08:00
while (by_cmd.send_angle_camera(180) == -1):
by_cmd.send_angle_camera(180)
2024-06-05 16:07:32 +08:00
filter.switch_camera(1)
if cfg['get_block']['first_block'] == "blue":
self.target_label = tlabel.BBLOCK
self.another_label = tlabel.RBLOCK
else:
self.target_label = tlabel.RBLOCK
self.another_label = tlabel.BBLOCK
cfg['get_block']['first_block'] = "red"
def find(self):
2024-05-22 18:50:21 +08:00
# 目标检测红/蓝方块
2024-06-05 16:07:32 +08:00
ret = filter.find(self.target_label)
if ret > 0:
2024-05-22 18:50:21 +08:00
return True
return False
def exec(self):
2024-06-05 16:07:32 +08:00
car_stop()
calibrate_new(self.target_label, offset = 15, run = True)
2024-06-05 16:07:32 +08:00
logger.info("抓取块")
2024-06-09 16:48:31 +08:00
time.sleep(0.5)
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_z(20, 60)
by_cmd.send_position_axis_x(1, 100)
time.sleep(1)
2024-06-09 16:48:31 +08:00
by_cmd.send_angle_claw_arm(220)
by_cmd.send_angle_claw(63)
by_cmd.send_position_axis_x(1, 20)
time.sleep(2)
by_cmd.send_angle_claw(30)
by_cmd.send_beep(1)
time.sleep(0.5)
by_cmd.send_beep(0)
time.sleep(0.5)
by_cmd.send_position_axis_x(1, 80)
by_cmd.send_position_axis_z(20, 130)
time.sleep(2)
by_cmd.send_angle_claw_arm(36)
time.sleep(1)
by_cmd.send_position_axis_z(20, 20)
time.sleep(0.5)
by_cmd.send_position_axis_x(1, 40)
time.sleep(2)
by_cmd.send_angle_claw(45)
time.sleep(0.5)
by_cmd.send_position_axis_z(20, 80)
time.sleep(2)
by_cmd.send_angle_claw_arm(220)
time.sleep(0.5)
by_cmd.send_position_axis_x(1, 100)
time.sleep(1)
pass
def nexec(self):
# TODO 完成不执行任务的空动作
pass
class get_block2():
def init(self):
logger.info(f"人员施救 2 初始化,第一抓取块为 {cfg['get_block']['first_block']}")
while (by_cmd.send_angle_camera(180) == -1):
by_cmd.send_angle_camera(180)
filter.switch_camera(1)
if cfg['get_block']['first_block'] == "red":
self.target_label = tlabel.BBLOCK
self.another_label = tlabel.RBLOCK
else:
self.target_label = tlabel.RBLOCK
self.another_label = tlabel.BBLOCK
def find(self):
# 目标检测红/蓝方块
ret = filter.find(self.target_label)
if ret > 0:
return True
return False
def exec(self):
car_stop()
calibrate_new(self.target_label, offset = 15, run = True)
logger.info("抓取块")
time.sleep(0.5)
by_cmd.send_angle_claw_arm(220)
by_cmd.send_angle_claw(63)
by_cmd.send_position_axis_x(1, 20)
time.sleep(1)
by_cmd.send_position_axis_z(20, 60)
time.sleep(1)
by_cmd.send_angle_claw(30)
by_cmd.send_beep(1)
time.sleep(0.5)
by_cmd.send_beep(0)
time.sleep(0.5)
by_cmd.send_position_axis_x(1, 100)
time.sleep(2)
2024-06-05 16:07:32 +08:00
pass
def nexec(self):
2024-06-02 17:49:52 +08:00
# TODO 完成不执行任务的空动作
2024-05-22 12:37:15 +08:00
pass
2024-05-21 23:53:47 +08:00
# 紧急转移
2024-05-22 12:37:15 +08:00
class put_block():
def init(self):
2024-06-09 16:48:31 +08:00
while (by_cmd.send_angle_camera(180) == -1):
by_cmd.send_angle_camera(180)
logger.info("紧急转移初始化")
2024-06-05 16:07:32 +08:00
socket.send_string("1")
socket.recv()
def find(self):
2024-05-22 18:50:21 +08:00
# 目标检测医院
ret, box = filter.get(tlabel.HOSPITAL)
if ret > 0:
width = box[0][2] - box[0][0]
2024-06-05 16:07:32 +08:00
if width > 120:
return True
return False
def exec(self):
logger.info("找到医院")
2024-06-05 16:07:32 +08:00
car_stop()
calibrate_new(tlabel.HOSPITAL, offset = 7, run = False)
by_cmd.send_position_axis_z(20, 0)
time.sleep(3)
2024-06-09 16:48:31 +08:00
by_cmd.send_position_axis_x(1, 0)
time.sleep(1)
by_cmd.send_angle_claw(63)
time.sleep(1)
by_cmd.send_position_axis_x(1, 80)
by_cmd.send_position_axis_z(20, 130)
time.sleep(2)
by_cmd.send_angle_claw_arm(36)
time.sleep(1)
by_cmd.send_position_axis_z(20, 20)
2024-06-09 16:48:31 +08:00
time.sleep(0.5)
by_cmd.send_position_axis_x(1, 40)
time.sleep(2)
by_cmd.send_angle_claw(30)
time.sleep(0.5)
by_cmd.send_position_axis_z(20, 130)
2024-06-09 21:54:22 +08:00
time.sleep(3)
by_cmd.send_angle_claw_arm(220)
by_cmd.send_distance_x(-10, 120)
by_cmd.send_position_axis_z(20, 0)
time.sleep(1)
by_cmd.send_position_axis_x(1, 0)
time.sleep(2)
2024-06-09 16:48:31 +08:00
by_cmd.send_angle_claw(63)
time.sleep(3)
2024-06-09 16:48:31 +08:00
# 下一动作预备位置
by_cmd.send_position_axis_z(20, 130)
2024-06-09 21:54:22 +08:00
time.sleep(4)
2024-06-09 16:48:31 +08:00
by_cmd.send_position_axis_x(1, 0)
2024-06-09 21:54:22 +08:00
time.sleep(1)
2024-06-09 16:48:31 +08:00
by_cmd.send_angle_claw_arm(36)
2024-05-22 12:37:15 +08:00
pass
2024-06-02 17:49:52 +08:00
def nexec(self):
2024-06-09 16:48:31 +08:00
# 下一动作预备位置
by_cmd.send_position_axis_z(20, 130)
time.sleep(1.5)
by_cmd.send_position_axis_x(1, 0)
by_cmd.send_angle_claw_arm(36)
time.sleep(4)
2024-06-02 17:49:52 +08:00
pass
2024-05-21 23:53:47 +08:00
# 整装上阵
2024-05-22 12:37:15 +08:00
class get_bball():
def init(self):
while (by_cmd.send_angle_camera(90) == -1):
by_cmd.send_angle_camera(90)
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_z(20, 135)
time.sleep(0.5)
by_cmd.send_position_axis_x(1, 0)
time.sleep(0.5)
2024-06-09 21:54:22 +08:00
while (by_cmd.send_angle_storage(0) == -1):
by_cmd.send_angle_storage(0)
2024-06-05 16:07:32 +08:00
# 调试 临时换源
socket.send_string("1")
socket.recv()
logger.info("整装上阵初始化")
2024-06-09 16:48:31 +08:00
# time.sleep(0.5)
def find(self):
2024-06-05 16:07:32 +08:00
# 目标检测蓝球
ret = filter.find(tlabel.BBALL) or filter.find(tlabel.YBALL)
2024-06-05 16:07:32 +08:00
if ret:
2024-05-22 18:50:21 +08:00
return True
return False
def exec(self):
2024-06-05 16:07:32 +08:00
logger.info("找到蓝色球")
car_stop()
time.sleep(0.5)
for _ in range(3):
2024-06-09 21:54:22 +08:00
calibrate_right_new(tlabel.BBALL, offset = 27, run = True, run_speed = 4)
2024-06-05 16:07:32 +08:00
logger.info("抓蓝色球")
2024-06-09 21:54:22 +08:00
time.sleep(0.5)
by_cmd.send_angle_claw_arm(36)
by_cmd.send_angle_claw(54)
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_x(1, 160)
time.sleep(1)
by_cmd.send_angle_claw(70)
time.sleep(1)
by_cmd.send_angle_claw(30)
time.sleep(1)
2024-06-09 16:48:31 +08:00
by_cmd.send_distance_axis_z(20, 20)
time.sleep(1)
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_x(1, 0)
time.sleep(1.5)
2024-06-09 16:48:31 +08:00
by_cmd.send_angle_claw_arm(67)
time.sleep(0.5)
by_cmd.send_angle_claw(54)
time.sleep(1)
by_cmd.send_angle_claw_arm(36)
time.sleep(1)
2024-06-09 16:48:31 +08:00
by_cmd.send_distance_axis_z(20, -20)
time.sleep(1)
2024-06-09 21:54:22 +08:00
# 继续向前走
# by_cmd.send_speed_x(4)
2024-06-09 16:48:31 +08:00
# 下一动作预备位置
by_cmd.send_angle_claw(30)
by_cmd.send_position_axis_z(20, 0)
time.sleep(2)
2024-05-22 12:37:15 +08:00
pass
2024-06-02 17:49:52 +08:00
def nexec(self):
2024-06-09 16:48:31 +08:00
by_cmd.send_angle_claw(30)
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_z(20, 0)
2024-06-09 16:48:31 +08:00
time.sleep(2)
2024-06-02 17:49:52 +08:00
pass
2024-05-21 23:53:47 +08:00
# 通信抢修
2024-05-22 12:37:15 +08:00
class up_tower():
def init(self):
logger.info("通信抢修初始化")
2024-06-05 16:07:32 +08:00
while (by_cmd.send_angle_camera(90) == -1):
by_cmd.send_angle_camera(90)
filter.switch_camera(1)
def find(self):
2024-05-22 18:50:21 +08:00
# 目标检测通信塔
ret = filter.find(tlabel.TOWER)
2024-06-05 16:07:32 +08:00
if ret:
2024-05-22 18:50:21 +08:00
return True
return False
def exec(self):
logger.info("找到塔")
2024-06-05 16:07:32 +08:00
car_stop()
calibrate_new(tlabel.TOWER, offset = 27, run = True)
2024-06-09 16:48:31 +08:00
by_cmd.send_light(1)
time.sleep(0.5)
2024-06-09 16:48:31 +08:00
by_cmd.send_light(0)
2024-06-05 16:07:32 +08:00
# calibrate(tlabel.TOWER, 27, False, 6)
time.sleep(3)
2024-06-09 16:48:31 +08:00
# while True:
# pass
# 下一动作预备位置
by_cmd.send_position_axis_z(20, 0)
2024-06-02 17:49:52 +08:00
def nexec(self):
2024-06-09 16:48:31 +08:00
# 下一动作预备位置
by_cmd.send_position_axis_z(20, 0)
time.sleep(4)
2024-06-02 17:49:52 +08:00
pass
2024-05-21 23:53:47 +08:00
# 高空排险
2024-05-22 12:37:15 +08:00
class get_rball():
def init(self):
socket.send_string("1")
socket.recv()
logger.info("高空排险初始化")
2024-06-02 17:49:52 +08:00
while (by_cmd.send_angle_camera(0) == -1):
by_cmd.send_angle_camera(0)
self.record = CountRecord(3)
def find(self):
2024-05-22 18:50:21 +08:00
# 目标检测红球
ret = filter.find(tlabel.RBALL)
2024-06-05 16:07:32 +08:00
if ret > 0:
# TODO 连续两帧才开始减速
if self.record(tlabel.RBALL):
utils.task_speed = 5
2024-05-22 18:50:21 +08:00
return True
else:
return False
def exec(self):
logger.info("找到红球")
2024-06-09 16:48:31 +08:00
utils.task_speed = 0
2024-06-05 16:07:32 +08:00
car_stop()
2024-06-09 16:48:31 +08:00
time.sleep(0.5)
# 靠近塔
by_cmd.send_angle_scoop(20)
by_cmd.send_distance_y(-15, 50)
time.sleep(2)
2024-06-09 16:48:31 +08:00
calibrate_new(tlabel.RBALL,offset = 50, run = True)
2024-06-05 16:07:32 +08:00
time.sleep(1)
logger.info("抓红球")
2024-06-09 16:48:31 +08:00
# by_cmd.send_angle_scoop(15)
time.sleep(0.5)
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_z(20, 170)
2024-06-09 16:48:31 +08:00
time.sleep(5)
by_cmd.send_angle_scoop(7)
by_cmd.send_distance_y(15, 70)
time.sleep(1)
2024-06-09 16:48:31 +08:00
# while True:
# pass
2024-06-02 17:49:52 +08:00
pass
def nexec(self):
2024-05-22 12:37:15 +08:00
pass
2024-05-21 23:53:47 +08:00
# 派发物资
2024-05-22 12:37:15 +08:00
class put_bball():
def init(self):
logger.info("派发物资初始化")
2024-06-05 16:07:32 +08:00
socket.send_string("1")
socket.recv()
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_z(20, 0)
2024-06-02 17:49:52 +08:00
while (by_cmd.send_angle_camera(90) == -1):
by_cmd.send_angle_camera(90)
def find(self):
2024-06-05 16:07:32 +08:00
ret = filter.find(tlabel.BASKET)
if ret > 0:
2024-05-22 18:50:21 +08:00
return True
else:
return False
def exec(self):
logger.info("找到篮筐")
2024-06-05 16:07:32 +08:00
car_stop()
calibrate_new(tlabel.BASKET,offset = -15, run = True, run_speed = 5.5)
by_cmd.send_distance_y(-10, 65)
time.sleep(1)
by_cmd.send_angle_storage(55)
2024-06-05 16:07:32 +08:00
logger.info("把球放篮筐里")
2024-06-09 21:54:22 +08:00
time.sleep(2)
by_cmd.send_distance_y(10, 65)
time.sleep(1)
2024-06-09 21:54:22 +08:00
by_cmd.send_angle_storage(20)
2024-05-22 12:37:15 +08:00
pass
2024-06-02 17:49:52 +08:00
def nexec(self):
pass
2024-05-21 23:53:47 +08:00
# 物资盘点
2024-05-22 18:50:21 +08:00
class put_hanoi1():
def init(self):
logger.info("物资盘点 1 初始化")
socket.send_string("2")
socket.recv()
def find(self):
2024-06-05 16:07:32 +08:00
ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK])
if label == tlabel.RMARK:
if abs(error) < 55:
logger.info("向右拐")
utils.direction_right += 1
2024-06-02 17:49:52 +08:00
return True
return False
2024-06-05 16:07:32 +08:00
elif label == tlabel.LMARK:
if abs(error) < 50:
logger.info("向左拐")
utils.direction_left += 1
2024-06-02 17:49:52 +08:00
return True
return False
2024-06-05 16:07:32 +08:00
else:
return False
def exec(self):
2024-06-02 17:49:52 +08:00
global direction
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
time.sleep(0.2)
2024-06-05 16:07:32 +08:00
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_z(20, 130)
2024-06-05 16:07:32 +08:00
# 校准牌子
if utils.direction_right > utils.direction_left:
2024-06-05 16:07:32 +08:00
ret, error = filter.aim_near(tlabel.RMARK)
while not ret:
ret, error = filter.aim_near(tlabel.RMARK)
utils.direction = tlabel.RMARK
logger.info("应该向右转")
2024-06-05 16:07:32 +08:00
else:
ret, error = filter.aim_near(tlabel.LMARK)
while not ret:
ret, error = filter.aim_near(tlabel.LMARK)
utils.direction = tlabel.LMARK
logger.info("应该向左转")
2024-06-09 21:54:22 +08:00
2024-06-05 16:07:32 +08:00
# 校准 omega
if error > 0:
by_cmd.send_angle_omega(-20,abs(utils.lane_error))
2024-06-05 16:07:32 +08:00
else:
by_cmd.send_angle_omega(20,abs(utils.lane_error))
2024-06-05 16:07:32 +08:00
time.sleep(0.5)
car_stop()
2024-06-05 16:07:32 +08:00
time.sleep(0.5)
2024-06-09 22:31:32 +08:00
# by_cmd.send_distance_x(10, 200)
by_cmd.send_distance_x(10, 180)
time.sleep(0.5)
2024-06-09 21:54:22 +08:00
# 根据方向初始化执行器位置
if utils.direction is tlabel.RMARK:
# FIXME 右侧的爪子会被 storage 挡住
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_x(1, 0)
by_cmd.send_angle_claw_arm(36)
by_cmd.send_angle_storage(0)
else:
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_x(1, 150)
by_cmd.send_angle_claw_arm(217)
2024-06-09 21:54:22 +08:00
by_cmd.send_angle_storage(55)
time.sleep(1.5)
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_z(20, 0)
if utils.direction_right > utils.direction_left:
utils.direction = tlabel.RMARK
# by_cmd.send_angle_omega(-25,430)
# by_cmd.send_angle_omega(-45,238)
by_cmd.send_angle_omega(-55,194)
2024-06-02 17:49:52 +08:00
time.sleep(2)
while (by_cmd.send_angle_camera(90) == -1):
by_cmd.send_angle_camera(90)
2024-05-22 18:50:21 +08:00
else:
utils.direction = tlabel.LMARK
# by_cmd.send_angle_omega(25,430)
# by_cmd.send_angle_omega(45,238)
by_cmd.send_angle_omega(55,194)
2024-06-02 17:49:52 +08:00
time.sleep(2)
while (by_cmd.send_angle_camera(180) == -1):
by_cmd.send_angle_camera(180)
2024-06-02 17:49:52 +08:00
time.sleep(0.5)
socket.send_string("1")
socket.recv()
2024-06-02 17:49:52 +08:00
def nexec(self):
pass
2024-05-22 18:50:21 +08:00
class put_hanoi2():
2024-06-02 17:49:52 +08:00
def __init__(self):
2024-06-02 17:49:52 +08:00
self.pos_lp = cfg['put_hanoi2']['pos_lp']
self.pos_mp = cfg['put_hanoi2']['pos_mp']
self.pos_sp = 6 - self.pos_lp - self.pos_mp
if self.pos_lp == 1:
self.target_label = tlabel.LPILLER
elif self.pos_mp == 1:
self.target_label = tlabel.MPILLER
else:
self.target_label = tlabel.SPILLER
2024-06-02 17:49:52 +08:00
self.distance_lp = self.pos_lp * cfg['put_hanoi2']['pos_gap']
self.distance_mp = self.pos_mp * cfg['put_hanoi2']['pos_gap']
self.distance_sp = self.pos_sp * cfg['put_hanoi2']['pos_gap']
logger.info(f"setting hanoi pos_lp[{self.pos_lp}] pos_mp[{self.pos_mp}] pos_sp[{self.pos_sp}]")
def init(self):
logger.info("物资盘点 2 初始化")
if utils.direction == tlabel.RMARK:
self.offset = 25
else:
self.offset = 10
def find(self):
ret, box = filter.get(self.target_label)
if ret:
utils.task_speed = 8.5
error = (box[0][2] + box[0][0] - 320) / 2 + self.offset
if abs(error) < 40:
return True
return False
def exec(self):
2024-06-09 21:54:22 +08:00
logger.info(f"direction:{utils.direction.name}")
utils.task_speed = 0
car_stop()
time.sleep(0.5)
# by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30)
time.sleep(1)
explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = 1, run_speed = 5)
logger.info("抓大平台")
2024-06-09 21:54:22 +08:00
if utils.direction is tlabel.RMARK:
# by_cmd.send_position_axis_z(20, 40)
# by_cmd.send_position_axis_x(1, 160)
# by_cmd.send_angle_claw(63)
# time.sleep(1)
# by_cmd.send_angle_claw(85)
# time.sleep(4)
# by_cmd.send_angle_claw(50)
# time.sleep(4)
# by_cmd.send_position_axis_x(2, 80)
# # FIXME 多往回收一些 会挡住识别
# by_cmd.send_distance_axis_z(20, 10)
pass
2024-06-09 21:54:22 +08:00
else:
by_cmd.send_position_axis_z(20, 30)
by_cmd.send_position_axis_x(1, 50)
2024-06-09 21:54:22 +08:00
by_cmd.send_angle_claw(63)
time.sleep(1)
by_cmd.send_angle_claw(90)
time.sleep(2)
2024-06-09 21:54:22 +08:00
by_cmd.send_angle_claw(50)
time.sleep(2)
by_cmd.send_distance_axis_z(20, 20)
time.sleep(2)
by_cmd.send_position_axis_x(2, 160)
time.sleep(4)
pass
explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 5)
logger.info("放大平台")
2024-06-09 21:54:22 +08:00
if utils.direction is tlabel.RMARK:
# by_cmd.send_distance_axis_z(20, -10)
# by_cmd.send_position_axis_x(2, 160)
# time.sleep(4)
# by_cmd.send_angle_claw(90)
# by_cmd.send_position_axis_x(1, 0)
pass
2024-06-09 21:54:22 +08:00
else:
by_cmd.send_position_axis_x(2, 60)
2024-06-09 21:54:22 +08:00
time.sleep(4)
by_cmd.send_distance_axis_z(20, -20)
time.sleep(1)
2024-06-09 21:54:22 +08:00
by_cmd.send_angle_claw(90)
time.sleep(1)
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_x(1, 160)
explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = 1, run_speed = 5)
logger.info("抓中平台")
2024-06-09 21:54:22 +08:00
if utils.direction is tlabel.RMARK:
# by_cmd.send_position_axis_z(20, 40)
# by_cmd.send_position_axis_x(1, 160)
# by_cmd.send_angle_claw(63)
# time.sleep(1)
# by_cmd.send_angle_claw(85)
# time.sleep(4)
# by_cmd.send_angle_claw(37)
# time.sleep(4)
# by_cmd.send_position_axis_x(2, 40)
# # FIXME 多往回收一些 会挡住识别
# by_cmd.send_distance_axis_z(20, 10)
2024-06-09 21:54:22 +08:00
pass
else:
by_cmd.send_position_axis_z(20, 30)
by_cmd.send_position_axis_x(1, 40)
by_cmd.send_angle_claw(63)
time.sleep(1)
by_cmd.send_angle_claw(85)
time.sleep(2)
by_cmd.send_angle_claw(40)
time.sleep(2)
by_cmd.send_distance_axis_z(20, 20)
time.sleep(2)
by_cmd.send_position_axis_x(2, 160)
time.sleep(4)
2024-06-09 21:54:22 +08:00
pass
explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = -1, run_speed = 5)
logger.info("放中平台")
2024-06-09 21:54:22 +08:00
if utils.direction is tlabel.RMARK:
# by_cmd.send_distance_axis_z(20, -10)
# by_cmd.send_position_axis_x(2, 160)
# time.sleep(4)
# by_cmd.send_angle_claw(90)
# by_cmd.send_position_axis_x(1, 0)
2024-06-09 21:54:22 +08:00
pass
else:
by_cmd.send_distance_axis_z(20, 60)
time.sleep(4)
by_cmd.send_position_axis_x(2, 40)
time.sleep(4)
by_cmd.send_distance_axis_z(20, -20)
time.sleep(1)
by_cmd.send_angle_claw(90)
time.sleep(1)
by_cmd.send_position_axis_x(1, 160)
explore_calibrate_new(tlabel.SPILLER, offset = self.offset, run_direc = 1, run_speed = 5)
logger.info("抓小平台")
2024-06-09 21:54:22 +08:00
if utils.direction is tlabel.RMARK:
# by_cmd.send_position_axis_z(20, 40)
# by_cmd.send_position_axis_x(1, 160)
# by_cmd.send_angle_claw(63)
# time.sleep(1)
# by_cmd.send_angle_claw(85)
# time.sleep(4)
# by_cmd.send_angle_claw(37)
# time.sleep(4)
# by_cmd.send_position_axis_x(2, 40)
# # FIXME 多往回收一些 会挡住识别
# by_cmd.send_distance_axis_z(20, 10)
2024-06-09 21:54:22 +08:00
pass
else:
by_cmd.send_position_axis_z(20, 30)
by_cmd.send_position_axis_x(1, 40)
by_cmd.send_angle_claw(63)
time.sleep(1)
by_cmd.send_angle_claw(85)
time.sleep(2)
by_cmd.send_angle_claw(30)
time.sleep(2)
by_cmd.send_distance_axis_z(20, 20)
time.sleep(2)
by_cmd.send_position_axis_x(2, 160)
time.sleep(4)
2024-06-09 21:54:22 +08:00
pass
explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = -1, run_speed = 5)
logger.info("放小平台")
2024-06-09 21:54:22 +08:00
if utils.direction is tlabel.RMARK:
# by_cmd.send_distance_axis_z(20, -10)
# by_cmd.send_position_axis_x(2, 160)
# time.sleep(4)
# by_cmd.send_angle_claw(90)
# by_cmd.send_position_axis_x(1, 0)
pass
2024-06-02 17:49:52 +08:00
else:
by_cmd.send_distance_axis_z(20, 155)
time.sleep(4)
by_cmd.send_position_axis_x(2, 40)
time.sleep(4)
by_cmd.send_distance_axis_z(20, -20)
time.sleep(1)
by_cmd.send_angle_claw(90)
time.sleep(1)
by_cmd.send_position_axis_x(1, 160)
time.sleep(2)
2024-06-09 21:54:22 +08:00
# while True:
# pass
2024-06-02 17:49:52 +08:00
2024-06-09 21:54:22 +08:00
def nexec(self):
pass
class put_hanoi3():
def init(self):
by_cmd.send_position_axis_z(20, 130)
time.sleep(3)
2024-06-09 21:54:22 +08:00
logger.info("完成任务,爪回左侧")
by_cmd.send_angle_claw_arm(128)
time.sleep(0.5)
by_cmd.send_position_axis_x(1, 150)
time.sleep(1)
by_cmd.send_angle_claw_arm(220)
def find(self):
time.sleep(1)
return True
def exec(self):
by_cmd.send_position_axis_z(20, 100)
2024-06-02 17:49:52 +08:00
pass
def nexec(self):
2024-05-22 12:37:15 +08:00
pass
2024-05-21 23:53:47 +08:00
2024-06-02 17:49:52 +08:00
# 应急避险 第一阶段 找目标牌
class move_area1():
def init(self):
2024-06-02 17:49:52 +08:00
logger.info("应急避险第一阶段初始化")
while (by_cmd.send_angle_camera(180) == -1):
by_cmd.send_angle_camera(180)
def find(self):
ret = filter.find(tlabel.SIGN)
if ret:
2024-05-22 18:50:21 +08:00
return True
return False
def exec(self):
logger.info("找到标示牌")
2024-06-05 16:07:32 +08:00
# 停车 ocr 识别文字 调用大模型
car_stop()
2024-06-05 16:07:32 +08:00
time.sleep(2)
utils.task_speed = 8
2024-06-02 17:49:52 +08:00
pass
def nexec(self):
pass
# 应急避险 第二阶段 找停车区域
class move_area2():
def init(self):
logger.info("应急避险第二阶段初始化")
self.offset = 15
2024-06-02 17:49:52 +08:00
def find(self):
# time.sleep(0.001)
ret, box = filter.get(tlabel.SHELTER)
if ret:
error = (box[0][2] + box[0][0] - 320) / 2 + self.offset
if abs(error) < 20:
return 5000
return False
2024-06-02 17:49:52 +08:00
def exec(self):
utils.task_speed = 0
2024-06-02 17:49:52 +08:00
logger.info("开始寻找停车区域")
car_stop()
calibrate_new(tlabel.SHELTER, offset = 15, run = True)
2024-06-02 17:49:52 +08:00
time.sleep(1)
# 进入停车区域
by_cmd.send_distance_y(10, 450)
time.sleep(3)
2024-06-05 16:07:32 +08:00
# TODO 调用大模型 然后执行动作
by_cmd.send_light(1)
time.sleep(2)
by_cmd.send_light(0)
2024-06-02 17:49:52 +08:00
# 离开停车区域
by_cmd.send_distance_y(-10, 450)
time.sleep(3)
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_z(20, 0)
2024-06-02 17:49:52 +08:00
pass
def nexec(self):
2024-06-09 21:54:22 +08:00
by_cmd.send_position_axis_z(20, 0)
2024-05-22 12:37:15 +08:00
pass
2024-06-09 22:31:32 +08:00
2024-05-21 23:53:47 +08:00
# 扫黑除暴
2024-05-22 12:37:15 +08:00
class kick_ass():
def init(self):
2024-06-09 22:31:32 +08:00
while (by_cmd.send_angle_camera(180) == -1):
by_cmd.send_angle_camera(180)
logger.info("扫黑除暴初始化")
2024-06-02 17:49:52 +08:00
self.pos_gap1 = cfg['kick_ass']['pos_gap1']
self.pos_gap2 = cfg['kick_ass']['pos_gap2']
self.target_person = cfg['kick_ass']['target_person']
def find(self):
ret = filter.find(tlabel.SIGN)
if ret:
2024-05-22 18:50:21 +08:00
return True
return False
def exec(self):
logger.info("找到标示牌")
# 停的晚无需校准 omage
car_stop()
2024-06-02 17:49:52 +08:00
time.sleep(1)
calibrate_new(tlabel.SIGN, offset = 8, run = True)
2024-06-02 17:49:52 +08:00
time.sleep(0.5)
2024-06-09 22:31:32 +08:00
by_cmd.send_position_axis_z(20, 60)
2024-06-02 17:49:52 +08:00
if self.target_person == 1:
target_distance = self.pos_gap1
2024-06-02 17:49:52 +08:00
else:
target_distance = self.pos_gap1 + (self.target_person - 1) * self.pos_gap2 + (self.target_person - 1) * 10
by_cmd.send_distance_x(10, target_distance)
2024-06-02 17:49:52 +08:00
time.sleep(5)
2024-06-09 22:31:32 +08:00
by_cmd.send_angle_claw_arm(220)
by_cmd.send_angle_claw(15)
by_cmd.send_position_axis_x(1, 160)
time.sleep(2)
by_cmd.send_position_axis_x(1, 0)
time.sleep(4)
by_cmd.send_position_axis_x(1, 160)
time.sleep(3)
2024-06-02 17:49:52 +08:00
pass
def nexec(self):
2024-05-22 12:37:15 +08:00
pass
2024-05-21 23:53:47 +08:00