from simple_pid import PID import zmq import time from loguru import logger class PidWrap: def __init__(self, kp, ki, kd, setpoint=0, output_limits=1): self.pid_t = PID(kp, ki, kd, setpoint, output_limits=(0-output_limits, output_limits)) def set_target(self, target): self.pid_t.setpoint = target def set(self, kp, ki, kd): self.pid_t.kp = kp self.pid_t.ki = ki self.pid_t.kd = kd def get(self, val_in): return self.pid_t(val_in) class main_task(): def __init__(self,by_cmd): self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) self.socket.connect("tcp://localhost:6666") # 赛道回归相关 self.x = 0 self.y = 0 self.error_counts = 0 self.lane_error = 0 # 车控制对象初始化 self.by_cmd = by_cmd # 转向 pid self.pid1 = PidWrap(0.7, 0, 0,output_limits=40) self.pid1.set_target(0) def parse_data(self,data): if data.get('code') == 0: self.x += data.get('data')[0] self.y += data.get('data')[1] self.error_counts += 1 else: pass def run(self): # TODO 请求和解析回归值待完成 try: # data = self.queue.get_nowait() # self.parse_data(data) pass except: pass # 运行巡线任务 self.lane_task() def lane_task(self): # TODO 巡航参数从配置文件中读取 time.sleep(0.002) if self.error_counts > 2: self.x = self.x / 3 self.y = self.y / 3 self.lane_error = self.x - 160 self.error_counts = 0 self.x = 0 self.y = 0 error_abs = abs(self.lane_error) if error_abs < 10: self.pid1.set(0.7, 0, 0) self.by_cmd.send_speed_x(12) elif error_abs > 45: self.by_cmd.send_speed_x(6) self.pid1.set(1.8, 0, 0) elif error_abs > 35: self.by_cmd.send_speed_x(8) self.pid1.set(1.5, 0, 0) elif error_abs > 25: self.by_cmd.send_speed_x(10) self.pid1.set(1, 0, 0) else: self.pid1.set(0.8, 0, 0) self.by_cmd.send_speed_x(11) # TODO 待引入控制接口 pid_out = self.pid1.get(self.lane_error*0.65) self.by_cmd.send_speed_omega(pid_out) self.socket.send_string("") resp = self.socket.recv_pyobj() # logger.info(resp) self.parse_data(resp)