当今利用分析的最流行和最有效的企业用例之一是日志分析。今天几乎每个组织都有多个系统和基础设施日复一日地运行。为了有效地保持业务运行,这些组织需要知道他们的基础设施是否发挥了最大潜力。找出包括分析系统和应用程序日志,甚至可能对日志数据应用预测分析。所涉及的日志数据量通常很大,具体取决于所涉及的组织基础架构的类型和在其上运行的应用程序。

Doug Henschen 的日志数据处理管道。

图片来源:

日志数据处理管道

由于计算限制,我们仅限于在单台机器上分析数据样本的日子已经一去不复返了。在大数据、更好的分布式计算以及Apache Spark等用于大数据处理和开源分析的框架的支持下,我们每天可以对潜在的数十亿条日志消息执行可扩展的日志分析。这个以案例研究为导向的教程的目的是采用实践方法,展示我们如何利用 Spark 对半结构化日志数据进行大规模的日志分析。如果您对使用 Spark 的可扩展SQL感兴趣,请随时使用 Spark_](https://towardsdatascience.com/sql-at-scale-with-apache-spark-sql-and-dataframes-concepts-architecture-and-examples-c567853a702f)查看[_SQL。

虽然有许多出色的开源框架和工具可用于日志分析,例如Elasticsearch,但本教程由两部分组成,旨在展示如何利用 Spark 来大规模分析日志。在现实世界中,您当然可以在分析日志数据时自由选择自己的工具箱。

让我们开始吧!

主要目标:NASA 日志分析

更多 Python 资源

  • 什么是IDE?

  • 备忘单:适用于初学者的 Python 3.7

  • 顶级 Python GUI 框架

  • 下载:7 个必不可少的 PyPI 库

  • 红帽开发者

  • 最新 Python 内容

正如我们之前提到的,Apache Spark 是一个优秀且理想的开源框架,用于大规模处理、分析和建模结构化和非结构化数据!在本教程中,我们的主要目标是行业中最流行的用例之一——日志分析。服务器日志是常见的企业数据源,通常包含可操作的见解和信息的金矿。在这些条件下,日志数据来自许多来源,例如 Web、客户端和计算服务器、应用程序、用户生成的内容和平面文件。这些日志可用于监控服务器、改进业务和客户智能、构建推荐系统、欺诈检测等等。

Spark 允许您廉价地转储日志并将其存储到磁盘上的文件中,同时仍提供丰富的 API 来执行大规模数据分析。这个动手案例研究将向您展示如何在来自NASA的实际生产日志中使用 Apache Spark,同时学习数据整理和用于探索性数据分析的基本但强大的技术。在这项研究中,我们将分析来自佛罗里达州NASA 肯尼迪航天中心Web 服务器的日志数据集。

完整的数据集(包含对 NASA 肯尼迪航天中心的所有 HTTP 请求的两个月的价值)可在此处免费获取以供下载。或者,如果您更喜欢 FTP:

  • 7 月 1 日至 7 月 31 日,ASCII 格式,20.7 MB gzip 压缩,205.2 MB 未压缩:ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz

  • 8 月 4 日至 8 月 31 日,ASCII 格式,21.8 MB gzip 压缩,167.8 MB 未压缩:ftp://ita.ee.lbl.gov/traces/NASA_access_log_Aug95.gz

接下来,如果您想继续学习,请从my GitHub下载教程,并将这两个文件放在与教程的Jupyter Notebook相同的目录中。

设置依赖

第一步是确保您可以访问 Spark 会话和集群。对于此步骤,您可以使用自己的本地 Spark 设置或基于云的设置。通常,如今大多数云平台都提供 Spark 集群,您还可以选择免费选项,包括Databricks 社区版。本教程假设您已经设置了 Spark,因此我们不会花费额外的时间从头开始配置或设置 Spark。

通常,在您启动 Jupyter Notebook 服务器时,预配置的 Spark 设置已经预加载了必要的环境变量或依赖项。就我而言,我可以在笔记本中使用以下命令检查它们:

spark

火花会话

这些结果表明我的集群目前正在运行 Spark 2.4.0。我们还可以使用以下代码检查 sqlContext 是否存在:

sqlContext

**<pyspark.sql.context.SQLContext at 0x7fb1577b6400>**

现在,如果您没有预先配置这些变量并出现错误,您可以使用以下代码加载和配置它们:

# 配置火花变量

从 pyspark.context 导入 SparkContext

从 pyspark.sql.context 导入 SQLContext

从 pyspark.sql.session 导入 SparkSession

sc \u003d SparkContext()

sqlContext \u003d SQLContext(sc)

火花 \u003d SparkSession(sc)

# 加载其他依赖

重新进口

将熊猫导入为 pd

我们还需要加载其他库来处理DataFrames和正则表达式。使用正则表达式是解析日志文件的主要方面之一。该工具提供了一种强大的模式匹配技术,可用于提取和查找半结构化和非结构化数据中的模式。

来自 xkcd 的 Perl 问题。

图片来源:

Perl 问题从xkcd中剥离。

正则表达式可能非常有效和强大,但它们也可能会让人不知所措和困惑。不过不用担心,通过练习,您可以真正发挥他们的最大潜力。以下示例展示了在Python中使用正则表达式的方法。在这里,我们尝试在给定的输入句子中找到单词_'spark'_的所有出现。

m \u003d re.finditer(r'.*?(spark).*?', "我正在 PySpark 中寻找火花", re.I)

在 m 中匹配:

打印(匹配,匹配开始(),匹配结束())

**<_sre.SRE_Match object; span=(0, 25), match=“I’m searching for a spark”> 0 25 <_sre.SRE_Match object; span=(25, 36), match=’ in PySpark’> 25 36**

让我们继续分析的下一部分。

加载和查看 NASA 日志数据集

鉴于我们的数据存储在以下路径中(以平面文件的形式),让我们将其加载到 DataFrame 中。我们将分步进行。以下代码加载我们磁盘的日志数据文件名:

导入全局

原始_data_files \u003d glob.glob('*.gz')

原始_数据_文件

**[‘NASA_access_log_Jul95.gz’, ‘NASA_access_log_Aug95.gz’]**

现在,我们将使用 sqlContext.read.text()spark.read.text() 来读取文本文件。此代码生成一个 DataFrame,其中包含一个名为 value 的字符串列:

基础_df u003d spark.read.text(原始_data_files)

基础_df.printSchema()

**root |-- value: string (nullable = true)**

此输出允许我们查看我们将很快检查的日志数据模式的文本。您可以使用以下代码查看保存我们日志数据的数据结构类型:

type(base_df)

**pyspark.sql.dataframe.DataFrame**

在本教程中,我们使用 Spark DataFrames。但是,如果需要,您还可以通过添加以下代码将 DataFrame 转换为弹性分布式数据集 (RDD)— Spark 的原始数据结构 () — 如果需要:

基地_df_rdd \u003d基地_df.rdd

类型(基础_df_rdd)

**pyspark.rdd.RDD**

现在让我们看一下 DataFrame 中的实际日志数据:

base_df.show(10, truncate=False)

数据帧内的日志数据。

图片来源:

base_df.show 数据框中的日志数据

这个结果绝对看起来像标准的半结构化服务器日志数据。在这个文件有用之前,我们肯定需要做一些数据处理和争论。请记住,从 RDD 访问数据略有不同,如下所示:

base_df_rdd.take(10)

来自弹性分布式数据集的日志数据。

图片来源:

图 5:通过 base_df_rdd 的数据帧内的日志数据

现在我们已经加载并查看了我们的日志数据,让我们对其进行处理和处理。

数据整理

在本节中,我们清理和解析我们的日志数据集,以从每条日志消息中提取具有有意义信息的结构化属性。

日志数据理解

如果您熟悉 Web 服务器日志,您会发现上面显示的数据采用通用日志格式。这些字段是:

**remotehost**** rfc931 ****authuser**** [date] "request" status bytes**

场地

描述

远程主机

远程主机名(如果 DNS 主机名不可用或DNSLookup关闭,则为 IP 号)。

rfc931

用户的远程登录名(如果存在)。

authuser

HTTP服务器认证后远程用户的用户名。

[日期]

请求的日期和时间。

“要求”

请求,与来自浏览器或客户端的请求完全相同。

地位

HTTP 状态码服务器发回给客户端。

字节

传输到客户端的字节数 (Content-Length)。

我们现在需要从日志数据中解析、匹配和提取这些属性的技术。

用正则表达式进行数据解析和提取

接下来,我们必须将半结构化日志数据解析为单独的列。我们将使用特殊的内置 regexp_extract() 函数进行解析。此函数将列与具有一个或多个的正则表达式匹配 capturegroups,并允许您提取匹配的组之一。我们将为要提取的每个字段使用一个正则表达式。

您现在一定已经听过或使用过相当多的正则表达式。如果您发现正则表达式令人困惑(而且它们肯定会_可以_),并且您想了解更多关于它们的信息,我们建议您查看RegexOne 网站。您可能还会发现 Goyvaerts 和 Levithan 编写的Regular Expressions Cookbook是有用的参考。

让我们看看我们在数据集中处理的日志总数:

print((base_df.count(), len(base_df.columns)))

**(3461613, 1)**

看起来我们总共有大约 346 万条日志消息。不是一个小数目!让我们提取并查看一些示例日志消息:

样本_logs \u003d [item['value'] for item in base_df.take(15)]

样本_日志

示例日志消息。

图片来源:

示例日志消息。

提取主机名

让我们编写一些正则表达式来从日志中提取主机名:

主机_pattern \u003d r'(^\S+.[\S+.]+\S+)\s'

主机 \u003d [re.search(host_pattern, item).group(1)

如果重新搜索(主机_模式,项目)

否则'不匹配'

样品中的项目_logs]

主机

**[‘199.72.81.55’, ‘unicomp6.unicomp.net’, ‘199.120.110.21’, ‘burger.letters.com’, …, …, ‘unicomp6.unicomp.net’, ‘d104.aa.net’, ‘d104.aa.net’]**

提取时间戳

让我们使用正则表达式从日志中提取时间戳字段:

ts_pattern \u003d r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{ 4})]'

