Программирование PC Rust: безопасная многопоточность Tue, January 21 2025  

Поделиться

Нашли опечатку?

Пожалуйста, сообщите об этом - просто выделите ошибочное слово или фразу и нажмите Shift Enter.


Rust: безопасная многопоточность Печать
Добавил(а) microsin   

Поддержка безопасного и эффективного программирования многопоточности (конкурентного выполнения нескольких частей кода) это другая фича Rust. Параллельное программирование, когда различные части программы выполняются независимо, и параллельное программирование, когда различные части программы выполняются одновременно, становятся все более важными, поскольку все больше компьютеров используют преимущества своих нескольких процессоров. Исторически программирования в таких контекстах было сложным и подвержено ошибкам, но с появлением Rust есть надежда, что это поменялось.

Первоначально команда Rust считала, что обеспечение безопасности памяти и предотвращение проблем параллелизма - это две отдельные проблемы, которые необходимо решить различными методами. Со временем команда обнаружила, что системы владения (ownership) и типизации являются мощным набором инструментов, помогающих управлять проблемами безопасности памяти и параллелизма! Благодаря использованию владения и проверки типов многие ошибки параллелизма представляют собой ошибки времени компиляции в Rust, а не ошибки времени выполнения. Таким образом, вместо того, чтобы тратить много времени на попытки воспроизвести точные обстоятельства, при которых возникает ошибка параллелизма во время выполнения, неправильный код будет отказываться компилироваться и представлять ошибку, объясняющую проблему. В результате вы можете исправить свой код во время работы над ним, а не после того, как он будет отправлен в производство. Таким образом, философия программирования Rust позволяет писать код, свободный от скрытых багов и легкий для рефакторинга, без введения новых багов.

Многие языки догматичны в отношении решений, которые они предлагают для решения проблем многопоточности. Например Erlang имеет элегантный функционал для синхронизации потоков на основе сообщений, однако реализует только неясные методы совместного состояния между потоками. Поддержка только некоторого подмножества из возможных решений является разумной стратегией для языков более высокого уровня, потому что язык высокого уровня может отказаться от некоторого контроля, чтобы реализовать абстракции. Однако от низкоуровневых языков ожидают, что они предоставляют решения для наилучшей производительности в любой имеющейся ситуации, и имеют меньше абстракций от оборудования. Поэтому Rust предоставляет различные инструменты для моделирования проблем любым способом, подходящим для вашей ситуации и требований.

Здесь мы рассмотрим следующие темы:

• Как создавать потоки, чтобы запустить несколько частей кода одновременно.
• Многопоточность на основе передачи сообщений, где через каналы передаются сообщения между потоками.
• Многопоточность на основе общего состояния, где несколько потоков получают общий доступ к некоторой части данных.
• Трейты Sync и Send, которые расширяют гарантии многопоточности Rust как на определяемые пользователем типы, так и на типы, предоставляемые стандартной библиотекой.

[Использование потоков для одновременного запуска кода]

В большинстве существующих операционных системах выполняемый код работает в процессе, и операционная система управляет несколькими процессами одновременно. В пределах программы вы также можете получить отдельные части кода, которые работают одновременно и (почти) независимо друг от друга. Функционал, который обеспечивает эти независимо работающие части кода, называется потоками (threads). Например, веб-сервер может иметь несколько потоков, чтобы можно было обрабатывать несколько запросов одновременно со стороны подключающихся клиентов.

Разделение вычисления в вашей программе на несколько потоков для запуска нескольких задач одновременно может повысить производительность, однако добавляет при этом сложности в управлении. Поскольку потоки могут работать одновременно, то нет никакой внутренней гарантии порядка выполнения, в котором будут выполняться части кода этих потоков. Это может привести к проблемам, таким как:

• Условия гонки (рейсинг), когда потоки обращаются к данным или ресурсам в несогласованном порядке.
• Взаимная блокировка (deadlock), когда 2 потока ждут чего-то друг от друга, и поэтому не могут выполняться.
• Баги, которые возникают только в некоторых ситуациях, и которые трудно воспроизвести для надежного исправления.

Rust пытается разобраться с этими негативными эффектами с помощью потоков, но программирование в многопоточном контексте все еще требует повышенного внимания и определенной структуры кода, отличающегося от программы с одним потоком.

Языки программирования реализуют потоки несколькими разными способами, и многие операционные системы предлагают API, который можно вызывать из языка программирования для создания новых потоков. Стандартная библиотека Rust применяет модель 1:1 реализации потоков, в соответствии с которой программа использует один поток операционной системы на один языковой поток. Существуют крейты, которые реализуют другие модели потоков, отличающиеся от модели 1:1.

Создание нового потока с помощью spawn. Чтобы создать новый поток, мы вызываем функцию thread::spawn, и передаем ей замыкание (closure, об этом мы говорили в главе 13 [2]), в котором содержится код, который мы хотим запустить в новом потоке. Пример в листинге 16-1 печатает некоторый текст из основного потока, и другой текст из нового потока (файл src/main.rs):

use std::thread;
use std::time::Duration;

fn main() { thread::spawn(|| { for i in 1..10 { println!("hi number {i} from the spawned thread!"); thread::sleep(Duration::from_millis(1)); } });
for i in 1..5 { println!("hi number {i} from the main thread!"); thread::sleep(Duration::from_millis(1)); } }

Листинг 16-1. Создание нового потока для печати строки, отличающейся от печатаемой строки основного потока.

Обратите внимание, что когда основной поток main программы Rust завершается, все порожденные потоки прибиваются, независимо от того, завершили они свою работу или нет. Вывод из этой программы может каждый раз незначительно изменяться, однако он будет выглядеть примерно так:

hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the main thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!

