第4章-ES与消息中间件RabbitMQ

学习目标:

  • 了解Elasticsearch的特点及体系结构
  • 完成Elasticsearch安装,能够调用RestAPI完成基本增删改查操作
  • 完成Head插件安装,熟悉Head插件的基本使用方法
  • 完成IK分词器的安装,能够使用IK分词器进行分词
  • 使用SpringDataElasticsearch完成搜索微服务的开发(重点)
  • 使用logstash完成mysql与Elasticsearch的同步工作
  • 完成Elasticsearch在docker下的安装
  • 能够说出消息队列的应用场景以及RabbitMQ的主要概念
  • 完成RabbitMQ安装以及RabbitMQ三种模式的入门案例

1 ElasticSearch简介

1.1 什么是ElasticSearch

​ Elasticsearch是一个实时的分布式搜索和分析引擎。它可以帮助你用前所未有的速度去处理大规模数据。ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。

1.2 ElasticSearch特点

(1)可以作为一个大型分布式集群(数百台服务器)技术,处理PB级数据,服务大公司;也可以运行在单机上

(2)将全文检索、数据分析以及分布式技术,合并在了一起,才形成了独一无二的ES;

(3)开箱即用的,部署简单

(4)全文检索,同义词处理,相关度排名,复杂数据分析,海量数据的近实时处理

1.3 ElasticSearch体系结构

下表是Elasticsearch与MySQL数据库逻辑结构概念的对比

Elasticsearch关系型数据库Mysql
索引(index)数据库(databases)
类型(type)表(table)
文档(document)行(row)

2 走进ElasticSearch

2.1 ElasticSearch部署与启动

下载ElasticSearch 5.6.8版本

https://www.elastic.co/downloads/past-releases/elasticsearch-5-6-8

资源\微服务资源\配套软件中也提供了安装包

无需安装,解压安装包后即可使用

在命令提示符下,进入ElasticSearch安装目录下的bin目录,执行命令

elasticsearch

即可启动。

我们打开浏览器,在地址栏输入http://127.0.0.1:9200/ 即可看到输出结果

{
  "name" : "uV2glMR",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "RdV7UTQZT1-Jnka9dDPsFg",
  "version" : {
    "number" : "5.6.8",
    "build_hash" : "688ecce",
    "build_date" : "2018-02-16T16:46:30.010Z",
    "build_snapshot" : false,
    "lucene_version" : "6.6.1"
  },
  "tagline" : "You Know, for Search"
}

2.2 Postman调用RestAPI

2.2.1 新建索引

例如我们要创建一个叫articleindex的索引 ,就以put方式提交

http://127.0.0.1:9200/articleindex/

2.2.2 新建文档

新建文档:

以post方式提交 http://127.0.0.1:9200/articleindex/article

body:

{
	"title":"SpringBoot2.0",
	"content":"发布啦"
}

返回结果如下:

{
    "_index": "articleindex",
    "_type": "article",
    "_id": "AWPKsdh0FdLZnId5S_F9",
    "_version": 1,
    "result": "created",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "created": true
}

_id是由系统自动生成的。 为了方便之后的演示,我们再次录入几条测试数据。

2.2.3 查询全部文档

查询某索引某类型的全部数据,以get方式请求

http://127.0.0.1:9200/articleindex/article/_search 返回结果如下:

{
    "took": 5,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 2,
        "max_score": 1,
        "hits": [
            {
                "_index": "articleindex",
                "_type": "article",
                "_id": "AWPKrI4pFdLZnId5S_F7",
                "_score": 1,
                "_source": {
                    "title": "SpringBoot2.0",
                    "content": "发布啦"
                }
            },
            {
                "_index": "articleindex",
                "_type": "article",
                "_id": "AWPKsdh0FdLZnId5S_F9",
                "_score": 1,
                "_source": {
                    "title": "elasticsearch入门",
                    "content": "零基础入门"
                }
            }
        ]
    }
}

2.2.4 修改文档

以put形式提交以下地址:

http://127.0.0.1:9200/articleindex/article/AWPKrI4pFdLZnId5S_F7

body:

{
	"title":"SpringBoot2.0正式版",
	"content":"发布了吗"
}

返回结果:

{
    "_index": "articleindex",
    "_type": "article",
    "_id": "AWPKsdh0FdLZnId5S_F9",
    "_version": 2,
    "result": "updated",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "created": false
}

如果我们在地址中的ID不存在,则会创建新文档

