Программирование PC Потоки на C#. Часть 4: продвинутое использование Thu, September 12 2024  

Поделиться

Нашли опечатку?

Пожалуйста, сообщите об этом - просто выделите ошибочное слово или фразу и нажмите Shift Enter.

Потоки на C#. Часть 4: продвинутое использование Печать
Добавил(а) microsin   

[Не блокирующая синхронизация]

Ранее было показано [2], что необходимость синхронизации возникает даже в простом случае присвоения или инкременте поля. Блокировка всегда может удовлетворить этому требованию; однако если существует ситуация, когда в особо критических по производительности сценариях множество потоков конкурируют друг с другом на этой блокировке, то это означает, что происходят чрезмерные затраты на переключение контекста и соответственно добавляются нежелательные задержки. Конструкции не блокирующей синхронизация .NET Framework могут производить простые операции без какого-либо блокирования, постановки на паузу или ожидания.

Правильное написание не блокирующего или свободного от блокировок многопоточного кода довольно хитрое! Барьеры памяти, в частности, это часто самое частое попадалово на баги (примерно то же самое с ключевым словом volatile, с которым тоже проще всего накосячить [5]). Дважды подумайте - действительно ли Вам нужно получить выгоду по быстродействию, прежде чем решите отказаться от обычных блокировок. Помните о том, что захват и освобождение блокировки, на которую нет конкурентного запроса, в эру 2010 годов займет на обычных десктопах время порядка 20 нс.

Не блокирующая синхронизация также работает и между несколькими процессами, т. е. глобально на компьютере PC Windows. Пример того, где это может быть полезно - чтение и запись общей для процессов памяти (process-shared memory).

Барьеры памяти и изменчивость данных (volatility). Рассмотрим следующий пример:

class Foo
{
   int _answer;
   bool _complete;
 
   void A()
   {
      _answer = 123;
      _complete = true;
   }
 
   void B()
   {
      if (_complete) Console.WriteLine (_answer);
   }
}

Если методы A и B запускаются конкурентно в разных потоках, то есть ли возможность для B вывести "0"? Ответ будет "да" по следующим причинам:

• Компилятор, CLR или CPU могут поменять выполнение инструкций программы для повышения эффективности.
• Компилятор, CLR или CPU могут ввести оптимизацию кэша, при которой присвоение значений переменным не будет гарантированно немедленно быть видимым для других потоков.

C# и подсистема выполнения кода реального времени тщательно гарантируют, чтобы такие оптимизации не нарушили работу обычного однопоточного кода - или многопоточного кода, который правильно использует блокировки. Вне этих сценариев Вы должны явно защищаться от таких оптимизаций созданием барьеров памяти (которые также называют "заборами" памяти, memory fences), чтобы ограничит эффекты переупорядочивания выполнения инструкций и кэширования чтения/записи.

Полный барьер памяти. Самый простой барьер памяти это полный барьер (full fence), который предотвращает любой вид переупорядочивания выполнения инструкций или кэширование вокруг барьера. Вызов Thread.MemoryBarrier генерирует полный барьер (full fence); мы можем исправить наш пример путем наложения четырех полных барьеров следующим образом:

class Foo
{
   int _answer;
   bool _complete;
 
   void A()
   {
      _answer = 123;
      Thread.MemoryBarrier();       // Барьер 1
      _complete = true;
      Thread.MemoryBarrier();       // Барьер 2
   }
 
   void B()
   {
      Thread.MemoryBarrier();       // Барьер 3
      if (_complete)
      {
         Thread.MemoryBarrier();    // Барьер 4
         Console.WriteLine (_answer);
      }
   }
}

Барьеры 1 и 4 не дадут этому примеру вывести "0". Барьеры 2 и 3 предоставляют гарантию "свежести": если B запустится после A, чтение _complete будет вычислено как true.

Полный барьер эру десктопов 2010 года занимает по времени около 10 наносекунд.

Следующие конструкции неявно генерируют полный барьер:

• Оператор блокировки lock (Monitor.Enter/Monitor.Exit).
• Все методы класса Interlocked (что будет рассмотрено далее).
• Асинхронные функции обратного вызова (callback), которые используют пул потоков. Это включает асинхронные делегаты, функции обратного вызова APM, продолжения Task.
• Установка и ожидание конструкции сигнализации.
• Все что полагается на сигнализацию, что запускает Task и ждет Task.

На основании последнего следующий код безопасен для многопоточности:

int x = 0;
Task t = Task.Factory.StartNew (() => x++);
t.Wait();
Console.WriteLine (x);    // 1

Необязательно нужен полный барьер с каждым отдельным чтением или записью. Если у нас есть три поля _answerX, то нам все еще нужно применить только 4 барьера:

class Foo
{
   int _answer1, _answer2, _answer3;
   bool _complete;
 
   void A()
   {
      _answer1 = 1; _answer2 = 2; _answer3 = 3;
      Thread.MemoryBarrier();
      _complete = true;
      Thread.MemoryBarrier();
   }
 
   void B()
   {
      Thread.MemoryBarrier();
      if (_complete)
      {
         Thread.MemoryBarrier();
         Console.WriteLine (_answer1 + _answer2 + _answer3);
      }
   }
}

Хороший способ работы с барьерами - помещать их перед и после каждой инструкции, которая записывает или читает общее поле, и затем выводить за пределы барьеров все, что не нужно для этой цели. Если не уверены в каком-то коде, оставьте его внутри барьера. Или, что лучше: вернитесь к использованию блокировок!

Работать с общими записываемыми переменными (полями) без блокировок или барьеров означает искать себе лишних приключений. Есть много вводящей в заблуждение информации по этой теме (включая документацию MSDN), где утверждается, что MemoryBarrier требуется только на многопроцессорных системах со слабым упорядочиванием памяти, таких как системы с несколькими процессорами Itanium. В следующей короткой программе мы можем продемонстрировать, что барьеры памяти важны и на обычных процессорах Intel Core-2 и Pentium. Вам нужно запустить этот код с разрешенными оптимизациями и без отладчика (в Visual Studio выберите режим компиляции Release, после чего запустите скомпилированную программу без отладки):

static void Main()
{
   bool complete = false; 
   var t = new Thread (() =>
   {
      bool toggle = false;
      while (!complete) toggle = !toggle;
   });
   t.Start();
   Thread.Sleep (1000);
   complete = true;
   t.Join();            // Бесконечная блокировка
}

Эта программа никогда не завершится, потому что переменная complete кэшируется в регистре CPU. Если вставить вызов Thread.MemoryBarrier внутрь цикла while (или вокруг чтения переменной complete), то эта ошибка исправится.

Ключевое слово volatile. Другой (более продвинутый, но менее понятный) способ решить эту проблему - применить ключевое слово volatile к полю _complete:

volatile bool _complete;

Ключевое слово volatile инструктирует компилятор генерировать барьер на исключительный захват (acquire-fence) на каждом чтении из этого поля, и освобождение барьера (release-fence) на каждой записи в это поле. Acquire-fence предотвращает ситуации, когда другие операции чтения/записи происходят перед барьером; release-fence предотвращает ситуации, когда другие чтения/записи перемещаются за барьер. Такой "половинчатый" барьер работает быстрее "полного", потому что дает расширенные возможности по оптимизации для кода реального времени и аппаратуры.

Так уж получилось, что процессоры Intel X86 и X64 всегда применяют барьеры acquire-fence для чтений и release-fence для записей - независимо от того, используете ли Вы ключевое слово volatile или нет, т. е. оно не дает эффекта в аппаратуре, если Вы используете эти процессоры. Однако volatile работает на оптимизациях, выполняемых компилятором и CLR - как на 64-битных процессорах AMD, так и (в большей степени) на процессорах Itanium. Это означает, что не следует расслабляться на основании каких-то соображений, что Ваши клиенты используют только определенный "проверенный" тип CPU.

И еще: даже если Вы используете volatile, все равно нужно быть в здоровом тонусе беспокойства, что мы скоро увидим. Короче говоря, volatile на панацея от всех проблем!

Эффект от применения volatile можно обобщить следующей таблицей:

Первая инструкция Вторая инструкция Могут ли они поменять порядок выполнения?
Read Read нет
Read Write нет
Write Write нет (CLR гарантирует, что операции запись-запись никогда не поменяют порядок своего выполнения, даже если не применялось ключевое слово volatile)
Write Read ДА!

