Программирование PC Python: потоки (threading) Tue, March 05 2024  

Поделиться

Нашли опечатку?

Пожалуйста, сообщите об этом - просто выделите ошибочное слово или фразу и нажмите Shift Enter.

Python: потоки (threading) Печать
Добавил(а) microsin   

Исходный код: Lib/threading.py. В этом модуле содержится высокоуровневый интерфейс работы с потоками, организованный поверх низкоуровневого модуля _thread.

Изменение в версии 3.7: этот модуль необязательный, и в настоящий момент нужен не всегда.

Примечание: в серии версий Python 2.x этот модуль содержит camelCase-имена (горбатыеИмена) для некоторых методов и функций. Они устарели с появлением Python 3.10, однако все еще поддерживаются для совместимости с Python 2.5 и более старыми версиями.

См. также concurrent.futures.ThreadPoolExecutor, предоставляющий высокоуровневый интерфейс для перевода задач в фоновый поток (background thread) без блокировки выполнения вызвавшего потока, при этом все еще остается возможность получить его результаты в случае необходимости.

Очередь (queue) предоставляет thread-safe интерфейс для обмена данными между работающими потоками.

Класс asyncio предоставляет альтернативный способ достижения конкурентного выполнения уровня задач без необходимости использования нескольких работающих системных потоков.

Деталь реализации CPython: в CPython из-за его глобальной блокировки интерпретатора (Global Interpreter Lock), только один поток может выполнять код Python одновременно (несмотря на то, что некоторые библиотеки, ориентированные на производительность, могут преодолеть это ограничение). Если вы хотите, чтобы приложение лучше использовало вычислительные ресурсы на многоядерных машинах, то рекомендуется использовать multiprocessing или concurrent.futures.ProcessPoolExecutor. Однако threading все еще остается подходящей моделью, если нужно одновременно запустить несколько связанных с вводом/выводом задач.

Доступность: нельзя использовать ни Emscripten [3], ни WASI [4]. Этот модуль не работает, либо недоступен на WebAssembly-платформах wasm32-emscripten и wasm32-wasi. См. [2] для дополнительной информации.

Функции. Модуль Lib/threading.py определяет следующие функции:

threading.active_count()

Возвратит количество объектов Thread, присутствующих в настоящий момент. Возвращенное количество эквивалентно длине списка, возвращаемого enumerate().

Функция activeCount это устаревший псевдоним для active_count.

threading.current_thread()

Возвратит текущий объект Thread object, соответствующий потоку, который вызвал эту функцию. Если поток управления вызывающей стороны не был создан через модуль threading, то будет возвращен пустой объект потока (dummy thread object) с ограниченной функциональностью.

Функция currentThread это устаревший псевдоним для current_thread.

threading.excepthook(args, /)

Обработает неперехваченное исключение, которое вызвал метод Thread.run().

У параметра args атрибуты следующие:

exc_type: тип исключения.
exc_value: значение исключения, может быть None.
exc_traceback: обратная трассировка исключения, может быть None.
thread: поток, который вызвал исключение, может быть None.

Если exc_type равно SystemExit, то исключение молча игнорируется. Иначе исключение будет напечатано в sys.stderr.

Если это исключение вызвала функция, то для его обработки будет вызвана sys.excepthook().

Определение threading.excepthook() может быть переназначено, чтобы управлять процессом обработки не перехваченных исключений, которые вызвала Thread.run().

Сохранение exc_value с помощью пользовательского перехватчика может создать ссылочный цикл. Он должен быть очищен явным образом, чтобы прервать ссылочный цикл, если исключение больше не требуется.

Сохранение thread с помощью пользовательского перехватчика может возродить этот поток, если он установлен на объект, который завершается. Избегайте сохранения thread после завершения пользовательского обработчика, чтобы не допускать возрождения объектов.

Эта функция появилась в версии 3.8.

См. также sys.excepthook() для обработки не перехваченных исключений.

threading.__excepthook__

Хранит оригинальное значение threading.excepthook(). Оно сохраняется таким образом, что исходное значение может быть восстановлено, если оно было заменено поврежденными или альтернативными объектами.

Эта функция появилась в версии 3.10.

threading.get_ident()

Возвратит идентификатор текущего потока, ненулевое целое число. Это число не заключает в себе никакого прямого смысла; оно предназначено служить "волшебным пирожком" (magic cookie), например в качестве индекса словаря специфических для потока данных. Идентификаторы потока могут быть повторно использованы, когда поток существует, и создается другой поток.

Эта функция появилась в версии 3.3.

threading.get_native_id()

Возвратит активный интегральный Thread ID текущего потока, назначенный ему ядром. Это неотрицательное целое число, значение которого уникально идентифицирует этот поток в контексте системы в целом - пока поток не будет уничтожен. После уничтожения потока это значение может быть повторно использовано операционной системой.

Доступность: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX [2].

Эта функция появилась в версии 3.8.

threading.enumerate()