以put形式提交以下地址:

http://127.0.0.1:9200/articleindex/article/1

body:

{
	"title":"十次方课程好给力",
	"content":"知识点很多"
}

返回信息:

{
    "_index": "articleindex",
    "_type": "article",
    "_id": "1",
    "_version": 1,
    "result": "created",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "created": true
}

再次查询,看是否有新增的这条文档

2.2.5 按ID查询文档

GET方式请求

http://127.0.0.1:9200/articleindex/article/1

2.2.6 基本匹配查询

根据某列进行查询 get方式提交下列地址:

http://127.0.0.1:9200/articleindex/article/_search?q=title:十次方课程好给力

以上为按标题查询,返回结果如下:

{
    "took": 10,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 1,
        "max_score": 2.0649285,
        "hits": [
            {
                "_index": "articleindex",
                "_type": "article",
                "_id": "1",
                "_score": 2.0649285,
                "_source": {
                    "title": "十次方课程好给力",
                    "content": "知识点很多"
                }
            }
        ]
    }
}

2.2.7 模糊查询

我们可以用*代表任意字符:

http://127.0.0.1:9200/articleindex/article/_search?q=title:*s*

2.2.8 删除文档

根据ID删除文档,删除ID为1的文档 DELETE方式提交

http://127.0.0.1:9200/articleindex/article/1

返回结果如下:

{
    "found": true,
    "_index": "articleindex",
    "_type": "article",
    "_id": "1",
    "_version": 2,
    "result": "deleted",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    }
}

再次查看全部是否还存在此记录

3 Head插件的安装与使用

3.1 Head插件安装

如果都是通过rest请求的方式使用Elasticsearch,未免太过麻烦,而且也不够人性化。我们一般都会使用图形化界面来实现Elasticsearch的日常管理,最常用的就是Head插件

步骤1:

下载head插件:https://github.com/mobz/elasticsearch-head

配套资料中已提供。 elasticsearch-head-master.zip

步骤2:

解压到任意目录,但是要和elasticsearch的安装目录区别开。

步骤3:

安装node js ,安装cnpm

npm install -g cnpm --registry=https://registry.npm.taobao.org

步骤4:

将grunt安装为全局命令 。Grunt是基于Node.js的项目构建工具。它可以自动运行你所设定的任务

cnpm install -g grunt-cli

步骤5:安装依赖

cnpm install

步骤6:

进入head目录启动head,在命令提示符下输入命令

grunt server

步骤7:

打开浏览器,输入 http://localhost:9100

步骤8:

点击连接按钮没有任何相应,按F12发现有如下错误

No ‘Access-Control-Allow-Origin’ header is present on the requested resource

这个错误是由于elasticsearch默认不允许跨域调用,而elasticsearch-head是属于前端工程,所以报错。

我们这时需要修改elasticsearch的配置,让其允许跨域访问。

修改elasticsearch配置文件:elasticsearch.yml,增加以下两句命令:

http.cors.enabled: true
http.cors.allow-origin: "*"

此步为允许elasticsearch跨域访问 点击连接即可看到相关信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YQCuaVUW-1591408722024)(image/4_1.png)]

3.2 Head插件操作

3.2.1 新建索引

选择“索引”选项卡,点击“新建索引”按钮

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-f3dYN2ub-1591408722027)(image\4_2.png)]

输入索引名称点击OK

3.2.2 新建或修改文档

在复合查询中提交地址,输入内容,提交方式为PUT

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kOdQiFxI-1591408722029)(image\4_3.png)]

点击数据浏览 ,点击要查询的索引名称,右侧窗格中显示文档信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UWSlXBnL-1591408722031)(image/4_5.png)]

点击文档信息:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0g0PpTtI-1591408722033)(image/4_6.png)]

我们再次回到刚才的界面

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CnCe3PTI-1591408722034)(image/4_7.png)]

修改数据后重新提交请求 , 此时因为ID已经存在,所以执行的是修改操作。

重新查询此记录,发现版本为2 。也就是说每次修改后版本都会增加1.

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o7hOIX7L-1591408722035)(image/4_8.png)]

3.2.3 搜索文档

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zC5yWsVD-1591408722036)(image\4_9.png)]

3.2.4 删除文档

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FOTowntl-1591408722037)(image\4_10.png)]

4 IK分词器

4.1什么是IK分词器

我们在浏览器地址栏输入http://127.0.0.1:9200/_analyze?analyzer=chinese&pretty=true&text=我是程序员,浏览器显示效果如下