Имейте в виду, что применение volatile не защит от записи, за которым идет чтение, от их перестановки, и это может создать головоломку. Joe Duffy хорошо иллюстрирует проблему на следующем примере: если Test1 и Test2 запущены одновременно в разных потоках, то может произойти ситуация, когда и a, и b оба завершились со значением 0 (несмотря на использование volatile на обоих переменных x и y):

class IfYouThinkYouUnderstandVolatile
{
   volatile int x, y;
 
   void Test1()      // Работает в одном потоке
   {
      x = 1;         // Volatile write (release-fence)
      int a = y;     // Volatile read (acquire-fence)
      ...
   }
 
   void Test2()      // Работает в другом потоке
   {
      y = 1;         // Volatile write (release-fence)
      int b = x;     // Volatile read (acquire-fence)
      ...
   }
}

Документация MSDN утверждает, что ключевое слово volatile гарантирует, что в поле всегда присутствует самое актуальное значение. Как мы увидели, это неверно, порядок действий "запись потом чтение" может быть изменен.

Это дает веские доводы избегать volatile: даже если Вы понимаете, что на самом деле происходит в этом примере, поймут ли это также и другие разработчики, работающие с подобным кодом? Полный барьер (full fence) между каждым из этих присваиваний в Test1 и Test2 (или традиционная блокировка) решит эту проблему.

Ключевое слово volatile не поддерживается в аргументах, передаваемых по ссылке, или в захваченных локальных переменных (captured local variables): в этих случаях нужно использовать методы VolatileRead и VolatileWrite.

VolatileRead, VolatileWrite. Статические методы VolatileRead и VolatileWrite в классе Thread читают и записывают переменную с гарантией (технически более высокой), которая предоставляет ключевое слово volatile. Их реализации относительно не эффективны, хотя они в действительности генерируют полные барьеры. Вот их полные реализации для целочисленного типа:

public static void VolatileWrite (ref int address, int value)
{
   MemoryBarrier(); address = value;
}
 
public static int VolatileRead (ref int address)
{
   int num = address; MemoryBarrier(); return num;
}

Вы можете видеть, что если произойдет вызов VolatileWrite, за которым будет идти вызов VolatileRead, то между ними не будет сгенерирован барьер: это подразумевает тот же сценарий головоломки, который мы выдели ранее.

Барьеры памяти и блокировка. Как мы заметили раньше, Monitor.Enter и Monitor.Exit оба генерируют полные барьеры. Поэтому если мы игнорируем гарантию блокировки взаимного исключения, то можно сказать, что:

lock (какое_нибудь_поле) { ... }

является эквивалентом этого:

Thread.MemoryBarrier(); { ... } Thread.MemoryBarrier();

Класс Interlocked. Использовать барьеры памяти не всегда достаточно, когда код без блокировок читает или записывает поля. Операции с 64-битными полями, инкременты и декременты с ними требуют более жесткой обработки с помощью вспомогательного класса Interlocked. Interlocked также предоставляет методы Exchange и CompareExchange, последний позволяет операции read-modify-write без блокировки, с применением малого по объему кода.

Оператор изначально атомарный, если он выполняется как одна неделимая инструкция на нижележащем процессоре. Строгая атомарность оператора исключает любую возможность вытеснения во время его выполнения. Простое чтение или запись поля 32 бита или меньше всегда атомарное, потому что разрядность регистров процессора как правило не меньше 32 бит. Операции с 64-битными полями гарантированно атомарными будут только в 64-битной среде выполнения, и операторы, которые комбинируют больше одной операции чтения/записи никогда не будут атомарными:

class Atomicity
{
   static int _x, _y;
   static long _z;
 
   static void Test()
   {
      long myLocal;
      _x = 3;        // Атомарная операция
      _z = 3;        // Не атомарная операция в 32-битном окружении 
                     // (потому что поле _z имеет разрядность 64 бита)
      myLocal = _z;  // Не атомарная операция в 32-битном окружении
      _y += _x;      // Не атомарная операция (состоит из операций чтения И записи)
      _x++;          // Не атомарная операция (состоит из операций чтения И записи)
   }
}

Чтение и запись 64-битных полей не атомарные в 32-битном окружении, потому что они требуют выполнения 2 отдельных инструкций: по одной для каждой 32-битной ячейки памяти. Таким образом, если поток X читает 64-битное значение, когда поток Y обновляет его, то вытеснение потока X потоком Y (или наоборот) может привести к непредсказуемым результатам операции чтения: может получиться так, что результатом будет битовая комбинация новых и старых значений (рваное чтение).

Компилятор реализует унарные операторы вида x++ путем чтения переменной, её обработки и затем записи обратно в ту же ячейку памяти. Рассмотрим следующий класс:

class ThreadUnsafe
{
   static int _x = 1000;
   static void Go() { for (int i = 0; i < 100; i++) _x--; }
}

Отложив проблему барьеров памяти, Вы могли бы ожидать, что если 10 потоков одновременно запустят Go, то последний декремент _x должен закончиться на значении 0. Однако это не гарантируется, потому что событие гонки между потоками приведет к тому, что какой-то поток произведет вытеснение во время выполнения унарного оператора декремента _x--, в результате чего значение общего поля _x станет полностью недостоверным.

Конечно, эту проблему можно решить, обернув не атомарные операции оператором блокировки. Блокировка, если она применена правильно, фактически симулирует атомарность. Однако класс Interlocked предоставляет более простое и быстрое решение для таких простых операций:

class Program
{
   static long _sum;
 
   static void Main()
   {                                                              // _sum
      // Простые операции инкремента/декремента:
      Interlocked.Increment (ref _sum);                           // 1
      Interlocked.Decrement (ref _sum);                           // 0
 
      // Добавление/вычитание значения:
      Interlocked.Add (ref _sum, 3);                              // 3
 
      // Чтение 64-битного поля:
      Console.WriteLine (Interlocked.Read (ref _sum));            // 3
 
      // Запись 64-битного поля с предварительным чтением предыдущего значения.
      // Эта операция выведет "3", после чего обновит _sum значением 10:
      Console.WriteLine (Interlocked.Exchange (ref _sum, 10));    // 10
 
      // Обновит поле только в том случае, если оно совпадает с определенным
      // значением (10):
      Console.WriteLine (Interlocked.CompareExchange (ref _sum,
                                                      123, 10);   // 123
   }
}

Все методы класса Interlocked генерируют полный барьер. Таким образом поля, к которым Вы осуществляете доступ через Interlocked, не нуждаются в дополнительных барьерах памяти - если в других местах программы к ним не осуществляется доступ без Interlocked или блокировки.

Математические операции Interlocked ограничены инкрементом (Increment), декрементом (Decrement), и добавлением (Add). Если Вы хотите применить умножение - или выполнить другое вычисление - то можете выполнить это в стиле без блокировок использованием метода CompareExchange (обычно вместе с циклическим ожиданием). Пример будет дан в документации по параллельному программированию [6].

Interlocked работает по принципу предоставления информации об атомарности для операционной системы и виртуальной машины.

Методы Interlocked обычно вовлекают дополнительную нагрузку 10 нс, половина которой - вдвое меньше, чем обычная блокировка, на которой нет конкурентного ожидания. Кроме того, использование методов Interlocked не вносит дополнительных затрат на переключение контекста из-за блокировки. Обратная сторона медали - Interlocked внутри цикла с большим количеством итераций менее эффективно, чем одна блокировка вокруг всего цикла (хотя Interlocked позволяет улучшить конкурентность выполнения потоков).

[Обмен сигналами через Wait и Pulse]

Ранее мы обсуждали события с обработкой ожидания (Event Wait Handles [3]) - простой механизм сигнализации, где поток блокируется до момента получения оповещения от другого потока.

Более мощная конструкция сигнализации предоставляется классом Monitor через статические методы Wait и Pulse (и PulseAll). Принцип состоит в том, что Вы пишете логику сигнализации самостоятельно, используя свои флаги и поля (обрамленные операторами блокировки lock), и затем предоставляете команды Wait и Pulse, чтобы предотвратить циклы ожидания. С этими же методами и оператором блокировки lock Вы можете достичь функциональности AutoResetEvent, ManualResetEvent и Semaphore, как и (с некоторыми оговорками) статических методов WaitAll и WaitAny дескриптора ожидания WaitHandle.

Однако сигнализация Wait и Pulse имеет некоторые недостатки по сравнению с дескрипторами событий ожидания:

• Wait/Pulse не могут пересекать домены приложения (или процессы) на компьютере.
• Вам нужно помнить о защите всех переменных, связанных с логикой сигнализации с блокировками.
• Программы Wait/Pulse могут запутать разработчиков, полагающихся на документацию Microsoft.

Проблема этой документации возникает потому, что не очевидно, как предполагается использовать Wait и Pulse, даже когда Вы прочитали о том, как они работают. Wait и Pulse также имеют особую ненависть к дилетантам: будут искать любые прорехи в понимании их принципа работы, после чего будут извращенно над Вами издеваться! К счастью, есть простые примеры использования Wait и Pulse.

С точки зрения производительности вызов Pulse занимает несколько сотен наносекунд в эпоху компьютеров 2010-х годов - около трети времени, которое занимает вызов Set на дескрипторе ожидания. Дополнительная нагрузка на ожидание сигнала, когда нет конкурентного доступа, полностью зависит от Вас - потому что именно Вы сами реализуете логику с обычными полями и переменными. На практике это очень просто, и вычислительные расходы не превышают время, затрачиваемое на взятие блокировки.

Как использовать Wait и Pulse. Рассмотрим пример:

1. Определим одно поле для использования в качестве объекта синхронизации:

readonly object _locker = new object();

2. Определим поле (или поля) для использования в Вашем пользовательском условии (условиях) блокировки. Пример:

bool _go;

или:

int _semaphoreCount;

3. Всякий раз, когда Вы хотите выполнить блокировку, добавьте следующий код:

lock (_locker)
   while ( < условие блокировки > )
      Monitor.Wait (_locker);

4. Всякий раз, когда Вы меняете (или потенциально меняете) условие блокировки, добавьте следующий код:

lock (_locker)
{
   // Здесь изменяйте поле (поля) или данные, которые могут повлиять на
   // условие (условия) блокировки:
   // ...
   Monitor.Pulse(_locker);    // или: Monitor.PulseAll (_locker);
}

Примечание: если Вы меняете условие блокировки и хотите ждать, то можете реализовать шаги 3 и 4 в одной блокировке.

Этот шаблон позволяет любому потоку ждать любое время любого условия. Ниже приведен простой пример, где рабочий поток ждет, когда поле _go установится в true:

class SimpleWaitPulse
{
   static readonly object _locker = new object();
   static bool _go;
 
