求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Modeler   Code  
会员   
要资料
 
追随技术信仰

随时听讲座
每天看新闻
 
 
C++并发编程(中文版)
前言
第1章 你好,C++的并发世界!
1.1 何谓并发
1.2 为什么使用并发?
1.3 C++中的并发和多线程
1.4 开始入门
1.5 本章总结
第2章 线程管理
2.1 线程管理的基础
2.2 向线程函数传递参数
2.3 转移线程所有权
2.4 运行时决定线程数量
2.5 识别线程
2.6 本章总结
第3章 线程间共享数据
3.1 共享数据带来的问题
3.2 使用互斥量保护共享数据
3.3 保护共享数据的替代设施
3.4 本章总结
第4章 同步并发操作
4.1 等待一个事件或其他条件
4.2 使用期望等待一次性事件
4.3 限定等待时间
4.4 使用同步操作简化代码
4.5 本章总结
第5章 C++内存模型和原子类型操作
5.1 内存模型基础
5.2 C++中的原子操作和原子类型
5.3 同步操作和强制排序
5.4 本章总结
第6章 基于锁的并发数据结构设计
6.1 为并发设计的意义何在?
6.2 基于锁的并发数据结构
6.3 基于锁设计更加复杂的数据结构
6.4 本章总结
第7章 无锁并发数据结构设计
7.1 定义和意义
7.2 无锁数据结构的例子
7.3 对于设计无锁数据结构的指导建议
7.4 本章总结
第8章 并发代码设计
8.1 线程间划分工作的技术
8.2 影响并发代码性能的因素
8.3 为多线程性能设计数据结构
8.4 设计并发代码的注意事项
8.5 在实践中设计并发代码
 

 
目录
在实践中设计并发代码
作者:Anthony Williams  译者:陈晓伟
7 次浏览
1次  

8.5 在实践中设计并发代码

  • 8.5.1 并行实现:std::for_each

  • 8.5.2 并行实现:std::find

  • 8.5.3 并行实现:std::partial_sum

8.5 在实践中设计并发代码

当为一个特殊的任务设计并发代码时,需要根据任务本身来考虑之前所提到的问题。为了展示以上的注意事项是如何应用的,我们将看一下在C++标准库中三个标准函数的并行实现。当你遇到问题时,这里的例子可以作为很好的参照。在有较大的并发任务进行辅助下,我们也将实现一些函数。

我主要演示这些实现使用的技术,不过可能这些技术并不是最先进的;更多优秀的实现可以更好的利用硬件并发,不过这些实现可能需要到与并行算法相关的学术文献,或者是多线程的专家库中(比如:Inter的TBB[4])才能看到。

并行版的std::for_each可以看作为能最直观体现并行概念,就让我们从并行版的std::for_each开始吧!

8.5.1 并行实现:std::for_each

std::for_each的原理很简单:其对某个范围中的元素,依次调用用户提供的函数。并行和串行调用的最大区别就是函数的调用顺序。std::for_each是对范围中的第一个元素调用用户函数,接着是第二个,以此类推,而在并行实现中对于每个元素的处理顺序就不能保证了,并且它们可能(我们希望如此)被并发的处理。

为了实现这个函数的并行版本,需要对每个线程上处理的元素进行划分。你事先知道元素数量,所以可以处理前对数据进行划分(详见8.1.1节)。假设只有并行任务运行,就可以使用std::thread::hardware_concurrency()来决定线程的数量。同样,这些元素都能被独立的处理,所以可以使用连续的数据块来避免伪共享(详见8.2.3节)。

这里的算法有点类似于并行版的std::accumulate(详见8.4.1节),不过比起计算每一个元素的加和,这里对每个元素仅仅使用了一个指定功能的函数。因为不需要返回结果,可以假设这可能会对简化代码,不过想要将异常传递给调用者,就需要使用std::packaged_task和std::future机制对线程中的异常进行转移。这里展示一个样本实现。

清单8.7 并行版std::for_each

  1. template<typename Iterator,typename Func>
  2. void parallel_for_each(Iterator first,Iterator last,Func f)
  3. {
  4. unsigned long const length=std::distance(first,last);
  5. if(!length)
  6. return;
  7. unsigned long const min_per_thread=25;
  8. unsigned long const max_threads=
  9. (length+min_per_thread-1)/min_per_thread;
  10. unsigned long const hardware_threads=
  11. std::thread::hardware_concurrency();
  12. unsigned long const num_threads=
  13. std::min(hardware_threads!=0?hardware_threads:2,max_threads);
  14. unsigned long const block_size=length/num_threads;
  15. std::vector<std::future<void> > futures(num_threads-1); // 1
  16. std::vector<std::thread> threads(num_threads-1);
  17. join_threads joiner(threads);
  18. Iterator block_start=first;
  19. for(unsigned long i=0;i<(num_threads-1);++i)
  20. {
  21. Iterator block_end=block_start;
  22. std::advance(block_end,block_size);
  23. std::packaged_task<void(void)> task( // 2
  24. [=]()
  25. {
  26. std::for_each(block_start,block_end,f);
  27. });
  28. futures[i]=task.get_future();
  29. threads[i]=std::thread(std::move(task)); // 3
  30. block_start=block_end;
  31. }
  32. std::for_each(block_start,last,f);
  33. for(unsigned long i=0;i<(num_threads-1);++i)
  34. {
  35. futures[i].get(); // 4
  36. }
  37. }

 