Возвратит список объектов Thread, активных в настоящий момент. Этот список включает фоновые потоки (daemonic threads) и пустые объекты потоков (dummy thread objects), созданные в current_thread(). Список не включает в себя завершенные потоки и те потоки, которые еще не стартовали. Однако главный поток (main thread) всегда входит в этот список, даже когда он завершен.

threading.main_thread()

Возвратит объект Thread главного потока. В нормальных условиях главный поток (main thread) это поток, в котором был запущен интерпретатор Python.

Эта функция появилась в версии 3.4.

threading.settrace(func)

Установит функцию трассировки всех потоков, запущенных их модуля threading. Эта функция будет передана в sys.settrace() для каждого потока перед вызовом его метода run().

threading.gettrace()

Извлечет функцию трассировки, установленной settrace().

Эта функция появилась в версии 3.10.

threading.setprofile(func)

Установит функцию профилировщика для всех потоков, запущенных из модуля threading. Эта функция будет передана в sys.setprofile() для каждого потока перед вызовом его метода run().

threading.getprofile()

Извлечет функцию профиля, установленной setprofile().

Эта функция появилась в версии 3.10.

threading.stack_size([size])

Возвратит размер стека потока, использованный при создании новых потоков. Опциональный аргумент size указывает размер стека, используемый для созданных впоследствии потоков, и этот размер должен быть либо 0 (используется размер платформы, или конфигурируется значение по умолчанию), либо положительным числом, не меньшим 32768 (32 килобайта). Если size не указан, то используется 0. Если изменение размера стека потока не поддерживается, то возникнет ошибка RuntimeError. Если указанный размер недопустимый, то возникнет исключение ValueError, и размер стека останется неизмененным.

32 килобайта в настоящий момент минимальный поддерживаемый размер стека, гарантирующий достаточное пространство и для самого интерпретатора. Обратите внимание, что на некоторых платформах может действовать специальные ограничения на значения размера стека, такие как требование минимального размера стека > 32 килобайт, или необходимость выделения размера, который нацело делится на размер системной страницы (system memory page). Для дополнительной информации следует изучить документацию на применяемую платформу (обычно размер страницы составляет 4 килобайта, так что рекомендуется, чтобы размер стека нацело делился на 4096, когда нет более подробной информации по требованиям к размеру стека).

Доступность: Windows, pthreads. Платформы Unix с поддержкой POSIX threads.

Константы. Модуль threading также определяет следующие константы:

threading.TIMEOUT_MAX

Максимальное значение, допустимое для параметра таймаута блокирующих функций (Lock.acquire(), RLock.acquire(), Condition.wait(), и т. д.). Указание таймаута больше этого значения приведет к исключению OverflowError.

Эта появилось в версии 3.2.

Классы. В модуле threading определены некоторые классы, подробно описываемые далее.

Дизайн модуля реализован практически на модели потоков Java. Однако Java делает блокировки (lock) и переменные условия (condition variable) базовым поведение для каждого объекта, а у Python для этого имеются отдельные объекты. Thread-класс на Python поддерживает подмножество поведения Java-класса Thread. В настоящий момент не реализованы приоритеты, группы потоков, и потоки не могут быть уничтожены, остановлены, приостановлены, возоблены или прерваны. Статические методы Java-класса Thread, когда они реализованы, отображаются на функции уровня модуля.

Все методы, описанные далее, выполняются атомарно.

[Локальные данные потока]

Thread-local data это данные, которые являются специфическими для потока. Чтобы управлять локальными данными потока, просто создайте экземпляр local (или подкласс), и сохраняйте в нем атрибуты:

mydata = threading.local()
mydata.x = 1

Значения экземпляра будут отличаться для разных потоков.

class threading.local

Класс, который представляет локальные данные потока (thread-local data).

Для дополнительной информации и подробных примеров см. строку в описании модуля _threading_local: Lib/_threading_local.py [5].

[Объекты потока]

Класс Thread представляет активность, действие (activity), которое работает в отдельном потоке управления. Существует 2 способа указать activity: путем передачи вызываемого объекта в конструктор, или путем переназначения метода run() в субклассе. В субклассе не должны быть переназначены никакие другие методы кроме конструктора. Другими словами, переназначайте только методы __init__() и run() в этом классе.

Как только поток создан, его activity должна быть запущена методом start() потока. Это запустит метод run() в отдельном потоке управления.

Как только activity стартовала, поток считается существующим (alive). Он прекращает существование, когда завершится метод run() - либо обычным способом, либо возникновением необработанного исключения (unhandled exception). Метод is_alive() позволяет проверить существование потока.

Другие потоки могут вызвать метод join() потока. Это заблокирует вызвавший поток до тех пор, пока не будет завершен поток, для которого был вызван join().

У потока есть имя (name). Имя может быть передано в конструктор, и прочитано или установлено через атрибут name.

Если метод run() сгенерировал исключение, то для его обработки будет вызван threading.excepthook(). По умолчанию threading.excepthook() игнорирует обработку и молча делает SystemExit.

Поток может быть помечен как "daemon thread". Смысл этого флага в том, что когда вся программа Python делает выход, когда остаются только daemon-потоки. Начальное значение наследуется из создающего потока. Этот флаг может быть установлен через свойство daemon, либо через аргумент конструктора daemon.