{
  "tokens" : [
    {
      "token" : "我",
      "start_offset" : 0,
      "end_offset" : 1,
      "type" : "<IDEOGRAPHIC>",
      "position" : 0
    },
    {
      "token" : "是",
      "start_offset" : 1,
      "end_offset" : 2,
      "type" : "<IDEOGRAPHIC>",
      "position" : 1
    },
    {
      "token" : "程",
      "start_offset" : 2,
      "end_offset" : 3,
      "type" : "<IDEOGRAPHIC>",
      "position" : 2
    },
    {
      "token" : "序",
      "start_offset" : 3,
      "end_offset" : 4,
      "type" : "<IDEOGRAPHIC>",
      "position" : 3
    },
    {
      "token" : "员",
      "start_offset" : 4,
      "end_offset" : 5,
      "type" : "<IDEOGRAPHIC>",
      "position" : 4
    }
  ]
}

默认的中文分词是将每个字看成一个词,这显然是不符合要求的,所以我们需要安装中文分词器来解决这个问题。

IK分词是一款国人开发的相对简单的中文分词器。虽然开发者自2012年之后就不在维护了,但在工程应用中IK算是比较流行的一款!我们今天就介绍一下IK中文分词器的使用。

4.2 IK分词器安装

下载地址:https://github.com/medcl/elasticsearch-analysis-ik/releases   下载5.6.8版本 课程配套资源也提供了: 资源\配套软件\elasticsearch\elasticsearch-analysis-ik-5.6.8.zip

(1)先将其解压,将解压后的elasticsearch文件夹重命名文件夹为ik

(2)将ik文件夹拷贝到elasticsearch/plugins 目录下。

(3)重新启动,即可加载IK分词器

4.3 IK分词器测试

IK提供了两个分词算法ik_smart 和 ik_max_word

其中 ik_smart 为最少切分,ik_max_word为最细粒度划分

我们分别来试一下

(1)最小切分:在浏览器地址栏输入地址

http://127.0.0.1:9200/_analyze?analyzer=ik_smart&pretty=true&text=我是程序员

输出的结果为:

{
  "tokens" : [
    {
      "token" : "我",
      "start_offset" : 0,
      "end_offset" : 1,
      "type" : "CN_CHAR",
      "position" : 0
    },
    {
      "token" : "是",
      "start_offset" : 1,
      "end_offset" : 2,
      "type" : "CN_CHAR",
      "position" : 1
    },
    {
      "token" : "程序员",
      "start_offset" : 2,
      "end_offset" : 5,
      "type" : "CN_WORD",
      "position" : 2
    }
  ]
}

(2)最细切分:在浏览器地址栏输入地址

http://127.0.0.1:9200/_analyze?analyzer=ik_max_word&pretty=true&text=我是程序员

输出的结果为:

{
  "tokens" : [
    {
      "token" : "我",
      "start_offset" : 0,
      "end_offset" : 1,
      "type" : "CN_CHAR",
      "position" : 0
    },
    {
      "token" : "是",
      "start_offset" : 1,
      "end_offset" : 2,
      "type" : "CN_CHAR",
      "position" : 1
    },
    {
      "token" : "程序员",
      "start_offset" : 2,
      "end_offset" : 5,
      "type" : "CN_WORD",
      "position" : 2
    },
    {
      "token" : "程序",
      "start_offset" : 2,
      "end_offset" : 4,
      "type" : "CN_WORD",
      "position" : 3
    },
    {
      "token" : "员",
      "start_offset" : 4,
      "end_offset" : 5,
      "type" : "CN_CHAR",
      "position" : 4
    }
  ]
}

4.4 自定义词库

我们现在测试"传智播客",浏览器的测试效果如下:

http://127.0.0.1:9200/_analyze?analyzer=ik_smart&pretty=true&text=传智播客

{
  "tokens" : [
    {
      "token" : "传",
      "start_offset" : 0,
      "end_offset" : 1,
      "type" : "CN_CHAR",
      "position" : 0
    },
    {
      "token" : "智",
      "start_offset" : 1,
      "end_offset" : 2,
      "type" : "CN_CHAR",
      "position" : 1
    },
    {
      "token" : "播",
      "start_offset" : 2,
      "end_offset" : 3,
      "type" : "CN_CHAR",
      "position" : 2
    },
    {
      "token" : "客",
      "start_offset" : 3,
      "end_offset" : 4,
      "type" : "CN_CHAR",
      "position" : 3
    }
  ]
}