代码结构与清单8.4的差不多。最重要的不同在于futures向量对std::future<void>类型①变量进行存储,因为工作线程不会返回值,并且简单的lambda函数会对block_start到block_end上的任务②执行f函数。这是为了避免传入线程的构造函数③。当工作线程不需要返回一个值时,调用futures[i].get()④只是提供检索工作线程异常的方法;如果不想把异常传递出去,就可以省略这一步。

实现并行std::accumulate的时候,使用std::async会简化代码;同样,parallel_for_each也可以使用std::async。实现如下所示。

清单8.8 使用std::async实现std::for_each

  1. template<typename Iterator,typename Func>
  2. void parallel_for_each(Iterator first,Iterator last,Func f)
  3. {
  4. unsigned long const length=std::distance(first,last);
  5. if(!length)
  6. return;
  7. unsigned long const min_per_thread=25;
  8. if(length<(2*min_per_thread))
  9. {
  10. std::for_each(first,last,f); // 1
  11. }
  12. else
  13. {
  14. Iterator const mid_point=first+length/2;
  15. std::future<void> first_half= // 2
  16. std::async(&parallel_for_each<Iterator,Func>,
  17. first,mid_point,f);
  18. parallel_for_each(mid_point,last,f); // 3
  19. first_half.get(); // 4
  20. }
  21. }

 

和基于std::async的parallel_accumulate(清单8.5)一样,是在运行时对数据进行迭代划分的,而非在执行前划分好,这是因为你不知道你的库需要使用多少个线程。像之前一样,当你将每一级的数据分成两部分,异步执行另外一部分②,剩下的部分就不能再进行划分了,所以直接运行这一部分③;这样就可以直接对std::for_each①进行使用了。这里再次使用std::async和std::future的get()成员函数④来提供对异常的传播。

回到算法,函数需要对每一个元素执行同样的操作(这样的操作有很多种,初学者可能会想到std::count和std::replace),一个稍微复杂一些的例子就是使用std::find。

8.5.2 并行实现:std::find

接下来是std::find算法,因为这是一种不需要对数据元素做任何处理的算法。比如,当第一个元素就满足查找标准,那就没有必要对其他元素进行搜索了。将会看到,算法属性对于性能具有很大的影响,并且对并行实现的设计有着直接的影响。这个算法是一个很特别的例子,数据访问模式都会对代码的设计产生影响(详见8.3.2节)。该类中的另一些算法包括std::equal和std::any_of。

当你和妻子或者搭档,在一个纪念盒中找寻一张老照片,当找到这张照片时,就不会再看另外的照片了。不过,你得让其他人知道你已经找到照片了(比如,大喊一声“找到了!”),这样其他人就会停止搜索了。很多算法的特性就是要对每一个元素进行处理,所以它们没有办法像std::find一样,一旦找到合适数据就停止执行。因此,你需要设计代码对其进行使用——当得到想要的答案就中断其他任务的执行,所以不能等待线程处理对剩下的元素进行处理。

如果不中断其他线程,那么串行版本的性能可能会超越并行版,因为串行算法可以在找到匹配元素的时候,停止搜索并返回。如果系统能支持四个并发线程,那么每个线程就可以对总数据量的1/4进行检查,并且在我们的实现只需要单核完成的1/4的时间,就能完成对所有元素的查找。如果匹配的元素在第一个1/4块中,串行算法将会返回第一个,因为算法不需要对剩下的元素进行处理了。

一种办法,中断其他线程的一个办法就是使用一个原子变量作为一个标识,在处理过每一个元素后就对这个标识进行检查。如果标识被设置,那么就有线程找到了匹配元素,所以算法就可以停止并返回了。用这种方式来中断线程,就可以将那些没有处理的数据保持原样,并且在更多的情况下,相较于串行方式,性能能提升很多。缺点就是,加载原子变量是一个很慢的操作,会阻碍每个线程的运行。

