在这里插入图片描述

肖哥弹架构 跟大家“弹弹” BIO/NIO/AIO设计与实战应用,需要代码关注

欢迎 关注,点赞,留言。

关注公号Solomon肖哥弹架构获取更多精彩内容

历史热点文章

基于 Java AIO 构建的智能家居网关系统,轻松支持 10,000+ 设备并发连接,指令延迟 <100ms!本文从架构设计到代码实现,手把手教你开发高性能物联网中枢平台,包含:
完整项目源码:服务端 + 设备模拟器(智能灯/空调)
AIO 核心技术:异步非阻塞 I/O + 自定义二进制协议
性能优化秘籍:零拷贝、无锁设计、心跳检测
真实业务场景:设备认证、状态上报、指令下发

1. 项目概述

智能家居控制网关系统是基于Java AIO(Asynchronous I/O)技术构建的高性能物联网中枢平台

1.1 业务场景

  • 集中管理智能家居设备(灯光、空调、窗帘等)
  • 实时接收设备状态上报
  • 异步响应手机APP控制指令
  • 支持设备异常告警

1.2 技术指标

指标
支持设备数 ≥10,000
指令延迟 <100ms
协议 自定义二进制协议
消息吞吐 50,000 msg/s

1.3 项目根目录结构

smart-home-gateway/
├── pom.xml                     # Maven项目配置文件
├── README.md                    # 项目说明文档
├── src/
│   ├── main/
│   │   ├── java/                # 主Java源代码
│   │   │   └── com/
│   │   │       └── smartgateway/
│   │   ├── resources/           # 资源配置文件
│   │   └── assembly/            # 打包配置
│   └── test/                    # 测试代码
├── docs/                        # 项目文档
└── scripts/                     # 部署脚本

2. 系统架构图

2.1 项目架构图

在这里插入图片描述

2.2 手机APP客户端

在这里插入图片描述

  • 功能组件
    • 控制模块:处理用户操作(如开关灯)
    • API客户端:封装RESTful/WebSocket调用
    • 消息监听:实时接收设备状态推送

2.3 设备端

在这里插入图片描述

  • 关键行为
    • 设备认证流程
    • 定期发送心跳包
    • 随机生成状态上报

2.4 协议交互流程

2.4.1 设备注册流程

在这里插入图片描述

2.4.2 指令下发流程

在这里插入图片描述

2.5 部署拓扑图

在这里插入图片描述

3. 完整服务端代码

3.1 主服务入口

import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * 智能家居网关主服务
 * 核心功能:
 * 1. 设备长连接管理
 * 2. 指令异步下发
 * 3. 状态实时监控
 */
public class SmartHomeGateway {
    private static final int PORT = 8888;
    private static final int BUFFER_SIZE = 1024;
    
    // 设备会话管理 <设备ID, 会话>
    private static final ConcurrentHashMap<String, DeviceSession> deviceSessions 
        = new ConcurrentHashMap<>();
    
    // 指令回调队列
    private static final BlockingQueue<CommandTask> commandQueue 
        = new LinkedBlockingQueue<>();
    
    public static void main(String[] args) throws Exception {
        // 1. 启动指令处理线程池
        ExecutorService commandExecutor = Executors.newFixedThreadPool(4);
        startCommandDispatcher(commandExecutor);
        
        // 2. 初始化AIO服务端
        AsynchronousServerSocketChannel server = 
            AsynchronousServerSocketChannel.open()
                .bind(new InetSocketAddress(PORT));
        
        System.out.printf("""
            ==============================
            智能家居网关启动
            版本: v2.1
            端口: %d
            协议: SHGP-v1.2
            ==============================
            %n""", PORT);

        // 3. 接受设备连接
        server.accept(null, new CompletionHandler<>() {
            @Override
            public void completed(AsynchronousSocketChannel channel, Object attachment) {
                // 继续接受新连接
                server.accept(null, this);
                
                // 创建设备会话
                DeviceSession session = new DeviceSession(channel);
                ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
                
                // 异步读取设备数据
                channel.read(buffer, buffer, new DeviceReadHandler(session));
                
                System.out.printf("[设备连接] 会话ID: %s%n", session.getSessionId());
            }
            
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.err.println("接受连接失败: " + exc.getMessage());
            }
        });
        