Вызов thread::sleep заставляет поток остановить выполнение на короткое время, позволяя тем самым запуститься другому потоку. Вероятно потоки будут запускаться один за другим, однако это не гарантируется, поскольку зависит от того, как работает планировщик вашей операционной системы, когда он обслуживает переключение контекста между потоками. В этом запуске сначала печатает основной поток, несмотря на то, что оператор печати порожденного потока появляется первой в коде. И хотя мы задали порожденному потоку печатать, пока i не станет 9, он успеет напечатать только 5 до завершения основного потока.

Если вы запустили этот код, и видите только вывод из основного потока, или не видите никакого перекрытия, попробуйте увеличить числа в диапазонах, чтобы создать для операционной системы больше возможностей для переключения между потоками.

Ожидание завершения всех потоков с помощью присоединения к дескрипторам. Код в листинге 16-1 не только останавливает порожденный поток преждевременно в момент завершения основного потока, но и потому, что нет гарантии порядка, в котором потоки работают, мы также не можем гарантировать, что порожденный поток вообще будет запущен!

Мы можем решить эту проблему порожденного потока, что он не запустится, или преждевременно завершится, путем сохранения возвращенного значения thread::spawn в переменную. Возвращаемый тип из thread::spawn это дескриптор JoinHandle. JoinHandle это владеемое значение, которое, когда мы вызываем на нем метод join, будет ждать завершения этого потока. Листинг 16-2 показывает, как использовать JoinHandle созданного потока в листинге 16-1 и вызов join, чтобы обеспечить завершение порожденного потока до выхода из основного потока (файл src/main.rs):

use std::thread;
use std::time::Duration;

fn main() { let handle = thread::spawn(|| { for i in 1..10 { println!("hi number {i} from the spawned thread!"); thread::sleep(Duration::from_millis(1)); } });
for i in 1..5 { println!("hi number {i} from the main thread!"); thread::sleep(Duration::from_millis(1)); }
handle.join().unwrap(); }

Листинг 16-2. Сохранение JoinHandle из thread::spawn для гарантии, что порожденный поток отработает до своего завершения.

Вызов join на дескрипторе handle блокирует текущий выполняющийся поток, пока не завершится поток, представленный через handle. Блокирование потока означает, что он не может ни выполнять работу, ни завершиться. Поскольку мы поместили вызов join после цикла потока main, запуск листинга 16-2 должно сгенерировать вывод наподобие следующего:

hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 1 from the spawned thread!
hi number 3 from the main thread!
hi number 2 from the spawned thread!
hi number 4 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!

Два потока продолжать чередовать свое выполнение, но поток main будет ждать завершения порожденного потока, потому что был вызов handle.join().

Но давайте посмотрим, что произойдет, когда мы вместо этого переместим handle.join() перед циклом main, примерно так (файл src/main.rs):

use std::thread;
use std::time::Duration;

fn main() { let handle = thread::spawn(|| { for i in 1..10 { println!("hi number {i} from the spawned thread!"); thread::sleep(Duration::from_millis(1)); } });
handle.join().unwrap();
for i in 1..5 { println!("hi number {i} from the main thread!"); thread::sleep(Duration::from_millis(1)); } }

Теперь основной поток будет ждать завершения порожденного потока, и только после этого запустит свой цикл, так что вывод потоков теперь не будет чередоваться, что показано ниже:

hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!

Небольшие детали, такие как место вызова join, могут повлиять на то, запустятся или нет потоки одновременно.

Использование замыканий с потоками. Мы часто будем использовать ключевое слово move с замыканиями, переданными в thread::spawn, потому что замыкание будет тогда брать во владение значения, которое оно использует из среды, таким образом происходит передача владения над этими значениями от одного потока к другому. В секции "Захват ссылок, или передача владения" главы 13 [2] мы обсуждали move в контексте замыканий. Теперь мы дополнительно сконцентрируемся на взаимодействии между move и thread::spawn.

Обратите внимание, что в листинге 16-1 замыкание, передаваемое в thread::spawn, не имеет аргументов: мы не используем данные основного потока в коде порожденного потока. Чтобы использовать данные из основного потока в коде порожденного потока, замыкание порожденного потока должно захватить необходимые значения. Листинг 16-3 показывает попытку создать вектор в основном потоке, и использовать его в порожденном потоке. Однако в настоящий момент это пока не сработает, код не скомпилируется.

use std::thread;

fn main() { let v = vec![1, 2, 3];
let handle = thread::spawn(|| { println!("Here's a vector: {v:?}"); });
handle.join().unwrap(); }

Листинг 16-3. Попытка использовать вектор, созданный в потоке main, в другом потоке (файл src/main.rs).

Замыкание использует v, так что оно будет захватывать v и делать его частью окружения замыкания. Потому что thread::spawn запустит это замыкание в новом потоке, мы должны были бы получить доступ к v внутри этого нового потока. Однако при компиляции этого примера мы получим ошибку:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0373]: closure may outlive the current function, but it borrows `v`,
 which is owned by the current function
 --> src/main.rs:6:32
  |