   static void Main()
   {                                   // Новый поток заблокируется,
      new Thread (Work).Start();       // потому что _go==false.
 
      Console.ReadLine();              // Ожидание, когда пользователь нажмет Enter.
 
      lock (_locker)                   // Позволим потоку пробудиться путем установки
      {                                // _go=true и вызова Pulse.
         _go = true;
         Monitor.Pulse (_locker);
      }
   }
 
   static void Work()
   {
      lock (_locker)
         while (!_go)
            Monitor.Wait (_locker);    // Освобождение блокировки во время ожидания.
 
      Console.WriteLine ("Проснулся!!!");
   }
}

Этот код выведет следующее (после нажатия Enter):

Проснулся!!!

Для безопасной работы потоков мы гарантируем, что ко всем общим полям доступ осуществляется через блокировку. Следовательно, мы добавляем операторы блокировки lock вокруг чтения и обновления флага _go. Это основа многопоточной безопасности (если Вы не собираетесь следовать принципам не блокирующей синхронизации).

Метод блок это то место, где мы блокируем выполнение кода на доступе к общей переменной, с ожиданием, когда флаг _go станет true. Метод Monitor.Wait выполняет вот эти действия, в следующем порядке:

1. Освобождает блокировку на _locker.
2. Блокируется, пока не будет вызван Pulse на _locker.
3. Заново запрашивает блокировку на _locker. Если блокировка конкурентно занята, то блокируется до тех пор, пока блокировка не станет доступной.

Это означает, что в любом случае не удерживается блокировка на объекте синхронизации, пока Monitor.Wait ожидает Pulse:

lock (_locker)
{
   while (!_go)
      Monitor.Wait (_locker);  // Блокировка освобождена.
   // Блокировка взята снова.
   ...
}

Затем выполнение продолжается на следующем операторе. Monitor.Wait разработан для использования внутри оператора lock; если он будет использован по-другому, то будет выброшено исключение. То же самое касается и Monitor.Pulse.

В методе Main мы подаем сигнал для рабочего потока Work установкой флага _go (внутри блокировки) и вызовом Pulse. Как только мы освободили блокировку, рабочий поток возобновит выполнение, прокрутив цикл while.

Методы Pulse и PulseAll освобождают потоки, заблокированные на операторе Wait. Pulse освободит максимум 1 поток; PulseAll освободит их все. В нашем примере так как только один поток заблокирован, то не имеет значение какой метод вызвать - Pulse или PulseAll, потому что они будут действовать одинаково. Если ожидают больше одного потока, то вызов PulseAll обычно самый безопасный шаблон для использования.

Чтобы ждать (Wait) обмена с помощью Pulse или PulseAll, синхронизирующий объект (в нашем случае _locker) должен быть один и тот же.

В нашем шаблоне вызов Pulse показывает, что что-то могло поменяться, и этот ожидающий поток должен проверить заново свои условия блокировки. В методе потока Work, эта проверка осуществляется в условии цикла while. Тогда ожидающий поток принимает решение о продолжении, а не оповещающий поток. Если вызов Pulse само по себе берется как инструкция продолжиться, конструкция Wait лишается любой реальной значимости; Вы заканчиваете с низшей версией AutoResetEvent.

Если мы ставим наш шаблон удалением цикла while, флага _go и ReadLine, то придем к простейшему примеру Wait/Pulse:

static void Main()
{
   new Thread (Work).Start();
   lock (_locker) Monitor.Pulse (_locker);
}
 
static void Work()
{
   lock (_locker) Monitor.Wait (_locker);
   Console.WriteLine ("Проснулся!!!");
}

Это не сможет отобразить вывод, потому что его поведение не определенное! Произойдет гонка между главным потоком (тело процедуры Main) и рабочим потоком (тело процедуры Work). Если сначала выполнится Wait, то сигнал сработает. Если сначала выполнится Pulse, то этот сигнал пропадет, и рабочий поток навсегда останется заблокированным. Это отличается от поведения AutoResetEvent, где метод Set имеет эффект памяти или "защелкивания", так что он все еще будет эффективным, если вызовется перед WaitOne.

У Pulse нет эффекта защелкивания, потому что Вы ожидаете записи в саму защелку, для чего мы ранее использовали флаг "_go". Этот момент делает Wait и Pulse универсальными: с двоичным флагом мы можем заставить их работать так же, как и AutoResetEvent; с полем int мы можем написать CountdownEvent или Semaphore. С более сложными структурами данных мы можем пойти дальше и реализовать такие же конструкции, как очередь генератора/получателя.

Очередь генератора/получателя (Producer/Consumer Queue). Ранее мы описывали концепцию очереди producer/consumer, и как написать её с помощью AutoResetEvent. Теперь мы приступим к написанию более мощной версии с помощью Wait и Pulse.

Сейчас мы позволим использовать произвольное количество рабочих потоков. Все потоки будем отслеживать в массиве:

Thread[] _workers;

Это даст нам возможность подключаться (Join) к этим потокам позже, когда мы будем останавливать очередь.

Каждый рабочий поток будет выполнять метод с именем Consume. Мы можем создать потоки и запустить их в одном цикле следующим образом:

public PCQueue (int workerCount)
{
   _workers = new Thread [workerCount];
 
   // Создание и запуск отдельного потока.
   for (int i = 0; i < workerCount; i++)
      (_workers [i] = new Thread (Consume)).Start();
}

Вместо использования простой строки для описания задачи мы реализуем более гибкий метод с помощью делегата. Будем использовать делегата System.Action в .NET Framework, который определен следующим образом:

public delegate void Action();

Этот делегат соответствует любому методу без параметров - вместо делегата наподобие ThreadStart. Мы все еще можем представить задачи, которые вызовут методы с параметрами, хотя путем обертки вызова в анонимный делегат или lambda-выражения:

Action myFirstTask = delegate
{
   Console.WriteLine ("foo");
};
 
Action mySecondTask = () => Console.WriteLine ("foo");
 
Queue< Action> _itemQ = new Queue< Action>();

Перед тем, как перейти к методам EnqueueItem и Consume, посмотрим сначала на полный код:

using System;
using System.Threading;
using System.Collections.Generic;
 
public class PCQueue
{
   readonly object _locker = new object();
   Thread[] _workers;
   Queue< Action> _itemQ = new Queue< Action>();
 