        // 保持主线程
        Thread.currentThread().join();
    }
    
    /**
     * 启动指令分发线程
     */
    private static void startCommandDispatcher(ExecutorService executor) {
        new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    CommandTask task = commandQueue.take();
                    executor.submit(() -> processCommand(task));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }
    
    /**
     * 处理控制指令
     */
    private static void processCommand(CommandTask task) {
        DeviceSession session = deviceSessions.get(task.getDeviceId());
        if (session != null && session.getChannel().isOpen()) {
            try {
                ByteBuffer cmdBuffer = ProtocolEncoder.encodeCommand(
                    task.getCommandType(), 
                    task.getPayload()
                );
                
                session.getChannel().write(cmdBuffer).get(1, TimeUnit.SECONDS);
                task.getCallback().onSuccess();
            } catch (Exception e) {
                task.getCallback().onFailure(e);
            }
        } else {
            task.getCallback().onFailure(new Exception("设备离线"));
        }
    }
    
    // 内部类:设备会话
    private static class DeviceSession {
        private final String sessionId;
        private final AsynchronousSocketChannel channel;
        private String deviceId;
        private long lastHeartbeat;
        
        public DeviceSession(AsynchronousSocketChannel channel) {
            this.sessionId = UUID.randomUUID().toString();
            this.channel = channel;
            this.lastHeartbeat = System.currentTimeMillis();
        }
        
        // getters & setters...
    }
    
    // 内部类:指令任务
    private static class CommandTask {
        private final String deviceId;
        private final String commandType;
        private final byte[] payload;
        private final CommandCallback callback;
        
        // constructor & getters...
    }
    
    // 回调接口
    public interface CommandCallback {
        void onSuccess();
        void onFailure(Throwable t);
    }
}

3.2 协议处理器

/**
 * 设备数据读取处理器
 */
class DeviceReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    private final SmartHomeGateway.DeviceSession session;
    
    public DeviceReadHandler(SmartHomeGateway.DeviceSession session) {
        this.session = session;
    }
    
    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        if (result == -1) { // 连接关闭
            closeSession();
            return;
        }
        
        buffer.flip();
        try {
            // 1. 协议解码
            ProtocolDecoder.DeviceMessage msg = 
                ProtocolDecoder.decode(buffer);
            
            // 2. 处理消息类型
            switch (msg.getType()) {
                case HEARTBEAT:
                    handleHeartbeat(msg);
                    break;
                case STATUS_REPORT:
                    handleStatusReport(msg);
                    break;
                case AUTH_REQUEST:
                    handleAuth(msg);
                    break;
            }
            
            // 3. 继续读取下个消息
            buffer.clear();
            session.getChannel().read(buffer, buffer, this);
        } catch (ProtocolException e) {
            System.err.println("协议错误: " + e.getMessage());
            closeSession();
        }
    }
    
    @Override
    public void failed(Throwable exc, ByteBuffer buffer) {
        System.err.println("读取失败: " + exc.getMessage());
        closeSession();
    }
    
    private void handleHeartbeat(ProtocolDecoder.DeviceMessage msg) {
        session.setLastHeartbeat(System.currentTimeMillis());
        System.out.printf("[心跳] 设备: %s%n", session.getDeviceId());
    }
    
    private void handleAuth(ProtocolDecoder.DeviceMessage msg) {
        String deviceId = new String(msg.getPayload());
        session.setDeviceId(deviceId);
        SmartHomeGateway.deviceSessions.put(deviceId, session);
        
        // 返回认证成功
        ByteBuffer ack = ProtocolEncoder.encodeAck(true);
        session.getChannel().write(ack);
    }
    
    private void closeSession() {
        if (session.getDeviceId() != null) {
            SmartHomeGateway.deviceSessions.remove(session.getDeviceId());
        }
        try {
            session.getChannel().close();
        } catch (IOException e) {
            System.err.println("关闭会话失败: " + e.getMessage());
        }
    }
}

