为 Slack 中的流式推文构建 dockerized ETL 管道
我的数据科学训练营中的一个项目是关于创建推文数据库以及他们的情绪得分,并在 Slack 频道中发布积极的推文。该管道必须与Docker Compose进行协调。管道如下所示:
[
](https://res.cloudinary.com/practicaldev/image/fetch/s--2ZnUH2Vo--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://n8n.io /blog/content/images/2021/04/ETL_pipeline_simple2-1.png)
在这篇文章中,我将向您展示我是如何设置每个步骤的。
0\。先决条件和技术栈
下面是我在这个项目中使用的应用程序、服务和库的概述:
应用程序和数据库
Python 库
推特
tweepy&vader
松弛
slackclient
MongoDB
pymongo
PostgreSQL
psycopg2-binary&sqlalchemy
码头工人组成
-
1\。收集推文
为了收集推文,我使用了Twitter API和tweepy库。
首先,我在 Twitter上创建了一个应用程序并获得了我的凭据(API 密钥和访问令牌)。然后,我使用tweepy和我的 Twitter 凭据编写了用于流式传输实时推文](https://github.com/lorenanda/tweets-docker-pipeline/tree/main/docker-compose/tweet_collector)的[Python 代码。我选择流式传输标签 #OnThisDay (认为每天收到几年前发生的事情的通知会很有趣)并收集推文文本和用户句柄。
from tweepy import OAuthHandler, Stream, API
from tweepy.streaming import StreamListener
tweet = {
'username': t['user']['screen_name'],
'text': t['text'],
}
stream_listener = StreamListener()
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
stream.filter(track=['OnThisDay'])
进入全屏模式 退出全屏模式
2\。在 MongoDB 中存储推文
收集推文后,我必须将它们存储在 MongoDB 中,这是一个非关系 (NoSQL) 数据库,将数据存储在类似 JSON 的文档中。由于推文数据是作为键值对(JSON 格式)收集的,因此 MongoDB 是存储此信息的好方法。
首先,我必须创建一个 MongoDB 实例,设置一个集群,并在其中创建一个数据库和一个集合:
1.创建一个MongoDB账户
2、搭建集群:cloud.mongodb.com > Clusters > Create New Cluster
-
创建数据库:Cluster > Collections > Create Database
-
创建集合:Cluster > Collections > Database > Create Collection
5.创建一个字段:Collection > Insert document > 在_id下面输入字段text
-
允许访问数据库:项目 > 安全 > 网络访问 > IP 访问列表 > 添加您的 IP 地址。
-
从终端连接到数据库:\
mongo "mongodb+srv://YourClusterName.mongodb.net/YourDatabaseName" --username YourUsername
其次,我使用pymongo库编写了Python 代码,用于在 MongoDB中存储推文。
import pymongo
client = pymongo.MongoClient(host='mongo_container', port=27018)
db = client.tweets_db
def warning_log(tweet):
logging.critical('\n\nTWEET: ' + tweet['username'] + 'just tweeted: ' + tweet['text'])
db.collections.onthisday.insert_one(tweet)
进入全屏模式 退出全屏模式
主机mongo_container是 Docker 容器之一,在第 5 节中进行了说明。
3\。执行 ETL 作业
ETL(提取、转换、加载)作业涉及三个操作:从 MongoDB 中提取推文,分析其情绪,并将它们存储到新的 Postgres 数据库中。这是 ETL 作业](https://github.com/lorenanda/tweets-docker-pipeline/tree/main/docker-compose/etl_job)的[Python 代码。
3.1。从 MongoDB 中提取推文
为了从 MongoDB 中提取推文文本,我再次使用了pymongo库。
def extract_tweets():
tweets = list(db.onthisday.find())
if tweets:
t = random.choice(tweets)
logging.critical("Random tweet: "+t["text"])
return t
进入全屏模式 退出全屏模式
3.2.使用情绪分数转换推文
为了分析推文的情绪,我使用了VADER库,它返回(以及其他)复合情绪分数。
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
def transform_tweets(tweet):
tweet_text = tweet['text'].replace("\'","")
sia = SentimentIntensityAnalyzer()
tweet_sia = sia.polarity_scores(tweet_text)['compound']
return tweet_sia
进入全屏模式 退出全屏模式
3.3。将推文加载到 PostgreSQL
要将带有情绪分数的推文加载到 Postgres 数据库中,首先您需要一个 Postgres 数据库。我安装了 Postgres,然后直接从终端创建了一个数据库和一个推文表:
1.连接到Postgres:psql
2.创建数据库:createdb twitter
3.进入创建的数据库:psql twitter
4.在数据库中创建列:CREATE TABLE tweets (text varchar(280), score numeric(4,3));
然后,我使用sqlalchemy库编写了用于将推文插入tweets表的 Python 代码。
def load_tweets(tweet, sentiment):
insert_query = """
INSERT INTO tweets VALUES ('{tweet["text"]}', {tweet_sia});
"""
engine.execute(insert_query)
logging.critical(f'Tweet {tweet["text"]} loaded into Postgres.')
进入全屏模式 退出全屏模式
4\。从 Postgres 中提取推文
在建立了推文数据库及其情绪评分后,我必须选择并提取_一些_推文,然后将其发送到 Slack。
query = pg.execute(
"SELECT text FROM tweets ORDER BY sentiment DESC LIMIT 1")
msg = str(list(query))
output = f'NEW TWEET! {user} just tweeted: {msg} \nSentiment score: {blob_score}'
进入全屏模式 退出全屏模式
5\。使用 Slackbot 发布推文
管道的最后一步是在 Slack 频道中发布推文。为此,首先 I创建了一个 Slackbot。
然后,我编写了用于在 Slack 频道中发布推文的 Python 代码,包括上一步中的代码:
import time
import slack
from sqlalchemy import create_engine
import config
engine = config.PG_ENGINE
webhook_url = config.WEBHOOK_SLACK
while True:
logging.critical("\n\nPositive tweet:\n")
query = pg.execute(
"SELECT text FROM tweets ORDER BY sentiment DESC LIMIT 1")
msg = str(list(query))
logging.critical(msg + "\n")
output = f'NEW TWEET! {user} just tweeted: {msg} \nSentiment score: {blob_score}'
data = {'text': output}
requests.post(url=webhook_url, json=data)
time.sleep(30)
进入全屏模式 退出全屏模式
还有 🎉 –– 这是在 Slack 上发布的推文:
[
](https://res.cloudinary.com/practicaldev/image/fetch/s--quU4N1NY--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https://lorenaciutacu.com/assets/ img/tweetbyslackbot.webp) Slackbot 发布的推文
6\。创建 Docker Compose 管道
这个项目的最后一点是_orchestration_。当您手动运行它们时,每个步骤的各个 Python 脚本都可以工作,但目标是仅使用一个命令从头到尾运行此管道。这就是 Docker Compose 的用武之地。
Compose 是一个用于定义和运行多容器 Docker 应用程序的工具。使用 Compose,您可以使用 YAML 文件来配置应用程序的服务。然后,使用一个命令,您可以从您的配置中创建并启动所有服务。
前面五个步骤中的每一个(或我混乱模式中的矩形)都代表一个Docker 容器,所以在我的docker_compose.yml文件中,我有五个容器(服务):tweet_container、postgres_container、mongo_container、etl_container和slackbot_container.
对于这两个数据库容器,我使用了 Docker 镜像,因为它们不依赖于存储在我的项目文件夹中的自定义代码。对于其他三个容器,我引用了各自的代码位置 (build) 及其依赖项 (depends_on)(例如,tweet_collector依赖于postgres和mongo,因为推文存储在这些数据库中)。
我还使用Dockervolumes在容器停止时保留数据(数据持久性)。
version: '3'
services:
tweet_container:
build: tweet_collector/
depends_on:
- postgres_container
- mongo_container
volumes:
- ./tweet_collector/:/app
postgres_container:
build: postgresdb
image: postgres:13.0
ports:
- 5555:5432
environment:
- POSTGRES_USER=your_user
- POSTGRES_PASSWORD=your_password
mongo_container:
build: mongodb
image: mongo
ports:
- 27018:27018
volumes:
- ./mongodb:/app
etl_container:
build: etl_job/
depends_on:
- postgres_container
- mongo_container
volumes:
- ./etl_job/:/app
slackbot_container:
build: slackbot/
depends_on:
- mongo_container
- postgres_container
volumes:
- ./slackbot/:/app
进入全屏模式 退出全屏模式
最后,这里是我用于管理 Docker 容器的一些 CLI 命令(您可以在他们的文档中找到更多信息):
-
docker images列出所有使用的图像(postgres 和 mongo) -
docker ps -a列出我所有的容器 -
docker -v挂载卷 -
docker build从 Docker 文件构建映像 -
docker run运行容器
就是这样:我的第一个 dockerized ETL 管道 – 一周的工作和几个小时的写作,写在一篇 6 分钟的博客文章中。
更多推荐

所有评论(0)