如何使用 Python 和 Apache Spark 处理日志数据
当今利用分析的最流行和最有效的企业用例之一是日志分析。今天几乎每个组织都有多个系统和基础设施日复一日地运行。为了有效地保持业务运行,这些组织需要知道他们的基础设施是否发挥了最大潜力。找出包括分析系统和应用程序日志,甚至可能对日志数据应用预测分析。所涉及的日志数据量通常很大,具体取决于所涉及的组织基础架构的类型和在其上运行的应用程序。
图片来源:
日志数据处理管道
由于计算限制,我们仅限于在单台机器上分析数据样本的日子已经一去不复返了。在大数据、更好的分布式计算以及Apache Spark等用于大数据处理和开源分析的框架的支持下,我们每天可以对潜在的数十亿条日志消息执行可扩展的日志分析。这个以案例研究为导向的教程的目的是采用实践方法,展示我们如何利用 Spark 对半结构化日志数据进行大规模的日志分析。如果您对使用 Spark 的可扩展SQL感兴趣,请随时使用 Spark_](https://towardsdatascience.com/sql-at-scale-with-apache-spark-sql-and-dataframes-concepts-architecture-and-examples-c567853a702f)查看[_SQL。
虽然有许多出色的开源框架和工具可用于日志分析,例如Elasticsearch,但本教程由两部分组成,旨在展示如何利用 Spark 来大规模分析日志。在现实世界中,您当然可以在分析日志数据时自由选择自己的工具箱。
让我们开始吧!
主要目标:NASA 日志分析
更多 Python 资源
-
什么是IDE?
-
备忘单:适用于初学者的 Python 3.7
-
顶级 Python GUI 框架
-
下载:7 个必不可少的 PyPI 库
-
红帽开发者
-
最新 Python 内容
正如我们之前提到的,Apache Spark 是一个优秀且理想的开源框架,用于大规模处理、分析和建模结构化和非结构化数据!在本教程中,我们的主要目标是行业中最流行的用例之一——日志分析。服务器日志是常见的企业数据源,通常包含可操作的见解和信息的金矿。在这些条件下,日志数据来自许多来源,例如 Web、客户端和计算服务器、应用程序、用户生成的内容和平面文件。这些日志可用于监控服务器、改进业务和客户智能、构建推荐系统、欺诈检测等等。
Spark 允许您廉价地转储日志并将其存储到磁盘上的文件中,同时仍提供丰富的 API 来执行大规模数据分析。这个动手案例研究将向您展示如何在来自NASA的实际生产日志中使用 Apache Spark,同时学习数据整理和用于探索性数据分析的基本但强大的技术。在这项研究中,我们将分析来自佛罗里达州NASA 肯尼迪航天中心Web 服务器的日志数据集。
完整的数据集(包含对 NASA 肯尼迪航天中心的所有 HTTP 请求的两个月的价值)可在此处免费获取以供下载。或者,如果您更喜欢 FTP:
-
7 月 1 日至 7 月 31 日,ASCII 格式,20.7 MB gzip 压缩,205.2 MB 未压缩:ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
-
8 月 4 日至 8 月 31 日,ASCII 格式,21.8 MB gzip 压缩,167.8 MB 未压缩:ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz
接下来,如果您想继续学习,请从my GitHub下载教程,并将这两个文件放在与教程的Jupyter Notebook相同的目录中。
设置依赖
第一步是确保您可以访问 Spark 会话和集群。对于此步骤,您可以使用自己的本地 Spark 设置或基于云的设置。通常,如今大多数云平台都提供 Spark 集群,您还可以选择免费选项,包括Databricks 社区版。本教程假设您已经设置了 Spark,因此我们不会花费额外的时间从头开始配置或设置 Spark。
通常,在您启动 Jupyter Notebook 服务器时,预配置的 Spark 设置已经预加载了必要的环境变量或依赖项。就我而言,我可以在笔记本中使用以下命令检查它们:
spark
这些结果表明我的集群目前正在运行 Spark 2.4.0。我们还可以使用以下代码检查 sqlContext 是否存在:
sqlContext
**<pyspark.sql.context.SQLContext at 0x7fb1577b6400>**
现在,如果您没有预先配置这些变量并出现错误,您可以使用以下代码加载和配置它们:
# 配置火花变量
从 pyspark.context 导入 SparkContext
从 pyspark.sql.context 导入 SQLContext
从 pyspark.sql.session 导入 SparkSession
sc \u003d SparkContext()
sqlContext \u003d SQLContext(sc)
火花 \u003d SparkSession(sc)
# 加载其他依赖
重新进口
将熊猫导入为 pd
我们还需要加载其他库来处理DataFrames和正则表达式。使用正则表达式是解析日志文件的主要方面之一。该工具提供了一种强大的模式匹配技术,可用于提取和查找半结构化和非结构化数据中的模式。
图片来源:
Perl 问题从xkcd中剥离。
正则表达式可能非常有效和强大,但它们也可能会让人不知所措和困惑。不过不用担心,通过练习,您可以真正发挥他们的最大潜力。以下示例展示了在Python中使用正则表达式的方法。在这里,我们尝试在给定的输入句子中找到单词_'spark'_的所有出现。
m \u003d re.finditer(r'.*?(spark).*?', "我正在 PySpark 中寻找火花", re.I)
在 m 中匹配:
打印(匹配,匹配开始(),匹配结束())
**<_sre.SRE_Match object; span=(0, 25), match=“I’m searching for a spark”> 0 25 <_sre.SRE_Match object; span=(25, 36), match=’ in PySpark’> 25 36**
让我们继续分析的下一部分。
加载和查看 NASA 日志数据集
鉴于我们的数据存储在以下路径中(以平面文件的形式),让我们将其加载到 DataFrame 中。我们将分步进行。以下代码加载我们磁盘的日志数据文件名:
导入全局
原始_data_files \u003d glob.glob('*.gz')
原始_数据_文件
**[‘NASA_access_log_Jul95.gz’, ‘NASA_access_log_Aug95.gz’]**
现在,我们将使用 sqlContext.read.text() 或 spark.read.text() 来读取文本文件。此代码生成一个 DataFrame,其中包含一个名为 value 的字符串列:
基础_df u003d spark.read.text(原始_data_files)
基础_df.printSchema()
**root |-- value: string (nullable = true)**
此输出允许我们查看我们将很快检查的日志数据模式的文本。您可以使用以下代码查看保存我们日志数据的数据结构类型:
type(base_df)
**pyspark.sql.dataframe.DataFrame**
在本教程中,我们使用 Spark DataFrames。但是,如果需要,您还可以通过添加以下代码将 DataFrame 转换为弹性分布式数据集 (RDD)— Spark 的原始数据结构 () — 如果需要:
基地_df_rdd \u003d基地_df.rdd
类型(基础_df_rdd)
**pyspark.rdd.RDD**
现在让我们看一下 DataFrame 中的实际日志数据:
base_df.show(10, truncate=False)
图片来源:
base_df.show 数据框中的日志数据
这个结果绝对看起来像标准的半结构化服务器日志数据。在这个文件有用之前,我们肯定需要做一些数据处理和争论。请记住,从 RDD 访问数据略有不同,如下所示:
base_df_rdd.take(10)
图片来源:
图 5:通过 base_df_rdd 的数据帧内的日志数据
现在我们已经加载并查看了我们的日志数据,让我们对其进行处理和处理。
数据整理
在本节中,我们清理和解析我们的日志数据集,以从每条日志消息中提取具有有意义信息的结构化属性。
日志数据理解
如果您熟悉 Web 服务器日志,您会发现上面显示的数据采用通用日志格式。这些字段是:
**remotehost**** rfc931 ****authuser**** [date] "request" status bytes**
场地
描述
远程主机
远程主机名(如果 DNS 主机名不可用或DNSLookup关闭,则为 IP 号)。
rfc931
用户的远程登录名(如果存在)。
authuser
HTTP服务器认证后远程用户的用户名。
[日期]
请求的日期和时间。
“要求”
请求,与来自浏览器或客户端的请求完全相同。
地位
HTTP 状态码服务器发回给客户端。
字节
传输到客户端的字节数 (Content-Length
)。
我们现在需要从日志数据中解析、匹配和提取这些属性的技术。
用正则表达式进行数据解析和提取
接下来,我们必须将半结构化日志数据解析为单独的列。我们将使用特殊的内置 regexp_extract() 函数进行解析。此函数将列与具有一个或多个的正则表达式匹配 capturegroups,并允许您提取匹配的组之一。我们将为要提取的每个字段使用一个正则表达式。
您现在一定已经听过或使用过相当多的正则表达式。如果您发现正则表达式令人困惑(而且它们肯定会_可以_),并且您想了解更多关于它们的信息,我们建议您查看RegexOne 网站。您可能还会发现 Goyvaerts 和 Levithan 编写的Regular Expressions Cookbook是有用的参考。
让我们看看我们在数据集中处理的日志总数:
print((base_df.count(), len(base_df.columns)))
**(3461613, 1)**
看起来我们总共有大约 346 万条日志消息。不是一个小数目!让我们提取并查看一些示例日志消息:
样本_logs \u003d [item['value'] for item in base_df.take(15)]
样本_日志
图片来源:
示例日志消息。
提取主机名
让我们编写一些正则表达式来从日志中提取主机名:
主机_pattern \u003d r'(^\S+.[\S+.]+\S+)\s'
主机 \u003d [re.search(host_pattern, item).group(1)
如果重新搜索(主机_模式,项目)
否则'不匹配'
样品中的项目_logs]
主机
**[‘199.72.81.55’, ‘unicomp6.unicomp.net’, ‘199.120.110.21’, ‘burger.letters.com’, …, …, ‘unicomp6.unicomp.net’, ‘d104.aa.net’, ‘d104.aa.net’]**
提取时间戳
让我们使用正则表达式从日志中提取时间戳字段:
ts_pattern \u003d r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{ 4})]'
时间戳 \u003d [re.search(ts_pattern, item).group(1) for item in sample_logs]
时间戳
**[‘01/Jul/1995:00:00:01 -0400’, ‘01/Jul/1995:00:00:06 -0400’, ‘01/Jul/1995:00:00:09 -0400’, …, …, ‘01/Jul/1995:00:00:14 -0400’, ‘01/Jul/1995:00:00:15 -0400’, ‘01/Jul/1995:00:00:15 -0400’]**
提取 HTTP 请求方法、URI 和协议
现在让我们使用正则表达式从日志中提取 HTTP 请求方法、URI、和协议模式字段:
方法_uri_protocol_pattern \u003d r'"(\S+)\s(\S+)\s*(\S*)"'
方法_uri_protocol \u003d [re.search(method_uri_protocol_pattern, item).groups()
如果重新搜索(方法_uri_protocol_pattern,项目)
否则'不匹配'
样品中的项目_logs]
方法_url\协议
**[(‘GET’, ‘/history/apollo/’, ‘HTTP/1.0’), (‘GET’, ‘/shuttle/countdown/’, ‘HTTP/1.0’), …, …, (‘GET’, ‘/shuttle/countdown/count.gif’, ‘HTTP/1.0’), (‘GET’, ‘/images/NASA-logosmall.gif’, ‘HTTP/1.0’)]**
提取HTTP状态码
现在让我们使用正则表达式从日志中提取 HTTP 状态代码:
状态_pattern \u003d r'\s(\d{3})\s'
status \u003d [re.search(status_pattern, item).group(1) for item in sample_logs]
打印(状态)
**[‘200’, ‘200’, ‘200’, ‘304’, …, ‘200’, ‘200’]**
提取HTTP响应内容大小
现在让我们使用正则表达式从日志中提取 HTTP 响应内容大小:
内容_size_pattern \u003d r'\s(\d+)$'
content_size \u003d [re.search(content_size_pattern, item).group(1) for item in sample_logs]
打印(内容_size)
**[‘6245’, ‘3985’, ‘4085’, ‘0’, …, ‘1204’, ‘40310’, ‘786’]**
放在一起
现在让我们利用我们之前构建的所有正则表达式模式,并使用 regexp_extract(...)
方法来构建我们的 DataFrame,并将所有日志属性整齐地提取到它们自己的单独列中。
从 pyspark.sql.functions 导入正则表达式_extract
logs_df \u003d base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
regexp_extract('value', ts_pattern, 1).alias('timestamp'),
regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
日志_df.show(10,截断\u003d真)
打印((日志_df.count(),长度(日志_df.columns)))
图片来源:
使用 regexp_extract(...) 提取的日志数据帧
查找缺失值
缺失值和空值是数据分析和机器学习的祸根。让我们看看我们的数据解析和提取逻辑工作得如何。首先,让我们验证原始 DataFrame 中没有空行:
(基础_df
.filter(base_df['值']
。一片空白())
。数数())
祖兹 100227
都好!现在,如果我们的数据解析和提取工作正常,我们就不应该有任何具有潜在空值的行。让我们尝试对其进行测试:
坏\行\df \u003d logo\df.filter(logs\df['host'].isNull()|
日志_df['时间戳'].isNull() |
日志_df['方法'].isNull() |
日志_df['端点'].isNull() |
日志_df['状态'].isNull() |
日志_df['content_size'].isNull()|
日志_df['protocol'].isNull())
坏_rows_df.count()
**33905**
哎哟!看起来我们的数据中有超过 33K 的缺失值!我们能处理这个吗?
请记住,这不是一个常规的 pandas (link) DataFrame,您可以直接查询并获取哪些列具有 null。我们所谓的_大数据集_驻留在磁盘上,可能存在于火花集群的多个节点中。那么我们如何找出哪些列有潜在的空值呢?
查找空计数
我们通常可以使用以下技术来找出哪些列具有空值。
注意: 此方法改编自[上StackOverflow的优秀答案](http://stackoverflow.com/a/33901312)。
从 pyspark.sql.functions 导入 col
从 pyspark.sql.functions 导入 sum 作为 spark_sum
def count_null(col_name):
返回 spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)
# 建立一个列表达式列表,每列一个。
exprs \u003d [count\null(col\name) 用于 logs\df.columns] 中的列名
# 运行聚合。 *exprs 将表达式列表转换为
# 可变函数参数。
日志_df.agg(*exprs).show()
图片来源:
使用 count_null() 检查哪些列具有空值
好吧,看起来我们在 status 列中缺少一个值,而其他所有值都在 content_size 列中。让我们看看我们是否能找出问题所在!
处理 HTTP 状态中的空值
我们对 status 列的原始解析正则表达式是:
正则表达式_extract('value', r'\s(\d{3})\s', 1).cast('integer')
.else('状态')
是不是有更多的数字使我们的正则表达式出错了?还是数据点本身不好?让我们来了解一下。
注意:在下面的表达式中,波浪号(**~**)
表示“不”。
null_status_df \u003d base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))
空_status_df.count()
**1**
让我们看看这个糟糕的记录是什么样的:
null_status_df.show(truncate=False)
图片来源:
通过 null_status_df 丢失信息的不良记录
看起来像是有很多缺失信息的记录。让我们通过我们的日志数据解析管道传递它:
bad_status_df \u003d null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
regexp_extract('value', ts_pattern, 1).alias('timestamp'),
regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))
坏\状态\df.show(截断\u003d假)
图片来源:
不包含任何信息和两个空条目的完整错误日志记录。
看起来记录本身是不完整的记录,没有有用的信息,最好的选择是删除此记录,如下所示:
logs_df \u003d logs_df[logs_df['status'].isNotNull()]
exprs \u003d [count\null(col\name) 用于 logs\df.columns] 中的列名
日志_df.agg(*exprs).show()
图片来源:
掉落的记录。
处理 HTTP 内容大小中的空值
根据我们之前的正则表达式,我们对 content_size 列的原始解析正则表达式是:
正则表达式_extract('value', r'\s(\d+)$', 1).cast('integer')
.alias('内容_size')
我们的原始数据集本身会丢失数据吗?让我们来了解一下。我们首先在基础 DataFrame 中找到可能缺少内容大小的记录:
null_content_size_df \u003d base_df.filter(~base_df['value'].rlike(r'\s\d+$'))
空_content_size_df.count()
**33905**
该数字似乎与我们处理的 DataFrame 中缺失的内容大小值的数量相匹配。让我们看一下我们的数据框中缺少内容大小的前 10 条记录:
祖兹 100233
图片来源:
缺少内容大小的前 10 个数据框记录。
很明显,错误的原始数据记录对应于错误响应,其中没有发回任何内容,服务器为**content_size**
字段发出 -
。由于我们不想从分析中丢弃这些行,让我们用 0 估算或填充它们。
修复空内容_size的行
最简单的解决方案是将**logs_df**
中的空值替换为 0,就像我们之前讨论的那样。 Spark DataFrame API 提供了一组专门为处理空值而设计的函数和字段,其中包括:
-
zwz100177 **fillna** zwz100178 zwz100176 zwz100180 **()** zwz100181 zwz100179 )
,用指定的非空值填充空值。 -
zwz100183 **na** zwz100184 zwz100182
,它返回一个zwz100186 DataFrameNaFunctions zwz100187 zwz100185
对象,其中包含许多用于操作空列的函数。
有几种方法可以调用此函数。最简单的方法就是用已知值替换所有空列。但是,为了安全起见,最好传递包含**(column_name, value)**
映射的 Python 字典。这就是我们要做的。文档中的一个示例如下所示:
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+----+------+-------+
|年龄|身高|姓名|
+----+------+-------+
| 10| 80|爱丽丝|
| 5|空|鲍勃|
| 50|空|汤姆|
| 50|空|未知|
+----+------+-------+
现在我们使用此函数将**content_size**
字段中的所有缺失值填充为 0:
日志_df \u003d 日志_df.na.fill({'content_size': 0})
exprs \u003d [count\null(col\name) 用于 logs\df.columns] 中的列名
日志_df.agg(*exprs).show()
图片来源:
空值现在被零替换。
看那个,没有缺失值!
处理时间字段(时间戳)
现在我们有了一个干净的、已解析的 DataFrame,我们必须将时间戳字段解析为实际的时间戳。通用日志格式时间有点不标准。用户定义函数 (UDF)是解析它的最直接方法:
从 pyspark.sql.functions 导入 udf
月_map \u003d {
'John':1,'Pheb':2,'Mar':3,'Ape':4,'May':5,'June':6,'Jul':7,
“八月”:8,“九月”:9,“十月”:10,“十一月”:11,“十二月”:12
}
def 解析_clf_time(文本):
""" 将 Common Log 时间格式转换为 Python 日期时间对象
参数:
text (str): Apache 时间格式的日期和时间 [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
回报:
适合传递给 CAST('timestamp') 的字符串
"""
# 注意:我们在这里忽略了时区,可能需要根据您要解决的问题进行处理
返回“{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}”.format(
整数(文本[7:11]),
月_map[文本[3:6]],
整数(文本[0:2]),
整数(文本[12:14]),
整数(文本[15:17]),
整数(文本[18:20])
)
现在让我们使用这个函数来解析 DataFrame 的**time**
列:
udf_parse_time \u003d udf(parse_clf_time)
logs_df \u003d (logs_df.select('*', udf_parse_time(logs_df['timestamp'])
.cast('时间戳')
.alias('时间'))
.drop('时间戳')
日志_df.show(10,截断\u003d真)
图片来源:
使用用户定义函数 (UDF) 解析的时间戳。
事情似乎看起来不错!让我们通过检查 DataFrame 的模式来验证这一点:
logs_df.printSchema()
**root |-- host: string (nullable = true) |-- method: string (nullable = true) |-- endpoint: string (nullable = true) |-- protocol: string (nullable = true) |-- status: integer (nullable = true) |-- content_size: integer (nullable = false)** **|-- time: timestamp (nullable = true)**
现在让我们缓存 logs_df
,因为我们将在本系列的第二部分的数据分析部分中广泛使用它。
logs_df.cache()
结论
获取、处理和整理数据是任何端到端数据科学或分析用例中最重要的步骤。在大规模处理半结构化或非结构化数据时,事情开始变得更加困难。本案例研究为您提供了一个逐步实践的方法,以利用 Python 和 Spark 等开源工具和框架的强大功能来大规模处理和处理半结构化的 NASA 日志数据。一旦我们准备好一个干净的数据集,我们终于可以开始使用它来获得有关 NASA 服务器的有用信息。点击进入本系列的第二篇文章,获取动手教程使用 Python 和 Apache Spark 分析和可视化 NASA 日志数据。
zoz100036 * *
本文最初出现在 Medium 的Towards Data Science频道,经许可转载。
zoz100037 * *
更多推荐
所有评论(0)