默认的分词并没有识别“传智播客”是一个词。如果我们想让系统识别“传智播客”是一个词,需要编辑自定义词库。

步骤:

(1)进入elasticsearch/plugins/ik/config目录

(2)新建一个my.dic文件,编辑内容:

传智播客

修改IKAnalyzer.cfg.xml(在ik/config目录下)

<properties>
	<comment>IK Analyzer 扩展配置</comment>
	<!--用户可以在这里配置自己的扩展字典 -->
	<entry key="ext_dict">my.dic</entry>
	 <!--用户可以在这里配置自己的扩展停止词字典-->
	<entry key="ext_stopwords"></entry>
</properties>

重新启动elasticsearch,通过浏览器测试分词效果

{
  "tokens" : [
    {
      "token" : "传智播客",
      "start_offset" : 0,
      "end_offset" : 4,
      "type" : "CN_WORD",
      "position" : 0
    }
  ]
}

5 搜索微服务开发

5.1 需求分析

  1. 添加文章
  2. 实现对文章标题和内容的模糊查询

5.2 代码编写

5.2.1 模块搭建

(1)创建模块tensquare_search ,pom.xml引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.tensquare</groupId>
            <artifactId>tensquare_common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

(2)application.yml

server:
  port: 9007
spring:
  application:
    name: tensquare-search #指定服务名
  data:
    elasticsearch:
      cluster-nodes: 127.0.0.1:9300

(3)创建包com.tensquare.search ,包下创建启动类

@SpringBootApplication
public class SearchApplication {

	public static void main(String[] args) {
		SpringApplication.run(SearchApplication.class, args);
	}

	@Bean
	public IdWorker idWorkker(){
		return new IdWorker(1, 1);
	}	
}

5.2.2 添加文章

(1)创建实体类

创建com.tensquare.search.pojo包,包下建立类

/**
 * 文章实体类
 */
@Document(indexName="tensquare",type="article")
public class Article implements Serializable{
    @Id
    private String id;//ID

    @Field(index= true ,analyzer="ik_max_word",searchAnalyzer="ik_max_word")
    private String title;//标题

    @Field(index= true ,analyzer="ik_max_word",searchAnalyzer="ik_max_word")
    private String content;//文章正文
   
    private String state;//审核状态
  
	//getter and setter ......

}

(2)创建数据访问接口

创建com.tensquare.search.dao包,包下建立接口

/**
 * 文章数据访问层接口
 */
public interface ArticleSearchDao extends ElasticsearchRepository<Article,String> {
    
}

(3)创建业务逻辑类

创建com.tensquare.search.service包,包下建立类

@Service
public class ArticleSearchService {

    @Autowired
    private ArticleSearchDao articleSearchDao;

    /**
     * 增加文章
     * @param article
     */
    public void add(Article article){
        articleSearchDao.save(article);
    }

}

(4)创建控制器类

创建com.tensquare.search.controller包,包下建立类

@RestController
@CrossOrigin
@RequestMapping("/article")
public class ArticleSearchController {

    @Autowired
    private ArticleSearchService articleSearchService;

    @RequestMapping(method= RequestMethod.POST)
    public Result save(@RequestBody Article article){
        articleSearchService.add(article);
        return new Result(true, StatusCode.OK, "操作成功");
    }

}

5.2.3 文章搜索

(1)ArticleSearchDao新增方法定义

    /**
     * 检索
     * @param
     * @return
     */
    public Page<Article> findByTitleOrContentLike(String title, String content, Pageable pageable);

(2)ArticleSearchService新增方法

public Page<Article> findByKeywordsLike(String keywords, int page, int size){
   PageRequest pageRequest = PageRequest.of(page-1, size);
   return articleSearchDao.findByTitleOrContentLike(keywords,keywords, pageRequest);  
}

(3)ArticleSearchController方法

@RequestMapping(value="/search/{keywords}/{page}/{size}",method= RequestMethod.GET)
public Result findByKeywordsLike(@PathVariable String keywords, @PathVariable int page, @PathVariable int size){
        Page<Article> articlePage = articleSearchService.findByTitleLike(keywords,page,size);
        return new Result(true, StatusCode.OK, "查询成功",
                new PageResult<Article>(articlePage.getTotalElements(), articlePage.getContent()));
}

6 elasticsearch与MySQL数据同步

