Java高级进阶:P2P通信原理深度剖析与完整实现

一、引言

P2P(Peer-to-Peer)通信是现代分布式系统、即时通讯、区块链、文件共享等领域的核心技术。与传统的C/S(客户端-服务器)架构不同,P2P网络中每个节点既是客户端也是服务器,彼此直接通信,消除中心化瓶颈。

然而,P2P通信面临一个核心挑战:NAT穿透。绝大多数设备位于NAT(网络地址转换)之后,没有公网IP,如何让两个内网节点直接建立连接?

本文将深入剖析P2P通信的核心原理,并给出一个基于Java的完整可运行的实现。


二、P2P通信的核心原理

2.1 NAT的类型

NAT(Network Address Translation)将内网IP:端口映射为公网IP:端口。根据映射行为,NAT分为四种类型:

┌─────────────────────────────────────────────────────────┐
│                    NAT 类型分类                          │
├─────────────┬───────────────────────────────────────────┤
│  完全锥型     │ 任何外部主机都能通过映射地址访问内网        │
│  (Full Cone) │                                           │
├─────────────┼───────────────────────────────────────────┤
│  受限锥型     │ 只有内网主机曾经通信过的外部IP才能访问      │
│  (Restricted)│                                           │
├─────────────┼───────────────────────────────────────────┤
│  端口受限锥型  │ 只有内网主机曾通信过的IP:Port才能访问      │
│  (Port Rest) │                                           │
├─────────────┼───────────────────────────────────────────┤
│  对称NAT     │ 同一内网IP:Port发往不同目标IP:Port,        │
│  (Symmetric) │ 映射为不同的公网IP:Port                     │
└─────────────┴───────────────────────────────────────────┘

对称NAT是最严格的类型,TCP穿透几乎不可能完成,但UDP穿透仍有一定成功率。

2.2 UDP打洞原理(UDP Hole Punching)

UDP打洞是P2P通信中最经典的NAT穿透技术,核心思想:

         ┌──────────────────┐
         │   Signalling     │
         │   Server         │
         │  (公网)           │
         └──────┬────┬──────┘
                │    │
         ① 注册  │    │ ② 注册
         告知B   │    │ 告知A
                │    │
         ┌──────┘    └──────┐
         ▼                   ▼
    ┌──────────┐       ┌──────────┐
    │  Peer A  │       │  Peer B  │
    │ 内网:10086│       │ 内网:20086│
    │ 公网:1.2.3│       │ 公网:4.5.6│
    └──────────┘       └──────────┘
         │                   │
         └────────③─────────┘
             互相发送UDP包
           (同时向对方公网地址发)

三步流程

  1. 注册:A和B分别向信令服务器注册,信令服务器记录各自的内外网地址
  2. 地址交换:信令服务器把A的地址告诉B,B的地址告诉A
  3. 同时打洞:A向B的公网地址发UDP包,B也向A的公网地址发UDP包。当A的包到达B的NAT设备时,NAT会认为"这是B之前发过包的那个地址",放行通过——洞打成功

关键在于双方同时发。如果只有一方先发,NAT设备会把包丢弃(因为NAT没有建立映射记录),所以必须并发穿透。

2.3 TCP打洞原理

TCP打洞比UDP复杂得多,核心是同时打开连接(TCP Simultaneous Open)。

// 关键:使用SO_REUSEADDR和SO_REUSEPORT
// 让多个Socket绑定到同一端口
Socket a = new Socket();
a.setReuseAddress(true);
a.bind(new InetSocketAddress(localPort));
// 然后同时向对方发起connect

TCP打洞的成功率和NAT类型强相关,完全锥型和受限锥型成功率较高,对称NAT基本不可行。


三、完整Java实现:P2P即时通讯系统

下面实现一个完整的P2P通信系统,包含三个部分: - 信令服务器(Signalling Server):公网部署,协助节点交换地址 - P2P节点(Peer Node):实现UDP打洞和点对点通信 - 文本协议:自定义简单协议,支持穿透、心跳、消息收发

