# Listing captured from a web file viewer: 444 lines, 20 KiB, Python.
# Last change shown by the viewer: 2026-04-20 14:19:56 +08:00
import time
import os
import json
import requests
import re
import threading
import urllib3
# (file-viewer change marker: 2026-04-20 14:19:56 +08:00)
from queue import Queue
# Every skill dispatched by the /agent_chat handler is also recorded here as
# an (action, args) tuple.
task_queue = Queue()

# Connection settings for the LLM endpoint. They start empty and are filled in
# by the POST /agent_config handler; llm_call() answers "stop" while any of
# them is still empty.
llm_config = dict(api_url="", api_key="", model="")

# Base URL of the local control API used by the path skills.
LOCAL_API_BASE_URL = os.getenv("LOCAL_API_BASE_URL", "https://localhost:5443")

# TLS verification for the local API is off unless LOCAL_API_VERIFY is set to
# "1", "true" or "yes" (case-insensitive).
LOCAL_API_VERIFY = os.getenv("LOCAL_API_VERIFY", "0").lower() in {"1", "true", "yes"}

if not LOCAL_API_VERIFY:
    # Unverified HTTPS requests would otherwise emit a warning on every call.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def local_api_request(method, path, **kwargs):
    """Send an HTTP request to the local control API and return the response.

    ``path`` is appended verbatim to ``LOCAL_API_BASE_URL``. A 5-second
    timeout and the module-wide ``LOCAL_API_VERIFY`` TLS setting are applied
    unless the caller passes ``timeout`` / ``verify`` explicitly; every other
    keyword argument is forwarded to ``requests.request`` unchanged.
    """
    options = {"timeout": 5, "verify": LOCAL_API_VERIFY}
    options.update(kwargs)  # caller-supplied values win over the defaults
    url = f"{LOCAL_API_BASE_URL}{path}"
    return requests.request(method, url, **options)
# (file-viewer change marker: 2026-04-20 14:19:56 +08:00)
def load_skills_md():
    """Return the text of ``agent/skills.md``, or ``""`` if it cannot be read.

    The path is relative to the current working directory. Any failure is
    printed and swallowed so the caller always gets a string back.
    """
    content = ""
    try:
        with open('agent/skills.md', 'r', encoding='utf-8') as skills_file:
            content = skills_file.read()
    except Exception as err:
        print(f"加载技能文档出错: {err}")
    return content
def llm_call(prompt):
    """Ask the configured chat-completion endpoint what to do with *prompt*.

    Returns whatever ``safe_parse`` extracts from the first choice of the
    reply. Every failure path (incomplete ``llm_config``, non-200 status,
    unparsable body, no choices, any exception) returns the fail-safe
    ``{"action": "stop", "args": {}}``.
    """
    def halt():
        # A fresh dict per call so callers can never share or mutate one.
        return {"action": "stop", "args": {}}

    if not (llm_config["api_url"] and llm_config["api_key"] and llm_config["model"]):
        return halt()

    try:
        endpoint = llm_config["api_url"].strip()
        # URLs containing "maas-api.cn" (the iFlytek MaaS API) are used as
        # given; every other URL must end in /chat/completions.
        is_maas = "maas-api.cn" in endpoint
        if not is_maas and not endpoint.endswith("/chat/completions"):
            endpoint += "chat/completions" if endpoint.endswith("/") else "/chat/completions"

        # A key of the form client_id:client_secret is sent as HTTP Basic
        # credentials; anything else is sent as a Bearer token.
        credential = llm_config["api_key"]
        if ":" in credential:
            import base64
            encoded = base64.b64encode(credential.encode()).decode()
            authorization = f"Basic {encoded}"
        else:
            authorization = f"Bearer {credential}"
        headers = {
            "Content-Type": "application/json",
            "Authorization": authorization,
        }

        payload = {
            "model": llm_config["model"],
            "messages": [
                {"role": "system", "content": "你是一个智能垃圾桶控制助手,请根据用户输入返回对应的动作指令。"},
                {"role": "user", "content": prompt},
            ],
        }
        if is_maas:
            # Only the MaaS request carries sampling parameters.
            payload.update(temperature=0.7, max_tokens=500)

        print(f"调用LLM API: {endpoint}")
        print(f"请求数据: {json.dumps(payload, ensure_ascii=False)}")
        response = requests.post(endpoint, headers=headers, json=payload, timeout=15)
        print(f"API响应状态码: {response.status_code}")
        print(f"API响应内容: {response.text}")

        if response.status_code != 200:
            print(f"API调用失败状态码: {response.status_code}")
            return halt()

        try:
            body = response.json()
        except json.JSONDecodeError as e:
            print(f"JSON解析错误: {e}")
            return halt()

        # Both providers are read the same way: first choice, message content.
        if "choices" in body and len(body["choices"]) > 0:
            return safe_parse(body["choices"][0]["message"]["content"])
        return halt()
    except Exception as e:
        print(f"LLM调用出错: {e}")
        return halt()
