18 Возможности библиотеки Task Parallel Library. Работа с потоками в C#

Mac OS X + Mono

Для начала необходимо установить систему .NET Framework, на Mac OS X 10.8.5 это делается следующим образом.

$ brew update
$ brew install mono
==> Downloading https://downloads.sf.net/project/machomebrew/Bottles/mono-3.8.0.mountain_lion.bottle.tar.gz
######################################################################## 100,0%
==> Pouring mono-3.8.0.mountain_lion.bottle.tar.gz
==> Caveats
To use the assemblies from other formulae you need to set:
  export MONO_GAC_PREFIX="/usr/local"
==> Summary
🍺  /usr/local/Cellar/mono/3.8.0: 1128 files, 219M

Как скомпилировать приложение из Терминала можно почитать здесь: OSX | Mono.

Далее можно установить интегрированную среду разработку: Download - MonoDevelop.

Как создать поток в C#

Среда исполнения .NET CLR стремится оптимизировать работу управляемых потоков и использовать для их выполнения потоки процесса, существующие на уровне операционной системы. Поэтому создание потоков типа Thread не всегда сопряжено с созданием потоков процесса.

В качестве рабочего элемента можно использовать:
  • метод класса;
  • делегат метода;
  • лямбда-выражение.
Примеры:

using System.Threading;
using System;

 class Program  
 { 
   static void LocalWorkItem() 
   { 
     Console.WriteLine("Hello from static method"); 
   } 
   static void Main() 
   { 
     Thread thr1 = new Thread(LocalWorkItem); 
     thr1.Start(); 

     Thread thr2 = new Thread(() => 
       { 
         Console.WriteLine("Hello from lambda-expression"); 
       }); 
     thr2.Start(); 

     ThreadClass thrClass = new  ThreadClass("Hello from thread-class"); 
     Thread thr3 = new Thread(thrClass.Run); 
     thr3.Start(); 
   } 
 } 
 class ThreadClass 
 { 
   private string greeting; 
   public ThreadClass(string sGreeting)  
   {  
     greeting = sGreeting; 
   } 
   public void Run() 
   { 
     Console.WriteLine(greeting); 
   } 
 } 

Как дождаться завершения потока в C#

using System.Threading;
using System;

 class Program  
 { 
     static void Main() 
   { 
 Thread thr1 = new Thread(() => 
   { 
     for(int i=0; i<5; i++) 
       Console.Write("A"); 
   }); 

 Thread thr2 = new Thread(() => 
   { 
     for(int i=0; i<5; i++) 
       Console.Write("B"); 
   }); 

 Thread thr3 = new Thread(() => 
   { 
     for(int i=0; i<5; i++) 
       Console.Write("C"); 
   }); 

 thr1.Start(); 
 thr2.Start(); 
 thr1.Join(); 
 thr2.Join(); 
 thr3.Start(); 
   } 
 } 

Вывод: BBBBAABAAACCCCC

Передача параметров с помощью глобальных переменных

using System;
using System.Threading;

 class Program 
 { 

   static long Factorial(long n) 
   { 
     long res = 1; 
     do 
     { 
       res = res * n; 
     } while(--n > 0); 
     return res; 
   } 

   static void Main() 
   {   
     long res1 = 0, res2 = 0;

     long n1 = 5, n2 = 10; 

     Thread t1 = new Thread(() => 
       { 
         res1 = Factorial(n1); 
       }); 

     Thread t2 = new Thread(() => { res2=Factorial(n2); }); 

     // Запускаем потоки 
     t1.Start();  t2.Start(); 

     // Ожидаем завершения потоков 
     t1.Join();  t2.Join(); 

     Console.WriteLine("Factorial of {0} equals {1}", n1, res1); 
     Console.WriteLine("Factorial of {0} equals {1}", n2, res2); 
   } 

 } 

Вывод:
Factorial of 5 equals 120
Factorial of 10 equals 3628800

c# / Возвращение результатов из потока с помощью глобальных переменных / ХэшКод
c# - How to get thread results through local variables? - Stack Overflow

Передача параметров в рабочий метод потока с помощью перегрузки метода Start

using System;
using System.Threading;

   class Program 
       { 
         static double res; 
         static void ThreadWork(object state) 
         { 
           string sTitle = ((object[])state)[0] as string; 
           double d = (double)(((object[])state)[1]);  
           Console.WriteLine(sTitle); 
           res = SomeMathOperation(d); 
         } 
         
         static void Main() 
         { 
           Thread thr1 = new Thread(ThreadWork); 
           thr1.Start(new object[] {"Thread #1", 3.14}); 
           thr1.Join(); 
           Console.WriteLine("Result: {0}", res); 
     
         }   

   static double SomeMathOperation(double d) 
   { 
     return d;
  } 
}


