eBPF: 从 BPF to BPF Calls 到 Tail Calls
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。引言参考linux安全之seccompUsing seccomp to limit the kernel attack surfaceSECure COMPuting with filtersseccomp(2) — Linux manu
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
本作品 (李兆龙 博文, 由 李兆龙 创作),由 李兆龙 确认,转载请注明版权。
引言
这篇文章首先介绍尾调用的一般限制和用法,并与BPF to BPF calls做对比,最后给出一个我对内核源码中tail call sample做的一个修改版本(应用CO-RE)。(我在学习尾调用的时候苦于没有一个能跑起来的简单易懂的例子,所以最后自己撸了一个,这个版本我认为是目前能找到的所有例子里对初学者最友好,逻辑最清晰的一个)。
Tail Call
BPF 提供了一种在内核事件和用户程序事件发生时安全注入代码的能力,这就让非内核开发人员也可以对内核进行控制,但是因为11 个 64 位寄存器和 32 位子寄存器、一个程序计数器和一个 512 字节的 BPF 堆栈空间以及100万条指令(5.1+),递归深度33的固有限制,使得可以实现的逻辑是有限的(非图灵完备)。
内核栈是很宝贵的,一般BPF到BPF的会使用额外的栈帧,尾调用最大的优势就是其复用了当前的栈帧并跳转至另外一个eBPF程序,可以在[5]中看到如下描述:
The important detail that it’s not a normal call, but a tail call. The kernel stack is precious, so this helper reuses the current stack frame and jumps into another BPF program without adding extra call frame.
eBPF程序都是独立验证的(调用者的堆栈和寄存器中的值被调用者不可访问),所以状态的传递一般可以使用per-CPU map传递,TC还可以使用skb_buff->cb这样的特殊数据项去传递数据[8];其次类型相同的 BPF 程序才可以尾调用,而且还要与 JIT 编译器相匹配, 因此一个给定的 BPF 程序 要么是 JIT编译执行,要么是解释器执行(invoke interpreted programs)
尾调用的步骤需要用户态和内核态配合,主要由两个部分组成:
- 用户态:
BPF_MAP_TYPE_PROG_ARRAY
类型的特殊map,存储自定义index到bpf_program_fd的到映射 - 内核态:
bpf_tail_call
辅助函数,其负责跳转到另一个 eBPF 程序,其函数定义是这样的static long (*bpf_tail_call)(void *ctx, void *prog_array_map, __u32 index)
,ctx是上下文,prog_array_map是前面说的BPF_MAP_TYPE_PROG_ARRAY
类型的map,用于用户态设置跳转程序和用户自定义index的映射,index就是用户自定义索引了。
bpf_tail_call
如果运行成功,内核立即运行新eBPF程序的第一条指令(永远不会返回到之前的程序)。如果跳转的目标程序不存在(即 index
在 prog_array_map
中不存在),或者此程序链已达到最大尾调用数,则调用可能会失败,如果调用失败,调用者继续执行后续指令。
[5]中我们可以看到如下文字:
The chain of tail calls can form unpredictable dynamic loops therefore tail_call_cnt is used to limit the number of calls and currently is set to 32.
这个限制在内核中由宏 MAX_TAIL_CALL_CNT (用户空间不可访问)定义,当前设置为 32(我并不知道这个所谓的unpredictable dynamic loops
是什么)。
上面提到了尾调用可以省内核栈空间,除了这一点以外我认为其最大的优势如下:
- 用于增加可执行的eBPF程序指令的最大执行数
- eBPF程序编排
上面两个优势不是在我在想当然的胡诌,举个两个例子分别解释上面两个观点:
- [10]在BMC中有一个eBPF程序中有一个大循环,虽然eBPF程序只有142行,但是字节码已经到了七十多万行,如果不做逻辑拆分会在 verify 阶段被拒绝。
- [9]中提出了一种通过配置文件任意组合eBPF程序的eBPF编排策略,给我的感觉是他们像做一个三方存储,然后可以通过配置自动拉去需要的eBPF程序,然后自动编排,载入,执行,这里编排的过程是用尾调用实现的,基本的流程如下:
BPF to BPF Calls
从 Linux 4.16 和 LLVM 6.0 开始,这个限制得到了解决,加载器、校验器、解释器和 JIT 中都开始支持函数调用。
最大的优势是减小了生成的 BPF 代码大小,因此对 CPU instruction cache 更友好。BPF 辅助函数的调用约定也适用于 BPF 函数间调用,即 r1 - r5 用于传递参数,返回 结果放到 r0。r1 - r5 是 scratch registers,r6 - r9 像往常一样是保留寄存器。最大嵌套调用深度是 8。调用方可以传递指针(例如,指向调用方的栈帧的指针) 给被调用方。
尾调用的缺点是生成的程序镜像大,但是省内存;BPF to BPF Calls的优点是镜像小,但是内存消耗大。内核5.9以前不允许 tail Call 和 BPF to BPF Call 调用协同工作,在5.10以后的X86架构上,允许同时使用这两种调用类型。
在[7]中提到同时使用两种类型有一定限制,否则会导致内核栈溢出:
我们以上图的调用链举例,这里所说的限制就是每一个子程序的栈空间( stack size)不能超过 256 字节(如果校验器检测到 bpf to bpf 调用,那主程序也会被当做子程序),这个限制使得 BPF 程序调用链最多能使用 8KB 的栈空间,计算方式:256 byte/stack 乘以尾调用数量上限 33。如果没有这个限制,BPF 程序将使用 512 字节栈空间,最终消耗最多 16KB 的总栈空间,在某些架构上会导致栈溢出。
这里需要提的一点是两种类型在同时使用时到底是如何省内存的,举个例子,subfunc1 执行 Tail Call 调用 func2,此时subfunc的栈帧已经被func2复用了,然后func2 执行 BPF to BPF Calls 调用 subfunc2,此时第三个栈帧被创建,然后执行 Tail Call 调用 func3,五个逻辑过程使用了三个栈,这就节省了内存。
然后因为开始时调用subfunc1,所以最终的程序执行权仍然会回到func1。
CO-RE Sample
我们可以在[11][12]中看到kernel中对于Tail Call的官方实例,我使用libbpf CO-RE特性修改了[11],使得User程序更容易理解 libbpf 使用 Tail Call。其次虽然挂载ebpf程序可以使用其他命令辅助挂载,比如tc,prctl,但是示例程序我认为可能老老实实调接口挂容易理解些,我们使用[11]作为例子。
例子中用到了seccomp filter
,这个过滤器用于减少内核中系统调用暴露于应用程序的范围,说人话就是限制每个进程使用的系统调用(参数修改是task_struct级别的),支持SECCOMP_SET_MODE_STRICT
和SECCOMP_SET_MODE_FILTER
两种过滤模式,用于限定子集(read,write, _exit,sigreturn)的过滤和eBPF形式的过滤。[3]中提到可以避免TOCTTOU,因为open被限制了,自然也就没有TOCTTOU错误的风险了[6]。
[3]中对seccomp filter
的解释是这样的:
System call filtering isn’t a sandbox. It provides a clearly defined mechanism for minimizing the exposed kernel surface. It is meant to be a tool for sandbox developers to use.
[2]中提到:
Designed to sandbox compute-bound programs that deal with untrusted byte code.
tracex5_kern.bpf.c
#include "vmlinux.h"
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_tracing.h>
#include <bpf/bpf_core_read.h>
#include "bpf_helpers.h"
char LICENSE[] SEC("license") = "Dual BSD/GPL";
struct {
__uint(type, BPF_MAP_TYPE_PROG_ARRAY);
__uint(max_entries, 1024);
__type(key, u32);
__type(value, u32);
} progs SEC(".maps");
SEC("kprobe/__seccomp_filter")
int BPF_KPROBE(__seccomp_filter, int this_syscall, const struct seccomp_data *sd, const bool recheck_after_trace)
{
// 这里注意ebpf程序栈空间只有512字节,太大这里会报错的,可以自己调大一点看看
char comm_name[30];
bpf_get_current_comm(comm_name, sizeof(comm_name));
// 调用失败以后会直接 fall through
bpf_tail_call(ctx, &progs, this_syscall);
char fmt[] = "syscall=%d common=%s\n";
bpf_trace_printk(fmt, sizeof(fmt), this_syscall, comm_name);
return 0;
}
/* we jump here when syscall number == __NR_write */
SEC("kprobe/SYS__NR_write")
int bpf_func_SYS__NR_write(struct pt_regs *ctx)
{
struct seccomp_data sd;
bpf_probe_read(&sd, sizeof(sd), (void *)PT_REGS_PARM2(ctx));
if (sd.args[2] > 0) {
char fmt[] = "write(fd=%d, buf=%p, size=%d)\n";
bpf_trace_printk(fmt, sizeof(fmt),
sd.args[0], sd.args[1], sd.args[2]);
}
return 0;
}
SEC("kprobe/SYS__NR_read")
int bpf_func_SYS__NR_read(struct pt_regs *ctx)
{
struct seccomp_data sd;
bpf_probe_read(&sd, sizeof(sd), (void *)PT_REGS_PARM2(ctx));
if (sd.args[2] > 0 && sd.args[2] <= 1024) {
char fmt[] = "read(fd=%d, buf=%p, size=%d)\n";
bpf_trace_printk(fmt, sizeof(fmt),
sd.args[0], sd.args[1], sd.args[2]);
}
return 0;
}
SEC("kprobe/SYS__NR_open")
int bpf_func_SYS__NR_open(struct pt_regs *ctx)
{
struct seccomp_data sd;
bpf_probe_read(&sd, sizeof(sd), (void *)PT_REGS_PARM2(ctx));
char fmt[] = "open(fd=%d, path=%p)\n";
bpf_trace_printk(fmt, sizeof(fmt), sd.args[0], sd.args[1]);
return 0;
}
tracex5_user.c
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#include <time.h>
#include <assert.h>
#include <errno.h>
#include <sys/resource.h>
#include <linux/if_link.h>
#include <linux/limits.h>
#include <bpf/libbpf.h>
#include "trace.skel.h"
#define BPF_SYSFS_ROOT "/sys/fs/bpf"
enum {
SYS__NR_read = 3,
SYS__NR_write = 4,
SYS__NR_open = 5,
};
struct bpf_progs_desc {
char name[256];
enum bpf_prog_type type;
int map_prog_idx;
struct bpf_program *prog;
};
static struct bpf_progs_desc progs[] = {
{"kprobe/__seccomp_filter", BPF_PROG_TYPE_KPROBE, -1, NULL},
{"kprobe/SYS__NR_read", BPF_PROG_TYPE_KPROBE, SYS__NR_read, NULL},
{"kprobe/SYS__NR_write", BPF_PROG_TYPE_KPROBE, SYS__NR_write, NULL},
{"kprobe/SYS__NR_open", BPF_PROG_TYPE_KPROBE, SYS__NR_open, NULL},
};
static int libbpf_print_fn(enum libbpf_print_level level, const char *format, va_list args)
{
return vfprintf(stderr, format, args);
}
static volatile bool exiting = false;
static void sig_handler(int sig)
{
exiting = true;
}
int main(int argc, char **argv)
{
struct trace_bpf *skel;
int map_progs_fd, main_prog_fd, prog_count;
int err;
// 设置一些debug信息的回调
libbpf_set_print(libbpf_print_fn);
signal(SIGINT, sig_handler);
signal(SIGTERM, sig_handler);
// Load and verify BPF application
skel = trace_bpf__open();
if (!skel) {
fprintf(stderr, "Failed to open and load BPF skeleton\n");
return 1;
}
// Load and verify BPF programs
err = trace_bpf__load(skel);
if (err) {
fprintf(stderr, "Failed to load and verify BPF skeleton\n");
goto cleanup;
}
map_progs_fd = bpf_object__find_map_fd_by_name(skel->obj, "progs");
prog_count = sizeof(progs) / sizeof(progs[0]);
for (int i = 0; i < prog_count; i++) {
progs[i].prog = bpf_object__find_program_by_title(skel->obj, progs[i].name);
if (!progs[i].prog) {
fprintf(stderr, "Error: bpf_object__find_program_by_title failed\n");
return 1;
}
bpf_program__set_type(progs[i].prog, progs[i].type);
}
for (int i = 0; i < prog_count; i++) {
int prog_fd = bpf_program__fd(progs[i].prog);
if (prog_fd < 0) {
fprintf(stderr, "Error: Couldn't get file descriptor for program %s\n", progs[i].name);
return 1;
}
// -1指的是主程序
if (progs[i].map_prog_idx != -1) {
unsigned int map_prog_idx = progs[i].map_prog_idx;
if (map_prog_idx < 0) {
fprintf(stderr, "Error: Cannot get prog fd for bpf program %s\n", progs[i].name);
return 1;
}
// 给 progs map 的 map_prog_idx 插入 prog_fd
err = bpf_map_update_elem(map_progs_fd, &map_prog_idx, &prog_fd, 0);
if (err) {
fprintf(stderr, "Error: bpf_map_update_elem failed for prog array map\n");
return 1;
}
}
}
// 只载入主程序,尾调用不载入,所以不可以调用trace_bpf__attach
struct bpf_link* link = bpf_program__attach(skel->progs.__seccomp_filter);
if (link == NULL) {
fprintf(stderr, "Error: bpf_program__attach failed\n");
return 1;
}
while(exiting){
// 写个裸循环会吃巨多CPU的
sleep(1);
}
cleanup:
// Clean up
trace_bpf__destroy(skel);
return err < 0 ? -err : 0;
}
执行如下指令:
- clang -g -O2 -target bpf -D__TARGET_ARCH_x86 -I/usr/src/kernels/5.4.119-19-0009.1(改成自己的)/include/ -idirafter /usr/local/include -idirafter /usr/include -c trace.bpf.c -o trace.bpf.o
- gen skeleton trace.bpf.o > trace.skel.h
- clang -g -O2 -Wall -I . -c trace.c -o trace.o
- clang -Wall -O2 -g trace.o -lelf -lz -o trace
- cat /sys/kernel/debug/tracing/trace_pipe
可以看到预期输出。
tail call costs in eBPF
[13]评估了为缓解 Spectre 缺陷而引入的一些优化给eBPF的尾调用性能带来了多少性能损耗(Spectre 是指大多数CPU(英特尔、AMD、ARM)上存在的一系列利用硬件漏洞的漏洞)。
这篇文章还没看,有精力了拜读一下,先插个眼。
总结
eBPF环境现在我还是很头疼,开始在VMware Fusion上跑虚拟机,但是m1不支持VMware Tools,导致使用极其不便,而且镜像也需要特殊的修改才能用到m1上,用高版本的内核没那么容易;其次买的云服务器版本只到5.4.119,一些eBPF的特性也没法用;至于MAC m1双系统,我不太有精力去吃这个螃蟹了,一是资料少,其次 Tail Calls 和 BPF to BPF Calls 同时调用目前也只支持X86,收益也不大。
好吧,我承认,归根结底,还是不想花钱去买Parallels Desktop。
参考:
- linux安全之seccomp
- Using seccomp to limit the kernel attack surface
- SECure COMPuting with filters
- seccomp(2) — Linux manual page
- bpf: introduce bpf_tail_call() helper
- Time-of-check to time-of-use wiki
- cilium document
- eBPF: Traffic Control Subsystem
- Introducing Walmart’s L3AF Project: Control plane, chaining eBPF programs, and open-source plans
- BMC: Accelerating Memcached using Safe In-kernel Caching and Pre-stack Processing
- kernel tracex5_kern.c
- kernel sockex3_kern.c
- Evaluation of tail call costs in eBPF
更多推荐
所有评论(0)