PLINQ 和 TPL 的自定义分区程序

若要并行化对数据源的作,关键步骤之一是将源 分区 为多个分区,这些分区可由多个线程并发访问。 PLINQ 和任务并行库 (TPL) 提供在编写并行查询或 ForEach 循环时透明工作的默认分区程序。 对于更高级的方案,可以插入自己的分区程序。

分区类型

可通过多种方式对数据源进行分区。 在最有效的方法中,多个线程合作处理原始源序列,而不是以物理方式将源分隔为多个子序列。 对于数组和其他索引源(例如 IList 提前已知长度的集合), 范围分区 是最简单的分区类型。 每个线程都会收到唯一起始和结束索引,这样就可以处理范围内的数据源,而又不会覆盖其他任何线程或被其他任何线程覆盖。 范围分区中涉及的唯一开销是创建范围的初始工作;之后无需进行其他同步。 因此,只要工作负荷均匀划分,它就可以提供良好的性能。 范围分区的缺点是,如果一个线程提前完成,它无法帮助其他线程完成其工作。

对于长度未知的链接列表或其他集合,可以使用 区块分区。 在区块分区中,并行循环或查询中的每个线程或任务使用一个区块中的一些源元素,处理它们,然后返回以检索其他元素。 分区程序可确保所有元素都是分布式的,并且没有重复项。 区块可为任意大小。 例如, How to: Implement Dynamic Partitions 中演示的分区程序创建只包含一个元素的区块。 只要区块不太大,这种分区就本质上是负载均衡的,因为将元素分配给线程不是预先确定的。 不过,每当线程需要获取其他区块时,分区程序就要承担一次同步开销。 在这些情况下产生的同步量与区块大小成反比。

通常情况下,范围分区只有在以下情况下才会更快:委托的执行时间为小到中等,数据源有大量元素,且每个分区的工作总量大致相等。 因此,在大多数情况下,区块分区通常更快。 在委托的元素数较少或执行时间较长的源上,区块和范围分区的性能大致相同。

TPL 分区程序还支持动态数量的分区。 这意味着它们可以动态创建分区,例如,循环 ForEach 生成新任务时。 此功能使分区器能够与循环本身一起缩放。 动态分区组件天生具有负载均衡能力。 创建自定义分区程序时,必须支持可通过 ForEach 循环使用的动态分区。

为 PLINQ 配置负载均衡分区程序

借助 Partitioner.Create 方法的一些重载,可以为数组或 IList 源创建分区程序,并指定是否应尝试均衡各线程的工作负载。 当分区程序配置为进行负载均衡时,将使用区块分区,并且元素会在请求时以小区块的形式分发给每个分区。 此方法有助于确保所有分区都有要处理的元素,直到整个循环或查询完成。 额外的重载可用于为任何 IEnumerable 源提供负载均衡分区。

通常,负载均衡要求分区相对频繁地从分区器请求元素。 相比之下,执行静态分区的分区程序可以使用范围或区块分区将元素一次性分配给每个分区程序。 这需要比负载均衡更少的开销,但如果一个线程最终完成的工作比其他线程多得多,则执行时间可能更长。 默认情况下,在传递 IList 或数组时,PLINQ 始终使用范围分区而不进行负载均衡。 若要为 PLINQ 启用负载均衡,请使用 Partitioner.Create 该方法,如以下示例所示。

// Static partitioning requires indexable source. Load balancing
// can use any IEnumerable.
var nums = Enumerable.Range(0, 100000000).ToArray();

// Create a load-balancing partitioner. Or specify false for static partitioning.
Partitioner<int> customPartitioner = Partitioner.Create(nums, true);

// The partitioner is the query's data source.
var q = from x in customPartitioner.AsParallel()
        select x * Math.PI;

q.ForAll((x) =>
{
    ProcessData(x);
});
' Static number of partitions requires indexable source.
Dim nums = Enumerable.Range(0, 100000000).ToArray()

' Create a load-balancing partitioner. Or specify false For  Shared partitioning.
Dim customPartitioner = Partitioner.Create(nums, True)

' The partitioner is the query's data source.
Dim q = From x In customPartitioner.AsParallel()
        Select x * Math.PI

q.ForAll(Sub(x) ProcessData(x))