Работа в лямбда-выражениях и анонимных делегатах с общими переменными может приводить к непредсказуемым результатам:

using System;
using System.Threading;

class Program 
{ 
    static void Main() 
    {   
        for(int i=0; i<10; i++) 
        { 
            Thread t = new Thread(() => Console.Write("ABCDEFGHIJK"[i]));
            t.Start();  
        } 
    }
}
 
Ожидаем получить все буквы в случайном порядке, а получаем: IIIJJJIJIJ

Если в строковой константе оставить только 10 букв, полагая, что индекс i может быть от 0 до 9, получаем ошибку "Индекс вышел за границы массива".

Проблема связана с тем, что при объявлении потока делегат метода или лямбда-выражение содержит только ссылку на индекс i. Когда созданный поток начинает свою работу фактическое значение индекса уже давно убежало вперед. Последнее значение индекса равно 10, что и приводит к возникновению исключения. Исправить данный фрагмент можно с помощью дополнительной переменной, которая на каждой итерации сохраняет текущее значение индекса.

using System;
using System.Threading;

class Program 
{ 
    static void Main() 
    {   
      for(int i=0; i<10; i++) 
       { 
         int i_copy = i; 
         Thread t = new Thread(delegate()
            {
                Console.Write("ABCDEFGHIJK"[i_copy]);
            }
         ); 
         t.Start();  
       } 
    }
}

Вывод: GFEAJIBHCD

