Программирование PC Rust: проект многопоточного web-сервера Thu, November 21 2024  

Поделиться

Нашли опечатку?

Пожалуйста, сообщите об этом - просто выделите ошибочное слово или фразу и нажмите Shift Enter.


Rust: проект многопоточного web-сервера Печать
Добавил(а) microsin   

В этой главе (перевод [1]) мы продемонстрируем некоторые концепции, который были рассмотрены в последних главах обучающего руководства Rust и некоторых более ранних лекциях. В качестве конечного демонстрационного проекта мы создадим веб-сервер, который отобразит в браузере страничку с текстом "hello", как показано на рис. 20-1.

Rust web server fig20 01

Рис. 20-1. Домашняя страничка веб-сервера, написанного на Rust.

Наш веб-сервер мы будем создавать по следующему плану:

1. Вспомним кое-что про протоколы TCP и HTTP.
2. Запустим прослушивание подключений TCP на сокете.
3. Реализуем парсинг некоторых запросов HTTP.
4. Создадим корректный ответ HTTP.
5. Улучшим производительность нашего сервера с помощью пула потоков.

Перед тем, как продолжить, следует упомянуть одну деталь: мы будем использовать не самый лучший метод реализация web-сервера на Rust. В сообществе Rust на крейтах crates.io опубликовано несколько готовых, более полноценных реализаций web-сервера и пула потоков. Однако наше намерение состоит в том, чтобы помочь вам учиться, а не идти легким путем. Поскольку Rust - это язык системного программирования, мы можем выбрать такой уровень абстракции, с которым мы хотим работать, и можем перейти на более низкий уровень, чем это возможно или практично в других языках. Поэтому мы напишем базовый HTTP-сервер и пул потоков вручную, чтобы вы могли изучить общие идеи и методы, лежащие в основе крейтов, которые вы могли бы использовать в будущем.

[Построение однопоточного web-сервера]

Мы начнем с запуска однопоточного веб-сервера, и перед этим немного обсудим протоколы, которые в нем используются: Hypertext Transfer Protocol (HTTP) и Transmission Control Protocol (TCP). Оба этих протокола работают по принципу запрос-ответ (request-response), т. е. клиент инициирует запрос, а сервер ожидает эти запросы и обслуживает их, выдавая клиенту ответы на его запросы. Содержимое этих запросов и ответов определяются протоколами.

TCP это низкоуровневый протокол, который описывает, как информация передается от одного хоста к другому. При этом не имеет значение, какая это информация. HTTP работает поверх TCP, и HTTP определяет формат запросов клиента (например браузера) и ответов web-сервера. Технически возможно использовать HTTP с другими протоколами, но в подавляющем большинстве случаев HTTP отправляет свои данные по TCP. Мы будем работать с необработанными байтами TCP и HTTP запросов и ответов.

Ожидание соединения TCP. Наш web-сервер должен открыть прослушивание сокета в ожидании соединения TCP от клиента, так что это первое, чем мы займемся. Стандартная библиотека предоставляет модуль std::net, который позволяет нам это сделать. Давайте создадим новый проект, как обычно:

$ cargo new hello
     Created binary (application) `hello` project
$ cd hello

Теперь введем код листинг 20-1 в файл src/main.rs. Этот код прослушивает подключение на локальном адресе и порту 127.0.0.1:7878 на предмет получения входящих соединений TCP. Когда код получит входящий поток соединения, он напечатает "Connection established!".

use std::net::TcpListener;

fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() { let stream = stream.unwrap(); println!("Connection established!"); } }

Листинг 20-1. Прослушивание входящих соединений TCP и печать сообщения о том, что получен поток соединения stream (файл src/main.rs).

Используя TcpListener, мы можем прослушивать соединения TCP на адресе 127.0.0.1:7878. В этом адресе перед двоеточием стоит IP-адрес, который присутствует на любом компьютере (так называемый адрес loopback, или localhost), а 7878 это номер прослушиваемого порта. Обычно HTTP работает на порту 80, но мы выбрали здесь порт 7878, чтобы он гарантированно не конфликтовал с другим веб-сервером, который возможно уже работает на вашей машине.

Функция bind в этом сценарии работает наподобие функции new, и она возвращает новый экземпляр TcpListener. Функция названа bind потому, что в терминологии сетей соединение на прослушиваемом порту известно как "привязка" (binding) к порту.

Функция bind возвратит Result< T, E>, который показывает, успешной ли была привязка к порту, или нет. Например, разрешение подключения к порту 80 требует прав администратора (не администраторы могут запустить прослушивания только на портах больше 1023), так что если мы попытаемся реализовать соединение с портом 80 без привилегий администратора, то привязка к порту не сработает. Привязка также не сработает, например, если будут запущены одновременно две программы, прослушивающие один и тот же порт. Поскольку мы пишем базовый сервер только с целью обучения, то здесь мы не будем заботиться обработкой подобных ошибок; вместо этого мы просто вызываем unwrap для остановки программы, если произойдут ошибки.

Метод incoming на TcpListener возвратит итератор, который предоставляет нам последовательность потоков (а именно экземпляров stream типа TcpStream). Один stream представляет открытое соединение между клиентом и серверомr. Соединение это имя полного процесса запроса и ответа, когда клиент подключается к серверу, сервер генерирует ответ и закрывает соединение. Таким образом, мы будем читать из TcpStream, чтобы увидеть, что отправил клиент, а затем запишем наш ответ в поток (stream), чтобы отправить данные обратно клиенту. В целом этот цикл for будет обрабатывать каждое соединение по очереди, и создавать нам серию потоков для обработки.

Сейчас наша обработка заключается в вызове unwrap для остановки программы, если произошла какая-либо ошибка; если ни одной ошибки не было, то программа напечатает сообщение. Мы добавим в следующем листинге дополнительный функционал для случая успеха. Причина, по который метод incoming может вернуть ошибку, может заключаться в том, что мы фактически не выполняем итерацию по соединениям. Вместо этого мы проводим итерации по попыткам соединения. Соединение может быть неудачным по разным причинам, многие из которых специфичны для операционной системы. Например многие операционные системы ограничивают количество одновременно открытых соединений, которые операционная система может поддержать; новые соединения, когда превышено допустимое количество соединений, будут генерировать ошибку, пока открытые соединения не закроются.

Давайте попробуем запустить этот код командой cargo run в терминале, после чего откройте адрес 127.0.0.1:7878 в web-браузере. Браузер должен показать сообщение об ошибке наподобие "Connection reset", потому что наш сервер пока не отправляет обратно никакие данные. Однако в терминале мы увидим несколько сообщений, когда браузер подключается к серверу.

     Running `target/debug/hello`
Connection established!
Connection established!
Connection established!

Иногда в ответ на одно подключение браузера сообщений "Connection established!" может быть несколько; это может происходить из-за того, что браузер делает запрос отображения страницы для других ресурсов, таких как иконки favicon.ico, которая появляется на закладке браузера. Также может быть, что браузер попытается подключиться к серверу несколько раз, потому что сервер не отвечает отправкой данных. Когда stream выходит из области действия, и отбрасывается в конце цикла, соединение закрывается как часть реализации drop. Браузеры иногда обрабатывают закрытые соединения повторными попытками, потому что проблема сети может быть временная.

Помните, что остановить программу можно нажатием комбинации клавиш Ctrl+C, когда вы хотите прекратить работу определенной версии кода. Затем можно перезапустить программу командой cargo run после того, как вы сделаете изменение и запустите новую версию кода.

Чтение содержимого запроса. Чтобы разделить проблемы получения соединения и произведения некоторых действий с соединением, мы создадим новую функцию handle_connection, обрабатывающую соединение. В теле этой функции мы будем считывать данные из TCP stream и печатать их, чтобы увидеть, какие данные посылает браузер. Поменяйте код, как в листинге 20-2.

use std::{
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
};

fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } }

fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let http_request: Vec< _> = buf_reader .lines() .map(|result| result.unwrap()) .take_while(|line| !line.is_empty()) .collect();
println!("Request: {http_request:#?}"); }

Листинг 20-2: Reading from the TcpStream and printing the data (файл src/main.rs).

Мы привели std::io::prelude и std::io::BufReader в область видимости, чтобы получить доступ к трейтам и типам, которые позволяют выполнять с потоком TCP операции чтения и записи. В цикле for функции main вместо печати сообщения, которое говорит нам о создании соединения, мы теперь будем вызывать функцию handle_connection и передавать в неё поток stream.