6 |     let handle = thread::spawn(|| {
  |                                ^^ may outlive borrowed value `v`
7 |         println!("Here's a vector: {v:?}");
  |                                     - `v` is borrowed here
  |
note: function requires argument type to outlive `'static`
 --> src/main.rs:6:18
  |
6 |       let handle = thread::spawn(|| {
  |  __________________^
7 | |         println!("Here's a vector: {v:?}");
8 | |     });
  | |______^
help: to force the closure to take ownership of `v` (and any other referenced
 variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++
For more information about this error, try `rustc --explain E0373`. error: could not compile `threads` (bin "threads") due to 1 previous error

Rust подсказывает, как захватить v, и поскольку для println! нужна только ссылка на v, замыкание пытается заимствовать v. Однако здесь есть проблема: Rust не может сказать, как долго порожденный поток будет работать, так что он не знает, будет ли ссылка на v всегда достоверной.

Листинг 16-4 предоставляет сценарий, в котором с большой вероятностью ссылка на v не будет достоверной (файл src/main.rs):

use std::thread;

fn main() { let v = vec![1, 2, 3]; let handle = thread::spawn(|| { println!("Here's a vector: {v:?}"); });
drop(v); // О, нет!
handle.join().unwrap(); }

Листинг 16-4. Поток с замыканием, которое пытается захватить ссылку на v из основного потока, который выбрасывает v.

Если Rust позволит нам запустить этот код, то есть возможность, что порожденный поток немедленно будет помещен в background без какого-либо запуска. Порожденный поток имеет внутри себя ссылку на v, однако основной поток немедленно отбрасывает v, используя функцию drop, которая обсуждалась в главе 15 [3]. Затем, когда порожденный поток начал выполняться, ссылка v перестала быть достоверной!

Для исправления ошибки компиляции листинга 16-3 мы можем использовать совет из сообщения об ошибке:

help: to force the closure to take ownership of `v` (and any other referenced
 variables), use the `move` keyword
  |
6 |     let handle = thread::spawn(move || {
  |                                ++++

Добавлением ключевого слова move перед замыканием мы принуждаем замыкание брать владение над его используемыми значениями вместо того, чтобы позволить Rust сделать вывод о том, что оно должно заимствовать значения. Модификация листинга 16-3 показана в листинге 16-5, и иона будет компилироваться и запускаться, как мы и хотели (файл src/main.rs):

use std::thread;

fn main() { let v = vec![1, 2, 3];
let handle = thread::spawn(move || { println!("Here's a vector: {v:?}"); });
handle.join().unwrap(); }

Листинг 16-5. Использование ключевого слова move, чтобы замыкание брало во владение используемые им значения.

У нас может возникнуть соблазн использовать то же самое, чтобы исправить код в листинге 16-4, где основной поток вызвал drop, путем использования move замыкания. Однако этот исправление не сработает, потому что то, что делает листинг 16-4, запрещено по другой причине. Если мы добавим move для замыкания, то мы переместим v в окружение замыкания, и больше не сможем вызывать drop на нем в основном потоке. Вместо этого мы получим следующую ошибку компилятора:

$ cargo run
   Compiling threads v0.1.0 (file:///projects/threads)
error[E0382]: use of moved value: `v`
  --> src/main.rs:10:10
   |
4  |     let v = vec![1, 2, 3];
   |         - move occurs because `v` has type `Vec< i32>`, which does not implement the `Copy` trait
5  |
6  |     let handle = thread::spawn(move || {
   |                                ------- value moved into closure here
7  |         println!("Here's a vector: {v:?}");
   |                                     - variable moved due to use in closure
...
10 |     drop(v); // oh no!
   |          ^ value used here after move
For more information about this error, try `rustc --explain E0382`. error: could not compile `threads` (bin "threads") due to 1 previous error

Правила владения Rust снова спасены! Ошибка в листинге 16-3 была из-за того, что Rust был консервативен, и заимствовал только v для потока, что означало, что основной поток теоретически может сделать недействительной ссылку на порожденный поток. Указанием для Rust переместить владение над v в порожденный поток мы гарантируем для Rust, что основной поток больше нигде не будет использовать v. Если мы поменяем листинг 16-4 таким же способом, то нарушим правила владения, когда попытаемся использовать v в основном потоке. Ключевое слово move отменяет консервативное поведение по умолчанию Rust в контексте заимствования; это не позволяет нам нарушать правила владения.

С этим базовым пониманием потоков и их API, давайте посмотрим, что мы можем делать с потоками.

[Использование сообщений для передачи данных между потоками]

Одним из методов безопасной многопоточности (применяемым с нарастающей популярностью) является передача сообщений, где потоки коммуницируют друг с другом путем отправки сообщений с данными. Идея описана в слогане из документации по языку Go: "Не осуществляйте обмен через общую память; вместо этого предоставляйте память в совместный доступ через коммуникацию".

Чтобы реализовать многопоточность на основе передачи сообщений, стандартная библиотека Rust предоставляет реализацию каналов. Канал это основная концепция программирования, в которой данные передаются от одного потока другому потоку.

Канал можно представить себе как однонаправленный сток воды, наподобие ручейка. Или вы поместите в ручей резиновую уточку, по он отправит её вниз по течению, до своего конца водного пути.

Канал состоит из двух половинок: передатчик (transmitter) и приемник, или получатель (receiver). Transmitter это верхняя часть канала (upstream), то место, в котором вы помещаете уточку в ручей, а receiver это то место, куда уточка доплывет (downstream). Одна часть кода вызывает методы на передатчике с данными, которые вы хотите отправить, а другая часть на принимающем конце проверяет их получение в сообщениях. Говорят, что канал закрыт, если отбрасывается любая из его половин, transmitter или receiver.

Здесь мы будем работать над программой, где один поток генерирует значения, и посылает их через канал, а другой канал будет принимать эти значения и печатать. Передавать по каналу будем простые значения, для иллюстрации этого функционала. Как только вы освоите эту технику, то сможете использовать каналы для любых потоков, которые нуждаются в коммуникации друг с другом, такие как система чата или система, где множество потоков выполняют вычисления различных частей для одного потока, который агрегирует результаты.

Сначала в листинге 16-6 мы создадим канал, пока ничего с ним не делая. Обратите внимание, что этот код не скомпилируется, потому что Rust не может сказать, какой тип значений мы хотим передавать через канал.

use std::sync::mpsc;

fn main() { let (tx, rx) = mpsc::channel(); }

Листинг 16-6. Создание канала и назначение его двух половин в tx и rx (файл src/main.rs).

Мы создаем новый канал, используя функцию mpsc::channel; mpsc означает "multiple producer, single consumer" (несколько продюсеров, один потребитель). Короче говоря способ, которым стандартная библиотека Rust реализует каналы, означает, что канал может иметь несколько отправляющих концов, которые производят отправляемые значения, но только один принимающий конец, который потребляет эти значения. Это можно представить себе как несколько ручейков, стекающих в одну большую речку: все, что посылается по ручейкам, заканчивается в конце большой реки. Мы начнем с одного продюсера, но потом добавим еще несколько, когда этот пример заработает.

Функция mpsc::channel возвратит кортеж, первый элемент которого отправляющий конец (transmitter), а второй элемент это принимающий конец (receiver). Аббревиатуры tx и rx традиционно используются во многих полях для передатчика и приемника соответственно, так что мы дали такие имена переменным для каждого конца канала. Мы используем инструкцию let с шаблоном, который деструктурирует кортежи; мы будем обсуждать шаблоны в операторах let и деструктурирование в главе 18. Пока что все, что нужно знать: использование оператора let таким способом является удобным методом достичь распаковки частей кортежа, возвращенного из вызова mpsc::channel.

Давайте переместим передающий конец в порожденный поток, и заставим его передавать одну строку, чтобы порожденный поток связывался с основным потоком, как показано в листинге 16-7.

use std::sync::mpsc;
use std::thread;

fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }

Листинг 16-7. Перемещение tx в порожденный поток и отправка "hi" (файл src/main.rs).

Мы используем thread::spawn для создания нового потока, и затем используем move, чтобы переместить tx в замыкание, чтобы порожденный поток владел tx. Порожденному потоку нужно владеть передатчиком, чтобы он мог посылать сообщения через канал. Передатчик имеет метод send, который принимает значение, которое мы хотим отправить. Метод send возвратит тип Result< T, E>, так что если приемник был отброшен, и некуда передавать значение, то операция отправки возвратит ошибку. В этом примере мы вызвали unwrap, чтобы при ошибке произошла паника. Однако в реальном приложении мы могли бы обработать это более правильным образом: обратитесь к главе 9 [4] для обзора стратегий обработки ошибок (error handling).

В листинге 16-8 мы берем значение из приемника, который находится в основном потоке.

use std::sync::mpsc;
use std::thread;

fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); });
let received = rx.recv().unwrap(); println!("Got: {received}"); }

Листинг 16-8. Получение значения "hi" в основном потоке и его печать (файл src/main.rs).

В приемнике есть два полезных метода: recv и try_recv. Мы используем recv, который будет блокировать выполнение основного потока и ждать, пока значение не будет отправлено по каналу. Как только значение отправлено, recv выполнит его возврат в Result< T, E>. Когда передатчик закрывается, recv возвратит ошибку для сигнализации, что никаких значений больше не будет.

Метод try_recv не делает блокировку, вместо этого немедленно возвращая Result< T, E>: значение Ok будет содержать сообщение, если оно доступно, и значение Err в настоящий момент нет сообщений. Использование try_recv полезно, если у этого потока есть другая работа, которую ему надо делать при ожидании сообщений: мы могли бы написать цикл, который часто вызывает try_recv, обрабатывает сообщение, если оно доступно, и иначе делает другую работу до следующей проверки.

Мы в этом примере использовали recv для простоты; у нас нет другой работы для основного потока, кроме как ждать сообщения, так что подойдет блокировка основного потока на ожидании.

Когда мы запустим код листинга 16-8, то увидим значение, печатаемое из основного потока main:

Got: hi

Каналы и передача владения. Правила владения играют жизненно важную роль в отправке сообщений, потому что помогают вам писать безопасный многопоточный код. Предотвращение ошибок параллельного программирование получается наследство соблюдения прав владения в ваших программах Rust. Давайте проведем эксперимент, чтобы показать совместную работу каналов и системы владения в контексте предотвращения проблем: мы попытаемся использовать значение val в порожденном потоке после того, как послали его по каналу. Попробуйте скомпилировать код в листинг 16-9, чтобы увидеть, что этот код не дозволяется (файл src/main.rs):

use std::sync::mpsc;
use std::thread;

fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); println!("val is {val}"); });
let received = rx.recv().unwrap(); println!("Got: {received}"); }