Anonymous Methods (C# Programming Guide)

Приостановление потока

Метод Sleep() позволяет приостановить выполнение текущего потока на заданное число миллисекунд:
       // Приостанавливаем поток на 100 мс 
       Thread.Sleep(100); 
       // Приостанавливаем поток на 5 мин 
       Thread.Sleep(TimeSpan.FromMinute(5)); 

Если в качестве аргумента указывается ноль Thread.Sleep(0), то выполняющийся поток отдает выделенный квант времени и без ожидания включается в конкуренцию за процессорное время. Такой прием может быть полезен в отладочных целях для обеспечения параллельности выполнения определенных фрагментов кода.

Например, следующий фрагмент:
using System;
using System.Threading;

class Program
{
     static void ThreadFunc(object o)
     {
       for(int i=0; i<20; i++)
         Console.Write(o);
     }
     
     static void Main()
     {
       Thread[] t = new Thread[4];
       for(int i=0; i<4; i++)
         t[i] = new Thread(ThreadFunc);
       
       t[0].Start("A"); t[1].Start("B");
       t[2].Start("C"); t[3].Start("D");
       
       for(int i=0; i<4; i++)
         t[i].Join();
     }
}

Выводит на консоль:
BBBBBBBBBBBBBBBBBBBBAAAAAAAAAAAAAAAAAAAACCCCCCCCCCCCCCCCCCCDDDDDDDDDDDDDDDDDDDD

Параллельность не наблюдается, так как каждый поток за выделенный квант процессорного времени успевает обработать все 20 итераций. Изменим тело цикла рабочей функции:
       static void ThreadFunc(object o)
       {
         for(int i=0; i<20; i++)
         {
           Console.Write(o);
           Thread.Sleep(0);
         }
       }

Вывод стал более разнообразный:
AAAACACACACACACACACACACACACACACACAACCBBDDDBDBDBDBDBDBDBDBDBDBDBDBDBCCCDBBBDDBBDD
 
Существует аналог метода Thread.Sleep(0), который позволяет вернуть выделенный квант – Thread.Yield(). При этом возврат осуществляется только в том случае, если для ядра, на котором выполняется данный поток, есть другой готовый к выполнению поток. Неосторожное применение методов Thread.Sleep(0) и Thread.Yield() может привести к ухудшению быстродействия из-за не оптимального использования кэш-памяти.

Свойства потока

Каждый поток имеет ряд свойств:
  • Name - имя потока, 
  • ManagedThreadId – номер потока, 
  • IsAlive - признак существования потока, 
  • IsBackground – признак фонового потока, 
  • ThreadState – состояние потока. 

Эти свойства доступны и для внешнего вызова.

using System;
using System.Threading;

class Program
{
     static void SomeFunc()
     {
       Thread.Sleep(100);
     }

     static void Main()
     {
       int N = 4;

       // Объявляем массив потоков   
       Thread[] arThr = new Thread[N]; 
       for(int i=0; i<arThr.Length; i++) 
       { 
         arThr[i] = new Thread(SomeFunc); 
         arThr[i].Start(); 
       } 
       for(int i=0; i<arThr.Length; i++) 
       { 
         // Выводим информацию о потоках 
         Console.WriteLine("Thread Id: {0}, name: {1}, IsAlive: {2}", 
               arThr[i].ManagedThreadId,  
               arThr[i].Name, 
               arThr[i].IsAlive); 
       }   
     }
}

Вывод:
Thread Id: 3, name: , IsAlive: True
Thread Id: 4, name: , IsAlive: True
Thread Id: 5, name: , IsAlive: True
Thread Id: 6, name: , IsAlive: True

Свойства текущего потока можно получить с помощью объекта Thread.CurrentThread. Следующий фрагмент с помощью механизма рефлексии выводит все свойства текущего потока.

     using System; 
     using System.Reflection; 
     using System.Threading; 
     class ThreadInfo 
     { 
       static void Main() 
       { 
         Thread t = Thread.CurrentThread; 
         t.Name = "MAIN THREAD"; 
         foreach(PropertyInfo p in t.GetType().GetProperties()) 
         { 
           Console.WriteLine("{0}:{1}", 
               p.Name,p.GetValue(t, null)); 
         } 
     
       } 
      }

Вывод:
CurrentContext:ContextID: 0
CurrentPrincipal:System.Security.Principal.GenericPrincipal
CurrentThread:System.Threading.Thread
ApartmentState:MTA
CurrentCulture:ru-RU
CurrentUICulture:ru-RU
IsThreadPoolThread:False
IsAlive:True
IsBackground:False
Name:MAIN THREAD
Priority:Lowest
ThreadState:Running
ExecutionContext:System.Threading.ExecutionContext
ManagedThreadId:1

Приоритеты потоков

Приоритеты потоков определяют очередность выделения доступа к ЦП. Высокоприоритетные потоки имеют преимущество и чаще получают доступ к ЦП, чем низкоприоритетные. Приоритеты потоков задаются перечислением ThreadPriority, которое имеет пять значений:
  • Highest - наивысший, 
  • AboveNormal – выше среднего, 
  • Normal - средний (по умолчанию), 
  • BelowNormal – ниже среднего, 
  • Lowest - низший. 

Для изменения приоритета потока или чтения текущего используется свойство Priority. Влияние приоритетов сказывается только в случае конкуренции множества потоков за мощности ЦП.

В следующем фрагменте 5 потоков с разными приоритетами конкурируют за доступ к ЦП с 2 ядрами. Каждый поток увеличивает свой счетчик.

using System;
using System.Threading;

      class PriorityTesting 
     { 
       static long[] counts; 
       static bool finish; 

       static void ThreadFunc(object iThread) 
       { 
         while(true) 
         { 
           if(finish) 
              break; 
           counts[(int)iThread]++; 
         } 
       } 
       
       static void Main() 
       { 
         counts = new long[5]; 
         Thread[] t = new Thread[5]; 
         for(int i=0; i<t.Length; i++)  
         { 
           t[i] = new Thread(ThreadFunc); 
           t[i].Priority = (ThreadPriority)i; 
         } 
         // Запускаем потоки 
         for(int i=0; i<t.Length; i++) 
           t[i].Start(i); 
            
         // Даём потокам возможность поработать 10 c 
         Thread.Sleep(10000); 
          
         // Сигнал о завершении 
         finish = true; 
              
         // Ожидаем завершения всех потоков 
         for(int i=0; i<t.Length; i++) 
           t[i].Join(); 
         // Вывод результатов 
         for(int i=0; i<t.Length; i++) 
           Console.WriteLine("Thread with priority {0, 15}, Counts: {1}", (ThreadPriority)i, counts[i]); 
       }   
     } 

I) Вывод на ЦП с 2 ядрами в среде Windows + .NET:
     Thread with priority         Lowest, Counts:    7608195 
     Thread with priority    BelowNormal, Counts:   10457706 
     Thread with priority         Normal, Counts:   17852629 
     Thread with priority    AboveNormal, Counts:  297729812 
     Thread with priority        Highest, Counts:  302506232 

II) Вывод на ЦП с 8 ядрами в среде Mac OS X 10.8.5 + Mono JIT compiler 3.2.3:
Thread with priority          Lowest, Counts: 187944885
Thread with priority     BelowNormal, Counts: 181046392
Thread with priority          Normal, Counts: 172475310
Thread with priority     AboveNormal, Counts: 84869967
Thread with priority         Highest, Counts: 84736214

Локальное хранилище потока