   public PCQueue (int workerCount)
   {
      _workers = new Thread [workerCount];
 
      // Создание и запуск отдельного потока для каждой задачи:
      for (int i = 0; i < workerCount; i++)
         (_workers [i] = new Thread (Consume)).Start();
  }
 
   public void Shutdown (bool waitForWorkers)
   {
      // Поствим в очередь один элемент null для каждой задачи,
      // чтобы она завершилась:
      foreach (Thread worker in _workers)
         EnqueueItem (null);
 
      // Ожидание завершения всех рабочих задач:
      if (waitForWorkers)
         foreach (Thread worker in _workers)
            worker.Join();
   }
 
   public void EnqueueItem (Action item)
   {
      lock (_locker)
      {
         _itemQ.Enqueue (item);     // Мы должны вызвать Pulse, потому что
         Monitor.Pulse (_locker);   // поменяли условие блокировки.
      }
   }
 
   void Consume()
   {
      while (true)                  // Продолжим работу, пока не будет
      {                             // указано нечто другое.
         Action item;
         lock (_locker)
         {
            while (_itemQ.Count == 0) Monitor.Wait (_locker);
            item = _itemQ.Dequeue();
         }
         if (item == null) return;  // Это дает нам сигнал завершиться.
         item();                    // Обработка элемента.
      }
   }
}

Тут у нас также есть стратегия для завершения рабочей задачи: постановка в очередь null сигнализирует потребителю завершить работу после обработки всех ожидающих элементов (если мы хотим выйти быстрее, то следует использовать независимый флаг отмены "cancel"). Поскольку мы поддерживаем несколько потребителей, то должны поставить в очередь один null-элемент на каждый поток, чтобы полностью остановить очередь.

Ниже показан код метода Main, который запускает очередь producer/consumer, указывая два конкурирующих потока - потребителя, и затем ставит в очередь 10 делегатов, которые совместно используются этими двумя потребителями:

static void Main()
{
   PCQueue q = new PCQueue (2);
 
   Console.WriteLine ("Постановка в очередь 10 элементов...");
 
   for (int i = 0; i < 10; i++)
   {
      int itemNumber = i;        // Чтобы избежать захвата переменной ловушкой
      q.EnqueueItem (() =>
      {
         Thread.Sleep (1000);    // Симуляция интенсивной вычислительной работы
         Console.Write (" Task" + itemNumber);
      });
   }
 
   q.Shutdown (true);
   Console.WriteLine();
   Console.WriteLine ("Рабочие потоки завершились!");
}

Этот пример выведет следующее:

Постановка в очередь 10 элементов...
 Task1 Task0 (pause...) Task2 Task3 (pause...) Task4 Task5 (pause...)
 Task6 Task7 (pause...) Task8 Task9 (pause...)
Рабочие потоки завершились!

Теперь посмотрим на метод EnqueueItem:

public void EnqueueItem (Action item)
{
   lock (_locker)
   {
      _itemQ.Enqueue (item);     // Мы должны выдать Pulse, потому что
      Monitor.Pulse (_locker);   // поменяли условие блокировки.
   }
}

Так как очередь используется несколькими потоками, то мы должны обернуть все чтения/записи в блокировку. И потому, что мы изменяем условие блокировки (потребитель может приступить к действию в результате постановки в очередь задачи), нам нужно использовать Pulse.

Ради эффективности мы мы вызываем Pulse вместо PulseAll, когда ставим элемент в очередь. Причина в том, что (самое большее) один потребитель должен быть разбужен на один элемент. Если у Вас есть только одно мороженое, то нет смысла будить весь класс из 30 спящих детей, чтобы поставить их в очередь на потребление этого мороженого; то же самое произойдет и с 30 потоками - мы не получим никакой выгоды от вывода из всех из сна, потому что только один приступит к работе, и остальные 29 бесполезно прокрутят свой цикл перед тем, как снова войти в сон. Функциональность однако не нарушится, если мы заменим Pulse на PulseAll.

Теперь посмотрим на метод Consume, где рабочий поток берет элемент из очереди и обрабатывает его. Мы хотим, чтобы рабочий поток блокировался, когда ему нечего делать, т. е. когда нет элементов в очереди. Таким образом, наше условие блокировки будет _itemQ.Count==0:

Action item;
lock (_locker)
{
   while (_itemQ.Count == 0) Monitor.Wait (_locker);
   item = _itemQ.Dequeue();
}
if (item == null) return;  // Это сигнал выйти.
item();                    // Выполнение полезной работы (обработка
                           // элемента из очереди).

Выход из цикла while произойдет тогда, когда _itemQ.Count не равно 0, это означает, что (как минимум) один элемент ожидает обработки в очереди. Мы должны выбрать элемент из очереди перед освобождением блокировки, иначе элемент может уже не быть в очереди (существующий другой поток может выбрать этот элемент). В частности, другой потребитель данных, только что завершивший предыдущую обработку, мог бы внезапно забрать наш элемент, если в этот момент не действует блокировка, и мы сделали бы что-то типа этого:

Action item;
lock (_locker)
{
   while (_itemQ.Count == 0) Monitor.Wait (_locker);
}
lock (_locker)                // НЕ ПРАВИЛЬНО!
{
   item = _itemQ.Dequeue();   // Элемент может уже здесь не находиться!
}
...

После того, как элемент взят из очереди, мы немедленно освобождаем блокировку. Если мы удержим её на время обработки задачи, то нежелательно заблокируем другие потребители данных и генераторы данных. Мы не должны вызывать Pulse после выборки из очереди, так как никакой другой потребитель данных никогда не сможет разблокировать при наличии меньшего количества элементов в очереди.

Краткая блокировка имеет преимущество, когда Wait и Pulse (обычно) избегает нежелательной блокировки других потоков. Блокировка по большому количеству строк кода вполне допустима, если она выполнится за короткое время. Помните, что Вам помогает Monitor.Wait освобождением нижележащей блокировки при ожидании Pulse!

Таймауты ожидания. Вы можете указать таймаут, когда вызываете Wait, либо в миллисекундах, либо через TimeSpan. Тогда метод Wait вернет false, если он сдался в ожидании таймаута. Таймаут применяется только на фазе ожидания. Таким образом, Wait с таймаутом делает следующее:

1. Освобождает нижележащую блокировку.
2. Блокирует выполнение до появления Pulse, или до истечения таймаута.
3. Заново взводит нижележащую блокировку.

Указание таймаута осуществляется наподобие запроса для CLR выдать "виртуальный Pulse" после истечения интервала таймаута. Wait, когда произошел таймаут, все еще выполнит шаг 3 и восстановит блокировку, как если бы был успешно получен Pulse.

Если Wait блокирует выполнение на шаге 3 (при восстановлении блокировки), то любой таймаут игнорируется. Это редко составляет проблему, хотя только когда другие потоки блокируются на очень короткое время, и приложение с использованием Wait/Pulse хорошо реализовано. Таким образом, повторное получение блокировки должно быть почти мгновенной операцией.

Таймауты ожидания имеют полезное применение. Иногда может быть нежелательным или невозможным выдавать Pulse каждый раз при возникновении разблокировки. В качестве примера может быть ситуация, когда условие блокировки вовлекает вызов метода, который получает информацию из периодических запросов к базе данных. Если задержка в доступе не составляет проблемы, то решение простое - Вы можете указать таймаут при вызове Wait следующим образом:

lock (_locker)
   while ( < blocking-condition> )
      Monitor.Wait (_locker, < timeout> );

Это принуждает условие блокировки к повторной проверке после интервала, указанного в timeout, как и когда произошел вызов Pulse. Чем проще условие блокировки, тем меньше может быть таймаут, не создавая проблемы снижения эффективности. В этом случае мы не заботимся о том, получил ли Wait вызов Pulse или произошел таймаут, и игнорируем возвращаемое Wait значение.

Та же система работает одинаково хорошо, если Pulse не произошел из-за ошибки в программе. В этом случае стоит добавить таймаут ко всем командам Wait в программах, где синхронизация особенно сложная, как способ восстановления из ситуаций неясных, случайно появляющихся ошибок. Это также предоставляет определенную степень защиты от ошибок, если программа была позже изменена, что привело к пропаданию вызова Pulse.

Monitor.Wait вернет значение bool, показывающее, был ли реальный вызов Pulse. Если возвращено false, то это означает истечение таймаута: иногда полезно записывать в лог такие события или выбрасывать исключения, если истечение таймаута было нежелательным.

Когда больше одного потока вызвали Waits на одном и том же объекте, формируется "очередь ожидания" (waiting queue) позади объекта синхронизации (это отличается от очереди готовности "ready queue", используемой для предоставления доступа к блокировке). Тогда каждый Pulse освободит один поток в голове очереди ожидания, так что он может войти в очередь готовности и заново восстановить блокировку. Можно это представить как автоматическая парковка автомобилей: Вы становитесь в очередь сначала на станции оплаты, чтобы получить свой билет (очередь ожидания, waiting queue); и снова становитесь в очередь на барьере, когда хотите уехать с парковки (очередь готовности, ready queue).

CSharp WaitPulse

Однако порядок, свойственный структуре очереди, часто не имеет значения в приложениях Wait/Pulse, и в таких случаях может быть проще представить "пул" ожидающих потоков. Тогда каждый Pulse освободит один ожидающий поток из пула.

PulseAll освободит всю очередь, или пул, потоков ожидания. Однако получившие импульс потоки не запустятся точно одновременно, скорее в организованной последовательности, поскольку каждый из их операторов Wait пытается повторно получить ту же самую блокировку. В результате PulseAll переместит всю очередь ожидания в очередь готовности, так что потоки могут организованно возобновить выполнение.

Сигнализация в обоих направлениях и гонки. Важная особенность Monitor.Pulse в том, что это выполняется асинхронно, т. е. никаким образом не блокируя самого себя и не ставя на паузу. Если другой поток ждет объекта импульса, то он будет разблокирован. Иначе выдача импульса не даст никакого эффекта, и просто будет тихо проигнорирована.

Таким образом, Pulse дает обмен только в одну сторону: пульсирующий поток (потенциально) подает сигнал ожидающему потоку. Здесь нет внутреннего механизма подтверждения: не вернет значение, показывающее, получил ли какой-либо поток импульс разблокировки, или нет. Кроме того, когда оповещающий поток выдал импульс и тем самым освободил его блокировку, нет никакой гарантии, что ожидающий разблокировки поток немедленно оживет. Здесь может быть маленькая задержка с дискретностью единиц планировщика, во время которой ни у одного потока нет блокировки. Это означает, что выдающий импульс поток не может знать, возобновил ли выполнение ожидающий поток - если Вы не реализовали специальный код для передачи такой информации (например, с помощью другого флага или другой комбинации Wait и Pulse).

Для иллюстрации предположим, что нам нужен сигнал 5 раз подряд:

class Race
{
   static readonly object _locker = new object();
   static bool _go;
 