时间戳 \u003d [re.search(ts_pattern, item).group(1) for item in sample_logs]

时间戳

**[‘01/Jul/1995:00:00:01 -0400’, ‘01/Jul/1995:00:00:06 -0400’, ‘01/Jul/1995:00:00:09 -0400’, …, …, ‘01/Jul/1995:00:00:14 -0400’, ‘01/Jul/1995:00:00:15 -0400’, ‘01/Jul/1995:00:00:15 -0400’]**

提取 HTTP 请求方法、URI 和协议

现在让我们使用正则表达式从日志中提取 HTTP 请求方法、URI、和协议模式字段:

方法_uri_protocol_pattern \u003d r'"(\S+)\s(\S+)\s*(\S*)"'

方法_uri_protocol \u003d [re.search(method_uri_protocol_pattern, item).groups()

如果重新搜索(方法_uri_protocol_pattern,项目)

否则'不匹配'

样品中的项目_logs]

方法_url\协议

**[(‘GET’, ‘/history/apollo/’, ‘HTTP/1.0’), (‘GET’, ‘/shuttle/countdown/’, ‘HTTP/1.0’), …, …, (‘GET’, ‘/shuttle/countdown/count.gif’, ‘HTTP/1.0’), (‘GET’, ‘/images/NASA-logosmall.gif’, ‘HTTP/1.0’)]**

