tcp粘包处理
tcp是流式传输的,是安全的, 可靠的,顺序的。
udp是数据报协议,是不可靠的。
面试中经常被问到tcp粘包是如何处理的,通过百度和自己的理解,这里做笔记记录。
如果有不对,请指正~
业务层认识tcp传输及分析方案
tcp是流式传输的,tcp协议栈可以保证传输过程的顺序,可靠性:
也就是说,发送端调用send发送,接收端肯定能按照发送顺序依次接收到。
tcp是流式传输的,协议栈只是把接收到的数据放入到对应连接fd的缓冲区中。(待从源码确定)
1:分析tcp协议栈的接收:
1:协议栈只是负责把接收到的数据放入应用层缓冲区(很大的文件传输时,可能一次放不下)。
==》自己的误区,tcp是安全的流式传输,缓冲区接收到的数据必然是完整的,有序的,只会涉及到粘包处理,不涉其他,这里要考虑大包的多次接收。
2:获取缓冲区中的包,可能是多个小包粘合在一起的大包(需要做粘包处理)
==》小包的粘包问题,需要再业务层做处理,nagle算法
3:获取缓冲区中的包,可能是一个大包拆分出的多个小包进行传输 (接收到半包)
==》默认是MTU大小,协议栈会拆包,这个应该是协议栈做了处理,收到还是一个大包,我们只需要循环接收做业务处理(并不需要组包处理)。
==》recv时,根据长度用while循环做接收,确保包的完整。(拆包后的包应该是依次接收到,从缓冲区中获取到)
4:放入缓冲区中数据的完整性,协议栈是否是一次把一整个包放进来的,应该是的(看看源码吧,协议栈是如何处理的?)
==》recv时,需要用while循环做多次的接收。
2:tcp的接收,我们需要关注的问题:
1:对粘包问题做拆包处理。(需要业务层定义协议,增加结束符,固定长度,或者指定长度)
2:对协议栈大包拆包后的数据做组包处理/持续接收(半包)。 ==》流式可靠顺序传输的
==》同一个连接,用while循环一直进行接收,应该时可以保证持续接收到的数据时这个大包的数据。
==》根据业务,需要在业务层,对大包进行自定义协议拆包后组包(用户控制发小包),或者根据长度持续接收(确保一个大包的接收完整),或者用缓冲区保存(可能多个连接同时处理的业务)等处理。
3:recv一次时,不一定能获取到一整个包。 (半包)==》应该还是只有大于MTU时拆包场景
==》同2,while多次循环接收,通过长度/校验码确保数据的完整可靠。
4:如果要关注业务层的消息机制,消息的完整性,正确性,需要业务层对消息做一定的协议处理。
3:总结及方案
1:tcp是流式安全的,传输接收到的数据必然是顺序的,有序的:
==》1:半包问题(协议栈拆包),多次while循环进行接收。
==》2:粘包问题,需要在业务层定义协议处理。
2:处理粘包问题的方案:
1:发送固定长度的字节,不够时补充特定字符如0
2:末尾终结符 如\r\n,如FTP协议
3:区分消息为头部和消息体,收到足够的数据确定包的完整性
4:混合使用在应用层做拆包和粘包的处理。(头+类型+长度+数据+尾)
实现一个tcp粘包处理的demo
这里没有考虑大包的拆包时,缓冲区的细节机制。
简单描述:
1:定义用户层协议,使用“头+data+尾部”(这里的头部和尾部自己定义比较随意)的方案,方便粘包处理以及保证数据完整性。(data数据区可以扩展协议)
2:用一个ringbuffer暂存接收到的数据(暂时不考虑半包问题,可以用多个ringbuffer保存不同的fd的缓存做处理/发送端做拆包处理,ringbuffer接收后做业务层作保处理)。
3:接收到数据后放入ringbuffer中,并根据头+尾检测ringbuffer中数据的完整性,这里只是做打印(业务处理以及组包协议定义等都可以自由扩展)
测试demo:
tcp_ringbuffer.h
/************************************* 实现针对tcp接收处理的业务ringbuffer 1:ringbuffer中的数据肯定是完整的包,然后处理粘包问题 2:作为tcp的服务端,所有的接收放入包中,来自不同的客户端和不同的消息需要扩展处理 3:如果发送很大很大的文件,tcp是如何处理的? 会是一次性一直接收吗? *************************************/ #ifndef __RINGBUFFER_H_ #define __RINGBUFFER_H_ typedef struct RINGBUFF_T{ void * data; unsigned int size; unsigned int read_pos; //数据起始位置 unsigned int write_pos; //数据终止位置 }ringbuffer_t; //创建ringbuffer ringbuffer_t * ringbuffer_create(unsigned int size); //销毁ringbuffer void ringbuffer_destroy(ringbuffer_t * ring_buffer); //往ringbuffer中存数据 写入 int ringbuffer_put(ringbuffer_t * ring_buffer, const char* buffer, unsigned int len); //判断是否是完整的数据 然后进行处理 int ringbuffer_get_len(ringbuffer_t *ring_buffer); //依赖ringbuffer_get_len 返回值申请内存,取出ring_buffer中的数据 int ringbuffer_get(ringbuffer_t * ring_buffer, char * buffer, unsigned int len); //基本接口 外部基本不用,但是函数内部有使用 //重置缓冲区 void ringbuffer_reset(ringbuffer_t * ring_buffer); //ringbuffer已经使用的内存空间的大小 int ringbuffer_use_len(ringbuffer_t * ring_buffer); //ringbuffer没有使用的内存的大小 int ringbuffer_space_len(ringbuffer_t * ring_buffer); //基本的判空和判满接口 int ringbuffer_isempty(ringbuffer_t * ring_buffer); int ringbuffer_isfull(ringbuffer_t * ring_buffer); // int get_ringbuffer_size(ringbuffer_t * ring_buffer); #endif //__RINGBUFFER_H_
tcp_ringbuffer.c
#include <stdio.h> #include <stdlib.h> #include <string.h> #include "tcp_ringbuffer.h" static inline __attribute__((const)) int is_power_of_2(unsigned long n) { return (n != 0 && ((n & (n - 1)) == 0)); } static unsigned long roundup_power_of_two(unsigned long n) { if((n & (n-1)) == 0) return n; unsigned long maxulong = (unsigned long)((unsigned long)~0); unsigned long andv = ~(maxulong&(maxulong>>1)); while((andv & n) == 0) andv = andv>>1; return andv<<1; } //创建ringbuffer ringbuffer_t * ringbuffer_create(unsigned int size) { //对入参进行校验 并且是2的次方 if (!is_power_of_2(size)) { size = roundup_power_of_two(size); } ringbuffer_t * ring_buffer; ring_buffer = (ringbuffer_t*)malloc(sizeof(*ring_buffer)); if(ring_buffer == NULL) { printf("create ringbuffer error \n"); return NULL; } ring_buffer->data = (void*)malloc(size); if(ring_buffer->data == NULL) { printf("create ringbuffer data error \n"); free(ring_buffer); return NULL; } ring_buffer->size = size; ring_buffer->read_pos = 0; ring_buffer->write_pos = 0; return ring_buffer; } //销毁ringbuffer void ringbuffer_destroy(ringbuffer_t * ring_buffer) { if(ring_buffer) { if(ring_buffer->data) { free(ring_buffer->data); ring_buffer->data = NULL; } free(ring_buffer); ring_buffer = NULL; } } // typedef struct RINGBUFF_T{ // void * data; // unsigned int size; // unsigned int read_pos; //数据起始位置 // unsigned int write_pos; //数据终止位置 // }ringbuffer_t; //往ringbuffer中存数据 写入 int ringbuffer_put(ringbuffer_t * ring_buffer, const char* buffer, unsigned int len) { if(ring_buffer->write_pos >=ring_buffer->read_pos &&(len <(ring_buffer->size - ring_buffer->write_pos +ring_buffer->read_pos))) { //进行拷贝 if(ring_buffer->size - ring_buffer->write_pos >len) { memcpy(ring_buffer->data + ring_buffer->write_pos, buffer, len); ring_buffer->write_pos += len; }else { unsigned int right_space_len = ring_buffer->size - ring_buffer->write_pos; memcpy(ring_buffer->data + ring_buffer->write_pos, buffer, right_space_len); memcpy(ring_buffer->data, buffer+right_space_len, len - right_space_len); ring_buffer->write_pos = len - right_space_len; } return 0; } if(ring_buffer->write_pos <ring_buffer->read_pos && (ring_buffer->read_pos - ring_buffer->write_pos) >len) { memcpy(ring_buffer->data + ring_buffer->write_pos, buffer, len); ring_buffer->write_pos += len; return 0; } return -1; } //判断是否是完整的数据 然后进行处理 int ringbuffer_get_len(ringbuffer_t *ring_buffer) { //对ringbuffer中的数据做判断解析 如果是完整的数据 则提取出去 if(ringbuffer_use_len(ring_buffer) < strlen("FFFF0D0A<header><tail>0D0AFEFE")) { printf("ringbuffer data is error [%d], [%ld]\n", ringbuffer_use_len(ring_buffer), strlen("FFFF0D0A<header><tail>0D0AFEFE")); return -1; } //判断是否是终结的字段 const char* end_str = "<tail>0D0AFEFE"; char check_end_str[20] = {0}; if(ring_buffer->write_pos >strlen(end_str)) { memcpy(check_end_str, ring_buffer->data+ring_buffer->write_pos - (strlen(end_str)), strlen(end_str)); }else { unsigned int left_len = ring_buffer->write_pos; memcpy(check_end_str, ring_buffer->data +ring_buffer->size - (strlen(end_str) - left_len), ring_buffer->size - (strlen(end_str) - left_len)); memcpy(check_end_str + (strlen(end_str) - left_len), ring_buffer->data, left_len); } printf("get check_end_str is %s \n", check_end_str); char * ret_addr = strstr(check_end_str, end_str); if(ret_addr == NULL) { return -1; } if(check_end_str - ret_addr != 0) { printf("DDDDD :why end string is error"); return -1; } return ringbuffer_use_len(ring_buffer); } //从ringbuffer中取数据做处理, 判断接收到的字符是否是终结符号,就可以去做处理 //取完数据后重置ringbuffer的位置 读取 int ringbuffer_get(ringbuffer_t * ring_buffer, char * buffer, unsigned int len) { //这里建立在ringbuffer_get_len 的基础上,传入入参,取出数据 int data_len = ringbuffer_use_len(ring_buffer); if(data_len >= len) { printf("para buffer is not enough space \n"); return -1; } if(ring_buffer->write_pos >ring_buffer->read_pos ) { printf("get data from ringbuffer len: [%d] \n", ring_buffer->write_pos - ring_buffer->read_pos); memcpy(buffer, ring_buffer->data + ring_buffer->read_pos, data_len); }else { memcpy(buffer, ring_buffer->data+ring_buffer->read_pos, ring_buffer->size - ring_buffer->read_pos); memcpy(buffer+ring_buffer->size - ring_buffer->read_pos, ring_buffer->data, data_len - (ring_buffer->size - ring_buffer->read_pos)); } ring_buffer->write_pos = 0; ring_buffer->read_pos = 0; return 0; } //直接从socket中读数据放入ringbuffer中也可以 int ringbuffer_get_from_dev() { return 0; } //直接从ringbuffer中取数据用socket进行发送 int ringbuffer_put_to_dev() { return 0; } void ringbuffer_reset(ringbuffer_t * ring_buffer) { ring_buffer->read_pos = ring_buffer->write_pos = 0; } int ringbuffer_use_len(ringbuffer_t * ring_buffer) { if(ring_buffer->write_pos >= ring_buffer->read_pos) { return ring_buffer->write_pos-ring_buffer->read_pos; } return ring_buffer->write_pos + ring_buffer->size - ring_buffer->read_pos; } int ringbuffer_space_len(ringbuffer_t * ring_buffer) { if(ring_buffer->write_pos >= ring_buffer->read_pos) { return ring_buffer->read_pos +(ring_buffer->size - ring_buffer->write_pos); } return ring_buffer->read_pos - ring_buffer->write_pos; } int ringbuffer_isempty(ringbuffer_t * ring_buffer) { return ringbuffer_use_len(ring_buffer) == 0? 0 :-1; } int ringbuffer_isfull(ringbuffer_t * ring_buffer) { return ringbuffer_space_len(ring_buffer) == 0? 0 :-1; } // int get_ringbuffer_size(ringbuffer_t * ring_buffer) // { // return ring_buffer->size; // }
tcp_sticky_bag.c
//在业务层实现对tcp的粘包,拆包处理逻辑 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <errno.h> #include <fcntl.h> #include <sys/types.h> #include <sys/socket.h> #include <arpa/inet.h> #include <sys/epoll.h> #include "tcp_ringbuffer.h" /********************************************************************* https://bbs.csdn.net/topics/380167545 梳理一下有关tcp数据包的定义方案: TCP的可靠性是到哪种程度,在传输过程中数据的内容或长度有没有可能出错,不管是意外还是人为,数据的内容是可能出错的。 首先LZ的问题很单纯, ===》对于粘包情况,是记录长度拆分好呢,还是按特定分隔符来拆分好。 1楼、2楼的回答, ===》通常是用固定长度,或者记录长度 3楼就问, ===》结束符的判断不受欢迎?以前的老代码用过结束符,其中难道有什么玄机 接着zhao4zhong1 ===》就给出个简单的方法base64,将二进制数据流转化成文本来,然后用'\0'作为结束符, 也就是要将二进制流中的'\0'转译掉,又懒得自己写代码,所以用了现成的base64,虽然很浪费资源,但也凑合着可以用。 单到这点上,还是个很好的答案的。 不过接下来的问题就奇怪了,竟然奇怪的问了用包头加包体几个问题。一下子引得无数人进攻。然后就是口水了。 使用分隔符和包头加包体的方案,都是为了处理数据的安全性的。 1、仅收到一个字节时怎么办。 :你用base64时仅收到一个字母而没有受到'\0'时怎么办? :你怎么办这里就怎么办 2、收到两个字节的包头比如FF FF(表示后续包体长度为65535,或者包头+包体长度65535即后续包体长度65533),但后续收到数据长度迟迟达不到要求怎么办? :你用base64时一直收到字母和数字,就是收不到'\0'时怎么办? :你怎么办这里就怎么办 3、比如约定每个包最大长度为0x1000,但收到的包头中长度超过0x1000怎么办? :base64中可能出现的字符范围是固定的,但收到的数据里有超过范围的字符怎么办?比如收到个值为00000001的字节 :对于这种数据出错的情况,你怎么办的这里就这么办 4、应用需要发送超过65535个字节的包怎么办 :base64的字符只有64种来表示数据,对于一个值>64的字节要怎么表示? :base64可以把数据分成很多个字符来表示,这里为什么就不能把大数据分成很多个包后再传? ……………………………… 接下来大多都是无谓的口水了(我板凳看了2小时的热闹) ……………………………… 还是回到LZ这个很单纯的问题上来吧,是记录长度拆分好呢,还是按特定分隔符来拆分好? 从计算机逻辑上来讲,其实没什么区别,都是要有校验有分割,而从人的思维逻辑上讲,我还是喜欢记录长度的方式。 因为如果用分隔符,就要对每个字节都进行转译,发送前转译,收到后还要转译回来,虽然对于编程来说,就是一个简单的循环语句,但在心理上,却有一种对所有数据都要进行变换的这种很麻烦的感觉。 而按长度来,只要处理少数几个字节,真正的数据体是原样copy的,虽然从编程语句上来说没什么变化,但从心理上,却有一种数据就是原样搬来搬去,这样清爽的感觉。 所以,我建议大家用长度。 qq120848369在22楼说的就是大众格式,LZ在26楼也赞同了。 --------原文: 头和尾基本用来做校验, 不是拿来做边界的. 头+类型+长度+数据+尾, 这种结构就可以. 拆包就是: 检验头, 然后拆出类型+长度, 然后根据长度拆数据, 然后检验尾巴. -------- 顺便再啰嗦一下,我自己现在这个应用中,只传输控制命令,而且是加密成128字节的数据段,加解密的算法也是自己写的,本身就有校验,上层逻辑还有数据是否合法的判断, 所以在网络这块什么多余的都不用考虑,就一次128字节的收发,超时设为10秒,有错就重来,连错3次就重连,连续3次重连就拒绝此IP五分钟,逻辑上单纯又直白。 *****************************************************************************/ /**************************************************************************** 一直不理解面试时为什么会问到tcp粘包相关知识,以及业务层对tcp粘包需要做的处理到底有哪些。 然后通过探索,总结,按照自己的思路理解一下: 1:协议栈只是负责把接收到的数据放入应用层缓冲区 ==》自己的误区,tcp是安全的流式传输,缓冲区接收到的数据必然是完整的,有序的,只会涉及到粘包处理,不涉其他,这里要考虑大包的多次接收。 2:获取缓冲区中的包,可能是多个小包粘合再一起的大包 ==》小包的粘包问题,需要再业务层做处理,nagle算法 3:获取缓冲区中的包,可能是一个大包拆分出的多个小包进行传输 ==》默认是MTU大小,协议栈会拆包,这个应该是协议栈做了处理,收到还是一个大包,我们只需要循环接收做业务处理(并不需要组包处理)。 ==》recv时,根据长度用while循环做处理。 4:放入缓冲区中数据的完整性,协议栈是否是一次把一整个包放进来的,会有接收到一个大包,先放入缓冲区一半,有位置再放入下一半?(看看源码吧,协议栈是如何处理的?) ==》recv时,需要用while循环做多次的接收。 综上所述,我们需要关注的问题有: 1:对粘包问题做拆包处理。 2:对协议栈大包拆包后的数据做组包处理。 ==》协议栈会做,业务层不需要关注。 ==》流式可靠顺序传输的 3:recv一次时,不一定能获取到一整个包。 1:recv缓冲区中的数据,一次不一定全获取到包数据,需要根据返回值做处理。 ==》用while+业务实现包的完整性 2:tcp是流式的,安全的,所以面对的是字符流,能保证各个字节按顺序到达,不会乱序。 ==》直接接收,协议栈的拆包多次发送我们不关注。 4:如果要关注业务层的消息机制,消息的完整性,正确性,需要对消息做一定的协议处理。 思考一下处理方案: 1:发送固定长度的字节,不够时补充特定字符如0 2:末尾终结符 如\r\n,如FTP协议 3:区分消息为头部和消息体,收到足够的数据确定包的完整性 4:再应用层做拆包和粘包的处理。 可以参考netty对包进行处理。 总结:tcp是流式安全的,传输接收到的数据必然是顺序的,有序的,但是会有粘包,recv一次性接收不完全的现象,需要我们处理。 *****************************************************************************/ /************************************* 通过测试demo,实现对tcp消息的接收处理: 1:作为服务端,接收客户端主动上报的各种消息。 2:自定义协议,做消息构造,方便消息完整性接收,数据校验,粘包处理 实现方案: 1:实现自定义协议的业务 2:增加tcp服务端业务,进行demo验证。 *************************************/ //触发accept 连接相关的处理 int demo_accept_exec(int epfd, int listenfd); //触发数据接收的相关处理 int demo_recv_exec(ringbuffer_t *ringbuff, int epofd, int connectfd); //创建服务端的fd int demo_init_socket(); //加入epoll 真正的处理入口 int demo_server_exec(int listenfd); //处理接收到的粘包报文 int check_recv_data(char *data, int len); int SetNonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL, 0); if (flags < 0) return flags; flags |= O_NONBLOCK; if (fcntl(fd, F_SETFL, flags) < 0) return -1; return 0; } //创建服务端socket fd int demo_init_socket() { int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd < 0) { printf("create socket fd error. \n"); return -1; } SetNonblock(fd); struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_port = htons(9000); server_addr.sin_addr.s_addr = htonl(INADDR_ANY); //设置端口可重用 也可以设置缓冲区大小等 int optval = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(int)); bind(fd, (struct sockaddr*)&server_addr, sizeof(server_addr)); if(listen(fd, 20) < 0) { printf("listen sock fd error. \n"); } printf(" listen 9000 port,create socket fd is %d \n", fd); return fd; } //要么用accpet做连接监听,开始处理, //用epoll实现连接与接收的消息通信。 int demo_server_exec(int listenfd) { //创建epfd 并把serverfd加入epfd中 int epfd = -1; { epfd = epoll_create(1); if(epfd == -1) { printf("create epoll error \n"); close(listenfd); return -1; } //把我们的fd加入到epoll中,只监听可读事件,连接处理 struct epoll_event event; event.data.fd = listenfd; event.events = EPOLLIN|EPOLLET; if(epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &event) == -1) { printf("add listenfd to epoll error \n"); close(listenfd); close(epfd); return -1; } } //创建缓冲区 ringbuffer_t * ringbuff = ringbuffer_create(1024*4); if(ringbuff == NULL) { printf("create ringbuff error, ringbuff is null \n"); close(listenfd); close(epfd); return -1; } struct epoll_event event_wait[1024]; int nready = 0; printf("start to exex server: \n"); while(1) { nready = epoll_wait(epfd, event_wait, 1024, 1000); if(nready < 0) { if (errno == EINTR)// 信号被中断 { printf("epoll_wait return and errno is EINTR \n"); continue; } printf("epoll_wait error. \n"); break; }else if(nready == 0) // 超时,继续 { // printf("epoll_wait timeout \n"); continue; } //已经准备就绪的fd for(int i=0; i< nready; i++) { //处理可读 if(event_wait[i].events & EPOLLIN) { if(event_wait[i].data.fd == listenfd) { demo_accept_exec(epfd, listenfd); }else { demo_recv_exec(ringbuff, epfd, event_wait[i].data.fd); } } //这种情况下应该从epoll中移除,并关闭fd //这里如果不是客户端发完就终止的业务,我们是不是不del,只有异常时del if (event_wait[i].events & (EPOLLERR | EPOLLHUP)) { printf("epoll error [EPOLLERR | EPOLLHUP].\n"); epoll_ctl(epfd, EPOLL_CTL_DEL, event_wait[i].data.fd, NULL); close(event_wait[i].data.fd); } } } ringbuffer_destroy(ringbuff); return 0; } //开始进行accept处理并且把连接fd放入epoll中 int demo_accept_exec(int epfd, int listenfd) { struct sockaddr_in cliaddr; socklen_t clilen = sizeof(cliaddr); //进行accept接收,因为是et模式,所以需要循环接收 int clifd = -1; while((clifd = accept(listenfd, (struct sockaddr*)&cliaddr, &clilen)) >= 0) { // if(clifd == -1) // { // if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) // { // /* 资源暂时不可读,再来一遍 */ // break; //这里应该重复读 // } // //这里其实是异常的 // printf(" accept error: [%s]\n", strerror(errno)); // return -1; // } SetNonblock(clifd); //加入epoll struct epoll_event clifd_event; clifd_event.data.fd = clifd; clifd_event.events = EPOLLIN | EPOLLET; //ET模式要循环读 if(epoll_ctl(epfd, EPOLL_CTL_ADD, clifd, &clifd_event) == -1) { printf(" epoll ctl error . \n"); close(clifd); return -1; } printf("accept success. [%s:%d] connected \n", inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port)); } return 0; } //这里只是处理了最常规的accept成功加入epoll以及有数据时的加入 // EPOLLERR 是read或者write时知道对端关闭了 // FFFF0D0A<header>len|data<tail>0D0AFEFE int demo_recv_exec(ringbuffer_t *ringbuff, int epfd, int connectfd) { //因为这里用了et模式,所以要循环进行读 //定义一个缓冲区,对数据进行持续接收以及处理 printf("start to recv \n"); char recv_data [1024] = {0}; while(1) { //ET模式要一次性读完,读到缓冲区没有数据 //定义一个内存,每次读取,放入缓冲区中 int datalen = -1; //读取所有的数据 while((datalen = read(connectfd, recv_data, 1024)) > 0) { printf("recv data is [%s] [%lu] [%d]\n", recv_data, strlen(recv_data), datalen); if(ringbuffer_put(ringbuff, recv_data, datalen)== 0) { printf("put to ringbuff success \n"); } memset(recv_data, 0 , 1024); } //应该是对端关闭 关闭fd,从epfd中移除该fd //这里可以改为监听可写 写完后删除 if(datalen == 0) { if(epoll_ctl(epfd, EPOLL_CTL_DEL, connectfd, 0) == -1) { printf("client disconnection error from epoll \n"); }else { printf("client disconnected success,clientfd is [%d] \n", connectfd); } close(connectfd);//这里做epoll的删除和fd的close,客户端在发送时会重连 break; } if(datalen < 0) //接收到最后一个报文的返回,这里循环接收必然触发 { printf("DDDDD recv data len <0 \n"); if (errno == EWOULDBLOCK && errno == EINTR) //不做处理 { continue; } // if(epoll_ctl(epfd, EPOLL_CTL_DEL, connectfd, 0) == -1) // { // printf("client 1 disconnection error from epoll \n"); // }else // { // printf("client 1 disconnected success,clientfd is [%d] \n", connectfd); // } // close(connectfd); break; } } //获取ringbuff中的数据然后进行显示 int ringbuff_data_len = ringbuffer_get_len(ringbuff); if(ringbuff_data_len == -1) { printf("get ringbuff dada error. \n"); return 0; } printf("get ringbuff data len is %d \n", ringbuff_data_len); char * data = (char*)malloc(ringbuff_data_len +1); memset(data, 0, ringbuff_data_len+1); ringbuffer_get(ringbuff, data, ringbuff_data_len+1); printf("get all data is [%ld][%s] \n", strlen(data), data); check_recv_data(data, ringbuff_data_len); //这里对取出来的数据做业务处理 free(data); data =NULL; return 0; //每次读取完一次缓冲区的数据后,对读取到的数据做粘包处理的业务逻辑,应该不涉及丢包的 } int exec_one_package_data(char* data, int len) { char * print_data = malloc(len+1); memset(print_data, 0, len+1); memcpy(print_data, data, len); printf("111 one package data is [%s][%ld][%d] \n", print_data, strlen(print_data), len); free(print_data); return 0; } //对特定格式的数据做业务处理,单包处理 //这里的数据必然是有我们的终结符尾部的数据, int exec_one_data(char* data, int len) { //为了打印 char * print_data = malloc(len+1); memset(print_data, 0, len+1); memcpy(print_data, data, len); printf("one package data is [%s][%ld][%d] \n", print_data, strlen(print_data), len); free(print_data); //真正的业务处理,1:正确提取到内部数据 //我们的业务应该是只走这个 const char * start_str = "FFFF0D0A<header>"; char * ops; ops = strstr(data, start_str); if(ops == data) { //正确的一帧数据,去做正常业务处理 exec_one_package_data(data + strlen("FFFF0D0A<header>"), len -strlen("FFFF0D0A<header><tail>0D0AFEFE")); } if(ops == NULL) { printf("package data is error, not find start data. \n"); for(int i=0; i<len; i++) { printf("%c", *(data+i)); } printf("\n"); } if(ops != data) { printf("recv package data is error. "); for(int i=0; i<len; i++) { printf("%c", *(data+i)); } printf("\n"); exec_one_package_data(ops+ strlen("FFFF0D0A<header>"), len - strlen("FFFF0D0A<header><tail>0D0AFEFE") - (ops-data)); } //至于中间包含header,通过日志分析把 return 0; } //客户端正常逻辑应该是 连接一次 然后发送,发送完后断开。 //如果客户端是长连接 连接一次,一直发送,上面是否能满足条件? //如何实现tcp的长连接,以及如何用session管理 //也可以用mod,在recv后,监听可写,read后监听可读 //如果依赖于tcp的拆包,我们在业务层没有做拆包处理,这里直接循环接收即可, //然后校验头和尾,做完整的包校验即可 //对接收到的数据做校验,拆包处理 int check_recv_data(char *data, int len) { if(len <= strlen("FFFF0D0A<header><tail>0D0AFEFE")) { printf("get data from ringbuff is error, and throw away data : %s.", data); return -1; } //对接收到的数据做拆包处理 int datalen = -1; char * onedata; char * ops; char * temp_data = data; //因为包含着开始的头,所以这里用尾处理更方便 // const char * start_str = "FFFF0D0A<header>"; // while((ops = strstr(temp_data, start_str)) != NULL) // { // datalen = ops - temp_data; // exec_one_data(temp_data, datalen); // temp_data = ops; // } const char * end_str = "<tail>0D0AFEFE"; while((ops = strstr(temp_data, end_str)) != NULL) { datalen = ops - temp_data +strlen(end_str); exec_one_data(temp_data, datalen); temp_data = ops+strlen(end_str); } //有剩下的数据 if(temp_data - data != len) { printf("there is loss data: [%ld][%s] \n", strlen(temp_data), temp_data); } return 0; } //扔到ringbuff,然后每一次扔完,需要做校验处理 //暂定的tcp包的协议是 FFFF0D0A<header>len|data<tail>0D0AFEFE // //1:接收到对端的数据 //2:放入缓冲区中,可以是环形缓冲区 //3:对缓冲区中的数据做校验处理 //4:缓冲区对完整数据做业务处理,不完整的数据做等待或者丢弃处理 int main() { int fd = demo_init_socket(); if(fd < 0) { printf("create fd error is %d \n", fd); return -1; } printf("create fd success is %d \n", fd); //开始循环进行处理 demo_server_exec(fd); printf("main func end\n"); return 0; } //这里假设报文数据够小,每一次对应的缓冲区都能取完,取到异常的数据那就丢弃 //不然,这里的方案就会复杂,需要管理每一个fd对应的缓冲区的ringbuffer