Программирование PC Потоки на C#. Часть 5: параллельное программирование Tue, December 18 2018  

Поделиться

нашли опечатку?

Пожалуйста, сообщите об этом - просто выделите ошибочное слово или фразу и нажмите Shift Enter.


Потоки на C#. Часть 5: параллельное программирование Печать
Добавил(а) microsin   

В этой части документации мы рассмотрим то новое в многопоточном API для использования с многоядерными процессорами, которое появилось в версии Framework 4.0:

• Parallel LINQ или PLINQ.
• Класс Parallel.
• Конструкции для параллельного выполнения задач (task parallelism).
• Коллекции одновременного выполнения (concurrent collections).
• SpinLock и SpinWait.

Это API также известно как PFX (Parallel Framework, аббревиатура ИМХО несколько ошибочная). Класс Parallel вместе с конструкциями параллелизма задач называется библиотекой параллельных задач, или TPL (Task Parallel Library).

Framework 4.0 также добавил несколько низкоуровневых конструкций для потоков, которые выполняют традиционные для многопоточности задачи. Мы их уже рассматривали ранее [5]:

• Конструкции сигнализации с малыми задержками (low-latency signaling constructs) SemaphoreSlim, ManualResetEventSlim, CountdownEvent и Barrier.
• Маркеры отмены (cancellation tokens) для кооперативной отмены действий (cooperative cancellation).
• Классы ленивой инициализации (lazy initialization classes).
• ThreadLocal< T>.

Для комфортного понимания перед дальнейшим чтением желательно ознакомиться с частями 1..4 документации [2, 3, 4, 5] - в частности для изучения блокировок (locking) и безопасности совместной работы потоков (thread safety).

Все листинги кода секций параллельного программирования доступны как интерактивные примеры в LINQPad [6]. LINQPad это электронный блокнот, идеально подходящий для проверки блоков кода C# без необходимости создания окружающего класса, проекта или решения (имеется в виду среда Microsoft Visual Studio). Для получения доступа к примерам кликните на "Download More Samples" закладки " на закладке" в LINQPad, находящейся слева внизу, и выберите "C# 4.0 in a Nutshell: More Chapters".

[Почему PFX?]

В последнее время тактовые частоты CPU перестали расти, и производители переключились увеличение процессорных ядер. Это создало некоторую проблему для нас как для программистов, потому что стандартный однопоточный код не будет автоматически работать быстрее из-за увеличения числа ядер.

Задействование многих ядер проще реализовать в большинстве серверных приложений, где каждый поток независимо от других потоков работает над обработкой отдельного запроса пользователя, но сложнее то же самое сделать в приложении декстопа - потому что это обычно требует от нагруженного вычислениями кода реализации следующих функций:

1. Разбиение всей вычислительной задачи на малые части.
2. Запуск параллельной обработки этих частей с помощью многопоточности.
3. Собрать все результаты работы вместе, когда они станут доступны.

Последнее нужно сделать таким образом, чтобы не повредить совместной работе потоков ни с точки зрения безопасности и устойчивости (thread-safe), ни с точки производительности. Хотя Вы можете сделать все это классическими конструкциями многопоточности, это будет грубым подходом - в частности на шагах разделения общей задачи на части и последующей сборки результатов работы потоков. Другая проблема в том, что обычная стратегия блокировки для безопасности потоков вызывает большую конкуренцию потоков, когда много потоков работают над одними и теми же данными сразу.

Библиотеки PFX разработаны специально для того, чтобы помочь программировать в этих сценариях.

Программирование с целью задействования нескольких ядер или нескольких процессоров называется параллельным программированием. Это подмножество более широкой концепции многопоточности.

Концепции PFX. Существует 2 стратегии для разделения общей работы между потоками: параллелизм данных (data parallelism) и параллелизм задач (task parallelism).

Когда набор задач нужно выполнить на многих значениях данных, мы можем распараллелить всю задачу так, чтобы каждый поток выполнял (одинаковый) набор действий над подмножеством значений. Такой подход называется data parallelism, потому что мы распределяем обрабатываемые данные между потоками. В отличие от этого task parallelism подразумевает выполнение разными потоками разных задач; другими словами, каждый поток выполняет отличающий от других потоков алгоритм действий.

В общем случае data parallelism проще реализуется и лучше масштабируется для повышения количества ядер, потому что такой подход снижает или вовсе убирает работу потоков над общими данными (благодаря чему снижается конкуренция потоков и легче решать проблемы thread-safety). Также параллелизм данных усиливает тот факт, что чаще есть больше значений данных, чем каких-то дискретных задач, что увеличивает потенциал параллелизма.

Параллелизм данных также способствует структурированному параллелизму, что означает, что параллельные единицы общей работы запускаются и завершаются в одном и том же месте программы. В отличие от этого параллелизм задач имеет тенденцию быть не структурированным, это означает, что элементы параллельной работы могут начаться и завершиться в местах, рассеянных по программе. Структурированный параллелизм проще, меньше подвержен ошибкам, и позволяет Вам обрабатывать трудное задание разделения и координацию потоков (и даже финальное сопоставление результатов) из библиотек.

Компоненты PFX. PFX представляет два слоя функциональности. Верхний слой состоит из двух видов data parallelism API: библиотека PLINQ и класс Parallel. Нижний слой содержит классы параллелизма задач, плюс набор дополнительных конструкций, чтобы помочь с параллельным программированием.

CSharp Threading ParallelProgramming

PLINQ предоставляет самый богатый функционал: автоматизация всех шагов параллелизации, включая разделение работы на задачи, запуск выполнение этих задач в потоках и конечное сопоставление результатов в одну выходную последовательность. Она называется декларативной - потому что Вы просто декларируете, что хотите распараллелить свою работу (что структурируете как запрос LINQ), и позволяете платформе позаботиться о деталях реализации. В отличие от такого подхода другой вариант работы императивный, здесь Вам нужно специально написать код для разделения задачи на части и сопоставления результатов обработки. В случае использования класса Parallel Вы должны собрать результаты работы потоков самостоятельно; с конструкциями параллелизма задач Вы должны также самостоятельно разделить работу на части:

Решение для организации
параллельности
Разделение общей
работы на части
Сведение результатов
PLINQ ДА ДА
Класс Parallel ДА нет
Параллелизм задач PFX нет нет

Коллекции конкуренции (concurrent collections) и примитивы циклов опроса (spinning primitives) помогут Вам реализовать на низком уровне действия по параллельному программированию Это важно, потому что PFX была спроектирована для работы только на сегодняшней аппаратуре, но также может потребоваться работать с будущими моделями процессоров, где есть намного больше ядер. Если Вы хотите переместить кучу напиленной древесины, имея 32 рабочих, то самая большая трудность при перемещении древесины состоит в том, чтобы эти рабочие не мешали друг другу. То же самое с делением алгоритма между 32 ядрами: обычные блокировки используются для защиты доступа к общим ресурсам, результирующая блокировка может означать, что только часть этих ядер когда-либо будут на самом деле работать одновременно. Коллекции конкуренции специально настроены для высоко конкурентного доступа, с фокусом на минимизации или устранения блокировок. PLINQ и класс Parallel сами по себе полагаются на коллекции конкуренции и примитивы циклов опроса для эффективного управления работой.

Традиционная многопоточность это один из сценариев, когда многопоточность дает выигрыш даже на одноядерной машине, когда на самом деле нет настоящего параллелизма. Мы рассматривали все это ранее: задачи такого класса призваны сохранить отзывчивость интерфейса пользователя с одновременным выполнением других задач наподобие загрузки веб-страниц.

Некоторые конструкции, которые мы рассмотрим в секциях параллельного программирования, также иногда полезны и для традиционной многопоточности. В частности:

• PLINQ и класс Parallel полезны, когда Вы хотите выполнять операции параллельно и затем ждать их завершения (структурированный параллелизм). Это включает незначительно нагружающие CPU задачи, такие как вызов веб-службы.
• Конструкции параллелизма задач полезны, когда Вы хотите запустить некую операцию в потоке пула, и также для управления работой задачи через продолжения выполнения (continuations) и родительские/дочерние задачи.
• Коллекции конкурентного выполнения иногда применимы, когда Вы хотите организовать потокобезопасную очередь, стек или словарь.
• BlockingCollection предоставляет простой способ реализовать структуры генератор/потребитель данных (producer/consumer).

Когда следует использовать PFX. Изначально PFX предназначен для параллельного программирования: задействование ядер многоядерных процессоров для ускорения работы кода с интенсивными вычислениями.

Сложность в применении многих ядер показывает закон Амдала, который устанавливает, что максимальное улучшение производительности ограничивается порцией кода, которая обязательно должна выполняться последовательно. Например, если две трети времени алгоритма можно распараллелить, Вы не сможете превысить определенный порог производительности, даже если будете иметь в наличии бесконечное количество ядер [7].

Таким образом сначала нужно оценить узкое место, ограничивающее производительность кода. Также стоит определиться, насколько нужны в коде конкретные интенсивные вычисления - часто алгоритмическая оптимизация дает самый большой выигрыш в производительности. Однако недостаток такого подхода в том, что некоторые техники оптимизации усложняют реализацию параллелизма кода.

Самый большой выигрыш дает правильная организация пожалуй самого сложного момента параллелизма - где общая работа может быть просто разделена на части, которые можно эффективно выполнить каким-то алгоритмом (структурированный параллелизм лучше всего подходит для таких проблем). Примерами могут служить задачи по обработке изображений, ray tracing, атаки по подбору в прикладной математике или криптографии. Другой пример проблемы параллельности - реализация оптимизированной версии алгоритма быстрой сортировки - достижение хорошего результата не тривиально, и может потребовать применения не структурированного параллелизма.

[PLINQ]

PLINQ автоматически распараллеливает локальные запросы (LINQ queries). Достоинство PLINQ в том, что её легко использовать - платформа берет на себя заботу разделения общей работы и конечного сведения вместе результатов.

Чтобы использовать PLINQ просто вызовите AsParallel() на входной последовательности, и затем продолжите запрос LINQ, как обычно. Следующий запрос вычисляет простые числа между 3 и 100000 - при полном использовании всех ядер целевой машины:

// Вычисление последовательности простых чисел с использованием элементарного
// (не оптимизированного) алгоритма.
//
// Примечание: все коды листинга в этой главе доступны в LINQPad как
// интерактивные куски кода. Для активации этих примеров кликните
// "Download More Samples" на закладке Samples утилиты LINQPad,
// находящейся слева внизу, и выберите "C# 4.0 in a Nutshell: More Chapters".
IEnumerable< int> numbers = Enumerable.Range (3, 100000-3);
 
var parallelQuery = 
   from n in numbers.AsParallel()
   where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
   select n;
 
int[] primes = parallelQuery.ToArray();

AsParallel это метод расширения в System.Linq.ParallelEnumerable. Он оборачивает ввод в последовательность на базе ParallelQuery< TSource>, который запускает операторы запроса LINQ, которые Вы впоследствии вызываете, чтобы связать с альтернативным набором методов расширения, определенных в ParallelEnumerable. Это дает параллельные реализации каждого из стандартных операторов запроса. В сущности это работает по принципу разделения входной последовательности на куски, выполняемые в разных потоках, которые соединяются вместе обратно в одну выходную потребляемую последовательность.

На картинке ниже показан прицип распараллеливания выражения:

"abcdef".AsParallel().Select(c => char.ToUpper(c)).ToArray()

CSharp Threading PLINQExecution

Вызов AsSequential() разворачивает последовательность ParallelQuery так, чтобы последующие операторы запроса связали связались со стандартными операторами запроса и выполнились последовательно. Это нужно перед вызовом методов, которые имеют побочные эффекты или не безопасны в условиях многопоточности.

Для операторов запроса, которые принимают две входные последовательности (Join, GroupJoin, Concat, Union, Intersect, Except и Zip), Вы должны применить AsParallel() к обоим входным последовательностям (иначе будет выброшено исключение). Однако Вам не нужно продолжать применять AsParallel к запросу во время его выполнения, потому что операторы запроса PLINQ выводят другую последовательность ParallelQuery. Фактически повторный вызов AsParallel будет не эффективным, потому что вызовет слияние и повторное разделение запроса:

mySequence.AsParallel()          // Оборачивает последовательность в ParallelQuery< int>
          .Where (n => n > 100)  // Выводит другой ParallelQuery< int>
          .AsParallel()          // Не требуется, и не эффективно!
          .Select (n => n * n)

Не все операторы запроса можно эффективно распараллелить. Для тех, которые распараллелить нельзя, PLINQ вместо этого реализует последовательный оператор. PLINQ может также работать последовательно, если подозревает, что издержки параллелизации на самом деле замедлят определенный запрос.

PLINQ подходит только для локальных коллекций: он не работает с LINQ к SQL или Entity Framework, потому что в этих случаях LINQ транслируется в SQL, который затем выполняется на сервере баз данных. Однако Вы можете использовать PLINQ для выполнения дополнительных локальных запросов на результатах, полученных из запросов к базе данных.

Если запрос PLINQ выбрасывает исключение, то оно перебрасывается как AggregateException, у которого свойство InnerExceptions содержит реальное исключение (или исключения). Подробнее см. "Работа с AggregateException".

Так как AsParallel прозрачно распараллеливает запросы LINQ, возникает вопрос: почему Microsoft не делает по умолчанию простую параллелизацию стандартных операторов запроса, т. е. не применяет PLINQ по умолчанию?

Тут несколько причин. Во-первых, для того, чтобы применение PLINQ было полезным, должно присутствовать разумное количество интенсивной вычислительной работы, чтобы передать её на обработку рабочим потокам. Большинство запросов LINQ к объектам выполняются очень быстро, и к ним не только не нужно применять параллелизм, но это также будет вовлекать дополнительные ненужные вычислительные расходы на разделение задачи, сбор результатов вместе, координацию работы потоков, в результате все только замедлится.

Кроме того:

• Выход запроса PLINQ query (по умолчанию) может отличаться от запроса LINQ в контексте упорядочивания элементов.
• PLINQ оборачивает исключения в AggregateException (чтобы обработать ситуации выбрасывания множества исключений).
• PLINQ будет давать ненадежные результаты, если запросы вовлекают себя не безопасные для выполнения в потоках методы.

И наконец, PLINQ предлагает довольно много вариантов настройки. Обременение стандартных запросов LINQ к Objects API такими нюансами дополнительно отвлекало бы от выполнения общей задачи.

