CloudNativeEra
  • Introduction
  • 名词解释
  • Computer Science
    • Computer Organization
      • CPU
      • 二进制、电路、加法器、乘法器
      • 编译、链接、装载
      • 存储器
      • IO
    • Operating System
      • 操作系统基础知识
      • 系统初始化
      • 进程管理
      • Everything about Memory
      • 文件系统
      • 并行编程
      • Linux
        • CPU
        • IO 多路复用
        • DMA IO and Linux Zero Copy
    • Computer Network
      • 网络相关命令
      • 评估系统的网络性能
      • 网络抓包
      • Linux 最多支撑的 TCP 连接
      • 网络虚拟化
      • DHCP 工作原理
    • Data Structure and Algorithm
      • 题目列表
      • Summarize
        • 方法总结
        • 二分思想
        • 树的演化
        • 算法思想总结
      • Data Structure
        • Data Struct - Array
        • Tree
        • Heap
        • Hash
        • 字符串
      • Algorithm
        • Sorting Algorithm
        • 查找
        • 贪心算法
        • 动态规划
        • 位运算
      • Practice Topics
        • Data Struct in SDK
        • Topic - Tree
        • Topic - Graph
        • Topic - 滑动窗口
        • 剑指 Offer 题解
    • 并发编程
      • 并发模式
      • 并发模型
  • 系统设计
    • 软件设计
      • 软件架构
      • 编程范式
      • 系统设计题
      • 设计原则
      • 计算机程序的构造和解释 SICP
    • 领域驱动设计
      • 应用:在线请假考勤管理
      • 应用: library
    • 微服务与云原生
      • Designing and deploying microservices
      • 容器技术
      • Docker
      • Etcd
      • Kubernetes
        • Kubernetes - Mapping External Services
      • Istio
      • 监控
    • 分布式系统
      • 分布式理论
      • 分布式事务
    • 后端存储设计
      • 缓存设计
      • 数据库架构设计
    • CI/CD
    • 设计最佳实践
    • 测试
    • 安全
    • 综合
      • 开发实践
      • 分布式锁
      • 分布式计数服务
      • 弹幕系统设计
      • 消息队列设计
      • 分布式ID生成算法
      • 限流设计
      • 网关设计
      • 通用的幂等设计
      • 分布式任务调度
        • Timer
        • ScheduledExecutorService
        • Spring Task
        • Quartz
      • 交易系统
      • 权限设计
  • 编程语言
    • 编程语言
    • C & C++
    • Java
      • JVM
        • JVM Bytecode
      • Java 核心技术
      • Java 8 新特性
      • Java 集合框架
      • Java NIO
      • 并发编程
        • 线程生命周期与线程中断
        • 三个线程交替打印
        • 两个线程交替打印奇偶
        • 优雅终止线程
        • 等待通知机制
        • 万能钥匙:管程
        • 限流器
        • 无锁方案 CAS
    • Java 源码阅读
      • Unsafe
      • 异步计算 Future
      • Java Queue
      • CoalescingRingBuffer 分析
      • Java Collections
        • PriorityQueue 分析
        • HashMap 分析
        • TreeMap
    • Golang
    • Python
  • 框架/组件/类库
    • Guava
      • Guava Cache
      • Guava EventBus
    • RxJava
    • Apache MINA
    • Netty
      • 网络 IO 模型
      • Netty 生产问题
    • Apache Tomcat
    • MyBatis
    • 限流框架
    • Spring Framework
      • Spring Core
      • Spring 事务管理
    • Spring Boot
    • Spring Cloud
      • Feign & OpenFeign
      • Ribbon
      • Eurake
      • Spring Cloud Config
    • FixJ
    • Metrics
    • Vert.x
  • 中间件
    • Redis
      • Redis 基础
        • Redis 数据结构设计与实现
        • Redis 高性能网络模型
      • Redis checklist
      • 应用案例 - Redis 数据结构
      • 应用案例 - Redis 缓存应用
      • 应用案例 - Redis 集群
      • Redis 客户端
      • Redis 生产案例
        • [译] 在 Redis 中存储数亿个简单键值对
    • MySQL
      • MySQL 基础
      • MySQL Index
      • MySQL Transaction
      • MySQL 优化
      • MySQL 内核
      • MySQL Command
      • MySQL Checklist
      • MySQL Analysis Tool
      • 实现 MySQL
    • State Machine
    • 数据库连接池
    • MQ
      • 高性能内存队列 Disruptor
      • Kafka
      • Pulsar
      • RocketMQ
        • Broker 的设计与实现
      • NSQ
  • 实际案例
    • 线上 Case
      • Request Aborted
      • MySQL - Specified key was too long
      • Java 应用 CPU 100% 排查优化
      • 频繁 GC 导致的 Java 服务不响应
      • 导出优化
  • 大数据
    • 流计算
    • Flink
  • 其他
    • 工具
    • 读书
      • 设计数据密集型应用
      • 实现领域驱动设计
      • 精通比特币
      • 提问的智慧
    • 论文
    • 工程博客
    • 阅读源码
    • 面试
      • 如何在最短的时间里对对方有个全面的了解
    • 分享
    • 软技能
    • Todo
  • Blog
    • #算法
      • 查找
      • 位运算
      • 树
    • #架构
      • 1- 通信
    • Design & Dev & Opt
      • High Performance Data structure Design
  • Tiny Project
    • A Simple WeChat-like Instant Messaging Platform
