Программирование PC Потоки на C#. Часть 2: основы синхронизации Thu, March 28 2024  

Поделиться

Нашли опечатку?

Пожалуйста, сообщите об этом - просто выделите ошибочное слово или фразу и нажмите Shift Enter.

Потоки на C#. Часть 2: основы синхронизации Печать
Добавил(а) microsin   

В предыдущей части [2] было показано, как запустить задачу (task) на потоке (thread), как конфигурировать поток и как передать данные в обоих направлениях (в поток и из потока). Также описывалось, каким образом локальные переменные делаются приватными для потока, и как ссылки могут использоваться между потоками совместно, чтобы можно было обмениваться данными между потоками через общие поля класса.

Следующий важный шаг это синхронизация: координация действий потоков, чтобы они работали предсказуемым образом. Синхронизация в частности важна, когда потоки обращаются к одним и тем же данным; удивительно просто работать с такими данными.

Конструкции синхронизации можно поделить на 4 категории:

Простые методы блокировки. Суть этих методов в ожидании, когда другой поток завершится. Ожидание может быть задано на определенный период времени. К методам простой блокировки относятся Sleep, Join и Task.Wait.

Конструкции критических секций (locking constructs). Это вводит ограничение, что определенная секция кода может исполняться в любой момент времени только ограниченным количеством потоков. Исключительная блокировка (exclusive locking) встречается чаще всего - она позволяет в любой момент времени только одному потоку осуществлять доступ к общим данным, при этом другие потоки не могут помешать доступу. Стандартные конструкции исключительной блокировки это lock (Monitor.Enter/Monitor.Exit, см. далее "Locking"), Mutex (см. далее) и SpinLock (см. [3]). Конструкции не исключительной блокировки (nonexclusive locking) это Semaphore, SemaphoreSlim (см. далее) и блокировки reader/writer (см. [4]).

Конструкции сигнализации (signaling). Они позволяют потоку приостановиться, пока не придет оповещение от другого потока, что устраняет необходимость не эффективного опроса (каких-то общих флагов или переменных). Есть два используемых обычно устройства сигнализации (signaling devices): обработчики ожидания события (event wait handles, см. далее) и методы and Wait/Pulse класса Monitor [5]. Framework 4.0 представляет классы CountdownEvent (см. далее) и Barrier [6].

Не блокирующие конструкции синхронизации. Они защищают доступ к общему полю путем вызова примитивов процессора. Библиотека CLR и язык C# предоставляют следующие не блокирующие конструкции: Thread.MemoryBarrier, Thread.VolatileRead, Thread.VolatileWrite [7], ключевое слово volatile [8] и класс Interlocked [9].

Блокировка важна для всех перечисленных категорий, кроме последней. Давайте кратко рассмотрим концепцию блокировки.

[Что такое блокировка]

Поток считается заблокированным, когда его выполнение приостановлено по какой-то причине, такой как засыпание (Sleep) или ожидание завершения другого потока с помощью Join или EndInvoke. Заблокированный поток немедленно уступает текущий квант процессорного времени, и с этого момента не использует процессор, пока не будет удовлетворено условие снятия блокировки (blocking condition). Вы можете проверить, заблокирован ли поток, с помощью его свойства ThreadState:

bool blocked = (someThread.ThreadState & ThreadState.WaitSleepJoin) != 0;

Примечание: состояние потока может измениться в момент его опроса, и сами действия по постоянному опросу не рекомендуются как не эффективные. Поэтому приведенный выше код полезен только в сценариях диагностики.

Когда поток блокируется или разблокируется, операционная система (точнее - её планировщик, sheduler) выполняет так называемое переключение контекста (context switch). Эта операция влечет трату процессорного времени в несколько микросекунд.

Разблокировка происходит одним из 4 способов (кнопка питания на системном блоке компьютера не считается!):

• Удовлетворено условие блокировки
• Истек таймаут операции (если был указан таймаут)
• Работа потока была прервана с помощью Thread.Interrupt [10]
• Работа потока была оборвана с помощью Thread.Abort [10]

Поток не считается заблокированным, если его выполнение приостановлено через (устаревший) метод Suspend [11].

Блокировка против циклов опроса. Иногда поток должен поставить свои действия на паузу, пока не выполнится определенное условие. Конструкции сигнализации и критических секций эффективно решают эту проблему с помощью блокировки потока до момента, пока не будет удовлетворено условие. Однако есть более простая альтернатива: поток может ждать появления нужного условия в постоянно прокручивающемся цикле опроса. Пример:

while (!proceed);

или:

while (DateTime.Now < nextStartTime);

О общем это очень затратный способ, бесполезно тратящий ресурс процессора: библиотека CLR и операционная система будут считать, что поток выполняет важное вычисление, и даст ему на это соответствующие выделенные ресурсы!

Иногда применяется гибрид между блокировкой и прокруткой с опросом:

while (!proceed) Thread.Sleep (10);

Хотя такой вариант не слишком элегантен, он обычно намного более эффективен чем прямая прокрутка цикла без вызова Sleep. Хотя тут проблемы могут возникнуть из-за проблем конкурентного доступа к опрашиваемому флагу, по которому будет запущено продолжение действий. Правильное использование критической секции и сигналов помогает избежать этой проблемы.

Прокрутка с циклом опроса может быть редко эффективной только в тех случаях, когда Вы ожидаете удовлетворения условия в очень краткий промежуток времени (возможно в пределах нескольких микросекунд) потому что это позволяет избежать чрезмерной нагрузки на процессор и задержки на переключение контекста. Среда .NET Framework предоставляют для этого специальные методы и классы, что рассматривается в разделе описания параллельного программирования [3].

Состояния потока (ThreadState). Вы можете узнать, в каком из состояний находится поток, путем чтения свойства ThreadState. Это вернет флаги перечисления типа ThreadState, которые комбинируют 3 "слоя" данных по принципу побитного кодирования информации. Однако большинство этих бит избыточны, не используются или устарели. На диаграмме ниже показан один из "слоев":

CSharp Threading ThreadState

Следующий код преобразует ThreadState в одно из четырех наиболее полезных значений: Unstarted, Running, WaitSleepJoin и Stopped:

public static ThreadState SimpleThreadState (ThreadState ts)
{
   return ts & (ThreadState.Unstarted |
                ThreadState.WaitSleepJoin |
                ThreadState.Stopped);
}

Свойство ThreadState полезно для целей диагностики, но его чтение в целях синхронизации будет работать нестабильно, потому что состояние потока может поменяться в любой момент, в том числе и между проверками ThreadState, и это повлияет на актуальность прочитанной из ThreadState информации.

[Блокировка]

Исключительная, или монопольная блокировка (exclusive locking) используется, чтобы гарантировать, что только один поток в любой момент времени мог войти в определенную секцию кода. Есть две основные конструкции для exclusive locking, это lock и Mutex. Из этих двух конструкция lock работает быстрее и более удобен. Однако у Mutex есть ниша, в которой её блокировка может охватить приложения в различных процессах, работающих на компьютере (чем отличается процесс от потока см. [1]).

В этой секции мы начнем обсуждения с конструкции lock, и затем перейдем к рассмотрению Mutex и семафорам (для не исключительной блокировки, nonexclusive locking). Позже мы рассмотрим блокировки reader/writer [4].

Начиная с Framework 4.0 есть также структура SpinLock для сценариев кода, выполняющегося в условиях высокой конкуренции.

Начнем с примера следующего класса:

class ThreadUnsafe
{
   static int _val1 = 1, _val2 = 1;
 
   static void Go()
   {
      if (_val2 != 0) Console.WriteLine (_val1 / _val2);
      _val2 = 0;
   }
}

Этот класс не будет потокобезопасным: если Go был вызван двумя потоками одновременно, то есть возможность получения ошибки деления на 0, потому что _val2 установилась бы в 0 в одном потоке, в и в другом потоке в это же время мог бы выполняться оператор в параметре вызова Console.WriteLine.

Вот так блокировка lock может исправить эту проблему:

class ThreadSafe
{
   static readonly object _locker = new object();
   static int _val1, _val2;
 
   static void Go()
   {
      lock (_locker)
      {
         // Начало критической секции кода
         if (_val2 != 0) Console.WriteLine (_val1 / _val2);
         _val2 = 0;
         // Конец критической секции кода
      }
   }
}

Только один поток может в любой момент времени заблокировать объект синхронизации lock (в этом примере объект синхронизации _locker). Любые претендующие на доступ к lock-участку кода будут заблокированы, пока блокировка не будет снята (т. е. пока выполнение не выйдет за пределы lock-участка кода). Такой участок кода также называют критической секцией. Если больше одного потока претендуют на доступ к региону кода lock, то они ставятся в очередь готовности (ready queue), и доступ к критической секции будет даваться по принципу FIFO, т. е. первым запросил доступ - первым получит доступ (некая проблема здесь заключается в нюансах поведения планировщика Windows и библиотеки CLR, в результате чего этот порядок предоставления доступа иногда нарушается). Исключительные блокировки, как иногда говорят, принуждает к применению строго последовательного доступа (serialized access) к участку кода, защищенному lock, потому что доступ со стороны одного потока никогда не может перекрыть доступ другого. В нашем примере логика защиты применена внутри метода Go method, когда осуществляется доступ к полям _val1 и _val2.

Поток блокируется, пока оспариваемый участок критического кода находится в состоянии ThreadState WaitSleepJoin. При рассмотрении Interrupt и Abort будет описано, как заблокированный поток может быть принудительно освобожден другим потоком. Это довольно мощный метод, который может использоваться для завершения потока.