Примечание: daemon-потоки жестко терминируются при общем останове (shutdown). Их ресурсы (такие как открытые файлы, транзакции базы данных и т. п.) могут не быть корректно освобождены. Если вы хотите добиться корректной остановки потоков, то не делайте их daemonic, и используйте подходящий механизм сигнализации, такой как Event.

Существует объект главного потока, "main thread". Он соответствует начальному потоку управления в программе Python, и он не является потоком типа daemon.

Могут создаваться объекты пустого потока, "dummy thread objects". Это объекты потока, соответствующие "чужеродным потокам" (alien threads), которые являются потоками управления, запущенными вне модуля threading, например запущенными непосредственно из кода C. У объектов пустого потока функциональность ограниченная; они всегда считаются alive и daemonic, и к ним нельзя подключиться (join). Они никогда не удаляются, поскольку нельзя определить завершение потоков alien.

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Этот конструктор всегда должен быть вызван с ключевым словом аргументов. Описание этих аргументов:

group Здесь должно быть None; этот аргумент зарезервирован для будущего расширения, когда будет реализован класс ThreadGroup.

target Это вызываемый объект, который будет вовлечен методом run(). По умолчанию None, что означает, что никакая функция вызвана не будет.

name Имя потока. По умолчанию уникальное имя конструируется в форме "Thread-N" где N небольшое десятичное число, или "Thread-N (target)", где "target" это target.__name__, если был указан аргумент target.

args Это список или кортеж аргументов для запуска target. По умолчанию ().

kwargs Это словарь аргументов для запуска target. По умолчанию {}.

daemonic Если не None, то аргумент daemon явно устанавливает, что поток является типом daemonic. Если None (по умолчанию), то свойство daemonic наследуется из текущего потока.

Если субкласс переназначает конструктор, то нужно гарантировать вызов конструктора базового класса (Thread.__init__()) перед любыми действиями с потоком.

Изменения в версии 3.10: используется имя target, если аргумент name опущен. Изменения в версии 3.3: добавлен аргумент daemon.

start()

Запустит активность потока. Этот метод должен быть вызван не более одного раза на объект потока. Принимаются меры, чтобы метод run() объекта был вовлечен в отдельном потоке управления.

Этот метод сгенерирует RuntimeError, если был вызван несколько раз на одном и том же объекте потока.

run()

Этот метод представляет активность потока.

Вы можете переназначить этот метод в субклассе. Стандартный метод run() вовлечет вызываемый объект, переданный в конструктор объекта через аргумент target, если он указан, вместе с позиционными аргументами и аргументами ключевых слов, которые берутся из args и kwargs соответственно.

Использование списка или кортежа в виде аргумента args, который передается в Thread, будет давать тот же самый эффект.

Пример:

>>>
from threading import Thread
t = Thread(target=print, args=[1])
t.run()
1
t = Thread(target=print, args=(1,))
t.run()
1

join(timeout=None)

Ждет, когда поток завершится. Этот вызов заблокирует вызвавший поток до тех пор, пока не будет завершен поток, на который был нацелен вызов join() – этот поток либо должен завершиться нормальным образом, либо через unhandled exception – или пока не произойдет опционально указанный таймаут.

Когда аргумент timeout присутствует, и он не равен None, это должно быть число с плавающей запятой, указывающее значение таймаута операции в секундах (или дробных долях секунды). Поскольку join() всегда возвратит None, вы должны вызвать is_alive() после join() чтобы определить, было ли истечение таймаута – если поток все еще активен, то вызов join() завершится по таймауту.

Когда аргумент timeout не указан, либо указан как None, операция заблокируется до тех пор, пока целевой поток не завершится.

К потоку можно вызвать join несколько раз.

Вызов join() сгенерирует RuntimeError, если была сделана попытка join к текущему потоку, что вызовет deadlock. Также будет ошибкой делать join() к потоку до его запуска, и такая попытка сгенерирует такое же исключение.

name

Строка, используемая для целей идентификации. У неё нет семантики. Нескольким потокам можно дать одинаковое имя (хотя в этом мало смысла). Начальное имя устанавливается конструктором.

getName()
setName()

Устаревшие API-функции для извлечения/установки имени; вместо этого используйте прямое обращение к свойству экземпляра класса.

Устарело начиная с версии 3.10.

ident

Идентификатор этого потока или None, если поток не был запущен. Это ненулевое целое число. См. функцию get_ident(). Идентификаторы потока могут использоваться повторно, когда поток существует, и создается другой поток. Идентификатор доступен даже после того, как произошел выход из потока.

native_id

Thread ID (TID) этого потока, назначенный операционной системой (её ядром). Это положительное целое число или, если поток не был запущен. См. функцию get_native_id(). Это значение может использоваться для уникальной идентификации определенного потока на уровне системы (до тех пор, пока поток не завершится, после чего значение идентификатора может быть повторно использоваться операционной системой).

Обратите внимание, что подобно идентификаторам Process ID идентификаторы Thread ID достоверны (гарантировано уникальны на уровне системы) от момента создания потока до момента, когда он завершился.