Баллистика параллельного выполнения. Как и обычные запросы LINQ, запросы PLINQ вычисляются "ленивым" способом. Это означает, что выполнение срабатывает только когда Вы начинаете потреблять результаты запроса - обычно в цикле foreach (хотя это может быть сделано и через оператор преобразования, такой как ToArray, или оператор, который возвращает один элемент или значение).

Однако по мере потребления результатов выполнение осуществляется несколько иначе по сравнению с обычными последовательными запросами. Последовательный запрос выполняется полностью потребителем в стиле "выталкивания" (pull): каждый элемент из входной последовательности выбирается точно тогда, когда этого требует потребитель. Параллельный запрос обычно использует независимые потоки для выборки элементов из входной последовательности немного перед тем, как это понадобилось потребителю (нечто похожее на телевизионный суфлер для дикторов или антишоковый буфер данных в CD-плеерах). Затем он обрабатывает элементы в параллельном запросе через цепочку запроса, сохраняя результаты в маленьком буфере, откуда они могут быть взяты по запросу потребителя. Если потребитель приостановил потребление данных или сразу прекратил их обработку, то процессор запроса также ставит на паузу или останавливает работу, чтобы зря не расходовать время CPU или память.

Вы можете подстроить буферизацию PLINQ вызовом WithMergeOptions после AsParallel. Значение по умолчанию AutoBuffered обычно дает самые лучшие результаты. NotBuffered запрещает буферизацию, что полезно, если Вы хотите увидеть результаты настолько быстро, насколько это возможно; FullyBuffered полностью кеширует весь набор результата перед тем, как он будет доступен потребителю (операторы OrderBy и Reverse в сущности работают по такому методу, как как и операторы элемента, агрегации и преобразования).

PLINQ и порядок следования. Побочный эффект от параллелизации операторов запроса состоит в том, что когда результаты соединяются вместе, то не обязательно они будут в том же порядке, в каком были на входе, что показано в предыдущей диаграмме. Другими словами, больше не соблюдается нормальное сохранение порядка следования данных запросов LINQ.

Если Вам необходимо сохранить порядок следования, то это можно принудительно активировать вызовом AsOrdered() после AsParallel():

myCollection.AsParallel().AsOrdered()...

Вызов AsOrdered ухудшает производительность с большим количеством элементов, потому что PLINQ должна отслеживать оригинальную позицию каждого элемента.

Вы можете отменить в запросе действие AsOrdered позже вызовом AsUnordered: это вводит "точку случайной перетасовки" что позволяет запросу с этого момента выполняться более эффективно. Если Вам нужно сохранить входную последовательность только для двух первых операторов запроса, то сделайте так:

inputSequence.AsParallel().AsOrdered()
   .QueryOperator1()
   .QueryOperator2()
   .AsUnordered()       // С этого места очередность данных не соблюдается.
   .QueryOperator3()
   ...

Поведение AsOrdered не активировано по умолчанию, потому что для большинства запросов оригинальный порядок входных данных не имеет значения. Другими словами, если бы AsOrdered действовало по умолчанию, то Вы применяли бы AsUnordered для большинства своих параллельных запросов для получения самой лучшей производительности (иначе зачем нам параллелизм), что было бы обременительно.

Ограничения PLINQ. Есть несколько практических ограничений на то, что PLINQ мог бы распараллелить. Эти ограничения могут впоследствии быть устранены последующими сервис-паками и более новыми версиями Framework.

Следующие операторы запроса не поддаются параллелизации, если их исходные элементы не находятся в своей исходной позиции индексации:

• Take, TakeWhile, Skip и SkipWhile.

• Индексированные версии Select, SelectMany и ElementAt.

Большинство операторов запроса меняют позицию индексации элементов (включая те, которые удаляют элементы, такие как Where). Это означает, что если Вы хотите использовать такие предшествующие операторы, то они должны быть перед запросом.

Следующие операторы запроса могут быть распараллелены, но используют затратную стратегию разделения, которая иногда может работать медленнее, чем последовательная обработка:

• Join, GroupBy, GroupJoin, Distinct, Union, Intersect и Except.

Отобранные перегрузки оператора Aggregate в своих стандартных инкарнациях не параллелизуются - PLINQ для них предоставляет специальные перегрузки.

Все другие операторы параллелизуются, хотя их использование не дает гарантии, что Ваш запрос будет распараллелен. PLINQ может запустить Ваш запрос последовательно, если заподозрит, что дополнительные затраты на параллелизацию дадут эффект замедления определенного запроса. Вы можете отменить такое поведение и принудительно заставить применять параллельную обработку путем вызова следующего оператора после AsParallel():

.WithExecutionMode (ParallelExecutionMode.ForceParallelism)

Предположим, что нам нужно написать спеллчекер, который быстро обрабатывает большие документы, напрягая все доступные ядра. Путем формулирования нашего алгоритма в виде запроса LINQ мы могли бы очень просто распараллелить его.

Первый шаг состоит в загрузке словаря английских слов в HashSet для эффективного обращения к нему:

if (!File.Exists ("WordLookup.txt"))   // Тут содержится около 150000 слов
   new WebClient().DownloadFile (
      "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
 
var wordLookup = new HashSet< string> (
   File.ReadAllLines ("WordLookup.txt"),
   StringComparer.InvariantCultureIgnoreCase);

ОК, теперь создадим из нашей словарной таблицы тестовый "документ", представляющий массив из миллиона случайных слов. После сборки этого массива введем в него несколько орфографических ошибок:

var random = new Random();
string[] wordList = wordLookup.ToArray();
 
string[] wordsToTest = Enumerable.Range (0, 1000000)
   .Select (i => wordList [random.Next (0, wordList.Length)])
   .ToArray();
 
wordsToTest [12345] = "woozsh";  // Это введет пару ошибок
wordsToTest [23456] = "wubsie";  // орфографии.

Теперь мы можем выполнить нашу параллельную проверку орфографии путем тестирования wordsToTest на соответствие словарю wordLookup. PLINQ делает это очень просто:

var query = wordsToTest
   .AsParallel()
   .Select  ((word, index) => new IndexedWord { Word=word, Index=index })
   .Where   (iword => !wordLookup.Contains (iword.Word))
   .OrderBy (iword => iword.Index);
 
query.Dump();        // Отображение результата в LINQPad

Вот результат, который покажет LINQPad:

OrderedParallelQuery< IndexedWord> (2 элемента)
Слово Индекс
woozsh 12345
wubsie 23456

IndexedWord это пользовательская структура, которую мы определили следующим образом:

struct IndexedWord { public string Word; public int Index; }

Метод wordLookup.Contains в предикате дает запросу некую "суть" и делает его достойным параллелизации.

Мы могли бы немного упростить запрос с помощью анонимного типа вместо структуры IndexedWord. Однако это снизило бы производительность, потому что анонимные типы (которые являются классами, и поэтому ссылочными типами) привели бы к выделению памяти в куче и последующий за этим сбор мусора.

Разница в производительности между кешем и стеком могла бы быть незначительной с последовательными запросами, но с параллельными запросами стековое выделение памяти может быть довольно выгоднее. Причина в том, что стековое выделение памяти очень хорошо параллелизуется (у каждого потока свой собственный стек), в то же время все потоки должны конкурировать друг с другом на куче, что управляется одним менеджером памяти и сборщиком мусора.

Использование ThreadLocal< T>. Давайте расширим наш пример параллелизацией создания самого массива тестовых слов. Структурируем его алгоритм в запрос LINQ, что должно быть просто. Вот так выглядит последовательная версия:

string[] wordsToTest = Enumerable.Range (0, 1000000)
   .Select (i => wordList [random.Next (0, wordList.Length)])
   .ToArray();

К сожалению, вызов random.Next не потокобезопасен, так что не так просто вставить AsParallel() в этот запрос. Потенциальное решение проблемы - написать функцию, которая будет делать блокировку вокруг вызван random.Next; однако это снизило бы эффективность параллельности. Лучше всего использовать ThreadLocal< Random>, чтобы создать отдельный объект Random для каждого потока. Тогда мы сможем распараллелить запрос следующим образом:

var localRandom = new ThreadLocal< Random>
   ( () => new Random (Guid.NewGuid().GetHashCode()) );
 
string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
   .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
   .ToArray();

Наша дополнительная функция нужна для инстанциации объекта Random. В неё мы передаем хеш-код Guid, чтобы гарантировать, что если два объекта Random будут созданы в короткий промежуток времени, то они приведут к различным последовательностям случайного числа.

Заманчиво было бы перебрать Ваши готовые приложения в поиске запросов LINQ, и поэкспериментировать с ними на предмет параллелизации. Это обычно не будет продуктивным, потому что большинство проблем, для решение которых LINQ лучше всего подходит, выполняются очень быстро, и параллелизация не даст выгоды. Лучше всего найти те места, где интенсивные вычисления CPU составляют узкое место в программе, после чего задать себе вопрос: "можно ли как-то представить эти вычисления в виде запроса LINQ?" (добро пожаловать в побочные эффекты реструктуризации кода, когда LINQ обычно уменьшает код и делает его более читаемым).

PLINQ также хорошо подходит для случаев, когда есть сложности с ручным распараллеливанием вычислений. Это также подходит для структурированных блокирующихся задач, таких как вызов нескольких веб-сервисов сразу (см. "Вызов блокирующих и интенсивных по вводу/выводу функций").

PLINQ может быть плохим выбором для обработки изображений, потому что сборка вместе миллионов точек в выходную последовательность станет узким местом программы. Вместо этого лучше всего записывать точки прямо массив не обслуживаемого блока памяти, и использовать класс Parallel или параллелизм задач для управления многопоточностью (однако есть возможность победить конечный сбор результатов обработки использованием ForAll. Так делать стоит, если алгоритм обработки изображения естественно укладывается в LINQ).

Функциональная чистота. Поскольку PLINQ запускает Ваш запрос в параллельных потоках, Вы должны остерегаться выполнять не безопасные для потоков операции. В частности, запись в переменные даст побочные эффекты, и поэтому не будет потокобезопасной:

// Следующий запрос умножает каждый элемент на его позицию.
// Если на входе Enumerable.Range(0,999), то это должно дать на выходе
// квадраты чисел.
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;

Мы могли бы сделать инкремент i потокобезопасным путем ввода блокировок или использования класса Interlocked [5], но проблема все еще останется, поскольку i не обязательно будет соответствовать позиции входного элемента. И добавление AsOrdered в запрос не решило бы эту проблему, потому что AsOrdered гарантирует только то, что элементы появятся на выходе в том же порядке, в каком они обрабатывались бы последовательно - но это не даст их действительную последовательную обработку.

Вместо этого запрос нужно переписать с использованием индексированной версии Select:

var query = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i);

Для лучшей производительности любые методы, вызываемые из операторов запроса, должны быть потокобезопасными, для чего они не должны записывать что-то в поля или свойства (должны быть без побочных эффектов, или функционально чистыми). Если потокобезопасность достигается блокировкой, то параллелизм запроса потенциально будет ограничен - на коэффициент, равный длительности блокировки, поделенной на общее время, потраченное в этой функции.

Вызов блокирующих и интенсивных по вводу/выводу функций. Иногда запрос выполняется долго не из-за загрузки CPU, а из-за ожидания чего-нибудь - такого как ожидание завершения загрузки веб-страницы или ответа аппаратуры. PLINQ может эффективно распараллелить такие запросы, если Вы дадите ему подсказку, вызвав WithDegreeOfParallelism после AsParallel. Например предположим, что мы хотим выполнить пинг шести web-сайтов одновременно. Вместо того, чтобы использовать неуклюжих асинхронных делегатов или вручную запускать 6 потоков, это можно эффективно выполнить запросом PLINQ:

from site in new[]
{
   "www.albahari.com",
   "www.linqpad.net",
   "www.oreilly.com",
   "www.takeonit.com",
   "stackoverflow.com",
   "www.rebeccarey.com"  
}
.AsParallel().WithDegreeOfParallelism(6)
let p = new Ping().Send (site)
select new
{
   site,
   Result = p.Status,
   Time = p.RoundtripTime
}

WithDegreeOfParallelism принуждает PLINQ запустить указанное количество потоков (в данном примере 6) одновременно. Это необходимо, когда вызываются такие блокирующие выполнение потока функции, как Ping.Send, потому что PLINQ иначе предполагает, что запрос интенсивно загружает CPU, и выделяет задачи соответствующим образом. Например, на двухядерной машине PLINQ может по умолчанию запустить одновременно только 2 задачи, что в данной ситуации очень нежелательно.

PLINQ обычно обслуживает каждую задачу в потоке согласно выделению в пуле потоков. Вы можете ускорить начальный запуск потоков вызовом ThreadPool.SetMinThreads.

Для другого примера предположим, что мы пишем систему наблюдения, и хотим с повторами комбинировать изображения из 4 камер безопасности в одно общее изображение для отображения на мониторе наблюдения. Представим камеру следующим классом:

class Camera
{
   public readonly int CameraID;
   public Camera (int cameraID) { CameraID = cameraID; }
 
   // Получение изображения с камеры: вернет простую строку вместо картинки.
   public string GetNextFrame()
   {
      Thread.Sleep (123);     // Симуляция времени получения снимка.
      return "Кадр из камеры " + CameraID;
   }
}

Чтобы получить составную картинку, мы должны вызвать GetNextFrame на каждом объекте камеры. Если заранее знать, что эта операция привязана к интенсивному вводу/выводу, то можно ускорить в 4 раза скорость смены кадров на мониторе применением параллелизации - даже на одноядерной машине. PLINQ делает это возможным с минимальными усилиями:

Camera[] cameras = Enumerable.Range (0, 4)   // Создание 4 объектов камер.
   .Select (i => new Camera (i))
   .ToArray();
 
while (true)
{
   string[] data = cameras
      .AsParallel().AsOrdered().WithDegreeOfParallelism (4)
      .Select (c => c.GetNextFrame()).ToArray();
 
   Console.WriteLine (string.Join (", ", data));   // Отображение данных...
}

GetNextFrame является блокирующимся методом, поэтому мы используем WithDegreeOfParallelism, чтобы достичь нужной параллельности. В нашем примере блокировка происходит, когда мы вызываем Sleep; в реальной жизни блокировка будет происходить из-за получения изображения из камеры, потому что поток ввода/вывода данных с камеры требует просто ожидания данных и не вовлекает интенсивное использование CPU.

Вызов AsOrdered гарантирует, что картинки будут отображается в определенном порядке. Поскольку здесь только 4 элемента в последовательности, это даст незначительный эффект на снижение производительности.