如何返回值和传播异常呢?现在你有两个选择。你可以使用一个future数组,使用std::packaged_task来转移值和异常,在主线程上对返回值和异常进行处理;或者使用std::promise对工作线程上的最终结果直接进行设置。这完全依赖于你想怎么样处理工作线程上的异常。如果想停止第一个异常(即使还没有对所有元素进行处理),就可以使用std::promise对异常和最终值进行设置。另外,如果想要让其他工作线程继续查找,可以使用std::packaged_task来存储所有的异常,当线程没有找到匹配元素时,异常将再次抛出。

这种情况下,我会选择std::promise,因为其行为和std::find更为接近。这里需要注意一下搜索的元素是不是在提供的搜索范围内。因此,在所有线程结束前,获取future上的结果。如果被future阻塞住,所要查找的值不在范围内,就会持续的等待下去。实现代码如下。

清单8.9 并行find算法实现

  1. template<typename Iterator,typename MatchType>
  2. Iterator parallel_find(Iterator first,Iterator last,MatchType match)
  3. {
  4. struct find_element // 1
  5. {
  6. void operator()(Iterator begin,Iterator end,
  7. MatchType match,
  8. std::promise<Iterator>* result,
  9. std::atomic<bool>* done_flag)
  10. {
  11. try
  12. {
  13. for(;(begin!=end) && !done_flag->load();++begin) // 2
  14. {
  15. if(*begin==match)
  16. {
  17. result->set_value(begin); // 3
  18. done_flag->store(true); // 4
  19. return;
  20. }
  21. }
  22. }
  23. catch(...) // 5
  24. {
  25. try
  26. {
  27. result->set_exception(std::current_exception()); // 6
  28. done_flag->store(true);
  29. }
  30. catch(...) // 7
  31. {}
  32. }
  33. }
  34. };
  35. unsigned long const length=std::distance(first,last);
  36. if(!length)
  37. return last;
  38. unsigned long const min_per_thread=25;
  39. unsigned long const max_threads=
  40. (length+min_per_thread-1)/min_per_thread;
  41. unsigned long const hardware_threads=
  42. std::thread::hardware_concurrency();
  43. unsigned long const num_threads=
  44. std::min(hardware_threads!=0?hardware_threads:2,max_threads);
  45. unsigned long const block_size=length/num_threads;
  46. std::promise<Iterator> result; // 8
  47. std::atomic<bool> done_flag(false); // 9
  48. std::vector<std::thread> threads(num_threads-1);
  49. { // 10
  50. join_threads joiner(threads);
  51. Iterator block_start=first;
  52. for(unsigned long i=0;i<(num_threads-1);++i)
  53. {
  54. Iterator block_end=block_start;
  55. std::advance(block_end,block_size);
  56. threads[i]=std::thread(find_element(), // 11
  57. block_start,block_end,match,
  58. &result,&done_flag);
  59. block_start=block_end;
  60. }
  61. find_element()(block_start,last,match,&result,&done_flag); // 12
  62. }
  63. if(!done_flag.load()) //13
  64. {
  65. return last;
  66. }
  67. return result.get_future().get(); // 14
  68. }

 

清单8.9中的函数主体与之前的例子相似。这次,由find_element类①的函数调用操作实现,来完成查找工作的。循环通过在给定数据块中的元素,检查每一步上的标识②。如果匹配的元素被找到,就将最终的结果设置到promise③当中,并且在返回前对done_flag④进行设置。

如果有一个异常被抛出,那么它就会被通用处理代码⑤捕获,并且在promise⑥尝中试存储前,对done_flag进行设置。如果对应promise已经被设置,设置在promise上的值可能会抛出一个异常,所以这里⑦发生的任何异常,都可以捕获并丢弃。

这意味着,当线程调用find_element查询一个值,或者抛出一个异常时,如果其他线程看到done_flag被设置,那么其他线程将会终止。如果多线程同时找到匹配值或抛出异常,它们将会对promise产生竞争。不过,这是良性的条件竞争;因为,成功的竞争者会作为“第一个”返回线程,因此这个结果可以接受。

回到parallel_find函数本身,其拥有用来停止搜索的promise⑧和标识⑨;随着对范围内的元素的查找⑪,promise和标识会传递到新线程中。主线程也使用find_element来对剩下的元素进行查找⑫。像之前提到的,需要在全部线程结束前,对结果进行检查,因为结果可能是任意位置上的匹配元素。这里将“启动-汇入”代码放在一个块中⑩,所以所有线程都会在找到匹配元素时⑬进行汇入。如果找到匹配元素,就可以调用std::future<Iterator>(来自promise⑭)的成员函数get()来获取返回值或异常。

不过,这里假设你会使用硬件上所有可用的的并发线程,或使用其他机制对线程上的任务进行提前划分。就像之前一样,可以使用std::async,以及递归数据划分的方式来简化实现(同时使用C++标准库中提供的自动缩放工具)。使用std::async的parallel_find实现如下所示。

