33 Возможности библиотеки Task Parallel Library. Синхронизация доступа буферу

Синхронизация доступа к одноэлементному буферу

Задача

Несколько потоков работают с общим одноэлементным буфером. Потоки делятся на "писателей", осуществляющих запись сообщений в буфер, и "читателей", осуществляющих извлечение сообщений из буфера. Только один поток может осуществлять работу с буфером. Если буфер свободен, то только один писатель может осуществлять запись в буфер. Если буфер занят, то только один читатель может осуществлять чтение из буфера. После чтения буфер освобождается и доступен для записи. В качестве буфера используется глобальная переменная, например, типа string. Работа приложения заканчивается после того, как все сообщения писателей через общий буфер будут обработаны читателями.




Задание

  1. Реализуйте взаимодействие потоков-читателей и потоков-писателей с общим буфером без каких-либо средств синхронизации. Проиллюстрируйте проблему совместного доступа. Почему возникает проблема доступа?
  2. Реализуйте доступ "читателей" и "писателей" к буферу с применением следующих средств синхронизации:
    • блокировки (lock);
    • сигнальные сообщения (ManualResetEvent, AutoResetEvent, ManualResetEventSlim);
    • семафоры (Semaphore, SemaphoreSlim).
    • атомарные операторы (Interlocked)
  3. Исследуйте производительность средств синхронизации при разном числе сообщений, разном объеме сообщений, разном числе потоков.
  4. Сделайте выводы об эффективности применения средств синхронизации.

Методические указания

В случае одноэлементного буфера достаточно использовать флаг типа bool для контроля состояния буфера. Читатели обращаются к буферу, только если он свободен:

  
// Работа читателя  
while (!finish)  
         {  
          if (!bEmpty)
          {
            MyMessages.Add(buffer);
            bEmpty = true;
          }
        }
      
    
Писатели обращаются к буферу, только если он пуст:

// Работа писателя 
while(i < n) 
  if (bEmpty) 
  { 
    buffer = MyMessages[i++]; 
    bEmpty = false; 
  } 

    
Писатели работают, пока не запишут все свои сообщения. По окончании работы писателей основной поток может изменить статус переменной finish, который является признаком окончания работы читателей.

static void Main()  
  // Запускаем читателей и писателей 
  ..  
  // Ожидаем завершения работы писателей 
  for(int i=0; i< writers.Length; i++) 
    writers[i].Join(); 
  // Сигнал о завершении работы для читателей 
  finish = true; 
      
  // Ожидаем завершения работы читателей 
  for(int i=0; i< readers.Length; i++) 
    readers[i].Join(); 
      
      
    
Отсутствие средств синхронизации при обращении к буферу приводит к появлению гонки данных – несколько читателей могут прочитать одно и то же сообщение, прежде чем успеют обновить статус буфера; несколько писателей могут одновременно осуществить запись в буфер. В данной задаче следствием гонки данных является потеря одних сообщений и дублирование других. Для фиксации проблемы предлагается выводить на экран число повторяющихся и потерянных сообщений.

Самый простой вариант решения проблемы заключается в использовании критической секции (lock или Monitor).

