import zmq import time import numpy as np from loguru import logger import utils from utils import PidWrap class main_task(): def __init__(self,by_cmd): self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) self.socket.connect("tcp://localhost:6666") # 赛道回归相关 self.x = 0 self.y = 0 self.error_counts = 0 self.lane_error = 0 # 车控制对象初始化 self.by_cmd = by_cmd # 转向 pid # self.pid1 = PidWrap(0.5, 0, 0, output_limits=55) # 1.2 self.pid1 = PidWrap(1.2, 0, 0, output_limits=50) # 1.2 self.pid1.set_target(0) def parse_data(self,data): if data.get('code') == 0: ck_val = data.get('data') # logger.debug(ck_val) if isinstance(ck_val, np.ndarray): self.x += ck_val[0] self.y += ck_val[1] self.error_counts += 1 else: pass def run(self): # 运行巡线任务 self.lane_task() def lane_ntask(self): # time.sleep(0.002) # self.socket.send_string("") # resp = self.socket.recv_pyobj() # return resp.get('data')[0] - 160 pass def lane_task(self): # TODO 巡航参数从配置文件中读取 time.sleep(0.002) if self.error_counts > 2: self.x = self.x / 3 self.y = self.y / 3 self.lane_error = self.x - 160 utils.lane_error = self.lane_error # 赋全局变量 self.error_counts = 0 self.x = 0 self.y = 0 error_abs = abs(self.lane_error) if error_abs > 50: speed = 15 elif error_abs > 45: speed = 15 elif error_abs > 35: speed = 15 elif error_abs > 25: speed = 15 elif error_abs > 15: speed = 15 else: speed = 18 # lane_model initial # if error_abs > 50: # speed = 10 # elif error_abs > 45: # speed = 11 # elif error_abs > 35: # speed = 13 # elif error_abs > 25: # speed = 15 # elif error_abs > 15: # speed = 15 # else: # speed = 18 if utils.task_speed != 0: speed = utils.task_speed self.by_cmd.send_speed_x(speed) # pid_out = self.pid1.get(self.lane_error*0.65) pid_out = self.pid1.get(self.lane_error) # pid_out = -pid_out logger.debug(f"err={self.lane_error}, pwm out={pid_out}") self.by_cmd.send_speed_omega(pid_out) self.socket.send_string("") resp = self.socket.recv_pyobj() # logger.info(resp) self.parse_data(resp)