Листинг 16-9. Попытка использовать val после того, как оно было послано по каналу.

Здесь мы пытаемся напечатать значение val после того, как послал его по канал через tx.send. Позволить такое плохая идея: как только значение было послано другому потоку, этот поток мог бы модифицировать val или выбросить до того, как мы попытаемся использовать его значение снова. Потенциально изменения другого потока могут привести к ошибкам или неожиданным результатам из-за несогласованных или несуществующих данных. Таким образом, Rust показывает нам ошибку, если мы попытаемся скомпилировать код в листинге 16-9:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:26
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does
   |                 not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                          ^^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which
     comes from the expansion of the macro `println` (in Nightly builds, run
     with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`. error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

Наша ошибка многопоточности привела к ошибке компиляции. Функция send берет во владение свой параметр, и когда значение перемещается, приемник принимает над ним владение. Это предотвращает нас от случайного использования этого значения повторно после его отправки; система владения проверяет, что все ли в порядке.

Передача нескольких значений и наблюдение за ожиданием получателя. Код в листинге 16-8 компилируется и запускается, но он не очень ясно показывает нам, что два потока общаются друг с другом через канал. В листинге 16-10 были сделаны некоторые модификации, которые докажут, что код в листинге 16-8 работает многопоточно: порожденный поток будет теперь посылать несколько сообщений, и останавливаться на секунду между каждым сообщением.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() { let (tx, rx) = mpsc::channel();
thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ];
for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } });
for received in rx { println!("Got: {received}"); } }

Листинг 16-10. Отправка несколько сообщений, с паузой между отправками (файл src/main.rs).

На этот раз в порожденном потоке есть вектор строк, которые мы хотим отправить в основной поток. Происходит итерация по этому вектору, когда каждая строка отправляется по отдельности, и вставляется пауза в 1 секунду между отправками вызовом функции thread::sleep.

В основном потоке мы больше не вызываем функцию recv явно: вместо этого мы рассматриваем rx как итератор. Для каждого принятого значения его текст выводится на печать. Когда канал закрывается, итерация заканчивается.

Когда мы запустим код в листинге 16-10, вы должны увидеть следующий вывод с 1-секундной паузой между каждой строкой:

Got: hi
Got: from
Got: the
Got: thread

Поскольку у нас нет кода, который приостанавливает или задерживает цикл for в основном потоке, мы можем сказать, что основной поток ждет приема значений из порожденного потока.

Создание нескольких продюсеров клонированием передатчика. Мы уже упоминал, что mpsc сокращение от multiple producer, single consumer (несколько генераторов, один потребитель). Давайте вставим mpsc для использования расширенного кода в листинге 16-10, чтобы создать несколько потоков, которые все посылают сообщения одному приемнику. Мы можем это сделать клонированием передатчика, как показано в листинге 16-11 (файл src/main.rs):

    // -- вырезано --
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ];
for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } });
thread::spawn(move || { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ];
for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } });
for received in rx { println!("Got: {received}"); }
// -- вырезано --

Листинг 16-11. Отправка нескольких сообщений из нескольких продюсеров (порожденных потоков).

На этот раз перед созданием порожденного потока мы вызываем clone на передатчике. Это даст нам новый передатчик, который мы можем передать новому порожденному потоку. Оригинальный передатчик мы передаем во второй порожденный поток. Это дает нам 2 потока, каждый из которых посылает разные сообщения в приемник (основной поток).

Когда вы запустите этот код, ваш вывод должен выглядеть следующим образом:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

Вы можете видеть появление значений в другом порядке, что зависит от вашей операционной системы. Именно это делает многопоточность интересной, но также и более сложной для использования. Если вы поэкспериментируете с thread::sleep, давая различные значения в разных потоках, то каждый запуск будет происходить еще более недетерминированным образом.

Теперь, когда мы рассмотрели, как работает канал, давайте посмотрим на другой метод многопоточности.

[Многопоточность с общим состоянием]

Передача сообщений хороший способ организации многопоточности, но он не единственный. Другой метод синхронизации между потоками - обеспечить им общие данные, куда они смогут обращаться. Рассмотрим еще раз эту часть слогана из документации по языку Go: "не делайте взаимодействие между потоками через общую память".

Как будет выглядеть коммуникация потоков при совместном использовании памяти? Кроме того, почему бы энтузиастам, передающим сообщения, не использовать совместную память?

В некотором смысле каналы на любом языке программирования похожи на одиночное владение, потому что как только вы передаете значение по каналу, вы больше не должны использовать это значение. Параллелизм с общей памятью похож на множественное владение: несколько потоков могут одновременно обращаться к одной и той же области памяти. Как вы видели в главе 15 [3], где smart-указатели делают возможным множественное владение, это множественное владение может добавлять сложности, потому что разными владельцами нужно управлять. Система типов Rust и правила владения в значительной степени помогают реализовать правильное управление. В качестве примера давайте рассмотрим мьютексы, один из наиболее распространенных примитивов многопоточности для общей памяти.

Использование мьютексов для разрешения единовременно доступа к данным из одного потока. Mutex это аббревиатура для mutual exclusion (взаимное исключение), при этом мьютекс позволяет только одному потоку получить доступ к данным в любой момент времени. Чтобы получить доступ к данным в мьютексе, поток должен сначала сигнализировать, что он должен запросив получение блокировки мьютекса (mutex lock). Блокировка (lock) это структура данных, которая является частью мьютекса, и это позволяет отслеживать, кто в настоящий момент имеет эксклюзивный доступ к данным. Поэтому мьютекс описывается как защита данных, которую он обеспечивает с помощью системы блокировки.

Мьютексы имеют репутацию сложных для использования, потому что вам нужно помнить 2 правила:

• Перед использованием данных необходимо получить блокировку (acquire lock).
• Когда вы завершили работу с данными, защищенными мьютексом, вы должны их разблокировать для других потоков, которые также смогут получить блокировку на мьютексе.

Реальную метафору мьютекса могут представить себе дискуссию на конференции, где есть только один микрофон. Прежде чем участник дискуссии (спикер) сможет говорить, он должен спросить, или показать интерес на пользование микрофоном. Когда спикер получил микрофон, он может говорить сколько захочет, а затем передать микрофон следующему спикеру, который хочет говорить. Если спикер забудет передать микрофон, когда перестанет его использовать, то никто не сможет больше говорить. Если управление микрофоном идет не по плану, то дискуссия не получится.

Управление мьютексами может быть невероятно сложным, поэтому многие программисты предпочитают каналы. Однако благодаря системе типов и правилам владения Rust вы не сможете неправильно заблокировать и разблокировать.

API Mutex< T>. Как пример, как использовать мьютекс, давайте начнем использовать мьютекс в однопоточном контексте, как показано в листинге 16-12 (файл src/main.rs):

use std::sync::Mutex;

fn main() { let m = Mutex::new(5);
{ let mut num = m.lock().unwrap(); *num = 6; }
println!("m = {m:?}"); }

Листинг 16-12. Исследование API Mutex< T>, для упрощения в однопоточном контексте.

Как и со многими типами, мы создаем Mutex< T>, используя связанную функцию new. Для доступа к данным внутри мьютекса мы используем метод lock для получения блокировки. Вызов lock будет блокировать текущий поток, так что он не сможет работать, пока не наступит его очередь получить блокировку.

Вызов lock будет неудачным, если запаниковал другой поток, удерживающий блокировку. В этом случае никто никогда не сможет получать блокировку, поэтому мы решили выбрать unwrap и паниковать с этим потоком, если окажемся в такой ситуации.

После того, как мы получили блокировку, можно рассматривать возвращенное значение, в нашем примере num, как мутируемую ссылку на данные внутри. Система типов гарантирует, что мы можем получить блокировку перед использованием значения в m. Тип m это Mutex< i32>, не i32, так что мы должны вызвать call, чтобы можно было использовать значение i32. Мы не можем забыть; система типов не даст нам доступ другим способом к внутреннему i32.

Как вы можете подозревать, Mutex< T> это smart-указатель. Если быть более точным, то вызов lock возвратит smart-указатель MutexGuard, обернутый в LockResult, который мы обработали вызовом unwrap. Smart-указатель MutexGuard реализует трейт Deref для указания на наши внутренние данные; smart-указатель также имеет реализацию трейта Drop, который освободит блокировку автоматически, когда MutexGuard выйдет из области действия, что происходит по окончании внутренней области. В результате у нас нет риска забыть освободить блокировку, и тем самым заблокировать мьютекс от использования в других потоках, потому что освобождение блокировки произойдет автоматически.

После отбрасывания блокировки мы можем напечатать значение мьютекса и увидеть, что у нас получилось поменять внутреннее i32 на 6.

Совместное использование Mutex< T> между несколькими потоками. Теперь давайте попробуем совместно использовать значение между несколькими потоками, используя Mutex< T>. Мы будем прокручивать 10 потоков, и каждый из них будет увеличивать значение counter на 1, так что дойдет от 0 до 10. Следующий пример в листинге 16-13 приведет к ошибке компиляции, и мы будем использовать эту ошибку, чтобы больше узнать про использование Mutex< T>, и как Rust помогает это делать корректным образом.

use std::sync::Mutex;
use std::thread;

fn main() { let counter = Mutex::new(0); let mut handles = vec![];
for _ in 0..10 { let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); }
for handle in handles { handle.join().unwrap(); }
println!("Result: {}", *counter.lock().unwrap()); }

Листинг 16-13. 10 потоков, каждый инкрементирует counter, защищенный Mutex< T> (файл src/main.rs).

Мы создали переменную counter для хранения i32 внутри Mutex< T>, как было сделано в листинге 16-12. Далее мы создали 10 потоков итерацией по диапазону значений чисел. Мы использовали thread::spawn и дали всем этим потокам одно и то же замыкание (closure). Переменная counter перемещается в поток, приобретает блокировку на Mutex< T> вызовом метода lock, и поток добавляет 1 в значение, хранящееся в мьютексе. Когда поток завершает работу своего замыкания, переменная num выходит из области действия, освобождая тем самым блокировку, чтобы другой поток мог эту блокировку взять.

В основном потоке мы собираем все соединенные дескрипторы (находятся в векторе handles) вызовом join (как мы делали в листинге 16-2), чтобы обеспечить завершение всех потоков. В этом месте основной поток получает блокировку, и печатает результат работы этой программы.

Давайте теперь выясним, почему этот код не хочет компилироваться.

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
  --> src/main.rs:21:29
   |
5  |     let counter = Mutex::new(0);
   |         ------- move occurs because `counter` has type `Mutex< i32>`, which
   |         does not implement the `Copy` trait
...
8  |     for _ in 0..10 {
   |     -------------- inside of this loop
9  |         let handle = thread::spawn(move || {
   |                                    ------- value moved into closure here,
   |         in previous iteration of loop
...
21 |     println!("Result: {}", *counter.lock().unwrap());
   |                             ^^^^^^^ value borrowed here after move
   |
help: consider moving the expression out of the loop so it is only moved once
   |
8  ~     let mut value = counter.lock();
9  ~     for _ in 0..10 {
10 |         let handle = thread::spawn(move || {
11 ~             let mut num = value.unwrap();
   |
For more information about this error, try `rustc --explain E0382`. error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

Сообщение об ошибке говорит, что переменная counter была перемещена в предыдущей итерации цикла. Rust говорит нам, что мы не можем переместить владение над counter в несколько потоков. Давайте исправим эту ошибку компилятора методом множественного владения, что мы обсуждали в главе 15 [3].

Множественное владение с несколькими потоками. В главе 15 мы давали значение нескольким владельцам, используя smart-указатель Rc< T>, чтобы создать значение, на котором существует счетчик ссылок на него. Давайте сделаем то же самое и посмотрим, что получится. Мы обернем Mutex< T> в Rc< T> как в листинге 16-14, и клонируем Rc< T> перед передачей владения в поток.

use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() { let counter = Rc::new(Mutex::new(0)); let mut handles = vec![];
for _ in 0..10 { let counter = Rc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); }
for handle in handles { handle.join().unwrap(); }
println!("Result: {}", *counter.lock().unwrap()); }

Листинг 16-14. Попытка использовать Rc< T>, чтобы позволить нескольким потокам владеть Mutex< T> (файл src/main.rs).

И опять при компиляции получаются ошибки, но уже другие. Компилятор учит нас уму-разуму.

$ cargo run
   Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc< Mutex< i32>>` cannot be sent between threads safely
  --> src/main.rs:11:36
   |
11 |           let handle = thread::spawn(move || {
   |                        ------------- ^------
   |                        |             |
   |  ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
   | |                      |
   | |                      required by a bound introduced by this call
12 | |             let mut num = counter.lock().unwrap();
13 | |
14 | |             *num += 1;
15 | |         });
   | |_________^ `Rc< Mutex< i32>>` cannot be sent between threads safely
   |
   = help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented
     for `Rc< Mutex< i32>>`, which is required by `{closure@src/main.rs:11:36: 11:43}: Send`
note: required because it's used within this closure
  --> src/main.rs:11:36
   |
11 |         let handle = thread::spawn(move || {
   |                                    ^^^^^^^
note: required by a bound in `spawn`
  --> /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/thread/mod.rs:691:1
For more information about this error, try `rustc --explain E0277`. error: could not compile `shared-state` (bin "shared-state") due to 1 previous error

Важная часть этого сообщения: "`Rc< Mutex< i32>>` cannot be sent between threads safely". Компилятор также говорит нам причину, по которой не получается безопасно передать Rc< Mutex< i32>>: трейт "trait `Send` is not implemented for `Rc< Mutex< i32>>`". Мы поговорим про трейт Send в следующей секции: это один из трейтов, который гарантирует, что используемые нами типы с потоками предназначены для работы в условиях многопоточности.

К сожалению, тип Rc< T> не безопасен для совместного используя между потоками. Когда Rc< T> обслуживает счетчик ссылок, он добавляет 1 к счетчику на каждом вызове clone, и вычитает 1 из счетчика, когда каждый клон выбрасывается. Но при этом не используется никакие примитивы многопоточности, чтобы гарантировать, что изменения счетчика не будут прерваны другим потоком. Это может привести к некорректному подсчету ссылок - тонким ошибкам, которые в свою очередь могут привести к утечке памяти, или когда значение будет отброшено перед его использованием. Все-что нам нужно - такой же тип, как и Rc< T>, но такой, который вносит изменения в счетчик ссылок безопасным для многопоточности способом (thread-safe).

Атомарный подсчет ссылок на основе Arc< T>. К счастью, есть тип Arc< T>, который работает как Rc< T>, но при этом его можно использовать безопасно в условиях многопоточности. Название Arc означает atomic Rc, т. е. атомарный Rc, обеспечивающий атомарный доступ к счетчику ссылок тип. Атомарность это дополнительный примитив многопоточности, который мы здесь не рассматриваем (для получения подробностей см. документацию стандартной библиотеки для std::sync::atomic). Пока все, что нужно знать, это то, что атомарные типы это типы наподобие примитивных, но они безопасны для использования в многопоточном контексте.

Затем вы можете задаться вопросом, почему все примитивные типы не являются атомарными и почему типы стандартной библиотеки не реализованы для использования Arc< T> по умолчанию. Причина в том, что безопасность для использования в потоках не бесплатна, она требует дополнительных накладных расходов в плане производительности, так что это следует использовать только при реальной необходимости. Если вы просто выполняете операции в пределах одного потока, то ваш код будет работать быстрее, если вам не требуются гарантии атомарности, предоставляемые atomic-типами.

Давайте вернемся к нашему примеру: у типов Arc< T> и Rc< T> одинаковый API, так что мы можем исправить программу изменением только пары строк, где вызывается new, и вызывается clone. Код в листинг 16-15 будет наконец компилироваться и запускаться (файл src/main.rs):

use std::sync::{Arc, Mutex};
use std::thread;

fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![];
for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); }
for handle in handles { handle.join().unwrap(); }
println!("Result: {}", *counter.lock().unwrap()); }

