ELK日志分析系统

前言

随着业务量的增长,每天业务服务器将会产生上亿条的日志,单个日志文件达几个GB,这时我们发现用Linux自带工具,cat grep awk 分析越来越力不从心了,而且除了服务器日志,还有程序报错日志,分布在不同的服务器,查阅繁琐。

一、ELK简介

ELK是三个软件的统称,即Elasticsearch、Logstash和Kibana三个开源软件的缩写。这三款软件都是开源软件,通常配合使用,并且都先后归于Elastic.co企业名下,故被简称为ELK协议栈。ELK主要用于部署在企业架构中,收集多台设备上多个服务的日志信息,并将其统一整合后提供给用户。它可以从任何来源、任何格式进行日志搜索、分析与可视化展示。

ELK官网网址如下:https://www.elastic.co/cn/。

(1)ELK日志分析系统组成

在ELK架构中,Elasticsearch、Logstash和Kibana三款软件作用如下:

  • elasticsearch (es) :通过搭建群集;存储日志数据,索引日志数据
  • logstash :收集日志,收集到了后给es存储
  • kibana :视图形式展现日志信息,更加人性化

① Elasticsearch(es)

Elasticsearch是一个高度可扩展的全文搜索和分析引擎,基于Apache Lucence(事实上,Lucence也是百度所采用的搜索引擎)构建,能够对大容量的数据进行接近实时的存储、搜索和分析操作。

②Logstash

Logstash是一个数据收集引擎,它可以动态的从各种数据源搜集数据,并对数据进行过滤、分析和统一格式等操作,并将输出结果存储到指定位置上。Logstash支持普通的日志文件和自定义Json格式的日志解析。

③Kibana

Kibana是一个数据分析和可视化平台,通常与Elasticsearch配合使用,用于对其中的数据进行搜索、分析,并且以统计图标的形式展示。

(2)日志处理步骤

  1. 将日志进行集中化管理
  2. 将日志格式化(Logstash)并输出到Elasticsearch
  3. 对格式化后的数据进行索引和存储(Elasticsearch)
  4. 前端数据的展示(Kibana)

二、Elasticsearch

(1)Elasticsearch概述

提供了一个分布式多用户能力的全文搜索引擎

(2)Elasticsearch核心概念

①接近实时(NRT)

elasticsearch是一个接近实时的搜索平台,这意味着,从索引一个文档直到这个文档能够被搜索到有一个轻微的延迟(通常是1秒)

②cluster集群,ES是一个分布式的系统

  1. 一个集群就是由一个或多个节点组织在一起,它们共同持有你整个的数据,并一起提供索引和搜索功能。其中一个节点为主节点,这个主节点是可以通过选举产生的,并提供跨节点的联合索引和搜索的功能。集群有一个唯一性标示的名字,默认是elasticsearch,集群名字很重要,每个节点是基于集群名字加入到其集群中的。因此,确保在不同环境中使用不同的集群名字。
  2. —个集群可以只有一个节点。强烈建议在配置elasticsearch时,配置成集群模式。es具有集群机制,节点通过集群名称加入到集群中,同时在集群中的节点会有一个自己的唯一身份标识(自己的名称)

③Node节点,就是集群中的一台服务器

  1. 节点就是一台单一的服务器,是集群的一部分,存储数据并参与集群的索引和搜索功能。像集群一样,节点也是通过名字来标识,默认是在节点启动时随机分配的字符名。当然,你可以自己定义。该名字也很重要,在集群中用于识别服务器对应的节点。
  2. 节点可以通过指定集群名字来加入到集群中。默认情况,每个节点被设置成加入到elasticsearch集群。如果启动了多个节点,假设能自动发现对方,他们将会自动组建一个名为elasticsearch的集群。

④index索引

一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的索引、一个产品目录的索引、还有一个订单数据的索引。一个索引用一个名字来标识(必须全部是小写字母组合),并且当我们要对相应的索引中的文档进行索引、收缩、更新和删除的时候,都要用到这个名字。在一个集群中,可以定义多个索引。(索引相对于关系型数据库的库)