Конструкция Назначение Работает между процессами? Загрузка(*)
lock (Monitor.Enter / Monitor.Exit) Гарантирует, что только один поток в любой момент времени может получить доступ к ресурсу или секции кода. - 20 нс
Mutex ДА 1000 нс
SemaphoreSlim (добавлено в Framework 4.0) Гарантирует, что не более указанного количества потоков могут получить доступ к ресурсу или секции кода. - 200 нс
Semaphore ДА 1000 нс
ReaderWriterLockSlim (добавлено в Framework 3.5) Позволяет нескольким читающим потокам существовать вместе с одним записывающим. - 40 нс
ReaderWriterLock (сильно устарело) - 100 нс

Примечание (*): время, которое тратится на блокировку и разблокировку конструкции на одном и том же потоке (подразумевая, что другие потоки не блокируются), как это было измерено на процессоре Intel Core i7 860.

Monitor.Enter и Monitor.Exit. Оператор lock на C# фактически является "синтаксическим сахаром", т. е. обертками над вызовами методов Monitor.Enter и Monitor.Exit с блоком try/finally. Это представляет программисту упрощенную версию того, что реально происходит внутри метода Go в предыдущем примере:

Monitor.Enter (_locker);
try
{
   if (_val2 != 0) Console.WriteLine (_val1 / _val2);
   _val2 = 0;
}
finally { Monitor.Exit (_locker); }

Вызов Monitor.Exit без предшествующего вызова Monitor.Enter на одном и том же объекте приведет к выбрасыванию исключения.

Перезагрузки lockTaken. Код, который мы только что продемонстрировали, на компиляторах C# версия 1.0, 2.0 и 3.0 будет транслироваться из оператора lock.

Однако в этом коде есть тонкая уязвимость. Рассмотрим (маловероятное) событие исключения, которое выбрасывается с реализацией Monitor.Enter между вызовом Monitor.Enter и блоком try (при этом возможно будет вызван Abort на этом потоке, либо выбрасывание исключение OutOfMemoryException). В таком сценарии блокировка может не произойти. Если блокировка произошла, то она не будет освобождена - потому что мы никогда не войдем в блок try/finally. Это приведет к пропущенной блокировке (leaked lock).

Для устранения этой опасности разработчики CLR 4.0 добавили следующую перезагрузку для Monitor.Enter:

public static void Enter (object obj, ref bool lockTaken);

Параметр lockTaken равен false после этого метода если (и только если) метод Enter выбросил исключение и блокировка lock не была взята.

