博主智算菩萨,专注于人工智能、Python编程、音视频处理及UI窗体程序设计等方向。致力于以通俗易懂的方式拆解前沿技术,从零基础入门到高阶实战,陪伴开发者共同成长。目前已开设五大技术专栏,累计发布多篇原创技术文章,深受读者好评。

📌 专栏导航

  • 人工智能前沿知识(已更191篇):深度剖析Transformer架构、生成式AI、强化学习、具身智能、神经符号系统、大模型及智能体(Agent)技术,系统性解析AI核心技术体系与前沿趋势。
  • Python基础小白编程(已更232篇):从零开始,以保姆式教程讲解变量、数据类型、流程控制、函数等核心语法,配有大量实战代码与避坑指南,真正做到学以致用。
  • 机器学习与深度学习(125篇):系统化拆解线性模型、决策树、随机森林、梯度提升树、神经网络等算法原理与工程实践,覆盖从公式推导到代码实现的全链路内容。
  • 音频、图像与视频处理理论与实战(81篇):涵盖FFmpeg多媒体处理、audio_shop开源工具、ComfyUI-WanVideoWrapper视频生成等实用技术,从基础操作到高级应用一应俱全。
  • UI窗体程序设计实战(78篇):深入讲解UI设计、动态窗体生成、游戏UI框架设计等实战技巧,提供从配置到编码的完整解决方案。
    智算菩萨,以代码为经,以算法为纬,在人工智能的星辰大海中,做你前行路上最可靠的导航者。本人最常用AI工具为AIGCBAR

引言:从理论到实践的跨越

经过前面49讲的学习,我们已经掌握了TCP和UDP的核心原理——三次握手、四次挥手、滑动窗口、拥塞控制…这些理论知识就像建筑的设计蓝图,但真正的建筑还需要一砖一瓦地搭建。在本讲中,我们将通过三个完整的综合案例,把前面学到的所有理论知识转化为可运行的代码,让你真正掌握Python网络编程的精髓。

这三个案例分别是:

  1. TCP文件传输系统 —— 基于TCP的可靠文件传输,包含进度显示和MD5校验
  2. UDP聊天室 —— 基于UDP的多用户实时聊天系统
  3. 多线程并发Echo服务器 —— 使用线程池处理高并发连接

1. 案例1:TCP文件传输系统

1.1 设计思路

文件传输是TCP最经典的应用场景之一。与简单的文本传输不同,文件传输需要解决以下问题:

  1. 大文件分块:一个10GB的文件不能一次性读入内存,需要分块传输
  2. 传输进度:用户需要看到传输进度,知道还要等多久
  3. 完整性校验:网络传输可能出错,需要校验文件是否完整
  4. 并发处理:服务器需要同时服务多个客户端

1.2 TCP文件传输流程

服务器 客户端 服务器 客户端 阶段1:获取文件列表 阶段2:请求文件 阶段3:传输文件(分块) 显示进度: 20% loop [分块传输] 阶段4:校验完整性 阶段5:关闭连接 LIST file1.txt (1024B)\nfile2.pdf (2048KB) GET file1.txt OK 1024 MD5:abc123 DATA [chunk 1/10] ACK 1 DATA [chunk 2/10] ACK 2 MD5校验 MATCH / MISMATCH BYE BYE TCP文件传输流程

1.3 完整代码实现

# ======= 案例1: TCP文件传输系统 =======
import socket
import hashlib
import os
import struct
import time
from threading import Thread

# ============ 协议定义 ============
# 使用简单的文本协议进行通信
# 命令格式: COMMAND [args...]\n
CHUNK_SIZE = 8192  # 每次传输的数据块大小(8KB)


def compute_md5(filepath):
    """
    计算文件的MD5哈希值

    参数:
        filepath: 文件路径
    返回:
        32位十六进制字符串
    """
    md5_hash = hashlib.md5()
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
            md5_hash.update(chunk)
    return md5_hash.hexdigest()


def send_line(sock, line):
    """发送一行文本(自动添加换行符)"""
    sock.sendall((line + '\n').encode('utf-8'))


def recv_line(sock):
    """接收一行文本(以换行符为结束标志)"""
    buffer = b''
    while not buffer.endswith(b'\n'):
        data = sock.recv(1)
        if not data:
            break
        buffer += data
    return buffer.decode('utf-8').strip()


def send_file(sock, filepath, callback=None):
    """
    发送文件(分块传输,带进度回调)

    参数:
        sock: 已连接的socket
        filepath: 要发送的文件路径
        callback: 进度回调函数 callback(sent, total, percentage)
    """
    filesize = os.path.getsize(filepath)
    sent = 0

    with open(filepath, 'rb') as f:
        while sent < filesize:
            chunk = f.read(CHUNK_SIZE)
            if not chunk:
                break
            # 先发送4字节的长度头,再发送数据
            sock.sendall(struct.pack('!I', len(chunk)))
            sock.sendall(chunk)
            sent += len(chunk)

            if callback:
                callback(sent, filesize, sent / filesize * 100)

    # 发送0长度表示文件传输结束
    sock.sendall(struct.pack('!I', 0))