类型相对于关系型数据库的表 ——》索引(库)-》类型(表)-》文档(记录)

⑤类型(type)

类型(type)在一个索引中,你可以定义一种或多种类型。一个类型是你的索引的一个逻辑上的分类分区,其寓意完全由你来定义。通常,会为具有一组共同字段的文档定义一个类型。比如:我们假设运营一个博客平台并且将所有的数据存储到一个索引中,在这个索引中,你可以为用户数据定义一个类型,为博客数据定义一个类型,也可以为评论数据定义另一个类型。(类型相对于关系型数据库的表)

⑥文档(document)

文档就是最终的数据了,可以认为一个文档就是一条记录。是ES里面最小的数据单元,就好比表里面的一条数据

⑦ 分片和副本(shards & replicas)

在实际情况下,索引存储的数据可能超过单个节点的硬件限制。如一个10亿文档需1TB空间可能不适合存储在单个节点的磁盘上或者从单个节点搜索请求太慢了。为了解决这个问题,elasticsearch提供将索引分成多个分片的功能。当在创建索引时,可以定义想要分片的数量。每一个分片就是一个全功能的独立的索引,可以位于集群中任何节点上。

三、Logstash

(1)Logstash简介

  1. —款强大的数据处理工具
  2. 可实现数据传输、格式处理、格式化输出
  3. 数据输入(从业务输入)、数据加工(如过滤、改写等)以及数据输出(输出到Elasticsearch群集)

(2)Logstash的主要组件

  1. shipper:日志收集者,负责监控本地日志文件的变化,及时把日志文件的最新内容收集起来。通常,远程代理端(agent)只需要运行这个组件即可
  2. indexer:日志存储者,负责接收日志并写入到本地文件
  3. broker:日志hub,负责连接多个shipper和多个indexer
  4. search and storage:允许对事件进行搜索和存储
  5. web interface:基于Web的展示界面

以上组件在Logstash架构中可以独立部署,因此提供了很好的集群扩展性

四、Kibana

(1)Kibana简介

  1. 一个针对Elasticsearch的开源分析及可视化平台;
  2. 搜索、查看存储在Elasticsearch索引中的数据;
  3. 通过各种图标进行高级数据分析及展示;
  4. 让海量数据更容易理解;
  5. 操作简单,基于浏览器地用户界面就可以快速创建仪表板(dashboard)实时显示Elasticsearch查询动态;
  6. 设置安装Kibana非常简单,无需编写代码,几分钟内就可以完成Kibana安装并启动Elasticsearch监测。

(2)Kibana主要功能

(1)Elasticsearch无缝之集成。Kibana架构为Elasticsearch定制,可以将任何结构化和非结构化数据加入Elasticsearch索引。Kibana还充分利用了Elasticsearch强大的搜索和分析功能。
(2)整合数据:Kibana能够更好地处理海量数据,并据此创建柱形图、折线图、散点图、直方图、饼图和地图。
(3)复杂数据分析:Kibana提升了Elasticsearch分析能力,能够更加智能地分析数据,执行数学转换并且根据要求对数据切割分块。
(4)让更多团队成员受益:强大的数据库可视化接口让各业务岗位都能够从数据集合受益。
(5)接口灵活,分享更容易:使用Kibana可以更加方便地创建、保存、分享数据,并将可视化数据快速交流。
(6)配置简单:Kibana的配置和启用非常简单,用户体验非常友好。Kibana自带Web服务器,可以快速启动运行。
(7)可视化多数据源:Kibana可以非常方便地把来自Logstash、ES-Hadoop、Beats或第三方技术的数据整合到Elasticsearch,支持的第三方技术包括Apache Flume、Fluentd等。
(8)简单数据导出:Kibana可以方便地导出感兴趣的数据,与其它数据集合并融合后快速建模分析,发现新结果。

五、ELK架构