Изменение степени параллелизма. Вы можете только один раз вызвать WithDegreeOfParallelism в запросе PLINQ. Если нужно вызвать его снова, то следует принудительно слить данные запроса и заново его разделить повторным вызовом AsParallel() в запросе:

"The Quick Brown Fox"
   .AsParallel().WithDegreeOfParallelism (2)
   .Where (c => !char.IsWhiteSpace (c))
   .AsParallel().WithDegreeOfParallelism (3)    // Принудительное слияние + разделение
   .Select (c => char.ToUpper (c))

Отмена запроса (Cancellation). Отмена запроса PLINQ, когда Вы потребляете его результаты в цикле foreach, осуществляется просто: обычный break в цикле foreach завершит прокрутку цикла, и запрос будет автоматически отменен поскольку перечислитель был неявно уничтожен.

Для запроса, который завершается на оператор преобразования, элемента или агрегации, его можно отменить из другого потока через маркер отмены (cancellation token). Чтобы вставить маркер (token), вызовите WithCancellation после вызова AsParallel, передав в свойстве Token объект CancellationTokenSource. После этого другой объект может вызвать Cancel на источнике маркера, что выбросит исключение OperationCanceledException в потребителе запроса:

IEnumerable< int> million = Enumerable.Range (3, 1000000);
 
var cancelSource = new CancellationTokenSource(); var primeNumberQuery = 
   from n in million.AsParallel().WithCancellation (cancelSource.Token)
   where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
   select n;
 
new Thread (() => {
                    Thread.Sleep (100);      // Отмена запроса через
                    cancelSource.Cancel();   // 100 миллисекунд.
                  }
           ).Start();
try
{
   // Запуск запроса в работу:
   int[] primes = primeNumberQuery.ToArray();
   // Сюда мы никогда не попадем, потому что другой поток отменит работу этого кода.
}
catch (OperationCanceledException)
{
   Console.WriteLine ("Запрос отменен");
}

PLINQ не делает вытесняющий обрыв работы потока, потому что это опасно (см. "Прекращение работы потока" [5]). Вместо этого при отмене происходит ожидание завершения обработки текущего элемента у каждого рабочего потока перед завершением запроса. Это означает, что любые внешние методы, которые вызывает запрос, отработают до своего завершения.

Оптимизация по выходу. Одно из достоинств PLINQ - удобно соединяются результаты распараллеленной работы в одну выходную последовательность. Хотя иногда все, что требуется - запустить некую функцию по одному разу на каждый элемент:

foreach (int n in parallelQuery)
   DoSomething (n);

Если имеет место этот случай, и не имеет значения, в каком порядке будут обработаны элементы, то можно улучшить эффективность PLINQ методом ForAll.

Метод ForAll запускает делегата на каждый выходной элемент ParallelQuery. Это происходит напрямую в PLINQ, пропуская шаги слияния и перечисления результатов. Простейший пример:

"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);

CSharp Threading ForAll

Сбор вместе и перечисление результатов не массивно дорогая операция, так что оптимизация ForAll уступает большим выгодам, когда большое количество быстро обрабатываемых входных элементов.

Оптимизация по входу. У PLINQ есть 3 стратегии разделения задачи для передачи входных элементов потокам:

Стратегия Выделение элемента Относительное быстродействие
Chunk partitioning Динамическое Среднее
Range partitioning Статическое Плохое или отличное
Hash partitioning Статическое Плохое

Для операторов запроса, которые требуют сравнения элементов (GroupBy, Join, GroupJoin, Intersect, Except, Union и Distinct), у Вас нет выбора: PLINQ всегда использует hash-разделение. Hash-разделение относительно не эффективное, потому что должно предварительно рассчитать хеш-код для каждого элемента (так что элементы с одинаковыми хеш-кодами можно было обработать в одном потоке). Если Вы решили, что это слишком медленно, то можете только вызвать AsSequential для запрета параллелизма.

Для всех других операторов запроса у Вас есть выбор - использовать разделение по диапазону (range) или по куску (chunk). По умолчанию:

• Если входная последовательность может быть проиндексирована (если это массив, или реализуется IList< T>), то PLINQ использует range-разделение.
• Иначе PLINQ выберет chunk-разделение.

В сущности range-разделение быстрее на длинных последовательностях, где обработка каждого элемента занимает примерно одинаковое время CPU. Иначе chunk-разделение обычно работает быстрее.

Чтобы принудительно включить range-разделение:

• Если запрос начинается на Enumerable.Range, замените его на ParallelEnumerable.Range.
• Иначе просто вызовите ToList или ToArray на входной последовательности (очевидно это снизит общую производительность, что следует учитывать).

ParallelEnumerable.Range это не просто ярлычок для вызова Enumerable.Range(...).AsParallel(). Это меняет производительность запроса путем активации range-разделения.

Чтобы принудительно включить chunk-разделение, оберните входную последовательность в вызов Partitioner.Create (находится в System.Collection.Concurrent) следующим образом:

int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery = Partitioner.Create (numbers, true).AsParallel() .Where (...)

Второй аргумент Partitioner.Create показывает, что Вы хотите сбалансировать запрос по нагрузке, что является другим способом указать, что нужно применить chunk-разделение.

Chunk-разделение работает таким образом, что каждый рабочий поток для обработки периодически берет малые порции элементов (chunks) из входной последовательности. PLINQ начинает с выделения очень маленьких кусков (из одного или двух элементов за один раз), затем увеличивает размер куска по мере выполнения запроса: это гарантирует, что малые последовательности будут эффективно распараллелены, и большие последовательности не приведут к чрезмерным расщеплениям входных данных. Если произойдет так, что рабочий поток получает "простые" элементы (которые обработаются быстро), то это закончится тем, что он получит больше кусков. Такая система удерживает каждый поток одинаково нагруженным (ядра получаются "сбалансированными" по нагрузке); недостаток в том, что выборка элементов из общей входной последовательности требует синхронизации (обычно исключительной блокировки), что может привести к некоторым дополнительным тратам по вычислениям и конкуренции потоков.

CSharp Threading Partitioning

Range-разделение пропускает обычное перечисление по входу, и предварительно выделяет одинаковое количество элементов для каждого потока, избегая состязания на входной последовательности. Но если произойдет так, что некоторые потоки получат простые элементы, и выполнят свою обработку быстрее, то они останутся в ожидании завершения обработки в остальных потоках. Наш предыдущий пример с числовыми вычислениями может показать плохую эффективность при range-разделении. Примером, когда range-разделение работает хорошо, будет вычисление суммы квадратных корней первых 10 миллионов целых чисел:

ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i))

ParallelEnumerable.Range вернет ParallelQuery< T>, так что Вам не нужно впоследствии вызывать AsParallel.

Range-разделение не обязательно будет выделять диапазоны элементов в непрерывных блоках - вместо этого может быть выбрана стратегия чередования (striping). Например, если есть два рабочих потока, то один может обрабатывать элементы с четными номерами, в то время как другой процесс с нечетными. Оператор TakeWhile почти наверняка инициирует striping-стратегию, чтобы избежать нежелательной обработки элементов в последовательности.

Параллелизация пользовательских агрегаций. PLINQ эффективно распараллеливает операторы Sum, Average, Min и Max без дополнительного вмешательства. Однако оператор Aggregate представляет для PLINQ дополнительные сложности.

Если Вы не знакомы с этим оператором, то можете думать об Aggregate как про обобщенную версию Sum, Average, Min и Max - другими словами, это оператор, который позволят Вам подключить пользовательский алгоритм накопления для реализации необычных агрегаций. Следующий код демонстрирует, как Aggregate может выполнить работу Sum:

int[] numbers = { 2, 3, 4 };
int sum = numbers.Aggregate (0, (total, n) => total + n);      // 9

Первый аргумент для Aggregate это начальное значение (seed), с которого начинается накопление. Второй аргумент это выражение для обновления накапливаемого значения с учетом нового элемента. Опционально Вы можете предоставить третий аргумент для проектирования конечного результата из накопленного значения.

Большинство проблем, для которых был разработан Aggregate, можно решить проще с циклом foreach, с более знакомым синтаксисом. Достоинство Aggregate в том, что большие и сложные агрегации можно декларативно распараллелить с помощью PLINQ.

Агрегации без начальной точки отсчета (unseeded). Вы может опустить значение seed при вызове Aggregate, в этом случае первый элемент станет неявным seed, и агрегация начнется со второго элемента. Вот предыдущий пример, переделанный на unseeded:

int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate ((total, n) => total + n);         // 6

Это даст тот же результат, что и ранее, но реально мы делаем другое вычисление. Раньше мы выполнили вычисление 0+1+2+3; теперь же 1+2+3. Мы можем лучше проиллюстрировать разницу применением умножения вместо сложения:

int[] numbers = { 1, 2, 3 };
int x = numbers.Aggregate (0, (prod, n) => prod * n);    // 0*1*2*3 = 0
int y = numbers.Aggregate (   (prod, n) => prod * n);    //   1*2*3 = 6

Как мы скоро увидим, unseeded-агрегации имеют преимущество для параллелизации, так как не требуют использования специальных перегрузок. Но с unseeded-агрегациями можно попасть в ловушку: их методы предназначены для использования с делегатами, которые являются коммутативными и ассоциативными. При другом использовании любой результат будет не интуитивным (с обычными запросами) или не детерминированным (в случае, когда Вы распараллелили запрос с помощью PLINQ). Для примера рассмотрим следующую функцию:

(total, n) => total + n * n

Она ни коммутативна, ни ассоциативна (т. е. 1+2*2 != 2+1*1). Посмотрим, что получится, когда мы используем её для сложения квадратов чисел 2, 3 и 4:

int[] numbers = { 2, 3, 4 };
int sum = numbers.Aggregate ((total, n) => total + n * n);    // 27

Вместо того, чтобы получить 29 как результат 2*2 + 3*3 + 4*4, получится 27 в результате вычисления 2 + 3*3 + 4*4. Мы можем исправить это несколькими способами. Первый способ - добавить 0 в качестве первого элемента:

int[] numbers = { 0, 2, 3, 4 };

Однако это не только не элегантно, но все еще даст некорректные результаты при параллелизации, потому что PLINQ усиливает подразумеваемую ассоциативность функции, выбирая несколько элементов как seed-значения. Для иллюстрации мы обозначим нашу функцию агрегации следующим образом:

f(total, n) => total + n * n

тогда запрос LINQ к объектам вычислил бы f(f(f(0, 2),3),4), в то время как PLINQ мог бы сделать f(f(0,2),f(3,4)) со следующим результатом:

Первая часть распараллеливания:  a = 0 + 2*2  (= 4)
Вторая часть распараллеливания:  b = 3 + 4*4  (= 19)
Конечный результат:              a + b*b  (= 365)
ИЛИ ДАЖЕ:                        b + a*a  (= 35) 

Есть 2 хороших решения. Первое - превратить это в seeded-агрегацию с нулем для seed. Единственная сложность в том, что с PLINQ нам нужно использовать специальную перегрузку, чтобы запрос не выполнятся последовательно (скоро мы это увидим).

Второе решение реструктурировать запрос так, чтобы функция агрегации стала коммутативной и ассоциативной:

int sum = numbers.Select (n => n * n).Aggregate ((total, n) => total + n);

Конечно, в таком простом сценарии Вы можете (и должны) использовать оператор Sum вместо Aggregate:

int sum = numbers.Sum (n => n * n);

Вы можете на самом деле пойти довольно далеко просто с Sum и Average. Например, можно использовать Average для вычисления среднего квадратического значения (root-mean-square, RMS):

Math.Sqrt (numbers.Average (n => n * n))

и даже стандартную девиацию:

double mean = numbers.Average();
double sdev = Math.Sqrt (numbers.Average (n =>
              {
                double dif = n - mean;
                return dif * dif;
              }));

Оба варианта безопасны, эффективным и полностью параллелизуются.

Параллелизация Aggregate. Мы только что говорили, что для unseeded-агрегаций предоставленный делегат должен быть ассоциативным и коммутативным. PLINQ даст некорректные результаты, если это правило нарушается, потому что это вовлечет несколько seed-значений из входной последовательности, чтобы одновременно агрегировать несколько частей последовательности.

Явные seed-агрегации могли бы выглядеть как безопасная опция с PLINQ, но к сожалению обычно они выполняются последовательно, потому что полагаются на одно значение seed. Чтобы смягчить это, PLINQ предоставляет другую перегрузку Aggregate, которая позволяет указать несколько значений seed - или скорее разработанную функцию для seed. Для каждого потока эта функция выполнится, чтобы генерировать отдельное значение seed, которое становится локальным аккумулятором для потока, в который будет выполняться агрегация элементов.

Мы также должны предоставить функцию для индикации, как комбинировать локальный и главный аккумуляторы. И наконец, эта перегрузка Aggregate (немного необоснованно) ожидает делегата для выполнения любого конечного преобразования для результата (того же самого результата Вы можете достичь, самостоятельно вызвав какую-нибудь функцию преобразования позже). Итак, имеется 4 делегата, передаваемых в таком порядке:

seedFactory. Вернет новый локальный аккумулятор.
updateAccumulatorFunc. Агрегирует элемент в локальный аккумулятор.
combineAccumulatorFunc. Комбинирует локальный аккумулятор с главным аккумулятором.
resultSelector. Применяет любое последнее преобразования к конечному результату.

В простых сценариях Вы можете указать значение seed вместо функции seed (вместо seedFactory). Эта тактика не сработает, когда seed является ссылочным типом, который Вы хотите мутировать, потому что один и тот же экземпляр тогда будет использоваться каждым потоком.

В качестве очень простого примера рассмотрим следующие суммы значений в массиве чисел:

numbers.AsParallel().Aggregate (
   () => 0,                                     // seedFactory
   (localTotal, n) => localTotal + n,           // updateAccumulatorFunc
   (mainTot, localTot) => mainTot + localTot,   // combineAccumulatorFunc
   finalResult => finalResult)                  // resultSelector

В этом примере мы могли бы получить тот же ответ, используя более простые способы (такие как агрегация unseeded, или что еще лучше, оператор Sum). Чтобы дать более реалистичный пример, предположим, что мы хотим вычислить частоту появления каждой буквы английского алфавита в указанной строке. Простое последовательное решение может выглядеть вот так:

string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
   int index = char.ToUpper (c) - 'A';
   if (index >= 0 && index <= 26) letterFrequencies [index]++;
};

