В приложении для обеспечения разделения данных и поддержки отзывчивости интерфейса обычно используют несколько потоков. Один из них обрабатывает интерфейс пользователя - клики на кнопках, перерисовку визуальных элементов формы, а другой (такой как например BackgroundWorker [1] или Thread [2]) выполняет фоновые операции - обмен через периферийные устройства, декодирование данных, обработку сетевого трафика и т. п. Для передачи информации между потоками используют стандартные методы ожидания события, такие как мьютексы, семафоры, очереди. В этой статье раccматриваются очереди Queue для организации блокировки потока в ожидании поступления данных.
[Класс-обертка для очереди]
Для удобства использования был создан класс TQueue на базе класса Queue. Он позволяет создавать очереди из объектов произвольного типа.
// Модуль Queue.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using System.IO.Ports;
using System.Threading;
using System.ComponentModel;
namespace mynamespace
{
public class TQueue< T>
{
private readonly Queue< T> queue = new Queue< T>();
private readonly int maxSize;
public TQueue(int maxSize) { this.maxSize = maxSize; }
public void Enqueue(T item)
{
lock (queue)
{
while (queue.Count >= maxSize)
{
queue.Clear();
}
queue.Enqueue(item);
if (queue.Count == 1)
{
// Разбудить все, что заблокировалось
// на очереди:
Monitor.PulseAll(queue);
}
}
}
public bool Dequeue(out T item, int timeout)
{
item = default(T);
lock (queue)
{
while (queue.Count == 0)
{
if (Monitor.Wait(queue, timeout) == false) return false;
}
item = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// Разбудить все, что заблокировалось
// на очереди:
Monitor.PulseAll(queue);
}
return true;
}
}
public int Count()
{
int count;
lock (queue)
{
count = queue.Count;
}
return (count);
}
public void Clear()
{
lock (queue)
{
queue.Clear();
}
}
}
}
Рассмотрим пример, в котором один поток (BackgroundWorker parserTask, находится в классе COMportReader) принимает данные из последовательного порта. Если в начале принятой строки поток находит символ #, то посылает посылает их в очередь из строк StringQueue, которая реализована как публичное свойство класса COMportReader.
namespace mynamespace
{
public class COMportReader
{
// Сообщения, полученные от COM-порта:
public TQueue< string> StringQueue;
private BackgroundWorker parserTask;
static bool _continue;
log logfile;
SerialPort _serialPort;
public COMportReader(String port)
{
logfile = new log(this.GetType().ToString(), "logapp.txt");
StringQueue = new TQueue< string>(20);
_serialPort = new SerialPort();
_serialPort.PortName = port;
_serialPort.BaudRate = 115200;
_serialPort.Parity = Parity.None;
_serialPort.DataBits = 8;
_serialPort.StopBits = StopBits.One;
_serialPort.Handshake = Handshake.None;
_serialPort.ReadTimeout = 500;
_serialPort.WriteTimeout = 500;
_serialPort.Encoding = Encoding.Default;
_serialPort.NewLine = "\n";
// Парсер - поток, которые получает данные от последовательного порта.
parserTask = new BackgroundWorker();
parserTask.DoWork += ReadData;
}
public bool IsOpen
{
get { return _serialPort.IsOpen; }
}
public bool Open()
{
bool retval = false;
try
{
_serialPort.Open();
_continue = true;
if (!parserTask.IsBusy)
{
parserTask.RunWorkerAsync();
}
retval = true;
}
catch (Exception ex)
{
logfile.write(ex.Message.ToString());
}
return (retval);
}
public bool Close()
{
bool retval = false;
_continue = false;
// Если адаптер был открыт, закроем его
if (_serialPort.IsOpen)
{
lock (_serialPort)
{
_serialPort.Close();
}
}
// Если адаптер закрыт, вернем true
if (!_serialPort.IsOpen)
{
retval = true;
}
return retval;
}
private void ReadData(object pSender, DoWorkEventArgs pEventArgs)
{
while (_continue)
{
String message = "";
char[] charsToTrim = {'\n', ' ', '\r'};
try
{
message = _serialPort.ReadLine();
message = message.Trim(charsToTrim);
char firstsym = message[0];
switch (firstsym)
{
case '#':
StringQueue.Enqueue(message);
break;
default:
logfile.write(message);
break;
}
}
catch
{
}
}
}
}
}
На этой очереди StringQueue блокируется другой поток BackgroundWorker, метод цикла taskDoWork которого блокируется на очереди StringQueue в ожидании поступления данных. Блокировка на очереди осуществляется вызовом StringQueue.Dequeue. Таймаут блокировки указан 100 миллисекунд. Как только в очереди оказываются новые данные, поток немедленно разблокируется до истечения таймаута.
private void taskDoWork(object pSender, DoWorkEventArgs pEventArgs)
{
String datastr;
while (true)
{
// Блокировка на очереди в течение 100 мс:
COMportReader.StringQueue.Dequeue(out datastr, 100);
if (null != datastr)
{
// Обработка данных из очереди, полученные
// через строку datastr:
...
}
}
}
Класс TQueue позволяет создавать очередь из более сложных элементов. К примеру, в очередь можно помещать экземпляры классов. Ниже показано, как это делается на примере класса DataMessage.
// Класс DataMessage:
namespace mynamespace
{
public enum MessageType
{
INFO,
EVENT,
ERR_TIMEOUT,
ERR_BAD_CRC,
ERR_UNKNOWN
}
class DataMessage
{
public DateTime Время;
public String Сигнал;
public bool РелеАктивировано;
public MessageType ТипСообщения;
// Конструктор класса, который заполняет свои поля
// декодированными данными из datastr:
public DataMessage(String datastr)
{
Время = DateTime.Now;
Сигнал = "";
РелеАктивировано = false;
ТипСообщения = MessageType.ERR_UNKNOWN;
String[] words;
switch (datastr[1])
{
case 'B':
ТипСообщения = MessageType.EVENT;
Сигнал = datastr.Substring(3);
break;
case 'M':
ТипСообщения = MessageType.EVENT;
if ("UP" == datastr.Substring(3, 2))
{
words = datastr.Substring(6).Split(new char[] { '#' });
if ("relayON" == words[0])
{
РелеАктивировано = true;
}
}
else if ("DN" == datastr.Substring(3, 2))
{
words = datastr.Substring(6).Split(new char[] { '#' });
if ("relayOFF" == words[0])
{
РелеАктивировано = false;
}
}
break;
case 'E':
words = datastr.Substring(6).Split(new char[] { '#' });
if ("TIME" == words[0])
ТипСообщения = MessageType.ERR_TIMEOUT;
else if ("CRC" == words[0])
ТипСообщения = MessageType.ERR_BAD_CRC;
break;
case 'T':
ТипСообщения = MessageType.INFO;
Сигнал = datastr;
break;
default:
logfile.write("Ошибка декодирования " + datastr);
break;
}
}
}
}
// Создание очереди из классов DataMessage:
DecodedMessageQueue = new TQueue< DataMessage>(20);
...
// Постановка сообщений в очередь:
DataMessage datamessage = new DataMessage(datastr);
DecodedMessageQueue.Enqueue(datamessage);
// Передача события новых данных datamessage в форму, где
// они будут отображаться на визуальных компонентах
// интерфейса приложения.
guiTask.ReportProgress(0);
...
// Обработчик прогресса guiTask, который имеет доступ
// в интерфейсу GUI:
void guiTaskProgressHandler(object sender, ProgressChangedEventArgs pEventArgs)
{
DataMessage datamessage;
// Блокировка на очереди в течение 1 секунды:
DecodedMessageQueue.Dequeue(out datamessage, 1000);
CultureInfo ci = new CultureInfo("en-us");
switch(datamessage.ТипСообщения)
{
case MessageType.EVENT:
if ("" != datamessage.Сигнал)
{
текст = datamessage.Сигнал;
ДобавитьСтроку(текст);
}
else if (datamessage.РелеАктивировано)
{
ДобавитьСтрокуСостоянияРеле(datamessage.РелеАктивировано);
}
break;
case MessageType.ERR_TIMEOUT:
ДобавитьСтрокуОшибки("Таймаут");
break;
case MessageType.ERR_BAD_CRC:
ДобавитьСтрокуОшибки("Не совпала контрольная сумма");
break;
case MessageType.INFO:
СтрокаСтатуса.Text = datamessage.Сигнал;
break;
default:
ДобавитьСтрокуОшибки("Неизвестная ошибка");
break;
}
}
[Ссылки]
1. Visual Studio C++ 2010 Express: BackgroundWorker Class. 2. Потоки на C#. Часть 1: введение. |