// Работа читателя 
while (!finish) 
{   
  lock ("read") 
  { 
    if (!bEmpty) 
    { 
      MyMessage[i++] = buffer; 
      bEmpty = true; 
    }     
  } 
      
    
Для писателей существует своя критическая секция:

// Работа писателя 
while(i < n) 
  lock("write") 
  { 
    if (bEmpty) 
    { 
      buffer = MyMessage[i++]; 
      bEmpty = false; 
    } 
  } 
    
Данная реализация не является оптимальной. Каждый из читателей поочередно входит в критическую секцию и проверяет состояние буфера, в это время другие читатели блокируются, ожидая освобождения секции. Если буфер свободен, то синхронизация читателей избыточна. Более эффективным является вариант двойной проверки:

// Работа читателя 
  while (!finish) 
  {  if (!bEmpty) 
    { 
      lock ("read") 
      { 
        if (!bEmpty) 
        { 
          bEmpty = true; 
          MyMessage[i++] = buffer; 
        }     
      } 
    } 
  } 
    
Если буфер свободен, то читатели "крутятся" в цикле, проверяя состояние буфера. При этом читатели не блокируются. Как только буфер заполняется, несколько читателей, но не все, успевают войти в первый if-блок, прежде чем самый быстрый читатель успеет изменить статус буфера bEmpty = true.

Применение сигнальных сообщений позволяет упростить логику синхронизации доступа. Читатели ожидают сигнала о поступлении сообщения, писатели – сигнала об опустошении буфера. Читатель, освобождающий буфер, сигнализирует об опустошении. Писатель, заполняющий буфер, сигнализирует о наполнении буфера. Сообщения с автоматическим сбросом AutoResetEvent обладают полезным свойством – при блокировке нескольких потоков на одном и том же объекте AutoResetEvent появление сигнала освобождает только один поток, другие потоки остаются заблокированными. Порядок освобождения потоков при поступлении сигнала не известен, но в данной задаче это не существенно.

// Работа читателя 
void Reader(object state) 
  var evFull = state[0] as AutoResetEvent; 
  var evEmpty = state[1] as AutoResetEvent; 
  while(!finish) 
  {   
    evFull.WaitOne(); 
    MyMessage.Add(buffer); 
    evEmpty.Set(); 
  } 
    
// Работа писателя 
void Writer(object state) 
  var evFull = state[0] as AutoResetEvent; 
  var evEmpty = state[1] as AutoResetEvent; 
  while(i < n) 
  { 
    evEmpty.WaitOne();
    buffer = MyMessage[i++]; 
    evFull.Set(); 
  } 
}
    
Данный фрагмент приводит к зависанию работы читателей. Писатели закончили работу, а читатели ждут сигнала о наполненности буфера evFull. Для разблокировки читателей необходимо сформировать сигналы evFull.Set() от писателей при завершении работы или от главного потока. Чтобы отличить ситуацию завершения можно осуществлять проверку статуса finish непосредственно после разблокировки.

  // Рабочий цикл читателей 
  while(true) 
  {   
    evFull.Wait(); 
    // Сигнал о завершении работы 
    if(finish) break; 
    MyMessage.Add(buffer); 
    evEmpty.Set(); 
  } 
    
Применение семафоров (Semaphore, SemaphoreSlim) в данной задаче аналогично использованию сигнальных сообщений AutoResetEvent. Кроме предложенного варианта обмена сигналами между читателями и писателями, семафоры и сигнальные сообщения могут использоваться в качестве критической секции читателей и писателей.

void Reader(object state) 
  var semReader = state as SemaphoreSlim; 
  while(!finish) 
  { 
    if(!bEmpty) 
    { 
      semReader.Wait(); 
      if(!bEmpty) 
      { 
        bEmpty = true; 
        myMessages.Add(buffer); 
      }     
      semReader.Release(); 
    } 
  } 
      
void Writer(object state) 
  var semWriter = state as SemaphoreSlim; 
  while(i < myMessages.Length) 
  { 
    if(bEmpty) 
    { 
      semWriter.Wait(); 
      if(bEmpty) 
      { 
        bEmpty = false; 
        buffer = myMessages[i]; 
      }     
      semWriter.Release(); 
    } 
  } 
      
    

Вопросы и упражнения

  1. Почему проблема гонки данных проявляется не при каждом прогоне?
  2. Какие факторы увеличивают вероятность проявления проблемы гонки данных?
  3. Возможно ли в данной задаче при отсутствии средств синхронизации возникновение исключения и аварийное завершение программы?
  4. Можно ли в данной задаче использовать атомарные операторы для обеспечения согласованности доступа? Необходимы ли при этом дополнительные средства синхронизации?
  5. Можно ли в данной задаче использовать потокобезопасные коллекции для обеспечения согласованного доступа?
  6. Какие средства синхронизации обеспечивают наилучшее быстродействие в данной задаче? Объясните с чем это связано.

Синхронизация приоритетного доступа к многоэлементному буферу

Задача

Несколько потоков работают с общим многоэлементным буфером. Потоки делятся на "читателей" и "писателей", каждый поток обладает приоритетом. Писатели осуществляют запись в буфер, если есть свободные ячейки. Читатели извлекают содержимое буфера, если есть заполненные ячейки. Работа приложения заканчивается после того, как все сообщения писателей будут обработаны читателями через общий буфер. В качестве буфера используется "кольцевой массив".


