Guava EventBus

Guava EventBus 允许组件之间的发布-订阅式通信,而无需组件之间显式注册。

适用场景:进程内通信,不支持进程间,不是一个通用的发布订阅系统,使用显式注册代替传统的 Java 进程内事件分发,传统的事件分发需要定义 Listener、注册每个事件监听到列表中等

使用 EventBus

感受一下 EventBus 的简单与强悍

public class EventBus01 {
    public static void main(String[] args) {
        EventBus eventBus = new EventBus("OrderEventBus");
        eventBus.register(new OrderEventSubscribe());

        eventBus.post(new OrderCreatedEvent("O187312", "120"));
        eventBus.post(new Object());
        eventBus.post(new OrderCancelledEvent("O9999", System.currentTimeMillis()));

        EventBus eventBus1 = new EventBus((exception, context) -> {
            System.out.println(exception);
            System.out.println(context);
        });

        eventBus1.register(new NoopEventSubscribe());
        eventBus1.post(new OrderCreatedEvent("123", "13.0"));
    }

    static class OrderEventSubscribe {
        @Subscribe
        public void handleOrderCreated(OrderCreatedEvent event) {
            System.out.println("Handle 1: " + event);
        }

        @Subscribe
        public void handleOrder2(OrderCreatedEvent event) {
            System.out.println("Handle 2: " + event);
        }

        @Subscribe
        public void handleOrder3(Object event) {
            System.out.println("Handle 3: " + event);
        }
    }

    static class NoopEventSubscribe {
        @Subscribe
        public void handleOrderCreated(DeadEvent event) {
            System.out.println("Dead Handle 1: " + event);
            throw new RuntimeException("Mock");
        }
    }

    @Data
    @ToString
    @AllArgsConstructor
    static class OrderCreatedEvent {
        private String orderId;
        private String amount;
        // ...
    }

    @Data
    @ToString
    @AllArgsConstructor
    static class OrderCancelledEvent {
        private String orderId;
        private long cancelledAt;
        // ...
    }
}

事件的订阅者通过 @Subscribe 来标识,是方法级别的。

为什么使用注解标记处理,而不是要求侦听器实现接口?因为注解可以表达接口想要表达的意图,此外注解可以允许你将事件处理程序放在任意想放置的地方,灵活性更好。传统的 Java 事件使用一个监听器接口,有几个缺点:

  1. 任何一个类只能对给定事件实现单个响应

  2. 侦听器接口方法可能会冲突

  3. 该方法必须以事件(例如handleChangeEvent)命名,而不是其用途(例如recordChangeInJournal)命名

  4. 每个事件通常都有其自己的接口,而没有用于一系列事件(例如,所有UI事件)的公共父接口

使用传统的 Java 的方式由于难以干净利落地实现,因此产生了一种模式,特别是在 Swing 应用程序中常见的模式,即使用微小的匿名类来实现事件监听器接口。如下:

如果使用了 EventBus, 就是如下这样的,对比明显:

理解 EventBus 的设计

EventBus 内部有 4 个重要的组件:executor, exceptionHandler, subscribers, dispatcher

  1. subscribers 用来存放所有已经注册到当前 EventBus 的订阅者, 它的实现是一个订阅者注册表 SubscriberRegistry

  2. dispatcher 分发事件给订阅者的处理器,提供了不同场景下的不同事件分发顺序的保证,比如 perThreadDispatchQueue 确保每个线程内部发布的事件是有序的;immediate 表示立即消息,当消息发布后会被立即执行;legacyAsync 使用了一个全局队列来存储当前 EventBus 所有已发布的事件,然后分发给 executor 处理

  3. executor 任务执行器的抽象,在 EventBus 中承担调用执行订阅者的职责,依赖该抽象既可以实现同步调用订阅者,也可以实现异步调用,满足了不同场景的灵活度,AsyncEventBus 就是一个可以灵活定制 Executor 的可实现异步执行调用者的类

  4. exceptionHandler EventBus 提供了一个默认的异常处理器,但自定义的异常处理器可以满足对异常处理的特殊需求

EventBus 发布事件的流程

  1. 根据 event 查询事件订阅者列表

  2. 如果该事件有订阅者,事件分发器将事件分发给订阅者,其中 Subscriber 表示一个订阅者,包含了目标对象和需要执行的方法,订阅者在执行时会将方法的调用执行委托给 executor,在执行调用订阅者时使用了反射机制;如果在执行过程中出现异常,交给 exceptionHandler 去处理,整个处理结束

  3. 如果事件没有订阅者,事件就被当作 DeadedEvent 进行处理,如果注册了 DeadedEvent 的订阅者,那么就会处理 DeadedEvent,否则会丢弃这个事件

EventBus 实现

EventBus 的两个核心是 register subscribe 和 post event

  • register subscribe

SubscriberRegistry 是注册 Subscriber 的地方,为了实现一个线程安全的 EventBus,SubscriberRegistry 内部使用了 ConcurrentMap 作为存放 Subscriber 的容器,并且在需要更新 Subscriber 时,利用了具有写时复制能力的 CopyOnWriteArraySet,这样在并发注册 Subscriber 时也可以保证线程安全和正确性,之所以使用 CopyOnWriteArraySet 是因为注册订阅者是一个小概率发生的事情,一般情况下都是在启动时注册,运行时使用,一个典型的读多写少的场景。

Note: 在平常编写程序时,应该多思考具体的应用场景来选择合适的数据结构和算法,这样能最大化程序的性能,同时也让设计和实现变得优雅,这是一个工程师的内核能力。

  • post event

总结

EventBus 虽然是实现了一个小功能,但他内部的设计与实现是非常值得学习的。有几个设计上的点值得思考:

  1. 封装复杂性,提供简单、易用、清晰的接口,在使用 EventBus 时,我们只需要定义事件和实现事件的订阅者,然后注册到 EventBus 中,就可以轻松实现进程内的事件通信,支持一个事件被多个订阅者消费;

  2. 使用更加能表达意图的 @Subscribe 注解来声明订阅者,而不是实现特定接口,使得编程时代码的灵活度更好,因为可以在任何应该放置订阅者的地方放置这个订阅者,而不是被强制实现接口和使用一个固定的方法名(因为接口是事先定义好的);

  3. 在设计与实现时,尽量使用 OO 设计原则和设计思想, 将 EventBus 的功能划分为功能相对清晰单一的子功能,然后进行组合,比如抽象出普适的 Object event;抽象出 Subscriber, 并使用 @Subscribe 和反射机制来自动扫描、自动发现 Subscriber; 抽象出 Dispatcher 进行事件分发,进而实现满足不同需求的分发策略;抽象出 Executor,将 Subscriber 的执行委托给 Executor,进而可以灵活的控制是异步执行还是同步执行等;抽象出 exceptionHandler 来处理异常,并且可以将异常传播给自定义的异常处理组件,由开发者控制

  4. 根据应用场景灵活选择并发模式(ThreadLocal, 并发写控制等)、数据结构与算法(CopyOnWriteArraySet, MultiMap, Queue etc.)等

Reference

最后更新于

这有帮助吗?