由 GitBook 提供支持
在本页

这有帮助吗?

  1. 系统设计
  2. 综合
  3. 分布式任务调度

Timer

从源码层面分析 Timer

上一页分布式任务调度下一页ScheduledExecutorService

最后更新于4年前

这有帮助吗?

Timer 是 JDK 提供的一个任务调度类,了解它的设计和实现有助于我们了解调度的基本设计思路。

Timer 在 JDK 中的解释是:它是一个线程调度工具类,以后台线程的形式调度未来的任务执行,这些任务可能是一次性调度,也可能是特定间隔的重复调度;每个 Timer 对象绑定了一个后台线程用于执行和当前 timer 绑定的所有任务,这些任务被串形执行。

Timer 的设计

JDK 中的 Timer 调度设计,编程入口是 Timer 和 TimerTask

Timer 的设计是线程安全的,多个线程可以共用一个 Timer 而不需要使用额外的同步控制;但是 Timer 并不保证严格的实时保证,它是使用 object.wait(long timeout) 来调度任务的;Java 的 JUC 包中提供了ScheduledThreadPoolExecutor 来替代 Timer , 它提供了更多的能力。

源码解读

Timer 的实现主要有三个部分组成:TimerTask 用来定义需要被执行的任务;TaskQueue 用来存储将要被执行的任务,可以添加很多个 TimerTask;TimerThread 是一个单线程,用来处理 TaskQueue 中需要被调度的任务;可见,Timer 的模型是单线程处理所有要被调度的任务,任务按照 Task 的 nextExecutionTime 排序的方式存储在一个优先级队列中,加入任务时,是把任务加入到任务队列中,何时真的被调度由 TimerThread 控制。

注意:这里的 TaskQueue 的实现是基于二叉堆实现的优先级队列,关于数据结构堆看这里

// Timer.java
public class Timer {
    // 共享的优先级队列
    private final TaskQueue queue = new TaskQueue();
    private final TimerThread thread = new TimerThread(queue);
    // 在启动一个 Timer 时,默认会启动一个 TimerThread
    public Timer(String name, boolean isDaemon) {
        thread.setName(name);
        thread.setDaemon(isDaemon);
        // 启动 Timer 线程
        thread.start();
    }
    
    // ...
    // 将 TimerTask 加入任务队列,等待被调度
    private void sched(TimerTask task, long time, long period) {
        if (time < 0)
            throw new IllegalArgumentException("Illegal execution time.");

        // Constrain value of period sufficiently to prevent numeric
        // overflow while still being effectively infinitely large.
        if (Math.abs(period) > (Long.MAX_VALUE >> 1))
            period >>= 1;

        // queue 本身不是线程安全的,使用同步锁来支持并发
        synchronized(queue) {
            if (!thread.newTasksMayBeScheduled)
                throw new IllegalStateException("Timer already cancelled.");

            // 对 task 进行并发控制,防止多线程并发,加入到不同的 Timer 中,
            // 导致任务调度出错;也就是说一个 TimerTask 只能被成功加入一个 Timer 中调度
            synchronized(task.lock) {
                if (task.state != TimerTask.VIRGIN)
                    throw new IllegalStateException(
                        "Task already scheduled or cancelled");
                task.nextExecutionTime = time;
                task.period = period;
                task.state = TimerTask.SCHEDULED;
            }

            // 入队
            queue.add(task);
            // ?
            if (queue.getMin() == task)
                queue.notify();
        }
    }
}

// TaskQueue
// TaskQueue 是一个 TimerTask 的优先级队列,使用二叉堆实现,按照 nextExecutionTime 构造
// 小顶堆
class TaskQueue {
    private TimerTask[] queue = new TimerTask[128];
    private int size = 0;
    ...
}

// TimerThread
// 这个是任务调度器的执行线程,会监听任务队列
class TimerThread extends Thread {
    // 调度器执行线程是否有效的标志
    boolean newTasksMayBeScheduled = true;
    // 任务队列
    private TaskQueue queue;
    
    public void run() {
        try {
            // 任务监听
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            // 如果该线程被强制杀死,被当作 Timer 取消的操作来处理
            synchronized(queue) {
                // 执行线程状态被修改,且队列被清空
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
    }
    
    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired; // 任务是否被触发
                synchronized(queue) {
                    // Wait for queue to become non-empty
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break; // Queue is empty and will forever remain; die

                    // Queue nonempty; look at first evt and do the right thing
                    long currentTime, executionTime;
                    task = queue.getMin();
                    synchronized(task.lock) {
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    if (!taskFired) // Task hasn't yet fired; wait
                        queue.wait(executionTime - currentTime);
                }
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }
}

Timer 设计的难点是队列在多线程环境下的线程安全,Timer 采用同步块的方式,synchronized(queue) 锁住队列后,进行操作;此外,TimerTask 的设计也需要注意 Task 应该只在一个 Timer 中被调度,以免出现并发问题,如果 一个 TimerTask 试图加入到两个 Timer 中,会抛出异常;TimerTask 的状态流转:

Timer 的设计
TimerTask 状态流转