清单8.10 使用std::async实现的并行find算法

  1. template<typename Iterator,typename MatchType> // 1
  2. Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,
  3. std::atomic<bool>& done)
  4. {
  5. try
  6. {
  7. unsigned long const length=std::distance(first,last);
  8. unsigned long const min_per_thread=25; // 2
  9. if(length<(2*min_per_thread)) // 3
  10. {
  11. for(;(first!=last) && !done.load();++first) // 4
  12. {
  13. if(*first==match)
  14. {
  15. done=true; // 5
  16. return first;
  17. }
  18. }
  19. return last; // 6
  20. }
  21. else
  22. {
  23. Iterator const mid_point=first+(length/2); // 7
  24. std::future<Iterator> async_result=
  25. std::async(&parallel_find_impl<Iterator,MatchType>, // 8
  26. mid_point,last,match,std::ref(done));
  27. Iterator const direct_result=
  28. parallel_find_impl(first,mid_point,match,done); // 9
  29. return (direct_result==mid_point)?
  30. async_result.get():direct_result; // 10
  31. }
  32. }
  33. catch(...)
  34. {
  35. done=true; // 11
  36. throw;
  37. }
  38. }
  39. template<typename Iterator,typename MatchType>
  40. Iterator parallel_find(Iterator first,Iterator last,MatchType match)
  41. {
  42. std::atomic<bool> done(false);
  43. return parallel_find_impl(first,last,match,done); // 12
  44. }

如果想要在找到匹配项时结束,就需要在线程之间设置一个标识来表明匹配项已经被找到。因此,需要将这个标识递归的传递。通过函数①的方式来实现是最简单的办法,只需要增加一个参数——一个done标识的引用,这个表示通过程序的主入口点传入⑫。

核心实现和之前的代码一样。通常函数的实现中,会让单个线程处理最少的数据项②;如果数据块大小不足于分成两半,就要让当前线程完成所有的工作了③。实际算法在一个简单的循环当中(给定范围),直到在循环到指定范围中的最后一个,或找到匹配项,并对标识进行设置④。如果找到匹配项,标识done就会在返回前进行设置⑤。无论是因为已经查找到最后一个,还是因为其他线程对done进行了设置,都会停止查找。如果没有找到,会将最后一个元素last进行返回⑥。

如果给定范围可以进行划分,首先要在st::async在对第二部分进行查找⑧前,要找数据中点⑦,而且需要使用std::ref将done以引用的方式传递。同时,可以通过对第一部分直接进行递归查找。两部分都是异步的,并且在原始范围过大时,直接递归查找的部分可能会再细化。

如果直接查找返回的是mid_point,这就意味着没有找到匹配项,所以就要从异步查找中获取结果。如果在另一半中没有匹配项的话,返回的结果就一定是last,这个值的返回就代表了没有找到匹配的元素⑩。如果“异步”调用被延迟(非真正的异步),那么实际上这里会运行get();这种情况下,如果对下半部分的元素搜索成功,那么就不会执行对上半部分元素的搜索了。如果异步查找真实的运行在其他线程上,那么async_result变量的析构函数将会等待该线程完成,所以这里不会有线程泄露。

像之前一样,std::async可以用来提供“异常-安全”和“异常-传播”特性。如果直接递归抛出异常,future的析构函数就能让异步执行的线程提前结束;如果异步调用抛出异常,那么这个异常将会通过对get()成员函数的调用进行传播⑩。使用try/catch块只能捕捉在done发生的异常,并且当有异常抛出⑪时,所有线程都能很快的终止运行。不过,不使用try/catch的实现依旧没问题,不同的就是要等待所有线程的工作是否完成。

实现中一个重要的特性就是,不能保证所有数据都能被std::find串行处理。其他并行算法可以借鉴这个特性,因为要让一个算法并行起来这是必须具有的特性。如果有顺序问题,元素就不能并发的处理了。如果每个元素独立,虽然对于parallel_for_each不是很重要,不过对于parallel_find,即使在开始部分已经找到了匹配元素,也有可能返回范围中最后一个元素;如果在知道结果的前提下,这样的结果会让人很惊讶。

OK,现在你已经使用了并行化的std::find。如在本节开始说的那样,其他相似算法不需要对每一个数据元素进行处理,并且同样的技术可以使用到这些类似的算法上去。我们将在第9章中看到“中断线程”的问题。

为了完成我们的并行“三重奏”,我们将换一个角度来看一下std::partial_sum。对于这个算法,没有太多的文献可参考,不过让这个算法并行起来是一件很有趣的事。

8.5.3 并行实现:std::partial_sum

