// Class is typically registered by the container.
class EventBusChangeRecorder {
// Implemented with the EventBus annotation.
/**
 * Receives a {@code ChangeEvent} and passes the change it carries to {@code recordChange}.
 *
 * @param e the event whose change is recorded
 */
@Subscribe
public void recordCustomerChange(ChangeEvent e) {
// NOTE(review): recordChange is not declared in this class as shown — presumably defined
// elsewhere; confirm.
recordChange(e.getChange());
}
}
/**
 * Registers all subscriber methods on the given listener object.
 *
 * <p>The listener's subscribers come back from {@code findAllSubscribers} grouped by event type,
 * e.g. {@code io.github.demo.OrderCreatedEvent -> [Subscriber1, Subscriber2]}. Each group is
 * merged into the set already held for that event type, and a new set is installed when the
 * event type has none yet.
 *
 * @param listener the object whose subscriber methods are added
 */
void register(Object listener) {
  for (Entry<Class<?>, Collection<Subscriber>> grouped :
      findAllSubscribers(listener).asMap().entrySet()) {
    Class<?> type = grouped.getKey();
    Collection<Subscriber> incoming = grouped.getValue();

    // Subscribers for one event type are kept in a CopyOnWriteArraySet.
    CopyOnWriteArraySet<Subscriber> registered = subscribers.get(type);
    if (registered == null) {
      // No set for this event type yet: offer a fresh one, but keep whichever set is actually
      // in the map if putIfAbsent reports that one was already present.
      CopyOnWriteArraySet<Subscriber> fresh = new CopyOnWriteArraySet<>();
      CopyOnWriteArraySet<Subscriber> existing = subscribers.putIfAbsent(type, fresh);
      registered = (existing != null) ? existing : fresh;
    }

    // Add the newly registered subscribers to the set for this event type.
    registered.addAll(incoming);
  }
}
/**
 * Gets an iterator representing an immutable snapshot of all subscribers to the given event at
 * the time this method is called.
 *
 * <p>This is the lookup used when an event is posted: for every type that
 * {@code flattenHierarchy} yields for the event's class, the iterator of the registered
 * subscriber set (if any) is collected, and the collected iterators are concatenated.
 *
 * @param event the event whose subscribers are looked up
 * @return a single iterator over the subscribers found for each of those types
 */
Iterator<Subscriber> getSubscribers(Object event) {
  ImmutableSet<Class<?>> hierarchy = flattenHierarchy(event.getClass());
  List<Iterator<Subscriber>> perTypeIterators =
      Lists.newArrayListWithCapacity(hierarchy.size());

  for (Class<?> type : hierarchy) {
    CopyOnWriteArraySet<Subscriber> registered = subscribers.get(type);
    if (registered == null) {
      // Nothing is registered for this type.
      continue;
    }
    // eager no-copy snapshot
    perTypeIterators.add(registered.iterator());
  }

  return Iterators.concat(perTypeIterators.iterator());
}
// Post an event.
public void post(Object event) {
// Look up the subscribers for this event.
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
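// Dispatch the event; subscribers are invoked in order.
// The dispatcher has three default implementation strategies:
// 1. PerThreadQueuedDispatcher: each thread keeps its own Queue of events, held in a ThreadLocal
// 2. LegacyAsyncDispatcher: a single global queue of events is shared within one EventBus
// 3. ImmediateDispatcher: dispatches immediately, without storing events in a queue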
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
// If no subscriber is found, wrap the event in a DeadEvent and post that instead.
post(new DeadEvent(this, event));
}