В функции handle_connection мы создаем новый экземпляр BufReader, который обертывает который обращается к stream через мутируемую ссылку. BufReader добавляет буферизацию для обслуживания вызовов методов std::io::Read трейта.

Мы создали переменную http_request, чтобы собрать строки запроса, которые браузер посылает серверу. Мы показываем, что хотим собрать эти строки в вектор путем добавления аннотации типа Vec< _>.

BufReader реализует трейт std::io::BufRead, который предоставляет метод lines. Метод lines возвратит итератор Result< String, std::io::Error> путем разделения данных stream на порции, когда в потоке встречается байт новой строки. Чтобы получить каждую строку String, мы обрабатываем каждый Result через map и unwrap. Result может быть ошибкой, если данные содержат недопустимые для UTF-8 данные, или если есть проблема с чтением из stream. Опять-таки, реальная программа должна обрабатывать эти ошибки более правильным образом, однако здесь для демонстративного упрощения мы выбрали останову программы в случае ошибки.

Браузер сигнализирует о конце запроса HTTP отправкой двух символов новой строки, поэтому для получения одного запроса из stream мы извлекаем строки, пока получим строку, которая является пустой строкой. Как только мы собрали строки в вектор, мы распечатываем их, используя отладочное форматирование, чтобы изучить инструкции браузера, отправляемые нашему серверу.

Давайте попробуем этот код, для чего запустите программу и снова выполните запрос в браузере по адресу 127.0.0.1:7878. Обратите внимание, что браузер все еще сообщает об ошибке, но программа выведет примерно такие сообщения:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.42s
     Running `target/debug/hello`