Вот корректный шаблон использования (в который C# 4.0 будет транслировать оператор lock):

bool lockTaken = false;
try
{
   Monitor.Enter (_locker, ref lockTaken);
   // Тут какой-то наш код...
}
finally { if (lockTaken) Monitor.Exit (_locker); }

TryEnter. Также предоставляет метод TryEnter, который позволяет задать таймаут либо в миллисекундах, либо через TimeSpan. Этот метод вернет true, если блокировка была получена, или false, если блокировка не была получена из-за таймаута метода. TryEnter может быть также вызван без аргумента, что "проверяет" блокировку lock, таймаут произойдет немедленно, если блокировка не может быть получена надлежащим способом.

Как и метод Enter, метод TryEnter перезагружен в CLR 4.0, чтобы принять аргумент lockTaken.

Выбор объекта синхронизации. Любой объект, видимый каждому из участвующих в общей работе потоков, может использоваться в качестве синхронизирующего объекта согласно одному жесткому правилу: это должен быть ссылочный тип (reference type). Синхронизирующий объект обычно имеет область доступа private (потому что это помогает инкапсулировать логику блокировки) и обычно это поле экземпляра или статическое поле. Синхронизирующий объект может иметь двойное назначение, т. е. он может быть встроен в защищаемый объект, как это делает поле _list в следующем примере:

class ThreadSafe
{
   List < string> _list = new List < string>();
 
   void Test()
   {
      lock (_list)
      {
         _list.Add ("Item 1");
         ...

Поле, выделенное для этой цели (такое как _locker в предыдущем примере), позволяет точное управление областью действия и гранулярностью блокировки. Объект текущего содержимого (containing object, this) - или даже его тип - также может использоваться в качестве объекта синхронизации:

lock (this) { ... }

или:

lock (typeof (Widget)) { ... }      // Для защиты доступа к static-данным

Недостаток такого способа блокировки в том, что Вы не инкапсулируете отдельно логику блокировки, и становится сложнее защититься от глухой блокировки (deadlocking, см. ниже) и излишней блокировки (excessive blocking). Блокировка на типе также может просочиться через границы домена приложения (в пределах одного и того же процесса).

Также Вы можете реализовать блокировку lock на локальных переменных, захваченных lambda-выражениями или anonymous-методами.

Блокировка не ограничивает каким-либо образом доступ к самому объекту. Другими словами, x.ToString() не будет блокироваться, потому что другой поток вызвал lock(x); оба потока должны вызвать lock(x), чтобы блокировка произошла.

Когда применять блокировку? Как основное правило, Вам нужна блокировка вокруг доступа к любой записываемой общей переменной (writable shared field). Даже в простейшем случае - операция присваивания одиночного поля - нужно учитывать синхронизацию. В следующем классе ни метод Increment, ни метод Assign не будут потокобезопасными:

class ThreadUnsafe
{
   static int _x;
   static void Increment() { _x++; }
   static void Assign()    { _x = 123; }
}

Ниже показаны потокобезопасные версии для Increment и Assign:

class ThreadSafe
{
   static readonly object _locker = new object();
   static int _x;
 
   static void Increment() { lock (_locker) _x++; }
   static void Assign()    { lock (_locker) _x = 123; }
}

В не блокирующей синхронизации [12] мы рассмотрим случаи, когда возникает необходимость в такой синхронизации, и как барьеры на памяти и класс Interlocked [9] может предоставить альтернативу блокировке в этих ситуациях.

Блокировка и атомарность. Если группа переменных всегда читается и записывается в пределах одной и той же блокировки lock, можно сказать, что эти переменные читаются и записываются атомарно. Предположим, что поля x и y всегда читаются и назначаются внутри блокировки на объекте locker:

lock (locker) { if (x != 0) y /= x; }

Можно сказать, что к x и y осуществляется атомарный доступ, потому выполнение кода в пределах блокировки не может быть разделено или вытеснено действиями в другом потоке, в результате чего посторонние действия никак не могут отдельно повлиять на x или y так, что результат вычислений станет недостоверным. Вы никогда не получите ошибку деления на 0, так как доступ к x и y реализован в одной исключительной блокировке (exclusive lock).

Атомарность, предоставленная блокировкой lock, нарушается, если произойдет выбрасывание исключения внутри блока lock. Например:

decimal _savingsBalance, _checkBalance;
 
void Transfer (decimal amount)
{
   lock (_locker)
   {
      _savingsBalance += amount;
      _checkBalance -= amount + GetBankFee();
   }
}

Если исключение сработало в результате вызова GetBankFee(), то банк потерял бы деньги. В этом случае нам нужно избегать проблем путем вызова GetBankFee заранее. Решение подобной проблемы для более сложных случаев - реализация логики отката (rollback) с помощью блока catch или finally.

Понятие атомарности инструкции это другая, хотя аналогичная концепция: инструкция считается атомарной, если она неделимо (её никак нельзя прервать, поделить на части) выполняется на нижележащем процессоре, см. описание не блокирующей синхронизации [12].

Вложенные блокировки. Поток можно повторно блокировать на одном и том же объекте вложенным (реентрантным) способом:

lock (locker)
   lock (locker)
      lock (locker)
      {
         // Тут какие-то действия...
      }

или так:

Monitor.Enter (locker); Monitor.Enter (locker);  Monitor.Enter (locker); 
   // Тут какие-то действия...
Monitor.Exit (locker);  Monitor.Exit (locker);   Monitor.Exit (locker);

В этих сценариях объект разблокируется только когда произойдет выход из самого внешнего оператора lock, или будет выполнено соответствующее количество операторов Monitor.Exit.

Вложенная блокировка полезна, когда один метод вызывает другой в пределах критической секции lock:

static readonly object _locker = new object();
 
static void Main()
{
   lock (_locker)
   {
      AnotherMethod();
      // У нас все еще есть блокировка - потому что блокировки lock реентрантны.
  }
}
 
static void AnotherMethod()
{
   lock (_locker) { Console.WriteLine ("Метод AnotherMethod"); }
}

Поток может быть заблокирован только на первом (самым внешним) операторе lock.

Глухие блокировки (deadlock). Глухая, или "мертвая" блокировка deadlock произойдет, когда два потока взаимно ждут освобождения ресурса, захваченного другим потоком, в результате ничего не происходит. Ниже приведена самая простая иллюстрация этой ситуации с двумя блокировками lock:

object locker1 = new object();
object locker2 = new object();
 
new Thread (() => {
                     lock (locker1)
                     {
                        Thread.Sleep (1000);
                        lock (locker2);   // Deadlock
                     }
                  }).Start();
 
lock (locker2)
{
  Thread.Sleep (1000);
  lock (locker1);                         // Deadlock
}

Программист может "наколбасить" и более сложные цепочки глухой блокировки с участием трех и большего количества потоков.

Библиотека CLR в стандартном окружении хоста не работает наподобие сервера SQL Server, не определяет автоматически глухие блокировки и не представляет автоматическое средство исправления таких блокировок путем останова одного из участников глухой блокировки. Глухая блокировка потоков заставляет их на неопределенное время прервать свое выполнение, если конечно Вы не предусмотрели таймаут блокировки. В итерации хоста SQL CLR, однако, deadlock-и автоматически определяются и выбрасывается (перехватываемое catch) исключение в одном из потоков, участвующих в глухой блокировке.

Deadlock одна из самых жестких проблем в многопоточности, особенно когда есть множество взаимосвязанных объектов. Фундаментальная сложность состоит в том, что Вы не можете быть уверены, что завершилась блокировка кода, который сам вызвал блокировку.

Например, Вы можете нечаянно заблокировать private-поле в своем классе x, не зная, что Ваш вызывающий код (или код, вызвавший вызывающего) уже заблокирован на поле b в классе y. Между тем другой поток делает обратное, создавая deadlock. Ирония тут в том, что проблема усиливается "хорошими" (изначально подразумеваемыми) шаблонами объектно-ориентированного стиля программирования, потому что принцип "скрывай детали кода внутри объектов" создает цепочки взаимосвязей, которые не очевидны для программиста, пока код не начнет выполняться в реальном времени.

Популярные советы избежать мертвых блокировок типа "блокируйте объекты в правильном порядке" тяжело применить на практике, хотя они могут помочь в простых случаях, примеры которых мы описывали. Лучшая стратегия - особенно внимательно применять блокировки вокруг вызова методов в объектах, которые могут ссылаться обратно на Ваш собственный объект. Тщательно взвесьте необходимость блокировки вокруг вызова методов в других классах, часто это делается, однако иногда - что мы рассмотрим позже - есть и другие опции реализации. Больше полагаясь на декларативность [13] и параллелизм обработки данных [14], не изменяемые типы (immutable types, см. далее) и не блокирующие конструкции синхронизации [12], можно снизить необходимость в блокировках.

Есть еще один способ почувствовать проблему: когда Вы вызываете другой код, содержащий блокировку, происходит скрытая инкапсуляция этой блокировки. Это не приведет к ошибке в библиотеке CLR или .NET Framework, но является фундаментальным ограничением блокировки в целом. Проблемам блокировки посвящены многие исследовательские проекты, включая Software Transactional Memory.

Другой сценарий мертвой блокировки возникнет, когда вызывается Dispatcher.Invoke (в приложении WPF) или Control.Invoke (в приложении Windows Forms) во время активной блокировки. Если случилось так, что UI запустил другой метод, который ждет на той же блокировке, то произойдет deadlock. Это часто можно исправить простым вызовом BeginInvoke вместо Invoke. Альтернативно Вы можете освободить свою блокировку перед вызовом Invoke, хотя это не будет работать, если вызывающий код запустил блокировку. Invoke и BeginInvoke будут рассматриваться далее в секции "Rich Client Applications и Thread Affinity".

Производительность. Блокировка работает быстро: Вы можете ожидать, что захват и освобождение критической секции lock займет меньше 20 наносекунд на поколении компьютеров 2010 года, если к этой секции блокировки не было конкурентного доступа. Если же был случай конкурентного доступа, то последующие затраты на переключение контекста введут трату процессорного времени примерно около микросекунды, хотя этот интервал может быть больше, если учесть время, за которое смена состояния потока будет реально обработана планировщиком. Вы можете избежать трат на переключение контекста с помощью класса SpinLock [3] - если блокировка происходит очень коротко.

Блокировка может снизить конкурентность, если блокировка удерживается слишком долго. Это также может повысить шансы возникновения deadlock.

Mutex. Мьютекс подобен C# lock, но он может работать между несколькими процессами. Другими словами, Mutex может действовать как в пределах компьютера, так и в пределах приложения.

Захват и освобождение Mutex в случае отсутствия конкурентных попыток доступа занимает несколько микросекунд - примерно в 50 раз медленнее, чем работает критическая секция lock.

С классом Mutex можно вызвать метод WaitOne для блокировки и ReleaseMutex для разблокировки. Закрытие или избавление от (disposing) Mutex автоматически освободит его. Так же, как и с оператором lock, Mutex может быть освобожден только в том потоке, который получил этот Mutex.

Общее использование для межпроцессного Mutex применяется для гарантии, что только один экземпляр программы может быть запущен в любой момент времени. Вот как это делается:

class OneAtATimePlease
{
   static void Main()
   {
      // Присвоение мьютексу имени делает его доступным в пределах
      // всего компьютера. Используйте имя, которое уникально для
      // Вашей компании, например имя, включающее URL сайта.
 
      using (var mutex = new Mutex (false, "oreilly.com OneAtATimeDemo"))
      {
         // Ожидание в течение нескольких секунд, если произошла попытка
         // конкурентного запуска в случае, когда другой экземпляр
         // программы все еще находится в процессе завершения.
         if (!mutex.WaitOne (TimeSpan.FromSeconds (3), false))
         {
            Console.WriteLine ("Another app instance is running. Bye!");
            return;
         }
         RunProgram();
      }
  }
 
   static void RunProgram()
   {
      Console.WriteLine ("Программа уже работает. Нажмите Enter для выхода");
      Console.ReadLine();
   }
}

Если приложение запущено по управлением службы терминала (Terminal Services) от имени определенного пользователя, то видимый в пределах всего компьютера Mutex обычно виден только для приложений в пределах одной терминальной сессии (т. е. только в сессии этого пользователя терминала). Чтобы сделать мьютекс видимым для всех сессий терминала, снабдите его имя префиксом Global\.

Семафор. Это объект, который похож на ночной клуб: у него есть определенная емкость, принудительно отслеживаемая вышибалой на входе. Будучи заполненным, клуб больше не может принять посетителей, что создает очередь снаружи. Тогда для каждой уходящей персоны зайдет одна персона из головы очереди. Конструктор семафора требует минимум 2 аргументов: количество доступных в настоящий момент мест и общая емкость семафора.

Семафор с емкостью, равной 1, работает подобно Mutex или lock, за исключением того, что у семафора нет "владельца" - он не обращает внимания на потоки (thread-agnostic). Любой поток может вызвать Release на Semaphore, в то время как с Mutex или lock, только один поток может получить блокировку и освободить её.

Есть две подобных по функционалу версии этого класса: Semaphore и SemaphoreSlim. Последний был представлен в Framework 4.0, и он оптимизирован для удовлетворения повышенных требований на малые задержки в параллельном программировании [14]. Также он полезен в традиционной многопоточности, потому что позволяет указать билет отмены (cancellation token [15]) для ожидания. Однако это нельзя использовать для сигнализации между процессами.

Semaphore вводит трату времени процессора около 1 микросекунды в вызове WaitOne или Release; SemaphoreSlim тратит на это около четверти микросекунды.

Семафоры могут быть полезные в пределах ограниченной многопоточности - они предотвращают ситуации, когда одну и ту же секцию кода выполняют слишком много потоков. В следующем примере 5 потоков пытаются войти в ночной клуб, в который разрешено войти за один раз только троим потокам:

class TheClub
{
   static SemaphoreSlim _sem = new SemaphoreSlim (3); // Емкость == 3
 
   static void Main()
   {
      for (int i = 1; i <= 5; i++) new Thread (Enter).Start (i);
   }
 
   static void Enter (object id)
   {
      Console.WriteLine (id + " хочет зайти");
      _sem.Wait();
      Console.WriteLine (id + " вошел!");    // Только 3 потока
      Thread.Sleep (1000 * (int) id);        // могут находиться тут
      Console.WriteLine (id + " выходит");// одновременно.
      _sem.Release();
   }
}

Результат работы этого примера:

1 хочет зайти
1 вошел!
2 хочет зайти
2 вошел!
3 хочет зайти
3 вошел!
4 хочет зайти
5 хочет зайти
1 выходит
4 вошел!
2 выходит
5 вошел!

Если вместо оператора Sleep выполняется интенсивный дисковый ввод/вывод, то этот семафор улучшит общую производительность программы путем ограничения конкурентной активности с жестким диском.

Semaphore, если он именован, может работать между процессами точно так же, как и Mutex.

[Безопасность потоков (Thread Safety)]

Программа или метод считается потокобезопасным (thread-safe) если он не вводит никакой неопределенности в работе кода при наличии сценария многопоточности. Безопасность потоков достигается главным образом путем блокировки и уменьшения возможности по взаимодействию между потоками.

Типы общего назначения (general-purpose types) редко ориентированы на многопоточность по следующим причинам:

• Цена разработки потокобезопасного кода может быть значительной, в частности если разрабатываемый тип имеет множество полей (каждое поле потенциально может взаимодействовать в произвольном контексте многопоточности).
• Обеспечение потокобезопасности может повлечь дополнительные затраты процессорного времени, что снизит общую производительность кода (в частности затраты могут зависеть от того, используется ли в действительности этот тип несколькими потоками).
• Потокобезопасный тип вовсе не обязательно приведет к безопасному его использованию в программе. Также часто конечный код, включающий использование потокобезопасного типа, делает его потокобезопасность избыточной.

Следовательно, рационально потокобезопасность реализовать только в том месте, где она действительно должна быть, чтобы правильно обработать определенный сценарий многопоточности.

Однако здесь есть несколько способов "обмана", чтобы безопасно запустить большие и сложные классы в многопоточном окружении. Один из них - жертвовать гранулярностью путем обертки одной монопольной блокировкой больших секций кода - даже доступ ко всем объекту - принуждая применить последовательный доступ к объекту на верхнем уровне. Такая тактика фактически важна, если Вы хотите использовать не безопасный по отношению к многопоточности сторонний код (или большинство Framework-типов) в многопоточном контексте. Прием должен просто использовать одинаковую монопольную блокировку (exclusive lock), чтобы защитить доступ ко всем свойствам, методам и полям на не безопасном по отношению к потокам объекте. Это решение хорошо работает, если все методы объекта выполняются быстро (иначе будут иметь место большие блокировки).

Примитивные типы, к которым относятся некоторые типы .NET Framework, будучи инстанцированными, являются потокобезопасными только при доступе read-only. Ответственность за правильное их использование для потокобезопасности лежит на разработчике, обычно это верно для монопольных блокировок (исключение составляют сборки в библиотеке System.Collections.Concurrent).

Другой способ обмана - минимизация взаимодействия потоков путем минимизации количества общих данных. Это отличный подход, неявно используемый в приложениях среднего уровня, не сохраняющих свое состояние, и серверах веб-страниц. Поскольку могут одновременно поступить несколько клиентских запросов, вызываемые методы сервера должны быть потокозащищенными. Дизайн без сохранения состояния (stateless, популярный по причине хорошей масштабируемости) изначально ограничивает возможность взаимодействия между потоками, поскольку классы не сохраняют данные между запросами. Тогда взаимодействие потоков ограничивается только статическими полями, решение создать которые можно принять только в целях кэширования в памяти общих используемых данных, и предоставления инфраструктуры служб аутентификации и аудита.

Конечный этап реализации безопасности потоков состоит в использовании режима автоматической блокировки. Библиотека .NET Framework это точно делает, если вы разделяете ContextBoundObject на подклассы и применяете атрибут Synchronization для этого класса. Тогда каждый раз, когда вызывается метод или происходит обращение к свойству такого объекта, автоматически берется блокировка на весь объект, пока не выполнится полностью метод или не завершится доступ к свойству. Хотя это упрощает обеспечение безопасности потоков, возникают собственные проблемы: мертвые блокировки (deadlock), которые иначе не произошли бы, ухудшение параллелизма и непреднамеренная реентрантность. По этим причинам ручная блокировка обычно лучший выбор - по крайней мере пока не станет доступным упрощенный режим автоматической блокировки.

Безопасность потоков и типы .NET Framework. Блокировку можно использовать, чтобы превратить не безопасный по отношению к потокам код в потокобезопасный. Хорошее применение для этого библиотека .NET Framework: почти все её не примитивные типы, будучи инстанцированными, не являются потокобезопасными (для чего-то большего, чем доступ только на чтение). И все же они могут быть использованы в многопоточном коде, если любой доступ к любому имеющемуся объекту осуществляется через блокировку lock. Ниже приведен пример, где два потока одновременно добавляют элемент в одну и ту же коллекцию List, и затем делают перечисление списка в цикле:

class ThreadSafe
{
   static List < string> _list = new List < string>();
 
   static void Main()
   {
      new Thread (AddItem).Start();
      new Thread (AddItem).Start();
   }
 
   static void AddItem()
   {
      lock (_list) _list.Add ("Item " + _list.Count);
      string[] items;
      lock (_list) items = _list.ToArray();
      foreach (string s in items) Console.WriteLine (s);
   }
}

В этом примере мы осуществляем блокировку на самом объекте _list. Если бы у нас было 2 взаимосвязанных списка, мы должны были бы выбрать общий объект, на котором можно было бы осуществить блокировку (мы могли бы выбрать для этого один из этих списков, но лучше использовать для блокировки отдельное, независимое поле).

Перечисление коллекций .NET также не является потокобезопасным в том смысле, что будет выброшено исключение, если список модифицируется во время процесса его перечисления. Вместо того, чтобы блокироваться на время до завершения перечисления, в этом примере мы просто сначала делаем копию элементов в массив. Это дает возможность избежать чрезмерной блокировки, если процесс перечисления списка может занять много времени (другое решение - использовать блокировку reader/writer [4]).

Блокировки вокруг потокобезопасных объектов. Иногда также нужно делать блокировку при доступе к потокобезопасным объектам. Для иллюстрации представим, что Framework-класс List был действительно потокобезопасным, и мы хотим добавить элемент в его список:

if (!_list.Contains (newItem)) _list.Add (newItem);

Независимо от того, является список потокобезопасным или нет, этот оператор определенно не является потокобезопасным! Для обеспечения потокобезопасности весь оператор if должен быть обернут в блокировку, чтобы предотвратить вытеснение в промежутке между проверкой и добавлением нового элемента. Та же самая блокировка затем нужна в любом месте, где мы модифицируем этот список. Например, следующий оператор также нуждается в обертке идентичной блокировкой (чтобы гарантировать, что процесс модификации не вытеснит предыдущий оператор):

_list.Clear();

Другими словами, мы должны были бы реализовать блокировку точно так же, как это делали с не безопасными для многопоточного использования классами.

Блокировка вокруг доступа к коллекции может привести к чрезмерной блокировке в среде высокой конкуренции потоков. Для этого Framework 4.0 предоставляет потокобезопасные очередь (queue), стек (stack) и словарь (dictionary).

Статические члены класса. Обертывание доступа к объекту вокруг пользовательской блокировки работает только если все конкурентные потоки учитывают и используют блокировки. Это может не иметь место, если объект широко доступен. Самый худший случай - статический (объявленный с ключевым словом static) член класса со снятым ограничением на доступ (объявлен с типом доступа public). Для примера представим: если статическое свойство DateTime.Now структуры DateTime, было бы не потокобезопасным, то два конкурентных доступа к нему дадут ошибочный результат или выбрасывание исключения. Единственный способ бороться с этим через внешнюю блокировку мог бы состоять в том, чтобы заблокировать доступ к самому типу - lock(typeof(DateTime)) - перед вызовом DateTime.Now. Это работало бы, только если все программисты согласились бы поступать подобным образом (что маловероятно). Кроме того, блокировка типа создает собственные проблемы.

По этой причине статические члены структуры DateTime (структура и класс на C# это по сути одно и то же) должны быть тщательно реализованы для обеспечения потокобезопасности. Ото общий шаблон поведения кода библиотеки .NET Framework: static-члены потокобезопасны; члены экземпляров (instance members) не потокобезопасны. Следование этому шаблону также целесообразно при написании типов для публичного использования, чтобы не создавать невозможные проблемы с безопасностью потоков. Другими словами, путем реализации статических методов потокозащищенными Вы программируете код типа, чтобы не исключать безопасность потоков для пользователей этого типа.

Безопасность для использования в потоках статических методов это то, что Вы должны кодировать специально: оно не произойдет само собой, на основании объявления метода статическим!

Безопасность для потоков при доступе только на чтение. Реализация типов потокобезопасными для конкурентного доступа только на чтение (где это возможно) выгодна, поскольку это означает, что пользователи могут избежать чрезмерной блокировки. Многие типы из библиотеки .NET Framework следуют этому принципу: коллекции (collections), например, потокобезопасны для конкурентного доступа потоков на чтение.

Следование этому принципу простое: если Вы документируете тип как потокобезопасный для конкурентного доступа только на чтение, не записывайте в поля в методах, которые пользователь ожидает работающими только на чтение (или делайте блокировку вокруг записи). Например, в реализации метода ToArray() в коллекции Вы можете начать со сжатия внутренней структуры коллекции. Однако это сделало бы метод не потокобезопасным для пользователей, которые ожидают доступа только на чтение.

Безопасность потоков для только чтения одна из причин, по которой перечислители (enumerators) отделены от перечислений (enumerables): два потока могут одновременно перечислять элементы коллекции, потому что каждый получает отдельный объект перечислителя.

При отсутствии документации приходится платить предположением, что по своей природе любой метод работает только на чтение. Хороший пример это класс Random: когда Вы вызываете Random.Next(), его внутренняя реализация требует, чтобы это действие обновило внутреннее приватное значение seed. Таким образом, Вы должны либо делать блокировку вокруг использования класса Random, или поддерживать раздельные экземпляры этого класса для каждого потока.

Безопасность потоков в серверах приложений (Application Servers). Серверы приложений требуют многопоточности для обработки одновременных запросов от клиентов. Приложения WCF, ASP.NET и Web Services неявно используют многопоточность; то же самое верно для приложений сервера Remoting, которые используют сетевой канал, такой как TCP или HTTP. Это означает, что когда Вы пишете код на стороне сервера, то должны учитывать безопасность потоков, если есть возможность взаимодействия среди потоков при обработке запросов клиентов. К счастью, такая возможность редка; типичный класс сервера либо не сохраняет состояния (stateless, не имеет полей), либо содержит модель активации, которая создает отдельный экземпляр объекта на каждый потупивший запрос от клиента. Взаимодействие обычно возникает только через статические поля, иногда используемые для кэширования в памяти частей базы данных в целях улучшения производительности.

Например, предположим, что у Вас есть метод RetrieveUser, который делает запрос к базе данных:

// User это пользовательский класс с полями для данных пользователя.
internal User RetrieveUser (int id) { ... }

Если этот метод вызывался часто, то Вам следует улучшить производительность путем кэширования результатов в статическом словаре (static Dictionary). Вот решение, учитывающее безопасность потоков:

static class UserCache
{
   static Dictionary < int, User> _users = new Dictionary < int, User>();
 
   internal static User GetUser (int id)
   {
      User u = null;
      lock (_users)
         if (_users.TryGetValue (id, out u))
            return u;
 
      u = RetrieveUser (id);     // Метод для получения пользователя из базы данных
      lock (_users) _users [id] = u;
      return u;
   }
}

Мы должны, как минимум, делать блокировку вокруг чтения и обновления словаря для гарантии безопасности потоков. В этом примере мы выбрали практический компромисс в блокировке между простотой и производительностью. Наш дизайн потенциально создает очень малую потенциальную не эффективность: если 2 потока одновременно вызовут этот метод с одним и тем же ранее не запрашиваемым id, то метод RetrieveUser был бы вызван дважды - и словарь получил бы ненужное обновление. Блокировка вокруг всего метода предотвратила бы это, но создала бы еще меньшую эффективность: весь кэш был бы заблокирован на время вызова RetrieveUser, в течение этого времени другие потоки блокировались бы при запросе любого пользователя.

Rich Client Applications и Thread Affinity. Обе библиотеки Windows Presentation Foundation (WPF) и Windows Forms следуют моделям на основе родственности потоков (thread affinity). Хотя каждая из библиотек имеет отдельную реализацию, обе очень похожи по своим функциям.

Объекты, которые создают rich client, основаны главным образом на DependencyObject в случае применения WPF, или на Control в случае Windows Forms. Эти объекты имеют родственность потоков. Это означает, что только поток, который который объект инстанцирует, может впоследствии получить доступ к его членам. Нарушение правила приведет либо к не предсказуемому поведению, либо к выбросу исключения.

Положительно то, что Вам не нужно делать блокировку вокруг доступа к объекту UI. Недостаток же в том, что если нужно вызвать член объекта X, который был создан другим потоком Y, то Вы должны перенаправить (marshal) запрос на вызов объекту Y. Вы явно можете делать это следующим образом:

• В WPF вызовите Invoke или BeginInvoke на элементе объекта Dispatcher.
• В Windows Forms вызовите Invoke или BeginInvoke на элементе управления (control).

Invoke и BeginInvoke оба принимают делегата, который обращается к методу на целевом элементе управления, который Вы хотите запустить. Invoke работает синхронно: вызывающий код блокируется, пока перенаправление (marshal) не завершится. BeginInvoke работает асинхронно: вызвавший его код немедленно получает обратно управление, и маршалированный запрос ставится в очередь (с использованием той же самой очереди сообщений, которая поддерживает события клавиатуры, мыши и таймера).

Предположим, что у нас есть окно, которое содержит текстовое поле ввода (text box) с именем txtMessage, содержимое которого мы хотим обновлять рабочим потоком. Вот пример для WPF:

public partial class MyWindow : Window
{
   public MyWindow()
   {
      InitializeComponent();
      new Thread (Work).Start();
   }
 
   void Work()
   {
      Thread.Sleep (5000);    // Симуляция интенсивных вычислений
      UpdateMessage ("The answer");
   }
 
   void UpdateMessage (string message)
   {
      Action action = () => txtMessage.Text = message;
      Dispatcher.Invoke (action);
   }
}

Подобный код используется для Windows Forms, за исключением что мы вместо Dispatcher.Invoke вызовем метод Invoke (принадлежащий классу формы Form):

void UpdateMessage (string message)
{
   Action action = () => txtMessage.Text = message;
   this.Invoke (action);
}

Framework предоставляет две конструкции для упрощения этого процесса: 

   • BackgroundWorker [16]
   • Продолжения для Task [17]

Рабочие потоки против потоков UI. Полезно думать о приложениях rich client, что они имеют две категории потоков: потоки графического интерфейса пользователя (UI threads) и рабочие потоки (worker threads). Потоки UI инстанцируют (и впоследствии "владеют") элементами графического интерфейса UI; рабочие потоки не инстанцируют и не владеют элементами UI. Worker-потоки обычно выполняют длинные вычисления, такие как выборка/получение данных (если бы эти вычисления были короткими, то надобности в рабочих потоках не было бы).

Большинство приложений rich client имеют один поток UI (который также является главным потоком приложения) и этот поток периодически порождает рабочие потоки - либо напрямую, либо с использованием класса BackgroundWorker [16]. Эти рабочие потоки маршалируют свои обращения обратно в главный поток UI, чтобы обновлять состояние органов управления или чтобы сообщать о прогрессе выполнения операции.

Итак, когда у приложения может быть несколько потоков UI? Основной такой сценарий возникает, когда у Вас приложение с несколькими окнами верхнего уровня, что часто называют приложением Single Document Interface (SDI); пример такого приложения Microsoft Word. Каждое окно SDI обычно показывает само себя как отдельное "приложение" на панели задач, и часто работает функционально изолированно от других окон SDI. Путем назначения каждому такому окну своего собственного потока UI, приложение может более отзывчивым.

Immutable-объекты. Не мутируемый (immutable) объект это такой объект, который нельзя изменить - снаружи или внутри. Поля immutable-объекта обычно декларируются как read-only, и они полностью инициализируются в момент конструирования объекта.

Немутируемость это признак функционального программирования, где вместо мутирования объекта Вы создаете новый объект с другими свойствами. LINQ следует этой парадигме. Немутируемость также важна в многопоточности, чтобы избежать проблем с общим записываемым состоянием - путем устранения (или минимизации) записываемых данных.

Один из шаблонов использования немутируемых объектов - инкапсуляция группы связанных полей для минимизации задержек блокировок. Чтобы привести простой пример, предположим, что у нас есть два поля, и мы хотим читать/записывать эти поля атомарно:

int _percentComplete;string _statusMessage;

Вместо того, чтобы реализовать блокировку вокруг этих полей, мы могли бы определить следующий immutable-класс:

class ProgressStatus    // Представляет прогресс какой-то активности
{
   public readonly int PercentComplete;
   public readonly string StatusMessage;
 
   // У этого класса может быть намного больше полей...
 
   public ProgressStatus (int percentComplete, string statusMessage)
   {
      PercentComplete = percentComplete;
      StatusMessage = statusMessage;
   }
}

Тогда мы могли бы определить одно поле такого типа на объекте блокировки:

readonly object _statusLocker = new object();
ProgressStatus _status;

Теперь мы можем читать/записывать значения такого типа без удержания блокировки для большего количества присваиваний, чем одно:

var status = new ProgressStatus (50, "Работаем с этим");
// Представим, что мы присваиваем еще много полей...
// ...
lock (_statusLocker) _status = status;       // Очень краткая блокировка

Для чтения объекта сначала получаем копию объекта (в блокировке). Затем можно прочитать его значения без необходимости удерживать блокировку:

ProgressStatus statusCopy;
lock (_locker ProgressStatus) statusCopy = _status;   // Снова короткая блокировка
int pc = statusCopy.PercentComplete;
string msg = statusCopy.StatusMessage;
...

Технически последние 2 строки кода потокобезопасны благодаря предшествующей блокировке, выполняющей неявный барьер памяти (см. [7] в 4 части этой документации).

Обратите внимание, что это свободный от блокировки способ обеспечить целостность группы связанных полей. Но это не предотвратит данные от изменения, когда Вы впоследствии работаете с ними - для этого обычно нужна блокировка. В 5 части этой документации мы рассмотрим больше примеров немутируемости для упрощения многопоточности, включая PLINQ [13].

Также можно безопасно назначить новый объект ProgressStatus на основе его предыдущего значения (например, можно "инкрементировать" значение PercentComplete) - без блокировки больше одной строки кода. Фактически мы можем делать это без использования одиночной блокировки через использования явные барьеров памяти Interlocked.CompareExchange и ожидание в цикле (spin-wait). Это продвинутая техника, которую мы опишем позже в секции, посвященной параллельному программированию [3].

[Сигнализация с обработкой ожидания события]

Обработка ожидания события (event wait handle) используется для сигнализации. Это способ обмена состоянием, когда один поток ждет поступления оповещения от другого. Event wait handle это самый простой вариант конструкций сигнализации, и он не связан с событиями C#. Event wait handle доступны через три функции: AutoResetEvent, ManualResetEvent и (из Framework 4.0) CountdownEvent. Первые два основаны на общем классе EventWaitHandle откуда они наследуют весь свой функционал.

Конструкция Назначение Работает между процессами? Загрузка(*)
AutoResetEvent Позволяет потоку однократно разблокироваться, когда будет получен сигнал от другого потока. ДА 1000 нс
ManualResetEvent Позволяет потоку разблокироваться навсегда, когда он получил сигнал от другого потока (до сброса). ДА 1000 нс
ManualResetEventSlim (добавлено в Framework 4.0) - 40 нс
CountdownEvent (введено в Framework 4.0) Позволяет потоку разблокироваться, когда он получил заранее определенное количество сигналов. - 40 нс
Barrier (добавлено в Framework 4.0) Реализует барьер выполнения потока. - 80 нс
Wait и Pulse Позволяет потоку блокироваться до момента удовлетворения пользовательского условия. - 120 нс для Pulse

Примечание (*): время, которое тратится на сигнал и ожидание в конструкции на одном и том же потоке (подразумевая, что другие потоки не блокируются), как это было измерено на процессоре Intel Core i7 860.

AutoResetEvent. AutoResetEvent похож на билетный турникет: установка в него билета позволяет пройти через него одному человеку. Префикс auto в имени класса отражает факт, что открытие турникета автоматически закрывается, или сбрасывается (reset) после выполнения через него нескольких шагов. Поток ожидает на турникете, или блокируется, путем вызова WaitOne (ждет этого "one" турникета, пока он открывается), и билет вставляется путем вызова метода Set. Если несколько потоков вызовут WaitOne, то позади турникета растет очередь (как и в случае с блокировками, справедливость первого доступа по отношению к моменту постановки в очередь может иногда нарушаться из-за нюансов реализации операционной системы). Билет может поступить от одного потока; другими словами, любой (не заблокированный) поток с доступом к объекту AutoResetEvent может установить Set на нем для освобождения одного заблокированного потока.

Вы можете создать AutoResetEvent двумя способами. Первый использует его конструктор:

var auto = new AutoResetEvent (false);

Замечание: передача true в этот конструктор эквивалентно немедленному вызову Set на объекте. Второй способ создает AutoResetEvent следующим образом:

var auto = new EventWaitHandle (false, EventResetMode.AutoReset);

В примере ниже запускается поток, чья работа состоит в простом ожидании сигнала от другого потока:

class BasicWaitHandle
{
   static EventWaitHandle _waitHandle = new AutoResetEvent (false);
 
   static void Main()
   {
      new Thread (Waiter).Start();
      Thread.Sleep (1000);    // Ожидание в течение секунды...
      _waitHandle.Set();      // В этом месте будет разбужен Waiter.
   }
 
   static void Waiter()
   {
      Console.WriteLine ("Ожидание...");
      _waitHandle.WaitOne();  // Ожидание оповещения.
      Console.WriteLine ("Оповещение поступило");
   }
}

Пример выведет следующее:

Ожидание... (тут пауза) Оповещение поступило

CSharp Threading EventWaitHandle

Если Set был вызван, когда нет ни одного потока ожидающего потока, то дескриптор ожидания остается открытым, пока какой-нибудь поток не вызовет WaitOne. Это поведение помогает избежать гонок между потоками в голове очереди турникета при определении потока, вставляющий билет ("Упс, билет был вставлен слишком быстро, неудача, теперь нужно ждать неопределенно долго!"). Однако повторяющиеся вызовы Set на турникете, на котором нет ожидания, не даст возможности пройти всей компании потоков: пройдет только один, и все предыдущие "билеты" будут потрачены впустую.

Вызов Reset на AutoResetEvent закрывает турникет (если он открыт) без ожидания или блокировки.

WaitOne принимает не обязательный параметр timeout, и он вернет false если ожидание окончилось по причине истечения таймаута и сигнал не за это время не был получен.

Как только Вы закончили работу с дескриптором ожидания, можете вызвать его метод Close, чтобы освободить ресурсы операционной системы. Альтернативно Вы можете просто бросить все ссылки на дескриптор ожидания и позволить сборщику мусора когда-нибудь позже выполнить работу по утилизации дырок (мусора) в памяти (дескрипторы ожидания реализуют шаблон расформирования, который вызывается через финализирующий метод Close). Это один из нескольких сценариев, где возможно приемлемо полагаться на такое поведение, потому что дескрипторы ожидание слабо загружают операционную систему (асинхронные делегаты [2] полагаются на тот же самый механизм для реализации своего дескриптора ожидания IAsyncResult).

Дескрипторы ожидания освобождаются автоматически, когда выгружается из памяти домен приложения.

Сигнализация в двух направлениях. Предположим, что нам необходимо, чтобы основной поток сигнализировал рабочему потоку 3 раза подряд. Если главный поток просто вызовет Set на дескрипторе ожидания несколько раз в быстрой последовательности, то второй или третий сигналы могут потеряться, поскольку рабочий поток обрабатывает каждый сигнал определенное время.

Решение для главного потока в том, чтобы главный поток ждал, пока рабочий поток не будет готов к приему нового сигнала. Это можно реализовать с другим AutoResetEvent следующим образом:

class TwoWaySignaling
{
   static EventWaitHandle _ready = new AutoResetEvent (false);
   static EventWaitHandle _go = new AutoResetEvent (false);
   static readonly object _locker = new object();
   static string _message;
 
   static void Main()
   {
      // Запуск рабочего потока:
      new Thread (Work).Start();
 
      // Главный поток:
      _ready.WaitOne();                // Первое ожидание готовности рабочего потока
      lock (_locker) _message = "ooo";
      _go.Set();                       // Указание рабочему потоку начать задание
 
      _ready.WaitOne();
      lock (_locker) _message = "ahhh";// Передать рабочему потоку другое сообщение
      _go.Set();
      _ready.WaitOne();
      lock (_locker) _message = null;  // Сигнал рабочему потоку завершиться
      _go.Set();
   }
 
   // Рабочий поток:
   static void Work()
   {
      while (true)
      {
         _ready.Set();                 // Отправка сигнала готовности.
         _go.WaitOne();                // Ожидание начала...
         lock (_locker)
         {
            if (_message == null) return; // Контролируемый выход
            Console.WriteLine (_message);
         }
      }
   }
}

Вывод этого примера:

ooo
ahhh

CSharp Threading TwoWaySignaling

В примере использовалось сообщение null, чтобы дать указание рабочему потоку завершиться. С потоками, которые работают в бесконечном цикле, важно предусмотреть стратегию завершения!

Очередь генератор/потребитель. Очередь producer/consumer является общим требованием обмена данными между потоками. Вот как этот работает:

• Настраивается очередь, чтобы описать рабочие элементы, или данные, над которыми осуществляется обработка.
• Когда требуется выполнение задачи, она ставится в очередь, позволяя вызывающему коду выполнять другие действия.
• Один или большее количество потоков работают в фоновом режиме, собирая и обрабатывая поставленные в очередь элементы.

Достоинство этой модели в том, что присутствует прецизионное управление, сколько рабочих потоков может быть запущено одновременно. Это позволяет Вам ограничить не только траты процессорного времени, но и других ресурсов. Например, если задачи выполняют интенсивный дисковый ввод/вывод, у Вас может быть только один рабочий поток, чтобы избежать исчерпание ресурсов операционной системы для других приложений. Другой тип приложения может иметь 20 рабочих потоков. Вы также можете добавлять или удалять рабочие потоки во время во время жизни очереди. Пул потоков CLR сам по себе является разновидностью очереди producer/consumer.

Очередь producer/consumer обычно содержит элементы, над которыми выполняется (одинаковая) обработка. Например, элементами данных могут быть имена файлов, и обработка может шифровать эти файлы.

В примере ниже мы используем один AutoResetEvent для подачи сигнала рабочему потоку, когда он ожидает на пустой очереди (другими словами, ожидание происходит, когда вся работа выполнена). Мы завершим рабочий поток постановкой в очередь null-задачи:

using System;
using System.Threading;
using System.Collections.Generic;
 
class ProducerConsumerQueue : IDisposable
{
   EventWaitHandle _wh = new AutoResetEvent (false);
   Thread _worker;
   readonly object _locker = new object();
   Queue< string> _tasks = new Queue< string>();
 
   public ProducerConsumerQueue()
   {
      _worker = new Thread (Work);
      _worker.Start();
   }
 
   public void EnqueueTask (string task)
   {
      lock (_locker) _tasks.Enqueue (task);
      _wh.Set();
   }
 
   public void Dispose()
   {
      EnqueueTask (null);  // Сигнал потребителю (рабочему потоку) выйти.
      _worker.Join();      // Ожидание завершения потока-потребителя.
      _wh.Close();         // Освобождение любых ресурсов операционной системы.
   }
 
   // Рабочий поток:
   void Work()
   {
      while (true)
      {
         string task = null;
         lock (_locker)
         if (_tasks.Count > 0)
         {
            task = _tasks.Dequeue();
            if (task == null) return;
         }
         if (task != null)
         {
            Console.WriteLine ("Выполнение задачи: " + task);
            Thread.Sleep (1000);    // Симуляция работы...
         }
         else
            _wh.WaitOne();          // Нет больше задач, ожидание сигнала.
      }
   }
}

Чтобы гарантировать потокобезопасность, здесь мы используем блокировку для защиты доступа к коллекции Queue< string>. Мы также явно закрываем дескриптор ожидания его методом Dispose, поскольку мы могли потенциально создать и уничтожить множество экземпляров этого класса во время жизни приложения.

Вот метод главного потока, которым тестируется очередь:

static void Main()
{
   using (ProducerConsumerQueue q = new ProducerConsumerQueue())
   {
      q.EnqueueTask ("Привет");
      for (int i = 0; i < 10; i++) q.EnqueueTask ("задача " + i);
      q.EnqueueTask ("Пока!");
   }
 
   // Выход реализуется вызовом метода Dispose, когда
   // ставится в очередь null-задача, и главный поток ждет, когда
   // рабочий поток (потребитель данных завершит свою задачу).
}

Вывод этого примера:

Выполнение задачи: Привет
Выполнение задачи: задача 1
Выполнение задачи: задача 2
Выполнение задачи: задача 3
...
...
Выполнение задачи: задача 9
Пока!

Framework 4.0 предоставляет новый класс BlockingCollection< T> [18], который реализует функционал очереди producer/consumer. Наша написанная вручную очередь producer/consumer все еще актуальна - она не только показывает AutoResetEvent и безопасность потоков, но также является базой для более сложных структур. Например, если Вы хотите получить ограниченную очередь блокировки (с ограничением на количество поставленных в очередь задач), и также хотите поддерживать отмену (и удаление) поставленных в очередь элементов, этот код предоставит отличную начальную точку для программирования. Пример очереди producer/consume будет впоследствии использоваться при обсуждении Wait и Pulse [19].

ManualResetEvent. ManualResetEvent функционирует как обычные ворота. Вызов Set открывает ворота, позволяя любому количеству потоков вызвать WaitOne чтобы пройти через них. Вызов Reset закрывает ворота. Потоки, которые вызвали WaitOne на закрытых воротах, будут заблокированы; когда ворота откроются в следующий раз, они все запустятся одновременно. Кроме этих отличий, ManualResetEvent работает наподобие AutoResetEvent.

С AutoResetEvent Вы можете сконструировать ManualResetEvent двумя способами:

var manual1 = new ManualResetEvent (false);
var manual2 = new EventWaitHandle (false, EventResetMode.ManualReset);

Framework 4.0 предоставил другую версию ManualResetEvent, которая называется ManualResetEventSlim. Она оптимизирована для коротких времен ожидания - с возможностью как опции работать с циклом ожидания на установленное количество итераций. Также это более эффективная управляемая (managed) реализация, позволяющая остановить Wait через CancellationToken [15]. Однако это нельзя использовать для сигнализации между процессами. ManualResetEventSlim не подкласс WaitHandle; однако он предоставляет свойство WaitHandle, которое возвратит основанный на WaitHandle объект, который будет вызван (с профилем производительности традиционного дескриптора ожидания).

Ожидание сигнализации AutoResetEvent или ManualResetEvent занимает около 1 микросекунды (предполагая, что нет блокирования).

ManualResetEventSlim и CountdownEvent (см. ниже) могут быть в 50 раз быстрее для сценариев короткого ожидания, из-за того, что они не полагаются на операционную систему и разумно используют циклы прокрутки.

Однако в большинстве сценариев потери от классов сигнализации сами по себе не создают узкое место. Исключение составляет код с жестким одновременным выполнением, что будет обсуждаться в части 5 (см. [14]) этой документации.

ManualResetEvent полезен для разрешения в одном потоке, чтобы разблокировать множество других потоков. Обратный сценарий обрабатывается CountdownEvent.

CountdownEvent. Позволяет Вам ждать больше одного потока. Этот класс был введен в Framework 4.0, и для него разработана эффективная, полностью управляемая (managed) реализация.

Если Ваша программа работает на предыдущей версии .NET Framework, то не все еще потеряно! Позже мы покажем, как написать аналог CountdownEvent с использованием Wait и Pulse [20].

Для использования CountdownEvent инстанцируйте этот класс с количеством потоков (или счетчиком, count), которые Вы хотите ждать:

var countdown = new CountdownEvent (3);   // Инициализация "count" значением 3.

Вызов Signal декрементирует count; вызов Wait блокирует поток, пока count не дойдет до 0. Пример:

static CountdownEvent _countdown = new CountdownEvent (3);
 
static void Main()
{
   new Thread (SaySomething).Start ("Я поток 1");
   new Thread (SaySomething).Start ("Я поток 2");
   new Thread (SaySomething).Start ("Я поток 3");
 
   _countdown.Wait();   // Блокировка, пока Signal не будет вызван 3 раза.
   Console.WriteLine ("Все потоки что-то сказали!");
}
 
static void SaySomething (object thing)
{
   Thread.Sleep (1000);
   Console.WriteLine (thing);
   _countdown.Signal();
}

Проблемы, для которых предназначен CountdownEvent, можно иногда решить проще путем использования конструкций структурированного параллелизма, которые рассматриваются в части 5 [14] (PLINQ и класс Parallel).

Вы можете добавлять увеличение значения для счетчика CountdownEvent вызовом AddCount. Однако если счетчик уже достиг нуля, AddCount выбросит исключение: Вы не можете "отменить" сигнал события CountdownEvent путем вызова AddCount. Чтобы избежать возможности срабатывания исключения, используйте вместо этого другой метод TryAddCount, который вернет false, если счет достиг до 0.

Для отмены сигнала события обратного счета вызовите Reset: это одновременно отменит сигнал конструкции и сбросит счетчик к своему оригинальному значению.

Наподобие ManualResetEventSlim, CountdownEvent публикует свойство WaitHandle для сценариев, где некоторый другой класс или метод ожидает объект, основываясь на WaitHandle.

Создание EventWaitHandle для взаимодействия между процессами. Конструктор EventWaitHandle позволяет создавать "именованный" EventWaitHandle, который может работать между несколькими процессами (чем процесс отличается от потока, см. [2]). Имя это просто строка, и у неё может быть любое значение, которое не содержит нежелательного конфликта с каким-то другим именем! Если это имя уже используется на компьютере, где работают процессы, то Вы получите ссылку на тот же самый нижележащий EventWaitHandle; иначе операционная система создаст новый. Пример:

EventWaitHandle wh = new EventWaitHandle (false, EventResetMode.AutoReset,
                                          "MyCompany.MyApp.SomeName");

Если каждое из двух приложений запустят этот код, то они могут обмениваться сигналами друг с другом: дескриптор ожидания (wait handle) может работать на всех потоках обоих процессов.

Дескрипторы ожидания и Thread Pool. Если в Вашем приложении есть несколько потоков, которые тратят большинство своего времени на дескрипторе ожидания (wait handle), то можно уменьшить трату ресурсов вызовом ThreadPool.RegisterWaitForSingleObject. Этот метод принимает делегата, который выполняется, когда прошел сигнал дескриптора ожидания. Пока идет ожидание, это не связывает поток:

static ManualResetEvent _starter = new ManualResetEvent (false);
 
public static void Main()
{
   RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject
                              (_starter, Go, "Какие-то данные", -1, true);
   Thread.Sleep (5000);
   Console.WriteLine ("Сигнал для рабочего потока...");
   _starter.Set();
   Console.ReadLine();
   reg.Unregister (_starter);    // Очистка по завершению работы.
}
 
public static void Go (object data, bool timedOut)
{
   Console.WriteLine ("Запущено - " + data);
   // Выполнение задачи...
}

Пример выведет следующее:

(задержка 5 секунд)
Сигнал для рабочего потока...
Запущено - Какие-то данные

Когда прошел сигнал на дескриптор ожидания (или когда истек таймаут), делегат запустится на потоке пула.

В дополнение к дескриптору ожидания и делегату RegisterWaitForSingleObject принимает объект "черного ящика", который передается Вашему методу делегата (наподобие ParameterizedThreadStart), а также таймаут в миллисекундах (–1 означает ожидание без таймаута) и bool-флаг, показывающий, однократный ли это запрос или повторяющийся.

RegisterWaitForSingleObject в частности важен в сервере приложения, который должен обрабатывать множество одновременных запросов. Предположим, что Вам нужно блокировать выполнение на ManualResetEvent и просто ждать WaitOne:

void AppServerMethod()
{
   _wh.WaitOne();
   // ... продолжение выполнения
}

Если 100 клиентов вызовут этот метод, то 100 потоков сервера были бы заняты на время блокирования. Замена _wh.WaitOne на RegisterWaitForSingleObject позволяет методу выполнить немедленный возврат, не теряя потоки:

void AppServerMethod
{
   RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject
      (_wh, Resume, null, -1, true);
   ...
}
 
static void Resume (object data, bool timedOut)
{
   // ... продолжение выполнения
}

Объект данных, переданный в Resume, позволяет продолжить обработку любых текущих данных.

WaitAny, WaitAll и SignalAndWait. В дополнение к методам Set, WaitOne и Reset это статические методы класса WaitHandle, решающие более сложные задачи синхронизации. Методы WaitAny, WaitAll и SignalAndWait выполняют операции сигнализации и ожидания на нескольких дескрипторах. Дескрипторы ожидания могут быть разных типов (включая Mutex и Semphore, поскольку они также выводятся из абстрактного класса WaitHandle). ManualResetEventSlim и Countdown также могут принять участие в этих методах через их свойства WaitHandle.

WaitAll и SignalAndWait имеют странное соединение с устаревшей (legacy) архитектурой COM: эти методы требуют, чтобы вызывающая сторона была в многопоточном окружении - модель, меньше всего подходящая для функциональной совместимости. Главный поток приложения WPF или Windows Forms, например, в этом режиме не может взаимодействовать с буфером обмена (clipboard). Мы кратко рассмотрим альтернативы ниже.

WaitHandle.WaitAny ждет любого из массива дескрипторов ожидания; WaitHandle.WaitAll атомарно ждет всех имеющихся дескрипторов. Это означает следующее, если Вы ждете на двух AutoResetEvents:

• WaitAny никогда не закончится "защелкиванием" обоих событий.
• WaitAll никогда не закончится "защелкиванием" только одного события.

SignalAndWait вызовет Set на одном WaitHandle, и затем вызовет WaitOne на другом WaitHandle. После сигнализации первого дескриптора произойдет переход на начало очереди в ожидании второго дескриптора; это помогает ему успешно выполниться (хотя операция не будет реально атомарной). Вы можете думать про этот метод как "замену" одного сигнала на другой, и использование его на паре EventWaitHandles, чтобы установить два потока на "встречу" в одном моменте времени. Этот трюк выполнит AutoResetEvent или ManualResetEvent. Первый поток выполнит следующее:

WaitHandle.SignalAndWait (wh1, wh2);

В то же время другой поток выполнит противоположное:

WaitHandle.SignalAndWait (wh2, wh1);

Альтернативы WaitAll и SignalAndWait. WaitAll и SignalAndWait не будут работать в одном потоке. К счастью, есть альтернативы. В случае SignalAndWait редко когда надо использовать иго семантику перехода по очереди: в нашем примере "встречи" было бы допустимо просто вызывать Set на первом дескрипторе ожидания, и затем вызвать WaitOne на другом, если бы дескрипторы ожидания использовались только для встречи. В классе Barrier [6] мы рассмотрим другой вариант для реализации встречи потоков.

В случае WaitAll при некоторых ситуация альтернативой будет использование метода Invoke класса Parallel, который будет рассматриваться в части 5 [14]. Также мы рассмотрим продолжение задачи и продолжения (Tasks и continuations), и посмотрим, как TaskFactory из ContinueWhenAny предоставляет альтернативу для WaitAny.

Во всех других сценариях ответом будет низкоуровневый подход, который решает все проблемы сигнализации: Wait и Pulse [5].

[Контексты синхронизации]

Альтернативой для ручной блокировки является блокировка декларативная. Путем наследования из ContextBoundObject и применения атрибута Synchronization Вы инструктируете библиотеку CLR применить блокировку автоматически. Пример:

using System;
using System.Threading;
using System.Runtime.Remoting.Contexts;
 
[Synchronization]
public class AutoLock : ContextBoundObject
{
   public void Demo()
   {
      Console.Write ("Start...");
      Thread.Sleep (1000);          // Выполнение не может быть здесь вытеснено
      Console.WriteLine ("end");    // благодаря автоматической блокировке!
   } 
}
 
public class Test
{
   public static void Main()
   {
      AutoLock safeInstance = new AutoLock();
      new Thread (safeInstance.Demo).Start();   // Конкурентный вызов
      new Thread (safeInstance.Demo).Start();   // 3 раза метода Demo.
      safeInstance.Demo();                      //
   }
}

Результат работы этого примера:

Start... end
Start... end
Start... end

Библиотека CLR гарантирует, что в любой момент времени только один поток может выполнить код в safeInstance. Она делает это созданием одного объекта синхронизации и блокировки на нем вокруг любого вызова каждого метода или свойства safeInstance. Область блокировки в этом случае весь объект safeInstance, и это называется контекстом синхронизации.

Как это работает? Подсказка находится в атрибуте пространства имен Synchronization: System.Runtime.Remoting.Contexts. ContextBoundObject можно рассматривать как "remote" (дальний) объект, что означает перехват всех его вызовов. Чтобы этот перехват был возможен, когда мы инстанцируем AutoLock, CLR в действительности вернет прокси - объект с теми же методами и свойствами, что и объект AutoLock, который работает как промежуточный. Именно через это и происходит автоматическая блокировка. В целом перехват занимает около микросекунды для каждого вызова метода.

Автоматическая синхронизация не может использоваться ни для защиты статических членов типа, ни для классов, которые не были унаследованы от ContextBoundObject (например, Windows Form).

Блокировка применяется внутри точно так же. Вы можете ожидать, что следующий пример выведет тот же самый результат, как и предыдущий пример:

[Synchronization]
public class AutoLock : ContextBoundObject
{
   public void Demo()
   {
      Console.Write ("Start...");
      Thread.Sleep (1000);
      Console.WriteLine ("end");
   }
 
   public void Test()
   {
      new Thread (Demo).Start();
      new Thread (Demo).Start();
      new Thread (Demo).Start();
      Console.ReadLine();
  }
 
   public static void Main()
   {
      new AutoLock().Test();
   }
}

Обратите внимание, что здесь вставлен оператор Console.ReadLine. Поскольку только один поток может выполнить код в любой момент времени в объекте этого класса,, то три новых потока останутся заблокированными в методе Demo, пока метод Test не завершится, это потребовало для завершения ReadLine. В результате мы получили тот же результат, что и ранее, но только после нажатия на клавишу Enter. Это хороший метод потокобезопасности, подходящий для устранения проблем любой полезной многопоточности в классе.

Далее, мы не решили проблему, описанную ранее: если бы AutoLock был классом коллекции, например, мы все еще нуждались в блокировке вокруг оператора наподобие следующего, подразумевая, что он запущен из другого класса (если классом этого кода не был самостоятельно синхронизируемый ContextBoundObject):

if (safeInstance.Count > 0) safeInstance.RemoveAt (0);

Контекст синхронизации может быть расширен вне области действия одного объекта. По умолчанию, если синхронизируемый объект инстанцирован из кода другого объекта, то оба они используют один и тот же контекст (другими словами, одну большую блокировку!). Это поведение можно поменять указанием целочисленного флага в конструкторе атрибута Synchronization, используя константы, определенные в классе SynchronizationAttribute:

Константа Назначение
NOT_SUPPORTED Эквивалентно не использованию атрибута Synchronized.
SUPPORTED Подсоединяется к существующему контексту синхронизации, если инстанцирован из другого объекта синхронизации, иначе остается не синхронизированным.
REQUIRED (по умолчанию) Подсоединяется к существующему контексту синхронизации, если инстанцирован из другого объекта синхронизации, иначе создает новый контекст.
REQUIRES_NEW Всегда создает новый контекст синхронизации.

Таким образом, если объект класса SynchronizedA инстанцирует объект класса SynchronizedB, они получат отдельные контексты синхронизации, если SynchronizedB был декларирован так:

[Synchronization (SynchronizationAttribute.REQUIRES_NEW)]
public class SynchronizedB : ContextBoundObject { ...

Чем больше область действия контекста синхронизации, тем проще им управлять, но тем менее это полезно для параллелизма. Кроме того, отдельные контексты синхронизации вводят deadlock-и. Пример:

[Synchronization]
public class Deadlock : ContextBoundObject
{
   public DeadLock Other;
   public void Demo() { Thread.Sleep (1000); Other.Hello(); }
   void Hello()       { Console.WriteLine ("hello");        }
}
 
public class Test
{
   static void Main()
   {
      Deadlock dead1 = new Deadlock();
      Deadlock dead2 = new Deadlock();
      dead1.Other = dead2;
      dead2.Other = dead1;
      new Thread (dead1.Demo).Start();
      dead2.Demo();
   }
}

Поскольку каждый экземпляр Deadlock создан внутри Test - не синхронизированном классе - каждый экземпляр получает свой собственный контекст синхронизации, и таким образом собственную блокировку. Когда два объекта вызывают друг друга, у deadlock займет много времени (если быть точным, то 1 секунду). Проблема была бы особенно коварной, если бы классы Deadlock и Test были бы написаны разными командами программистов. Это в отличие от явных блокировок, где deadlock-и обычно более очевидны.

Реентрантность. Потокобезопасный метод иногда называют реентрантным (повторно входимым), потому что он может быть безопасно вытеснен в любой части своего выполнения тем же самым методом, и затем вызван снова в другом потоке без какого-либо вредного влияния. В общем смысле условия, ориентированные на безопасное многопоточное выполнение и реентрантность рассматривают либо как синонимы, либо как связанные друг с другом понятия.

У реентрантности есть однако более зловещий оттенок в автоматических режимах блокировки. Если атрибут Synchronization примененный с аргументом реентрантности, равен true:

[Synchronization(true)]

то блокировка контекста синхронизации будет временно освобождена, когда выполнение покинет этот контекст. В предыдущем примере это препятствовало бы возникновению deadlock, что очевидно было бы желательно. Однако побочный эффект состоит в том, что во время этого промежуточного периода любой поток может вызвать любой метод оригинального объекта (повторно войти в контекст синхронизации, "re-entering"), в результате произойдут все сложности многопоточного выполнения, которых стараются избежать в первую очередь. Это проблема реентрантности.

Из-за того, что [Synchronization(true)] применяется на уровне класса, этот атрибут превращает любой метод, выходящий из контекста, в троянского коня для реентрантности.

Хотя реентрантность может быть опасной, иногда есть некоторые другие варианты. Например предположим, что нужно реализовать многопоточность внутри синхронизированного класса путем делегирования логики в рабочие потоки, работающие на объектах в отдельных контекстах. Без реентрантности этим рабочим потокам можно необоснованно препятствовать или во взаимной коммуникации, или в коммуникации с оригинальным объектом.

Это высвечивает фундаментальную уязвимость автоматической синхронизации: расширенная область, к которой применена блокировка, может на самом деле вводить трудности, которые иначе могли и не возникать. К этим сложностям относятся deadlock, реентрантность и урезанный параллелизм выполнения, тогда ручная блокировка может быть более приемлемой не только для простых сценариев.

[Ссылки]

1. Threading in C# PART 2: BASIC SYNCHRONIZATION site:albahari.com.
2. Потоки на C#. Часть 1: введение.
3. SpinLock и SpinWait.
4. Блокировки Reader/Writer.
5Обмен сигналами через Wait и Pulse.
6. Класс Barrier.
7. Барьеры памяти и изменчивость данных.
8. Ключевое слово volatile.
9. Класс Interlocked.
10. Interrupt и AbortAbortAbort.
11. Suspend и Resume.
12. Не блокирующая синхронизация.
13. PLINQ.
14Потоки на C#. Часть 5: параллельное программирование.
15. Маркеры отмены (Cancellation Tokens).
16. BackgroundWorker.
17. Продолжения (continuations).
18. BlockingCollection.
19. Очередь генератора/получателя (Producer/Consumer Queue).
20. Написание CountdownEvent.
21Чем отличается мьютекс от семафора?

 

Добавить комментарий


Защитный код
Обновить

Top of Page