Типы, объявленные внутри рабочей функции потока, являются локальными – у каждого потока, выполняющего функцию, свои копии (приватные данные). Типы, объявленные вне рабочей функции потока, являются общими для всех потоков переменными. Изменения общих данных в одном потоке отражаются в другом потоке.

Существует возможность оперировать с локальными данными потока, объявленными вне рабочей функции, например, в общедоступном классе.

1) Первый способ заключается в объявлении статического поля, локального для каждого потока.

В следующем фрагменте рабочая функция потока использует объект пользовательского типа Data. В этом классе

using System;
using System.Threading;

     public class Data 
     { 
       public static int sharedVar; 
       [ThreadStatic] public static int localVar; 
     } 


     class Program 
     { 

       static void threadFunc(object i) 
       { 
         Console.WriteLine("Thread {0}: Before changing.. Shared: {1}, local: {2}", 
             i, Data.sharedVar, Data.localVar);  
         Data.sharedVar = (int)i; 
         Data.localVar = (int)i; 
         Console.WriteLine("Thread {0}: After changing.. Shared: {1}, local: {2}", 
             i, Data.sharedVar, Data.localVar); 
       } 

       static void Main() 
       { 
         Thread t1 = new Thread(threadFunc);  
         Thread t2 = new Thread(threadFunc); 
        
         Data.sharedVar = 3; Data.localVar = 3;   
     
         t1.Start(1);   t2.Start(2); 
         t1.Join();   t2.Join();  
       } 
     
     } 

Вывод:
Thread 2: Before changing.. Shared: 3, local: 0
Thread 2: After changing.. Shared: 2, local: 2
Thread 1: Before changing.. Shared: 3, local: 0
Thread 1: After changing.. Shared: 1, local: 1

Ограничения этого способа связаны с тем, что атрибут используется только со статическими полями и инициализация поля осуществляется только одним потоком.

2) Второй способ объявления локальных данных заключается в использовании объекта ThreadLocal<T>:

using System;
using System.Threading;

     class Program 
     { 

     static void Main() 
     { 
       ThreadLocal<int> localSum = new ThreadLocal<int>(() => 0); 
       Thread t1 = new Thread(() => { 
           for(int i=0; i<10; i++) 
             localSum.Value++; 
           Console.WriteLine(localSum.Value); 
         }); 
       Thread t2 = new Thread(() => { 
           for(int i=0; i<10; i++) 
             localSum.Value--; 
           Console.WriteLine(localSum.Value); 
         }); 
        
       t1.Start(); t2.Start(); 
       t1.Join(); t2.Join(); 
        
       Console.WriteLine(localSum.Value); 
     } 
     
     } 

Вывод:
10
-10
0

Первый поток увеличивал свой счетчик, второй уменьшал, а третий (главный поток) ничего не делал со своим локальным счетчиком, поэтому получаем 0.

3) Третий способ заключается в использовании локальных слотов потока. 

Доступ к слотам обеспечивается с помощью методов GetData, SetData. Объект, идентифицирующий слот, можно получить с помощью строковой константы. Применение слотов является менее производительным по сравнению с локальными статическими полями. Но может быть полезным, если работа потока структурирована в нескольких методах. В каждом методе можно получить доступ к слоту потока по его имени.

using System;
using System.Threading;

     public class ThreadWork 
     { 

       private string sharedWord; 

       public void Run(string secretWord) 
       { 
         sharedWord = secretWord; 
         Save(secretWord); 
         Thread.Sleep(500); 
         Show(); 
       } 

       private void Save(string s) 
       { 
       // Получаем идентификатор слота по имени 
         LocalDataStoreSlot slot = 
                Thread.GetNamedDataSlot("Secret"); 
         // Сохраняем данные 
         Thread.SetData(slot, s); 
       } 

       private void Show() 
       { 
         LocalDataStoreSlot slot = Thread.GetNamedDataSlot("Secret"); 
         string secretWord = (string)Thread.GetData(slot); 
         Console.WriteLine("Thread {0}, secret word: {1}, shared word: {2}", 
             Thread.CurrentThread.ManagedThreadId, 
              secretWord, sharedWord); 
       } 

     } 


     class Program 
     { 
       static void Main() 
       { 
         ThreadWork thr = new ThreadWork(); 
         new Thread(() => thr.Run("one")).Start(); 
         new Thread(() => thr.Run("two")).Start(); 
         new Thread(() => thr.Run("three")).Start(); 
         Thread.Sleep(1000); 
       } 
     
     } 

Thread 3, secret word: one, shared word: two
Thread 5, secret word: three, shared word: two
Thread 4, secret word: two, shared word: two

Переменная sharedWord является разделяемой, поэтому выводится последнее изменение, выполненное третьим потоком.