CloudNativeEra
  • Introduction
  • 名词解释
  • Computer Science
    • Computer Organization
      • CPU
      • 二进制、电路、加法器、乘法器
      • 编译、链接、装载
      • 存储器
      • IO
    • Operating System
      • 操作系统基础知识
      • 系统初始化
      • 进程管理
      • Everything about Memory
      • 文件系统
      • 并行编程
      • Linux
        • CPU
        • IO 多路复用
        • DMA IO and Linux Zero Copy
    • Computer Network
      • 网络相关命令
      • 评估系统的网络性能
      • 网络抓包
      • Linux 最多支撑的 TCP 连接
      • 网络虚拟化
      • DHCP 工作原理
    • Data Structure and Algorithm
      • 题目列表
      • Summarize
        • 方法总结
        • 二分思想
        • 树的演化
        • 算法思想总结
      • Data Structure
        • Data Struct - Array
        • Tree
        • Heap
        • Hash
        • 字符串
      • Algorithm
        • Sorting Algorithm
        • 查找
        • 贪心算法
        • 动态规划
        • 位运算
      • Practice Topics
        • Data Struct in SDK
        • Topic - Tree
        • Topic - Graph
        • Topic - 滑动窗口
        • 剑指 Offer 题解
    • 并发编程
      • 并发模式
      • 并发模型
  • 系统设计
    • 软件设计
      • 软件架构
      • 编程范式
      • 系统设计题
      • 设计原则
      • 计算机程序的构造和解释 SICP
    • 领域驱动设计
      • 应用:在线请假考勤管理
      • 应用: library
    • 微服务与云原生
      • Designing and deploying microservices
      • 容器技术
      • Docker
      • Etcd
      • Kubernetes
        • Kubernetes - Mapping External Services
      • Istio
      • 监控
    • 分布式系统
      • 分布式理论
      • 分布式事务
    • 后端存储设计
      • 缓存设计
      • 数据库架构设计
    • CI/CD
    • 设计最佳实践
    • 测试
    • 安全
    • 综合
      • 开发实践
      • 分布式锁
      • 分布式计数服务
      • 弹幕系统设计
      • 消息队列设计
      • 分布式ID生成算法
      • 限流设计
      • 网关设计
      • 通用的幂等设计
      • 分布式任务调度
        • Timer
        • ScheduledExecutorService
        • Spring Task
        • Quartz
      • 交易系统
      • 权限设计
  • 编程语言
    • 编程语言
    • C & C++
    • Java
      • JVM
        • JVM Bytecode
      • Java 核心技术
      • Java 8 新特性
      • Java 集合框架
      • Java NIO
      • 并发编程
        • 线程生命周期与线程中断
        • 三个线程交替打印
        • 两个线程交替打印奇偶
        • 优雅终止线程
        • 等待通知机制
        • 万能钥匙:管程
        • 限流器
        • 无锁方案 CAS
    • Java 源码阅读
      • Unsafe
      • 异步计算 Future
      • Java Queue
      • CoalescingRingBuffer 分析
      • Java Collections
        • PriorityQueue 分析
        • HashMap 分析
        • TreeMap
    • Golang
    • Python
  • 框架/组件/类库
    • Guava
      • Guava Cache
      • Guava EventBus
    • RxJava
    • Apache MINA
    • Netty
      • 网络 IO 模型
      • Netty 生产问题
    • Apache Tomcat
    • MyBatis
    • 限流框架
    • Spring Framework
      • Spring Core
      • Spring 事务管理
    • Spring Boot
    • Spring Cloud
      • Feign & OpenFeign
      • Ribbon
      • Eurake
      • Spring Cloud Config
    • FixJ
    • Metrics
    • Vert.x
  • 中间件
    • Redis
      • Redis 基础
        • Redis 数据结构设计与实现
        • Redis 高性能网络模型
      • Redis checklist
      • 应用案例 - Redis 数据结构
      • 应用案例 - Redis 缓存应用
      • 应用案例 - Redis 集群
      • Redis 客户端
      • Redis 生产案例
        • [译] 在 Redis 中存储数亿个简单键值对
    • MySQL
      • MySQL 基础
      • MySQL Index
      • MySQL Transaction
      • MySQL 优化
      • MySQL 内核
      • MySQL Command
      • MySQL Checklist
      • MySQL Analysis Tool
      • 实现 MySQL
    • State Machine
    • 数据库连接池
    • MQ
      • 高性能内存队列 Disruptor
      • Kafka
      • Pulsar
      • RocketMQ
        • Broker 的设计与实现
      • NSQ
  • 实际案例
    • 线上 Case
      • Request Aborted
      • MySQL - Specified key was too long
      • Java 应用 CPU 100% 排查优化
      • 频繁 GC 导致的 Java 服务不响应
      • 导出优化
  • 大数据
    • 流计算
    • Flink
  • 其他
    • 工具
    • 读书
      • 设计数据密集型应用
      • 实现领域驱动设计
      • 精通比特币
      • 提问的智慧
    • 论文
    • 工程博客
    • 阅读源码
    • 面试
      • 如何在最短的时间里对对方有个全面的了解
    • 分享
    • 软技能
    • Todo
  • Blog
    • #算法
      • 查找
      • 位运算
      • 树
    • #架构
      • 1- 通信
    • Design & Dev & Opt
      • High Performance Data structure Design
  • Tiny Project
    • A Simple WeChat-like Instant Messaging Platform
由 GitBook 提供支持
在本页

这有帮助吗?

  1. 编程语言
  2. Java 源码阅读

