Go 简单信令服务
 

package main

import (
	"encoding/json"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

// SignalMessage is a loosely typed signaling payload: a JSON object held as a
// map keyed by field name. readPump reads its "type", "from" and "to" entries
// as strings; sendJSON marshals values of this type for outgoing replies.
type SignalMessage map[string]interface{}

// Client is one WebSocket connection handled by the signaling server.
type Client struct {
	// UserID is the ID assigned by bindUser on "login"; empty until then.
	UserID string
	// Conn is the underlying WebSocket connection.
	Conn   *websocket.Conn
	// Send is the queue of outbound messages drained by writePump.
	Send   chan []byte
}

var (
	// clients maps a logged-in user ID to its current connection.
	clients   = make(map[string]*Client) // userID -> client
	// clientsMu guards clients.
	clientsMu sync.RWMutex

	// upgrader upgrades incoming HTTP requests to WebSocket connections.
	upgrader = websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
		// DEMO: every origin is allowed; production must validate Origin.
		CheckOrigin: func(r *http.Request) bool { return true },
	}
)

// Timing and size limits applied to every WebSocket connection.
const (
	// writeWait is the deadline writePump sets before each write.
	writeWait      = 10 * time.Second
	// pongWait is the read deadline readPump sets initially and again on every pong.
	pongWait       = 60 * time.Second
	// pingPeriod is the interval between pings sent by writePump; at 9/10 of
	// pongWait a ping goes out before the read deadline lapses.
	pingPeriod     = (pongWait * 9) / 10
	// maxMessageSize is the read limit readPump sets on the connection (64 * 1024).
	maxMessageSize = 1024 * 64
)

// main registers the HTTP routes on the default mux and serves them on :8080.
//
// Routes:
//   - /ws      upgrades to a WebSocket signaling session (wsHandler)
//   - /healthz answers 200 "ok" for liveness probes
//
// The server is started through an explicit http.Server so a header-read
// timeout can be set; the bare http.ListenAndServe helper has no timeouts and
// lets a client that never finishes its request headers hold a connection
// open indefinitely. Only ReadHeaderTimeout is set, so established long-lived
// WebSocket sessions are not cut off by it. The process exits via log.Fatal
// if the listener fails.
func main() {
	http.HandleFunc("/ws", wsHandler)
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		// Best effort: if the prober already hung up there is nothing to do.
		_, _ = w.Write([]byte("ok"))
	})

	addr := ":8080"
	srv := &http.Server{
		Addr:              addr,
		ReadHeaderTimeout: 10 * time.Second,
	}

	log.Printf("signal server started at ws://0.0.0.0%s/ws", addr)
	log.Fatal(srv.ListenAndServe())
}

// wsHandler upgrades the request to a WebSocket connection, wraps it in a
// Client with a 256-slot outbound queue, starts the write pump in its own
// goroutine and then runs the read pump on the handler goroutine until the
// connection ends. A failed upgrade is logged and the handler returns.
func wsHandler(w http.ResponseWriter, r *http.Request) {
	ws, upgradeErr := upgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		log.Println("upgrade error:", upgradeErr)
		return
	}

	peer := new(Client)
	peer.Conn = ws
	peer.Send = make(chan []byte, 256)

	go peer.writePump()
	peer.readPump()
}