   static void Main()
   {
      new Thread (SaySomething).Start();
 
      for (int i = 0; i < 5; i++)
      {
         lock (_locker) 
         {
            _go = true;
            Monitor.PulseAll (_locker);
         }
      }
   }
 
   static void SaySomething()
   {
      for (int i = 0; i < 5; i++)
      {
         lock (_locker)
         {
            while (!_go) Monitor.Wait (_locker);
            _go = false;
            Console.WriteLine ("Что произошло?");
         }
      }
   }
}

Ожидаемый вывод:

Что произошло?
Что произошло?
Что произошло?
Что произошло?
Что произошло?

Вот какой вывод будет в действительности:

Что произошло? (зависание)

Эта плохо написанная программа демонстрирует состояние гонки: цикл for в главном потоке может свободно прокрутить свои 5 итераций в любое время, когда рабочий поток не удерживает блокировку, и это может произойти даже перед запуском рабочего потока! Пример producer/consumer не пострадал бы от такой проблемы, потому что если бы основной поток обогнал рабочий поток, то каждый запрос стоял бы в очереди. Но в этом случае нам нужно заблокировать главный поток на каждой итерации, если рабочий поток все еще занят обработкой предыдущей задачи.

Мы можем исправить этот код добавлением флага _ready к классу, управляемому рабочим потоком. Затем главный поток ждет, пока рабочий поток не будет готов (ready) к установке флага _go.

Это аналогично предыдущему примеру, который делает то же самое с помощью двух AutoResetEvents.

Исправленный вариант, который выведет "Что произошло?" 5 раз:

class Solved
{
   static readonly object _locker = new object();
   static bool _ready, _go;
 
   static void Main()
   {
      new Thread (SaySomething).Start();
 
      for (int i = 0; i < 5; i++)
      {
         lock (_locker)
         {
            while (!_ready) Monitor.Wait (_locker);
            _ready = false;
            _go = true;
            Monitor.PulseAll (_locker);
         }
      }
   }
 
   static void SaySomething()
   {
      for (int i = 0; i < 5; i++)
      {
         lock (_locker)
         {
            _ready = true;
            Monitor.PulseAll (_locker);           // Помните, что вызов Monitor.Wait
            while (!_go) Monitor.Wait (_locker);  // освобождает и повторно
            go = false;                           // захватывает блокировку.
            Console.WriteLine ("Что произошло?");
         }
      }
   }
}

В методе Main (главный поток) мы очищаем флаг _ready, устанавливаем флаг _go и вызываем Pulse, и все это делаем в одном операторе блокировки lock. Преимущество такого подхода - устойчивость, если мы позже добавим третий поток. Представим себе, что другой поток пытается в то же самое время подавать сигнал рабочему потоку. В этом сценарии наша логика не нарушится: в действительности мы атомарно делаем действия по очистке _ready и установке _go.

Симуляция дескрипторов ожидания (Wait Handle). Вы возможно заметили, что в у шаблона предыдущего примера следующая структура:

lock (_locker)
{
   while (!_flag) Monitor.Wait (_locker);
   _flag = false;
   ...
}

Здесь _flag устанавливается в true другим потоком. В результате получается эффект использования AutoResetEvent. Если мы пропустим _flag=false, то получим базовый шаблон ManualResetEvent.

Давайте подробно представим код для ManualResetEvent, использующего Wait и Pulse:

readonly object _locker = new object();
bool _signal;
 
void WaitOne()
{
   lock (_locker)
   {
      while (!_signal) Monitor.Wait (_locker);
   }
}
 
void Set()
{
   lock (_locker) { _signal = true; Monitor.PulseAll (_locker); }
}
 
void Reset() { lock (_locker) _signal = false; }

Мы использовали PulseAll, потому что здесь может быть любое количество заблокированных ожидающих потоков.

Написание AutoResetEvent произойдет простой заменой кода в WaitOne на следующий:

lock (_locker)
{
   while (!_signal) Monitor.Wait (_locker);
   _signal = false;
}

и заменой PulseAll на Pulse в методе Set:

lock (_locker) { _signal = true; Monitor.Pulse (_locker); }

Воздержитесь от использования PulseAll на очереди с ожидающими потоками, потому что каждый вызов PulseAll приведет к повреждению очереди и затем к её повторному формированию.

Замена _signal на целочисленное поле создаст базовый шаблон для Semaphore.

Симуляция статических методов, которые работают с набором дескрипторов ожидания, легко реализуется в простых сценариях. Эквивалент вызова WaitAll это ничто иное, как условие блокировки, включающее все флаги, используемые вместо дескрипторов ожидания:

lock (_locker)
   while (!_flag1 && !_flag2 && !_flag3...)
      Monitor.Wait (_locker);

Это может быть особенно полезно, если учесть, что WaitAll часто нельзя использовать из-за проблем устаревшего COM. Симуляция WaitAny это просто вопрос замены оператора && оператором ||.

Если у Вас много флагов, то этот метод становится менее эффективным, потому что все эти флаги должны использовать один объект синхронизации, чтобы сигнализация работала атомарно. В этом случае использование дескрипторов ожидания получает преимущество.

Написание CountdownEvent. С помощью Wait и Pulse мы можем реализовать полный функционал CountdownEvent следующим образом:

public class Countdown
{
   object _locker = new object ();
   int _value;
 
   public Countdown() { }
   public Countdown (int initialCount) { _value = initialCount; }
 
   public void Signal() { AddCount (-1); }
 