Пример, когда строка будет очень большой - вычисление генома. Тогда "алфавит" должен состоять из букв a, c, g и t.

Чтобы распараллелить это, мы должны заменить оператор foreach на вызов Parallel.ForEach (что мы рассмотрим в следующей секции), но тут мы столкнемся с проблемой конкурентной обработки общего массива. И блокировка вокруг доступа к этому массив дала бы безопасность для потоков, но убила бы весь потенциал параллелизации.

Агрегат предоставляет опрятное решение. В этом случае в аккумулятор это массив, такой же как letterFrequencies из предыдущего примера. Вот последовательная версия, использующая Aggregate:

int[] result =
   text.Aggregate (
      new int[26],                  // Создание "аккумулятора"
      (letterFrequencies, c) =>     // Агрегирование буквы в аккумулятор
      {
         int index = char.ToUpper (c) - 'A';
         if (index >= 0 && index <= 26) letterFrequencies [index]++;
         return letterFrequencies;
      });

И теперь параллельная версия, использующая специальную перегрузку PLINQ:

int[] result =
   text.AsParallel().Aggregate (
      () => new int[26],            // Создание нового локального аккумулятора
 
      (localFrequencies, c) =>      // Агрегация в локальный аккумулятор
      {
         int index = char.ToUpper (c) - 'A';
         if (index >= 0 && index <= 26) localFrequencies [index]++;
         return localFrequencies;
      },
                                    // Агрегация аккумулятора local->main
      (mainFreq, localFreq) =>
         mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
 
      finalResult => finalResult    // Выполнение последнего действия
   );                               // над конечным результатом.

Обратите внимание, что функция локального накопления мутирует массив localFrequencies. Это важная и законная возможность для выполнения оптимизации, потому что localFrequencies локален для каждого потока.

[Класс Parallel]

PFX предоставляет базовую форму структурированного параллелизма через три статические метода в классе Parallel:

Parallel.Invoke. Запускает параллельно делегатов из массива.
Parallel.For. Выполняет параллельный эквивалент цикла for языка C#.
Parallel.ForEach. Выполняет параллельный эквивалент цикла foreach языка C#.

Все три метода блокируют выполнение до момента завершения работы. Как с PLINQ, после не обработанного исключения оставшиеся рабочие потоки останавливаются после своей текущей итерации и исключение (или исключения) выбрасываются обратно в вызывающий код - обернутое в AggregateException.

Parallel.Invoke. Parallel.Invoke параллельно запускает в массиве делегатов Action, и затем ждет их завершения. Самая простая версия метода определена следующим образом:

public static void Invoke (params Action[] actions);

Вот так мы можем использовать Parallel.Invoke, чтобы одновременно загрузить две веб-странички:

Parallel.Invoke (
   () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
   () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));

Внешне это выглядит как удобный ярлычок для создания и ожидания двух объектов Task (или асинхронных делегатов). Но здесь есть важное отличие: Parallel.Invoke все еще работает эффективно, если Вы передадите в массиве миллион делегатов. Это потому, что Parallel.Invoke делит большое количество элементов на пакеты, которые присваивает нижележащим задачам Task - вместо того, чтобы создавать отдельную Task для каждого делегата.

Как и со всеми методами Parallel, Вы действуете самостоятельно при сведении вместе результатов обработки. Это означает, что Вам нужно учитывать сохранение безопасности потоков. Следующий код, к примеру, не потокобезопасный:

var data = new List< string>();
Parallel.Invoke (
   () => data.Add (new WebClient().DownloadString ("http://www.foo.com")),
   () => data.Add (new WebClient().DownloadString ("http://www.far.com")));

Блокировка вокруг этого списка List исправит проблему, хотя блокировка создаст узкое место, если у Вас очень большой массив быстро исполняемых делегатов. Есть решение лучше - использовать thread-safe collection, такую как ConcurrentBag, что идеально для этого случая.

Parallel.Invoke также перегружается для того, чтобы принять объект ParallelOptions:

public static void Invoke (ParallelOptions options,
                           params Action[] actions);

Вместе с ParallelOptions Вы можете вставить маркер отмены (cancellation token), ограничить максимальную конкуренцию и указать пользовательский планировщик задач (custom task scheduler). Маркер отмены уместен, когда Вы выполняете (грубо) больше задач, чем имеется в наличии ядер: при отмене любые не запущенные делегаты будут заброшены. Однако любые уже выполняющиеся делегаты будут продолжать свое выполнение до завершения. См. "Отмена запроса (Cancellation)" в [4] для примера использования маркеров отмены.

Parallel.For и Parallel.ForEach. Это является эквивалентом циклов for и foreach языка C#, но с каждой итерацией тела цикла, выполняющейся параллельно, вместо того, чтобы итерации выполнялись последовательно. Вот их (простейшие) сигнатуры:

public static ParallelLoopResult For (int fromInclusive,
                                      int toExclusive,
                                      Action< int> body)
 
public static ParallelLoopResult ForEach< TSource> (IEnumerable< TSource> source,
                                                    Action< TSource> body)

Следующий последовательный цикл for:

for (int i = 0; i < 100; i++)
   Foo (i);

параллелизуется примерно так:

Parallel.For (0, 100, i => Foo (i));

или еще проще:

Parallel.For (0, 100, Foo);

И следующий последовательный цикл foreach:

foreach (char c in "Hello, world")
   Foo (c);

параллелизуется наподобие:

Parallel.ForEach ("Hello, world", Foo);

Практический пример: если мы импортируем пространство имен System.Security.Cryptography namespace, то можем параллельно генерировать 6 пар public/private строк ключей:

var keyPairs = new string[6];
 
Parallel.For (0, keyPairs.Length,
              i => keyPairs[i] = RSA.Create().ToXmlString (true));

Как с Parallel.Invoke, мы можем передать для Parallel.For и Parallel.ForEach большое количество обрабатываемых элементов, и они будут эффективно разбиты на несколько задач.

Последний запрос также можно реализовать с помощью PLINQ:

string[] keyPairs =
   ParallelEnumerable.Range (0, 6)
   .Select (i => RSA.Create().ToXmlString (true))
   .ToArray();

Сравнение внешних и внутренних циклов. Parallel.For и Parallel.ForEach обычно работают лучше на внешних циклах (outer loop) чем на внутренних (inner loop). Это вызвано тем, что с внешним циклом Вы предлагаете большие блоки работы для параллелизации, снижая затраты на внешнее обслуживание. Параллелизация и внутренних, и внешних циклов обычно не требуется. В следующем примере нам понадобилось бы больше 100 ядер, чтобы извлечь выгоду от внутренней параллелизации:

Parallel.For (0, 100, i =>
{
   Parallel.For (0, 50, j => Foo (i, j));    // В этом месте последовательное выполнение
});                                          // цикла было бы лучше параллельного.

Индексированный Parallel.ForEach. Иногда полезно знать индекс итерации цикла. С последовательным foreach это просто:

int i = 0;
foreach (char c in "Hello, world")
   Console.WriteLine (c.ToString() + i++);

Инкремент общей переменной, однако, не безопасно для потоков в контексте параллельного выполнения. Вы должны вместо этого использовать следующую версию ForEach:

public static ParallelLoopResult ForEach< TSource> (IEnumerable< TSource> source,
                                                    Action< TSource, ParallelLoopState, long> body)

Мы проигнорируем ParallelLoopState (что будет рассмотрено в следующей секции). Сейчас нам интересен третий параметр типа long делегата Action, который показывает индекс цикла:

Parallel.ForEach ("Hello, world", (c, state, i) =>
{
   Console.WriteLine (c.ToString() + i);
});

Чтобы перевести это в практический контекст, вернемся к проекту спеллчекера, который мы писали на основе PLINQ. Следующий код загружает словарь для проверки миллиона слов:

if (!File.Exists ("WordLookup.txt"))   // Содержит около 150000 слов
   new WebClient().DownloadFile (
      "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
 
var wordLookup = new HashSet< string> (
   File.ReadAllLines ("WordLookup.txt"),
      StringComparer.InvariantCultureIgnoreCase);
 
var random = new Random();
string[] wordList = wordLookup.ToArray();
 
string[] wordsToTest = Enumerable.Range (0, 1000000)
   .Select (i => wordList [random.Next (0, wordList.Length)])
   .ToArray();
 
wordsToTest [12345] = "woozsh";     // Ввод нескольких ошибок
wordsToTest [23456] = "wubsie";     // орфографии.

Мы можем выполнить проверку орфографии нашего массива wordsToTest с использованием индексированной версии Parallel.ForEach следующим образом:

var misspellings = new ConcurrentBag< Tuple< int,string>>();
 
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
   if (!wordLookup.Contains (word))
      misspellings.Add (Tuple.Create ((int) i, word));
});

Обратите внимание, что мы должны были сопоставить результаты в thread-safe коллекции: из-за этой необходимости этот код имеет недостаток по сравнению с кодом на PLINQ. Достоинство PLINQ в том, что мы избегаем цены применения индексированного оператора запроса Select - который менее эффективный, чем индексированный ForEach.

ParallelLoopState: ранний выход из циклов. Из-за того, что тело цикла в параллельном For или ForEach это делегат, Вы не можете выполнить ранний выход из цикла с помощью оператора break. Вместо этого Вы должны вызвать Break или Stop на объекте ParallelLoopState:

public class ParallelLoopState
{
   public void Break();
   public void Stop();
 
   public bool IsExceptional { get; }
   public bool IsStopped { get; }
   public long? LowestBreakIteration { get; }
   public bool ShouldExitCurrentIteration { get; }
}

Получить ParallelLoopState просто: все версии For и ForEach перегружены чтобы принять тела цикла типа Action< TSource, ParallelLoopState>. Распараллелим вот это:

foreach (char c in "Hello, world")
   if (c == ',')
      break;
   else
      Console.Write (c);

следующим образом:

Parallel.ForEach ("Hello, world", (c, loopState) =>
{
   if (c == ',')
      loopState.Break();
   else
      Console.Write (c);
});

Выведенный результат:

Hlloe

Можно увидеть из выведенного результата "Hlloe", что тела цикла могут выполняться в случайном порядке. Кроме этого отличия, вызов Break приведет к тому, что как минимум одинаковые элементы в цикле будут выполняться последовательно: этот пример всегда выведет как минимум буквы H, e, l, l и o в некотором порядке. Вызов Stop отличается от вызова Break тем, что принуждает все потоки завершиться сразу после их текущей итерации. В нашем примере вызов Stop дал бы нам подмножество букв H, e, l, l и o, если другой поток отстает. Вызов Stop полезно сделать, когда Вы обнаружили то, что ищете, или когда что-то пошло не так, и мы не хотим смотреть на результаты.

Методы Parallel.For и Parallel.ForEach возвратят объект ParallelLoopResult, который публикует свойства IsCompleted и LowestBreakIteration. Они скажут Вам, доработал ли цикл до завершения, и если нет, то на какой итерации цикл был оборван.

Если LowestBreakIteration вернет null, то это означает, что Вы вызвали на цикле Stop (вместо Break).

Если тело Вашего цикла длинное, то Вы можете захотеть, чтобы другие потоки остановили выполнение внутри своего тела в случае раннего вызова Break или Stop. Вы можете выполнить это опросом свойства ShouldExitCurrentIteration в различных местах своего кода; оно станет true сразу после Stop - или почти сразу после Break.

ShouldExitCurrentIteration также станет true после запроса отмены (cancellation request) - или если в цикле выброшено исключение.

IsExceptional позволит Вам узнать, произошло ли исключение в другом потоке. Любое не обработанное исключение приведет к остановке цикла после каждой текущей итерации потока: чтобы избежать этого, Вы должны явно обработать исключения в своем коде.

Оптимизация с локальными переменными. Parallel.For и Parallel.ForEach предоставляют набор перегрузок, реализующих аргумент обычного типа с именем TLocal. Эти перегрузки разработаны, чтобы помочь Вам оптимизировать слияния данных в циклах с интенсивными итерациями. Самый простой пример:

public static ParallelLoopResult For < TLocal> (
   int fromInclusive,
   int toExclusive,
   Func < TLocal> localInit,  Func < int, ParallelLoopState, TLocal, TLocal> body,
   Action < TLocal> localFinally);

Эти методы редко нужны на практике, потому что их целевые сценарии обычно обслуживаются PLINQ (что удачнее, потому что синтаксис этих перегрузок пугает).

В сущности, проблема заключается в следующем: предположим, что мы хотим сложить квадратные корни чисел от 1 до 10000000. Вычисление 10 миллионов квадратных корней просто параллелизуется, но суммирование их значений проблематично, потому что мы должны реализовать блокировку вокруг обновления результата:

object locker = new object();
double total = 0;
Parallel.For (1, 10000000,
              i => { lock (locker) total += Math.Sqrt (i); });

Выигрыш от параллелизации нивелируется ценой получения 10 миллионов блокировок плюс результирующая блокировка.

Однако реально нам не нужны эти 10 миллионов блокировок. Вообразите команду волонтеров, собирающих большой объем мусора. Если бы все рабочие использовали бы одну мусорную корзину, то перемещения рабочих и их конкуренция за корзину сделали бы общий процесс сбора мусора экстремально не эффективным. Очевидное решение - дать каждому рабочему по "локальному" мусорному ведру, которое он может по случаю опорожнять в общую корзину.

TLocal-версии для For и ForEach работают точно по такому принципу. Волонтерами тут служат внутренние рабочие потоки, и локальное значение представляет местное ведерко мусора. В случае применения Parallel для такой работы Вы должны передать ему двух дополнительных делегатов, чтобы показать:

1. Как инициализировать новое локальное значение.
2. Как комбинировать локальную агрегацию с главным значением.

Дополнительно вместо тела делегата, возвращающего void, он должен возвращать новый агрегат для этого локального значения. Вот наш переработанный пример:

object locker = new object();
double grandTotal = 0;
 
Parallel.For (1, 10000000, 
   () => 0.0,                       // Инициализация локального значения.
   (i, state, localTotal) =>        // Делегат тела цикла. Обратите внимание, что он возвращает
      localTotal + Math.Sqrt (i),   // новое общее локальное значение localTotal.
 
   localTotal =>                                   // Добавление локального значения
      { lock (locker) grandTotal += localTotal; }  // к общему значению (с блокировкой).
);

Мы все еще должны применить блокировку, то теперь только вокруг агрегации локального значения к общему. Это делает весь процесс кардинально эффективнее.