确定在任何给定方案中是否使用负载均衡的最佳方式是试验和测量在具有代表性的负载和计算机配置下完成作所需的时间。 例如,静态分区可能会在只有少数核心的多核计算机上提供显著的加速,但可能会导致具有相对许多核心的计算机速度变慢。

下表列出了方法的 Create 可用重载。 这些分区程序不仅限于在 PLINQ 或 Task 中使用。 它们还可用于任何自定义并行构造。

过载 使用负载均衡
Create<TSource>(IEnumerable<TSource>) 始终
Create<TSource>(TSource[], Boolean) 将布尔参数指定为 true 时
Create<TSource>(IList<TSource>, Boolean) 将布尔参数指定为 true 时
Create(Int32, Int32) 从不
Create(Int32, Int32, Int32) 从不
Create(Int64, Int64) 从不
Create(Int64, Int64, Int64) 从不

在 Parallel.ForEach 中配置静态范围分区器

For 循环中,循环的主体作为委托提供给方法。 调用该委托的成本与虚拟方法调用的成本大致相同。 在某些情况下,并行循环的主体可能非常小,以至于每次循环迭代中委托调用的成本变得显著。 在这种情况下,可以使用 Create 重载之一,对数据源元素创建范围分区的 IEnumerable<T>。 然后,可以将此范围 ForEach 集合传递给主体由常规 for 循环组成的方法。 这种方法的优势在于,委托调用成本在每个范围内只产生一次,而不是每个元素都产生一次。 以下示例演示了基本模式。

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {

        // Source must be array or IList.
        var source = Enumerable.Range(0, 100000).ToArray();

        // Partition the entire source array.
        var rangePartitioner = Partitioner.Create(0, source.Length);

        double[] results = new double[source.Length];

        // Loop over the partitions in parallel.
        Parallel.ForEach(rangePartitioner, (range, loopState) =>
        {
            // Loop over each range element without a delegate invocation.
            for (int i = range.Item1; i < range.Item2; i++)
            {
                results[i] = source[i] * Math.PI;
            }
        });

        Console.WriteLine("Operation complete. Print results? y/n");
        char input = Console.ReadKey().KeyChar;
        if (input == 'y' || input == 'Y')
        {
            foreach(double d in results)
            {
                Console.Write("{0} ", d);
            }
        }
    }
}
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module PartitionDemo

    Sub Main()
        ' Source must be array or IList.
        Dim source = Enumerable.Range(0, 100000).ToArray()

        ' Partition the entire source array. 
        ' Let the partitioner size the ranges.
        Dim rangePartitioner = Partitioner.Create(0, source.Length)

        Dim results(source.Length - 1) As Double

        ' Loop over the partitions in parallel. The Sub is invoked
        ' once per partition.
        Parallel.ForEach(rangePartitioner, Sub(range, loopState)

                                               ' Loop over each range element without a delegate invocation.
                                               For i As Integer = range.Item1 To range.Item2 - 1
                                                   results(i) = source(i) * Math.PI
                                               Next
                                           End Sub)
        Console.WriteLine("Operation complete. Print results? y/n")
        Dim input As Char = Console.ReadKey().KeyChar
        If input = "y"c Or input = "Y"c Then
            For Each d As Double In results
                Console.Write("{0} ", d)
            Next
        End If

    End Sub
End Module

循环中的每个线程都接收其自己的 Tuple<T1,T2> 值,其中包含指定子区域中的起始索引值和结束索引值。 内部 for 循环使用 fromInclusivetoExclusive 值来遍历数组或直接对 IList 进行循环。

其中一个 Create 重载函数允许您指定分区的大小和分区数量。 此重载适用于以下情况:每个元素的工作量很少,甚至每个元素调用一个虚拟方法都会对性能产生显著影响。

自定义分区器

在某些情况下,实现自己的分区程序可能是值得尝试的,或甚至有必要这样做。 例如,你可能具有一个自定义集合类,可以比默认分区程序更高效地分区,具体取决于你了解该类的内部结构。 或者,你可能希望根据了解处理源集合中不同位置的元素所需的时间,创建大小不同的范围分区。

要创建一个基本的自定义分区器,请从 System.Collections.Concurrent.Partitioner<TSource> 派生一个类,并按照下表所述重写虚拟方法。