在这里插入图片描述
如上图所示,Logstash安装在各个设备上,用于收集日志信息,收集到的日志信息统一汇总到Elasticsearch上,然后由Kibana负责web端的展示。其中,如果终端设备过多,会导致Elasticsearch过载的现象,此时,我们可以采用一台Redis设备作为消息队列,以暂时缓存数据,避免Elasticsearch压力突发。

六、ELK优点

  1. 处理方式灵活。 Elasticsearch是全文索引,具有强大的搜索能力。
  2. 配置相对简单。 Kibana的配置非常简单,Elasticsearch则全部使用Json接口,配置也不复杂,Logstash的配置使用模块的方式,配置也相对简单。
  3. 检索性能高。 ELK架构通常可以达到百亿级数据的查询秒级响应。
  4. 集群线性扩展。 Elasticsearch本身没有单点的概念,自动默认集群模式,Elasticsearch和Logstash都可以
  5. 灵活扩展。
  6. 页面美观。 Kibana的前端设计美观,且操作简单。

七、Elasticsearch集群部署

(1)环境准备

systemctl stop firewalld.service
setenforce 0
cd /opt/
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.4-x86_64.rpm

在这里插入图片描述
在这里插入图片描述

(2)安装部署Elsaticsearch软件

yum install elasticsearch-7.17.4-x86_64.rpm -y

在这里插入图片描述

(3)配置Elasticsearch配置文件<各节点配置差不多,注意节点名>

vim /etc/elasticsearch/elasticsearch.yml

--17--取消注释,指定集群名字
cluster.name: youzi
--23--取消注释,指定节点名字:Node1节点为node1,Node2节点为node2
node.name: node1
--33--取消注释,指定数据存放路径
path.data: /data/elk_data
--37--取消注释,指定日志存放路径
path.logs: /var/log/elasticsearch/
--43--取消注释,改为在启动的时候不锁定内存
bootstrap.memory_lock: false
--55--取消注释,设置监听地址,0.0.0.0代表所有地址
network.host: 0.0.0.0
--59--取消注释,ES 服务的默认监听端口为9200
http.port: 9200
--68--取消注释,集群发现通过单播实现,指定要发现的节点 node1、node2
discovery.zen.ping.unicast.hosts: ["IP地址或主机名", "IP地址或主机名","IP地址或主机名"]
--74--取消注释,节点可选为主节点的节点名
cluster.initial_master_nodes: ["node1","node2"]

在这里插入图片描述

(4)查看节点信息和集群信息

在这里插入图片描述
在这里插入图片描述

八、安装Elasticsearch-head插件

Elasticsearch 在 5.0 版本后,Elasticsearch-head 插件需要作为独立服务进行安装,需要使用npm工具(NodeJS的包管理工具)安装。
安装 Elasticsearch-head 需要提前安装好依赖软件 node 和 phantomjs。

node:是一个基于 Chrome V8 引擎的 JavaScript 运行环境。
phantomjs:是一个基于 webkit 的JavaScriptAPI,可以理解为一个隐形的浏览器,任何基于 webkit 浏览器做的事情,它都可以做到。

(1)前置包下载

地址: https://nodejs.org/en/download/

在这里插入图片描述
在这里插入图片描述

(2)解包并安装

cd /opt/
tar -xf node-v16.16.0-linux-x64.tar.xz
mv node-v16.16.0-linux-x64 /usr/local/node

vim /etc/profile

export NODE_HOME=/usr/local/node
export PATH=$NODE_HOME/bin:$PATH

source /etc/profile
node -v
npm -v

在这里插入图片描述

(3)下载head插件

插件下载地址:https://github.com/mobz/elasticsearch-head

在这里插入图片描述

(4)解包

unzip elasticsearch-head-master.zip
mv elasticsearch-head-master /usr/local/elasticsearch-head

在这里插入图片描述

(5)安装启动服务并配置es

cd /usr/local/elasticsearch-head/
npm install
npm run start

vim /etc/elasticsearch/elasticsearch.yml

http.cors.enabled: true
http.cors.allow-origin: "*"

