CloudNativeEra
  • Introduction
  • 名词解释
  • Computer Science
    • Computer Organization
      • CPU
      • 二进制、电路、加法器、乘法器
      • 编译、链接、装载
      • 存储器
      • IO
    • Operating System
      • 操作系统基础知识
      • 系统初始化
      • 进程管理
      • Everything about Memory
      • 文件系统
      • 并行编程
      • Linux
        • CPU
        • IO 多路复用
        • DMA IO and Linux Zero Copy
    • Computer Network
      • 网络相关命令
      • 评估系统的网络性能
      • 网络抓包
      • Linux 最多支撑的 TCP 连接
      • 网络虚拟化
      • DHCP 工作原理
    • Data Structure and Algorithm
      • 题目列表
      • Summarize
        • 方法总结
        • 二分思想
        • 树的演化
        • 算法思想总结
      • Data Structure
        • Data Struct - Array
        • Tree
        • Heap
        • Hash
        • 字符串
      • Algorithm
        • Sorting Algorithm
        • 查找
        • 贪心算法
        • 动态规划
        • 位运算
      • Practice Topics
        • Data Struct in SDK
        • Topic - Tree
        • Topic - Graph
        • Topic - 滑动窗口
        • 剑指 Offer 题解
    • 并发编程
      • 并发模式
      • 并发模型
  • 系统设计
    • 软件设计
      • 软件架构
      • 编程范式
      • 系统设计题
      • 设计原则
      • 计算机程序的构造和解释 SICP
    • 领域驱动设计
      • 应用:在线请假考勤管理
      • 应用: library
    • 微服务与云原生
      • Designing and deploying microservices
      • 容器技术
      • Docker
      • Etcd
      • Kubernetes
        • Kubernetes - Mapping External Services
      • Istio
      • 监控
    • 分布式系统
      • 分布式理论
      • 分布式事务
    • 后端存储设计
      • 缓存设计
      • 数据库架构设计
    • CI/CD
    • 设计最佳实践
    • 测试
    • 安全
    • 综合
      • 开发实践
      • 分布式锁
      • 分布式计数服务
      • 弹幕系统设计
      • 消息队列设计
      • 分布式ID生成算法
      • 限流设计
      • 网关设计
      • 通用的幂等设计
      • 分布式任务调度
        • Timer
        • ScheduledExecutorService
        • Spring Task
        • Quartz
      • 交易系统
      • 权限设计
  • 编程语言
    • 编程语言
    • C & C++
    • Java
      • JVM
        • JVM Bytecode
      • Java 核心技术
      • Java 8 新特性
      • Java 集合框架
      • Java NIO
      • 并发编程
        • 线程生命周期与线程中断
        • 三个线程交替打印
        • 两个线程交替打印奇偶
        • 优雅终止线程
        • 等待通知机制
        • 万能钥匙:管程
        • 限流器
        • 无锁方案 CAS
    • Java 源码阅读
      • Unsafe
      • 异步计算 Future
      • Java Queue
      • CoalescingRingBuffer 分析
      • Java Collections
        • PriorityQueue 分析
        • HashMap 分析
        • TreeMap
    • Golang
    • Python
  • 框架/组件/类库
    • Guava
      • Guava Cache
      • Guava EventBus
    • RxJava
    • Apache MINA
    • Netty
      • 网络 IO 模型
      • Netty 生产问题
    • Apache Tomcat
    • MyBatis
    • 限流框架
    • Spring Framework
      • Spring Core
      • Spring 事务管理
    • Spring Boot
    • Spring Cloud
      • Feign & OpenFeign
      • Ribbon
      • Eurake
      • Spring Cloud Config
    • FixJ
    • Metrics
    • Vert.x
  • 中间件
    • Redis
      • Redis 基础
        • Redis 数据结构设计与实现
        • Redis 高性能网络模型
      • Redis checklist
      • 应用案例 - Redis 数据结构
      • 应用案例 - Redis 缓存应用
      • 应用案例 - Redis 集群
      • Redis 客户端
      • Redis 生产案例
        • [译] 在 Redis 中存储数亿个简单键值对
    • MySQL
      • MySQL 基础
      • MySQL Index
      • MySQL Transaction
      • MySQL 优化
      • MySQL 内核
      • MySQL Command
      • MySQL Checklist
      • MySQL Analysis Tool
      • 实现 MySQL
    • State Machine
    • 数据库连接池
    • MQ
      • 高性能内存队列 Disruptor
      • Kafka
      • Pulsar
      • RocketMQ
        • Broker 的设计与实现
      • NSQ
  • 实际案例
    • 线上 Case
      • Request Aborted
      • MySQL - Specified key was too long
      • Java 应用 CPU 100% 排查优化
      • 频繁 GC 导致的 Java 服务不响应
      • 导出优化
  • 大数据
    • 流计算
    • Flink
  • 其他
    • 工具
    • 读书
      • 设计数据密集型应用
      • 实现领域驱动设计
      • 精通比特币
      • 提问的智慧
    • 论文
    • 工程博客
    • 阅读源码
    • 面试
      • 如何在最短的时间里对对方有个全面的了解
    • 分享
    • 软技能
    • Todo
  • Blog
    • #算法
      • 查找
      • 位运算
      • 树
    • #架构
      • 1- 通信
    • Design & Dev & Opt
      • High Performance Data structure Design
  • Tiny Project
    • A Simple WeChat-like Instant Messaging Platform