3.1 协议设计

请求格式:
[操作码(1字节)][数据长度(4字节)][数据]

操作码定义:
0x01 - REGISTER     注册
0x02 - PEER_LIST    获取在线列表
0x03 - HOLE_PUNCH   打洞请求(中继地址)
0x04 - RELAY        中继消息
0x05 - P2P_DATA     直接点对点数据
0x06 - HEARTBEAT    心跳
0x07 - RESPONSE     响应

3.2 信令服务器实现

package com.p2p.signal;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * P2P 信令服务器
 * 部署在公网,负责节点注册、地址交换、打洞协调
 */
public class SignallingServer {

    private static final int PORT = 8888;
    // 在线节点表: username -> PeerInfo(内外网地址)
    private static final ConcurrentHashMap<String, PeerInfo> peers = new ConcurrentHashMap<>();

    public static void main(String[] args) throws IOException {
        DatagramSocket serverSocket = new DatagramSocket(PORT);
        System.out.println("[信令服务器] 启动成功,监听端口: " + PORT);
        System.out.println("[信令服务器] 公网IP: " + getPublicIp());

        // 启动心跳超时检查
        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(
            SignallingServer::checkHeartbeat, 30, 15, TimeUnit.SECONDS);

        byte[] buffer = new byte[8192];
        while (true) {
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            serverSocket.receive(packet);

            ByteArrayInputStream bais = new ByteArrayInputStream(packet.getData(), 0, packet.getLength());
            DataInputStream dis = new DataInputStream(bais);

            int opCode = dis.readUnsignedByte();
            int len = dis.readInt();
            byte[] data = new byte[len];
            dis.readFully(data);

            // 获取发送方的公网地址
            InetSocketAddress publicAddr =
                new InetSocketAddress(packet.getAddress(), packet.getPort());

            handleRequest(opCode, data, publicAddr, serverSocket);
        }
    }

    private static void handleRequest(int opCode, byte[] data,
            InetSocketAddress publicAddr, DatagramSocket socket) throws IOException {
        switch (opCode) {
            case 0x01: // REGISTER
                handleRegister(data, publicAddr, socket);
                break;
            case 0x02: // PEER_LIST
                handlePeerList(publicAddr, socket);
                break;
            case 0x03: // HOLE_PUNCH
                handleHolePunch(data, publicAddr, socket);
                break;
            case 0x06: // HEARTBEAT
                handleHeartbeat(data, publicAddr, socket);
                break;
            default:
                System.out.println("[未知操作码] " + opCode);
        }
    }

    private static void handleRegister(byte[] data, InetSocketAddress publicAddr,
            DatagramSocket socket) throws IOException {
        String msg = new String(data, "UTF-8");
        // 格式: username|localAddr|localPort
        String[] parts = msg.split("\\|");
        String username = parts[0];
        String localAddr = parts[1];
        int localPort = Integer.parseInt(parts[2]);

        PeerInfo info = new PeerInfo(
            username, localAddr, localPort,
            publicAddr.getAddress().getHostAddress(),
            publicAddr.getPort(),
            System.currentTimeMillis()
        );

        peers.put(username, info);
        System.out.printf("[注册] %s | 内网 %s:%d | 公网 %s:%d%n",
            username, localAddr, localPort,
            info.publicIp, info.publicPort);

        sendResponse(socket, publicAddr, "OK:" + username + "|" + info.publicIp + "|" + info.publicPort);
    }