# Characters that can open a JSON object or array.
_JSON_OPENERS = re.compile(r"[\[{]")


def _strip_code_fence(text):
    """Remove a leading ``` fence (with optional language tag) and a trailing ```."""
    if text.startswith("```"):
        first_line, newline, rest = text.partition("\n")
        # With a newline, the first line is just the "```lang" opener. Without
        # one the payload shares the line with the fence, so only the
        # backticks are dropped.
        text = (rest if newline else first_line.lstrip("`")).strip()
    if text.endswith("```"):
        text = text[:-3].strip()
    return text


def _is_usable_payload(value):
    """Tell whether an embedded JSON value looks like one action or a list of actions."""
    if isinstance(value, dict):
        return bool(value)
    if isinstance(value, list):
        return bool(value) and all(isinstance(item, dict) for item in value)
    return False


def _first_embedded_json(text):
    """Return the first non-empty JSON object / list of objects inside *text*, else None."""
    decoder = json.JSONDecoder()
    for opener in _JSON_OPENERS.finditer(text):
        try:
            # raw_decode tolerates trailing text after the JSON document and,
            # unlike a regex, follows nested braces correctly.
            value, _ = decoder.raw_decode(text[opener.start():])
        except ValueError:
            continue
        if _is_usable_payload(value):
            return value
    return None


def safe_parse(result):
    """Turn an LLM reply into an action dict or a list of action dicts.

    Args:
        result: A dict or list (returned unchanged), or the raw reply text.
            The text may be bare JSON, JSON wrapped in a Markdown code fence,
            or JSON embedded in surrounding prose.

    Returns:
        The decoded dict or list. If nothing usable is found, or ``result``
        is neither dict, list nor str, the fail-safe
        ``{"action": "stop", "args": {}}`` is returned. Never raises for
        malformed input.
    """
    if isinstance(result, (dict, list)):
        return result
    if not isinstance(result, str):
        return {"action": "stop", "args": {}}

    text = _strip_code_fence(result.strip())

    # Fast path: the whole reply is one JSON document. Objects are accepted
    # here as well as arrays, so multi-line (pretty-printed) objects work.
    try:
        parsed = json.loads(text)
    except ValueError:
        parsed = None
    if isinstance(parsed, (dict, list)):
        return parsed

    embedded = _first_embedded_json(text)
    if embedded is not None:
        return embedded
    return {"action": "stop", "args": {}}
# Skills the agent is allowed to run; anything else is rejected up front.
_ALLOWED_SKILLS = (
    "move_forward", "move_backward", "turn_left", "turn_right",
    "stop", "play_path", "list_paths", "delete_path", "save_path",
)

# Speed passed to every motor call.
_MOTOR_SPEED = 0.6

