28 Возможности библиотеки Task Parallel Library. Атомарные операторы

Библиотека .NET 4.0 предоставляет высокоэффективные атомарные операторы, которые реализованы как статические методы класса System.Threading.Interlocked. Атомарные операторы предназначены для потокобезопасного неблокирующего выполнения операций над данными, преимущественно целочисленного типа.


Атомарность означает, что при выполнении оператора никто не вмешается в работу потока. Функционально, атомарные операторы равносильны критической секции, выделенной с помощью lock, Monitor или других средств синхронизации.

lock (sync_obj) 
{ 
  counter++; 
} 
// можно выполнить с помощью атомарного оператора 
Interlocked.Increment(ref counter); 
  
Атомарные операторы являются неблокирующими - поток не выгружается и не ожидает, поэтому обеспечивают высокую эффективность. Выполнение оператора Interlocked занимает вдвое меньшее время, чем выполнение критической секции с lock-блокировкой без конкуренции.

Оператор Interlocked.CompareExchange позволяет атомарно выполнить конструкцию "проверить-присвоить":

lock(LockObj) 
{ 
  if(x == curVal) 
    x = newVal;   
} 

oldVal = Interlocked.CompareExchange(ref x, newVal, curVal); 
  
Если значение переменной x равно значению, задаваемому третьим аргументом curVal, то переменной присваивается значение второго аргумента newVal. Возвращаемое значение позволяет установить, осуществилась ли замена значения.

Атомарный оператор Read предназначен для потокобезопасного чтения 64-разрядных целых чисел (Int64). Чтение значений типа long (Int64) на 32-разрядной вычислительной системе не является атомарной операцией на аппаратном уровне. Поэтому многопоточная работа с 64-разрядными переменными может приводить к некорректным результатам. В следующем фрагменте проиллюстрируем проблему чтения переменных типа Int64:

Int64 bigInt = Int64.MinValue; 
Thread t = new Thread(() => { 
      while(true) { 
        if(bigInt == Int64.MinValue) 
          bigInt = Int64.MaxValue; 
        else 
          bigInt = Int64.MinValue; 
      }); 
}
t.Start(); 
List<Int64> lstBig = new List<Int64>(); 
for(int i=0; i < 1000; i++) 
{ 
  Thread.Sleep(100); 
  lstBig.Add(bigInt); 
} 
t.Abort(); 

Console.WriteLine("Distinct values: " 
 + lstBig.Distinct().Count()); 
lstBig.Distinct().AsParallel().ForAll(Console.WriteLine); 
  
В этом примере значение переменной bigInt изменяется только в одном потоке. Основной поток периодически читает текущие значения bigInt. Поток t циклически меняет значение переменной bigInt с MinValue на MaxValue и с MaxValue на MinValue. Тем не менее, вывод показывает, что основной поток прочитал и другие значения. Эти "промежуточные" значения появились из-за не атомарности действий над 64-разрядными переменными – пока основной поток прочитал первые 32 бита числа, дочерний поток изменил следующие 32 бита. Предпоследняя строчка выводит число различных значений переменной bigInt, прочитанных в основном потоке. Последняя строчка выводит на консоль все различные значения.

Distinct values: 4 
-9223372036854775808 
-9223372032559808513 
9223372036854775807 
9223372032559808512 
  
Для устранения проблемы необходимо сделать атомарным запись и чтение переменной bigInt:

Int64 bigInt = Int64.MinValue; 
Thread t = new Thread(() => { 
  Int64 oldValue = Interlocked.CompareExchange(ref bigInt,  
      Int64.MinValue, Int64.MaxValue); 
  Interlocked.CompareExchange(ref bigInt,  
       Int64.MaxValue, oldValue); 
      }); 
t.Start(); 
List<Int64> lstBig = new List<64>(); 
for(int i=0; i < 1000; i++) 
{ 
  Thread.Sleep(100); 
  lstBig.Add(Interlocked.Read(ref bigInt)); 
} 
t.Abort(); 

Console.WriteLine("Distinct values: " 
     + lstBig.Distinct().Count()); 
lstBig.Distinct().AsParallel().ForAll(Console.WriteLine); 

Изменение bigInt реализовано с помощью двух операторов CompareExchange. Первый оператор пытается присвоить MinValue, если текущее значение равно MaxValue. Оператор возвращает старое значение. Сравнивая текущее со старым значением, определяем, произошло ли изменение. Если изменения не было, то присваиваем максимальное значение MaxValue. Атомарное чтение реализовано с помощью оператора Interlocked.Read. Вывод результатов свидетельствует о решении проблемы:

Distinct value: 2 
-9223372036854775808 
9223372036854775807 
  
Операции над 64 разрядными целыми на 64-разрядной системе являются атомарными на аппаратном уровне, поэтому не требуют средств синхронизации при параллельной записи и чтении. Но при параллельной записи в нескольких потоках, возникает проблема гонки данных.

Вопросы

  1. Можно ли организовать работу нескольких потоков без средств синхронизации?
  2. Может ли многопоточное приложение, использующее только конструкции lock, войти в состояние взаимоблокировки?
  3. Какие средства синхронизации позволяют реализовать функциональность критической секции? В каких случаях следует отдавать предпочтение тому или иному объекту?

Упражнения

1) Исследуйте эффективность легковесных средств синхронизации по сравнению с аналогичными объектами ядра операционной системы: SemaphoreSlim – Semaphore, ManualResetEvent – ManualResetEventSlim.

2) Для анализа можно использовать задачу обращения к разделяемому счетчику:
voidThreadFunc() {
// Вход в критическую секцию с помощью 
// того или иного средства синхронизации
totalCount++;
// Выход из критической секции
}

3) Исследуйте эффективность потокобезопасных коллекций по сравнению с синхронизированным доступом к обычным коллекциям.

4) Исследуйте эффективность атомарных операторов по сравнению со средствами организации критической секции (lock, Monitor, Mutex).

5) Самостоятельно освойте работу с объектами, реализующими типовые схемы синхронизации, CountdownEvent и Barrier. Реализуйте функциональность этих объектов с помощью средств синхронизации, рассмотренных в лекции. Исследуйте эффективность и удобство работы объектов CountdownEvent и Barrier.