OpenTelemetry 分布式跟踪 Rust

在本文中,我将分享我将 OpenTelemetry 分布式跟踪添加到 Rust 应用程序的经验。

我将尝试回答以下问题:

  • 如何在 Rust 中检测 Opentelemetry?

  • 如何在 Rust 应用程序中添加手动和自动检测?

  • 如何使用tracing来调试Rust应用?

  • 如何可视化跟踪和跨度?

  • 如何在多线程环境中保存我们的 span 上下文?

  • 在 Rust 中进行跟踪时,你必须知道哪些 crate?

首先我会说我喜欢 Rust 作为一种编程语言,并且在过去的一年里我一直在尝试它。尽管它具有创新性并且拥有强大的生态系统,但 Rust 具有相当陡峭的学习曲线并不是什么秘密。因此,即使这是一篇介绍性文章,我也不会深入研究 Rust 的特性、语法或语义,除非它以某种方式与我们的主题(Opentelemetry 和分布式跟踪)相关联。

什么是 OpenTelemetry?

OpenTelemetry 是 API 和 SDK 的集合,允许我们收集、导出和生成跟踪、日志和指标(也称为可观察性的三大支柱)。

它是一个 CNCF 社区驱动的开源项目(Cloud Native Computing Foundation,负责 Kubernetes 的人)。

在云原生环境中,我们使用 OpenTelemetry(简称 OTel)从我们的系统操作和事件中收集数据。换句话说,检测我们的分布式服务。这些数据使我们能够了解和调查我们软件的行为并解决性能问题和错误。

OpenTelemetry 用作标准可观察性框架,可在单一规范下捕获所有数据。它提供了几个组件,包括:

  • 用于生成遥测的每种编程语言的 API 和 SDK。

  • OpenTelemetry Collector 接收、处理遥测数据并将其导出到不同的目的地。

  • OTLP用于传送遥测数据的协议

为了更深入地了解这项技术,包括其结构和主要动机,访问本指南。

对于这篇 OpenTelemetry Rust 文章,您需要了解以下术语:

  • 跨度:最基本的单位。跨度表示我们系统中的事件(例如,HTTP 请求或跨越时间的数据库操作)。一个跨度通常是另一个跨度的父级、它的子级或两者兼而有之。

  • Trace:分布式服务的“调用堆栈”。跟踪表示以子/父关系连接的跨度树。跟踪指定了我们应用程序中不同服务和组件(数据库、数据源、队列等)的请求进展。例如,向 user-service 发送 API 调用会导致对 users-db 的 DB 查询。

  • Exporter:一旦我们创建了一个 span,exporter 就会处理将数据发送到我们的后端(例如,在内存中、Jaeger Tracing 或控制台输出中)

  • 上下文传播 – 允许我们跨分布式服务关联事件的机制。上下文被称为我们收集和传输的元数据。传播是上下文如何在服务之间打包和传输,通常通过 HTTP 标头。上下文传播是 OpenTelemetry 的亮点之一。

  • Instrumentation – 仪器库收集数据并根据我们应用程序中的不同库(Actix-Web、Postgres...)生成跨度。

要了解更多 OpenTelemetry 术语,请访问官方文档。

Rust 中的开放遥测和分布式跟踪

Rust 中的跟踪

在 Rust 中,我们有一个很棒的 crate,简称为 tracking。它是我们跟踪 Rust 程序的核心框架。 crates 文档概述了由这些术语组成的核心概念和 API——spans、events、subscribers.

我介绍了 Spans 和 Events 是什么,但 Subscriber 是一个新术语。要记录跨度和事件,必须实现订阅者特征。这意味着只需实现这些方法:

  • enter:表示输入一个span。

  • exit:表示结束一个span。

  • 事件:表示有事件发生。

要开始记录跟踪,我们需要初始化一个订阅者。板条箱_tracing_subscriber_帮助我们做到这一点。

我们初始化 Registry 结构:

let subscriber = tracing_subscriber::Registry::default()

这个结构实现了一个 subscriber 并向我们展示了这个 crate 的另一个重要特性——层。使用层,我们可以配置我们的 Subscriber 以在与跨度和事件交互时应用特定行为。

例如,如果我们想要过滤我们的一些数据,将其导出、格式化或编辑,我们可以创建或使用现有层并与注册表订阅者组合它。

let subscriber = tracing_subscriber::Registry::default().with(SomeLayer)