systemctl restart elasticsearch

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
验证结果
在这里插入图片描述

九、Logstash部署

Logstash 一般部署在需要监控其日志的服务器。在本案例中,Logstash 部署在 Apache 服务器上,用于收集 Apache 服务器的日志信息并发送到 Elasticsearch。

(1)下载安装包

wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.4-x86_64.rpm

在这里插入图片描述

(2)安装logstash

yum install logstash-7.17.4-x86_64.rpm -y
systemctl start logstash.service
systemctl enable logstash.service
ln -s /usr/share/logstash/bin/logstash /usr/local/bin/

在这里插入图片描述
在这里插入图片描述

(3)测试Logstash

①Logstash命令常用选项

-f:通过这个选项可以指定 Logstash 的配置文件,根据配置文件配置 Logstash 的输入和输出流。
-e:从命令行中获取,输入、输出后面跟着字符串,该字符串可以被当作 Logstash 的配置(如果是空,则默认使用 stdin 作为输入,stdout 作为输出)。
-t:测试配置文件是否正确,然后退出。

②定义输入输出流

输入采用标准输入,输出采用标准输出(类似管道)
指定数据输入端口,默认为9600~9700

logstash -e ‘input { stdin{} } output { stdout{} }’

十、kibana部署

(1)安装kibana

wget https://artifacts.elastic.co/downloads/kibana/kibana-7.17.4-x86_64.rpm
yum install kibana-7.17.4-x86_64.rpm -y

在这里插入图片描述

(2)设置kibana的主配置文件

vim /etc/kibana/kibana.yml

--2--取消注释,Kiabana 服务的默认监听端口为5601
server.port: 5601
--7--取消注释,设置 Kiabana 的监听地址,0.0.0.0代表所有地址
server.host: "0.0.0.0"
--32--取消注释,设置和 Elasticsearch 建立连接的地址和端口
elasticsearch.url: "http://192.168.80.10:9200" 
--36--取消注释,设置在 elasticsearch 中添加.kibana索引
kibana.index: ".kibana"

在这里插入图片描述

(3)启动kibana服务

systemctl start kibana.service
systemctl enable kibana.service
netstat -natp | grep 5601

在这里插入图片描述

(4)验证一下

在这里插入图片描述

十一、Filebeat原理

(1)Filebeat简介

Filebeat由两个主要组成部分组成:prospector(探勘者)和 harvesters(矿车)。这些组件一起工作来读取文件并将事件数据发送到指定的output。

  • prospector: 负责找到所有需要进行读取的数据源
  • harvesters:负责读取单个文件的内容,并将内容发送到output中,负责文件的打开和关闭。

(2)Filebeat工作方式

启动Filebeat时,它将启动一个或多个输入,这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志,Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容,并将新日志数据发送到libbeat,libbeat将聚集事件,并将聚集的数据发送到为Filebeat配置的输出。

(3)Filebeat工作原理

Filebeat可以保持每个文件的状态,并且频繁地把文件状态从注册表里更新到磁盘。这里所说的文件状态是用来记录上一次Harvster读取文件时读取到的位置,以保证能把全部的日志数据都读取出来,然后发送给output。如果在某一时刻,作为output的ElasticSearch或者Logstash变成了不可用,Filebeat将会把最后的文件读取位置保存下来,直到output重新可用的时候,快速地恢复文件数据的读取。在Filebaet运行过程中,每个Prospector的状态信息都会保存在内存里。如果Filebeat出行了重启,完成重启之后,会从注册表文件里恢复重启之前的状态信息,让FIlebeat继续从之前已知的位置开始进行数据读取。

(4)Filebeat用途

  • 适用于集群环境下,服务多,且部署在不同机器

①为什么要用filebeat来收集日志?为什么不直接用logstash收集日志?

因为logstash是jvm跑的,资源消耗比较大,启动一个logstash就需要消耗500M左右的内存(这就是为什么logstash启动特别慢的原因),而filebeat只需要10来M内存资源。常用的ELK日志采集方案中,大部分的做法就是将所有节点的日志内容通过filebeat发送到logstash,logstash根据配置文件进行过滤。然后将过滤之后的文件输送到elasticsearch中,通过kibana去展示。