Доступность: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD.

Эта функция появилась в версии 3.8.

is_alive()

Возвратит True, если поток существует. Значение будет возвращено сразу после того, как запустится метод run(), и так будет до тех пор, пока не завершится метод run(). Функция enumerate() модуля возвратит список всех доступных потоков, находящихся в рабочем состоянии.

daemon

Двоичное значение, которое показывает, является ли поток daemon-потоком (если да, то будет возвращено True, если нет, то False). Это значение должно быть установлено перед вызовом start(), иначе будет сгенерировано RuntimeError. Начальное значение наследуется из создающего потока. Главный поток (main thread) не является daemon-потоком, по поэтому у всех потоков, создаваемых из главного потока, по умолчанию daemon = False.

Вся программа Python делает выход, когда не осталось ни одного работающего не-daemon потока.

isDaemon()
setDaemon()

Устаревшие API-функции для извлечения/установки значения daemon; вместо них напрямую используйте значение свойства.

Устарело начиная с версии 3.10.

[Lock объектов]

Примитивная блокировка (lock) это примитив синхронизации, который не принадлежит определенному потоку при блокировке. На Python это в настоящее время это самый низкоуровневый доступный примитив синхронизации, реализованный непосредственно модулем расширения _thread.

Примитив блокировки находится в одном из двух состояний, "locked" (заблокировано) или "unlocked" (разблокировано). Он создается в состоянии unlocked, и имеет 2 базовых метода acquire() и release(). Когда состояние unlocked, метод acquire() меняет состояние на locked, и делает немедленный возврат. Когда состояние locked метод acquire() блокируется до тех пор, пока другой поток не вызовет метод release(), тогда вызов acquire() сбросит состояние в locked и выполнит возврат. Метод release() должен вызываться только в состоянии locked; это поменяет состояние на unlocked и сделает немедленный возврат. Если сделать попытку блокировки в состоянии unlocked, то будет сгенерирована RuntimeError.

Блокировки также поддерживают протокол управления контекстом.

Когда больше одного потока находятся в состоянии blocked, вызвав acquire(), ожидая изменения состояния в unlocked, только один поток поток возобновит выполнение, когда вызов release() сбросит состояние в unlocked; какой из ожидающих потоков возобновит выполнение - не определено, и это может меняться в различных реализациях.

Все методы выполняются атомарно.

class threading.Lock

Класс, реализующий объекты примитивной блокировки. Как только поток получил блокировку (acquired lock), последующие попытки получения блокировки блокируются до тех пор, пока блокировка не будет освобождена (released); любой поток может освободить блокировку.

Обратите внимание, что Lock на самом деле является заводской функцией, которая возвращает экземпляр наиболее эффективной версии конкретного класса Lock, поддерживаемого платформой.

acquire(blocking=True, timeout=- 1)

Получение блокировки, с блокированием или без.

Когда вызывается с аргументом blocking, установленным в True (по умолчанию), выполнение блокируется до тех пор, пока не произойдет разблокировка, затем установится в locked и возвратит True.

Когда вызыается с аргументом blocking, установленным в False, блокировка не происходит. Если вызов с blocking, установленным в True, блокируется, то немедленно возвратит False; иначе установит блокировку в locked и возвратит True.

Когда запускается аргументом timeout в виде положительного значения с плавающей точкой, то блокировка происходит как не более чем указанное количество секунд, и до тех пор, пока блокировка не может быть получена. Аргумент timeout, установленный в -1, указывает на неограниченное ожидание. Запрещается указывать timeout, когда blocking = False.

Возвращаемое значение True, если блокировка получена успешно, и False если это не так (например, если истек таймаут).

Изменено в версии 3.2: новый параметр timeout. Изменено в версии 3.2: захват блокировки теперь может быть прерван сигналами на POSIX, если нижележащая реализация threading это поддерживает.

release()

Освободит блокировку. Это можно вызвать из любого потока, не только из потока, который захватил блокировку.

Когда блокировка находится в заблокированном состоянии, сбросит её в разблокированное состояние и выполнит возврат. Если любые другие потоки заблокированы в ожидании разблокировки этой блокировки, только одному из них разрешается продолжить работу.

Когда сделана попытка запуска на разблокированной блокировки, генерируется RuntimeError.

У метода нет возвращаемого значения.

locked()

Возвратит True, если блокировка захвачена.

[RLock объектов]

Реэнтрантная блокировка это примитив синхронизации, который можно захватить несколько раз одним и тем же потоком. Внутренне это использует концепцию "потока владельца" и "уровень рекурсии" в дополнение к состоянию locked/unlocked, используемому примитивными блокировками. В состоянии locked некоторый поток владеет блокировкой; в состоянии unlocked у примитива блокировки владельца нет.

Чтобы взять блокировку, поток вызывает свой метод acquire(); произойдет возврат, когда поток владеет этой блокировкой. Чтобы снять блокировку, поток вызывает свой метод release(). Вызовы acquire()/release() составляют пары, и блокировки могут быть вложенными; только последний вызов release() (release() из самой внешней пары) сбросит блокировку в разблокированное состояние, и позволит другому заблокированному в acquire() потоку продолжить свое выполнение.