std::partial_sum会计算给定范围中的每个元素,并用计算后的结果将原始序列中的值替换掉。比如,有一个序列[1,2,3,4,5],在执行该算法后会成为:[1,3(1+2),6(1+2+3),10(1+2+3+4),15(1+2+3+4+5)]。让这样一个算法并行起来会很有趣,因为这里不能讲任务分块,对每一块进行独立的计算。比如,原始序列中的第一个元素需要加到后面的一个元素中去。

确定某个范围部分和的一种的方式,就是在独立块中计算部分和,然后将第一块中最后的元素的值,与下一块中的所有元素进行相加,依次类推。如果有个序列[1,2,3,4,5,6,7,8,9],然后将其分为三块,那么在第一次计算后就能得到[{1,3,6},{4,9,15},{7,15,24}]。然后将6(第一块的最后一个元素)加到第二个块中,那么就得到[{1,3,6},{10,15,21},{7,15,24}]。然后再将第二块的最后一个元素21加到第三块中去,就得到[{1,3,6},{10,15,21},{28,36,55}]。

将原始数据分割成块,加上之前块的部分和就能够并行了。如果每个块中的末尾元素都是第一个被更新的,那么块中其他的元素就能被其他线程所更新,同时另一个线程对下一块进行更新,等等。当处理的元素比处理核心的个数多的时候,这样完成工作没问题,因为每一个核芯在每一个阶段都有合适的数据可以进行处理。

如果有很多的处理器(就是要比处理的元素个数多),那么之前的方式就无法正常工作了。如果还是将工作划分给每个处理器,那么在第一步就没必要去做了。这种情况下,传递结果就意味着让处理器进行等待,这时需要给这些处于等待中的处理器一些工作。所以,可以采用完全不同的方式来处理这个问题。比起将数据块中的最后一个元素的结果向后面的元素块传递,可以对部分结果进行传播:第一次与相邻的元素(距离为1)相加和(和之前一样),之后和距离为2的元素相加,在后来和距离为4的元素相加,以此类推。比如,初始序列为[1,2,3,4,5,6,7,8,9],第一次后为[1,3,5,7,9,11,13,15,17],第二次后为[1,3,6,10,14,18, 22,26,30],下一次就要隔4个元素了。第三次后[1, 3, 6, 10, 15, 21, 28, 36, 44],下一次就要隔8个元素了。第四次后[1, 3, 6, 10, 15, 21, 28, 36, 45],这就是最终的结果。虽然,比起第一种方法多了很多步骤,不过在可并发平台下,这种方法提高了并行的可行性;每个处理器可在每一步中处理一个数据项。

总体来说,当有N个操作时(每步使用一个处理器)第二种方法需要log(N)[底为2]步;在本节中,N就相当于数据链表的长度。比起第一种,每个线程对分配块做N/k个操作,然后在做N/k次结果传递(这里的k是线程的数量)。因此,第一种方法的时间复杂度为O(N),不过第二种方法的时间复杂度为Q(Nlog(N))。当数据量和处理器数量相近时,第二种方法需要每个处理器上log(N)个操作,第一种方法中每个处理器上执行的操作数会随着k的增加而增多,因为需要对结果进行传递。对于处理单元较少的情况,第一种方法会比较合适;对于大规模并行系统,第二种方法比较合适。

不管怎么样,先将效率问题放一边,让我们来看一些代码。下面清单实现的,就是第一种方法。