6.1 Logstash

6.1.1什么是Logstash

elasticsearch:存数据

Logstash:处理日志以及收集日志

ELK:https://www.cnblogs.com/aresxin/p/8035137.html

Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。

6.1.2 Logstash安装与测试

解压,进入bin目录

logstash -e 'input { stdin { } } output { stdout {} }'

控制台输入字符,随后就有日志输出

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YX2neV3H-1591408722039)(image/4_11.png)]

stdin,表示输入流,指从键盘输入

stdout,表示输出流,指从显示器输出

命令行参数:

-e 执行

–config 或 -f 配置文件,后跟参数类型可以是一个字符串的配置或全路径文件名或全路径路径(如:/etc/logstash.d/,logstash会自动读取/etc/logstash.d/目录下所有*.conf 的文本文件,然后在自己内存里拼接成一个完整的大配置文件再去执行)

6.2 MySQL数据导入Elasticsearch

(1)在logstash-5.6.8安装目录下创建文件夹mysqletc (名称随意)

(2)文件夹下创建mysql.conf (名称随意) ,内容如下:

input {
  jdbc {
	  # mysql jdbc connection string to our backup databse  后面的test对应mysql中的test数据库
	  jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/tensquare_article?characterEncoding=UTF8"
	  # the user we wish to excute our statement as
	  jdbc_user => "root"
	  jdbc_password => "123456"
	  # the path to our downloaded jdbc driver  
	  jdbc_driver_library => "D:/logstash-5.6.8/mysqletc/mysql-connector-java-5.1.46.jar"
	  # the name of the driver class for mysql
	  jdbc_driver_class => "com.mysql.jdbc.Driver"
	  jdbc_paging_enabled => "true"
	  jdbc_page_size => "50000"
	  #以下对应着要执行的sql的绝对路径。
	  statement => "select id,title,content from tb_article"
	  #定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "* * * * *"
  }
}

output {
  elasticsearch {
	  #ESIP地址与端口
	  hosts => "localhost:9200" 
	  #ES索引名称(自己定义的)
	  index => "tensquare"
	  #自增ID编号
	  document_id => "%{id}"
	  document_type => "article"
  }
  stdout {
      #以JSON格式输出
      codec => json_lines
  }
}

(3)将mysql驱动包mysql-connector-java-5.1.46.jar拷贝至D:/logstash-5.6.8/mysqletc/ 下 。D:/logstash-5.6.8是你的安装目录

(4)命令行下执行

logstash -f ../mysqletc/mysql.conf

观察控制台输出,每间隔1分钟就执行一次sql查询。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tPCv5CHa-1591408722040)(image/4_12.png)]

再次刷新elasticsearch-head的数据显示,看是否也更新了数据。

7 Elasticsearch Docker环境下安装

7.1 容器的创建与远程连接

(1)下载镜像(此步省略)

docker pull elasticsearch:5.6.8

(2)创建容器

docker run -di --name=tensquare_elasticsearch -p 9200:9200 -p 9300:9300 镜像的id

(3)浏览器输入地址:

http://192.168.66.128:9200/ 即可看到如下信息

{
  "name" : "WmBn0H-",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "2g-VVbm9Rty7J4sksZNJEg",
  "version" : {
    "number" : "5.6.8",
    "build_hash" : "688ecce",
    "build_date" : "2018-02-16T16:46:30.010Z",
    "build_snapshot" : false,
    "lucene_version" : "6.6.1"
  },
  "tagline" : "You Know, for Search"
}

(4)我们修改项目的application.yml

 spring:
  data:
    elasticsearch:
      cluster-nodes: 192.168.66.128:9300

(5)运行程序,发现会报如下错误