提取HTTP状态码

现在让我们使用正则表达式从日志中提取 HTTP 状态代码:

状态_pattern \u003d r'\s(\d{3})\s'

status \u003d [re.search(status_pattern, item).group(1) for item in sample_logs]

打印(状态)

**[‘200’, ‘200’, ‘200’, ‘304’, …, ‘200’, ‘200’]**

提取HTTP响应内容大小

现在让我们使用正则表达式从日志中提取 HTTP 响应内容大小:

内容_size_pattern \u003d r'\s(\d+)$'

content_size \u003d [re.search(content_size_pattern, item).group(1) for item in sample_logs]

打印(内容_size)

**[‘6245’, ‘3985’, ‘4085’, ‘0’, …, ‘1204’, ‘40310’, ‘786’]**

放在一起

现在让我们利用我们之前构建的所有正则表达式模式,并使用 regexp_extract(...) 方法来构建我们的 DataFrame,并将所有日志属性整齐地提取到它们自己的单独列中。

从 pyspark.sql.functions 导入正则表达式_extract

logs_df \u003d base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),

regexp_extract('value', ts_pattern, 1).alias('timestamp'),

regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),

regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),

regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),

regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),

regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))

日志_df.show(10,截断\u003d真)

打印((日志_df.count(),长度(日志_df.columns)))

使用 regexp_extract(...) 提取的日志数据帧

图片来源:

使用 regexp_extract(...) 提取的日志数据帧

查找缺失值

缺失值和空值是数据分析和机器学习的祸根。让我们看看我们的数据解析和提取逻辑工作得如何。首先,让我们验证原始 DataFrame 中没有空行:

(基础_df

.filter(base_df['值']

。一片空白())

。数数())

祖兹 100227

都好!现在,如果我们的数据解析和提取工作正常,我们就不应该有任何具有潜在空值的行。让我们尝试对其进行测试:

坏\行\df \u003d logo\df.filter(logs\df['host'].isNull()|

日志_df['时间戳'].isNull() |

日志_df['方法'].isNull() |

日志_df['端点'].isNull() |

日志_df['状态'].isNull() |

日志_df['content_size'].isNull()|

日志_df['protocol'].isNull())

坏_rows_df.count()

**33905**

哎哟!看起来我们的数据中有超过 33K 的缺失值!我们能处理这个吗?

请记住,这不是一个常规的 pandas (link) DataFrame,您可以直接查询并获取哪些列具有 null。我们所谓的_大数据集_驻留在磁盘上,可能存在于火花集群的多个节点中。那么我们如何找出哪些列有潜在的空值呢?

查找空计数

我们通常可以使用以下技术来找出哪些列具有空值。

注意: 此方法改编自[上StackOverflow的优秀答案](http://stackoverflow.com/a/33901312)。

从 pyspark.sql.functions 导入 col

从 pyspark.sql.functions 导入 sum 作为 spark_sum

def count_null(col_name):

返回 spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

# 建立一个列表达式列表,每列一个。

exprs \u003d [count\null(col\name) 用于 logs\df.columns] 中的列名

# 运行聚合。 *exprs 将表达式列表转换为

# 可变函数参数。

日志_df.agg(*exprs).show()

检查哪些列有空值。

图片来源:

使用 count_null() 检查哪些列具有空值

好吧,看起来我们在 status 列中缺少一个值,而其他所有值都在 content_size 列中。让我们看看我们是否能找出问题所在!

处理 HTTP 状态中的空值

我们对 status 列的原始解析正则表达式是:

正则表达式_extract('value', r'\s(\d{3})\s', 1).cast('integer')

.else('状态')

是不是有更多的数字使我们的正则表达式出错了?还是数据点本身不好?让我们来了解一下。

注意:在下面的表达式中,波浪号(**~**)表示“不”。

null_status_df \u003d base_df.filter(~base_df['value'].rlike(r'\s(\d{3})\s'))

空_status_df.count()

**1**

让我们看看这个糟糕的记录是什么样的:

null_status_df.show(truncate=False)

缺少信息的不良记录。

图片来源:

通过 null_status_df 丢失信息的不良记录

看起来像是有很多缺失信息的记录。让我们通过我们的日志数据解析管道传递它:

bad_status_df \u003d null_status_df.select(regexp_extract('value', host_pattern, 1).alias('host'),

regexp_extract('value', ts_pattern, 1).alias('timestamp'),

regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),

regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),

regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),

regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),

regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'))

坏\状态\df.show(截断\u003d假)

不包含任何信息和两个空条目的完整错误日志记录。

图片来源:

不包含任何信息和两个空条目的完整错误日志记录。

看起来记录本身是不完整的记录,没有有用的信息,最好的选择是删除此记录,如下所示:

logs_df \u003d logs_df[logs_df['status'].isNotNull()]

exprs \u003d [count\null(col\name) 用于 logs\df.columns] 中的列名

日志_df.agg(*exprs).show()

被丢弃的记录。

图片来源:

掉落的记录。

处理 HTTP 内容大小中的空值

根据我们之前的正则表达式,我们对 content_size 列的原始解析正则表达式是:

正则表达式_extract('value', r'\s(\d+)$', 1).cast('integer')

.alias('内容_size')

我们的原始数据集本身会丢失数据吗?让我们来了解一下。我们首先在基础 DataFrame 中找到可能缺少内容大小的记录:

null_content_size_df \u003d base_df.filter(~base_df['value'].rlike(r'\s\d+$'))

空_content_size_df.count()

**33905**

该数字似乎与我们处理的 DataFrame 中缺失的内容大小值的数量相匹配。让我们看一下我们的数据框中缺少内容大小的前 10 条记录:

祖兹 100233

缺少内容大小的前 10 个数据框记录。

图片来源:

缺少内容大小的前 10 个数据框记录。

很明显,错误的原始数据记录对应于错误响应,其中没有发回任何内容,服务器为**content_size**字段发出 -。由于我们不想从分析中丢弃这些行,让我们用 0 估算或填充它们。