清单8.11 使用划分的方式来并行的计算部分和

  1. template<typename Iterator>
  2. void parallel_partial_sum(Iterator first,Iterator last)
  3. {
  4. typedef typename Iterator::value_type value_type;
  5. struct process_chunk // 1
  6. {
  7. void operator()(Iterator begin,Iterator last,
  8. std::future<value_type>* previous_end_value,
  9. std::promise<value_type>* end_value)
  10. {
  11. try
  12. {
  13. Iterator end=last;
  14. ++end;
  15. std::partial_sum(begin,end,begin); // 2
  16. if(previous_end_value) // 3
  17. {
  18. value_type& addend=previous_end_value->get(); // 4
  19. *last+=addend; // 5
  20. if(end_value)
  21. {
  22. end_value->set_value(*last); // 6
  23. }
  24. std::for_each(begin,last,[addend](value_type& item) // 7
  25. {
  26. item+=addend;
  27. });
  28. }
  29. else if(end_value)
  30. {
  31. end_value->set_value(*last); // 8
  32. }
  33. }
  34. catch(...) // 9
  35. {
  36. if(end_value)
  37. {
  38. end_value->set_exception(std::current_exception()); // 10
  39. }
  40. else
  41. {
  42. throw; // 11
  43. }
  44. }
  45. }
  46. };
  47. unsigned long const length=std::distance(first,last);
  48. if(!length)
  49. return last;
  50. unsigned long const min_per_thread=25; // 12
  51. unsigned long const max_threads=
  52. (length+min_per_thread-1)/min_per_thread;
  53. unsigned long const hardware_threads=
  54. std::thread::hardware_concurrency();
  55. unsigned long const num_threads=
  56. std::min(hardware_threads!=0?hardware_threads:2,max_threads);
  57. unsigned long const block_size=length/num_threads;
  58. typedef typename Iterator::value_type value_type;
  59. std::vector<std::thread> threads(num_threads-1); // 13
  60. std::vector<std::promise<value_type> >
  61. end_values(num_threads-1); // 14
  62. std::vector<std::future<value_type> >
  63. previous_end_values; // 15
  64. previous_end_values.reserve(num_threads-1); // 16
  65. join_threads joiner(threads);
  66. Iterator block_start=first;
  67. for(unsigned long i=0;i<(num_threads-1);++i)
  68. {
  69. Iterator block_last=block_start;
  70. std::advance(block_last,block_size-1); // 17
  71. threads[i]=std::thread(process_chunk(), // 18
  72. block_start,block_last,
  73. (i!=0)?&previous_end_values[i-1]:0,
  74. &end_values[i]);
  75. block_start=block_last;
  76. ++block_start; // 19
  77. previous_end_values.push_back(end_values[i].get_future()); // 20
  78. }
  79. Iterator final_element=block_start;
  80. std::advance(final_element,std::distance(block_start,last)-1); // 21
  81. process_chunk()(block_start,final_element, // 22
  82. (num_threads>1)?&previous_end_values.back():0,
  83. 0);
  84. }

 

这个实现中,使用的结构体和之前算法中的一样,将问题进行分块解决,每个线程处理最小的数据块⑫。其中,有一组线程⑬和一组promise⑭,用来存储每块中的最后一个值;并且实现中还有一组future⑮,用来对前一块中的最后一个值进行检索。可以为future⑯做些储备,以避免生成新线程时,再分配内存。

主循环和之前一样,不过这次是让迭代器指向了每个数据块的最后一个元素,而不是作为一个普通值传递到最后⑰,这样就方便向其他块传递当前块的最后一个元素了。实际处理是在process_chunk函数对象中完成的,这个结构体看上去不是很长;当前块的开始和结束迭代器和前块中最后一个值的future一起,作为参数进行传递,并且promise用来保留当前范围内最后一个值的原始值⑱。

生成新的线程后,就对开始块的ID进行更新,别忘了传递最后一个元素⑲,并且将当前块的最后一个元素存储到future,上面的数据将在循环中再次使用到⑳。

在处理最后一个数据块前,需要获取之前数据块中最后一个元素的迭代器(21),这样就可以将其作为参数传入process_chunk(22)中了。std::partial_sum不会返回一个值,所以在最后一个数据块被处理后,就不用再做任何事情了。当所有线程的操作完成时,求部分和的操作也就算完成了。

OK,现在来看一下process_chunk函数对象①。对于整块的处理是始于对std::partial_sum的调用,包括对于最后一个值的处理②,不过得要知道当前块是否是第一块③。如果当前块不是第一块,就会有一个previous_end_value值从前面的块传过来,所以这里需要等待这个值的产生④。为了将算法最大程度的并行,首先需要对最后一个元素进行更新⑤,这样你就能将这个值传递给下一个数据块(如果有下一个数据块的话)⑥。当完成这个操作,就可以使用std::for_each和简单的lambda函数⑦对剩余的数据项进行更新。

如果previous_end_value值为空,当前数据块就是第一个数据块,所以只需要为下一个数据块更新end_value⑧(如果有下一个数据块的话——当前数据块可能是唯一的数据块)。

最后,如果有任意一个操作抛出异常,就可以将其捕获⑨,并且存入promise⑩,如果下一个数据块尝试获取前一个数据块的最后一个值④时,异常会再次抛出。处理最后一个数据块时,异常会全部重新抛出⑪,因为抛出动作一定会在主线程上进行。

因为线程间需要同步,这里的代码就不容易使用std::async重写。任务等待会让线程中途去执行其他的任务,所以所有的任务必须同时执行。

基于块,以传递末尾元素值的方法就介绍到这里,让我们来看一下第二种计算方式。

实现以2的幂级数为距离部分和算法

第二种算法通过增加距离的方式,让更多的处理器充分发挥作用。在这种情况下,没有进一步同步的必要了,因为所有中间结果都直接传递到下一个处理器上去了。不过,在实际中我们很少见到,单个处理器处理对一定数量的元素执行同一条指令,这种方式成为单指令-多数据流(SIMD)。因此,代码必须能处理通用情况,并且需要在每步上对线程进行显式同步。