def recv_file(sock, filepath, filesize=None, callback=None):
    """
    接收文件(分块接收,带进度回调)

    参数:
        sock: 已连接的socket
        filepath: 保存文件的路径
        filesize: 预期的文件大小(如果已知)
        callback: 进度回调函数 callback(received, total, percentage)
    """
    # 如果未提供文件大小,则尝试从socket读取
    if filesize is None:
        filesize_str = recv_line(sock)
        filesize = int(filesize_str)
    received = 0

    with open(filepath, 'wb') as f:
        while received < filesize:
            # 接收4字节的长度头
            len_bytes = sock.recv(4)
            if len(len_bytes) < 4:
                break
            chunk_len = struct.unpack('!I', len_bytes)[0]

            if chunk_len == 0:
                break

            # 接收数据块
            chunk = b''
            while len(chunk) < chunk_len:
                data = sock.recv(chunk_len - len(chunk))
                if not data:
                    break
                chunk += data

            f.write(chunk)
            received += len(chunk)

            if callback:
                callback(received, filesize, received / filesize * 100)


# ============ 服务器端 ============
def tcp_file_server(host='0.0.0.0', port=9001, shared_dir='./shared'):
    """
    TCP文件传输服务器

    功能:
    1. 提供文件列表查询
    2. 支持文件下载
    3. 支持MD5校验
    4. 多客户端并发处理

    参数:
        host: 监听地址
        port: 监听端口
        shared_dir: 共享文件目录
    """
    # 创建共享目录
    if not os.path.exists(shared_dir):
        os.makedirs(shared_dir)
        print(f"[服务器] 创建共享目录: {shared_dir}")

    # 创建TCP socket
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_sock.bind((host, port))
    server_sock.listen(5)

    print(f"[服务器] 启动成功,监听 {host}:{port}")
    print(f"[服务器] 共享目录: {os.path.abspath(shared_dir)}")
    print("[服务器] 等待客户端连接...\n")

    def handle_client(client_sock, addr):
        """处理单个客户端连接"""
        print(f"[服务器] 客户端 {addr} 已连接")

        try:
            while True:
                # 接收命令
                command = recv_line(client_sock)
                if not command:
                    break

                parts = command.split()
                cmd = parts[0].upper()

                if cmd == 'LIST':
                    # 返回文件列表
                    files = os.listdir(shared_dir)
                    file_list = []
                    for f in files:
                        fpath = os.path.join(shared_dir, f)
                        if os.path.isfile(fpath):
                            size = os.path.getsize(fpath)
                            file_list.append(f"{f}|{size}")

                    send_line(client_sock, f"OK {len(file_list)}")
                    for f in file_list:
                        send_line(client_sock, f)
                    print(f"[服务器] 向 {addr} 发送文件列表 ({len(file_list)} 个文件)")

                elif cmd == 'GET' and len(parts) >= 2:
                    # 发送文件
                    filename = parts[1]
                    filepath = os.path.join(shared_dir, filename)

                    if not os.path.exists(filepath):
                        send_line(client_sock, "ERROR File not found")
                        print(f"[服务器] 文件不存在: {filename}")
                        continue

                    filesize = os.path.getsize(filepath)
                    filemd5 = compute_md5(filepath)

                    send_line(client_sock, f"OK {filesize} {filemd5}")
                    print(f"[服务器] 开始发送文件: {filename} ({filesize} bytes)")

                    # 发送文件内容
                    start_time = time.time()
                    send_file(client_sock, filepath)
                    elapsed = time.time() - start_time
                    speed = filesize / elapsed / 1024 if elapsed > 0 else 0
                    print(f"[服务器] 文件发送完成: {filename} "
                          f"({elapsed:.2f}s, {speed:.2f} KB/s)")

                elif cmd == 'MD5' and len(parts) >= 2:
                    # 校验文件MD5
                    filename = parts[1]
                    filepath = os.path.join(shared_dir, filename)

                    if os.path.exists(filepath):
                        filemd5 = compute_md5(filepath)
                        send_line(client_sock, f"OK {filemd5}")
                    else:
                        send_line(client_sock, "ERROR File not found")

                elif cmd == 'BYE':
                    send_line(client_sock, "BYE")
                    break

                else:
                    send_line(client_sock, "ERROR Unknown command")

        except Exception as e:
            print(f"[服务器] 处理客户端 {addr} 时出错: {e}")
        finally:
            client_sock.close()
            print(f"[服务器] 客户端 {addr} 已断开")

    # 主循环:接受客户端连接
    try:
        while True:
            client_sock, addr = server_sock.accept()
            # 为每个客户端创建新线程
            thread = Thread(target=handle_client, args=(client_sock, addr))
            thread.daemon = True
            thread.start()
    except KeyboardInterrupt:
        print("\n[服务器] 正在关闭...")
    finally:
        server_sock.close()
        print("[服务器] 已关闭")


