iOS/Mac 平台下 apple 提供了非常好用的 dispatch_queue 能够很方便的进行线程的管理以及各个线程之间的切换(当然还有很多其他特性)。虽说 C++ 的标准库中提供了很多线程管理的方法,但相比于 dispatch_queue 还是弱爆了。由于项目中会经常用到,GitHub 上找了一些类似的实现都不太理想,于是自己实现了一版简单的主要支持一下特性:
- 支持并发调用,并支持并发的任务处理
- 支持任务的同步执行和异步执行
- 可创建指定数量的任务线程
- 执行同步任务时,在任务线程中可继续执行同步任务而不卡死
- 同一任务可重复执行
- 支持 lambda 表达式
- 支持任务线程的安全退出
结构设计

上图是 DispatchQueue 类结构图,结构比较简单,DispatchQueue 是核心抽象类;QueueTask 就是任务的抽象基类了,ClosureTask 类是一个模板类,主要用来实现 lambda 表达式;DisruptorImp 与 MutexQueueImp 则是两个具体的 DispatchQueue 的实现,disruptor 包是一个第三方库,后面会有详细介绍,。
DispatchQueue 抽象类的定义
以下是 DispatchQueue 接口的定义和实现:
|
|
DispatchQueue 是基于建造者模式进行设计,将同步方法、异步方法对 lambda 表达式的支持,放在父类中实现,因为这部分代码相对固定,而具体的同步与异步的实现都推迟到子类中实现,因为这部分可以有不同的实现方式。抽象的父类相对简单,只实现了对 lambda 的支持,在来看看 QueueTask 的类设计如下:
|
|
virtual void run() = 0 是纯虚函数用来给子类重载,signal()/wait() 是两个具体方法用来支持任务的同步执行,在生产线程中执行 wait() 方法用来等待当前任务执行完成,而在任务线程中 QueueTask 执行完成后会调用 signal 来通知生产线程完成等待。reset 方法可以让该任务执行完成后重置内部状态,以便可继续将当前任务添加都队列中。ClosureTask 模板类则用来包装 lambda 表达式。
基于 std::mutex/std::queue 的实现
基于标准库实现的思路很简单,使用标准库中提供的 std::mutex 和 std::queue ,在进行插入任务和执行任务时,对任务队列进行加锁操作,这里使用递归锁 std::recursive_mutex 而非 std::mutex。具体实现代码如下:
|
|
该类除了实现父类的sync_imp/async_imp 方法外,还有用来创建线程的 create_thread 方法,该方法每调用一次可以产生一个新的任务线程;在类的析构方法中会停止当前所有线程,并等待线程的安全退出。
在 MutexQueueImp 实现中,是典型的 生产者-消费者 线程模型,在 async_imp 方法中将任务插入到队列中,并通知任务线程;任务线程接收到任务加入队列的信号后,循环的从任务队列中取出任务,当所有任务处理完成,进入到休眠模式,直到下一个任务加入队列。
在 sync_imp 方法中会判断当前线程是否为任务线程,如果是任务线程,并且只有一个任务线程时,则直接执行任务,以免造成当前线程等待自己的情况,以免造成死锁。
基于 disruptor 的实现
Disruptor 最初是在 Java 上被发明的,这里使用它的 C++ 实现版本原理和 Java 版本是一致的。但由于 Disruptor 是基于 发布者-订阅者 的分发模型,所以当一个任务来到时,所以等待的线程,都将被唤醒,该任务可能被多个线程同时执行,在当前我们的 Dispatch Queue 中是不被允许的,所以只有在单线程时,才会使用 Disruptor 来作为 Dispatch Queue 的实现,以确保高效和正确性。
C++ 版本的阻塞等待工具类 BlockingStrategy 的实现有错误,无法唤醒,所以不要使用 BlockingStrategy 作为等待策略。
以下是 disruptor 实现的主要代码:
|
|
DisruptorImp 类的结构与 MutexQueueImp 基本一致,只有换成了 Disruptor 实现,至于对 Disruptor 的使用可参看文章后面的链接即可。最后 DispatchQueue 接口对象的创建使用一下方法来创建:
|
|
总结
DispatchQueue 可以说满足了基本的线程管理的需要,配合 lambda 表达式,使用起来也非常方便了。当然你也把它当做线程池来使用。后续如果有需要可以方便的扩展其他特性例如:延迟执行、任务之间的依赖关系等,完整的代码可在我的 GitHub 上找到。