②filebeat结合logstash带来好处

  1. 通过Logstash,具有基于磁盘的自适应缓冲系统,该系统将吸收传入的吞吐量,从而减轻Elasticsearch持续写入数据的压力
  2. 从其他数据源(例如数据库,s3对象存储或消息传递队列)中提取
  3. 将数据发送到多个目的地,例如S3,HDFS(Hadoop分布式文件系统)或写入文件
  4. 使用条件数据流逻辑组成更复杂的处理管道

③Filebeat和Logstash的区别

LogstashFilebeat
内存
CPU
插件
功能从多种输入端采集并实时解析和转换数据并输出到多种输出端传输
轻重相对较重轻量级二进制文件
过滤能力强大的过滤能力有过滤能力但是弱
进程一台服务器只允许一个logstash进程,挂掉之后需要手动拉起
原理Logstash使用管道的方式进行日志的搜集和输出,分为输入input处理filter(不是必须的)输出output,每个阶段都有不同的替代方式开启进程后会启动一个或多个探测器(prospectors)去检测指定的日志目录或文件,对于探测器找出的每一个日志文件,filebeat启动收割进程(harvester) ,每一个收割进程读取一个日志文件的新内容,并发送这些新的日志数据到处理程序(spooler),处理程序会集合这些事件,最后filebeat会发送集合的数据到你指定的地点
集群单节点单节点
输出到多个接收方支持6.0之前支持
二次开发或者扩展开发

十二、Kafka原理

(1)kafka简介

Kafka是一种消息队列,主要用来处理大量数据状态下的消息队列,一般用来做日志的处理。既然是消息队列,那么Kafka也就拥有消息队列的相应的特性了。
可以在系统中起到“削峰填谷”的作用,也可以用于异构、分布式系统中海量数据的异步化处理。

①为什么需要消息队列

主要原因是由于在高并发环境下,同步请求来不及处理,请求往往会发生阻塞。比如大量的请求并发访问数据库,导致行锁表锁,最后请求线程会堆积过多,从而触发 too many connection 错误,引发雪崩效应。
我们使用消息队列,通过异步处理请求,从而缓解系统的压力。消息队列常应用于异步处理,流量削峰,应用解耦,消息通讯等场景。

当前比较常见的 MQ 中间件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。

②消息队列的好处

解耦合
耦合的状态表示当你实现某个功能的时候,是直接接入当前接口,而利用消息队列,可以将相应的消息发送到消息队列,这样的话,如果接口出了问题,将不会影响到当前的功能。

异步处理
异步处理替代了之前的同步处理,异步处理不需要让流程走完就返回结果,可以将消息发送到消息队列中,然后返回结果,剩下让其他业务处理接口从消息队列中拉取消费处理即可。

流量削峰
高流量的时候,使用消息队列作为中间件可以将流量的高峰保存在消息队列中,从而防止了系统的高请求,减轻服务器的请求处理压力。

③kafka的特性

高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
可扩展性:kafka集群支持热扩展
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
高并发:支持数千个客户端同时读写

④kafka作为存储系统

  1. 任何允许发布与消费它们分离的消息的消息队列实际上充当了正在进行的消息的存储系统。
  2. Kafka的不同之处在于它是一个非常好的存储系统。
  3. 写入Kafka的数据将写入磁盘并进行复制以实现容错。Kafka允许生产者等待确认,以便在完全复制之前写入不被认为是完整的,并且即使写入的服务器失败也保证写入仍然存在。
  4. 磁盘结构Kafka很好地使用了规模 - 无论服务器上有50 KB还是50 TB的持久数据,Kafka都会执行相同的操作。

(2)kafka消费模式

Kafka的消费模式主要有两种:

  1. 一种是一对一的消费,也即点对点的通信,即一个发送一个接收。
  2. 第二种为一对多的消费,即一个消息发送到消息队列,消费者根据消息队列的订阅拉取消息消费。