Как было замечено ранее, PLINQ часто хорошо подходит для подобных задач. Наш пример мог бы очень просто параллелится с помощью PLINQ следующим образом:

ParallelEnumerable.Range(1, 10000000)
                  .Sum (i => Math.Sqrt (i))

Обратите внимание, что здесь мы применили ParallelEnumerable, чтобы принудительно задать разделение по диапазону (range partitioning): для нашего примера это улучшает производительность, потому что все числа обрабатываются по времени примерно одинаково.

В более сложных сценариях Вы можете использовать оператор Aggregate запроса LINQ вместо Sum. Если Вы предоставили функцию локального начального значения (local seed factory), то ситуация была бы похожей на предоставление локального значения функции с циклом Parallel.For.

[Параллелизм задач]

Task Parallelism (параллелизм задач) это метод параллелизации самого низкого уровня с помощью PFX. В System.Threading.Tasks namespace для этого определены классы:

Класс Назначение
Task Для управления единицей работы.
Task< TResult> Для управления единицей работы с возвратом значения.
TaskFactory Для создания задач.
TaskFactory< TResult> Для создания задач и задач-продолжений с тем же самым возвращаемым значением.
TaskScheduler Для управления планированием запуска задач.
TaskCompletionSource Для ручного управления рабочим процессом задач.

В сущности задача (task) это облегченный объект для управления распараллеленной единицей работы. Task позволяет избежать накладных расходов на запуск выделенного потока с помощью использования пула потоков (CLR thread pool): это тот же самый пул потоков, используемый ThreadPool.QueueUserWorkItem, что появилось в CLR 4.0, чтобы эффективно работать с Tasks (и обычно более эффективно).

Tasks можно использовать всякий раз, когда Вы хотите что-то выполнить параллельно. Однако это заточено для задействования нескольких ядер: фактически класс Parallel и запросы PLINQ внутренне построены на конструкциях параллелизма задач.

Tasks делают больше, чем просто дают простой и эффективный способ работы через пул потоков. Они также предоставляют некоторые мощные функции для управления единицами работы, включая следующее:

• Настройка планирования выполнения задач (task scheduling).
• Организация взаимосвязи родительский/дочерний (parent/child), когда одна task запускается из другой.
• Реализация совместной отмены (cooperative cancellation).
• Ожидание набора задач - без конструкции сигнализации.
• Подключение задачи (задач) продолжения (continuation).
• Планирование продолжения на основе нескольких предшествующих задач.
• Распространение исключений на родительские задачи, задачи продолжения и потребители данных задач (task consumers).

Задачи также реализуют локальные рабочие запросы (local work queues). Это оптимизация, которая позволяет Вам эффективно создавать множество быстро выполняющихся дочерних задач без возникновения накладных расходов на конкуренцию, которая иначе возникла бы с одной рабочей очередью.

Библиотека TLP (Task Parallel Library) позволяет Вам создать сотни (или даже тысячи) задач с минимальными затратами. Но если Вы хотите создать миллионы задач, то понадобится разделить эти задачи на большие блоки работы, чтобы сохранить эффективность. Класс Parallel и PLINQ делают это автоматически.

Visual Studio 2010 предоставляет новое окно для мониторинга задач (Debug -> Window -> Parallel Tasks). Это эквивалентно окну Threads, но только для задач. Окно Parallel Stacks также предоставляет специальный режим просмотра отладочной информации для задач.

Создание и запуск задач. В описании пула потоков [2] уже обсуждалось, что Вы можете создать и запустить Task вызовом Task.Factory.StartNew с передачей делегата в Action:

Task.Factory.StartNew (() => Console.WriteLine ("Hello from a task!"));

В обычной версии Task< TResult> (подкласс Task) позволяет Вам получить данные из задачи после её завершения:

Task< string> task = Task.Factory.StartNew<string> (() =>   // Запуск задачи
{
   using (var wc = new System.Net.WebClient())
      return wc.DownloadString ("http://www.linqpad.net");
});
 
RunSomeOtherMethod();         // Мы можем делать другую работу параллельно...
 
string result = task.Result;  // Ожидание завершения задачи и получение результата.

Task.Factory.StartNew создает и запускает задачу за один шаг. Вы можете разделить эти операции, если сначала инстанциировать объект Task, и затем вызвать Start:

var task = new Task (() => Console.Write ("Hello"));
...
task.Start();

Задача, которую Вы создаете таким способом, можно также запустить синхронно (в том же потоке) вызовом RunSynchronously вместо Start.

Вы можете отслеживать состояние выполнения задачи через её свойство Status.

Указание состояния объекта. Когда инстанциируется задача, или вызывается Task.Factory.StartNew, можно указать объект состояния (state object), который передается целевому методу. Это полезно, если Вы хотели бы вызвать метод напрямую вместо использования lambda-выражения:

static void Main()
{
   var task = Task.Factory.StartNew (Greet, "Hello");
   task.Wait();   // Ожидание завершения задачи.
}
 
static void Greet (object state) { Console.Write (state); }    // Hello

Учитывая, что у нас в C# есть lambda-выражения, можно ввести объект состояния для лучшего использования, когда задаче назначается осмысленное имя. Мы можем тогда использовать AsyncState для опроса этого имени:

static void Main()
{
   var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
   Console.WriteLine (task.AsyncState);      // Greeting
   task.Wait();
}
 
static void Greet (string message) { Console.Write (message); }

Visual Studio отобразит AsyncState каждой задачи в окне Parallel Tasks, так что имея понятное имя задачи, можно значительно упростить отладку.

TaskCreationOptions. Вы можете подстроить выполнение задачи, указав перечисление TaskCreationOptions, когда вызываете StartNew (или когда инстанциируете задачу Task). TaskCreationOptions это перечисление флагов, содержащее следующие (комбинируемые по ИЛИ) значения:

LongRunning
PreferFairness
AttachedToParent

LongRunning советует планировщику выделить для задачи отдельный поток. Это выгодно для долго работающих задач, потому что иначе может образоваться пожирающая ресурсы очередь, с принудительной приостановкой быстро выполняющихся задач на неразумно долгое время. LongRunning также хорошо применять для блокирующихся задач.

Проблема постановки задач в очередь возникает потому, что планировщик задач обычно пытается сохранить достаточное количество одновременно активных задач на потоках, чтобы поддерживать занятость каждого ядра CPU. Ограничение нагрузки CPU слишком большим количеством активных потоков позволяет избежать деградации производительности, которая произошла бы, если бы операционная система была бы вынуждена тратить слишком много времени на обслуживание тиков (expensive time slicing) и переключение контекста (context switching).

PreferFairness говорит планировщику попытаться обеспечить условия, чтобы выполнение задач было запланировано в том же порядке, в каком они были запущены. Обычно порядок запуска задач могло бы быть другим, потому что имеется внутренняя оптимизация планирования задач с использованием локальных рабочих очередей. Эта оптимизация дает практическую выгоду с очень маленькими (четко очерченными) задачами.

AttachedToParent используется для создания дочерних задач.

Дочерние задачи. Когда одна задача запускает другую, Вы опционально можете установить взаимодействие parent-child путем указания TaskCreationOptions.AttachedToParent:

Task parent = Task.Factory.StartNew (() =>
{
   Console.WriteLine ("I am a parent");
 
   Task.Factory.StartNew (() =>     // Отсоединенная (т. е. родительская) задача
   {
      Console.WriteLine ("I am detached");
   });
 
   Task.Factory.StartNew (() =>     // Дочерняя задача (child)
   {
      Console.WriteLine ("I am a child");
   }, TaskCreationOptions.AttachedToParent);
});

Особенность дочерних (child) задач в том, что когда Вы ждете завершения родительской (parent) задачи, то она ждет, пока завершатся все её дочерние задачи. Это может быть в частности полезно, когда дочерняя задача является продолжением (continuation), как мы скоро увидим.

Ожидание на задачах. Вы можете явно ждать завершения задачи двумя способами:

• Вызовом её метода Wait (опционально с таймаутом)
• Доступом к её свойству Result (в случае Task< TResult>)

Вы можете также ждать на нескольких задачах сразу с помощью статических методов Task.WaitAll (ждет завершения всех указанных задач) и Task.WaitAny (ждет завершения только одной задачи).

WaitAll работает подобно ожиданию каждой задачи по отдельности, но более эффективно тем, что это требует (самое большее) только одного переключения контекста. Также если одна или большее количество задач выбросят необработанное исключение, WaitAll все еще будет ждать каждую задачу - и затем перебросить одно AggregateException, в котором аккумулированы исключения от каждой задачи, в которой такое исключение произошло. Это эквивалентно следующему:

// Предположим, что t1, t2 и t3 это задач:
var exceptions = new List< Exception>();
try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
if (exceptions.Count > 0) throw new AggregateException (exceptions);

Вызов WaitAny эквивалентно ожиданию ManualResetEventSlim, что сигнализируется каждой завершающейся задачей.

Так же, как и таймаут, Вы можете передать маркер отмены (cancellation token) методам Wait: это позволит Вам отменить ожидание, а не саму задачу.

Задачи обработки исключений. Когда Вы хотите ждать завершения задачи (вызовом её метода Wait или доступом к её свойству Result), то любые не обработанные исключения перебрасываются в вызывающий код, обернутые в объект AggregateException. Это обычно дает возможность избегать необходимости написания кода обработки исключений в теле задач; вместо этого мы можем сделать следующее:

int x = 0;
Task< int> calc = Task.Factory.StartNew (() => 7 / x);
try
{
   Console.WriteLine (calc.Result);
}
catch (AggregateException aex)
{
   Console.Write (aex.InnerException.Message);  // Попытка поделить на 0
}

Вам все еще нужно реализовать обработку исключений для отключенных автономных задач (не порожденные задачи, на которых нет ожидания) чтобы предотвратить возникновение необработанного и отказа всего приложения, когда задача выпадает из процесса и убирается сборщиком мусора (к этой ситуации относится следующее замечание). То же самое относится к задачам, на которых происходит ожидание с таймаутом, потому что исключение, выброшенное по истечении интервала таймаута, будет иначе "не обработанным" (unhandled).

Статическое событие TaskScheduler.UnobservedTaskException предоставляет последний шанс обработать необработанные исключения задачи. Путем обработки этого события Вы можете перехватить исключения задачи, которые иначе могут привести к падению приложения - и обеспечить свою собственную логику решить проблему исключения (или записать в лог информацию об исключении).

Для порожденных задач ожидание родителя неявно приводит к ожиданию дочерних задач - и тогда всплывут дочерние исключения:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() => 
{
   Task.Factory.StartNew (() =>     // Сын
   {
      Task.Factory.StartNew (() => { throw null; }, atp);      // Внук
   }, atp);
});
 
// Следующий вызов выбросит исключение NullReferenceException (обернутое
// вложением в AggregateExceptions):
parent.Wait();

Интересно, что если Вы проверите свойство задачи Exception после того, как она выбросила исключение, то акт чтения этого свойства предотвратит последующее падение приложения. Рациональность такого подхода в том, что разработчики PFX не хотят, чтобы Вы игнорировали исключения - пока Вы не подтвердите их каким-то способом, они Вас не накажут завершением программы.

Не обработанное исключение на задаче не приведет к немедленному завершению приложения: вместо этого завершение будет задержано до тех пор, пока сборщик мусора не захватит задачу и не вызовет её "завершатель". Завершение откладывается, потому что нельзя знать точно, хотите ли Вы вызвать Wait или проверить свойство Result или Exception до момента, когда задача будет уничтожена сборщиком мусора. Эта задержка может иногда ввести Вас в заблуждение по поводу источника ошибки (хотя отладчик Visual Studio может помочь, если Вы разрешите точку останова на исключениях first-chance).

Как мы увидим, альтернативная стратегия разборок с исключениями состоит в использовании продолжений (continuations).

Отмена задач. Вы можете опционально передать маркер отмены (cancellation token), когда запускаете задачу. Это позволит Вам отменить задачи через шаблон совместной отмены (cooperative cancellation), как описывалось ранее:

var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
 
Task task = Task.Factory.StartNew (() => 
{
   // Тут что-то делаем...
   token.ThrowIfCancellationRequested();     // Проверка запроса на отмену
   // Тут что-то делаем...
}, token);
...
cancelSource.Cancel();

Чтобы детектировать отмененную задачу, захватите (catch) исключение AggregateException, и проверьте внутреннее исключение следующим образом:

try 
{
   task.Wait();
}
catch (AggregateException ex)
{
   if (ex.InnerException is OperationCanceledException)
      Console.Write ("Задача отменена!");
}

Если Вы хотите явно выбросить исключение OperationCanceledException (вместо вызова token.ThrowIfCancellationRequested), то должны передать маркер отмены в конструктор OperationCanceledException. Если Вы не сделаете этого, задача не закончится со статусом TaskStatus.Canceled, и не сработают продолжения OnlyOnCanceled.

Если задача отменена перед её запуском, то не будет запланировано её выполнение - вместо этого на задаче будет немедленно выброшено исключение OperationCanceledException.

Из-за того, что маркеры отмены (cancellation tokens) распознаются другим API, Вы можете передать их в другие конструкции и отмены будут распространены беспрепятственно:

var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
 
Task task = Task.Factory.StartNew (() =>
{
   // Передача нашего маркера отмены в запрос PLINQ:
   var query = someSequence.AsParallel().WithCancellation (token)...
   ... перечисление запроса ...
});

Вызов Cancel на cancelSource в этом примере отменит запрос PLINQ, который выбросит исключение OperationCanceledException на теле задачи, которое тогда отменит задачу.

Маркеры отмены, которые Вы можете передать в такие методы, как Wait и CancelAndWait, позволяют Вам отменить операцию ожидания, но не саму задачу.

Продолжения (continuations). Иногда полезно запустить задачу сразу после того, как другая завершится (или в ней произойдет ошибка/исключение). Метод ContinueWith в классе Task делает именно это:

Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));

Как только task1 (предшествующая задача) завершится, потерпит сбой или будет отменена, автоматически запустится task2 (задача продолжения, continuation). Если task1 завершится перед тем, как запустится вторая строка, то выполнение task2 будет запланировано на выполнение правильным способом. Аргумент ant, переданный в lambda-выражение задачи продолжения, является ссылкой на предшествующую задачу.

Наш пример продемонстрировал простейший вид продолжения, и функционально он делает следующее:

Task task = Task.Factory.StartNew (() =>
{
   Console.Write ("antecedent..");
   Console.Write ("..continuation");
});