// readPump reads messages from the connection until a read fails, then runs
// cleanup.
//
// Each message must be a JSON object. A message whose "type" is "login" binds
// the connection to the user named in "from" and is acknowledged with
// "login_ok"; every other message is relayed unchanged to the user named in
// "to". Messages that are not valid JSON are logged and skipped; a missing
// "from" (login) or "to" (relay) is answered with an "error" message.
func (c *Client) readPump() {
	defer c.cleanup()

	conn := c.Conn
	conn.SetReadLimit(maxMessageSize)
	_ = conn.SetReadDeadline(time.Now().Add(pongWait))
	// Every pong pushes the read deadline out by another pongWait.
	conn.SetPongHandler(func(string) error {
		return conn.SetReadDeadline(time.Now().Add(pongWait))
	})

	for {
		_, payload, readErr := conn.ReadMessage()
		if readErr != nil {
			unexpected := websocket.IsUnexpectedCloseError(readErr, websocket.CloseGoingAway, websocket.CloseAbnormalClosure)
			if unexpected {
				log.Printf("read error user=%s err=%v", c.UserID, readErr)
			}
			return
		}

		var envelope SignalMessage
		if decodeErr := json.Unmarshal(payload, &envelope); decodeErr != nil {
			log.Printf("invalid json: %v", decodeErr)
			continue
		}

		kind, _ := envelope["type"].(string)

		if kind == "login" {
			// Expected shape: {type:"login", from:"alice"}
			userID, _ := envelope["from"].(string)
			if userID == "" {
				c.sendJSON(SignalMessage{
					"type":  "error",
					"error": "login.from required",
				})
				continue
			}
			c.bindUser(userID)
			c.sendJSON(SignalMessage{
				"type": "login_ok",
				"from": "server",
				"to":   userID,
			})
			continue
		}

		// Pass-through: the message must name a recipient in "to".
		dest, _ := envelope["to"].(string)
		if dest == "" {
			c.sendJSON(SignalMessage{
				"type":  "error",
				"error": "field 'to' required",
			})
			continue
		}
		forwardToUser(dest, payload)
	}
}

// writePump writes queued messages from c.Send to the connection as text
// frames and sends a ping every pingPeriod. It returns when a write fails or
// when c.Send is closed (after attempting to send a close frame); on return
// the ticker is stopped and the connection is closed.
func (c *Client) writePump() {
	pinger := time.NewTicker(pingPeriod)
	defer func() {
		pinger.Stop()
		_ = c.Conn.Close()
	}()

	// send stamps a fresh write deadline and then writes a single frame.
	send := func(frameType int, payload []byte) error {
		_ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
		return c.Conn.WriteMessage(frameType, payload)
	}

	for {
		select {
		case <-pinger.C:
			if send(websocket.PingMessage, nil) != nil {
				return
			}

		case outbound, open := <-c.Send:
			if !open {
				// Queue closed: tell the peer we are going away, then stop.
				_ = send(websocket.CloseMessage, []byte{})
				return
			}
			if send(websocket.TextMessage, outbound) != nil {
				return
			}
		}
	}
}

// bindUser registers c in the clients map under userID, replacing any other
// connection currently registered under that ID (last login wins).
//
// Two defects of the previous version are fixed here:
//
//   - The evicted connection's Send channel is no longer closed. That channel
//     is closed by the evicted client's own cleanup once its read loop ends,
//     so closing it here as well caused a "close of closed channel" panic.
//     Closing the evicted socket is enough: it makes the evicted client's
//     pending read fail, which triggers its cleanup.
//   - If c was already bound to a different ID, that stale map entry is
//     removed. cleanup only removes the entry for the current UserID, so the
//     old entry would otherwise keep pointing at this client forever.
func (c *Client) bindUser(userID string) {
	clientsMu.Lock()
	defer clientsMu.Unlock()

	// Re-login under a different ID: release the previous registration.
	if prev := c.UserID; prev != "" && prev != userID {
		if cur, ok := clients[prev]; ok && cur == c {
			delete(clients, prev)
			log.Printf("user offline: %s", prev)
		}
	}
	c.UserID = userID

	// Kick out the older connection logged in under the same account.
	if old, ok := clients[userID]; ok && old != c {
		// Best effort: the evicted socket may already be closed.
		_ = old.Conn.Close()
	}
	clients[userID] = c
	log.Printf("user online: %s", userID)
}

// cleanup tears the client down: if the client is logged in and is still the
// connection registered under its user ID, the registration is removed; then
// the Send channel is closed and the connection is closed.
func (c *Client) cleanup() {
	if id := c.UserID; id != "" {
		clientsMu.Lock()
		// A newer login may have taken over this ID; only remove our own entry.
		if clients[id] == c {
			delete(clients, id)
			log.Printf("user offline: %s", id)
		}
		clientsMu.Unlock()
	}

	close(c.Send)
	_ = c.Conn.Close()
}

