东方耀AI技术分享

 找回密码
 立即注册

QQ登录

只需一步,快速开始

搜索
热搜: 活动 交友 discuz
查看: 1254|回复: 1
打印 上一主题 下一主题

[C/C++] gr的多线程同步控制boost::barrier同步点(主线程控制所有子线...

[复制链接]

1365

主题

1856

帖子

1万

积分

管理员

Rank: 10Rank: 10Rank: 10

积分
14446
QQ
跳转到指定楼层
楼主
发表于 2022-8-30 10:21:45 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式





gr的多线程同步控制boost::barrier同步点(主线程控制所有子线程的同步)




CMakeLists.txt的配置:
  1. find_package(Boost)
  2. #  Found Boost: /usr/local/include (found version "1.69.0")
  3. target_link_libraries(main -lboost_system -lboost_thread -lpthread)
复制代码


  1. #include <iostream>
  2. #include <list>
  3. #include <string>
  4. #include <unistd.h>

  5. #include <boost/shared_ptr.hpp>
  6. #include <boost/make_shared.hpp>
  7. #include <boost/function.hpp>
  8. #include <boost/utility.hpp>

  9. #include <boost/thread/barrier.hpp>
  10. #include <boost/thread/condition_variable.hpp>
  11. #include <boost/thread/locks.hpp>
  12. #include <boost/thread/mutex.hpp>
  13. #include <boost/thread/thread.hpp>
  14. #include <boost/thread/shared_mutex.hpp>

  15. typedef boost::thread thread;
  16. typedef boost::mutex mutex;
  17. typedef boost::unique_lock<boost::mutex> scoped_lock;
  18. typedef boost::condition_variable condition_variable;
  19. typedef boost::barrier barrier;
  20. // boost::barrier是多线程的同步点,构造函数会要求传递一个数字,表示需要同步多少个线程
  21. typedef boost::shared_ptr<barrier> barrier_sptr;

  22. class thread_group : public boost::noncopyable
  23. {
  24. public:
  25.     thread_group(){};
  26.     ~thread_group()
  27.     {
  28.         for (std::list<thread *>::iterator it = m_threads.begin();
  29.              it != m_threads.end();
  30.              ++it)
  31.         {
  32.             delete (*it);
  33.         }
  34.     };

  35.     thread *create_thread(const boost::function0<void> &threadfunc)
  36.     {
  37.         // 是不是这里只能用new的 堆空间的地址
  38.         std::unique_ptr<thread> thrd(new thread(threadfunc));
  39.         // thrd是智能指针, thrd.get()是裸指针,
  40.         add_thread(thrd.get());
  41.         // 没明白为什么要 release()
  42.         // 如果使用智能指针而没有调用Release () ,它会自动释放,不会资源泄漏
  43.         // release只是切断了联系, release返回的指针通常用来初始化另一个智能指针,或给智能指针赋值
  44.         return thrd.release();
  45.     };
  46.     void add_thread(thread *thrd)
  47.     {
  48.         // 为什么要锁: 同时来创建线程? add_thread
  49.         boost::lock_guard<boost::shared_mutex> guard(m_mutex);

  50.         // For now we'll simply ignore requests to add a thread object
  51.         // multiple times. Should we consider this an error and either
  52.         // throw or return an error value?
  53.         std::list<thread *>::iterator it =
  54.             std::find(m_threads.begin(), m_threads.end(), thrd);
  55.         BOOST_ASSERT(it == m_threads.end());
  56.         if (it == m_threads.end())
  57.             m_threads.push_back(thrd);
  58.     };
  59.     void remove_thread(thread *thrd)
  60.     {
  61.         boost::lock_guard<boost::shared_mutex> guard(m_mutex);

  62.         // For now we'll simply ignore requests to remove a thread
  63.         // object that's not in the group. Should we consider this an
  64.         // error and either throw or return an error value?
  65.         std::list<thread *>::iterator it =
  66.             std::find(m_threads.begin(), m_threads.end(), thrd);
  67.         BOOST_ASSERT(it != m_threads.end());
  68.         if (it != m_threads.end())
  69.             m_threads.erase(it);
  70.     };
  71.     void join_all()
  72.     {
  73.         boost::shared_lock<boost::shared_mutex> guard(m_mutex);
  74.         for (std::list<thread *>::iterator it = m_threads.begin();
  75.              it != m_threads.end();
  76.              ++it)
  77.         {
  78.             (*it)->join();
  79.         }
  80.     };
  81.     void interrupt_all()
  82.     {
  83.         boost::shared_lock<boost::shared_mutex> guard(m_mutex);
  84.         for (std::list<thread *>::iterator it = m_threads.begin(),
  85.                                                   end = m_threads.end();
  86.              it != end;
  87.              ++it)
  88.         {
  89.             // interrupt 中断
  90.             (*it)->interrupt();
  91.         }
  92.     };
  93.     size_t size() const
  94.     {
  95.         boost::shared_lock<boost::shared_mutex> guard(m_mutex);
  96.         return m_threads.size();
  97.     };

  98. private:
  99.     std::list<thread *> m_threads;
  100.     mutable boost::shared_mutex m_mutex;
  101. };



  102. template <class F>
  103. class thread_body_wrapper
  104. {
  105. private:
  106.     F d_f;
  107.     std::string d_name;

  108. public:
  109.     explicit thread_body_wrapper(F f, const std::string& name = "") : d_f(f), d_name(name)
  110.     {
  111.         printf("第二步:thread_body_wrapper的构造, name=%s\n", d_name.c_str());
  112.     }

  113.     void operator()()
  114.     {

  115.         // d_f();

  116.         printf("第三步:thread_body_wrapper的()操作符运算, name=%s\n", d_name.c_str());

  117.         try {
  118.             d_f();
  119.         } catch (boost::thread_interrupted const&) {
  120.             std::cerr << "thread[" << d_name << "]: " << "中断了:interrupted" << std::endl;
  121.         } catch (std::exception const& e) {
  122.             std::cerr << "thread[" << d_name << "]: " << e.what() << std::endl;
  123.         } catch (...) {
  124.             std::cerr << "thread[" << d_name << "]: "
  125.                       << "caught unrecognized exception\n";
  126.         }
  127.     }
  128. };



  129. class tpb_thread_body
  130. {

  131. public:
  132.     tpb_thread_body(std::string name,
  133.                     barrier_sptr start_sync){

  134.                         printf("第五步:tpb_thread_body的构造(里面有死循环,子线程开始等待wait()了),name=%s\n", name.c_str());
  135.                         start_sync->wait();  // 所有的线程都到这 才一起走 同步的核心

  136.                         while(1){
  137.                             // std::cout << "运行:" + name << std::endl;
  138.                             printf("线程体运行:%s\n", name.c_str());
  139.                             sleep(1);
  140.                             // std::this_thread::sleep_for(chrono::milliseconds(1000));
  141.                         }
  142.                     };
  143.     ~tpb_thread_body(){};
  144. };




  145. class tpb_container
  146. {
  147.     std::string d_name;
  148.     barrier_sptr d_start_sync;

  149. public:
  150.     tpb_container(std::string name,
  151.                   barrier_sptr start_sync)
  152.         : d_name(name), d_start_sync(start_sync)
  153.     {
  154.         printf("第一步:tpb_container的构造,name=%s\n", d_name.c_str());
  155.     }

  156.     void operator()()
  157.     {
  158.         printf("第四步:tpb_container的()操作符运算,name=%s\n", d_name.c_str());
  159.         tpb_thread_body body(d_name, d_start_sync);
  160.     }
  161. };



  162. int main()
  163. {
  164.     // gr的多线程同步控制boost::barrier同步点(主线程控制所有子线程的同步)
  165.     int thread_num = 3;
  166.     std::string thread_name;

  167.     // 为什么要 子线程数+1 是主线程吗? 是的
  168.     barrier_sptr start_sync =
  169.         boost::make_shared<barrier>(thread_num + 1);

  170.     thread_group d_threads;  // 线程池

  171.     for (int i = 0; i < thread_num; ++i)
  172.     {
  173.         thread_name = "thread_name=" + std::to_string(i);
  174.         d_threads.create_thread(thread_body_wrapper<tpb_container>(
  175.             tpb_container("tpb_container:"+thread_name, start_sync), "thread_body_wrapper:"+thread_name));
  176.     }
  177.     printf("主线程开始等待了wait()\n");
  178.     start_sync->wait();   // 主线程的 只有到这 所有子线程才会都开始跑起来

  179.     // d_threads.join_all();  // 所有子线程 阻塞主线程

  180.     sleep(3);

  181.     d_threads.interrupt_all();
  182.     printf("主线程指令: 所有的子线程中断了, 没起作用?\n");

  183.     sleep(5);

  184.     return 0;
  185. }
复制代码






让天下人人学会人工智能!人工智能的前景一片大好!
回复

使用道具 举报

1365

主题

1856

帖子

1万

积分

管理员

Rank: 10Rank: 10Rank: 10

积分
14446
QQ
沙发
 楼主| 发表于 2022-8-30 10:30:33 | 只看该作者
boost::barrier这个类,非常简单的实现,简单说就是:计数等待
让天下人人学会人工智能!人工智能的前景一片大好!
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

QQ|Archiver|手机版|小黑屋|人工智能工程师的摇篮 ( 湘ICP备2020019608号-1 )

GMT+8, 2024-6-2 02:12 , Processed in 0.170820 second(s), 18 queries .

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表