# Timed skills: skill name -> (motor method, console log line, reply prefix).
# NOTE(review): "move_forward" drives motor_module.backward() and
# "move_backward" drives motor_module.forward(), exactly as the original code
# did -- presumably the motors are mounted in reverse; confirm before changing.
_TIMED_SKILLS = {
    "move_forward": ("backward", "控制垃圾桶前进", "前进"),
    "move_backward": ("forward", "控制垃圾桶后退", "后退"),
    "turn_left": ("rotate_left", "控制垃圾桶左旋转", "左转"),
    "turn_right": ("rotate_right", "控制垃圾桶右旋转", "右转"),
}

# Recorded path steps: direction -> (motor method, console log label).
# The forward/backward swap mirrors _TIMED_SKILLS.
_PATH_MOVES = {
    "forward": ("backward", "前进"),
    "backward": ("forward", "后退"),
    "left": ("move_left", "左移"),
    "right": ("move_right", "右移"),
    "forward_left": ("move_left_forward", "左前移动"),
    "forward_right": ("move_right_forward", "右前移动"),
    "backward_left": ("move_left_backward", "左后移动"),
    "backward_right": ("move_right_backward", "右后移动"),
    "rotate_left": ("rotate_left", "左旋转"),
    "rotate_right": ("rotate_right", "右旋转"),
}


def _coerce_duration(value, default):
    """Return *value* as a non-negative, finite number of seconds, else *default*."""
    try:
        seconds = value if isinstance(value, (int, float)) else float(value)
    except (TypeError, ValueError):
        return default
    if seconds != seconds or seconds in (float("inf"), float("-inf")):
        return default  # NaN / infinity cannot be slept on
    return max(seconds, 0)  # time.sleep() rejects negative values


def _stop_after(motor_module, seconds):
    """Stop the motors from a background thread once *seconds* have elapsed."""
    def wait_then_stop():
        try:
            time.sleep(seconds)
        finally:
            # Runs even if the sleep fails, so the bin can never be left moving.
            motor_module.stop()
    threading.Thread(target=wait_then_stop).start()


def _step_durations(steps):
    """Return one duration in seconds for each recorded path step.

    A step with a ``timestamp`` (milliseconds) lasts until the next step's
    timestamp; the final step, and any step whose gap cannot be computed,
    lasts 0.5 s. Steps without a timestamp use their ``time`` field
    (default 0.5 s).
    """
    durations = []
    last_index = len(steps) - 1
    for index, step in enumerate(steps):
        if "timestamp" not in step:
            raw = step.get("time", 0.5)
        elif index < last_index:
            try:
                raw = (steps[index + 1].get("timestamp") - step["timestamp"]) / 1000
            except TypeError:
                raw = None  # missing or non-numeric timestamp -> default
        else:
            raw = 0.5
        durations.append(_coerce_duration(raw, 0.5))
    return durations


def _replay_steps(steps, motor_module):
    """Drive the motors through *steps* one after another (blocking)."""
    for step, seconds in zip(steps, _step_durations(steps)):
        direction = step.get("direction")
        move = _PATH_MOVES.get(direction) if isinstance(direction, str) else None
        if direction == "stop":
            print("执行停止")
            motor_module.stop()
            time.sleep(0.1)
        elif move is not None:
            method_name, label = move
            print(f"执行{label},持续{seconds}")
            getattr(motor_module, method_name)(speed=_MOTOR_SPEED)
            try:
                time.sleep(seconds)
            finally:
                motor_module.stop()
        # Unknown directions are skipped; every step is followed by a short gap.
        time.sleep(0.1)


def _play_path(path_name, motor_module):
    """Load the named path from the local API and replay it in a background thread."""
    response = local_api_request("GET", "/load_path", params={"name": path_name})
    if response.status_code != 200:
        return {"status": "error", "message": f"加载轨迹失败,状态码: {response.status_code}"}
    data = response.json()
    if data.get("status") != "success":
        return {"status": "error", "message": data.get("message", "加载轨迹失败")}

    actions = (data.get("data") or {}).get("actions") or []
    steps = [step for step in actions if isinstance(step, dict)]
    if not steps:
        return {"status": "error", "message": "轨迹为空"}

    def replay():
        try:
            _replay_steps(steps, motor_module)
        except Exception as e:
            print(f"播放轨迹出错: {e}")
            motor_module.stop()

    threading.Thread(target=replay).start()
    return {"status": "success", "message": f"开始播放轨迹{path_name}"}