由 GitBook 提供支持
在本页
  • 使用 EventBus
  • 理解 EventBus 的设计
  • 总结
  • Reference

这有帮助吗?

  1. 框架/组件/类库
  2. Guava

Guava EventBus

Guava EventBus 允许组件之间的发布-订阅式通信,而无需组件之间显式注册。

适用场景:进程内通信,不支持进程间,不是一个通用的发布订阅系统,使用显式注册代替传统的 Java 进程内事件分发,传统的事件分发需要定义 Listener、注册每个事件监听到列表中等

使用 EventBus

感受一下 EventBus 的简单与强悍

public class EventBus01 {
    public static void main(String[] args) {
        EventBus eventBus = new EventBus("OrderEventBus");
        eventBus.register(new OrderEventSubscribe());

        eventBus.post(new OrderCreatedEvent("O187312", "120"));
        eventBus.post(new Object());
        eventBus.post(new OrderCancelledEvent("O9999", System.currentTimeMillis()));

        EventBus eventBus1 = new EventBus((exception, context) -> {
            System.out.println(exception);
            System.out.println(context);
        });

        eventBus1.register(new NoopEventSubscribe());
        eventBus1.post(new OrderCreatedEvent("123", "13.0"));
    }

    static class OrderEventSubscribe {
        @Subscribe
        public void handleOrderCreated(OrderCreatedEvent event) {
            System.out.println("Handle 1: " + event);
        }

        @Subscribe
        public void handleOrder2(OrderCreatedEvent event) {
            System.out.println("Handle 2: " + event);
        }

        @Subscribe
        public void handleOrder3(Object event) {
            System.out.println("Handle 3: " + event);
        }
    }

    static class NoopEventSubscribe {
        @Subscribe
        public void handleOrderCreated(DeadEvent event) {
            System.out.println("Dead Handle 1: " + event);
            throw new RuntimeException("Mock");
        }
    }

    @Data
    @ToString
    @AllArgsConstructor
    static class OrderCreatedEvent {
        private String orderId;
        private String amount;
        // ...
    }

    @Data
    @ToString
    @AllArgsConstructor
    static class OrderCancelledEvent {
        private String orderId;
        private long cancelledAt;
        // ...
    }
}

事件的订阅者通过 @Subscribe 来标识,是方法级别的。

为什么使用注解标记处理,而不是要求侦听器实现接口?因为注解可以表达接口想要表达的意图,此外注解可以允许你将事件处理程序放在任意想放置的地方,灵活性更好。传统的 Java 事件使用一个监听器接口,有几个缺点:

  1. 任何一个类只能对给定事件实现单个响应

  2. 侦听器接口方法可能会冲突

  3. 该方法必须以事件(例如handleChangeEvent)命名,而不是其用途(例如recordChangeInJournal)命名

  4. 每个事件通常都有其自己的接口,而没有用于一系列事件(例如,所有UI事件)的公共父接口

使用传统的 Java 的方式由于难以干净利落地实现,因此产生了一种模式,特别是在 Swing 应用程序中常见的模式,即使用微小的匿名类来实现事件监听器接口。如下:

class ChangeRecorder {
  void setCustomer(Customer cust) {
      // 使用匿名类实现特定的 Listener 接口
    cust.addChangeListener(new ChangeListener() {
      public void customerChanged(ChangeEvent e) {
        recordChange(e.getChange());
      }
    };
  }
}

如果使用了 EventBus, 就是如下这样的,对比明显:

// Class is typically registered by the container.
class EventBusChangeRecorder {
    // 使用 EventBus 的注解来实现
  @Subscribe 
  public void recordCustomerChange(ChangeEvent e) {
    recordChange(e.getChange());
  }
}

理解 EventBus 的设计

EventBus 内部有 4 个重要的组件:executor, exceptionHandler, subscribers, dispatcher

  1. subscribers 用来存放所有已经注册到当前 EventBus 的订阅者, 它的实现是一个订阅者注册表 SubscriberRegistry

  2. dispatcher 分发事件给订阅者的处理器,提供了不同场景下的不同事件分发顺序的保证,比如 perThreadDispatchQueue 确保每个线程内部发布的事件是有序的;immediate 表示立即消息,当消息发布后会被立即执行;legacyAsync 使用了一个全局队列来存储当前 EventBus 所有已发布的事件,然后分发给 executor 处理

  3. executor 任务执行器的抽象,在 EventBus 中承担调用执行订阅者的职责,依赖该抽象既可以实现同步调用订阅者,也可以实现异步调用,满足了不同场景的灵活度,AsyncEventBus 就是一个可以灵活定制 Executor 的可实现异步执行调用者的类

