Parallel Programming

".NET 4 introduced the Parallel Framework (PFX), which contains features for targeting multicore processors. Included are the high-level features of PLINQ and the Parallel class. Low-level features include the Task class, concurrent collections, and lightweight signaling constructs. Additional constructs are also included to help with parallel programming."

A number of new multithreading APIs and constructs targeting multicore processors were introduced in .NET 4. Together they make up the Parallel Framework (PFX), which includes the Parallel class and the Task Parallel Library (TPL). Everything these APIs do can also be done with the traditional multithreading constructs, but the new APIs make parallel code easier to implement. For less experienced programmers, they also improve reliability and help reduce the frequency of bugs. Additionally, some of the parallel constructs overcome shortcomings of using the thread pool directly, for example by providing a built-in way of determining when an operation has completed.

The highest layer of functionality in PFX consists of the PLINQ and Parallel class APIs. PLINQ is a parallel implementation of LINQ to Objects and in many scenarios can significantly increase the speed of queries. The Parallel class provides support for structured parallelism with parallel loops and regions. The Parallel.For and Parallel.ForEach methods are easy ways to create loops in which each iteration may run in parallel. The lower layer of PFX contains task parallelism classes and constructs for aiding parallel programming. Included in this layer are:

  1. Task Classes - provide an easy and efficient way to use the thread pool and also have features for managing units of work.
  2. Concurrent Collections - provide a set of thread-safe collections.
  3. Spinning Primitives (SpinLock and SpinWait) - provide locking and waiting without the cost of context switching.
  4. SemaphoreSlim - a lightweight semaphore optimized to meet the low-latency demands of parallel programming.
  5. Lazy Initialization Types - delay creation of an object until it is first used.
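
Two of the lower-layer constructs listed above can be sketched briefly. This is a minimal illustration, not code from the original article: the namespace, the `CountByRemainder` helper, and the "expensive value" string are made up for the example. It uses a `ConcurrentDictionary` that is updated from many threads without explicit locks, and a `Lazy<T>` whose factory does not run until `Value` is first read.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace LowLevelConstructs
{
    public static class Program
    {
        // Hypothetical helper: counts how many numbers in [0, count) fall into
        // each remainder bucket, updating a shared ConcurrentDictionary from
        // many threads without any explicit lock.
        public static ConcurrentDictionary<int, int> CountByRemainder(int count, int buckets)
        {
            var counts = new ConcurrentDictionary<int, int>();
            Parallel.For(0, count, i =>
            {
                // AddOrUpdate retries internally until the update is applied.
                counts.AddOrUpdate(i % buckets, 1, (key, old) => old + 1);
            });
            return counts;
        }

        public static void Main()
        {
            // Lazy<T> defers the (assumed expensive) factory until Value is read.
            int factoryCalls = 0;
            var lazy = new Lazy<string>(() =>
            {
                Interlocked.Increment(ref factoryCalls);
                return "expensive value";
            });
            Console.WriteLine("Created before first use: " + lazy.IsValueCreated);
            Console.WriteLine(lazy.Value);
            Console.WriteLine("Factory calls: " + factoryCalls);

            foreach (var pair in CountByRemainder(1000, 4))
                Console.WriteLine("Remainder {0}: {1}", pair.Key, pair.Value);
        }
    }
}
```

With 1000 numbers and 4 buckets, each bucket ends up with a count of 250 regardless of how the iterations were scheduled.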

Tasks

.NET 4 introduced the concept of a Task, which is a lightweight object for managing a parallelizable unit of work. Tasks run on threads from the thread pool, the same threads used by ThreadPool.QueueUserWorkItem, but add enhancements for managing the work, such as the ability to:

  1. Attach "continuation" tasks, which cause a method to run when a task is canceled, faults, or finishes successfully.
  2. Propagate exceptions to parents or other tasks.
  3. Tune the scheduling of tasks.
  4. Establish parent/child relationships between tasks.
  5. Benefit from local work queues, which reduce contention.

The following Task classes are available in the System.Threading.Tasks namespace:

  1. Task - manages a unit of work.
  2. Task<TResult> - manages a unit of work with a return value.
  3. TaskFactory - support for creating and scheduling Task objects.
  4. TaskScheduler - handles the low-level work of queuing tasks onto threads.
  5. TaskCompletionSource<TResult> - represents an external asynchronous operation as a Task<TResult> that is completed manually.
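
As a rough sketch of how two of these classes differ, the example below gets the same value once from a `Task<TResult>` started with `Task.Run()` and once from a `TaskCompletionSource<TResult>`. The `RunOnDedicatedThread` helper and the namespace are hypothetical; the dedicated thread simply stands in for any external operation that does not itself return a task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace TaskTypesSketch
{
    public static class Program
    {
        // Hypothetical helper: runs work on a dedicated thread and exposes the
        // outcome as a Task<int> through a TaskCompletionSource<int>.
        public static Task<int> RunOnDedicatedThread(Func<int> work)
        {
            var tcs = new TaskCompletionSource<int>();
            var thread = new Thread(() =>
            {
                try { tcs.SetResult(work()); }                    // completes the task
                catch (Exception ex) { tcs.SetException(ex); }    // faults the task
            });
            thread.IsBackground = true;
            thread.Start();
            return tcs.Task;
        }

        public static void Main()
        {
            // Task<TResult>: a unit of work with a return value, queued to the thread pool.
            Task<int> pooled = Task.Run(() => 6 * 7);

            // TaskCompletionSource<TResult>: the task completes when SetResult is called.
            Task<int> external = RunOnDedicatedThread(() => 6 * 7);

            Console.WriteLine("{0} {1}", pooled.Result, external.Result); // prints "42 42"
        }
    }
}
```

Callers cannot tell the two tasks apart: both can be waited on, continued from, and composed in the same way.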

Visual Studio 2010 provides a window for monitoring tasks (Debug->Windows->Parallel Tasks).

Tasks can be created by calling the Task.Run() method which accepts an Action delegate. This queues the specified work to run on the thread pool and returns a task handle. The following options are available for waiting for a task to complete:

  1. For a particular task, call its Wait method with an optional timeout. (Equivalent to calling Join() on a thread.)
  2. For a particular task, access its Result property (available on Task<TResult>; it blocks until the result is ready).
  3. For multiple tasks, use the static method Task.WaitAll.
  4. For multiple tasks, use the static method Task.WaitAny (waits for just one task to complete).
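
The four waiting options above might be exercised as in the following sketch. The `StartWork` helper, the labels, and the sleep durations are assumptions chosen only to make one task clearly slower than the other.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace WaitingForTasks
{
    public static class Program
    {
        // Hypothetical helper: starts a task that sleeps, then returns its label.
        public static Task<string> StartWork(string label, int milliseconds)
        {
            return Task.Run(() =>
            {
                Thread.Sleep(milliseconds);
                return label;
            });
        }

        public static void Main()
        {
            Task<string> slow = StartWork("slow", 500);
            Task<string> fast = StartWork("fast", 50);

            // Option 1: Wait with a timeout returns false if the task is still running.
            bool finished = slow.Wait(10);
            Console.WriteLine("slow finished within 10 ms: " + finished);

            // Option 4: WaitAny returns the array index of a task that has completed.
            int index = Task.WaitAny(slow, fast);
            Console.WriteLine("Index of a completed task: " + index);

            // Option 3: WaitAll blocks until every task has completed.
            Task.WaitAll(slow, fast);

            // Option 2: Result blocks if necessary; here both tasks are already done.
            Console.WriteLine(slow.Result + ", " + fast.Result);
        }
    }
}
```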

While waiting for a task to complete, any unhandled exceptions are rethrown to the caller inside an AggregateException object. This usually eliminates the need to handle exceptions within the task code blocks. An AggregateException may contain a number of exceptions in its InnerExceptions collection, and it is also possible to have an AggregateException inside an AggregateException. For nested AggregateException objects, the Flatten() method removes the nesting and converts them into a simple flat list. The AggregateException also has a Handle() method, which takes a predicate that is called for each inner exception; exceptions for which the predicate returns true are treated as handled, and any others are rethrown in a new AggregateException.
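
A small sketch of Flatten() and Handle() follows. It is illustrative only: the `RunAndHandle` helper and the exception messages are invented. An attached child task is used to produce a nested AggregateException, the InvalidOperationException is handled, and whatever remains is collected.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace AggregateExceptionSketch
{
    public static class Program
    {
        // Hypothetical helper: returns descriptions of the exceptions that
        // Handle() did not mark as handled.
        public static List<string> RunAndHandle()
        {
            // The attached child's exception reaches the parent wrapped in its
            // own AggregateException, so the parent's exception is nested.
            Task parent = Task.Factory.StartNew(() =>
            {
                Task.Factory.StartNew(() =>
                {
                    throw new InvalidOperationException("expected failure");
                }, TaskCreationOptions.AttachedToParent);

                throw new ArgumentException("unexpected failure");
            });

            var unhandled = new List<string>();
            try
            {
                parent.Wait();
            }
            catch (AggregateException ae)
            {
                try
                {
                    // Flatten removes the nesting; Handle treats InvalidOperationException
                    // as handled and rethrows the rest in a new AggregateException.
                    ae.Flatten().Handle(ex => ex is InvalidOperationException);
                }
                catch (AggregateException rest)
                {
                    foreach (Exception ex in rest.InnerExceptions)
                        unhandled.Add(ex.GetType().Name + ": " + ex.Message);
                }
            }
            return unhandled;
        }

        public static void Main()
        {
            foreach (string line in RunAndHandle())
                Console.WriteLine(line);
        }
    }
}
```

Without the call to Flatten(), the predicate would see the inner AggregateException rather than the InvalidOperationException it wraps.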

Continuation tasks allow you to start another task after a task completes, faults, or is canceled, by using the ContinueWith method. Faulted means an unhandled exception occurred. Canceled means the task was canceled through its CancellationToken, or a conditional continuation's condition was not satisfied. The following example starts a task with Task.Run(), chains a continuation onto it, registers conditional continuations for each possible outcome, and reads the final value from the Result property, which blocks until the task has finished.

Task with Added Continuation Tasks

using System;
using System.Threading.Tasks;
namespace TaskContinuations
{
    class Program
    {
        static void Main()
        {
            // Define and start the task
            Task<string> t1 = Task.Run(() =>
            {
                //Environment.Exit(-1);
                return "I am task T1";
            }).ContinueWith((s) =>
                {
                    return s.Result + " - continued";
                });

            t1.ContinueWith((s) =>
            {
                Console.WriteLine("Canceled");
            }, TaskContinuationOptions.OnlyOnCanceled);

            t1.ContinueWith((s) =>
            {
                Console.WriteLine("Faulted");
            }, TaskContinuationOptions.OnlyOnFaulted);

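            // Keep a handle on the success continuation so Main can wait for it;
            // otherwise the process may exit before "Completed" is printed.
            Task completed = t1.ContinueWith((s) =>
            {
                Console.WriteLine("Completed");
            }, TaskContinuationOptions.OnlyOnRanToCompletion);

            Console.WriteLine(t1.Result);
            completed.Wait();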
        }
    }
}

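Tasks can also have child/parent relationships, which allow for complex hierarchies. In these relationships, the parent task finishes only after its child tasks have completed. The following program has one parent task with two child tasks created with TaskCreationOptions.AttachedToParent. The parent is created with the Task constructor rather than Task.Run(), because tasks started with Task.Run() do not allow children to attach.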

Parent Task with Children

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ChildParentTasks
{
    class Program
    {
        static void Main()
        {
            Task parent = new Task(() =>
            {
                Console.WriteLine("Parent task executing.");

                new Task(() =>
                {
                    Console.WriteLine("Nested task 1 starting.");
                    Thread.SpinWait(500);
                    Console.WriteLine("Nested task 1 completing.");
                }, TaskCreationOptions.AttachedToParent).Start();

                new Task(() =>
                {
                    Console.WriteLine("Nested task 2 starting.");
                    //Thread.SpinWait(500000);
                    Thread.Sleep(5000);
                    Console.WriteLine("Nested task 2 completing.");
                }, TaskCreationOptions.AttachedToParent).Start();
            });

            parent.Start();
            parent.Wait();
            Console.WriteLine("Parent task has completed.\n");
        }
    }
}

Task Factory

The Task.Run() method did not exist prior to .NET 4.5, so Task.Factory.StartNew() was the earlier method for starting tasks. Task.Run() did not make Task.Factory.StartNew() obsolete; rather, it is a shorthand for calling Task.Factory.StartNew() with commonly used defaults. Task.Run() should be used for the common case of offloading work to the ThreadPool. However, there are more complex cases that need the additional control Task.Factory.StartNew() provides through its TaskCreationOptions. Task.Factory.StartNew() also has overloads that accept an object state, which performance-sensitive code paths can use to avoid closures.
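
As a hedged sketch of those two extras, the example below passes its input through the state parameter instead of capturing it, and requests TaskCreationOptions.LongRunning. The `SumTo` helper and its workload are hypothetical, and whether LongRunning actually helps depends on the real workload.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace StartNewSketch
{
    public static class Program
    {
        // Hypothetical helper: sums 1..n. The bound arrives through the state
        // parameter, so the lambda captures nothing and needs no closure object.
        public static Task<long> SumTo(int n)
        {
            return Task.Factory.StartNew(state =>
            {
                int limit = (int)state;
                long total = 0;
                for (int i = 1; i <= limit; i++) total += i;
                return total;
            },
            n,                                  // boxed and handed to the delegate as state
            CancellationToken.None,
            TaskCreationOptions.LongRunning,    // hint that the work may run for a long time
            TaskScheduler.Default);
        }

        public static void Main()
        {
            Console.WriteLine(SumTo(100).Result); // prints 5050
        }
    }
}
```

Neither the state argument nor the creation options can be supplied through Task.Run(), which is why such cases still call Task.Factory.StartNew() directly.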

The following program uses Task.Factory.StartNew() to start two tasks, each running an endless loop. A CancellationToken is used to cancel the tasks after five seconds. The AggregateException is caught and all of its inner exceptions are displayed on the screen.

Task.Factory.StartNew() and Aggregate Exception

using System;
using System.Threading;
using System.Threading.Tasks;

namespace TaskFactory
{
    class Program
    {
        static void Main(string[] args)
        {
            Task[] tasks = new Task[2];

            CancellationTokenSource cts = new CancellationTokenSource();
            tasks[0] = Task.Factory.StartNew(() =>
            {

                while (true)
                {
                    cts.Token.ThrowIfCancellationRequested();
                    Console.WriteLine("Task1");
                    Thread.Sleep(1000);
                }
            }, cts.Token); // passing the token lets the task end as Canceled rather than Faulted

            tasks[1] = Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    cts.Token.ThrowIfCancellationRequested();
                    Console.WriteLine("Task2");
                    Thread.Sleep(2000);
                }
            }, cts.Token); // passing the token lets the task end as Canceled rather than Faulted

            Thread.Sleep(5000);
            cts.Cancel();

            try
            {
                Task.WaitAll(tasks);

            }
            catch (AggregateException e)
            {
                foreach (var ex in e.InnerExceptions)
                {
                    Console.WriteLine("Exception: " + ex.Message);
                }
            }
            Console.WriteLine();
        }
    }
}

Task Parallel Library

Figure: Run Time Comparison between Sequential For Loop and Parallel For Loop

Parallel Class

The Task Parallel Library (TPL) was introduced in .NET 4.0 to support data parallelism through the Parallel class. This class provides static methods which allow work to be performed in parallel. The Parallel class handles all the low-level parallel logic, so you do not have to create threads, queue work items, or use locks (for basic loops). All members of the Parallel class are thread-safe and may be used concurrently from multiple threads. The static methods in the Parallel class are:

  1. Data Parallelism
    1. Parallel.For - performs the equivalent of a C# for loop.
    2. Parallel.ForEach - performs the equivalent of a C# foreach loop.
  2. Task Parallelism
    1. Parallel.Invoke - executes an array of delegates in parallel.

When Parallel.For and Parallel.ForEach are used in nested loops, they usually work best as the outer loop, with the inner loops being processed sequentially. Coding the Parallel.For and Parallel.ForEach loops is similar to coding their sequential counterparts. For example:

The sequential for loop:

for (int i = 0; i < 100; i++) MyMethod(i);

is coded with a Parallel for loop as:

Parallel.For(0, 100, i => MyMethod(i));
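
A complete program built around the same pattern might look like the sketch below. `Square` is a hypothetical stand-in for MyMethod, and the word list is made-up sample data for Parallel.ForEach.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ParallelLoopsSketch
{
    public static class Program
    {
        // Hypothetical stand-in for MyMethod: any independent computation.
        public static int Square(int i) { return i * i; }

        // Each iteration writes only to its own array slot, so no locking is needed.
        public static int[] SquaresParallel(int count)
        {
            var results = new int[count];
            Parallel.For(0, count, i => { results[i] = Square(i); });
            return results;
        }

        public static void Main()
        {
            int[] squares = SquaresParallel(100);
            Console.WriteLine(squares[99]); // prints 9801

            // Parallel.ForEach over a collection; the order of the output lines may vary.
            var words = new List<string> { "task", "parallel", "library" };
            Parallel.ForEach(words, word =>
            {
                Console.WriteLine("{0} has {1} characters", word, word.Length);
            });
        }
    }
}
```

The loop body must not depend on the order in which iterations run; writing each result to its own slot is one simple way to keep iterations independent.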

The Parallel class provides an instance of the ParallelLoopState class to each loop. The ParallelLoopState class allows iterations of Parallel loops to interact with other iterations. ParallelLoopState has two methods which tell the loop that no further iterations are required:

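  1. Break - requests that the loop stop running iterations beyond the current one at the earliest convenience; iterations with a lower index will still run.
  2. Stop - requests that the loop stop at the earliest convenience; no new iterations are started, although iterations that are already running are not interrupted.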

For example, if Break is called on the 100th iteration of a for loop iterating in parallel from 0 to 1000, all iterations less than 100 will still run, while iterations 101 through 1000 are not needed. Stop and Break might be used in a search-based algorithm where, once the result is found, no other loop iterations need to run. The property LowestBreakIteration contains the lowest iteration of the loop from which the Break() method was called. When Stop() is called, LowestBreakIteration is null.
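
The search scenario could be sketched as follows. The `FindFirst` and `Contains` helpers and the sample data are assumptions for illustration; both read the ParallelLoopResult that Parallel.For returns.

```csharp
using System;
using System.Threading.Tasks;

namespace LoopStateSketch
{
    public static class Program
    {
        // Hypothetical helper: index of the first element equal to target, or
        // null if there is none. Break() lets lower-indexed iterations finish,
        // so the lowest matching index is the one reported.
        public static long? FindFirst(int[] data, int target)
        {
            ParallelLoopResult result = Parallel.For(0, data.Length, (i, state) =>
            {
                if (data[i] == target)
                    state.Break();
            });
            return result.LowestBreakIteration;
        }

        // Hypothetical helper: true if any element equals target. Stop() is
        // enough here because it does not matter which match is found.
        public static bool Contains(int[] data, int target)
        {
            ParallelLoopResult result = Parallel.For(0, data.Length, (i, state) =>
            {
                if (data[i] == target)
                    state.Stop();
            });
            return !result.IsCompleted; // IsCompleted is false when the loop ended early
        }

        public static void Main()
        {
            var data = new int[1000];
            data[100] = 7;
            data[500] = 7;

            Console.WriteLine(FindFirst(data, 7));  // prints 100
            Console.WriteLine(Contains(data, 7));   // prints True
            Console.WriteLine(Contains(data, 9));   // prints False
        }
    }
}
```

Break() fits the case where the lowest matching index matters; Stop() fits the case where any match will do.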

Parallel processing only increases performance when there is a lot of work to be done that can be executed in parallel. Smaller amounts of work performed in parallel may actually decrease performance because of the synchronization that parallel processing requires. See my demonstration video Parallel For Loop vs Regular For Loop for the coding and comparison of a Parallel.For loop. In that demonstration, the Parallel loop completed the work over 3.5 times faster than the sequential loop. The code used in the demonstration is included below.

Parallel For Loop vs Regular For Loop

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ParallelTests
{
    class Program
    {
        static void Main(string[] args)
        {
            long work = 0;

            Stopwatch stopWatch = new Stopwatch();
            stopWatch.Start();


            #region Regular For Loop Region           
            //Console.WriteLine("Regular for Loop is running ...\n");
            //for (int myInt = 0; myInt < 10; myInt++)
            //{
            //    for (long myLong = 0; myLong < 1E9; myLong++) { work++; }
            //}
            #endregion

            #region Parallel For Loop Region
            Console.WriteLine("Parallel for Loop is running ...\n");
            Parallel.For<long>(0, 10, () => 0, (myInt, loop, subtotal) =>
                {
                    for (long myLong = 0; myLong < 1E9; myLong++) subtotal++;
                    return subtotal;
                },
                (x) => Interlocked.Add(ref work, x)
                );
            #endregion

            // Calculate and Display Execution Time
            stopWatch.Stop();
            TimeSpan ts = stopWatch.Elapsed;
            string elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}",
                ts.Hours, ts.Minutes, ts.Seconds,
                ts.Milliseconds / 10);
            Console.WriteLine("RunTime " + elapsedTime + "\n");

            // Display Work Value
            Console.WriteLine("Work is: " + work.ToString("E", CultureInfo.InvariantCulture));
            Console.WriteLine("\n\n");
        }
    }
}

Parallel.Invoke

Parallel.Invoke is used for task parallelism, where an array of actions is executed in parallel. The actions can be named methods, anonymous delegates, or lambda expressions, as shown in the following program. Note that the program checks for an AggregateException, which is used to consolidate multiple failures into a single exception object. AggregateException is used extensively in the TPL and PLINQ.

Parallel Invoke with a Named Method, Anonymous Delegate, and a Lambda Expression

using System;
using System.Threading;
using System.Threading.Tasks;

class ParallelInvokeDemo
{
    static void Main()
    {
        try
        {
            Parallel.Invoke(
                NamedMethod,        // Static Named Method

                delegate()                // Anonymous Delegate
                {
                    Console.WriteLine("Anonymous Delegate, Thread={0}", Thread.CurrentThread.ManagedThreadId);
                },

                () =>                        // Lambda Expression
                {
                    Console.WriteLine("Lambda Expression, Thread={0}", Thread.CurrentThread.ManagedThreadId);
                }
            );
        }
        catch (AggregateException e)
        {
            Console.WriteLine(e.InnerException.ToString());
        }
    }

    static void NamedMethod()
    {
        Console.WriteLine("Named Method, Thread={0}", Thread.CurrentThread.ManagedThreadId);
    }
}
