【计算机网络考研】P50: TCP与UDP实战——Python网络编程综合案例
目录
博主智算菩萨,专注于人工智能、Python编程、音视频处理及UI窗体程序设计等方向。致力于以通俗易懂的方式拆解前沿技术,从零基础入门到高阶实战,陪伴开发者共同成长。目前已开设五大技术专栏,累计发布多篇原创技术文章,深受读者好评。
📌 专栏导航
- 人工智能前沿知识(已更191篇):深度剖析Transformer架构、生成式AI、强化学习、具身智能、神经符号系统、大模型及智能体(Agent)技术,系统性解析AI核心技术体系与前沿趋势。
- Python基础小白编程(已更232篇):从零开始,以保姆式教程讲解变量、数据类型、流程控制、函数等核心语法,配有大量实战代码与避坑指南,真正做到学以致用。
- 机器学习与深度学习(125篇):系统化拆解线性模型、决策树、随机森林、梯度提升树、神经网络等算法原理与工程实践,覆盖从公式推导到代码实现的全链路内容。
- 音频、图像与视频处理理论与实战(81篇):涵盖FFmpeg多媒体处理、audio_shop开源工具、ComfyUI-WanVideoWrapper视频生成等实用技术,从基础操作到高级应用一应俱全。
- UI窗体程序设计实战(78篇):深入讲解UI设计、动态窗体生成、游戏UI框架设计等实战技巧,提供从配置到编码的完整解决方案。
智算菩萨,以代码为经,以算法为纬,在人工智能的星辰大海中,做你前行路上最可靠的导航者。本人最常用AI工具为AIGCBAR。
引言:从理论到实践的跨越
经过前面49讲的学习,我们已经掌握了TCP和UDP的核心原理——三次握手、四次挥手、滑动窗口、拥塞控制…这些理论知识就像建筑的设计蓝图,但真正的建筑还需要一砖一瓦地搭建。在本讲中,我们将通过三个完整的综合案例,把前面学到的所有理论知识转化为可运行的代码,让你真正掌握Python网络编程的精髓。
这三个案例分别是:
- TCP文件传输系统 —— 基于TCP的可靠文件传输,包含进度显示和MD5校验
- UDP聊天室 —— 基于UDP的多用户实时聊天系统
- 多线程并发Echo服务器 —— 使用线程池处理高并发连接
1. 案例1:TCP文件传输系统
1.1 设计思路
文件传输是TCP最经典的应用场景之一。与简单的文本传输不同,文件传输需要解决以下问题:
- 大文件分块:一个10GB的文件不能一次性读入内存,需要分块传输
- 传输进度:用户需要看到传输进度,知道还要等多久
- 完整性校验:网络传输可能出错,需要校验文件是否完整
- 并发处理:服务器需要同时服务多个客户端
1.2 TCP文件传输流程
1.3 完整代码实现
# ======= 案例1: TCP文件传输系统 =======
import socket
import hashlib
import os
import struct
import time
from threading import Thread
# ============ 协议定义 ============
# 使用简单的文本协议进行通信
# 命令格式: COMMAND [args...]\n
CHUNK_SIZE = 8192 # 每次传输的数据块大小(8KB)
def compute_md5(filepath):
"""
计算文件的MD5哈希值
参数:
filepath: 文件路径
返回:
32位十六进制字符串
"""
md5_hash = hashlib.md5()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
md5_hash.update(chunk)
return md5_hash.hexdigest()
def send_line(sock, line):
"""发送一行文本(自动添加换行符)"""
sock.sendall((line + '\n').encode('utf-8'))
def recv_line(sock):
"""接收一行文本(以换行符为结束标志)"""
buffer = b''
while not buffer.endswith(b'\n'):
data = sock.recv(1)
if not data:
break
buffer += data
return buffer.decode('utf-8').strip()
def send_file(sock, filepath, callback=None):
"""
发送文件(分块传输,带进度回调)
参数:
sock: 已连接的socket
filepath: 要发送的文件路径
callback: 进度回调函数 callback(sent, total, percentage)
"""
filesize = os.path.getsize(filepath)
sent = 0
with open(filepath, 'rb') as f:
while sent < filesize:
chunk = f.read(CHUNK_SIZE)
if not chunk:
break
# 先发送4字节的长度头,再发送数据
sock.sendall(struct.pack('!I', len(chunk)))
sock.sendall(chunk)
sent += len(chunk)
if callback:
callback(sent, filesize, sent / filesize * 100)
# 发送0长度表示文件传输结束
sock.sendall(struct.pack('!I', 0))
def recv_file(sock, filepath, filesize=None, callback=None):
"""
接收文件(分块接收,带进度回调)
参数:
sock: 已连接的socket
filepath: 保存文件的路径
filesize: 预期的文件大小(如果已知)
callback: 进度回调函数 callback(received, total, percentage)
"""
# 如果未提供文件大小,则尝试从socket读取
if filesize is None:
filesize_str = recv_line(sock)
filesize = int(filesize_str)
received = 0
with open(filepath, 'wb') as f:
while received < filesize:
# 接收4字节的长度头
len_bytes = sock.recv(4)
if len(len_bytes) < 4:
break
chunk_len = struct.unpack('!I', len_bytes)[0]
if chunk_len == 0:
break
# 接收数据块
chunk = b''
while len(chunk) < chunk_len:
data = sock.recv(chunk_len - len(chunk))
if not data:
break
chunk += data
f.write(chunk)
received += len(chunk)
if callback:
callback(received, filesize, received / filesize * 100)
# ============ 服务器端 ============
def tcp_file_server(host='0.0.0.0', port=9001, shared_dir='./shared'):
"""
TCP文件传输服务器
功能:
1. 提供文件列表查询
2. 支持文件下载
3. 支持MD5校验
4. 多客户端并发处理
参数:
host: 监听地址
port: 监听端口
shared_dir: 共享文件目录
"""
# 创建共享目录
if not os.path.exists(shared_dir):
os.makedirs(shared_dir)
print(f"[服务器] 创建共享目录: {shared_dir}")
# 创建TCP socket
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_sock.bind((host, port))
server_sock.listen(5)
print(f"[服务器] 启动成功,监听 {host}:{port}")
print(f"[服务器] 共享目录: {os.path.abspath(shared_dir)}")
print("[服务器] 等待客户端连接...\n")
def handle_client(client_sock, addr):
"""处理单个客户端连接"""
print(f"[服务器] 客户端 {addr} 已连接")
try:
while True:
# 接收命令
command = recv_line(client_sock)
if not command:
break
parts = command.split()
cmd = parts[0].upper()
if cmd == 'LIST':
# 返回文件列表
files = os.listdir(shared_dir)
file_list = []
for f in files:
fpath = os.path.join(shared_dir, f)
if os.path.isfile(fpath):
size = os.path.getsize(fpath)
file_list.append(f"{f}|{size}")
send_line(client_sock, f"OK {len(file_list)}")
for f in file_list:
send_line(client_sock, f)
print(f"[服务器] 向 {addr} 发送文件列表 ({len(file_list)} 个文件)")
elif cmd == 'GET' and len(parts) >= 2:
# 发送文件
filename = parts[1]
filepath = os.path.join(shared_dir, filename)
if not os.path.exists(filepath):
send_line(client_sock, "ERROR File not found")
print(f"[服务器] 文件不存在: {filename}")
continue
filesize = os.path.getsize(filepath)
filemd5 = compute_md5(filepath)
send_line(client_sock, f"OK {filesize} {filemd5}")
print(f"[服务器] 开始发送文件: {filename} ({filesize} bytes)")
# 发送文件内容
start_time = time.time()
send_file(client_sock, filepath)
elapsed = time.time() - start_time
speed = filesize / elapsed / 1024 if elapsed > 0 else 0
print(f"[服务器] 文件发送完成: {filename} "
f"({elapsed:.2f}s, {speed:.2f} KB/s)")
elif cmd == 'MD5' and len(parts) >= 2:
# 校验文件MD5
filename = parts[1]
filepath = os.path.join(shared_dir, filename)
if os.path.exists(filepath):
filemd5 = compute_md5(filepath)
send_line(client_sock, f"OK {filemd5}")
else:
send_line(client_sock, "ERROR File not found")
elif cmd == 'BYE':
send_line(client_sock, "BYE")
break
else:
send_line(client_sock, "ERROR Unknown command")
except Exception as e:
print(f"[服务器] 处理客户端 {addr} 时出错: {e}")
finally:
client_sock.close()
print(f"[服务器] 客户端 {addr} 已断开")
# 主循环:接受客户端连接
try:
while True:
client_sock, addr = server_sock.accept()
# 为每个客户端创建新线程
thread = Thread(target=handle_client, args=(client_sock, addr))
thread.daemon = True
thread.start()
except KeyboardInterrupt:
print("\n[服务器] 正在关闭...")
finally:
server_sock.close()
print("[服务器] 已关闭")
# ============ 客户端端 ============
def tcp_file_client(server_host, server_port, action='list', filename=None):
"""
TCP文件传输客户端
功能:
1. 获取服务器文件列表
2. 下载文件(带进度条)
3. MD5校验
参数:
server_host: 服务器地址
server_port: 服务器端口
action: 'list' 或 'get'
filename: 要下载的文件名(action='get'时需要)
"""
# 创建TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((server_host, server_port))
print(f"[客户端] 已连接到服务器 {server_host}:{server_port}")
try:
if action == 'list':
# 获取文件列表
send_line(sock, "LIST")
response = recv_line(sock)
if response.startswith("OK"):
num_files = int(response.split()[1])
print(f"\n{'文件名':<30} {'大小':>15}")
print("-" * 50)
for _ in range(num_files):
file_info = recv_line(sock)
name, size = file_info.rsplit('|', 1)
size_str = format_size(int(size))
print(f"{name:<30} {size_str:>15}")
elif action == 'get' and filename:
# 请求下载文件
send_line(sock, f"GET {filename}")
response = recv_line(sock)
if response.startswith("OK"):
parts = response.split()
filesize = int(parts[1])
server_md5 = parts[2]
print(f"[客户端] 开始下载: {filename}")
print(f"[客户端] 文件大小: {format_size(filesize)}")
print(f"[客户端] 服务器MD5: {server_md5}")
# 下载目录
download_dir = './downloads'
if not os.path.exists(download_dir):
os.makedirs(download_dir)
save_path = os.path.join(download_dir, filename)
# 定义进度回调函数
def progress_callback(received, total, percentage):
bar_length = 40
filled = int(bar_length * percentage / 100)
bar = '#' * filled + '-' * (bar_length - filled)
print(f"\r[{bar}] {percentage:.1f}% "
f"({format_size(received)}/{format_size(total)})",
end='', flush=True)
# 直接接收文件内容(服务器在OK响应后直接发送文件)
start_time = time.time()
recv_file(sock, save_path, filesize, progress_callback)
elapsed = time.time() - start_time
print() # 换行
# 校验MD5
local_md5 = compute_md5(save_path)
print(f"[客户端] 本地MD5: {local_md5}")
print(f"[客户端] 校验结果: {'OK 匹配' if local_md5 == server_md5 else 'FAIL 不匹配'}")
print(f"[客户端] 下载完成: {save_path}")
print(f"[客户端] 耗时: {elapsed:.2f}s, "
f"速度: {filesize / elapsed / 1024:.2f} KB/s")
else:
print(f"[客户端] 错误: {response}")
elif action == 'md5' and filename:
send_line(sock, f"MD5 {filename}")
response = recv_line(sock)
print(f"[客户端] MD5: {response}")
finally:
send_line(sock, "BYE")
sock.close()
print("[客户端] 连接已关闭")
def format_size(size_bytes):
"""将字节大小转换为人类可读的格式"""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 ** 2:
return f"{size_bytes / 1024:.2f} KB"
elif size_bytes < 1024 ** 3:
return f"{size_bytes / 1024 ** 2:.2f} MB"
else:
return f"{size_bytes / 1024 ** 3:.2f} GB"
def demo_file_transfer():
"""演示TCP文件传输"""
import tempfile
import shutil
# 创建临时目录和测试文件
temp_dir = tempfile.mkdtemp(prefix='tcp_file_demo_')
shared_dir = os.path.join(temp_dir, 'shared')
os.makedirs(shared_dir)
# 创建一个100KB的测试文件
test_file = os.path.join(shared_dir, 'test.txt')
with open(test_file, 'w') as f:
f.write("Hello, TCP File Transfer!\n" * 5000) # 约105KB
print(f"[演示] 创建测试文件: {test_file}")
print(f"[演示] 文件大小: {os.path.getsize(test_file)} bytes")
print(f"[演示] 文件MD5: {compute_md5(test_file)}")
# 启动服务器(在线程中)
server_thread = Thread(target=tcp_file_server,
args=('127.0.0.1', 19001, shared_dir))
server_thread.daemon = True
server_thread.start()
time.sleep(0.5) # 等待服务器启动
# 客户端获取文件列表
print("\n" + "=" * 60)
print("步骤1: 获取文件列表")
print("=" * 60)
tcp_file_client('127.0.0.1', 19001, 'list')
# 客户端下载文件
print("\n" + "=" * 60)
print("步骤2: 下载文件")
print("=" * 60)
tcp_file_client('127.0.0.1', 19001, 'get', 'test.txt')
# 清理
shutil.rmtree(temp_dir)
download_dir = './downloads'
if os.path.exists(download_dir):
shutil.rmtree(download_dir)
print("\n[演示] 临时文件已清理")
if __name__ == "__main__":
print("=" * 60)
print("TCP文件传输系统")
print("=" * 60)
print("\n使用方式:")
print("1. 启动服务器: tcp_file_server('0.0.0.0', 9001, './shared')")
print("2. 客户端列表: tcp_file_client('127.0.0.1', 9001, 'list')")
print("3. 客户端下载: tcp_file_client('127.0.0.1', 9001, 'get', 'filename')")
print("\n" + "=" * 60)
print("运行演示...")
print("=" * 60)
demo_file_transfer()
1.4 运行说明
启动服务器:
# 方式1:直接调用函数
tcp_file_server('0.0.0.0', 9001, './shared')
# 方式2:命令行
# python -c "from tcp_file_transfer import tcp_file_server; tcp_file_server()"
客户端操作:
# 获取文件列表
tcp_file_client('127.0.0.1', 9001, 'list')
# 下载文件
tcp_file_client('127.0.0.1', 9001, 'get', 'example.pdf')
服务器端输出示例:
============================================================
TCP文件传输系统
============================================================
[服务器] 创建共享目录: ./shared
[服务器] 启动成功,监听 0.0.0.0:9001
[服务器] 共享目录: /home/user/shared
[服务器] 等待客户端连接...
[服务器] 客户端 ('127.0.0.1', 54321) 已连接
[服务器] 向 ('127.0.0.1', 54321) 发送文件列表 (3 个文件)
[服务器] 客户端 ('127.0.0.1', 54322) 已连接
[服务器] 开始发送文件: report.pdf (1048576 bytes)
[服务器] 文件发送完成: report.pdf (1.23s, 832.50 KB/s)
[服务器] 客户端 ('127.0.0.1', 54322) 已断开
客户端输出示例:
[客户端] 已连接到服务器 127.0.0.1:9001
文件名 大小
--------------------------------------------------
report.pdf 1.00 MB
photo.jpg 256.00 KB
data.csv 15.50 KB
[客户端] 已连接到服务器 127.0.0.1:9001
[客户端] 开始下载: report.pdf
[客户端] 文件大小: 1.00 MB
[客户端] 服务器MD5: d41d8cd98f00b204e9800998ecf8427e
[████████████████████████████████████████] 100.0% (1.00 MB/1.00 MB)
[客户端] 本地MD5: d41d8cd98f00b204e9800998ecf8427e
[客户端] 校验结果: ✓ 匹配
[客户端] 下载完成: ./downloads/report.pdf
[客户端] 耗时: 1.23s, 速度: 832.50 KB/s
[客户端] 连接已关闭
2. 案例2:UDP聊天室
2.1 设计思路
UDP是无连接的协议,适合实时性要求高但允许少量丢包的场景,如聊天室、语音通话等。UDP聊天室的设计要点:
- 用户注册:客户端首次发送消息时自动注册
- 消息转发:服务器收到消息后转发给所有在线用户
- 心跳检测:定期检测用户是否还在线
- 用户列表:维护在线用户列表
2.2 UDP聊天室架构
2.3 完整代码实现
# ======= 案例2: UDP聊天室 =======
import socket
import threading
import time
import json
# ============ UDP聊天室服务器 ============
class UDPServer:
"""
UDP聊天室服务器
功能:
1. 接收客户端消息
2. 维护在线用户列表
3. 转发消息给所有在线用户
4. 处理用户加入/退出
5. 心跳检测
协议格式(JSON):
{
'type': 'message' | 'join' | 'leave' | 'heartbeat' | 'list',
'username': '用户名',
'content': '消息内容',
'timestamp': 1234567890
}
"""
def __init__(self, host='0.0.0.0', port=5000):
"""
初始化UDP聊天室服务器
参数:
host: 监听地址
port: 监听端口
"""
self.host = host
self.port = port
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((host, port))
# 用户管理: {addr: {'username': str, 'last_seen': timestamp}}
self.users = {}
self.lock = threading.Lock()
print(f"[服务器] UDP聊天室启动成功 {host}:{port}")
print(f"[服务器] 等待客户端连接...\n")
def handle_client(self, data, addr):
"""
处理客户端消息
参数:
data: 收到的原始数据
addr: 客户端地址 (ip, port)
"""
try:
message = json.loads(data.decode('utf-8'))
msg_type = message.get('type', 'message')
username = message.get('username', '匿名')
if msg_type == 'join':
# 新用户加入
with self.lock:
self.users[addr] = {
'username': username,
'last_seen': time.time()
}
join_msg = f"🎉 {username} 加入了聊天室! "
join_msg += f"当前在线: {len(self.users)}人"
print(f"[加入] {username} ({addr[0]}:{addr[1]})")
self.broadcast(join_msg, exclude_addr=None)
self.send_user_list(addr)
elif msg_type == 'message':
# 普通消息
with self.lock:
if addr in self.users:
self.users[addr]['last_seen'] = time.time()
else:
# 未注册的用户,自动注册
self.users[addr] = {
'username': username,
'last_seen': time.time()
}
content = message.get('content', '')
formatted_msg = f"[{username}] {content}"
print(f"[消息] {formatted_msg}")
self.broadcast(formatted_msg, exclude_addr=addr)
elif msg_type == 'leave':
# 用户离开
with self.lock:
if addr in self.users:
username = self.users[addr]['username']
del self.users[addr]
leave_msg = f"👋 {username} 离开了聊天室"
print(f"[离开] {username}")
self.broadcast(leave_msg)
elif msg_type == 'heartbeat':
# 心跳包,更新最后活跃时间
with self.lock:
if addr in self.users:
self.users[addr]['last_seen'] = time.time()
elif msg_type == 'list':
# 请求用户列表
self.send_user_list(addr)
except json.JSONDecodeError:
print(f"[错误] 收到无效数据来自 {addr}: {data[:100]}")
except Exception as e:
print(f"[错误] 处理消息时出错: {e}")
def broadcast(self, message, exclude_addr=None):
"""
广播消息给所有在线用户
参数:
message: 要广播的消息字符串
exclude_addr: 排除的地址(不发给自己)
"""
data = json.dumps({
'type': 'broadcast',
'content': message,
'timestamp': time.time()
}).encode('utf-8')
with self.lock:
users_copy = list(self.users.items())
for addr, user_info in users_copy:
if exclude_addr and addr == exclude_addr:
continue
try:
self.sock.sendto(data, addr)
except Exception as e:
print(f"[错误] 发送给 {addr} 失败: {e}")
def send_user_list(self, addr):
"""发送在线用户列表给指定客户端"""
with self.lock:
user_list = [
{'username': info['username'], 'addr': f"{a[0]}:{a[1]}"}
for a, info in self.users.items()
]
data = json.dumps({
'type': 'user_list',
'users': user_list,
'count': len(user_list)
}).encode('utf-8')
try:
self.sock.sendto(data, addr)
except Exception as e:
print(f"[错误] 发送用户列表失败: {e}")
def check_heartbeat(self):
"""心跳检测:清理超时用户"""
timeout = 60 # 60秒无心跳视为离线
while True:
time.sleep(10) # 每10秒检测一次
current_time = time.time()
expired_users = []
with self.lock:
for addr, info in list(self.users.items()):
if current_time - info['last_seen'] > timeout:
expired_users.append((addr, info['username']))
for addr, _ in expired_users:
del self.users[addr]
for addr, username in expired_users:
timeout_msg = f"⏰ {username} 连接超时,已自动离线"
print(f"[超时] {username}")
self.broadcast(timeout_msg)
def run(self):
"""启动服务器"""
# 启动心跳检测线程
heartbeat_thread = threading.Thread(target=self.check_heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()
print("[服务器] 开始接收消息...")
try:
while True:
data, addr = self.sock.recvfrom(4096)
# 为每个消息创建新线程处理
thread = threading.Thread(
target=self.handle_client,
args=(data, addr)
)
thread.daemon = True
thread.start()
except KeyboardInterrupt:
print("\n[服务器] 正在关闭...")
finally:
self.sock.close()
print("[服务器] 已关闭")
# ============ UDP聊天室客户端 ============
class UDPClient:
"""
UDP聊天室客户端
功能:
1. 连接到服务器
2. 发送消息
3. 接收消息(多线程)
4. 心跳保活
5. 显示在线用户
"""
def __init__(self, server_host='127.0.0.1', server_port=5000, username='匿名'):
"""
初始化UDP聊天室客户端
参数:
server_host: 服务器地址
server_port: 服务器端口
username: 用户名
"""
self.server_addr = (server_host, server_port)
self.username = username
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 绑定一个本地端口用于接收
self.sock.bind(('', 0))
self.local_addr = self.sock.getsockname()
self.running = False
self.online_users = []
def join(self):
"""加入聊天室"""
message = json.dumps({
'type': 'join',
'username': self.username,
'timestamp': time.time()
}).encode('utf-8')
self.sock.sendto(message, self.server_addr)
print(f"[客户端] {self.username} 正在加入聊天室...")
def send_message(self, content):
"""发送消息"""
message = json.dumps({
'type': 'message',
'username': self.username,
'content': content,
'timestamp': time.time()
}).encode('utf-8')
self.sock.sendto(message, self.server_addr)
def leave(self):
"""离开聊天室"""
message = json.dumps({
'type': 'leave',
'username': self.username,
'timestamp': time.time()
}).encode('utf-8')
self.sock.sendto(message, self.server_addr)
self.running = False
def send_heartbeat(self):
"""发送心跳包"""
while self.running:
message = json.dumps({
'type': 'heartbeat',
'username': self.username,
'timestamp': time.time()
}).encode('utf-8')
try:
self.sock.sendto(message, self.server_addr)
except:
break
time.sleep(30) # 每30秒发送一次心跳
def receive_messages(self):
"""接收消息的线程函数"""
while self.running:
try:
self.sock.settimeout(1.0)
data, addr = self.sock.recvfrom(4096)
message = json.loads(data.decode('utf-8'))
msg_type = message.get('type', '')
if msg_type == 'broadcast':
content = message.get('content', '')
print(f"\n💬 {content}")
print(f"[{self.username}] ", end='', flush=True)
elif msg_type == 'user_list':
self.online_users = message.get('users', [])
count = message.get('count', 0)
print(f"\n📋 在线用户 ({count}人):")
for user in self.online_users:
print(f" 👤 {user['username']}")
print(f"[{self.username}] ", end='', flush=True)
except socket.timeout:
continue
except Exception as e:
if self.running:
print(f"\n[错误] 接收消息失败: {e}")
def start_interactive(self):
"""启动交互式聊天"""
self.running = True
# 加入聊天室
self.join()
time.sleep(0.5) # 等待服务器响应
# 启动接收线程
receive_thread = threading.Thread(target=self.receive_messages)
receive_thread.daemon = True
receive_thread.start()
# 启动心跳线程
heartbeat_thread = threading.Thread(target=self.send_heartbeat)
heartbeat_thread.daemon = True
heartbeat_thread.start()
print(f"\n{'='*50}")
print(f"欢迎, {self.username}!")
print("输入消息按回车发送")
print("输入 /list 查看在线用户")
print("输入 /quit 退出")
print(f"{'='*50}\n")
try:
while self.running:
try:
user_input = input(f"[{self.username}] ")
except EOFError:
break
if not user_input:
continue
if user_input.lower() == '/quit':
self.leave()
break
elif user_input.lower() == '/list':
self.request_user_list()
else:
self.send_message(user_input)
except KeyboardInterrupt:
print("\n正在退出...")
self.leave()
finally:
self.running = False
self.sock.close()
print("已退出聊天室")
def request_user_list(self):
"""请求在线用户列表"""
message = json.dumps({
'type': 'list',
'username': self.username
}).encode('utf-8')
self.sock.sendto(message, self.server_addr)
def demo_chat_room():
"""演示UDP聊天室"""
import subprocess
import sys
print("="*60)
print("UDP聊天室演示")
print("="*60)
# 启动服务器
server = UDPServer('127.0.0.1', 15000)
server_thread = threading.Thread(target=server.run)
server_thread.daemon = True
print("[演示] 启动服务器...")
server_thread.start()
time.sleep(0.3)
# 模拟几个客户端发送消息
print("[演示] 模拟客户端交互...\n")
# 客户端1 - Alice
client1 = UDPClient('127.0.0.1', 15000, 'Alice')
client1.join()
time.sleep(0.2)
# 客户端2 - Bob
client2 = UDPClient('127.0.0.1', 15000, 'Bob')
client2.join()
time.sleep(0.2)
# 客户端3 - Carol
client3 = UDPClient('127.0.0.1', 15000, 'Carol')
client3.join()
time.sleep(0.5)
# 模拟对话
print("\n--- 模拟对话开始 ---")
client1.send_message("大家好!我是Alice")
time.sleep(0.3)
client2.send_message("嗨Alice,我是Bob")
time.sleep(0.3)
client3.send_message("你们好呀,Carol报到!")
time.sleep(0.3)
client1.send_message("这个聊天室真不错~")
time.sleep(0.3)
client2.send_message("是啊,UDP传输还挺快的")
time.sleep(0.5)
# 客户端2离开
client2.leave()
time.sleep(0.3)
client1.send_message("Bob怎么走了?")
time.sleep(0.3)
client3.send_message("可能是去吃饭了吧 😄")
time.sleep(0.5)
# 清理
client1.leave()
client3.leave()
print("\n--- 演示结束 ---")
print("\n使用说明:")
print("1. 启动服务器: server = UDPServer('0.0.0.0', 5000); server.run()")
print("2. 启动客户端: client = UDPClient('127.0.0.1', 5000, '用户名')")
print("3. 开始聊天: client.start_interactive()")
if __name__ == "__main__":
demo_chat_room()
2.4 运行说明
启动服务器:
server = UDPServer('0.0.0.0', 5000)
server.run()
启动客户端(交互式):
client = UDPClient('127.0.0.1', 5000, 'Alice')
client.start_interactive()
客户端交互命令:
- 输入消息 → 发送给所有人
/list→ 查看在线用户/quit→ 退出聊天室
输出示例:
============================================================
欢迎, Alice!
输入消息按回车发送
输入 /list 查看在线用户
输入 /quit 退出
============================================================
[Alice]
💬 🎉 Bob 加入了聊天室! 当前在线: 2人
[Alice]
💬 [Bob] 嗨Alice!
[Alice] 你好Bob!
[Alice]
💬 [Bob] 你在学什么?
[Alice] 正在学计算机网络!
[Alice]
📋 在线用户 (2人):
👤 Alice
👤 Bob
[Alice] /quit
已退出聊天室
3. 案例3:多线程并发Echo服务器
3.1 设计思路
Echo服务器是网络编程中最经典的服务器模型——客户端发送什么,服务器就回送什么。看似简单,但通过多线程和线程池的改造,可以实现高效的并发处理。
设计要点:
- 多线程处理:每个客户端连接由一个独立线程处理
- 线程池限制:使用ThreadPoolExecutor限制最大并发数
- 优雅关闭:正确处理Ctrl+C信号
- 连接统计:记录活跃连接数和处理请求数
3.2 完整代码实现
# ======= 案例3: 多线程并发Echo服务器 =======
import socket
import threading
import time
import signal
import sys
from concurrent.futures import ThreadPoolExecutor
class ThreadedEchoServer:
"""
多线程并发Echo服务器
功能:
1. 接收客户端连接
2. 为每个客户端创建独立线程
3. 线程池限制并发数
4. 统计连接信息
5. 优雅关闭
Echo协议:
- 客户端发送任意文本
- 服务器回送相同文本(Echo)
- 发送 'QUIT' 断开连接
"""
def __init__(self, host='0.0.0.0', port=9002, max_workers=10):
"""
初始化Echo服务器
参数:
host: 监听地址
port: 监听端口
max_workers: 线程池最大工作线程数
"""
self.host = host
self.port = port
self.max_workers = max_workers
# 创建TCP socket
self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_sock.bind((host, port))
self.server_sock.listen(max_workers)
# 线程池
self.executor = ThreadPoolExecutor(max_workers=max_workers)
# 统计信息
self.stats = {
'total_connections': 0,
'active_connections': 0,
'total_messages': 0,
'total_bytes': 0
}
self.stats_lock = threading.Lock()
self.running = True
# 设置信号处理
signal.signal(signal.SIGINT, self._signal_handler)
def _signal_handler(self, signum, frame):
"""处理Ctrl+C信号"""
print("\n[服务器] 收到关闭信号,正在优雅关闭...")
self.running = False
self.server_sock.close()
def _update_stats(self, key, delta=1):
"""线程安全地更新统计"""
with self.stats_lock:
self.stats[key] += delta
def _get_stats(self):
"""获取当前统计信息副本"""
with self.stats_lock:
return dict(self.stats)
def handle_client(self, client_sock, addr):
"""
处理单个客户端连接
参数:
client_sock: 客户端socket
addr: 客户端地址
"""
self._update_stats('total_connections')
self._update_stats('active_connections')
client_id = f"{addr[0]}:{addr[1]}"
print(f"[连接] 客户端 {client_id} 已连接 "
f"(活跃: {self._get_stats()['active_connections']})")
try:
client_sock.settimeout(60) # 60秒超时
while self.running:
# 接收数据
data = client_sock.recv(4096)
if not data:
break
message = data.decode('utf-8').strip()
self._update_stats('total_messages')
self._update_stats('total_bytes', len(data))
# 处理特殊命令
if message.upper() == 'QUIT':
client_sock.sendall(b'Goodbye!\n')
break
elif message.upper() == 'STATS':
stats = self._get_stats()
response = (f"服务器统计:\n"
f"总连接数: {stats['total_connections']}\n"
f"活跃连接: {stats['active_connections']}\n"
f"总消息数: {stats['total_messages']}\n"
f"总字节数: {stats['total_bytes']}\n")
client_sock.sendall(response.encode('utf-8'))
elif message.upper() == 'TIME':
response = f"服务器时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
client_sock.sendall(response.encode('utf-8'))
else:
# Echo回送
response = f"Echo: {message}\n"
client_sock.sendall(response.encode('utf-8'))
except socket.timeout:
print(f"[超时] 客户端 {client_id} 连接超时")
except ConnectionResetError:
print(f"[断开] 客户端 {client_id} 强制断开")
except Exception as e:
if self.running:
print(f"[错误] 处理客户端 {client_id}: {e}")
finally:
client_sock.close()
self._update_stats('active_connections', -1)
print(f"[断开] 客户端 {client_id} 已断开 "
f"(活跃: {self._get_stats()['active_connections']})")
def run(self):
"""启动服务器"""
print(f"[服务器] Echo服务器启动 {self.host}:{self.port}")
print(f"[服务器] 最大并发数: {self.max_workers}")
print(f"[服务器] 按 Ctrl+C 关闭服务器\n")
try:
while self.running:
try:
client_sock, addr = self.server_sock.accept()
# 提交到线程池
self.executor.submit(self.handle_client, client_sock, addr)
except OSError:
# socket已关闭
break
except Exception as e:
if self.running:
print(f"[错误] 服务器异常: {e}")
finally:
self.shutdown()
def shutdown(self):
"""优雅关闭服务器"""
print("\n[服务器] 正在关闭...")
self.running = False
# 关闭socket
try:
self.server_sock.close()
except:
pass
# 关闭线程池
self.executor.shutdown(wait=True)
# 打印最终统计
stats = self._get_stats()
print(f"\n{'='*40}")
print("服务器运行统计")
print(f"{'='*40}")
print(f"总连接数: {stats['total_connections']}")
print(f"总消息数: {stats['total_messages']}")
print(f"总字节数: {stats['total_bytes']}")
print(f"{'='*40}")
print("[服务器] 已关闭")
# ============ Echo客户端 ============
def echo_client(server_host='127.0.0.1', server_port=9002, messages=None):
"""
Echo客户端
参数:
server_host: 服务器地址
server_port: 服务器端口
messages: 要发送的消息列表(None则交互式输入)
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((server_host, server_port))
client_addr = sock.getsockname()
print(f"[客户端 {client_addr[1]}] 已连接到服务器")
try:
if messages:
# 批量发送消息
for msg in messages:
sock.sendall((msg + '\n').encode('utf-8'))
data = sock.recv(4096)
print(f"[客户端 {client_addr[1]}] 发送: {msg}")
print(f"[客户端 {client_addr[1]}] 收到: {data.decode('utf-8').strip()}")
time.sleep(0.3)
# 发送QUIT
sock.sendall(b'QUIT\n')
data = sock.recv(4096)
print(f"[客户端 {client_addr[1]}] 收到: {data.decode('utf-8').strip()}")
else:
# 交互式模式
print("输入消息发送给服务器,输入 QUIT 退出")
while True:
user_input = input("> ")
sock.sendall((user_input + '\n').encode('utf-8'))
if user_input.upper() == 'QUIT':
break
data = sock.recv(4096)
print(f"< {data.decode('utf-8').strip()}")
finally:
sock.close()
print(f"[客户端 {client_addr[1]}] 已断开")
def demo_echo_server():
"""演示多线程并发Echo服务器"""
print("="*60)
print("多线程并发Echo服务器演示")
print("="*60)
# 创建服务器
server = ThreadedEchoServer('127.0.0.1', 19002, max_workers=5)
# 启动服务器
server_thread = threading.Thread(target=server.run)
server_thread.daemon = True
server_thread.start()
time.sleep(0.5)
# 模拟多个客户端同时连接
print("\n[演示] 模拟5个客户端同时连接...\n")
client_messages = [
["Hello", "World", "STATS", "TIME"],
["你好", "服务器", "Echo测试"],
["Client 3 here", "Testing"],
["Stats please", "STATS"],
["Goodbye", "QUIT"],
]
threads = []
for i, messages in enumerate(client_messages):
t = threading.Thread(
target=echo_client,
args=('127.0.0.1', 19002, messages)
)
t.start()
threads.append(t)
time.sleep(0.1) # 稍微错开连接时间
# 等待所有客户端完成
for t in threads:
t.join()
time.sleep(0.5)
print(f"\n{'='*60}")
print("演示完成!")
print(f"{'='*60}")
# 关闭服务器
server.shutdown()
if __name__ == "__main__":
demo_echo_server()
3.3 运行说明
启动服务器:
server = ThreadedEchoServer('0.0.0.0', 9002, max_workers=10)
server.run()
启动客户端:
echo_client('127.0.0.1', 9002) # 交互式模式
# 或批量发送消息
echo_client('127.0.0.1', 9002, ['Hello', 'STATS', 'QUIT'])
服务器端输出示例:
[服务器] Echo服务器启动 0.0.0.0:9002
[服务器] 最大并发数: 10
[服务器] 按 Ctrl+C 关闭服务器
[连接] 客户端 127.0.0.1:54321 已连接 (活跃: 1)
[连接] 客户端 127.0.0.1:54322 已连接 (活跃: 2)
[断开] 客户端 127.0.0.1:54321 已断开 (活跃: 1)
[连接] 客户端 127.0.0.1:54323 已连接 (活跃: 2)
========================================
服务器运行统计
========================================
总连接数: 5
总消息数: 15
总字节数: 1200
========================================
[服务器] 已关闭
3.4 三种服务器模型对比
| 模型 | 代码复杂度 | 并发能力 | 资源消耗 | 适用场景 |
|---|---|---|---|---|
| 单线程 | 简单 | 无并发 | 低 | 单次服务 |
| 每连接一线程 | 中等 | 高 | 中 | 中小型服务 |
| 线程池 | 中等 | 可控 | 可控 | 生产环境 |
| 异步IO | 复杂 | 极高 | 极低 | 高并发服务 |
4. TCP vs UDP 实战选择指南
| 应用场景 | 推荐协议 | 原因 |
|---|---|---|
| 文件传输 | TCP | 需要可靠传输、完整性校验 |
| Web浏览 | TCP | HTTP/HTTPS基于TCP |
| 邮件传输 | TCP | SMTP/POP3/IMAP基于TCP |
| 实时聊天 | UDP/TCP | 低延迟优先,可容忍少量丢包 |
| 在线游戏 | UDP | 低延迟,状态可周期性同步 |
| DNS查询 | UDP | 请求响应小,一次交互完成 |
| 视频直播 | UDP | 实时性优先,可丢帧 |
| 远程登录 | TCP | SSH/Telnet需要可靠传输 |
5. 考研要点回顾
5.1 TCP vs UDP 核心对比
| 特性 | TCP | UDP |
|---|---|---|
| 连接方式 | 面向连接 | 无连接 |
| 可靠性 | 可靠(确认、重传) | 不可靠 |
| 有序性 | 有序(序号机制) | 无序 |
| 流量控制 | 有(滑动窗口) | 无 |
| 拥塞控制 | 有 | 无 |
| 首部开销 | 20字节 | 8字节 |
| 传输效率 | 较低 | 较高 |
| 应用场景 | 文件传输、网页浏览 | 视频流、DNS、游戏 |
5.2 三个案例对应的TCP/UDP特性
| 案例 | 使用协议 | 体现的核心特性 |
|---|---|---|
| 文件传输 | TCP | 可靠性、流量控制、面向连接 |
| 聊天室 | UDP | 低延迟、无连接、广播 |
| Echo服务器 | TCP | 并发处理、面向连接 |
6. 本节小结
通过这三个综合案例,我们从理论走向了实践:
-
TCP文件传输 让我们理解了如何利用TCP的可靠性进行大文件传输,包括分块传输、进度显示和MD5完整性校验。
-
UDP聊天室 展示了无连接协议的高效性,以及如何在应用层实现用户管理和消息广播。
-
多线程Echo服务器 演示了生产环境中如何处理并发连接,线程池是限制资源消耗的关键。
学习建议:把这三个案例的代码亲自运行一遍,尝试修改参数、添加新功能(如文件上传、私聊功能、断点续传等),这样才能真正掌握Python网络编程。
更多推荐


所有评论(0)