TA的每日心情 | 开心 2025-10-22 07:09 |
|---|
签到天数: 590 天 连续签到: 1 天 [LV.9]以坛为家II
状元
- 积分
- 8782
|
【前言】
如需要远程控制小车,必须实时获取到小车的状态。本篇将分享如何使用AIMB-2210强大的NPU来实现。
【前提】
【AIMB-2210】AI编写小车控制界面 - #板卡评测 - 与非网
在这一篇中,我详细的分享了如何制做远程控制的界面,这次的工程是在他的上面添加。
【硬件平台】
1、【AIMB-2210】主机
2、智能小车平台
【软件】
vscode+ai
【实现方法】
使用vscode打开文件,在AI对象中将数据解板协义发送给AI
然后提出按协议解析收到的数据包
在强大的【AIMB-2210】主机加持下,生成了代码如下:
- import serial
- import time
- import struct
- import tkinter as tk
- from tkinter import ttk, messagebox
- import serial.tools.list_ports
- import threading
- def calculate_checksum(data):
- """
- 计算校验和:前面数据累加和的低8位
- """
- return sum(data) & 0xFF
- def parse_x_protocol_frame(frame):
- """
- 解析X-Protocol帧
-
- Args:
- frame: 完整的帧数据
-
- Returns:
- dict: 解析出的数据字典
- """
- if len(frame) < 5:
- return None
-
- # 检查帧头
- if frame[0] != 0xAA or frame[1] != 0x55:
- return None
-
- frame_len = frame[2]
- if len(frame) < frame_len:
- return None
-
- # 验证校验和
- checksum = frame[frame_len - 1]
- calculated_checksum = calculate_checksum(frame[:-1])
- if checksum != calculated_checksum:
- return None
-
- frame_code = frame[3]
- data_bytes = frame[4:frame_len - 1]
-
- # 根据帧码解析数据
- result = {"frame_code": frame_code}
-
- # 处理您实际使用的帧格式 (帧码 0x10)
- if frame_code == 0x10:
- # 数据部分有21字节,按照常见的传感器数据格式解析
- # 前6字节是加速度数据 (3个16位整数)
- try:
- if len(data_bytes) >= 2:
- result["acc_x"] = struct.unpack('>h', data_bytes[0:2])[0]
- if len(data_bytes) >= 4:
- result["acc_y"] = struct.unpack('>h', data_bytes[2:4])[0]
- if len(data_bytes) >= 6:
- result["acc_z"] = struct.unpack('>h', data_bytes[4:6])[0]
-
- # 接下来6字节是陀螺仪数据 (3个16位整数)
- if len(data_bytes) >= 8:
- result["gyro_x"] = struct.unpack('>h', data_bytes[6:8])[0]
- if len(data_bytes) >= 10:
- result["gyro_y"] = struct.unpack('>h', data_bytes[8:10])[0]
- if len(data_bytes) >= 12:
- result["gyro_z"] = struct.unpack('>h', data_bytes[10:12])[0]
-
- # 接下来4字节是速度数据 (2个16位整数)
- if len(data_bytes) >= 14:
- result["vel_x"] = struct.unpack('>h', data_bytes[12:14])[0]
- if len(data_bytes) >= 16:
- result["vel_y"] = struct.unpack('>h', data_bytes[14:16])[0]
-
- # 接下来2字节是角速度W
- if len(data_bytes) >= 18:
- result["vel_w"] = struct.unpack('>h', data_bytes[16:18])[0]
-
- # 最后几个字节中取电压数据
- if len(data_bytes) >= 20:
- result["voltage"] = struct.unpack('>h', data_bytes[18:20])[0]
- except Exception as e:
- print(f"数据解析错误: {e}")
-
- # 保留原有的解析逻辑以兼容其他帧格式
- elif frame_code == 0x01:
- if len(data_bytes) >= 6:
- result["acc_x"] = struct.unpack('>h', data_bytes[0:2])[0]
- result["acc_y"] = struct.unpack('>h', data_bytes[2:4])[0]
- result["acc_z"] = struct.unpack('>h', data_bytes[4:6])[0]
- elif frame_code == 0x02:
- if len(data_bytes) >= 6:
- result["gyro_x"] = struct.unpack('>h', data_bytes[0:2])[0]
- result["gyro_y"] = struct.unpack('>h', data_bytes[2:4])[0]
- result["gyro_z"] = struct.unpack('>h', data_bytes[4:6])[0]
- elif frame_code == 0x03:
- if len(data_bytes) >= 6:
- result["vel_x"] = struct.unpack('>h', data_bytes[0:2])[0]
- result["vel_y"] = struct.unpack('>h', data_bytes[2:4])[0]
- result["vel_w"] = struct.unpack('>h', data_bytes[4:6])[0]
- elif frame_code == 0x04:
- if len(data_bytes) >= 2:
- result["voltage"] = struct.unpack('>h', data_bytes[0:2])[0]
-
- return result
- def send_x_protocol_frame(ser, frame_code, data):
- """
- 发送X-Protocol帧
-
- Args:
- ser: 串口对象
- frame_code: 帧码
- data: 数据字节串
- """
- # 构造帧(不包括校验和)
- frame = bytearray()
- frame.append(0xAA) # 帧头1
- frame.append(0x55) # 帧头2
- frame.append(5 + len(data)) # 帧长
- frame.append(frame_code) # 帧码
- frame.extend(data) # 数据
-
- # 计算校验和(从第0位开始计算整个帧)
- checksum = calculate_checksum(frame)
- frame.append(checksum) # 校验和
-
- # 发送帧
- ser.write(frame)
- print(f"发送帧: {' '.join(f'{b:02X}' for b in frame)}")
- return len(frame)
- def send_values(ser, frame_code, value1, value2, value3):
- """
- 发送三个16位整数
-
- Args:
- ser: 串口对象
- frame_code: 帧码
- value1, value2, value3: 三个16位整数
- """
- # 构造6字节的有效数据
- data = bytearray()
- data.extend(struct.pack('>h', value1)) # 第一个16位整数
- data.extend(struct.pack('>h', value2)) # 第二个16位整数
- data.extend(struct.pack('>h', value3)) # 第三个16位整数
-
- # 发送帧
- bytes_sent = send_x_protocol_frame(ser, frame_code, data)
- print(f"发送数据: {value1}, {value2}, {value3}")
- return bytes_sent
- class XProtocolGUI:
- def __init__(self, root):
- self.root = root
- self.root.title("X-Protocol 串口发送工具")
- self.root.geometry("400x500")
- self.ser = None
- self.receiving_thread = None
- self.running = False
-
- self.create_widgets()
- self.refresh_ports()
-
- def create_widgets(self):
- # 主框架
- main_frame = ttk.Frame(self.root, padding="10")
- main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
-
- # 串口选择
- ttk.Label(main_frame, text="串口:").grid(row=0, column=0, sticky=tk.W, pady=5)
- self.port_var = tk.StringVar()
- self.port_combo = ttk.Combobox(main_frame, textvariable=self.port_var, width=15)
- self.port_combo.grid(row=0, column=1, sticky=(tk.W, tk.E), pady=5)
-
- # 刷新按钮
- refresh_btn = ttk.Button(main_frame, text="刷新", command=self.refresh_ports)
- refresh_btn.grid(row=0, column=2, padx=(10,0), pady=5)
-
- # 连接按钮
- self.connect_btn = ttk.Button(main_frame, text="连接", command=self.toggle_connection)
- self.connect_btn.grid(row=0, column=3, padx=(10,0), pady=5)
-
- # 分隔线
- separator = ttk.Separator(main_frame, orient='horizontal')
- separator.grid(row=1, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=10)
-
- # 数据输入
- ttk.Label(main_frame, text="数据1:").grid(row=2, column=0, sticky=tk.W, pady=5)
- self.value1_var = tk.StringVar(value="100")
- ttk.Entry(main_frame, textvariable=self.value1_var, width=10).grid(row=2, column=1, sticky=tk.W, pady=5)
-
- ttk.Label(main_frame, text="数据2:").grid(row=3, column=0, sticky=tk.W, pady=5)
- self.value2_var = tk.StringVar(value="0")
- ttk.Entry(main_frame, textvariable=self.value2_var, width=10).grid(row=3, column=1, sticky=tk.W, pady=5)
-
- ttk.Label(main_frame, text="数据3:").grid(row=4, column=0, sticky=tk.W, pady=5)
- self.value3_var = tk.StringVar(value="0")
- ttk.Entry(main_frame, textvariable=self.value3_var, width=10).grid(row=4, column=1, sticky=tk.W, pady=5)
-
- # 帧码输入
- ttk.Label(main_frame, text="帧码:").grid(row=5, column=0, sticky=tk.W, pady=5)
- self.frame_code_var = tk.StringVar(value="50")
- ttk.Entry(main_frame, textvariable=self.frame_code_var, width=10).grid(row=5, column=1, sticky=tk.W, pady=5)
-
- # 发送按钮
- self.send_btn = ttk.Button(main_frame, text="发送数据", command=self.send_data, state=tk.DISABLED)
- self.send_btn.grid(row=6, column=0, columnspan=2, pady=20)
-
- # 显示区域
- display_frame = ttk.LabelFrame(main_frame, text="接收到的数据", padding="10")
- display_frame.grid(row=7, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=10)
- # 创建多个标签用于显示数据
- labels = [
- ("帧码:", "frame_code"),
- ("加速度X:", "acc_x"),
- ("加速度Y:", "acc_y"),
- ("加速度Z:", "acc_z"),
- ("陀螺仪X:", "gyro_x"),
- ("陀螺仪Y:", "gyro_y"),
- ("陀螺仪Z:", "gyro_z"),
- ("速度X:", "vel_x"),
- ("速度Y:", "vel_y"),
- ("速度W:", "vel_w"),
- ("电池电压:", "voltage"),
- ]
- self.display_vars = {}
- for i, (label_text, key) in enumerate(labels):
- ttk.Label(display_frame, text=label_text).grid(row=i, column=0, sticky=tk.W)
- var = tk.StringVar(value="---")
- self.display_vars[key] = var
- ttk.Label(display_frame, textvariable=var, width=15).grid(row=i, column=1, sticky=tk.W)
-
- # 状态栏
- self.status_var = tk.StringVar(value="请选择串口并连接")
- self.status_bar = ttk.Label(main_frame, textvariable=self.status_var, relief=tk.SUNKEN)
- self.status_bar.grid(row=8, column=0, columnspan=4, sticky=(tk.W, tk.E), pady=(10,0))
-
- # 配置网格权重
- self.root.columnconfigure(0, weight=1)
- self.root.rowconfigure(0, weight=1)
- main_frame.columnconfigure(1, weight=1)
-
- def refresh_ports(self):
- """刷新可用串口列表"""
- ports = [port.device for port in serial.tools.list_ports.comports()]
- self.port_combo['values'] = ports
- if ports:
- self.port_combo.set(ports[0])
-
- def toggle_connection(self):
- """切换串口连接状态"""
- if self.ser is None:
- self.connect_serial()
- else:
- self.disconnect_serial()
-
- def connect_serial(self):
- """连接串口"""
- try:
- port = self.port_var.get()
- if not port:
- messagebox.showerror("错误", "请选择串口")
- return
- self.ser = serial.Serial(port, 230400, timeout=1)
- self.running = True
- self.connect_btn.config(text="断开")
- self.send_btn.config(state=tk.NORMAL)
- self.status_var.set(f"已连接到 {port}")
- print(f"串口 {port} 打开成功")
- # 启动接收线程
- self.start_receiving()
- except Exception as e:
- messagebox.showerror("串口错误", f"无法打开串口: {str(e)}")
- self.ser = None
-
- def disconnect_serial(self):
- """断开串口连接"""
- self.running = False
- if self.ser:
- self.ser.close()
- self.ser = None
- self.connect_btn.config(text="连接")
- self.send_btn.config(state=tk.DISABLED)
- self.status_var.set("已断开连接")
-
- def start_receiving(self):
- """启动接收线程"""
- if self.ser is None:
- return
- self.receiving_thread = threading.Thread(target=self.receive_data, daemon=True)
- self.receiving_thread.start()
- def receive_data(self):
- """接收并解析数据"""
- buffer = bytearray()
- while self.running and self.ser and self.ser.is_open:
- try:
- # 逐字节读取
- byte = self.ser.read(1)
- if not byte:
- continue
- buffer.append(byte[0])
-
- # 至少需要帧头3个字节才能判断帧长
- if len(buffer) >= 3:
- # 检查是否是帧头
- if buffer[0] == 0xAA and buffer[1] == 0x55:
- frame_len = buffer[2] # 第三个字节是帧长度
- # 当缓冲区长度达到帧长度时尝试解析
- if len(buffer) >= frame_len and frame_len > 0:
- # 尝试解析
- parsed = parse_x_protocol_frame(buffer[:frame_len])
- if parsed:
- # 使用after方法确保在主线程更新UI
- self.root.after(0, self.update_display, parsed)
- buffer = buffer[frame_len:] # 移除已处理部分
- else:
- buffer = buffer[1:] # 移动窗口
- elif len(buffer) > 100: # 如果缓冲区太长且没有找到帧头,则清空
- buffer = bytearray()
- except Exception as e:
- print(f"接收异常: {e}")
- break
- def update_display(self, data):
- """更新显示区域"""
- # print(f"更新显示: {data}")
- for key, var in self.display_vars.items():
- if key in data:
- # 对于帧码,显示为十六进制;其他数据显示为十进制
- if key == "frame_code":
- var.set(f"0x{data[key]:02X}")
- elif key == "voltage":
- # 电池电压需要除以100
- var.set(f"{data[key]/100:.3f}")
- elif key in ["vel_x", "vel_y", "vel_w"]:
- # 速度值需要除以1000
- var.set(f"{data[key]/1000:.3f}")
- else:
- var.set(f"{data[key]:.3f}")
- else:
- # 如果某个字段不存在,保持原值或设置为默认值
- if var.get() == "---":
- pass # 保持默认值
- def send_data(self):
- """发送数据"""
- if not self.ser:
- messagebox.showerror("错误", "请先连接串口")
- return
-
- try:
- # 获取输入值
- value1 = int(self.value1_var.get())
- value2 = int(self.value2_var.get())
- value3 = int(self.value3_var.get())
- frame_code = int(self.frame_code_var.get(), 16) # 帧码以十六进制解析
-
- # 发送数据
- send_values(self.ser, frame_code, value1, value2, value3)
- self.status_var.set(f"已发送: {value1}, {value2}, {value3}")
-
- except ValueError as e:
- messagebox.showerror("输入错误", "请输入有效的整数")
- except Exception as e:
- messagebox.showerror("发送错误", f"发送失败: {str(e)}")
-
- def on_closing(self):
- """关闭程序时断开串口"""
- self.running = False
- if self.ser:
- self.ser.close()
- self.root.destroy()
- def main():
- root = tk.Tk()
- app = XProtocolGUI(root)
- root.protocol("WM_DELETE_WINDOW", app.on_closing)
- root.mainloop()
- if __name__ == "__main__":
- main()
复制代码 【界面效果】
当我发送速度为100时,小车慢慢向前,同时反馈的速度也显示到界面了。
【总结】
【AIMB-2210】非常强大,配AI可以实现生成程序代码,是编程好帮手。下一步,将定位信采集,偿试使用自动运行。
|
|