Реэнтрантные блокировки также поддерживают протокол управления контекстом.

class threading.RLock

В этом классе реализован реэнтрантный объект блокировки. Реэнтрантная блокировка должна быть освобождена потоком, который её захватил. Как только поток захватил реэнтрантную блокировку, тот же самый поток может захватить её снова без блокировки; поток должен освободить этот объект блокировки столько же раз, сколько он его захватил.

Обратите внимание, что RLock в действительности это заводская функция, которая возвратит экземпляр самой эффективной версии конкретного класса RLock, поддерживаемого платформой.

acquire(blocking=True, timeout=- 1)

Захватывает блокировку, в блокирующем или не блокирующем режиме.

Когда вызывается без аргументов: если поток уже владеет блокировкой, то увеличивается счетчик рекурсии на 1, и происходит немедленный возврат. Иначе, если другой поток захватил эту блокировку, то на вызове acquire выполнение заблокируется, пока блокировка не будет освобождена. Как только блокировка освободилась (ей не владеет какой-либо поток), то произойдет захват объекта блокировки, уровень рекурсии установится на 1, и произойдет возврат (поток разблокируется). Если больше чем один поток находится в заблокированном состоянии в ожидании разблокировки, то только один из них сможет захватить блокировку. В этом случае не будет возвращено значение.

Когда acquire вызван с аргументом blocking = True, то делается все то же самое, что и при вызове без аргументов, и будет возвращено True.

Когда acquire вызван с аргументом blocking = False, блокировка не произойдет. Если вызов без аргумента произвел блокировку, то будет немедленный возврат False; иначе выполняется все то же самое, как при вызове без аргументов, и будет возвращено True.

Когда acquire вызван с аргументом timeout (значение с плавающей точкой), установленным в положительное значение, то блокировка будет на время не более установленного значения в секундах, и будет продолжаться, пока объект блокировки не может быть захвачен. Будет возвращено True, если объект блокировки был захвачен, и False если истек таймаут.

Изменено в версии 3.2: новый параметр timeout.

release()

Освобождает блокировку, декрементируя уровень рекурсии. Если значение рекурсии после декоремента достигнет 0, блокировка сбросится (у неё не будет владельца), и если в этот момент любой другой поток (потоки) находится в состоянии блокировки, ожидая освобождения блокировки, то как только блокировка будет освобождена, этот поток становится её владельцем и продолжит выполнение. Счетчик рекурсии объекта блокировки при этом установится в 1. Если после блокировки счетчик рекурсии не обнулится, то владелец объекта блокировки не поменяется.

Вызвать этот метод можно только в том случае, если объектом блокировки владеет вызывающий поток. Возникнет RuntimeError, если этот метод вызван, когда блокировка освобождена.

Метод не возвращает значение.

[Condition-объекты]

Переменная условия (condition variable) всегда ассоциируется с некоторым видом блокировки; она может быть передана или будет создана по умолчанию. Передача полезна, когда несколько переменных условия должны использовать общую, одну и ту же блокировку. Блокировка является частью объекта условия: вы не должны отслеживать это по отдельности.

Переменная условия подчиняется протоколу управления контекстом: с помощью оператора with захватывается связанная блокировка на время действия блока with. Методы acquire() и release() также вызывают соответствующие методы ассоциированной блокировки.

Другие методы должны вызываться с удержанием связанной блокировки. Метод wait() снимает блокировку, и затем блокируется до тех пор, пока другой поток его не разбудит вызовом notify() или notify_all(). После пробуждения wait() снова захватит блокировку и сделает возврат. Также можно указать таймаут.

Метод notify() разбудит один из потоков, который ждет переменную условия, если существуют такие ожидающие потоки. Метод notify_all() разбудит все потоки, которые ждут переменную условия.

Примечание: методы notify() и notify_all() не освобождают блокировку; это значит, что разбуженный поток или потоки не выполнят немедленный возврат из своих вызовов wait(), но только тогда, когда поток, вызвавший notify() или notify_all(), окончательно откажется от владения блокировкой.

Типовой стиль программирования, использующий переменные условия, использует блокировку для синхронизации доступа к некоторому общему состоянию. Потоки, интересующиеся в определенном изменении состояния, делают повторяющиеся вызовы wait(), пока не увидят необходимое состояние, в то время как потоки, изменяющие это состояние, вызывают notify() или notify_all(), когда они поменяли состояние и таким способом, оповещая об изменении ожидающие потоки, которым это изменение могло быть желательным. Например, следующий код это стандартная ситуация поставщик-потребитель с неограниченной емкостью буфера:

# Потребление одного элемента
with cv:
   while not an_item_is_available():
      cv.wait()
   get_an_available_item()
 
# Генерация одного элемента
with cv:
   make_an_item_available()
   cv.notify()

