微服务远程调用

提供者和消费者

  • 服务提供者:一次业务中,被其他微服务调用的服务。(提供接口给其他微服务)
  • 服务消费者:一次业务中,调用其他微服务的服务。(调用其他微服务提供的接口)
  • 提供者与消费者是相对的
  • 一个服务可以同时是服务提供者和服务消费者

Eureka注册中心

Eureka的作用

  1. 注册服务信息
  2. 拉去服务 user-serice的信息
  3. 负载均衡
  4. 远程调用
  • 消费者该如何获取服务提供者具体信息?
    • 服务提供者启动时向eureka注册自己的信息
    • eureka保存这些信息
    • 消费者根据服务名称向eureka拉去提供者信息
  • 如果有多个服务提供者,消费者该如何选择?
    • 服务消费者利用负载均衡算法,从服务列表中挑选一个
  • 消费者如何感知服务提供者健康状态?
    • 服务提供者会每隔30秒向EurekaServer发送心跳请求,报告健康状态
    • eureka会更新记录服务列表信息,心跳不正常会被剔除
    • 消费者就可以拉取到最新的信息

Eureka架构

在Eureka架构中微服务角色有两类:

  • EurekaServer:服务端,注册中心
    • 记录服务信息
    • 心跳监控
  • EurekaClient:客户端
    • Provider:服务提供者,例如案例中的user-service
      • 注册自己的信息到EurekaServer
      • 每隔30秒向EurekaServer发送心跳
    • consumer:服务消费者,例如案例中的order-serice
      • 根据服务名称从EurekaServer拉去服务列表
      • 基于服务列表做负载均衡,选中一个微服务后发起远程调用

搭建EurekaServer

搭建EurekaServer服务步骤如下:

  1. 创建项目,引入spring-cloud-starter-netflix-eureka-server的依赖
    1
    2
    3
    4
    <dependency>  
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
  2. 编写启动类,添加@EnableEurekaServer注解
  3. 添加application.yml文件,编写下面的配置:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    server:
    port: 10086
    spring:
    applicatiopn:
    name: eurekaserver
    eureka:
    client:
    service-url:
    defaultZone: http://127.0.0.1:10086/eureka

注册user-service

将user-service服务注册都按EurekaServer步骤如下:

  1. 在user-service项目引入spring-cloud-starter-netflix-eureka-client的依赖
    1
    2
    3
    4
    5
    <!--eureka客户端依赖-->  
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
  2. 在application.yml文件,编写下面的配置:
    1
    2
    3
    4
    5
    6
    7
    spring:
    application:
    name: userservice
    eureka:
    client:
    service-url:
    defaultZone: http://127.0.0.1:10086/eureka/

另外,我们可以将user-service多次启动,模拟多实例部署,但为了避免端口冲突,需要修改端口设置:
Pasted image 20231010114855

服务拉取

在order-service完成服务拉取

服务拉取是基于服务名称获取服务列表,然后在对服务列表做负载均衡

  1. 修改OrderService的代码,修改访问的url路径,用服务名代替ip、端口:
    1
    String url = "http://userservice/user/" + order.getUserId();
  2. 在order-service项目的启动类OrderApplication中的RestTemplate添加负载均衡注解:
    1
    2
    3
    4
    5
    @Bean
    @LoadBalanced //负载均衡注解
    public RestTemplate restTemplate(){
    return new RestTemplate();
    }

Ribbon负载均衡

负载均衡流程

[Pasted image 20231010151331.png]

负载均衡策略

Ribbon的负载均衡规则是一个叫做IRule的接口来定义的,每一个子接口都是一个规则:
[Pasted image 20231010162016.png]

[Pasted image 20231010162039.png]

通过定义IRule实现可以修改负载均衡规则,有两种方式:

  1. 代码方式:在order-service中的OrderApplication类中,定义一个 新的IRule:
    1
    2
    3
    4
    @Bean
    public IRule randomRule(){
    return new RandomRule();
    }
  2. 配置文件方式:在order-service的application.yml文件中,添加新的配置也可以修改规则:
    1
    2
    3
    userservice:
    ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule #负载均衡规则

饥饿加载

Ribbon默认是采用懒加载,及第一次访问时才会创建LoadBalanceClient,请求时间会很长。
而饥饿加载则会在项目启动时创建,降低第一次访问的耗时,通过下面配置开启饥饿加载:

1
2
3
4
ribbon:
eager-load:
enabled: true
clients: userservice #指定对userservice这个服务器饥饿加载

Nacos注册中心

服务注册到Nacos

  1. 在cloud-demo父工程中添加spring-cloud-alibaba的管理依赖:
    1
    2
    3
    4
    5
    6
    7
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies<artifactId>
    <version>2.2.5。RELEASE</version>
    <type>pom</type>
    <scope>import</scope>
    </dependency>
  2. 注释掉order-service和user-service中原有的eureka依赖。
  3. 添加nacos的客户端依赖:
    1
    2
    3
    4
    5
    <!--nacos客户端依赖 -->
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
  4. 修改user-service&order-service中的application.yml文件,注释eureka地址,添加nacos地址:
    1
    2
    3
    4
    spring:
    cloud:
    nacos:
    server-addr: localhost:8848

服务集群属性

  1. 修改application.yml,添加如下内容:
    1
    2
    3
    4
    5
    6
    spring:
    cloud:
    nacos:
    server-addr: localhost:8848
    discovery:
    cluster-name: HZ # 配置集群名称,也就是机房位置,例如:HZ,杭州
  2. 在Nacos控制台可以看到集群变化:
    [Pasted image 20231011153258.png]

根据集群负载均衡

  1. 修改order-service中的application.yml,设置集群为HZ:
    1
    2
    3
    4
    5
    6
    spring:
    cloud:
    nacos:
    server-addr: localhost:8848
    discovery:
    cluster-name: HZ # 配置集群名称,也就是机房位置
  2. 然后在order-service中设置负载均衡的Rule为NacosRule,这个规则优先会寻找与自己同集群的服务:
    1
    2
    3
    userservice:
    ribbon:
    NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule #负载均衡规则
  3. 注意将user-service的权重都设置为1

根据权重负载均衡

实际部署中会出现这样的场景:

  • 服务器设备性能有差异,部分实例所在机器性能较好,另一些较差,我们希望性能好的机器承担更多的用户请求
    Nacos提供了权重配置来控制访问频率,权重越大则访问频率越高
  1. 在Nacos控制台可以设置实例的权重值,首先选中实例后面的编辑按钮
    [Pasted image 20231011164637.png]
  2. 将权重设置为0.1,测试可以发现8081被访问到的频率大大降低
    [Pasted image 20231011164858.png]
  3. 权重设置为0则完全不会被访问

环境隔离-namespace

Nacos中服务存储和数据存储的最外层都是一个名为namespace的东西,用来做最外层隔离
[Pasted image 20231012085904.png]

  1. 在Nacos控制台可以创建namespace,用来隔离不同环境
    [Pasted image 20231012110758.png]
  2. 然后填写一个新的命名空间信息:
    [Pasted image 20231012110848.png]
  3. 保存后会在控制台看到这个命名空间的id:
    [Pasted image 20231012111026.png]
  4. 修改order-service的application.yml,添加namespace:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    spring:  
    datasource:
    url: jdbc:mysql://localhost:3306/cloud_user?useSSL=false&characterEncoding=utf-8&serverTimezone=UTC
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
    application:
    name: userservice
    cloud:
    nacos:
    server-addr: localhost:8848 #nacos服务地址
    discovery:
    cluster-name: SH #上海
    # 命名空间添ID
    namespace: 78523f49-6663-4d0d-b5a4-9d4cada9023e
  5. 重启order-service后,再来查看控制台:
    [Pasted image 20231012162256.png]
    [Pasted image 20231012162306.png]
  6. 此时访问order-service,因为namespace不同,会导致找不到userservice,控制台会报错
    [Pasted image 20231012162532.png]

nacos注册中心细节分析

[Pasted image 20231015203656.png]

临时实例和非临时实例

服务注册到Nacos时,可以选择注册为临时或非临时实例,通过以下的配置来设置:

1
2
3
4
5
6
spring:
cloud:
nacos:
server-addr: localhost:8848
discovery:
ephemeral: false # 是否为临时实例 true 是临时实例 false 是非临时实例 默认true

总结

  1. Nacos与eureka的共同点
    1. 都支持服务注册饥和服务拉取
    2. 都支持服务提供者心跳方式组偶健康检测
  2. Nacos与Eureka的区别
    1. Nacos支持服务端主动检测提供者状态:临时实例采用心跳模式,非临时实例采用主动检测模式
    2. 临时实例心跳不正常会被剔除,非临时实例则不会被踢出
    3. Nacos支持服务列表变更的消息推送模式,服务列表更新更及时
    4. Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式:Eureka采用AP方式

Nacos配置管理

统一配置管理

  1. 在Nacos中添加配置信息:
    [Pasted image 20231015210554.png]

  2. 在弹出表单中填写配置信息:
    [Pasted image 20231015211126.png]