Листинг 16-15. Использование типа Arc< T> для обертки вокруг Mutex< T>, чтобы можно было реализовать общее владение между несколькими потоками.

Этот код напечатает следующее:

Result: 10

Мы сделали это! Подсчитали от 0 до 10, что возможно не очень выразительно, но тем самым мы научились немного Mutex< T> и безопасности потоков (thread safety). Вы также можете использовать структуру этой программы для более сложных операций, чем просто увеличение счетчика. С использованием этой стратегии вы можете разделить вычисления на отдельные независимые части, распределить эти части на разные потоки, и затем использовать Mutex< T>, чтобы каждый поток обновлял конечный результат своей частью.

Обратите внимание, что если вы делаете простые числовые операции, то существуют типы проще, чем Mutex< T>, предоставляемые в модуле std::sync::atomic стандартной библиотеки. Эти типы предоставляют безопасный, конкурентный, атомарный доступ к примитивным типам. Мы в этом примере выбрали использование Mutex< T> с примитивным типом, чтобы сконцентрироваться на том, как работает Mutex< T>.

Сходство между RefCell< T>/Rc< T> и Mutex< T>/Arc< T>. Вы могли бы заметить, что переменная counter не мутируемая, но мы могли бы получить мутируемую ссылку на его внутреннее значение; это означает, что Mutex< T> предоставляет внутреннюю мутируемость (interior mutability), как это делает семейство Cell. Таким же способом мы использовали RefCell< T> в главе 15 [3], чтобы мутировать содержимое внутри Rc< T>, мы используем Mutex< T> для мутирования содержимого внутри Arc< T>.