完成这种功能的一种方式是使用栅栏(barrier)——一种同步机制:只有所有线程都到达栅栏处,才能进行之后的操作;先到达的线程必须等待未到达的线程。C++11标准库没有直接提供这样的工具,所以你得自行设计一个。

试想游乐场中的过山车。如果有适量的游客在等待,那么过山车管理员就要保证,在过山车启动前,每一个位置都得坐一个游客。栅栏的工作原理也一样:你已经知道了“座位”的数量,线程就是要等待所有“座位”都坐满。当等待线程够数,那么它们可以继续运行;这时,栅栏会重置,并且会让下一拨线程开始扥带。通常,会在循环中这样做,当同一个线程再次到达栅栏处,它会再次等待。这种方法是为了让线程同步,所以不会有线程在其他未完成的情况下,就去完成下一个任务。如果有线程提前执行,对于这样一个算法,就是一场灾难,因为提前出发的线程可能会修改要被其他线程使用到的数据,后面线程获取到的数据就不是正确数据了。

下面的代码就简单的实现了一个栅栏。

清单8.12 简单的栅栏类

  1. class barrier
  2. {
  3. unsigned const count;
  4. std::atomic<unsigned> spaces;
  5. std::atomic<unsigned> generation;
  6. public:
  7. explicit barrier(unsigned count_): // 1
  8. count(count_),spaces(count),generation(0)
  9. {}
  10. void wait()
  11. {
  12. unsigned const my_generation=generation; // 2
  13. if(!--spaces) // 3
  14. {
  15. spaces=count; // 4
  16. ++generation; // 5
  17. }
  18. else
  19. {
  20. while(generation==my_generation) // 6
  21. std::this_thread::yield(); // 7
  22. }
  23. }
  24. };

 

这个实现中,用一定数量的“座位”构造了一个barrier①,这个数量将会存储count变量中。起初,栅栏中的spaces与count数量相当。当有线程都在等待时,spaces的数量就会减少③。当spaces的数量减到0时,spaces的值将会重置为count④,并且generation变量会增加,以向线程发出信号,让这些等待线程能够继续运行⑤。如果spaces没有到达0,那么线程会继续等待。这个实现使用了一个简单的自旋锁⑥,对generation的检查会在wait()开始的时候进行②。因为generation只会在所有线程都到达栅栏的时候更新⑤,在等待的时候使用yield()⑦就不会让CPU处于忙等待的状态。

这个实现比较“简单”的真实意义:使用自旋等待的情况下,如果让线程等待很长时间就不会很理想,并且如果超过count数量的线程对wait()进行调用,这个实现就没有办法工作了。如果想要很好的处理这样的情况,必须使用一个更加健壮(更加复杂)的实现。我依旧坚持对原子变量操作顺序的一致性,因为这会让事情更加简单,不过有时还是需要放松这样的约束。全局同步对于大规模并行架构来说是消耗巨大的,因为相关处理器会穿梭于存储栅栏状态的缓存行中(可见8.2.2中对乒乓缓存的讨论),所以需要格外的小心,来确保使用的是最佳同步方法。

不论怎么样,这些都需要你考虑到;需要有固定数量的线程执行同步循环。好吧,大多数情况下线程数量都是固定的。你可能还记得,代码起始部分的几个数据项,只需要几步就能得到其最终值。这就意味着,无论是让所有线程循环处理范围内的所有元素,还是让栅栏来同步线程,都会递减count的值。我会选择后者,因为其能避免线程做不必要的工作,仅仅是等待最终步骤完成。

这意味着你要将count改为一个原子变量,这样在多线程对其进行更新的时候,就不需要添加额外的同步:

  1. std::atomic<unsigned> count;

初始化保持不变,不过当spaces的值被重置后,你需要显式的对count进行load()操作:

  1. spaces=count.load();

这就是要对wait()函数的改动;现在需要一个新的成员函数来递减count。这个函数命名为done_waiting(),因为当一个线程完成其工作,并在等待的时候,才能对其进行调用它:

  1. void done_waiting()
  2. {
  3. --count; // 1
  4. if(!--spaces) // 2
  5. {
  6. spaces=count.load(); // 3
  7. ++generation;
  8. }
  9. }

 

实现中,首先要减少count①,所以下一次spaces将会被重置为一个较小的数。然后,需要递减spaces的值②。如果不做这些操作,有些线程将会持续等待,因为spaces被旧的count初始化,大于期望值。一组当中最后一个线程需要对计数器进行重置,并且递增generation的值③,就像在wait()里面做的那样。最重要的区别:最后一个线程不需要等待。当最后一个线程结束,整个等待也就随之结束!

