go的tcp的读写超时
通过分析读超时,我们发现了go作者将epoll集成到了runtime中,go的chan和网络io就很自然的通过gopark和goready来控制它的状态,可以通过它的waitReasonIOWait和waitReasonChanReceiveNilChan等来区分是哪一种阻塞,真的很巧妙,毕竟协程的gopark阻塞在用户态,没进入内核,开销小,要将所有的集成到里面,还是需要一番考量的,读超时就是利
在我们写tcp编程的时候,是不是经常写SetReadDeadLine,SetWriteDeadLine,超时会返回"i/o timeout"的错误,那这个错误和内部的机制都是怎么实现的了,我现在以go1.24.0,os=linux来讲解
1.SetReadDeadLine的调用
我们以读为例子,这个函数最终会调用netpoll.go文件的
// 通过go:linkname指令将当前函数与internal/poll包中的runtime_pollSetDeadline函数关联
// 这允许跨包访问内部函数,是Go运行时内部实现的常见做法
//go:linkname poll_runtime_pollSetDeadline internal/poll.runtime_pollSetDeadline
func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
// 对pollDesc加锁,确保并发安全,避免多goroutine同时修改其状态
lock(&pd.lock)
// 如果当前I/O描述符正在关闭,则无需设置截止时间,解锁后直接返回
if pd.closing {
unlock(&pd.lock)
return
}
// 保存当前的读写截止时间和组合状态,用于后续比较是否有变化
rd0, wd0 := pd.rd, pd.wd // 保存原始的读写截止时间
combo0 := rd0 > 0 && rd0 == wd0 // 原始状态:读写截止时间是否相同且有效
// 处理截止时间:将相对时间转换为绝对时间
if d > 0 {
// 加上当前系统纳秒时间,将相对时间(d纳秒后)转为绝对时间(具体时间点)
d += nanotime()
// 检查是否溢出(如果计算后时间 <=0,说明溢出)
if d <= 0 {
// 溢出时设置为int64能表示的最大时间(避免无效值)
d = 1<<63 - 1
}
}
// 根据模式更新pollDesc中的读写截止时间
if mode == 'r' || mode == 'r'+'w' {
pd.rd = d // 更新读操作截止时间
}
if mode == 'w' || mode == 'r'+'w' {
pd.wd = d // 更新写操作截止时间
}
// 发布更新后的信息,通知其他依赖该状态的组件(如轮询器)
pd.publishInfo()
// 检查当前读写截止时间是否相同且有效(用于判断是否可以合并计时器)
combo := pd.rd > 0 && pd.rd == pd.wd
// 默认为读截止时间类型,若读写截止时间相同则使用组合类型
rtf := netpollReadDeadline
if combo {
rtf = netpollDeadline // 组合类型:读写超时使用同一个计时器
}
// 处理读操作计时器
if !pd.rrun { // 读计时器当前未运行
if pd.rd > 0 { // 读截止时间有效(已设置且为未来时间)
// 修改读计时器:设置绝对截止时间、类型、关联参数和序列号
// 序列号用于验证计时器有效性,避免处理已过时的超时事件
pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
pd.rrun = true // 标记读计时器已运行
}
} else if pd.rd != rd0 || combo != combo0 {
// 读截止时间已改变,或组合状态已改变(需要更新计时器)
pd.rseq++ // 递增序列号,使旧计时器失效
if pd.rd > 0 {
// 更新读计时器为新的截止时间和参数
pd.rt.modify(pd.rd, 0, rtf, pd.makeArg(), pd.rseq)
} else {
// 读截止时间无效,停止计时器并标记为未运行
pd.rt.stop()
pd.rrun = false
}
}
// 处理写操作计时器(逻辑类似读计时器,但仅在非组合状态下生效)
if !pd.wrun { // 写计时器当前未运行
// 写截止时间有效且不与读截止时间组合(避免重复计时)
if pd.wd > 0 && !combo {
// 修改写计时器:设置绝对截止时间、类型、关联参数和序列号
pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
pd.wrun = true // 标记写计时器已运行
}
} else if pd.wd != wd0 || combo != combo0 {
// 写截止时间已改变,或组合状态已改变(需要更新计时器)
pd.wseq++ // 递增序列号,使旧计时器失效
if pd.wd > 0 && !combo {
// 更新写计时器为新的截止时间和参数
pd.wt.modify(pd.wd, 0, netpollWriteDeadline, pd.makeArg(), pd.wseq)
} else {
// 写截止时间无效,停止计时器并标记为未运行
pd.wrun = false
pd.wt.stop()
}
}
// 处理已超时的情况:如果截止时间已过,唤醒阻塞的I/O操作
delta := int32(0) // 用于调整等待者计数的 delta 值
var rg, wg *g // 被阻塞的读/写goroutine
// 检查读截止时间是否已过期(rd < 0 表示立即超时)
if pd.rd < 0 {
// 解除读操作阻塞,获取被阻塞的goroutine并更新delta
rg = netpollunblock(pd, 'r', false, &delta)
}
// 检查写截止时间是否已过期(wd < 0 表示立即超时)
if pd.wd < 0 {
// 解除写操作阻塞,获取被阻塞的goroutine并更新delta
wg = netpollunblock(pd, 'w', false, &delta)
}
// 完成所有状态修改后解锁
unlock(&pd.lock)
// 唤醒被解除阻塞的读goroutine(优先级3)
if rg != nil {
netpollgoready(rg, 3)
}
// 唤醒被解除阻塞的写goroutine(优先级3)
if wg != nil {
netpollgoready(wg, 3)
}
// 调整I/O等待者的计数,维护运行时内部状态
netpollAdjustWaiters(delta)
}
其中pollDesc是核心结构体,这个解释已经很清楚了
// pollDesc 是 Go 运行时中用于管理 I/O 对象(如文件描述符、网络连接)的「轮询描述符」结构体
// 核心职责:跟踪 I/O 操作的状态(读写阻塞、超时计时器、关闭状态等),是 runtime 与系统 I/O 轮询(如 epoll/kqueue)交互的核心载体
type pollDesc struct {
// _ 是内存布局占位符,sys.NotInHeap 标记该结构体**不能分配在 Go 堆上**
// 原因:pollDesc 与系统级 I/O 资源强绑定,需避免 GC 移动其地址(可能导致系统调用出错),通常分配在特殊内存区域
_ sys.NotInHeap
// link:用于将当前 pollDesc 链接到 pollcache(pollDesc 缓存池)的链表指针
// 访问保护:受 pollcache.lock 互斥锁保护,确保缓存池的并发安全操作(如获取/放回 pollDesc)
link *pollDesc
// fd:关联的系统级文件描述符(如 socket 句柄、文件句柄),uintptr 类型适配不同系统的指针宽度
// 生命周期:在 pollDesc 被初始化后,fd 的值始终不变(直到 pollDesc 被销毁)
fd uintptr
// fdseq:原子操作的无符号指针,用于**防止访问「过期的 pollDesc」**
// 原理:fd 被复用(如关闭后重新打开)时,fdseq 会递增;操作 pollDesc 前需校验 fdseq 与当前 fd 的序列是否匹配,避免操作已失效的 I/O 资源
fdseq atomic.Uintptr
// atomicInfo:原子操作的 32 位无符号整数,存储「I/O 状态摘要」
// 核心作用:为 netpollcheckerr(无法加锁的函数)提供非阻塞的状态查询,避免锁竞争
// 存储内容(通过位运算打包):
// 1. closing 状态位:标记 pollDesc 是否正在关闭
// 2. rd/wd 状态位:标记读写截止时间是否有效/过期
// 3. eventErr 状态位:标记 I/O 轮询是否检测到错误(如连接重置)
// 读写规则:
// - 写操作:必须在持有 pollDesc.lock 时修改,且修改后需调用 publishInfo() 将状态同步到 atomicInfo
// - 读操作:无锁,通过原子加载直接获取(如 netpollcheckerr 快速判断 I/O 是否可用)
atomicInfo atomic.Uint32
// rg/wg:原子操作的无符号指针,存储「等待 I/O 的 goroutine」
// 取值范围:
// - pdNil(空指针):无 goroutine 等待
// - pdWait:标记「正在等待 I/O 就绪」的状态(避免重复阻塞)
// - G 指针:等待读/写 I/O 就绪的 goroutine(唤醒时直接调度该 goroutine)
// 设计目的:用 atomic.Uintptr 包装 goroutine 指针,适配原子操作(类似 runtime 中的 guintptr 类型)
rg atomic.Uintptr // 等待「读 I/O 就绪」的 goroutine
wg atomic.Uintptr // 等待「写 I/O 就绪」的 goroutine
// lock:互斥锁,保护下方所有非原子字段的并发访问
// 注意:所有修改 closing/rrun/wrun 等字段的操作,必须先持有该锁
lock mutex
// closing:标记 pollDesc 是否正在关闭(如 fd 被关闭时设为 true)
// 作用:避免在关闭过程中执行新的 I/O 操作(如设置截止时间、启动计时器)
closing bool
// rrun/wrun:标记「读/写计时器是否正在运行」
// 与 rt/wt 计时器绑定:rrun=true 表示 rt 计时器已启动,false 表示未启动/已停止
rrun bool // rt(读计时器)的运行状态
wrun bool // wt(写计时器)的运行状态
// user:用户可设置的 32 位标记(cookie),用于自定义扩展(如关联业务层状态)
// 运行时不主动修改该字段,完全由用户代码(如 internal/poll 包)控制
user uint32
// rseq/wseq:无符号指针,用于「防止处理过期的计时器事件」
// 原理:每次修改读/写计时器(如更新截止时间)时,对应的 seq 会递增;
// 计时器触发时,会校验 seq 与当前 pollDesc 的 seq 是否一致,不一致则忽略该事件(说明计时器已被重置)
rseq uintptr // 保护读计时器(rt)的序列号
wseq uintptr // 保护写计时器(wt)的序列号
// rt/wt:读/写操作的截止时间计时器(runtime 内部 timer 类型)
// 功能:到达截止时间时,触发超时逻辑(唤醒阻塞的 goroutine,终止 I/O 操作)
rt timer // 读操作的超时计时器
wt timer // 写操作的超时计时器
// rd/wd:读/写操作的截止时间(单位:纳秒,绝对时间,即从 epoch 到超时时间的总纳秒数)
// 取值规则:
// - >0:有效截止时间(未来的某个时间点)
// - <=0:已过期(-1 表示主动设置为过期,0 或负数表示超时已触发)
rd int64 // 读操作截止时间
wd int64 // 写操作截止时间
// self:指向当前 pollDesc 的指针,用于「间接传递 pollDesc 接口」
// 核心用途:在计时器事件(rt/wt)触发时,通过 self 传递当前 pollDesc 到回调函数
// 背景:runtime.timer 的回调参数是固定格式,self 用于包装 pollDesc 自身,避免参数传递限制(见 (*pollDesc).makeArg() 方法)
self *pollDesc
}
通过上面的代码的注释,我们知道这段代码就是设置定时器,设置超时时间和函数,我们以读为例,就是超时的回调函数就是netpollReadDeadline
func netpollReadDeadline(arg any, seq uintptr, delta int64) {
netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}
// netpolldeadlineimpl 是 Go 运行时中处理 I/O 截止时间超时事件的核心函数
// 当读/写操作的截止时间到达时,该函数会被计时器(rt/wt)回调,执行超时逻辑:
// 1. 标记 I/O 操作已超时
// 2. 解除阻塞的 goroutine
// 3. 维护运行时状态一致性
// 参数说明:
// - pd: 指向 pollDesc 的指针,代表当前 I/O 对象的轮询描述符
// - seq: 计时器创建时的序列号(rseq 或 wseq),用于校验计时器有效性
// - read: 是否为读操作超时
// - write: 是否为写操作超时
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
// 对 pollDesc 加锁,确保超时处理过程中状态修改的并发安全
lock(&pd.lock)
// 校验计时器的序列号是否有效(防止处理过期的计时器事件)
// 序列号是在设置计时器时记录的 rseq(读)或 wseq(写)
currentSeq := pd.rseq // 默认取读序列号
if !read {
currentSeq = pd.wseq // 若为写操作,取写序列号
}
// 若传入的序列号与当前序列号不匹配,说明计时器已被重置或 pollDesc 已复用
// 此时忽略该超时事件(避免处理过时的超时)
if seq != currentSeq {
unlock(&pd.lock) // 解锁后直接返回
return
}
// delta 用于记录等待者数量的变化,供后续调整运行时状态
delta := int32(0)
var rg *g // 被阻塞的读操作 goroutine
// 处理读操作超时
if read {
// 校验状态一致性:若读截止时间未过期(rd>0)或读计时器未运行(!rrun),则属于异常状态
// 这种情况理论上不应发生,抛出异常便于调试
if pd.rd <= 0 || !pd.rrun {
throw("runtime: inconsistent read deadline")
}
// 将读截止时间标记为已过期(-1 表示超时)
pd.rd = -1
// 发布更新,将 rd 的状态同步到 atomicInfo(供无锁查询)
pd.publishInfo()
// 解除读操作的阻塞状态,获取被阻塞的 goroutine,并更新等待者计数 delta
rg = netpollunblock(pd, 'r', false, &delta)
}
var wg *g // 被阻塞的写操作 goroutine
// 处理写操作超时
if write {
// 校验状态一致性:若写截止时间未过期(wd>0)或写计时器未运行(!wrun)且非读写同时超时,属于异常
if pd.wd <= 0 || !pd.wrun && !read {
throw("runtime: inconsistent write deadline")
}
// 将写截止时间标记为已过期(-1 表示超时)
pd.wd = -1
// 发布更新,将 wd 的状态同步到 atomicInfo
pd.publishInfo()
// 解除写操作的阻塞状态,获取被阻塞的 goroutine,并更新等待者计数 delta
wg = netpollunblock(pd, 'w', false, &delta)
}
// 完成超时状态修改后解锁
unlock(&pd.lock)
// 唤醒被解除阻塞的读操作 goroutine(优先级 0)
if rg != nil {
netpollgoready(rg, 0)
}
// 唤醒被解除阻塞的写操作 goroutine(优先级 0)
if wg != nil {
netpollgoready(wg, 0)
}
// 调整 I/O 等待者的计数,维护运行时内部状态的一致性
netpollAdjustWaiters(delta)
}
func (pd *pollDesc) publishInfo() {
var info uint32
if pd.closing {
info |= pollClosing
}
if pd.rd < 0 {
info |= pollExpiredReadDeadline
}
if pd.wd < 0 {
info |= pollExpiredWriteDeadline
}
info |= uint32(pd.fdseq.Load()&pollFDSeqMask) << pollFDSeq
// Set all of x except the pollEventErr bit.
x := pd.atomicInfo.Load()
for !pd.atomicInfo.CompareAndSwap(x, (x&pollEventErr)|info) {
x = pd.atomicInfo.Load()
}
}
func netpollgoready(gp *g, traceskip int) {
goready(gp, traceskip+1)
}
从上面的代码可以看出当定时器超时后,执行pd.rd = -1,将pollDesc的原子字段atomicInfo加上pollExpiredReadDeadline(4)的错误位,通过netpollunblock(这个函数我们先不分析)得到阻塞的rg的协程,然后goready唤醒它,它是怎么唤醒的了,我们进行读来分析
2.Read的调用
// Read 从文件描述符(FD)中读取数据到字节切片p,返回读取的字节数和可能的错误
// 实现了io.Reader接口,是Go中文件、网络连接等I/O对象读取操作的底层入口
func (fd *FD) Read(p []byte) (int, error) {
// 获取读锁,确保同一时间只有一个读操作执行(并发安全控制)
// 读锁与写锁互斥,避免读写冲突
if err := fd.readLock(); err != nil {
return 0, err
}
// 延迟释放读锁,确保函数退出时无论是否发生错误都能解锁
defer fd.readUnlock()
// 处理空切片读取:直接返回0和nil
// 避免调用系统级Read时返回0,nil被误解为io.EOF(文件结束)
if len(p) == 0 {
return 0, nil
}
// 准备读操作:初始化pollDesc状态,检查是否已关闭或有未处理的错误
// isFile标记是否为文件类型(文件与网络套接字的处理逻辑略有不同)
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
// 对于流式I/O(如网络连接),限制单次读取的最大字节数(maxRW通常为4KB)
// 避免单次读取过大导致的性能问题或系统限制
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
// 循环读取数据:处理非阻塞I/O的重试逻辑
for {
// 调用系统级Read函数读取数据,忽略EINTR错误(系统调用被信号中断可重试)
// ignoringEINTRIO会自动重试被中断的系统调用
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0 // 读取失败时字节数记为0
// 处理EAGAIN错误(资源暂时不可用,非阻塞I/O的典型情况)
// 同时检查该FD是否支持轮询(pollable)
if err == syscall.EAGAIN && fd.pd.pollable() {
// 等待读操作就绪(阻塞当前goroutine,直到数据可读取或超时)
// 若等待成功(无错误),则继续循环重试读取
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
// 处理EOF错误:根据读取字节数和错误类型判断是否为真正的文件结束
err = fd.eofError(n, err)
return n, err
}
}
func ignoringEINTRIO(fn func(fd int, p []byte) (int, error), fd int, p []byte) (int, error) {
for {
n, err := fn(fd, p)
if err != syscall.EINTR {
return n, err
}
}
}
首先读写fd要加锁,为什么要加锁,有fd串话,和tcp流并发错误等等,要加锁,而且要忽略EINTR等中断信号,accept,write等等一些函数都会收到这个中断信号,但是这个不是错误,我们计算机最基础的书籍unix的环境高级编程就能找到答案
err == syscall.EAGAIN 而这个错误,要阻塞,这个代表的是,对于读来说内核buffer没有数据要读了,对于写来说就是buffer满了,要阻塞等待了(这里强调一点,一般阻塞io对于这种情况是直接阻塞的,非阻塞io会返回这个错误让你不阻塞,但是这里为什么我们还要阻塞它了,这就不得不说go,由于go的协程开销小,可以直接阻塞,防止cpu空转,而且go的作者要想将epoll像chan一样集成到runtime里,用gopark和goready来控制它,所以这里就先阻塞),只有等到内核buffer有数据来的时候,就通知你,处理了,那它是谁来通知了,它就是epoll
我们接着分析,它的waitRead函数
func convertErr(res int, isFile bool) error {
switch res {
case pollNoError:
return nil
case pollErrClosing:
return errClosing(isFile)
case pollErrTimeout:
return ErrDeadlineExceeded
case pollErrNotPollable:
return ErrNotPollable
}
println("unreachable: ", res)
panic("unreachable")
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
// 通过go:linkname指令将当前函数与internal/poll包中的runtime_pollWait函数关联
// 允许跨包访问内部函数,是Go运行时内部实现的常见方式
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
// poll_runtime_pollWait 阻塞等待I/O操作就绪(读或写)
// 功能:当I/O资源(如网络数据、文件数据)未就绪时,阻塞当前goroutine,直到资源可用或发生错误
// 参数:
// - pd: 指向pollDesc的指针,代表需要等待的I/O对象
// - mode: 等待模式('r'表示读就绪,'w'表示写就绪)
// 返回值:错误码(pollNoError表示成功,其他值表示对应错误)
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// 先检查当前I/O状态是否已有错误(如已关闭、超时等)
// 避免不必要的阻塞等待
errcode := netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode // 有错误直接返回,无需等待
}
// 特殊处理:对于Solaris、illumos、AIX和wasip1系统,使用水平触发(level-triggered)I/O
// 需要先激活(arm)轮询器,通知系统监控该I/O对象的事件
if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" || GOOS == "wasip1" {
netpollarm(pd, mode)
}
// 循环等待I/O就绪:netpollblock会阻塞当前goroutine,直到被唤醒
// 唤醒后再次检查状态,处理可能的虚假唤醒或超时重置情况
for !netpollblock(pd, int32(mode), false) {
// 被唤醒后再次检查错误状态
errcode = netpollcheckerr(pd, int32(mode))
if errcode != pollNoError {
return errcode // 有错误则返回
}
// 可能的场景:超时已触发并唤醒了我们,但在我们运行前,超时被重置
// 此时假装未发生过,重新进入等待
}
// 成功等待到I/O就绪,返回无错误
return pollNoError
}
// netpollcheckerr 检查I/O轮询描述符(pollDesc)的错误状态
// 功能:在进行I/O操作或等待I/O就绪前,快速判断当前状态是否存在错误(如已关闭、超时等)
// 无需加锁即可查询,通过读取pollDesc的atomicInfo实现高效状态检查
// 参数:
// - pd: 指向pollDesc的指针,代表要检查的I/O对象
// - mode: 操作模式('r'表示读操作,'w'表示写操作)
// 返回值:错误码(pollNoError表示无错误,其他值表示对应错误类型)
func netpollcheckerr(pd *pollDesc, mode int32) int {
// 获取pollDesc的当前状态信息(通过atomicInfo原子加载,无锁操作)
// info是对atomicInfo的包装,提供了解析状态位的方法
info := pd.info()
// 检查I/O对象是否正在关闭(closing状态位为true)
if info.closing() {
return pollErrClosing // 返回"关闭中"错误码
}
// 检查是否已超过截止时间(超时)
// 读模式:检查读截止时间是否已过期
// 写模式:检查写截止时间是否已过期
if (mode == 'r' && info.expiredReadDeadline()) ||
(mode == 'w' && info.expiredWriteDeadline()) {
return pollErrTimeout // 返回"超时"错误码
}
// 仅对读操作检查事件错误(eventErr状态位)
// 原因:写操作的错误会在后续写调用中被更具体地捕获和报告
// eventErr通常表示轮询过程中检测到的底层错误(如连接重置)
if mode == 'r' && info.eventErr() {
return pollErrNotPollable // 返回"不可轮询"错误码
}
// 所有检查通过,返回无错误
return pollNoError
}
我们来看怎么阻塞的,首先waitRead->wait->poll_runtime_pollWait->netpollblock(这个函数暂时不分析),到这个函数协程gopark阻塞了,那等到epoll唤醒了 之后,我会从这里开始执行->netpollcheckerr,由于上面我们超时(mode == 'r' && info.expiredReadDeadline()),返回错误码pollErrTimeout,->convertErr 的返回case pollErrTimeout: return ErrDeadlineExceeded ,这个就是"i/o timeout"的ErrDeadlineExceeded错误
3.读协程的通知
通过上面的框架的梳理,我们知道当执行waitRead之后,通过netpollblock阻塞协程,但是epoll和定时器都可以唤醒它,他们都是通过netpollunblock来唤醒的。
1.epoll的唤醒
epoll本身就是一个很简单的东西,你把tcp流看做是一个消息队列,epoll就是监听哪些fd(注意这里的fd并不一定是指网络fd)触发了哪些消息(读,写,关闭等)。
在netpoll_epoll文件中,netpoll函数
// netpoll 是 Go 运行时在 Linux/类 Unix 系统上的核心 I/O 事件轮询函数,基于 epoll 实现
// 核心职责:
// 1. 等待指定时长内的 I/O 事件(如网络数据到达、写缓冲区空闲、连接关闭等)
// 2. 收集因这些事件就绪而阻塞的 goroutine,标记为可运行状态
// 3. 维护运行时 I/O 等待者计数,确保调度器状态一致性
// 参数说明:
// - delay: 等待时长(单位:纳秒),取值含义如下:
// - delay < 0: 无限阻塞等待,直到有 I/O 事件就绪
// - delay == 0: 非阻塞查询,立即返回当前就绪事件(无论有无)
// - delay > 0: 超时阻塞等待,最多等待 delay 纳秒后返回
// 返回值说明:
// - gList: 因 I/O 就绪可恢复运行的 goroutine 列表,由调度器后续调度执行
// - int32: 等待者计数变化量(delta),正数表示等待者减少,负数表示增加
func netpoll(delay int64) (gList, int32) {
// 检查 epoll 实例句柄(epfd)是否初始化:epfd == -1 表示未初始化 epoll
// 未初始化时无法进行事件轮询,直接返回空列表和 0 等待者变化
if epfd == -1 {
return gList{}, 0
}
// 将输入的纳秒级等待时长(delay)转换为 epoll_wait 所需的毫秒级等待时间(waitms)
// epoll_wait 的等待精度为毫秒,需做单位转换和边界处理
var waitms int32
if delay < 0 {
// 无限等待:epoll_wait 会一直阻塞,直到有事件就绪
waitms = -1
} else if delay == 0 {
// 非阻塞模式:epoll_wait 立即返回,不等待任何事件
waitms = 0
} else if delay < 1e6 {
// 等待时长小于 1 毫秒(1e6 纳秒):epoll 最小精度为毫秒,按 1 毫秒等待
waitms = 1
} else if delay < 1e15 {
// 正常范围:纳秒转毫秒(1 毫秒 = 1e6 纳秒)
waitms = int32(delay / 1e6)
} else {
// 限制最大等待时长:1e9 毫秒 ≈ 11.5 天,避免异常大的 delay 导致资源泄漏
waitms = 1e9
}
// 定义事件存储数组:单次 epoll_wait 最多处理 128 个就绪事件
// 数组大小 128 是平衡内存占用(约 128*12 = 1536 字节)和系统调用频率的经验值
var events [128]syscall.EpollEvent
// retry 标签:用于处理 EINTR 错误时重新执行 epoll_wait
retry:
// 调用系统级 epoll_wait 函数,等待 epfd 上的 I/O 事件就绪
// 参数依次为:epoll 实例句柄、事件存储数组、数组长度、等待毫秒数
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
// 处理 epoll_wait 返回的错误(errno != 0 表示调用失败)
if errno != 0 {
// 区分错误类型:EINTR 是系统调用被信号中断(如进程收到 SIGINT),属于可恢复错误
if errno != _EINTR {
// 其他错误(如 epfd 无效、权限不足)属于运行时致命错误,打印信息并终止程序
println("runtime: epollwait on fd", epfd, "failed with", errno)
throw("runtime: netpoll failed")
}
// 处理 EINTR 错误:
// 1. 若为「超时等待」(waitms > 0):返回空列表,让上层重新计算剩余等待时长
// 2. 若为「无限等待」(waitms = -1):跳回 retry 重新执行 epoll_wait
if waitms > 0 {
return gList{}, 0
}
goto retry
}
// 初始化结果变量:
// - toRun:存储因 I/O 就绪而可恢复运行的 goroutine 列表
// - delta:等待者计数变化量,用于更新运行时整体等待者统计
var toRun gList
delta := int32(0)
// 遍历所有就绪的 epoll 事件(n 是 epoll_wait 返回的就绪事件总数)
for i := int32(0); i < n; i++ {
ev := events[i] // 当前遍历到的就绪事件
// 跳过空事件(理论上不会出现,防御性处理,避免后续逻辑异常)
if ev.Events == 0 {
continue
}
// -------------------------- 处理特殊事件:netpollEventFd --------------------------
// netpollEventFd 是 Go 运行时创建的「事件通知文件描述符」,用于主动唤醒阻塞的 epoll_wait
// 场景:当需要中断 epoll 等待(如定时器到期、手动触发 I/O 检查)时,会向该 fd 写入数据
// 此处通过 unsafe 转换提取 ev.Data 中的指针,判断是否指向 netpollEventFd
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollEventFd {
// 校验事件类型:netpollEventFd 仅应触发 EPOLLIN(读就绪)事件,其他类型属于异常
if ev.Events != syscall.EPOLLIN {
println("runtime: netpoll: eventfd ready for", ev.Events)
throw("runtime: netpoll: eventfd ready for something unexpected")
}
// 仅在「阻塞等待」(delay != 0)时处理:读取 eventfd 数据以重置计数器
// 原因:非阻塞查询(delay=0)无需处理唤醒信号,避免重复消费
if delay != 0 {
var one uint64 // eventfd 存储的数据是 8 字节无符号整数
// 读取 eventfd 数据(EFD_SEMAPHORE 未设置,读取后计数器重置为 0)
// noescape 用于告诉编译器该指针不会逃逸到堆上,优化内存分配
read(int32(netpollEventFd), noescape(unsafe.Pointer(&one)), int32(unsafe.Sizeof(one)))
// 清除唤醒信号标记,避免后续逻辑误判
netpollWakeSig.Store(0)
}
continue // 跳过后续普通 I/O 事件处理,进入下一个事件遍历
}
// -------------------------- 处理普通 I/O 事件:解析与关联 pollDesc --------------------------
// 1. 解析事件对应的操作模式(读/写):
// - 读相关事件:EPOLLIN(数据可读)、EPOLLRDHUP(读端关闭)、EPOLLHUP(连接挂断)、EPOLLERR(错误)
// - 写相关事件:EPOLLOUT(数据可写)、EPOLLHUP(连接挂断)、EPOLLERR(错误)
// 注:EPOLLHUP/EPOLLERR 是通用事件,需同时触发读和写的就绪逻辑
var mode int32
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'r' // 标记为读模式
}
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'w' // 标记为写模式
}
// 2. 若模式有效(读或写),处理事件关联的 pollDesc(I/O 轮询描述符)
if mode != 0 {
// 从 ev.Data 中提取 taggedPointer:Go 运行时自定义类型,绑定 pollDesc 指针与 fd 序列号
// 作用:通过序列号校验 pollDesc 有效性,防止处理过期的 I/O 事件
tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
pd := (*pollDesc)(tp.pointer()) // 提取 pollDesc 指针(指向当前 I/O 对象的轮询状态)
tag := tp.tag() // 提取 fd 序列号(tag):与 pollDesc.fdseq 对应
// 校验 pollDesc 有效性:当前 fd 序列号(pd.fdseq)是否与 tag 一致
// 防止场景:fd 被关闭后复用,旧的 pollDesc 事件仍被触发(此时 fdseq 已更新,tag 不匹配)
if pd.fdseq.Load() == tag {
// 设置事件错误状态:若事件类型是 EPOLLERR(I/O 错误),标记 pollDesc 的 eventErr 位
// 后续 netpollcheckerr 会检查该位,返回 pollErrNotPollable 错误
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
// 唤醒因该 I/O 阻塞的 goroutine:
// netpollready 会从 pd 中取出阻塞的读/写 goroutine,加入 toRun 列表
// 返回值 delta 是等待者减少的数量(因 goroutine 被唤醒,等待者计数降低)
delta += netpollready(&toRun, pd, mode)
}
}
}
// 返回结果:
// 1. toRun:就绪的 goroutine 列表,调度器会将这些 goroutine 加入可运行队列
// 2. delta:等待者计数变化量,用于更新运行时的等待者统计(如 netpollAdjustWaiters)
return toRun, delta
}
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
delta := int32(0)
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true, &delta)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true, &delta)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
return delta
}
通过上面的代码,我们知道netpoll->netpollready->netpollunblock得到协程,然后加入可运行的队列。
有个小tips
pd.setEventErr(ev.Events == syscall.EPOLLERR, tag)
EPOLLERR的waitRead返回的错误是case pollErrNotPollable: return ErrNotPollable,文字"not pollable"
有人就疑惑了,那netpoll在哪里调用了,在协程切换,sysmon等等很多地方调用
//proc.go
func findRunnable(){
.
.
.
if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
if list, delta := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
netpollAdjustWaiters(delta)
trace := traceAcquire()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.ok() {
trace.GoUnpark(gp, 0)
traceRelease(trace)
}
return gp, false, false
}
}
.
.
.
还有一次
}
func sysmon() {
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
sched.lastpoll.CompareAndSwap(lastpoll, now)
list, delta := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() {
// Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist.
// Otherwise it can lead to the following situation:
// injectglist grabs all P's but before it starts M's to run the P's,
// another M returns from syscall, finishes running its G,
// observes that there is no work to do and no other running M's
// and reports deadlock.
incidlelocked(-1)
injectglist(&list)
incidlelocked(1)
netpollAdjustWaiters(delta)
}
}
}
2.定时器的唤醒
定时器的唤醒我们上面讲了,是通过
rg = netpollunblock(pd, 'r', false, &delta)
epoll是通过
rg = netpollunblock(pd, 'r', true, &delta)
waitRead阻塞是通过
netpollblock(pd, int32(mode), false)
我们现在看这两个函数的代码
// netpollblock 使当前 goroutine 阻塞等待 I/O 就绪,返回结果表示阻塞的最终状态
// 返回值含义:
// - true:I/O 已就绪(可执行读/写操作)
// - false:等待超时 或 I/O 对象已关闭(无需继续等待)
// 参数说明:
// - pd:指向 pollDesc 的指针,代表要等待的 I/O 对象(维护读写状态、阻塞 goroutine 等)
// - mode:等待模式('r' 表示读 I/O 就绪,'w' 表示写 I/O 就绪)
// - waitio:是否「仅等待完成的 I/O」(true 时忽略非致命错误,仅因 I/O 就绪/超时/关闭返回;false 时若有错误直接返回)
// 并发限制:同一 mode 下禁止并发调用 netpollblock!
// 原因:pollDesc 的 rg/wg 字段(每个 mode 对应一个)仅能存储一个等待中的 goroutine,并发调用会导致状态混乱
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// 1. 确定当前 mode 对应的「goroutine 指针存储地址」(rg 对应读,wg 对应写)
// gpp 是指向 atomic.Uintptr 类型的指针(rg/wg 字段),用于原子操作管理等待状态
gpp := &pd.rg
if mode == 'w' { // 若为写模式,切换到 wg 字段
gpp = &pd.wg
}
// 2. 原子操作设置 gpp 的状态:尝试将其从「空闲(pdNil)」设为「等待中(pdWait)」
// 状态机说明:rg/wg 可能的状态值(均为 uintptr 类型):
// - pdNil:无 goroutine 等待,空闲状态
// - pdWait:有 goroutine 正在等待(当前函数设置的状态)
// - pdReady:I/O 已就绪(由 netpollready 等函数设置,通知等待的 goroutine)
// - 其他值:指向等待的 goroutine(g 指针)
for {
// 先尝试「消费已存在的就绪通知」:若 gpp 是 pdReady(已就绪),则原子替换为 pdNil,直接返回 true(I/O 就绪)
if gpp.CompareAndSwap(pdReady, pdNil) {
return true
}
// 若 gpp 是 pdNil(空闲),则原子替换为 pdWait(标记为等待中),跳出循环准备挂起
if gpp.CompareAndSwap(pdNil, pdWait) {
break
}
// 防御性检查:若 gpp 状态既不是 pdReady 也不是 pdNil,说明存在并发调用或状态损坏
// 这种情况违反「同一 mode 禁止并发调用」的限制,直接抛出异常终止程序
if v := gpp.Load(); v != pdReady && v != pdNil {
throw("runtime: double wait")
}
}
// 3. 重新检查错误状态:设置 gpp 为 pdWait 后必须再次校验 I/O 状态
// 原因:其他函数(如 netpollunblock、netpollSetDeadline)的操作顺序是「改状态→publishInfo→读 rg/wg」
// 这里若不重新检查,可能错过「设置 pdWait 前已发生的超时/关闭」,导致不必要的挂起
// 条件说明:
// - waitio 为 true:仅等待完成的 I/O,忽略非致命错误,直接进入挂起
// - waitio 为 false:先调用 netpollcheckerr 检查是否有错误(关闭/超时/事件错误),无错误才挂起
if waitio || netpollcheckerr(pd, mode) == pollNoError {
// 挂起当前 goroutine,等待 I/O 就绪或被唤醒
// gopark 参数含义:
// - netpollblockcommit:唤醒时的回调函数(处理状态确认)
// - unsafe.Pointer(gpp):传递给回调的参数(指向 rg/wg 的指针)
// - waitReasonIOWait:等待原因(用于调度器跟踪和诊断)
// - traceBlockNet:阻塞类型标记(网络 I/O 阻塞)
// - 5:优先级(调度器的优先级排序依据)
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
}
// 4. 唤醒后清理状态:将 gpp 原子交换为 pdNil(重置为空闲状态),获取交换前的旧状态
// 目的:确保无论唤醒原因(I/O 就绪/超时/关闭),都清理等待标记,避免状态残留
old := gpp.Swap(pdNil)
// 5. 异常检查:若旧状态大于 pdWait,说明状态被非法篡改(如写入无效指针),抛出异常
if old > pdWait {
throw("runtime: corrupted polldesc")
}
// 6. 返回结果:若旧状态是 pdReady,说明唤醒原因是 I/O 就绪(返回 true);否则是超时/关闭(返回 false)
return old == pdReady
}
// netpollunblock 是 Go 运行时中「解除 I/O 阻塞」的核心函数
// 核心职责:
// 1. 将 pollDesc 中对应模式(读/写)的等待状态(rg/wg)设置为 pdReady(I/O 就绪)
// 2. 返回因该 I/O 阻塞的 goroutine(若存在)
// 3. 调整 I/O 等待者计数(通过 delta 指针传递调整量,需在 goroutine 被标记为就绪后应用)
// 参数说明:
// - pd:指向 pollDesc 的指针,代表要解除阻塞的 I/O 对象
// - mode:操作模式('r' 对应读等待 rg,'w' 对应写等待 wg)
// - ioready:是否因「I/O 实际就绪」而解除阻塞(true=I/O 就绪,false=超时/关闭等非就绪场景)
// - delta:等待者计数调整量的指针(函数内修改 delta 值,上层需调用 netpollAdjustWaiters 应用)
// 返回值:
// - *g:被解除阻塞的 goroutine(nil 表示无阻塞的 goroutine,或无需解除)
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
// 1. 确定要操作的「等待状态字段」:读模式用 rg,写模式用 wg
// gpp 是指向 atomic.Uintptr 类型的指针(rg/wg 存储等待状态或 goroutine 指针)
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// 2. 原子循环操作:通过 CAS(CompareAndSwap)修改 gpp 状态,确保并发安全
// 循环目的:处理多 goroutine 并发修改 gpp 的场景(如同时解除阻塞和设置就绪)
for {
// 加载 gpp 当前的状态(old 可能是 pdNil/pdWait/pdReady/ goroutine 指针)
old := gpp.Load()
// 场景1:当前状态已为 pdReady(I/O 已就绪),无需重复操作,返回 nil
if old == pdReady {
return nil
}
// 场景2:当前状态为 pdNil(无 goroutine 等待)且非 I/O 就绪(ioready=false)
// 说明:仅当 ioready=true 时才需要主动设置 pdReady;非就绪场景(如关闭/超时)无需设置,返回 nil
if old == pdNil && !ioready {
// 注释补充:netpollblock 会在等待前检查超时/关闭,此处无需额外处理
return nil
}
// 3. 确定要设置的新状态(new):
// - 若 I/O 就绪(ioready=true):设为 pdReady,通知后续等待的 goroutine 直接就绪
// - 若非 I/O 就绪(ioready=false):设为 pdNil,重置为空闲状态
new := pdNil
if ioready {
new = pdReady
}
// 4. CAS 原子替换:若当前状态仍为 old,将其改为 new
if gpp.CompareAndSwap(old, new) {
// CAS 成功后,处理 old 状态对应的逻辑:
// 子场景1:old 是 pdWait(netpollblock 中设置的「等待中」状态)
// 说明:此时还未存储 goroutine 指针,重置为 pdNil,返回 nil(无阻塞的 goroutine)
if old == pdWait {
old = pdNil
}
// 子场景2:old 不是 pdNil(即 old 是 goroutine 指针)
// 说明:有 goroutine 因该 I/O 阻塞,需将等待者计数减 1(delta 记录调整量)
else if old != pdNil {
*delta -= 1 // 等待者减少 1,上层需通过 netpollAdjustWaiters 应用该调整
}
// 将 old(goroutine 指针)转换为 *g 类型返回,上层需唤醒该 goroutine
return (*g)(unsafe.Pointer(old))
}
// CAS 失败(old 状态已被其他 goroutine 修改),重新进入循环加载新状态
}
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
netpollAdjustWaiters(1)
}
return r
}
pollDesc的rg atomic.Uintptr // pdReady, pdWait, G waiting for read or pdNil,
这个字段能存储pdReady, pdWait, G waiting for read or pdNil这三种状态和自己本身的协程的指针,通过cas来流转状态。
比如完整的流转是pdNil->netpollblock(阻塞)->pdWait-》gopark-》netpollblockcommit-》gp(自己的协程指针)-》netpollunblock-》pdReady。注意这个函数返回的是gp自己协程的指针,将状态置成了pdReady,返回自己的指针,就将它加入了运行队列。
自己也可以自行分析,当io读完成了,但是定时超时触发了,会返回nil,后面的就不会执行了
当超时触发了,io还没来,也会返回nil,但是下面都有nil判断,也不会执行了,大家可以根据这个自行画出状态流转图。
注意return (*g)(unsafe.Pointer(old))如果old=pdNil=0,这个返回的指针也是nil,也是不会执行后面的
总结
通过分析读超时,我们发现了go作者将epoll集成到了runtime中,go的chan和网络io就很自然的通过gopark和goready来控制它的状态,可以通过它的waitReasonIOWait和waitReasonChanReceiveNilChan等来区分是哪一种阻塞,真的很巧妙,毕竟协程的gopark阻塞在用户态,没进入内核,开销小,要将所有的集成到里面,还是需要一番考量的,读超时就是利用定时器和状态流转来控制协程的。
本文有些解释是通过ai来解释的,最后本人能力有限,有不足之处,望大家指出
更多推荐
所有评论(0)