Spring Event 事件通知

观察者模式

spring event的事件驱动模型是使用观察者模式进行解耦,所以我们在了解这个事件驱动模型之前,我们先来了解一下观察者模式这个设计模式。

当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。

拥有一些值得关注的状态的对象通常被称为目标(Subject), 由于它要将自身的状态改变通知给其他对象, 我们也将其称为发布者(Publisher)。 所有希望关注发布者状态变化的其他对象被称为观察者(Observer ) , 我们也可以叫做订阅者(Subscribers)

观察者模式建议你为发布者类添加订阅机制, 让每个对象都能订阅或取消订阅发布者事件流。这并不像听上去那么复杂。 实际上, 该机制包括

1) 一个用于存储订阅者对象引用的列表成员变量;

2) 几个用于添加或删除该列表中订阅者的公有方法。

下面通过一个比较简单的demo来演示观察者模式

首先我们需要创建一个发布者,然后在发布者中维护了一个订阅者的列表,用于遍历广播给所有的订阅者,然后有一个state的成员变量,当调用setState方法的时候会调用notifyAllSubscribers()方法广播给所有的订阅者, 并调用订阅者的update方法

package org.example;

import lombok.Data;

import java.util.ArrayList;
import java.util.List;

@Data
public class Publisher {

    private List<Subscriber> subscribers = new ArrayList<>();

    private int state;

    public void attach(Subscriber subscriber) {
        subscribers.add(subscriber);
    }

    public void notifyAllSubscribers() {
        for (Subscriber subscriber : subscribers) {
            subscriber.update();
        }
    }

    public void setState(int state) {
        this.state = state;
        notifyAllSubscribers();
    }
}

然后我们再来创建订阅者,这里定义了一个抽象类,其中声明了一个发布者用于获取发布者当前的state,然后还声明了一个抽象的update方法

package org.example;

import lombok.Data;

@Data
public abstract class Subscriber {
    private final Publisher publisher;

    public Subscriber(Publisher publisher) {
        this.publisher = publisher;
    }

    public abstract void update();
}

接下来是两个具体的实现的订阅者

package org.example;

public class SubscriberA extends Subscriber{
    public SubscriberA(Publisher publisher) {
        super(publisher);
        publisher.attach(this);
    }

    @Override
    public void update() {
        System.out.println("SubscriberA update..." + this.getPublisher().getState());
    }
}

package org.example;

public class SubscriberB extends Subscriber {
    public SubscriberB(Publisher publisher) {
        super(publisher);
        publisher.attach(this);
    }

    @Override
    public void update() {
        System.out.println("SubscriberB update..." + this.getPublisher().getState());
    }
}

最后是测试方法

package org.example;

import org.junit.jupiter.api.Test;

public class ObserverPatternTest {


    @Test
    void testObserverPattern() {
        Publisher publisher = new Publisher();

        SubscriberA subscriberA = new SubscriberA(publisher);
        SubscriberB subscriberB = new SubscriberB(publisher);

        publisher.setState(15);
        System.out.println("=========");
        publisher.setState(10);
    }
}

运行结果如下:

SubscriberA update...15
SubscriberB update...15
=========
SubscriberA update...10
SubscriberB update...10

通过上面简单的代码,当我们的目标发生变化的时候,就会广播给所有的订阅者也就是观察者,以后当这个变化需要通知给其他的观察者,只需要创建一个新的订阅者,并添加到订阅者列表中即可。

在实际项目中也有很多的应用场景,比如有一个添加评论的方法,在评论添加成功之后需要进行修改redis缓存、给用户添加积分等等操作。当然可以在添加评论的代码后面假设这些操作,但是这样的代码违反了设计模式的多项原则:单一职责原则、迪米特法则、开闭原则。也就是我们说的耦合性太大了,比如将来评论添加成功之后还需要有另外一个操作,这时候我们就需要去修改我们的添加评论代码了。所以我们就可以通过观察者模式来进行解耦。

在以前的代码中,我们需要自己实现观察者模式来解决这个问题。不过Spring中已经存在了一个升级版观察者模式的机制,这就是spring event的事件监听模式。

参考:观察者模式

观察者模式 -菜鸟教程

Spring Event事件处理

Spring提供了很方便的事件的处理机制,包括事件类ApplicationEvent和事件监听类ApplicationListener。 他实现的是设计者模式,如果实现了ApplicationListener接口的bean部署到Spring容器中,则每次ApplicationEvent发布到ApplicationContext时,都会通知该bean。

基于继承ApplicationEvent的Event

我们可以自定义自己的Event,然后继承Spring 的ApplicationEvent

package org.example;

import org.springframework.context.ApplicationEvent;

public class EventDemo extends ApplicationEvent {
    private String message;


    public EventDemo(Object source, String message) {
        super(source);
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}

然后我们来定义两个事件监听者

package org.example;

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

// 定义一个事件监听者
@Component
public class EventDemoListener implements ApplicationListener<EventDemo> {
    @Override
    public void onApplicationEvent(EventDemo event) {
        System.out.println("===========");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("receiver " + event.getMessage());
    }
}
package org.example;

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

// 定义一个事件监听者
@Component
public class EventDemo2Listener implements ApplicationListener<EventDemo> {
    @Override
    public void onApplicationEvent(EventDemo event) {
        System.out.println("===========");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("receiver2 " + event.getMessage());
    }
}

最后再写一个事件发布的类

package org.example;

import lombok.RequiredArgsConstructor;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

// 事件发布
@Component
@RequiredArgsConstructor
public class EventDemoPublish {
    private final ApplicationEventPublisher applicationEventPublisher;