配置获取的步骤如下:
[Pasted image 20231015211414.png]

  1. 引入Nacos的配置管理客户端依赖
    1
    2
    3
    4
    5
    <!-- nacos配置管理依赖 -->  
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
  2. 在userservice中的resource目录添加一个bootstrap.yml文件,这个文件时引导文件,优先级高于application.yml:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    spring:  
    application:
    name: userservice
    profiles:
    active: dev # 开发环境,这里是dev
    cloud:
    nacos:
    server-addr: localhost:8848 # Nacos地址
    config:
    file-extension: yaml # 文件后缀名
    我们在user-service中将pattern.dateformat这个属性注入到UserController中做测试:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @RestController  
    @RequestMapping("/user")
    public class UserController {

    @Autowired
    private UserService userService;

    // 注入nacos中的配置属性
    @Value("${pattern.dateformat}")
    private String dateformat;

    @GetMapping("now")
    public String now(){
    return LocalDateTime.now().format(DateTimeFormatter.ofPattern(dateformat));
    }

配置自动刷新

Nacos中的配置文件变更后,微服务无需重启就可以感知。不过需要通过下面两种配置实现:

  • 方式一:在@Value注入的变量所在类上添加注解@RefreshScope
    1
    2
    3
    4
    5
    6
    7
    8
    @Slf4j  
    @RestController
    @RequestMapping("/user")
    @RefreshScope
    public class UserController {

    @Value("${pattern.dateformat}")
    private String dateformat;
  • 方式二:使用@ConfigurationProperties注解
    1
    2
    3
    4
    5
    6
    @Data  
    @Component
    @ConfigurationProperties(prefix = "pattern")
    public class PatternProperties {
    private String dateformat;
    }

多环境配置共享

微服务启动时会从nacos读取多个配置文件:

  • [spring.application.name]-[spring.profiles.active].yaml,例如:userseevice-dev.yaml
  • [spring.application.name].yaml,例如:userservice.yaml
    无论profile如何变化,[spring.application.name].yaml这个文件一定会加载,因此多环境共享配置可以写入这个文件
    ![[Pasted image 20231016100557.png]]

多种配置的优先级:

  • 服务名-profile.yaml > 服务名称.yaml > 本地配置

Nacos集群搭建

集群搭建步骤:

  1. 搭建MySQL集群并初始化数据库表
  2. 下载解压nacos
  3. 修改集群配置(节点信息)、数据库配置
  4. 分别启动多个nacos节点
  5. nginx反向代理

http客户端Feign

Feign替换RestTemplate

RestTemplate方式调用存在的问题

先来看看我们以前利用RestTemplate发起远程调用的代码:

1
2
String url = "http://userservice/user/" + order.getUserId();
User user = restTemplate.getForObject(url, User.class);

存在下面的问题:

  • 代码可读性差,编程体验不统一
  • 参数复杂URL难以维护

Feign的介绍

Feign是一个声明式的http客户端,官方地址:https://github.com/OpenFeign/feign
其作用就是帮助我们优雅的实现http请求的发送,解决上面的问题。

定义和使用Feign客户端

使用Feign的步骤如下:

  1. 引入依赖:
    1
    2
    3
    <dependency> 
    <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
  2. 在order-service的启动类添加注解开启Feign的功能:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @EnableFeignClients //开启Feign
    @MapperScan("cn.itcast.order.mapper")
    @SpringBootApplication
    public class OrderApplication {

    public static void main(String[] args) {
    SpringApplication.run(OrderApplication.class, args);
    }
    }
  3. 编写Feign客户端:
    1
    2
    3
    4
    5
    @FeignClient("userservice")
    public interface UserClient {
    @GetMapping("/user/{id}")
    user findById(@PathVariable("id") Long id);
    }
    主要是基于SPringMVC的注解来声明远程调用的信息,比如:
  • 服务名称:userservice
  • 请求方式:GET
  • 请求路径:/user/{id}
  • 请求参数:Long id
  • 返回值类型:User
  1. 用Feign客户端代替RestTemplate
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Autowired  
    private UserClient userClient;
    public Order queryOrderById(Long orderId) {
    // 1.查询订单
    Order order = orderMapper.findById(orderId);
    //2.用Feign远程调用
    User user = userClient.findById(order.getUserId());
    //3.封装user到order
    order.setUser(user);
    // 4.返回
    return order;
    }

自定义Feign的配置

Fegin运行自定义配置来覆盖默认配置,可以修改的配置如下:
[Pasted image 20231016110655.png]
一般我们需要配置的就是日志级别。

配置Feign日志有两种方式:

方式一:配置文件方式

  1. 全局生效:
    1
    2
    3
    4
    5
    feign:
    client:
    config:
    default: #这里用default就是全局配置,如果是写服务名称,则是针对某个微服务的配置
    loggerLevel: FULL #日志级别
  2. 局部生效:
    1
    2
    3
    4
    5
    feign:
    client:
    config:
    userservice: #这里用default就是全局配置,如果是写服务名称,则是针对某个微服务的配置
    loggerLevel: FULL #日志级别
    配置Feign日志的方式二:java代码方式,需要先声明一个Bean:
    1
    2
    3
    4
    5
    6
    public class FeignClientConfiguration{
    @Bean
    public Logger.Level.feignLogLevel(){
    return Logger.Level.BASIC;
    }
    }
  3. 而后如果是全局配置,则把它放到@EnableFeignClients这个注解中:
    1
    @EnableFeignClients(defaultConfiguration = FeignClientConfiguration.class)
  4. 如果是局部配置,则把它放到@FeignClient这个注解中:
    1
    @FeignClient(value = "userservice",configuration = FeignClientConfiguration.class)

Feign的性能优化

Feign底层的客户端实现:

  • URLConnection:默认实现,不支持连接池
  • Apache HttpClient:支持连接池
  • OKHttp:支持连接池
    因此优化Feign的性能主要包括:
  1. 使用连接池代替默认的URLConnection
  2. 日志级别,最好用basic或none

Feign的性能优化-连接池配置

Fegin添加HttpClient的支持:
引入依赖:

1
2
3
4
<dependency> 
<groupId>io.github.openfeign</groupId>
<artifactId>feign-httpclient</artifactId>
</dependency>

配置连接池:

1
2
3
4
5
6
7
8
9
feign:
client:
config:
default: #default全局配置
loggerLevel: BASIC #日志级别,BASIC就是基本的请求和相应信息
httpclient:
enabled: ture # 开启feign对HttpClient的支持
max-connections: 200 #最大的连接数
max-connections-per-route: 50 # 每个路径的最大连接数

Fegin的最佳实践

方式一(继承):给消费者的FeignClient和提供者的controller定义统一的父接口作为标准
缺点:

  • 服务紧耦合
  • 父接口参数列表中的映射不会被继承
    1
    2
    3
    4
    public interface UserAPI{
    @GetMapping("/user/{id}")
    User findById(@PathVariable("id") Long id);
    }
    1
    2
    @FeignClient(value = "userservice")
    public interface UserClient extends UserAPI{}
    1
    2
    3
    4
    @RestController
    public class UserController implements USerAPI{

    }
    方式二(抽取):将FeignClient抽取为独立模块,并把接口有关的POJO,默认的Feign配置都放到这个模块中,提供给所有消费者使用
    [Pasted image 20231016143704.png]

抽取FeignClient

实现最佳实践方式二的步骤如下:

  1. 首先创建一个module,命名为feign-api,然后引入feign的starter依赖
  2. 将order-service中编写的UserClient、User、DefaultFeignCOnfiguration都复制到feign-api项目中
  3. 在order-service中引入feign-api的依赖
  4. 修改order-service中的所有与上述三个组件有关的import部分,改成导入feign-api中的包
  5. 重启测试
    当定义的FeignClient不在SpringBootApplication的扫描包范围时,这些FeignClient无法使用。有两种方式解决:
    方式一:指定FeignClient所在包
    1
    @EnableFeignClients(basePackages = "cn.itcase.feign.clients")
    方式二:指定FeignClient字节码
    1
    @EnableFeignClients(clicents = {UserClient.class})

统一网关Gateway

为什么需要网关

网关功能:

  • 身份认证和权限校验
  • 服务路由、负载均衡
  • 请求限流

网关的技术实现

在SpringCloud中网关的实现包括两种:

  • gateway
  • zuul
    Zuul是基于Servlet的实现,属于阻塞式编程。而SpringCloudGateway则是基于Spring5中提供的WebFlux,属于响应式编程的实现,具备更好的性能。

搭建网关服务

搭建网关服务的步骤:

  1. 创建新的module,引入SpringCloudGateway的依赖和nacos的服务发现依赖:
    1
    2
    3
    4
    5
    6
    7
    8
    <!-- 网关依赖 -->
    <dependency>
    <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId>
    </dependency>
    <!-- nacos服务发现依赖 -->
    <dependency>
    <groupId>org.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
  2. 编写路由配置及nacos地址
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    server:
    port: 10010 # 网关端口
    spring:
    application:
    name: gateway #服务名称
    cloud:
    nacos:
    server-addr: localhost:8848 #nacos地址
    gateway:
    routes: # 网关路由配置
    - id: user-service # 路由id,自定义,只要唯一即可
    # uri http://127.0.0.1:8081 #路由的目标地址 http就是固定地址
    uri: lb://userservice # 路由的目标地址 lb就是负载均衡,后面跟服务名称
    predicates: # 路由断言,也就是判断请求是否符合路由规则的条件
    - Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求

路由断言工厂Route Predicate Factoy

网关路由可以配置的内容包括:

  • 路由id:路由唯一标识

  • uri:路由目的地。支持lb和http两种

  • predicates:路由断言,判断请求是否符合要求,符合则转发到路由目的地

  • filters:路由过滤器,处理请求或响应

  • 我们在配置文件中写的断言规则只是字符串,这些字符串会被Predicate Factory读取并处理,转变为路由判断的条件

  • 例如Path=/user/** 是按照路径匹配,这个规则是由org.springframework.cloud.gateway.handler.predicate.PathRoutePredicateFactory类来处理的

  • 像这样的断言工厂在SpringCloudGateway还有十几个

Spring提供了11种基本的Predicate工厂:
[Pasted image 20231016164528.png]

路由过滤器 GatewayFilter

GatewayFilter是网关中提供的一种过滤器,可以对进行网关的请求和微服务返回的响应做处理:
[Pasted image 20231016173504.png]

过滤器工厂 GatewayFilterFactory

Spring提供了31种不同的路由过滤器工厂,例如:
[Pasted image 20231016173759.png]

案例:给所有进入userservice的请求添加一个请求头

给所有进入userservice的请求添加一个请求头:Truth=itcast is freaking awesome!
实现方式:在gateway中修改application.yml文件,给userservice的路由添加过滤器:

1
2
3
4
5
6
7
8
9
10
spring:
cloud:
gateway:
routes: #网关路由配置
- id: user-service
uri: lb//userservice
predicates:
- Path=/user/**
filters: # 过滤器
- AddRequestHeader=Truth=itcast is freaking awesome! #添加请求头

默认过滤器

如果要对所有的路由都生效,则可以将过滤工厂写到Default下。格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
server:  
port: 10010
spring:
application:
name: gateway
cloud:
nacos:
server-addr: localhost:8848 #nacos地址
gateway:
routes:
- id: user-service # 路由标识,必须唯一
uri: lb://userservice # 路由的目标地址
predicates: # 路由断言,判断请求是否符合规则
- Path=/user/** # 路径断言,判断路径是否以/user开头,如果是则符合
- id: order-service
uri: lb://orderservice
predicates:
- Path=/order/**
default-filters: # 默认过滤器,会对所有的路由请求都生效

全局过滤器 GlobalFilter

全局过滤器的作用也是处理一切进入网关的请求和微服务响应,与GatewayFilter的作用一样。
区别在于GatewayFilter通过配置定义,处理逻辑是固定的。而GlobalFilter的逻辑需要自己写代码实现。
定义方式是实现GlobalFilter接口。

1
2
3
public interface GlobalFilter{
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

步骤一:自定义过滤器
自定义类,实现GlobalFilter接口,添加@Order注解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Order(-1)  
@Component
public class AuthorizeFilter implements GlobalFilter {
@Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//1.获取请求参数
ServerHttpRequest request = exchange.getRequest();
MultiValueMap<String, String> queryParams = request.getQueryParams();
//2.获取参数中的authorization 参数
String auth = queryParams.getFirst("authorization");
//3.判断参数值是否等于admin
if("admin".equals(auth)){
//4.是,放行
return chain.filter(exchange);
}
//5.否,拦截
//5.1.设置状态码
exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
//5.2.拦截请求
return exchange.getResponse().setComplete();
}
}

过滤器执行顺序

请求进入网关会碰到三类过滤器:当前路由的过滤器、DefaultFilter、GlobalFilter
请求路由后,会将当前路由过滤器和DefaultFilter、GlobalFilter,合并到一个过滤器链(集合)中,排序后依次执行每个过滤器

  • 每一个过滤器都必须指定一个int类型的order值,order值越小,优先级越高,执行顺序越靠前
  • GlobalFilter通过实现Ordered接口,或者添加@Order注解来指定order值,由我们自己指定
  • 路由过滤器和defaultFilter的order由Spring指定,默认是按照声明顺序从1递增
  • 当过滤器的order值一样时,会按照defaultFilter > 路由过滤器 > GlobalFilter的顺序执行
    可以参考下面几个类的源码来查看:
    [Pasted image 20231016205931.png]

跨域问题处理

跨域:域名不一致就是跨域,主要包括:

网关处理跨域采用的同样是CORS方案,并且只需要简单配置即可实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
spring:   
cloud:
nacos:
server-addr: localhost:8848 #nacos地址
gateway:
globalcors:
add-to-simple-url-handler-mapping: true #解决options请求被拦截问题
corsConfigurations:
'[/**]':
allowedOrigins: #允许那些网站的跨域请求
- "http://localhost:8090"
- "http://www.leyou.com"
allowedMethods: #允许的跨域ajax的请求方式
- "GET"
- "POST"
- "DELETE"
- "PUT"
- "OPTIONS"
allowedHeaders: "*" # 允许在请求中携带的头信息
allowCredentials: true #是否允许携带cookie
maxAge: 360000 #这次跨域检测的有效期

初始Docker

项目部署的问题

大型项目组件较多,运行环境也较为复杂,部署时会碰到一些问题:

  • 依赖关系复杂,容易出现兼容性问题
  • 开发、测试、生产环境有差异
    [Pasted image 20231017074555.png]

Docker

Docker如何解决依赖的兼容问题的?

  • 将应用的Libs(函数库)、Deps(依赖)、配置与应用一起打包
  • 将每个应用放到一个隔离容器去运行,避免互相干扰
    不同环境的操作系统不同,Docker如何解决?我们先来了解下操作系统架构
    ![[Pasted image 20231017075912.png]]
    内核与硬件交互,提供操作硬件的指令
    系统应用封装内核指令为函数,便于程序员调用
    用户程序基于系统函数库实现功能
    Ubuntu和CentOS都是基于Linux内核,只是系统应用不同,提供的函数库有差异

Docker如何解决不同系统环境的问题?

  • Docker将用户程序与所需要调用的系统(比如Ubuntu)函数库一起打包
  • Docker运行到不同操作系统时,直接基于打包的库函数,借助于操作系统的Linux内核来运行
    Docker如何解决大型项目依赖关系复杂,不同组件依赖的兼容性问题?
  • Docker允许开发中将应用、依赖、函数库、配置一起打包,形成可移植镜像
  • Docker应用运行在容器中,使用沙箱机制,相互隔离
    Docker如何解决开发、测试、生存环境有差异的问题
  • Docker镜像中包含完整运行环境,包括系统函数库,仅依赖系统的Linux内核,因此可以再任意Linux操作系统上运行
    Docker是一个快速交付应用、运行应用的技术:
  1. 可以将程序及其依赖、运行环境一起打包为一个镜像,可以迁移到任意Linux操作系统
  2. 运行时利用沙箱机制形成隔离容器,各个应用互不干扰
  3. 启动、移除都可以通过一行命令完成,方便快捷

Docker与虚拟机

虚拟机(virtual machine)是在操作系统重模拟硬件设备,然后运行另一个操作系统,比如在Windows系统里面运行Ubuntu系统,这样就可以运行任意的Ubuntu应用了
[Pasted image 20231017081809.png]
[Pasted image 20231017081828.png]
[Pasted image 20231017081857.png]
Docker和虚拟机的差异:

  • docker是一个系统进程;虚拟机是在操作系统重的操作系统
  • docker体积小、启动速度快、性能好;虚拟机体积大、启动速度慢、性能一般

镜像和容器

镜像(Image):Docker将应用程序及其所需的依赖、函数库、环境、配置等文件打包在一起,称为镜像
容器(Container):镜像中的应用程序运行后形成的进程就是容器,只是Docker会给容器做隔离,对外不可见

Docker和DockerHub

  • DockerHub:DockerHub是一个Docker镜像的托管平台。这样的平台称为Docker Registry
  • 国内也有类似于DockerHub的公开服务,比如网易云镜像服务、阿里云镜像库等

docker架构

Docker是一个CS架构的程序,有两部分组成:

  • 服务端(server):Docker守护进程,负责处理Docker指令,管理镜像、容器等
  • 客户端(client):通过命令或RestAPI向Docker服务端发送指令。可以再本地或远程向服务端发送指令
    [Pasted image 20231017083507.png]

Docker基本操作

镜像相关命令

  • 镜像名称一般分两部分组成:[repository]:[tag]。
  • 在没有指定tag时,默认是latest,代表最新版本的镜像
    [Pasted image 20231017100702.png]

镜像操作命令

[Pasted image 20231017100959.png]

案例:从DockerHub中拉取一个nginx镜像并查看

  1. 首先去镜像仓库搜索nginx镜像,比如DockerHub:
    [Pasted image 20231017101811.png]
  2. 根据查看到的镜像名称,拉取自己需要的镜像,通过命令:docker pull nginx
    [Pasted image 20231017101922.png]
  3. 通过命令:docker images 查看拉取到的镜像
    [Pasted image 20231017102010.png]

案例:利用docker save将nginx镜像导出磁盘,然后在通过load加载回来

步骤一:利用docker xx –help命令查看docker save和docker load的语法

步骤二:使用docker tag 创建新镜像mynginx1.0

步骤三:使用docker save导出镜像到磁盘

镜像操作有哪些?

  • docker images 查看镜像
  • docker rmi 删除
  • docker pull 拉取
  • docker push 推送
  • docker save 保存
  • docker load 加载

容器相关命令

[Pasted image 20231017105030.png]

案例:创建运行一个nginx容器

步骤一:去docker hub查看Nginx的容器运行命令

1
docker run --name containerName -p 80:80 -d nginx

命令解读:

  • docker run:创建并运行一个容器
  • --name:给容器起一个名字,比如叫做mn
  • -p:将宿主机端口与容器端口映射,冒号左侧是宿主机端口,右侧是容器端口
  • -d:后台运行容器
  • nginx:镜像名称,例如nginx
    查看容器日志的命令:
  • docker logs
  • 添加 -f 参数可以持续查看日志
    查看容器状态:
  • docker ps

案例:进入Nginx容器,修改HTML文件内容,添加“Hello”

步骤一:进入容器。进入我们刚刚创建的nginx容器的命令为:

1
docker exec -it mn bash

命令解读:

  • docker exec:进入容器内部,执行一个命令
  • -it:给当前进入的容器创建一个标椎输入、输出终端,允许我们与容器交互
  • mn:要进入的容器的名称
  • bash:进入容器后执行的命令,bash是一个linux终端交互命令

数据卷

容器与数据耦合的问题
[Pasted image 20231017113857.png]

数据卷(volume):是一个虚拟目录,指向宿主机文件系统的某个目录
[Pasted image 20231017114419.png]

操作数据卷

数据卷操作的基本语法如下:

1
docker volume[COMMAND]

docker volume命令是数据卷操作,根据命令后跟随的command来确定下一步的操作:

  • create 创建一个volume
  • inspect 显示一个或多个volume的信息
  • ls 列出所有的volume
  • prune 删除未使用的volume
  • rm 删除一个或多个指定的volume

创建一个数据卷,并查看数据卷在宿主机的目录位置

  1. 创建数据卷
    1
    docker volume create html
  2. 查看所有数据
    1
    docker volume ls
  3. 查看数据卷详细信息卷
    1
    docker volume inspect html

挂载数据卷

我们在创建容器时,可以通过-v参数来挂载一个数据卷到某个容器目录

1
2
3
4
5
docker run: 就是创建并运行容器
-- name mn: 给容器起个名字叫mn
-v html:/root/html: 把html数据卷挂载到容器内的/root/html这个目录中
-p 8080:80: 把宿主机的8080端口映射到容器内的80端口
nginx: 镜像名称

创建一个nginx容器,修改容器内的html目录内的index.html内容

需求说明:上个案例中,我们进入nginx容器内部,已经知道nginx的html目录所在位置/user/share/nginx/html,我们需要把这个目录挂载到html这个数据卷上,方便操作其中的内容。
提示:运行容器时使用-v参数挂载数据卷
步骤:

  1. 创建容器并挂载数据卷到容器内的HTML目录
    1
    docker run --name mn -v html:/usr/share/nginx/html -p 80:80 -d nginx
  2. 进入html数据卷所在位置,并修改HTML内容
    1
    2
    3
    4
    5
    6
    # 查看html数据卷的位置
    docker volume inspect html
    # 进入该目录
    cd /var/lib/docker/volumes/html/_data
    # 修改文件
    vi index.html

创建并运行一个MySQL容器,将宿主机目录直接挂载到容器

提示:目录挂载与数据卷挂载的语法是类似的:

  • -v[宿主机目录]:[容器内目录]
  • -v[宿主机文件]:[容器内文件]
    实现思路如下:
  1. 在将课前资料中的mysql.tar文件上传到虚拟机,通过load命令加载为镜像
  2. 创建目录/tmp/myql/data
  3. 创建目录/tmp/myql/conf,将课前资料提供的hmy.cnf文件上传到/tpm/myql/conf
  4. 去DockerHub查阅资料,创建并运行MySQL容器,要求:
    1. 挂载/tmp/myql/data到mysql容器内数据存储目录
    2. 挂载/tmp/myql/conf/hmy.cnf到msql容器的配置文件
    3. 设置MySQL密码

数据卷挂载的方式对比

[Pasted image 20231017160552.png]

Dcokerfile自定义镜像

镜像结构

  • 镜像是指将应用程序及其需要的系统函数库、环境、配置、依赖打包而成
  • 入口(Entrypoint)
    • 镜像运行入口,一般是程序启动的脚本和参数
  • 层(Layer)
    • 在BaseImage基础上添加安装包、依赖、配置等,每次操作都形成新的一层
  • 基础镜像(BaseImage)
    • 应用依赖的系统函数库、环境、配置、文件等

什么是Dockerfile

Docekrfile就是一个文本文件,其中包含一个个的指令(Instruction),用指令来说明要执行什么操作来构建镜像。每一个指令都会形成一层Layer
[Pasted image 20231017162228.png]

案例

[Pasted image 20231017163840.png]
[Pasted image 20231017163859.png]

DockerCompose

什么是DockerCompose

  • Docker Compose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器!
  • Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    version: "3.8"

    services:
    mysql:
    image: mysql:8.0
    environment:
    MYSQL_ROOT_PASSWORD: 123
    volumes:
    - /tmp/mysql/data:/var/lib/mysql
    - /tmp/mysql/conf/hmy.cnf:/etc/mysql/conf.d/hmy.cnf
    web:
    build: .
    ports:
    - 8090: 8090

Docker镜像仓库

常见镜像仓库服务

镜像仓库(Docker Registry)有公共的和私有的两种形式:

  • 公共仓库:例如Docker官方的Docker Hub,国内也有一些云服务商提供类似于Docker Hub的公开服务,比如网易云镜像服务、DaoCloud镜像服务、阿里云镜像服务等
  • 除了使用公开仓库外,用户还可以再本地搭建私有Docker Registry。企业自己的镜像最好是采用私有Docker Registry来实现

在私有镜像仓库推送或拉取镜像

推送镜像到私有镜像服务必须先tag,步骤如下:

  1. 重新tag本地镜像,名称前缀为私有仓库的地址:192.168.140.130:8080/
    1
    docker tag nginx:latest 192.168.140.130:8080/nginx:1.0
  2. 推送镜像
    1
    docker push 192.168.140.130:8080/nginx:1.0
  3. 拉取镜像
    1
    docker pull 192.168.140.130:8080/nginx:1.0

初识MQ

同步通讯和异步通讯

[Pasted image 20231018092107.png]

同步调用的问题

微服务间基于Feign的调用就属于同步方式,存在一些问题
[Pasted image 20231018092747.png]

  1. 耦合度高
    • 每次加入新的需求,都要修改原来的代码
  2. 性能下降
    • 调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
  3. 资源浪费
    • 调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
  4. 级联失败
    • 如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障
      同步调用的优点:
  • 时效性较强,可以立即得到结果
    同步调用的问题:
  • 耦合度高
  • 性能和吞吐能力下降
  • 有额外的资源消耗
  • 有级联失败问题

异步调用方案

异步调用常见实现就是事件驱动模式
[Pasted image 20231018094900.png]

事件驱动优势

优势一:服务解耦
优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,不担心级联失败问题
优势四:流量削峰

异步通信的缺点

缺点一:依赖于Broker的可靠性、安全性、吞吐能力
缺点二:架构复杂了,业务没有明显的流程线,不好追踪管理

什么是MQ

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker
[Pasted image 20231018100927.png]

RabbitMQ快速入门

RabbitMQ概述

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:(https://www.rabbitmq.com/)

RabbitMQ的结构和概念
[Pasted image 20231018105546.png]
概念:

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

常见消息模型

MQ的官方文档中给出了5个MQ的Demo示例,对应了几种不同的用法:

  • 基本消息队列(BasicQueue)
  • [Pasted image 20231018110645.png]
  • 工作消息队列(WorkQueue)
  • [Pasted image 20231018110658.png]
  • 发布订阅(Publish、Subscribe),有根据交换机类型不同分为三种:
    • Fanout Exchange:广播
    • [Pasted image 20231018110714.png]
    • Direct Exchange:路由
    • [Pasted image 20231018110726.png]
    • Topic Exchange:主题
    • [Pasted image 20231018110734.png]

HelloWorld案例

官方的HellowWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到对类queue
  • queue:消息队列里,负责接收并缓存消息
  • consumer:订阅队列,处理队列中的消息
    [Pasted image 20231018111117.png]

完成官方Demo中的hello world案例

实现步骤:

  • 导入课前资料中的demo工程
  • 运行publisher服务中的测试类PublisherTest中的测试方法testSendMessage()
  • 查看RabbitMQ控制台的消息
  • 启动consumer服务,查看是否能接收消息
    基本消息队列的消息发送流程:
  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息
    基本消息队列的消息接收流程:
  5. 建立connection
  6. 创建channel
  7. 利用channel声明队列
  8. 定义consumer的消费行为handleDelivery()
  9. 利用channel将消费者与队列绑定

SpringAMQP

什么SpringAMQP

SpringAmqp的官方地址:(https://spring.io/pojects/spring-amqp)
[Pasted image 20231018114602.png]
[Pasted image 20231018114610.png]

利用SpringAMQP实现HelloWorld中的基础消息队列功能

流程如下:

  1. 在父工程中引入spring-amqp的依赖
  2. 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
  3. 在consumer服务中编写消费逻辑,绑定simple.queue这个队列
    步骤1:引入AMQP依赖
    因为publisher和consumer服务都需要amqp依赖,因此这里吧依赖直接放到父工程mq-demo中:
    1
    2
    3
    4
    5
    <!-- AMQP依赖,包含RabbitMQ -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    步骤2:在publisher中编写测试方法,向simple.queue发送消息
  4. 在publisher服务中编写application.yml,添加mq连接信息:
    1
    2
    3
    4
    5
    6
    7
    spring:
    rabbitmq:
    host: 192.168.140.130 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: huanji # 用户名
    password: root # 密码
  5. 在publisher服务中新建一个测试类,编写测试方法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @RunWith(SpringRunner.class)  
    @SpringBootTest
    public class SpringAmqpTest {
    @Autowired private RabbitTemplate rabbitTemplate;
    @Test
    public void testSimpleQueue(){
    String queueName = "simple.queue";
    String message = "hello,spring amqp";
    rabbitTemplate.convertAndSend(queueName,message);
    }

    }
    步骤3:在consumer中编写消费逻辑,监听simple.queue
  6. 在consumer服务中编写application.yml,添加mq连接信息:
    1
    2
    3
    4
    5
    6
    7
    spring:  
    rabbitmq:
    host: 192.168.140.130 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: huanji # 用户名
    password: root # 密码
  7. 在consumer服务中新建一个类,编写消费逻辑:
    1
    2
    3
    4
    5
    6
    7
    @Component  
    public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException{
    System.out.println("spring 消费者接收到消息 : 【"+ msg +"】");
    }
    }

Work Queue工作队列

Work queue,工作队列,可以提高消息处理速度,避免队列消息堆积

模拟WorkQueue,实现一个队列绑定多个消费者

基本思路如下:

  1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
  2. 在consumer服务中定义两个消息监听者,都监听simplq.queue队列
  3. 消费者1每秒处理50条消息,消费者2每秒处理10条信息

消费预取限制

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限:

1
2
3
4
5
6
7
8
9
10
spring:  
rabbitmq:
host: 192.168.140.130 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: huanji # 用户名
password: root # 密码
listenter:
simple:
prefetch:1 #每次只能获取一条消息,处理完成才能获取下一个消息

发布(Publish)、订阅(Subscribe)

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)
常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题
    ![[Pasted image 20231019074657.png]]
    注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

发布订阅-Fanout Exchange

Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue
[Pasted image 20231019075136.png]

利用SpringAMQP演示FanoutExchange的使用

实现思路如下:

  1. 在consumer服务中,利用代码声明队列、交换机,并将两者绑定
  2. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  3. 在publisher中编写测试方法,向huanji.fanout发送消息
    步骤1:在consumer服务声明Exchange、Queue、Binding
    SpringAMQP提供了声明交换机、队列、绑定关系的API,例如:
    [Pasted image 20231019075950.png]
    在consumer服务常见一个类,添加@Configuration注解,并声明FanoutExchange、Queue和绑定关系对象Binding,代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    @Configuration  
    public class FanoutConfig {
    // huanji.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
    return new FanoutExchange("huanji.fanout");
    }
    //fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
    return new Queue("fanout.queue1");
    }
    //绑定队列1到交换机
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
    return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }
    //fanout.queue2
    @Bean
    public Queue fanoutQueue2(){
    return new Queue("fanout.queue2");
    }
    //绑定队列2到交换机
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
    return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
    }
    步骤2:在consumer服务声明两个消费者

    在consumer服务的SpringRabbitListener类中,添加两个方法,分别监听fanout.queue1和fanout.queue2:
    1
    2
    3
    4
    5
    6
    7
    8
    @RabbitListener(queues = "fanout.queue1")  
    public void listenFanoutQueue1(String msg){
    System.err.println("消费者接收到fanout.queue1的消息 : 【"+ msg +"】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){
    System.err.println("消费者接收到fanout.queue2的消息 : 【"+ msg +"】");
    }
    步骤3:在publisher服务发送消息到FanoutExchange
    在publisher服务的SpringAmqpTest类中添加测试方法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test  
    public void testSendFanoutExchange() {
    // 交换机名称
    String exchangeName = "huanji.fanout";
    // 消息
    String message = "hello, every one!";
    // 发送消息
    rabbitTemplate.convertAndSend(exchangeName,"",message);
    }

发送订阅-DirectExchange

Direct Exchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
    [Pasted image 20231019101904.png]

利用SpringAMQP演示DirectExchange的使用

实现思路如下:

  1. 利用@RabbitListener声明Exchange、Queue、RoutingKey
  2. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  3. 在publisher中编写测试方法,向huanji.direct发送消息
    步骤1:在consumer服务声明Exchange、Queue
  4. 在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
  5. 并利用@RabbitListener声明Exchange、Queue、RoutingKey
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @RabbitListener(queues = "fanout.queue2")  
    public void listenFanoutQueue2(String msg) throws InterruptedException{
    System.err.println("消费者接收到fanout.queue2的消息 : 【"+ msg +"】");
    }
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "huanji.direct",type = ExchangeTypes.DIRECT),
    key = {"red","blue"}
    ))
    public void listenerDirectQueue1(String msg){
    System.out.println("消费者收到direct.queue1的消息:【" + msg + "】");
    }
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "huanji.direct",type = ExchangeTypes.DIRECT),
    key = {"red","yellow"}
    ))
    public void listenerDirectQueue2(String msg){
    System.out.println("消费者收到direct.queue2的消息:【" + msg + "】");
    }
    步骤2:在publisher服务发送消息到DirectExchange
    在publisher服务的SpringAmqpTest类中添加测试方法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    public void testDirectExchange(){
    //队列名称
    String exchangeName = "huanji.direct";
    //消息
    String messge = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"
    //发送消息,参数依次为:交换机名称,RoutingKey,消息
    rabbitTemplate.convertAndSend(exchangeName,"red",message);
    }

发布订阅-TopicExchange

TopicExchange与DIrectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
Queue与Exchange指定BindingKey时可以使用通配符:
#: 代指0个或多个单词
*: 代指一个单词
[Pasted image 20231019114704.png]

利用SpringAMQP演示TopicExchange的使用

实现思路:

  1. 并利用@RabbitListener声明Exchange、Queue、RoutingKey
  2. 在consumer服务中,编写两个消息者方法,分别监听topic.queue1和topic.queue2
  3. 在publisher中编写测试方法,向huanji.topic发送消息
    步骤1:在consumer服务声明Exchange、Queue
  4. 在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2,
  5. 并利用@RabbitListener声明Exchange、Queue、RoutingKey
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @RabbitListener(bindings = @QueueBinding(  
    value = @Queue(name = "topic.queue1"),
    exchange = @Exchange(name = "huanji.topic",type = ExchangeTypes.TOPIC),
    key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
    System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "huanji.topic",type = ExchangeTypes.TOPIC),
    key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
    System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
    步骤2:在publisher服务发送消息到TopicExchange
    在publisher服务的SpringAmqpTest类中添加测试方法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    public void testTopicExchange(){
    //队列名称
    String exchangeName = "huanji.topic";
    //消息
    String messge = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"
    //发送消息,参数依次为:交换机名称,RoutingKey,消息
    rabbitTemplate.convertAndSend(exchangeName,"china.news",message);
    }

SpringAMQP-消息转换器

测试发送Object类型消息

说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
我们在consumer中利用@Bean声明一个队列:

1
2
3
4
@Bean
public Queue objectMessageQueue(){
return new Queue("object.queue");
}

在publisher中发送消息以测试:

1
2
3
4
5
6
7
8
9
@Test
public void testSendMap() throws InterruptedException{
//准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name","Jack");
msg.put("age",21);
//发送消息
rabbitTemplate.convertAndSend("object.queue",msg);
}

消息转换器

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter类型的Bean即可。推荐用JSON方式序列化,步骤如下:

  • 我们在publisher服务引入依赖
    1
    2
    3
    4
    <dependency>  
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    </dependency>
  • 我们在consumer服务定义MessageConverter:
    1
    2
    3
    4
    @Bean  
    public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
    }
  • 然后定义一个消费者,监听object.queue队列并消费消息:
    1
    2
    3
    4
    @RabbitListener(queues = "object.queue")
    public void listenObjectQueue(Map<String,Object> msg){
    System.out.println("收到消息:【" + msg + "】");
    }

初识elasticsearch

什么是elasticsearch

elasticsearch是一款非常强大的开源搜索引擎,可以帮助我们从海量数据中快速找到需要的内容
elasticsearch结合kibana、Logstash、Beats,也就是elastic stack(ELK)。被广泛应用在日志数据分析、实时监控等领域
elasticsearch是elastic stack的核心,负责存储、搜索、分析数据。
[Pasted image 20231019154919.png]
[Pasted image 20231019155111.png]
[Pasted image 20231019154941.png]

elasticsearch的发展

Lucene是一个Java语言的搜索引擎类库,是Apache公司的顶级项目,由DougCutting于1999年研发。
官网地址:(https://lucene.apache.org/)。
Lucene的优势:

  • 易扩展
  • 高性能(基于倒排索引)
    Lucene的缺点:
  • 只限于Java语言开发
  • 学习曲线陡峭
  • 不支持水平扩展

2004年Shay Banon基于Lucene开发了Compass
2010年Shay Banon重写了Compass,取名为Elasticsearch。
官网地址:(https://www.elastic.co/cn/)
目前最新的版本是:8.10.4
相比于lucene,elasticsearch具备下列优势:

  • 支持分布式,可水平扩展
  • 提供Restful接口,可被任何语言调用

为什么学习elasticsearch?

搜索引擎技术排名:

  1. Elasticsearch:开源的分布式搜索引擎
  2. Splunk:商业项目
  3. Solr:Apache的开源搜索引擎

正向索引和倒排索引

传统数据库(如MySQL采用正向索引,例如给下表(tb_goods)中的id创建索引):
[Pasted image 20231019161233.png]
elasticsearch采用倒排索引:

  • 文档(document):每条数据就是一个文档
  • 词条(term):文档按照语义分成的词语
    [Pasted image 20231019161720.png]
    [Pasted image 20231019161906.png]

文档

elasticsearch是面向文档存储的,可以是数据库中的一条商品数据,一个订单信息。
文档数据会被序列化为json格式后存储在elasticsearch中
[Pasted image 20231019162626.png]

索引(index)

  • 索引(index):相同类型的文档的集合
  • 映射(mapping):索引中文档的字段约束信息,类似表的结构约束
    [Pasted image 20231019162834.png]

概念对比

[Pasted image 20231019163113.png]

架构

MySQL:擅长事务类型操作,可以确保数据的安全和一致性
Elasticsearch:擅长海量数据的搜索、分析、计算
[Pasted image 20231019163806.png]

安装elasticsearch、kibana

运行Elasticsearch

1
2
3
4
5
6
7
8
9
10
11
docker run -d \
--name es \
-e "ES_JAVA_OPTS=-Xms1024m -Xmx1024m" \
-e "discovery.type=single-node" \
-v es-data:/usr/share/elasticsearch/data \
-v es-plugins:/usr/share/elasticsearch/plugins \
--privileged \
--network es-net \
-p 9200:9200 \
-p 9300:9300 \
elasticsearch:7.12.1

运行kibana

1
2
3
4
5
6
docker run -d \
--name kibana \
-e ELASTICSEARCH_HOSTS=http://es:9200 \
--network=es-net \
-p 5601:5601 \
kibana:7.12.1

分词器

es在创建倒排索引时需要对文档分词;在搜索时,需要对用户输入内容分词。但默认的分词规则对中文处理并不友好。
我们在kibana的DevTools中测试:

1
2
3
4
5
POST /_analyze
{
"analyzer": "standard",
"text": ["黑马程序员学习java太棒了!"]
}

语法说明:

ik分词器-模式

  • ik_smart:智能切分,粗粒度
    1
    2
    3
    4
    5
    POST /_analyze
    {
    "analyzer": "ik_smart",
    "text": ["蚌埠住了"]
    }
  • ik_max_word:最细切分,细粒度
    1
    2
    3
    4
    5
    POST /_analyze
    {
    "analyzer": "ik_max_word",
    "text": ["蚌埠住了"]
    }

ik分词器-扩展词库

要扩展ik分词器的词库,只需要修改一个ik分词器目录中的config目录中的IKAnalyzer.cfg.xml文件:

1
2
3
4
5
6
7
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Aanalyzer 扩展配置</comment>
<!--用户可以在这里配置自己的扩展字典 * * * 添加扩展词典-->
<entry key="ext_dict">ext.dic</entry>
</properties>

ik分词器-停用词库

要禁用某些敏感词条,只需要修改一个ik分词器目录中的config目录中的IkAnalyzer.cfg.xml文件:

1
2
3
4
5
6
7
8
9
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<comment>IK Aanalyzer 扩展配置</comment>
<!-- 用户可以在这里配置自己的扩展字典 -->
<entry key="ext_dict">ext.dic</entry>
<!-- 用户可以在这里配置自己的扩展停止词字典 * * * 添加停用词词典 -->
<entry key="ext_stopwords">stopword.dic</entry>
</properties>

然后在名为stopword.dic的文件中,添加想要拓展的词语即可

索引库操作

mapping属性

mapping是对索引库中文档的约束,常见的mapping属性包括:

  • type:字段数据类型,常见的简单类型有:
    • 字符串:text(可分词的文本)、keyword(精确值,例如:品牌、国家、ip地址)
    • 数值:long、integer、short、byte、double、float
    • 布尔:boolean
    • 日期:date
    • 对象:object
  • index:是否创建索引,默认为true
  • analyzer:使用那种分词器
  • properties:该字段的子字段

创建索引库

ES中通过Restful请求操作索引库、文档。请求文档用DSL语句来表示。创建索引库和mapping的DSL语法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
PUt /索引库名称
{
"mappings":{
"properties":{
"字段名":{
"type":"text",
"analyzer":"ik_smart"
},
"字段名2":{
"type":"keyword",
"index":"false"
},
"字段名3":{
"properties":{
"子字段":{
"type":"keyword"
}
}
},
// ...略
}
}
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 创建索引库
PUT /heima
{
"mappings": {
"properties": {
"info":{
"type": "text",
"analyzer": "ik_smart"
},
"email":{
"type":"keyword",
"index": false
},
"name":{
"type": "object",
"properties": {
"firstName":{
"type":"keyword"
},
"lastName":{
"type":"keyword"
}
}
}
}
}
}

查看、删除索引库

查看索引库语法:

1
GET /索引库名

示例:

1
GET /heima

删除索引库的语法:

1
DELETE /索引库名

示例:

1
DELETE /heima

修改索引库

索引库和mapping一旦创建无法修改,但是可以添加新的字段,语法如下:

1
2
3
4
5
6
7
8
PUT /索引库名/_mapping
{
"properties":{
"新字段名":{
"type": "integer"
}
}
}

示例:

1
2
3
4
5
6
7
8
PUT /heima/_mapping
{
"properties":{
"age":{
"type": "integer"
}
}
}

文档操作

添加文档

新增文档的DSL语法如下:

1
2
3
4
5
6
7
8
9
10
POST /索引库名/_doc/文档id
{
"字段1":"值1",
"字段2":"值2",
"字段3":{
"子属性1":"值3",
"子属性2":"值4"
},
// ...
}

示例:

1
2
3
4
5
6
7
8
9
POST /heima/_doc/1
{
"info":"黑马程序员Java讲师",
"email":"zy@itcast.cn",
"name": {
"firstName": "云",
"lastName": "赵"
}
}

查看、删除文档

查看文档语法:

1
GET /索引库名/_doc/文档id

示例:

1
GET /heima/_doc/1

删除索引库的语法:

1
DELETE /索引库名/_doc/文档id

示例:

1
DELETE /heima/_doc/1

修改文档

方式一:全量修改,会删除旧文档,添加新文档

1
2
3
4
5
6
PUT /索引库名/_doc/文档id
{
"字段1": "值1",
"字段2": "值2",
// ...略
}

示例:

1
2
3
4
5
6
7
8
9
10
PUT /heima/_doc/1
{
"info":"黑马程序员Java讲师",
"email":"ZhaoYun@itcase.cn",
"name":{
"firstName":"云",
"lastName":"赵"
}
}

方式二:增量修改,修改指定字段值

1
2
3
4
5
6
POST /索引库名/_update/文档id
{
"doc": {
"字段1": "值1",
}
}

示例:

1
2
3
4
5
6
POST /heima/_update/1
{
"doc": {
"email": "ZhaoYun@itcast.cn"
}
}

RestClient操作索引库

什么是RestClient

ES官方提供了各种不同语言的客户端,用来操作ES。这些客户端的本质就是组装DSL语句,通过http请求发送给ES。官方文档地址:
https://www.elastic.co/guide/en/elasticsearch/client/index.html

利用JavaRestClient实现创建、删除索引库、判断索引库是否存在

根据课前资料提供的酒店数据创建索引库,索引库名为hotel,mapping属性根据数据库结构定义。
基本步骤如下:

  1. 导入课前资料Demo
  2. 分析数据结构,定义mapping属性
  3. 初始化JavaRestClient
  4. 利用JavaRestClient创建索引库
  5. 利用JavaRestClient删除索引库
  6. 利用JavaRestClient判断索引库是否存在
    步骤1:导入课前资料Demo
    首先导入课前资料提供的数据库数据:
    [Pasted image 20231020162716.png]
    然后导入课前资料提供的项目:
    [Pasted image 20231020162800.png]
    步骤2:分析数据结构
    mapping要考虑的问题:
    字段名、数据类型、是否参与搜索、是否分词、如果分词,分词器是什么?
    [Pasted image 20231020164505.png]
    示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    PUT /hotel
    {
    "mappings": {
    "properties": {
    "id": {
    "type": "keyword"
    },
    "name": {
    "type": "text",
    "analyzer": "ik_max_word",
    "copy_to": "all"
    },
    "address": {
    "type": "keyword",
    "index": false
    },
    "price": {
    "type": "integer"
    },
    "score": {
    "type": "integer"
    },
    "brand": {
    "type": "keyword",
    "copy_to": "all"
    },
    "city": {
    "type": "keyword"
    },
    "star_name": {
    "type": "keyword"
    },
    "business": {
    "type": "keyword",
    "copy_to": "all"
    },
    "location":{
    "type": "geo_point"
    },
    "pic": {
    "type": "keyword",
    "index": false
    },
    "all": {
    "type": "text",
    "analyzer": "ik_max_word"
    }
    }
    }
    }
    步骤3:初始化JavaRestClient
  7. 引入es的RestHighLevelClient依赖:
    1
    2
    3
    4
    <dependency>  
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    </dependency>
  8. 因为SpringBoot默认的ES版本是7.6.2,所以我们需要覆盖默认的ES版本:
    1
    2
    3
    4
    <dependency>  
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    </dependency>
  9. 初始化RestHighLevelClient:
    1
    2
    3
    RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(  
    HttpHost.create("http://192.168.140.130:9200")
    ));
    步骤4:创建索引库
    创建索引库代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test  
    void testCreateHotelIndex() throws IOException {
    //1.创建Request对象
    CreateIndexRequest request = new CreateIndexRequest("hotel");
    //2.请求参数,MAPPING_TEMPLATE是静态常量字符串,内容是创建索引库的DSL语句
    request.source(MAPPING_TEMPLATE, XContentType.JSON);
    //3.发起请求
    client.indices().create(request, RequestOptions.DEFAULT);
    }
    步骤5:删除索引库、判断索引库是否存在
  • 删除索引库代码如下:
    1
    2
    3
    4
    5
    6
    7
    @Test  
    void testDeleteHotelIndex() throws IOException {
    //1. 创建Request对象
    DeleteIndexRequest request = new DeleteIndexRequest("hotel");
    //2.发起请求
    client.indices().delete(request,RequestOptions.DEFAULT);
    }
  • 判断索引库是否存在
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test  
    void testExistsHotelIndex() throws IOException {
    //1.创建Request对象
    GetIndexRequest hotel = new GetIndexRequest("hotel");
    //2.发起请求
    boolean exists = client.indices().exists(hotel,RequestOptions.DEFAULT);
    //3.输出
    System.out.println(exists);
    }

RestClient操作文档

利用JavaRestClient实现文档的CRUD

去数据库查询酒店数据,导入到hotel索引库,实现酒店数据的CRUD。
基本步骤如下:

  1. 初始化JavaRestClient
  2. 利用JavaRestClient新增酒店数据
  3. 利用JavaRestClient根据id查询酒店数据
  4. 利用JavaRestClient删除酒店数据
  5. 利用JavaRestClient修改酒店数据
    步骤1:初始化JavaRestClient
    新建一个测试类,实现文档相关操作,并且完成JavaRestClient的初始化
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class ElasticsearchDocumentTest{
    private RestHighLevelClient client;
    @BeforeEach
    void setUp() {
    this.client = new RestHighLevelClient(RestClient.builder(
    HttpHost.create("http://192.168.140.130:9200")
    ));
    }

    @AfterEach
    void tearDown() throws IOException {
    this.client.close();
    }
    }
    步骤2:添加酒店数据到索引库
    先查询酒店数据,然后给这条数据创建倒排索引,即可完成添加:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Test  
    void testAddDocument() throws IOException {
    //根据id查看酒店数据
    Hotel hotel = hotelService.getById(36934L);
    //转换为文档类型
    HotelDoc hotelDoc = new HotelDoc(hotel);

    //1.准备Request对象
    IndexRequest indexRequest = new IndexRequest("hotel").id(hotelDoc.getId().toString());
    //2.准备JSON文档
    indexRequest.source(JSON.toJSONString(hotelDoc),XContentType.JSON);
    client.index(indexRequest,RequestOptions.DEFAULT);
    }
    步骤3:根据id查询酒店数据
    根据id查询到的文档数据是json,需要反序列化为json对象:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test  
    void testCreateHotelIndex() throws IOException {
    //1.创建Request对象
    CreateIndexRequest request = new CreateIndexRequest("hotel");
    //2.请求参数,MAPPING_TEMPLATE是静态常量字符串,内容是创建索引库的DSL语句
    request.source(MAPPING_TEMPLATE, XContentType.JSON);
    //3.发起请求
    client.indices().create(request, RequestOptions.DEFAULT);
    }
    步骤4:根据id修改酒店数据
    修改文档数据有两种方式:
    方式一:全量更新。再次写入id一样的文档,就会删除旧文档,添加新文档
    方式二:局部更新。只更新部分字段,我们演示方式二
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Test  
    void testUpdateDocument() throws IOException {
    //1.准备Request
    UpdateRequest hotel = new UpdateRequest("hotel", "36934");
    //2.准备请求参数
    hotel.doc(
    "price","700",
    "starName","四钻"
    );
    //3.发送请求
    client.update(hotel,RequestOptions.DEFAULT);
    }
    步骤5:根据id删除文档数据
    删除文档代码如下:
    1
    2
    3
    4
    5
    6
    7
    @Test  
    void testDeleteDocument() throws IOException {
    //1.准备Request
    DeleteRequest request = new DeleteRequest("hotel", "36934");
    //2.发送请求
    client.delete(request,RequestOptions.DEFAULT);
    }

利用JavaRestClient批量导入酒店数据到ES

需求:批量查询酒店数据,然后批量导入索引库中
思路:

  1. 利用mybatis-plus查询酒店数据
  2. 将查询到的酒店数据(Hotel)转换为文档类型数据(HotelDoc)
  3. 利用JavaRestClient中的Bulk批处理,实现批量新增文档,示例代码如下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Test  
    void testBulkRequest() throws IOException {
    //批量查询酒店数据
    List<Hotel> list = hotelService.list();
    //1.创建Request
    BulkRequest request = new BulkRequest();
    //2.准备参数
    list.stream().map(arg -> new HotelDoc(arg)).collect(Collectors.toList()).forEach((arg) -> {
    request.add(new IndexRequest("hotel")
    .id(arg.getId().toString())
    .source(JSON.toJSONString(arg),XContentType.JSON));
    });
    //3.发送请求
    client.bulk(request,RequestOptions.DEFAULT);
    }

DSL查询语法

DSL Query的分类

Elasticasearch提供了基于JSON的DSL(Domain Specific Language)来定义查询。常见的查询类型包括:

  • 查询所有:查询出所有数据,一般测试用。例如:match_all
  • 全文检索(full text)查询:利用分词器对用户输入内容分词,然后去倒排索引库中匹配。例如:
    • match_query
    • multi_match_query
  • 精确查询:根据精确词条查找数据,一般是查找keyword、数值、日期、boolean等类型字段。例如:
    • ids
    • range
    • term
  • 地理(geo)查询:根据经纬度查询。例如:
    • geo_distance
    • geo_bounding_box
  • 复合(compound)查询:复合查询可以将上述各种查询条件组合起来,合并查询条件。例如:
    • bool
    • function_score

DSL Query基本语法

查询的基本语法如下:

1
2
3
4
5
6
7
8
GET /indexName/_search
{
"query": {
"查询类型": {
"查询条件": "条件值"
}
}
}

示例:

1
2
3
4
5
6
7
GET /indexName/_search
{
"query": {
"match_all": {
}
}
}

全文检索查询

全文检索查询,会对用户输入内容分词,常用于搜索框搜索:

[Pasted image 20231021144312.png]
[Pasted image 20231021144417.png]

match查询:全文检索查询的一种,会对用户输入内容分词,然后去倒排索引库检索,语法:

1
2
3
4
5
6
7
8
GET /indexName/_search
{
"query": {
"match": {
"FIELD": "TEXT"
}
}
}

multi_match:与match查询类似,只不过允许同时查询多个字段,语法:

1
2
3
4
5
6
7
8
9
GET /indexName/_search
{
"query": {
"multi_match": {
"query" : "TEXT",
"fields": ["FIELD1","FIELD12"]
}
}
}

精确查询

精确查询一般是查找keyword、数值、日期、boolean等类型字段。所以不会对搜索条件分词。常见的有:

  • term:根据词条精确值查询
  • range:根据值的范围查询

精确查询-语法

精确查询一般是根据id、数值、keyword类型、或布尔字段来查询。语法如下:
term查询:

1
2
3
4
5
6
7
8
9
10
11
// term查询
GET /indexName/_search
{
"query": {
"term": {
"FIELD": {
"value": "VALUE"
}
}
}
}

range查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
// range查询
GET /hotel/_search
{
"query": {
"range": {
"price": {
"gte": 100,
"lte": 400
}
}
}
}

地理查询

根据经纬度查询。常见的使用场景包括:

  • 携程:搜索我附近的酒店
  • 滴滴:搜索我附近的出租车
  • 微信:搜索我附近的人
    根据经纬度查询,例如:
  • geo_bounding_box:查询geo_point值落在某个矩形范围的所有文档
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    // geo_bounding_box查询
    GET /indexName/_search
    {
    "query": {
    "geo_bounding_box": {
    "FIELD": {
    "top_left": {
    "lat": 31.1,
    "lon": 121.5
    },
    "bottom_right": {
    "lat": 30.9,
    "lon": 121.7
    }
    }
    }
    }
    }
  • geo_distance:查询到指定中心点小于某个距离值的所有文档
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // geo_distance 查询
    GET /indexName/_search
    {
    "query": {
    "geo_distance": {
    "distance": "15km",
    "FIELD": "31.21,121.5"
    }
    }
    }

复合查询

复合(compound)查询:复合查询可以将其他简单查询组合起来,实现更复杂的搜索逻辑,例如:

  • fuction score:算分函数查询,可以控制文档相关性算分,控制文档排名。例如百度竞价

相关性算分

当我们利用match查询时,文档结果会根据与搜索词条的关联度打分(_score),返回结果时按照分值降序排列。
例如,我们搜索”虹桥如家”,结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[
{
"_score": 17.850193,
"_source": {
"name": "虹桥如家酒店真不错",
}
},
{
"_score": 12.259849,
"_source": {
"name": "外滩如家酒店真不错",
}
},
{
"_score": 11.91091,
"_source": {
"name": "迪士尼如家酒店真不错",
}
}
]

elsasticsearch中的相关性打分算法:

  • TF-IDF:在elasticsearch5.0之前,会随着词频增加而越来越大
    [Pasted image 20231022083816.png]
    [Pasted image 20231022083829.png]
  • BM25:在elasticsearch5.0之后,会随着词频增加而增大,但增长曲线会趋于水平
    [Pasted image 20231022083852.png]

Function Score Query

使用function score query,可以修改文档的相关性算分(query score),根据新得到的算分排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET /hotel/_search
{
"query": {
"function_score": {
"query": { "match": {"all": "外滩"}},
"functions": [
{
"filter": {"term": {"id": "1"}},
"weight": 10
}
],
"boost_mode": "multiply"
}
}
}
  • “query”: { “match”: {“all”: “外滩”}}:
    原始查询条件,搜索文档并根据相关性打分(query_score)
  • “filter”: {“term”: {“id”: “1”}}:
    过滤条件,符合条件的文档才会被重新算分
  • “weight”: 10
    算分函数,算分函数的结果称为function_score,将来会与query score运算,得到新算法,常见的算分函数有:
    • weight:给一个常量值,作为函数结果(function score)
    • field_value_factor:用文档中的某个字段值作为函数结果
    • random_score:随机生成一个值,作为函数结果
    • script_score:自定义计算公式,公式结果作为函数结果
  • “boost_mode”: “multiply”
    加权模式,定义function score与query_score的运算方式,包括:
    • multiply:两者相乘。默认就是这个
    • replace:用function score替换query score
    • 其他:sum、avg、max、min

给“如家”这个品牌的酒店排名靠前一点

把这个问题翻译一下,function score需要的三要素:

  1. 那些文档需要算分加权?
    • 品牌为如家的酒店
  2. 算分函数是什么?
    • weight就可以
  3. 加权模式是什么?
    • 求和
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      GET /hotel/_search
      {
      "query": {
      "function_score": {
      "query": {...},
      "functions": [ //算分函数
      {
      "filter": { // 满足的条件,品牌必须是如家
      "term": {
      "brand": "如家"
      }
      },
      "weight": 2 //算分权重为2
      }
      ],
      "boost_mode": "sum"
      }
      }
      }

function score query三要素:

  • 过滤条件:哪些文档要加分
  • 算分函数:如何计算function score
  • 加权方式:function score与 query score如何运算

复合查询 Boolean Query

布尔查询是一个或多个查询子句的组合。子查询的组合方式有:

  • must:必须匹配每个子查询,类似“与”
  • should:选择性匹配子查询,类似“或”
  • must_not:必须不匹配,不参与算分,类似“非”
  • filter:必须匹配,不参与算分
    示例:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    GET /hotel/_search
    {
    "query": {
    "bool": {
    "must": [
    {"term": {"city":"上海"}}
    ],
    "should": [
    {"term": {"brand":"皇冠假日"}},
    {"term": {"brand":"华美达"}}
    ],
    "must_not": [
    {"range":{"price":{"lte": 500}}}
    ],
    "filter": [
    {"range":{"score":{"get": 45}}}
    ]
    }
    }
    }

利用bool查询实现功能

需求:搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
GET /hotel/_search
{
"query": {
"bool": {
"must": [
{
"match": {"name": "如家"}
}
],
"must_not": [
{
"range": {"price":{"gt": 400}}
}
],
"filter": [
{
"geo_distance": {
"distance":"10km","location":
{"lat":31.21,"lon":121.5}
}
}
]
}
}
}

搜索结果处理

排序

elasticsearch支持对搜索结果排序,默认是根据相关度算分(_score)来排序。可以排序字段类型有:keyword类型、数值类型、地理坐标类型、日期类型等

1
2
3
4
5
6
7
8
9
10
11
GET /indexName/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"FIELD": "desc" //排序字段和排序方式ASC、DESC
}
]
}

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET /indexName/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance": {
"FIELD": "纬度,经度",
"order": "asc",
"unit": "km"
}
}
]
}

实现对酒店数据按照到你的位置坐标的距离升序排序

获取经纬度的方式:(https://lbs.amap.com/demo/jsapi-v2/example/map/click-to-get-lnglat/)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GET /hotel/_search
{
"query": {
"match_all": {}
},
"sort": [
{
"_geo_distance": {
"location": {
"lat": 31.034661,
"lon": 121.612282
},
"order": "asc",
"unit": "km"
}
}
]
}

分页

elasticsearch默认情况下只返回top10的数据。而如果要查询更多数据就需要修改分页参数了。
elasticsearch中通过修改from、size参数来控制要返回的分页结果:

1
2
3
4
5
6
7
8
9
10
11
GET /hotel/_search
{
"query": {
"match_all": {}
},
"from": 990, //分页开始的位置,默认为0
"size": 10, //期望获取的文档总数
"sort": [
{"price":"asc"}
]
}

深度分页问题

ES是分布式的,所以会面临深度分页问题。例如按price排序后,获取from=990,size=10的数据:

  1. 首先在每个数据分片上都排序并查询前1000条文档。
  2. 然后将所有节点的结果聚合,在内存中重新排序选出前1000条文档
  3. 最后从这1000条中,选取从990开始的10条文档
    如果搜索页数过深,或者结果集(from + size)越大,对内存和CPU的消耗也越高。因此ES设定结果集查询的上限是10000
    ![[Pasted image 20231024075341.png]]

深度分页解决方案

针对深度分页,ES提供了两种解决方案,官方文档

  • search after:分页时需要排序,原理是从上一次的排序值开始,查询下一页数据。官方推荐使用的方式
  • scroll:原理将排序数据形成快照,保存在内存。官方已经不推荐使用

分页总结

from + size:

  • 优点:支持随机翻页
  • 缺点:深度分页问题,默认查询上限(from + size)是10000
  • 场景:百度、京东、谷歌、淘宝这样的随机翻页搜索
    after search:
  • 优点:没有查询上限(单次查询的size不超过10000)
  • 缺点:只能向后逐页查询,不支持随机翻页
  • 场景:没有随机翻页需求的搜索,例如手机向下滚动翻页
    scroll:
  • 优点:没有查询上限(单次查询的size不超过10000)
  • 缺点:会有额外内存消耗,并且搜索结果是非实时的
  • 场景:海量数据的获取和迁移。从ES7.1开始不推荐,建议用after search方案。

高亮

高亮:就是在搜索结果中把搜索关键字突出显示。
原理是这样的:

  • 将搜索结果中的关键字用标签标记出来
  • 在页面中给标签添加css样式
    语法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    GET /hotel/_search
    {
    "query": {
    "match": {
    "FIELD": "TEXT"
    }
    },
    "highlight": {
    "fields": { // 指定要高亮的字段
    "FIELD": {
    "pre_tags": "<em>",// 用来标记高亮字段的前置标签
    "post_tags": "</em>" //用来标记高亮字段的后置标签
    }
    }
    }
    }

搜索结果处理整体语法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
GET /hotel/_search
{
"query": {
"match": {
"name": "如家"
}
},
"from": 0, //分页开始的位置
"size": 20, //期望获取的文档总数
"sort": [
{ "price": "asc"}, //普通排序
{
"_geo_distance": { //距离排序
"location" : "31.040699,121.618075",
"order": "asc",
"unit": "km"
}
}
],
"highlight": {
"fields": { //高亮字段
"name": {
"pre_tags": "<em>", //用来标记高亮字段的前置标签
"post_tags": "</em>", //用来标记高亮字段的后置标签
}
}
}
}

RestClient查询文档

快速入门

我们通过match_all来演示下基本的API,先看请求的DSL的组织:

1
2
3
4
5
6
7
8
9
10
11
@Test
void testMatchAll() throws IOException{
//1.准备Request
SearchRequest request = new SearchRequest("hotel");
//2.组织DSL参数
request.source()
.query(QueryBuilders.matchAllQuery());
//3.发送请求,得到响应结果
SearchResponse response = client.search(request,RequestOptions.DEFAULT);
//...解析响应结果
}

再看结果的解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
void testMatchAll() throw IOException {
// ...略
//4.解析结果
SearchHits searchHits = search.getHits();
//4.1.查询的总条数
long value = searchHits.getTotalHits().value;
//4.2.查询的结果数组
SearchHit[] hits = searchHits.getHits();
for (SearchHit hit : hits) {
//4.3.得到source
String sourceAsString = hit.getSourceAsString();
System.out.println(searchHits);
}
}

RestAPI中其中构建DSL是通过HighLevelRestClient中的resource()来实现的,其中包含了查询、排序、分页、高亮等所有功能:
[Pasted image 20231024101117.png]
RestAPI中其中构建查询条件的核心部分是由一个名为QueryBuilders的工具类提供的,其中包含了各种查询方法:
[Pasted image 20231024101308.png]

全文检索查询

全文检索的match和multi_match查询与match_all的API基本一致。差别是查询条件,也就是query的部分。
同样是利用QueryBuilders提供的方法:

1
2
3
4
//单字段查询
QueryBuilders.matchQuery("all","如家");
//多字段查询
QueryBuilders.multiMatchQuery("如家","name","business");

精确查询

精确查询常见的有term查询和range查询,同样利用QueryBuilders实现:

1
2
3
4
// 词条查询
QueryBuilders.termQuery("city","杭州");
// 范围查询
QueryBuilders.rangeQuery("price").gte(100).lte(150);

复合查询-boolean query

精确查询常见的有term查询和range查询,同样利用QueryBuilders实现:

1
2
3
4
5
6
//创建布尔查询
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
//添加must条件
boolQuery.must(QueryBuilders.termQuery("city","杭州"));
//添加filter条件
boolQuery.filter(QueryBuilders.rangeQuery("price").lte(250));

排序和分页

搜索结果的排序和分页是与query同级的参数,对应的API如下:

1
2
3
4
5
6
//查询
request.source().query(QueryBuilders.matchAllQuery());
//分页
request.source().from(0).size(5);
//价格排序
request.source().sort("price",SortOrder.ASC);

高亮

高亮API包括请求DSL构建和结果解析两部分。我们先看请求的DSL构建:

1
2
3
4
request.source().highlighter(new HighlightBuilder)
.field("name")
//是否需要与查询字段匹配
.requireFieldMatch(false)

高亮结果解析

高亮的结果处理相对比较麻烦:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//4.3.得到source  
String sourceAsString = hit.getSourceAsString();
//反序列化
HotelDoc hotelDoc = JSON.parseObject(sourceAsString, HotelDoc.class);
//获取高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
if(!CollectionUtils.isEmpty(highlightFields)){
//根据字段名获取高亮结果
HighlightField highlightField = highlightFields.get("name");
if(highlightField != null) {
//获取高亮值
String name = highlightField.getFragments()[0].string();
//覆盖非高亮结果
hotelDoc.setName(name);
}
}

黑马旅游案例

案例1:实现黑马旅游的酒店搜索功能,完成关键字搜索和分页

课前提供的hotel-demo项目中,自带了前端页面,启动后可以看到:
[Pasted image 20231024120216.png]
先实现其中的关键字搜索功能,实现步骤如下:

  1. 定义实体类,接收前端请求
  2. 定义controller接口,接收页面请求,调用IHotelService的search方法
  3. 定义IHotelService中的search方法,利用match查询实现根据关键字搜索酒店信息
    步骤1:定义类,接收前端请求参数
    格式如下:
    1
    2
    3
    4
    5
    6
    7
    @Data
    public class RequestParms{
    private String key;
    private Integer page;
    private Integer size;
    private String sortBy;
    }
    步骤2:定义controller接口,接收前端请求
    定义一个HotelController,声明查询接口,满足下列要求:
  • 请求方式:Post
  • 请求路径:/hotel/list
  • 请求参数:对象,类型为RequestParam
  • 返回值:PageResult,包含两个属性
    • Long total:总条数
    • List<HotelDoc> hotels: 酒店数据

案例2:添加品牌、城市、星级、价格等过滤功能

步骤:

  1. 修改RquestParams类,添加brand、city、starName、minPrice、maxPrice等参数
  2. 修改search方法的实现,在关键字搜索时,如果brand等参数存在,对其做过滤
    步骤一:扩展IUserService的search方法的参数列表
    修改RequestParms类,接收所有参数:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Data
    public class RequestParams {
    private String key;
    private Integer page;
    private Integer size;
    private String sortBy;
    private String brand;
    private String starName;
    private String city;
    private Integer minPrice;
    private Integer maxPrice;
    }
    步骤二:修改search方法,在match查询基础上添加过滤条件
    过滤条件包括:
  • city精确匹配
  • brand精确匹配
  • starName精确匹配
  • price范围过滤
    注意事项:
  • 多个条件之间是AND关系,组合多条件用BooleanQuery
  • 参数存在才需要过滤,做好非空判断

案例3:我附近的酒店

前端页面点击后,会将你所在位置发送到后台:
[Pasted image 20231024173047.png]
我们要根据这个坐标,将酒店结果按照到这个点的距离升序排序
实现思路如下:

  • 修改RequestParams参数,接收location字段
  • 修改search方法业务逻辑,如果location有值,添加根据geo_distance排序的功能

距离排序

距离排序与普通排序有所差异,API如下:

1
2
3
4
5
6
7
8
//价格排序
request.source().sort("price",SortOrder.ASC);
//距离排序
request.source().sort(SortBuilders
.geoDistanceSort("location",new GeoPoint("31.21,121.5"))
.order(SortOrder.ASC)
.unit(DistanceUnit.KILOMETERS)
);

案例4:让指定的酒店在搜索结果中排名置顶

我们给需要置顶的酒店文档添加标记。然后利用function score给带有标记的文档增加权重
实现步骤分析:

  1. 给HotelDoc类添加isAD字段,Boolean类型
  2. 挑选几个你喜欢的酒店,给它的文档数据添加isAD字段,值为true
  3. 修改search方法,添加function score功能,给isAD值为true的酒店增加权重

组合查询-function score

Function Score查询可以控制文档的相关性算分,使用方式如下:

1
2
3
4
5
6
7
8
9
10
11
FunctionScoreQueryBuilder functionScoreQueryBuilder = 
QueryBuilders.functionScoreQuery(
QueryBuilders.matchQuery("name","外滩"),
new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{
new FunctionScoreQueryBuilder.FilterFunctionBuilder(
QueryBuilders.termQuery("brand","如家"),
ScoreFunctionBuilders.weightFactorFunction(5)
)
}
);
sourceBuilder.query(functionScoreQueryBuilder);

数据聚合

聚合的分类

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

  • 桶(Bucket)聚合:用来对文档做分组
    • TermAggregation:按照文档字段值分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stans:同时求max、min、avg、sum等
  • 管道(pipeline)聚合:其它聚合的结果为基础做聚合

DSL实现Bucket聚合

现在,我们要统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合。
类型为term类型,DSL示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /hotel/_search
{
"size": 0, //设置size为0,结果中不包含文档,只包含聚合结果
"aggs": { //定义聚合
"brandAgg": { //给聚合起个名字
"terms": { //聚合的类型,按照品牌值聚合,所以选择term
"field": "brand", //参与聚合的字段
"size": 20 //希望获取的聚合结果数量

}
}
}
}

Bucket聚合-聚合结果排序

默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序。
我们可以修改结果排序方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"order": {
"_count": "asc" //按照_count升序排序
},
"size": 20
}
}
}
}

Bucket聚合-限定聚合范围

默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GET /hotel/_search
{
"query": {
"range": {
"price": {
"lte": 200 //只对200元以下的文档聚合
}
}
},
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
}
}
}
}

DSL实现Metrics聚合

例如,我们要求获取每个品牌的用户评分的min、max、avg等值
我们可以利用stats聚合:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GET /hotel/_search
{
"size": 0,
"aggs": {
"brandAgg": {
"terms": {
"field": "brand",
"size": 20
},
"aggs": { //是brands聚合的子聚合,也就是分组后对每组分别计算
"score_stats": { //聚合名称
"stats": { //聚合类型,这里stats可以计算min、max、avg等
"field": "score" //聚合字段,这里是score
}
}
}
}
}
}

RestAPi实现聚合

我们以品牌聚合为例,演示下Java的RestClient使用,先看请求组装:

1
2
3
4
5
6
7
request.source().size(0);
request.source().aggregation(
AggregationBuilders
.terms("brand_agg")
.field("brand")
.size(20)
);

再看下聚合结果解析

1
2
3
4
5
6
7
8
9
10
11
12
//解析聚合结果  
Aggregations aggregations = search.getAggregations();
//根据名称获取聚合结果
Terms brandTerms = aggregations.get("brandAgg");
//获取桶
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
//遍历
for (Terms.Bucket bucket : buckets) {
//获取key,也就是品牌信息
String brandName = bucket.getKeyAsString();
System.out.println(brandName);
}

案例:在IUservice中定义方法,实现对品牌、城市、星级的聚合

需求:搜索页面的品牌、城市等信息不应该是在页面写死,而是通过聚合索引库中的酒店数据得来的:
在IUservice中定义一个方法,实现对品牌、城市、星级的聚合,方法声明如下:

1
Map<String, List<String>> filter();

对接前端接口

前端页面会向服务端发起请求,查询品牌、城市、星级等字段的聚合结果:
[Pasted image 20231025160138.png]
可以看到请求参数与之前search时的RequestParam完全一致,这是在限定聚合时的文档范围。
例如:用户搜索”外滩“,价格在300~600,那聚合必须是在这个搜索条件基础上完成。
因此我们需要:

  1. 编写controller接口,接收该请求
  2. 修改IUserService#getFilters()方法,添加RequestParam参数
  3. 修改getFilters方法的业务,聚合时添加query条件

自动补全

自动补全需求说明

当用户在搜索框输入字符时,我们应该提示出与该字符有关的搜索项,如图:
[Pasted image 20231025164202.png]

使用拼音分词

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:(https://github.com/medcl/elasticsearch-analysis-pinyin)
安装方式与IK分词器一样,分三步:

  1. 解压
  2. 上传到虚拟机中,elasticsearch的plugin目录
  3. 重启elasticsearch
  4. 测试
    示例:
    1
    2
    3
    4
    5
    POST /_analyze
    {
    "text": ["如家酒店还不错"],
    "analyzer": "pinyin" //拼音分词器
    }

自定义分词器

elasticsearch中分词器(analyzer)的组成包含三部分:

  • character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分次;还有ik_smart
  • tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等
    我们可以在创建索引库时,通过settings来配置自定义的analyzer(分词器):
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    PUT /test
    {
    "settings": {
    "analysis": {
    "analyzer": { //自定义分词器
    "my_analyzer": { //分词器名称
    "tokenizer": "ik_max_word",
    "filter": "pinyin"
    }
    }
    }
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
PUT /test
{
"settings": {
"analysis": {
"analyzer": { //自定义分词器
"my_analyzer": { //分词器名称
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": { //自定义tokenizer filter
"py": { //过滤器名称
"type": "pinyin", //过滤器类型,这里是pinyin
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_original": true,
"limit_first_letter_length": 16,
"remove_duplicated_term": true,
"none_chiness_pinyin_tokenize": false
}
}
}
}
}

拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。
创建倒排索引时:
[Pasted image 20231026092538.png]
搜索时,用户搜索“狮子”:
[Pasted image 20231026092606.png]
因此字段在创建倒排索引时应该用my_analyzer分词器;字段在搜索时应该使用ik_smart分词器;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
PUT /test
{
"settings": {
"analysis": {
"analyzer": { //自定义分词器
"my_analyzer": { //分词器名称
"tokenizer": "ik_max_word",
"filter": "py"
}
},
"filter": { //自定义tokenizer filter
"py": { ... }
}
}
},
"mappings": {
"properties": {
"name": {
"type": "text",
"analyzer": "my_analyzer",
"search_analyzer": "ik_smart"
}
}
}
}

completion suggester查询

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

  • 参与补全查询的字段必须是completion类型。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 创建索引库
    PUT test
    {
    "mappings": {
    "properties": {
    "title": {
    "type": "completion"
    }
    }
    }
    }
  • 字段的内容一般是用来补全的多个词条组成的数组
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    //示例数据
    POST test/_doc
    {
    "title":["Sony","WH-1000XM3"]
    }
    POST test/_doc
    {
    "title":["SK-II","PITERA"]
    }
    POST test/_doc
    {
    "title":["Nintendo","switch"]
    }
    查询语法如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    GET /test/_search
    {
    "suggest": {
    "title_suggest": {
    "text": "s", // 关键字
    "completion": {
    "field": "title", //补全查询的字段
    "skip_duplicates": true, // 跳过重复的
    "size": 10 //获取前10条结果
    }
    }
    }
    }

案例:实现hotel索引库的自动补全、拼音搜索功能

实现思路如下:

  1. 修改hotel索引库结构,设置自定义拼音分词器
  2. 修改索引库的name、all字段,使用自定义分词器
  3. 索引库添加一个新字段suggestion,类型为completion类型,使用自定义的分词器
  4. 给HotelDoc类添加suggestion字段,内容包含brand、business
  5. 重新导入数据到hotel库

RestAPI实现自动补全

先看请求参数的构造API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//1.准备请求
SearchRequest request = new SearchRequest("hotel");
//2.请求参数
request.source()
.suggest(new SuggestBuilder().addSuggestion(
"mySuggestion",
SuggestBuilders
.completionSuggestion("title")
.prefix("h")
.skipDuplicates(true)
.size(10)
));
//3.发送请求
client.search(request,RequestOptions.DEFAULT);

对应:

1
2
3
4
5
6
7
8
9
10
11
12
13
GET /hotel/_search
{
"suggest": {
"suggestions": {
"text": "h",
"completion": {
"field": "suggestion",
"skip_duplicates": true,
"size": 10
}
}
}
}

再来看结果解析:

1
2
3
4
5
6
7
8
9
10
//4.处理结果
Suggest suggest = response.getSuggest();
//4.1.根据名称获取补全结果
CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggestion");
//4.2.获取options并遍历
for(CompletionSuggestion.Entry.Option option : suggestion.getOptions()){
//4.3.获取一个option中的text,也就是补全的词条
String text = option.getText().string();
System.out.println(text);
}

实现酒店搜索页面输入框的自动补全

查看前端页面,可以发现当我们在输入框键入时,前端会发起ajax请求:
[Pasted image 20231030091238.png]
在服务端编写接口,接收该请求,返回补全结果的集合,类型为LIst<string>

数据同步

数据同步问题分析

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticearch也必须发生改变,这个就是elasticearch与mysql之间的数据同步

在微服务中,负责酒店管理(操作mysql)的业务与负责酒店搜索(操作elasticsearch)的业务可能在两个不同的微服务上,数据同步该如何实现呢?
方案一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高
    [Pasted image 20231030093438.png]
    方案二:异步通知
  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性
    [Pasted image 20231030093851.png]
    方案三:监听binlog
  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高
    [Pasted image 20231030093940.png]

利用MQ实现mysql与elasticearch数据同步

利用课前资料提供的hotel-admin项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
步骤:

  • 导入课前资料提供的hotel-admin项目,启动并测试酒店数据的CRUD
  • 声明exchange、queue、RoutingKey
  • 在hotel-admin中的增、删、改业务中完成消息发送
  • 在hotel-demo中完成消息监听,并更新elasticsearch中数据
  • 启动并测试数据同步功能

elasticsearch集群

ES集群结构

单击的elasticserch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。

  • 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
    [Pasted image 20231030154015.png]
  • 单点故障问题:将分片数据在不同节点备份(replica)
    [Pasted image 20231030154323.png]

搭建ES集群

我们计划利用3个docker容器模拟3个es的节点。具体步骤参考elasticsearch第一天课程的课前资料:
[安装elasticsearch]

ES集群的节点角色

elasticsearch中集群节点有不同的职责划分:
[Pasted image 20231031084716.png]
elasticsearch中的每个节点角色都有不同的职责,因此建议集群部署时,每个节点都有独立的角色。
[Pasted image 20231031090955.png]

ES集群的脑裂

默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其他候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。
为了避免脑裂,需要要求选票超过(eligible节点数量+1)/2才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题
[Pasted image 20231031091948.png]
[Pasted image 20231031091652.png]

ES集群的分布式存储

当新增文档时,应该保存到不同分片,保证数据均衡,那么coordinating node如何确定数据该存储到哪个分片呢?
elasticsearch会通过hash算法来计算文档应该存储到哪个分片:
[Pasted image 20231031104133.png]
说明:

  • _routing默认是文档的id
  • 算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!
    新增文档流程:
    [Pasted image 20231031111704.png]
    elasticsearch的查询分成两个阶段:
  • scatter phase:分散阶段,coordinating node会把请求分发到每一个切片
  • gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户
    [Pasted image 20231031112206.png]

ES集群的故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这个叫做故障转移。

初始Sentinel

雪崩问题

微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用,这就是雪崩
[Pasted image 20231031141724.png]
解决雪崩问题的常见方式有四种:

  • 超时处理:设定超过时间,请求超过一定时间没有响应就返回错误信息,不会无休止等待
    [Pasted image 20231031141923.png]
  • 舱壁模式:限定每个业务能使用的线程数,避免耗尽整个tomcat的资源,因此也叫线程隔离
    [Pasted image 20231031142252.png]
  • 熔断降级模式:由断路器统计业务执行的异常比例,如果超过阈值则会熔断该业务,拦截访问该业务的一切请求
    [Pasted image 20231031142614.png]
  • 流量控制:限制业务访问的QPS,避免服务因流量的突增而故障
    [Pasted image 20231031142831.png]

服务保护技术对比

[Pasted image 20231031143324.png]

认识Sentinel

Sentinel是阿里巴巴开源的一款微服务流量控制组件。官网地址:(http://sentinelguard.io/zh-cn/index.html)
Sentinel具有以下特征:

  • 丰富的应用场景:Sentinel承接了阿里巴巴近10年的双十一大促场景的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等
  • 完备的实时监控:Sentinel同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至500台以下规模的集群的汇总运行情况。
  • 广泛的开源生态:Sentinel提供开箱即用的与其它开源框架/库的整合模块,例如与Spring Cloud、Dobbo、gRPC的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入Sentinel
  • 完善的SPI扩展点:Sentinel提供简单易用、完善的SPI扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等

安装Sentinel控制台

sentinel官方提供了UI控制台,方便我们对系统做限流设置。大家可以在GitHub下载。课前资料提供了下载好的jar包:
[Pasted image 20231031150131.png]

  1. 将其拷贝到一个你能记住的非中文目录,然后运行命令:
    1
    java -jar sentinel-dashboard-1.8.1.jar
  2. 然后访问:localhost:8080即可看到控制台页面,默认的账户和密码都是sentinel
  • [Pasted image 20231031151239.png]
    如果要修改Sentinel的默认端口、账户、密码,可以通过下列配置:
    [Pasted image 20231031151606.png]
    举例说明:
    1
    java -jar sentinel-dashboard-1.8.1.jar -Dserver.port=8090

引入cloud-demo

要使用Sentinel肯定要结合微服务,这里我们使用SpringCloud实用篇中的cloud-demo工程。没有的小伙伴可以在课前资料中找到:
[Pasted image 20231031152117.png]
项目结构如下:
[Pasted image 20231031152137.png]

微服务整合Sentinel

我们在order-service中整合Sentinel,并且连接Sentinel的控制台,步骤如下:

  1. 引入sentinel依赖:
    1
    2
    3
    4
    5
    <!-- sentinel -->
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
    </dependency>
  2. 配置控制台地址:
    1
    2
    3
    4
    5
    spring:
    cloud:
    sentinel:
    transport:
    dashboard: localhost:8080
  3. 访问微服务的任意端点,触发sentinel监控

限流规则

簇点链路

簇点链路:就是项目内的调用链路,链路中被监控的每个接口就是一个资源。默认情况下sentinel会监控SpringMVC的每一个端点(Endpoint),
因此SpringMVC的每一个端点(Endpoint)就是调用链路中的一个资源
流控、熔断等都是针对簇点链路中的资源来设置的,因此我们可以点击对应资源后面的按钮来设置规则:
[Pasted image 20231031155557.png]

快速入门

点击资源/order/{orderId}后面的流控按钮,就可以弹出表单。表单中可以添加流控规则,如下图所示:
[Pasted image 20231031160002.png]
其含义是限制/order/{orderId}这个资源的单击QPS为1,即每秒只允许1次请求,超出的请求会被拦截并报错。

流控规则入门案例

需求:给/order/{orderId}这个资源设置流控规则,QPS不能超过5。然后利用jemeter测试

  1. 设置流控规则:
    [Pasted image 20231031161410.png]
  2. jemeter测试:
    [Pasted image 20231031161432.png]

流控模式

在添加限流规则时,点击高级选项,可以选择三种流控模式:

  • 直接:统计当前资源的请求,触发阈值时对当前资源直接限流,也是默认的模式
  • 关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流
  • 链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流
    [Pasted image 20231031161902.png]

流控模式-关联

  • 关联模式:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流
  • 使用场景:比如用户支付时需要修改订单状态,同时用户要查询订单。查询和修改操作会争抢数据库锁,产生竞争。业务需求是有限支付和更新订单的业务,因此当修改订单业务触发阈值时,需要对查询订单业务限流
    [Pasted image 20231031162600.png]
    当/write资源访问量触发阈值时,就会对/read资源限流,避免影响/write资源

案例-流控模式-关联

需求:

  • 在OrderController新建两个端点:/order/query和/order.update,无序实现业务
  • 配置流控规则,当/order/update资源被访问的QPS超过5时,对/order/query请求限流
    小结:
    满足下面条件可以使用关联模式:
    • 两个有竞争关系的资源
    • 一个优先级较高,一个优先级较低

流控模式-链路

链路模式:只针对从指定链路访问到本资源的请求做统计,判断是否超过阈值
例如有两条请求链路:

  • /test1 -> /common
  • /test2 -> /common
    如果只希望统计从/test2进入到/common的请求,则可以这样配置:
    [Pasted image 20231031163902.png]

案例-流控模式-链路

需求:有查询订单和创建订单业务,两者都需要查询商品。针对从查询订单进入到查询商品的请求统计,并设置限流
步骤:

  1. 在OrderService中添加一个queryGoods方法,不用实现业务
  2. 在OrderController中,改造/order/query端点,调用OrderService中的queryGoods方法
  3. 在OrderController中添加一个/order/save的端点,调用OrderService的queryGoods方法
  4. 给queryGoods设置限流规则,从/order/query进入queryGoods的方法限制QPS必须小于2
  • Sentinel默认只标记Controller中的方法为资源,如果要标记其他方法,需要利用@SentinelResource注解,示例:
    1
    2
    3
    4
    @SentinelResource("goods")
    public void queryGoods(){
    System.out.println("查询商品");
    }
  • Sentinel默认会将Controller方法做context整合,导致链路模式的流控失效,需要修改application.yml,添加配置:
    1
    2
    3
    4
    spring:
    cloud:
    sentinel:
    web-context-unify: false # 关闭context整合

流控效果

流控效果是指请求达到流控阈值时应该采取的措施,包括三种:

  • 快速失败:达到阈值后,新的请求会被立即拒绝并抛出FlowException异常。是默认的处理方式
  • warm up:预热模式,对超出阈值的请求同样是拒绝并抛出异常。但这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值
  • 排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长
    [Pasted image 20231031170426.png]

流控效果-warm up

warm up也叫预热模式,是应对服务冷启动的一种方案。请求阈值初始值是threshold/ coldFactor,持续指定时长后,逐渐提高到threshold值。而coldFactor的默认值是3
例如,我设置QPS的threshold为10,预热时间为5秒,那么初始阈值就是10/3,也就是3,然后在5秒后逐渐增长到10
[Pasted image 20231031171135.png]

流控效果-排队等待

当请求超过QPS阈值时,快速失败和warm up会拒绝新的请求并抛出异常。而排队等待则是让所有请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须等待前面执行完成,如果请求预期的等待时间超出最大时长,则会被拒绝
例如:QPS=5,意味着每200ms处理一个队列中的请求;timeout=2000,意味着预期等待超过2000ms的请求会被拒绝并抛出异常
[Pasted image 20231031172315.png]
[Pasted image 20231031172336.png]

热点参数限流

之前的限流是统计访问某个资源的所有请求,判断是否超过QPS阈值。而热点参数限流是分别统计参数值相同的请求,判断是否超过QPS阈值
[Pasted image 20231031173040.png]
配置示例:
[Pasted image 20231031173105.png]
代表的含义是:对hot这个资源的0号参数(第一个参数)做统计,每1秒相同参数值的请求数不能超过5
在热点参数限流的高级选项中,可以对部分参数设置例外配置:
[Pasted image 20231031173349.png]
结合上一个配置,这里的含义是对0号的long类型参数限流,每1秒相同参数额的QPS不能超过5,有两个例外:

  • 如果参数值是100,则每1秒允许的QPS为10
  • 如果参数值是101,则每1秒允许的QPS为15

隔离和降级

虽然限流可以尽量避免因高并发而引起的服务故障,但服务还会因为其他原因而故障。而要将这些故障控制在一定范围,避免雪崩,就要靠线程隔离(舱壁模式)和熔断降级手段了
不管是线程隔离还是熔断降级,都是对客户端(调用方) 的保护
[Pasted image 20231101083846.png]
[Pasted image 20231101083941.png]

Feign整合Sentinel

SpringCloud中,微服务调用都是通过Feign来实现的,因此做客户端保护必须整合Feign和Sentinel

  1. 修改OrderService的application.yml文件,开启Feign的Sentinel功能
    1
    2
    3
    feign:
    sentinel:
    enabled: true # 开启Feign的Sentinel功能
  2. 给FeignClient编写失败后的降级逻辑
    1. 方式一:FallbackClass,无法对远程调用的异常做处理
    2. 方式二:FallbackFactory,可以对远程调用的异常做处理,我们选择这种
      步骤一:在feign-api项目中定义类,实现FallbackFactory:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      @Slf4j
      public class UserClientFallbackFactory implements FallbackFactory<UserClient>{
      @Override
      public UserClient create(Throwable throwable){
      //创建UserClient接口实现类,实现其中的方法,编写失败降级的处理逻辑
      return new UserClient(){
      @Override
      public User findById(Long id){
      //记录异常信息
      long.error("查询用户失败",throwable);
      //根据业务需求返回默认的数据,这里是空用户
      return new User();
      }
      }
      }
      }
      步骤二:在feign-api项目中的DefaultFeignConfiguration类中将UserClientFallbackFactory注册为一个Bean:
      1
      2
      3
      4
      @Bean
      public UserClientFallbackFactory userClientFallback(){
      return new UserClientFallbackFactory();
      }
      步骤三:在feign-api项目中的UserClient接口中使用UserClientFallbackFactory:
      1
      2
      3
      4
      5
      @FeignClient(value = "userservice",fallbackFactory = UserClientFallbackFactory.class)
      public interface UserClient{
      @GetMapping("/user/{id}")
      User findById(@PathVariable("id") Long id);
      }

线程隔离

线程隔离有两种方式实现:

  • 线程池隔离
  • 信号量隔离(Sentinel默认采用)
    [Pasted image 20231101092513.png]
    [Pasted image 20231101092916.png]

线程隔离(舱壁模式)

在添加限流规则时,可以选择两种阈值类型:
[Pasted image 20231101093034.png]

  • QPS:就是每秒的请求数,在快速入门已经演示过
  • 线程数:是该资源能使用用的tomcat线程数的最大数。也就是通过限制线程数量,实现舱壁模式

熔断降级

熔断降级是解决雪崩问题的重要手段。其思路是由断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务。即拦截访问呢该服务的一切请求;而当服务恢复时,断路器会放行访问该服务的请求

熔断降级-慢调用

断路器熔断策略有三种:慢调用、异常比例、异常数

  • 慢调用:业务的响应时长(RT)大于指定时长的请求认定为慢调用请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断。例如:
    [Pasted image 20231101094238.png]
    解读:RT超过500ms的调用是慢调用,统计最近10000ms内的请求,如果请求量超过10次,并且慢调用比例不低于0.5,则触发熔断,熔断时长为5秒。然后进入half-open状态,方形一次请求做测试

熔断策略-异常比例、异常数

断路器熔断策略有三种:慢调用、异常比例或异常数

  • 异常比例或异常数:统计指定时间内的调用,如果调用次数超过指定请求数,并且出现异常的比例达到设定的比例阈值(或超过指定异常数),则触发熔断。例如:
    [Pasted image 20231101095223.png]
    [Pasted image 20231101095436.png]
    解读:统计最近1000ms内的请求,如果请求量超过10次,并且异常比例不低于0.5,则触发熔断,熔断时长为5秒。然后进入half-open状态,放行一次请求做测试。

授权规则

授权规则

授权规则可以对调用方的来源做控制,有白名单和黑名单两种方式

  • 白名单:来源(origin)在白名单内的调用者允许访问
  • 黑名单:来源(origin)在黑名单内的调用者不允许访问
    [Pasted image 20231101100507.png]
    例如,我们限定只允许从网关来的请求访问order-service,那么流控应用中就填写网关的名称
    [Pasted image 20231101100605.png]
    Sentinel是通过RequestOriginParser这个接口的parseOrigin来获取请求的来源的
    1
    2
    3
    4
    5
    6
    public interface RequestOriginParser{
    /**
    * 从请求request对象中获取origin,获取方式自定义
    */
    String parseOrigin(HttpServletRequest request);
    }
    例如,我们尝试从request中获取一个名为origin的请求头,作为origin的值:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    @Component
    public class HeaderOriginParser implements RequestOriginParser{
    @Override
    public String parseOrigin(HttpServletRequest request){
    String origin = request.getHeader("origin");
    if(StringUtils.isEmpty(origin)){
    return "blank";
    }
    return origin;
    }
    }
    我们还需要在gateway服务中,利用网关的过滤器添加名为gateway的origin头:
    1
    2
    3
    4
    5
    spring:
    cloud:
    gateway:
    default-filters:
    - AddRequestHeader=origin,gateway # 添加名为origin的请求头,值为gateway
    给/order/{orderId}配置授权规则:
    [Pasted image 20231101102129.png]

自定义异常结果

默认情况下,发生限流、降级、授权拦截时,都会抛出异常到调用方。如果要自定义异常时的返回结果,需要实现BlockExceptionHandler接口:

1
2
3
4
5
6
public interface BlockExceptionHandler{
/**
* 处理请求被限流、降级,授权拦截时抛出的异常:BlockException
*/
void handle(HttpServletRequest request,HttpServletResponse response,BlockException e)throws Exception;
}

而BlockException包含很多个子类,分别对应不同的场景:
[Pasted image 20231101105753.png]
我们在order-service中定义类,实现BlockExceptionHandler接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component  
public class SentinelExceptionHandler implements BlockExceptionHandler {
@Override public void handle(HttpServletRequest request, HttpServletResponse response, BlockException e) throws Exception {
String msg = "未知异常";
int status = 429;

if (e instanceof FlowException) {
msg = "请求被限流了";
} else if (e instanceof ParamFlowException) {
msg = "请求被热点参数限流";
} else if (e instanceof DegradeException) {
msg = "请求被降级了";
} else if (e instanceof AuthorityException) {
msg = "没有权限访问";
status = 401;
}

response.setContentType("application/json;charset=utf-8");
response.setStatus(status);
response.getWriter().println("{\"msg\": " + msg + ", \"status\": " + status + "}");
}
}

规则持久化

规则管理模式

Sentinel的控制台规则管理有三种模式:

  • 原始模式:Sentinel的默认模式,将规则保存在内存,重启服务会丢失
  • pull模式
  • push模式

规则管理模式-pull模式

pull模式:控制台将配置的规则推送到Sentinel客户端,而客户端会将配置规则保存在本地文件或数据库中。以后会定时去本地文件或数据库中查询,更新本地规则
[Pasted image 20231101140540.png]

规则管理模式-push模式

push模式:控制台将配置规则推送到远程配置中心,例如Nacos。Sentinel客户端监听Nacos,获取配置变更的推送消息,完成本地配置更新
[Pasted image 20231101140918.png]

实现push模式

push模式实现最为复杂,依赖于nacos,并且需要修改Sentinel控制台源码
详细步骤可以参考课前资料的[[sentinel规则持久化]]

分布式事务

事务的ACID原则

[Pasted image 20231101142537.png]

分布式服务案例

微服务下单业务,在下单时会调用订单服务,创建订单并写入数据库。然后订单服务调用账户服务和库存服务:

  • 账户服务负责扣减用户余额
  • 库存服务负责扣减商品库存
    [Pasted image 20231101142834.png]

分布式服务的事务问题

在分布式系统下,一个业务跨越多个服务或数据源,每个服务都是一个分支事务,要保证所有分支事务最终状态一直,这样的事务就是分布式事务
[Pasted image 20231101145459.png]

分布式服务理论基础

CAP定理

1998年,加州大学的计算机科学家Eric Brewer提出,分布式系统有三个指标:

  • Consistency(一致性)
  • Availability(可用性)
  • Partition tolerance(分区容错性)
    Eric Brewer说,分布式系统无法同时满足这三个指标。
    这个结论叫做CAP定理
    [Pasted image 20231101151431.png]

CAP定理-Consistency

Consistency(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致
[Pasted image 20231101151640.png]

CAP定理-Availability

Availability(可用性):用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝
[Pasted image 20231101151949.png]

CAP定理-Partition tolerance

Partition(分区):因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接,形成独立分区
Tolerance(容错):在集群出现分区时,整个系统也要持续对外提供服务
[Pasted image 20231101152256.png]

CAP总结

简述CAP定理内容?

  • 分布式系统节点通过网络连接,一定会出现分区问题(P)
  • 当分区出现时,系统的一致性(C)和可用性(A)就无法同时满足
    思考:elasticsearch集群是CP还是AP?
  • ES集群出现分区时,故障节点会被剔除集群,数据分片会重新分配到其他节点,保证数据一致。因此是低可用性,高一致性,属于CP

BASE理论

BASE理论是对CAP的一种解决思路,包含三个思想:

  • Basically Avaliable(基本可用):分布式系统在出现故障时,允许损失部分可用性,既保证核心可用
  • Soft State(软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态
  • Eventually Consistent(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致
    而分布式事务最大的问题是各个子事务的一致性问题,因此可以借鉴CAP定理和BASE理论:
  • AP模式:各子事务分别执行和提交,允许出现结果不一致,然后采用弥补措施恢复数据即可,实现最终一致
  • CP模式:各个子事务执行后互相等待,同时提交,同时回滚,达成强一致。但事务等待过程中,处于弱可用状态

分布式事务模型

解决分布式事务,各个子系统之间必须能感知到彼此的事务状态,才能保证状态一致,因此需要一个事务协调者来协调每一个事务的参与者(子系统事务)
这里的子系统事务,称为分支事务;有关联的各个分支事务在一起称为全局事务
[Pasted image 20231101161538.png]

初识Seata

Seata是2019年1月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案。致力于提供高性能和简单易用的分布式事务服务,为用户打造一站式的分布式解决方案
官网地址:(http://seata.io/),其中的文档、播客中提供了大量的使用说明、源码分析

Seata架构

Seata事务管理中有三个重要的角色:

  • TC(Transaction Coordinator)- 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚
  • TM(Transaction Manager)- 事务管理者:定义全局事务的范围、开始全局事务、提交或回滚全局事务
  • RM(Resource Manager)- 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚
    ![[Pasted image 20231102083542.png]]

初识Seata

Seata提供了四种不同的分布式事务解决方案:

  • XA模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
  • TCC模式:最终一直的分阶段事务模式,有业务侵入
  • AT模式:最终一直的分阶段事务模式,无业务侵入,也是Seata的默认模式
  • SAGA模式:长事务模式,有业务侵入

部署TC服务

参考课前资料提供的文档[[seata的部署和集成]]

微服务集成Seata

  1. 首先,引入seata相关依赖:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    <!--seata-->  
    <dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions> <!--版本较低,1.3.0,因此排除-->
    <exclusion>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    </exclusion>
    </exclusions>
    </dependency>
    <!--seata starter 采用1.4.2版本-->
    <dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>${seata.version}</version>
    </dependency>
  2. 然后,配置application.yml,让微服务通过注册中心找到seata-tc-server:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    seata:  
    registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址
    # 参考tc服务自己的registry.conf中的配置
    # 包括:地址、namespace、group、application-name、cluster
    type: nacos
    nacos: # tc
    server-addr: 127.0.0.1:8848
    namespace: ""
    group: DEFAULT_GROUP
    application: seata-tc-server # tc服务在nacos中的服务名称
    username: nacos
    password: nacos
    tx-service-group: seata-demo # 事务组,根据这个获取tc服务的cluster名称
    service:
    vgroup-mapping: # 事务组与TC服务cluster的映射关系
    seata-demo: default

动手实践

XA模式原理

XA规范是 x/Open组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,XA规范描述了全局的TM与局部的RM之前的接口,几乎所有主流的数据库都对XA规范提供了支持
[Pasted image 20231102100552.png]
[Pasted image 20231102100635.png]

seata的XA模式

seata的XA模式做了一些调整,但大体相似:
RM一阶段的工作:

  1. 注册分支事务到TC
  2. 执行分支事务sql但不提交
  3. 报告执行状态到TC
    TC二阶段的工作:
  • TC检测各分支事务执行状态
    • 如果都失败,通知所有RM提交事务
    • 如果有失败,通知所有RM回滚事务
      RM二阶段的工作:
  • 接收TC指令,提交或回滚事务
    ![[Pasted image 20231102101010.png]]

实现XA模式

Seata的starter已经完成了XA模式的自动装配,实现非常简单,步骤如下:

  1. 修改application.yml文件(每个参与事务的微服务),开启XA模式:
    1
    2
    seata:
    data-source-proxy-mode: XA # 开启数据源代理的XA模式
  2. 给发起全局事务的入口方法添加@GlobalTransactional注解,本例中是OrderServiceImpl中的create方法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Override
    @GlobalTransactional
    public Long create(Order order){
    //创建订单
    orderMapper.insert(order);
    //扣余额 ...略
    //扣减库存 ...略
    return order.getId();
    }
  3. 重启服务并测试

AT模式原理

AT模式同样是分阶段提交的事务模型,不过缺弥补了XA模型中资源锁定周期过长的缺陷
阶段一RM的工作:

  • 记录分支事务
  • 记录undo-log(数据快照)
  • 执行业务sql并提交
  • 报告事务状态
    阶段二提交时RM的工作:
  • 删除undo-log即可
    阶段二回滚时RM的工作:
  • 根据undo-log恢复数据到更新前
    [Pasted image 20231102103856.png]
    例如,一个分支业务的SQL是这样的:update tb_account set money = money - 10 where id = 1
    [Pasted image 20231102104301.png]
    总结:
    简述AT模式与XA模式最大的区别是什么?
  • XA模式一阶段不提交事务,锁定资源;AT模式一阶段直接提交,不锁定资源
  • XA模式以来数据库机制实现回滚;AT模式利用数据快照实现数据回滚
  • XA模式情义值;AT模式最终一致

AT模式的脏写问题

[Pasted image 20231102105008.png]

AT模式的写隔离

[Pasted image 20231102105313.png]
[Pasted image 20231102105655.png]

AT模式的优缺点

AT模式的优点:

  • 一阶段完成直接提交事务,释放数据库资源,性能比较好
  • 利用全局锁实现读写隔离
  • 没有代码侵入,框架自动完成回滚和提交
    AT模式的缺点:
  • 两阶段之间属于软状态,属于最终一致
  • 框架的快照功能回影响性能,但比XA模式要好很多

实现AT模式

AT模式中的快照生成、回滚等动作都是由框架自动完成,没有任何代码侵入,因此实现非常简单

  1. 导入课前资料提供的Sql文件:seata-at.sql,其中lock_table导入到TC服务关联的数据库,undo_log表导入到微服务关联的数据库:
  2. 修改application.yml文件,将事务模式修改为AT模式即可:
    1
    2
    seata:
    data-source-proxy-mode: AT # 开启数据源代理的AT模式
  3. 重启服务并测试

TCC模式原理

TCC模式与AT模式非常相似,每阶段都是独立事务,不同的是TCC通过人工编码来实现数据恢复。需要实现三个方法:

  • Try:资源的检测和预留;
  • Confirm:完成资源操作业务;要求Try成功Confirm一定要能成功
  • Cancel:预留资源释放,可以理解为try的反向操作
    举例,一个扣减用户余额的业务。假设账户A原来余额是100,需求余额扣减30元
  • 阶段一(Try):检查余额是否充足,如果充足则冻结金额增加30元,可用余额扣除30
    [Pasted image 20231102112601.png]
  • 阶段二:加入要提交(Confirm),则冻结金额扣减30
    [Pasted image 20231102112612.png]
  • 阶段二:如果要回滚(Cancel),则冻结金额扣减30,可用余额增加30
    [Pasted image 20231102112619.png]
    TCC的工作模型图:
    [Pasted image 20231102112850.png]
    总结:
    TCC模式的每个阶段是做什么的?
  • Try:资源检查和预留
  • Confirm:业务执行和提交
  • Cancel:预留资源的释放
    TCC的优点是什么?
  • 一阶段完成直接提交事务,释放数据库资源,性能好
  • 相比AT模型,无需生成快照,无需使用全局锁,性能最强
  • 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库
    TCC的缺点是什么?
  • 有代码侵入,需要人为编写try、Confirm和Cancel接口,太麻烦
  • 软状态,事务是最终一致
  • 需要考虑Confirm和Cancel的失败情况,做好幂等处理

TCC的空回滚和业务悬挂

当某分支事务的try阶段阻塞时,可能导致全局事务而触发二阶段的cancel操作。在未执行try操作时先执行了cancel操作,这时cancel不能做回滚,就是空回滚。
[Pasted image 20231102114713.png]
对于已经空回滚的业务,如果以后继续执行try,就永远不可能confirm或cancel,这就是业务悬挂。应当阻止执行空回滚后的try操作,避免悬挂

业务分析

为了实现空回滚、防止业务悬挂,以及幂等性要求。我们必须在数据库记录冻结金额的同时,记录当前事务id和执行状态,为此我们设计了一张表:

1
2
3
4
5
6
7
CREATE TABLE 'account_freeze_tbl'(
'xid' varchar(128) NOT NULL,
'user_id'varchar(255) DEFAULT NULL COMMENT '用户id',
'freeze_money' int(11) unsigned DEFAULT '0' COMMENT '冻结金额',
'state' int(1) DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
PRIMARY KEY('xid') USING BTREE
)ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;

Try业务:

  • 记录冻结金额和事务状态到account_freeze表
  • 扣减account表可用金融
    Confirm业务:
  • 根据xid删除account_freeze表的冻结记录
    Cancel业务:
  • 修改account_freeze表,冻结金额为0,state为2
  • 修改account表,恢复可用金额
    如何判断是否空回滚:
  • cancel业务中,根据xid查询account_freeze,如果为null则说明try还没做,需要空回滚
    如何避免业务悬挂:
  • try业务中,根据xid查询account_freeze,如果已经存在则证明Cancel已经执行,拒绝执行try业务

声明TCC接口

TCC的Try、Confirm、Cancel方法都需要在接口中基于注解来声明,语法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@LocalTCC
public interface TCCService{
/**
* Try逻辑,@TwoPhaseBusinessAction中的name属性要与当前方法名一致,用于指定Try逻辑对应的方法
* @param userId 用户id
* @param money 金额
*/
@TwoPhaseBusinessAction(name = "deduct",commitMethod = "confirm",rollbackMethod = "cancel")
void deduct(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money") int money);

/**
* 二阶段confirm确认方法,可以另命名,但要保证与commitMethod一致
* @param context 上下文,可以传递try方法的参数
* @return boolean 执行是否成功
*/
boolean confirm(BusinessActionContext context);

/**
* 二阶段回滚方法,要保证与rollbackMethod一致
* @param context 上下文,可以传递try方法的参数
* @return boolean 执行是否成功
*/
boolean cancel(BusinessActionContext context);
}

Saga模式

Saga模式是SEATA提供的长事务解决方案。也分为两个阶段:

  • 一阶段:直接提交本地事务
  • 二阶段:成功则什么都不做;失败则通过编写补偿业务来回滚
    [Pasted image 20231102151514.png]
    Saga模式优点:
  • 事务参与者可以基于事件驱动实现异步调用,吞吐高
  • 一阶段直接提交事务,无锁,性能好
  • 不用编写TCC中的三个阶段,实现简单
    缺点:
  • 软状态持续时间不确定,时效性差
  • 没有锁,没有事务隔离,会有脏写

四种模式对比

[Pasted image 20231102152039.png]

高可用

TC的异地多机房容灾架构

TC服务作为Seata的核心服务,一定要保证高可用和异地容灾。
[Pasted image 20231102152905.png]
具体实现请参考[seata的部署和集成]

Redis持久化

RDB

RDB全称Redis Database Backup file(Redis数据备份文件),也被叫做Redis数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当Redis实例故障重启后,从磁盘读取快照文件,恢复数据。
快照文件被称为RDB文件,默认是保存在当前运行目录。
[Pasted image 20231103090039.png]
Redis停机时会执行一次RDB。
首先需要在Linux系统中安装一个Redis,如果尚未安装的同学,可以参考[[Redis集群]]
Redis内部有触发RDB的机制,可以在redis.conf文件中找到,格式如下:
[Pasted image 20231103093930.png]
RDB的其他配置也可以在redis.conf文件中设置:
[Pasted image 20231103094032.png]
bgsave开始时会fork主进程得到子进程,子进程共享主进程的内存数据.完成fork后读取内存数据并写入RDB文件。
fork采用的是copy-on-write技术:

  • 当主进程执行读操作时,访问共享内存;
  • 当主进程执行写操作时,则会拷贝一份数据,执行写操作
    ![[Pasted image 20231103102410.png]]
    总结:
    RDB方式bgsave的基本流程?
  • fork主进程得到一个子进程,共享内存空间
  • 子进程读取内存数据并写入新的RDB文件
  • 用新RDB文件替换旧的RDB文件
    RDB会在什么时候执行?save 60 1000代表什么含义?
  • 默认是服务停止时。
  • 代表60秒内至少执行1000次修改则触发RDB
    RDB的缺点?
  • RDB执行间隔时间长,两次RDB之间写入数据有丢失的风险
  • fork子进程、压缩、写出RED文件都比较耗时

AOF

AOF全称为Append Only File(追加文件)。Redis处理的每一个写命令都会记录在AOF文件,可以看做是命令日志文件。
[Pasted image 20231103104057.png]
AOF默认是关闭的,需要修改redis.conf配置文件来开启AOF:
[Pasted image 20231103104422.png]
AOP的命令记录的频率也可以通过redis.conf文件来配:
[Pasted image 20231103104526.png]
[Pasted image 20231103105212.png]
因为是记录命令,AOF文件会比RDB文件大的多。而且AOF会记录会对同一个key的多次写操作,但只有最后一次写操作才有意义。通过执行bgrewriteaof命令,可以让AOF文件执行重写功能,用最少的命令达到相同效果
[Pasted image 20231103113103.png]
Redis也会在触发阈值时自动去重写AOF文件。阈值也可以在redis.conf中配置:
[Pasted image 20231103113204.png]
RDB和AOF各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会结合两者来使用
[Pasted image 20231103113936.png]

Redis主从

搭建主从架构

单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离
[Pasted image 20231103115733.png]
具体搭建流程参考[[Redis集群]]

数据同步原理

主从第一次同步是全量同步:
[Pasted image 20231103154205.png]
master如何判断slave是不是第一次来同步数据?这里会用到两个很重要的概念:

  • Replication id:简称replid,是数据集的标记,id一直则说明是同一数据集。每一个master都有唯一的replid,slave则会继承master节点的replid
  • offset:偏移量,随着记录在repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的offset。如果slave的offset小于master的offset,说明slave数据落后于master,需要更新
    [Pasted image 20231103162029.png]
    主从第一次同步是全量同步,但如果slave重启后同步,则执行增量同步
    [Pasted image 20231103164254.png]
    [Pasted image 20231103164317.png]
    可以从以下几个方面来优化Redis主从就集群:
  • 在master中配置repl-diskless-sync yes启用无磁盘复制,避免全量同步时的磁盘IO
  • Redis单节点上的内存占用不要太大,减少RDB导致的过都磁盘IO
  • 适当提高repl_backlog的大小,发现slave宕机时尽快实现故障恢复,尽可能避免全量同步
  • 限制一个master上的slave节点数量,如果实在是太多slave,则可以采用主-从-从链式结构,减少master压力
    [Pasted image 20231103170314.png]

Redis哨兵

哨兵的作用

Redis提供了哨兵(Sentinel)机制来实现主从集群的自动故障恢复。哨兵的结构和作用如下:

  • 监控:Sentinel会不断检查你的master和slave是否按预期工作
  • 自动故障恢复:如果master故障,Sentinel会将一个slave提升为master。当故障实例恢复后也以新的master为主
  • 通知:Sentinel充当Redis客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给Redis的客户端
    [Pasted image 20231104083630.png]

服务状态监控

Sentinel基于心跳机制检测服务状态,每隔1秒向集群的每个实例发送ping命令:

  • 主观下线:如果某sentinel节点发现某实例未在规定时间响应,则认为该实例主观下线
  • 客观下线:若超过指定数量(quorum)的sentinel都认为该实例主观下线,则该实例客观下线。quorum值最好超过Sentinel实例数量的一半
    [Pasted image 20231104084244.png]

选举新的master

一旦发现master故障,sentinel需要在salve中选择一个作为新的master,选择依据是这样的:

  • 首先会判断salve节点与master节点断开时间长短,如果超过指定值(down-after-milliseconds*10)则会排除该slave节点
  • 然后判断slave节点的slave-priority,越小优先级越高,如果是0则永不参与选举
  • 如果slave-prority一样,则判断slave节点的offset值,越大说明数据越新,优先级越高
  • 最后是判断slave节点的运行id大小,越小优先级越高

如何实现故障转移

当选中了其中一个slave为新的master后(例如slave1),故障的转移的步骤如下:

  • sentinel给备选的slave1节点发送slaveof no one命令,让该节点成为master
  • sentinel给所有其它slave发送slaveof 192.168.140.131 7002命令,让这些slave成为新master的从节点,开始从新的master上同步数据
  • 最后,sentinel将故障节点标记为slave,当故障节点恢复后会自动成为新的master的slave节点
    [Pasted image 20231104085831.png]

搭建哨兵架构

具体搭建流程参考[Redis集群]

RedisTemplate的哨兵模式

在Sentinel集群监管下的Redis主从集群,其节点会因为自动故障转移而发生变化,Redis的客户端必须感知这种变化,及时更新连接信息。Spring的RedisTemplate底层利用lettuce实现了节点的感知和自动切换
首先,引入课前资料提供的Demo工程:redis-demo

  1. 在pom文件中引入redis的starter依赖:
    1
    2
    3
    4
    <dependency>  
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
  2. 然后在配置文件application.yml中指定sentinel相关信息:
    1
    2
    3
    4
    5
    6
    7
    8
    spring:  
    redis:
    sentinel:
    master: mymaster # 指定master名称
    nodes: # 指定redis-sentinel集群信息
    - 192.168.140.131:27001
    - 192.168.140.131:27002
    - 192.168.140.131:27003
  3. 配置主从读写分离
    1
    2
    3
    4
    @Bean  
    public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){
    return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
    }
    这里的ReadFrom是配置Redis的读取策略,是一个枚举,包括下面选择:
  • MASTER:从主节点读取
  • MASTER_PREFERRED:优先从master节点读取,master不可用才读取replica
  • REPLICA:从slave(replica)节点读取
  • REPLICA_PREFERRED:优先从slave(replica)节点读取,所有的slave都不可用才读取master

Redis分片集群

分片集群结构

主从和哨兵可以解决高可用、高并发读的问题。但是依然有两个问题没有解决:

  • 海量数据存储问题
  • 高并发写的问题
    使用分片集群可以解决上述问题,分片集群特征:
  • 集群中有多个master,每个master保存不同数据
  • 每个master都可以有多个slave节点
  • master之间通过ping监测彼此健康状态
  • 客户端请求可以访问集群任意节点,最终都会被转发到正确节点
    [Pasted image 20231104101128.png]

搭建分片集群

具体搭建流程参考[Redis集群]

散列插槽

Redis会把每一个master节点映射到0~16383共16384个插槽(hash slot)上,查看集群信息时就能看到:
[Pasted image 20231104102453.png]

数据key不是与节点绑定,而是与插槽绑定。redis会根据key的有效部分计算插槽值,分两种情况:

  • key中包含”{}”,且”{}”中至少包含1个字符,”{}”中的部分是有效部分
  • key中不包含”{}”,整个key都是有效部分
    例如:key是num,那么就根据num计算,如果是{itcast}num,则根据itcast计算。计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值。
    [Pasted image 20231104103640.png]

添加一个节点到集群

redis-cli –cluster提供了很多操作集群的命令,可以通过下面方式查看:
[Pasted image 20231104104219.png]
比如,添加节点的命令:
[Pasted image 20231104104245.png]

故障转移

当集群中有一个master宕机会发生什么呢?

  1. 首先是该实例与其它实例失去连接
  2. 然后是疑似宕机
    [Pasted image 20231104141707.png]
  3. 最后是确定下线,自动提升一个slave为新的master:
    [Pasted image 20231104141756.png]

数据迁移

利用cluster failover命令可以手动让集群中的某个master宕机,切换到执行cluster failover命令的这个slave节点,实现无感知的数据迁移。其流程如下:
手动的Failover支持三种不同模式:

  • 缺省:默认的流程,如图1~6步
  • force:省略了对offset的一致性校验
  • takeover:直接执行第5步,忽略数据一致性、忽略master状态和其它master的意见

RedisTemplate访问分片集群

RedisTemplate底层同样基于lettuce实现了分片集群的支持,而使用的步骤与哨兵模式基本一致:

  1. 引入redis的starter依赖
  2. 配置分片集群地址
  3. 配置读写分离
    与哨兵模式相比,其中只有分片集群的配置方式略有差异,如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    spring:  
    redis:
    cluster:
    nodes: # 指定分片集群的每一个节点信息
    - 192.168.140.131:7001
    - 192.168.140.131:7002
    - 192.168.140.131:7003
    - 192.168.140.131:8001
    - 192.168.140.131:8002
    - 192.168.140.131:8003

多级缓存

传统缓存的问题

传统的缓存策略一般是请求到达Tomcat后,先查询Redis,如果未命中则查询数据库,存在下面的问题:

  • 请求要经过Tomcat处理,Tomcat的性能成为整个系统的瓶颈
  • Redis缓存失效时,会对数据库产生冲击
    [Pasted image 20231104145400.png]

多级缓存方案

多级缓存就是充分利用请求处理的每个环节,分贝添加缓存,减轻Tomcat压力,提升服务性能:
![[Pasted image 20231104145648.png]](Pasted image 20231104145648.png)
用作缓存的Nginx是业务Nginx,需要部署为集群,再有专门的Nginx用来做反向代理:
[Pasted image 20231104150742.png]

JVM进程缓存

导入商品案例

参考课前资料提供的文档来导入案例:[[案例导入说明]]

本地进程缓存

缓存在日常开发中启动至关重要的作用,由于是存储在内存中,数据的读取速度是非常快的,能大量减少对数据库的访问,减少数据库的压力。我们把缓存分为两类:

  • 分布式缓存,例如Redis:
    • 优点:存储容量更大、可靠性更好、可以在集群间共享
    • 缺点:访问缓存有网络开销
    • 场景:缓存数据量较大、可靠性要求较高、需要在集群间共享
  • 进程本地缓存,例如HashMap、GuavaCache:
    • 优点:读取本地内存,没有网络开销,速度更快
    • 缺点:存储容量有限、可靠性较低、无法共享
    • 场景:性能要求较高,缓存数据量较小
      Caffeine是一个基于Java8开发的,提供了近乎最佳命中率的高性能的本地缓存库。目前Spring内部的缓存使用的就是Caffrine。GitHub地址:(https://github.com/ben-manes/caffrine)
      [Pasted image 20231104163349.png]

Caffeine示例

可以通过item-service项目中的单元测试来学习Caffeine的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Test
void testBasicOps(){
//创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder().build();
//存数据
cache.put("gf""迪丽热巴");
//取数据,不存在则返回null
String gf = cache.getIfPresent("gf");
System.out.println("gf=" + gf);
//取数据,不存在则去数据库查询
String defaultGF = cache.get("defaultGF",key -> {
//这里可以去数据库根据 key查询value
return "柳岩";
});
System.out.println("defaultGF =" + defaultGF);
}

Caffeine提供了三种缓存驱逐策略:

  • 基于容量:设置缓存的数量上限
    1
    2
    3
    4
    //创建混存对象
    Cache<String, String> cache = Caffeine.newBuilder()
    .maximumSize(1) // 设置缓存大小上限为1
    .build();
  • 基于时间:设置缓存的有效时间
    1
    2
    3
    4
    5
    //创建缓存对象
    Cache<String, String> cache = Caffeine.newBuilder()
    //设置缓存有效期为 10 秒,从最后一次写入开始计时
    .expireAfterWrite(Duration.ofSeconds(10))
    .builder();
  • 基于引用:设置缓存为软引用或弱引用,利用GC来回收缓存数据。性能较差,不建议使用。
    在默认情况下,当一个缓存元素过期的时候。Caffeine不会自动立即将其清理和驱逐。而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐

Lua语法入门

初始Lua

Lua是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放,其设计目的就是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。官网:(https://www.lua.org/)
[Pasted image 20231105084319.png]

HelloWorld

  1. 在linux虚拟机的任意目录下,新建一个hello.lua文件
    1
    touch hello.lua
  2. 添加下面的内容
    1
    print("Hello World")
  3. 运行
    1
    lua hello.lua

数据类型

[Pasted image 20231105085132.png]
可以利用type函数测试给定变量或者值的类型:

1
print(type("Hello World"))

变量

Lua声明变量的时候,并不需要指定数据类型:

1
2
3
4
5
6
7
8
9
10
-- 声明字符串
local str = "hello"
-- 声明数字
local num = 21
-- 声明布尔类型
local flag = true
-- 声明数组 key为索引的 table
local arr = {'java','python','lua'}
-- 声明table,类似java的map
local map = {name='Jack',age=21}

访问table:

1
2
3
4
5
-- 访问数组,lua数组的角标从1开始
print(arr[1])
-- 访问table
print(map['name'])
print(map.name)

循环

数组、table都可以利用for循环来遍历:

  • 遍历数组:
    1
    2
    3
    4
    5
    6
    -- 声明数组 key为索引的 table
    local arr = {'java','python','lua'}
    -- 遍历数据
    for index,value in ipairs(arr) do
    print(index,value)
    end
  • 遍历table:
    1
    2
    3
    4
    5
    6
    -- 声明map,也就是table
    local map = {name='Jack',age=21}
    -- 遍历table
    for key,value in pairs(map) do
    print(key,value)
    end

函数

定义函数的语法:

1
2
3
4
function 函数名(argument1, argument2..., argumentn)
-- 函数体
return 返回值
end

例如,定义一个函数,用来打印数组:

1
2
3
4
5
function printArr(arr)
for index, value in ipairs(arr) do
print(value)
end
end

条件控制

类似Java的条件控制,例如if、else语法:

1
2
3
4
5
6
if(布尔表达式)
then
-- [布尔表达式为 true 时执行该语句块]
else
-- [布尔表达式为 false 时执行该语句块]
end

与java不同,布尔表达式中的逻辑运算是基于英文单词:
[Pasted image 20231105092924.png]

多级缓存

初识OpenResty

OpenResty是一个基于Nginx的高性能Web平台,用来方便地搭建能够处理超高并发、扩展性极高的动态Web应用、Web服务和动态网关。具备下列特点:

  • 具备Nginx的完整功能
  • 基于Lua语言进行扩展,集成了大量精良的Lua库、第三方模块
  • 允许Lua自定义业务逻辑、自定义库
    官方网站:(https://openresty.org/cn/)
    [Pasted image 20231105093633.png]
    安装OpenResty可以参考[[安装OpenResty]]

案例:OpenResty快速入门,实现商品详情页数据查询

商品详情页面目前展示的是假数据,在浏览器的控制台可以看到查询商品信息的请求:
[Pasted image 20231105095257.png]
而这个请求最终被反向代理到虚拟机的OpenResty集群:
[Pasted image 20231105095334.png]
需求:在OpenResty中接收这个请求,并返回一段商品的假数据。

步骤一:修改nginx.conf文件

  1. 在nginx.conf的http下面,添加对OpenResty的Lua模块的加载:
    1
    2
    3
    4
    # 加载lua 模块
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    # 加载c模块
    lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
  2. 在nginx.conf的server下面,添加对/api/item这个路径的监听:
    1
    2
    3
    4
    5
    location /api/item
    # 响应类型,这里返回json
    default_type application/json;
    # 响应数据由 lua/item.lua这个文件来决定
    content_by_lua_file lua/item.lua;

步骤二:编写item.lua文件

  1. 在nginx目录创建文件夹:lua
    [Pasted image 20231105100905.png]
  2. 在lua文件夹下,新建文件:item.lua
    [Pasted image 20231105100934.png]
  3. 内容如下:
    1
    2
    -- 返回假数据,这里的ngx.say()函数,就是写数据到Response中
    ngx.say('{"id":10001,"name":"SALSA AIR"}')
  4. 重新加载配置
    1
    nginx -s reload

OpenResty获取请求参数

OpenResty提供了各种API用来获取不同类型的请求参数:
[Pasted image 20231105102151.png]

多级缓存需求

[Pasted image 20231105103009.png]

nginx内部发送Http请求

nginx提供了内部API用以发送http请求:

1
2
3
4
5
local resp = ngx.localtion.capture("/path",{
method = ngx.HTTP_GET, -- 请求方式
args = {a=1,b=2}, -- get方式传参数
body = "c=3&d=4" -- post方式传参数
})

返回的响应内容包括:

  • resp.status:响应状态码
  • resp.header:响应头,是一个table
  • resp.body:响应体,就是响应数据
    注意:这里的path是路径,并不包含IP和端口。这个请求会被nginx内部的server监听并处理。
    但是我们希望这个请求发送到Tomcat服务器,所以还需要编写一个server来对这个路径做反向代理:
    1
    2
    3
    4
    location /path{
    # 这里是windows电脑的ip和Java服务端口,需要确保windows防火墙处于关闭状态
    proxy_pass http://192.168.111.17:8081;
    }

封装http查询的函数

我们可以把http查询的请求封装为一个函数,放到OpenResty函数库中,方便后期使用

  1. 在/usr/local/openresty/lualib目录下创建common.lua文件:
    1
    vi /usr/local/openresty/lualib/common.lua
  2. 在common.lua中封装http查询的函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    -- 封装函数,发送http请求,并解析响应
    local function read_http(path, params)
    local resp = ngx.location.capture(path,{
    method = ngx.HTTP_GET,
    args = params,
    })
    if not resp then
    -- 记录错误信息,返回404
    ngx.log(ngx.ERR,"http not found,path:", path , ",args:", args)
    ngx.exit(404)
    end
    return resp.body
    end
    -- 将方法导出
    local _M = {
    read_http = read_http
    }
    return _M

JSON结果处理

OpenResty提供了一个cjson的模块用来处理JSON的序列化和反序列化
官方地址:(https://github.com/openresty/lua-cjson)

  • 引入cjson模块:
    1
    local cjson = require "cjson"
  • 序列化
    1
    2
    3
    4
    5
    local obj = {
    name = 'jack',
    age = 21
    }
    local json = cjson.encode(obj)
  • 反序列化
    1
    2
    3
    4
    local json = '{"name":"jack","age":21}'
    -- 反序列化
    local obj = cjson.decode(json);
    print(obj.name)

Tomcat集群的负载均衡

[Pasted image 20231105140924.png]

添加redis缓存的需求

[Pasted image 20231105141318.png]

冷启动与缓存预热

冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品数据都在第一次查询时添加缓存,可能会给数据库带来较大压力
缓存预热:在实际开发中,我们可以利用大数据统计用户访问的热点数据,在项目启动时将这些热点数据提前查询并保存到Redis中

我们数据量较少,可以在启动时将所有数据都放入缓存中

缓存预热

  1. 利用Docker安装Redis
    1
    docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes
  2. 在item-service服务中引入Redis依赖
    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
  3. 配置Redis地址
    1
    2
    3
    spring:
    redis:
    host: 192.168.140.131
  4. 编写初始化类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Component
    public class RedisHandler implements InitializingBean {
    @Autowired
    private StringRedisTemplate redisTemplate;
    @Override
    public void afterPropertiesSet() throws Exception {
    // 初始化缓存 ...
    }
    }

OpenResty的Redis模块

OpenResty提供了操作Redis的模块,我们只要引入该模块就能直接使用:

  • 引入Redis模块,并初始化Redis对象
    1
    2
    3
    4
    5
    6
    -- 引入redis模块
    local redis = require("resty.redis")
    -- 初始化Redis对象
    local red = redis::new()
    -- 设置Redis超时时间
    red:set_timeouts(1000, 1000, 1000)
  • 封装函数,用来释放Redis连接,其实是放入连接池
    1
    2
    3
    4
    5
    6
    7
    8
    9
    -- 关闭redis连接的工具方法,其实是放入连接池
    local function close_redis(red)
    local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒
    local pool_size = 100 --连接池大小
    local ok, err = red:set_keepalive(pool_max_idle_time,pool_size)
    if not ok then
    ngx.log(ngx.ERR,"放入Redis连接池失败:", err)
    end
    end
  • 封装函数,从Redis读数据并返回
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    -- 查询redis的方法 ip和port是redis地址,key是查询的key
    local function read_redis(ip, port, key)
    -- 获取一个连接
    local ok, err = red:connect(ip, port)
    if not ok then
    ngx.log(ngx.ERR,"连接redis失败:",err)
    return nil
    end
    -- 查询redis
    local resp,err = red:get(key)
    -- 查询失败处理
    if not resp then
    ngx.log(ngx.ERR,"查询Redis失败:",err,", key = ", key)
    end
    -- 得到的数据为空处理
    if resp == ngx.null then
    resp = nil
    ngx.log(ngx.ERR,"查询Redis数据为空,key = ", key)
    end
    close_redis(red)
    return resp
    end

ngxin本地缓存

OpenResty为Nginx提供了shard dict的功能,可以在nginx的多个worker之间共享数据,实现缓存功能。

  • 开启共享字典,在nginx.conf的http下添加配置:
    1
    2
    # 共享字典,也就是本地缓存,名称叫做:item_cache,大小150m
    lua_shared_dict item_cache 150m;
  • 操作共享字典:
    1
    2
    3
    4
    5
    6
    -- 获取本地缓存对象
    local item_cache = ngx.shared.item_cache
    -- 存储,指定key、value、过期时间,单位s,默认为0代表永不过期
    item_cache:set('key','value',1000)
    -- 读取
    local val = item_cache:get('key')
    修改后的查询逻辑:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    -- 封装函数,先查询本地缓存,在查询redis,在查询http
    local function read_data(key, expire,path,params)
    -- 读取本地缓存
    local val = item_cache:get(key)
    if not val then
    -- 缓存未命中,记录日志
    ngx.log(ngx.ERR,"本地缓存查询失败,key: ",key,",尝试redis查询")
    -- 查询redis
    val = read_redis("127.0.0.1",6379,key)
    -- 判断redis是否命中
    if not val then
    ngx.log(ngx.ERR,"Redis缓存查询失败,key",key,",尝试http查询")
    -- Redis查询失败,查询http
    val = read_http(path, params)
    end
    end
    -- 写入本地缓存
    item_cache:set(key,val,expire)
    return val
    end
    -- 根据id查询商品
    local itemJSON = read_data('item:id' .. id,1800,"/item/".. id,nil)
    -- 根据id查询商品库存
    local itemStockJSON = read_data('item:stock:id' .. id,60,"/item/stock/".. id,nil)

缓存同步

缓存同步策略

缓存数据同步的常见方式有三种:

  • 设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新
    • 优势:简单、方便
    • 缺点:时效性差,缓存过期之前可能不一致
    • 场景:更新频率较低,时效性要求低的业务
  • 同步双写:在修改数据库的同时,直接修改缓存
    • 优势:时效性强,缓存与数据库强一致
    • 缺点:有代码侵入,耦合度高;
    • 场景:对一致性、时效性要求较高的缓存数据
  • 异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
    • 优势:低耦合,可以同时通知多个缓存服务
    • 缺点:时效性一般,可能存在中间不一致状态
    • 场景:时效性要求一般,有多个服务需要同步
      基于MQ的异步通知:
      [Pasted image 20231106093510.png]
      基于Canal的异步通知:
      [Pasted image 20231106093544.png]

初识Canal

Canal,译意为水管/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:(https://github.com/alibaba/canal)
Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:

  • MySQL master将数据变更写入二进制日志(binary log),其中记录的数据叫做binary log events
  • MySQL slave将master的binary log events拷贝到它的中继日志(relay log)
  • MySQL slave重放relay log中事件,将数据变更反映它自己的数据
    [Pasted image 20231106095841.png]
    Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
    [Pasted image 20231106100021.png]

安装和配置Canal

安装和配置Canal参考[安装Canal]

Canal客户端

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。
[Pasted image 20231106142924.png]
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。不过这里我们会使用GitHub上的第三方开源的canal-starter。地址:(https://github.com/NormanGyllenhaal/canal-client)
引入依赖:

1
2
3
4
5
6
<!-- canal -->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>

编写配置:

1
2
3
canal:
destination: heima #canal实例名称,要跟canal-server运行时设置的destionation一致
server: 192.168.140.131:11111 # canal地址

编写监听器,监听Canal消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.heima.item.canal;

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {
@Override
public void insert(Item item) {
//新增数据到redis
}
@Override
public void update(Item before,Item after){
// 更新redis数据
//更新本地缓存
}
@Override
public void delete(Item item){
//删除redis数据
//清理本地缓存
}
}

Canal推送给canal-client的是被修改的这一行数据(row),而我们引入的canal-client则会帮我们把行数据封装到Item实体类中。这个过程中需要知道数据库与实体的映射关系,要用到JPA的几个注解:
[Pasted image 20231106144512.png]

消息可靠性

消息可靠性问题

消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?

  • 发送时丢失:
    • 生产者发送的消息未送达exchange
    • 消息到达exchange后未到达queue
  • MQ宕机,queue将消息丢失
  • consumer接收到消息后未消费就宕机
    ![[Pasted image 20231106145924.png]]

生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

  • publisher-confirm,发送者确认
    • 消息成功投递到交换机,返回ack
    • 消息未投递到交换机,返回nack
  • publisher-return,发送者回执
    • 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因

SpringAMQP实现生产者确认

  1. 在publisher这个微服务的application.yml中添加配置:
    1
    2
    3
    4
    5
    6
    spring:
    rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
    mandatory: true
    配置说明:
  • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
    • simple:同步等待confirm结果,直到超时
    • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
  • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
  1. 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContextAware{
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    // 获取RabbitTemplate
    RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
    //设置ReturnCallback
    rabbitTemplate.setReturnCallback((message, replyCode, replyText,exchange,routingKey) -> {
    log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
    replyCode, replyText, exchange, routingKey,message.toString());
    });
    }
    }
  2. 发送消息,指定消息ID、消息ConfirmCallback
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
    //消息体
    String message = "hello, spring amqp!";
    //消息ID,需要封装到CorrelationData中
    CorrelationData corrlationData = new CorrelationData(UUID.randomUUID().toString());
    //添加callback
    correlationData.getFuture().addCallback(
    result -> {
    if(result.isAck()){
    // ack,消息成功
    log.debug("消息发送成功,ID:{}",correlationData.getId());
    }else{
    // nack,消息失败
    log.error("消息发送失败,ID:{},原因{}",correlationData.getId(),result.getReason());
    }
    },
    ex -> log.error("消息发送异常,ID:{},原因{}",correlationData.getId(),ex.getMessage())
    );
    // 发送消息
    rabbitTemplate.convertAndSend("amq.direct","simple",nessage,correlationData);
    }

消息持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

  1. 交换机持久化:
    1
    2
    3
    4
    5
    @Bean
    public DirectExchange simpleExchange() {
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct",true,false);
    }
  2. 队列持久化
    1
    2
    3
    4
    5
    @Bean
    public Queue simpleQueue() {
    //使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
    }
  3. 消息持久化,SpringAMQP中的消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定的:
    1
    2
    3
    4
    Message msg = MessageBuilder
    .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体
    .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
    .build();

消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息
而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
    配置方式是修改application.yml文件,添加下面配置:
    1
    2
    3
    4
    5
    6
    spring:
    rabbitmq:
    listener:
    simple:
    prefetch: 1
    acknowledge-mode: none # none,关闭ack;manual,手动ack;auto;自动ack

失败重试机制

当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
[Pasted image 20231106164342.png]
我们可以利用Spirng的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
listener:
simple:
prefetch: 1
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts:3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

消费者失败消息处理策略

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
    [Pasted image 20231106170243.png]
    测试下RepublishMessageRecoverer处理模式:
  • 首先,定义接收失败消息的交换机、队列及其绑定关系:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Bean
    public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
    return new Queue("error.queue",true);
    }
    @Bean
    public Binding errorBinding(){
    return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
    }
  • 然后,定义RepublishMessageRecoverer:
    1
    2
    3
    4
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    }

总结

如何确保RabbitMQ消息的可靠性?

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

死信交换机

初识死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的Requeue参数设置为false
  • 消息是一个过期消息,超时无人消费
  • 要投递的队列消息推挤满了,最早的消息可能成为死信
    如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
    [Pasted image 20231107084027.png]

TTL

TTL,也就是Time-To-Live。如果一个队列中的消息TTL仍未消费,则会变为死信,ttl超时分为两种情况:

  • 消息所在的队列设置了存活时间
  • 消息本身设置了存活时间
    [Pasted image 20231107084546.png]
    我们声明一组死信交换机和队列,基于注解方式:
    1
    2
    3
    4
    5
    6
    7
    8
    @RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "dl.queue",durable = "true"),
    exchange = @Exchange(name = "dl.direct"),
    key = "dl"
    ))
    public void listenDlQueue(String msg){
    log.info("接收到 dl.queue的延迟消息:{}",msg);
    }
    要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    @Bean
    public DirectExchange ttlExchange(){
    return new DirectExchange("ttl.direct");
    }
    @Bean
    public Queue ttlQueue(){
    return QueueBuilder.durable("ttl.queue") //指定队列名称,并持久化
    .ttl(10000) //设置队列的超时时间,10秒
    .deadLetterExchange("dl.direct") //指定死信交换机
    .deadLetterRoutingKey("dl") // 指定死信RoutingKey
    .build();
    }
    @Bean
    public Binding simpleBinding(){
    return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
    发送消息时,给消息本身设置超时时间
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Test
    public void testTTLMsg(){
    //创建消息
    Message message = MessageBuilder
    .withBody("hello,ttl message".getBytes(StandardCharsets.UTF_8))
    .setExpiration("5000")
    .build();
    //消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    //发送消息
    rabbitTemplate.convertAndSend("ttl.direct","ttl",message,correlationData);
    }

延迟队列

利用TTL结合死信交换机,我们实现了消息发出后,消息者延迟收到消息的效果。这种消息模式被称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

延迟队列插件

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
详细安装过程参考[RabbitMQ部署指南]

SpringAMQP使用延迟队列插件

DelayExchange的本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。
基于注解方式:

1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.direct",delayed = "true"),
key = "delay"
))
public void listenDelayedQueue(String msg){
log.info("接收到 delay.queue的延迟消息:{}",msg);
}

基于java代码的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public DirectExchange delayedExchange(){
return ExhcangeBuilder
.directExchange("delay.direct") // 指定交换机类型和名称
.delayed() //设置delay属性为true
.durable(true) //持久化
.build();
}
@Bean
public Queue delayedQueue(){
return new Queue("delay.queue");
}
@Bean
public Binding delayedBinding(){
return BindingBuilder.bind(delayedQueue().to(delayedExchange()).with("delay"));
}

然后我们向这个delay为true的交换机中发送消息,一定要给消息添加一个header: x-delay,值为延迟的时间,单位为毫秒:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testDelayedMsg(){
// 创建消息
Message message = MessageBuilder
.withBody("hello,delayde message".getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay",10000)
.builder();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 发送消息
rabbitTemplate.convertAndSend("delay.direct","delay",message,correlationData);
log.debug("发送消息成功");
}

惰性队列

消息堆积问题

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
[Pasted image 20231107113320.png]
解决消息堆积有三种思路:

  • 增加更多消费者,提高消费速度
  • 在消费者内开启线程加快消息处理速度
  • 扩大队列容积,提高堆积上限

惰性队列

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。
惰性队列的特征如下:

  • 接收到消息后直接存到磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
    而要设置一个惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
    1
    rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
    用SpringAMQP声明惰性队列分两种方式:
  • @Bean的方式
    1
    2
    3
    4
    5
    6
    7
    @Bean
    public Queue lazyQueue(){
    return QueueBuilder
    .durable("lazy.queue")
    .lazy() // 开启x-queue-mode为lazy
    .build();
    }
  • 注解方式:
    1
    2
    3
    4
    5
    6
    7
    8
    @RabbitListener(queuesToDeclare = @Queue(
    name = "lazy.queue",
    durable = "true",
    arguments = @Argument(name = "x-queue-mode",value = "lazy")
    ))
    public void listenLazyQueue(String msg){
    log.info("接收到 lazy.queue的消息:{}",msg);
    }

MQ集群

集群分类

RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:

  • 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力
  • 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性
    镜像集群虽然支持主从,但主从同步并不不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。

普通集群

普通集群,或者叫标准集群(classic cluster),具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机,队列中的消息就会丢失
    [Pasted image 20231107151616.png]
    详细的搭建步骤参考[RabbitMQ部署指南]

镜像集群

镜像集群:本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点,备份到的其他节点叫做该队列的镜像节点。
  • 一个队列的主节点可能是另一个队列的镜像节点
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主
    [Pasted image 20231107154652.png]
    详细的搭建步骤可以参考[RabbitMQ部署指南]

仲裁队列

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致
    详细的搭建步骤可以参考:[[RabbitMQ部署指南]]
    SpringAMQP创建仲裁队列:
    1
    2
    3
    4
    5
    6
    7
    @Bean
    public Queue quorumQueue(){
    return QueueBuilder
    .durable("quorum.queue") // 持久化
    .quorum() // 仲裁队列
    .build();
    }
    SpringAMQP连接集群,只需要在yaml中配置即可:
    1
    2
    3
    4
    5
    6
    spring:
    rabbitmq:
    addresses: 192.168.140.131:8071,192.168.140.131:8072,192.168.140.131:8073
    username: huanji
    password: root
    virtual-host: /