3.3 协议编解码器

/**
 * 智能家居网关协议编解码
 * 协议格式:
 * +--------+--------+--------+--------+--------+
 * | 魔数(2) | 版本(1)| 类型(1)| 长度(2) | 数据(N)|
 * +--------+--------+--------+--------+--------+
 */
class ProtocolDecoder {
    public static DeviceMessage decode(ByteBuffer buffer) throws ProtocolException {
        // 校验魔数
        if (buffer.getShort() != 0x55AA) {
            throw new ProtocolException("无效魔数");
        }
        
        byte version = buffer.get();
        byte type = buffer.get();
        int length = buffer.getShort() & 0xFFFF;
        
        byte[] payload = new byte[length];
        buffer.get(payload);
        
        return new DeviceMessage(version, MessageType.fromValue(type), payload);
    }
    
    public enum MessageType {
        HEARTBEAT(0x01),
        AUTH_REQUEST(0x02),
        STATUS_REPORT(0x03);
        
        private final byte value;
        // constructor & getter...
    }
    
    public static class DeviceMessage {
        private final byte version;
        private final MessageType type;
        private final byte[] payload;
        // constructor & getters...
    }
}

class ProtocolEncoder {
    public static ByteBuffer encodeCommand(String type, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(6 + payload.length);
        buffer.putShort((short) 0x55AA);
        buffer.put((byte) 1); // version
        buffer.put(MessageType.COMMAND.getValue());
        buffer.putShort((short) payload.length);
        buffer.put(payload);
        buffer.flip();
        return buffer;
    }
}

4. 客户端

4.1 智能灯客户端代码

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.TimeUnit;

/**
 * 智能灯设备模拟器
 * 功能:
 * 1. 连接网关并认证
 * 2. 接收开关/调光指令
 * 3. 定时上报状态
 */
public class SmartLightSimulator {
    private static final String GATEWAY_IP = "localhost";
    private static final int GATEWAY_PORT = 8888;
    private static final String DEVICE_ID = "LIGHT-001";
    private static final String DEVICE_KEY = "light-secret-123";
    
    private boolean powerOn = false;
    private int brightness = 50; // 亮度百分比
    private int colorTemp = 4000; // 色温(K)

    public static void main(String[] args) throws Exception {
        new SmartLightSimulator().start();
    }

    public void start() throws Exception {
        // 1. 连接网关
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        channel.connect(new InetSocketAddress(GATEWAY_IP, GATEWAY_PORT), null, 
            new CompletionHandler<Void, Void>() {
                @Override
                public void completed(Void result, Void attachment) {
                    System.out.println("连接网关成功");
                    sendAuthPacket(channel); // 发送认证
                    startReading(channel);   // 开始接收指令
                    startStatusReport(channel); // 定时上报状态
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.err.println("连接失败: " + exc.getMessage());
                }
            });
        
        // 保持主线程
        Thread.currentThread().join();
    }

