Files
project_main/subtask.py
2024-06-02 17:49:52 +08:00

703 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from enum import Enum
from loguru import logger
from utils import label_filter
from utils import tlabel
import utils
import toml
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:6667")
logger.info("subtask yolo client init")
cfg = toml.load('cfg_subtask.toml') # 加载任务配置
logger.info("load subtask config")
by_cmd = None
filter = None
def import_obj(_by_cmd):
global by_cmd
global filter
by_cmd = _by_cmd
filter = label_filter(socket)
def car_stop():
pass
# run_on 是否继续向前行进 等待 error<5 后停止 如果不需要前进(在校准前车已经停下)
# 则不需要等待 error < 5 可以直接 aim_near
def calibrate(label, offset, run_on = True, cali_speed = 10):
logger.info("开始校准")
# go on
if run_on:
for _ in range(3):
by_cmd.send_speed_x(7)
by_cmd.send_speed_omega(0)
time.sleep(0.1)
while True:
ret, error = filter.aim_near(label)
while not ret:
ret, error = filter.aim_near(label)
error += offset
if run_on:
if abs(error) < 5:
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
break
else:
break
ret, error = filter.aim_near(label)
while not ret:
ret, error = filter.aim_near(label)
error += offset
time.sleep(1)
logger.error(error)
if abs(error) > 5:
logger.info("校准中")
if error > 0:
by_cmd.send_distance_x(-cali_speed, int(error*4))
else:
by_cmd.send_distance_x(cali_speed, int(-error*4))
logger.error(error)
time.sleep(1)
# stop
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
pass
# 任务类
class task:
def __init__(self, task_template, find_counts=10, enable=True):
self.enable = enable
self.task_t = task_template()
self.counts = 0
self.find_counts = find_counts
def init(self):
self.task_t.init()
def find(self):
# 检查该任执行标志
while True:
# if self.func_find():
# 特殊地方可以直接跳出 find
ret = self.task_t.find()
self.counts += ret
# if self.task_t.find():
# self.counts += 1
if self.counts >= self.find_counts:
break
# while self.func_find() is False:
# pass
def exec(self):
# 根据标志位确定是否执行该任务
if self.enable is True:
logger.debug(f"[Task ]# Executing task")
self.task_t.exec()
logger.debug(f"[Task ]# Task completed")
else:
logger.warning(f"[Task ]# Skip task")
self.task_t.nexec()
# 任务队列状态类
class task_queuem_status(Enum):
IDEL = 0
SEARCHING = 1
EXECUTING = 2
# 任务队列类 非 EXECUTEING 时均执行 huigui注意互斥操作
class task_queuem(task):
# task_now = task(None, False)
def __init__(self, queue):
super(task_queuem, self)
self.queue = queue
self.status = task_queuem_status.IDEL
self.busy = True
logger.info(f"[TaskM]# Task num {self.queue.qsize()}")
def exec(self):
# 如果空闲状态则将下一个队列任务取出
if self.status is task_queuem_status.IDEL:
if self.queue.qsize() == 0:
self.busy = False
logger.info(f"[TaskM]# Task queue empty, exit")
return False
self.task_now = self.queue.get()
# 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息
if self.task_now.enable is True:
self.status = task_queuem_status.SEARCHING
# 如果使能该任务则执行该任务的初始化动作
self.task_now.init()
else:
self.status = task_queuem_status.EXECUTING
logger.info(f"[TaskM]# ---------------------->>>>")
# 阻塞搜索任务标志位
elif self.status is task_queuem_status.SEARCHING:
logger.info(f"[TaskM]# Start searching task target")
self.task_now.find()
self.status = task_queuem_status.EXECUTING
# 执行任务函数
elif self.status is task_queuem_status.EXECUTING:
logger.info(f"[TaskM]# Start execute task function")
self.task_now.exec() # 执行当前任务函数
self.queue.task_done() # 弹出已执行的任务
self.status = task_queuem_status.IDEL #
logger.info(f"[TaskM]# <<<<----------------------")
return True
# 人员施救
class get_block():
def init(self):
logger.info("人员施救初始化")
def find(self):
# 目标检测红/蓝方块
ret1, list1 = filter.get(tlabel.RBLOCK)
if ret1 > 0:
logger.info("[抓方块]# find label")
return True
else:
return False
def exec(self):
calibrate(tlabel.RBLOCK,15)
# for _ in range(3):
# by_cmd.send_speed_x(7)
# by_cmd.send_speed_omega(0)
# time.sleep(0.1)
# while True:
# # logger.info("等待进入准确区域")
# ret, error = filter.aim_near(tlabel.RBLOCK)
# while not ret:
# ret, error = filter.aim_near(tlabel.RBLOCK)
# # logger.info(error)
# if abs(error) < 5:
# for _ in range(3):
# by_cmd.send_speed_x(0)
# time.sleep(0.2)
# by_cmd.send_speed_omega(0)
# break
# ret, error = filter.aim_near(tlabel.RBLOCK)
# while not ret:
# ret, error = filter.aim_near(tlabel.RBLOCK)
# time.sleep(1)
# logger.error(error)
# if abs(error) > 5:
# logger.info("校准中")
# if error > 0:
# by_cmd.send_distance_x(-10, int(error*3))
# else:
# by_cmd.send_distance_x(10, int(-error*3))
# logger.error(error)
# time.sleep(1)
# for _ in range(3):
# by_cmd.send_speed_x(0)
# time.sleep(0.2)
# by_cmd.send_speed_omega(0)
time.sleep(2)
by_cmd.send_position_axis_z(10, 150)
time.sleep(5)
by_cmd.send_angle_claw_arm(127)
time.sleep(1)
by_cmd.send_position_axis_x(4, 140)
time.sleep(4)
by_cmd.send_angle_claw_arm(220)
by_cmd.send_angle_claw(90)
time.sleep(1)
by_cmd.send_distance_axis_z(10, -70)
time.sleep(3)
by_cmd.send_angle_claw(27)
by_cmd.send_distance_axis_z(10, 10)
time.sleep(2)
by_cmd.send_distance_axis_x(4, -100)
time.sleep(1)
by_cmd.send_distance_axis_z(10, -40)
time.sleep(3)
by_cmd.send_angle_claw(35)
time.sleep(1)
by_cmd.send_position_axis_z(10, 150)
time.sleep(3)
by_cmd.send_position_axis_x(2, 140)
# 抓取第二个块后 收爪
time.sleep(3)
by_cmd.send_position_axis_x(4, 0)
def nexec(self):
# TODO 完成不执行任务的空动作
pass
# 紧急转移
class put_block():
def init(self):
logger.info("紧急转移初始化")
def find(self):
# 目标检测医院
ret1, list1 = filter.get(tlabel.HOSPITAL)
if ret1 > 0:
return True
else:
return False
def exec(self):
# TODO 测试对准效果
logger.info("找到医院")
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
time.sleep(0.1)
while True:
# logger.info("等待进入准确区域")
ret, error = filter.aim_near(tlabel.HOSPITAL)
while not ret:
ret, error = filter.aim_near(tlabel.HOSPITAL)
# logger.info(error)
if abs(error) < 5:
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
break
ret, error = filter.aim_near(tlabel.HOSPITAL)
while not ret:
ret, error = filter.aim_near(tlabel.HOSPITAL)
time.sleep(1)
logger.error(error)
if abs(error) > 5:
logger.info("校准中")
if error > 0:
by_cmd.send_distance_x(-10, int(error*3))
else:
by_cmd.send_distance_x(10, int(-error*3))
logger.error(error)
time.sleep(1)
by_cmd.send_position_axis_z(10, 150)
time.sleep(3)
# TODO 切换爪子方向
by_cmd.send_position_axis_x(2, 140)
time.sleep(2)
by_cmd.send_position_axis_z(10, 170)
pass
def nexec(self):
pass
# 整装上阵
class get_bball():
def init(self):
by_cmd.send_position_axis_x(2, 140)
logger.info("整装上阵初始化")
time.sleep(0.5)
while (by_cmd.send_angle_camera(90) == -1):
by_cmd.send_angle_camera(90)
def find(self):
# 目标检测黄球
ret1, list1 = filter.get(tlabel.YBALL)
if ret1 > 0:
return True
else:
return False
def exec(self):
logger.info("找到黄色球")
for _ in range(3):
by_cmd.send_speed_x(7)
by_cmd.send_speed_omega(0)
time.sleep(0.1)
while True:
# logger.info("等待进入准确区域")
ret, error = filter.aim_near(tlabel.YBALL)
while not ret:
ret, error = filter.aim_near(tlabel.YBALL)
# logger.info(error)
if abs(error) < 5:
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
break
ret, error = filter.aim_near(tlabel.YBALL)
while not ret:
ret, error = filter.aim_near(tlabel.YBALL)
time.sleep(1)
logger.error(error)
if abs(error) > 5:
logger.info("校准中")
if error > 0:
by_cmd.send_distance_x(-10, int(error*3))
else:
by_cmd.send_distance_x(10, int(-error*3))
logger.error(error)
time.sleep(1)
while (by_cmd.send_angle_camera(0) == -1):
by_cmd.send_angle_camera(0)
by_cmd.send_position_axis_z(20, 160)
time.sleep(2)
by_cmd.send_position_axis_x(2, 70)
time.sleep(2)
by_cmd.send_angle_claw(90)
time.sleep(0.2)
by_cmd.send_position_axis_x(2, 0)
time.sleep(2)
by_cmd.send_angle_claw(27)
time.sleep(1)
by_cmd.send_position_axis_z(20, 180)
time.sleep(1)
by_cmd.send_position_axis_x(4, 45)
time.sleep(1)
by_cmd.send_position_axis_z(20, 140)
time.sleep(3)
by_cmd.send_position_axis_x(2, 140)
time.sleep(2)
by_cmd.send_angle_claw(90)
pass
def nexec(self):
pass
# 通信抢修
class up_tower():
def init(self):
logger.info("通信抢修初始化")
def find(self):
# 目标检测通信塔
ret1, list1 = filter.get(tlabel.TOWER)
if ret1 > 0:
return True
else:
return False
def exec(self):
logger.info("找到塔")
for _ in range(3):
by_cmd.send_speed_x(7)
by_cmd.send_speed_omega(0)
time.sleep(0.1)
while True:
# logger.info("等待进入准确区域")
ret, error = filter.aim_near(tlabel.TOWER)
while not ret:
ret, error = filter.aim_near(tlabel.TOWER)
# logger.info(error)
if abs(error) < 5:
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
break
ret, error = filter.aim_near(tlabel.TOWER)
while not ret:
ret, error = filter.aim_near(tlabel.TOWER)
time.sleep(1)
logger.error(error)
if abs(error) > 5:
logger.info("校准中")
if error > 0:
by_cmd.send_distance_x(-10, int(error*3))
else:
by_cmd.send_distance_x(10, int(-error*3))
logger.error(error)
time.sleep(1)
def nexec(self):
pass
# 高空排险
class get_rball():
def init(self):
logger.info("高空排险初始化")
while (by_cmd.send_angle_camera(0) == -1):
by_cmd.send_angle_camera(0)
def find(self):
# 目标检测红球
ret1, list1 = filter.get(tlabel.RBALL)
if ret1 > 0:
return True
else:
return False
def exec(self):
logger.info("找到红球")
# TODO 高空排险 offset 需要调整
calibrate(tlabel.RBALL,15, True, 6)
pass
def nexec(self):
pass
# 派发物资
class put_bball():
def init(self):
logger.info("派发物资初始化")
while (by_cmd.send_angle_camera(90) == -1):
by_cmd.send_angle_camera(90)
def find(self):
# 目标检测通信塔
ret1, list1 = filter.get(tlabel.BASKET)
if ret1 > 0:
return True
else:
return False
def exec(self):
logger.info("找到篮筐")
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
time.sleep(1)
pass
def nexec(self):
pass
direction = tlabel.RMARK
direction_left = 0
direction_right = 0
# 物资盘点
class put_hanoi1():
def init(self):
logger.info("物资盘点 1 初始化")
socket.send_string("2")
socket.recv()
def find(self):
global direction
global direction_left
global direction_right
# 目标检测左右转向标识
# TODO 框的大小判断距离
ret1, list1 = filter.get(tlabel.RMARK)
ret2, list2 = filter.get(tlabel.LMARK)
if ret1:
logger.info("向右拐")
list1 = list1[0]
area = (list1[2] - list1[0]) * (list1[3] - list1[1])
logger.info(area)
if area > 2500:
direction_right += 1
return True
return False
elif ret2:
logger.info("向左拐")
list2 = list2[0]
area = (list2[2] - list2[0]) * (list2[3] - list2[1])
logger.info(area)
if area > 2500:
direction_left += 1
return True
return False
return False
def exec(self):
global direction
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
time.sleep(0.2)
# if direction == tlabel.RMARK:
if direction_right > direction_left:
direction = tlabel.RMARK
# by_cmd.send_distance_x(6, 200)
by_cmd.send_angle_omega(-20,380)
time.sleep(2)
while (by_cmd.send_angle_camera(90) == -1):
by_cmd.send_angle_camera(90)
else:
direction = tlabel.LMARK
# by_cmd.send_distance_x(6, 200)
by_cmd.send_angle_omega(20,500)
time.sleep(2)
while (by_cmd.send_angle_camera(180) == -1):
by_cmd.send_angle_camera(180)
time.sleep(0.5)
socket.send_string("1")
socket.recv()
pass
def nexec(self):
pass
def sub_put_hanoi2(label,distance_type,run_on,back_flag):
# logger.info("找到大红色柱体")
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
time.sleep(2)
# 对准大红色柱体
calibrate(label,15, run_on, 6)
# 抓取大红色柱体
logger.info("抓取柱体")
# 根据 direction 确定移动方向
by_cmd.send_distance_x(-8, distance_type)
# 移动 self.distance_lp 距离
# 放置物块
logger.info("开始放置柱体")
time.sleep(1)
if back_flag:
pass
# 回移相同距离
# if direction == tlabel.RMARK:
# by_cmd.send_distance_x(8, distance_type)
# else:
# by_cmd.send_distance_x(-8, distance_type)
class put_hanoi2():
def __init__(self):
self.pos_lp = cfg['put_hanoi2']['pos_lp']
self.pos_mp = cfg['put_hanoi2']['pos_mp']
self.pos_sp = 6 - self.pos_lp - self.pos_mp
self.distance_lp = self.pos_lp * cfg['put_hanoi2']['pos_gap']
self.distance_mp = self.pos_mp * cfg['put_hanoi2']['pos_gap']
self.distance_sp = self.pos_sp * cfg['put_hanoi2']['pos_gap']
logger.info(f"setting hanoi pos_lp[{self.pos_lp}] pos_mp[{self.pos_mp}] pos_sp[{self.pos_sp}]")
def init(self):
logger.info("物资盘点 2 初始化")
def find(self):
time.sleep(0.001)
return True
# # 目标检测左右转向标识
# ret1, list1 = filter.get(tlabel.LPILLER)
# ret, _ = filter.get_mult([tlabel.LPILLER])
# if ret:
# logger.info("看到了三个 直接停车")
# return 100
# if ret1 > 0:
# list1 = list1[0]
# area = (list1[2] - list1[0]) * (list1[3] - list1[1])
# logger.info(area)
# if area > 3300:
# return True
# return False
# else:
# return False
def exec(self):
# TODO 延时需要根据移动的 distance 判断
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
# 往回走一段 然后向前行进校准
by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30)
time.sleep(1)
logger.info(f"方向{direction}")
if utils.lane_error > 0:
by_cmd.send_angle_omega(-20,abs(utils.lane_error))
else:
by_cmd.send_angle_omega(20,abs(utils.lane_error))
time.sleep(1)
sub_put_hanoi2(tlabel.LPILLER, self.distance_lp, True, True)
time.sleep(5)
# 对准中蓝色柱体
sub_put_hanoi2(tlabel.MPILLER, self.distance_mp, True, True)
time.sleep(5)
# 根据 direction 确定移动方向
# 移动 self.distance_mp 距离
# 放置物块
# 回移相同距离
sub_put_hanoi2(tlabel.SPILLER, self.distance_sp, True, True)
time.sleep(5)
# 对准小红色柱体
# 根据 direction 确定移动方向
# 移动 self.distance_sp 距离
# 放置物块
pass
def nexec(self):
pass
# 应急避险 第一阶段 找目标牌
class move_area1():
def init(self):
logger.info("应急避险第一阶段初始化")
while (by_cmd.send_angle_camera(180) == -1):
by_cmd.send_angle_camera(180)
def find(self):
# 目标检测标志牌
ret, error = filter.aim_near(tlabel.SIGN)
if ret > 0 and abs(error) < 8:
return True
else:
return False
def exec(self):
logger.info("找到标示牌")
# for _ in range(3):
# by_cmd.send_speed_x(0)
# time.sleep(0.2)
# by_cmd.send_speed_omega(0)
pass
def nexec(self):
pass
# 应急避险 第二阶段 找停车区域
class move_area2():
def init(self):
logger.info("应急避险第二阶段初始化")
def find(self):
time.sleep(0.001)
return True
def exec(self):
logger.info("开始寻找停车区域")
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
calibrate(tlabel.SHELTER, 15, False, 6)
time.sleep(1)
# 进入停车区域
by_cmd.send_distance_y(10, 450)
time.sleep(3)
# 离开停车区域
by_cmd.send_distance_y(-10, 450)
time.sleep(3)
while True:
pass
pass
def nexec(self):
pass
# 扫黑除暴
class kick_ass():
def init(self):
logger.info("扫黑除暴初始化")
self.pos_gap1 = cfg['kick_ass']['pos_gap1']
self.pos_gap2 = cfg['kick_ass']['pos_gap2']
self.target_person = cfg['kick_ass']['target_person']
def find(self):
ret, error = filter.aim_near(tlabel.SIGN)
if ret > 0 and abs(error) < 8:
return True
else:
return False
def exec(self):
logger.info("找到标示牌")
for _ in range(3):
by_cmd.send_speed_x(0)
time.sleep(0.2)
by_cmd.send_speed_omega(0)
time.sleep(1)
if utils.lane_error > 0:
by_cmd.send_angle_omega(-20,abs(utils.lane_error)*1.5)
else:
by_cmd.send_angle_omega(20,abs(utils.lane_error)*1.5)
time.sleep(1)
calibrate(tlabel.SIGN, 8, False, 6)
time.sleep(0.5)
if self.target_person == 1:
by_cmd.send_distance_x(10, self.pos_gap1)
else:
by_cmd.send_distance_x(10, self.pos_gap1 + (self.target_person - 1) * self.pos_gap2)
time.sleep(2)
time.sleep(5)
pass
def nexec(self):
pass