含义:
TCP
传输协议是面向流的,没有数据包界限,也就是说消息无边界。客户端向服务端发送数据时,可能将一个完整的报文拆分成多个小报文进行发送,也可能将多个报文合并成一个大的报文进行发送。(TCP协议的底层,并不了解上层业务的具体定义,它会根据TCP缓冲区的实际情况进行包的划分。在业务层面认为一个完整的包,可能会被TCP拆分成多个小包进行发送,也可能把多个小的包封装成一个大的数据包进行发送,这就是所谓的TCP粘包拆包问题。)。
因此就有了拆包和粘包。 在网络通信的过程中,每次可以发送的数据包大小是受多种因素限制的,如 MTU 传输单元大小、滑动窗口等。
所以如果一次传输的网络包数据大小超过传输单元大小,那么我们的数据可能会拆分为多个数据包发送出去。 如果每次请求的网络包数据都很小,比如一共请求了
10000 次,TCP 并不会分别发送 10000 次。 TCP采用的 Nagle(批量发送,主要用于解决频繁发送小数据包而带来的网络拥塞问题)
算法对此作出了优化。
客户端发送了两个数据包P1和P2给服务端,服务端一次读取到的字节数是不确定的,可能存在以下4种情况:
(1)服务端分两次读取到了两个独立的数据包P1和P2,没有发送粘包和拆包;
(2)服务端一次读到
了两个数据包,P1和P2粘在一起,这就是TCP粘包情况;
(3)服务端分两次读取到了两个数据包,第一次读取了完整的P1包和P2包的一部分,第二次读取到了P2包的剩余部分,这被称为TCP拆包;
(4)服务端分两次读取了两个数据包,第一次读取了P1包的一部分,第二次读取到了P1包的剩余部分,这也是TCP拆包;
解决方法:
由于TCP协议底层无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,所以,这个问题只能通过上层的应用层协议设计来解决,常见方案如下:
(1)消息定长,发送方和接收方规定固定大小的消息长度,例如每个报文大小固定为200字节,如果不够,空位补空格;(消息定长法使用非常简单,但是缺点也非常明显,无法很好设定固定长度的值,如果长度太大会造成字节浪费,长度太小又会影响消息传输,所以在一般情况下消息定长法不会被采用。)
(2)在包围增加特殊字符进行分割,例如FTP协议;分隔符的选择一定要避免和消息体中字符相同,以免冲突。否则可能出现错误的消息拆分。比较推荐的做法是将消息进行编码,例如
base64 编码,然后可以选择 64 个编码字符之外的字符作为特定分隔符。
(3)自定义协议,将消息分为消息头和消息体,消息头中包含消息总长度,这样服务端就可以知道每个数据包的具体长度了,知道了发送数据包的具体边界后,就可以解决粘包和拆包问题了;
netty解决粘包拆包问题:
DelimiterBasedFrameDecoder:每个应用层数据包,都通过自定义分隔符,进行分割拆分
LineBasedFrameDecoder:每个应用层数据包,都以换行符作为分隔符,进行分割拆分。
FixedLengthFrameDecoder:每个应用层数据包的拆分都是固定长度大小
LengthFieldBasedFrameDecoder+LengthFieldPrepender:自定义消息长度。 将应用层数据包的长度,作为接收端应用层数据包的拆分依据。按照应用层数据包的大小,进行拆包。这个拆包器有个要求,应用层协议包含数据包长度。(LengthFieldPrepender:待发送消息长度写入到前几个字节)。
笔者本人自研rpc框架的编解码自定义协议:
先读取消息类型(Requst, Response), 序列化方式(原生, json 加上消息长度:防止粘包, 再根据长度读取data.
消息类型(2Byte)序列化方式 2Byte消息长度 4Byte
序列化后的Data….序列化后的Data…序列化后的Data…. 编码 protected void
encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 写入消息类型 if(msg instanceof RPCRequest){
out.writeShort(MessageType.REQUEST.getCode()); } else if(msg instanceof
RPCResponse){ out.writeShort(MessageType.RESPONSE.getCode()); } // 写入序列化方式
out.writeShort(serializer.getType()); // 得到序列化数组 byte[] serialize =
serializer.serialize(msg); // 写入长度 out.writeInt(serialize.length); // 写入序列化字节数组
out.writeBytes(serialize); } 解码 protected void decode(ChannelHandlerContext
ctx, ByteBuf in, List<Object> out) throws Exception { // 1. 读取消息类型 short
messageType = in.readShort(); // 现在还只支持request与response请求 if(messageType !=
MessageType.REQUEST.getCode() && messageType !=
MessageType.RESPONSE.getCode()){ System.out.println("暂不支持此种数据"); return; } //
2. 读取序列化的类型 short serializerType = in.readShort(); // 根据类型得到相应的序列化器 Serializer
serializer = Serializer.getSerializerByCode(serializerType); if(serializer ==
null)throw new RuntimeException("不存在对应的序列化器"); // 3. 读取数据序列化后的字节长度 int length =
in.readInt(); // 4. 读取序列化数组 byte[] bytes = new byte[length];
in.readBytes(bytes); // 用对应的序列化器解码字节数组 Object deserialize =
serializer.deserialize(bytes, messageType); out.add(deserialize); }