We use LinkedHashMap, a data structure backed by a HashMap that additionally threads its entries through a doubly linked list in insertion order. Internally, a HashMap organizes its data into an array of buckets; entries sharing the same hash index are chained in a linked list (or a red-black tree) inside one bucket. Those entries are very unlikely to be laid out contiguously in memory, so the CPU cache-line hit rate leaves room for optimization: fetching an element will, with high probability, require loading data from main memory into the CPU cache, and we cannot take good advantage of cache-line locality.
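The property the buffer relies on is that LinkedHashMap iterates in insertion order, which a plain HashMap does not guarantee. A minimal sketch (the class name `InsertionOrderDemo` is just for illustration):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// LinkedHashMap threads its entries through a doubly linked list,
// so iteration follows insertion order regardless of hash order.
public class InsertionOrderDemo {
    public static void main(String[] args) {
        Map<String, Integer> map = new LinkedHashMap<>();
        map.put("c", 3);
        map.put("a", 1);
        map.put("b", 2);
        // Keys come back in the order they were inserted: c, a, b
        System.out.println(map.keySet()); // prints [c, a, b]
    }
}
```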
V2 - Optimizing concurrency control with CAS
A common way to address problem 1 above is a lock-free algorithm. In Java we can use the CAS-related operations in Unsafe, which map to atomic compare-and-swap operations at the CPU instruction level.
public class CASBuffer<K, V> implements Buffer<K, V> {

    private int capacity;
    private AtomicReference<LinkedHashMap<K, V>> referenceMaps = new AtomicReference<>();
    // ...

    @Override
    public boolean offer(K key, V value) {
        boolean success = false;
        while (!success) {
            LinkedHashMap<K, V> current = referenceMaps.get();
            LinkedHashMap<K, V> modified = (LinkedHashMap<K, V>) current.clone();
            if (modified.containsKey(key)) {
                modified.put(key, value);
            } else if (modified.size() == capacity) {
                return false;
            } else {
                modified.put(key, value);
            }
            success = referenceMaps.compareAndSet(current, modified);
        }
        return true;
    }

    @Override
    public int poll(Collection<? super V> bucket) {
        LinkedHashMap<K, V> current = referenceMaps.getAndSet(new LinkedHashMap<K, V>(capacity));
        for (V v : current.values()) {
            bucket.add(v);
        }
        return current.size();
    }
}
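The core of CASBuffer.offer is a copy-on-write retry loop: read the current map, clone it, modify the clone, then try to swap it in atomically; if another thread won the race, retry. A minimal, self-contained sketch of just that loop (the class `CasLoopDemo` and its helper methods are illustrative, not part of the buffer):

```java
import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Copy-on-write via CAS: mutate a private clone, then publish it with
// compareAndSet; a failed CAS means another writer got there first.
public class CasLoopDemo {
    private static final AtomicReference<LinkedHashMap<String, Integer>> ref =
            new AtomicReference<>(new LinkedHashMap<>());

    static void put(String key, int value) {
        while (true) {
            LinkedHashMap<String, Integer> current = ref.get();
            @SuppressWarnings("unchecked")
            LinkedHashMap<String, Integer> modified =
                    (LinkedHashMap<String, Integer>) current.clone();
            modified.put(key, value);
            if (ref.compareAndSet(current, modified)) {
                return; // our snapshot was still current, swap succeeded
            }
            // another writer changed the map first: retry with a fresh snapshot
        }
    }

    static LinkedHashMap<String, Integer> snapshot() {
        return ref.get();
    }

    public static void main(String[] args) {
        put("a", 1);
        put("b", 2);
        System.out.println(snapshot()); // prints {a=1, b=2}
    }
}
```

Note the cost this design pays for being lock-free: every offer clones the whole map, which is what the array-based version below avoids.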
So how does a circular (ring) array help? First, we store the actual elements in an array, so that when data is loaded from memory into the CPU cache, neighboring elements ride along in the same cache line, saving repeated loads from main memory. Second, reads and writes need position pointers: the next position to write, and the latest position read. A simple integer counter is enough to represent each, but updating these positions requires concurrency control, and that is where CAS comes in.
public class ArrayBuffer<K, V> implements Buffer<K, V> {

    private volatile long nextWrite = 1;
    private final K[] keys;
    private final AtomicReferenceArray<V> referenceArray;
    private volatile long firstWrite = 1;
    private volatile long lastRead = 0;
    private int capacity;
    // ...

    @Override
    public boolean offer(K key, V value) {
        long nextWrite = this.nextWrite;
        for (long updatePosition = firstWrite; updatePosition < nextWrite; updatePosition++) {
            int index = computeIndex(updatePosition);
            if (key.equals(keys[index])) {
                referenceArray.set(index, value);
                if (updatePosition >= firstWrite) {
                    return true;
                } else {
                    break;
                }
            }
        }
        return add(key, value);
    }

    private boolean add(K key, V value) {
        if (isFull()) {
            return false;
        }
        long nextWrite = this.nextWrite;
        int index = computeIndex(nextWrite);
        keys[index] = key;
        referenceArray.set(index, value);
        this.nextWrite = nextWrite + 1;
        return true;
    }

    @Override
    public int poll(Collection<? super V> bucket) {
        long lastRead = this.lastRead;
        long nextWrite = this.nextWrite;
        firstWrite = nextWrite;
        for (long readIndex = lastRead + 1; readIndex < nextWrite; readIndex++) {
            int index = computeIndex(readIndex);
            // Note: the consumer thread also performs writes here, updating
            // keys and values -- this leaves room for optimization
            keys[index] = null;
            bucket.add(referenceArray.getAndSet(index, null));
        }
        this.lastRead = nextWrite - 1;
        return (int) (nextWrite - lastRead - 1);
    }

    private int computeIndex(long value) {
        // take the modulo on the long first: casting a large long to int
        // before the modulo could produce a negative index
        return (int) (value % capacity);
    }
}
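The position counters grow monotonically as longs; only the physical index wraps. A standalone sketch of that mapping, showing why the modulo must be taken on the long before narrowing to int (class `RingIndexDemo` is illustrative):

```java
// Mapping an ever-growing long position onto a fixed-size ring.
// (int) (position % capacity) stays in [0, capacity) even after the
// position exceeds Integer.MAX_VALUE, whereas ((int) position) % capacity
// can go negative once the cast overflows.
public class RingIndexDemo {
    static int computeIndex(long position, int capacity) {
        return (int) (position % capacity);
    }

    public static void main(String[] args) {
        int capacity = 8;
        for (long pos = 1; pos <= 10; pos++) {
            System.out.println(pos + " -> " + computeIndex(pos, capacity));
        }
        // A position beyond the int range still maps into bounds:
        long big = (long) Integer.MAX_VALUE + 5;
        System.out.println(computeIndex(big, capacity)); // prints 4
        // By contrast, ((int) big) is negative here, so ((int) big) % 8
        // would yield a negative "index".
    }
}
```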
...
for (long readIndex = lastRead + 1; readIndex < nextWrite; readIndex++) {
    int index = computeIndex(readIndex);
    // Move these cleanup writes into the offer path, so that the producer
    // thread proactively releases slots that have already been consumed
    // keys[index] = null;
    // bucket.add(referenceArray.getAndSet(index, null));
    bucket.add(referenceArray.get(index));
}
...
// The latest position already cleaned, used only by the producer thread,
// so it needs no concurrency control
private long lastCleaned = 0;

// Before each add, check whether any consumed slots need to be cleaned up
private void cleanUp() {
    long lastRead = this.lastRead;
    if (lastRead == lastCleaned) {
        return;
    }
    while (lastCleaned < lastRead) {
        int index = computeIndex(++lastCleaned);
        // help GC
        keys[index] = null;
        referenceArray.set(index, null);
    }
}
After this optimization, throughput reaches 46 mOPS. Designing around the Single Writer principle is also cleaner and more maintainable. Moreover, since all mutations happen in a single thread, we avoid the problem of shared-data writes invalidating copies in multiple CPU caches. The writing thread modifies data locally and writes it back to memory, and that write-back does not need to be immediately visible to other threads: delayed visibility is acceptable. This opens another optimization: lazySet, which carries store-store memory barrier semantics, performs better than a volatile write, which carries store-load semantics.
V5 - Further performance gains with lazySet
Updates to lastRead, nextWrite, and the null-ing writes into referenceArray all currently rely on volatile for visibility. The problem is that every volatile write forces the value out of the CPU cache back to memory and invalidates the corresponding cache lines on other cores, which can hurt performance. Given how these variables are used, it does no harm if the write-back is delayed by a few CPU clock cycles, so we do not need full volatile semantics at all: we can switch to the atomic classes' lazySet. See the notes on set vs. lazySet.
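A minimal sketch of the substitution, using the JDK's AtomicLong and AtomicReferenceArray (the class `LazySetDemo` and its values are illustrative). lazySet issues only a store-store barrier: the write may become visible to other threads slightly later, but it can never be reordered with the writes that preceded it, which is exactly what a single writer needs.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Replacing volatile writes with lazySet: same ordering guarantee for
// preceding stores, without the store-load barrier a volatile write pays.
public class LazySetDemo {
    public static void main(String[] args) {
        AtomicLong lastRead = new AtomicLong(0);
        AtomicReferenceArray<String> slots = new AtomicReferenceArray<>(8);

        slots.set(3, "payload");   // volatile write: full store-load barrier
        slots.lazySet(3, null);    // cheaper write: store-store barrier only
        lastRead.lazySet(7);       // position update may be lazily visible

        // Within the writing thread the values are immediately visible:
        System.out.println(slots.get(3));   // prints null
        System.out.println(lastRead.get()); // prints 7
    }
}
```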
On locking strategies: Locking strategies require an arbitrator, usually the operating system kernel, to get involved when the contention occurs to decide who gains access and in what order. This can be a very expensive process often requiring many more CPU cycles than the actual transaction to be applied to the business logic would use. Those waiting to enter the critical section, in advance of performing the mutation, must queue, and this queuing effect (Little's Law) causes latency to become unpredictable and ultimately restricts throughput.
Most locking strategies are composed from optimistic strategies for changing the lock state or mutual exclusion primitive.
If a system is decomposed into components that keep their own relevant state model, without a central shared model, and all communication is achieved via message passing then you have a system without contention naturally. This type of system obeys the single writer principle if the messaging passing sub-system is not implemented as queues. If you cannot move straight to a model like this, but are finding scalability issues related to contention, then start by asking the question, “How do I change this code to preserve the Single Writer Principle and thus avoid the contention?”
The Single Writer Principle is that for any item of data, or resource, that item of data should be owned by a single execution context for all mutations.
But if there is only a single writer we don't need to do that, as we know no one will ever change the data but us. And from that follows that strictly speaking lazySet is at the very least as correct as a volatile set for a single writer.
At this point the question is when (if at all) will the value set be made visible to other threads.