`

nio实现Socket长连接和心跳

 
阅读更多

       前段时间用bio方式,也就是传统io实现了socket的长连接和心跳,总觉着服务端开启多线程管理socket连接的方式过于消耗资源,数据并发的情况下可能会影响到性能,因此就尝试使用nio改进原来的代码。

       然而改进的过程却不像我起初设想的那般容易,可以说一波三折,原因主要是nio读写都是字节流,LZ一开始依然通过ObjectOutputStream.writeObject直接向Socket服务端发送数据,然而问题出现了,每次从ByteBuffer解析出来字节流都不一样,LZ使出浑身解数,一个字节一个字节的读取啊,问题没有了,可是由于是长连接,数据怎么解析啊,查资料,找大神,最后一个网友说有可能是粘包和分包的问题,一时晕菜,LZ网络可是渣渣啊,行吧,恶补一番,想了解的童鞋可以看看这个。http://blog.csdn.net/sunmenggmail/article/details/38952131

       实现原理就像很多协议那样,自定义一套传输协议,比如消息长度(int型,4个字节)+消息体的方式,根据解析的消息长度定长解析消息内容,虽然最后证明LZ的问题不是由于粘包和分包造成的,但是LZ就这样歪打正着,给实现了!!!数据不正常的问题后来通过DataOutputStream和DataInputStream的方式也得到了解决。

       废话多了,帖代码。

服务端:

package com.feng.test.longconnection1;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import org.apache.commons.lang.ArrayUtils;

/**
 * 
 * @author songfeng
 * @version 1.0
 * @since 2015-10-24
 * @category com.feng.test.longconnection
 *
 */
public class Server
{
	private Map<String, Long> heatTimeMap = new HashMap<String, Long>();
	
	public Server(int port)
	{
		Selector selector = null;
		ServerSocketChannel serverChannel = null;
		try
		{
			//获取一个ServerSocket通道
	        serverChannel = ServerSocketChannel.open();
	        serverChannel.configureBlocking(false);
	        serverChannel.socket().bind(new InetSocketAddress(port));
	        //获取通道管理器
	        selector = Selector.open();
	        //将通道管理器与通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,
	        //只有当该事件到达时,Selector.select()会返回,否则一直阻塞。
	        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

			while (selector.select() > 0)
			{
				//选择注册过的io操作的事件
				Iterator<SelectionKey> it = selector.selectedKeys().iterator();
				while (it.hasNext())
				{
					SelectionKey readyKey = it.next();
					//删除已选key,防止重复处理
					it.remove();
					if (readyKey.isAcceptable())
					{
						ServerSocketChannel serverSocketChannel = (ServerSocketChannel) readyKey.channel();  
						SocketChannel socketChannel = serverSocketChannel.accept();  

						socketChannel.configureBlocking(false);
						// 连接成功后,注册接收服务器消息的事件
						socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
					}
	                else if(readyKey.isReadable())
					{ 
						SocketChannel socketChannel = (SocketChannel)readyKey.channel();
						
						Object obj = receiveData(socketChannel);
						String msg = "Server back:";
						if(obj instanceof String)
						{
							String id = obj.toString().split(",")[0];
							if(heatTimeMap.get(id) != null 
								&& System.currentTimeMillis() - heatTimeMap.get(id) > 5000)
							{
								socketChannel.socket().close();
							}
							else
							{
								heatTimeMap.put(id, System.currentTimeMillis());
							}
							long time = System.currentTimeMillis();
							msg += time + "\n";
							sendData(socketChannel, msg);
						}
						else if(obj instanceof Pojo)
						{
							msg += ((Pojo)obj).getName() + "\n";
							sendData(socketChannel, msg);
						}
						
					}
				}
			}
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
			try
			{
				selector.close();
				if(serverChannel != null)
				{
					serverChannel.close();
				}
			}
			catch (Exception e)
			{
				e.printStackTrace();
			}
		}
	}
	
	private static Object receiveData(SocketChannel socketChannel)
	{
		Object obj = null;
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		ByteBuffer intBuffer = ByteBuffer.allocate(4);
		ByteBuffer objBuffer = ByteBuffer.allocate(1024);
		int size = 0;
		int sum = 0;
		int objlen = 0;
		byte[] bytes = null;
		try
		{
			while((size = socketChannel.read(intBuffer)) > 0)
			{
				intBuffer.flip();
				bytes = new byte[size];
				intBuffer.get(bytes);
				baos.write(bytes);
				intBuffer.clear();
				if(bytes.length == 4)
				{
					objlen = bytesToInt(bytes,0);
				}
				if(objlen > 0)
				{
					byte[] objByte = new byte[0];
					while(sum != objlen)
					{
						size = socketChannel.read(objBuffer);
						if(size > 0)
						{
							objBuffer.flip();
							bytes = new byte[size];
							objBuffer.get(bytes,0,size);
							baos.write(bytes);
							objBuffer.clear();
							objByte = ArrayUtils.addAll(objByte, bytes);
							sum += bytes.length;
						}
					}
					obj = ByteToObject(objByte);
					break;
				}
			}
            
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
			try
			{
				baos.close();
			}
			catch (Exception e)
			{
				e.printStackTrace();
			}
		}
		return obj;
	}

	private static void sendData(SocketChannel socketChannel,Object obj)
	{
		byte[] bytes = ObjectToByte(obj);
		
		ByteBuffer buffer = ByteBuffer.wrap(bytes);
		try
		{
			socketChannel.write(buffer);
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
	}
	
	 /** 
     * byte数组中取int数值,本方法适用于(低位在前,高位在后)的顺序。
     *  
     * @param ary 
     *            byte数组 
     * @param offset 
     *            从数组的第offset位开始 
     * @return int数值 
     */  
	public static int bytesToInt(byte[] ary, int offset) {
		int value;	
		value = (int) ((ary[offset]&0xFF) 
				| ((ary[offset+1]<<8) & 0xFF00)
				| ((ary[offset+2]<<16)& 0xFF0000) 
				| ((ary[offset+3]<<24) & 0xFF000000));
		return value;
	}
	
	public static Object ByteToObject(byte[] bytes)
	{
		Object obj = null;
		try
		{
			// bytearray to object
			ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
			ObjectInputStream oi = new ObjectInputStream(bi);
			obj = oi.readObject();
			bi.close();
			oi.close();
		}
		catch (Exception e)
		{
			//e.printStackTrace();
		}
		return obj;
	}
	
	public static byte[] ObjectToByte(Object obj)
	{
		byte[] bytes = null;
		try
		{
			// object to bytearray
			ByteArrayOutputStream bo = new ByteArrayOutputStream();
			ObjectOutputStream oo = new ObjectOutputStream(bo);
			oo.writeObject(obj);
			bytes = bo.toByteArray();
			bo.close();
			oo.close();
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		return bytes;
	}
	
	public static void main(String[] args)
	{
		Server server = new Server(55555);
	}
}

 客户端:

package com.feng.test.longconnection1;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;

/**
 * 
 * @author songfeng
 * @version 1.0
 * @since 2015-10-24
 * @category com.feng.test.longconnection
 *
 */
public class Client
{
	private Socket socket;
	
	private String ip;
	
	private int port;
	
	private String id;
	
	DataOutputStream dos;
	
	DataInputStream dis;
	
	public Client(String ip, int port,String id)
	{
		try
		{
			this.ip = ip;
			this.port = port;
			this.id = id;
			this.socket = new Socket(ip, port);
			//this.socket.setKeepAlive(true);
			dos = new DataOutputStream(socket.getOutputStream());
			dis = new DataInputStream(socket.getInputStream());
			new Thread(new heartThread()).start();
			new Thread(new MsgThread()).start();
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
	}
	
	public void sendMsg(Object content)
	{
		try
		{
            int len = ObjectToByte(content).length;  
  
            ByteBuffer dataLenBuf = ByteBuffer.allocate(4);  
            dataLenBuf.order(ByteOrder.LITTLE_ENDIAN);  
            dataLenBuf.putInt(0, len);  
            dos.write(dataLenBuf.array(), 0 , 4);
            dos.flush();
			dos.write(ObjectToByte(content));
			dos.flush();
		}
		catch (Exception e)
		{
			e.printStackTrace();
			closeSocket();
		}
	}
	
	public void closeSocket()
	{
		try
		{
			socket.close();
			dos.close();
			dis.close();
		}
		catch (IOException e)
		{
			e.printStackTrace();
		}
	}
	
	public static byte[] ObjectToByte(Object obj)
	{
		byte[] bytes = null;
		try
		{
			// object to bytearray
			ByteArrayOutputStream bo = new ByteArrayOutputStream();
			ObjectOutputStream oo = new ObjectOutputStream(bo);
			oo.writeObject(obj);
			bytes = bo.toByteArray();
			bo.close();
			oo.close();
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		return bytes;
	}
	
	public static Object ByteToObject(byte[] bytes)
	{
		Object obj = null;
		try
		{
			// bytearray to object
			ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
			ObjectInputStream oi = new ObjectInputStream(bi);
			obj = oi.readObject();
			bi.close();
			oi.close();
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		return obj;
	}

	class heartThread implements Runnable
	{
		@Override
		public void run()
		{
			while(true)
			{
				try
				{
					Thread.sleep(1000);
					long time = System.currentTimeMillis();
					//System.out.println("client send:" + time);
					sendMsg("Client" + id + "," + time);
				}
				catch (Exception e)
				{
					e.printStackTrace();
				}
			}
		}
	}
	
	class MsgThread implements Runnable
	{
		@Override
		public void run()
		{
			int temp;
			while(true)
			{
				try
				{
					if(socket.getInputStream().available() > 0)
					{
						byte[] bytes = new byte[1024];  
				        int len = 0;
						while((char)(temp = dis.read()) != '\n')
						{
							bytes[len]=(byte)temp;  
						    len++; 
						}
						System.out.println(ByteToObject(bytes));
					}
				}
				catch (Exception e)
				{
					closeSocket();
				}
			}
		}
	}
	
	public static void main(String[] args)
	{
		Client client1 = new Client("127.0.0.1", 55555, "1");
		client1.sendMsg(new Pojo("songfeng", 26, new ArrayList<String>()));
		try
		{
			Thread.sleep(500);
		}
		catch (InterruptedException e)
		{
			e.printStackTrace();
		}
		Client client2 = new Client("127.0.0.1", 55555, "2");
	}
}

数据类:

package com.feng.test.longconnection1;

import java.io.Serializable;
import java.util.List;

/**
 * 
 * @author songfeng
 * @version 1.0
 * @since 2015-10-16
 * @category com.feng.test.longconnection
 *
 */
public class Pojo implements Serializable
{

	/**
	 * 序列化
	 */
	private static final long serialVersionUID = -8868529619983791261L;
	
	private String name;
	
	private int age;
	
	private List<String> likeThing;
	
	public Pojo(String name, int age, List<String> likeThing)
	{
		super();
		this.name = name;
		this.age = age;
		this.likeThing = likeThing;
	}

	public String getName()
	{
		return name;
	}
	
	public void setName(String name)
	{
		this.name = name;
	}

	public int getAge()
	{
		return age;
	}
	
	public void setAge(int age)
	{
		this.age = age;
	}
	
	public List<String> getLikeThing()
	{
		return likeThing;
	}
	
	public void setLikeThing(List<String> likeThing)
	{
		this.likeThing = likeThing;
	}
}

 

7
6
分享到:
评论
2 楼 南疆战士 2015-10-26  
qindongliang1922 写道
为啥不用的第三方的RPC协议,例如thrift,probuf,avro

基础的东西还是自己实现一遍的好,不一定要达到商用的目的,但是对原理性的东西可以加深理解。
1 楼 qindongliang1922 2015-10-26  
为啥不用的第三方的RPC协议,例如thrift,probuf,avro

相关推荐

    Socket长连接+心跳包+发送读取

    Socket长连接+心跳包+发送+读取,用到的全在这里了,自己看看哪里不需要的就不要添加了!代码很清晰很明白了!

    java nio 实现socket

    java nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socketjava nio 实现socket

    《NIO与Socket编程技术指南》高洪岩.zip

    非常详细地讲解了NIO中的缓冲区、通道、选择器、编码,以及使用Socket技术实现TCP/IP和UDP编程,细化到了演示全部SocketOption的特性,这对理解基于NIO和Socket技术为基础所开发的NIO框架是非常有好处的,本书以案例...

    基于NIO的socket举例

    基于NIO的socket举例 基于NIO的socket举例 基于NIO的socket举例 基于NIO的socket举例 基于NIO的socket举例基于NIO的socket举例 基于NIO的socket举例

    《NIO与Socket编程技术指南》_高洪岩

    《NIO与Socket编程技术指南》_高洪岩

    java socketNIO 实现多客户端聊天室 代码

    利用socketNIO实现的多客户端聊天室,非阻塞式IO,java代码编写,使用方法:先启动服务端代码再启动客户端代码,可启动多个客户端代码。若使用多个电脑启动客户端,需在客户端代码中更改一下ip地址。

    nio的socket

    nio的socket小玩意儿, 对于初学者有点用处,大家可以相互学习下嘛

    java NIO socket聊天室

    使用NIO socket不需要多线程来处理多个连接的请求,效率非常高 可以作为NIO socket入门的例子,Reactor模式,重点理解key.attach, jar文件里包含了源代码 1,运行server.bat启动服务器,可以打开编辑,修改端口号 ...

    用nio实现异步连接池

    用nio实现异步连接池

    java NIO socket聊天

    java NIO 高性能 socket通讯,服务端采用单线程,降低了cpu的压力,普通io socket通讯,server需要每个连接运行个线程,容易出现问题,效率也低

    java基于NIO实现Reactor模型源码.zip

    java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现Reactor模型源码java基于NIO实现...

    JAVA nio异步长连接服务端与客户端

    JAVA.NIO 异步长连接客户端与服务端都有,大家可以看看,另不知道怎么样将客户端读取的BUFF后的数据进行处理可以给出修改吗?

    使用NIO实现非阻塞socket通信

    Java编写的简易聊天工具,使用NIO实现非阻塞socket通信,使用Java原生sdk实现,可以运行。

    Java NIO Socket基本

    NULL 博文链接:https://b-l-east.iteye.com/blog/1254693

    java nio socket 例子

    本例包含服务器端和客户端,多线程,每线程多次发送,Eclipse工程,启动服务器使用 nu.javafaq.server.NioServer,启动客户端使用 nu.javafaq.client.NioClient。另本例取自javafaq.nv上的程序修改而成

    nio异步长连接服务端与客户端

    JAVA.NIO 异步长连接客户端与服务端都有,大家可以看看,另不知道怎么样将客户端读取的BUFF后的数据进行处理可以给出修改吗?

    Socket 之 BIO、NIO、Netty 简单实现

    《Socket 之 BIO、NIO、Netty 简单实现》博客附件。 博客地址:https://blog.csdn.net/Supreme_Sir/article/details/112725728

    NioSocket,包括server端和client端

    NioSocket,包括server端和client端。server端有自动判定client掉线机制,client端有自动重连机制。本人已在项目实用,未经允许禁止转载!

    基于Java NIO实现五子棋游戏.zip

    基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现五子棋游戏.zip基于Java NIO实现五子棋游戏.zip 基于Java NIO实现...

    采用NIO实现一个Socket服务器

    NULL 博文链接:https://wjy320.iteye.com/blog/2002237

Global site tag (gtag.js) - Google Analytics