我们将在实践部分看到更多关于如何使用这些 crate 来检测 Rust 程序的示例。

Rust 中的 OpenTelemetry

为 OpenTelemetry 提供支持的 crate 叫做 – 多么方便 – OpenTelemetry[see here].

OpenTelemetry 提供一组 API、库、代理和收集器服务,以从您的应用程序中捕获分布式跟踪和指标。”

让我们回顾一下它的一些关键 API:

  • 示踪剂。在 crate 内部,有一个名为 traces 的模块。它向我们介绍了一个称为 Tracer 的特性。这就是跟踪和连接我们的 span 以创建跟踪的内容。为了使用 Tracer,我们创建了一个订阅者层。这就是订阅者如何知道将跨度发送到哪里以及如何生成跟踪的方式。

  • 全球。该模块提供了一个 API,因此无论我们当前处于代码的哪个部分,我们都可以访问订阅者、跟踪器和传播上下文。

  • SDK。该模块为 OpenTelemetry 的常见用例提供了实现。例如,它提供了一个导出器来将跟踪发送到标准输出。它还实现了我们可以使用而不是自己实现的上下文传播方法。

git 存储库名为 opentelemetry-rust 包含几个扩展 opentelemetry 生态系统的 crate 的实现。在那里你可以找到常用的工具、导出器和订阅者在 Rust 程序中使用它们。

OpenTelemetry Rust 中的分布式跟踪

使用来自 opentelemetry-rustrepo 的 crates,我们可以跨多个服务检测我们的应用程序并将它们发送到分布式平台。

例如,我们可以使用 opentelemetry-jaegercrate 将跟踪发送到远程 Jaeger 平台。

另一个例子是 OpenTelementy-otlpcrate

可以使用此 crate 中的导出器将 OTLP 格式的跟踪导出到 OpenTelemetry Collector。 OpenTelemetry Collector 以与供应商无关的方式接受、处理和导出跟踪。

实用 OpenTelemetry Rust 示例

对于这个例子,我想构建一个简单的系统,可以用来集成和可视化跟踪。我找到了Actix-Web框架和Diesel的自动工具,这是一个用于基于 SQL 的数据库的 ORM 和查询构建器实用程序。

我选择使用这些工具并按照这些框架给出的示例构建一个简单的 REST api 服务来创建、读取和删除用户。

这是源代码的链接。

创建服务

首先,我按照 Actix Web 和 Diesel 文档创建了一个简单的 Web 服务器,它公开用户服务并与本地 Postgres 数据库通信:

mod models;
mod schema;
mod users;
#[macro_use]
extern crate diesel;
extern crate dotenv;
use dotenv::dotenv;
use actix_web::{web, App, HttpServer};
use diesel::r2d2::{self, ConnectionManager};
use diesel::pg::PgConnection
use telemetry::init_telemetry;
use users::get_users_service;
pub type Connection = PgConnection
pub type DbPool = r2d2::Pool<ConnectionManager<Connection>>;
#[actix_web::main]
async fn main() -> std::io::Result<()> {
  dotenv().ok();
  let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set");
  let manager = ConnectionManager::<InstrumentedPgConnection>::new(database_url);
  let pool: DbPool = r2d2::Pool::builder()
      .build(manager)
      .expect("Failed to create pool.");
  HttpServer::new(move || {
      App::new()
          .app_data(web::Data::new(pool.clone()))
          .service(get_users_service())
  })
  .bind(("127.0.0.1", 3000))?
  .run()
  .await
}

对于这个博客,我将只展示一个端点及其流程。您可以在源代码中找到其余部分。

use crate::DbPool;
use crate::{models::NewUser};
use actix_web::{delete, get, post, web, Error, HttpResponse};
use uuid::Uuid;
#[post("")]
async fn create_user(
  db: web::Data<DbPool>,
  new_user: web::Json<NewUser>,
) -> Result<HttpResponse, Error> {
  let user = web::block(move || {
      let conn = db.get()?;
      db_operations::insert_new_user(&new_user.name, &conn)
  })
  .await?
  .map_err(actix_web::error::ErrorInternalServerError)?;
  Ok(HttpResponse::Ok().json(user))
}
pub fn get_users_service() -> actix_web::Scope {
  web::scope("/users")
      .service(get_users)
      .service(get_user_by_id)
      .service(create_user)
      .service(delete_user)
}
mod db_operations {
pub fn insert_new_user(nm: &str, conn: &PgConnection) -> Result<models::User, DbError> {
  use crate::schema::users::dsl::*;
  let new_user = models::User {
      id: Uuid::new_v4().to_string(),
      name: nm.to_owned(),
  };
  diesel::insert_into(users).values(&new_user).execute(conn)?;
  Ok(new_user)
}
}