Проверка состояния приложения во время цикла while необходима, потому что функция wait() может выполнить возврат после произвольно продолжительного интервала времени, а условие, которое запустило оповещение notify(), может больше не иметь значение true. Это присуще многопоточному программированию. Можно использовать метод wait_for() для автоматической проверки условия, и упростить вычисление таймаутов:

# Потребление элемента
with cv:
   cv.wait_for(an_item_is_available)
   get_an_available_item()

Для выбора между notify() и notify_all() проверьте, может ли одно изменение быть интересным для одного или нескольких ожидающих потоков. Например, в типовой ситуации поставщик-потребитель добавление одного элемента в буфер требует только пробуждения одного потребляющего потока.

class threading.Condition(lock=None)

Этот класс реализует объект переменной условия. Переменная условия позволяет одному или нескольким потокам ждать, пока им не поступи оповещение от другого потока.

Если указан аргумент lock, и он указан не None, то здесь должен присутствовать объект Lock или RLock, используемый как нижележащая блокировка. Иначе будет создан новый объект RLock, который будет использоваться для нижележащей блокировки.

Изменено в версии 3.3: заводская функция заменена на класс.

acquire(*args)

Захватит нижележащую блокировку. Этот метод вызовет соответствующий метод нижележащей блокировки; возвращаемое значение независимо от того, что вернет метод нижележащей блокировки.

release()

Освободит нижележащую блокировку. Этот метод вызовет соответствующий метод нижележащей блокировки. Метод не возвращает значение.

wait(timeout=None)

Ожидание, пока не поступит оповещение, либо не истечет таймаут. Если вызывающий поток не захватил блокировку, когда вызван этот метод, то будет сгенерирована RuntimeError.

Этот метод освободит нижележащую блокировку, и затем заблокируется, пока не будет разбужен вызовом notify() или notify_all() из другого потока для той же переменной условия, или пока не истечет опционально указанный таймаут. Как только таймаут истек, блокировка захватывается повторно и происходит возврат.

Когда указан аргумент timeout, и он не None, то здесь должно быть значение с плавающей точкой, определяющее длительность таймаута в секундах (может быть указано дробное количество секунд).

Когда в качестве нижележащей блокировки используется RLock, она не освобождается её методом release(), поскольку это может фактически не освободить блокировку, когда она была взята несколько раз рекурсивно. Вместо этого используется внутренний интерфейс класса RLock, который сделает фактическую разблокировку, даже если она была взята рекурсивно несколько раз. Затем используется другой внутренний интерфейс для восстановления уровня рекурсии, когда блокировка берется снова.

Возвращаемое значение True, если не задано значение таймаута, и в этом случае будет возвращено False.

Изменено в версии 3.2: ранее этот метод всегда возвращал None.

wait_for(predicate, timeout=None)

Ожидание, пока переменная условия не будет оценена как True. Параметр predicate должен быть вызываемым объектом, результат выполнения которого может быть интерпретировано как булево значение. Параметр timeout может быть предоставлен для указания максимального времени ожидания.

Это метод может делать повторные вызовы wait(), пока не будет удовлетворено условие predicate, или пока не истечет таймаут. Возвращаемое значение это последнее возвращенное значение из predicate, и оно оно будет оценено как False, если истек таймаут.

Если игнорировать фичу timeout, то вызов этого метода грубо эквивалентен следующему:

while not predicate():
   cv.wait()

Таким образом, применимы те же правила, что и с wait(): блокировка должна удерживаться, когда произошел вызов, и заново захватывается при возврате. Объект predicate вычисляется с удержанием блокировки.

Эта функция появилась в версии 3.2.

notify(n=1)

По умолчанию разбудит один ожидающий это условие поток, если таковой имеется. Если вызывающий поток не владеет блокировкой при вызове этого метода, то возникнет RuntimeError.

Этот метод рабудит не более n потоков, ожидающих переменную условия. Метод notify эквивалентен no-op (отсутствию операции), если нет ожидающих потоков.

Текущая реализация разбудит точно n потоков, если ожидают как минимум n потоков. Однако полагаться на это поведение небезопасно. В будущем оптимизированная реализация может время от времени пробуждать больше n потоков.

Примечание: разбуженные потоки фактически не возвратятся из своих вызовов wait(), пока заново не захватят блокировку. Поскольку notify() не освобождает блокировку, это должен сделать вызывающий код.

notify_all()

Разбудит все потоки, ожидающие на этом условии. Этот метод работает аналогично notify(), но разбудит все ожидающие потоки вместо одного. Если вызывающий поток не владеет блокировкой, когда вызывается этот метод, то возникнет RuntimeError.

Метод notifyAll является устаревшим псевдонимом для notify_all.

[Объекты семафора]

Это один из самых старых примитивов синхронизации в истории компьютерных технологий, придуманный немецким ученым Edsger W. Dijkstra (он использовал имена P() и V() вместо acquire() и release()).

Семафор обслуживает внутренний счетчик, который декрементируется каждым вызовом acquire(), и инкрементируется каждым вызовом release(). Счетчик никогда не может уменьшиться ниже 0; когда вызов acquire() встретится с нулевым значением счетчика, он заблокируется в ожидании, когда другой поток вызовет release().