①一对一

在这里插入图片描述
消息生产者发布消息到Queue队列中,通知消费者从队列中拉取消息进行消费。消息被消费之后则删除,Queue支持多个消费者,但对于一条消息而言,只有一个消费者可以消费,即一条消息只能被一个消费者消费。

②一对多

在这里插入图片描述
这种模式也称为发布/订阅模式,即利用Topic存储消息,消息生产者将消息发布到Topic中,同时有多个消费者订阅此topic,消费者可以从中消费消息,注意发布到Topic中的消息会被多个消费者消费,消费者消费数据之后,数据不会被清除,Kafka会默认保留一段时间,然后再删除。

(3)kafka的基础架构

Kafka像其他Mq一样,也有自己的基础架构,主要存在生产者Producer、Kafka集群Broker、消费者Consumer、注册消息Zookeeper

①kafka架构

在这里插入图片描述

  1. Producer:Producer即生产者,消息的产生者,是消息的入口。
  2. Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号.
  3. Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
  4. Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
  5. Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
  6. Message:每一条发送的消息主体。
  7. Consumer:消费者,即消息的消费方,是消息的出口。
  8. Consumer Group:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
  9. Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
  10. Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
  11. Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader。

②工作流程

producer就是生产者,是数据的入口。Producer在写入数据的时候永远的找leader,不会直接将数据写入follower

在这里插入图片描述

  1. 先从集群获取分区的leader
  2. Producter将消息发送给leader
  3. Leader将消息写入本地文件
  4. Followers从leader同步消息
  5. Follower将消息写入本地后向leader发送ACK确认消息
  6. Leader收到所有副本的ACK后,向producter发送ACK

注:消息写入leader后,follower是主动的去leader进行同步的

③分区的原因

  1. 便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
  2. 可以提高并发,因为可以以Partition为单位读写了。

④分区目的

producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的。
在这里插入图片描述
数据会写入到不同的分区,分区的目的是

  1. 方便扩展:因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
  2. 提高并发:以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

(4)kafka原则

类似于负载均衡,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:

  1. partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
  2. 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
  3. 如果既没指定partition,又没有设置key,则会轮询选出一个partition。

保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all。

  1. 0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
  2. 1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
  3. all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

十三、Zookeeper简介

ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务,它提供了一项基本服务:分布式锁服务。分布式应用可以基于它实现更高级的服务,实现诸如同步服务、配置维护和集群管理或者命名的服务。

Zookeeper服务自身组成一个集群,2n+1个(奇数)服务允许n个失效,集群内一半以上机器可用,Zookeeper就可用。

假设 3台机器组成的集群,可以有允许一台失效,如果有2台失效,这个集群就不可用,1<1.5,一般的搭建zookeeper集群时,以奇数台机器来搭建。目的:是为了提高容错能允许多损失一台。

(1)zookeeper工作机制

Zookeeper从设计模式角度来理解:是一个基于观察者模式设计的分布式服务管理框架它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。也就是说 Zookeeper =文件系统+通知机制。
在这里插入图片描述

(2)zookeeper特点

在这里插入图片描述

(1)Zookeeper:一个领导者(Leader),多个跟随者(Follower)组成的集群。
(2)Zookeepe集群中只要有半数以上节点存活,Zookeeper集群就能正常服务。所以zookeeper适合安装奇数台服务器。
(3)全局数据一致:每个server保存一份相同的数据副本,client无论连接到哪个Server,数据都是一致的。
(4)更新请求顺序执行,来自同一个client的更新请求按其发送顺序依次执行,即先进先出。
(5)数据更新原子性,一次数据更新要么成功,要么失败。
(6)实时性,在一定时间范围内,client能读到最新数据。

(3)Zookeeper数据结构

ZooKeeper数据模型的结构与Linux文件系统很类似,整体上可以看作是一棵树,每个节点称做一个ZNode。每一个ZNode默认能够存储1MB的数据,每个ZNode都可以通过其路径唯一标识。