// sendJSON marshals v to JSON and queues it on c.Send without blocking.
//
// If v cannot be marshalled the error is logged and nothing is sent. If the
// queue is full the client is treated as too slow and its connection is
// closed.
//
// The previous version closed c.Send in that case. The channel is also closed
// by cleanup when the read loop ends, and later sends may still target it, so
// that led to "close of closed channel" / "send on closed channel" panics.
// Closing the socket instead makes the read loop fail, and cleanup then
// closes the channel exactly once.
func (c *Client) sendJSON(v interface{}) {
	b, err := json.Marshal(v)
	if err != nil {
		log.Printf("marshal error user=%s err=%v", c.UserID, err)
		return
	}
	select {
	case c.Send <- b:
	default:
		// Send queue is full: drop the slow connection.
		log.Printf("send queue full, closing user=%s", c.UserID)
		_ = c.Conn.Close() // best effort; the socket may already be closed
	}
}

// forwardToUser queues raw on the Send channel of the client registered under
// to, without blocking. If no such client is registered the message is
// dropped. If the recipient's queue is full the recipient is treated as too
// slow and its connection is closed.
//
// Two defects of the previous version are fixed here:
//
//   - The read lock is now held across the (non-blocking) send. cleanup
//     removes a client from the map under the write lock before it closes the
//     client's Send channel, so a client found in the map while the read lock
//     is held has not had its channel closed by cleanup yet. Previously the
//     lock was released before the send, which could hit a closed channel.
//   - A slow recipient is disconnected by closing its socket rather than its
//     Send channel. The channel is closed by the recipient's own cleanup;
//     closing it here too caused double-close and send-on-closed panics.
func forwardToUser(to string, raw []byte) {
	clientsMu.RLock()
	target, ok := clients[to]
	queued := false
	if ok {
		select {
		case target.Send <- raw:
			queued = true
		default:
		}
	}
	clientsMu.RUnlock()

	if !ok {
		// Recipient is offline: ignore, or add an offline notification here.
		return
	}
	if !queued {
		// Recipient is too slow: disconnect it. Done outside the lock so the
		// map is not held during socket teardown.
		log.Printf("send queue full, closing user=%s", to)
		_ = target.Conn.Close() // best effort; the socket may already be closed
	}
}

Flutter 客户端调试

import 'dart:convert';
import 'package:flutter/material.dart';
import 'package:flutter_webrtc/flutter_webrtc.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
//https://28xin.com/ 藏宝库
/// Application entry point: mounts [MyApp] as the root widget.
void main() => runApp(const MyApp());

/// =====================
/// Address of your signaling server
/// =====================
const String kSignalingUrl = 'ws://YOUR_SIGNAL_SERVER:8080/ws';
const String kSelfId = 'alice'; // Current user ID; in a real project assign it dynamically after login
const String kPeerId = 'bob';   // Peer user ID; in a real project pick it from contacts/conversations

/// Root widget: a Material 3 app with a blue seed colour whose home screen is
/// [CallPage].
class MyApp extends StatelessWidget {
  const MyApp({super.key});

  @override
  Widget build(BuildContext context) {
    final appTheme = ThemeData(
      useMaterial3: true,
      colorSchemeSeed: Colors.blue,
    );

    return MaterialApp(
      title: 'Flutter WebRTC P2P Demo',
      theme: appTheme,
      home: const CallPage(),
    );
  }
}

/// Phases of a call as tracked by the call page, declared in this order:
/// idle, calling, ringing, connecting, connected, ended.
enum CallState { idle, calling, ringing, connecting, connected, ended }

/// The call screen. All call logic and UI live in its state object,
/// [_CallPageState].
class CallPage extends StatefulWidget {
  const CallPage({super.key});

  @override
  State<CallPage> createState() => _CallPageState();
}