    private static void handlePeerList(InetSocketAddress publicAddr,
            DatagramSocket socket) throws IOException {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, PeerInfo> e : peers.entrySet()) {
            if (sb.length() > 0) sb.append(";");
            PeerInfo p = e.getValue();
            sb.append(e.getKey()).append("|")
              .append(p.localIp).append("|").append(p.localPort).append("|")
              .append(p.publicIp).append("|").append(p.publicPort);
        }
        sendResponse(socket, publicAddr, sb.toString());
    }

    private static void handleHolePunch(byte[] data, InetSocketAddress publicAddr,
            DatagramSocket socket) throws IOException {
        String targetUser = new String(data, "UTF-8");
        PeerInfo target = peers.get(targetUser);

        if (target == null) {
            sendResponse(socket, publicAddr, "ERROR:UserNotFound");
            return;
        }

        // 把目标节点的地址信息发给请求方
        String peerAddr = targetUser + "|" + target.localIp + "|" + target.localPort
            + "|" + target.publicIp + "|" + target.publicPort;
        sendResponse(socket, publicAddr, "PUNCH:" + peerAddr);

        // 同时告诉目标节点,请求方的地址
        // 找到请求方是谁
        String requester = null;
        for (Map.Entry<String, PeerInfo> e : peers.entrySet()) {
            InetSocketAddress addr = new InetSocketAddress(
                InetAddress.getByName(e.getValue().publicIp), e.getValue().publicPort);
            if (addr.equals(publicAddr)) {
                requester = e.getKey();
                break;
            }
        }

        if (requester != null) {
            PeerInfo reqInfo = peers.get(requester);
            if (reqInfo != null) {
                String notify = "PUNCH_REQ:" + requester + "|" + reqInfo.localIp + "|" + reqInfo.localPort
                    + "|" + reqInfo.publicIp + "|" + reqInfo.publicPort;
                sendResponse(socket,
                    new InetSocketAddress(InetAddress.getByName(target.publicIp), target.publicPort),
                    notify);
            }
        }
    }

    private static void handleHeartbeat(byte[] data, InetSocketAddress publicAddr,
            DatagramSocket socket) throws IOException {
        String username = new String(data, "UTF-8");
        PeerInfo info = peers.get(username);
        if (info != null) {
            info.lastHeartbeat = System.currentTimeMillis();
        }
        sendResponse(socket, publicAddr, "HEARTBEAT_OK");
    }

    private static void checkHeartbeat() {
        long now = System.currentTimeMillis();
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, PeerInfo> e : peers.entrySet()) {
            if (now - e.getValue().lastHeartbeat > 60000) { // 60s超时
                expired.add(e.getKey());
            }
        }
        for (String name : expired) {
            peers.remove(name);
            System.out.println("[超时移除] " + name);
        }
    }

    private static void sendResponse(DatagramSocket socket, InetSocketAddress addr,
            String msg) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);

        byte[] data = msg.getBytes("UTF-8");
        dos.writeByte(0x07);  // RESPONSE
        dos.writeInt(data.length);
        dos.write(data);

        DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(),
            addr.getAddress(), addr.getPort());
        socket.send(packet);
    }

    private static String getPublicIp() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (Exception e) {
            return "unknown";
        }
    }

    static class PeerInfo {
        String username, localIp, publicIp;
        int localPort, publicPort;
        long lastHeartbeat;

        PeerInfo(String username, String localIp, int localPort,
                 String publicIp, int publicPort, long lastHeartbeat) {
            this.username = username;
            this.localIp = localIp;
            this.localPort = localPort;
            this.publicIp = publicIp;
            this.publicPort = publicPort;
            this.lastHeartbeat = lastHeartbeat;
        }
    }
}

3.3 P2P节点实现

P2P节点是架构的核心,它实现了UDP打洞、双通道监听(信令通道 + P2P通道)、自动重连。

package com.p2p.core;

import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

/**
 * P2P 节点
 * 支持:UDP打洞、NAT穿透检测、消息收发、心跳保活
 */
public class PeerNode {

    private static final int SIGNAL_PORT = 8888;

    private final String username;
    private final String signalHost;
    private final int p2pListenPort;

    private DatagramSocket p2pSocket;       // P2P通信Socket
    private DatagramSocket signalSocket;    // 信令通信Socket
    private final Map<String, PeerAddr> peerCache = new ConcurrentHashMap<>();