NoNodeAvailableException[None of the configured nodes are available: [{#transport#-1}{exvgJLR-RlCNMJy-hzKtnA}{192.168.66.128}{192.168.66.128:9300}]
]
	at org.elasticsearch.client.transport.TransportClientNodesService.ensureNodesAreAvailable(TransportClientNodesService.java:347)
	at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:245)
	at org.elasticsearch.client.transport.TransportProxyClient.execute(TransportProxyClient.java:59)

这是因为elasticsearch从5版本以后默认不开启远程连接,需要修改配置文件

(6)我们进入容器

docker exec -it tensquare_elasticsearch  /bin/bash

此时,我们看到elasticsearch所在的目录为/usr/share/elasticsearch ,进入config看到了配置文件

elasticsearch.yml

我们通过vi命令编辑此文件,尴尬的是容器并没有vi命令 ,咋办?我们需要以文件挂载的方式创建容器才行,这样我们就可以通过修改宿主机中的某个文件来实现对容器内配置文件的修改

(7)拷贝配置文件到宿主机

首先退出容器,然后执行命令:

docker cp tensquare_elasticsearch:/usr/share/elasticsearch/config/elasticsearch.yml /usr/share/elasticsearch.yml

(8)停止和删除原来创建的容器

docker stop tensquare_elasticsearch 
docker rm  tensquare_elasticsearch

(9)重新执行创建容器命令

docker run -di --name=tensquare_elasticsearch -p 9200:9200 -p 9300:9300 -v /usr/share/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml 镜像id

(10)修改/usr/share/elasticsearch.yml 将transport.host: 0.0.0.0前的#去掉后保存文件退出。其作用是允许任何ip地址访问elasticsearch .开发测试阶段可以这么做,生产环境下指定具体的IP

将transport.host: 0.0.0.0前的#去掉后保存文件退出

(11)修改/usr/share/elasticsearch.yml ,添加允许跨域配置

http.cors.enabled: true
http.cors.allow-origin: "*"

(12)重启启动

docker restart tensquare_elasticsearch

重启后发现重启启动失败了,这时什么原因呢?这与我们刚才修改的配置有关,因为elasticsearch在启动的时候会进行一些检查,比如最多打开的文件的个数以及虚拟内存区域数量等等,如果你放开了此配置,意味着需要打开更多的文件以及虚拟内存,所以我们还需要系统调优。

(13)系统调优

我们一共需要修改两处

修改/etc/security/limits.conf ,追加内容

* soft nofile 65536
* hard nofile 65536

nofile是单个进程允许打开的最大文件个数 soft nofile 是软限制 hard nofile是硬限制

修改/etc/sysctl.conf,追加内容

vm.max_map_count=655360

限制一个进程可以拥有的VMA(虚拟内存区域)的数量

执行下面命令 修改内核参数马上生效

sysctl -p

(14)重新启动虚拟机,再次启动容器,发现已经可以启动并远程访问

7.2 IK分词器安装

(1)快捷键alt+p进入sftp , 将ik文件夹上传至宿主机

sftp> put -r d:\setup\ik

(2)在宿主机中将ik文件夹拷贝到容器内 /usr/share/elasticsearch/plugins 目录下。

docker cp ik tensquare_elasticsearch:/usr/share/elasticsearch/plugins/

(3)重新启动,即可加载IK分词器

docker restart tensquare_elasticsearch

7.3 HEAD插件安装

(1)重新启动elasticseach容器

(2)下载head镜像(此步省略)

docker pull mobz/elasticsearch‐head:5

(3)创建head容器

docker pull mobz/elasticsearch‐head:5  --拉取镜像
docker run -di --name=myhead -p 9100:9100 镜像id --创建容器

8 RabbitMQ简介

8.1消息队列中间件简介

​ 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题实现高性能,高可用,可伸缩和最终一致性[架构] 使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

以下介绍消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景

应用场景说明:http://www.cnblogs.com/linjiqin/p/5720865.html

8.2什么是RabbitMQ

RabbitMQ 是一个由 Erlang(二郎) 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

1.可靠性(Reliability)

RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

2.灵活的路由(Flexible Routing)

在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

3.消息集群(Clustering)

多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

4.高可用(Highly Available Queues)

队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

5.多种协议(Multi-protocol)

RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

6.多语言客户端(Many Clients)

RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

7.管理界面(Management UI)

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

8.跟踪机制(Tracing)

如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

9.插件机制(Plugin System)

RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

8.3架构图与主要概念

8.3.1架构图

Exchange:交换器

RoutingKey:通俗理解“消息队列名称”

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XDU5lNt3-1591408722042)(image\5-1.jpg)]

8.3.2主要概念

RabbitMQ Server: 也叫broker server,它是一种传输服务。 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。

Producer: 消息生产者,如图A、B、C,数据的发送方。消息生产者连接RabbitMQ服务器然后将消息投递到Exchange。

**Consumer:**消息消费者,如图1、2、3,数据的接收方。消息消费者订阅队列,RabbitMQ将Queue中的消息发送到消息消费者。

**Exchange:**生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ中的Exchange有direct、fanout、topic、headers四种类型,每种类型对应不同的路由规则。