# ============ 客户端端 ============
def tcp_file_client(server_host, server_port, action='list', filename=None):
    """
    TCP文件传输客户端

    功能:
    1. 获取服务器文件列表
    2. 下载文件(带进度条)
    3. MD5校验

    参数:
        server_host: 服务器地址
        server_port: 服务器端口
        action: 'list' 或 'get'
        filename: 要下载的文件名(action='get'时需要)
    """
    # 创建TCP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((server_host, server_port))
    print(f"[客户端] 已连接到服务器 {server_host}:{server_port}")

    try:
        if action == 'list':
            # 获取文件列表
            send_line(sock, "LIST")

            response = recv_line(sock)
            if response.startswith("OK"):
                num_files = int(response.split()[1])
                print(f"\n{'文件名':<30} {'大小':>15}")
                print("-" * 50)

                for _ in range(num_files):
                    file_info = recv_line(sock)
                    name, size = file_info.rsplit('|', 1)
                    size_str = format_size(int(size))
                    print(f"{name:<30} {size_str:>15}")

        elif action == 'get' and filename:
            # 请求下载文件
            send_line(sock, f"GET {filename}")

            response = recv_line(sock)
            if response.startswith("OK"):
                parts = response.split()
                filesize = int(parts[1])
                server_md5 = parts[2]

                print(f"[客户端] 开始下载: {filename}")
                print(f"[客户端] 文件大小: {format_size(filesize)}")
                print(f"[客户端] 服务器MD5: {server_md5}")

                # 下载目录
                download_dir = './downloads'
                if not os.path.exists(download_dir):
                    os.makedirs(download_dir)

                save_path = os.path.join(download_dir, filename)

                # 定义进度回调函数
                def progress_callback(received, total, percentage):
                    bar_length = 40
                    filled = int(bar_length * percentage / 100)
                    bar = '#' * filled + '-' * (bar_length - filled)
                    print(f"\r[{bar}] {percentage:.1f}% "
                          f"({format_size(received)}/{format_size(total)})",
                          end='', flush=True)

                # 直接接收文件内容(服务器在OK响应后直接发送文件)
                start_time = time.time()
                recv_file(sock, save_path, filesize, progress_callback)
                elapsed = time.time() - start_time

                print()  # 换行

                # 校验MD5
                local_md5 = compute_md5(save_path)
                print(f"[客户端] 本地MD5:  {local_md5}")
                print(f"[客户端] 校验结果: {'OK 匹配' if local_md5 == server_md5 else 'FAIL 不匹配'}")
                print(f"[客户端] 下载完成: {save_path}")
                print(f"[客户端] 耗时: {elapsed:.2f}s, "
                      f"速度: {filesize / elapsed / 1024:.2f} KB/s")
            else:
                print(f"[客户端] 错误: {response}")

        elif action == 'md5' and filename:
            send_line(sock, f"MD5 {filename}")
            response = recv_line(sock)
            print(f"[客户端] MD5: {response}")

    finally:
        send_line(sock, "BYE")
        sock.close()
        print("[客户端] 连接已关闭")


def format_size(size_bytes):
    """将字节大小转换为人类可读的格式"""
    if size_bytes < 1024:
        return f"{size_bytes} B"
    elif size_bytes < 1024 ** 2:
        return f"{size_bytes / 1024:.2f} KB"
    elif size_bytes < 1024 ** 3:
        return f"{size_bytes / 1024 ** 2:.2f} MB"
    else:
        return f"{size_bytes / 1024 ** 3:.2f} GB"


def demo_file_transfer():
    """演示TCP文件传输"""
    import tempfile
    import shutil

    # 创建临时目录和测试文件
    temp_dir = tempfile.mkdtemp(prefix='tcp_file_demo_')
    shared_dir = os.path.join(temp_dir, 'shared')
    os.makedirs(shared_dir)

    # 创建一个100KB的测试文件
    test_file = os.path.join(shared_dir, 'test.txt')
    with open(test_file, 'w') as f:
        f.write("Hello, TCP File Transfer!\n" * 5000)  # 约105KB

    print(f"[演示] 创建测试文件: {test_file}")
    print(f"[演示] 文件大小: {os.path.getsize(test_file)} bytes")
    print(f"[演示] 文件MD5: {compute_md5(test_file)}")

    # 启动服务器(在线程中)
    server_thread = Thread(target=tcp_file_server,
                           args=('127.0.0.1', 19001, shared_dir))
    server_thread.daemon = True
    server_thread.start()
    time.sleep(0.5)  # 等待服务器启动

    # 客户端获取文件列表
    print("\n" + "=" * 60)
    print("步骤1: 获取文件列表")
    print("=" * 60)
    tcp_file_client('127.0.0.1', 19001, 'list')

    # 客户端下载文件
    print("\n" + "=" * 60)
    print("步骤2: 下载文件")
    print("=" * 60)
    tcp_file_client('127.0.0.1', 19001, 'get', 'test.txt')

    # 清理
    shutil.rmtree(temp_dir)
    download_dir = './downloads'
    if os.path.exists(download_dir):
        shutil.rmtree(download_dir)
    print("\n[演示] 临时文件已清理")


if __name__ == "__main__":
    print("=" * 60)
    print("TCP文件传输系统")
    print("=" * 60)
    print("\n使用方式:")
    print("1. 启动服务器: tcp_file_server('0.0.0.0', 9001, './shared')")
    print("2. 客户端列表: tcp_file_client('127.0.0.1', 9001, 'list')")
    print("3. 客户端下载: tcp_file_client('127.0.0.1', 9001, 'get', 'filename')")
    print("\n" + "=" * 60)
    print("运行演示...")
    print("=" * 60)
    demo_file_transfer()

1.4 运行说明

启动服务器

# 方式1:直接调用函数
tcp_file_server('0.0.0.0', 9001, './shared')

# 方式2:命令行
# python -c "from tcp_file_transfer import tcp_file_server; tcp_file_server()"

客户端操作

# 获取文件列表
tcp_file_client('127.0.0.1', 9001, 'list')

# 下载文件
tcp_file_client('127.0.0.1', 9001, 'get', 'example.pdf')

服务器端输出示例

============================================================
TCP文件传输系统
============================================================

[服务器] 创建共享目录: ./shared
[服务器] 启动成功,监听 0.0.0.0:9001
[服务器] 共享目录: /home/user/shared
[服务器] 等待客户端连接...