/// State for [CallPage]: owns the signaling socket, the peer connection, the
/// local media stream and both video renderers, and builds the call UI.
///
/// Signaling messages are JSON objects with a `type` field (`login`,
/// `call_invite`, `call_reject`, `call_hangup`, `webrtc_offer`,
/// `webrtc_answer`, `webrtc_candidate`) plus `from` / `to` user IDs.
class _CallPageState extends State<CallPage> {
  final RTCVideoRenderer _localRenderer = RTCVideoRenderer();
  final RTCVideoRenderer _remoteRenderer = RTCVideoRenderer();

  WebSocketChannel? _ws;
  RTCPeerConnection? _pc;
  MediaStream? _localStream;

  CallState _state = CallState.idle;
  bool _micEnabled = true;
  bool _camEnabled = true;

  /// Remote ICE candidates that arrived before a remote description was set;
  /// they are applied by [_flushBufferedCandidates].
  final List<RTCIceCandidate> _remoteCandidatesBuffer = [];

  // For production, use your own STUN/TURN servers.
  final Map<String, dynamic> _pcConfig = {
    'iceServers': [
      {'urls': 'stun:stun.l.google.com:19302'},
      // {
      //   'urls': 'turn:your.turn.server:3478?transport=udp',
      //   'username': 'user',
      //   'credential': 'pass',
      // },
      // {
      //   'urls': 'turns:your.turn.server:5349?transport=tcp',
      //   'username': 'user',
      //   'credential': 'pass',
      // }
    ],
    'sdpSemantics': 'unified-plan',
  };

  @override
  void initState() {
    super.initState();
    _initAll();
  }

  /// Calls [setState] with [fn] only while this State is still mounted.
  ///
  /// Used on every path that runs after an `await` or from a peer-connection
  /// callback, where the widget may already have been disposed.
  void _setStateIfMounted(VoidCallback fn) {
    if (!mounted) return;
    setState(fn);
  }

  /// Initialises both renderers, connects to the signaling server and then
  /// opens the local camera/microphone stream.
  Future<void> _initAll() async {
    await _localRenderer.initialize();
    await _remoteRenderer.initialize();
    await _connectSignal();
    await _createLocalStream();
  }

  /// Opens the signaling WebSocket, starts dispatching incoming messages to
  /// [_onSignal] and sends the `login` message for [kSelfId].
  Future<void> _connectSignal() async {
    _ws = WebSocketChannel.connect(Uri.parse(kSignalingUrl));

    _ws!.stream.listen((event) async {
      // A malformed or unexpected message must not surface as an uncaught
      // asynchronous error; log it and keep the socket running.
      try {
        final Map<String, dynamic> msg = jsonDecode(event);
        await _onSignal(msg);
      } catch (e) {
        debugPrint('signal handling error: $e');
      }
    }, onDone: () {
      debugPrint('signal closed');
    }, onError: (e) {
      debugPrint('signal error: $e');
    });

    _send({
      'type': 'login',
      'from': kSelfId,
    });
  }

  /// Requests audio plus front-camera video (ideal 1280x720 at 24 fps) and
  /// shows it in the local renderer.
  Future<void> _createLocalStream() async {
    final mediaConstraints = <String, dynamic>{
      'audio': true,
      'video': {
        'facingMode': 'user',
        'width': {'ideal': 1280},
        'height': {'ideal': 720},
        'frameRate': {'ideal': 24},
      }
    };

    final stream = await navigator.mediaDevices.getUserMedia(mediaConstraints);
    if (!mounted) {
      // The page was disposed while waiting for the stream; dispose() could
      // not release it because it was not stored yet, so release it here.
      await stream.dispose();
      return;
    }
    _localStream = stream;
    _localRenderer.srcObject = stream;
    setState(() {});
  }

