19 Возможности библиотеки Task Parallel Library. Пул потоков

Пул потоков предназначен для упрощения многопоточной обработки. 

Программист выделяет фрагменты кода (рабочие элементы), которые можно выполнять параллельно. Планировщик (среда выполнения) оптимальным образом распределяет рабочие элементы по рабочим потокам пула. Таким образом, вопросы эффективной загрузки оптимального числа потоков решаются не программистом, а планировщиком (исполняющей средой). 

Еще одним достоинством применения пула является уменьшение накладных расходов, связанных с ручным созданием и завершением потоков для каждого фрагмента кода, допускающего распараллеливание. 

Пул потоков используется для обработки задач типа Task. Задачи обладают рядом полезных встроенных механизмов (ожидания, отмены, продолжения и т.д.). Поэтому для распараллеливания рабочих элементов рекомендуется использовать именно задачи или шаблоны класса Parallel. Непосредственная работа с пулом без явного определения задач может быть полезна, когда нет необходимости в дополнительных возможностях объекта Task.


Для добавления рабочего элемента используется метод
       // Добавление метода без параметров 
       ThreadPool.QueueUserWorkItem(SomeWork);     
       // Добавление метода с параметром 
       ThreadPool.QueueUserWorkItem(SomeWork, data); 

В следующем фрагменте проиллюстрируем основные особенности пула потоков.

using System;
using System.Threading;

     class Program 
     { 
       static void Main() 
       { 

     for(int i=0; i<10; i++) 
     { 
       ThreadPool.QueueUserWorkItem((object o)=> { 
         Console.WriteLine("i: {0}, ThreadId: {1}, IsPoolThread: {2}",  
           i, Thread.CurrentThread.ManagedThreadId,  
           Thread.CurrentThread.IsThreadPoolThread); 
         }); 
       Thread.Sleep(100); 
     } 

       } 
     
     } 

Добавляем в пул потоков 10 экземпляров безымянного делегата, объявленного в виде лямбда-выражения. В рабочем элементе осуществляется вывод значения индекса i, номер потока и признак того, что поток принадлежит пулу.

Вывод:
     i: 10, ThreadId: 3, IsPoolThread: True 
     i: 10, ThreadId: 4, IsPoolThread: True 
     i: 10, ThreadId: 4, IsPoolThread: True 
     i: 10, ThreadId: 4, IsPoolThread: True 
     i: 10, ThreadId: 4, IsPoolThread: True 
     i: 10, ThreadId: 3, IsPoolThread: True 
     i: 10, ThreadId: 3, IsPoolThread: True 
     i: 10, ThreadId: 3, IsPoolThread: True 
     i: 10, ThreadId: 3, IsPoolThread: True 
     i: 10, ThreadId: 4, IsPoolThread: True 

Убеждаем, что действительно все рабочие элементы выполнялись потоками пула (признак IsPoolThread равен true). Всего в обработке участвовало только два потока.

Для приведенного кода возможна ситуация, когда все рабочие элементы будут обрабатываться в одном потоке пула.

i: 0, ThreadId: 4, IsPoolThread: True
i: 1, ThreadId: 4, IsPoolThread: True
i: 2, ThreadId: 4, IsPoolThread: True
i: 3, ThreadId: 4, IsPoolThread: True
i: 4, ThreadId: 4, IsPoolThread: True
i: 5, ThreadId: 4, IsPoolThread: True
i: 6, ThreadId: 4, IsPoolThread: True
i: 7, ThreadId: 4, IsPoolThread: True
i: 8, ThreadId: 4, IsPoolThread: True
i: 9, ThreadId: 4, IsPoolThread: True

Заменим работу с пулом на ручную работу с потоками:

using System;
using System.Threading;

     class Program 
     { 
       static void Main() 
       { 

     for(int i=0; i<10; i++) 
     { 
       new Thread((object o)=> { 
         Console.WriteLine("i: {0}, ThreadId: {1}, IsPoolThread: {2}",  
           i, Thread.CurrentThread.ManagedThreadId,  
           Thread.CurrentThread.IsThreadPoolThread); 
         }).Start(); 
     } 

       } 
     
     } 