Однако алгоритм, основанный на продолжении, более гибкий, потому что Вы могли бы сначала ожидать на task1, и затем ожидать на task2. Это в частности полезно, если task1 возвращает данные.

Другое (более тонкое) различие в том, что по умолчанию предшествующая задача и задача продолжения могут быть выполнены в разных потоках. Вы можете принудительно запустить их в одном потоке, если укажете TaskContinuationOptions.ExecuteSynchronously, когда вызываете ContinueWith: это может улучшить производительность в случае очень частых мелких продолжений путем уменьшения косвенных обращений.

Продолжения и Task< TResult>. Как и обычные задачи, продолжения могут быть типа Task< TResult> с возвратом данных. В следующем примере мы вычисляем Math.Sqrt(8*2) с использованием поставленных в цепочку задач, и затем отображаем результат:

Task.Factory.StartNew< int> (() => 8)
   .ContinueWith (ant => ant.Result * 2)
   .ContinueWith (ant => Math.Sqrt (ant.Result))
   .ContinueWith (ant => Console.WriteLine (ant.Result));   // 4

Наш пример возможно излишне упрощен; в реальной жизни эти lambda-выражения могли бы вызывать функции с интенсивными вычислениями.

Продолжения и исключения. Задача продолжения (continuation) может узнать, произошло ли исключение в через свойство Exception предшествующей задачи (antecedent task). Следующий код запишет в консоль подробности исключения NullReferenceException:

Task task1 = Task.Factory.StartNew (() => { throw null; });
Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception));

Если antecedent-задача выбросила исключение continuation-не опросила свойство Exception antecedent-задачи (и на antecedent-задаче нет ожидания), то исключение считается не обработанным и приложение завершится по ошибке (если не будет обработано исключение через TaskScheduler.UnobservedTaskException).

Безопасный шаблон - перебросить исключения antecedent-задачи. Пока происходит ожидание на continuation-задаче, исключение будет распространено и переброшено на ожидающий код:

Task continuation = Task.Factory.StartNew     (()  => { throw null; })
                                .ContinueWith (ant =>
   {
      if (ant.Exception != null) throw ant.Exception;    // Продолжение обработки...
   });
 
continuation.Wait();    // Исключение теперь перебрасывается обратно в вызывающий код

Другой способ разобраться с исключениями - указать другие continuation-задачи для результатов с исключениями, отличающиеся от continuation-задач для нормального завершения. Это решается через TaskContinuationOptions:

Task task1 = Task.Factory.StartNew (() => { throw null; });
 
Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
                                 TaskContinuationOptions.OnlyOnFaulted);
 
Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
                              TaskContinuationOptions.NotOnFaulted);

Как мы увидим позже, этот шаблон в частности полезен совместно с дочерними задачами.

Следующий метод расширения "глотает" не обработанные исключения задачи:

public static void IgnoreExceptions (this Task task)
{
   task.ContinueWith (t => { var ignore = t.Exception; },
      TaskContinuationOptions.OnlyOnFaulted);

Вот, как это использовалось бы (это можно было бы улучшить путем добавления кода для записи в лог информации исключения):

Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();

Продолжения и дочерние задачи. Мощная функция continuation-задач в том, что они начинаются только тогда, когда все дочерние задачи завершились. Начиная с этого места любые выброшенные дочерними задачами исключения перенаправляются в continuation-задачу.

В следующем примере мы запустим 3 дочерние задачи, каждая из них выбрасывает NullReferenceException. Затем мы перехватываем эти все исключения за один раз через continuation родительской задачи:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew (() =>
{
   Task.Factory.StartNew (() => { throw null; }, atp);
   Task.Factory.StartNew (() => { throw null; }, atp);
   Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),
                    TaskContinuationOptions.OnlyOnFaulted);

CSharp Threading Continuations

Продолжения по условию. По умолчанию запуск задачи-продолжения планируется безусловно - всякий раз, когда предшествующая задача завершается, выбрасывает исключение или отменяется. Вы можете изменить это поведение установкой (комбинируемых по ИЛИ) флагов, находящихся в перечислении TaskContinuationOptions. Вот 3 основных флага, управляющих продолжением по условию (conditional continuation):

NotOnRanToCompletion = 0x10000,
NotOnFaulted = 0x20000,
NotOnCanceled = 0x40000,

Эти флаги "отнимающие" в том смысле, что чем больше Вы их применяете, тем меньше возможности у continuation-задачи выполниться. Для удобства вот также следующие заранее скомбинированные значения:

OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled,
OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled,
OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted

Примечание: комбинирование всех Not* флагов (NotOnRanToCompletion, NotOnFaulted, NotOnCanceled) бессмысленно, потому что в результате continuation всегда будет отменено.

"RanToCompletion" означает, что запуск будет осуществлен при успешном завершении antecedent-задачи, т. е. когда не было отмены (cancellation) или не обработанных исключений.

"Faulted" означает, что на antecedent-задаче произошло не обработанное исключение.

"Canceled" означает одно из двух случаев:

• Задача antecedent была отменена (canceled) через её маркер отмены (cancellation token). Другими словами, было выброшено исключение внутри antecedent-задачи - чье свойство CancellationToken совпало переданному в antecedent-задачу, когда она была запущена.
• Задача antecedent была неявно отменена из-за того, что она не удовлетворила утверждению продолжения по условию (conditional continuation predicate).

Важно осознавать, что когда continuation не выполняется на основании этих флагов, то continuation не забыто и не отброшено, оно отменено (canceled). Это означает, что любые продолжения на самом continuation затем будут запущены - кроме случая, когда Вы применили NotOnCanceled. Например, предположим следующее:

Task t1 = Task.Factory.StartNew (...);
 
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                              TaskContinuationOptions.OnlyOnFaulted);
 
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));

Как здесь установлено, t3 всегда запланировано на выполнение - даже если t1 выбросит исключение. Это потому, что если t1 успешно завершится, то задача fault будет отменена, и не будет ограничений на продолжение для t3, и t3 запустится без условия.

CSharp Threading ConditionalContinuations

Если мы хотим выполнить t3 только если fault реально запустится, то вместо этого нужно написать следующее:

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
                              TaskContinuationOptions.NotOnCanceled);

Примечание: альтернативно мы могли бы указать OnlyOnRanToCompletion; разница будет в том, что t3 тогда не запустилась бы, если было бы выброшено исключение в задаче fault.

Продолжения с несколькими предшественниками. Другая полезная особенность продолжений - можно запланировать выполнение continuation-задач, базируясь на завершении нескольких предшествующих (antecedent) задач. ContinueWhenAll планирует выполнение, когда все antecedent-задачи были завершены; ContinueWhenAny планирует выполнение, когда одна antecedent-задача завершилась. Оба метода определены в классе TaskFactory:

var task1 = Task.Factory.StartNew (() => Console.Write ("X"));
var task2 = Task.Factory.StartNew (() => Console.Write ("Y"));
 
var continuation = Task.Factory.ContinueWhenAll (
   new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));

Этот пример кода выведет "Done" после вывода "XY" или "YX". Аргумент tasks в lambda-выражении дает Вам доступ к массиву завершенных задач, что полезно, когда antecedent-задачи возвращают данные. Следующий пример складывает вместе числа, возвращенные из двух antecedent-задач:

// В реальной жизни task1 и task2 вызывали сложные функции:
Task<int> task1 = Task.Factory.StartNew (() => 123);
Task<int> task2 = Task.Factory.StartNew (() => 456);
 
Task<int> task3 = Task<int>.Factory.ContinueWhenAll (
   new[] { task1, task2 }, tasks => tasks.Sum (t => t.Result));
 
Console.WriteLine (task3.Result);         // 579

В этом примере мы включали аргумент типа < int> для вызова Task.Factory, чтобы разъяснить, что мы получаем обычную фабрику задачи (generic task factory). Аргумент типа не обязателен, потому что он будет выведен компилятором.

Несколько продолжений на одном предшественнике. Вызов ContinueWith больше одного раза на одной и той же задаче создает несколько продолжений (continuation-задач) на одном предшественнике (antecedent-задаче). Когда antecedent-задача завершится, то все continuation-задачи запустятся вместе (если Вы не указали TaskContinuationOptions.ExecuteSynchronously, в этом случае continuation-задачи будут выполняться последовательно).

Следующий код ждет 1 секунду, и затем выведет либо "XY", либо "YX":

var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
t.ContinueWith (ant => Console.Write ("X"));
t.ContinueWith (ant => Console.Write ("Y"));

Планировщики задач и интерфейс пользователя. Планировщик задач (scheduler) выделяет задачи потокам. Все задачи связаны с планировщиком задачи, представленным абстрактным классом TaskScheduler. Framework предоставляет две конкретные реализации: планировщик по умолчанию (default scheduler), который работает в тандеме с пулом потоков CLR, и планировщиком контекста синхронизации (synchronization context scheduler). Последний разработан (главным образом), чтобы помочь Вам с моделью потоков WPF и Windows Forms, которые требуют, чтобы к элементам графического интерфейса пользователя (UI) и органам его управления (controls) доступ осуществлялся только из создавшего их потока. Например предположим, что мы хотим получить некие данные из веб-сервиса в виде фонового действия, и затем обновить метку WPF с именем lblResult, отражающую результат этого действия. Мы можем поделить эту работу на 2 задачи (task):

1. Вызвать метод, чтобы получить данные из web-сервиса (предшествующая задача, antecedent task).
2. Обновить lblResult полученными результатами (задача продолжения, continuation task).

Если для задачи продолжения (continuation task) указываем планировщик контекста синхронизации, полученный, когда было сконструировано окно, мы можем безопасно обновить lblResult:

public partial class MyWindow : Window
{
   TaskScheduler _uiScheduler;   // Декларируем как поле, так что мы можем
                                 // использовать это всюду в нашем классе.
   public MyWindow()
   {    
      InitializeComponent();
 
      // Получение планировщика UI для потока, который создал форму:
      _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
 
      Task.Factory.StartNew< string> (SomeComplexWebService)
         .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
   }
 
   string SomeComplexWebService() { ... }
}

Также есть возможность написать свой собственный планировщик задач (через подкласс TaskScheduler), хотя это нечто такое, что Вы делали бы только в очень специализированных сценариях. Для пользовательского планирования Вы чаще всего использовали бы TaskCompletionSource, что мы скоро рассмотрим.

TaskFactory. Когда Вы вызываете Task.Factory, то создаете статическое свойство на Task, которое возвращает объект TaskFactory по умолчанию. Назначение task factory - создавать задачи, особенно три вида задач:

• "Обычные" задачи (через StartNew).
• Continuation-задачи с несколькими задачами предшественниками (через ContinueWhenAll и ContinueWhenAny).
• Задачи, которые обертывают методы, которые следуют за моделью асинхронного программирования (через FromAsync).

Интересно, что TaskFactory является единственным способом достичь последних двух целей. В случае StartNew это TaskFactory - просто удобство и технически избыточно, Вы можете просто инстанциировать объекты Task и вызвать на них Start.

Создание своих собственных task factorу. TaskFactory не абстрактная фабрика: Вы можете в действительности инстациировать этот класс, и это полезно, когда Вы хотите постоянно создавать задачи, используя те же самые (не стандартные) значения для TaskCreationOptions, TaskContinuationOptions или TaskScheduler. Например, если мы хотим неоднократно создавать долго работающие порожденные задачи, то мы могли бы создать пользовательскую фабрику следующим образом:

var factory = new TaskFactory (
   TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
   TaskContinuationOptions.None);

Создание задач тогда просто делается вызовом StartNew на фабрике:

Task task1 = factory.StartNew (Method1);
Task task2 = factory.StartNew (Method2);
...

Пользовательские опции продолжения применяются, когда вызывается ContinueWhenAll и ContinueWhenAny.

TaskCompletionSource. Класс Task достигает двух разных вещей:

• Планирует запуск делегата на потоке из пула.
• Предоставляет богатый набор функций для управления рабочими элементами (продолжения, дочерние задачи, перенаправление исключения и т. д.).

Интересно, что эти две вещи не сильно связаны: Вы можете задействовать функции задачи для управления рабочими элементами без планирования запуска чего-либо на пуле потоков. Класс, который включает этот шаблон использования, называется TaskCompletionSource.

Чтобы использовать TaskCompletionSource, Вы просто инстанциируете этот класс. Он предоставляет свойство Task, возвращающее задачу, на которой Вы можете ждать и подключать к ней продолжения - точно так же, как и с любой другой задачей. Однако задача полностью управляется объектом TaskCompletionSource через следующие методы:

public class TaskCompletionSource< TResult>
{
   public void SetResult (TResult result);
   public void SetException (Exception exception);
   public void SetCanceled();
 
   public bool TrySetResult (TResult result);
   public bool TrySetException (Exception exception);
   public bool TrySetCanceled();
   ...
}

При вызове больше одного раза SetResult, SetException или SetCanceled выбросят исключение; методы Try* вместо этого вернут false.

TResult соответствует типу результата задачи, так что TaskCompletionSource< int> даст Вам Task< int>. Если Вы хотите иметь задачу без результата, создайте объект TaskCompletionSource и передайте null, когда вызываете SetResult. Тогда Вы можете привести тип Task< object> к типу Task.

Следующий пример напечатает 123 после ожидания 5 секунд:

var source = new TaskCompletionSource<int>();
 
new Thread (() => { Thread.Sleep (5000); source.SetResult (123); })
   .Start();
 
Task< int> task = source.Task;      // Наша "подчиненная" задача.
Console.WriteLine (task.Result);    // 123

Позже мы покажем, как можно использовать BlockingCollection для написания очереди producer/consumer. Тогда мы продемонстрируем, как TaskCompletionSource улучшает решение, позволяя поставленным в очередь рабочим элементам осуществлять ожидание и отмену.

[Работа с AggregateException]

Как мы виделиPLINQ, класс Parallel и Tasks автоматически перенаправляют исключения потребителю данных (consumer). Чтобы увидеть, почему это важно, рассмотрим следующий запрос LINQ, который выбрасывает исключение DivideByZeroException в первой итерации:

try
{
   var query = from i in Enumerable.Range (0, 1000000)
               select 100 / i;
   ...
}
catch (DivideByZeroException)
{
   ...
}

Если мы будем использовать PLINQ для распараллеливания этого запроса с игнорированием обработки исключений, то DivideByZeroException вероятно было бы выброшено на отдельном потоке с пропуском нашего catch-блока, что привело бы к падению приложения.