   public void AddCount (int amount)
   {
      lock (_locker) 
      { 
         _value += amount;
         if (_value < 1) Monitor.PulseAll (_locker);
      }
   }
 
   public void Wait()
   {
      lock (_locker)
         while (_value > 0)
            Monitor.Wait (_locker);
   }
}

Этот шаблон кода похож на тот, что мы видели раньше, за исключением условия блокировки на основе целочисленного поля.

Встреча потоков. Мы можем использовать класс Countdown для написания рандеву двух потоков - как мы это делали ранее [3] с WaitHandle.SignalAndWait:

class Rendezvous
{
   static object _locker = new object();
 
   // В Framework 4.0 мы могли бы вместо этого использовать
   // встроенный класс CountdownEvent.
   static Countdown _countdown = new Countdown(2);
 
   public static void Main()
   {
      // Перевод каждого потока в сон на случайное время.
      Random r = new Random();
      new Thread (Mate).Start (r.Next (10000));
      Thread.Sleep (r.Next (10000));
 
      _countdown.Signal();
      _countdown.Wait();
 
      Console.Write ("Mate! ");
   }
 
   static void Mate (object delay)
   {
      Thread.Sleep ((int) delay);
 
      _countdown.Signal();
      _countdown.Wait();
    
      Console.Write ("Mate! ");
   }
}

В этом примере каждый поток засыпает на случайное количество времени и затем ждет другой поток, в результате оба выводят сообщение "Mate" (скорее всего) одновременно. Это называется барьером выполнения потока (thread execution barrier), и может быть расширено на любое количество потоков (настройкой начального значения счетчика обратного отсчета Countdown).

Барьеры выполнения потока полезны, когда Вы хотите удержать несколько потоков на определенном шаге в процессе выполнения серии задач. Однако наше текущее решение ограничено, в нем мы не можем снова использовать тот же самый объект Countdown для организации встречи пoтоков второй раз - по крайней мере не без дополнительных конструкций сигнализации. Чтобы решить эту проблему, Framework 4.0 предоставляет новый класс Barrier.

[Класс Barrier]

Barrier представляет в Framework 4.0 новую конструкцию сигнализации. Он реализует барьер выполнения потока, что позволяет любому количеству потоков встретиться в определенное время. Этот класс очень быстрый и эффективный, в он построен на основе Wait, Pulse и блокировок прокруток цикла (spinlocks).

Чтобы использовать этот класс:

1. Инстанцируйте его, указав количество потоков, которые должны принять участие в встрече (Вы это можете поменять позже вызовом AddParticipants/RemoveParticipants).
2. В каждом потоке вызовите SignalAndWait, когда хотите, чтобы он ожидал встречи.

Инстанциация Barrier со значением 3 приведет к тому, что SignalAndWait будет блокировать выполнение, пока этот метод не будет вызван 3 раза. Но в отличие от CountdownEvent барьер запускается автоматически: вызов SignalAndWait снова заблокирует выполнение, пока не будет вызван еще 3 раза. Это позволяет сохранять несколько потоков на одном шаге, когда они выполняют серию задач.

CSharp Barrier

В следующем примере каждый из 3 потоков записывает числа от 0 до 4, делая это по шагам с другими потоками:

static Barrier _barrier = new Barrier (3);
 
static void Main()
{
   new Thread (Speak).Start();
   new Thread (Speak).Start();
   new Thread (Speak).Start();
}
 
static void Speak()
{
   for (int i = 0; i < 5; i++)
   {
      Console.Write (i + " ");
      _barrier.SignalAndWait();
   }
}

Результат работы этого примера:

0 0 0 1 1 1 2 2 2 3 3 3 4 4 4

Реально полезная функция Barrier в том, что при его конструировании также можно указать для него дополнительное действие (действие пост-фазы), совпадающее со "встречей". Это делегат, который будет вызван после того, как SignalAndWait был вызван n раз, но перед перед тем, как потоки будут разблокированы. В нашем примере, если мы инстанциировали барьер следующим образом:

static Barrier _barrier = new Barrier (3, barrier => Console.WriteLine());

то вывод получится таким:

0 0 0 
1 1 1 
2 2 2 
3 3 3 
4 4 4 

Действие пост-фазы может быть полезным, когда нужно объединить данные, которые были произведены каждым из встретившихся рабочих потоков. Не нужно заботиться о вытеснении, потому что все рабочие потоки блокируются в ожидании завершения каждым обработки свой части работы.

[Блокировки Reader/Writer]

Довольно часто экземпляры типа потокобезопасны для одновременных операций чтения, но не для одновременных операций обновления (ни тем более для одновременных чтения и обновления). Это может быть справедливым с таким ресурсом, как файл. Хотя защита экземпляров такого типа простой исключительной блокировкой для всех режимов доступа обычно добивается цели, она нежелательно ограничивает одновременный доступ, если есть множество читающих потоков и только иногда происходят обновления. Пример такого случая - сервер бизнес-приложений, где часто используемые данные кэшируются в статических полях для быстрого доступа. Класс ReaderWriterLockSlim был разработан для предоставления максимальной доступности данных при блокировке в таком сценарии.

Класс ReaderWriterLockSlim был представлен в Framework 3.5, заменив старый "толстый" класс ReaderWriterLock. Последний обладает похожей функциональностью, но в несколько раз медленнее и содержит ошибку в своем механизме обработки обновлений блокировки. Однако в сравнении с обычной блокировкой (Monitor.Enter/Exit) класс ReaderWriterLockSlim работает в 2 раза медленнее.

В обоих классах здесь есть два базовых вида блокировки - блокировка чтения (read lock) и блокировка записи (write lock):

• Блокировка записи универсально исключительная.
• Блокировка чтения совместима с другими блокировками чтения.

Таким образом поток, удерживающий блокировку записи, будет блокировать все другие потоки, пытающиеся получить блокировку чтения или блокировку записи (и наоборот). Но если нет потоков, удерживающих блокировку записи, то любое количество потоков могут одновременно получить блокировку чтения.

ReaderWriterLockSlim определяет следующие методы для получения и освобождения блокировок чтения/записи:

public void EnterReadLock();
public void ExitReadLock();
public void EnterWriteLock();
public void ExitWriteLock();

Дополнительно есть версии "Try" всех методов EnterXXX, которые принимают аргумент таймаута в стиле Monitor.TryEnter (таймауты могут произойти довольно легко, если за ресурс очень высокая конкуренция). ReaderWriterLock предоставляет подобные методы AcquireXXX и ReleaseXXX. Вместо возврата false они выбрасывают исключение ApplicationException, если произошел таймаут.

Следующая программа демонстрирует ReaderWriterLockSlim. 3 потока постоянно проходят по списку, где 2 потока каждую секунду добавляют случайное число к списку. Блокировка чтения защищает читающие список потоки, и блокировка записи защищает записывающие в список потоки:

class SlimDemo
{
   static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
   static List< int> _items = new List< int>();
   static Random _rand = new Random();
 
   static void Main()
   {
      new Thread (Read).Start();
      new Thread (Read).Start();
      new Thread (Read).Start();
 
      new Thread (Write).Start ("A");
      new Thread (Write).Start ("B");
  }
 
   static void Read()
   {
      while (true)
      {
         _rw.EnterReadLock();
         foreach (int i in _items) Thread.Sleep (10);
         _rw.ExitReadLock();
      }
   }
 
   static void Write (object threadID)
   {
      while (true)
      {
         int newNumber = GetRandNum (100);
         _rw.EnterWriteLock();
         _items.Add (newNumber);
         _rw.ExitWriteLock();
         Console.WriteLine ("Поток " + threadID + " добавил " + newNumber);
         Thread.Sleep (100);
      }
   }
 