Семафоры также поддерживают протокол управления контекстом.

class threading.Semaphore(value=1)

Этот класс реализует объекты семафора. Семафор управляет атомарным счетчиком, в котором представлено количество вызовов release() минус количество вызовов acquire(), плюс начальное значение. Метод acquire() выполнит блокировку до тех пор, пока можно будет выполнить возврат, не делая счетчик отрицательным. Если параметр value не указан, то по умолчанию value принимается равным 1. Если указано value меньше 0, то возникнет ValueError.

Изменено в версии 3.3: заводская функция заменена на класс.

acquire(blocking=True, timeout=None)

Захват семафора.

Когда метод вызывается без аргументов:

• Если внутренний счетчик больше 0, то декремент его и немедленный возврат True.
• Если внутренний счетчик равен 0, то происходит блокировка до вызова release() из другого потока. После пробуждения, когда счетчик стал больше 0, будет выполнен декремент счетчика и возврат True. Каждым вызовом release() будет разбужен только один поток. Не стоит полагаться на порядок, в которым будут пробуждаться несколько потоков, заблокированные на одном семафоре.

Когда метод вызывается с blocking = False, блокировка не производится. Если вызов без аргумента в этой ситуации блокируется, то произойдет немедленный возврат False; иначе произойдет то же самое, что при вызове без аргументов, и будет возвращено значение True.

Когда метод вызывается с параметром timeout, и он не None, то блокировка произойдет на время не больше timeout секунд. Если за этот интервал acquire не было успешным, то будет возвращено False, иначе будет возвращено True.

Изменено в версии 3.2: новый параметр timeout.

release(n=1)

Освобождение семафора с инкрементом его внутреннего счетчика на n. Когда счетчик был равен 0, и другие потоки ждали на этом семафоре, когда счетчик станет ненулевым, то будет разбужено n таких потоков.

Изменено в версии 3.9: добавлен параметр n, чтобы освободить несколько ожидающих потоков сразу.

class threading.BoundedSemaphore(value=1)

Класс, реализующий объекты ограниченного семафора. Ограниченный семафор проверяет, что его текущее значение не превышает его начальное значение. Если это произошло, то возникнет ValueError. В большинстве ситуаций эти семафоры используются для защиты ресурсов с ограниченной емкостью. Если семафор был освобожден слишком много раз, то выдается сигнал об ошибке. Если параметр value не указан, то по умолчанию он принимается равным 1.

Изменено в версии 3.3: заводская функция заменена на класс.

Семафоры часто используются для защиты ресурсов с ограниченной емкостью, например сервер базы данных. В любой ситуации, когда размер ресурса фиксирован, вы должны использовать bounded-семафор. Перед порождением любого рабочего потока ваш главный поток должен инициализировать семафор:

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

После своего запуска рабочие потоки вызывают методы acquire и release семафора, когда нужно подключиться к серверу:

with pool_sema:
   conn = connectdb()
   try:
      # ... use connection ...
   finally:
      conn.close()

Использование bounded-семафора уменьшает шанс возникновения ошибки программирования, когда семафор освобожден большее количество раз, чем может оказаться обнаруженным.

[Объекты события]

Это один из простейших механизмов взаимодействия между потоками: один поток сигнализирует о событии, и другой поток (потоки) ожидают возникновения события.

Объект события управляет внутренним флагом, который может быть установлен в True методом set() и сброшен в False методом Clear(). Метод wait() выполняет блокировку, пока флаг не станет True.

class threading.Event

Класс, реализующий объекты события. Начальное значение флага события False.

Изменено в версии 3.3: заводская функция заменена на класс.

is_set()

Возвратит True, тогда и только тогда, когда внутренний флаг True.

Метод isSet является устаревшим псевдонимом этого метода.

set()

Установит внутренний флаг в True. Все потоки, которые ожидают это событие, будут разбужены. Потоки, которые вызовут wait(), когда этот флаг в состоянии True, не будут заблокированы.

clear()

Сбросит внутренний флаг в False. Как следствие, все потоки, вызвавшие wait(), будут заблокированы, пока не будет вызван set(), чтобы снова установить внутренний флаг в True.

wait(timeout=None)

Заблокирует выполнение, пока внутренний флаг не станет True. Если внутренний флаг на входе уже в состоянии True, то произойдет немедленный возврат. Иначе произойдет блокировка, пока другой поток не вызовет set() для установки этого флага в True, или пока не истечет опционально указанный таймаут.

Когда указан аргумент timeout, и он не None, то это должно быть число с плавающей запятой, указывающее длительность таймаута в секундах (может быть дробной частью секунды).

Этот метод вернет True, если внутренний флаг установлен в True, либо при вызове wait, либо после вызова wait. Значение True будет возвращено всегда кроме ситуаций, когда истек таймаут.

Изменено в версии 3.1: ранее этот метод всегда возвращал None.

[Объекты Timer]

Этот класс представляет действие, которое должно произойти только после определенного количества прошедшего времени. Timer является субклассом от класса Thread, и его функционирование может служить примера создания пользовательских потоков.