Следовательно, исключения автоматически перехватываются и перенаправляются в вызывающий код. Но к сожалению, это не совсем так же просто, как перехват DivideByZeroException. Причина в том, что эти библиотеки задействуют многие потоки, и в действительности может быть так, что два или большее количество исключений могут быть выброшены одновременно. Для гарантии, что о всех исключениях будет сообщено, исключения обертываются в контейнер AggregateException, который предоставляет свойство InnerExceptions, содержащее перехваченное исключение (или исключения):

try
{
   var query = from i in ParallelEnumerable.Range (0, 1000000)
               select 100 / i;
   // Перечисление запроса
   ...
}
catch (AggregateException aex)
{
   foreach (Exception ex in aex.InnerExceptions)
      Console.WriteLine (ex.Message);
}

И PLINQ, и класс Parallel завершают запрос или выполнение цикла, когда встретится первое исключение - при этом не будут обработаны какие-либо другие элементы или прокрутки тела цикла. Однако может быть выброшено большее количество исключений перед завершением текущего цикла. Первое возникшее исключение AggregateException будет выдано в свойстве InnerException.

Flatten и Handle. Класс AggregateException предоставляет несколько методов для упрощения обработки исключения: Flatten и Handle.

Flatten. AggregateExceptions будут довольно часто содержать другие AggregateExceptions. Пример, когда это может случиться - если дочерняя задач выбросит исключение. Вы можете исключить любой уровень вложенности для упрощения обработки путем вызова Flatten. Этот метод вернет новый экземпляр AggregateException с простым плоским списком внутренних исключений:

catch (AggregateException aex)
{
   foreach (Exception ex in aex.Flatten().InnerExceptions)
      myLogWriter.LogException (ex);
}

Handle. Иногда полезно перехватывать только определенные типы исключений, чтобы другие типы выбрасывались заново. Метод Handle в AggregateException предоставляет ярлычок для этой функции. Он принимает предикат исключения, запускаемый с каждым внутренним исключением:

public void Handle (Func< Exception, bool> predicate)

Если этот предикат вернет true, то считается, что исключение "обработано". После того, как делегат запустится на каждом исключении, произойдет следующее:

• Если все исключения были "обработаны" (делегат вернул true), исключение не будет выброшено повторно.
• Если были какие-либо исключения, для которых делегат вернул false ("не обработано"), новый AggregateException будет построен из этих исключений, и они будут повторно выброшены.

Например, следующий код завершится повторным выбрасыванием другого AggregateException, которое содержит одно исключение NullReferenceException:

var parent = Task.Factory.StartNew (() => 
{
   // Выбросим 3 исключения сразу с использованием 3 дочерних задач:
   int[] numbers = { 0 };
 
   var childFactory = new TaskFactory
      (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
 
   childFactory.StartNew (() => 5 / numbers[0]);   // Деление на 0
   childFactory.StartNew (() => numbers [1]);      // Индекс вне диапазона
   childFactory.StartNew (() => { throw null; });  // Null-ссылка
});
 
try { parent.Wait(); }
catch (AggregateException aex)
{
   // Обратите внимание, что нам все еще нужно вызвать Flatten:
   aex.Flatten().Handle (ex =>
   {
      if (ex is DivideByZeroException)
      {
         Console.WriteLine ("Divide by zero");
         return true;               // Это исключение "обработано"
      }
      if (ex is IndexOutOfRangeException)
      {
         Console.WriteLine ("Index out of range");
         return true;               // Это исключение "обработано"
      }
      return false;           // Все другие исключения будут выброшены повторно
  });
}

[Конкурентные коллекции]

Framework 4.0 предоставляет несколько новых коллекций в пространстве имен System.Collections.Concurrent. Все они полностью безопасны для использования с потоками:

Конкурентная коллекция Не конкурентный эквивалент
ConcurrentStack< T> Stack< T>
ConcurrentQueue< T> Queue< T>
ConcurrentBag< T> нет аналога
BlockingCollection< T> нет аналога
ConcurrentDictionary< TKey,TValue> Dictionary< TKey,TValue>

Эти конкурентные коллекции могут быть иногда полезны в обычной многопоточности, когда Вам нужна thread-safe коллекция. Однако тут есть некоторые подводные камни:

• Конкурентные коллекции настроены на параллельное программирование. Стандартные коллекции выигрывают у них почти всегда, кроме очень конкурентных сценариев.
• Thread-safe коллекция не гарантирует, что использующий её код будет thread-safe.
• Если Вы делаете перечисление по конкурентной коллекции, в то время как другой поток её модифицирует, то не будет выброшено исключение. Вместо этого Вы получите смесь из старого и нового содержимого.
• Нет конкурентной версии для List< T>.
• Конкурентные классы стека, очереди и Bag внутренне реализованы как связанные списки. Это делает их не настолько эффективными по использованию памяти, как не конкурентные классы Stack и Queue, но лучше для конкурентного доступа, потому что связанные списки способствуют реализациям без блокировки или реализациям с минимумом блокировок (причина в том, что вставка узла в связанный список требует обновление нескольких ссылок, вто время как вставка элемента в структуру наподобие List< T> может потребовать перемещения в памяти тысяч существующих элементов.

Другими словами, эти коллекции просто не обеспечивают ярлыки для использования обычной коллекции с блокировкой. Для демонстрации, если мы запустим следующий код в одном потоке:

var d = new ConcurrentDictionary< int,int>();
for (int i = 0; i < 1000000; i++) d[i] = 123;

то он выполнится в 3 раза медленнее, чем этот код:

var d = new Dictionary< int,int>();
for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123;

Однако чтение из ConcurrentDictionary будет быстрое, потому что чтение работает без блокировки.

Конкурентные коллекции также отличаются от обычных коллекций тем, что предоставляют специальные методы для выполнения атомарных операций проверка-и-действие, такие как TryPop. Большинство этих методов унифицированы через интерфейс IProducerConsumerCollection< T>.

IProducerConsumerCollection< T>. Коллекция producer/consumer это одна из коллекций, для которых существует два основных применения:

• Добавление элемента (функция "producer").
• Получение элемента с удалением его (функция "consumer").

Классические примеры - стеки и очереди. Коллекции producer/consumer важны в параллельном программировании, потому что они способствуют реализациям без блокировок.

Интерфейс IProducerConsumerCollection< T> представляет потокобезопасную коллекцию producer/consumer. Следующие классы реализуют этот интерфейс:

ConcurrentStack< T>
ConcurrentQueue< T>
ConcurrentBag< T>

IProducerConsumerCollection< T> расширяет ICollection, добавляя следующие методы:

void CopyTo (T[] array, int index);
T[] ToArray();
bool TryAdd (T item);
bool TryTake (out T item);

Методы TryAdd и TryTake проверяют, может ли быть выполнена операция добавление/удаление, и если так, что выполнит добавление/удаление. Проверка и действие выполняются атомарно, с устранением надобности в блокировке, которая была бы вокруг обычной коллекции:

int result;
lock (myStack) if (myStack.Count > 0) result = myStack.Pop();

TryTake вернет false, если коллекция пустая. TryAdd всегда выполнится успешно и вернет true в трех предоставленных реализациях. Однако если Вы напишете свою собственную коллекцию, которая запрещает дубликаты, то заставили бы TryAdd вернуть false, если элемент уже существует (если бы Вы писали конкурентный набор).

Определенный элемент, который удаляет TryTake, определяется через подкласс:

• Со стеком TryTake удаляет тот элемент, который был добавлен самым последним.
• С очередью TryTake удаляет тот элемент, который был добавлен самым первым.
• Вместе с bag TryTake удаляет любой элемент, который может быть удален максимально эффективно.

Эти три конкретные класса часто явно реализуют методы TryTake и TryAdd, предоставляя тот же функционал через такие более специфические публичные методы, как TryDequeue и TryPop.

ConcurrentBag< T>. ConcurrentBag< T> сохраняет не упорядоченную коллекцию объектов (с разрешенными дубликатами). ConcurrentBag< T> подходит для ситуаций, когда Вам не нужно заботиться о том, какой элемент Вы получите при вызове Take или TryTake.

Преимущество ConcurrentBag< T> над конкурентной очередью или стеком в том, что метод Add почти никогда не приводит к состязанию при вызове более одного раза. В отличие от этого вызов Add параллельно в очереди или в стеке вводит некое состязание (хотя меньшее, чем блокировка вокруг не конкурентных коллекций). Вызов Take на конкурентном bag также очень эффективно - пока каждый поток не берет больше элементов, чем было добавлено.

Внутри конкурентного bag каждый поток получает свой собственный приватный связанный список. Элементы, добавленные к этому приватному списку, которые принадлежат потоку, вызвавшему Add, устраняют состязание. Когда Вы перечисляете содержимое bag, перечислитель прогуливается через приватный список каждого потока, уступая каждый из их элементов поочередно.

Когда Вы вызываете Take, bag сначала смотрит на приватный список текущего потока. Если в нем как минимум один элемент, он легко может выполнить задачу, и (в большинстве случаев) без состязания. Но если этот список пуст, он должен "украсть" элемент из приватного списка другого потока, что потенциально может привести к состязанию.

Таким образом, для точности, вызов Take даст нам элемент, добавленный последний раз в этом потоке; если нет элементов в этом потоке, то он даст Вам элемент, добавленным последним в другом потоке, выбранном случайно.

Конкурентные bag идеальный выбор, когда параллельная операция над Вашей коллекцией чаще всего состоит в добавлении элементов - или когда добавления (Add) и взятия (Take) на потоке сбалансированы. Мы ранее видели подобный пример, когда использовали Parallel.ForEach для реализации параллельной проверки орфографии:

var misspellings = new ConcurrentBag< Tuple< int,string>>();
 
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
   if (!wordLookup.Contains (word))
      misspellings.Add (Tuple.Create ((int) i, word));
});

Конкурентный bag был бы плохим выбором для очереди producer/consumer, потому что элементы добавляются и удаляются в разных потоках.

BlockingCollection< T>. Если Вы вызовите TryTake на любой коллекции producer/consumer, которые мы обсуждали ранее:

ConcurrentStack< T>
ConcurrentQueue< T>
ConcurrentBag< T>

и коллекция пустая, то метод вернет false. Иногда было бы полезнее в таком сценарии ждать, пока элемент не станет доступным.

Вместо перегрузки методов TryTake с такой функциональностью (которая вызвала бы вызвала бы прорыв участников после разрешения маркеров отмени и таймаутов), разработчики PFX инкапсулировали этот функционал в оборачивающий класс BlockingCollection< T>. Блокирующая коллекция обертывает любую коллекцию, которая реализует IProducerConsumerCollection< T>, и позволяет Вам взять (Take) элемент из обернутой коллекции - с блокировкой, если нет доступных элементов.

Блокирующая коллекция также позволяет Вам ограничить локальный размер коллекции, блокируя продюсера, если превышен этот размер. Коллекция, ограниченная таким способа, называется набором, ограниченным блокировкой (bounded blocking collection).

Чтобы использовать BlockingCollection< T>:

1. Инстанциируйте класс, опционально указывая IProducerConsumerCollection< T> для обертки и максимальный размер (границу) коллекции.
2. Вызовите Add или TryAdd, чтобы добавить элементы в нижележащую коллекцию.
3. Вызовите Take или TryTake для удаления (потребления) элементов из нижележащей коллекции.

Если Вы вызовите конструктор без передачи коллекции, то этот класс автоматически инстанциирует ConcurrentQueue< T>. Методы генерации (producing) и потребления (consuming) позволят Вам указать маркеры отмены (cancellation tokens) и таймауты. Add и TryAdd могут вызвать блокировку, если размер коллекции ограничен; Take и TryTake будут вызывать блокировку, пока коллекция пуста.

Другой способ потреблять элементы - вызвать GetConsumingEnumerable. Это вернет (потенциально) бесконечную последовательность, которая выдает элементы по мере их доступности. Вы можете принудить последовательность завершиться вызовом CompleteAdding: этот метод также предотвратит от постановки в очередь дальнейших элементов.

Ранее мы написали очередь producer/consumer с использованием Wait и Pulse. Вот тот же самый класс, переработанный для использования BlockingCollection< T> (обработка исключений оставлена в стороне):

public class PCQueue : IDisposable
{
   BlockingCollection _taskQ = new BlockingCollection(); 
   public PCQueue (int workerCount)
   {
      // Создание и запуск отдельной задачи (Task) для каждого consumer:
      for (int i = 0; i < workerCount; i++)
         Task.Factory.StartNew (Consume);
   }
 
   public void Dispose() { _taskQ.CompleteAdding(); }
 
   public void EnqueueTask (Action action) { _taskQ.Add (action); }
 
   void Consume()
   {
      // Эта последовательность, которую мы перечисляем, блокируется,
      // когда нет доступных элементов, и её мы завершим вызовом CompleteAdding.
      foreach (Action action in _taskQ.GetConsumingEnumerable())
         action();      // Выполнение задачи.
   }
}

Из-за того, что мы ничего не передали в конструктор BlockingCollection, автоматически инстанциируется конкурентная очередь. Если бы мы передали ConcurrentStack, то завершились бы стеком producer/consumer.

BlockingCollection также предоставляет статические методы AddToAny и TakeFromAny, которые позволяют нам добавлять или получать элемент при указании нескольких блокирующих коллекций. Действие action тогда сработает на первой коллекции, которая в состоянии обслужить запрос.

Эффективное использование TaskCompletionSource. Пример producer/consumer, которые только что написали, недостаточно гибкий в том, что мы не можем отслеживать рабочие элементы после того, как они поставлены в очередь. Было бы хорошо, если бы мы могли:

• Узнать, когда рабочий элемент завершен.
• Отмена не запустившихся рабочих элементов.
• Элегантно обработать любые исключения, выбрасываемые рабочим элементом.

Идеальное решение состояло бы в возврате методом EnqueueTask некоторого объекта, который давал бы этот описанный функционал. Хорошая новость в том, что такой класс есть, и это все он уже делает - класс Task. Все, что нам нужно - перехватить управление задачей через TaskCompletionSource:

public class PCQueue : IDisposable
{
   class WorkItem
   {
      public readonly TaskCompletionSource< object> TaskSource;
      public readonly Action Action;
      public readonly CancellationToken? CancelToken;
 
      public WorkItem (
         TaskCompletionSource< object> taskSource,
         Action action,
         CancellationToken? cancelToken)
      {
         TaskSource = taskSource;
         Action = action;
         CancelToken = cancelToken;
      }
   }
 
   BlockingCollection< WorkItem> _taskQ = new BlockingCollection< WorkItem>();
 