def _list_paths():
    """Ask the local API for the names of all saved paths."""
    response = local_api_request("GET", "/list_paths")
    if response.status_code != 200:
        return {"status": "error", "message": f"获取轨迹列表失败,状态码: {response.status_code}"}
    data = response.json()
    if data.get("status") != "success":
        return {"status": "error", "message": data.get("message", "获取轨迹列表失败")}
    paths = data.get("paths", [])
    if paths:
        return {"status": "success", "message": f"轨迹列表: {', '.join(map(str, paths))}"}
    return {"status": "success", "message": "暂无轨迹"}


def _delete_path(path_name):
    """Ask the local API to delete the named path."""
    response = local_api_request("DELETE", "/delete_path", json={"name": path_name})
    if response.status_code != 200:
        return {"status": "error", "message": f"删除轨迹失败,状态码: {response.status_code}"}
    data = response.json()
    if data.get("status") != "success":
        return {"status": "error", "message": data.get("message", "删除轨迹失败")}
    return {"status": "success", "message": f"删除轨迹{path_name}成功"}


def execute_skill(action, args, motor_module):
    """Run one agent skill and report the outcome.

    Args:
        action: Skill name; must be one of the allowed skills.
        args: Skill arguments. ``duration`` (seconds, default 1) is read by
            the four movement skills and ``name`` by the path skills. Anything
            that is not a dict is treated as empty.
        motor_module: Object exposing the motor methods used here
            (``forward``, ``backward``, ``rotate_left``, ``rotate_right``,
            ``stop`` and the ``move_*`` methods used for path replay).

    Returns:
        ``{"status": "success" | "error", "message": str}``. Movement skills
        and ``play_path`` return immediately; the motors are stopped, or the
        path replayed, from a background thread.
    """
    if action not in _ALLOWED_SKILLS:
        return {"status": "error", "message": "不允许的动作"}
    if not isinstance(args, dict):
        args = {}  # e.g. the LLM answered "args": null

    try:
        if action in _TIMED_SKILLS:
            method_name, log_line, reply_prefix = _TIMED_SKILLS[action]
            # Validate before the motor starts: a bad duration used to kill
            # the stop-timer thread and leave the motor running.
            seconds = _coerce_duration(args.get("duration", 1), 1)
            print(log_line)
            getattr(motor_module, method_name)(speed=_MOTOR_SPEED)
            _stop_after(motor_module, seconds)
            return {"status": "success", "message": f"{reply_prefix}{seconds}"}

        if action == "stop":
            print("停止垃圾桶")
            motor_module.stop()
            return {"status": "success", "message": "已停止"}

        if action == "list_paths":
            try:
                return _list_paths()
            except Exception as e:
                print(f"获取轨迹列表出错: {e}")
                return {"status": "error", "message": f"获取轨迹列表出错: {e}"}

        # The remaining skills (play_path, delete_path, save_path) need a name.
        path_name = args.get("name")
        if not path_name:
            return {"status": "error", "message": "路径名称不能为空"}

        if action == "play_path":
            try:
                return _play_path(path_name, motor_module)
            except Exception as e:
                print(f"播放轨迹出错: {e}")
                return {"status": "error", "message": f"播放轨迹出错: {e}"}

        if action == "delete_path":
            try:
                return _delete_path(path_name)
            except Exception as e:
                print(f"删除轨迹出错: {e}")
                return {"status": "error", "message": f"删除轨迹出错: {e}"}

        if action == "save_path":
            # Placeholder: nothing is persisted yet, only the reply is produced.
            return {"status": "success", "message": f"保存轨迹{path_name}成功"}

        return {"status": "error", "message": "无效的动作"}
    except Exception as e:
        print(f"执行技能出错: {e}")
        return {"status": "error", "message": f"执行技能出错: {e}"}