Таймеры запускаются, как и потоки, вызовом их метода start(). Таймер может быть остановлен (до того, как начнется его действие) вызовом метода cancel(). Интервал, в течение которого таймер выполнит задержку запуска своего действия, может быть не точно такое, как указал пользователь.

Например:

def hello():
   print("hello, world")
 
t = Timer(30.0, hello)
t.start()  # после 30 секунд напечатается "hello, world"

class threading.Timer(interval, function, args=None, kwargs=None)

Создает таймер, который запустит функцию с аргументами args и аргументами ключевого слова kwargs после истечения интервала interval, указанного в секундах. Если args = None (по умолчанию) то вместо списка аргументов используется пустой список. Если kwargs = None (по умолчанию), то используется пустой словарь.

Изменено в версии 3.3: заводская функция заменена на класс.

cancel()

Остановит таймер, и отменит действие таймера. Это сработает только если таймер все еще находится на стадии ожидания.

[Объекты Barrier]

Эта функция появилась в версии 3.2.

Этот класс предоставляет простой примитив синхронизации для использования фиксированного количества потоков, которые ждут друг друга. Каждый из потоков пытается передать barrier вызовом метода wait(), и заблокируется, пока все потоки не сделают свои вызовы wait(). В этот момент все потоки освободятся одновременно.

Барьер можно повторно использовать любое количество раз для одного и того же числа потоков.

Вот простой пример способа синхронизации потоков клиента и сервера:

b = Barrier(2, timeout=5)
 
def server():
   start_server()
   b.wait()
   while True:
      connection = accept_connection()
      process_server_connection(connection)
 
def client():
   b.wait()
   while True:
      connection = make_connection()
      process_client_connection(connection)

class threading.Barrier(parties, action=None, timeout=None)

Создает объект барьера для количества потоков parties. Когда предоставлен параметр action, то это вызываемый объект, который будет запущен одним из потоков при освобождении. У параметра timeout значение по умолчанию None, которое указывается для метода wait().

wait(timeout=None)

Побуждает поток перейти барьер. Когда все участники барьера вызовут эту функцию, все они будут одновременно освобождены. Если предоставлен параметр timeout, то он используется как предпочтение используемому в конструкторе класса.

Возвращает целочисленное значение в диапазоне от 0 до parties – 1, отличающееся для каждого потока. Это может использоваться для выбора потока, который выполнит какие-то специальные действия, например:

i = barrier.wait()
if i == 0:
   # Это должен напечатать только один поток:
   print("passed the barrier")

Если конструктору было предоставлен параметр action, то один из потоков перед своим освобождением вызовет action. Если этот вызов приведет к ошибке, то барьер перейдет в нарушенное состояние.

Если произошел таймаут вызова, то барьер перейдет в нарушенное состояние.

Этот метод может вызывать исключение BrokenBarrierError, если барьер в нарушенном состоянии, или произошел reset, когда поток находится в состоянии ожидания.

reset()

Возврат барьера в состояние по умолчанию, пустое состояние. Любые ожидающие на нем потоки получат исключение BrokenBarrierError.

Обратите внимание, что использование этой функции может потребовать некоторой внешней синхронизации, если существуют другие потоки, состояние которых неизвестно. Если барьер сломан, то может быть лучше просто оставить его и создать новый.

abort()

Переводит барьер в сломанное состояние. Это приведет к тому, что любые активные wait() или последующие вызовы потерпят неудачу с генерацией BrokenBarrierError. Используйте это например если один из потоков нуждается в abort, чтобы избежать deadlock приложения.

Возможно было бы лучше просто создать barrier с разумным значением тайм-аута для автоматической защиты от одного из потоков, у которого что-то пошло наперекосяк.

parties

Количество потоков, которые должны пройти barrier.

n_waiting

Количество потоков, ожидающих сейчас в барьере.

broken

Булева переменная, которая равна True, если barrier находится в нарушенном состоянии.

exception threading.BrokenBarrierError

Это исключение, субкласс от RuntimeError, возникнет когда объект Barrier сброшен или поврежден.

[Использование блокировок, условий и семафоров в операторе with]

Все объекты, предоставленные этим модулем, которые имеют методы acquire() и release(), могут использоваться в качестве менеджеров контекста для оператора with. Метод acquire() будет вызван, когда происходит вход в блокировку, и release() будет вызван, когда происходит выход из блокировки. Таким образом, следующий кусок кода:

with some_lock:
   # тут какие-нибудь действия...

... эквивалентен следующему коду:

some_lock.acquire()
try:
   # тут какие-нибудь действия...
finally:
   some_lock.release()

В настоящее время объекты Lock, RLock, Condition, Semaphore и BoundedSemaphore могут использоваться менеджеры контекста оператора with.

[Ссылки]

1. Python threading Thread-based parallelism site:python.org.
2. Python library Notes on availability site:python.org.
3. Emscripten, Download and install site:emscripten.org.
4. singlestore-labs / python-wasi site:github.com.
5. cpython/Lib/_threading_local.py site:github.com.

 

Добавить комментарий


Защитный код
Обновить

Top of Page