Spring的事件机制(二)—— 源码解读

Spring事件机制

在上一篇博文 Spring事件机制 中,已经介绍了 Java 中对于发布-订阅模式的一些基本框架实现,以及在 Spring 中如何通过 Spring 提供的事件机制完成发布-订阅这一解耦模式的实现。
简单回顾,在 Spring 中若想实现事件机制,主要涉及三个类:ApplicationEvent、ApplicationListener以及ApplicationEventPublisher。相比于 Java 提供的原生机制,Spring由于其本身具有 BeanFactory 进行 Bean 的管理,省去了我们进行监听者注册的这一步骤。
本篇文章主要通过跟踪 Spring 事件部分的源码,来追踪一下 Spring 是如何基于 JDK 进行事件机制的扩展的。
PS.本文所使用 Spring 源码版本为 Spring 5.0.8。不同版本代码可能略有不同。
前文已经写过通过 Spring 来实现事件机制的简单实例,具体请参见Spring的事件机制

从 ApplicationEvent 谈起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Class to be extended by all application events. Abstract as it
* doesn't make sense for generic events to be published directly.
*
* @author Rod Johnson
* @author Juergen Hoeller
*/
public abstract class ApplicationEvent extends EventObject {
/** use serialVersionUID from Spring 1.2 for interoperability */
private static final long serialVersionUID = 7099057708183571937L;
/** System time when the event happened */
private final long timestamp;
/**
* Create a new ApplicationEvent.
* @param source the object on which the event initially occurred (never {@code null})
*/
public ApplicationEvent(Object source) {
super(source);
this.timestamp = System.currentTimeMillis();
}
/**
* Return the system time in milliseconds when the event happened.
*/
public final long getTimestamp() {
return this.timestamp;
}
}

由以上 Spring 中 ApplicationEvent 源代码可以看出,ApplicationEvent 继承了 JDK 中的事件对象 EventObject,在 Spring 中所有事件对象均应继承自 ApplicationEvent。在Spring基础上,其增加了事件发生时的时间戳属性以及序列化ID,并提供了通过事件源进行构建的构造方法。
Spring 中的 ApplicationEvent 设置成抽象类,由于一个单独的 ApplicationEvent 是没有任何语义的,所以需要根据不同场景进行扩展,在其之上为事件赋予意义。此类的说明中,作者也很好的说明了这一点。具体对 Application 的实现实例,请参见前文。

事件监听者 ApplicationListener

ApplicationListener 简介

JDK 中提供了 EventListener 接口,作为事件监听者标记。Spring 在 EventListener 接口的基础上,提供了 ApplicationListener 接口。该接口接收一个 ApplicationEvent 的子类,完成事件的监听流程。具体源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Interface to be implemented by application event listeners.
* Based on the standard {@code java.util.EventListener} interface
* for the Observer design pattern.
*
* <p>As of Spring 3.0, an ApplicationListener can generically declare the event type
* that it is interested in. When registered with a Spring ApplicationContext, events
* will be filtered accordingly, with the listener getting invoked for matching event
* objects only.
*
* @author Rod Johnson
* @author Juergen Hoeller
* @param <E> the specific ApplicationEvent subclass to listen to
* @see org.springframework.context.event.ApplicationEventMulticaster
*/
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
/**
* Handle an application event.
* @param event the event to respond to
*/
void onApplicationEvent(E event);
}

由以上 ApplicationListener 接口可以看出,该接口是一个函数型接口(函数型接口定义在此不做解释,具体Google),提供了一个 onApplicationEvent(E extends Application) 方法定义,所有自行实现的监听者均需要实现该接口,并在该方法中进行事件的处理。

监听者注册

Spring 中,不需要我们手动进行监听器注册。ApplicationListener 对象一旦在 Spring 容器中被注册,Spring 会进行监听器的注册,实现事件的监听。

在介绍监听者注册流程之前,首先需要介绍介绍一下 org.springframework.context.event.ApplicationEventMulticaster,其主要定义了管理事件监听者,与发布事件到监听者的相关操作,若没有定义,Spring 容器将默认实例化 SimpleApplicationEventMulticaster