Queue:(队列)是RabbitMQ的内部对象,用于存储消息。消息消费者就是通过订阅队列来获取消息的,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

**RoutingKey:**生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。

Connection: (连接):Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。

Channels: (信道):它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

**VirtualHost:**权限控制的基本单位,一个VirtualHost里面有若干Exchange和MessageQueue,以及指定被哪些user使用

9 走进RabbitMQ

9.1 RabbitMQ安装与启动

9.1.1 windows环境下的安装

(1)下载并安装 Eralng

配套软件中已提供otp_win64_20.2.exe (以管理员身份运行安装)

(2)下载并安装rabbitmq

配套软件中已提供rabbitmq-server-3.7.4.exe。双击安装,注意不要安装在包含中文和空格的目录下!安装后window服务中就存在rabbitMQ了,并且是启动状态。

(3)安装管理界面(插件)

进入rabbitMQ安装目录的sbin目录,输入命令

rabbitmq-plugins enable rabbitmq_management

(4)重新启动服务

(5)打开浏览器,地址栏输入http://127.0.0.1:15672 ,即可看到管理界面的登陆页

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-48ZlzKqT-1591408722043)(image\5-2.jpg)]

输入用户名和密码,都为guest 进入主界面:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3UupKbKp-1591408722043)(image\5-10.jpg)]

最上侧的导航以此是:概览、连接、信道、交换器、队列、用户管理

9.1.2 docker环境下的安装

(1)下载镜像:(此步省略)

docker pull rabbitmq:management

(2)创建容器,rabbitmq需要有映射以下端口: 5671 5672 4369 15671 15672 25672

  • 15672 (if management plugin is enabled)

  • 15671 management监听端口

  • 5672, 5671 (AMQP 0-9-1 without and with TLS)

  • 4369 (epmd) epmd  代表 Erlang 端口映射守护进程

  • 25672 (Erlang distribution)

docker run -di --name=tensquare_rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 镜像id

浏览器访问 http://192.168.66.128:15672/#/

9.2 直接模式(Direct)

9.2.1 什么是Direct模式

我们需要将消息发给唯一一个节点时使用这种模式,这是最简单的一种形式。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OJq13Zdu-1591408722044)(image\5-4.jpg)]

任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue。

1.一般情况可以使用rabbitMQ自带的Exchange:”"(该Exchange的名字为空字符串,下文称其为default Exchange)。

2.这种模式下不需要将Exchange进行任何绑定(binding)操作

3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。

4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

9.2.2 创建队列

做下面的例子前,我们先建立一个叫itcast的队列。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ibyyMXWZ-1591408722045)(image\5-5.jpg)]

Durability:是否做持久化 Durable(持久) transient(临时)

Auto delete : 是否自动删除

9.2.3 代码实现-消息生产者

(1)创建工程rabbitmq_demo,引入amqp起步依赖 ,pom.xml如下:

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.1.RELEASE</version>
		<relativePath/> 
	</parent>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

(2)编写配置文件application.yml

spring:
  rabbitmq:
    host: 192.168.66.128

(3)编写启动类

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

(4)编写测试类

@RunWith(SpringRunner.class)
@SpringBootTest(classes=Application.class)
public class MqTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend(){
        rabbitTemplate.convertAndSend("itcast","我要红包");
    }
}

运行测试方法

9.2.4 代码实现-消息消费者

(1)编写消息消费者类

@Component
@RabbitListener(queues="itcast" )
public class Customer1 {

    @RabbitHandler
    public void showMessage(String message){
        System.out.println("itcast接收到消息:"+message);
    }
}

(2)运行启动类,可以在控制台看到刚才发送的消息

9.2.5 测试

开启多个消费者工程,测试运行消息生产者工程,会发现只有一个消费者工程可以接收到消息。

如何在IDEA中多次启动同一个程序呢?

(1)选择IDEA右上角的类名称按钮

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V9pVtmbG-1591408722046)(image\5_1.png)]

(2)选择Edit Configurations

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-B4rBpwQB-1591408722047)(image\5_2.png)]

(3)在弹出窗口中取消单例模式 ,点击OK

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rXCUDMnk-1591408722048)(image\5_3.png)]

(4)每次运行前修改application.yml,指定不同的端口

server:
  port: 9202

运行后在控制台可以看到多个窗口

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y26KhRbt-1591408722049)(image\5_4.png)]

9.3 分列模式(Fanout)