[服务器] 客户端 ('127.0.0.1', 54321) 已连接
[服务器] 向 ('127.0.0.1', 54321) 发送文件列表 (3 个文件)
[服务器] 客户端 ('127.0.0.1', 54322) 已连接
[服务器] 开始发送文件: report.pdf (1048576 bytes)
[服务器] 文件发送完成: report.pdf (1.23s, 832.50 KB/s)
[服务器] 客户端 ('127.0.0.1', 54322) 已断开

客户端输出示例

[客户端] 已连接到服务器 127.0.0.1:9001

文件名                                    大小
--------------------------------------------------
report.pdf                             1.00 MB
photo.jpg                             256.00 KB
data.csv                               15.50 KB

[客户端] 已连接到服务器 127.0.0.1:9001
[客户端] 开始下载: report.pdf
[客户端] 文件大小: 1.00 MB
[客户端] 服务器MD5: d41d8cd98f00b204e9800998ecf8427e
[████████████████████████████████████████] 100.0% (1.00 MB/1.00 MB)
[客户端] 本地MD5:  d41d8cd98f00b204e9800998ecf8427e
[客户端] 校验结果: ✓ 匹配
[客户端] 下载完成: ./downloads/report.pdf
[客户端] 耗时: 1.23s, 速度: 832.50 KB/s
[客户端] 连接已关闭

2. 案例2:UDP聊天室

2.1 设计思路

UDP是无连接的协议,适合实时性要求高但允许少量丢包的场景,如聊天室、语音通话等。UDP聊天室的设计要点:

  1. 用户注册:客户端首次发送消息时自动注册
  2. 消息转发:服务器收到消息后转发给所有在线用户
  3. 心跳检测:定期检测用户是否还在线
  4. 用户列表:维护在线用户列表

2.2 UDP聊天室架构

UDP聊天室架构

消息+地址

消息+地址

消息+地址

转发消息

转发消息

转发消息

UDP服务器
端口5000

客户端A
Alice

客户端B
Bob

客户端C
Carol

用户列表管理
addr → username

2.3 完整代码实现

# ======= 案例2: UDP聊天室 =======
import socket
import threading
import time
import json

# ============ UDP聊天室服务器 ============
class UDPServer:
    """
    UDP聊天室服务器
    
    功能:
    1. 接收客户端消息
    2. 维护在线用户列表
    3. 转发消息给所有在线用户
    4. 处理用户加入/退出
    5. 心跳检测
    
    协议格式(JSON):
    {
        'type': 'message' | 'join' | 'leave' | 'heartbeat' | 'list',
        'username': '用户名',
        'content': '消息内容',
        'timestamp': 1234567890
    }
    """
    
    def __init__(self, host='0.0.0.0', port=5000):
        """
        初始化UDP聊天室服务器
        
        参数:
            host: 监听地址
            port: 监听端口
        """
        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.sock.bind((host, port))
        
        # 用户管理: {addr: {'username': str, 'last_seen': timestamp}}
        self.users = {}
        self.lock = threading.Lock()
        
        print(f"[服务器] UDP聊天室启动成功 {host}:{port}")
        print(f"[服务器] 等待客户端连接...\n")
    
    def handle_client(self, data, addr):
        """
        处理客户端消息
        
        参数:
            data: 收到的原始数据
            addr: 客户端地址 (ip, port)
        """
        try:
            message = json.loads(data.decode('utf-8'))
            msg_type = message.get('type', 'message')
            username = message.get('username', '匿名')
            
            if msg_type == 'join':
                # 新用户加入
                with self.lock:
                    self.users[addr] = {
                        'username': username,
                        'last_seen': time.time()
                    }
                
                join_msg = f"🎉 {username} 加入了聊天室! "
                join_msg += f"当前在线: {len(self.users)}人"
                print(f"[加入] {username} ({addr[0]}:{addr[1]})")
                self.broadcast(join_msg, exclude_addr=None)
                self.send_user_list(addr)
            
            elif msg_type == 'message':
                # 普通消息
                with self.lock:
                    if addr in self.users:
                        self.users[addr]['last_seen'] = time.time()
                    else:
                        # 未注册的用户,自动注册
                        self.users[addr] = {
                            'username': username,
                            'last_seen': time.time()
                        }
                
                content = message.get('content', '')
                formatted_msg = f"[{username}] {content}"
                print(f"[消息] {formatted_msg}")
                self.broadcast(formatted_msg, exclude_addr=addr)
            
            elif msg_type == 'leave':
                # 用户离开
                with self.lock:
                    if addr in self.users:
                        username = self.users[addr]['username']
                        del self.users[addr]
                
                leave_msg = f"👋 {username} 离开了聊天室"
                print(f"[离开] {username}")
                self.broadcast(leave_msg)
            
            elif msg_type == 'heartbeat':
                # 心跳包,更新最后活跃时间
                with self.lock:
                    if addr in self.users:
                        self.users[addr]['last_seen'] = time.time()
            
            elif msg_type == 'list':
                # 请求用户列表
                self.send_user_list(addr)
        
        except json.JSONDecodeError:
            print(f"[错误] 收到无效数据来自 {addr}: {data[:100]}")
        except Exception as e:
            print(f"[错误] 处理消息时出错: {e}")
    
    def broadcast(self, message, exclude_addr=None):
        """
        广播消息给所有在线用户
        
        参数:
            message: 要广播的消息字符串
            exclude_addr: 排除的地址(不发给自己)
        """
        data = json.dumps({
            'type': 'broadcast',
            'content': message,
            'timestamp': time.time()
        }).encode('utf-8')
        
        with self.lock:
            users_copy = list(self.users.items())
        
        for addr, user_info in users_copy:
            if exclude_addr and addr == exclude_addr:
                continue
            try:
                self.sock.sendto(data, addr)
            except Exception as e:
                print(f"[错误] 发送给 {addr} 失败: {e}")
    
    def send_user_list(self, addr):
        """发送在线用户列表给指定客户端"""
        with self.lock:
            user_list = [
                {'username': info['username'], 'addr': f"{a[0]}:{a[1]}"}
                for a, info in self.users.items()
            ]
        
        data = json.dumps({
            'type': 'user_list',
            'users': user_list,
            'count': len(user_list)
        }).encode('utf-8')
        
        try:
            self.sock.sendto(data, addr)
        except Exception as e:
            print(f"[错误] 发送用户列表失败: {e}")
    
    def check_heartbeat(self):
        """心跳检测:清理超时用户"""
        timeout = 60  # 60秒无心跳视为离线
        while True:
            time.sleep(10)  # 每10秒检测一次
            
            current_time = time.time()
            expired_users = []
            
            with self.lock:
                for addr, info in list(self.users.items()):
                    if current_time - info['last_seen'] > timeout:
                        expired_users.append((addr, info['username']))
                
                for addr, _ in expired_users:
                    del self.users[addr]
            
            for addr, username in expired_users:
                timeout_msg = f"⏰ {username} 连接超时,已自动离线"
                print(f"[超时] {username}")
                self.broadcast(timeout_msg)
    
    def run(self):
        """启动服务器"""
        # 启动心跳检测线程
        heartbeat_thread = threading.Thread(target=self.check_heartbeat)
        heartbeat_thread.daemon = True
        heartbeat_thread.start()
        
        print("[服务器] 开始接收消息...")
        try:
            while True:
                data, addr = self.sock.recvfrom(4096)
                # 为每个消息创建新线程处理
                thread = threading.Thread(
                    target=self.handle_client, 
                    args=(data, addr)
                )
                thread.daemon = True
                thread.start()
        except KeyboardInterrupt:
            print("\n[服务器] 正在关闭...")
        finally:
            self.sock.close()
            print("[服务器] 已关闭")