在 Spring 中,初始化容器时会调用 org.springframework.context.ConfigurableApplicationContext 接口中的 reFresh() 方法进行 Bean的加载,该方法会进行事件的监听注册。具体代码如下:

监听者注册的代码位于 org.springframework.context.supportAbstractApplicationContext 类的 refresh() 方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// Prepare this context for refreshing.
prepareRefresh();
// Tell the subclass to refresh the internal bean factory.
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
// Prepare the bean factory for use in this context.
prepareBeanFactory(beanFactory);
try {
// Allows post-processing of the bean factory in context subclasses.
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
initMessageSource();
// Initialize event multicaster for this context.
//初始化ApplicationEventMulticaster
initApplicationEventMulticaster();
// Initialize other special beans in specific context subclasses.
onRefresh();
// Check for listener beans and register them.
// 注册监听者 Bean
registerListeners();
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory);
// Last step: publish corresponding event.
finishRefresh();
}
catch (BeansException ex) {
if (logger.isWarnEnabled()) {
logger.warn("Exception encountered during context initialization - " +
"cancelling refresh attempt: " + ex);
}
// Destroy already created singletons to avoid dangling resources.
destroyBeans();
// Reset 'active' flag.
cancelRefresh(ex);
// Propagate exception to caller.
throw ex;
}
finally {
// Reset common introspection caches in Spring's core, since we
// might not ever need metadata for singleton beans anymore...
resetCommonCaches();
}
}
}

在 refresh() 方法的代码中,注意以下两项:

  • 调用 initApplicationEventMulticaster() 方法初始化一个 ApplicationEventMulticaster,默认情况下初始化为 SimpleApplicationEventMulticaster。
  • 调用 registerListeners() 方法进行事件监听者的注册。该方法具体实现如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /**
    * Add beans that implement ApplicationListener as listeners.
    * Doesn't affect other listeners, which can be added without being beans.
    */
    protected void registerListeners() {
    // Register statically specified listeners first.
    for (ApplicationListener<?> listener : getApplicationListeners()) {
    getApplicationEventMulticaster().addApplicationListener(listener);
    }
    // Do not initialize FactoryBeans here: We need to leave all regular beans
    // uninitialized to let post-processors apply to them!
    String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
    for (String listenerBeanName : listenerBeanNames) {
    getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
    }
    // Publish early application events now that we finally have a multicaster...
    Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
    this.earlyApplicationEvents = null;
    if (earlyEventsToProcess != null) {
    for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
    getApplicationEventMulticaster().multicastEvent(earlyEvent);
    }
    }
    }

由上文代码可见,注册监听者的过程主要可以分为以下三部分:

  • 添加容器中指定的监听器,通常这部分添加的监听器由 Spring 控制;
  • BeanFactory 中获取全部实现了 ApplicationListener 接口的 BeanNames,并把其推送给 ApplicationEventMulticaster
  • 若有需要立即执行的事件,直接执行这些事件的发布

以上三步就是 Spring 在初始化 Beans 时进行的事件监听者注册相关逻辑。在 Bean 加载过程中,就完成了事件的监听者注册,我们无需另外自行为自定义事件注册监听者。

事件发布 ApplicationEventPublisher

事件监听者作为基础,由 Spring 在 Bean 初始化时进行注册,免去了我们自行注册监听者的过程。关注了监听者如何注册后,我们继续来看在 Spring 中发布一个事件后,监听者是如何响应的。

发布一个事件

在上文 Spring的事件机制 中,已经介绍了一个简单的从事件源定义、事件对象定义到发布事件全过程的示例。在 Spring 中,发布一个自定义事件的过程可以由以下一行代码概括:

1
applicationEventPublisher.publishEvent(new TestEvent(this, msg));

其中,applicationEventPublisher是通过 Spring 注入的 ApplicationEventPublisher 实例。在事件源中通过上述代码,便可以在 Spring 中发布一个自定义事件。跟踪 publishEvent 方法,其默认实现位于 AbstractApplicationContext 类中,具体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Publish the given event to all listeners.
* @param event the event to publish (may be an {@link ApplicationEvent}
* or a payload object to be turned into a {@link PayloadApplicationEvent})
* @param eventType the resolved event type, if known
* @since 4.2
*/
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
if (logger.isTraceEnabled()) {
logger.trace("Publishing event in " + getDisplayName() + ": " + event);
}
// Decorate event as an ApplicationEvent if necessary
//若事件是 ApplicationEvent 的子类,则把其作为一个 ApplicationEvent 来处理
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}