Вывод:
i: 10, ThreadId: 6, IsPoolThread: False
i: 10, ThreadId: 4, IsPoolThread: False
i: 10, ThreadId: 10, IsPoolThread: False
i: 10, ThreadId: 5, IsPoolThread: False
i: 10, ThreadId: 7, IsPoolThread: False
i: 10, ThreadId: 11, IsPoolThread: False
i: 10, ThreadId: 3, IsPoolThread: False
i: 10, ThreadId: 9, IsPoolThread: False
i: 10, ThreadId: 8, IsPoolThread: False
i: 10, ThreadId: 12, IsPoolThread: False

Каждый рабочий элемент обрабатывался в своем потоке, не входящем в состав пула.

Во всех рабочих элементах осуществляется вывод одного и того же значения индекса i, равного 10. Это связанно с асинхронностью запуска рабочих элементов – основной поток продолжает обрабатывать цикл и увеличивать значение индекса, а рабочие элементы фактически еще не выполняются. Для предотвращения такой ситуации необходимо использовать индивидуальные копии для каждого элемента.

С индексом i есть проблемы. Это объясняется тем, что операция добавления делегата в очередь выполняется гораздо быстрее, чем инициализация запуска нового потока. Пока на второй итерации осуществляется запуск потока, первый поток уже приступил к работе и прочитал текущее значение индекса. Для гарантированной работы каждого потока с уникальным индексом необходимо использовать копии индексов, создаваемые на каждой итерации:

using System;
using System.Threading;

     class Program 
     { 
       static void Main() 
       { 

     for(int i=0; i<10; i++) 
     { 
       int y = i; 
       ThreadPool.QueueUserWorkItem((object o) => Console.WriteLine(y)); 
       new Thread(() => Console.WriteLine(y)).Start(); 
     } 

       } 
     } 




Основным неудобством работы с пулом, является отсутствие механизма ожидания завершения рабочих элементов. Необходимо использовать либо какие-то средства синхронизации (например, сигнальные сообщения ManualResetEvent, шаблон синхронизации CountdownEvent), либо общие переменные.

using System;
using System.Threading;

     class Program 
     { 

     static void Func1(object o) 
     { 
       var ev = (ManualResetEvent)o; 
       // Работа 
       Console.WriteLine("Func1: Working.."); 
       ev.Set(); 
     } 

     static void Main() 
     { 
       ManualResetEvent ev = new ManualResetEvent(false); 
       ThreadPool.QueueUserWorkItem(Func1, ev); 
       ev.WaitOne();
     } 

     } 

В рабочую функцию передаем сигнальное сообщение. Основной поток блокируется в ожидании сигнала. Рабочий поток после завершения работы генерирует сигнал.

ManualResetEvent - конструктор  (System.Threading)
ManualResetEventSlim - класс (System.Threading)

В случае работы нескольких потоков можно использовать массив сообщений и метод WaitHandle.WaitAll для ожидания сигналов от всех потоков.

using System;
using System.Threading;

     class Program 
     { 
    
     static void SomeWork() 
     { 
             Console.WriteLine("SomeWork: Working.."); 
     } 

       static void Main() 
       { 
         var events = new ManualResetEvent[10]; 
         for(int i=0; i<10; i++) 
         { 
           int y = i; 
           events[i] = new ManualResetEvent(false); 
           ThreadPool.QueueUserWorkItem((object o) => { 
             SomeWork(); 
             events[y].Set(); 
           }); 
         } 
         WaitHandle.WaitAll(events); 
       } 
     }

Каждый рабочий элемент оперирует со своим уникальным сигнальным объектом. После завершения работы, в делегате устанавливается сигнальный объект. Основной поток дожидается завершения всех рабочих элементов.

Вопросы

  1. Какие достоинства у непосредственной работы с потоками?
  2. В чем заключаются основные проблемы организации многопоточной обработки?
  3. В каких случаях работа с пулом потоков является более эффективной, чем непосредственная работа с потоками?

Упражнения

Реализуйте пользовательский пул потоков, который повторяет функциональность объекта ThreadPool. Добавьте новые методы пула, которые упрощают работу и делают общение с пулом более информативным. Например, методы ожидания запущенных задач; получение сводной информации о выполнении задач (число потоков, участвовавших в обработке; время обработки).