# ============ UDP聊天室客户端 ============
class UDPClient:
    """
    UDP聊天室客户端
    
    功能:
    1. 连接到服务器
    2. 发送消息
    3. 接收消息(多线程)
    4. 心跳保活
    5. 显示在线用户
    """
    
    def __init__(self, server_host='127.0.0.1', server_port=5000, username='匿名'):
        """
        初始化UDP聊天室客户端
        
        参数:
            server_host: 服务器地址
            server_port: 服务器端口
            username: 用户名
        """
        self.server_addr = (server_host, server_port)
        self.username = username
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        
        # 绑定一个本地端口用于接收
        self.sock.bind(('', 0))
        self.local_addr = self.sock.getsockname()
        
        self.running = False
        self.online_users = []
    
    def join(self):
        """加入聊天室"""
        message = json.dumps({
            'type': 'join',
            'username': self.username,
            'timestamp': time.time()
        }).encode('utf-8')
        
        self.sock.sendto(message, self.server_addr)
        print(f"[客户端] {self.username} 正在加入聊天室...")
    
    def send_message(self, content):
        """发送消息"""
        message = json.dumps({
            'type': 'message',
            'username': self.username,
            'content': content,
            'timestamp': time.time()
        }).encode('utf-8')
        
        self.sock.sendto(message, self.server_addr)
    
    def leave(self):
        """离开聊天室"""
        message = json.dumps({
            'type': 'leave',
            'username': self.username,
            'timestamp': time.time()
        }).encode('utf-8')
        
        self.sock.sendto(message, self.server_addr)
        self.running = False
    
    def send_heartbeat(self):
        """发送心跳包"""
        while self.running:
            message = json.dumps({
                'type': 'heartbeat',
                'username': self.username,
                'timestamp': time.time()
            }).encode('utf-8')
            
            try:
                self.sock.sendto(message, self.server_addr)
            except:
                break
            
            time.sleep(30)  # 每30秒发送一次心跳
    
    def receive_messages(self):
        """接收消息的线程函数"""
        while self.running:
            try:
                self.sock.settimeout(1.0)
                data, addr = self.sock.recvfrom(4096)
                
                message = json.loads(data.decode('utf-8'))
                msg_type = message.get('type', '')
                
                if msg_type == 'broadcast':
                    content = message.get('content', '')
                    print(f"\n💬 {content}")
                    print(f"[{self.username}] ", end='', flush=True)
                
                elif msg_type == 'user_list':
                    self.online_users = message.get('users', [])
                    count = message.get('count', 0)
                    print(f"\n📋 在线用户 ({count}人):")
                    for user in self.online_users:
                        print(f"   👤 {user['username']}")
                    print(f"[{self.username}] ", end='', flush=True)
            
            except socket.timeout:
                continue
            except Exception as e:
                if self.running:
                    print(f"\n[错误] 接收消息失败: {e}")
    
    def start_interactive(self):
        """启动交互式聊天"""
        self.running = True
        
        # 加入聊天室
        self.join()
        time.sleep(0.5)  # 等待服务器响应
        
        # 启动接收线程
        receive_thread = threading.Thread(target=self.receive_messages)
        receive_thread.daemon = True
        receive_thread.start()
        
        # 启动心跳线程
        heartbeat_thread = threading.Thread(target=self.send_heartbeat)
        heartbeat_thread.daemon = True
        heartbeat_thread.start()
        
        print(f"\n{'='*50}")
        print(f"欢迎, {self.username}!")
        print("输入消息按回车发送")
        print("输入 /list 查看在线用户")
        print("输入 /quit 退出")
        print(f"{'='*50}\n")
        
        try:
            while self.running:
                try:
                    user_input = input(f"[{self.username}] ")
                except EOFError:
                    break
                
                if not user_input:
                    continue
                
                if user_input.lower() == '/quit':
                    self.leave()
                    break
                elif user_input.lower() == '/list':
                    self.request_user_list()
                else:
                    self.send_message(user_input)
        
        except KeyboardInterrupt:
            print("\n正在退出...")
            self.leave()
        finally:
            self.running = False
            self.sock.close()
            print("已退出聊天室")
    
    def request_user_list(self):
        """请求在线用户列表"""
        message = json.dumps({
            'type': 'list',
            'username': self.username
        }).encode('utf-8')
        self.sock.sendto(message, self.server_addr)


