Files
lora_plug/serial_module.c
2025-03-01 19:22:24 +08:00

473 lines
15 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <zlib.h> // 引入 zlib 库
#include <fec.h> // 引入 libfec 库
#include "hx_serial.h"
#include "hx_ringbuffer.h"
#include <pthread.h> // 引入 pthread 库
// 宏定义
#ifdef USE_FEC
#define FEC_SIZE 32 // 前向纠错冗余数据大小
#else
#define FEC_SIZE 0 // 前向纠错冗余数据大小
#endif
#define FRAME_HEADER 0xAA55 // 帧头
#define FRAME_TAIL 0x0D7E // 帧尾
#define FRAME_SIZE (240) // 每帧大小
#define HEADER_SIZE (4) // 帧头 + 帧序号 + 数据长度
#define TAIL_SIZE (2) // 帧尾
#define CHECKSUM_SIZE (4) // CRC32 校验和大小4 字节)
#define DATA_SIZE (FRAME_SIZE - HEADER_SIZE - CHECKSUM_SIZE - FEC_SIZE - TAIL_SIZE) // 数据段大小
#define RING_BUFFER_SIZE (1024 * 10) // 环形缓冲区大小 - default 10KB
#define QUEUE_MAX_SIZE 1024 // 队列最大容量
// 全局变量
static by_serial_t serial_port;
static by_ringbuf_t ring_buffer;
static unsigned char send_buffer[FRAME_SIZE];
static unsigned char recv_ack_data_buffer[DATA_SIZE];
static unsigned char frame_counter = 0;
static unsigned char data_len = 0;
static unsigned char ack_flag = 0;
unsigned char output_data[8192]; // FIXME 有可能越界
static int output_len = 0;
static unsigned char ack_data_dummy[4] = {0x00, 0x00, 0x00, 0x00};
static unsigned char ack_data[DATA_SIZE - 2] = {0};
static int ack_data_len = 0;
// 定义队列结构体
typedef struct
{
unsigned char *data;
int length;
} FrameData;
// 队列相关定义
static FrameData frame_queue[QUEUE_MAX_SIZE]; // 存储接收到的数据帧
static int queue_head = 0; // 队列头指针
static int queue_tail = 0; // 队列尾指针
static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER; // 保护队列的互斥锁
static int stop_receiving = false; // 控制接收线程停止的标志
// 前向声明
void *receive_thread_func(void *arg);
int generate_response(unsigned char *data, int length, int ack);
// 计算 CRC32 校验和
unsigned long calculate_crc32(const unsigned char *data, size_t length)
{
return crc32(0, data, length);
}
// 解析一帧数据
int parse_frame(by_ringbuf_t *ringbuf, unsigned char *output_data, int *output_len)
{
unsigned char frame[FRAME_SIZE];
unsigned char buffer[FRAME_SIZE * 3];
int available_data = by_ringbuf_available_data(ringbuf);
while (by_ringbuf_available_data(ringbuf) < FRAME_SIZE)
{
int ret = by_serial_read(&serial_port, buffer, FRAME_SIZE);
if (ret > 0)
{
by_ringbuf_append(&ring_buffer, buffer, ret);
}
}
// TODO 有无必要
// 检查是否有足够的数据解析一帧
if (available_data < FRAME_SIZE)
{
return -1; // 数据不足,无法解析
}
// 查找帧头
unsigned short header = FRAME_HEADER;
unsigned short tail = FRAME_TAIL;
int header_pos = by_ringbuf_find(ringbuf, (unsigned char *)&header, 2);
if (header_pos < 0)
{
return -1; // 没有找到帧头
}
// 弹出帧头之前的数据
by_ringbuf_pop(ringbuf, frame, header_pos);
// 检查是否有足够的数据解析一帧
if (by_ringbuf_available_data(ringbuf) < FRAME_SIZE)
{
return -1; // 数据不足,无法解析
}
// 读取帧数据
by_ringbuf_pop(ringbuf, frame, HEADER_SIZE);
int tail_pos = by_ringbuf_find(ringbuf, (unsigned char *)&tail, 2);
if (tail_pos < FRAME_SIZE - HEADER_SIZE - TAIL_SIZE - 1)
{
printf(" fail, next frame header pos: %d\n", header_pos);
return -2; // 下一帧帧头小于 FRAME_SIZE丢弃该帧
}
else
{
by_ringbuf_pop(ringbuf, frame + HEADER_SIZE, tail_pos + TAIL_SIZE);
}
// 解析帧序号、有效数据长度、数据段和 CRC32
unsigned char seq = frame[2];
unsigned char valid_data_len = frame[2 + 1];
unsigned char *data_segment = &frame[2 + 1 + 1];
unsigned int received_crc = *(unsigned long *)&frame[HEADER_SIZE + DATA_SIZE];
// 计算 CRC32 校验
unsigned int calculated_crc = calculate_crc32(frame, HEADER_SIZE + DATA_SIZE + FEC_SIZE);
if (received_crc != calculated_crc)
{
printf("CRC mismatch! Expected: %08X, Received: %08X\n", calculated_crc, received_crc);
return -2; // CRC 校验失败,丢弃该帧,严重错误
}
if (0xFF == seq)
{
// printf("Received ACK frame!\r\n");
*output_len = 0;
if (0x1926 == *(unsigned short *)(data_segment))
{
memcpy(recv_ack_data_buffer, data_segment + 2, valid_data_len - 2);
ack_data_len = valid_data_len - 2;
printf("Received ACK frame!\r\n");
return 2;
}
memset(recv_ack_data_buffer, 0, DATA_SIZE);
return 3;
}
// 将有效数据拼接到输出缓冲区
memcpy(&output_data[*output_len], data_segment, valid_data_len);
*output_len += valid_data_len;
printf("output len: %d\n", *output_len);
printf("Received frame: seq=%d, len=%d\r\n", seq, valid_data_len);
// 判断是否为最后一帧
if (valid_data_len < DATA_SIZE)
{
printf("Received last frame!\n");
return 1; // 最后一帧,解析完成
}
return 0; // 成功解析一帧,但可能还有更多帧
}
// 接收线程函数
void *receive_thread_func(void *arg)
{
while (!stop_receiving)
{
while (1)
{
int parse_result = parse_frame(&ring_buffer, output_data, &output_len);
if (parse_result == 1)
{
int ack_data_len_t = 0;
if (ack_data_len > DATA_SIZE - 2)
{
ack_data_len_t = DATA_SIZE - 2;
}
else
{
ack_data_len_t = ack_data_len;
}
if (ack_data[0] == 0x00)
{
generate_response(ack_data_dummy, sizeof(ack_data_dummy), 1);
}
else
{
generate_response(ack_data, ack_data_len_t, 1);
// generate_response(ack_data_dummy, sizeof(ack_data_dummy), 1);
}
// 将解析后的数据放入队列
pthread_mutex_lock(&queue_mutex);
if ((queue_tail + 1) % QUEUE_MAX_SIZE != queue_head)
{ // 队列未满
frame_queue[queue_tail].data = malloc(output_len);
memcpy(frame_queue[queue_tail].data, output_data, output_len);
printf("output_len2: %d\n", output_len);
frame_queue[queue_tail].length = output_len;
queue_tail = (queue_tail + 1) % QUEUE_MAX_SIZE;
}
else
{
printf("Queue is full, dropping frame!\n");
}
output_len = 0;
pthread_mutex_unlock(&queue_mutex);
break;
}
else if (parse_result == 0)
{
printf("Parsed data length: %d\n", output_len);
break; // 数据不足或解析失败,退出循环
}
else if (parse_result == -2)
{
// TODO 此处不能回应 NAK必须等数据发完
output_len = 0;
break; // 严重错误,丢弃包
}
else if (parse_result == 2)
{
ack_flag = 1;
printf("Received ACK frame!\r\n");
break;
}
else if (parse_result == 3)
{
ack_flag = 2;
printf("Received NACK frame!\r\n");
break;
}
}
usleep(1000); // 避免占用过多 CPU
}
return NULL;
}
int generate_response(unsigned char *data, int length, int ack)
{
// 响应帧结构(与普通帧相似,帧号为 0xFF数据段前 2 个字节为 ACK 标志,负载数据必须小于普通帧满数据大小,以确保被识别成孤立帧)
unsigned char send_buffer[FRAME_SIZE];
memset(send_buffer, 0, FRAME_SIZE);
// 构造帧头
unsigned short header = FRAME_HEADER;
memcpy(send_buffer, &header, 2);
// 构造帧序号和数据长度
// memcpy(send_buffer + 2, 0xFF, 1);
*(send_buffer + 2) = 0xFF;
length += 2; // 包含 ACK 标志长度
if (length > DATA_SIZE)
{
return -1; // 数据超长
}
memcpy(send_buffer + 3, &length, 1);
// magic number for ack
unsigned short ack_flag = 0x1926;
if (ack)
{
memcpy(send_buffer + HEADER_SIZE, &ack_flag, 2);
}
else
{
memset(send_buffer + HEADER_SIZE, 0, 2);
}
// 拷贝该帧对应数据段
memcpy(send_buffer + HEADER_SIZE + 2, data, length - 2);
// 计算 CRC32 校验和
unsigned long crc = calculate_crc32(send_buffer, HEADER_SIZE + DATA_SIZE + FEC_SIZE);
memcpy(send_buffer + HEADER_SIZE + DATA_SIZE + FEC_SIZE, &crc, CHECKSUM_SIZE);
unsigned short tail = FRAME_TAIL;
memcpy(send_buffer + HEADER_SIZE + DATA_SIZE + FEC_SIZE + CHECKSUM_SIZE, &tail, TAIL_SIZE);
// 发送响应帧
if (by_serial_write(&serial_port, send_buffer, FRAME_SIZE) != 0)
{
return -2; // 发送失败
}
return 0;
}
// 初始化串口
static PyObject *serial_init(PyObject *self, PyObject *args)
{
const char *dev_name;
if (!PyArg_ParseTuple(args, "s", &dev_name))
{
return NULL;
}
if (by_serial_init(&serial_port, dev_name) != 0)
{
PyErr_SetString(PyExc_IOError, "Failed to initialize serial port");
return NULL;
}
// 初始化环形缓冲区
if (by_ringbuf_init(&ring_buffer, RING_BUFFER_SIZE) != 0)
{
PyErr_SetString(PyExc_IOError, "Failed to initialize ring buffer");
return NULL;
}
// 启动接收线程
pthread_t receive_thread;
stop_receiving = false;
if (pthread_create(&receive_thread, NULL, receive_thread_func, NULL) != 0)
{
PyErr_SetString(PyExc_RuntimeError, "Failed to start receive thread");
return NULL;
}
Py_RETURN_TRUE;
}
// 发送数据
static PyObject *serial_send(PyObject *self, PyObject *args)
{
const char *data;
Py_ssize_t length;
if (!PyArg_ParseTuple(args, "s#", &data, &length))
{
return NULL;
}
size_t offset = 0;
while (offset < length)
{
memset(send_buffer, 0, FRAME_SIZE);
// 构造帧头
unsigned short header = FRAME_HEADER;
memcpy(send_buffer, &header, 2);
// 构造帧序号和数据长度
memcpy(send_buffer + 2, &frame_counter, 1);
if (length - offset > DATA_SIZE)
{
data_len = DATA_SIZE;
}
else
{
data_len = length - offset;
}
memcpy(send_buffer + 3, &data_len, 1);
// 拷贝该帧对应数据段
memcpy(send_buffer + HEADER_SIZE, data + offset, data_len);
printf("Received frame: seq=%d, len=%d\r\n", frame_counter, data_len);
// 计算 CRC32 校验和
unsigned long crc = calculate_crc32(send_buffer, HEADER_SIZE + DATA_SIZE + FEC_SIZE);
memcpy(send_buffer + HEADER_SIZE + DATA_SIZE + FEC_SIZE, &crc, CHECKSUM_SIZE);
unsigned short tail = FRAME_TAIL;
memcpy(send_buffer + HEADER_SIZE + DATA_SIZE + FEC_SIZE + CHECKSUM_SIZE, &tail, TAIL_SIZE);
// 发送帧
if (by_serial_write(&serial_port, (const char *)send_buffer, FRAME_SIZE) != 0)
{
PyErr_SetString(PyExc_IOError, "Failed to send data over serial port");
return NULL;
}
offset += DATA_SIZE;
frame_counter++;
usleep(80000);
}
unsigned short ack_timeout = 10;
// 等待 ACK 响应
while (!ack_flag)
{
usleep(100000);
if (ack_timeout-- == 0)
{
ack_flag = 0;
frame_counter = 0;
printf("ACK timeout\r\n");
Py_RETURN_FALSE;
}
}
ack_flag = 0;
frame_counter = 0;
// Py_RETURN_TRUE;
// 返回 ACK 响应
PyObject *result = Py_BuildValue("y#", recv_ack_data_buffer, ack_data_len);
ack_data_len = 0;
return result;
}
// 接收数据
static PyObject *serial_receive(PyObject *self, PyObject *args)
{
pthread_mutex_lock(&queue_mutex);
if (queue_head == queue_tail)
{
pthread_mutex_unlock(&queue_mutex);
PyErr_SetString(PyExc_IOError, "No data available in the queue");
return NULL;
}
// 获取队列中最早的数据包
FrameData frame = frame_queue[queue_head];
queue_head = (queue_head + 1) % QUEUE_MAX_SIZE;
pthread_mutex_unlock(&queue_mutex);
// 返回解析后的数据
PyObject *result = Py_BuildValue("y#", frame.data, frame.length);
free(frame.data); // 释放内存
return result;
}
static PyObject *serial_set_ack_data(PyObject *self, PyObject *args)
{
const char *input_data; // 用于存储输入的二进制数据
Py_ssize_t input_length; // 用于存储输入数据的长度
// 解析参数,期望接收到一个 bytes 对象
if (!PyArg_ParseTuple(args, "y#", &input_data, &input_length))
{
PyErr_SetString(PyExc_TypeError, "Expected a bytes object");
return NULL;
}
// 检查输入数据是否为空
if (input_length == 0)
{
PyErr_SetString(PyExc_ValueError, "Input data cannot be empty");
return NULL;
}
// 检查输入数据是否为空
if (input_length > (DATA_SIZE - 2))
{
PyErr_SetString(PyExc_ValueError, "Input data is too long");
return NULL;
}
// // 创建一个缓冲区用于存储处理后的数据
// char *processed_data = (char *)malloc(input_length);
// if (processed_data == NULL) {
// PyErr_SetString(PyExc_MemoryError, "Failed to allocate memory");
// return NULL;
// }
// // 将处理后的数据封装为 Python 的 bytes 对象
// PyObject *result = PyBytes_FromStringAndSize(processed_data, input_length);
// // 释放分配的内存
// free(processed_data);
// // 返回结果
// return result;
memcpy(ack_data, input_data, input_length);
ack_data_len = input_length;
Py_RETURN_NONE;
}
// 模块方法表
static PyMethodDef SerialMethods[] = {
{"init", serial_init, METH_VARARGS, "Initialize serial port"},
{"send", serial_send, METH_VARARGS, "Send data over serial port"},
{"receive", serial_receive, METH_VARARGS, "Receive data from serial port"},
{"set_ack_data", serial_set_ack_data, METH_VARARGS, "Set ACK data"},
{NULL, NULL, 0, NULL}};
// 模块定义
static struct PyModuleDef serialmodule = {
PyModuleDef_HEAD_INIT,
"serial_module",
"A module to transfer serializable data over serial port",
-1,
SerialMethods};
// 模块初始化函数
PyMODINIT_FUNC PyInit_serial_module(void)
{
return PyModule_Create(&serialmodule);
}