  4. exceptionHandler EventBus 提供了一个默认的异常处理器,但自定义的异常处理器可以满足对异常处理的特殊需求

EventBus 发布事件的流程

  1. 根据 event 查询事件订阅者列表

  2. 如果该事件有订阅者,事件分发器将事件分发给订阅者,其中 Subscriber 表示一个订阅者,包含了目标对象和需要执行的方法,订阅者在执行时会将方法的调用执行委托给 executor,在执行调用订阅者时使用了反射机制;如果在执行过程中出现异常,交给 exceptionHandler 去处理,整个处理结束

  3. 如果事件没有订阅者,事件就被当作 DeadedEvent 进行处理,如果注册了 DeadedEvent 的订阅者,那么就会处理 DeadedEvent,否则会丢弃这个事件

EventBus 实现

EventBus 的两个核心是 register subscribe 和 post event

  • register subscribe

SubscriberRegistry 是注册 Subscriber 的地方,为了实现一个线程安全的 EventBus,SubscriberRegistry 内部使用了 ConcurrentMap 作为存放 Subscriber 的容器,并且在需要更新 Subscriber 时,利用了具有写时复制能力的 CopyOnWriteArraySet,这样在并发注册 Subscriber 时也可以保证线程安全和正确性,之所以使用 CopyOnWriteArraySet 是因为注册订阅者是一个小概率发生的事情,一般情况下都是在启动时注册,运行时使用,一个典型的读多写少的场景。

Note: 在平常编写程序时,应该多思考具体的应用场景来选择合适的数据结构和算法,这样能最大化程序的性能,同时也让设计和实现变得优雅,这是一个工程师的内核能力。

  /** Registers all subscriber methods on the given listener object. */
void register(Object listener) {
    // 使用 MultiMap 存储 Subscriber, e.g.: 
    //  io.github.demo.OrderCreatedEvent -> [Subscriber1, Subscriber2]
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      // CopyOnWriteArraySet 存储 Subscriber 的列表,这个列表里的订阅者按顺序处理事件
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      // 添加最新注册的订阅者,使用写时复制机制
      eventSubscribers.addAll(eventMethodsInListener);
    }
}

// 发送事件时,就从 eventSubscribers 中查询订阅者列表,它返回的是一个快照
/**
 * Gets an iterator representing an immutable snapshot of all subscribers to the given event at
 * the time this method is called.
 */
Iterator<Subscriber> getSubscribers(Object event) {
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
        CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
        if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
        }
    }

    return Iterators.concat(subscriberIterators.iterator());
}
  • post event

// 发布事件
public void post(Object event) {
    // 获取订阅者列表
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
        // 分发事件,订阅者按顺序执行
        // dispatcher 的默认实现有三个实现策略:
        //  1. PerThreadQueuedDispatcher  每个线程使用一个 Queue 存储 event,使用了 ThreadLocal 并发模式
        //  2. LegacyAsyncDispatcher  同一个 EventBus 内部使用一个全局的队列存放 event
        //  3. ImmediateDispatcher  立即执行,不使用队列存储
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      // 如果找不到订阅者,就生成一个 DeadedEvent 发布出去
      post(new DeadEvent(this, event));
    }

总结

EventBus 虽然是实现了一个小功能,但他内部的设计与实现是非常值得学习的。有几个设计上的点值得思考:

  1. 封装复杂性,提供简单、易用、清晰的接口,在使用 EventBus 时,我们只需要定义事件和实现事件的订阅者,然后注册到 EventBus 中,就可以轻松实现进程内的事件通信,支持一个事件被多个订阅者消费;

  2. 使用更加能表达意图的 @Subscribe 注解来声明订阅者,而不是实现特定接口,使得编程时代码的灵活度更好,因为可以在任何应该放置订阅者的地方放置这个订阅者,而不是被强制实现接口和使用一个固定的方法名(因为接口是事先定义好的);

  3. 在设计与实现时,尽量使用 OO 设计原则和设计思想, 将 EventBus 的功能划分为功能相对清晰单一的子功能,然后进行组合,比如抽象出普适的 Object event;抽象出 Subscriber, 并使用 @Subscribe 和反射机制来自动扫描、自动发现 Subscriber; 抽象出 Dispatcher 进行事件分发,进而实现满足不同需求的分发策略;抽象出 Executor,将 Subscriber 的执行委托给 Executor,进而可以灵活的控制是异步执行还是同步执行等;抽象出 exceptionHandler 来处理异常,并且可以将异常传播给自定义的异常处理组件,由开发者控制

  4. 根据应用场景灵活选择并发模式(ThreadLocal, 并发写控制等)、数据结构与算法(CopyOnWriteArraySet, MultiMap, Queue etc.)等

Reference

上一页Guava Cache下一页RxJava

最后更新于4年前

这有帮助吗?

EventBus Explained
Guava-EventBus 使用详解