修复空内容_size的行

最简单的解决方案是将**logs_df**中的空值替换为 0,就像我们之前讨论的那样。 Spark DataFrame API 提供了一组专门为处理空值而设计的函数和字段,其中包括:

  • zwz100177 **fillna** zwz100178 zwz100176 zwz100180 **()** zwz100181 zwz100179 ),用指定的非空值填充空值。

  • zwz100183 **na** zwz100184 zwz100182,它返回一个 zwz100186 DataFrameNaFunctions zwz100187 zwz100185 对象,其中包含许多用于操作空列的函数。

有几种方法可以调用此函数。最简单的方法就是用已知值替换所有空列。但是,为了安全起见,最好传递包含**(column_name, value)**映射的 Python 字典。这就是我们要做的。文档中的一个示例如下所示:

>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()

+----+------+-------+

|年龄|身高|姓名|

+----+------+-------+

| 10| 80|爱丽丝|

| 5|空|鲍勃|

| 50|空|汤姆|

| 50|空|未知|

+----+------+-------+

现在我们使用此函数将**content_size**字段中的所有缺失值填充为 0:

日志_df \u003d 日志_df.na.fill({'content_size': 0})

exprs \u003d [count\null(col\name) 用于 logs\df.columns] 中的列名

日志_df.agg(*exprs).show()

空值现在被零替换。

图片来源:

空值现在被零替换。

看那个,没有缺失值!

处理时间字段(时间戳)

现在我们有了一个干净的、已解析的 DataFrame,我们必须将时间戳字段解析为实际的时间戳。通用日志格式时间有点不标准。用户定义函数 (UDF)是解析它的最直接方法:

从 pyspark.sql.functions 导入 udf

月_map \u003d {

'John':1,'Pheb':2,'Mar':3,'Ape':4,'May':5,'June':6,'Jul':7,

“八月”:8,“九月”:9,“十月”:10,“十一月”:11,“十二月”:12

}

def 解析_clf_time(文本):

""" 将 Common Log 时间格式转换为 Python 日期时间对象

参数:

text (str): Apache 时间格式的日期和时间 [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]

回报:

适合传递给 CAST('timestamp') 的字符串

"""

# 注意:我们在这里忽略了时区,可能需要根据您要解决的问题进行处理

返回“{0:04d}-{1:02d}-{2:02d} {3:02d}:{4:02d}:{5:02d}”.format(

整数(文本[7:11]),

月_map[文本[3:6]],

整数(文本[0:2]),

整数(文本[12:14]),

整数(文本[15:17]),

整数(文本[18:20])

)

现在让我们使用这个函数来解析 DataFrame 的**time** 列:

udf_parse_time \u003d udf(parse_clf_time)

logs_df \u003d (logs_df.select('*', udf_parse_time(logs_df['timestamp'])

.cast('时间戳')

.alias('时间'))

.drop('时间戳')

日志_df.show(10,截断\u003d真)

使用用户定义函数 (UDF) 解析的时间戳。

图片来源:

使用用户定义函数 (UDF) 解析的时间戳。

事情似乎看起来不错!让我们通过检查 DataFrame 的模式来验证这一点:

logs_df.printSchema()

**root |-- host: string (nullable = true) |-- method: string (nullable = true) |-- endpoint: string (nullable = true) |-- protocol: string (nullable = true) |-- status: integer (nullable = true) |-- content_size: integer (nullable = false)** **|-- time: timestamp (nullable = true)**

现在让我们缓存 logs_df,因为我们将在本系列的第二部分的数据分析部分中广泛使用它。

logs_df.cache()

结论

获取、处理和整理数据是任何端到端数据科学或分析用例中最重要的步骤。在大规模处理半结构化或非结构化数据时,事情开始变得更加困难。本案例研究为您提供了一个逐步实践的方法,以利用 Python 和 Spark 等开源工具和框架的强大功能来大规模处理和处理半结构化的 NASA 日志数据。一旦我们准备好一个干净的数据集,我们终于可以开始使用它来获得有关 NASA 服务器的有用信息。点击进入本系列的第二篇文章,获取动手教程使用 Python 和 Apache Spark 分析和可视化 NASA 日志数据。

zoz100036 * *

本文最初出现在 Medium 的Towards Data Science频道,经许可转载。

zoz100037 * *

Logo

学AI,认准AI Studio!GPU算力,限时免费领,邀请好友解锁更多惊喜福利 >>>

更多推荐