在这里插入图片描述

(4)zookeeper应用场景

提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。

①统一命名服务

  • 在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。例如:IP不容易记住,而域名容易记住。

②统一配置管理

  1. 分布式环境下,配置文件同步非常常见。一般要求一个集群中,所有节点的配置信息是一致的,比如Kafka集群。对配置文件修改后,希望能够快速同步到各个节点上。
  2. 配置管理可交由ZooKeeper实现。可将配置信息写入ZooKeeper上的一个Znode。各个客户端服务器监听这个Znode。一旦Znode中的数据被修改,ZooKeeper将通知各个客户端服务器。

③统一集群管理

  1. 分布式环境中,实时掌握每个节点的状态是必要的。可根据节点实时状态做出一些调整。
  2. ZooKeeper可以实现实时监控节点状态变化。可将节点信息写入ZooKeeper上的一个2Node。监听这个DMode可获取它的实时状态变化。

④服务器动态上下线

  • 客户端能实时洞察到服务器上下线的变化。

⑤软负载均衡

  • 在Zookeeper中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。

(5)zookeeper选举机制

①第一次启动选举机制

在这里插入图片描述

  1. 服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING;
  2. 服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的myid比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
  3. 服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader。服务器1,2更改状态为FOLLOWING,服务器3更改状态为LEADING;
  4. 服务器4启动,发起一次选举。此时服务器1,2,3已经不是LooKING状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLOWING;
  5. 服务器5启动,同4一样当小弟。

②非第一次启动选举机制

在这里插入图片描述
当ZooKeeper集群中的一台服务器出现以下两种情况之一时,就会开始进入Leader选举:

  1. 服务器初始化启动。
  2. 服务器运行期间无法和Leader保持连接。

而当一台机器进入Leader选举流程时,当前集群也可能会处于以下两种状态:

  1. **集群中本来就己经存在一个Leader。**对于已经存在Leader的情况,机器试图去选举Leader时,会被告知当前服务器的Leader信息,对于该机器来说,仅仅需要和Leader机器建立连接,并进行状态同步即可。
  2. **集群中确实不存在Leader。**假设ZooKeeper由5台服务器组成,SID分别为1、2、3、4、5,ZXID分别为8、8、8、7、,并且此时sID为3的服务器是。一时刻,3和5服务器出现故障,因此开始进行Leader选举。

选举Leader规则:

  1. EPOCH大的直接胜出
  2. EPOCH相同,事务id大的胜出
  3. 事务id相同,服务器id大的胜出

十四、安装zookeeper

(1)下载安装zookeeper

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz --no-check-certificate
tar -xf apache-zookeeper-3.7.1-bin.tar.gz
mv apache-zookeeper-3.7.1-bin /usr/local/zookeeper
cd /usr/local/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg

在这里插入图片描述
在这里插入图片描述

(2)修改配置文件

vim zoo.cfg

在这里插入图片描述
在这里插入图片描述

(3)分别给三台机器配置节点号

mkdir data logs
echo 1 > /usr/local/zookeeper/data/myid 

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

(4)启动zookeeper

cd /usr/local/zookeeper/bin
./zkServer.sh start

在这里插入图片描述

(5)开启之后,查看三个节点zookeeper状态

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

十五、安装kafka

(1)安装kafka(3台都安装)

tar zxf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka

在这里插入图片描述

(2)修改配置文件

cd /usr/local/kafka/config/
vim server.properties

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

(3)将相关命令加入到系统环境中

vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

在这里插入图片描述
在这里插入图片描述

(4)启动kafka

cd /usr/local/kafka/config/
kafka-server-start.sh -daemon server.properties
netstat -antp | grep 9092

在这里插入图片描述

(5)kafka命令行操作

创建topic

kafka-topics.sh --create --zookeeper 192.168.159.230:2181,192.168.159.231:2181,192.168.159.232:2181 --replication-factor 2 --partitions 3 --topic test