创建我们的跟踪设置

现在我有了一个工作服务,我可以添加遥测数据。我将需要配置一个跟踪订阅者及其层。

use opentelemetry::sdk::export::trace::stdout;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::{trace, Resource};
use opentelemetry::KeyValue;
use opentelemetry::{global};
use std::collections::HashMap;
use std::env;
use tracing_subscriber::Registry;
use tracing_subscriber::{prelude::*, EnvFilter};
pub fn init_telemetry() {
  // Define Tracer
  let stdout_tracer = stdout::new_pipeline().install_simple();
  let subscriber = Registry::default();
  // Layer to filter traces based on level - trace, debug, info, warn, error.
  let env_filter = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"));
  // Layer to add our configured tracer.
  let tracing_leyer = tracing_opentelemetry::layer().with_tracer(stdout_tracer);
  // Setting a trace context propagation data.
  global::set_text_map_propagator(TraceContextPropagator::new());
  subscriber
      .with(env_filter)
      .with(tracing_leyer)
      .init();
}

此配置添加了一个 env_filter 层和一个跟踪层。它将所有级别等于或高于“信息”日志级别的跨度打印到标准输出。

为 Actix-Web 框架添加 OpenTelemetry Instrumentation

现在让我们通过检测我们采取的行动来添加一些跨度。让我们从检测 Actix web 所采取的操作开始。

为此,我将使用 actix_web_opentelemetry crate,它提供了一个名为 RequestTracing 的中间件。顾名思义,这个中间件从请求中提取跟踪信息并创建一个具有相同上下文的跨度。