    private volatile boolean running = true;
    private Consumer<String> messageHandler;  // 消息回调

    // 已建立P2P连接的节点
    private final Set<String> connectedPeers = ConcurrentHashMap.newKeySet();

    public PeerNode(String username, String signalHost, int p2pListenPort) {
        this.username = username;
        this.signalHost = signalHost;
        this.p2pListenPort = p2pListenPort;
    }

    /**
     * 设置消息回调
     */
    public void onMessage(Consumer<String> handler) {
        this.messageHandler = handler;
    }

    /**
     * 启动P2P节点
     */
    public void start() throws IOException {
        // 创建P2P监听Socket(用于P2P直接通信)
        p2pSocket = new DatagramSocket(p2pListenPort);
        p2pSocket.setSoTimeout(5000);

        // 创建信令Socket(用于与信令服务器通信)
        signalSocket = new DatagramSocket();
        signalSocket.setSoTimeout(3000);

        // 1. 向信令服务器注册
        register();

        // 2. 启动P2P消息监听线程
        new Thread(this::listenP2P, "p2p-listener").start();

        // 3. 启动信令响应监听线程
        new Thread(this::listenSignal, "signal-listener").start();

        // 4. 启动心跳线程
        startHeartbeat();

        System.out.printf("[%s] P2P节点启动完成,监听端口: %d%n", username, p2pListenPort);
        System.out.printf("[%s] 信令服务器: %s:%d%n", username, signalHost, SIGNAL_PORT);
    }

    /**
     * 向信令服务器注册
     */
    private void register() throws IOException {
        String localIp = InetAddress.getLocalHost().getHostAddress();
        String msg = username + "|" + localIp + "|" + p2pListenPort;

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        byte[] data = msg.getBytes("UTF-8");
        dos.writeByte(0x01);  // REGISTER
        dos.writeInt(data.length);
        dos.write(data);

        DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(),
            InetAddress.getByName(signalHost), SIGNAL_PORT);
        signalSocket.send(packet);

