netty demo地址 https://gitee.com/Code2013/netty-dome.git

1. 先运行服务端:src\main\java\com\lk\object\dome\server\ObjectServerMain.java
2. 再运行客户端:src\main\java\com\lk\object\dome\client\ObjectClientMain.java
3. 在服务端和客户端的控制台中查看收发消息的效果

如果想自己搭建这个 demo,需要下面列出的依赖和类。
首先在 pom.xml 中引入 Netty 依赖:

		<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.5.Final</version>
		</dependency>

服务端启动类(ObjectServerMain)


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Entry point of the demo server.
 *
 * <p>Listens on a TCP port, decodes incoming Java-serialized objects with
 * {@link ObjectDecoder}, encodes outgoing ones with {@link ObjectEncoder} and
 * delegates message handling to {@code ObjectServerHandler}.
 *
 * @author gyn
 */
public class ObjectServerMain {

	/** TCP port the server binds to. */
	private final int port;

	/**
	 * Creates a server that will listen on the given port once {@link #run()} is called.
	 *
	 * @param port TCP port to bind
	 */
	public ObjectServerMain(int port) {
		this.port = port;
	}

	/**
	 * Starts the demo server on port 2000.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		new ObjectServerMain(2000).run();
	}

	/**
	 * Binds the server socket and blocks until the server channel is closed.
	 *
	 * <p>Both event loop groups are always shut down before this method returns,
	 * including when configuration or binding fails. If the calling thread is
	 * interrupted while waiting, its interrupt flag is restored.
	 */
	public void run() {
		EventLoopGroup acceptor = new NioEventLoopGroup();
		EventLoopGroup worker = new NioEventLoopGroup();
		try {
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(acceptor, worker)
					.channel(NioServerSocketChannel.class)
					.option(ChannelOption.SO_BACKLOG, 1024)
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ChannelPipeline pipeline = ch.pipeline();
							// NOTE(review): ObjectDecoder performs Java-native deserialization of
							// whatever the peer sends. That is acceptable for a local demo, but it
							// must not be exposed to untrusted clients without a class whitelist.
							pipeline.addLast(new ObjectDecoder(1024 * 1024,
									ClassResolvers.weakCachingConcurrentResolver(
											ObjectServerMain.class.getClassLoader())));
							pipeline.addLast(new ObjectEncoder());
							pipeline.addLast(new ObjectServerHandler());
						}
					});

			// Bind first; sync() rethrows the failure cause if the port cannot be bound,
			// so the "started" message is only printed for a server that is really listening.
			Channel channel = bootstrap.bind(port).sync().channel();
			System.out.println("服务器已启动");
			// Block until the server channel is closed.
			channel.closeFuture().sync();
			System.out.println("服务器guanbi");
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers further up can still observe it.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		} finally {
			// Release the event loop threads on every exit path.
			acceptor.shutdownGracefully();
			worker.shutdownGracefully();
		}
	}
}

服务端的 Handler(ObjectServerHandler)

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Server-side message handler of the demo.
 *
 * <p>Greets every client when its connection becomes active, remembers which
 * channel belongs to which client id, and notifies all other registered
 * clients whenever a {@code MyObject} arrives.
 */
public class ObjectServerHandler extends SimpleChannelInboundHandler<Object> {
	/**
	 * Client id to channel registry shared by all handler instances.
	 *
	 * <p>A new handler is created per connection and connections are served by
	 * several event loop threads, so the shared map has to be a concurrent one.
	 */
	private static final java.util.concurrent.ConcurrentMap<Integer, Channel> hm =
			new java.util.concurrent.ConcurrentHashMap<Integer, Channel>();