    /**
     * 发送设备认证包
     */
    private void sendAuthPacket(AsynchronousSocketChannel channel) {
        String authData = DEVICE_ID + "|" + DEVICE_KEY;
        ByteBuffer buffer = ByteBuffer.wrap(("AUTH|" + authData + "\n").getBytes());
        channel.write(buffer, buffer, new CompletionHandler<>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                System.out.println("认证请求已发送");
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.err.println("发送认证失败: " + exc.getMessage());
            }
        });
    }

    /**
     * 开始接收网关指令
     */
    private void startReading(AsynchronousSocketChannel channel) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer, buffer, new CompletionHandler<>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                if (result == -1) {
                    System.out.println("连接已关闭");
                    return;
                }

                attachment.flip();
                String command = new String(attachment.array(), 0, attachment.limit());
                processCommand(command.trim());
                
                // 继续读取下个指令
                buffer.clear();
                channel.read(buffer, buffer, this);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.err.println("读取指令失败: " + exc.getMessage());
            }
        });
    }

    /**
     * 处理控制指令
     */
    private void processCommand(String command) {
        System.out.println("收到指令: " + command);
        
        if (command.startsWith("POWER|")) {
            powerOn = command.split("\|")[1].equals("ON");
            System.out.println("灯光状态: " + (powerOn ? "开启" : "关闭"));
        } 
        else if (command.startsWith("BRIGHTNESS|")) {
            brightness = Integer.parseInt(command.split("\|")[1]);
            System.out.println("亮度调整为: " + brightness + "%");
        }
        else if (command.startsWith("COLORTEMP|")) {
            colorTemp = Integer.parseInt(command.split("\|")[1]);
            System.out.println("色温调整为: " + colorTemp + "K");
        }
    }

    /**
     * 定时上报设备状态
     */
    private void startStatusReport(AsynchronousSocketChannel channel) {
        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(10);
                    
                    String status = String.format("STATUS|%s|%d|%d|%d\n",
                        powerOn ? "ON" : "OFF",
                        brightness,
                        colorTemp,
                        System.currentTimeMillis() / 1000);
                    
                    ByteBuffer buffer = ByteBuffer.wrap(status.getBytes());
                    channel.write(buffer);
                    System.out.println("状态已上报: " + status.trim());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

4.1 智能空调客户端代码

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.TimeUnit;

/**
 * 智能空调设备模拟器
 * 功能:
 * 1. 连接网关并认证
 * 2. 接收温度/模式指令
 * 3. 模拟温度变化
 */
public class SmartACSimulator {
    private static final String GATEWAY_IP = "localhost";
    private static final int GATEWAY_PORT = 8888;
    private static final String DEVICE_ID = "AC-001";
    private static final String DEVICE_KEY = "ac-secret-456";
    
    private boolean powerOn = false;
    private int currentTemp = 26; // 当前温度
    private int targetTemp = 26;  // 设定温度
    private String mode = "COOL"; // COOL/HEAT/AUTO

    public static void main(String[] args) throws Exception {
        new SmartACSimulator().start();
    }

    public void start() throws Exception {
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
        channel.connect(new InetSocketAddress(GATEWAY_IP, GATEWAY_PORT), null, 
            new CompletionHandler<Void, Void>() {
                @Override
                public void completed(Void result, Void attachment) {
                    System.out.println("空调连接网关成功");
                    sendAuthPacket(channel);
                    startReading(channel);
                    startTempSimulation(channel);
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.err.println("空调连接失败: " + exc.getMessage());
                }
            });
        
        Thread.currentThread().join();
    }

    private void sendAuthPacket(AsynchronousSocketChannel channel) {
        String authData = DEVICE_ID + "|" + DEVICE_KEY;
        ByteBuffer buffer = ByteBuffer.wrap(("AUTH|" + authData + "\n").getBytes());
        channel.write(buffer);
    }

    private void startReading(AsynchronousSocketChannel channel) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer, buffer, new CompletionHandler<>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                attachment.flip();
                String command = new String(attachment.array(), 0, attachment.limit());
                processCommand(command.trim());
                
                buffer.clear();
                channel.read(buffer, buffer, this);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                System.err.println("空调读取指令失败: " + exc.getMessage());
            }
        });
    }

    /**
     * 处理空调控制指令
     */
    private void processCommand(String command) {
        System.out.println("空调收到指令: " + command);
        
        if (command.startsWith("POWER|")) {
            powerOn = command.split("\|")[1].equals("ON");
            System.out.println("空调电源: " + (powerOn ? "开启" : "关闭"));
        } 
        else if (command.startsWith("SETTEMP|")) {
            targetTemp = Integer.parseInt(command.split("\|")[1]);
            System.out.println("设定温度: " + targetTemp + "°C");
        }
        else if (command.startsWith("SETMODE|")) {
            mode = command.split("\|")[1];
            System.out.println("运行模式: " + mode);
        }
    }

    /**
     * 模拟温度变化并上报
     */
    private void startTempSimulation(AsynchronousSocketChannel channel) {
        new Thread(() -> {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(5);
                    
                    // 模拟温度变化
                    if (powerOn) {
                        if (mode.equals("COOL") {
                            currentTemp = Math.max(targetTemp, currentTemp - 1);
                        } else if (mode.equals("HEAT")) {
                            currentTemp = Math.min(targetTemp, currentTemp + 1);
                        }
                    }
                    
                    // 上报状态
                    String status = String.format("STATUS|%s|%d|%d|%s\n",
                        powerOn ? "ON" : "OFF",
                        currentTemp,
                        targetTemp,
                        mode);
                    
                    channel.write(ByteBuffer.wrap(status.getBytes()));
                    System.out.println("空调状态: " + status.trim());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

5. 关键设计解析

5.1 AIO 核心机制

在这里插入图片描述

5.2 业务特性实现

功能 实现方案
设备认证 首次连接发送设备ID
心跳检测 定时检查lastHeartbeat
指令队列 BlockingQueue + 线程池
状态上报 异步写入Redis

5.3 性能优化点

  1. 无锁设计
    • 使用 ConcurrentHashMap 管理会话
    • 指令队列避免同步阻塞
  2. 零拷贝优化
    • 直接操作ByteBuffer
    • 避免数据多次序列化
  3. 资源控制
    • 限制指令线程池大小
    • 心跳超时自动断开

6. 关键组件说明

6.1 服务端核心

组件 技术实现 功能
连接接收器 AsynchronousServerSocketChannel 处理10K+并发设备连接
协议解码器 自定义二进制协议 解析设备数据帧
设备管理器 ConcurrentHashMap<String, DeviceSession> 维护在线设备会话
指令队列 LinkedBlockingQueue + 线程池 异步化指令处理

6.2 客户端实现

客户端类型 技术栈 通信方式
Android/iOS APP Kotlin/Swift + Retrofit WebSocket + MQTT
Web控制台 Vue.js + Axios RESTful API
设备模拟器 Java NIO 自定义TCP协议

7. 设备-网关交互协议说明

7.1 协议格式

方向 指令格式 示例
设备→网关 `AUTH <device_id> ` `AUTH LIGHT-001 light-secret-123`
设备→网关 `STATUS …` `STATUS ON 50 4000`
网关→设备 `POWER <ON/OFF>` `POWER ON`
网关→设备 `SETTEMP <温度>` `SETTEMP 24`

7.2 状态码定义

设备类型 状态参数
智能灯 `STATUS <ON/OFF> <亮度%> <色温K>`
智能空调 `STATUS <ON/OFF> <当前温度> <目标温度> <模式>`

8. 测试场景

8.1 测试智能灯

# 启动智能灯模拟器
java SmartLightSimulator

# 预期输出
连接网关成功
认证请求已发送
收到指令: POWER|ON
灯光状态: 开启
收到指令: BRIGHTNESS|75
亮度调整为: 75%
状态已上报: STATUS|ON|75|4000|1634567890

8.2 测试智能空调

# 启动空调模拟器
java SmartACSimulator

# 预期输出
空调连接网关成功
空调收到指令: POWER|ON
空调电源: 开启
空调收到指令: SETTEMP|22
设定温度: 22°C
空调状态: STATUS|ON|24|22|COOL

9. 部署建议

  1. 高可用部署
    # 启动多个实例 + Nginx负载均衡
    java -Xms2g -Xmx2g SmartHomeGateway
    
  2. 监控集成
    // 添加Micrometer指标
    Metrics.gauge("active.devices", deviceSessions::size);
    
  3. 安全增强
    • TLS加密通信
    • 设备双向认证
  4. 协议升级
    • 支持Protobuf二进制协议
    • 添加压缩功能
Logo

纵情码海钱塘涌,杭州开发者创新动! 属于杭州的开发者社区!致力于为杭州地区的开发者提供学习、合作和成长的机会;同时也为企业交流招聘提供舞台!

更多推荐