from simple_pid import PID import zmq import time from loguru import logger import utils class PidWrap: def __init__(self, kp, ki, kd, setpoint=0, output_limits=1): self.pid_t = PID(kp, ki, kd, setpoint, output_limits=(0-output_limits, output_limits)) def set_target(self, target): self.pid_t.setpoint = target def set(self, kp, ki, kd): self.pid_t.kp = kp self.pid_t.ki = ki self.pid_t.kd = kd def get(self, val_in): return self.pid_t(val_in) class main_task(): def __init__(self,by_cmd): self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) self.socket.connect("tcp://localhost:6666") # 赛道回归相关 self.x = 0 self.y = 0 self.error_counts = 0 self.lane_error = 0 # 车控制对象初始化 self.by_cmd = by_cmd # 转向 pid self.pid1 = PidWrap(0.7, 0, 0,output_limits=40) self.pid1.set_target(0) def parse_data(self,data): if data.get('code') == 0: self.x += data.get('data')[0] self.y += data.get('data')[1] self.error_counts += 1 else: pass def run(self): # 运行巡线任务 self.lane_task() def lane_ntask(self): # time.sleep(0.002) # self.socket.send_string("") # resp = self.socket.recv_pyobj() # return resp.get('data')[0] - 160 pass def lane_task(self): # TODO 巡航参数从配置文件中读取 time.sleep(0.002) if self.error_counts > 2: self.x = self.x / 3 self.y = self.y / 3 self.lane_error = self.x - 160 utils.lane_error = self.lane_error # 赋全局变量 self.error_counts = 0 self.x = 0 self.y = 0 error_abs = abs(self.lane_error) if error_abs < 10: self.pid1.set(0.7, 0, 0) self.by_cmd.send_speed_x(13) # 12 elif error_abs > 45: self.by_cmd.send_speed_x(6) self.pid1.set(1.8, 0, 0) elif error_abs > 35: self.by_cmd.send_speed_x(8) self.pid1.set(1.5, 0, 0) elif error_abs > 25: self.by_cmd.send_speed_x(10) self.pid1.set(1, 0, 0) else: self.pid1.set(0.8, 0, 0) self.by_cmd.send_speed_x(11) # pid_out = self.pid1.get(self.lane_error*0.65) pid_out = self.pid1.get(self.lane_error*0.7) self.by_cmd.send_speed_omega(pid_out) self.socket.send_string("") resp = self.socket.recv_pyobj() # logger.info(resp) self.parse_data(resp)