feat: 新增文心一言接口
This commit is contained in:
@@ -9,6 +9,10 @@ from loguru import logger
|
||||
from utils import tlabel
|
||||
import zmq
|
||||
import time
|
||||
from by_cmd_py import by_cmd_py
|
||||
import time
|
||||
from utils import CountRecord
|
||||
by_cmd = by_cmd_py()
|
||||
|
||||
context = zmq.Context()
|
||||
socket = context.socket(zmq.REQ)
|
||||
@@ -17,35 +21,210 @@ logger.info("subtask yolo client init")
|
||||
|
||||
filter = label_filter(socket)
|
||||
filter.switch_camera(1)
|
||||
offset = 12
|
||||
while True:
|
||||
time.sleep(0.2)
|
||||
# ret1 = filter.get_mult(tlabel.RMARK)
|
||||
# ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK])
|
||||
# if ret:
|
||||
# # print(error)
|
||||
# if abs(error) < 40:
|
||||
# logger.info('yes')
|
||||
|
||||
label = tlabel.RBLOCK
|
||||
def car_stop():
|
||||
for _ in range(3):
|
||||
by_cmd.send_speed_x(0)
|
||||
time.sleep(0.2)
|
||||
by_cmd.send_speed_omega(0)
|
||||
# 蓝色球使用
|
||||
def calibrate_right_new(label, offset, run = True, run_speed = 3.5):
|
||||
not_found_counts = 0
|
||||
ret, error = filter.aim_right(label)
|
||||
while not ret:
|
||||
not_found_counts += 1
|
||||
if not_found_counts >= 20:
|
||||
not_found_counts = 0
|
||||
error = -320 # error > 0 front run
|
||||
logger.info("calibrate_right_new:找不到次数超过 20 帧 直接前进寻找")
|
||||
break
|
||||
ret, error = filter.aim_right(label)
|
||||
|
||||
error += offset
|
||||
if abs(error) > 10 and run:
|
||||
if error > 0:
|
||||
by_cmd.send_speed_x(-run_speed)
|
||||
else:
|
||||
by_cmd.send_speed_x(run_speed)
|
||||
pass
|
||||
# 停的位置已经很接近目标,可以直接使用 distance 校准
|
||||
else:
|
||||
# logger.info("停车时误差就小于")
|
||||
# error = error * 3
|
||||
# if error > 0:
|
||||
# by_cmd.send_distance_x(-10, int(error))
|
||||
# else:
|
||||
# by_cmd.send_distance_x(10, int(-error))
|
||||
return
|
||||
while True:
|
||||
ret, error = filter.aim_right(label)
|
||||
while not ret:
|
||||
ret, error = filter.aim_right(label)
|
||||
error += offset
|
||||
if ret:
|
||||
if abs(error) <= 8:
|
||||
car_stop()
|
||||
logger.info("calibrate_right_new:行进时 误差小于 8 直接停车")
|
||||
|
||||
ret, error = filter.aim_right(label)
|
||||
while not ret:
|
||||
ret, error = filter.aim_right(label)
|
||||
error += offset
|
||||
logger.info(f"calibrate_right_new:停车后的误差是{error}")
|
||||
if abs(error) > 8:
|
||||
logger.info(f"calibrate_right_new:停车后的误差大于 8 使用 distance 校准")
|
||||
error = error * 3
|
||||
if error > 0:
|
||||
by_cmd.send_distance_x(-10, int(error))
|
||||
else:
|
||||
by_cmd.send_distance_x(10, int(-error))
|
||||
|
||||
break
|
||||
'''
|
||||
description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车
|
||||
如果停车后 error > 8 则使用 distance 校准
|
||||
这个方法仅用于在视野里能找到的情况下进行校准,并不能实现边走边寻找 然后再校准
|
||||
param {*} label
|
||||
param {*} offset
|
||||
param {*} run
|
||||
param {*} run_speed
|
||||
return {*}
|
||||
'''
|
||||
def calibrate_new(label, offset, run = True, run_speed = 3.5):
|
||||
not_found_counts = 0
|
||||
ret, box = filter.get(label)
|
||||
while not ret:
|
||||
# 注意这里一定要保证摄像头内有该目标 否则会无限循环
|
||||
not_found_counts += 1
|
||||
if not_found_counts >= 20:
|
||||
not_found_counts = 0
|
||||
error = -320 # error > 0 front run
|
||||
logger.info("calibrate_new:找不到次数超过 20 帧 直接前进寻找")
|
||||
ret, box = filter.get(label)
|
||||
error = (box[0][2] + box[0][0] - 320) / 2 + offset
|
||||
# ret, box = filter.get(tlabel.BBLOCK)
|
||||
|
||||
if ret:
|
||||
print(error)
|
||||
error = (box[0][2] + box[0][0] - 320) / 2 + offset
|
||||
if abs(error) > 10 and run:
|
||||
if error > 0:
|
||||
by_cmd.send_speed_x(-run_speed)
|
||||
else:
|
||||
by_cmd.send_speed_x(run_speed)
|
||||
# 停的位置已经很接近目标,可以直接使用 distance 校准
|
||||
else:
|
||||
pass
|
||||
if abs(error) > 8:
|
||||
logger.info(f"calibrate_new:停车后误差{error}大于 8 使用 distance 校准")
|
||||
# error = error # 3
|
||||
if error > 0:
|
||||
by_cmd.send_distance_x(-10, int(error))
|
||||
else:
|
||||
by_cmd.send_distance_x(10, int(-error))
|
||||
return
|
||||
logger.info(f"calibrate_new:停车后误差{error}小于 8 不校准")
|
||||
return
|
||||
while True:
|
||||
ret, box = filter.get(label)
|
||||
while not ret:
|
||||
ret, box = filter.get(label)
|
||||
|
||||
error = (box[0][2] + box[0][0] - 320) / 2 + offset
|
||||
if ret:
|
||||
if abs(error) <= 10: # 5
|
||||
car_stop()
|
||||
logger.info("calibrate_new:行进时 误差小于 10 直接停车")
|
||||
|
||||
ret, box = filter.get(label)
|
||||
while not ret:
|
||||
ret, box = filter.get(label)
|
||||
|
||||
error = (box[0][2] + box[0][0] - 320) / 2 + offset
|
||||
logger.info(f"calibrate_new:停车后的误差是{error}")
|
||||
if abs(error) > 8:
|
||||
logger.info(f"calibrate_new:停车后的误差大于 8 使用 distance 校准")
|
||||
error = error * 3
|
||||
logger.error(f"error * 3{error}")
|
||||
if error > 0:
|
||||
by_cmd.send_distance_x(-10, int(error))
|
||||
else:
|
||||
by_cmd.send_distance_x(10, int(-error))
|
||||
|
||||
break
|
||||
def explore_calibrate_new(label, offset, run_direc ,run_speed = 3.5):
|
||||
# run_direc == 1 向前
|
||||
if run_direc == 1:
|
||||
by_cmd.send_speed_x(run_speed)
|
||||
else:
|
||||
by_cmd.send_speed_x(-run_speed)
|
||||
|
||||
while True:
|
||||
ret, box = filter.get(label)
|
||||
while not ret:
|
||||
ret, box = filter.get(label)
|
||||
error = (box[0][2] + box[0][0] - 320) / 2 + offset
|
||||
if ret:
|
||||
logger.info(f"当前误差:{error}, box[{box[0][2]},{box[0][0]}]")
|
||||
# 校准速度越大 停车的条件越宽泛
|
||||
if abs(error) <= 20:
|
||||
car_stop()
|
||||
logger.info("explore_calibrate_new:行进时 误差小于 10 直接停车")
|
||||
|
||||
error_sum = 0
|
||||
for _ in range(3):
|
||||
ret, box = filter.get(label)
|
||||
while not ret:
|
||||
ret, box = filter.get(label)
|
||||
error = (box[0][2] + box[0][0] - 320) / 2 + offset
|
||||
error_sum += error
|
||||
error_sum /= 3
|
||||
# logger.info(f"停车后像素误差:{error}")
|
||||
logger.info(f"停车后像素误差:{error_sum}")
|
||||
# if abs(error) > 8:
|
||||
logger.info(f"explore_calibrate_new:停车后的误差大于 8 使用 distance 校准")
|
||||
error_sum = error_sum * 3
|
||||
logger.error(f"error * 3 {error}")
|
||||
if error > 0:
|
||||
by_cmd.send_distance_x(-10, int(error))
|
||||
else:
|
||||
by_cmd.send_distance_x(10, int(-error))
|
||||
|
||||
break
|
||||
|
||||
offset = 10
|
||||
# by_cmd.send_angle_claw_arm(217)
|
||||
# while True:
|
||||
# pass
|
||||
# while (by_cmd.send_angle_camera(180) == -1):
|
||||
# by_cmd.send_angle_camera(180)
|
||||
while (by_cmd.send_angle_camera(180) == -1):
|
||||
by_cmd.send_angle_camera(180)
|
||||
# by_cmd.send_speed_x(15)
|
||||
|
||||
find_counts = 0
|
||||
label = tlabel.HOSPITAL
|
||||
record = CountRecord(5)
|
||||
while True:
|
||||
time.sleep(0.2)
|
||||
|
||||
|
||||
# ret = filter.find(label)
|
||||
# ret = filter.find(label)
|
||||
# if ret > 0:
|
||||
# find_counts += 1
|
||||
# if record(label):
|
||||
# car_stop()
|
||||
# if find_counts >= 5:
|
||||
# car_stop()
|
||||
ret, box = filter.get(label)
|
||||
while not ret:
|
||||
ret, box = filter.get(label)
|
||||
width = box[0][2] - box[0][0]
|
||||
logger.info(width)
|
||||
# error = (box[0][2] + box[0][0] - 320) / 2 + offset
|
||||
# explore_calibrate_new(tlabel.LPILLER, offset = 10, run_direc = 1, run_speed = 5)
|
||||
# calibrate_new(label, offset = 15, run = True)
|
||||
# car_stop()
|
||||
# time.sleep(0.5)
|
||||
# for _ in range(3):
|
||||
# calibrate_right_new(tlabel.BBALL, offset = 27, run = True, run_speed = 4)
|
||||
# logger.info("抓蓝色球")
|
||||
# time.sleep(5)
|
||||
# logger.info("抓取块")
|
||||
# time.sleep(0.1)
|
||||
|
||||
# if ret1:
|
||||
# print(error1)
|
||||
# if abs(error1) < 30:
|
||||
# print("you")
|
||||
# elif ret2:
|
||||
# print(error2)
|
||||
# if abs(error2) < 30:
|
||||
# print("zuo")
|
||||
|
||||
@@ -14,7 +14,7 @@ class LLM:
|
||||
|
||||
self.model = 'ernie-3.5'
|
||||
self.prompt = '''你是一个机器人动作规划者,需要把我的话翻译成机器人动作规划并生成对应的 json 结果,机器人工作空间参考右手坐标系。
|
||||
严格按照下面的描述生成给定格式 json,从现在开始你仅仅给我返回 json 数据!'''
|
||||
严格按照下面的描述生成给定格式 json,从现在开始你仅仅给我返回 json 数据'''
|
||||
self.prompt += '''正确的示例如下:
|
||||
向左移 0.1m, 向左转弯 85 度 [{'func': 'move', 'x': 0, 'y': 0.1},{'func': 'turn','angle': -85}],
|
||||
向右移 0.2m, 向前 0.1m [{'func': 'move', 'x': 0, 'y': -0.2},{'func': 'move', 'x': 0.1, 'y': 0}],
|
||||
@@ -22,6 +22,8 @@ class LLM:
|
||||
原地左转 38 度 [{'func': 'turn','angle': -38}],
|
||||
蜂鸣器发声 5 秒 [{'func': 'beep', 'time': 5}]
|
||||
发光或者照亮 5 秒 [{'func': 'light', 'time': 5}]
|
||||
向右走 30cm,照亮 2s [{'func': 'move', 'x': 0, 'y': -0.3}, {'func': 'light', 'time': 2}],
|
||||
向左移 0.2m, 向后 0.1m [{'func': 'move', 'x': 0, 'y': 0.2},{'func': 'move', 'x': -0.1, 'y': 0}],
|
||||
'''
|
||||
self.messages = []
|
||||
self.resp = None
|
||||
@@ -47,6 +49,8 @@ if __name__ == "__main__":
|
||||
lmm_bot = LLM()
|
||||
while True:
|
||||
chat = input("输入:")
|
||||
print(lmm_bot.get_command_json(chat))
|
||||
resp = lmm_bot.get_command_json(chat).replace(' ', '').replace('\n', '').replace('\t', '')
|
||||
|
||||
print(eval(resp[7:-3]))
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user