Задания

  1. Реализуйте синхронизированное взаимодействие читателей и писателей с учетом приоритета. Аргументируйте выбор средств синхронизации.
  2. Вывод программы включает: время работы каждого писателя и читателя; число сообщений, обработанных каждым писателем и читателем.
  3. Выполните прогон программы при разных параметрах: разном числе писателей и читателей, разном объеме сообщений, разных приоритетах потоков. Результаты прогонов представьте в табличной форме.

Методические указания

В качестве многоэлементного буфера используется кольцевой массив. Он представляет собой обычный массив размера n. Буфер называется кольцевым, так как при смещении текущего индекса после крайнего элемента следует первый. Для доступа к буферу используются два индекса: один для чтения и один для записи. Такая организация обеспечивает независимость операций чтения и записи – если в массиве есть свободные элементы и есть заполненные элементы, то операции чтения и записи могут производиться одновременно без каких-либо средств синхронизации.

В начале работы буфер является пустым – оба индекса указывают на первый элемент. При осуществлении операций чтения или записи соответствующие индексы смещаются.


Операция чтения блокируется, если буфер пуст, запись при этом разрешена. Операция записи блокируется, если буфер полностью заполнен, чтение при этом разрешено. Равенство индексов чтения и записи является признаком и занятости буфера, и пустоты. Чтобы различать эти ситуации необходимо контролировать, какая операция привела к равенству индексов. Если операция записи, то буфер заполнен.


Операции чтения и записи могут быть реализованы следующим образом:

bool Write(string Msg)
 if(bFull) 
  return false; 
 buffer[iW] = Msg; 
 iW = (iW + 1) % n; 
 // Если индексы совпали после записи, 
 // буфер заполнен 
  if(iW == iR) 
  bFull = true; 
 return true;  
bool Read(ref string Msg) 
    { 
     // Если индексы совпадают, но не после операции записи 
     // буфер пуст 
     if(iW == iR && !bFull) 
  return false; 
     Msg = buffer[iR]; 
     iR = (iR + 1) % n; 
     // Если буфер был заполнен, то снимаем отметку 
     if(bFull)  
   bFull = false; 
     return true; 
    }
  
Главный поток контролирует статус завершения операций чтения и записи. Если операция чтения не выполнена, то поток читателя блокируется.

Ситуация усложняется, если доступ к буферу осуществляют несколько читателей и несколько писателей. Один из вариантов решения проблемы – добавить конструкции критической секции в функции чтения и записи.

Другой подход заключается в реализации схемы "управляющий-рабочие", где управляющий контролирует все операции, требующие синхронизации. Рабочие потоки (читатели и писатели) обращаются к управляющему (основной поток) с сигналом о готовности осуществлять операцию чтения или записи. Управляющий поток фиксирует обращения читателей и писателей, вычисляет текущие индексы для чтения и записи, контролирует состояние буфера (полностью заполнен или полностью пуст), выбирает читателя и писателя, которым разрешает доступ. Операции чтения и записи по корректным индексам, полученным от управляющего потока, осуществляются читателями и писателями уже без контроля.

Взаимодействие рабочих и управляющего удобно организовать с помощью сигнальных сообщений типа ManualResetEventSlim.

Сигналы о готовности evReadyToRead, evReadyToWrite генерируют читатели и писатели, готовые осуществлять операции с буфером. Управляющий контролирует состояние сигналов у каждого рабочего.

Сигналы о возможности операций чтения и записи evStartReading, evStartWriting генерируются управляющим потоком конкретным читателям и писателям. Перед генерацией сигналов управляющий вычисляет индекс чтения или записи и сохраняет его в индивидуальной ячейке конкретного рабочего.

Такая организация взаимодействия позволяет достаточно легко изменять правила доступа: вводить приоритеты читателей и писателей, учитывать время обращения к управляющему потоку и обеспечивать "справедливость" доступа в плане очередности.

void ReaderThread(int iReader,  
  ManualResetEventSlim evReadyToRead, 
  ManualResetEventSlim evStartReading) 
 // Инициализация внутреннего буфера 
 var Messages = new List<string>(); 
 // Рабочий цикл чтения 
 while(true) 
  { 
     // Сигнализирует о готовности  
     evReadyToRead.Set(); 
     // Ждем сигнала от менеджера 
     evStartReading.Wait(); 
     // Разрешено чтение по текущему индексу 
     int k = ReadIndexCopy[iReader]; 
     Messages.Add(buffer[k]); 
    // Сбрасываем сигнал о чтении 
     evStartReading.Reset(); 
     // Проверяем статус завершения работы 
     if (finish) break; 
  } 