def demo_chat_room():
    """演示UDP聊天室"""
    import subprocess
    import sys
    
    print("="*60)
    print("UDP聊天室演示")
    print("="*60)
    
    # 启动服务器
    server = UDPServer('127.0.0.1', 15000)
    server_thread = threading.Thread(target=server.run)
    server_thread.daemon = True
    
    print("[演示] 启动服务器...")
    server_thread.start()
    time.sleep(0.3)
    
    # 模拟几个客户端发送消息
    print("[演示] 模拟客户端交互...\n")
    
    # 客户端1 - Alice
    client1 = UDPClient('127.0.0.1', 15000, 'Alice')
    client1.join()
    time.sleep(0.2)
    
    # 客户端2 - Bob
    client2 = UDPClient('127.0.0.1', 15000, 'Bob')
    client2.join()
    time.sleep(0.2)
    
    # 客户端3 - Carol
    client3 = UDPClient('127.0.0.1', 15000, 'Carol')
    client3.join()
    time.sleep(0.5)
    
    # 模拟对话
    print("\n--- 模拟对话开始 ---")
    client1.send_message("大家好!我是Alice")
    time.sleep(0.3)
    client2.send_message("嗨Alice,我是Bob")
    time.sleep(0.3)
    client3.send_message("你们好呀,Carol报到!")
    time.sleep(0.3)
    client1.send_message("这个聊天室真不错~")
    time.sleep(0.3)
    client2.send_message("是啊,UDP传输还挺快的")
    time.sleep(0.5)
    
    # 客户端2离开
    client2.leave()
    time.sleep(0.3)
    
    client1.send_message("Bob怎么走了?")
    time.sleep(0.3)
    client3.send_message("可能是去吃饭了吧 😄")
    time.sleep(0.5)
    
    # 清理
    client1.leave()
    client3.leave()
    
    print("\n--- 演示结束 ---")
    print("\n使用说明:")
    print("1. 启动服务器: server = UDPServer('0.0.0.0', 5000); server.run()")
    print("2. 启动客户端: client = UDPClient('127.0.0.1', 5000, '用户名')")
    print("3. 开始聊天: client.start_interactive()")


if __name__ == "__main__":
    demo_chat_room()

2.4 运行说明

启动服务器

server = UDPServer('0.0.0.0', 5000)
server.run()

启动客户端(交互式)

client = UDPClient('127.0.0.1', 5000, 'Alice')
client.start_interactive()

客户端交互命令

  • 输入消息 → 发送给所有人
  • /list → 查看在线用户
  • /quit → 退出聊天室

输出示例

============================================================
欢迎, Alice!
输入消息按回车发送
输入 /list 查看在线用户
输入 /quit 退出
============================================================

[Alice] 
💬 🎉 Bob 加入了聊天室! 当前在线: 2人
[Alice] 
💬 [Bob] 嗨Alice!
[Alice] 你好Bob!
[Alice] 
💬 [Bob] 你在学什么?
[Alice] 正在学计算机网络!
[Alice] 
📋 在线用户 (2人):
   👤 Alice
   👤 Bob
[Alice] /quit
已退出聊天室

3. 案例3:多线程并发Echo服务器

3.1 设计思路

Echo服务器是网络编程中最经典的服务器模型——客户端发送什么,服务器就回送什么。看似简单,但通过多线程和线程池的改造,可以实现高效的并发处理。

设计要点:

  1. 多线程处理:每个客户端连接由一个独立线程处理
  2. 线程池限制:使用ThreadPoolExecutor限制最大并发数
  3. 优雅关闭:正确处理Ctrl+C信号
  4. 连接统计:记录活跃连接数和处理请求数

3.2 完整代码实现

# ======= 案例3: 多线程并发Echo服务器 =======
import socket
import threading
import time
import signal
import sys
from concurrent.futures import ThreadPoolExecutor


