netty demo 保证可以运行
netty demo地址 https://gitee.com/Kkk9527/netty-dome.git 1,运行ObjectServerMain.java;2,运行ObjectClientMain.java;3,看效果。netty项目demo,pom需要引入 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
·
netty demo地址 https://gitee.com/Code2013/netty-dome.git
1,运行src\main\java\com\lk\object\dome\server\ObjectServerMain.java
2,运行src\main\java\com\lk\object\dome\client\ObjectClientMain.java
3,看效果
自己搭建demo所需类
pom需要引入
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.5.Final</version>
</dependency>
服务器启动端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
 * Starts the demo object server.
 *
 * <p>Every accepted connection gets a pipeline of {@link ObjectDecoder},
 * {@link ObjectEncoder} and a fresh {@link ObjectServerHandler}.
 *
 * @author gyn
 */
public class ObjectServerMain {
    /** Maximum serialized object size, in bytes, handed to the {@link ObjectDecoder}. */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public ObjectServerMain(int port) {
        this.port = port;
    }

    /**
     * Launches the server on port 2000.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new ObjectServerMain(2000).run();
    }

    /**
     * Binds the server socket and blocks until the server channel is closed or the
     * calling thread is interrupted. Both event loop groups are shut down before this
     * method returns, whether it ends normally or with an exception.
     */
    public void run() {
        EventLoopGroup acceptor = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(acceptor, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childHandler(new ChannelInitializer<SocketChannel>() { // anonymous inner class
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // NOTE(review): ObjectDecoder performs Java-native deserialization of
                        // bytes received from the network; only expose this to trusted peers.
                        pipeline.addLast(new ObjectDecoder(MAX_OBJECT_SIZE,
                                ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                        pipeline.addLast(new ObjectEncoder());
                        pipeline.addLast(new ObjectServerHandler());
                    }
                });
        try {
            // Bind first: sync() rethrows a bind failure, so the "started" message below
            // is only printed once the port is really being listened on.
            Channel channel = bootstrap.bind(port).sync().channel();
            System.out.println("服务器已启动");
            // Block until the server channel is closed.
            channel.closeFuture().sync();
            System.out.println("服务器guanbi");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Release the event loop threads on every exit path.
            acceptor.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}
服务器的Handler
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Server-side handler: remembers which channel belongs to which client id and, for
 * every {@link MyObject} received, sends a notification string to every other
 * registered client.
 */
public class ObjectServerHandler extends SimpleChannelInboundHandler<Object> {
    /**
     * Registered client channels keyed by the id carried in their messages.
     *
     * <p>The map is static, so it is shared by the handler instances of all connections.
     * Those instances run on the worker event loop threads, hence a concurrent map
     * instead of a plain {@code HashMap}.
     */
    private static final java.util.concurrent.ConcurrentMap<Integer, Channel> hm =
            new java.util.concurrent.ConcurrentHashMap<Integer, Channel>();

    /**
     * Handles one inbound message: registers the sender under its id if that id is not
     * registered yet, then writes a text notification to every other registered channel.
     *
     * @param ctx context of the channel the message arrived on
     * @param obj decoded message; cast to {@link MyObject}
     * @throws ClassCastException if {@code obj} is not a {@link MyObject}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object obj) throws Exception {
        Channel sender = ctx.channel();
        // Receive
        MyObject myObject = (MyObject) obj;
        System.out.println("我是服务端");
        System.out.println("收到客户端 [" + sender.remoteAddress() + "] " + "rece: " + myObject);
        int id = myObject.getId();
        // Remember the client; the first channel seen for an id keeps it.
        hm.putIfAbsent(id, sender);
        // Forward to every registered client except the one with the sender's id.
        for (Entry<Integer, Channel> entry : hm.entrySet()) {
            System.out.println("进入循环map");
            int key = entry.getKey();
            System.out.println("id=" + id);
            System.out.println("key=" + key);
            if (id != key) {
                // Send
                entry.getValue().writeAndFlush("发给客户端ID" + key);
                System.out.println("发给客户端ID" + key);
            } else {
                System.out.println("不需要发送给自己");
            }
        }
        System.out.println("处理完毕----------------------");
    }

    /**
     * Called when a connection becomes active; greets the new client with a
     * {@link MyObject} (id 1). The write is asynchronous.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        System.out.println("收到客户端 [" + channel.remoteAddress() + "] " + "上线");
        MyObject greeting = new MyObject();
        greeting.setId(1);
        greeting.setMessage("我知道客户端上线了");
        ctx.writeAndFlush(greeting);
    }

    /**
     * Called when a connection goes away; drops every registry entry that points at the
     * closed channel so it is no longer written to and its id can be registered again.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // removeAll (not remove) because one channel may have registered several ids.
        hm.values().removeAll(java.util.Collections.singleton(ctx.channel()));
        super.channelInactive(ctx);
    }
}
客户端启动端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
 * Demo client: connects to the object server and keeps the connection open until it
 * is closed.
 *
 * @author gyn
 */
public class ObjectClientMain {
    /** Maximum serialized object size, in bytes, handed to the {@link ObjectDecoder}. */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    private final String host;
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public ObjectClientMain(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Starts a client with id 10 against 127.0.0.1:2000.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ObjectClientHandler.ID = 10;
        new ObjectClientMain("127.0.0.1", 2000).run();
    }

    /**
     * Connects to the server and blocks until the connection is closed or the calling
     * thread is interrupted, then shuts the event loop group down so its threads do not
     * keep the JVM alive. If the connection cannot be established the process prints
     * "connect fail" and exits with status 1.
     */
    public void run() {
        EventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(worker).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        // NOTE(review): ObjectDecoder performs Java-native deserialization of
                        // bytes received from the network; only connect to trusted servers.
                        pipeline.addLast(new ObjectDecoder(MAX_OBJECT_SIZE,
                                ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                        pipeline.addLast(new ObjectEncoder());
                        pipeline.addLast(new ObjectClientHandler());
                    }
                });
        try {
            Channel channel = connect(bootstrap);
            // Stay here while the connection is alive.
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
        } finally {
            worker.shutdownGracefully();
        }
    }

    /** Opens the connection, or reports the failure and terminates the process with status 1. */
    private Channel connect(Bootstrap bootstrap) {
        try {
            return bootstrap.connect(host, port).sync().channel();
        } catch (Exception e) {
            // sync() rethrows the cause of a failed connect (e.g. ConnectException), not
            // only InterruptedException, so every exception type has to be handled here.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
            System.out.println("connect fail");
            System.exit(1);
            // Only reached if System.exit itself is prevented from terminating the JVM.
            throw new IllegalStateException("connect fail", e);
        }
    }
}
客户端的Handler
import java.util.Random;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side handler: announces itself to the server once connected and prints
 * whatever the server sends back.
 */
public class ObjectClientHandler extends SimpleChannelInboundHandler<Object> {
    /** Id this client reports to the server; set by the launcher before connecting. */
    public static int ID;

    /**
     * Invoked once when the connection is established; sends a {@link MyObject}
     * carrying this client's id and a greeting. The write is asynchronous.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String greeting = "我是客户端" + ID;
        System.out.println(greeting);

        // Message sent as soon as the channel is connected.
        final MyObject hello = new MyObject();
        hello.setId(ID);
        hello.setMessage(greeting);

        ctx.write(hello);
        ctx.flush();
        System.out.println("客户端发送 send seccess");
    }

    /**
     * Prints every message received from the server together with the server address.
     * No reply is sent; an earlier reply path is disabled.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext arg0, Object obj) throws Exception {
        final Channel peer = arg0.channel();
        System.out.println("我是客户端");
        final String line = "收到服务端 [" + peer.remoteAddress() + "] client receive" + obj;
        System.out.println(line);
    }
}
消息的实体类
import java.io.Serializable;
/**
 * Payload exchanged between client and server in the demo; it carries only an id
 * and a text message.
 *
 * @author gyn
 */
public class MyObject implements Serializable {
    // Must be serializable: instances are sent through ObjectEncoder / ObjectDecoder.
    private static final long serialVersionUID = 1L;

    private int id;
    private String message;

    /** @return the id of this message */
    public int getId() {
        return this.id;
    }

    /** @return the message text, possibly {@code null} */
    public String getMessage() {
        return this.message;
    }

    /** @param id the id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @param message the message text to store */
    public void setMessage(String message) {
        this.message = message;
    }

    /** @return a text form like {@code MyObject [id=1, message=hello]} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("MyObject [id=");
        text.append(this.id);
        text.append(", message=");
        text.append(this.message);
        text.append(']');
        return text.toString();
    }
}
更多推荐
已为社区贡献1条内容
所有评论(0)