/******************************************************************************************* * @file backend.c * * @brief 会话层(Session Layer). 后端服务器管理,帧接收. * * (c) Copyright 2021, Shandong Huali electromechanical Co., Ltd.. * This is protected by international copyright laws. Knowledge of the * source code may not be used to write a similar product. This file may * only be used in accordance with a license and should not be redistributed * in any way. We appreciate your understanding and fairness. * * * @author Simon * @date Created: 2021.06.16-T16:08:11+0800 * *******************************************************************************************/ #include "rtthread.h" #include "stdbool.h" #include "lwip/opt.h" #include "lwip/sockets.h" #include "lwip/sys.h" #include "lwip/netdb.h" #include "lwip/netif.h" #define DBG_TAG "wcs.srv" #define DBG_LVL DBG_INFO #include /* Description */ #ifndef BE_SOCK_PORT #define BE_SOCK_PORT 2504 #endif /* 发送缓存最大字节 */ #ifndef BE_TX_MAX_SIZE #define BE_TX_MAX_SIZE 1024 #endif #define BE_SOCK_TO 10 /* socket超时时间10ms */ #define BE_BACKLOG 5 /* socket backlog */ /* 帧头 */ #define FRAME_HEAD_TAG1 0X02 #define FRAME_HEAD_TAG2 0XFD /* 帧尾 */ #define FRAME_TAIL_TAG1 0X03 #define FRAME_TAIL_TAG2 0XFC /* 帧最短大小 */ #define FRAME_MIN_SIZE 24 /** * 错误类型 * @brief 错误类型定义 */ enum { EOK, /* 无错误 */ ERR, /* 错误 */ ETO, /* 超时 */ }; /** * backend_session_t * @brief 后端会话数据 */ typedef struct { rt_thread_t server_task; /* 任务句柄 */ rt_thread_t client_task; /* 任务句柄 */ int server_fd; /* 服务端socket */ int client_fd; /* 客户端socket */ uint32_t recv_bufsz; /* 接收缓存大小 */ uint8_t *recv_buffer; /* 接收缓存 */ uint32_t cur_recv_len; /* 现接收长度 */ int (*parser_fun)(void *, int); /* 帧解析函数 */ rt_mq_t tx_buffer; /* 发送缓存 */ rt_mutex_t tx_locker; /* 发送互斥量 */ rt_mutex_t task_locker; /* 线程互斥量 */ }backend_session_t; static backend_session_t backend = {0}; /** * @funtion be_is_link_up * @brief 是否接入网络 * @Author Simon * @DateTime 2021.06.16-T16:10:20+0800 * * @return 1-是,0-否 */ static int be_is_link_up(void) { struct netif *netif = netif_find("e0"); if(netif) { if(netif_is_link_up(netif)) { if(netif->ip_addr.addr != IPADDR_ANY) { return 1; } } } return 0; } /** * @funtion be_tx_lock * @brief 发送互斥锁 * @Author Simon * @DateTime 2021.06.16-T16:10:55+0800 * */ static void be_tx_lock(void) { /* is in thread context */ // if (__get_CONTROL()) { rt_mutex_take(backend.tx_locker, RT_WAITING_FOREVER); } } /** * @funtion be_tx_unlock * @brief 发送互斥锁解锁 * @Author Simon * @DateTime 2021.06.16-T16:11:20+0800 * */ static void be_tx_unlock(void) { /* is in thread context */ // if (__get_CONTROL()) { rt_mutex_release(backend.tx_locker); } } /** * @funtion be_server_close * @brief 关闭服务器 * @Author Simon * @DateTime 2021.06.16-T16:11:37+0800 * * @param be 会话 */ static void be_server_close(backend_session_t *be) { closesocket(be->server_fd); be->server_fd = -1; } /** * @funtion be_server_create * @brief 创建服务器 * @Author Simon * @DateTime 2021.06.16-T16:11:52+0800 * * @param be 会话 * @return EOK-成功, 负数-失败 */ static int be_server_create(backend_session_t *be) { struct sockaddr_in addr; int opt = 1; /* 申请socket */ if ((be->server_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { LOG_E("create socket failed"); return -ERR; } /* 启用SO_REUSEADDR 地址重用 */ setsockopt(be->server_fd, SOL_SOCKET, SO_REUSEADDR, (const void *)&opt, sizeof(opt)); /* bind addr */ addr.sin_family = AF_INET; addr.sin_port = htons(BE_SOCK_PORT); addr.sin_addr.s_addr = INADDR_ANY; memset(&(addr.sin_zero), 0, sizeof(addr.sin_zero)); if (bind(be->server_fd, (struct sockaddr *) &addr, sizeof(struct sockaddr)) == -1) { LOG_E("bind socket[%d] failed", be->server_fd); return -ERR; } /* 监听 */ if (listen(be->server_fd, BE_BACKLOG) == -1) { LOG_E("listen socket[%d] failed", be->server_fd); return -ERR; } LOG_I("WCS server start successfully"); return EOK; } /** * @funtion be_client_close * @brief 关闭客服端 * @Author Simon * @DateTime 2021.06.16-T16:12:57+0800 * * @param be 会话 */ static void be_client_close(backend_session_t *be) { /* close connection */ closesocket(be->client_fd); be->client_fd = -1; } /** * @funtion be_send_to_client * @brief 发送数据到客户端 * @Author Simon * @DateTime 2021.06.16-T16:13:20+0800 * * @param be 会话 */ static void be_send_to_client(backend_session_t *be) { int length; static uint8_t tx_buffer[BE_TX_MAX_SIZE]; while (1) { memset(tx_buffer, 0, sizeof(tx_buffer)); be_tx_lock(); /* get buffer from ringbuffer */ length = rt_mq_recv(be->tx_buffer, tx_buffer, BE_TX_MAX_SIZE, 10); be_tx_unlock(); /* do a tx procedure */ if (length > 0) { send(be->client_fd, tx_buffer, length, 0); } else break; } } /** * @funtion be_client_getchar * @brief 从客户端socket获取1字节 * @Author Simon * @DateTime 2021.06.16-T16:13:51+0800 * * @param be 会话 * @param ch 字节指针 * @param timeout 超时时间ms * @return EOK-成功, -ETO-超时, -ERR-错误 */ static int be_client_getchar(backend_session_t *be, char *ch, int timeout) { int result = EOK; int to = 0; while (1) { result = recv(be->client_fd, ch, 1, 0); if(result > 0) { break; } else { int err = 0; err = errno; if(err == EINTR || err == EWOULDBLOCK || err == EAGAIN) { to += BE_SOCK_TO; if(to >= timeout) { return -ETO; } } else { LOG_D("socket recv error code[%d]", err); return -ERR; } } } return EOK; } /** * @funtion be_readline * @brief 从客户端socket获取1帧数据 * @Author Simon * @DateTime 2021.06.16-T16:15:19+0800 * * @param be 会话 * @return 0-未收到数据, 负数-发生错误, 正数-帧长度 */ static int be_readline(backend_session_t *be) { int read_len = 0; char ch = 0, last_ch = 0; bool is_full = false; bool is_newline = false; int rc = 0; memset(be->recv_buffer, 0x00, be->recv_bufsz); be->cur_recv_len = 0; while (be->client_fd >= 0) { rc = be_client_getchar(be, &ch, 10); if(rc != 0) { memset(be->recv_buffer, 0x00, be->recv_bufsz); be->cur_recv_len = 0; if(rc == -ETO) { rc = 0; } return rc; } /* is newline */ if(ch == FRAME_HEAD_TAG2 && last_ch == FRAME_HEAD_TAG1) { be->recv_buffer[read_len++] = last_ch; /* push last ch[first head tag] */ is_newline = true; } /* copy body */ if(is_newline) { if (read_len < be->recv_bufsz) { be->recv_buffer[read_len++] = ch; be->cur_recv_len = read_len; } else { is_full = true; } } /* is end */ if (read_len > FRAME_MIN_SIZE && ch == FRAME_TAIL_TAG2 && last_ch == FRAME_TAIL_TAG1) { if (is_full) { LOG_E("read line failed. The line data length is out of buffer size(%d)!", be->recv_bufsz); memset(be->recv_buffer, 0x00, be->recv_bufsz); be->cur_recv_len = 0; return 0; } break; } last_ch = ch; } if(read_len) { LOG_D("recv frame"); LOG_HEX(DBG_TAG, 16, be->recv_buffer, read_len); } return read_len; } /** * @funtion be_set_parser * @brief 设置数据帧解析函数 * @Author Simon * @DateTime 2021.06.16-T16:17:00+0800 * * @param parser_fun 解析函数 */ void be_set_parser(int (*parser_fun)(void *, int)) { backend.parser_fun = parser_fun; } void be_send(void *buf, int sz) { LOG_D("send frame"); LOG_HEX(DBG_TAG, 16, buf, sz); send(backend.client_fd, buf, sz, 0); } /** * @funtion be_task * @brief 后端任务 * @Author Simon * @DateTime 2021.06.16-T16:21:10+0800 * * @param arg 参数 */ static void be_server(void *arg) { struct sockaddr_in addr; socklen_t addr_size; struct timeval tm; tm.tv_sec = 1; tm.tv_usec = 0; while(be_is_link_up()) { rt_thread_delay(10); } while (1) { if(backend.server_fd < 0) { while(be_server_create(&backend) < 0) { be_server_close(&backend); rt_thread_delay(1000); } } else { int new_clinet_fd = -1; // LOG_I("waiting for connection"); /* grab new connection */ if ((new_clinet_fd = accept(backend.server_fd, (struct sockaddr *) &addr, &addr_size)) == -1) { continue; } if(new_clinet_fd >= 0) { LOG_I("new wcs client(%s:%d) connection.", inet_ntoa(addr.sin_addr), addr.sin_port); setsockopt(new_clinet_fd, SOL_SOCKET, SO_RCVTIMEO, &tm, sizeof(tm)); rt_mutex_take(backend.task_locker, RT_WAITING_FOREVER); if(backend.client_fd >= 0) { be_client_close(&backend); } backend.client_fd = new_clinet_fd; rt_mutex_release(backend.task_locker); } } } } static void be_client(void *arg) { int recv_sz; while (1) { if(backend.client_fd >= 0) { rt_mutex_take(backend.task_locker, RT_WAITING_FOREVER); /* try to send all data in tx ringbuffer */ be_send_to_client(&backend); /* do a rx procedure */ recv_sz = be_readline(&backend); if (recv_sz > 0) { if(backend.parser_fun) backend.parser_fun(backend.recv_buffer, recv_sz); } else if(recv_sz < 0) { /* close connection */ be_client_close(&backend); } rt_mutex_release(backend.task_locker); } else { rt_thread_mdelay(10); } } } /** * @funtion be_init * @brief 后端初始化 * @Author Simon * @DateTime 2021.06.16-T16:21:29+0800 * * @return 0-成功 */ static int be_init(void) { backend.recv_bufsz = 1024; backend.recv_buffer = rt_malloc(backend.recv_bufsz); backend.tx_buffer = rt_mq_create("wcs_tx", BE_TX_MAX_SIZE, 3, RT_IPC_FLAG_FIFO); backend.tx_locker = rt_mutex_create("wcs_lock", RT_IPC_FLAG_FIFO); backend.task_locker = rt_mutex_create("wcs_tlock", RT_IPC_FLAG_FIFO); backend.client_fd = -1; backend.server_fd = -1; backend.server_task = rt_thread_create("server", be_server, &backend, 4096, 11, 10); if (backend.server_task) { rt_thread_startup(backend.server_task); } backend.client_task = rt_thread_create("client", be_client, &backend, 4096, 12, 10); if (backend.client_task) { rt_thread_startup(backend.client_task); } return 0; } INIT_APP_EXPORT(be_init);