   static int GetRandNum (int max) { lock (_rand) return _rand.Next(max); }
}

В коде релиза обычно добавляют блоки try/finally, чтобы гарантировать, что все блокировки были освобождены, если было выброшено исключение.

Результат выполнения этого примера:

Поток B добавил 61
Поток A добавил 83
Поток B добавил 55
Поток A добавил 33
...

ReaderWriterLockSlim позволяет большей одновременной активности чтения Read, чем простая блокировка. Мы может показать это вставкой следующий код в метод Write, в начало цикла while:

Console.WriteLine (_rw.CurrentReadCount + " одновременно читающих");

В результате всегда будут выведены "3 одновременно читающих" (методы Read тратят большинство своего времени внутри циклов foreach). Как и CurrentReadCount, класс ReaderWriterLockSlim предоставляет следующие свойства для мониторинга блокировок:

public bool IsReadLockHeld            { get; }
public bool IsUpgradeableReadLockHeld { get; }
public bool IsWriteLockHeld           { get; }
 
public int  WaitingReadCount          { get; }
public int  WaitingUpgradeCount       { get; }
public int  WaitingWriteCount         { get; }
 
public int  RecursiveReadCount        { get; }
public int  RecursiveUpgradeCount     { get; }
public int  RecursiveWriteCount       { get; }

Обновляемые блокировки и рекурсия. Иногда полезно поменять блокировку чтения на блокировку записи в одной атомарной операции. Для примера предположим, что Вы хотите добавить элемент к списку только если он в нем уже не присутствует. Идеальным было бы минимизировать затраты времени на удержание (исключительной) блокировки записи, для чего можно выполнить обработку следующим образом:

1. Получить блокировку чтения.
2. Проверить, если уже этот элемент в списке, и если это так, освободить блокировку и выполнить возврат.
3. Освободить блокировку чтения.
4. Получить блокировку записи.
5. Добавить элемент в список.

Проблема тут в том, что другой поток может подкрасться и изменить список (т. е. добавить такой же элемент) между шагами 3 и 4. ReaderWriterLockSlim решает эту проблему третьим видом блокировки, которая называется обновляемая блокировка (upgradeable lock). Обновляемая блокировка подобна блокировке чтения, с тем исключением, что она может быть повышена до блокировки чтения в атомарной операции. Вот как это следует использовать:

1. Сделать вызов EnterUpgradeableReadLock.
2. Выполнить действия по чтению (например, проверить, есть ли интересующий элемент в списке).
3. Вызвать EnterWriteLock (это преобразует обновляемую блокировку в блокировку записи).
4. Выполнить действия по записи (например, добавление этого элемента в список).
5. Вызвать ExitWriteLock (это преобразует блокировку записи обратно в обновляемую блокировку).
6. Выполнить любые другие операции по чтению.
7. Вызвать ExitUpgradeableReadLock.

С точки зрения вызывающего кода это скорее похоже на вложенную или рекурсивную блокировку. Однако функционально на шаге 3 класс ReaderWriterLockSlim атомарно освободит блокировку чтения и получит чистую блокировку записи (обе эти операции выполнятся атомарно, как одно неделимое действие).

Есть и другое важное отличие между обновляемыми блокировками и блокировками чтения. В то время как обновляемая блокировка может сосуществовать одновременно с любым количеством блокировок чтения, сама по себе обновляемая блокировка в любой момент времени может быть только одна. Это предотвращает появление глухой блокировки при преобразовании уровня блокировки (conversion deadlock) путем сериализации одновременных преобразований блокировки - также как это делает блокировка обновления (update lock) SQL Server:

SQL Server ReaderWriterLockSlim
Share lock Read lock
Exclusive lock Write lock
Update lock Upgradeable lock

Мы можем продемонстрировать обновляемую блокировку изменением метода Write в предыдущем примере так, чтобы он добавлял число к списку только если этого числа пока в списке нет:

while (true)
{
   int newNumber = GetRandNum (100);
   _rw.EnterUpgradeableReadLock();
   if (!_items.Contains (newNumber))
   {
      _rw.EnterWriteLock();
      _items.Add (newNumber);
      _rw.ExitWriteLock();
      Console.WriteLine ("Поток " + threadID + " добавил " + newNumber);
   }
   _rw.ExitUpgradeableReadLock();
   Thread.Sleep (100);
}

ReaderWriterLock может также делать преобразования блокировки - но это ненадежно, потому что не поддерживается концепция обновляемых блокировок. По этой причине разработчики ReaderWriterLockSlim с нуля разработали новый класс.

Рекурсия блокировки. Обычно вложенная или рекурсивная блокировка запрещена с ReaderWriterLockSlim. Следовательно, следующий код выбросит исключение:

var rw = new ReaderWriterLockSlim();
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();

Однако этот код запустится без ошибки, если Вы сконструируете экземпляр класса ReaderWriterLockSlim следующим образом:

var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);

Это гарантирует, что может происходить рекурсивная блокировка, если Вам это действительно нужно. Рекурсивные блокировки добавляют нежелательную сложность в программу, потому что можно получить больше одного вида блокировки:

rw.EnterWriteLock();
rw.EnterReadLock();
Console.WriteLine (rw.IsReadLockHeld);    // True
Console.WriteLine (rw.IsWriteLockHeld);   // True
rw.ExitReadLock();
rw.ExitWriteLock();

Базовое правило состоит в том, что как только Вы получили блокировку, последующие блокировки должны быть меньше, но не больше, следовать уровням: Read Lock (блокировка чтения), Upgradeable Lock (обновляемая блокировка), Write Lock (блокировка записи).

Однако запрос на повышение уровня обновляемой блокировки на блокировку записи всегда остается легальным.

[Suspend и Resume]

Поток может быть явно приостановлен (suspend) и затем может быть возобновлено его выполнение (resume) с помощью устаревших методов Thread.Suspend и Thread.Resume. Этот механизм полностью отделен от блокировок. Обе системы независимы друг от друга и работают параллельно.

Поток может приостановить самого себя или другой поток. Вызов Suspend приведет к краткому входу потока в состояние SuspendRequested, затем по достижению точки безопасности для сбора мусора, поток входит в состояние Suspended. Начиная с этого момента выполнение потока может быть возобновлено только с помощью другого потока, который вызовет его метод Resume. Resume будет работать только на приостановленном потоке, но не заблокированном потоке.

Начиная с .NET 2.0 Suspend и Resume стали устаревшими, их использование не рекомендуется из-за опасности произвольной приостановки другим потоком. Если приостановить поток, который находится в блокировке на критическом ресурсе, то все приложение (или компьютер) может получить deadlock. Это намного опаснее, чем вызов Abort - который в результате приведет к освобождению любых таких блокировок (как минимум теоретически) на основании кода в блоках finally.

Однако безопасным является вызов Suspend на текущем потоке - и таким способом можно реализовать простой механизм синхронизации - рабочий поток в цикле выполняет свою задачу, вызывает Suspend для самого себя, и затем ждет возобновления ("пробуждения") основным потоком, когда будет готова для обработки другая задача. Хотя тут есть сложность в определении, приостановлен ли поток. Рассмотрим следующий код:

worker.NextTask = "MowTheLawn";
 
if ((worker.ThreadState & ThreadState.Suspended) > 0)
   worker.Resume;
else
   // Мы не можем вызывать Resume, поскольку поток работает.
   // Вместо этого дайте сигнал рабочему потоку через флаг:
   worker.AnotherTaskAwaits = true;

Это ужасно небезопасно для многопоточной работы: этот код может быть вытеснен в любом месте этих строк, в то время как рабочий поток может поменять свое состояние. Хотя эту проблему можно обойти, решение становится более сложным, чем альтернатива использовать конструкцию синхронизации, такую как AutoResetEvent или Wait и Pulse. Это делает Suspend и Resume совершенно бесполезными.

Устаревшие методы Suspend и Resume имеют два режима: опасно и бесполезно!

[Прекращение работы потока]

Вы можете принудительно завершить поток методом Abort:

class Abort
{
   static void Main()
   {
      Thread t = new Thread (delegate() { while(true); } );    // Бесконечное зацикливание
      t.Start();
      Thread.Sleep (1000);          // Позволим этому циклу крутиться 1 секунду...
      t.Abort();                    // ... затем оборвем.
   }
}

Поток, работа которого обрывается, немедленно входит в состояние AbortRequested. Если он затем завершается как ожидалось, то переходит в состояние Stopped. Вызвавший код может ждать появления этого состояния вызовом Join:

class Abort
{
   static void Main()
   {
      Thread t = new Thread (delegate() { while (true); } );
 
      Console.WriteLine (t.ThreadState);     // Поток t не запущен (Unstarted)
 
      t.Start();
      Thread.Sleep (1000);
      Console.WriteLine (t.ThreadState);     // Работает (Running)
 
      t.Abort();
      Console.WriteLine (t.ThreadState);     // Запрос на обрыв работы (AbortRequested)
 
      t.Join();
      Console.WriteLine (t.ThreadState);     // Остановлен (Stopped)
   }
}