Другая деталь, которую стоит отметить, заключается в том, что Rust не может защитить вас от всевозможных логических ошибок при использовании Mutex< T>. Вспомним, что в главе 15 использование Rc< T> сопряжено с риском создания цикла ссылок, когда два значения Rc< T> ссылаются друг на друга, приводя к утечке памяти. Подобным образом использование Mutex< T> связано с риском создания взаимных блокировок (deadlocks). Это происходит, когда операция должна заблокировать 2 ресурса, и каждый из 2 потоков захватили по одой блокировке, заставляя ждать друг друга. Если вы интересуетесь deadlock-ами, попробуйте создать программу на Rust, в которой есть deadlock; затем исследуйте стратегий устранения deadlock-а для мьютексов в любом языке, как это реализовано в Rust. Документация API стандартной библиотеки для Mutex< T>, и MutexGuard предоставляет полезную информацию.

Мы завершим эту главу, поговорив про характеристики трейтов Send и Sync, м как мы можем их использовать с собственными пользовательскими типами.

[Расширяемый параллелизм с трейтами Sync и Send]

Интересно, что в языке Rust очень мало функционала по многопоточности. Почти все возможности параллелизма, о которых мы говорили до сих пор в этой главе, были частью стандартной библиотеки, а не языка Rust. Ваши опции для обеспечения много поточности не ограничены языком или стандартной библиотекой; вы можете написать свои собственные возможности по многопоточности, или использовать написанное другими программистами.

