Программирование PC Python: синхронизация между потоками Sat, March 29 2025  

Поделиться

Нашли опечатку?

Пожалуйста, сообщите об этом - просто выделите ошибочное слово или фразу и нажмите Shift Enter.


Python: синхронизация между потоками Печать
Добавил(а) microsin   

В этой статье (перевод [1]) обсуждается концепция синхронизации потоков для многопоточного программирования на языке Python [2].

[Синхронизация между потоками]

Синхронизация потоков определена как механизм, который обеспечивает невозможность для двух или большего количества потоков одновременно выполнять один и тот же сегмент программы, известный как критическая секция.

Критическая секция относится к частям программы, где осуществляется доступ к общему ресурсу (такому как файл или последовательный порт).

На диаграмме ниже показан пример, где 3 потока пытаются одновременно обратиться к общему ресурсу, или к критической секции. Это также называют конкурентным доступом к общему ресурсу.

multithreading python fig06

Без синхронизации конкурентный доступ к общему ресурсу может привести к гонке между потоками (так называемый рейсинг, race condition, и состязание потоков), когда есть риск потери целостности общего ресурса.

Рейсинг происходит, когда два или большее количество потоков пытаются обратиться к общему ресурсу и изменить его одновременно. В результате значения переменных общего ресурса могут стать непредсказуемыми, и меняться в зависимости от интервалов времени переключения контекста потоков.

Рассмотрим программу, которая поможет понять концепцию рейсинга:

import threading

# Глобальная переменная x: x = 0

# Функция для инкремента глобальной переменной x:
def increment(): global x x += 1

# Задача потока, которая вызывает функцию increment
# 100000 раз:
def thread_task(): for _ in range(100000): increment()

def main_task(): global x # Установка глобальной переменной x в 0: x = 0
# Создание потоков: t1 = threading.Thread(target=thread_task) t2 = threading.Thread(target=thread_task)
# Запуск потоков: t1.start() t2.start()
# Ожидание, когда потоки завершат свою работу: t1.join() t2.join()

if __name__ == "__main__": for i in range(10): main_task() print("Итерация {0}: x = {1}".format(i,x))

Вывод программы:

Итерация 0: x = 175005
Итерация 1: x = 200000
Итерация 2: x = 200000
Итерация 3: x = 169432
Итерация 4: x = 153316
Итерация 5: x = 200000
Итерация 6: x = 167322
Итерация 7: x = 200000
Итерация 8: x = 169917
Итерация 9: x = 153589

В этой программе выполняются следующие действия:

• В функции main_task создаются два потока t1 и t2, и глобальная переменная x устанавливается в 0.
• Каждый поток имеет целевую функцию thread_task, в которой функция increment вызывается 100000 раз.
• Функция increment будет инкрементировать на 1 глобальную x в каждом вызове.
• Цикл for основной программы запускает описанные выше действия 10 раз.

Ожидается, что в результате работы обоих потоков на каждой итерации цикла переменная x должна будет содержать значение 200000. Но очевидно, что это получается далеко не всегда. Что происходит?..

Примечание: на самом деле результат может отличаться от версии операционной системы. Показанный выше вывод с отличающимися значениями x был для программы, запущенной на Ubuntu. На Windows в каждой итерации значение x получалось 200000.

Не одинаковые вычисленные значения x получаются из-за конкурентного доступа к общей переменной x. Эта непредсказуемость в значении x - не что иное, как состояние гонки.

Ниже приведена диаграмма, которая показывает, как состояние гонки может возникнуть в вышеуказанной программе:

multithreading python fig07

Обратите внимание, что ожидаемое значение x на приведенной выше диаграмме равно 12, но из-за состояния гонки оно оказывается равным 11!

Таким образом, нам нужен инструмент для правильной синхронизации между потоками.

[Реализация критической секции с помощью Lock]

Модуль threading предоставляет класс Lock, предназначенный для устранения гонки. Lock реализован с использованием объекта Semaphore, предоставляемого операционной системой.

Что такое семафор. Семафор это объект синхронизации, который управляет доступом нескольких процессов/потоков к общему ресурсу в рабочем окружении параллельного выполнения кода. Это просто значение в назначенном месте в хранилища операционной системы (или ядра), которое каждый процесс/поток может проверить, а затем изменить. В зависимости от этого значения процесс/поток либо может использовать общий ресурс, либо обнаружит, что он уже кем-то используется, и нужно подождать некоторое время, прежде чем повторить попытку доступа. Семафоры могут быть двоичными (со значениями 0 или 1), либо могут иметь дополнительные значения (такие семафоры называются семафорами со счетчиком). Обычно процесс/поток, использующий семафор, проверяет его значение. Если семафор сигнализирует (== 0), что ресурс в настоящее время свободен, то поток изменяет значение семафора (== 1), показывая тем самым другим процессам/потокам, что ресурс занят.

Класс Lock предоставляет следующие методы:

acquire([blocking]): захват семафора. Захват может быть блокирующим и не блокирующим. 

   - Когда аргумент blocking установлен в True (по умолчанию), то выполнение вызвавшего потока блокируется до тех пор, пока захват не будет снят. В момент освобождения семафор переводится в состояние захвата, и acquire возвратит True.
   - Когда аргумент blocking установлен в False, выполнение вызвавшего потока не блокируется. Если захват снят, то acquire захватит семафор и вернет True, иначе немедленно вернет False.

release(): освобождение захвата семафора. Когда семафор захвачен, он освобождается, и release выполнит возврат. Если при этом любой другой поток находится в состоянии блокировки, ожидая освобождения семафора, то он выйдет из блокировки, захватит семафор и продолжит выполнение. Если семафор уже в разблокированном состоянии, то будет выброшено исключение ThreadError.

