ETL 是从一个或多个源中提取、转换和加载数据到目标的过程。查看文章ETL 管道解释了解 ETL 管道的一般概述。

这是有关 ETL 管道的系列文章的第一篇。

  • 提取物(本文)

  • 变换

  • 负载

在ETL管道中提取数据

ETL 管道的第一步是提取数据,我们将在以后的步骤中对其进行转换加载。在提取阶段,决策是从哪些数据源中提取以及提取的准确程度(API、授权、数据库......)。

在示例中,将使用jsonplaceholder.typicode.com。它是一个 REST API,有很多选项,并且可以免费进行测试和原型设计。我们将创建两个函数。一个用于提取数据,另一个用于编排 ETL 管道的不同阶段。

在现实世界的示例中,数据源可能是数据库,但对于此示例,占位符 API 很好。由于必须遵循相同的方法,因此只有与数据库的接口(直接连接和查询或通过中间件)会有所不同。

Extract 阶段的重要内容是:

  • 选择数据源

  • 决定提取什么数据

  • 如何查询或检索数据(可用什么方法,取决于数据源)

让我们从创建基础开始,然后执行以下步骤:

  • 1.查询数据源

创建或添加项目文件夹。

mkdir node-etl

进入全屏模式 退出全屏模式

使用npm init -y初始化项目,以便能够安装节点包。

cd node-etl
npm init -y

进入全屏模式 退出全屏模式

安装node-fetch

npm install node-fetch

进入全屏模式 退出全屏模式

1.查询数据源

创建一个extract.js文件,用于处理查询数据。

touch extract.js

进入全屏模式 退出全屏模式

如果您想了解有关发出 API 请求的更多信息,请查看如何在节点中发出 API 请求。

添加测试代码。我们将查询特定相册的照片。

const fetch = require('node-fetch');

const URL = 'https://jsonplaceholder.typicode.com/albums';

async function getPhotos(albumId) {
  return await fetch(`${URL}/${albumId}/photos`).then(response =>
    response.json(),
  );
}

module.exports = { getPhotos };

进入全屏模式 退出全屏模式

getPhotos()异步函数将albumId作为参数,将向假 API 发出GET请求并返回一个数组,其中每个对象具有以下接口:

interface Photo {
  albumId: number;
  id: number;
  title: string;
  url: string;
  thumbnailUrl: string;
}

进入全屏模式 退出全屏模式

例如,我们将跳过对 albumId 的输入参数的验证,它不应超过 100,否则将返回一个空数组。我们正在导出函数getPhotos,因此我们可以将其导入index.js,这将协调 ETL 管道。查看如何组织 Node文章以获取可维护的 Node.js 代码。

2.设置管道

创建一个index.js文件,该文件将作为应用程序的入口点,用于编排 ETL 管道。

touch index.js

进入全屏模式 退出全屏模式

我们将创建一个orchestrateEtlPipeline()函数,它将协调 ETL 管道中的每个步骤:提取数据、转换数据并将其加载到目的地。

const { getPhotos } = require('./extract');

const orchestrateEtlPipeline = async () => {
  try {
    const photosAlbum = await getPhotos(1);
    console.log(photosAlbum);

    // TODO - TRANSFORM

    // TODO - LOAD
  } catch (error) {
    console.error(error);
  }
};

orchestrateEtlPipeline();

进入全屏模式 退出全屏模式

orchestrateEtlPipeline()函数在try/catch块中再次使用async/await来处理任何错误或承诺拒绝。

3.等待多个请求

在示例代码中,我们只发出一个请求,但是如果我们想对同一个数据源发出更多请求或从多个源中提取数据怎么办。使用方法Promise.all,我们可以在继续之前等待所有提取的数据。但是,此方法会立即触发所有请求,这可能会使某些源不堪重负(想想对 DB 的多个密集请求)。阅读更多关于 Node.js](https://www.mariokandut.com/how-to-wait-for-multiple-promises-in-node-javascript/)中的[多个 Promise 的信息。

从多个相册请求照片的示例如下所示:

const { getPhotos } = require('./extract');

const orchestrateEtlPipeline = async () => {
  try {
    const allPhotos = Promise.all([
      getPhotos(1),
      getPhotos(2),
      getPhotos(3),
    ]);
    const [photos1, photos2, photos3] = await allPhotos;

    console.log(photos1[0], photos2[0], photos3[0]); // to log the first photo object of all three albums

    // TODO - TRANSFORM

    // TODO - LOAD
  } catch (error) {
    console.error(error);
  }
};

orchestrateEtlPipeline();

进入全屏模式 退出全屏模式

提取数据后,ETL 管道的下一步就是对其进行转换。

TL;DR

  • ETL 管道的第一步是从数据源中提取数据。

  • 此阶段重要的是选择数据源,决定提取哪些数据以及如何查询或检索数据。

感谢您阅读,如果您有任何问题,请使用评论功能或给我发消息 @mariokandut

如果您想了解更多关于 节点 的信息,请查看这些节点教程。

参考(非常感谢):

HeyNode,MDN 异步/等待

Logo

华为、百度、京东云现已入驻,来创建你的专属开发者社区吧!

更多推荐