class ThreadedEchoServer:
    """
    多线程并发Echo服务器
    
    功能:
    1. 接收客户端连接
    2. 为每个客户端创建独立线程
    3. 线程池限制并发数
    4. 统计连接信息
    5. 优雅关闭
    
    Echo协议:
    - 客户端发送任意文本
    - 服务器回送相同文本(Echo)
    - 发送 'QUIT' 断开连接
    """
    
    def __init__(self, host='0.0.0.0', port=9002, max_workers=10):
        """
        初始化Echo服务器
        
        参数:
            host: 监听地址
            port: 监听端口
            max_workers: 线程池最大工作线程数
        """
        self.host = host
        self.port = port
        self.max_workers = max_workers
        
        # 创建TCP socket
        self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_sock.bind((host, port))
        self.server_sock.listen(max_workers)
        
        # 线程池
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        
        # 统计信息
        self.stats = {
            'total_connections': 0,
            'active_connections': 0,
            'total_messages': 0,
            'total_bytes': 0
        }
        self.stats_lock = threading.Lock()
        
        self.running = True
        
        # 设置信号处理
        signal.signal(signal.SIGINT, self._signal_handler)
    
    def _signal_handler(self, signum, frame):
        """处理Ctrl+C信号"""
        print("\n[服务器] 收到关闭信号,正在优雅关闭...")
        self.running = False
        self.server_sock.close()
    
    def _update_stats(self, key, delta=1):
        """线程安全地更新统计"""
        with self.stats_lock:
            self.stats[key] += delta
    
    def _get_stats(self):
        """获取当前统计信息副本"""
        with self.stats_lock:
            return dict(self.stats)
    
    def handle_client(self, client_sock, addr):
        """
        处理单个客户端连接
        
        参数:
            client_sock: 客户端socket
            addr: 客户端地址
        """
        self._update_stats('total_connections')
        self._update_stats('active_connections')
        
        client_id = f"{addr[0]}:{addr[1]}"
        print(f"[连接] 客户端 {client_id} 已连接 "
              f"(活跃: {self._get_stats()['active_connections']})")
        
        try:
            client_sock.settimeout(60)  # 60秒超时
            
            while self.running:
                # 接收数据
                data = client_sock.recv(4096)
                if not data:
                    break
                
                message = data.decode('utf-8').strip()
                self._update_stats('total_messages')
                self._update_stats('total_bytes', len(data))
                
                # 处理特殊命令
                if message.upper() == 'QUIT':
                    client_sock.sendall(b'Goodbye!\n')
                    break
                elif message.upper() == 'STATS':
                    stats = self._get_stats()
                    response = (f"服务器统计:\n"
                               f"总连接数: {stats['total_connections']}\n"
                               f"活跃连接: {stats['active_connections']}\n"
                               f"总消息数: {stats['total_messages']}\n"
                               f"总字节数: {stats['total_bytes']}\n")
                    client_sock.sendall(response.encode('utf-8'))
                elif message.upper() == 'TIME':
                    response = f"服务器时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
                    client_sock.sendall(response.encode('utf-8'))
                else:
                    # Echo回送
                    response = f"Echo: {message}\n"
                    client_sock.sendall(response.encode('utf-8'))
        
        except socket.timeout:
            print(f"[超时] 客户端 {client_id} 连接超时")
        except ConnectionResetError:
            print(f"[断开] 客户端 {client_id} 强制断开")
        except Exception as e:
            if self.running:
                print(f"[错误] 处理客户端 {client_id}: {e}")
        finally:
            client_sock.close()
            self._update_stats('active_connections', -1)
            print(f"[断开] 客户端 {client_id} 已断开 "
                  f"(活跃: {self._get_stats()['active_connections']})")
    
    def run(self):
        """启动服务器"""
        print(f"[服务器] Echo服务器启动 {self.host}:{self.port}")
        print(f"[服务器] 最大并发数: {self.max_workers}")
        print(f"[服务器] 按 Ctrl+C 关闭服务器\n")
        
        try:
            while self.running:
                try:
                    client_sock, addr = self.server_sock.accept()
                    # 提交到线程池
                    self.executor.submit(self.handle_client, client_sock, addr)
                except OSError:
                    # socket已关闭
                    break
        except Exception as e:
            if self.running:
                print(f"[错误] 服务器异常: {e}")
        finally:
            self.shutdown()
    
    def shutdown(self):
        """优雅关闭服务器"""
        print("\n[服务器] 正在关闭...")
        self.running = False
        
        # 关闭socket
        try:
            self.server_sock.close()
        except:
            pass
        
        # 关闭线程池
        self.executor.shutdown(wait=True)
        
        # 打印最终统计
        stats = self._get_stats()
        print(f"\n{'='*40}")
        print("服务器运行统计")
        print(f"{'='*40}")
        print(f"总连接数: {stats['total_connections']}")
        print(f"总消息数: {stats['total_messages']}")
        print(f"总字节数: {stats['total_bytes']}")
        print(f"{'='*40}")
        print("[服务器] 已关闭")