  /// Creates the peer connection if there is none yet, adds the local tracks
  /// and wires up the ICE-candidate, track and connection-state callbacks.
  Future<void> _createPeerConnectionIfNeeded() async {
    if (_pc != null) return;

    _pc = await createPeerConnection(_pcConfig);

    // Add the local tracks.
    if (_localStream != null) {
      for (var track in _localStream!.getTracks()) {
        await _pc!.addTrack(track, _localStream!);
      }
    }

    _pc!.onIceCandidate = (candidate) {
      _send({
        'type': 'webrtc_candidate',
        'from': kSelfId,
        'to': kPeerId,
        'candidate': candidate.toMap(),
      });
    };

    _pc!.onTrack = (event) {
      if (event.streams.isNotEmpty && mounted) {
        _remoteRenderer.srcObject = event.streams.first;
        setState(() {});
      }
    };

    _pc!.onConnectionState = (state) {
      debugPrint('pc connection state: $state');
      if (state == RTCPeerConnectionState.RTCPeerConnectionStateConnected) {
        _setStateIfMounted(() => _state = CallState.connected);
      }
      if (state == RTCPeerConnectionState.RTCPeerConnectionStateFailed ||
          state == RTCPeerConnectionState.RTCPeerConnectionStateDisconnected ||
          state == RTCPeerConnectionState.RTCPeerConnectionStateClosed) {
        _setStateIfMounted(() => _state = CallState.ended);
      }
    };
  }

  /// Applies every buffered remote ICE candidate to the current peer
  /// connection and empties the buffer. Does nothing without a connection.
  Future<void> _flushBufferedCandidates() async {
    final pc = _pc;
    if (pc == null) return;

    // Snapshot and clear first: a candidate handler may touch the buffer
    // while the awaits below are pending.
    final pending = List<RTCIceCandidate>.of(_remoteCandidatesBuffer);
    _remoteCandidatesBuffer.clear();
    for (final c in pending) {
      await pc.addCandidate(c);
    }
  }

  /// Starts an outgoing call to [kPeerId]: sends `call_invite`, creates and
  /// applies the local offer, then sends it as `webrtc_offer`.
  Future<void> startCall() async {
    await _createPeerConnectionIfNeeded();
    final pc = _pc;
    if (pc == null) return; // the call was torn down while setting up
    _setStateIfMounted(() => _state = CallState.calling);

    _send({
      'type': 'call_invite',
      'from': kSelfId,
      'to': kPeerId,
      'media': 'video',
    });

    final offer = await pc.createOffer({
      'offerToReceiveAudio': 1,
      'offerToReceiveVideo': 1,
    });
    await pc.setLocalDescription(offer);

    _send({
      'type': 'webrtc_offer',
      'from': kSelfId,
      'to': kPeerId,
      'sdp': offer.sdp,
    });

    _setStateIfMounted(() => _state = CallState.connecting);
  }

  /// Answers an incoming offer: applies [offerSdp] as the remote description,
  /// applies any candidates buffered so far, then creates, applies and sends
  /// the answer to [from].
  Future<void> _acceptCallAndAnswer(String offerSdp, String from) async {
    await _createPeerConnectionIfNeeded();
    final pc = _pc;
    if (pc == null) return; // the call was torn down while setting up

    await pc.setRemoteDescription(
      RTCSessionDescription(offerSdp, 'offer'),
    );
    await _flushBufferedCandidates();

    final answer = await pc.createAnswer({
      'offerToReceiveAudio': 1,
      'offerToReceiveVideo': 1,
    });
    await pc.setLocalDescription(answer);

    _send({
      'type': 'webrtc_answer',
      'from': kSelfId,
      'to': from,
      'sdp': answer.sdp,
    });

    _setStateIfMounted(() => _state = CallState.connecting);
  }