Однако в язык встроены 2 концепции многопоточности: std::marker трейты Sync и Send.

Sync: возможность передачи владения между потоками. Трейт Send это маркер, который показывает владение значением типа, реализующего трейт Send, тогда этот тип может перемещаться между потоками. Почти каждый тип Rust имеет Send, однако есть несколько исключений, включая Rc< T>: в нем не может быть Send, потому что если вы клонировали значение Rc< T>, и пытаетесь передать владение клоном в другой поток, то оба потока могут обновить счетчик ссылок одновременно. По этой причине Rc< T> реализован для использования в однопоточной ситуации, когда вы не хотите получить накладные расходы поддержки безопасности многопоточности (thread-safe).

Таким образом, система типов Rust и trait bounds [2] гарантируют, что вы никогда не сможете случайно послать значение Rc< T> в условиях многопоточности. Когда мы попытались сделать это в листинге 16-14, то получили ошибку "trait Send is not implemented for Rc< Mutex< i32>>". Когда мы переключились на Arc< T>, в котором реализован трейт Send, код скомпилировался.

Любой тип, полностью составленный из Send-типов, автоматически также помечается как Send-совместимый. Почти все примитивные типы имеют Send, помимо raw-указателей, которые мы обсудим в главе 19.

Sync: возможность доступа из нескольких потоков. Трейт Sync это маркер, который показывает, что существует для типа, реализующего Sync, реализована безопасная возможность обращения для нескольких потоков. Другими словами, любой тип T является Sync, если &T (немутируемая ссылка на T) является Send, т. е. ссылка может быть безопасно отправлена другому потоку. Подобно Send, примитивные типы являются Sync, и типы, составленные полностью из типов Sync, также являются Sync.