# ============ Echo客户端 ============
def echo_client(server_host='127.0.0.1', server_port=9002, messages=None):
    """
    Echo客户端
    
    参数:
        server_host: 服务器地址
        server_port: 服务器端口
        messages: 要发送的消息列表(None则交互式输入)
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((server_host, server_port))
    
    client_addr = sock.getsockname()
    print(f"[客户端 {client_addr[1]}] 已连接到服务器")
    
    try:
        if messages:
            # 批量发送消息
            for msg in messages:
                sock.sendall((msg + '\n').encode('utf-8'))
                data = sock.recv(4096)
                print(f"[客户端 {client_addr[1]}] 发送: {msg}")
                print(f"[客户端 {client_addr[1]}] 收到: {data.decode('utf-8').strip()}")
                time.sleep(0.3)
            
            # 发送QUIT
            sock.sendall(b'QUIT\n')
            data = sock.recv(4096)
            print(f"[客户端 {client_addr[1]}] 收到: {data.decode('utf-8').strip()}")
        else:
            # 交互式模式
            print("输入消息发送给服务器,输入 QUIT 退出")
            while True:
                user_input = input("> ")
                sock.sendall((user_input + '\n').encode('utf-8'))
                if user_input.upper() == 'QUIT':
                    break
                data = sock.recv(4096)
                print(f"< {data.decode('utf-8').strip()}")
    
    finally:
        sock.close()
        print(f"[客户端 {client_addr[1]}] 已断开")


def demo_echo_server():
    """演示多线程并发Echo服务器"""
    print("="*60)
    print("多线程并发Echo服务器演示")
    print("="*60)
    
    # 创建服务器
    server = ThreadedEchoServer('127.0.0.1', 19002, max_workers=5)
    
    # 启动服务器
    server_thread = threading.Thread(target=server.run)
    server_thread.daemon = True
    server_thread.start()
    time.sleep(0.5)
    
    # 模拟多个客户端同时连接
    print("\n[演示] 模拟5个客户端同时连接...\n")
    
    client_messages = [
        ["Hello", "World", "STATS", "TIME"],
        ["你好", "服务器", "Echo测试"],
        ["Client 3 here", "Testing"],
        ["Stats please", "STATS"],
        ["Goodbye", "QUIT"],
    ]
    
    threads = []
    for i, messages in enumerate(client_messages):
        t = threading.Thread(
            target=echo_client,
            args=('127.0.0.1', 19002, messages)
        )
        t.start()
        threads.append(t)
        time.sleep(0.1)  # 稍微错开连接时间
    
    # 等待所有客户端完成
    for t in threads:
        t.join()
    
    time.sleep(0.5)
    print(f"\n{'='*60}")
    print("演示完成!")
    print(f"{'='*60}")
    
    # 关闭服务器
    server.shutdown()


if __name__ == "__main__":
    demo_echo_server()

3.3 运行说明

启动服务器

server = ThreadedEchoServer('0.0.0.0', 9002, max_workers=10)
server.run()

启动客户端

echo_client('127.0.0.1', 9002)  # 交互式模式

# 或批量发送消息
echo_client('127.0.0.1', 9002, ['Hello', 'STATS', 'QUIT'])

服务器端输出示例

[服务器] Echo服务器启动 0.0.0.0:9002
[服务器] 最大并发数: 10
[服务器] 按 Ctrl+C 关闭服务器

[连接] 客户端 127.0.0.1:54321 已连接 (活跃: 1)
[连接] 客户端 127.0.0.1:54322 已连接 (活跃: 2)
[断开] 客户端 127.0.0.1:54321 已断开 (活跃: 1)
[连接] 客户端 127.0.0.1:54323 已连接 (活跃: 2)

========================================
服务器运行统计
========================================
总连接数: 5
总消息数: 15
总字节数: 1200
========================================
[服务器] 已关闭

3.4 三种服务器模型对比

模型 代码复杂度 并发能力 资源消耗 适用场景
单线程 简单 无并发 单次服务
每连接一线程 中等 中小型服务
线程池 中等 可控 可控 生产环境
异步IO 复杂 极高 极低 高并发服务

4. TCP vs UDP 实战选择指南

协议选择决策树

需要传输数据

需要可靠性?

数据量大?

选择UDP

需要流量控制?

选择TCP
短连接

选择TCP
长连接/文件传输

UDP + 应用层重传

应用场景:
视频流/游戏/DNS

应用场景:
HTTP短请求

应用场景:
文件传输/数据库

应用场景:
实时音视频

应用场景 推荐协议 原因
文件传输 TCP 需要可靠传输、完整性校验
Web浏览 TCP HTTP/HTTPS基于TCP
邮件传输 TCP SMTP/POP3/IMAP基于TCP
实时聊天 UDP/TCP 低延迟优先,可容忍少量丢包
在线游戏 UDP 低延迟,状态可周期性同步
DNS查询 UDP 请求响应小,一次交互完成
视频直播 UDP 实时性优先,可丢帧
远程登录 TCP SSH/Telnet需要可靠传输

5. 考研要点回顾

5.1 TCP vs UDP 核心对比

特性 TCP UDP
连接方式 面向连接 无连接
可靠性 可靠(确认、重传) 不可靠
有序性 有序(序号机制) 无序
流量控制 有(滑动窗口)
拥塞控制
首部开销 20字节 8字节
传输效率 较低 较高
应用场景 文件传输、网页浏览 视频流、DNS、游戏

5.2 三个案例对应的TCP/UDP特性

案例 使用协议 体现的核心特性
文件传输 TCP 可靠性、流量控制、面向连接
聊天室 UDP 低延迟、无连接、广播
Echo服务器 TCP 并发处理、面向连接

6. 本节小结

通过这三个综合案例,我们从理论走向了实践:

  1. TCP文件传输 让我们理解了如何利用TCP的可靠性进行大文件传输,包括分块传输、进度显示和MD5完整性校验。

  2. UDP聊天室 展示了无连接协议的高效性,以及如何在应用层实现用户管理和消息广播。

  3. 多线程Echo服务器 演示了生产环境中如何处理并发连接,线程池是限制资源消耗的关键。

学习建议:把这三个案例的代码亲自运行一遍,尝试修改参数、添加新功能(如文件上传、私聊功能、断点续传等),这样才能真正掌握Python网络编程。

更多推荐