  /// Handles one decoded signaling message. Messages whose `to` is not
  /// [kSelfId], and unknown types, are ignored.
  Future<void> _onSignal(Map<String, dynamic> msg) async {
    final type = msg['type'];

    switch (type) {
      case 'call_invite':
        // Incoming call.
        if (msg['to'] == kSelfId) {
          _setStateIfMounted(() => _state = CallState.ringing);
          if (mounted) _showIncomingDialog(msg['from'] as String);
        }
        break;

      case 'call_reject':
        if (msg['to'] == kSelfId) {
          _showToast('对方已拒绝');
          await _endCallLocal();
        }
        break;

      case 'call_hangup':
        if (msg['to'] == kSelfId) {
          _showToast('对方已挂断');
          await _endCallLocal();
        }
        break;

      case 'webrtc_offer':
        if (msg['to'] == kSelfId) {
          final from = msg['from'] as String;
          final sdp = msg['sdp'] as String;
          // Auto-answer (you can change this to run only after the user
          // taps "accept").
          await _acceptCallAndAnswer(sdp, from);
        }
        break;

      case 'webrtc_answer':
        if (msg['to'] == kSelfId) {
          final sdp = msg['sdp'] as String;
          await _pc?.setRemoteDescription(
            RTCSessionDescription(sdp, 'answer'),
          );
          // Candidates that arrived before the answer were only buffered;
          // apply them now that a remote description exists.
          await _flushBufferedCandidates();
        }
        break;

      case 'webrtc_candidate':
        if (msg['to'] == kSelfId) {
          final c = msg['candidate'];
          final candidate = RTCIceCandidate(
            c['candidate'],
            c['sdpMid'],
            c['sdpMLineIndex'],
          );

          // getRemoteDescription() returns a Future, so it has to be awaited
          // before the null check; comparing the Future itself to null is
          // true whenever a peer connection exists.
          final pc = _pc;
          final remoteDesc =
              pc == null ? null : await pc.getRemoteDescription();
          if (pc != null && remoteDesc != null) {
            await pc.addCandidate(candidate);
          } else {
            _remoteCandidatesBuffer.add(candidate);
          }
        }
        break;
    }
  }

  /// JSON-encodes [data] and writes it to the signaling socket, if connected.
  void _send(Map<String, dynamic> data) {
    _ws?.sink.add(jsonEncode(data));
  }

  /// Tells [kPeerId] that the call is over and tears it down locally.
  Future<void> hangup() async {
    _send({
      'type': 'call_hangup',
      'from': kSelfId,
      'to': kPeerId,
    });
    await _endCallLocal();
  }

  /// Closes the peer connection, clears the remote view and buffered
  /// candidates, and moves the UI to `ended` and, 300 ms later, to `idle`.
  Future<void> _endCallLocal() async {
    // Detach the connection before awaiting so a concurrent handler cannot
    // pick up a connection that is being closed.
    final pc = _pc;
    _pc = null;
    await pc?.close();
    _remoteCandidatesBuffer.clear();

    // The page may have been disposed while closing; the renderer must not
    // be touched after that.
    if (!mounted) return;
    _remoteRenderer.srcObject = null;
    setState(() => _state = CallState.ended);

    await Future.delayed(const Duration(milliseconds: 300));
    if (mounted) setState(() => _state = CallState.idle);
  }

  /// Toggles the enabled flag of every local audio track.
  void toggleMic() {
    if (_localStream == null) return;
    _micEnabled = !_micEnabled;
    for (var t in _localStream!.getAudioTracks()) {
      t.enabled = _micEnabled;
    }
    setState(() {});
  }

  /// Toggles the enabled flag of every local video track.
  void toggleCamera() {
    if (_localStream == null) return;
    _camEnabled = !_camEnabled;
    for (var t in _localStream!.getVideoTracks()) {
      t.enabled = _camEnabled;
    }
    setState(() {});
  }

  /// Switches the camera used by the first local video track, if any.
  Future<void> switchCamera() async {
    final videoTracks = _localStream?.getVideoTracks();
    if (videoTracks != null && videoTracks.isNotEmpty) {
      await Helper.switchCamera(videoTracks.first);
    }
  }

