C#: использование очереди для взаимодействия потоков |
![]() |
Добавил(а) microsin |
В приложении для обеспечения разделения данных и поддержки отзывчивости интерфейса обычно используют несколько потоков. Один из них обрабатывает интерфейс пользователя - клики на кнопках, перерисовку визуальных элементов формы, а другой (такой как например BackgroundWorker [1] или Thread [2]) выполняет фоновые операции - обмен через периферийные устройства, декодирование данных, обработку сетевого трафика и т. п. Для передачи информации между потоками используют стандартные методы ожидания события, такие как мьютексы, семафоры, очереди. В этой статье раccматриваются очереди Queue для организации блокировки потока в ожидании поступления данных. [Класс-обертка для очереди] Для удобства использования был создан класс TQueue на базе класса Queue. Он позволяет создавать очереди из объектов произвольного типа. // Модуль Queue.cs
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Windows.Forms; using System.IO.Ports; using System.Threading; using System.ComponentModel; namespace mynamespace { public class TQueue< T> { private readonly Queue< T> queue = new Queue< T>(); private readonly int maxSize; public TQueue(int maxSize) { this.maxSize = maxSize; } public void Enqueue(T item) { lock (queue) { while (queue.Count >= maxSize) { queue.Clear(); } queue.Enqueue(item); if (queue.Count == 1) { // Разбудить все, что заблокировалось // на очереди: Monitor.PulseAll(queue); } } } public bool Dequeue(out T item, int timeout) { item = default(T); lock (queue) { while (queue.Count == 0) { if (Monitor.Wait(queue, timeout) == false) return false; } item = queue.Dequeue(); if (queue.Count == maxSize - 1) { // Разбудить все, что заблокировалось // на очереди: Monitor.PulseAll(queue); } return true; } } public int Count() { int count; lock (queue) { count = queue.Count; } return (count); } public void Clear() { lock (queue) { queue.Clear(); } } } } Рассмотрим пример, в котором один поток (BackgroundWorker parserTask, находится в классе COMportReader) принимает данные из последовательного порта. Если в начале принятой строки поток находит символ #, то посылает посылает их в очередь из строк StringQueue, которая реализована как публичное свойство класса COMportReader. namespace mynamespace { public class COMportReader { // Сообщения, полученные от COM-порта: public TQueue< string> StringQueue; private BackgroundWorker parserTask; static bool _continue; log logfile; SerialPort _serialPort; public COMportReader(String port) { logfile = new log(this.GetType().ToString(), "logapp.txt"); StringQueue = new TQueue< string>(20); _serialPort = new SerialPort(); _serialPort.PortName = port; _serialPort.BaudRate = 115200; _serialPort.Parity = Parity.None; _serialPort.DataBits = 8; _serialPort.StopBits = StopBits.One; _serialPort.Handshake = Handshake.None; _serialPort.ReadTimeout = 500; _serialPort.WriteTimeout = 500; _serialPort.Encoding = Encoding.Default; _serialPort.NewLine = "\n"; // Парсер - поток, которые получает данные от последовательного порта. parserTask = new BackgroundWorker(); parserTask.DoWork += ReadData; } public bool IsOpen { get { return _serialPort.IsOpen; } } public bool Open() { bool retval = false; try { _serialPort.Open(); _continue = true; if (!parserTask.IsBusy) { parserTask.RunWorkerAsync(); } retval = true; } catch (Exception ex) { logfile.write(ex.Message.ToString()); } return (retval); } public bool Close() { bool retval = false; _continue = false; // Если адаптер был открыт, закроем его if (_serialPort.IsOpen) { lock (_serialPort) { _serialPort.Close(); } } // Если адаптер закрыт, вернем true if (!_serialPort.IsOpen) { retval = true; } return retval; } private void ReadData(object pSender, DoWorkEventArgs pEventArgs) { while (_continue) { String message = ""; char[] charsToTrim = {'\n', ' ', '\r'}; try { message = _serialPort.ReadLine(); message = message.Trim(charsToTrim); char firstsym = message[0]; switch (firstsym) { case '#': StringQueue.Enqueue(message); break; default: logfile.write(message); break; } } catch { } } } } } На этой очереди StringQueue блокируется другой поток BackgroundWorker, метод цикла taskDoWork которого блокируется на очереди StringQueue в ожидании поступления данных. Блокировка на очереди осуществляется вызовом StringQueue.Dequeue. Таймаут блокировки указан 100 миллисекунд. Как только в очереди оказываются новые данные, поток немедленно разблокируется до истечения таймаута. private void taskDoWork(object pSender, DoWorkEventArgs pEventArgs) { String datastr; while (true) { // Блокировка на очереди в течение 100 мс: COMportReader.StringQueue.Dequeue(out datastr, 100); if (null != datastr) { // Обработка данных из очереди, полученные // через строку datastr: ... } } } Класс TQueue позволяет создавать очередь из более сложных элементов. К примеру, в очередь можно помещать экземпляры классов. Ниже показано, как это делается на примере класса DataMessage. // Класс DataMessage:
namespace mynamespace { public enum MessageType { INFO, EVENT, ERR_TIMEOUT, ERR_BAD_CRC, ERR_UNKNOWN } class DataMessage { public DateTime Время; public String Сигнал; public bool РелеАктивировано; public MessageType ТипСообщения; // Конструктор класса, который заполняет свои поля // декодированными данными из datastr: public DataMessage(String datastr) { Время = DateTime.Now; Сигнал = ""; РелеАктивировано = false; ТипСообщения = MessageType.ERR_UNKNOWN; String[] words; switch (datastr[1]) { case 'B': ТипСообщения = MessageType.EVENT; Сигнал = datastr.Substring(3); break; case 'M': ТипСообщения = MessageType.EVENT; if ("UP" == datastr.Substring(3, 2)) { words = datastr.Substring(6).Split(new char[] { '#' }); if ("relayON" == words[0]) { РелеАктивировано = true; } } else if ("DN" == datastr.Substring(3, 2)) { words = datastr.Substring(6).Split(new char[] { '#' }); if ("relayOFF" == words[0]) { РелеАктивировано = false; } } break; case 'E': words = datastr.Substring(6).Split(new char[] { '#' }); if ("TIME" == words[0]) ТипСообщения = MessageType.ERR_TIMEOUT; else if ("CRC" == words[0]) ТипСообщения = MessageType.ERR_BAD_CRC; break; case 'T': ТипСообщения = MessageType.INFO; Сигнал = datastr; break; default: logfile.write("Ошибка декодирования " + datastr); break; } } } } // Создание очереди из классов DataMessage: DecodedMessageQueue = new TQueue< DataMessage>(20); ... // Постановка сообщений в очередь: DataMessage datamessage = new DataMessage(datastr); DecodedMessageQueue.Enqueue(datamessage); // Передача события новых данных datamessage в форму, где
// они будут отображаться на визуальных компонентах
// интерфейса приложения. guiTask.ReportProgress(0); ... // Обработчик прогресса guiTask, который имеет доступ
// в интерфейсу GUI:
void guiTaskProgressHandler(object sender, ProgressChangedEventArgs pEventArgs) { DataMessage datamessage; // Блокировка на очереди в течение 1 секунды: DecodedMessageQueue.Dequeue(out datamessage, 1000); CultureInfo ci = new CultureInfo("en-us"); switch(datamessage.ТипСообщения) { case MessageType.EVENT: if ("" != datamessage.Сигнал) { текст = datamessage.Сигнал; ДобавитьСтроку(текст); } else if (datamessage.РелеАктивировано) { ДобавитьСтрокуСостоянияРеле(datamessage.РелеАктивировано); } break; case MessageType.ERR_TIMEOUT: ДобавитьСтрокуОшибки("Таймаут"); break; case MessageType.ERR_BAD_CRC: ДобавитьСтрокуОшибки("Не совпала контрольная сумма"); break; case MessageType.INFO: СтрокаСтатуса.Text = datamessage.Сигнал; break; default: ДобавитьСтрокуОшибки("Неизвестная ошибка"); break; } } [Ссылки] 1. Visual Studio C++ 2010 Express: BackgroundWorker Class. |