        System.out.println("[注册] 等待信令服务器响应...");
    }

    /**
     * 向目标节点发起P2P连接(UDP打洞)
     */
    public void connectToPeer(String targetUsername) throws IOException {
        System.out.printf("[%s] 正在向 %s 发起P2P连接...%n", username, targetUsername);

        // 1. 向信令服务器请求打洞
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        byte[] data = targetUsername.getBytes("UTF-8");
        dos.writeByte(0x03);  // HOLE_PUNCH
        dos.writeInt(data.length);
        dos.write(data);

        DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(),
            InetAddress.getByName(signalHost), SIGNAL_PORT);
        signalSocket.send(packet);
    }

    /**
     * 发送消息(自动选择路径:P2P直连 或 信令中继)
     */
    public void sendMessage(String targetUser, String message) throws IOException {
        if (connectedPeers.contains(targetUser)) {
            sendP2PMessage(targetUser, message);
        } else {
            relayMessage(targetUser, message);
        }
    }

    /**
     * P2P直连发送
     */
    private void sendP2PMessage(String targetUser, String message) throws IOException {
        PeerAddr addr = peerCache.get(targetUser);
        if (addr == null) {
            System.out.println("[发送失败] 未知节点: " + targetUser);
            return;
        }

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        byte[] msgData = (username + ":" + message).getBytes("UTF-8");
        dos.writeByte(0x05);  // P2P_DATA
        dos.writeInt(msgData.length);
        dos.write(msgData);

        // 同时向内外网地址发送(提高穿透成功率)
        DatagramPacket localPkt = new DatagramPacket(baos.toByteArray(), baos.size(),
            InetAddress.getByName(addr.localIp), addr.localPort);
        p2pSocket.send(localPkt);

        DatagramPacket publicPkt = new DatagramPacket(baos.toByteArray(), baos.size(),
            InetAddress.getByName(addr.publicIp), addr.publicPort);
        p2pSocket.send(publicPkt);

        System.out.printf("[%s -> %s] P2P消息已发送: %s%n", username, targetUser, message);
    }

    /**
     * 通过信令服务器中继(兜底方案)
     */
    private void relayMessage(String targetUser, String message) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        String relayData = username + "|" + targetUser + "|" + message;
        byte[] data = relayData.getBytes("UTF-8");
        dos.writeByte(0x04);  // RELAY
        dos.writeInt(data.length);
        dos.write(data);

        DatagramPacket packet = new DatagramPacket(baos.toByteArray(), baos.size(),
            InetAddress.getByName(signalHost), SIGNAL_PORT);
        signalSocket.send(packet);

        System.out.printf("[%s -> %s(中继)] 消息已发送: %s%n", username, targetUser, message);
    }

    /**
     * 监听P2P消息(直接来自其他节点的数据)
     */
    private void listenP2P() {
        byte[] buffer = new byte[8192];
        while (running) {
            try {
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                p2pSocket.receive(packet);

                ByteArrayInputStream bais = new ByteArrayInputStream(
                    packet.getData(), 0, packet.getLength());
                DataInputStream dis = new DataInputStream(bais);

                int opCode = dis.readUnsignedByte();
                int len = dis.readInt();
                byte[] data = new byte[len];
                dis.readFully(data);

                if (opCode == 0x05) {
                    // P2P_DATA: 收到直接消息
                    String msg = new String(data, "UTF-8");
                    System.out.println("[P2P接收] " + msg);
                    if (messageHandler != null) {
                        messageHandler.accept(msg);
                    }
                } else if (opCode == 0x08) {
                    // PUNCH_PROBE: 打洞探测包
                    String probeInfo = new String(data, "UTF-8");
                    PeerAddr addr = parseAddr(probeInfo);
                    if (addr != null) {
                        String user = probeInfo.split("\\|")[0];
                        peerCache.put(user, addr);
                        connectedPeers.add(user);
                        System.out.printf("[打洞成功] 与 %s 的P2P通道已建立%n", user);
                    }
                }
            } catch (SocketTimeoutException e) {
                // 正常超时,继续循环
            } catch (IOException e) {
                if (running) {
                    System.err.println("[P2P监听异常] " + e.getMessage());
                }
            }
        }
    }

    /**
     * 监听信令服务器响应
     */
    private void listenSignal() {
        byte[] buffer = new byte[8192];
        while (running) {
            try {
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                signalSocket.receive(packet);

                ByteArrayInputStream bais = new ByteArrayInputStream(
                    packet.getData(), 0, packet.getLength());
                DataInputStream dis = new DataInputStream(bais);

                int opCode = dis.readUnsignedByte();
                int len = dis.readInt();
                byte[] data = new byte[len];
                dis.readFully(data);

                String response = new String(data, "UTF-8");
                handleSignalResponse(response);

            } catch (SocketTimeoutException e) {
                // 正常超时
            } catch (IOException e) {
                if (running) {
                    System.err.println("[信令监听异常] " + e.getMessage());
                }
            }
        }
    }

    /**
     * 处理信令服务器响应
     */
    private void handleSignalResponse(String response) throws IOException {
        System.out.println("[信令响应] " + response);

        if (response.startsWith("PUNCH:")) {
            // 收到打洞地址,开始穿透
            String addrStr = response.substring(6);
            startHolePunch(addrStr);

        } else if (response.startsWith("PUNCH_REQ:")) {
            // 有人要打洞过来,我们也同时打回去
            String addrStr = response.substring(10);
            startHolePunch(addrStr);

        } else if (response.startsWith("RELAY:")) {
            // 收到中继消息
            String relayMsg = response.substring(6);
            if (messageHandler != null) {
                messageHandler.accept("[中继]" + relayMsg);
            }
        }
    }

    /**
     * 执行UDP打洞
     */
    private void startHolePunch(String addrStr) throws IOException {
        // 格式: username|localIp|localPort|publicIp|publicPort
        String[] parts = addrStr.split("\\|");
        String remoteUser = parts[0];
        String localIp = parts[1];
        int localPort = Integer.parseInt(parts[2]);
        String publicIp = parts[3];
        int publicPort = Integer.parseInt(parts[4]);

        PeerAddr remoteAddr = new PeerAddr(localIp, localPort, publicIp, publicPort);
        peerCache.put(remoteUser, remoteAddr);

        System.out.printf("[打洞] 向 %s 发起穿透: 内网 %s:%d, 公网 %s:%d%n",
            remoteUser, localIp, localPort, publicIp, publicPort);

        // 关键:构造探测包,同时向内网地址和公网地址发送
        // 这是UDP打洞成功的关键所在
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        String probeInfo = username + "|" + getLocalIp() + "|" + p2pListenPort;
        byte[] probeData = probeInfo.getBytes("UTF-8");
        dos.writeByte(0x08);  // PUNCH_PROBE
        dos.writeInt(probeData.length);
        dos.write(probeData);

        byte[] packetBytes = baos.toByteArray();

        // 向对端公网地址连续发送多个包(提高穿透成功率)
        DatagramPacket pubPkt = new DatagramPacket(packetBytes, packetBytes.length,
            InetAddress.getByName(publicIp), publicPort);

        // 同时也尝试内网地址(针对同一内网的情况)
        DatagramPacket lanPkt = new DatagramPacket(packetBytes, packetBytes.length,
            InetAddress.getByName(localIp), localPort);

        // 并发发送打洞包,每个方向发3轮
        for (int i = 0; i < 3; i++) {
            p2pSocket.send(pubPkt);
            p2pSocket.send(lanPkt);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        connectedPeers.add(remoteUser);
        System.out.printf("[打洞] 已向 %s 发送穿透包,等待对方响应...%n", remoteUser);
    }

    /**
     * 启动心跳
     */
    private void startHeartbeat() {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(baos);
                byte[] hbData = username.getBytes("UTF-8");
                dos.writeByte(0x06);
                dos.writeInt(hbData.length);
                dos.write(hbData);

                DatagramPacket pkt = new DatagramPacket(baos.toByteArray(), baos.size(),
                    InetAddress.getByName(signalHost), SIGNAL_PORT);
                signalSocket.send(pkt);
            } catch (IOException e) {
                // 心跳失败,静默处理
            }
        }, 10, 15, TimeUnit.SECONDS);
    }

    /**
     * 获取在线节点列表
     */
    public void listPeers() throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        byte[] data = new byte[0];
        dos.writeByte(0x02);
        dos.writeInt(0);
        dos.write(data);

        DatagramPacket pkt = new DatagramPacket(baos.toByteArray(), baos.size(),
            InetAddress.getByName(signalHost), SIGNAL_PORT);
        signalSocket.send(pkt);
    }

    public String getLocalIp() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (Exception e) {
            return "127.0.0.1";
        }
    }

    public void shutdown() {
        running = false;
        if (p2pSocket != null && !p2pSocket.isClosed()) p2pSocket.close();
        if (signalSocket != null && !signalSocket.isClosed()) signalSocket.close();
    }

    private PeerAddr parseAddr(String data) {
        String[] parts = data.split("\\|");
        if (parts.length >= 5) {
            return new PeerAddr(parts[1], Integer.parseInt(parts[2]),
                parts[3], Integer.parseInt(parts[4]));
        }
        return null;
    }

    static class PeerAddr {
        String localIp, publicIp;
        int localPort, publicPort;

        PeerAddr(String localIp, int localPort, String publicIp, int publicPort) {
            this.localIp = localIp;
            this.localPort = localPort;
            this.publicIp = publicIp;
            this.publicPort = publicPort;
        }
    }
}