def _normalize_actions(parsed):
    """Turn a parsed LLM reply into a list of ``(action, args)`` pairs.

    Accepts a single action dict or a list of them. Entries that are not
    dicts are dropped, and ``args`` that is not a dict becomes ``{}``.
    """
    entries = parsed if isinstance(parsed, list) else [parsed]
    steps = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        args = entry.get("args")
        steps.append((entry.get("action", "stop"), args if isinstance(args, dict) else {}))
    return steps


def _follow_up_delay(args):
    """Seconds to wait for an action with *args* to finish: its ``duration`` (default 1)."""
    try:
        seconds = float(args.get("duration", 1))
    except (TypeError, ValueError):
        return 1.0
    if seconds != seconds or seconds in (float("inf"), float("-inf")):
        return 1.0
    return max(seconds, 0.0)


def create_agent_routes(app, motor_module):
    """Register the agent HTTP endpoints on *app*.

    Routes:
        POST /agent_config -- store ``api_url`` / ``api_key`` / ``model`` in
            ``llm_config``.
        GET  /agent_config -- return the stored configuration.
        POST /agent_chat   -- send ``text`` to the LLM and execute the
            action(s) it returns on *motor_module*.
    """
    @app.route('/agent_config', methods=['POST'])
    def agent_config():
        from flask import request, jsonify
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}  # missing or non-object body: treat every field as empty
        api_url = data.get('api_url', '')
        api_key = data.get('api_key', '')
        model = data.get('model', '')
        llm_config['api_url'] = api_url
        llm_config['api_key'] = api_key
        llm_config['model'] = model
        # The key is deliberately left out of the log line.
        print(f"LLM配置已更新: API={api_url}, Model={model}")
        return jsonify({'status': 'success', 'message': '配置已保存'})

    @app.route('/agent_config', methods=['GET'])
    def get_agent_config():
        from flask import jsonify
        # NOTE(review): this returns the API key in plaintext to any client
        # that can reach the endpoint. Kept as-is because the caller may rely
        # on it -- consider masking the key or protecting the route.
        return jsonify({
            'status': 'success',
            'api_url': llm_config['api_url'],
            'api_key': llm_config['api_key'],
            'model': llm_config['model']
        })

    @app.route('/agent_chat', methods=['POST'])
    def agent_chat():
        from flask import request, jsonify
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        text = data.get('text')
        if not text:
            return jsonify({'status': 'error', 'message': '输入文本不能为空'})
        try:
            prompt = load_skills_md() + "\n用户输入:" + text
            parsed_result = safe_parse(llm_call(prompt))
            steps = _normalize_actions(parsed_result)
            # A list reply is a compound command; a dict reply is a single one.
            compound = isinstance(parsed_result, list) and bool(steps)
            if not steps:
                # Nothing usable in the reply (e.g. an empty list): fail safe.
                steps = [("stop", {})]

            action, args = steps[0]
            task_queue.put((action, args))
            response = execute_skill(action, args, motor_module)
            if not compound:
                return jsonify(response)

            if len(steps) > 1:
                def execute_next_actions():
                    previous_args = args
                    for next_action, next_args in steps[1:]:
                        # Wait for the PREVIOUS action to finish (its own
                        # duration) plus a 0.5 s buffer before starting this
                        # one. Skills without a duration count as 1 s.
                        time.sleep(_follow_up_delay(previous_args) + 0.5)
                        task_queue.put((next_action, next_args))
                        execute_skill(next_action, next_args, motor_module)
                        previous_args = next_args
                threading.Thread(target=execute_next_actions).start()
            return jsonify({"status": "success", "message": f"开始执行复合指令,共{len(steps)}个动作"})
        except Exception as e:
            print(f"Agent聊天出错: {e}")
            return jsonify({'status': 'error', 'message': f'Agent聊天出错: {e}'})