SpringCloud、Eureka、Zuul、Rabbitmq
1.eureka注册中心注册中心服务端主要对外提供了三个功能:服务注册:服务提供者启动时,会通过 Eureka Client 向 Eureka Server 注册信息,Eureka Server 会存储该服务的信息,Eureka Server 内部有二层缓存机制来维护整个注册表。提供注册表:服务消费者在调用服务时,如果 Eureka Client 没有缓存注册表的话,会从 Eureka Serve
1.eureka注册中心
注册中心服务端主要对外提供了三个功能:
服务注册:
服务提供者启动时,会通过 Eureka Client 向 Eureka Server 注册信息,Eureka Server 会存储该服务的信息,Eureka Server 内部有二层缓存机制来维护整个注册表。
提供注册表:
服务消费者在调用服务时,如果 Eureka Client 没有缓存注册表的话,会从 Eureka Server 获取最新的注册表。
同步状态:
Eureka Client 通过注册、心跳机制和 Eureka Server 同步当前客户端的状态。
2.创建项目
2.1添加eureka依赖
2.2修改pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud1</artifactId>
<groupId>cn.tedu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sp05-eureka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>sp05-eureka</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
</dependencies>
</project>
2.3修改application.yml文件
# 应用名称
spring:
pplication:
name: eureka-server
#2001,3001,4001,5001,6001
server:
port: 2001
eureka:
server:
enable-self-preservation: false #禁用自我保护机制
instance:
hostname: eureka1 #主机名
client:
register-with-eureka: false #不向自己注册
fetch-registry: false #不从自己拉取
Eureka四条运行机制:
1.注册:客户端会一次一次的反复注册,直到注册成功为止。
2.拉取:客户端每隔30秒,重复的拉取、刷新本地缓存的注册表。
3.心跳:客户端每隔30秒向服务器发送心跳,如果服务器连续3次收不到一个服务的心跳,就会删除该服务的注册信息。
4.自我保护模式:
网络不稳定,或网络中断时,15分钟内85%服务器都出现心跳异常
,会进入自动保护模式;
这种特殊情况下,会保护所有注册信息不删除;
等待网络回复正常后,自动退出保护模式;
开发调试期间应该禁用保护模式,避免影响测试。
- eureka 集群服务器之间,通过 hostname 来区分。
- eureka.server.enable-self-preservation:eureka 的自我保护状态:心跳失败的比例,在15分钟内是否超过85%,如果出现了超过的情况,Eureka
Server会将当前的实例注册信息保护起来,同时提示一个警告,一旦进入保护模式,Eureka
Server将会尝试保护其服务注册表中的信息,不再删除服务注册表中的数据。也就是不会注销任何微服务。- eureka.client.register-with-eureka=false:不向自身注册。
- eureka.client.fetch-registry=false:不从自身拉取注册信息。
- eureka.instance.lease-expiration-duration-in-seconds:最后一次心跳后,间隔多久认定微服务不可用,默认90。
2.4启动类添加@EnableEurekaServer注解
2.5为02、03、04工程添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
2.6 02工程配置文件添加
spring:
application:
name: user-service
server:
port: 8101
#用户的demo数据
#[{id:7,username:xx,password:xx},{8..},{9..}]
sp:
user-service:
users: "[{\"id\":7, \"username\":\"abc\",\"password\":\"123\"},
{\"id\":8, \"username\":\"def\",\"password\":\"456\"},
{\"id\":9, \"username\":\"ghi\",\"password\":\"789\"}]"
#/eureka 子路径是客户端调用的 REST API 路径,浏览器不能访问
eureka:
client:
service-url:
defaultZone: http://eureka1:2001/eureka
- defaultZone:默认地点,也可以从云服务商购买不同地点的eurea服务
2.7修改 hosts 文件,添加 eureka 域名映射
C:\Windows\System32\drivers\etc\hosts
管理员打开
添加内容:
127.0.0.1 eureka1
127.0.0.1 eureka2
2.8启动,并访问测试
http://eureka1:2001
3.Eureka 和 “服务提供者”的高可用
3.1item-service 高可用
启动参数 --server.port 可以覆盖yml中的端口配置;
通过启动参数设置启动的端口:
java -jar item.jar --server.port=8001
java -jar item.jar --server.port=8002
启动测试:
3.2Erueka高可用
添加两个服务器的 profile 配置文件:
application-eureka1.yml:
#2001,3001,4001,5001,6001
server:
port: 2001
eureka:
instance:
hostname: eureka1 #主机名
client:
register-with-eureka: true #false不向自己注册,profile的配置会覆盖公用配置
fetch-registry: true #false不从自己拉取,profile的配置会覆盖公用配置
service-url:
defaultZone: http://eureka2:2002/eureka #eureka1启动时向eureka2注册
application-eureka2.yml:
#2001,3001,4001,5001,6001
server:
port: 2002
eureka:
instance:
hostname: eureka2 #主机名
client:
register-with-eureka: true #false不向自己注册,profile的配置会覆盖公用配置
fetch-registry: true #false不从自己拉取,profile的配置会覆盖公用配置
service-url:
defaultZone: http://eureka1:2001/eureka #eureka1启动时向eureka2注册
访问 eureka 服务器,查看注册信息:
http://eureka1:2001/
http://eureka2:2002/
3.3eureka客户端注册时,向两个服务器注册
修改以下微服务:
sp02-itemservice
sp03-userservice
sp04-orderservice
#/eureka 子路径是客户端调用的 REST API 路径,浏览器不能访问
eureka:
client:
service-url:
defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
4. 02orderservice远程调用02和03
4.1添加feign依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
4.2启动类添加@EbableFeignClients注解
4.3添加两个远程调用接口
package cn.tedu.sp04.feign;
import cn.tedu.sp01.pojo.Item;
import cn.tedu.sp01.web.util.JsonResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import java.util.List;
/**
* 1.调用哪个服务
* 2.调用这个服务的哪个路径
* 3.向这个路径提交什么参数
* */
@FeignClient(name="item-service",contextId = "itemClient")
public interface ItemClient {
//获取订单的商品列表
@GetMapping("/{orderId}")
JsonResult<List<Item>> getItems(@PathVariable("orderId") String orderId);
//减少商品库存
@PostMapping("/decreaseNumber")
JsonResult<?> decreaseNumber(@RequestBody List<Item> items);
}
package cn.tedu.sp04.feign;
import cn.tedu.sp01.pojo.User;
import cn.tedu.sp01.web.util.JsonResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
@FeignClient(name = "user-service",contextId = "userClient")
public interface UserClient {
//获取用户
@GetMapping("/{userId}")
JsonResult<User> getUser(@PathVariable("userId") Integer userId);
//增加用户积分
@GetMapping("/{userId}/score") //?score=1000
JsonResult<?> addScore(@PathVariable("userId") Integer userId,
@RequestParam Integer score);
}
4.4在订单实现类远程调用两个接口
5.Feign集成Ribbon
Ribbon提供负载均衡和重试的功能
Ribbon重试:调用后台服务失败(异常、超时、宕机),可以自动发起重试调用。
随机向一台服务器请求,请求失败(重试次数默认为0)则会更换服务器(更换服务器次数默认为1)。
单台服务器的重试次数MaxAutoRetries(默认0)(x),更换服务器的次数 MaxAutoRetriesNextServer(默认1)(y),最大请求次数(x+1)*(y+1)
在itemController模拟阻塞运算:
//模拟阻塞运算
if (Math.random()<0.9){//90%概率执行阻塞代码
//阻塞时长随机0-5秒
int t = new Random().nextInt(5000);
log.info("阻塞:"+t);
Thread.sleep(t);
}
- ribbon.MaxAutoRetries:重试次数,默认为0
- ribbon.MaxAutoRetriesNextServer:更换服务器次数,默认为1
- ribbon.ReadTimeout:超时时间,默认1000
- ribbon.ConnectTimeout:与后台服务器建立连接的超时时间,默认1000
- ribbon.OkToRetryOnAllOperations:是否对所有类型请求都重试,默认只对GET请求重试
6.Zuul API网关
统一的入口
统一的权限校验
集成Ribbon
集成Hystrix
统一的入口:
1.新建spring模块:sp06-zuul
选择eureka client依赖
2.添加依赖:zuul、eureka client、sp01
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud1</artifactId>
<groupId>cn.tedu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sp06-zuul</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>sp06-zuul</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
<spring-cloud.version>Hoxton.SR9</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-zuul</artifactId>
</dependency>
</dependencies>
</project>
3.配置yml:
# 应用名称
spring:
application:
name: zuul
# 2001,3001,4001,5001,6001
server:
port: 3001
eureka:
client:
service-url:
defaultZone: http://eureks1:2001/eureka, http://eureks2:2002/eureka,
#路由转发规则
#最好手动配置
zuul:
routes:
item-service: /item-service/** # **包含深层子路径,*只含当前一层路径
user-service: /user-service/**
order-service: /order-service/**
4.启动类加@EnableZuulProxy注解
启用zuul网关
先启动两个eureka服务,启动02、03、04,最后启动zuul
7.统一权限校验
模拟登录检查:
http://localhost:3001/order-service/order123456 没有登录,不允许访问
http://localhost:3001/order-service/order123456?token=fdshnkj 已经登录可以访问
1.新建过滤器,继承ZuulFilter
2.按照规则实现ZuulFilter
3.添加注解@Component
- zuul的自动配置类会自动配置过滤器
添加过滤类:
@Component
public class AccessFilter extends ZuulFilter {
//过滤器类型,pre前置,routing 运行 post后置,error错误
@Override
public String filterType() {
//return "pre";
return FilterConstants.PRE_TYPE;//前置过滤器
}
//过滤器顺序号
@Override
public int filterOrder() {
return 6;//放在第6位
}
//针对当前请求,是否执行过滤代码
@Override
public boolean shouldFilter() {
//调用item-service需要判断权限
//否则,不判断权限,直接跳过过滤代码
//获取请求上下文对象
RequestContext ctx = RequestContext.getCurrentContext();
//从上下文对象获得调用的服务id
String serviceId = (String) ctx.get(FilterConstants.SERVICE_ID_KEY);//("serviceId)
//如果服务id是 item-service,返回true,否则返回false
return "item-service".equalsIgnoreCase(serviceId);//IgnoreCase忽略大小写
}
//过滤代码
@Override
public Object run() throws ZuulException {
// http://localhost:3001/order-service/order123456?token=fdshnkj
//获取请求上下文对象
RequestContext ctx = RequestContext.getCurrentContext();
//从上下文获得request对象
HttpServletRequest request = ctx.getRequest();
//从request取出token参数
String token = request.getParameter("token");
//如果没有token,null,""
if (StringUtils.isBlank(token)) {
//阻止继续调用
ctx.setSendZuulResponse(false);
//直接向客户端返回响应
String json = JsonResult.build().code(400).msg("未登录").toString();
ctx.addZuulResponseHeader("Content-Type", "application/json;charset=UTF-8");
ctx.setResponseBody(json);
}
return null;
}
}
Zuul集成Ribbon
- 默认启用了负载均衡
- 默认不启用重试,一般不在网关添加重试功能,否则可能造成后台服务器压力过大,出现大面积故障,重试功能应该尽量靠后添加
Zuul启用重试:
1.添加依赖:
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
2.修改配置文件:
Zuul集成Hystrix
向后台服务转发调用,使用Hystrix进行容错和限流
Zuul默认已经启用了Hystrix。不做任何配置
8.Hystrix
容错(降级)和限流(熔断)工具
Zuul集成Hystrix添加降级
1.新建降级类,实现FallBackProvider接口
2.按接口规则实现
3.添加注解:@Component
zuul的自动配置类可以完成自动配置
Hystrix熔断:
流量过大,出现故障,可以熔断,断开链路,减轻后台服务的压力
- 熔断的触发条件: 10秒20次请求 ,50%请求出错
- 断路器打开后一段时间,会进入“半开状态”:半开状态下会尝试发送一次客户端调用,调用成功,关闭断路器恢复正常;调用失败,继续保持打开状态
使用Actuator暴露Hystrix监控日志
Hystrix利用Actuator来暴露自己的监控日志
添加Actuator:
Actuator时Springboot提供的一个项目指标工具
- 健康状态
- spring容器 中所有对象
- springmvc映射的所有路径
- java虚拟机堆内存镜像
Actuator依赖:
暴露监控日志:
Hystrix数据监控-Hystrix Dashboard
1.新建spring模块:sp07-hystrix-dashboard
2.添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud1</artifactId>
<groupId>cn.tedu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sp07-hystrix-bashboard</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>sp07-hystrix-bashboard</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
</dependencies>
</project>
3.配置文件
server:
port: 4001
hystrix:
dashboard:
proxy-stream-allow-list: localhost
4.启动类加注解@EnableHystrixDashboard
5.访问: http://localhost:4001/hystrix
然后在里边输入http://localhost:3001/actuator/hystrix.stream测试
不断刷新此网址http://localhost:3002/item-service/156?token=asfdassfad,观察检测信息
06网关高可用
9.创建Turbine
从多台服务器抓取日志,进行聚合。
Hystrix Dashboard可以从Turbine抓取聚合后的日志数据。
1.新建sp08-turbine
2.添加依赖:turbine、eureka client
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud1</artifactId>
<groupId>cn.tedu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sp08-turbine</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>sp08-turbine</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-turbine</artifactId>
</dependency>
</dependencies>
</project>
3.配置yml文件
抓取的访问列表: zuul,a,b,c
为聚合后的数据命名: new String(“default”)
# 应用名称
spring:
application:
name: turbine
# eureka2001,zuul3001,dashboard4001,turbine5001
server:
port: 5001
eureka:
client:
service-url:
defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
turbine:
app-config: zuul
cluster-name-expression: new String("default")
4.启动类注解:@EnableTurbine
5.访问聚合日志:http://localhist:5001/turbine.stream
在此处进入命令窗口,输入zb -n 2000 -c 50 http://localhost:3002/item-service/156?token=asfdassfad
Spring cloud config 配置中心
集中的管理,
在springcloud1下创建config文件夹,并创建配置文件:
- item-service-dev.yml
- user-service-dev.yml
- order-service-dev.yml
搭建配置中心:
1.新建sp09-config
2.添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springcloud1</artifactId>
<groupId>cn.tedu</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>sp09-config</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>sp09-config</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
</project>
3.配置文件
# 应用名称
spring:
application:
name: config-service
cloud:
config:
server:
git:
uri: https://gitee.com/jiahaobz/springcloud.git
server:
port: 6001
eureka:
client:
service-url:
defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
4.启动类加注解:
@EnableConfigServer
5.访问测试:
http://localhost:6001/item-service/dev
http://localhost:6001/user-service/dev
http://localhost:6001/order-service/dev
配置中心的客户端:
1.把2,3,4的application.yml注释掉
2.添加依赖:
03添加依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
3.添加新的配置文件:bootstrap.yml
bootstrap.yml,引导配置,应用启动之前先执行
application.yml应用期待着之后执行
eureka:
client:
service-url:
defaultZone: http://eureka1:2001/eureka,http://eureka2:2002/eureka
spring:
cloud:
config:
discovery:
enabled: true
service-id: config-service
#下载配置文件
name: user-service
profile: dev
检查确认:
1.按顺序启动项目:
05-eureka,等待完全启动
09-config,等待完全启动
03、04、06…
2.检查配置中心
注册表中有config-server
http://localhost:6001/user-service/dev
http://localhost:6001/item-service/dev
http://localhost:6001/order-service/dev
3.03-user的控制台日志,有没有连接6001服务器
10.RabbitMQ
消息队列、消息服务、消息中间件Broker
常见服务器:Rabbitmq、Activemq、Rocketmq、Kafka、Tubemq
搭建Rabbitmq服务器:
设置vm网段:
ls
./ip-dhcp
ifconfig
ip设置失败:
nmcli n on
systemcli restart NetworkManager
1.克隆一份虚拟机,docker-base,传文件
1.1从docker-base克隆一份虚拟机:rabbirmq
2.设置固定ip
./ip-static
ip:192.168.64.140
ifconfig
3.上传文件到/root/
4.导入镜像
systemctl restart docker
docker load -i rabbit-image.gz
docker images
5.安装Rabbitmq:
Docker 启动Rabbitmq
关闭防火墙:
systemctl stop firewalld
systemctl disable firewalld
# 重启 docker 系统服务
systemctl restart docker
配置管理员用户名和密码:
mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf
# 添加两行配置:
default_user = admin
default_pass = admin
启动Rabbitmq:
docker run -d --name rabbit \
-p 5672:5672 \
-p 15672:15672 \
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
--restart=always \
rabbitmq:management
访问管理控制台 http://192.168.64.140:15672
用户名密码是 admin
11.Rabbitmq创建
创建一个空的project,然后船舰一个maven工程:
添加依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.tedu</groupId>
<artifactId>rabbitmq-api</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.4.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
创建服务者(发送消息):
package m1;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接服务器
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();
Channel c = con.createChannel(); //通信通道
//在服务器上创建一个队列:helloworld
//如果队列在服务器上存在,不会重复创建
//第二个参数:是否是持久队列;
//第三个参数:是否是排他队列、独占队列
//第四个参数:是否自动删除
//第五个参数:队列的其他属性
c.queueDeclare("helloworld",false,false,false,null);
//向helloworld队列发送消息
c.basicPublish("", "helloworld", null, "Hello World".getBytes());
//不执行消息,关闭连接,关闭通信通道
c.close();
con.close();
}
}
创建消费者(接收消息):
package m1;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();
Channel c = con.createChannel(); //通信通道
//创建队列
c.queueDeclare("helloworld",false,false,false,null);
System.out.println("等待接收数据");
//创建回调对象
DeliverCallback deliverCallback = (consumerTag, message) -> {
byte[] a = message.getBody();
String s = new String(a);
System.out.println("收到:"+s);
};
CancelCallback cancelCallback = consumerTag ->{};
//开始接收消息,把消息传递给一个回调对象进行处理
//第二个参数:是否自动确认(true),autoAck
c.basicConsume("helloworld", true,deliverCallback,cancelCallback);
}
}
合理分发:
1.让服务器可以知道消费者有没有处理完消息,手动确认
2. QPS=1,与抓取的消息数量,每次只接收一条消息,处理完之前不收下一条,手动确认模式才有效
3. 我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。
创建服务者:
package m2;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接服务器
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();
Channel c = con.createChannel(); //通信通道
//在服务器上创建一个队列:helloworld
//如果队列在服务器上存在,不会重复创建
//第二个参数:是否是持久队列;
//第三个参数:是否是排他队列、独占队列
//第四个参数:是否自动删除
//第五个参数:队列的其他属性
c.queueDeclare("helloworld", false, false, false, null);
//向helloworld队列发送消息
while (true) {
System.out.println("输入消息:");
String s = new Scanner(System.in).nextLine();
c.basicPublish("", "helloworld", null, s.getBytes());
}
//不执行消息,关闭连接,关闭通信通道
}
}
创建消费者:
package m2;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();
Channel c = con.createChannel(); //通信通道
//创建队列
c.queueDeclare("helloworld",false,false,false,null);
System.out.println("等待接收数据");
//创建回调对象
DeliverCallback deliverCallback = (consumerTag, message) -> {
String s = new String(message.getBody());
System.out.println("收到:"+s);
//遍历访问每一个字符,每遇到一个‘.’,暂停一秒
for (int i = 0; i < s.length(); i++) {
if (s.charAt(i)=='.'){
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
}
//发送回执
//1:回执,2:是否同时确认之前收到过的多条消息
c.basicAck(message.getEnvelope().getDeliveryTag(),false);
System.out.println("...........消息处理完成");
};
CancelCallback cancelCallback = consumerTag ->{};
//QPS=1,每次受一条,处理完之前不收下一条,手动ack模式才有效
c.basicQos(1);
//开始接收消息,把消息传递给一个回调对象进行处理
//第二个参数:是否自动确认(true),autoAck
c.basicConsume("helloworld", false,deliverCallback,cancelCallback);
}
}
消息持久化:
1.队列持久化
添加交换机:
创建生产者:
package m3;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接服务器
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();
Channel c = con.createChannel(); //通信通道
//创建fanout交换机:logs
//c.exchangeDeclare("logs", "fanout");
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);//默认是非持久,FANOUT表示持久
//向交换机发送消息
while (true){
System.out.println("输入消息:");
String s = new Scanner(System.in).nextLine();
//对交换机,第二个参数无效
c.basicPublish("logs", "", null, s.getBytes());
}
}
}
创建消费者:
package m3;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//连接
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672); //5672用来收发消息,15672时管理控制台端口
f.setUsername("admin");
f.setPassword("admin");
Connection con = f.newConnection();
Channel c = con.createChannel(); //通信通道
//1.创建队列
String queue = UUID.randomUUID().toString();
c.queueDeclare(queue,false,true,true,null);
//2.创建交换机
c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
//3.绑定
c.queueBind(queue,"logs", "");
}
}
12.BUS配置刷新,Rabbitmq整合到微服务中
在2、3、4、9添加BUS、Rabbitmq
1.在2、3、4、9添加依赖:rabbitmq、bus、binder-rabbit
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-bus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
09再单独加一个依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
2.修改09的application.yml,添加Rabbitmq连接配置
spring:
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
management:
endpoints:
web:
exposure:
include: bus-refresh
3.修改config目录中的三个配置文件,添加Rabbitmq连接配置
在spring下一层添加
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
4.提交推送到gitee仓库
5.启动:
先等待05启动完成,等待09启动完成,
然后访问测试:http://localhost:6001/order-service/dev,http://localhost:6001/item-service/dev,http://localhost:6001/user-service/dev
02 03 04 06启动后,查看控制台,要有连接6001服务器,
再访问:http://localhost:6001/actuator ,查看有没有bus-refresh,
有就用post请求:http://localhost:6001/actuator/bus-refresh
消息服务案例
1.bus配置刷新
向rabbitmq发送刷新指令,其他模块接受指令并执行
2.sleuth+zipkin链路跟踪
产生的链路跟踪日志发送到rabbitmq,zipkin从rabbitmq接收日志,
简单模式
3.订单的流量削峰
购物系统的订单,先发送到rabbitmq,后台消费者模块一个一个接收,存储订单,短时间内产生的大量订单,变成顺序处理,处理时间拉长。
简单模式或工作模式
13. sleuth+zipkin链路跟踪
sleuth
用来产生链路监控日志,
在2、3、4、6添加sleuth依赖,sleuth是0配置
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
zipkin
通过消息服务转发,解耦,流浪削峰,
2、3、4、6添加zipkin客户端,向rabbitmq发送日志:
1.在2、3、4、6添加zipkin客户端依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
2.在6中添加rabbitmq依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.在6中添加rabbitmq连接配置
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
4.在2、3、4、6添加发送方式配置:rabbitmq
zipkin:
sender:
type: rabbit
下载zipkin服务器,开启服务器,连接rabbitmq:
java -jar zipkin-server-2.23.16-exec.jar --zipkin.collector.rabbitmq.uri=amqp://admin:admin@192.168.64.140:5672
测试:
http://localhost:9411/zipkin
14.eureka客户端选择正确网卡,注册ip地址
选择正确网卡,eureka客户端会自动选择网卡,可能会选择错误网卡进行注册,
手动指定注册网卡:
修改bootstrap.yml文件
spring:
cloud:
inetutils:
ignored-interfaces: # 忽略的网卡
- VM.* #.任意字符 *0到多个
preferred-networks: # 要是用的网卡的网段
- 192\.168\.0\..+ # +是一到多个
15.拼多商城项目
2.springboot版本改成2.3.2RELEASE
3.如果数据库文件导入失败,执行下面sql语句,增大mysql缓存大小:
set global max_allowed_packet=100000000;
set global net_buffer_length=100000;
set global interactive_timeout=28800000;
set global wait_timeout=28800000;
4.删除测试数据
delete from pd_user
delete from pd_order
delete from pd_order_item
5.选择SDK1.8
6.启动项目
修改RunPdApp的启动配置,设置working directory工作目录:
购物系统生成的订单发送到rabbitmq
1.启动rabbitmq服务,可以重启docker容器
2.添加rabbitmq依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.2</version>
</dependency>
3.yml添加rabbit连接配置
spring:
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
#使用rabbitmq下自己空间
virtual-host: gjh
4.在启动类,或添加自动配置类里,添加队列参数配置
package com.pd.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置使用的队列的参数
*/
@Configuration
public class QueueConfig {
//RabbitAutoConfiguration自动配置类,会根据这里设置参数,在服务器上创建队列
@Bean
public Queue orderQueue() {
//持久,非独占,不自动删除
return new Queue("orderQueue", true, false, false);
}
}
5.在orderServiceImpl,注入AmqpTemplate工具,使用这个工具发送订单
复制一份pd-web改名为pd-web-consumer
修改端口为81,
在consumer创建OrderConsumer类
package com.pd;
import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 自动创建实例
* 自动注册成为消费
* 自动开始接收消息
* 自动处理收到的消息
* */
@Component
//用来接收消息
@RabbitListener(queues = "orderQueue")
public class OrderConsumer {
@Autowired
private OrderService orderService;
@RabbitHandler //指定处理消息的方法,在同一个类中,只能设置一次
public void receive(PdOrder pdOrder) throws Exception {
orderService.saveOrder(pdOrder);
}
}
spring-boot整合rabbitmq
创建springboot项目
添加依赖:
修改配置文件,添加rabbitmq配置:
spring:
rabbitmq:
host: 192.168.64.140
port: 5672
username: admin
password: admin
#使用rabbitmq下自己空间
virtual-host: gjh
创建m1包
添加启动类:
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
//配置helloworld队列的参数
@Bean
public Queue helloWorldQueue(){
return new Queue("helloworld",false,false,false);
}
@Autowired
private Producer p;
/*
springboot 执行流程:
扫描创建实例-->自动注入-->@PostConstruct-->后续流程
*/
@PostConstruct
public void test() {
//在新的线程中执行阻塞
new Thread(() -> {
try {
Thread.sleep(3000); //等待消费者启动再发消息
} catch (InterruptedException e) {
}
p.send();
}).start();
}
}
生产者:
@Component
public class Producer {
@Autowired
private AmqpTemplate t;
public void send(){
t.convertAndSend("helloworld","helloWorld!");
System.out.println("消息已发送");
}
}
消费者:
/**
* 自动创建实例
* 自动注册成为消费
* 自动开始接收消息
* 自动处理收到的消息
* */
@Component
public class Consumer {
@RabbitListener(queues = "helloworld")
public void receive(String msg){
System.out.println("收到:"+msg);
}
}
测试:
创建m2包(工作模式,轮询)
在主程序中创建名为**task_queue
**的持久队列:
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
//配置htask_queue持久队列的参数
@Bean
public Queue task_queue() {
return new Queue("task_queue", true, false, false);
}
@Autowired
private Producer p;
/*
springboot 执行流程:
扫描创建实例-->自动注入-->@PostConstruct-->后续流程(启动消费者)
*/
@PostConstruct
public void test() {
//在新的线程中执行阻塞
new Thread(() -> {
while (true){
System.out.println("输入消息:");
String s = new Scanner(System.in).nextLine();
p.send(s);
}
}).start();
}
}
生产者:
@Component
public class Producer {
@Autowired
private AmqpTemplate t;
public void send(String msg){
t.convertAndSend("task_queue",msg.getBytes());
System.out.println("消息已发送");
}
}
消费者(创建两个):
@RabbitListener会创建一个消费者
@Component
public class Consumer {
@RabbitListener(queues = "task_queue")
public void receive1(String msg){
System.out.println("消费者1收到:"+msg);
}
@RabbitListener(queues = "task_queue")
public void receive2(String msg){
System.out.println("消费者2收到:"+msg);
}
}
Ack模式:
- 合理分发:
-
1.手动Ack
-
Spring集成Rabbitmq,默认就是手动Ack,spring会自动发送回执
-
2.qos=1
-
yml配置添加prefetch参数,默认值250
- 消息持久化:
-
1.队列持久化 "task_queue", true, false, false
-
2.消息的持久化,spring默认已经添加持久参数
设置Ack模式,配置ynl文件:
抓取数量:
工作模式中, 为了合理地分发数据, 需要将 qos 设置成 1, 每次只接收一条消息, 处理完成后才接收下一条消息
spring:
rabbitmq:
listener:
simple:
prefetch: 1
创建m3包(发布和订阅模式)
广播消息,两个消费者同时能收到消息
修改main类:
创建队列修改创建交换机
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
//创建交换机
@Bean
public FanoutExchange logs(){
//参数1:非持久参数2:不自动删除
return new FanoutExchange("logs",false,false);
}
@Autowired
private Producer p;
/*
springboot 执行流程:
扫描创建实例-->自动注入-->@PostConstruct-->后续流程(启动消费者)
*/
@PostConstruct
public void test() {
//在新的线程中执行阻塞
new Thread(() -> {
while (true){
System.out.println("输入消息:");
String s = new Scanner(System.in).nextLine();
p.send(s);
}
}).start();
}
}
生产者:
@Component
public class Producer {
@Autowired
private AmqpTemplate t;
public void send(String msg){
t.convertAndSend("logs","",msg.getBytes());
System.out.println("消息已发送");
}
}
消费者:
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
//随机队列,spring会自动随机命名,非持久、独占、自动删除(false,true,true)
value = @Queue(),
//交换机
//declare = "false"不在这里创建交换机,只用名称引用交换机
exchange = @Exchange(name = "logs",declare = "false")
))
public void receive1(String msg){
System.out.println("消费者1收到:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
//随机队列,spring会自动随机命名,非持久、独占、自动删除(false,true,true)
value = @Queue(),
//交换机
//declare = "false"不在这里创建交换机,只用名称引用交换机
exchange = @Exchange(name = "logs",declare = "false")
))
public void receive2(String msg){
System.out.println("消费者2收到:"+msg);
}
}
路由模式
- 使用
direct 交换机
- 队列和交换机绑定时, 设置**
绑定键
** - 发送消息时, 指定**
路由键
**
main启动类:
创建交换机为direct:
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
//创建交换机
@Bean
public DirectExchange logs(){
//参数1:非持久参数2:不自动删除
return new DirectExchange("direct_logs",false,false);
}
@Autowired
private Producer p;
/*
springboot 执行流程:
扫描创建实例-->自动注入-->@PostConstruct-->后续流程(启动消费者)
*/
@PostConstruct
public void test() {
//在新的线程中执行阻塞
new Thread(() -> {
while (true){
System.out.println("输入消息:");
String s = new Scanner(System.in).nextLine();
System.out.println("输入路由键:");
String k = new Scanner(System.in).nextLine();
p.send(k,s);
}
}).start();
}
}
生产者:
@Component
public class Producer {
@Autowired
private AmqpTemplate t;
public void send(String k,String msg){
t.convertAndSend("direct_logs",k,msg.getBytes());
System.out.println("消息已发送");
}
}
消费者:
@Component
public class Consumer {
@RabbitListener(bindings = @QueueBinding(
//随机队列,spring会自动随机命名,非持久、独占、自动删除(false,true,true)
value = @Queue(),
//交换机
//declare = "false"不在这里创建交换机,只用名称引用交换机
exchange = @Exchange(name = "direct_logs",declare = "false"),
//绑定键关键词
key = {"error"}
))
public void receive1(String msg){
System.out.println("消费者1收到:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
//随机队列,spring会自动随机命名,非持久、独占、自动删除(false,true,true)
value = @Queue(),
//交换机
//declare = "false"不在这里创建交换机,只用名称引用交换机
exchange = @Exchange(name = "direct_logs",declare = "false"),
//绑定键关键词
key = {"info","warning","error"}
))
public void receive2(String msg){
System.out.println("消费者2收到:"+msg);
}
}
测试:
主题模式
主题模式不过是具有特殊规则的路由模式, 代码与路由模式基本相同:
- 使用
topic 交换机
- 使用特殊的绑定键和路由键规则
main启动类:
生产者:把交换机名字改为topic_logs
消费者:把交换机名字改为topic_logs
测试:
更多推荐
所有评论(0)