#define PY_SSIZE_T_CLEAN #include #include #include #include #include // 引入 zlib 库 #include // 引入 libfec 库 #include "hx_serial.h" #include "hx_ringbuffer.h" #include // 引入 pthread 库 // 宏定义 #ifdef USE_FEC #define FEC_SIZE 32 // 前向纠错冗余数据大小 #else #define FEC_SIZE 0 // 前向纠错冗余数据大小 #endif #define FRAME_HEADER 0xAA55 // 帧头 #define FRAME_TAIL 0x0D7E // 帧尾 #define FRAME_SIZE (240) // 每帧大小 #define HEADER_SIZE (4) // 帧头 + 帧序号 + 数据长度 #define TAIL_SIZE (2) // 帧尾 #define CHECKSUM_SIZE (4) // CRC32 校验和大小(4 字节) #define DATA_SIZE (FRAME_SIZE - HEADER_SIZE - CHECKSUM_SIZE - FEC_SIZE - TAIL_SIZE) // 数据段大小 #define RING_BUFFER_SIZE (1024 * 10) // 环形缓冲区大小 - default 10KB #define QUEUE_MAX_SIZE 1024 // 队列最大容量 // 全局变量 static by_serial_t serial_port; static by_ringbuf_t ring_buffer; static unsigned char send_buffer[FRAME_SIZE]; static unsigned char recv_ack_data_buffer[DATA_SIZE]; static unsigned char frame_counter = 0; static unsigned char data_len = 0; static unsigned char ack_flag = 0; unsigned char output_data[8192]; // FIXME 有可能越界 static int output_len = 0; static unsigned char ack_data_dummy[4] = {0x00, 0x00, 0x00, 0x00}; static unsigned char ack_data[DATA_SIZE - 2] = {0}; static int ack_data_len = 0; // 定义队列结构体 typedef struct { unsigned char *data; int length; } FrameData; // 队列相关定义 static FrameData frame_queue[QUEUE_MAX_SIZE]; // 存储接收到的数据帧 static int queue_head = 0; // 队列头指针 static int queue_tail = 0; // 队列尾指针 static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; // 保护队列的互斥锁 static int stop_receiving = false; // 控制接收线程停止的标志 // 前向声明 void *receive_thread_func(void *arg); int generate_response(unsigned char *data, int length, int ack); // 计算 CRC32 校验和 unsigned long calculate_crc32(const unsigned char *data, size_t length) { return crc32(0, data, length); } // 解析一帧数据 int parse_frame(by_ringbuf_t *ringbuf, unsigned char *output_data, int *output_len) { unsigned char frame[FRAME_SIZE]; unsigned char buffer[FRAME_SIZE * 3]; int available_data = by_ringbuf_available_data(ringbuf); while (by_ringbuf_available_data(ringbuf) < FRAME_SIZE) { int ret = by_serial_read(&serial_port, buffer, FRAME_SIZE); if (ret > 0) { by_ringbuf_append(&ring_buffer, buffer, ret); } } // TODO 有无必要 // 检查是否有足够的数据解析一帧 if (available_data < FRAME_SIZE) { return -1; // 数据不足,无法解析 } // 查找帧头 unsigned short header = FRAME_HEADER; unsigned short tail = FRAME_TAIL; int header_pos = by_ringbuf_find(ringbuf, (unsigned char *)&header, 2); if (header_pos < 0) { return -1; // 没有找到帧头 } // 弹出帧头之前的数据 by_ringbuf_pop(ringbuf, frame, header_pos); // 检查是否有足够的数据解析一帧 if (by_ringbuf_available_data(ringbuf) < FRAME_SIZE) { return -1; // 数据不足,无法解析 } // 读取帧数据 by_ringbuf_pop(ringbuf, frame, HEADER_SIZE); int tail_pos = by_ringbuf_find(ringbuf, (unsigned char *)&tail, 2); if (tail_pos < FRAME_SIZE - HEADER_SIZE - TAIL_SIZE - 1) { printf(" fail, next frame header pos: %d\n", header_pos); return -2; // 下一帧帧头小于 FRAME_SIZE,丢弃该帧 } else { by_ringbuf_pop(ringbuf, frame + HEADER_SIZE, tail_pos + TAIL_SIZE); } // 解析帧序号、有效数据长度、数据段和 CRC32 unsigned char seq = frame[2]; unsigned char valid_data_len = frame[2 + 1]; unsigned char *data_segment = &frame[2 + 1 + 1]; unsigned int received_crc = *(unsigned long *)&frame[HEADER_SIZE + DATA_SIZE]; // 计算 CRC32 校验 unsigned int calculated_crc = calculate_crc32(frame, HEADER_SIZE + DATA_SIZE + FEC_SIZE); if (received_crc != calculated_crc) { printf("CRC mismatch! Expected: %08X, Received: %08X\n", calculated_crc, received_crc); return -2; // CRC 校验失败,丢弃该帧,严重错误 } if (0xFF == seq) { // printf("Received ACK frame!\r\n"); *output_len = 0; if (0x1926 == *(unsigned short *)(data_segment)) { memcpy(recv_ack_data_buffer, data_segment + 2, valid_data_len - 2); ack_data_len = valid_data_len - 2; printf("Received ACK frame!\r\n"); return 2; } memset(recv_ack_data_buffer, 0, DATA_SIZE); return 3; } // 将有效数据拼接到输出缓冲区 memcpy(&output_data[*output_len], data_segment, valid_data_len); *output_len += valid_data_len; printf("output len: %d\n", *output_len); printf("Received frame: seq=%d, len=%d\r\n", seq, valid_data_len); // 判断是否为最后一帧 if (valid_data_len < DATA_SIZE) { printf("Received last frame!\n"); return 1; // 最后一帧,解析完成 } return 0; // 成功解析一帧,但可能还有更多帧 } // 接收线程函数 void *receive_thread_func(void *arg) { while (!stop_receiving) { while (1) { int parse_result = parse_frame(&ring_buffer, output_data, &output_len); if (parse_result == 1) { int ack_data_len_t = 0; if (ack_data_len > DATA_SIZE - 2) { ack_data_len_t = DATA_SIZE - 2; } else { ack_data_len_t = ack_data_len; } if (ack_data[0] == 0x00) { generate_response(ack_data_dummy, sizeof(ack_data_dummy), 1); } else { generate_response(ack_data, ack_data_len_t, 1); // generate_response(ack_data_dummy, sizeof(ack_data_dummy), 1); } // 将解析后的数据放入队列 pthread_mutex_lock(&queue_mutex); if ((queue_tail + 1) % QUEUE_MAX_SIZE != queue_head) { // 队列未满 frame_queue[queue_tail].data = malloc(output_len); memcpy(frame_queue[queue_tail].data, output_data, output_len); printf("output_len2: %d\n", output_len); frame_queue[queue_tail].length = output_len; queue_tail = (queue_tail + 1) % QUEUE_MAX_SIZE; } else { printf("Queue is full, dropping frame!\n"); } output_len = 0; pthread_mutex_unlock(&queue_mutex); break; } else if (parse_result == 0) { printf("Parsed data length: %d\n", output_len); break; // 数据不足或解析失败,退出循环 } else if (parse_result == -2) { // TODO 此处不能回应 NAK,必须等数据发完 output_len = 0; break; // 严重错误,丢弃包 } else if (parse_result == 2) { ack_flag = 1; printf("Received ACK frame!\r\n"); break; } else if (parse_result == 3) { ack_flag = 2; printf("Received NACK frame!\r\n"); break; } } usleep(1000); // 避免占用过多 CPU } return NULL; } int generate_response(unsigned char *data, int length, int ack) { // 响应帧结构(与普通帧相似,帧号为 0xFF,数据段前 2 个字节为 ACK 标志,负载数据必须小于普通帧满数据大小,以确保被识别成孤立帧) unsigned char send_buffer[FRAME_SIZE]; memset(send_buffer, 0, FRAME_SIZE); // 构造帧头 unsigned short header = FRAME_HEADER; memcpy(send_buffer, &header, 2); // 构造帧序号和数据长度 // memcpy(send_buffer + 2, 0xFF, 1); *(send_buffer + 2) = 0xFF; length += 2; // 包含 ACK 标志长度 if (length > DATA_SIZE) { return -1; // 数据超长 } memcpy(send_buffer + 3, &length, 1); // magic number for ack unsigned short ack_flag = 0x1926; if (ack) { memcpy(send_buffer + HEADER_SIZE, &ack_flag, 2); } else { memset(send_buffer + HEADER_SIZE, 0, 2); } // 拷贝该帧对应数据段 memcpy(send_buffer + HEADER_SIZE + 2, data, length - 2); // 计算 CRC32 校验和 unsigned long crc = calculate_crc32(send_buffer, HEADER_SIZE + DATA_SIZE + FEC_SIZE); memcpy(send_buffer + HEADER_SIZE + DATA_SIZE + FEC_SIZE, &crc, CHECKSUM_SIZE); unsigned short tail = FRAME_TAIL; memcpy(send_buffer + HEADER_SIZE + DATA_SIZE + FEC_SIZE + CHECKSUM_SIZE, &tail, TAIL_SIZE); // 发送响应帧 if (by_serial_write(&serial_port, send_buffer, FRAME_SIZE) != 0) { return -2; // 发送失败 } return 0; } // 初始化串口 static PyObject *serial_init(PyObject *self, PyObject *args) { const char *dev_name; if (!PyArg_ParseTuple(args, "s", &dev_name)) { return NULL; } if (by_serial_init(&serial_port, dev_name) != 0) { PyErr_SetString(PyExc_IOError, "Failed to initialize serial port"); return NULL; } // 初始化环形缓冲区 if (by_ringbuf_init(&ring_buffer, RING_BUFFER_SIZE) != 0) { PyErr_SetString(PyExc_IOError, "Failed to initialize ring buffer"); return NULL; } // 启动接收线程 pthread_t receive_thread; stop_receiving = false; if (pthread_create(&receive_thread, NULL, receive_thread_func, NULL) != 0) { PyErr_SetString(PyExc_RuntimeError, "Failed to start receive thread"); return NULL; } Py_RETURN_TRUE; } // 发送数据 static PyObject *serial_send(PyObject *self, PyObject *args) { const char *data; Py_ssize_t length; if (!PyArg_ParseTuple(args, "s#", &data, &length)) { return NULL; } size_t offset = 0; while (offset < length) { memset(send_buffer, 0, FRAME_SIZE); // 构造帧头 unsigned short header = FRAME_HEADER; memcpy(send_buffer, &header, 2); // 构造帧序号和数据长度 memcpy(send_buffer + 2, &frame_counter, 1); if (length - offset > DATA_SIZE) { data_len = DATA_SIZE; } else { data_len = length - offset; } memcpy(send_buffer + 3, &data_len, 1); // 拷贝该帧对应数据段 memcpy(send_buffer + HEADER_SIZE, data + offset, data_len); printf("Received frame: seq=%d, len=%d\r\n", frame_counter, data_len); // 计算 CRC32 校验和 unsigned long crc = calculate_crc32(send_buffer, HEADER_SIZE + DATA_SIZE + FEC_SIZE); memcpy(send_buffer + HEADER_SIZE + DATA_SIZE + FEC_SIZE, &crc, CHECKSUM_SIZE); unsigned short tail = FRAME_TAIL; memcpy(send_buffer + HEADER_SIZE + DATA_SIZE + FEC_SIZE + CHECKSUM_SIZE, &tail, TAIL_SIZE); // 发送帧 if (by_serial_write(&serial_port, (const char *)send_buffer, FRAME_SIZE) != 0) { PyErr_SetString(PyExc_IOError, "Failed to send data over serial port"); return NULL; } offset += DATA_SIZE; frame_counter++; usleep(80000); } unsigned short ack_timeout = 10; // 等待 ACK 响应 while (!ack_flag) { usleep(100000); if (ack_timeout-- == 0) { ack_flag = 0; frame_counter = 0; printf("ACK timeout\r\n"); Py_RETURN_FALSE; } } ack_flag = 0; frame_counter = 0; // Py_RETURN_TRUE; // 返回 ACK 响应 PyObject *result = Py_BuildValue("y#", recv_ack_data_buffer, ack_data_len); ack_data_len = 0; return result; } // 接收数据 static PyObject *serial_receive(PyObject *self, PyObject *args) { pthread_mutex_lock(&queue_mutex); if (queue_head == queue_tail) { pthread_mutex_unlock(&queue_mutex); PyErr_SetString(PyExc_IOError, "No data available in the queue"); return NULL; } // 获取队列中最早的数据包 FrameData frame = frame_queue[queue_head]; queue_head = (queue_head + 1) % QUEUE_MAX_SIZE; pthread_mutex_unlock(&queue_mutex); // 返回解析后的数据 PyObject *result = Py_BuildValue("y#", frame.data, frame.length); free(frame.data); // 释放内存 return result; } static PyObject *serial_set_ack_data(PyObject *self, PyObject *args) { const char *input_data; // 用于存储输入的二进制数据 Py_ssize_t input_length; // 用于存储输入数据的长度 // 解析参数,期望接收到一个 bytes 对象 if (!PyArg_ParseTuple(args, "y#", &input_data, &input_length)) { PyErr_SetString(PyExc_TypeError, "Expected a bytes object"); return NULL; } // 检查输入数据是否为空 if (input_length == 0) { PyErr_SetString(PyExc_ValueError, "Input data cannot be empty"); return NULL; } // 检查输入数据是否为空 if (input_length > (DATA_SIZE - 2)) { PyErr_SetString(PyExc_ValueError, "Input data is too long"); return NULL; } // // 创建一个缓冲区用于存储处理后的数据 // char *processed_data = (char *)malloc(input_length); // if (processed_data == NULL) { // PyErr_SetString(PyExc_MemoryError, "Failed to allocate memory"); // return NULL; // } // // 将处理后的数据封装为 Python 的 bytes 对象 // PyObject *result = PyBytes_FromStringAndSize(processed_data, input_length); // // 释放分配的内存 // free(processed_data); // // 返回结果 // return result; memcpy(ack_data, input_data, input_length); ack_data_len = input_length; Py_RETURN_NONE; } // 模块方法表 static PyMethodDef SerialMethods[] = { {"init", serial_init, METH_VARARGS, "Initialize serial port"}, {"send", serial_send, METH_VARARGS, "Send data over serial port"}, {"receive", serial_receive, METH_VARARGS, "Receive data from serial port"}, {"set_ack_data", serial_set_ack_data, METH_VARARGS, "Set ACK data"}, {NULL, NULL, 0, NULL}}; // 模块定义 static struct PyModuleDef serialmodule = { PyModuleDef_HEAD_INIT, "serial_module", "A module to transfer serializable data over serial port", -1, SerialMethods}; // 模块初始化函数 PyMODINIT_FUNC PyInit_serial_module(void) { return PyModule_Create(&serialmodule); }