	/**
	 * Handles one decoded message: registers the sender under its id and sends a
	 * notification string to every other registered client.
	 *
	 * @param ctx context of the channel the message arrived on
	 * @param obj decoded message; anything that is not a {@code MyObject} is logged and dropped
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object obj) throws Exception {
		Channel channel = ctx.channel();
		// The decoder can produce any serializable type the peer sends, so check before casting.
		if (!(obj instanceof MyObject)) {
			System.out.println("收到客户端 [" + channel.remoteAddress() + "] " + "unexpected message: " + obj);
			return;
		}
		MyObject myObject = (MyObject) obj;
		System.out.println("我是服务端");
		System.out.println("收到客户端 [" + channel.remoteAddress() + "] " + "rece: " + myObject);
		int id = myObject.getId();
		// Remember the sender's channel; an id that is already registered keeps its channel.
		hm.putIfAbsent(id, channel);

		// Notify every registered client except the sender.
		for (Entry<Integer, Channel> entry : hm.entrySet()) {
			System.out.println("进入循环map");
			int key = entry.getKey();
			System.out.println("id=" + id);
			System.out.println("key=" + key);
			if (id != key) {
				entry.getValue().writeAndFlush("发给客户端ID" + key);
				System.out.println("发给客户端ID" + key);
			} else {
				System.out.println("不需要发送给自己");
			}
		}
		System.out.println("处理完毕----------------------");
	}

	/**
	 * Called once when the connection is established; sends a greeting object to the client.
	 *
	 * @param ctx context of the newly active channel
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		Channel channel = ctx.channel();
		System.out.println("收到客户端 [" + channel.remoteAddress() + "] " + "上线");
		MyObject greeting = new MyObject();
		greeting.setId(1);
		greeting.setMessage("我知道客户端上线了");
		ctx.writeAndFlush(greeting);
	}

	/**
	 * Called when the connection is closed; removes the channel from the registry so
	 * that closed channels are no longer written to and the client id can be
	 * registered again by a later connection.
	 *
	 * @param ctx context of the channel that became inactive
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		// removeAll drops every id mapped to this channel, not just the first match.
		hm.values().removeAll(java.util.Collections.singleton(ctx.channel()));
		super.channelInactive(ctx);
	}

}

客户端启动类(ObjectClientMain)



import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * 客户端
 * @author gyn
 */
public class ObjectClientMain {

	private String host;
	private int port;

	public ObjectClientMain(String host, int port) {
		this.host = host;
		this.port = port;
	}

	public static void main(String[] args) {
	
		ObjectClientHandler.ID=10;
		new ObjectClientMain("127.0.0.1", 2000).run();
			
		
		
	}

	public void run() {
		EventLoopGroup worker = new NioEventLoopGroup();
		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(worker).channel(NioSocketChannel.class)
				.handler(new ChannelInitializer<Channel>() {
					@Override
					protected void initChannel(Channel ch) throws Exception {
						ChannelPipeline pipeline = ch.pipeline();
						pipeline.addLast(new ObjectDecoder(1024 * 1024,
								ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
						pipeline.addLast(new ObjectEncoder());
						pipeline.addLast(new ObjectClientHandler());
					}
				});
		try {
			bootstrap.connect(host, port).sync().channel();
		} catch (InterruptedException e) {
			e.printStackTrace();
			System.out.println("connect fail");
			System.exit(1);
		}
	}
}

客户端的Handler


import java.util.Random;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Client-side message handler of the demo: introduces itself to the server
 * when the connection opens and prints every message received afterwards.
 */
public class ObjectClientHandler extends SimpleChannelInboundHandler<Object> {

	/**
	 * Id this client reports to the server. It is read when the connection
	 * becomes active, so it must be assigned before connecting.
	 */
	public static int ID;

	/**
	 * Called once when the connection is established; sends a {@code MyObject}
	 * carrying this client's id and a short self-introduction to the server.
	 *
	 * @param ctx context of the newly active channel
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("我是客户端"+ID);

		MyObject hello = new MyObject();
		hello.setId(ID);
		hello.setMessage("我是客户端"+ID);
		// Queue the object and flush it to the socket in one step.
		ctx.writeAndFlush(hello);
		System.out.println("客户端发送 send seccess");
	}

	/**
	 * Prints each decoded message together with the address of the server that sent it.
	 *
	 * @param ctx context of the channel the message arrived on
	 * @param obj decoded message from the server
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object obj) throws Exception {
		Channel server = ctx.channel();
		System.out.println("我是客户端");
		System.out.println("收到服务端 [" + server.remoteAddress() + "] " + "client receive" + obj);
	}

}

消息的实体类


import java.io.Serializable;

/**
 * Payload exchanged between client and server in the demo.
 *
 * <p>It carries only an id and a text message. The class is {@link Serializable}
 * because instances are sent over the wire with Java serialization.
 *
 * @author gyn
 */
public class MyObject implements Serializable {
	// Required: the object is transferred using Java serialization.
	private static final long serialVersionUID = 1L;

	/** Identifier of the sender. */
	private int id;
	/** Free-form text carried by this payload. */
	private String message;

	/** @return the id stored in this payload */
	public int getId() {
		return this.id;
	}

	/** @param id the id to store in this payload */
	public void setId(int id) {
		this.id = id;
	}

	/** @return the message text, or {@code null} if none was set */
	public String getMessage() {
		return this.message;
	}

	/** @param message the message text to store */
	public void setMessage(String message) {
		this.message = message;
	}

	/** @return a readable form such as {@code MyObject [id=1, message=hi]} */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("MyObject [id=");
		text.append(this.id);
		text.append(", message=");
		text.append(this.message);
		text.append(']');
		return text.toString();
	}

}

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