做tcp⽹络编程,要解析⼀批批的数据,可是数据是通过Socket连接的InputStream⼀次次读取的,读取到的不是需要转换的对象,⽽是要直接根据字节流和协议来⽣成⾃⼰的数据对象。
按照之前的编程思维,总是请求然后响应,当然Socket也是请求和响应,不过与单纯的请求响应是不同的。
这⾥Socket连接往往是要保持住的,也就是长连接,然后设置⼀个缓冲区,⽹络流不断的追加到缓冲区。然后后台去解析缓冲区的字节流。
如图所⽰,⽹络的流⼀直在传递,我们收到也许是完成的数据流,也可能是没有传递完的。这⾥就需要监视管道,不断读取管道中的流数据,然后向缓冲区追加。程序从头开始解析,如果⽬前缓冲区包含了数据,则解析,没有则放弃继续读取管道流。就算管道中包含了数据,也不⼀定包含了完成的数据。例如,100个字节是⼀个数据体,可是⽬前缓冲区内包含了120个字节,这就是说缓冲区包含了⼀条数据,但是还有没有传递完的字节流。那么就要把前100个字节拿出来解析,然后从缓冲区清除这100个字节。那缓冲区就剩下20个字节了,这些数据可能在下次流中补充完成。如何建⽴缓冲?
/**
* 全局MVB数据缓冲区 占⽤ 1M 内存 */
private static ByteBuffer bbuf = ByteBuffer.allocate(10240); /**
* 线程安全的取得缓冲变量 */
public static synchronized ByteBuffer getByteBuffer() { return bbuf; }
写⼀个Socket客户端,该客户端得到Socket连接,然后读取流,⼀直向缓冲中追加字节流,每次追加后调⽤⼀个⽅法来解析该流
public void run() {
Socket socket = GlobalClientKeep.mvbSocket; if (null != socket) { try {
// 获得mvb连接引⽤
OutputStream ops = socket.getOutputStream(); InputStream ips = socket.getInputStream(); while (true) {
if (null != ops && null != ips) { // 接收返回信息
byte[] bt = StreamTool.inputStreamToByte(ips);
ByteBuffer bbuf = GlobalCommonObjectKeep.getByteBuffer(); // 设置到缓冲区中 bbuf.put(bt);
// //////////////////////////////////////////////////////////////////////// // 拆包解析⽅法 splitByte(ops); ops.flush(); } }
} catch (Exception e) { e.printStackTrace(); }
} else {
// 如果连接存在问题,则必须重新建⽴ GlobalClientKeep.initMvbSocket(); } }
关于如何读取流,我有⼀篇博客专门讲解了所以这⾥是直接调⽤⽅法
byte[] bt = StreamTool.inputStreamToByte(ips);
那么解析⽅法是如何做的?
解析⽅法⾸先获得该缓冲中的所有可⽤字节,然后判断是否符合⼀条数据条件,符合就解析。如果符合两条数据条件,则递归
调⽤⾃⼰。其中每次解析⼀条数据以后,要从缓冲区中清除已经读取的字节信息。
/**
* @说明 拆包解析⽅法 */
public static void splitByte(OutputStream ops) { try {
ByteBuffer bbuf = GlobalCommonObjectKeep.getByteBuffer(); int p = bbuf.position(); int l = bbuf.limit();
// 回绕缓冲区 ⼀是将 curPointer 移到 0, ⼆是将 endPointer 移到有效数据结尾 bbuf.flip();
byte[] byten = new byte[bbuf.limit()]; // 可⽤的字节数量
bbuf.get(byten, bbuf.position(), bbuf.limit()); // 得到⽬前为⽌缓冲区所有的数据 // 进⾏基本检查,保证已经包含了⼀组数据 if (checkByte(byten)) { byte[] len = new byte[4];
// 数组源,数组源拷贝的开始位⼦,⽬标,⽬标填写的开始位⼦,拷贝的长度 System.arraycopy(byten, 0, len, 0, 4);
int length = StreamTool.bytesToInt(len); // 每个字节流的最开始肯定是定义本条数据的长度 byte[] deco = new byte[length]; // deco 就是这条数据体 System.arraycopy(byten, 0, deco, 0, length);
// 判断消息类型,这个应该是从 deco 中解析了,但是下⾯具体的解析内容不再啰嗦 int type = 0;
// 判断类型分类操作 if (type == 1) {
} else if (type == 2) {
} else if (type == 3) {
} else {
System.out.println(\"未知的消息类型,解析结束!\"); // 清空缓存 bbuf.clear(); }
// 如果字节流是多余⼀组数据则递归 if (byten.length > length) {
byte[] temp = new byte[bbuf.limit() - length];
// 数组源,数组源拷贝的开始位⼦,⽬标,⽬标填写的开始位⼦,拷贝的长度 System.arraycopy(byten, length, temp, 0, bbuf.limit() - length); // 情况缓存 bbuf.clear(); // 重新定义缓存 bbuf.put(temp); // 递归回调 splitByte(ops);
}else if(byten.length == length){ // 如果只有⼀条数据,则直接重置缓冲就可以了 // 清空缓存 bbuf.clear(); }
} else {
// 如果没有符合格式包含数据,则还原缓冲变量属性 bbuf.position(p); bbuf.limit(l); }
} catch (Exception e) { e.printStackTrace(); } }
代码只是⼀个参考,主要讲解如何分解缓冲区,和取得缓冲区的⼀条数据,然后清除该数据原来站的空间。
⾄于缓冲区的属性,如何得到缓冲区的数据,为什么要清空,bbuf.flip();是什么意思。下⾯来说⼀下关于ByteBuffer 的⼀下事情。
ByteBuffer 中有⼏个属性,其中有两个很重要。limit和 position。position开始在0,填充数据后等于数据的长度,⽽limit是整个缓冲可⽤的长度。bbuf.flip();之后,position直接变为0,⽽limit直接等于position。JDK源码如下:
/**
* Flips this buffer. The limit is set to the current position and then * the position is set to zero. If the mark is defined then it is * discarded. *
*
After a sequence of channel-read or put operations, invoke
* this method to prepare for a sequence of channel-write or relative * get operations. For example: *
*
** buf.put(magic); // Prepend header
* in.read(buf); // Read data into rest of buffer * buf.flip(); // Flip buffer
* out.write(buf); // Write header + data to channel
*
This method is often used in conjunction with the {@link
* java.nio.ByteBuffer#compact compact} method when transferring data from * one place to another.
** @return This buffer */
public final Buffer flip() { limit = position; position = 0; mark = -1; return this; }
这样,在position和limit之间的数据就是我们要的可⽤数据。
但是position和limit是ByteBuffer在put和get时需要的属性,所以在使⽤后要么还原,要么像上⾯代码⼀样,清除⼀些字节信息然后重置。
ByteBuffer 的get和put不是我们平常的取值和设值⼀样,他会操纵⼀些属性变化。以上就是本⽂的全部内容,希望对⼤家的学习有所帮助,也希望⼤家多多⽀持。
因篇幅问题不能全部显示,请点此查看更多更全内容