Рассмотрим следующий пример, исправляющий недостаток предыдущего примера:

import threading

# Глобальная переменная x: x = 0

# Функция для инкремента глобальной переменной x:
def increment(): global x x += 1

# Задача потока, которая вызывает функцию increment
# 100000 раз:
def thread_task(): for _ in range(100000): lock.acquire() increment() # критическая секция lock.release()
global x # Установка глобальной переменной x в 0: x = 0
# Создание объекта семафора lock: lock = threading.Lock() # Создание потоков: t1 = threading.Thread(target=thread_task, args=(lock,)) t2 = threading.Thread(target=thread_task, args=(lock,))
# Запуск потоков: t1.start() t2.start()
# Ожидание, когда потоки завершат свою работу: t1.join() t2.join()

if __name__ == "__main__": for i in range(10): main_task() print("Итерация {0}: x = {1}".format(i,x))

Критическая секция в этой программе (т. е. код, выполняющийся непрерывно и атомарно), находится между вызовами lock.acquire() и lock.release(). Вывод программы теперь правильный:

Итерация 0: x = 200000
Итерация 1: x = 200000
Итерация 2: x = 200000
Итерация 3: x = 200000
Итерация 4: x = 200000
Итерация 5: x = 200000
Итерация 6: x = 200000
Итерация 7: x = 200000
Итерация 8: x = 200000
Итерация 9: x = 200000

Давайте разберемся по шагам, что происходит в этом коде:

• Сначала создается экземпляр объекта Lock вызовом его конструктора:

lock = threading.Lock()

• Затем lock передается как аргумент функцию потока:

t1 = threading.Thread(target=thread_task, args=(lock,))
t2 = threading.Thread(target=thread_task, args=(lock,))

• В критической секции функции потока мы применили захват семафора вызовом метода lock.acquire(). Как только произошел захват семафора, никакой другой поток не сможет получить доступ к коду критической секции (в нашем примере это функция increment) пока блокировка не будет освобождена вызовом метода lock.release().

lock.acquire()
increment()
lock.release()

Как вы можете видеть программа дает теперь каждый раз правильный результат 200000 для переменной x. Следующая диаграмма показывает, что происходит при выполнении этого кода:

multithreading python fig08

[Пример эффективного использования background-потока]

В следующем простом примере создается фоновый поток, который читает из последовательного порта данные в глобальный буфер. Таким образом создается двухпоточное приложение - фоновый поток и основной поток. Основной поток программы читает данные из этого буфера и выводит их на экран консоли.

import serial
import threading
import time
from queue import Queue

# Глобальный буфер, где поток накапливает принятые данные: serial_buffer = bytearray() buffer_lock = threading.Lock()

def serial_reader(): global serial_buffer try: # На Windows для параметра port будет использоваться имя # наподобие 'COMn', на Linyx это должно быть имя # наподобие '/dev/ttyUSB1'. ser = serial.Serial( port='COM1', # Подставьте тут реальное имя последовательного порта baudrate=9600, timeout=1 ) while True: if ser.in_waiting: # Чтение доступных данных во временный буфер: data = ser.read(ser.in_waiting) # Безопасно добавим принятые данные в глобальный буфер, # используя блокировку (lock): with buffer_lock: serial_buffer.extend(data) time.sleep(0.1) # Небольшая задержка, чтобы не перегружать CPU: except serial.SerialException as e: print(f"Ошибка последовательного порта: {e}") return

def main(): # Запуск background-потока, который читает данные из последовательного порта: reader_thread = threading.Thread(target=serial_reader, daemon=True) reader_thread.start() # Основной поток читает и печатает принятые данные: try: while True: # Безопасный доступ к глобальному буферу с использованием блокировки: with buffer_lock: if len(serial_buffer) > 0: # Печать и очистка глобального буфера: print(serial_buffer.decode('utf-8', errors='ignore'), end='') serial_buffer.clear() time.sleep(0.1) # Небольшая задержка, чтобы не перегружать CPU except KeyboardInterrupt: # Была нажата комбинация клавиш Ctrl+C, завершение работы программы: print("\nExiting...")

if __name__ == "__main__": main()

[Достоинства и недостатки многопоточности]

Достоинства:

• Позволяет писать программы, в которых интерфейс взаимодействия с пользователем не блокируется (программа не зависает в результате выполнения длительных вычислений). Это потому, что потоки в программе получают некоторую независимость друг от друга.
• Лучше используются системные ресурсы, поскольку потоки могут параллельно выполнять свои задачи.
• Улучшается производительность многопроцессорных систем.

Многопоточные серверы и интерактивные GUI обязательно используют многопоточность.

Недостатки:

• С повышением количества потоков увеличивается сложность, повышаются накладные расходы на переключение контекста.
• Необходима синхронизация общих ресурсов (объектов, данных, файлов, и т. п.).
• Многопоточный код сложен в отладке, при недостаточно тщательном планировании выполнения потоков может давать непредсказуемые результаты.
• Потенциальные взаимные блокировки (deadlocks) могут привести к тому, что некоторые потоки могут получать слишком мало процессорного времени.
• Создание и синхронизация потоков приводит к повышенному расходу ресурсов CPU и памяти.
• Переносимость сложного кода может зависеть от версии операционной системы и даже от версии Python.

[Ссылки]

1. Multithreading in Python | Set 2 (Synchronization) site:geeksforgeeks.org.
2. Python: введение в параллельное программирование.

 

Добавить комментарий


Защитный код
Обновить

Top of Page