CoalescingRingBuffer 分析

Coalescing Ring Buffer 解决的问题是:有效地缓冲生产者和消费者线程之间的消息,在这种情况下,只有给定主题的最新值才是感兴趣的,所有其他的消息都可以被立即丢弃。举个例子:

一个自动交易系统会一直监听股票交易所或者数字货币交易所的价格变动,以便找到价格估值较低的股票或数字货币,这些数据从交易所传来之后很可能被放入队列等待处理,但是在决定任何一个资产是否值得投资时,需要一定的时间,但是这个时候可能旧的价格已经被更新了,产生了最新的价格,而旧的价格就不是我们想要的了,因为只对最新的价格感兴趣,所以我们期望把旧的价格更新为新的价格,丢弃掉旧的额价格;Coalescing Ring Buffer 就是来解决这个问题的,它是一个缓冲区,可以容纳传入的价格更新,并检查是否可以更新一个现有的值,而不是在消费者准备好处理它们之前增长缓冲区。

Coalescing Ring Buffer 借鉴了一些 Disruptor 的设计原则:

  1. 使用数组作为数据结构,因为它们的内存位置

  2. 使用无锁并发,它避免了内核仲裁

  3. 采用单写原则,避免 cache line 争用

public final class CoalescingRingBuffer<K, V> implements CoalescingBuffer<K, V> {

    private final AtomicLong nextWrite = new AtomicLong(1); // the next write index
    private long lastCleaned = 0; // the last index that was nulled out by the producer
    private final AtomicLong rejectionCount = new AtomicLong(0);
    private final K[] keys;
    private final AtomicReferenceArray<V> values;

    @SuppressWarnings("unchecked")
    private final K nonCollapsibleKey = (K) new Object();
    private final int mask;
    private final int capacity;

    private volatile long firstWrite = 1; // the oldest slot that is is safe to write to
    private final AtomicLong lastRead = new AtomicLong(0); // the newest slot that it is safe to overwrite

    @SuppressWarnings("unchecked")
    public CoalescingRingBuffer(int capacity) {
        this.capacity = nextPowerOfTwo(capacity);
        this.mask = this.capacity - 1;

        this.keys = (K[]) new Object[this.capacity];
        this.values = new AtomicReferenceArray<V>(this.capacity);
    }

    private int nextPowerOfTwo(int value) {
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }

    @Override
    public int size() {

        // loop until you get a consistent read of both volatile indices
        while (true) {
            long lastReadBefore     = lastRead.get();
            long currentNextWrite   = this.nextWrite.get();
            long lastReadAfter      = lastRead.get();

            if (lastReadBefore == lastReadAfter) {
                return (int) (currentNextWrite - lastReadBefore) - 1;
            }
        }
    }

    @Override
    public int capacity() {
        return capacity;
    }

    public long rejectionCount() {
        return rejectionCount.get();
    }

    public long nextWrite() {
        return nextWrite.get();
    }

    public long firstWrite() {
        return firstWrite;
    }

    @Override
    public boolean isEmpty() {
        return firstWrite == nextWrite.get();
    }

    @Override
    public boolean isFull() {
        return size() == capacity;
    }

    @Override
    public boolean offer(K key, V value) {
        long nextWrite = this.nextWrite.get();

        for (long updatePosition = firstWrite; updatePosition < nextWrite; updatePosition++) {
            int index = mask(updatePosition);

            if(key.equals(keys[index])) {
                values.set(index, value);

                if (updatePosition >= firstWrite) {  // check that the reader has not read beyond our update point yet
                    return true;
                } else {
                    break;
                }
            }
        }

        return add(key, value);
    }

    @Override
    public boolean offer(V value) {
        return add(nonCollapsibleKey, value);
    }

    private boolean add(K key, V value) {
        if (isFull()) {
            rejectionCount.lazySet(rejectionCount.get() + 1);
            return false;
        }

        cleanUp();
        store(key, value);
        return true;
    }

    private void cleanUp() {
        long lastRead = this.lastRead.get();

        if (lastRead == lastCleaned) {
            return;
        }

        while (lastCleaned < lastRead) {
            int index = mask(++lastCleaned);
            keys[index] = null;
            values.lazySet(index, null);
        }
    }

    private void store(K key, V value) {
        long nextWrite = this.nextWrite.get();
        int index = mask(nextWrite);

        keys[index] = key;
        values.set(index, value);

        this.nextWrite.lazySet(nextWrite + 1);
    }

    @Override
    public int poll(Collection<? super V> bucket) {
        return fill(bucket, nextWrite.get());
    }

    @Override
    public int poll(Collection<? super V> bucket, int maxItems) {
        long claimUpTo = min(firstWrite + maxItems, nextWrite.get());
        return fill(bucket, claimUpTo);
    }

    private int fill(Collection<? super V> bucket, long claimUpTo) {
        firstWrite = claimUpTo;
        long lastRead = this.lastRead.get();

        for (long readIndex = lastRead + 1; readIndex < claimUpTo; readIndex++) {
            int index = mask(readIndex);
            bucket.add(values.get(index));
            values.set(index, null);
        }

        this.lastRead.lazySet(claimUpTo - 1);
        return (int) (claimUpTo - lastRead - 1);
    }

    private int mask(long value) {
        return ((int) value) & mask;
    }

}

上一页Java Queue下一页Java Collections

最后更新于4年前

这有帮助吗?

via:

Blog is

and

源码地址
The Coalescing Ring Buffer
Talk about Coalescing Ring Buffer
Talk's PPT