  /// Shows the non-dismissible incoming-call dialog for caller [from].
  /// Rejecting sends `call_reject`; accepting only updates the UI state.
  void _showIncomingDialog(String from) {
    showDialog(
      context: context,
      barrierDismissible: false,
      builder: (_) => AlertDialog(
        title: const Text('来电'),
        content: Text('$from 邀请你视频通话'),
        actions: [
          TextButton(
            onPressed: () {
              Navigator.pop(context);
              _send({
                'type': 'call_reject',
                'from': kSelfId,
                'to': from,
              });
              setState(() => _state = CallState.idle);
            },
            child: const Text('拒绝'),
          ),
          FilledButton(
            onPressed: () {
              Navigator.pop(context);
              // The actual offer is handled in the webrtc_offer branch;
              // this only updates the state.
              setState(() => _state = CallState.connecting);
            },
            child: const Text('接听'),
          ),
        ],
      ),
    );
  }

  /// Shows [text] in a snack bar, unless the page is already disposed.
  void _showToast(String text) {
    if (!mounted) return;
    ScaffoldMessenger.of(context).showSnackBar(SnackBar(content: Text(text)));
  }

  @override
  void dispose() {
    _ws?.sink.close();
    _pc?.close();
    _localStream?.dispose();
    _localRenderer.dispose();
    _remoteRenderer.dispose();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    final remoteReady = _remoteRenderer.srcObject != null;

    return Scaffold(
      appBar: AppBar(
        title: Text('P2P视频通话 - ${_state.name}'),
      ),
      body: Stack(
        children: [
          // Full-screen remote video, or a placeholder until it arrives.
          Positioned.fill(
            child: Container(
              color: Colors.black,
              child: remoteReady
                  ? RTCVideoView(
                      _remoteRenderer,
                      objectFit: RTCVideoViewObjectFit.RTCVideoViewObjectFitCover,
                    )
                  : const Center(
                      child: Text(
                        '等待远端视频…',
                        style: TextStyle(color: Colors.white70),
                      ),
                    ),
            ),
          ),
          // Mirrored local preview in the top-right corner.
          Positioned(
            right: 12,
            top: 12,
            width: 120,
            height: 180,
            child: Container(
              decoration: BoxDecoration(
                border: Border.all(color: Colors.white24),
                borderRadius: BorderRadius.circular(8),
              ),
              clipBehavior: Clip.antiAlias,
              child: RTCVideoView(
                _localRenderer,
                mirror: true,
                objectFit: RTCVideoViewObjectFit.RTCVideoViewObjectFitCover,
              ),
            ),
          ),
        ],
      ),
      bottomNavigationBar: SafeArea(
        child: Padding(
          padding: const EdgeInsets.fromLTRB(12, 8, 12, 12),
          child: Wrap(
            alignment: WrapAlignment.center,
            spacing: 12,
            runSpacing: 8,
            children: [
              FilledButton.icon(
                onPressed: (_state == CallState.idle || _state == CallState.ended)
                    ? startCall
                    : null,
                icon: const Icon(Icons.call),
                label: const Text('发起'),
              ),
              FilledButton.tonalIcon(
                onPressed: (_state == CallState.connecting || _state == CallState.connected || _state == CallState.calling)
                    ? hangup
                    : null,
                icon: const Icon(Icons.call_end),
                label: const Text('挂断'),
              ),
              OutlinedButton.icon(
                onPressed: toggleMic,
                icon: Icon(_micEnabled ? Icons.mic : Icons.mic_off),
                label: Text(_micEnabled ? '麦克风开' : '麦克风关'),
              ),
              OutlinedButton.icon(
                onPressed: toggleCamera,
                icon: Icon(_camEnabled ? Icons.videocam : Icons.videocam_off),
                label: Text(_camEnabled ? '摄像头开' : '摄像头关'),
              ),
              OutlinedButton.icon(
                onPressed: switchCamera,
                icon: const Icon(Icons.cameraswitch),
                label: const Text('切换前后摄'),
              ),
            ],
          ),
        ),
      ),
    );
  }
}

最近在研究 P2P 音视频通话,发现打洞成功率极低,综合成功率只有 15%–20%。

有什么能节省服务器中继带宽的其他方案吗?

详细测试环境报告:关于 WebRTC P2P 音视频通话问题测试 - 藏宝库 IT 社区

更多推荐