方法 DESCRIPTION
GetPartitions 此方法由主线程调用一次,并返回 IList(IEnumerator(TSource))。 循环或查询中的每个工作线程都可以在列表上调用 GetEnumerator 来检索到不同分区的 IEnumerator<T>
SupportsDynamicPartitions 如果实现true,则返回GetDynamicPartitions;否则返回false
GetDynamicPartitions 如果SupportsDynamicPartitionstrue,则可以选择调用此方法,而不是GetPartitions

如果结果必须可排序,并且需要对元素进行索引访问,请从System.Collections.Concurrent.OrderablePartitioner<TSource>类派生并重写其虚拟方法,如下表所述。

方法 DESCRIPTION
GetPartitions 此方法由主线程调用一次,并返回一个 IList(IEnumerator(TSource))。 循环或查询中的每个工作线程都可以在列表上调用 GetEnumerator 来检索到不同分区的 IEnumerator<T>
SupportsDynamicPartitions 如果实现true,则返回GetDynamicPartitions;否则返回 false。
GetDynamicPartitions 通常,这只会调用 GetOrderableDynamicPartitions
GetOrderableDynamicPartitions 如果SupportsDynamicPartitionstrue,则可以选择调用此方法,而不是GetPartitions

下表提供了有关如何实现 OrderablePartitioner<TSource> 类的三种负载均衡分区程序的其他详细信息。

方法/属性 IList/数组(不执行负载均衡) IList/数组(执行负载均衡) 可枚举(IEnumerable)
GetOrderablePartitions 使用范围分区 使用更适合列表的区块分区(partitionCount 已指定) 通过创建静态数量的分区来使用区块分区。
OrderablePartitioner<TSource>.GetOrderableDynamicPartitions 抛出不支持异常 使用针对列表和动态分区优化的区块分区 使用区块分区,通过创建动态数量的分区来实现这一过程。
KeysOrderedInEachPartition 返回 true 返回 true 返回 true
KeysOrderedAcrossPartitions 返回 true 返回 false 返回 false
KeysNormalized 返回 true 返回 true 返回 true
SupportsDynamicPartitions 返回 false 返回 true 返回 true

动态分区

如果你打算将分区器用于ForEach方法中,则必须能够返回动态数量的分区。 这意味着分区程序可以在循环执行期间随时为新的分区按需提供枚举器。 基本上,每当循环添加新的并行任务时,它请求该任务的新分区。 如果需要数据可排序,则派生自 System.Collections.Concurrent.OrderablePartitioner<TSource> 以便为每个分区中的每个项分配唯一索引。

有关详细信息和示例,请参阅 如何:实现动态分区

分区程序合同

实现自定义分区程序时,请遵循以下准则来帮助确保与 PLINQ 和 ForEach TPL 中的正确交互:

  • 如果调用 GetPartitionspartitionsCount 参数值等于或小于零,抛出 ArgumentOutOfRangeException。 尽管 PLINQ 和 TPL 永远不会传递 partitionCount 等于 0,但我们仍建议你防范这种可能性。

  • GetPartitions 并且 GetOrderablePartitions 应始终返回 partitionsCount 分区数。 如果分区程序耗尽数据,并且无法按请求创建任意数量的分区,则该方法应返回每个剩余分区的空枚举器。 否则,PLINQ 和 TPL 都会抛出 InvalidOperationException

  • GetPartitionsGetOrderablePartitionsGetDynamicPartitionsGetOrderableDynamicPartitions 不应返回 nullNothing 在 Visual Basic 中)。 如果返回,PLINQ/TPL 会抛出 InvalidOperationException

  • 返回分区的方法应始终返回可以完全和唯一枚举数据源的分区。 除非分区器设计特别需要,否则数据源中不应有重复或跳过的项。 如果未遵循此规则,则输出顺序可能会混乱。

  • 为了让输出顺序不出现混乱,下面的布尔 Getter 必须始终准确返回以下值:

    • KeysOrderedInEachPartition:每个分区返回具有递增键索引的元素。

    • KeysOrderedAcrossPartitions:对于返回的所有分区,分区 i 中的键索引高于分区 i-1 中的键索引。

    • KeysNormalized:所有关键索引都是单调递增的,不带间隙,从零开始。

  • 所有索引都必须是唯一的。 可能没有重复的索引。 如果未遵循此规则,则输出顺序可能会混乱。

  • 所有索引都必须是非负的。 如果未遵循此规则,则 PLINQ/TPL 可能会引发异常。

另请参阅