|
gr的多线程同步控制boost::barrier同步点(主线程控制所有子线程的同步)
CMakeLists.txt的配置:
- find_package(Boost)
- # Found Boost: /usr/local/include (found version "1.69.0")
- target_link_libraries(main -lboost_system -lboost_thread -lpthread)
复制代码
- #include <iostream>
- #include <list>
- #include <string>
- #include <unistd.h>
- #include <boost/shared_ptr.hpp>
- #include <boost/make_shared.hpp>
- #include <boost/function.hpp>
- #include <boost/utility.hpp>
- #include <boost/thread/barrier.hpp>
- #include <boost/thread/condition_variable.hpp>
- #include <boost/thread/locks.hpp>
- #include <boost/thread/mutex.hpp>
- #include <boost/thread/thread.hpp>
- #include <boost/thread/shared_mutex.hpp>
- typedef boost::thread thread;
- typedef boost::mutex mutex;
- typedef boost::unique_lock<boost::mutex> scoped_lock;
- typedef boost::condition_variable condition_variable;
- typedef boost::barrier barrier;
- // boost::barrier是多线程的同步点,构造函数会要求传递一个数字,表示需要同步多少个线程
- typedef boost::shared_ptr<barrier> barrier_sptr;
- class thread_group : public boost::noncopyable
- {
- public:
- thread_group(){};
- ~thread_group()
- {
- for (std::list<thread *>::iterator it = m_threads.begin();
- it != m_threads.end();
- ++it)
- {
- delete (*it);
- }
- };
- thread *create_thread(const boost::function0<void> &threadfunc)
- {
- // 是不是这里只能用new的 堆空间的地址
- std::unique_ptr<thread> thrd(new thread(threadfunc));
- // thrd是智能指针, thrd.get()是裸指针,
- add_thread(thrd.get());
- // 没明白为什么要 release()
- // 如果使用智能指针而没有调用Release () ,它会自动释放,不会资源泄漏
- // release只是切断了联系, release返回的指针通常用来初始化另一个智能指针,或给智能指针赋值
- return thrd.release();
- };
- void add_thread(thread *thrd)
- {
- // 为什么要锁: 同时来创建线程? add_thread
- boost::lock_guard<boost::shared_mutex> guard(m_mutex);
- // For now we'll simply ignore requests to add a thread object
- // multiple times. Should we consider this an error and either
- // throw or return an error value?
- std::list<thread *>::iterator it =
- std::find(m_threads.begin(), m_threads.end(), thrd);
- BOOST_ASSERT(it == m_threads.end());
- if (it == m_threads.end())
- m_threads.push_back(thrd);
- };
- void remove_thread(thread *thrd)
- {
- boost::lock_guard<boost::shared_mutex> guard(m_mutex);
- // For now we'll simply ignore requests to remove a thread
- // object that's not in the group. Should we consider this an
- // error and either throw or return an error value?
- std::list<thread *>::iterator it =
- std::find(m_threads.begin(), m_threads.end(), thrd);
- BOOST_ASSERT(it != m_threads.end());
- if (it != m_threads.end())
- m_threads.erase(it);
- };
- void join_all()
- {
- boost::shared_lock<boost::shared_mutex> guard(m_mutex);
- for (std::list<thread *>::iterator it = m_threads.begin();
- it != m_threads.end();
- ++it)
- {
- (*it)->join();
- }
- };
- void interrupt_all()
- {
- boost::shared_lock<boost::shared_mutex> guard(m_mutex);
- for (std::list<thread *>::iterator it = m_threads.begin(),
- end = m_threads.end();
- it != end;
- ++it)
- {
- // interrupt 中断
- (*it)->interrupt();
- }
- };
- size_t size() const
- {
- boost::shared_lock<boost::shared_mutex> guard(m_mutex);
- return m_threads.size();
- };
- private:
- std::list<thread *> m_threads;
- mutable boost::shared_mutex m_mutex;
- };
- template <class F>
- class thread_body_wrapper
- {
- private:
- F d_f;
- std::string d_name;
- public:
- explicit thread_body_wrapper(F f, const std::string& name = "") : d_f(f), d_name(name)
- {
- printf("第二步:thread_body_wrapper的构造, name=%s\n", d_name.c_str());
- }
- void operator()()
- {
- // d_f();
- printf("第三步:thread_body_wrapper的()操作符运算, name=%s\n", d_name.c_str());
- try {
- d_f();
- } catch (boost::thread_interrupted const&) {
- std::cerr << "thread[" << d_name << "]: " << "中断了:interrupted" << std::endl;
- } catch (std::exception const& e) {
- std::cerr << "thread[" << d_name << "]: " << e.what() << std::endl;
- } catch (...) {
- std::cerr << "thread[" << d_name << "]: "
- << "caught unrecognized exception\n";
- }
- }
- };
- class tpb_thread_body
- {
- public:
- tpb_thread_body(std::string name,
- barrier_sptr start_sync){
- printf("第五步:tpb_thread_body的构造(里面有死循环,子线程开始等待wait()了),name=%s\n", name.c_str());
- start_sync->wait(); // 所有的线程都到这 才一起走 同步的核心
- while(1){
- // std::cout << "运行:" + name << std::endl;
- printf("线程体运行:%s\n", name.c_str());
- sleep(1);
- // std::this_thread::sleep_for(chrono::milliseconds(1000));
- }
- };
- ~tpb_thread_body(){};
- };
- class tpb_container
- {
- std::string d_name;
- barrier_sptr d_start_sync;
- public:
- tpb_container(std::string name,
- barrier_sptr start_sync)
- : d_name(name), d_start_sync(start_sync)
- {
- printf("第一步:tpb_container的构造,name=%s\n", d_name.c_str());
- }
- void operator()()
- {
- printf("第四步:tpb_container的()操作符运算,name=%s\n", d_name.c_str());
- tpb_thread_body body(d_name, d_start_sync);
- }
- };
- int main()
- {
- // gr的多线程同步控制boost::barrier同步点(主线程控制所有子线程的同步)
- int thread_num = 3;
- std::string thread_name;
- // 为什么要 子线程数+1 是主线程吗? 是的
- barrier_sptr start_sync =
- boost::make_shared<barrier>(thread_num + 1);
- thread_group d_threads; // 线程池
- for (int i = 0; i < thread_num; ++i)
- {
- thread_name = "thread_name=" + std::to_string(i);
- d_threads.create_thread(thread_body_wrapper<tpb_container>(
- tpb_container("tpb_container:"+thread_name, start_sync), "thread_body_wrapper:"+thread_name));
- }
- printf("主线程开始等待了wait()\n");
- start_sync->wait(); // 主线程的 只有到这 所有子线程才会都开始跑起来
- // d_threads.join_all(); // 所有子线程 阻塞主线程
- sleep(3);
- d_threads.interrupt_all();
- printf("主线程指令: 所有的子线程中断了, 没起作用?\n");
- sleep(5);
- return 0;
- }
复制代码
|
|