–zookeeper:定义 zookeeper 集群服务器地址,如果有多个 IP 地址使用逗号分割,一般使用一个 IP 即可
–replication-factor:定义分区副本数,1 代表单副本,建议为 2
–partitions:定义分区数
–topic:定义 topic 名称

查看当前服务器中的所有 topic

kafka-topics.sh --list --zookeeper 192.168.159.230:2181,192.168.159.231:2181,192.168.159.232:2181

查看某个 topic 的详情

kafka-topics.sh  --describe --zookeeper 192.168.159.230:2181,192.168.159.231:2181,192.168.159.232:2181

发布消息

kafka-console-producer.sh --broker-list 192.168.159.230:9092,192.168.159.231:9092,192.168.159.232:9092  --topic test

消费消息

kafka-console-consumer.sh --bootstrap-server 192.168.159.230:9092,192.168.159.231:9092,192.168.159.232:9092 --topic test --from-beginning

修改分区数

kafka-topics.sh --zookeeper 192.168.159.230:2181,192.168.159.231:2181,192.168.159.232:2181 --alter --topic test --partitions 6

删除 topic

kafka-topics.sh --delete --zookeeper 192.168.159.230:2181,192.168.159.231:2181,192.168.159.232:2181 --topic test

(6)创建topic

[root@localhost bin]# pwd
/usr/local/kafka/bin
[root@localhost bin]# kafka-topics.sh --create --zookeeper \
> 192.168.159.230:2181,192.168.159.231:2181,192.168.159.232:2181 \
> --partitions 3 \
> --replication-factor 2 \
> --topic test
Created topic test.
[root@localhost bin]# kafka-topics.sh 
--describe --zookeeper 192.168.159.230:2181

在这里插入图片描述

(7)测试topic

发布消息

kafka-console-producer.sh --broker-list 192.168.159.230:9092,192.168.159.231:9092,192.168.159.232:9092 --topic test

在这里插入图片描述

 kafka-console-producer.sh --broker-list 192.168.159.230:9092 --topic test

在这里插入图片描述

消费消息

kafka-console-consumer.sh --bootstrap-server 192.168.159.230.9092 –topic test --from-beginning

在这里插入图片描述

十六、配置数据采集层filebeat

(1)定制日志格式

vim /usr/local/nginx/nginx/nginx.conf

log_format  json '{"@timestamp":"$time_iso8601",'
                       '"@version":"1",'
                       '"client":"$remote_addr",'
                       '"url":"$uri",'
                       '"status":"$status",'
                       '"domain":"$host",'
                       '"host":"$server_addr",'
                       '"size":$body_bytes_sent,'
                       '"responsetime":$request_time,'
                       '"referer": "$http_referer",'
                       '"ua": "$http_user_agent"'
           '}';

access_log  /var/log/nginx/access.log  json;
## 没有日志目录和文件需手动创建

在这里插入图片描述

(2)下载解压安装包

 wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.4-linux-x86_64.tar.gz

在这里插入图片描述
在这里插入图片描述

(3)修改配置文件filebeat.yml

vim filebeat.yml

在这里插入图片描述
在这里插入图片描述

(4)启动filebeat

./filebeat -c filebeat.yml &

在这里插入图片描述
在这里插入图片描述

十七、所有组件部署完成之后,开始配置部署

(1)在kafka上创建一个话题nginx-es

kafka-topics.sh --create --zookeeper 192.168.159.230:2181,192.168.159.231:2181,192.168.159.232:2181 --replication-factor 1 --partitions 1 –topic nginx-es

在这里插入图片描述
在这里插入图片描述

(2)修改logstash的配置文件

input {
kafka {
    topics => "nginx-es"
    #codec => "json"
    decorate_events => true
    bootstrap_servers => "192.168.159.230:9092,192.168.159.231:9092,192.168.159.232:9092"
}
}
output {
        elasticsearch {
        hosts => ["192.168.159.240:9200"]
        index => 'nginx-%{+YYYY-MM-dd}'
}
}

在这里插入图片描述

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