Smart-указатель Rc< T> также не Sync по тем же причинам, что и Send. Тип RefCell< T> (про который мы говорили в главе 15 [3]), и семейство относящихся к Cell< T> типов не Sync. Реализация проверки заимствования (borrow checking), которую делает RefCell< T> runtime, не является безопасной для потоков (not thread-safe). Smart-указатель Mutex< T> является Sync, и может использоваться для общего доступа с несколькими потоками, как показано выше в секции "Совместное использование Mutex< T> между несколькими потоками".

Реализация Send и Sync вручную небезопасно. Поскольку типы, построенные из типов с поддержкой трейтов Send и Sync, также автоматически становятся Send и Sync, мы не должны реализовать эти трейты вручную. В качестве маркеров трейтов у них даже нет каких-либо методов для реализации. Они просто полезны для применения инвариантов, связанных с многопоточностью.

Реализация этих трейтов вручную приведет к созданию небезопасного (unsafe) кода Rust. Мы поговорим про небезопасный код Rust в главе 19; пока что важная информация - создание новых многопоточных типов не построенных из частей Send и Sync, требует тщательного анализа для обеспечения гарантий безопасности. Книга Rustonomicon [5] содержит дополнительную информацию об этих гарантиях и о том, как их поддерживать.

[Общие выводы]

Это не последняя глава, которая посвящена многопоточности: проект в главе 20 будет использовать концепции из этой главы в более реалистичной ситуации, чем небольшие примеры, показанные здесь.

Как упоминалось ранее, поскольку в сам язык Rust очень немного посвящено многопоточности, многие соответствующие решения реализованы через крейты (внешние библиотеки). Они развиваются быстрее, чем стандартная библиотека Rust, поэтому обязательно ищите в Интернете сторонние крейты, предназначенные для использования в ситуациях многопоточности.

Стандартная библиотека Rust предоставляет каналы для передачи сообщений между потоками, и типы smart-указателей, такие как Mutex< T> и Arc< T>, которые безопасны в контекстах многопоточности. Система типов и система проверки заимствования (borrow checker) гарантируют, что код, использующий эти решения, не приведет к рейсингу данных или недопустимым ссылкам. Как только вы получили компилируемый код, можете быть уверены, что он будет работать на нескольких потоках без трудно отлаживаемых багов с потоками, с которыми часто сталкиваются пользователи на других языка программирования. Многопоточное программирование больше не является концепцией, которой следует опасаться.

Далее мы поговорим о идиоматических способах моделирования проблем и структурных решениях, которые понадобятся по мере разрастания программы на Rust. Дополнительно мы рассмотрим, как идиомы Rust связаны с тем, что называют объектно-ориентированным программированием (ООП).

[Ссылки]

1. Rust Fearless Concurrency site:rust-lang.org.
2. Rust: итераторы и замыкания.
3. Rust: умные указатели.
4. Rust: обработка ошибок.
5. The Rustonomicon site:rust-lang.org.

 

Добавить комментарий


Защитный код
Обновить

Top of Page