// Код писателя практически идентичен коду читателя 
void WriterThread(int iWriter,  
 ManualResetEventSlim evReadyToWrite,  
 ManualResetEventSlim evStartWriting) 
 // Инициализация массива сообщений писателя  
 Messages = .. 
 // Рабочий цикл записи 
 while(true) 
  { 
     // Сигнализируем о готовности менеджеру 
     evReadyToWrite.Set(); 
     // Ждем сигнала от менеджера 
     evStartWriting.Wait(); 
     // Разрешена запись по текущему индексу 
     k = WriteIndexCopy[iWriter]; 
     buffer[k] = Messages[j]; 
     // Проверяем статус завершения работы 
     if (finish || j >= Messages.Length) 
   break; 
     j++ 
     } 
  } 
   // Код менеджера 
   void Manager(int nReaders, int nWriters) 
   { 
    // Запуск читателей 
    for(int i=0; i<nReaders; i++) 
    { 
     evReadyToRead[i] =  
     new ManualResetEventSlim(false); 
     evStartReading[i] =  
      new ManualResetEventSlim(false); 
     tReaders[i] = new Task( () =>  
     Reader(i, evReadyToRead[i], evStartReading[i]));  
     tReaders[i].Start(); 
    } 
    // Запуск писателей  
    for(int i=0; i < nWriters; i++) 
    { 
     var evReadyToWrite[i] = 
      new ManualResetEventSlim(false); 
     var evStartWriting[i] =  
      new ManualResetEventSlim(false); 
     tWriters[i] = new Task( () => 
     Writer(i, evReadyToWrite[i], evStartWriting[i])); 
     tWriters[i].Start(); 
    } 
    // Рабочий цикл  
    while(true) 
    { 
     // Если в буфере есть свободные ячейки 
     // пытаемся обработать готовых писателей 
     if(!bFull) 
     { 
      // Получаем текущий индекс записи 
      iW = GetBufferWriteIndex(); 
      if(iW != -1) 
      { 
       // Устанавливаем писателя, 
       // которому разрешаем работать 
       iWriter = GetWriter();  
       if (iWriter != -1) 
       { 
       // Сбрасываем сигнал готовности 
        // выбранного писателя 
        evReadyToWrite[iWriter].Reset(); 
        // Сохраняем копию индекса для записи 
        ReadIndexCopy[iWriter] = iW; 
        // Разрешаем писателю начать работу 
       evStartWriting[iWriter].Set(); 
      } 
      } 
      else 
      bFull = true; 
     } 
     // Если буфер не пуст, пытаемся 
     // обработать готовых писателей 
     if(!bEmpty) 
     { 
      // Получаем текущий индекс для чтения 
      iR = GetBufferReadIndex(); 
      if(iR != -1) 
      { 
       //Устанавливаем готового читателя 
       iReader = GetWriter(); 
       if (iReader != -1) 
       { 
        evReadyToRead[iReader].Reset(); 
        WriteIndexCopy[iReader] = iR; 
       evStartReading[iReader].Set(); 
       } 
      } 
      else 
       bEmpty = false; 
     } 
    } 
   } 
   // Код функции получения номера готового писателя 
   // с учетом приоритетов 
   int GetWriter() 
   { 
    // Устанавливаем готовых писателей 
    var ready = new List<int>(); 
    for(int i=0; i<nWriter; i++) 
     if(evReadyToWrite[i].IsSet()) 
      ready.Add(i); 
     if(ready.Count == 0) 
      return -1; 
    return ready.OrderBy(i => WriterPriority[i]).First();    
   } 
  

Вопросы и упражнения

  1. Можно ли вместо объектов ManualResetEventSlim использовать другие типы сигнальных сообщений: AutoResetEvent или ManualResetEvent?
  2. Какие особенности задачи не позволяют использовать объект ReaderWriterSlim?
  3. Почему структура кольцевого буфера не требует синхронизации при работе одного читателя и одного писателя?
  4. Почему в предложенной реализации не используются критические секции?
  5. Реализуйте учет времени обращения рабочих потоков к буферу.
  6. Реализуйте решение задачи с использованием конкурентных коллекций в качестве буфера.