3.4 启动入口

package com.p2p;

import com.p2p.core.PeerNode;
import com.p2p.signal.SignallingServer;

import java.util.Scanner;

/**
 * P2P聊天演示启动器
 *
 * 使用步骤:
 * 1. 在公网服务器运行: java -cp . com.p2p.signal.SignallingServer
 * 2. 节点A运行: java com.p2p.Launcher nodeA <serverIp> 9001
 * 3. 节点B运行: java com.p2p.Launcher nodeB <serverIp> 9002
 * 4. 在节点A控制台输入: /connect nodeB
 * 5. 打通后可直接发消息
 */
public class Launcher {

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.out.println("用法:");
            System.out.println("  启动信令服务器: java com.p2p.Launcher server");
            System.out.println("  启动P2P节点:   java com.p2p.Launcher <用户名> <信令服务器IP> <P2P端口>");
            return;
        }

        if (args[0].equals("server")) {
            SignallingServer.main(new String[]{});
            return;
        }

        String username = args[0];
        String signalHost = args[1];
        int p2pPort = Integer.parseInt(args[2]);

        PeerNode node = new PeerNode(username, signalHost, p2pPort);
        node.onMessage(msg -> System.out.println("\n[收到消息] " + msg));
        node.start();

        // 控制台交互
        Scanner scanner = new Scanner(System.in);
        System.out.println("\n===== P2P 聊天控制台 =====");
        System.out.println("命令列表:");
        System.out.println("  /connect <用户名>  - 发起P2P连接");
        System.out.println("  /list              - 查看在线节点");
        System.out.println("  /msg <用户> <内容>  - 发送消息");
        System.out.println("  /status            - 查看连接状态");
        System.out.println("  /quit              - 退出");
        System.out.println("===========================\n");

        while (true) {
            String input = scanner.nextLine().trim();
            if (input.isEmpty()) continue;

            if (input.equals("/quit")) {
                node.shutdown();
                break;
            } else if (input.equals("/list")) {
                node.listPeers();
            } else if (input.startsWith("/connect ")) {
                String target = input.substring(9).trim();
                node.connectToPeer(target);
            } else if (input.startsWith("/msg ")) {
                String rest = input.substring(5).trim();
                int spaceIdx = rest.indexOf(" ");
                if (spaceIdx > 0) {
                    String target = rest.substring(0, spaceIdx);
                    String msg = rest.substring(spaceIdx + 1);
                    node.sendMessage(target, msg);
                }
            } else if (input.equals("/status")) {
                System.out.println("[状态] 当前节点: " + username);
            } else {
                System.out.println("[未知命令] 输入 /help 查看帮助");
            }
        }

        scanner.close();
    }
}