Request: [
    "GET / HTTP/1.1",
    "Host: 127.0.0.1:7878",
    "User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:99.0)
        Gecko/20100101 Firefox/99.0",
    "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,
        image/avif,image/webp,*/*;q=0.8",
    "Accept-Language: en-US,en;q=0.5",
    "Accept-Encoding: gzip, deflate, br",
    "DNT: 1",
    "Connection: keep-alive",
    "Upgrade-Insecure-Requests: 1",
    "Sec-Fetch-Dest: document",
    "Sec-Fetch-Mode: navigate",
    "Sec-Fetch-Site: none",
    "Sec-Fetch-User: ?1",
    "Cache-Control: max-age=0",
]

В зависимости от вашего браузера эти сообщения могут несколько отличаться. Теперь, когда мы напечатали данные запроса, мы можем увидеть причину, почему получали несколько соединений в одном запросе браузера, посмотрев путь после GET в первой строке запроса. Если все повторяющиеся соединения запрашивают /, то мы знаем, что браузер пытается извлечь / повторно, потому что он не получает ответа от нашей программы.

Давайте разберем эти данные запроса, чтобы понять, что браузер запрашивает от нашей программы.

Из чего состоит запрос HTTP. Протокол HTTP текстовый, и его запрос оформлен следующим образом:

Method Request-URI HTTP-Version CRLF
headers CRLF
message-body

Первая строка запроса содержит информацию о клиенте, который выполнил запрос. первая часть строки запроса показывает метод запроса, такой как GET или POST, что описывает направление передачи данных между клиентом и сервером. Наш клиент использует запрос GET, что означает, что он запрашивает информацию.

Следующая часть запроса это /, что показывает Uniform Resource Identifier (URI), запрашиваемый клиентом: URI почти, но не совсем, такой же, как Uniform Resource Locator (URL). Различие между URI и URL для наших целей не имеет значения, но спецификация HTTP использует термин URI, так что мы здесь можем просто мысленно заменить URI на URL.

Последняя часть это версия HTTP, которую использует клиент, и затем строка запроса заканчивается последовательностью CRLF. CRLF это сокращение от carriage return и line feed, эти термины пришли со времен печатной машинки, и обозначают байты возврата каретки и перевода строки. Последовательность CRLF также записывают как \r\n, где \r это возврат каретки (байт 0x0D), а \n это перевод строки (байт 0x0A). Последовательность байт CRLF отделяет строку запроса от остальных данных запроса. Обратите внимание, что CRLF печатается, так что мы видим начало новой строки вместо \r\n.

Глядя на данные строки запроса, полученные из запуска нашей программы, мы видим, что GET это метод, / это URI запроса, и HTTP/1.1 версия протокола.

После строки запроса остальные строки являются заголовками, начиная с Host: и далее. Запросы GET не имеют тела.

Попробуйте выполнить запрос из другого браузера, или запросив другой адрес, такой как 127.0.0.1:7878/test, чтобы увидеть, как поменяются данные запроса.

Теперь, когда мы знаем, как выдает запрос браузер, мы отправим данные ему обратно!

Ответ на запрос HTTP. Мы собираемся реализовать отправку данных в ответ на запрос клиента. Ответы должны иметь следующий формат:

HTTP-Version Status-Code Reason-Phrase CRLF
headers CRLF
message-body

Первая строка это строка состояния, которая содержит версию HTTP (HTTP-Version), используемую в ответе, числовой код статуса, оценивающий результат запроса (Status-Code), и текст описывающий код статуса (Reason-Phrase). Далее идут заголовки (headers) и тело сообщения (message-body). Эти 3 части ответа отделены друг от друга последовательностью CRLF.

Вот пример ответа, который использует HTTP версии 1.1, код статуса 200, и OK reason phrase, без заголовков и без тела ответа:

HTTP/1.1 200 OK\r\n\r\n

Код статуса 200 это стандартный код успешного ответа. Это минимальный текст успешного ответа сервера. Давайте запишем его в поток, чтобы сформировать успешный ответ. Из функции мы удалим println!, который печатал данные, и заменим кодом в листинге 20-3.

fn handle_connection(mut stream: TcpStream) {
    let buf_reader = BufReader::new(&mut stream);
    let http_request: Vec< _> = buf_reader
        .lines()
        .map(|result| result.unwrap())
        .take_while(|line| !line.is_empty())
        .collect();
let response = "HTTP/1.1 200 OK\r\n\r\n";
stream.write_all(response.as_bytes()).unwrap(); }

Листинг 20-3. Формирование минимального успешного HTTP-ответа путем записи в поток stream (файл src/main.rs).

Первая новая переменная определяет переменную ответа, которая хранит данные сообщения успеха. Затем мы вызываем as_bytes на  нашем ответе, чтобы преобразовать данные строки в байты. Метод write_all на stream принимает &[u8] и отправляет эти байты напрямую через соединение. Поскольку операция write_all может быть неудачной, мы используем unwrap на любом результате ошибки, как делали раньше. И опять, в реальном приложении здесь должна быть обработка ошибки, а не простое завершение программы.

С этими изменениями запустите код и сделайте запрос в браузере. В терминале больше не будут печататься никакие данные, так что вы не увидите никакой вывод, кроме Cargo. Когда вы загрузите ссылку 127.0.0.1:7878 в веб-браузере, то должны получить пустой экран вместо ошибки. Вы смогли захардкодить прием запроса HTTP и послать на него ответ!

Возврат реального HTML. Давайте реализуем какой-нибудь функционал, больший, чем пустая страничка. Создайте новый файл hello.html в корневой директории вашего проекта, но не в директории src. Вы можете ввести любой HTML, какой захотите; листинг 20-4 показывает один из вариантов.

< !DOCTYPE html>
< html lang="en">
  < head>
    < meta charset="utf-8">
    < title>Hello!< /title>
  < /head>
  < body>
    < h1>Hello!< /h1>
    < p>Hi from Rust< /p>
  < /body>
< /html>

Листинг 20-4. Пример HTML-файла для формирования ответа сервера (файл hello.html).

Это минимальный HTML5 документ, с заголовком и некоторым текстом. Для возврата его сервером, когда принят запрос, мы изменим handle_connection, как показано в листинге 20-5, чтобы прочитать HTML-файл, добавить его в качестве тела ответа, и отправить.

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
};
// -- вырезано --

fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let http_request: Vec< _> = buf_reader .lines() .map(|result| result.unwrap()) .take_while(|line| !line.is_empty()) .collect();
let status_line = "HTTP/1.1 200 OK"; let contents = fs::read_to_string("hello.html").unwrap(); let length = contents.len();
let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap(); }

Листинг 20-5. Отправка содержимого файла hello.html в качестве тела ответа (файл src/main.rs).

Мы добавили fs в оператор use, чтобы привести файловую систему стандартной библиотеки в область действия. Код чтения содержимого файла в строк должен быть для вас знакомым; мы использовали это в главе 12 [2], когда читали содержимое файла для проекта ввода/вывода в листинге 12-4.

Далее мы используем format! для добавления содержимого файла в качестве тела успешного ответа. Чтобы обеспечить корректный HTTP-ответ, мы добавили заголовок Content-Length, который установлен в размер нашего тела ответа, т. е. в нашем случае это размер hello.html.

Запустите этот код командой cargo run и запустите запрос 127.0.0.1:7878 в браузере; вы должны увидеть ваш отрисованный HTML!

В настоящий момент мы игнорируем данные запроса в http_request, и просто, безусловно отправляем в ответ на запрос содержимое HTML-файла. Т. е. если вы попытаетесь в браузере сделать любой другой запрос наподобие 127.0.0.1:7878/something-else, то все еще получите тот же самый HTML-ответ. Пока что наш сервер очень ограничен и не делает все то, что выполняют большинство веб-серверов. Мы хотим доработать этот функционал, чтобы наш ответ менялся в зависимости от запроса, с отправкой обратно HTML-файла как реакцию на правильно сформированный запрос к /.

Проверка запроса и селективный ответ на него. Сейчас наш web-сервер возвращает HTML из файла, не обращая внимания на то, что запросил клиент. Давайте добавим проверку, что браузер запрашивает корневой каталог / перед отправкой HTML-файла, и возвратим ошибку, если браузер запрашивает что-то другое. Для этого нам нужно модифицировать обработчик handle_connection, как показано в листинге 20-6. Этот новый код проверяет содержимое полученного запроса на соответствие ожидаемому /, и добавляет блоки if и else для обработки разных запросов по-разному.

// -- вырезано --

fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap();
if request_line == "GET / HTTP/1.1" { let status_line = "HTTP/1.1 200 OK"; let contents = fs::read_to_string("hello.html").unwrap(); let length = contents.len();
let response = format!( "{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}" );
stream.write_all(response.as_bytes()).unwrap(); } else { // Какой-то другой запрос } }

Листинг 20-6. Обработка запросов для / по-другому в отличие от прочих запросов (файл src/main.rs).

Мы будем смотреть только на первую строку HTTP-запроса, поэтому вместо того, чтобы считывать весь запрос в вектор, мы вызываем next, чтобы получить первый элемент из итератора. Первый unwrap заботится об Option, и остановит выполнение, если в итераторе не окажется элементов. Второй unwrap обрабатывает Result, и дает тот же эффект, что и map, добавленный в листинге 20-2.

Далее мы проверяем request_line, чтобы увидеть, что в ней находится GET-запрос для пути /. Если это так, то блок if возвратит содержимое нашего файла HTML.

Если же request_line не равна строке запроса для пути /, то это означает какой-то другой запрос. Давайте добавим код блока else, который будет отвечать на все другие запросы.

Запустите этот код и выполните в браузере запрос 127.0.0.1:7878; вы должны получить HTML в файле hello.html. Если вы сделаете любой другой запрос, например 127.0.0.1:7878/something-else, то получите ошибку соединения, которую вы видели, когда запускали код в листинге 20-1 и листинге 20-2.

Теперь добавьте код листинга 20-7 в блок else, чтобы отправлялся ответ с кодом статуса 404, что сигнализирует браузеру о том, что содержимое для запроса не было найдено. Мы также возвратим некоторый HTML для странички, которая будет отображаться в браузере для конечного пользователя.

    // -- вырезано --
    } else {
        let status_line = "HTTP/1.1 404 NOT FOUND";
        let contents = fs::read_to_string("404.html").unwrap();
        let length = contents.len();
let response = format!( "{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}" );
stream.write_all(response.as_bytes()).unwrap(); }

Листинг 20-7. Ответ кодом статуса 404 и страничкой с описанием ошибки, если было запрошено что-то другое, отличающееся от корневого каталога / сервера (файл src/main.rs).

Здесь в нашем ответе строка состояния имеет код статуса 404 и reason-фразу NOT FOUND. Тело ответа будет в HTML-файле 404.html. Для странички ошибки вам понадобится создать файл 404.html рядом с файлом hello.html; не опасайтесь экспериментировать, создайте любой HTML, основываясь на примере в листинге 20-8.

< !DOCTYPE html>
< html lang="en">
  < head>
    < meta charset="utf-8">
    < title>Hello!< /title>
  < /head>
  < body>
    < h1>Oops!< /h1>
    < p>Sorry, I don't know what you're asking for.< /p>
  < /body>
< /html>

Листинг 20-8. Содержимое примера странички, отправляемой в любом ответе 404 (файл 404.html).

С этими изменениями снова запустите сервер. Запрос 127.0.0.1:7878 должен получать в ответ содержимое hello.html, и любой другой запрос, наподобие 127.0.0.1:7878/foo, должен получать сообщение об ошибке из HTML-файла 404.html.

Начало рефакторинга. Сейчас блоки if и else содержат повторяющийся код: они оба читают файлы и записывают их содержимое в stream. Единственное отличие в них это содержимое строки статуса (переменная status_line) имя отправляемого файла (переменная filename). Давайте сделаем код более лаконичным, переместив эти различия в отдельные строки if и else, которые будут устанавливать эти переменные; затем мы можем безусловно использовать эти переменные для чтения файла и записи ответа в поток. Листинг 20-9 показывает результирующий код после замены больших блоков if и else.

// -- вырезано --

fn handle_connection(mut stream: TcpStream) { // -- вырезано --
let (status_line, filename) = if request_line == "GET / HTTP/1.1" { ("HTTP/1.1 200 OK", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND", "404.html") };
let contents = fs::read_to_string(filename).unwrap(); let length = contents.len();
let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap(); }

Листинг 20-9. Рефакторинг блоков if и else, чтобы они содержали только отличия двух случаев обработки (файл src/main.rs).

Теперь блоки if и else возвратят в кортеже только подходящие значения для строки статуса и имени файла; затем мы используем деструктурирование кортежа, чтобы присвоить из него два значения переменным status_line и filename с помощью паттерна, как обсуждалось в главе 18 [3].

Код получился более простой, и он теперь лучше отражает разницу между двумя случаями обработки запроса. Это означает, что у нас есть только одно место для обновления кода, если мы захотим изменить способ чтения файлов и отправки ответов. Поведение кода в листинге 20-9 будет таким же, как в листинге 20-7.

Отлично! Наш простейший web-сервер написан примерно 40 строками кода Rust, и он может обрабатывать запрос корневого каталога сервера отправкой содержимого файла hello.html, и в ответ на все другие запросы посылать ответ 404.

Сейчас наш сервер однопоточный, т. е. одновременно он может обработать запрос только один запрос. Давайте убедимся, что это может быть проблемой, симулируя некоторые медленные запросы. Затем мы исправим наш сервер, чтобы он мог обрабатывать несколько запросов одновременно.

[Переделка однопоточного сервера в многопоточный]

Сейчас наш сервер обрабатывает каждый запрос по очереди. Это значит, что он не будет обрабатывать второе соединение, пока не завершит обработку первого. Если сервер получит еще и еще запросы, то это последовательное выполнение станет все менее и менее оптимальным. Если сервер получит запрос, который занимает много времени для обработки, то последующие запросы должны будут ожидать, пока не завершится долгий запрос, даже если новые запросы могут быть обработаны быстро. Нам это надо исправить, но сначала мы рассмотрим эту проблему в действии.

Симуляция медленного запроса в текущей реализации сервера. Давайте проверим, как запрос с медленной обработкой может повлиять на другие запросы с нашей текущей реализацией сервера. Листинг 20-10 реализует обработку запроса /sleep с симулируемым медленным ответом, который заставит сервер приостановить работу (sleep) на 5 секунд перед тем, как он пошлет ответ.

use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// -- вырезано --

fn handle_connection(mut stream: TcpStream) { // -- вырезано --
let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), };
// -- вырезано -- }

Листинг 20-10. Симуляция медленного ответа на запрос с задержкой 5 секунд (файл src/main.rs).

Мы поменяли оператор if на match, в котором теперь 3 ветки. Нам нужно сделать явное сопоставление слайса request_line с паттерном на совпадение со значениями строковых литералов; оператор match, в отличие от if, не делает автоматическое обращение по ссылкам и разыменование ссылок.

Первая ветка match такая же, как был блок if листинга 20-9. Вторая ветка проверяет на совпадение запроса со /sleep. Когда этот запрос принят, сервер заснет на 5 секунд перед тем, как успешно отрисовать HTML-страничку. Третья ветка такая же, как блок else листинга 20-9.

Теперь вы увидите, насколько примитивен наш сервер: реальные библиотеки будут обрабатывать распознавание множественных запросов гораздо менее многословным способом!

Запустите сервер командой cargo run. Затем откройте два окна браузера: одно для http://127.0.0.1:7878/, и другое для http://127.0.0.1:7878/sleep. Если вы введете / URI несколько раз, как раньше, то получите быстрый ответ. Однако если вы введете запрос /sleep и затем загрузите запрос /, то увидите, что / ждет, пока не закончится задержка sleep 5 секунд.

Существует несколько техник, с помощью которых мы могли бы избежать приостановку обработки из-за медленных запросов; одна из них, которую мы реализуем, это пул потоков (thread pool).

Увеличение пропускной способности сервера с помощью пула потоков. Thread pool это группа порожденных потоков, которые находятся в ожидании и готовности к обработке задачи. Когда программа получает новую задачу, она назначает её одному из потоков пула, и этот поток будет задачу обрабатывать. Остальные потоки в пуле доступны для обработки любых других задач, поступающих при обработке первой задачи. Когда первый поток закончит обработку своей задачи, он возвращается в пул ожидающих потоков, готовый к приему обработки новой задачи. Пул потоков позволит вам обрабатывать соединения параллельно, повышая тем самым пропускную способность сервера.

Мы ограничим количество потоков в пуле, чтобы защититься от атак типа "Отказ в обслуживании" (Denial of Service, DoS); если бы наша программа создавала новый поток для каждого поступившего запроса, то если кто-то запустит одновременно 10 миллионов запросов для нашего сервера, то создаст хаос, при которым будут заняты все ресурсы нашего сервера, и все запросы зависнут.

Вместо того, чтобы порождать неограниченное количество потоков, у нас будет ограниченное количество потоков, ожидающих в пуле. Пул будет поддерживать очередь входящих запросов. Каждый из потоков в пуле будет извлекать запрос из этой очереди, обрабатывать его, и затем запрашивать очередь на получение нового запроса (это так называемая блокировка потока на очереди). С таким дизайном мы можем обработать до N запросов параллельно, где N это количество потоков. Если каждый поток отвечает на долго работающий запрос, то последующие запросы все еще могут резервироваться в очереди, но мы увеличили количество продолжительно выполняющихся запросов, которые мы можем обработать, пока не достигнем количества N потоков.

Эта техника всего лишь одна из нескольких способов повысить пропускную способность web-сервера. Другие опции могут эксплуатировать модель fork/join, модель однопоточного асинхронного ввода/вывода (single-threaded async I/O), или модель многопоточного асинхронного ввода/вывода (multi-threaded async I/O). Если вас интересует эта тема, то ознакомьтесь с соответствующей документацией по другим решениям и попробуйте их реализовать; с таким низкоуровневым языком программирования, как Rust, возможны все эти опции.

Перед тем, как мы начнем реализовывать пул потоков, немного поговорим про то, как должно выглядеть его использование. Когда вы пытаетесь разработать код, то может помочь предварительное написание клиентского интерфейса. Напишите API кода так, как вы хотели бы его вызывать; затем реализуйте функциональность в разработанной структуре, вместо того, чтобы сначала реализовать функционал, а уже потом прикручивать к нему публичный API.

Подобно тому, как мы использовали разработку на основе тестов в проекте главы 12 [2], здесь мы будем использовать разработку на основе помощи компилятора. Т. е. напишем код, который вызывает нужную нам функцию, и затем посмотрим на сообщения ошибки компилятора, чтобы определить, что надо поменять, чтобы код работал. Однако перед этим мы рассмотрим технику, которую не собираемся использовать в качестве отправной точки.

Порождение потока для каждого запроса. Сначала давайте рассмотрим, как мог бы выглядеть наш код, если бы мы создавали поток для каждого соединения. Как упоминалось ранее, это не наш конечный план из-за возможных проблем потенциального порождения неограниченного количества потоков, однако это стартовая точка для получения сначала рабочего многопоточного сервера. Затем как улучшение мы добавим пул потоков, и различие между двумя этими решениями будет понятнее. Листинг 20-11 показывает изменения функции main, чтобы порождать новый поток для обработки каждого потока stream в цикле for.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() { let stream = stream.unwrap();
thread::spawn(|| { handle_connection(stream); }); } }

Листинг 20-11. Порождение нового потока для каждого stream (файл src/main.rs).

Как вы уже знаете из главы 16 [4], thread::spawn создаст новый поток и запустит код из блока замыкания в новом потоке. Если вы запустите этот код и загрузите запрос /sleep в своем браузере, затем / в двух дополнительных закладках браузера, то увидите, что запросы / больше не ждут, когда завершится обработка /sleep. Однако, как мы уже упоминали, это может иногда перегрузить систему, потому что вы создаете потоки без какого-либо ограничения.

Создание конечного количества потоков. Мы хотим, чтобы наш пул потоков работал аналогичным, привычным способом, поэтому переключение на пул потоков не потребует больших изменений в коде, который использует наш API. Листинг 20-12 показывает гипотетический интерфейс для структуры ThreadPool, который мы хотим использовать вместо thread::spawn.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);
for stream in listener.incoming() { let stream = stream.unwrap();
pool.execute(|| { handle_connection(stream); }); } }

Листинг 20-12. Наш идеальный интерфейс ThreadPool (файл src/main.rs, этот код не скомпилируется).

Мы используем ThreadPool::new, чтобы создать новый пул потоков с конфигурируемым количеством потоков, в нашем случае 4. Затем в цикле for вызов pool.execute имеет аналогичный интерфейс, как у thread::spawn, в котором на входе замыкание, которое пул должен запускать в каждом потоке. Нам нужно реализовать pool.execute таким образом, чтобы он принимал замыкание, и передавал его в поток пула для запуска. Этот код пока что не скомпилируется, но мы попробуем его скомпилировать, чтобы получить указания компилятора, что нужно сделать для исправления ошибки.

Сборка ThreadPool под управлением компилятора. Сделайте изменения в src/main.rs согласно листингу 20-12, и затем запустите команду cargo check, чтобы получить указания компилятора, что делать дальше. Вот первая ошибка, которую мы получим:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`. error: could not compile `hello` (bin "hello") due to 1 previous error

Отлично! Эта ошибка говорит нам, что нам нужен тип ThreadPool или модуль, так что мы его сейчас создадим. Наша реализация ThreadPool не будет зависеть от того, какую работу выполняет наш веб-сервер. Итак, давайте поменяем крейт hello с двоичного на библиотечный, чтобы в нем была реализация нашего ThreadPool. После этого мы сможем использовать отдельную библиотеку пула потоков для любой работы, какую мы хотим выполнять с помощью пула потоков, не только для обработки web-запросов.

Создайте файл src/lib.rs, содержащий следующее, простейшее определение структуры ThreadPool, которое мы можем иметь в настоящий момент (файл src/lib.rs):

pub struct ThreadPool;

Затем отредактируйте файл main.rs, чтобы привести ThreadPool в область действия из библиотечного крейта путем добавления следующей строки в начало src/main.rs:

use hello::ThreadPool;

Этот код все еще не будет работать, давайте его снова проверим на получение информации о следующей ошибке, которую предстоит исправить:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct
             `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found
                                in `ThreadPool`
For more information about this error, try `rustc --explain E0599`. error: could not compile `hello` (bin "hello") due to 1 previous error

Эта ошибка говорит о том, что далее нам необходимо создать связанную функцию new для ThreadPool. Мы также знаем, что new должна иметь один параметр, который может принять 4 в качестве аргумента, и должна возвратить экземпляр ThreadPool. Давайте реализуем простейшую функцию new с такими характеристиками (файл src/lib.rs):

pub struct ThreadPool;

impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } }

Мы выбрали usize в качестве типа для параметра size, потому что мы знаем, что отрицательное количество потоколов не имеет смысла. Мы также знаем, что будем использовать 4 в качестве количества элементов в коллекции потоков, для чего и предназначен тип usize (см. секцию "Целочисленные типы" главы 3 [5].

Еще раз проверим код:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool`
  in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |         -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`. error: could not compile `hello` (bin "hello") due to 1 previous error

Теперь ошибка произошла из-за того, что у нас нет метода execute на ThreadPool. Вспомним из секции "Создание конечного количества потоков", что наш пул потоков должен иметь программный интерфейс, подобный thread::spawn. Кроме того, мы реализуем функцию execute, чтобы она принимала замыкание и передавала его в ожидающий поток на пуле для запуска.

Давайте определим метод execute на ThreadPool, чтобы он принимал замыкание в качестве параметра. Вспомним из секции "Перемещение захваченных значений из замыканий и Fn-трейты" главы 13 [6], что мы можем принять замыкания в качестве параметров с тремя различными трейтами: Fn, FnMut и FnOnce. Нам нужно решить, какой вид замыкания здесь использовать. Мы знаем, что в конечном итоге сделаем что-то похожее на стандартную реализацию библиотеки thread::spawn, поэтому мы можем посмотреть на trait bound параметра в сигнатуре thread::spawn. Документация показывает нам следующее:

pub fn spawn< F, T>(f: F) -> JoinHandle< T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Параметр типа F это то, что нас здесь интересует; параметр типа T связан с возвращаемым значением, и нас это не касается. Мы можем видеть, что spawn использует FnOnce как trait bound на F. Это вероятно то, что мы также хотим, потому что в конечном итоге передаем аргумент, который хотим выполнить для spawn. Мы также можем быть уверены, что FnOnce это трейт, который мы хотим использовать, потому что поток для запуска запроса будет выполнять замыкание запроса однократно, что соответствует Once в FnOnce.

Параметр типа F также имеет trait bound Send и lifetime bound 'static, которые полезны в нашей ситуации: нам нужен Send для перемещения замыкания из одного потока в другой, и 'static, потому что мы не знаем, сколько потребуется времени для выполнения потока. Давайте создадим метод execute на ThreadPool, который будет принимать generic-параметр типа F с этими bounds (файл src/lib.rs):

impl ThreadPool {
    // -- вырезано --
    pub fn execute< F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

Мы все еще используем () после FnOnce, потому что этот FnOnce представляет замыкание, которое не принимает параметров, и возвращает unit-тип (). Как и в определениях функций, возвращаемый тип может быть опущен из сигнатуры, однако даже если у нас нет параметров, скобки все равно нужны.

И снова, здесь пока что простейшая реализация метода execute: она ничего не делает, но мы проверим, скомпилируется ли этот код:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s

Код скомпилировался! Однако имейте в виду, что если вы попробуете выполнить cargo run и сделаете запрос в браузере, то браузер покажет ошибку, которую мы видели в начале этой главы. Наша библиотека пока не запускает замыкание, переданное в execute!

Вероятно вы слышали поговорку, связанные со строгими компиляторами, такими как Haskell и Rust: "если код компилируется, то он работает". Однако это высказывание не универсально верное. Наш проект компилируется, но он абсолютно ничего не делает! Если бы мы строили реальный, полный проект, то это был бы хороший момент для написания unit-тестов, чтобы проверить, что скомпилированный код делает именно то, что мы хотели.

Проверка в new количества потоков. Мы ничего не делаем с параметрами для new и execute. Давайте реализуем тела этих функций с поведением, которое нам нужно. Сначала подумаем над new. Ранее мы выбрали unsigned-тип для параметра size, потому что пул с отрицательным количеством потоков не имеет смысла. Однако не имеет также смысл пул с нулевым количеством потоков, тем не менее 0 это абсолютно допустимое значение для типа usize. Мы добавим код, который проверит, что параметр size больше нуля, перед тем как возвратим экземпляр ThreadPool, и будем вызывать панику программы, если new получит нулевой параметр, используя макрос assert!, как показано в листинге 20-13.

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
ThreadPool }
// -- вырезано -- }

Листинг 20-13. Реализация ThreadPool::new для panic, если size равен 0 (файл src/lib.rs).

Мы также добавили некоторую документацию для нашего ThreadPool с помощью синтаксиса doc comments. Обратите внимание, что мы следовали хорошей практике оформления документации, когда описываются ситуации, в которых наша функция может вызвать панику, что обсуждалось в главе 14 [7]. Попробуйте запустить cargo doc --open и кликните на структуру ThreadPool, чтобы увидеть, как выглядит сгенерированная документация.

Вместо добавления макроса assert!, как сделано здесь, мы могли бы поменять new на build и возвратить Result как мы делали с Config::build в проекте I/O, см. листинг 12-9 [2]. Однако для этого случая мы решили, что попытка создания пула потоков вообще без потоков должна быть неустранимой ошибкой. Если вы чувствуете себя уверенным, то попробуйте написать функцию build со следующей сигнатурой, чтобы сравнить её с функцией new:

pub fn build(size: usize) -> Result< ThreadPool, PoolCreationError> {

Выделение памяти для сохранения потоков. Теперь у нас есть способ узнать, что имеется допустимое количество потоков для хранения в пуле, мы можем создать эти потоки и сохранить их в структуре ThreadPool перед возвратом структуры. Но как мы можем "сохранить" поток? Давайте еще раз посмотрим на сигнатуру thread::spawn:

pub fn spawn< F, T>(f: F) -> JoinHandle< T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

Функция spawn возвращает JoinHandle< T>, где T это тип, возвращаемый замыканием. Давайте тоже попробуем использовать JoinHandle и посмотрим, что получится. В нашем случае замыкания, которые мы передаем в пул потоков, будут обрабатывать соединения и возвращать что-то, так что T будет unit-типом ().

Код в листинге 20-14 будет компилироваться, однако пока не создает никакие потоки. Мы изменили определение ThreadPool, чтобы он содержал вектор экземпляров thread::JoinHandle< ()>, инициализировал вектор размером size, настроил цикл for, который будет запускать некоторый код для создания потоков, и вернул содержащий их экземпляр ThreadPool.

use std::thread;

pub struct ThreadPool { threads: Vec< thread::JoinHandle< ()>>, }

impl ThreadPool { // -- вырезано -- pub fn new(size: usize) -> ThreadPool { assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size { // создание неких потоков и сохранение их в вектор }
ThreadPool { threads } } // -- вырезано -- }

Листинг 20-14. Создание вектора для ThreadPool, чтобы в нем хранились потоки (файл src/lib.rs, этот код пока не обеспечивает желаемое поведение).

Мы привели std::thread в область действия библиотечного крейта, потому что используем thread::JoinHandle как тип элементов в вектора в ThreadPool.

Как только получен допустимый размер, наш ThreadPool создает новый вектор, который может хранить size элементов. Функция with_capacity выполняет ту же задачу, что и Vec::new, но с важным отличием: она предварительно выделяет память в векторе. Поскольку мы знаем, что нужно сохранить size элементов в векторе, то это предварительное выделение несколько более эффективно, чем использование Vec::new с самостоятельным изменением размера по мере вставки элементов.

Когда вы снова запустите cargo check, то результат будет успешным.

Структура Worker, отвечающая за отправку кода из ThreadPool в Thread. Мы оставили комментарий в цикле for в листинге 20-14, относящийся к созданию потоков. Здесь мы рассмотрим, как реально создавать потоки. Стандартная библиотека предоставляет thread::spawn как способ для создания потоков, и thread::spawn ожидает на входе некоторый код, который поток должен запустить при его создании. Однако в нашем случае мы хотим создать потоки, чтобы они ждали кода, который мы отправим позже. Стандартная реализация библиотеки не включает ничего подобного, так что нам придется это делать вручную.

Давайте реализуем это поведение путем введения новой структуры данных между ThreadPool и потоками, которая будет обслуживать это новое поведение. Назовем эту структуру Worker, общим термином pooling-реализаций. Worker берет код, который должен быть запущен, и запускает его в потоке Worker. Это можно представить себе как человека, работающего на кухне в ресторане: работник (worker) ждет, когда поступит заказ от клиентов, а затем отвечают за прием этих заказов и их выполнение.

Вместо сохранения вектора экземпляров JoinHandle< ()> в пуле потоков, мы будем сохранять экземпляры структур Worker. Каждый Worker сохранит один экземпляр JoinHandle< ()>. Затем мы реализуем метод на Worker, который берет замыкание с кодом для запуска, и пошлет его в уже запущенный поток для выполнения. Мы также дадим каждому worker идентификатор (id), чтобы мы могли отличать каждого worker в пуле, когда выполняется вывод в лог или отладка.

Здесь приведено описание нового процесса, который будет происходить, когда мы создаем ThreadPool. Мы реализуем код, который отправляет замыкание в поток после того, как мы настроим Worker следующим образом:

1. Определение структуры Worker, которая содержит id и JoinHandle< ()>.
2. Поменяем ThreadPool, чтобы в нем содержался вектор экземпляров Worker.
3. Определим функцию Worker::new, которая принимает номер id и возвратит экземпляр Worker, где хранится id и порожденный поток с пустым замыканием.
4. В ThreadPool::new используем цикл for со счетчиком, чтобы генерировать id, создания нового Worker с этим id, и сохранения worker-а в вектор.

Если вы достаточно продвинулись, попробуйте реализовать все это сами, перед тем как посмотреть на листинг 20-15.

Готовы? Вот листинг 20-15 с одним из вариантов введения вышеописанных модификаций (файл src/lib.rs):

use std::thread;

pub struct ThreadPool { workers: Vec< Worker>, }

impl ThreadPool { // -- вырезано -- pub fn new(size: usize) -> ThreadPool { assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size { workers.push(Worker::new(id)); }
ThreadPool { workers } } // -- вырезано -- }

struct Worker { id: usize, thread: thread::JoinHandle< ()>, }

impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {});
Worker { id, thread } } }

Листинг 20-15. Модификация ThreadPool, чтобы хранить экземпляры Worker вместо непосредственного сохранения потоков.

Мы поменяли имя поля в ThreadPool с threads на workers, потому что теперь оно хранит экземпляры Worker вместо экземпляров JoinHandle< ()>. Мы используем счетчик в цикле for в качестве аргумента для Worker::new, и сохраняем каждый новый Worker в векторе workers.

Внешний код (наподобие нашего сервера в src/main.rs) не должен знать подробности реализации, относящиеся к использованию структуры Worker внутри ThreadPool, так что мы сделали приватной структуру Worker и её функцию new. Функция Worker::new использует id, который мы ей передаем, и сохраняет экземпляр JoinHandle< ()>, который создается путем порождения нового потока с использованием пустого замыкания.

Замечание: если операционная система не может создать поток из-за нехватки системных ресурсов, то вызов thread::spawn приведет к панике. Это приведет к тому, что будет паниковать весь сервер, несмотря на то, что некоторые потоки могли создаться успешно. Для упрощения это поведение допустимо, однако в реальном приложении сервера с реализацией пула потоков было бы лучше использовать std::thread::Builder и его метод spawn, который вместо этого возвращает Result.

Этот код скомпилируется и будет сохранять количество экземпляров Worker, указанное как аргумент для ThreadPool::new. Но мы все еще не обрабатываем замыкание, которое получаем в execute. Давайте дальше рассмотрим, как это сделать.

Отправка запросов в потоки через каналы. Следующая проблема, которую мы будем решать, заключается в том, что замыкание, которое передано в thread::spawn, абсолютно ничего не делает. В настоящий момент мы получаем замыкание, которое хотим выполнять в методе execute. Однако нам нужно предоставить thread::spawn замыкание для запуска, когда создаем каждый Worker при создании ThreadPool.

Мы хотим, чтобы структуры Worker, которые мы только что создали, извлекали код для запуска их очереди, содержащейся в ThreadPool, и отправляли этот код в его поток для запуска.

Каналы, которые мы изучили в главе 16 [4], предоставляют простой способ коммуникаций между потоками, и они для нашего случая отлично подойдут. Мы используем канал для функции как очередь заданий, и метод execute будет посылать задание из ThreadPool в экземпляры Worker, которые будут посылать задание в свой поток. План будет такой:

1. ThreadPool создаст канал и будет удерживать отправителя (sender).
2. Каждый Worker будет удерживать получателя (receiver).
3. Мы создадим новую структуру задания Job, которая будет хранить замыкания, которые мы хотим послать через канал.
4. Метод execute пошлет задание, которое он хочет выполнить, через отправителя.
5. В своем потоке Worker будет зациклен на своем получателе, и выполнять замыкания любых заданий, которые получит.

Начнем с создания канала в ThreadPool::new и сохранении отправителя в экземпляре ThreadPool, как показано в листинге 20-16. Структура Job пока ничего не содержит, но будет элементом типа, который мы посылаем по каналу.

use std::{sync::mpsc, thread};

pub struct ThreadPool { workers: Vec< Worker>, sender: mpsc::Sender< Job>, }

struct Job;

impl ThreadPool { // -- вырезано -- pub fn new(size: usize) -> ThreadPool { assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size { workers.push(Worker::new(id)); }
ThreadPool { workers, sender } } // -- вырезано -- }

Листинг 20-16. Модификация ThreadPool для сохранения sender канала, по которому передаются экземпляры задания Job (файл src/lib.rs).

В ThreadPool::new мы создаем наш новый канал и заставляем пул удерживать sender. Это успешно скомпилируется.

Давайте попробуем передать получателя (receiver) каждому worker, когда пул потоков создает канал. Мы знаем, что хотим использовать receiver в потоке, который порождают worker-ы, поэтому мы будем ссылаться на параметр receiver в замыкании. Код в листинге 20-17 пока что не скомпилируется.

impl ThreadPool {
    // -- вырезано --
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size { workers.push(Worker::new(id, receiver)); }
ThreadPool { workers, sender } } // -- вырезано -- }

// -- вырезано --

impl Worker { fn new(id: usize, receiver: mpsc::Receiver< Job>) -> Worker { let thread = thread::spawn(|| { receiver; });
Worker { id, thread } } }

Листинг 20-17. Передача receiver в worker-ы (файл src/lib.rs, этот код не скомпилируется).

Мы сделали некоторые небольшие и очевидные изменения: передали receiver в Worker::new, и затем используем это в замыкании.

Когда мы попробуем проверить этот код, то получим ошибку:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type
   |                               `std::sync::mpsc::Receiver< Job>`,
   |                               which does not implement the `Copy` trait
...
25 |         for id in 0..size {
   |         ----------------- inside of this loop
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here,
   |             in previous iteration of loop
   |
note: consider changing this parameter type in method `new` to borrow
      instead if owning the value isn't necessary
  --> src/lib.rs:47:33
   |
47 |     fn new(id: usize, receiver: mpsc::Receiver< Job>) -> Worker {
   |        --- in this method       ^^^^^^^^^^^^^^^^^^^ this parameter
            takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
   |
25 ~         let mut value = Worker::new(id, receiver);
26 ~         for id in 0..size {
27 ~             workers.push(value);
   |
For more information about this error, try `rustc --explain E0382`. error: could not compile `hello` (lib) due to 1 previous error

Код пытается передать receiver в несколько экземпляров Worker. Это не сработает по причине, которая описана в главе 16 [4]: реализация канала, которую предоставляет Rust, подразумевает нескольких генераторов (producer) и одного потребителя (consumer). Это означает, что мы не можем просто клонировать потребляющий конец канала, чтобы исправить этот код. Мы также не хотим отправлять сообщение несколько раз нескольким потребителям; нам нужен один список сообщений с несколькими worker-ами, чтобы каждое сообщение обрабатывалось один раз.

Кроме того, снятие задачи с очереди канала включает в себя мутацию receiver, поэтому потокам нужен безопасный способ совместного использования и модификации receiver; иначе мы можем получить условия гонки (race conditions, что рассматривалось в главе 16 [4]).

Вспомним потоко-безопасные умные указатели (thread-safe smart pointers), которые обсуждались в главе 16 [4]: чтобы совместно использовать несколько потоков (share ownership) и позволить потокам мутировать значение, нам нужно использовать Arc< Mutex< T>>. Тип Arc позволит нескольким worker-ам владеть receiver, и Mutex гарантирует, что только один поток получит задание job из receiver в любой момент времени. Листинг 20-18 показывает изменения, которые нам нужно сделать.

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
// -- вырезано --

impl ThreadPool { // -- вырезано -- pub fn new(size: usize) -> ThreadPool { assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); }
ThreadPool { workers, sender } }
// -- вырезано -- }

// -- вырезано --

impl Worker { fn new(id: usize, receiver: Arc< Mutex< mpsc::Receiver< Job>>>) -> Worker { // -- вырезано -- } }

Листинг 20-18. Совместное использование receiver между worker-ами с помощью Arc и Mutex (файл src/lib.rs).

В ThreadPool::new, мы помещаем receiver в Arc и Mutex. Для каждого нового worker мы клонируем we clone the Arc, чтобы увеличить счетчик ссылок, благодаря чему worker-ы могут совместно владеть receiver.

Реализация метода Method. Давайте наконец-то реализуем метод execute на ThreadPool. Мы также поменяем Job со структуры на тип алиаса для объекта трейта, который хранит тип замыкания, получаемый execute. Как обсуждалось в секции "Создание синонимов типов с алиасами типов" главы 19 [8], тип алиасов позволит нам сделать длинные типы короче, чтобы их было проще использовать. Взгляните на листинг 20-19.

// -- вырезано --

type Job = Box< dyn FnOnce() + Send + 'static>;

impl ThreadPool { // -- вырезано --
pub fn execute< F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } }

// -- вырезано --

Листинг 20-19. Создание алиаса типа Job для Box, который содержит каждое замыкание и затем посылает задание job через канал (файл src/lib.rs).

После создания нового экземпляра Job с использованием замыкания, которое мы получаем в execute, мы посылаем это задание job на отправляющий конец канала. Мы вызываем unwrap на send для случая, когда отправка оказалась неудачной. Это может произойти, например, если мы остановим выполнение всех наших потоков, т. е. принимающая сторона канала прекратила получать новые сообщения. На данный момент мы не можем остановить выполнение наших потоков: наши потоки продолжат выполнение, пока существует пул. Причина, по которой мы используем unwrap, заключается в том, что мы знаем, что случай сбоя не произойдет, но компилятор этого не знает.

Но это еще не все! В worker наше замыкание, переданное в thread::spawn, все еще только ссылается на принимающий конец канала. Вместо этого нам нужно, чтобы замыкание работало в бесконечном цикле, запрашивая принимающий конец канала для получения задания job и запуска задания при его получении. Давайте сделаем изменения для Worker::new, показанные в листинге 20-20.

// -- вырезано --

impl Worker { fn new(id: usize, receiver: Arc< Mutex< mpsc::Receiver< Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job(); });
Worker { id, thread } } }

Листинг 20-20. Получение и выполнение заданий в потоке worker-а (файл src/lib.rs).

Здесь мы сначала блокируемся на receiver для захвата мьютекса, и затем вызываем unwrap для паники при любой ошибке. Получение блокировки может завершиться неудачей, если мьютекс находится в poisoned-состоянии, которое может произойти, если какой-то другой поток запаниковал при удержании блокировки вместо того, чтобы освободить блокировку. В этой ситуации вызов unwrap даст этому потоку панику как корректное предпринимаемое действие. Не стесняйтесь поменять этот unwrap на expect с сообщением об ошибке, которое будет для вас осмысленным.

Если вы получили блокировку на мьютексе, то вызовем recv для получения Job из канала. Последний unwrap также обработает здесь любые ошибки, которые могут произойти, если поток, удерживающий sender, закрылся, подобно тому, как метод send возвратил Err, если закрылся receiver.

Вызов recv блокирующий, так что если пока нет задания job, текущий поток будет ждать, пока не будет доступно задание. Mutex< T> гарантирует, что только один поток Worker в любой момент времени пытается запросить задание.

Наш пул потоков теперь в рабочем состоянии! Запустите cargo run, и сделайте для сервера несколько запросов:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec< Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default
warning: field is never read: `id` --> src/lib.rs:48:5 | 48 | id: usize, | ^^^^^^^^^
warning: field is never read: `thread` --> src/lib.rs:49:5 | 49 | thread: thread::JoinHandle< ()>, | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings Finished dev [unoptimized + debuginfo] target(s) in 1.40s Running `target/debug/hello` Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing.

Это успех! У нас есть пул потоков, которые асинхронно обрабатывают соединения. Никогда не создается больше 4 потоков, так что наша система не окажется перегруженной, если сервер получит слишком много запросов. Если мы получим запрос /sleep, то сервер сможет его обработать одновременно с обработкой поступивших других запросов, запустив их в другом потоке.

Замечание: если вы откроете запрос /sleep одновременно в нескольких окнах браузера, то они могут загружаться по одному 5-секундных интервалах. Некоторые web-браузеры выполняют несколько экземпляров одного и того же запроса последовательно по соображениям кеширования. Это ограничение не вызвано нашим web-сервером.

После того, как вы познакомились с циклом while let в главе 18, у вас может возникнуть вопрос, почему код не был написан так, как показано в листинге 20-21.

// -- вырезано --

impl Worker { fn new(id: usize, receiver: Arc< Mutex< mpsc::Receiver< Job>>>) -> Worker { let thread = thread::spawn(move || { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {id} got a job; executing.");
job(); } });
Worker { id, thread } } }

Листинг 20-21. Альтернативная реализация Worker::new с помощью цикла while let (файл src/lib.rs, этот код пока не предоставляет желаемого поведения).

Этот код скомпилируется и запустится, но приводит к желаемому поведению многопоточности: медленный запрос по-прежнему будет заставлять другие потоки ожидать завершения обработки. Причина довольно тонка: структура Mutex не имеет публичного метода unlock, потому что владение блокировкой основано на времени жизни MutexGuard< T> внутри LockResult< MutexGuard< T>>, возвращаемого методом lock. Во время компиляции система проверки заимствования (borrow checker) может затем применить правило, что ресурс, защищаемый Mutex не может быть доступен, если мы не удерживаем блокировку. Однако эта реализация также может привести к тому, что блокировка будет удерживаться дольше, чем предполагалось, если мы не будем помнить о времени жизни MutexGuard< T>.

Код в листинге 20-20, который использует let job = receiver.lock().unwrap().recv().unwrap(); работает, потому что с let любые временные значения, используемые в выражении на правой стороне присваивания, немедленно удаляются при завершении оператора let. Однако while let (и if let, и match) не отбрасывают временные значения, пока не закончится связанный блок. В листинге 20-21 блокировка остается удерживаемой на время вызова job(), т. е. другие worker-ы не смогут получить задания.

[Корректное завершение и очистка]

Код в листинге 20-20 отвечает за асинхронно приходящие запросы с помощью использования пула потоков, так как мы и хотели. Мы получили некоторые предупреждения (warnings) по поводу полей worker, id и thread, которые мы не используем напрямую, что напоминает нам, что мы не делаем никакую очистку. Когда мы используем не очень красивый метод Ctrl+C для завершения потока main, все другие потоки также немедленно останавливаются, даже если они находятся в процесса обработки запроса.

Затем мы реализуем трейт Drop для вызова join на каждом потоке в пуле, так чтобы они могли завершить запросы, над которым работают, перед закрытием. Затем мы реализуем способ указать потокам, что они должны прекратить принимать новые запросы и завершить работу. Чтобы увидеть этот код в действии, мы изменим наш сервер, чтобы он принимал только два запроса, прежде чем красиво закрыть его пул потоков.

Реализация трейта Drop на ThreadPool. Давайте начнем с реализации Drop на нашем пуле потоков. Когда пул отбрасывается, все наши потоки должны объединиться, чтобы они закончили свою работу. Листинг 20-22 показывает первую попытку реализации Drop; этот код пока что не работает.

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap(); } } }

Листинг 20-22. Присоединение к каждому потоку, когда пол потоков выходит из области действия (файл src/lib.rs, этот код не скомпилируется).

Сначала мы в цикле проходим по каждому worker-у в пуле потоков. Мы используем для этого &mut, потому что self мутируемая ссылка, и мы должны также быть в состоянии мутировать worker. Для каждого worker мы печатаем сообщение о том, что конкретный worker завершает работу, а затем вызываем join в потоке этого worker-а. Если вызов join потерпел неудачу, то мы используем unwrap, чтобы Rust вызывал панику и перешел в аварийное завершение программы.

Когда мы скомпилируем этот код, получится ошибка:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle< ()>`,
   |             which does not implement the `Copy` trait
   |
note: `JoinHandle::< T>::join` takes ownership of the receiver `self`,
      which moves `worker.thread`
  --> /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/thread/mod.rs:1718:17
For more information about this error, try `rustc --explain E0507`. error: could not compile `hello` (lib) due to 1 previous error

Ошибка говорит нам, что мы не можем вызвать join, потому что у нас есть только мутируемое заимствование каждого worker, а join принимает во владение свой аргумент. Чтобы решить эту проблему, нам нужно переместить thread наружу из экземпляра Worker, который владеет thread, чтобы join мог потребить thread. Мы сделал это в листинге 17-15 (см. секцию "Запрос ревью поста, меняющий его состояние" главы 17): если Worker вместо этого содержит Option< thread::JoinHandle< ()>>, то мы можем вызывать метод take на Option, чтобы переместить значение наружу из варианта Some, и оставить вариант None на своем месте. Другими словами, работающий Worker будет иметь вариант Some в потоке, и когда мы хотим очистить Worker, мы заменим Some на None, чтобы у Worker не было потока для запуска.

Итак, мы знаем, что хотим обновить определение Worker примерно так (файл src/lib.rs, этот код не скомпилируется):

struct Worker {
    id: usize,
    thread: Option< thread::JoinHandle< ()>>,
}

Давайте теперь снова обратимся к компилятору, чтобы он подсказал, что еще требует изменения. Проверка этого кода покажет нам еще 2 ошибки:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in `Option< JoinHandle< ()>>`
   |
note: the method `join` exists on the type `JoinHandle< ()>`
  --> /rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/thread/mod.rs:1718:5
help: consider using `Option::expect` to unwrap the `JoinHandle< ()>` value, panicking if
   |  the value is an `Option::None`
   |
52 |             worker.thread.expect("REASON").join().unwrap();
   |                          +++++++++++++++++
error[E0308]: mismatched types --> src/lib.rs:72:22 | 72 | Worker { id, thread } | ^^^^^^ expected `Option< JoinHandle< ()>>`, found `JoinHandle< _>` | = note: expected enum `Option< JoinHandle< ()>>` found struct `JoinHandle< _>` help: try wrapping the expression in `Some` | 72 | Worker { id, thread: Some(thread) } | +++++++++++++ +
Some errors have detailed explanations: E0308, E0599. For more information about an error, try `rustc --explain E0308`. error: could not compile `hello` (lib) due to 2 previous errors

Давайте исправим вторую ошибку, которая указывает на код в конце Worker::new;, нам нужно обернуть значение thread в Some, когда создаем новый Worker. Сделайте следующие изменения, чтобы исправить эту ошибку (файл src/lib.rs, этот код не скомпилируется):

impl Worker {
    fn new(id: usize, receiver: Arc< Mutex< mpsc::Receiver< Job>>>) -> Worker {
        // -- вырезано --
Worker { id, thread: Some(thread), } } }

Первая ошибка относится к нашей реализации Drop. Как упоминалось ранее, мы намереваемся вызвать take на значении Option, чтобы переместить thread наружу из worker. Следующие изменения сделают это (файл src/lib.rs, этот код не даст желаемого поведения):

impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }

Как обсуждалось в главе 17 [9], метод take на Option извлекает наружу вариант Some и оставляет None на своем месте. Мы используем if let для деструктурирования Some и получения потока thread; затем мы вызываем join на thread. Если поток worker-а уже None, то мы знаем, что у worker-а уже его thread очищен, и в этом случае ничего не происходит.

Сигнал Threads остановить прослушивание для Jobs. Со всеми сделанными изменениями наш код компилируется без каких-либо warning-ов. Однако плохая новость заключается в том, что этот код все еще не работает так, как мы хотим. Ключом является логика в замыканиях, запускаемая потоками экземпляров Worker: в настоящий момент мы вызываем join, но это не закроет потоки, потому что бесконечный цикл ждет поступления заданий. Если мы попытаемся отбросить наш ThreadPool с нашей текущей реализацией drop, то поток main заблокируется навсегда, ожидая завершения первого потока.

Чтобы исправить эту проблему, нам нужно изменить реализацию drop в ThreadPool, и затем поменять цикл в Worker.

Сначала мы поменяем реализацию drop в ThreadPool для явного выбрасывания sender перед ожиданием завершения работы потоков. Листинг 20-23 показывает изменения для ThreadPool, чтобы явно выбрасывать sender. Мы используем ту же самую технику с Option и take, как мы делали с thread, чтобы можно было переместить sender изнутри ThreadPool (файл src/lib.rs, этот код не даст желаемого поведения):

pub struct ThreadPool {
    workers: Vec< Worker>,
    sender: Option< mpsc::Sender< Job>>,
}
// -- вырезано --

impl ThreadPool { pub fn new(size: usize) -> ThreadPool { // -- вырезано -- ThreadPool { workers, sender: Some(sender), } }
pub fn execute< F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap(); } }

impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take());
for worker in &mut self.workers { println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }

Листинг 20-23. Явное выбрасывание sender перед присоединением потоков worker.

Выбрасывание sender закроет канал, что показывает, что больше не могут передаваться сообщения. Когда это произойдет, все вызовы recv, которые были сделаны в бесконечном цикле, возвратят ошибку. В листинге 20-24 мы поменяли цикл Worker для корректного выхода из цикла для такого случая, т. е. потоки завершатся, когда реализация drop в ThreadPool вызовет на них join.

impl Worker {
    fn new(id: usize, receiver: Arc< Mutex< mpsc::Receiver< Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();
match message { Ok(job) => { println!("Worker {id} got a job; executing.");
job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } });
Worker { id, thread: Some(thread), } } }

Листинг 20-24. Явный выход из цикла, когда recv возвратил ошибку (файл src/lib.rs).

Чтобы увидеть этот код в действии, модифицируйте main, чтобы принять только 2 запроса перед корректным закрытием сервера, как показано в листинге 20-25.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) { let stream = stream.unwrap();
pool.execute(|| { handle_connection(stream); }); }
println!("Shutting down."); }

Листинг 20-25. Остановка сервера после обслуживания двух запросов путем выхода из цикла (файл src/main.rs).

Вы не захотите иметь реальный web-сервер, чтобы он завершался после обработки только двух запросов. Этот код просто демонстрирует, что изящное завершение и очистка работают корректно.

Метод take определен в трейте Iterator, и ограничивает итерацию максимум первыми двумя элементами. ThreadPool выйдет из области действия по окончанию main, и запустится реализация drop.

Запустите сервер командой cargo run, и сделайте три запроса. Третий запрос должен возвратить ошибку, и в вашем терминале вы должны увидеть нечто подобное:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

У вас может быть другой порядок worker-ов в печатаемых сообщениях. Мы можем видеть из сообщений, как этот код работает: worker-ы 0 и 3 получили первые два запроса. Сервер остановил прием сообщений после второго соединения, и реализация Drop на ThreadPool начала выполняться еще до того, как worker 3 начал выполнять свое задание job. Отбрасывание sender отключает все worker-ы и говорит им завершить работу. Каждый worker печатает сообщение, когда отключается, и затем пул потоков вызывает join для ожидания завершения каждого потока worker.

Обратите внимание на один интересный аспект в этом конкретном выполнении: ThreadPool отбросил sender, и перед тем, как любой worker получил ошибку, мы пытаемся сделать join для worker 0. Worker 0 еще не получил ошибку от recv, так что поток main заблокировался в ожидании завершения worker 0. Тем временем worker 3 получил job, и затем все потоки получили ошибку. Когда worker 0 завершился, поток main ждал завершения остальных worker-ов. В этот момент они все вышли из своих циклов и остановились.

Поздравляем! Вы завершили проект, получился web-сервер, который использует пул потоков для асинхронной обработки запросов. Мы можем выполнить корректное завершение работы сервера, которое очищает все потоки в пуле.

Вот полный код сервера, файл src/main.rs:

use hello::ThreadPool;
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, };

fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) { let stream = stream.unwrap();
pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); }

fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), };
let contents = fs::read_to_string(filename).unwrap(); let length = contents.len();
let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap(); }

Файл src/lib.rs:

use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool { workers: Vec< Worker>, sender: Option< mpsc::Sender< Job>>, }

type Job = Box< dyn FnOnce() + Send + 'static>;

impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); }
ThreadPool { workers, sender: Some(sender), } }
pub fn execute< F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap(); } }

impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take());
for worker in &mut self.workers { println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }

struct Worker { id: usize, thread: Option< thread::JoinHandle< ()>>, }

impl Worker { fn new(id: usize, receiver: Arc< Mutex< mpsc::Receiver< Job>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv();
match message { Ok(job) => { println!("Worker {id} got a job; executing.");
job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } });
Worker { id, thread: Some(thread), } } }

Здесь вы можете сделать кое-что еще, если захотите улучшить проект. Вот некоторые идеи:

• Добавьте больше документации для ThreadPool и его публичных методов.
• Добавьте тесты функционала библиотеки (см. [10]).
• Поменяйте вызовы unwrap для более надежной обработки ошибок.
• Используйте ThreadPool для выполнения какой-нибудь задачи, кроме как обработки web-запросов.
• Найдите крейт пула потоков на crates.io, и реализуйте подобный web-сервер, используя этот крейт. Затем сравните его API и надежность с пулом потоков, который мы реализовали.

[Общие выводы]

Теперь вы готовы реализовать свои собственные проекты Rust и помочь с проектами других людей. Имейте в виду, что существует гостеприимное сообщество других пользователей Rust, которые хотели бы помочь вам с любыми проблемами.

[Ссылки]

1. Rust Final Project Building a Multithreaded Web Server site:rust-lang.org.
2. Rust: пример консольной программы ввода/вывода.
3. Rust: паттерны и выражение match.
4. Rust: безопасная многопоточность.
5. Rust: общая концепция программирования.
6. Rust: итераторы и замыкания.
7. Rust: дополнительная информация про Cargo и Crates.io.
8. Rust: продвинутые функции.
9. Rust: объектно-ориентированное программирование.
10. Rust: написание автоматизированных тестов.

 

Добавить комментарий


Защитный код
Обновить

Top of Page