   public PCQueue (int workerCount)
   {
      // Создание и запуск отдельной Task для каждого потребителя (consumer):
      for (int i = 0; i < workerCount; i++)
         Task.Factory.StartNew (Consume);
   }
 
   public void Dispose() { _taskQ.CompleteAdding(); }
 
   public Task EnqueueTask (Action action) 
   {
      return EnqueueTask (action, null);
   }
 
   public Task EnqueueTask (Action action, CancellationToken? cancelToken)
   {
      var tcs = new TaskCompletionSource< object>();
      _taskQ.Add (new WorkItem (tcs, action, cancelToken));
      return tcs.Task;
   }
 
   void Consume()
   {
      foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
      {
         if (workItem.CancelToken.HasValue && 
            workItem.CancelToken.Value.IsCancellationRequested)
         {
            workItem.TaskSource.SetCanceled();
         }
         else
         try
         {
            workItem.Action();
            workItem.TaskSource.SetResult (null);  // Показывает завершение
         }
         catch (OperationCanceledException ex)
         {
            if (ex.CancellationToken == workItem.CancelToken)
               workItem.TaskSource.SetCanceled();
            else
               workItem.TaskSource.SetException (ex);
         }
         catch (Exception ex)
         {
            workItem.TaskSource.SetException (ex);
         }
      }
   }
}

В EnqueueTask мы ставим в очередь рабочий элемент, который инкапсулирует целевой делегат и источник завершения задачи - который позволяет нам позже управлять задачей, которую мы возвращаем потребителю (consumer).

В Consume мы сначала проверяем, была ли задача (task) отменена после исключения из очереди рабочего элемента. Если нет, то мы запускаем делегата и затем вызываем SetResult на источнике завершения задачи, чтобы показать её завершение.

Теперь мы можем использовать этот класс:

var pcQ = new PCQueue (1);
Task task = pcQ.EnqueueTask (() => Console.WriteLine ("Easy!"));
...

Мы сейчас можем ждать задачу, выполнять на ней продолжения, распространять исключения на продолжения родительских задач, и так далее. Другими словами, у нас есть богатство модели задачи с эффектом реализации собственного планировщика.

[SpinLock и SpinWait]

В параллельном программировании частный случай прокруток часто предпочтительнее блокировки, так как это позволяет избежать переключения контекста и переходы состояния ядра. SpinLock и SpinWait разработаны для помощи в таких случаях. Их главное назначение - написание пользовательских конструкций синхронизации.

SpinLock и SpinWait это структуры, а не классы! Это дизайнерское решение относится к экстремальной технике оптимизации, предназначенной для того, чтобы избежать накладных расходов косвенной адресации и сборки мусора. Это означает, что Вы должны остерегаться неумышленно копировать экземпляры - передавая их, например, в другой метод без модификатора ссылки (ref), или декларируя их как поля только для чтения (readonly). Это в частности важно для случая SpinLock.

SpinLock. Эта структура позволяет Вам выполнить блокировку без трат на переключение контекста, ценой оставления потока в циклах (бесполезная занятость). Этот метод допустим в сценариях высокой конкуренции, когда блокировка будет очень короткой (например при написании с нуля потокобезопасных связанных списков).

Если Вы оставите spinlock на удержании слишком долго (самое большее на миллисекунду), это сделает её время сравнимым со слайсом времени, порождая переключение контекста точно так же, как это было бы с обычной блокировкой. Когда будет запланировано новое выполнение этого кода планировщиком, этот код снова уступит управление - получится продолжающийся "цикл уступок". Это будет потреблять гораздо меньше ресурсов CPU, чем прямая прокрутка - но больше, чем блокирование.

На одноядерной машине spinlock начнет "цикл уступок" немедленно, если имеет место спор за ресурс.

Использование SpinLock происходит подобно использованию обычной блокировки, кроме:

• Spinlock это структура (как было упомянуто ранее).
• Spinlock не реентрантна. Это означает, что Вы не можете вызвать Enter на одной и той же SpinLock дважды подряд в одном и том же потоке. Если нарушить это правило, то либо будет выброшено исключение (эли это разрешено отслеживающим владельцем), либо произойдет deadlock (если отслеживание владельцем запрещено). Вы можете указать, разрешать ли отслеживание владельцем, когда конструируете spinlock. Отслеживание ухудшает производительность.
• SpinLock позволяет Вам опрашивать, взята ли блокировка, через свойство IsHeld, и если отслеживание владельцем разрешено, через IsHeldByCurrentThread.
• Нет эквивалента оператору lock языка C#, чтобы предоставить синтаксический сахар для SpinLock.

Другое отличие в том, что когда Вы вызвали Enter, то должны следовать шаблону устойчивости с предоставлением аргумента lockTaken (который почти всегда делается с блоком try/finally).

Вот пример:

var spinLock = new SpinLock (true);    // Разрешено отслеживание владельцем
bool lockTaken = false;
try
{
   spinLock.Enter (ref lockTaken);
   // Тут какие-то действия...
}
finally
{
   if (lockTaken) spinLock.Exit();
}

Как с обычной блокировкой lock, значение lockTaken будет false после вызова Enter если (и только если) метод Enter выбросит исключение и блокировка не была взята. Это происходит в очень редких сценариях (таких как вызов Abort на потоке, или когда выброшено исключение OutOfMemoryException), и позволит Вам надежно узнать, вызван ли был впоследствии Exit.

SpinLock также предоставляет метод TryEnter, который принимает таймаут.

Учитывая неловкую семантику SpinLock из-за его типа значения и отсутствие поддержки со стороны языка, создается впечатление, чтобы мы страдали каждый раз при использовании SpinLock! Дважды подумайте, прежде чем отказываться от обычной блокировки.

SpinLock имеет смысл чаще всего при написании Ваших собственных конструкций синхронизации. И даже тогда spinlock не настолько полезен, как это звучит. Он все еще ограничивает конкуренцию, и зря тратит время CPU, не делая ничего полезного. Часто для лучший выбор использовать это время, делая что-то спекулятивное - с помощью SpinWait.

SpinWait. Это помогает написать код без блокировки, который вместо этого прокручивает циклы. В результате избегаем опасности исчерпания ресурсов системы и инверсии приоритета, что иначе могло бы возникнуть в случае SpinLock.

Программирование без блокировок с помощью SpinWait довольно трудное, поскольку она предназначена для решения проблем многопоточности, которые не решит никакая другая конструкция более высокого уровня. Но сначала нужно понять, что такое не блокирующая синхронизация [5].

Почему нам нужна SpinWait. Предположим, что мы пишем систему сигнализации на простом флаге, основанную на цикле:

bool _proceed;
void Test()
{
   // Цикл, пока другой поток не установит _proceed в значение true:
   while (!_proceed) Thread.MemoryBarrier();
   ...
}

Это всегда будет очень эффективно, если Test запустится, когда _proceed уже был установлен в true, или установка _proceed произойдет быстро, в течение нескольких циклов. Но теперь предположим, что _proceed остался в значении false на несколько секунд - и 4 потока вызвали Test одновременно. Тогда эти циклы полностью забьют процессорное время четырехядерного CPU! Это приведет к тому, что другие потоки будут работать очень медленно (нехватка ресурсов системы) - включая тот самый поток, который должен был бы установить _proceed в true (инверсия приоритета). Эта ситуация обостряется на одноядерных машинах, где прокрутки почти всегда приведут к смене приоритетов (и хотя сегодня одноядерные машины встречаются редко, одноядерные виртуальные машины обычный случай).

SpinWait решает эти проблемы двумя способами. Во-первых, она ограничивает интенсивную трату ресурсов CPU на прокрутки цикла путем установки количества итераций, после которых слайс времени будет уступаться после каждой прокрутки (вызовом Thread.Yield и Thread.Sleep), снижая потребление процессорного времени. Во-вторых, она определяет, работает ли программа на одноядерной машине, и если это так, то уступка происходит на каждом цикле.

Как использовать SpinWait. Есть для этого два способа. Первый заключается в вызове статического метода SpinUntil. Этот метод принимает предикат (и опционально таймаут):

bool _proceed;
void Test()
{
   SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; });
   ...
}

Другой способ (более гибкий) использования SpinWait - инстанциация структуры и затем вызов SpinOnce в цикле:

bool _proceed;
void Test()
{
   var spinWait = new SpinWait();
   while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); }
   ...
}

Первый способ является ярлычком второго.

Как работает SpinWait. В своей текущей реализации SpinWait выполняет жесткие прокрутки циклов CPU для 10 итераций перед тем, как уступить контекст другим потокам. Однако она не возвращает немедленно управление в вызывающий код после каждого из таких циклов: вместо этого она вызывает Thread.SpinWait чтобы выполнить прокрутку через CLR (и в конечном счете через операционную систему) для установки периода времени. Этот период времени изначально равен несколько десятков наносекунд, но удваивается с каждой итерацией, пока не произойдет 10 итераций. Это гарантирует некую предсказуемость на трате процессорного времени в фазе интенсивных циклов CPU, которую CLR и операционная система может настроить в соответствии с рабочими условиями. Как правило это занимает интервал немногих десятков микросекунд - мало, но больше, чем цена переключение контекста.

На одноядерной машине SpinWait уступает контекст на каждой итерации. Вы можете проверить, уступит ли SpinWait контекст на следующей итерации, путем проверки свойства NextSpinWillYield.

Если SpinWait остается в режиме "цикла с уступками" достаточно долго (возможно 20 циклов), то она будет периодически засыпать на несколько миллисекунд, чтобы дополнительно экономить ресурсы и помочь выполняться другим потокам.

Обновления без блокировки с помощью SpinWait и Interlocked.CompareExchange. SpinWait вместе с Interlocked.CompareExchange может атомарно обновлять поля значением, вычисленным относительно его оригинального значения (операция read-modify-write). Для примера предположим, что мы хотим умножить поле x на 10. Если это сделать напрямую, то это не будет потокобезопасно:

x = x * 10;

По той же причине инкремент поля не будет потокобезопасным, как это уже обсуждалось в описании не блокирующей синхронизации [5].

Вот так это правильно делается без блокировки:

1. Берется снимок x в локальную переменную.
2. Вычисляется новое значение (в этом случае снимок умножается на 10).
3. Записывается вычисленное значение обратно, если снимок все еще актуален  (этот шаг должен быть сделан атомарно вызовом Interlocked.CompareExchange).
4. Если снимок устарел, то возврат к шагу 1.

Например:

int x;
 
void MultiplyXBy (int factor)
{
   var spinWait = new SpinWait();
   while (true)
   {
      int snapshot1 = x;
      Thread.MemoryBarrier();
      int calc = snapshot1 * factor;
      int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1);
      if (snapshot1 == snapshot2) return;    // Никто нас не вытеснял.
      spinWait.SpinOnce();
   }
}

Мы (немного) можем улучшить производительность с помощью вызова Thread.MemoryBarrier. Нам это может сойти с рук, потому что CompareExchange все равно генерирует барьер памяти - так что самое худшее, что может случиться, это дополнительная прокрутка, если окажется, что в snapshot1 прочитано устаревшее на предыдущей итерации значение.

Interlocked.CompareExchange обновляет поле указанным значением, если текущее значение поля совпадает с третьим аргументом. Тогда будет возвращено старое значение поля, так что Вы можете проверить успех обновления путем сравнения с оригинальным snapshot. Если значения различаются, то это означает, что другой поток вытеснил Вас, тогда нужно вернуться в начало и сделать другую попытку модификации значения.

CompareExchange перегружен, чтобы также работать с типом object. Мы можем использовать эту перегрузку путем написания метода обновления без блокировки, который работает со всеми ссылочными типами:

static void LockFreeUpdate< T> (ref T field, Func < T, T> updateFunction)
   where T : class
{
   var spinWait = new SpinWait();
   while (true)
   {
      T snapshot1 = field;
      T calc = updateFunction (snapshot1);
      T snapshot2 = Interlocked.CompareExchange (ref field, calc, snapshot1);
      if (snapshot1 == snapshot2) return;
      spinWait.SpinOnce();
   }
}

Вот так мы можем использовать этот метод, чтобы написать потокобезопасное событие без блокировок (это то, что фактически по умолчанию с событиями делает компилятор C# 4.0):

EventHandler _someDelegate;
public event EventHandler SomeEvent
{
   add    { LockFreeUpdate (ref _someDelegate, d => d + value); }
   remove { LockFreeUpdate (ref _someDelegate, d => d - value); }
}

SpinWait против SpinLock. Мы могли бы решить эти проблемы по-другому, обернув доступ к общему полю вокруг SpinLock. Хотя проблема со spin locking состоит в том, что это позволяет в любой момент времени выполнять только один поток - даже при том, что spinlock (обычно) устраняет накладные расходы на переключение контекста. Вместе со SpinWait мы можем выполнить поток спекулятивно и предположить отсутствие состязания. Если нас вытеснили, то просто попытаемся снова. Трата времени CPU на что-то полезное намного лучше, чем трата его на spinlock!

В завершение рассмотрим следующий класс:

class Test
{
   ProgressStatus _status = new ProgressStatus (0, "Starting");
 
   class ProgressStatus    // Не измененный класс
   {
      public readonly int PercentComplete;
      public readonly string StatusMessage;
 
      public ProgressStatus (int percentComplete, string statusMessage)
      {
         PercentComplete = percentComplete;
         StatusMessage = statusMessage;
      }
   }
}

Мы можем использовать наш метод LockFreeUpdate для "инкремента" поля PercentComplete в _status следующим образом:

LockFreeUpdate (ref _status,
   s => new ProgressStatus (s.PercentComplete + 1, s.StatusMessage));

Обратите внимание, что мы создаем новый объект ProgressStatus на основе существующих значений. Благодаря методу LockFreeUpdate акт чтения существующего значения PercentComplete, его инкремент запись обратно не могут быть вытеснены небезопасно: любое вытеснение будет надежно детектировано, что вызовет прокрутку и повторную попытку.

[Ссылки]

1. Threading in C# PART 5: PARALLEL PROGRAMMING site:albahari.com.
2. Потоки на C#. Часть 1: введение.
3. Потоки на C#. Часть 2: основы синхронизации.
4. Потоки на C#. Часть 3: использование потоков.
5. Потоки на C#. Часть 4: продвинутое использование.
6. The .NET Programmer’s Playground site:linqpad.net.
7. Закон Амдала site:wikipedia.org.

 

Добавить комментарий


Защитный код
Обновить

Top of Page