四、NAT穿透的核心技术难点

4.1 端口预测

对称NAT下,相同内网地址发往不同目标会产生不同映射端口。但部分NAT设备的端口分配有规律(如递增)。通过向信令服务器发送探测包收集映射端口序列,可以预测发往对端的映射端口:

/**
 * 端口预测算法 - 用于对称NAT场景
 * 通过多次探测信令服务器,分析端口分配规律
 */
public class PortPredictor {

    private final DatagramSocket socket;
    private final String signalHost;
    private final int signalPort;

    public PortPredictor(DatagramSocket socket, String signalHost, int signalPort) {
        this.socket = socket;
        this.signalHost = signalHost;
        this.signalPort = signalPort;
    }

    /**
     * 预测连接到对端时的公网端口
     */
    public int predictPort(String targetIp, int targetPort, int samples) throws IOException {
        List<Integer> ports = new ArrayList<>();

        for (int i = 0; i < samples; i++) {
            // 向不同目标发送探测,观察端口分配
            ports.add(probePort(InetAddress.getByName("8.8.8.8"), 53 + i));
        }

        return analyzePattern(ports);
    }

    private int probePort(InetAddress addr, int port) throws IOException {
        // 发送一个空包,观察NAT分配的源端口
        byte[] dummy = new byte[1];
        DatagramPacket pkt = new DatagramPacket(dummy, dummy.length, addr, port);
        socket.send(pkt);
        return socket.getLocalPort();
    }

