std::async#
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#include <chrono>
#include <future>
#include <iostream>
#include <string>
#include <thread>
// async 获取异步返回值
std::string fetchDataFromDB(std::string query) {
std::cout << "Subthread suspended is fetching data from DB..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(4));
return "Data: " + query;
}
void use_async() {
// 异步获取数据
std::future<std::string> fetchedData =
std::async(std::launch::async, fetchDataFromDB, "query");
// 主线程正常进行
for (int i = 0; i < 10; i++) {
std::cout << "Main thread is doing something else x" << i + 1 << " time(s)."
<< std::endl;
}
// get() 是阻塞的, 只有数据返回后才会继续执行
std::string data = fetchedData.get();
std::cout << data << std::endl;
}
int main()
{
use_async();
return 0;
}
|
async 的启动模式#
std::launch::async 立即执行
std::launch::defer 调用std::future::get时才会执行
std::launch::deferred|std::launch::async 不同的环境执行结果不同
std::future#
std::future::get 调用时会阻塞,调用后std::future就会失效
std::future::wait 阻塞调用,如果任务完成就会返回,没有则继续等待,可以多次调用,但是一旦调用了std::future::get 因为对象std::future的失效,就会异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
std::string fetchDataFromDB(std::string query) {
std::cout << "Subthread suspended is fetching data from DB..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(4));
return "Data: " + query;
}
void use_async_future_wait() {
// 异步获取数据
std::future<std::string> fetchedData =
std::async(std::launch::async, fetchDataFromDB, "query");
// 主线程正常进行
for (int i = 0; i < 10; i++) {
std::cout << "Main thread is doing something else x" << i + 1 << " time(s)."
<< std::endl;
}
// get() 是阻塞的, 只有数据返回后才会继续执行
std::cout << "first call `fetchedData.get()` " << std::endl;
fetchedData.wait();
// std::string data = fetchedData.get();
// std::cout << data << std::endl;
for (int i = 10; i < 20; i++) {
std::cout << "Main thread is doing something else x" << i + 1 << " time(s)."
<< std::endl;
}
std::cout << "call `future::wait()` again " << std::endl;
fetchedData.wait();
}
|
std::packaged_task#
std::packaged_task可以将异步任务包装成std::future,运行在另一个线程上,可以捕获任务的返回值或异常
使用:
- 创建一个
std::packaged_task 对象,包装一个任务
- 调用
std::packaged_task::get_future 获取一个std::future对象
- 在另一个线程上调用
std::packaged_task的operator()执行任务
- 调用
std::future::get获取返回值或异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
int my_task() {
std::this_thread::sleep_for(std::chrono::seconds(4));
std::cout << "Subthread suspended is running task..." << std::endl;
return 42;
}
void use_packaged_task() {
std::packaged_task<int()> task(my_task);
for (int i = 0; i < 10; i++) {
std::cout << "Main thread is doing something else x" << i + 1 << " time(s)."
<< std::endl;
}
std::cout << "Call `task.get_future()`" << std::endl;
auto res = task.get_future();
std::cout << "Get result from `my_task()`" << std::endl;
// 需要开辟一个子线程处理`std::packaged_task` 必须使用`std::move` std::packaged_task只支持移动语义
std::thread t(std::move(task));
t.detach();
std::cout << res.get();
}
|
std::promise#
std::promise也可以在线程中获取异步返回值,保存在std::future中,在另外一个线程中获取这个值或异常。
与std::packaged_task不同的是,std::packaged_task绑定的是一个函数,也就是只有等待函数执行完才能拿到结果,
而std::promise 可以在函数中的任意一步定义,拿到值之后可以直接在其他线程中得到值。
std::promise也是只支持移动操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
void set_value(std::promise<int> prom)
{
std::cout << "Subthread suspended is setting value..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(4));
prom.set_value(42);
std::cout << "promise has set value to 42" << std::endl;
}
void use_promise()
{
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t(set_value, std::move(prom)); // 开辟的子线程会运行,但只有通过`std::future`对象才能获取到`std::promise`中的值
std::cout << "Main thread is doing something else..." << std::endl;
std::cout << "Waiting for subthread to set value "<< std::endl;
std::cout << "get value for subthread: " << fut.get() << std::endl;
t.join();
}
// 使用promise 捕获异常
void promise_one_exception(std::promise<void> promise) {
try {
throw std::runtime_error("Oops, Error");
} catch (...) {
promise.set_exception(std::current_exception());
}
}
void use_promise_exception() {
std::promise<void> promise;
std::future<void> future = promise.get_future();
std::thread t(promise_one_exception, std::move(promise));
std::cout << "Main thread is doing something else..." << std::endl;
std::cout << "Waiting for subthread to set value " << std::endl;
future.wait();
try {
future.get();
} catch (std::exception &e) {
std::cout << "Caught exception: " << e.what() << std::endl;
}
t.join();
}
// 搭配std::shared_future
void function(std::promise<int> &&prom) {
std::this_thread::sleep_for(std::chrono::seconds(10));
prom.set_value(10);
}
void threadfunction(std::shared_future<int> future) {
try {
int result = future.get();
std::cout << "Result: " << result << std::endl;
} catch (std::future_error &e) {
std::cout << "future error: " << e.what() << std::endl;
}
}
void use_shared_future() {
std::promise<int> prom;
std::shared_future<int> future = prom.get_future();
std::thread t1(function, std::move(prom));
std::thread t2(threadfunction,
future); // 不可以通过 std::move(future)的方式传递
std::thread t3(threadfunction, future);
t1.join(), t2.join(), t3.join();
}
|
last but not least#
C++线程池的一种实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
class ThreadPool {
using TASK = std::packaged_task<void()>;
private:
static std::shared_ptr<ThreadPool> _instance;
std::atomic<int> _thread_count;
std::atomic<bool> _stop;
std::mutex _mtx;
std::condition_variable _cond;
std::vector<std::thread> _workers;
std::queue<TASK> _tasks;
private:
void work_thread() {
while (!this->_stop.load()) {
TASK task;
{
std::unique_lock<std::mutex> lock(this->_mtx);
this->_cond.wait(lock, [this] {
return this->_stop.load() || !this->_tasks.empty();
});
if (this->_tasks.empty())
return;
task = std::move(this->_tasks.front());
this->_tasks.pop();
}
this->_thread_count--;
task();
this->_thread_count++;
}
}
void start() {
for (int i = 0; i < _thread_count; ++i) {
_workers.emplace_back(&ThreadPool::work_thread, this);
}
}
void stop() {
_stop.store(true);
_cond.notify_all();
for (auto &t : _workers) {
if (t.joinable()) {
std::cout << "Join thread with id = " << t.get_id() << std::endl;
t.join();
}
}
}
ThreadPool(size_t thread_count = 4) : _stop(false) {
if (thread_count <= 1)
_thread_count = 1;
else
_thread_count = thread_count;
start();
}
ThreadPool(const ThreadPool &other) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
public:
~ThreadPool() { stop(); }
static std::shared_ptr<ThreadPool> GetInstance() {
std::once_flag s_flag;
std::call_once(s_flag, [&]() {
if (_instance == nullptr)
_instance = std::shared_ptr<ThreadPool>(new ThreadPool(4));
});
return _instance;
}
template <class F, class... Args>
auto commit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> {
using RETURN_TYPE = decltype(f(args...));
if (_stop.load())
return std::future<RETURN_TYPE>{};
auto task = std::make_shared<std::packaged_task<RETURN_TYPE()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<RETURN_TYPE> res = task->get_future();
{
std::unique_lock<std::mutex> lck(_mtx);
_tasks.emplace([task]() { (*task)(); });
}
_cond.notify_one();
return res;
}
auto available_threads() const { return _thread_count.load(); }
};
std::shared_ptr<ThreadPool> ThreadPool::_instance = nullptr;
int main() {
auto res = ThreadPool::GetInstance()->commit(
[](int a, int b) { return a + b; }, 10, 20);
std::cout << res.get() << std::endl;
};
|
注意,在GetInstance()中,只能使用std::shared_ptr<ThreadPool>(new ThreadPool);的方式构造 因为make_shared无法访问私有构造函数。