以上代码,就是 Spring 中发布事件的具体过程实现,具体可以分为以下三步:

  1. 将事件分为 ApplicationEventPayloadApplicationEvent 两部分。其中,我们在 Spring 中自定义的事件均为 ApplicationEvent 类型,PayloadApplicationEvent 通常为 Spring 框架自身的事件;
  2. multicaster 若还未加载,将其存入 EarlyApplicationEvents 队列,并在 multicaster 初始化成功后立即发布;
  3. 同样发布事件到父级 ApplicationContext。
    以上三步便是发布一个事件的过程,由于我们发布的自定义事件通常在容器加载之后,且自定义事件均是 ApplicationEvent 过程,所以通常涉及到的仅是 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType) 方法的调用。

    multicastEvent 方法详解

    SimpleApplicationEventMulticaster 类中的 multicastEvent 定义如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    @Override
    public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
    for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
    Executor executor = getTaskExecutor();
    if (executor != null) {
    executor.execute(() -> invokeListener(listener, event));
    }
    else {
    invokeListener(listener, event);
    }
    }
    }

此处以 SimpleApplicationEventMulticaster 中的方法定义为例,作为默认注入的类型,通常我们在默认情况下的事件发布流程均遵循该实现。
从程序中可以看出,multicastEvent的主要逻辑可以分为三部分:

  1. 直接获取或通过事件对象的类型信息来获取事件类型
  2. 从 BeanFactory(以及相关的缓存机制)获取到该事件的全部监听者
  3. 遍历监听者集合,通过 multicaster 内持有的 Executor 进行通知,此处最后调用了 ApplicationListener 中的 onApplicationEvent 方法,这一方法正是我们在自定义 ApplicationListener 时必须要覆写的方法。

一点意外收获

在初读 SimpleApplicationEventMulticaster 源代码时,注释上有 By default, all listeners are invoked in the calling thread. 这样一句话。看到这句话使我在读源码的时候很疑惑,明明 multicastEvent 方法中持有了一个 Executor,并在通知监听者的时候,使用 executor 去执行 invoke 任务,为什么注释中还提到是在同一个线程里顺序进行监听者调用。
阅读源码中,看到 SimpleApplicationEventMulticaster 中针对 setTaskExecutor(Executor) 方法的说明。该方法在默认情况下,会为 SimpleApplicationEventMulticaster 类注入一个 SyncTaskExecutor 同步执行器,该执行器内部定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SyncTaskExecutor implements TaskExecutor, Serializable {
/**
* Executes the given {@code task} synchronously, through direct
* invocation of it's {@link Runnable#run() run()} method.
* @throws IllegalArgumentException if the given {@code task} is {@code null}
*/
@Override
public void execute(Runnable task) {
Assert.notNull(task, "Runnable must not be null");
task.run();
}
}

由源代码可见,该Executor 内部提供的 execute 方法中调用的是 Runnable 的 run() 方法,也就是在本线程中执行逻辑,为同步调用。
Spring 中除了提供 SyncTaskExecutor 之外,还提供了 SimpleAsyncTaskExecutor 这一异步执行器的实现。我们可以通过在 xml 文件中声明 bean 时为 multicaster 注入 SimpleAsyncTaskExecutor 或者通过重写 BeanFactory 的方式来实现 ApplicationEvent 的异步通知。

总结

Spring 源码博大精深,本文仅是截取小部分事件相关的代码进行简易的流程分析。有关于 Bean 初始化过程中对于监听器的注册,以及事件通知的过程中通过事件及类型信息获取全部监听器列表的过程,仍值得推敲。
关于 Spring 事件机制,写了一个小例子,例子主要针对于相同事件多个监听者、不同事件不同监听者之间,进行模拟耗时调用。例子采用同步通知模型进行。具体请移步java-demo/spring-event
初探 Spring,若有不严谨之处,还望大佬指正,多多交流。欢迎email至 JaydenRansom@outlook.com