    private int analyzePattern(List<Integer> ports) {
        // 简单的线性预测:计算平均增量
        if (ports.size() < 2) return ports.get(0);
        int sumDelta = 0;
        for (int i = 1; i < ports.size(); i++) {
            sumDelta += ports.get(i) - ports.get(i - 1);
        }
        int avgDelta = sumDelta / (ports.size() - 1);
        return ports.get(ports.size() - 1) + avgDelta;
    }
}

4.2 TCP穿透的SO_REUSEADDR技巧

TCP打洞不如UDP成熟,但通过SO_REUSEADDR可以让多个Socket绑定到同一端口进行同时连接:

ServerSocket server = new ServerSocket();
server.setReuseAddress(true);
server.bind(new InetSocketAddress(localPort));

// 同时发起对外连接
Socket outgoing = new Socket();
outgoing.setReuseAddress(true);
outgoing.bind(new InetSocketAddress(localPort));
// 同时connect...

五、生产级P2P架构设计参考

5.1 连接状态机

         ┌──────────┐
         │  INIT    │ 初始状态
         └────┬─────┘
              │ 注册成功
         ┌────▼─────┐
         │ READY    │ 等待连接
         └────┬─────┘
              │ 发起/接收打洞
         ┌────▼─────┐
         │ PUNCHING │ 打洞中
         └────┬─────┘
       ┌──────┴──────┐
       ▼              ▼
  ┌─────────┐   ┌─────────┐
  │CONNECTED│   │ FAILED  │
  │ P2P直连  │   │ 打洞失败 │
  └─────────┘   └────┬────┘
       │              │ 使用中继
       ▼              ▼
  ┌─────────┐   ┌─────────┐
  │ RELAY   │   │         │
  │ 中继模式  │   └─────────┘
  └─────────┘

5.2 架构演进路线

阶段一:基础P2P
  公网信令服务器 + UDP打洞

阶段二:增强穿透
  多路并发打洞 + 端口预测 + TURN中继兜底

阶段三:生产集群
  信令服务器集群 + NAT类型数据库 +  
  智能路由选择(P2P/中继/中转)

六、完整运行测试

6.1 本地测试(模拟公网环境)

# 终端1:启动信令服务器
javac com/p2p/signal/SignallingServer.java
java com.p2p.signal.SignallingServer

# 终端2:启动NodeA
javac com/p2p/core/PeerNode.java
java com.p2p.Launcher nodeA 127.0.0.1 9001

# 终端3:启动NodeB
java com.p2p.Launcher nodeB 127.0.0.1 9002

# 在NodeA控制台:查看在线列表
/list

# 在NodeA控制台:连接NodeB
/connect nodeB

# 在NodeA控制台:发送消息
/msg nodeB 你好,P2P连接已建立!

6.2 跨NAT测试(真实场景)

将信令服务器部署到公网云服务器(如阿里云、腾讯云),两个P2P节点分别位于不同网络环境(如家庭宽带、公司网络、手机热点),即可验证真实的NAT穿透效果。


七、总结

核心概念 说明
信令服务器 部署在公网,仅做地址交换和协调,不中转数据
UDP打洞 双方同时向对方NAT发送UDP包,建立NAT映射
双通道策略 同时向内网和公网地址发送数据,提高成功率
中继兜底 P2P穿透失败时降级为信令服务器中继
心跳保活 NAT映射有超时时间(通常30秒~2分钟),需要心跳维持

P2P通信的核心思想是"去中心化",让数据在两端之间直接流动。本文实现的UDP打洞方案在大部分网络环境下(除对称NAT外)都能成功建立P2P通道。对称NAT场景则需要配合端口预测或TURN中继服务。

实际生产环境建议使用成熟的P2P库如Netty,并结合ICE(Interactive Connectivity Establishment)框架进行完整的NAT类型探测和穿透策略选择。


本文代码已上传至GitHub,完整项目包含ANT构建脚本和Docker化部署方案。如果你觉得文章有帮助,欢迎点赞、收藏、转发。

更多推荐