Writing a Contextual Partitioner

The default partitioner and the chunking partitioner both operate on any data type. One advantage of writing a custom partitioner is that you can tailor your strategy to the data type that you need to process. This section will demonstrate how to implement a contextual partitioner and, in doing so, explain how to extend the Partitioner class to implement a custom technique.

To start our partitioner, we need a context—some data type with characteristics that we are going to specialize. I have defined the class WorkItem in Listing 5-17.

■ Note Listings 5-17 through 5-25 contain fragments that won't compile on their own; they need to be assembled together. If you have downloaded the source code from www.apress.com, you will find all of the code fragments already assembled for you in the project called Listing_17-25.

Listing 5-17. The WorkItem Class

class WorkItem {

    public int WorkDuration { get; set; }

    public void performWork() {
        // simulate work by sleeping
        Thread.Sleep(WorkDuration);
    }
}

The WorkItem has a property that reports how long processing an instance will take and a method that simulates the processing by sleeping. This is a conveniently simple example, but it is representative of many situations where you are able to make an estimate of how long it will take to process an instance based on some other characteristic, for example, encrypting a message based on the number of characters.
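To make the encryption analogy concrete, the following sketch estimates a processing duration from a message's character count. The EstimateDuration method and its scaling factor are hypothetical choices for illustration, not part of the example code:

```csharp
using System;

// A sketch of cost estimation from an observable characteristic. The
// assumption here (hypothetical) is that encryption time grows roughly
// linearly with message length, so character count can stand in for
// a WorkDuration-style estimate.
class CostEstimateDemo {

    // illustrative scaling: one duration unit per 10 characters, minimum of 1
    static int EstimateDuration(string message) {
        return Math.Max(1, message.Length / 10);
    }

    static void Main() {
        string[] messages = { "short", new string('x', 95), new string('x', 200) };
        foreach (string m in messages) {
            Console.WriteLine(EstimateDuration(m));
        }
        // prints 1, 9, 20 - one estimate per message
    }
}
```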

The objective for our partitioner is to use the estimated duration of each item to create chunks of items that take the same amount of time to process. The result will be chunks with varying lengths. The effect of focusing on the amount of processing required will be to evenly distribute the workload across the cores in the system.

Listing 5-18 shows how we will use our partitioner, called ContextPartitioner. To create the source data for the parallel loop, we create an instance of System.Random and use it to generate pseudorandom values between one and ten, which we use to set the WorkDuration property of 10,000 instances of WorkItem.

Listing 5-18. Using the ContextPartitioner Class

class ContextTest {

    static void Main(string[] args) {
        // create a random number source
        Random rnd = new Random();

        // create the source data
        WorkItem[] sourceData = new WorkItem[10000];
        for (int i = 0; i < sourceData.Length; i++) {
            sourceData[i] = new WorkItem() { WorkDuration = rnd.Next(1, 11) };
        }

        // create the contextual partitioner
        Partitioner<WorkItem> cPartitioner = new ContextPartitioner(sourceData, 100);

        // create the parallel loop
        Parallel.ForEach(cPartitioner, item => {
            // perform the work item
            item.performWork();
        });

        // wait for input before exiting
        Console.WriteLine("Press enter to finish");
        Console.ReadLine();
    }
}

After creating the data source, we then create the partitioner; the arguments passed to the constructor are the WorkItem array and the target total processing time per chunk. The example specifies a target value of 100. The partitioner is passed as an argument to a Parallel.ForEach() loop, the body of which simply calls the performWork() method of each WorkItem instance that is processed.

Now that you have seen the WorkItem definition and how the partitioner is used, we can turn our attention to the partitioner itself. To create a partitioner, you must override the members listed in Table 5-8.

Table 5-8. Required Overrides of the Partitioner Class

Member                      Description

SupportsDynamicPartitions   Return true if the partitioner supports dynamic partitioning. Returning false from this property will cause an exception if the partitioner is used in a Parallel.ForEach() loop.

GetPartitions(int)          Break up the data into a fixed number of partitions. No further partitions will be requested; the partitioner is free to allocate data as it wishes to each partition.

GetDynamicPartitions()      Return an object which creates new partitions when its GetEnumerator() method is called.

The complete partitioner class is shown in Listing 5-19.

Listing 5-19. A Contextual Partitioner

class ContextPartitioner : Partitioner<WorkItem> {
    // the set of data items to partition
    protected WorkItem[] dataItems;
    // the target sum of values per chunk
    protected int targetSum;
    // the first unchunked item
    private long sharedStartIndex = 0;
    // lock object to avoid index data races
    private object lockObj = new object();
    // the object used to create enumerators
    private EnumerableSource enumSource;

    public ContextPartitioner(WorkItem[] data, int target) {
        // set the instance variables from the parameters
        dataItems = data;
        targetSum = target;
        // create the enumerable source
        enumSource = new EnumerableSource(this);
    }

    public override bool SupportsDynamicPartitions {
        get {
            // dynamic partitions are required for
            // parallel foreach loops
            return true;
        }
    }

    public override IList<IEnumerator<WorkItem>> GetPartitions(int partitionCount) {
        // create the list which will be the result
        IList<IEnumerator<WorkItem>> partitionsList = new List<IEnumerator<WorkItem>>();
        // get the IEnumerable that will generate dynamic partitions
        IEnumerable<WorkItem> enumObj = GetDynamicPartitions();
        // create the required number of partitions
        for (int i = 0; i < partitionCount; i++) {
            partitionsList.Add(enumObj.GetEnumerator());
        }
        // return the result
        return partitionsList;
    }

    public override IEnumerable<WorkItem> GetDynamicPartitions() {
        return enumSource;
    }

    private Tuple<long, long> getNextChunk() {
        // create the result tuple
        Tuple<long, long> result;
        // get an exclusive lock as we perform this operation
        lock (lockObj) {
            // check that there is still data available
            if (sharedStartIndex < dataItems.Length) {
                int sum = 0;
                long endIndex = sharedStartIndex;
                while (endIndex < dataItems.Length && sum < targetSum) {
                    sum += dataItems[endIndex].WorkDuration;
                    endIndex++;
                }
                result = new Tuple<long, long>(sharedStartIndex, endIndex);
                sharedStartIndex = endIndex;
            } else {
                // there is no data available
                result = new Tuple<long, long>(-1, -1);
            }
        } // end of locked region
        // return the result
        return result;
    }

    class EnumerableSource : IEnumerable<WorkItem> {
        ContextPartitioner parentPartitioner;

        public EnumerableSource(ContextPartitioner parent) {
            parentPartitioner = parent;
        }

        IEnumerator IEnumerable.GetEnumerator() {
            return ((IEnumerable<WorkItem>)this).GetEnumerator();
        }

        IEnumerator<WorkItem> IEnumerable<WorkItem>.GetEnumerator() {
            return new ChunkEnumerator(parentPartitioner).GetEnumerator();
        }
    }

    class ChunkEnumerator {
        private ContextPartitioner parentPartitioner;

        public ChunkEnumerator(ContextPartitioner parent) {
            parentPartitioner = parent;
        }

        public IEnumerator<WorkItem> GetEnumerator() {
            while (true) {
                // get the indices of the next chunk
                Tuple<long, long> chunkIndices = parentPartitioner.getNextChunk();
                // check that we have data to deliver
                if (chunkIndices.Item1 == -1 && chunkIndices.Item2 == -1) {
                    // there is no more data
                    break;
                } else {
                    // enter a loop to yield the data items
                    for (long i = chunkIndices.Item1; i < chunkIndices.Item2; i++) {
                        yield return parentPartitioner.dataItems[i];
                    }
                }
            }
        }
    }
}

Let's go through the class in fragments so you can better understand how it fits together. The constructor, shown in Listing 5-20, takes the array of WorkItems and our chunk processing target and assigns them to the instance variables dataItems and targetSum respectively. It also creates an instance of EnumerableSource, which we'll get to in due course.

Listing 5-20. The Constructor and Property of the ContextPartitioner Class

class ContextPartitioner : Partitioner<WorkItem> {
    // the set of data items to partition
    protected WorkItem[] dataItems;
    // the target sum of values per chunk
    protected int targetSum;
    // the first unchunked item
    private long sharedStartIndex = 0;
    // lock object to avoid index data races
    private object lockObj = new object();
    // the object used to create enumerators
    private EnumerableSource enumSource;

    public ContextPartitioner(WorkItem[] data, int target) {
        // set the instance variables from the parameters
        dataItems = data;
        targetSum = target;
        // create the enumerable source
        enumSource = new EnumerableSource(this);
    }

    public override bool SupportsDynamicPartitions {
        get {
            // dynamic partitions are required for
            // parallel foreach loops
            return true;
        }
    }

The SupportsDynamicPartitions property must always return true for partitioners used in Parallel.ForEach() loops. Static partitioners can, however, be used with PLINQ (see Chapter 6 for details). The GetDynamicPartitions() method returns an object that implements the IEnumerable<WorkItem> interface and will create a new partition each time that the GetEnumerator() call is made on it. In our case, shown in Listing 5-21, we return the instance of the EnumerableSource class we created in the constructor.

Listing 5-21. The ContextPartitioner.GetDynamicPartitions() Method

public override IEnumerable<WorkItem> GetDynamicPartitions() {
    return enumSource;
}

The GetPartitions() method is used to generate static partitions; this task is easy for a dynamic partitioner, as you can simply create the required number of dynamic partitions. The result from the GetPartitions() method is an IList of IEnumerator<WorkItem> objects; each of these is a static partition. In our implementation, shown in Listing 5-22, you can see that we do this using the result from the GetDynamicPartitions() method.

Listing 5-22. The ContextPartitioner.GetPartitions() Method

public override IList<IEnumerator<WorkItem>> GetPartitions(int partitionCount) {
    // create the list which will be the result
    IList<IEnumerator<WorkItem>> partitionsList = new List<IEnumerator<WorkItem>>();
    // get the IEnumerable that will generate dynamic partitions
    IEnumerable<WorkItem> enumObj = GetDynamicPartitions();
    // create the required number of partitions
    for (int i = 0; i < partitionCount; i++) {
        partitionsList.Add(enumObj.GetEnumerator());
    }
    // return the result
    return partitionsList;
}

The most interesting method in the ContextPartitioner class is getNextChunk(), shown in Listing 5-23. This is the method that implements our partitioning strategy, creating chunks that will take roughly the target amount of time to process. I say "roughly" because this method iterates through the data items until the target is exceeded, so the amount of processing time per chunk can be slightly more than the target. This variation is acceptable for our example, but you could be a lot more rigorous if required.

There are a couple of key design decisions to note with this method. First, observe that the result from this method is a System.Tuple<long,long>. The Tuple values represent the inclusive start index and exclusive end index into the WorkItem array that the chunk represents. We don't copy the data into a dedicated array, but rather share the main array between each of the partitions—you'll see how this works shortly.

Second, you'll see that we used the sharedStartIndex variable to track the index of the first unchunked WorkItem in the data array. We use synchronization to protect this variable to ensure we don't create multiple chunks with the same index values, leading to WorkItems being processed more than once (the lock keyword is used for simplicity, but any of the primitives discussed in Chapter 4 would work).
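Before looking at the full method, the greedy chunking strategy can be sketched in isolation. The durations and target value below are illustrative, not taken from the example:

```csharp
using System;
using System.Collections.Generic;

// A standalone sketch of the greedy chunking strategy used by getNextChunk():
// accumulate durations until the running sum meets or exceeds the target,
// then cut a chunk and start the next one.
class ChunkDemo {

    // yields (inclusive start, exclusive end) index pairs, like getNextChunk()
    static IEnumerable<Tuple<int, int>> Chunks(int[] durations, int target) {
        int start = 0;
        while (start < durations.Length) {
            int sum = 0, end = start;
            while (end < durations.Length && sum < target) {
                sum += durations[end];
                end++;
            }
            yield return Tuple.Create(start, end);
            start = end;
        }
    }

    static void Main() {
        int[] durations = { 4, 7, 2, 9, 1, 1, 8, 3 };
        foreach (var c in Chunks(durations, 10)) {
            Console.WriteLine("[{0}, {1})", c.Item1, c.Item2);
        }
        // prints [0, 2), [2, 4), [4, 7), [7, 8): the chunk lengths vary,
        // but each chunk's duration sum is at or just above the target
        // (except possibly the final chunk, which takes whatever remains)
    }
}
```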

Listing 5-23. The ContextPartitioner.getNextChunk() Method

private Tuple<long, long> getNextChunk() {
    // create the result tuple
    Tuple<long, long> result;
    // get an exclusive lock as we perform this operation
    lock (lockObj) {
        // check that there is still data available
        if (sharedStartIndex < dataItems.Length) {
            int sum = 0;
            long endIndex = sharedStartIndex;
            while (endIndex < dataItems.Length && sum < targetSum) {
                sum += dataItems[endIndex].WorkDuration;
                endIndex++;
            }
            result = new Tuple<long, long>(sharedStartIndex, endIndex);
            sharedStartIndex = endIndex;
        } else {
            // there is no data available
            result = new Tuple<long, long>(-1, -1);
        }
    } // end of locked region
    // return the result
    return result;
}

If there is no data left, we return a Tuple whose values are both -1, which signals to the partition that requested the chunk that no more data will be forthcoming. You'll see how this works when we get to the ChunkEnumerator class shortly. Before we get there, we have to cover the EnumerableSource class, as shown in Listing 5-24.

Listing 5-24. The ContextPartitioner.EnumerableSource Class

class EnumerableSource : IEnumerable<WorkItem> {
    ContextPartitioner parentPartitioner;

    public EnumerableSource(ContextPartitioner parent) {
        parentPartitioner = parent;
    }

    IEnumerator IEnumerable.GetEnumerator() {
        return ((IEnumerable<WorkItem>)this).GetEnumerator();
    }

    IEnumerator<WorkItem> IEnumerable<WorkItem>.GetEnumerator() {
        return new ChunkEnumerator(parentPartitioner).GetEnumerator();
    }
}

This class doesn't do very much other than act as a bridge between ContextPartitioner and ChunkEnumerator, but it exists for an important reason. If the ContextPartitioner class implemented the IEnumerable<WorkItem> interface and returned an instance of itself when the GetDynamicPartitions() method was called, we would create ambiguity for the Parallel.ForEach() method, which has separate versions that accept classes derived from Partitioner and classes that implement IEnumerable<>. The ambiguity would require the user of ContextPartitioner to cast instances of the class to Partitioner. If ContextPartitioner were instead cast to an implementation of IEnumerable<>, the default partitioner would be used to partition our partitioner, which is obviously not the effect we are striving for. To that end, EnumerableSource is the result from the ContextPartitioner.GetDynamicPartitions() method, separating the base class from the interface implementation.
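The ambiguity can be seen in a minimal sketch. DualPartitioner below is a hypothetical class invented for illustration (it is not the book's ContextPartitioner); it derives from Partitioner<int> and also implements IEnumerable<int>, so a plain Parallel.ForEach() call on it matches two overloads and won't compile without a cast:

```csharp
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demonstration class: both a Partitioner<int> and an
// IEnumerable<int>, which is exactly the shape that forces callers to cast.
class DualPartitioner : Partitioner<int>, IEnumerable<int> {
    private readonly ConcurrentQueue<int> queue;

    public DualPartitioner(IEnumerable<int> data) {
        queue = new ConcurrentQueue<int>(data);
    }

    // every partition consumes from the shared queue,
    // so each item is delivered exactly once
    private IEnumerator<int> Consume() {
        int item;
        while (queue.TryDequeue(out item)) yield return item;
    }

    public override bool SupportsDynamicPartitions { get { return true; } }

    public override IEnumerable<int> GetDynamicPartitions() { return this; }

    public override IList<IEnumerator<int>> GetPartitions(int partitionCount) {
        var list = new List<IEnumerator<int>>();
        for (int i = 0; i < partitionCount; i++) list.Add(Consume());
        return list;
    }

    public IEnumerator<int> GetEnumerator() { return Consume(); }
    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

    static void Main() {
        var p = new DualPartitioner(new[] { 1, 2, 3, 4 });

        // Parallel.ForEach(p, x => { });
        // the line above would not compile (CS0121): the call is ambiguous
        // between the Partitioner<T> and IEnumerable<T> overloads

        int total = 0;
        Parallel.ForEach((Partitioner<int>)p, x => Interlocked.Add(ref total, x));
        Console.WriteLine(total); // prints 10: each item processed once
    }
}
```

Separating the IEnumerable<> implementation into a helper class, as EnumerableSource does, avoids forcing this cast on every caller.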

As described in Table 5-8, the object returned from the ContextPartitioner.GetDynamicPartitions() method must create new partitions when its GetEnumerator() method is called. EnumerableSource does this by creating a new instance of ChunkEnumerator and returning the result from its GetEnumerator() method. The ChunkEnumerator class is shown in Listing 5-25.

Listing 5-25. The ContextPartitioner.ChunkEnumerator Class

class ChunkEnumerator {
    private ContextPartitioner parentPartitioner;

    public ChunkEnumerator(ContextPartitioner parent) {
        parentPartitioner = parent;
    }

    public IEnumerator<WorkItem> GetEnumerator() {
        while (true) {
            // get the indices of the next chunk
            Tuple<long, long> chunkIndices = parentPartitioner.getNextChunk();
            // check that we have data to deliver
            if (chunkIndices.Item1 == -1 && chunkIndices.Item2 == -1) {
                // there is no more data
                break;
            } else {
                // enter a loop to yield the data items
                for (long i = chunkIndices.Item1; i < chunkIndices.Item2; i++) {
                    yield return parentPartitioner.dataItems[i];
                }
            }
        }
    }
}

This class works in a very simple manner. The GetEnumerator() method enters a loop that requests chunks via the ContextPartitioner.getNextChunk() method. If it receives a Tuple with both items set to -1, it breaks out of the loop. This happens when there is no unchunked data left. Otherwise, the yield keyword is used to return items from the main data array, using the index values of the Tuple returned from getNextChunk().

And that's our contextual partitioner and an explanation of how to implement a custom partitioning strategy by extending the Partitioner class.
