BOOST
BOOST::THREADS. Многопоточное программирование
Создание потока
Пример 1 демонстрирует очень простой пример использования класса boost::thread. Создается новый поток, который просто выводит “Hello World” на std::cout, а основной поток ждет его завершения.
// Пример 1 #include <boost/thread/thread.hpp> #include <iostream> void hello() { std::cout << "Hello world, I'm a thread!" << std::endl; } int main(int argc, char* argv[]) { boost::thread thrd(&hello); thrd.join(); return 0; }
Мьютексы
В примере 2 показано очень простое использование класса boost::mutex. Создаются два новых потока, каждый из них 10 раз выводит свой id и счетчик цикла, а основной поток дожидается их завершения. Объект std::cout является разделяемым ресурсом, поэтому каждый поток использует глобальный мьютекс, гарантирующий, что только один поток в каждый момент времени пытается осуществлять вывод.
// Пример 2 #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <iostream> boost::mutex io_mutex; struct count { count(int id) : id(id) { } void operator()() { for (int i = 0; i < 10; ++i) { boost::mutex::scoped_lock lock(io_mutex); Douglas Schmidt, Michael Stal, Hans Rohnert, and Frank Buschmann. Pattern-Oriented Software Architecture Volume 2 — Patterns for Concurrent and Networked Objects (Wiley, 2000). (Scoped Lock). std::cout << id << ": " << i << std::endl; } } int id; }; int main(int argc, char* argv[]) { boost::thread thrd1(count(1)); boost::thread thrd2(count(2)); thrd1.join(); thrd2.join(); return 0; }
Передача данных потоку требует ручного кодирования функционального объекта. Хотя этот код и тривиален, писать его всякий раз довольно скучно. Есть и более простое решение. Функциональные библиотеки позволяют создать новые функциональные объекты, связывая (bind) другие функциональные объекты с данными, которые при вызове будут им переданы. В примере 3 показано, как при использовании библиотеки Boost.Bind можно упростить код примера 2, отказавшись от ручного кодирования функционального объекта.
// Пример 3 // Эта программа идентична программе // из примера 2, кроме того, что // использует Boost.Bind // при создании потока, // принимающего параметры. #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/bind.hpp> #include <iostream> boost::mutex io_mutex; void count(int id) { for (int i = 0; i < 10; ++i) { boost::mutex::scoped_lock lock(io_mutex); std::cout << id << ": " <<i << std::endl; } } int main(int argc, char* argv[]) { boost::thread thrd1(boost::bind(&count, 1)); boost::thread thrd2(boost::bind(&count, 2)); thrd1.join(); thrd2.join(); return 0; }
Условные переменные
Иногда недостаточно просто установить блок и воспользоваться разделяемым ресурсом. Иногда необходимо, чтобы разделяемый ресурс перед использованием находился в некотором специальном состоянии. Например, поток может пытаться извлечь данные из стека, ожидая, когда в нем они появятся, если в настоящий момент стек пуст. Для реализации такого вида синхронизации мьютекса недостаточно. В этом случае можно использовать другой механизм синхронизации, известный как условная переменная.
Условная переменная всегда используется в связке с мьютексом и разделяемым ресурсом (или ресурсами). Поток первым делом блокирует мьютекс, а затем проверяет, находится ли ресурс в состоянии, допускающем его безопасное использование требуемым образом. Если он не в нужном состоянии, поток вызывает для условной переменной операцию ожидания. Эта операция приводит к разблокированию мьютекса во время ожидания, так что другой поток получает возможность изменить состояние разделяемого ресурса. Она также гарантирует, что при возвращении потока после ожидания мьютекс окажется заблокированным. Когда другой поток изменяет состояние разделяемого ресурса, он должен уведомить потоки, которые могут ждать условную переменную, позволяя им завершить ожидание.
Пример 4 иллюстрирует применение класса boost::condition. Определен класс, реализующий ограниченный буфер — контейнер фиксированного размера с поддержкой ввода/вывода в порядке очереди (FIFO). Этот буфер сделан потокобезопасным благодаря использованию boost::mutex. Операции put и get используют условную переменную, чтобы удостовериться в том, что поток будет ждать, пока буфер не окажется в состоянии, необходимом для завершения операции. Создаются два потока, один помещает в этот буфер сто целых, а другой их же извлекает. Ограниченный буфер в каждый момент времени может содержать только 10 целых, поэтому каждому из потоков приходится периодически ждать другой поток. Для проверки того, что это действительно происходит, операции put и get выводят в std::cout диагностические сообщения. Наконец, основной поток ожидает завершения обоих потоков.
// Пример 4 #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> #include <iostream> const int BUF_SIZE = 10; const int ITERS = 100; boost::mutex io_mutex; class buffer { public: typedef boost::mutex::scoped_lock scoped_lock; buffer() : p(0), c(0), full(0) { } void put(int m) { scoped_lock lock(mutex); if (full == BUF_SIZE) { { boost::mutex::scoped_lock lock(io_mutex); std::cout <<"Buffer is full. Waiting..."<< std::endl; } while (full == BUF_SIZE) cond.wait(lock); } buf[p] = m; p = (p+1) % BUF_SIZE; ++full; cond.notify_one(); } int get() { scoped_lock lk(mutex); if (full == 0) { { boost::mutex::scoped_lock lock(io_mutex); std::cout << "Buffer is empty. Waiting..." << std::endl; } while (full == 0) cond.wait(lk); } int i = buf[c]; c = (c+1) % BUF_SIZE; --full; cond.notify_one(); return i; } private: boost::mutex mutex; boost::condition cond; unsigned int p, c, full; int buf[BUF_SIZE]; }; buffer buf; void writer() { for (int n = 0; n < ITERS; ++n) { { boost::mutex::scoped_lock lock(io_mutex); std::cout << "sending: " << n << std::endl; } buf.put(n); } } void reader() { for (int x = 0; x < ITERS; ++x) { int n = buf.get(); { boost::mutex::scoped_lock lock(io_mutex); std::cout << "received: " << n << std::endl; } } } int main(int argc, char* argv[]) { boost::thread thrd1(&reader); boost::thread thrd2(&writer); thrd1.join(); thrd2.join(); return 0; }
Локальная память потока
Пример 5 иллюстрирует очень простое применение класса boost::thread_specific_ptr. Создаются два потока, в них инициализируется локальная память потока, а затем в цикле 10 раз значение целого, расположенного по адресу «умного» указателя инкрементируется, а результат выводится в std::cout (который синхронизирован с помощью мьютекса, так как является разделяемым ресурсом). Основной поток ожидает завершения этих двух потоков. Вывод в этом примере ясно показывает, что каждый поток оперирует со своим экземпляром данных, хотя оба они используют один и тот же boost::thread_specific_ptr.
// Пример 5 #include <boost/thread/thread.hpp> #include <boost/thread/mutex.hpp> #include <boost/thread/tss.hpp> #include <iostream> boost::mutex io_mutex; boost::thread_specific_ptr<int> ptr; struct count { count(int id) : id(id) { } void operator()() { if (ptr.get() == 0) ptr.reset(new int(0)); for (int i = 0; i < 10; ++i) { (*ptr)++; boost::mutex::scoped_lock lock(io_mutex); std::cout << id << ": " << *ptr << std::endl; } } int id; }; int main(int argc, char* argv[]) { boost::thread thrd1(count(1)); boost::thread thrd2(count(2)); thrd1.join(); thrd2.join(); return 0; }
Однократно вызываемые функции
Остается разобраться с одним вопросом: как сделать функции инициализации (такие как конструкторы) потокобезопасными. Например, когда «глобальный» экземпляр объекта создается как синглетон уровня приложения (единственный существующий в приложении объект данного типа), существует проблема порядка инстанцирования, поэтому используется функция, возвращающая статический экземпляр, которая гарантирует, что при первом к ней обращении этот экземпляр будет создан. Проблема в том, что если несколько потоков одновременно вызовут эту функцию, конструктор для статического объекта также может быть вызван несколько раз, и результаты могут оказаться плачевными.
Решение проблемы в так называемых «однократно вызываемых функциях» (once routine). Такая функция вызывается в приложении только один раз. Если несколько потоков попытаются ее вызвать одновременно, только один из них получит такую возможность, а в это время все остальные потоки будут ждать, пока выполнение функции не завершится. Чтобы гаранти ровать однократное выполнение, такая функция вызывается косвенно через другую функцию, которой передается указатель на исходную и ссылка на специальный флаг, сигнализирующий о факте вызова функции. Этот флаг инициализируется статически, что гарантирует инициализацию в период компиляции, а не в период выполнения. И таким образом не представляет проблемы для многопоточной инициализации. Библиотека Boost.Threads предоставляет возможность однократного вызова функции посредством boost::call_once, а также определяет тип для флага boost::once_flag и специальную макроподстановку, используемую для статической инициализации флага, BOOST_ONCE_INIT.
Пример 6 показывает очень простой пример использования boost::call_once. Глобальное целое статически инициализируется нулем, а экземпляр boost::once_flag статически инициализируется с помощью BOOST_ONCE_INIT. Основной поток запускает два потока, каждый из которых пытается «инициализировать» глобальное целое, вызывая boost::call_once с указателем на функцию, инкрементирующую целое. Затем основной поток дожидается завершения обоих потоков и выводит конечное значение целого в std::cout. Вывод демонстрирует, что функция действительно была вызвана только однажды, так как значение целого – единица.
// Пример 6 #include <boost/thread/thread.hpp> #include <boost/thread/once.hpp> #include <iostream> int i = 0; boost::once_flag flag = BOOST_ONCE_INIT; void init() { ++i; } void thread() { boost::call_once(&init, flag); } int main(int argc, char* argv[]) { boost::thread thrd1(&thread); boost::thread thrd2(&thread); thrd1.join(); thrd2.join(); std::cout << i << std::endl; return 0; }
Инициализация параметров
Есть ещё классная штука - возможность инициализировать параметры, запустив функцию однократно. Например, есть три потока, которые обращаются к переменной х, но сперва её надо инициализировать, причём только один раз, а не всеми тремя потоками сразу. Для этого предусмотрен специальный механизм:
int i; boost::once_flag flag = BOOST_ONCE_FLAG; void init() { i = 0; } void thread() { boost::call_once(&init, flag); // дальнейший код }
Так вот, инициализация произойдёт всего один раз, независимо от числа конкурирующих потоков.
Ожидание внутри потоков
Иногда надо подождать немного в потоке. Для этого есть два разных способа
• boost::this_thread::sleep(params)
• boost::this_thread::yield()
Первый (с установленными параметрами) замораживает поток на указанное время.
Второй действует хитрее - он ненадолго замораживает поток, отдавая тем самым ресурсы другим участникам состязания за них. Эдакая джентльменская уступка.
BOOST::BIND
Предположим, что есть функция:
bool compare(int a, int b) {return a < b;}
для того, чтобы использовать эту функцию в, например, алгоритме sort нам необходимо написать:
std::vector<int> vec(…); std::sort(vec.begin(), vec.end(), compare);
Теперь немного усложним задачу – в зависимости от флага нам необходимо сортировать либо по возрастанию, либо по убыванию. При использовании только STL пришлось бы писать две функции. Накладно. С помощью boost эту задачу решить значительно проще:
bool compare(int a, int b, bool reverseSort) {return reverseSort ? a > b : a < b;} std::vector<int> vec(…); std::sort(vec.begin(), vec.end(), boost::bind(compare, _1, _2, order));
где order – булевская переменная, задающая порядок сортировки. Последняя строчка выглядит несколько странновато, но именно в ней и заключается вся суть. В результате ее компиляции будет получен код, реализующий сортировку массива, использующий для сравнения функцию compare, и передающей ей на вход три аргумента – два числа, которые необходимо сравнить, и способ их сравнения.
Рассмотрим этот код подробнее, записав его так:
int a = 1, b = 2; boost::bind(compare, _1, _2, false)(a, b);
(что-то подобное содержится в недрах функции std::sort). Последняя строчка в этом коде обозначает следующее:
boost::bind – функция, создающая необходимый нам связыватель. Возвращает экземпляр функционального объекта (функтора), для которого применим оператор вызова функции. (compare, - первый аргумент функции определяет имя/указатель на функцию, которая должна быть вызвана в связывателе. В данном случае это функция compare; _1, _2 – плейсхолдеры, определяющие логику передачи параметров из оператора вызова функции в функцию compare. , false) – значение последнего аргумента, передаваемого в compare. (a, b) – оператор вызова функции, производящий связывание аргументов и вызов функции compare. В итоге будет выполнена следующий код:
compare(a, b, false);
Из приведенного примера можно увидеть, что упомянутые выше плейсхолдеры определяют связь между аргументами у оператора вызова функции связывателя и аргументами вызываемой функцией. Причем, количество аргументов у оператора вызова функции связывателя определяется количеством плейсхолдеров:
boost::bind(compare, _1, 4, false)(a, b); // неправильно. boost::bind(compare, _1, 4, false)(a); // правильно boost::bind(compare, _1, _2, false)(a); // неправильно boost::bind(compare, _1, _2, false)(a, b); // правильно
Очевидно, что попытка задать в вызове boost::bind количество параметров, несоответствующее количеству обязательных параметров в вызываемой функции приведет к ошибке. Равно как и попытка указания количества параметров большего, чем количество параметров в вызываемой функции. Действуя по аналогии мы можем записать:
std::find_if(vec.begin(), vec.end(), boost::bind(compare, _1, 4, false)); // поиск первого числа, меньшего 4 std::remove_if(vec.begin(), vec.end(), boost::bind(compare, _1, 4, true)); // удаление из последовательности всех элементов, больших 4 // и т. д.
В случае использования чистого STL для каждого случая пришлось бы искать подходящий функтор… Или писать его. Здесь необходимо обратить внимание на то, что в качестве первого параметра функции bind может выступать не только указатель на функцию, но и другой функциональный объект. Например:
std::find_if(vec.begin(), vec.end(), boost::bind(std::equal_to<int>(), _1, 4));
этот строчка найдет в массиве первый элемент, равный 4.
Использование с указателями на функции члены
Помимо указателей на глобальные функции и функторов, в качестве первого параметра можно указывать указатель на функцию-член какого-то класса. Предположим один из простейших вариантов использования:
class GraphicsObject { public: void Draw(Canvas* canvas, DrawModes mode); }; // Объявляем коллекцию объектов этого класса: std::list<GraphicsObject*> GraphObjects; // А теперь нам нужно отрисовать все объекты коллекции на заданном канвасе std::for_each(GraphObjects.begin(), GraphObjects.end(); ……..);
при использовании чистого STL нам бы пришлось писать свой собственный функтор, вызывающий у переданного объекта метод Draw с заданными параметрами. Но можно воспользоваться boost::bind:
std::for_each(GraphObjects.begin(), GraphObjects.end(), boost::bind(&GraphObject::Draw, _1, canvas, mode));
в результате чего мы получаем требуемый результат. У всех объектов коллекции вызывается метод Draw, которому передаются параметры canvas и mode. При использовании указателя на функцию-член класса необходимо помнить о том, что первым параметром ей должен передаваться указатель на объект, для которого эта функция должна вызываться. В приведенном примере мы указываем, что метод должен вызываться у переданного в bind единственного параметра. Но никто не мешает нам в качестве первого передаваемого параметра указать this, или любой другой указатель. Но, как говориться в современных набивших оскомину рекламах, это еще не все.
Использование с указателем на член данных
Помимо указателей на члены-функции можно использовать указатели на члены данных. Синтаксис подобный и правила использования при этом почти такие же. Возьмем задачу. Есть структура и вектор ее экземпляров:
struct Point { int x; int y; }; std::vector<Point> PointsArray;
Теперь надо удалить из этого массива все элементы, у которых координата x равна нулю. Сделать это просто:
std::remove_if(PointsArray.begin(), PointsArray.end(), boost::bind(std::equal_to<int>(), boost::bind(&Point::x, _1), 0));
Т. е. для каждого элемента массива вызывается вложенный связыватель, который возвращает значение соответствующего элемента структуры, после чего производится его сравнение с нулем. Тут мы видим еще одну возможность связывателей:
Каскадное использование связывателей
Связыватели могут вкладываться друг в друга. Один из вариантов такого вкладывания проиллюстрирован предыдущем примером. Единственное, что в этом случае надо «держать в голове» - это то, что плейсхолдеры, вне зависимости от того, в каком bind'ере (по уровню вложенности) они находятся, «адресуют» параметры самого внешго binder'а. Усложним предыдущий пример. Нужно выбросить все точки, имеющие нулевые координаты:
std::remove_if( PointsArray.begin(), PointsArray.end(), boost::bind( std::logical_and(), boost::bind(std::equal_to<int>(), boost::bind(&Point::x, _1), 0), boost::bind(std::equal_to<int>(), boost::bind(&Point::y, _1), 0) ) ));
Это хотя и работает так, как хочется, но выглядит слишком наворочено. По этому начиная с версии 1.33 в boost::bind появилась новая возможность -
Перегруженные операторы
Для упрощения приведенных выше многоэтажных конструкций для boost::bind (начиная с версии 1.33 boost'а) перегружены следующие операторы: !, ==, !=, <, ?, > и >=. Таким образом, приведенное выше выражение упрощается до:
std::remove_if( PointsArray.begin(), PointsArray.end(), boost::bind( boost::bind(&Point::x, _1) == 0 && boost::bind(&Point::y, _1) == 0) ));
Использование ссылок
Одна из неприятностей заключается в том, что если связываемая функция принимает какой-то из своих аргументов по ссылке, то с использованием boost::bind его (напрямую) передать нельзя. Т. е. например, в следующем случае:
void foo(int& a, int& b); int a = 1, b = 2; boost::bind(foo, a, b);
аргументы a и b по ссылке переданы не будут. Для того, чтобы действительно передать ссылки, необходимо делать такой вызов:
boost::bind(foo, boost::ref(a), boost::ref(b));
В этом случае foo, вызываемый из связывателя, будет действительно работать со ссылками на соответствующие переменные.
Пример использования.
#include <vector> #include <boost/bind.hpp> #include <boost/function.hpp> class Test { public: void f_1() { std::cout << "void f()" << std::endl; } void f_2(int i) { std::cout << "void f_2(): " << i << std::endl; } void f_3(const int &i) { std::cout << "void f_2(): " << i << std::endl; } void two_params(int a, int b) { std::cout << "void two_params(int a, int b): " << a << ", " << b << std::endl; } }; class Test2 { public: void do_stuff(const std::vector<int>& v) { std::copy(v.begin(), v.end(), std::ostream_iterator<int>(std::cout, " ")); } }; void test(const std::string &a) { std::cout << "void test(const std::string &a). a = " << a << std::endl; } void prn(boost::function<void(int)> fn, int a) { fn(a); } int _tmain(int argc, _TCHAR* argv[]) { ////////////////////// bind ////////////////////// std::cout << "boost::bind test" << std::endl; Test a; boost::bind(&Test::f_1, &a)(); boost::bind(&Test::f_2, &a, 1)(); int test_int(2); boost::bind(&Test::f_2, &a, _1)(test_int); boost::bind(&Test::f_3, &a, 3)(); int test_int_2(4); boost::bind(&Test::f_3, &a, _1)(test_int_2); int one(100), two(200); boost::bind(&Test::two_params, &a, _1, _2)(one, two); boost::bind(&Test::two_params, &a, _2, _1)(one, two); std::string test_str("Hi there."); boost::bind(&test, test_str)(); boost::bind(&test, _1)(test_str); std::cout << std::endl; ////////////////////// function ////////////////////// std::cout << "boost::function test" << std::endl; boost::function<void(void)> func; func = boost::bind(&Test::f_1, &a); func(); func = boost::bind(&Test::f_2, &a, 201); func(); func = boost::bind(&Test::f_3, &a, 202); func(); func = boost::bind(&Test::two_params, &a, 203, 204); func(); boost::function<void(int)> func_2; func_2 = boost::bind(&Test::f_2, &a, _1); prn(func_2, 301); func_2 = boost::bind(&Test::f_3, &a, _1); prn(func_2, 302); int i_303(303), i_304(304), i_305(305), i_306(306); func_2 = boost::bind(&Test::two_params, &a, i_303, _1); prn(func_2, 1); func_2 = boost::bind(&Test::two_params, &a, _1, i_304); prn(func_2, 1); Test2 t; std::vector<int> vec; vec.resize(20); std::generate_n(vec.begin(), 20, rand); std::copy(vec.begin(), vec.end(), std::ostream_iterator<int>(std::cout, " ")); //simple_bind(&Test::do_stuff, t, _1)(vec); //boost::bind(&Test2::do_stuff, t, _1)(vec); return 0; }
BOOST::ASIO (::IO_SERVICE)
boost::asio на самом деле, хоть и называется библиотекой асинхронной, но может выполнять и синхронные, и асинхронные операции. Вся библиотека при этом завязана на объекты класса boost::asio::io_service – именно asio::io_service предоставляет программе связь с нативными объектами ввода/вывода. Соответственно, что бы ваша программа могла работать с boost::asio, нужно создать хотя бы один объект этого класса и уже после этого “аттачить” все другие объекты к нему, например вот так:
boost::asio::io_service io_service; boost::asio::ip::tcp::socket socket(io_service);
Подключить только что созданный сокет к серверу тоже не представляет из себя проблемы:
boost::system::error_code ec; // здесь будет записана ошибка, если она была socket.connect(server_endpoint, ec); // server_endpoint задаёт куда подключаемся
После того как мы подключили объекты ввода-вывода (в данном случае socket) к asio::io_service – этот сервис будет осуществлять взаимодействие с операционной системой и получать/отправлять данные.
boost::asio асинхронный
Тот вариант, что мы рассмотрели выше – это синхронное взаимодействие, т.е., в данном случае это значит, что пока происходит подключение – наша программа “зависнет” до того момента, пока подключение не выполнится, либо не произойдёт ошибка. Если же мы хотим что бы работа boost::asio была асинхронной, надо идти немного другим путём, который тоже предусмотрен в библиотеке. Для работы с асинхронными операциями мы должны передать в функцию так называемый completion handler, или, говоря по русски, функцию, которая будет вызвана, когда операция завершится (т.е. либо подключится, либо возникнет ошибка):
void your_completion_handler(const boost::system::error_code& ec) { // тут обрабатываем подключение либо ошибку } boost::asio::io_service io_service; boost::asio::ip::tcp::socket socket(io_service); socket.async_connect(server_endpoint, your_completion_handler);
Все асинхронные функции в boost::asio работают именно по такому принципу. С одной стороны, это довольно удобно для программирования – логика работы каждой из функций отделена от других, с другой стороны это может может показаться Вам немного непривычным.
Использование boost::asio
Приведу короткий и простой пример использования boost::asio , это код примера из самого boost, лишь перевёл комментарии:
#include <iostream> #include <istream> #include <ostream> #include <string> #include <boost/asio.hpp> using boost::asio::ip::tcp; int main(int argc, char* argv[]) { try { if (argc != 3) // если аргументы не заданы или заданы не все { std::cout << "Usage: sync_client <server> <path>\n"; std::cout << "Example:\n"; std::cout << " sync_client www.boost.org /LICENSE_1_0.txt\n"; return 1; } boost::asio::io_service io_service; // основной класс asio // Получаем список конечных точек для указанного сервера tcp::resolver resolver(io_service); tcp::resolver::query query(argv[1], "http"); tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); tcp::resolver::iterator end; // Перебираем эти конечные точки и пробуем подключиться tcp::socket socket(io_service); boost::system::error_code error = boost::asio::error::host_not_found; while (error && endpoint_iterator != end) { socket.close(); socket.connect(*endpoint_iterator++, error); } if (error) // подключиться не удалось throw boost::system::system_error(error); // Формируем запрос к веб-серверу. Указываем "Connection: close" что бы // сервер закрыл соединение как только отправит нам данные. Это // позволит нам узнать что отправка завершенна как только возникнет EOF boost::asio::streambuf request; std::ostream request_stream(&request); request_stream << "GET " << argv[2] << " HTTP/1.0\r\n"; request_stream << "Host: " << argv[1] << "\r\n"; request_stream << "Accept: */*\r\n"; request_stream << "Connection: close\r\n\r\n"; // Отправляем сформированный запрос веб-серверу. boost::asio::write(socket, request); // Читаем ответ сервер. streambuf response примет все данные // он сам будет увеличиваться по мере поступления данных от сервера. boost::asio::streambuf response; boost::asio::read_until(socket, response, "\r\n"); // Проверяем что бы не было ошибок. std::istream response_stream(&response); std::string http_version; response_stream >> http_version; unsigned int status_code; response_stream >> status_code; std::string status_message; std::getline(response_stream, status_message); if (!response_stream || http_version.substr(0, 5) != "HTTP/") // ошибка { std::cout << "Invalid response\n"; return 1; } if (status_code != 200) // если код ответа не 200, то это тоже ошибка { std::cout << "Response returned with status code " << status_code << "\n"; return 1; } // Читаем ответ. Он закончится пустой строкой. boost::asio::read_until(socket, response, "\r\n\r\n"); // Парсим заголовки std::string header; while (std::getline(response_stream, header) && header != "\r") std::cout << header << "\n"; std::cout << "\n"; // Выводим в лог if (response.size() > 0) std::cout << &response; // Теперь читаем до конца while (boost::asio::read(socket, response, boost::asio::transfer_at_least(1), error)) std::cout << &response; if (error != boost::asio::error::eof) // ошибка throw boost::system::system_error(error); } catch (std::exception& e) // возникло исключение { std::cout << "Exception: " << e.what() << "\n"; } return 0; }
Вот эти 100 строк кода работают как HTTP-клиент и позволяют нам читать страницы с веб-серверов.
Асинхронное программирование
Этот раздел глубоко разбирает некоторые вопросы, с которыми вы столкнетесь при работе с асинхронным программированием.
Необходимость работать асинхронно
как правило, синхронное программирование гораздо проще, чем асинхронное. Потому что гораздо легче думать линейно (вызываем функцию А, после ее окончания вызываем ее обработчик, вызываем функцию В, после ее окончания вызываем ее обработчик и так далее, так что можно думать в манере событие-обработчик). В последнем случае вы можете иметь, скажем, пять событий и вы никогда не сможете узнать порядок, в котором они выполняются, и вы даже не будете знать выполнятся ли они все! Но даже при том, что асинхронное программирование сложнее вы, скорее всего, предпочтете его, скажем, в написании серверов, которые должны иметь дело с большим количеством клиентов одновременно. Чем больше клиентов у вас есть, тем легче асинхронное программирование по сравнению с синхронным. Скажем, у вас есть приложение, которое одновременно имеет дело с 1000 клиентами, каждое сообщение от клиента серверу и от сервера клиенту заканчивается символом ‘\n’. Синхронный код, 1 поток:
using namespace boost::asio; struct client { ip::tcp::socket sock; char buff[1024]; // each msg is at maximum this size int already_read; // how much have we already read? }; std::vector<client> clients; void handle_clients() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) on_read(clients[i]); } void on_read(client & c) { int to_read = std::min( 1024 - c.already_read, c.sock. available()); c.sock.read_some( buffer(c.buff + c.already_read, to_read)); c.already_read += to_read; if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) { int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff; std::string msg(c.buff, c.buff + pos); std::copy(c.buff + pos, c.buff + 1024, c.buff); c.already_read -= pos; on_read_msg(c, msg); } } void on_read_msg(client & c, const std::string & msg) { // analyze message, and write back if ( msg == "request_login") c.sock.write( "request_ok\n"); else if ... }
Одна вещь, которую вы хотите избежать при написании серверов (да и в основном любого сетевого приложения) это чтобы код перестал отвечать на запросы. В нашем случае мы хотим, чтобы функция handle_clients()
блокировалась как можно меньше. Если функция заблокируется в какой-либо точке, то все входящие сообщения от клиента будут ждать, когда функция разблокируется и начнет их обработку. Для того чтобы оставаться отзывчивым мы будем читать из сокета только тогда, когда в нем есть данные, то есть if ( clients[i].sock.available() ) on_read(clients[i])
. В on_read мы будем читать только столько, сколько есть в наличии; вызов read_until(c.sock, buffer(...),'\n')
было бы не очень хорошей идеей, так как она блокируется, пока мы не прочитаем сообщение от конкретного клиента до конца (мы никогда не узнаем когда это произойдет).
Узким местом здесь является функция on_read_msg()
; все входящие сообщения будут приостановлены, до тех пор, пока выполняется эта функция. Хорошо-написанная функция on_read_msg()
будет следить, чтобы этого не произошло, но все же это может произойти (иногда запись в сокет может быть заблокирована, например, если заполнен его буфер).
Синхронный код, 10 потоков:
using namespace boost::asio; struct client { // ... same as before bool set_reading() { boost::mutex::scoped_lock lk(cs_); if ( is_reading_) return false; // already reading else { is_reading_ = true; return true; } } void unset_reading() { boost::mutex::scoped_lock lk(cs_); is_reading_ = false; } private: boost::mutex cs_; bool is_reading_; }; std::vector<client> clients; void handle_clients() { for ( int i = 0; i < 10; ++i) boost::thread( handle_clients_thread); } void handle_clients_thread() { while ( true) for ( int i = 0; i < clients.size(); ++i) if ( clients[i].sock.available() ) if ( clients[i].set_reading()) { on_read(clients[i]); clients[i].unset_reading(); } } void on_read(client & c) { // same as before } void on_read_msg(client & c, const std::string & msg) { // same as before }
Для того, чтобы использовать несколько потоков, нам нужно их синхронизировать, что и делают функции set_reading
() и set_unreading()
. Функция set_reading()
является очень важной. Вы хотите, чтобы «проверить можно ли читать и начать читать» выполнялось за один шаг. Если у вас это выполняется за два шага («проверить можно ли читать» и «начать чтение»), то вы можете завести два потока: один для проверки на чтение для какого-либо клиента, другой для вызова функции on_read
для того же клиента, в конечном итоге это может привести к повреждению данных и возможно даже к зависанию системы.
Вы заметите, что код становится все более сложным. Возможен и третий вариант для синхронного кода, а именно иметь по одному потоку на каждого клиента. Но так как число одновременных клиентов растет, то это в значительной степени становится непозволительной операцией. А теперь рассмотрим асинхронные варианты. Мы постоянно делали асинхронной операцию чтения. Когда клиент делает запрос, вызывается операция on_read
, мы отвечаем в ответ, а затем ждем, когда поступит следующий запрос (запускаем еще одну операцию асинхронного чтения).
Асинхронный код, 10 потоков:
using namespace boost::asio; io_service service; struct client { ip::tcp::socket sock; streambuf buff; // reads the answer from the client } std::vector<client> clients; void handle_clients() { for ( int i = 0; i < clients.size(); ++i) async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2)); for ( int i = 0; i < 10; ++i) boost::thread(handle_clients_thread); } void handle_clients_thread() { service.run(); } void on_read(client & c, const error_code & err, size_t read_bytes) { std::istream in(&c.buff); std::string msg; std::getline(in, msg); if ( msg == "request_login") c.sock.async_write( "request_ok\n", on_write); else if ... ... // now, wait for the next read from the same client async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2)); }
Обратите внимание, насколько проще стал код. Структура client имеет только два члена, handle_clients() просто вызывает async_read_until, а затем создает десять потоков, каждый из которых вызывает service.run(). Эти потоки будут обрабатывать все операции асинхронного чтения или записи клиенту. Еще одно нужно отметить, что функция on_read() будет постоянно готовиться к следующей операции асинхронного чтения (смотрите последнюю строку).