9.3.1 什么是分列(Fanout)模式

当我们需要将消息一次发给多个队列时,需要使用这种模式。如下图:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LIPEL8tz-1591408722050)(image\5-6.jpg)]

任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。

1.可以理解为路由表的模式

2.这种模式不需要RouteKey

3.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。

4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

9.3.2 交换器绑定队列

(1)在queue中添加队列itheima 和kudingyu

(2)新建交换器chuanzhi

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-883OxHNW-1591408722051)(image\5_5.png)]

(3)将kudingyu和itheima两个队列绑定到交换器chuanzhi

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pNC4hPmI-1591408722052)(image\5_6.png)]

点击chuanzhi进入交换器管理界面

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QpAKNb4m-1591408722053)(image\5_7.png)]

点击Bindings添加绑定 itheima和kudingyu

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5UJEDbpQ-1591408722054)(image\5_9.png)] 绑定后效果如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V0mvALG4-1591408722054)(image\5_8.png)]

9.3.3 代码实现-消息生产者

	@Test
	public void testSendFanout(){
		rabbitTemplate.convertAndSend("chuanzhi","", "分列模式走起");
	}

9.3.4 代码实现-消息消费者

创建消息监听类,用于监听itheima的消息

@Component
@RabbitListener(queues="itheima" )
public class Customer2 {
    @RabbitHandler
    public void showMessage(String message){
        System.out.println("itheima接收到消息:"+message);
    }
}

创建消息监听类,用于监听kudingyu的消息

@Component
@RabbitListener(queues="kudingyu" )
public class Customer3 {
    @RabbitHandler
    public void showMessage(String message){
        System.out.println("kudingyu接收到消息:"+message);
    }
}

9.3.5 测试

启动消费者工程,发送消息测试

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DqlvLpRy-1591408722055)(image\5_10.png)]

9.4 主题模式(Topic)

9.4.1 什么是主题模式

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CiBGNZ5M-1591408722056)(image\5_11.png)]

如上图所示 
此类交换器使得来自不同的源头的消息可以到达一个对列,其实说的更明白一点就是模糊匹配的意思,例如:上图中红色对列的routekey为usa.#,#代表匹配任意字符,但是要想消息能到达此对列,usa.必须匹配后面的#好可以随意。图中usa.news usa.weather,都能找到红色队列,符号#匹配一个或多个词,符号*匹配不多不少一个词。因此usa.#能够匹配到usa.news.XXX,但是usa.*只会匹配到usa.XXX。 
注: 
交换器说到底是一个名称与队列绑定的列表。当消息发布到交换器时,实际上是由你所连接的信道,将消息路由键同交换器上绑定的列表进行比较,最后路由消息。

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。

2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。

3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。

4.“#”表示0个或若干个关键字,“”表示一个关键字。如“log.”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。

5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息

9.4.2 创建队列与绑定

(1)新建一个交换器 ,类型选择topic

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wGAqfSCy-1591408722057)(image\5_14.png)]

(2)点击新建的交换器topictest

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kUoGZ7g4-1591408722058)(image\5_15.png)]

添加匹配规则,添加后列表如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BWoZ0JNo-1591408722058)(image\5_13.png)]

9.4.3 代码实现

编写测试类方法:

    @Test
    public void testSendTopic1(){
        rabbitTemplate.convertAndSend("topictest","goods.aaa","主题模式");
    }

输出结果:itcast接收到消息:主题模式

    @Test
    public void testSendTopic2(){
        rabbitTemplate.convertAndSend("topictest","article.content.log","主题模式");
    }

输出结果:itheima接收到消息:主题模式

    @Test
    public void testSendTopic3(){
        rabbitTemplate.convertAndSend("topictest","goods.log","主题模式");
    }

输出结果:

itheima接收到消息:主题模式
itcast接收到消息:主题模式
kudingyu接收到消息:主题模式

面试问题总结

你在项目中如何开发搜索模块

elasticsearch Spring DataElasticsearch

你如何实现数据库与索引库的同步?

十次方项目ELK

你如何实现索引库的分词

IK 两种算法 最少切分 最细切分

Solr 和Elasticsearch性能区分

https://www.cnblogs.com/chowmin/articles/4629220.html

项目中哪部分业务用到消息队列

用户注册发送短信验证码

项目中使用哪种消息队列?

rabbitMQ

RabbitMQ 有哪几种发送模式

直接模式 分列模式 主题模式 headers

Logo

更多推荐