别再死记硬背了!用Python模拟RDT协议(可靠数据传输)的发送与接收状态机
·
用Python构建RDT协议状态机:从理论到代码的深度实践
在计算机网络的世界里,可靠数据传输(RDT)协议是构建稳定通信的基石。但教科书上的状态转换图往往让学习者感到抽象难懂——直到我们用代码将其具象化。本文将带你用Python实现rdt3.0协议的发送方和接收方状态机,通过可运行的代码演示停等协议、序列号校验和超时重传等核心机制。不同于单纯的理论讲解,我们会用面向对象的设计模式构建可扩展的FSM框架,并模拟比特差错和丢包场景,让协议行为变得肉眼可见。
1. 环境准备与基础架构
1.1 协议核心组件设计
我们先定义协议中的关键数据结构。使用Python的dataclass简化报文结构的定义:
from dataclasses import dataclass
from enum import Enum, auto
import random
class PacketType(Enum):
DATA = auto()
ACK = auto()
@dataclass
class Packet:
seq_num: int # 0或1的序列号
data: str # 实际传输的数据
checksum: int # 校验和
ptype: PacketType = PacketType.DATA
def is_corrupted(self) -> bool:
"""模拟比特差错检测"""
return random.random() < 0.3 # 30%概率模拟比特差错
1.2 有限状态机基类实现
状态机的核心是状态转换逻辑。我们构建一个可复用的FSM基类:
class FSM:
def __init__(self):
self.current_state = None
def transition(self, event):
"""处理事件并转换状态"""
handler = getattr(self, f"state_{self.current_state}", None)
if not handler:
raise RuntimeError(f"未定义状态处理函数: {self.current_state}")
next_state = handler(event)
self.current_state = next_state or self.current_state
def start(self):
"""启动状态机"""
self.current_state = "initial"
self.transition("startup")
2. 发送方状态机实现
2.1 发送方状态定义
发送方需要处理三种主要事件:
- 上层应用调用rdt_send()
- 收到ACK/NAK
- 定时器超时
class SenderFSM(FSM):
def __init__(self):
super().__init__()
self.buffer = None # 数据缓存
self.seq_num = 0 # 当前序列号
self.timer = None # 模拟定时器
self.timeout = 2 # 超时时间(秒)
def state_wait_call(self, event):
"""等待上层调用状态"""
if event == "rdt_send":
data = input("输入要发送的数据: ")
packet = Packet(self.seq_num, data, compute_checksum(data))
send_to_channel(packet)
start_timer(self.timeout)
return "wait_ack"
def state_wait_ack(self, event):
"""等待ACK状态"""
if event == "timeout":
print("[超时] 重传数据包")
send_to_channel(self.buffer)
start_timer(self.timeout)
return "wait_ack"
elif isinstance(event, Packet) and event.ptype == PacketType.ACK:
if event.is_corrupted():
print("[损坏的ACK] 忽略并等待超时")
return "wait_ack"
if event.seq_num == self.seq_num:
print("[正确ACK] 准备发送下一数据包")
self.seq_num ^= 1 # 切换序列号
stop_timer()
return "wait_call"
else:
print("[过时ACK] 可能是冗余ACK,忽略")
return "wait_ack"
2.2 定时器模拟实现
在真实网络中,定时器是异步执行的。我们用简单线程模拟:
import threading
def start_timer(timeout):
print(f"⏰ 定时器启动,{timeout}秒后超时")
# 实际实现中应使用异步回调
timer = threading.Timer(timeout, lambda: sender_fsm.transition("timeout"))
timer.start()
return timer
def stop_timer():
print("⏹️ 定时器停止")
3. 接收方状态机实现
3.1 接收方核心逻辑
接收方需要处理数据包到达事件,并执行校验和序列号检查:
class ReceiverFSM(FSM):
def __init__(self):
super().__init__()
self.expected_seq = 0 # 期望的序列号
def state_wait_packet(self, event):
"""等待数据包状态"""
if isinstance(event, Packet) and event.ptype == PacketType.DATA:
if event.is_corrupted():
print("[损坏数据包] 发送上次ACK")
send_ack(self.expected_seq ^ 1) # 发送冗余ACK
return "wait_packet"
if event.seq_num == self.expected_seq:
print(f"[正确数据] 交付数据: {event.data}")
self.expected_seq ^= 1
send_ack(event.seq_num)
return "wait_packet"
else:
print("[冗余数据包] 已处理过,发送ACK")
send_ack(event.seq_num)
return "wait_packet"
3.2 校验和计算
实现简单的校验和算法用于差错检测:
def compute_checksum(data: str) -> int:
"""计算16位校验和"""
total = 0
for char in data:
total += ord(char)
total = (total & 0xffff) + (total >> 16) # 回卷处理
return ~total & 0xffff # 取反
4. 信道模拟与集成测试
4.1 模拟不可靠信道
我们创建信道模拟函数,引入随机丢包和比特差错:
def send_to_channel(packet):
"""模拟不可靠信道传输"""
if random.random() < 0.2: # 20%丢包率
print("💢 信道丢包!数据包丢失")
return
# 传递到接收方
if packet.ptype == PacketType.DATA:
receiver_fsm.transition(packet)
else:
sender_fsm.transition(packet)
def send_ack(seq_num):
"""发送ACK包"""
ack = Packet(seq_num, "", 0, PacketType.ACK)
print(f"⬆️ 发送ACK({seq_num})")
send_to_channel(ack)
4.2 端到端测试案例
让我们模拟一个完整的通信流程:
# 初始化状态机
sender_fsm = SenderFSM()
receiver_fsm = ReceiverFSM()
# 启动状态机
sender_fsm.start()
receiver_fsm.start()
# 模拟发送过程
print("\n=== 测试案例1: 正常传输 ===")
sender_fsm.transition("rdt_send") # 输入"Hello"
print("\n=== 测试案例2: ACK丢失 ===")
sender_fsm.transition("rdt_send") # 输入"World"
print("\n=== 测试案例3: 数据包损坏 ===")
sender_fsm.transition("rdt_send") # 输入"Python"
典型输出结果示例:
=== 测试案例1: 正常传输 ===
输入要发送的数据: Hello
⏰ 定时器启动,2秒后超时
[正确数据] 交付数据: Hello
⬆️ 发送ACK(0)
[正确ACK] 准备发送下一数据包
⏹️ 定时器停止
=== 测试案例2: ACK丢失 ===
输入要发送的数据: World
⏰ 定时器启动,2秒后超时
[正确数据] 交付数据: World
⬆️ 发送ACK(1)
💢 信道丢包!数据包丢失
[超时] 重传数据包
[冗余数据包] 已处理过,发送ACK
⬆️ 发送ACK(1)
[正确ACK] 准备发送下一数据包
⏹️ 定时器停止
5. 高级扩展与优化建议
5.1 滑动窗口协议改造
当前实现基于停等协议,效率较低。可以扩展为滑动窗口协议:
class SlidingWindowSender(FSM):
def __init__(self, window_size=4):
super().__init__()
self.window_size = window_size
self.base_seq = 0
self.next_seq = 0
self.packets = {} # 已发送未确认的包
def send_packet(self, seq_num, data):
"""发送单个数据包"""
packet = Packet(seq_num, data, compute_checksum(data))
self.packets[seq_num] = packet
send_to_channel(packet)
if self.base_seq == self.next_seq: # 窗口内第一个包
start_timer(self.timeout)
5.2 性能指标监控
添加吞吐量和时延监控:
class PerformanceMonitor:
def __init__(self):
self.sent_packets = 0
self.retransmissions = 0
self.start_time = time.time()
@property
def throughput(self):
elapsed = time.time() - self.start_time
return self.sent_packets / max(elapsed, 1e-6)
@property
def loss_rate(self):
return self.retransmissions / max(self.sent_packets, 1)
5.3 可视化状态转换
使用graphviz生成状态转换图:
from graphviz import Digraph
def visualize_fsm(fsm_class):
dot = Digraph()
# 通过反射获取状态处理方法
methods = [m for m in dir(fsm_class) if m.startswith('state_')]
for method in methods:
state = method[6:] # 移除'state_'前缀
dot.node(state)
# 添加转换边(需根据实际代码补充)
dot.edge('wait_call', 'wait_ack', label='rdt_send')
return dot
在实现RDT协议时,最常遇到的坑是未能正确处理冗余ACK和序列号回绕。一个实用的调试技巧是在每个状态转换时打印完整的上下文信息,包括当前序列号、期望序列号和缓存内容。对于更复杂的网络模拟,可以考虑使用像ns-3这样的专业网络模拟器,但我们的Python实现已经足够展示协议的核心机制。
更多推荐

所有评论(0)