use actix_web_opentelemetry::RequestTracing;
// add the middleware to the server.
HttpServer::new(move || {
      App::new()
          .wrap(RequestTracing::new())
          .app_data(web::Data::new(pool.clone()))
          .service(get_users_service())

现在,当运行应用程序并调用创建用户端点时,我们将 spans 打印到终端。

试图从这张印刷品中理解一些东西是相当困难的。此外,我们今天聚集在这里是为了可视化痕迹,而不是将它们打印到终端。

所以我更改了跟踪器以将跟踪导出到 OTLP 收集器。我将跟踪器配置为将跟踪发送到 Aspecto 平台。

使用Aspecto 可视化跟踪

要继续,您可以打开一个新的永远免费Aspecto 帐户或登录到您现有的帐户。

下面,确保将 {ASPECTO_API_KEY} 替换为您唯一的 Aspecto 令牌 ID –https://app.aspecto.io/app/integration/token(设置 > 集成 > 令牌)

use opentelemetry_otlp::WithExportConfig;
let exporter = opentelemetry_otlp::new_exporter()
      .http()
      .with_endpoint("https://otelcol.aspecto.io/v1/traces")
      .with_headers(HashMap::from([(
          "Authorization".into(),
          env::var("ASPECTO_API_KEY").unwrap().to_string(),
      )]));
let otlp_tracer = opentelemetry_otlp::new_pipeline()
      .tracing()
      .with_exporter(exporter)
      .with_trace_config(
          trace::config().with_resource(Resource::new(vec![KeyValue::new(
              opentelemetry_semantic_conventions::resource::SERVICE_NAME,
              env::var("SERVICE_NAME").unwrap().to_string(),
          )])),
      )
      .install_batch(opentelemetry::runtime::Tokio)
      .expect("Error - Failed to create tracer.");
let tracing_leyer = tracing_opentelemetry::layer().with_tracer(otlp_tracer);

再次运行应用程序时,我可以在 Aspecto 中查看我的踪迹:

Aspecto 分布式跟踪平台显示我们的 Rust 服务跟踪

单击跟踪,我可以查看跨度及其数据:

Aspecto 分布式跟踪平台显示我们的 Rust 服务跟踪的原始数据

手动检测 Rust 函数

让我们为我们的服务添加更多工具。

通过在函数上使用宏#[instrument],我们可以为这个函数创建一个跨度。

这意味着跨度名称将是函数的名称。它的属性将包括模块箱、函数库和函数的属性。

通过使用_instrument(skip(...), field(...))_,我们可以跳过函数的一些参数,这些参数对我们记录和手动插入新属性并不重要。

让我们试一试:

#[instrument(skip(db))]
#[post("")]
async fn create_user(
  db: web::Data<DbPool>,
  new_user: web::Json<NewUser>,
) -> Result<HttpResponse, Error> {
  let user = web::block(move || {
      let conn = db.get()?;
      db_operations::insert_new_user(&new_user.name, &conn)
  })
  .await?
  .map_err(actix_web::error::ErrorInternalServerError)?;
  Ok(HttpResponse::Ok().json(user))
}

我现在为函数 create_user 添加了工具。我跳过了 db 参数。让我们在 Aspecto 中查看这个新的 span:

Aspecto 跟踪 Rust 服务,为函数 create_user 添加了工具

我们可以清楚地看到添加到跟踪中的新跨度。单击新跨度,我们可以看到我之前提到的所有属性:

Aspecto 跟踪 Rust 服务,为函数 create_user 添加了工具,查看属性

为 Diesel 添加 OpenTelemetry Instrumentation

为了全面了解,我将为 db_operation 函数添加工具:

#[instrument(skip(conn))]
pub fn insert_new_user(nm: &str, conn: &Connection) -> Result<models::User, DbError> {
  use crate::schema::users::dsl::*;
  let new_user = models::User {
      id: Uuid::new_v4().to_string(),
      name: nm.to_owned(),
  };
  diesel::insert_into(users).values(&new_user).execute(conn)?;
  Ok(new_user)
}

对于我们正在使用的柴油库。要添加柴油仪表,我们需要使用 _diesel_tracing_ crate 并将我们的 PgConnection 替换为 InstrumentedPgConnection:

use diesel_tracing::pg::InstrumentedPgConnection;
pub type Connection = InstrumentedPgConnection;
pub type DbPool = r2d2::Pool<ConnectionManager<Connection>>;

我再次运行应用程序并观察 Aspecto 中的痕迹:

Aspecto 跟踪平台显示来自我们服务的多个跟踪

我们可以看到许多不相关的痕迹。这是因为数据库连接结构通过调用建立函数定期与数据库通信以确保连接仍然有效。

在不同线程上执行闭包时如何保留我们的跨度上下文?

但是这里有一个问题。你能看见它吗?出于某种原因,代表我们请求的跨度和代表数据库查询的跨度没有组合在一起。这是由于以下代码而发生的:

#[instrument(skip(db))]
#[post("")]
async fn create_user(
  db: web::Data<DbPool>,
  new_user: web::Json<NewUser>,
) -> Result<HttpResponse, Error> {
  let user = web::block(move || {
      let conn = db.get()?;
      db_operations::insert_new_user(&new_user.name, &conn)
  })
  .await?
  .map_err(actix_web::error::ErrorInternalServerError)?;
  Ok(HttpResponse::Ok().json(user))
}

我们在传递给 web::block 函数的闭包中调用我们的 db 操作。我们失去了执行流程的跨度上下文。然而,web::block 功能是至关重要的。

它在专用于异步阻塞操作的线程池上执行阻塞函数。那么我们如何保持跟踪的上下文呢?

我们可以通过使用我们自己的跟踪函数包装 web::block 函数来做到这一点:

fn traced_web_block<F, R>(f: F) -> impl Future<Output = Result<R, BlockingError>>
where
  F: FnOnce() -> R + Send + 'static,
  R: Send + 'static,
{
  let current_span = tracing::Span::current();
  web::block(move || current_span.in_scope(f))
}

在这里,我们获得了当前范围并在其范围内调用闭包。

现在让我们查看我们的痕迹:

Aspecto 跟踪平台显示来自我们服务的完整端到端跟踪

我们现在可以全面了解应用程序的流程。

Rust 中的 OpenTelemetry 分布式跟踪:总结

我们回顾了如何使用 OpenTelemetry 向 Rust 应用程序添加分布式跟踪。我们探索了如何添加手动检测和库提供的检测。

我们使用 Aspecto 来可视化系统的流程,并了解流程中断时该怎么做。

总而言之,我喜欢处理这个例子。我认为 Rust 在跟踪方面采取的方法很有趣,在编写 Rust 程序时值得考虑。此外,生态系统正在增长,所以我相信很快会有更多的库提供工具。

我希望你学到了一些新东西。如有任何问题,请随时与联系。

Logo

CI/CD社区为您提供最前沿的新闻资讯和知识内容

更多推荐