    public void publish(String message) {
        //如果代码结构较复杂,多处发布相同的事件,建议发布事件时将this作为source传递,便于通过分析日志确定发布源
        EventDemo demo = new EventDemo(this, message);
        applicationEventPublisher.publishEvent(demo);
    }
}

然后我们来写测试代码

package org.example;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class EventDemoPublishTest {

    @Autowired
    private EventDemoPublish eventDemoPublish;


    @Test
    void testEventDemoPublish() {
        eventDemoPublish.publish("test message");
    }
}

执行结果

===========
receiver2 test message
===========
receiver test message

执行结果显示:EventDemo2ListenerEventDemoListener的执行间隔1秒。结果表示我们的响应程序是同步执行的,一个响应程序的执行会阻塞下一个响应程序的执行。

注意:以上处理事件都是同步的,如果发布事件处的业务存在事务,监听器处理也会在相同的事务中。如果对于事件的处理不想受到影响,可以onApplicationEvent方法上加@Async支持异步

首先我们给onApplicationEvent方法加上@Async

package org.example;

import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

// 定义一个事件监听者
@Component
public class EventDemoListener implements ApplicationListener<EventDemo> {

    @Async
    @Override
    public void onApplicationEvent(EventDemo event) {
        System.out.println("===========");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("receiver " + event.getMessage());
    }
}
package org.example;

import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

// 定义一个事件监听者
@Component
public class EventDemo2Listener implements ApplicationListener<EventDemo> {

    @Async
    @Override
    public void onApplicationEvent(EventDemo event) {
        System.out.println("===========");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("receiver2 " + event.getMessage());
    }
}

然后在主类加入@EnableAsync的注解

package org.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class EventApp {

    public static void main(String[] args) {
        SpringApplication.run(EventApp.class, args);
    }
}

然后测试类

package org.example;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class EventDemoPublishTest {

    @Autowired
    private EventDemoPublish eventDemoPublish;


    @Test
    void testEventDemoPublish() throws InterruptedException {
        eventDemoPublish.publish("test message");
        Thread.sleep(5000);
    }
}

运行结果

===========
===========
receiver2 test message
receiver test message

额外的小提示:IDEA可以帮我们快速定位到我们的Listener

在这里插入图片描述

基于注解@EventListener的Event

在spring4.2中我们可以以更加简洁的方式来监听event的发布,监听事件我们不必再实现ApplicationListener接口了,只要在方法上添加注解@EventListener即可

自定义Event

package org.example.annotation;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class MyEvent {

    private String message;
}

带有注解@EventListener的方法

package org.example.annotation;

import org.example.EventDemo;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class MyEventListener {

    @EventListener(condition = "#myEvent.message!='test'") ///以使用SPEL表达式来过滤监听到事件,即只有符合某种条件的才进行接收处理
    @Order(2)
    @Async
    public void handleEvent(MyEvent myEvent) {
        System.out.println("handleEvent1===========");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("handleEvent1:" + myEvent.getMessage());
    }


    @EventListener
    @Order(1) //可以在方法上使用spring的@order注解来定义多个监听器的顺序
    @Async
    public void handleEvent2(MyEvent myEvent) {
        System.out.println("handleEvent2===========");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("handleEvent2:" +myEvent.getMessage());
    }


    @EventListener(classes = {MyEvent.class, EventDemo.class})
    @Async
    @Order(3)
    public void handleEvent3(Object object) {
        //如果要监听多个事件类型的发布,可以在@EventListener(classes = {XXX.class,YYY.class})指定
        // spring会多次调用此方法来处理多个事件。
        // 但是注意此时,方法参数不能有多个,否则会发生转换异常,可以将使用多个事件的父类作为唯一的方法参数来接收处理事件,
        // 但除非必要否则并不推荐监听多个事件的发布。
        System.out.println("handleEvent3===========");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if (object instanceof MyEvent) {
            System.out.println("handleEvent3:" +((MyEvent)object).getMessage());
        }
        if (object instanceof EventDemo) {
            System.out.println("handleEvent3:" +((EventDemo)object).getMessage());
        }
    }

}

同理的带有@EventListener的方法执行都是同步的,如果要想异步就需要加上 @Async, 同时使用@Order来定义多个监听器的顺序

然后是发布的类

package org.example.annotation;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

@Component
public class MyEventPublisher {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publish(MyEvent myEvent) {
        applicationEventPublisher.publishEvent(myEvent);
    }
}

最后是测试类

package org.example;

import org.example.annotation.MyEvent;
import org.example.annotation.MyEventPublisher;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class MyEventPublishTest {

    @Autowired
    private MyEventPublisher myEventPublisher;


    @Test
    void testEventDemoPublish() throws InterruptedException {
        myEventPublisher.publish(new MyEvent("test"));
        myEventPublisher.publish(new MyEvent("test1"));
        Thread.sleep(5000);
    }
}

测试结果

handleEvent2===========
handleEvent3===========
handleEvent2===========
handleEvent1===========
handleEvent3===========
handleEvent2:test
handleEvent2:test1
handleEvent3:test
handleEvent3:test1
handleEvent1:test1

参考:Spring5参考指南-事件Event

spring event的事件驱动模型的最佳实践@EventListener

Spring Event事件通知机制

ApplicationEventPublisher的publishEvent实现异步快速

ApplicationEvent使用时注意默认的事件机制是同步的

ApplicationEventPublisher的使用学习

源代码

https://gitee.com/cckevincyh/spring-event-demo

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