Abort вызывает к выбросу исключения ThreadAbortException на целевом потоке, в большинстве случаев, когда поток работает. Поток, выполнение которого останавливается таким способом, может выбрать обработку этого исключения, однако тогда исключение автоматически повторно выбрасывается в конец блока catch (чтобы помочь гарантировать потоку завершиться так, как это ожидалось). Однако есть возможность предотвратить автоматическое повторное выбрасывание исключения вызовом Thread.ResetAbort внутри блока catch. Тогда поток вернется в состояние Running (с которого он потенциально снова может получить Abort). В следующем примере рабочий поток возвращается к жизни всякий раз, когда сделана попытка оборвать его выполнение вызовом Abort:

class Terminator
{
   static void Main()
   {
      Thread t = new Thread (Work);
      t.Start();
      Thread.Sleep (1000); t.Abort();
      Thread.Sleep (1000); t.Abort();
      Thread.Sleep (1000); t.Abort();
   }
 
   static void Work()
   {
      while (true)
      {
         try { while (true); }
         catch (ThreadAbortException) { Thread.ResetAbort(); }
         Console.WriteLine ("I will not die!");
      }
   }
}

ThreadAbortException обрабатывается при выполнении программы, не заставляя завершаться все приложение если ThreadAbortException не было обработано, в отличие от всех других типов исключений.

Abort будет работать на потоке в любом его состоянии - running, blocked, suspended или stopped. Однако если обрывается поток в состоянии suspended, будет выброшено исключение ThreadStateException - на этот раз в вызывающем потоке - и остановка потока не завершится, пока работа останавливаемого потока не будет возобновлена (resume). Вот так следует обрывать приостановленный поток:

try { suspendedThread.Abort(); }
catch (ThreadStateException) { suspendedThread.Resume(); }
// Теперь приостановленный поток suspendedThread будет оборван.

Сложности, связанные с Thread.Abort. Если предположить, что обрываемый поток не вызвал ResetAbort, то Вы можете ожидать, что завершение потока произойдет очень быстро. Однако как это происходит с хорошим адвокатом, поток может оставаться в камере смертников довольно долго! Вот несколько факторов, которые могут сохранить поток на некоторое время в состоянии AbortRequested:

• Конструкторы статического класса никогда не прерываются таким способом (чтобы потенциально не отравить оставшуюся жизнь домена приложения).
• Все блоки catch/finally сохраняют работу, и никогда не обрываются посередине.
• Если обрывается поток в необслуживаемом коде (unmanaged code), то выполнение продолжится до момента достижения следующего оператора обслуживаемого кода (managed code).

Последний фактор может быть в частности проблематичным в том, что сама платформа .NET часто вызывает необслуживаемый код, оставаясь в нем на довольно долгое время. Примерами могут служить класс, обслуживающий работу с сетью, или класс, работающий с базой данных. Если сетевой ресурс недоступен, или медленно отвечает, то есть возможность, что выполнение останется на это время полностью в необслуживаемом коде возможно на минуты, в зависимости от реализации этого класса. В таких случаях, конечно, не хотелось бы подключаться (Join) к обрываемому потоку - по крайней мере без таймаута!

Оборвать выполнение кода .NET менее проблематично, пока блоки try/finally или операторы гарантируют правильную очистку при выбрасывании исключения ThreadAbortException. Однако даже в этом случае все еще можно быть уязвимым для неприятных неожиданностей. Для примера рассмотрим следующее:

using (StreamWriter w = File.CreateText ("myfile.txt"))
   w.Write ("Abort-Safe?");

Оператор using языка C# это просто синтаксический ярлычок, который в этом случае раскрывается в следующее:

StreamWriter w;
w = File.CreateText ("myfile.txt");
try     { w.Write ("Abort-Safe"); }
finally { w.Dispose();            }

Может произойти так, что Abort произойдет после того, как создан StreamWriter, но перед началом блока try. Фактически, если углубиться в дебри IL (Intermediate Language, промежуточный язык), можно увидеть, что запуск Abort может произойти между созданием StreamWriter и присвоением его переменной w:

IL_0001:  ldstr      "myfile.txt"
IL_0006:  call       class [mscorlib]System.IO.StreamWriter
                     [mscorlib]System.IO.File::CreateText(string)
IL_000b:  stloc.0
.try
{
   ...

Так или иначе вызов метода Dispose в блоке finally будет обойден, в результате останется заброшенным открытый дескриптор файла, что не даст возможности последующим попыткам создать файл myfile.txt до завершения процесса.

В реальной ситуации этого примера все может быть еще хуже, потому что Abort мог произойти в реализации File.CreateText. Это так называемый непрозрачный код, для которого нет исходника. К счастью, код .NET никогда не бывает по-настоящему непрозрачным: мы все еще можем воспользоваться ILDASM - или еще лучше утилитой Reflector автора Lutz Roeder, и увидеть, что File.CreateText вызвал конструктор StreamWriter, у которого следующая логика:

public StreamWriter (string path, bool append, ...)
{
   ...
   ...
   Stream stream1 = StreamWriter.CreateFile (path, append);
   this.Init (stream1, ...);
}

Нигде в этом конструкторе нет блока try/catch, что означает, что если Abort будет вызван в (не тривиальном) методе Init, новый созданный поток будет заброшен, и не останется способа закрыть нижележащий дескриптор файла.

Это ставит вопрос о том, как все-таки написать метод, дружественный к Abort. Самый общий способ обойти эту проблему - вообще не обрывать другой поток, реализовав вместо этого шаблоном кооперативной отмены активности, как это было описано ранее (см. [4], секция "Безопасная отмена операции потока").

Завершение доменов приложения. Другой способ реализовать рабочий поток, дружественный к Abort - запустить его в своем собственном домене приложения. После вызова Abort Вы снесете под корень и заново создадите домен приложения. Это позаботиться о плохом состоянии, вызванном частичной или неправильной инициализацией (хотя, к сожалению, не дает гарантии защиты от сценариев неудачного случая, описанного выше - обрыв работы конструктора StreamWriter все еще может привести к утечке необслуживаемого дескриптора).

Строго говоря, первый шаг - Abort потока - не нужен, потому что когда выгружается домен приложения, все его выполняющиеся потоки автоматически обрывают свое выполнение. Однако недостаток такого поведения в том, что если оборванные потоки своевременно не завершатся (возможно из-за зависания в блоках finally, или по другим причинам, которые мы обсуждали раньше), то домен приложения не будет выгружен, и в вызывающем коде будет выброшено исключение CannotUnloadAppDomainException. По этой причине лучше явно оборвать выполнение рабочего потока, затем вызвать Join с некоторым таймаутом (над которым Вы имеете контроль) перед выгрузкой домена приложения.

Создание и уничтожение домена приложения относительно затратно по времени мире, где активно множество потоков (может занять несколько миллисекунд, если не больше), так что это нечто такое, что следует делать не регулярно, и уж точно не в цикле! Также разделение кода, вводимое доменом приложения, вводит другой элемент, который может нести в себе преимущество или вред, в зависимости от цели, которую ставит перед собой приложение. В контексте тестирования модулей (unit-testing), например, запуск потоков в отдельных доменах приложения дает выгоду.

Завершение процессов. Другой способ, каким можно завершить поток - завершение родительского процесса. Один из примеров такой ситуации - когда свойство IsBackground рабочего потока установлено в true (рабочий поток получает свойство фонового потока), и главный поток завершается, когда рабочий поток еще работает. Фоновый поток не способен поддерживать работу приложения, и поэтому при завершении процесса фоновый поток завершается вместе с ним.

Когда поток завершается из-за завершение его родительского процесса, он немедленно умирает, и его блоки finally не выполняются.

Та же самая ситуация возникает, когда пользователь завершает не отвечающее приложение с помощью Менеджера Задач Windows (утилита Task Manager), или прибивает процесс программно вызовом Process.Kill.

[Ссылки]

1. Threading in C# PART 4: ADVANCED THREADING site:albahari.com.
2. Потоки на C#. Часть 1: введение.
3. Потоки на C#. Часть 2: основы синхронизации.
4. Потоки на C#. Часть 3: использование потоков.
5. 9 способов испортить код с помощью volatile.
6. SpinLock и SpinWait.

 

Добавить комментарий


Защитный код
Обновить

Top of Page