此章节主要内容有,等待时间或条件、使用future、限时等待、简化代码四个部分
下面使用std::this_thread::sleep_for实现线程的休眠,休眠的时候虽然不工作,但是它是一个间隔休眠在不断工作的任务,这样浪费了资源
#include <mutex>
#include <thread>
#include <iostream>
using namespace std;
bool flag=false;
;
mutex m
void wait_for_flag() {
<mutex>lk(m);
unique_lockwhile (!flag) {//线程一直处于间隔休眠的工作状态
.unlock();
lk::sleep_for(chrono::milliseconds(100));//休眠0.1s
this_thread.lock();
lk}
<< "flag is true" << endl;
cout }
void change_for_flag() {
::sleep_for(chrono::milliseconds(10000));//休眠10s
this_thread<mutex>lk(m);
unique_lock= true;
flag }
int main()
{
([]()->void {
thread a();
wait_for_flag});
([]()->void {
thread b();
change_for_flag});
.join(); b.join();
a//10秒钟后才会输出flag is true
return 0;
}
C++标准库中条件变量有两套实现,std::condition_variable和std::condition_variable_any,前者只能配和mutex使用,后者可以和合适的互斥量一起工作,值得注意的是,调用条件变量的notify_one或者notify_all并不是进行解锁,而是通知等待队列上线程可以去竞争锁了,如果拿到了锁则调用wiat谓词判断条件。
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
using namespace std;
bool flag=false;
;
mutex m;
condition_variable flag_condition
void wait_for_flag() {
<mutex>lk(m);
unique_lock//当wait时,首先会先unlock掉lk,将线程出于阻塞或等待状态
.wait(lk, [] {
flag_condition<< "wait check" << endl;
cout return flag;//返回false才会进行等待
});
//当被notify后重新获得锁m,再次进行等待条件检查
//条件满足时则不再wait,条件不满足时则在此进入wait
<< "flag is true" << endl;
cout }
void change_for_flag() {
::sleep_for(chrono::milliseconds(10000));//休眠10s
this_thread<mutex>lk(m);
unique_lock= true;
flag .notify_one();
flag_condition//挑一个等待的线程唤醒,notify_one并不会将锁释放掉,而是通知等待状态的线程可以去竞争锁了
<< "notify_one" << endl;
cout ::sleep_for(chrono::milliseconds(5000));
this_thread<< "15s end" << endl;
cout //最后由lk析构将锁释放掉
}
/*
wait check
notify_one
15s end
wait check
flag is true
*/
int main()
{
([]()->void {
thread a();
wait_for_flag});
([]()->void {
thread b();
change_for_flag});
.join(); b.join();
areturn 0;
}
#include <mutex>
#include <thread>
#include <iostream>
#include <queue>
#include <condition_variable>
#include <memory>
using namespace std;
template<typename T>
class safe_queue {
private:
<T> m_queue;
queuemutable mutex m_mutex;
m_condition;
condition_variable public:
() = default;
safe_queue(safe_queue const& other) {
safe_queue//safe_queue是safe_queue的友元哦
<mutex> lk(other.m_mutex);
lock_guard//为什么不需要锁住自已,因为这里是构造函数啊
m_queue = other.m_queue;
}
void push(T value) {
<mutex>lk(m_mutex);
lock_guardm_queue.push(value);
m_condition.notify_all();
}
&& wait_and_pop() {
T<mutex> lk(m_mutex);
unique_lockm_condition.wait(lk, [this] {return !m_queue.empty(); });
= m_queue.front();
T value m_queue.pop();
return std::move(value);
}
bool try_pop(T&value) {
<mutex> lk(m_mutex);
lock_guardif (m_queue.empty()) {
return false;
}
= m_queue.front();
value m_queue.pop();
return true;
}
bool empty() const {
<m_mutex> lk(m_mutex);
lock_guardreturn m_queue.empty();
}
};
int main()
{
<int> m_queue;
safe_queue<int>info = {1,2,3,4,5,6,7,8,9,10};
vector(
thread a[&]{
for (auto const& item : info) {
//this_thread::sleep_for(chrono::milliseconds(1000));
m_queue.push(item);
}
});
(
thread b[&]()->void {
for (int i = 0; i < 10; i++) {
int&& data = m_queue.wait_and_pop();
<< data << endl;
cout }
});
.join(); b.join();
areturn 0;
}
future有两种一种为std::future<>一种为std::shared__future<>,其区别和std::unique_ptr<>与std::shared__ptr<>类似,std::thread执行的任务不能有返回值,但可以future进行解决
#include<iostream>
#include<future>
using namespace std;
int async_task() {
::sleep_for(chrono::milliseconds(5000));
this_threadreturn 999;
}
int main() {
<int> res = std::async(async_task);
futureint data=res.get();//阻塞直到async_task任务返回
<< data << endl;//999
cout <int> res1 = async(async_task);
shared_future<int> res2 = res1;
shared_future<< res1.get() << " " << res2.get() << endl;//999 999
cout return 0;
}
其规则与std::thread使用的方式类似
#include<iostream>
#include<future>
using namespace std;
int async_sum(int a,int b) {
return a + b;
}
class Task {
public:
int sum(int const & a,int const & b) {
return a + b;
}
};
class Runable {
public:
int operator()(int a,int b) {
return a + b;
}
};
int main() {
//函数指针
auto future1=async(async_sum,1,2);
;
Task taskint a = 1, b = 2;
//对象方法
auto future2 = async(&Task::sum,task,std::ref(a),std::ref(b));
//可执行对象
auto future3 = async(Runable(),1,2);
<< future1.get() << " " << future2.get() << " " << future3.get() << endl;//3 3 3
cout return 0;
}
async第一个参数可以为std::launch
1、std::launch::async为开启新线程执行任务其也是默认参数
2、std::launch::deferred为调用延迟到wait()或get()调用时才执行
#include<iostream>
#include<future>
using namespace std;
void func1() {
<< "async and deferred" << endl;
cout }
int main() {
<void> future1 = async(launch::async,func1);//马上执行
futureauto future2 = async(launch::deferred,func1);//推迟执行等待wait或get
auto future3 = async(func1);//默认为launch::async
.wait();
future1.get();
future3.get();
future2/*
async and deferred
async and deferred
async and deferred
*/
return 0;
}
packaged_task作为线程函数传递 到 std::thread 对象中,或作为可调用对象传递到另一个函数中或直接调用
#include<iostream>
#include<future>
using namespace std;
int func(int a,int b) {
return a + b;
}
void run(packaged_task<int(int, int)> &task) {
(1,2);
task<< "started task" << endl;
cout }
int main() {
<int(int, int)> task(func);
packaged_task
([&] {
thread a::sleep_for(chrono::milliseconds(3000));
this_thread(task);//执行任务
run});
<int> res=task.get_future();
future<< res.get() << endl;
cout .join();
a//started task
//3
return 0;
}
有意思的任务队列
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>
#include <iostream>
using namespace std;
std::mutex m;
std::deque<std::packaged_task<int()> > tasks;//任务队列
bool run_state = true;
void process_func()//消息处理队列
{
while (run_state)
{
std::packaged_task<int()> task;
{
std::lock_guard<std::mutex> lk(m);
if (tasks.empty())
continue;
= std::move(tasks.front());
task .pop_front();
tasks}
();//调用task并不会开启新线程,task只是一个可调用对象
task}
}
std::thread process_thread(process_func);//开启任务处理线程
template<typename Func>
std::future<int> post_task(Func f)
{
std::packaged_task<int()> task(f);
std::future<int> res = task.get_future();
std::lock_guard<std::mutex> lk(m);
.push_back(std::move(task));
tasksreturn res;
}
int f() {
static int num = 0;
::sleep_for(chrono::milliseconds(100));//模拟任务时常
this_threadreturn ++num;
}
<future<int>>results;
deque
int main() {
([&] {
thread afor (int i = 0; i < 100; i++) {
.push_back(post_task(f));
results}
});
.join();//提交任务到队列完毕
a
([&] {
thread reading_resfor (auto& item : results) {
<< item.get() << endl;
cout //当future关联的packaged_task被调用并返回结果后,才能get出结果
//在相应task没被调用或结束之前,调用get相会阻塞
}
});
.join();//任务全部处理完之后,才能关闭任务处理线程
reading_res= false;//关闭任务处理线程
run_state .join();
process_threadreturn 0;
}
std::promise也是一种产生future的一种方式,promise与future关联,future可以进行get的标志不再是相应任务进行返回,而是promise.set_value方法,传送future可以get的数据
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
using namespace std;
void task(std::promise<string> m_promise) {
m_promise.set_value("hello promise");
::sleep_for(chrono::milliseconds(2000));
this_thread<< "task end" << endl;
cout }
int main(int argc, char** argv)
{
std::promise<string> m_promise;
std::promise<string> m_promise_other;
m_promise.swap(m_promise_other);//swap交换
<string> m_future = m_promise.get_future();
futurem_thread([&] {
thread (std::move(m_promise));//赋值拷贝与拷贝构造是被禁止的,可以使用移动构造
task});
<< m_future.get() << endl;
cout m_thread.join();
//hello promise
//task end
//从功能上来看promise也是中线程通信的一种方式
return 0;
}
async
#include<iostream>
#include<stdexcept>
#include<future>
using namespace std;
void func() {
throw std::exception("func throw exception");
}
int main() {
<void>f = async(func);
futuretry {
.get();
f}
catch (const exception&e) {
<< e.what() << endl;//func throw exception
cout }
return 0;
}
packaged_task
#include<iostream>
#include<stdexcept>
#include<future>
using namespace std;
void func() {
throw std::exception("func throw exception");
}
int main() {
<void()> task(func);
packaged_task<void> f=task.get_future();
future();
tasktry {
.get();
f}
catch (exception& e) {
<< e.what() << endl;//func throw exception
cout }
return 0;
}
promise
#include<iostream>
#include<stdexcept>
#include<future>
#include<exception>
using namespace std;
void func(promise<void>m_promise) {
/*try {
throw runtime_error("error");
}
catch (...) {
m_promise.set_exception(std::current_exception());
}*/
//或者使用
m_promise.set_exception(std::make_exception_ptr(runtime_error("error")));
}
int main() {
<void> m_promise;
promise<void> f = m_promise.get_future();
future([&] {func(move(m_promise)); });
thread t1try {
.get();
f}
catch (const exception& e) {
<< e.what() << endl;//error
cout }
.join();
t1return 0;
}
future只能get一次,如果多个线程需要等待同一个future呢,应该怎样做呢
#include<thread>
#include<future>
#include<string>
#include<iostream>
using namespace std;
() {
string funcreturn "func";
}
int main() {
<string> f = async(func);
future//运行出错,存在竞争关系,f只能get一次
([&] {cout << f.get() << endl; });
thread t1([&] {cout << f.get() << endl; });
thread t2.join();
t1.join();
t2return 0;
}
使用shared_future。类模板std::shared_future
提供访问异步操作结果的机制,类似于std::future
,只是允许多个线程等待相同的共享状态。不像std::future
,因此只有一个实例可以引用任何特定的异步结果%29,std::shared_future
是可复制的,多个共享的未来对象可能引用相同的共享状态。如果每个线程通过自己的副本访问同一个共享状态,则从多个线程访问该状态是安全的。shared_future
对象。
#include<thread>
#include<future>
#include<string>
#include<iostream>
using namespace std;
() {
string funcreturn "func";
}
int main() {
<string> f = async(func);
future<< boolalpha << f.valid() << endl;//true
cout <string> sf(move(f));
shared_future//shared_future<string>sf = f.share();
<< boolalpha << f.valid() << endl;//false
cout
([&] {
thread t1<string> local = sf;
shared_future.get(); cout << "f ok" << endl;
local});
([&] {
thread t2<string> local = sf;
shared_future.get(); cout << "f ok" << endl;
local});
.join();
t1.join();
t2//f ok
//f ok
return 0;
}
限时有两种,一种是限制事件段另一种为限制时刻,前者通常以_for结尾,后者为__until作为后缀。
例如std::condition__variable的wait_for与wait__until
#include<thread>
#include<iostream>
#include<condition_variable>
#include<mutex>
#include<chrono>
using namespace std;
m_mutex;
mutex ;
condition_variable conditionint info=0;
int main() {
([&] {
thread t1<mutex> lk(m_mutex);
unique_lock//wait_for阻塞当前线程,直到条件变量被唤醒,或到指定时限时长后
//wait_unti阻塞当前线程,直到条件变量被唤醒,或直到抵达指定时间点
.wait_for(lk, chrono::duration(chrono::milliseconds(7000)), [&]()->bool {
conditionreturn info!=0;
});
<< "t1 wake up " << info << endl;//t1 wake up 0
cout });
.join();
t1//若超过7s时condition没有被notify,则t1尝试超时重新获取lk
return 0;
}
#include<iostream>
#include<chrono>
using namespace std;
//时钟就是时间信息的来源 时钟是一个类
//当前时间 时间类型 时钟节拍 稳定时钟
int main() {
//system_clock是可调的是不稳定时钟
::system_clock::time_point now= chrono::system_clock::now();
chrono<< now << endl;//2022-10-18 16:05:28.6659077
cout std::time_t time2 = chrono::system_clock::to_time_t(now);
<< time2 << endl;//1666111276
cout
//稳定时钟 steady_clock
::steady_clock::time_point time1= chrono::steady_clock::now();
chrono<< time1.time_since_epoch() << endl;//55111350630200ns
cout //clock 的纪元间的时间量的 duration
//high_resolution_clock最短滴答周期
::high_resolution_clock::time_point time3= chrono::high_resolution_clock::now();
chrono::high_resolution_clock::time_point time4 = chrono::high_resolution_clock::now();
chrono<< time4 - time3 << endl;//400ns
cout
//是否满足时钟要求C++ 20
bool res1=chrono::is_clock<chrono::system_clock>();
bool res2= chrono::is_clock<chrono::steady_clock>();
bool res3 = chrono::is_clock<chrono::high_resolution_clock>();
<< res1 << " " << res2 << " " << res3 << endl;//1 1 1
cout
//UTC时间 协调世界时间C++20
::utc_clock::time_point time5= chrono::utc_clock::now();
chrono::system_clock::time_point time6= chrono::utc_clock::to_sys(time5);
chrono//还有from_sys成员函数
<< chrono::system_clock::to_time_t(time6) << endl;//1666110621
cout
//tai_clock国际原子钟C++20
::tai_clock::to_utc(chrono::tai_clock::now());
chrono//chrono::tai_clock::from_utc(utc_time);
//gps时间时钟C++20
::gps_clock::to_utc(chrono::gps_clock::now());
chrono//chrono::gps_clock::from_utc(utc_time);
//用于文件时间的时钟C++20
::file_clock::now();
chrono//chrono::file_clock::from_utc();
//chrono::file_clock::to_utc();
return 0;
}
每种时钟都有自己的duration类型
#include<iostream>
#include<chrono>
using namespace std;
int main() {
::duration<short, std::ratio<60, 1>>;//分钟计 60秒1分钟
chrono::duration<double, std::ratio<1, 1000>>;//毫秒计 1秒1000毫秒
chrono
using namespace std::chrono_literals;//使用时间单位后缀
std::chrono::hours one_day = 24h;
std::chrono::minutes half_an_hour = 30min;
std::chrono::milliseconds m_time = 10ms;
std::chrono::nanoseconds n_time = std::chrono::nanoseconds(10);//10ns
<< n_time << endl;//10ns
cout
//转换
auto one_day_seconds=std::chrono::duration_cast<std::chrono::seconds>(one_day);
<< one_day_seconds << endl;//86400s
cout
//做差计算
auto sub=one_day - half_an_hour;
<< sub << endl;//1410min
cout << sub.count() << endl;//1410
cout
return 0;
}
上面有用到为future的wait_for传递时间段
#include<iostream>
#include<chrono>
#include<mutex>
#include<future>
#include<string>
using namespace std;
() {
string task::sleep_for(chrono::seconds(10));
this_threadreturn "hello";
}
int main() {
m_mutex;
mutex <string> res= async(task);
future//std::future_status::timeout 超时返回
//std::future_status::ready 状态已经改变
//std::future_status::deferred 任务延迟了
if (res.wait_for(chrono::seconds(5)) == std::future_status::ready) {
<< "没有超出时间" << endl;
cout }
else {
<< "超出时间" << endl;
cout }
//输出超出时间 wait_for返回了 timeout
.wait();
resreturn 0;
}
可见已经可以限制异步任务在一定时间范围内执行了,根据超时然后做出其他操作等
在上面的时钟里,可以看到有time_point数据类型
UNIX的时间戳表示1970年1月1日00:00,time_point的time_since_epoch表示指定时间点至UNIX时间戳的时间间隔
#include<iostream>
#include<chrono>
using namespace std;
int main() {
//chrono::high_resolution_clock::time_point
auto start = chrono::high_resolution_clock::now();
for(int i=0;i<10;i++){}
auto end = chrono::high_resolution_clock::now();
::duration time = end - start;
chrono<< time.count() << endl;//600滴答
cout return 0;
}
在条件等待中使用超时时间
#include<iostream>
#include<chrono>
#include<mutex>
#include<thread>
using namespace std;
int main() {
;
condition_variable cvm_mutex;
mutex bool done=false;
//time_point
auto const timeout = chrono::steady_clock::now() + chrono::milliseconds(2000);
<mutex> lk(m_mutex);
unique_lockwhile (!done) {
if (cv.wait_until(lk,timeout) == cv_status::timeout) {
<< "timeout" << endl;
cout break;
}
}
//输出timeout
return 0;
}
#include<iostream>
#include<thread>
#include<mutex>
#include<future>
using namespace std;
int main1() {
//std::this_thread::sleep_for();
//std::this_thread::sleep_until();
//condition_variable cv;//与condition_variable_any
//cv.wait_until(lock,time_point);
//cv.wait_for(lock,duration);
//返回std::cv_status::time_out或std::cv_status::no_timeout
//wait_for(lock, duration,predicate)
//wait_until(lock, duration,predicate)
//bool —— 当唤醒时,返回谓词的结果
;
timed_mutex tm//tm.try_lock_for(duration);
//tm.try_lock_until(point);//获取到锁返回true否则返回false
//unique_lock<timed_mutex> lk(tm, time);//time可为time_point或者duration
//lk.unlock();
//lk.try_lock_for();
//lk.try_lock_until();
//同时对future和shared_future支持
//future<void> res = async([]()->void {});
//res.wait_for();//当等待超时,返回std::future_status::timeout
//res.wait_until();
//当期望值准备就绪时,返回std::future_status::ready
//当期望值持有一个为启动的延迟函数,返回std::future_status::deferred
return 0;
}
非并发版本
#include <list>
#include <algorithm>
#include <iostream>
using namespace std;
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
if (input.empty())
{
return input;
}
std::list<T> result;
.splice(result.begin(), input, input.begin());
resultconst& pivot = *result.begin();//基准元素
T auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) {return t < pivot; });//使用划分函数
std::list<T> lower_part;
.splice(lower_part.end(), input, input.begin(), divide_point);//左边部分裁剪到lower_part
lower_part
auto new_lower(sequential_quick_sort(std::move(lower_part)));//递归左边且拷贝
auto new_higher(sequential_quick_sort(std::move(input)));//递归右边且拷贝
.splice(result.end(), new_higher);//右边部分加入到result
result.splice(result.begin(), new_lower);//左边的部分加入到前面
resultreturn result;//返回结果
}
int main2() {
<int> m_list = { 23,43,12,4,4,6,7,34,6,75 };
list<int> res = sequential_quick_sort<int>(m_list);
listfor (auto const& item : res) {
<< item << " ";
cout }
<< endl;
cout //4 4 6 6 7 12 23 34 43 75
return 0;
}
并发版本
#include <list>
#include <algorithm>
#include <future>
#include<iostream>
using namespace std;
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if (input.empty())
{
return input;
}
std::list<T> result;
.splice(result.begin(), input, input.begin());
resultconst& pivot = *result.begin();
T //划分
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) {return t < pivot; });
std::list<T> lower_part;
.splice(lower_part.end(), input, input.begin(),
lower_part);
divide_point//开启异步任务对做部分快速排序
std::future<std::list<T> > new_lower(
std::async(¶llel_quick_sort<T>, std::move(lower_part)));
//主线程对右边排序
auto new_higher(
(std::move(input)));
parallel_quick_sort.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());//future.get
resultreturn result;
}
int main3() {
<int> m_list = { 23,43,12,4,4,6,7,34,6,75 };
list<int> res = parallel_quick_sort<int>(m_list);
listfor (auto const& item : res) {
<< item << " ";
cout }
<< endl;
cout //4 4 6 6 7 12 23 34 43 75
return 0;
}
#include <future>
#include <iostream>
#include <thread>
#include <type_traits>
using namespace std;
template<typename F, typename A>
std::future<std::result_of<F(A&&)>::type>
(F&& f, A&& a)
spawn_task{
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type(A&&)> task(std::move(f));//pack
std::future<result_type> res(task.get_future());
std::thread t(std::move(task), std::move(a));//run package
.detach();//线程分离
treturn res;//return future
}
void task(int a) {
<< a << endl;
cout }
int main4() {
<void> res = spawn_task(task, a);
future.get();
resreturn 0;
}
目前仍在实验阶段,编译器的支持不一致,暂时不学习
https://zh.cppreference.com/w/cpp/experimental/concurrency
(并发 TS)
futurestd::future 版本
以持续和其他特性增强的 (类模板)
(并发 TS)
shared_future std::shared_future 版本
以持续和其他特性增强的 (类模板)
(并发 TS)使用 std::experimental::future 的 std::promise 修改版本
promise(类模板)
(并发 TS)
packaged_taskstd::experimental::future 的 std::packaged_task 修改版本
使用 (类模板)
(并发 TS)
when_all
产生在所有给定 furure 或 shared_future 均就绪时成为就绪的 future(函数模板)
(并发 TS)
when_any
产生在至少一个给定 future 或 shared_future 就绪时成为就绪的 future(函数模板)
(并发 TS)
make_ready_future
产生立即就绪并保有指定值的 future(函数模板)
(并发 TS)
make_exceptional_future
产生立即就绪并保有给定异常的 future(函数模板)
定义于头文件 <experimental/latch>
latch(并发 TS)
单次使用的线程屏障 (类)
定义于头文件 <experimental/barrier>
barrier(并发 TS)
可复用线程屏障(类)
flex_barrier(并发 TS)
带有在完成时的可定制行为的可复用线程屏障 (类)
这些类模板替换 shared_ptr 的原子函数重载
定义于头文件 <experimental/atomic>
atomic_shared_ptr(并发 TS)
std::shared_ptr 的原子版本
(类模板)
atomic_weak_ptr
(并发 TS)
std::weak_ptr 的原子版本 (类模板)