现在就准备开始写部分和的第二个实现吧。在每一步中,每一个线程都在栅栏出调用wait(),来保证线程所处步骤一致,并且当所有线程都结束,那么最后一个线程会调用done_waiting()来减少count的值。如果使用两个缓存对原始数据进行保存,栅栏也可以提供你所需要的同步。每一步中,线程都会从原始数据或是缓存中读取数据,并且将新值写入对应位置。如果有线程先从原始数据处获取数据,那下一步就从缓存上获取数据(或相反)。这就能保证在读与写都是由独立线程完成,并不存在条件竞争。当线程结束等待循环,就能保证正确的值最终被写入到原始数据当中。下面的代码就是这样的实现。

清单8.13 通过两两更新对的方式实现partial_sum

  1. struct barrier
  2. {
  3. std::atomic<unsigned> count;
  4. std::atomic<unsigned> spaces;
  5. std::atomic<unsigned> generation;
  6. barrier(unsigned count_):
  7. count(count_),spaces(count_),generation(0)
  8. {}
  9. void wait()
  10. {
  11. unsigned const gen=generation.load();
  12. if(!--spaces)
  13. {
  14. spaces=count.load();
  15. ++generation;
  16. }
  17. else
  18. {
  19. while(generation.load()==gen)
  20. {
  21. std::this_thread::yield();
  22. }
  23. }
  24. }
  25. void done_waiting()
  26. {
  27. --count;
  28. if(!--spaces)
  29. {
  30. spaces=count.load();
  31. ++generation;
  32. }
  33. }
  34. };
  35. template<typename Iterator>
  36. void parallel_partial_sum(Iterator first,Iterator last)
  37. {
  38. typedef typename Iterator::value_type value_type;
  39. struct process_element // 1
  40. {
  41. void operator()(Iterator first,Iterator last,
  42. std::vector<value_type>& buffer,
  43. unsigned i,barrier& b)
  44. {
  45. value_type& ith_element=*(first+i);
  46. bool update_source=false;
  47. for(unsigned step=0,stride=1;stride<=i;++step,stride*=2)
  48. {
  49. value_type const& source=(step%2)? // 2
  50. buffer[i]:ith_element;
  51. value_type& dest=(step%2)?
  52. ith_element:buffer[i];
  53. value_type const& addend=(step%2)? // 3
  54. buffer[i-stride]:*(first+i-stride);
  55. dest=source+addend; // 4
  56. update_source=!(step%2);
  57. b.wait(); // 5
  58. }
  59. if(update_source) // 6
  60. {
  61. ith_element=buffer[i];
  62. }
  63. b.done_waiting(); // 7
  64. }
  65. };
  66. unsigned long const length=std::distance(first,last);
  67. if(length<=1)
  68. return;
  69. std::vector<value_type> buffer(length);
  70. barrier b(length);
  71. std::vector<std::thread> threads(length-1); // 8
  72. join_threads joiner(threads);
  73. Iterator block_start=first;
  74. for(unsigned long i=0;i<(length-1);++i)
  75. {
  76. threads[i]=std::thread(process_element(),first,last, // 9
  77. std::ref(buffer),i,std::ref(b));
  78. }
  79. process_element()(first,last,buffer,length-1,b); // 10
  80. }

 

代码的整体结构应该不用说了。process_element类有函数调用操作可以用来做具体的工作①,就是运行一组线程⑨,并将线程存储到vector中⑧,同样还需要在主线程中对其进行调用⑩。这里与之前最大的区别就是,线程的数量是根据列表中的数据量来定的,而非根据std::thread::hardware_concurrency。如我之前所说,除非你使用的是一个大规模并行的机器,因为这上面的线程都十分廉价(虽然这样的方式并不是很好),还能为我们展示了其整体结构。这个结构在有较少线程的时候,每一个线程只能处理源数据中的部分数据,当没有足够的线程支持该结构时,效率要比传递算法低。

不管怎样,主要的工作都是调用process_element的函数操作符来完成的。每一步,都会从原始数据或缓存中获取第i个元素②,并且将获取到的元素加到指定stride的元素中去③,如果从原始数据开始读取的元素,加和后的数需要存储在缓存中④。然后,在开始下一步前,会在栅栏处等待⑤。当stride超出了给定数据的范围,当最终结果已经存在缓存中时,就需要更新原始数据中的数据,同样这也意味着本次加和结束。最后,在调用栅栏中的done_waiting()函数⑦。

注意这个解决方案并不是异常安全的。如果某个线程在process_element执行时抛出一个异常,其就会终止整个应用。这里可以使用一个std::promise来存储异常,就像在清单8.9中parallel_find的实现,或仅使用一个被互斥量保护的std::exception_ptr即可。

总结下这三个例子。希望其能保证我们了解8.1、8.2、8.3和8.4节中提到的设计考量,并且证明了这些技术在真实的代码中,需要承担些什么责任。


您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码: 验证码,看不清楚?请点击刷新验证码 必填



7 次浏览
1次
下一页