August 9, 2022

Туториал по libp2p: создание peer-to-peer приложения на Rust

Всем привет! С вами ArteMm aka Скамушка

В этой статье мы узнаем, что такое libp2p, а также создадим простое p2p приложение. Это перевод/конспект данной статьи с улучшениями и исправлениями ошибок от меня.

Навигация по статье:

1. Введение

2. Первоначальная настройка

3. Что такое libp2p?

4. Как работает libp2p

5. Создание клиента libp2p

6. Обработка ввода в libp2p

7. Отправка сообщений с помощью libp2p

8. Ответы на сообщения с помощью libp2p

9. Тестирование с помощью libp2p

10. Заключение

11. Доп. комментарий и материалы на эту тему

1. Введение

За последние несколько лет на сцене децентрализованного ПО произошло несколько очень интересных событий (даже помимо всей крипты и блокчейнов). Яркими примерами являются IPFS; новая платформа распределенного кодирования Radicle; децентрализованная социальная сеть Scuttlebutt; и многие другие приложения в Fediverse, такие как Mastodon.

В этом руководстве я покажу вам, как создать очень простое p2p приложение с помощью Rust и фантастической библиотеки libp2p, которая существует на разных стадиях зрелости для широкого спектра языков.

Мы собираемся создать приложение для кулинарных рецептов с простым интерфейсом командной строки, которое позволит нам:

  • Создавать рецепты
  • Публиковать рецепты
  • Перечислять локальные рецепты
  • Перечислять другие пиры, которые мы обнаружили в сети
  • Перечислять опубликованные рецепты заданного пира
  • Перечислять все рецепты всех известных нам пиров

Мы сделаем все это примерно в 300 строках Rust. Давайте начнем!

2. Первоначальная настройка

Чтобы следовать этому примеру, все, что вам нужно, — это последняя установка Rust (1.47+).

Сначала создайте новый проект Rust:

cargo new rust-p2p-example
cd rust-p2p-example

Затем отредактируйте файл Cargo.toml и добавьте необходимые зависимости:

[dependencies]
libp2p = { version = "0.39", features = ["tcp-tokio", "mdns"] }
tokio = { version = "1.0", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread", "sync", "fs"] }
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
once_cell = "1.5"
log = "0.4"
pretty_env_logger = "0.4"

Как упоминалось выше, мы будем использовать libp2p для части с p2p сетью. В частности, мы собираемся использовать его вместе с асинхронным рантаймом Tokio. Мы будем использовать Serde для сериализации и десериализации JSON и пару вспомогательных библиотек для логирования и инициализации состояния.

3. Что такое libp2p?

libp2p — это набор протоколов для создания p2p приложений, ориентированных на модульность.

Существуют реализации библиотеки для нескольких языков, таких как JavaScript, Go и Rust. Все эти библиотеки реализуют одни и те же спецификации libp2p, поэтому клиент libp2p, созданный с помощью Go, может беспрепятственно взаимодействовать с другим клиентом, написанным на JavaScript, если они совместимы с точки зрения выбранного стека протоколов. Эти протоколы охватывают широкий диапазон, от базовых сетевых транспортных протоколов до протоколов уровня безопасности и мультиплексирования.

Мы не будем слишком углубляться в детали libp2p в этой статье, но если вам интересно погрузиться глубже, официальная документация libp2p предлагает очень хороший обзор различных концепций, с которыми мы столкнемся на этом пути.

4. Как работает libp2p

Для начала в main.rs импортируем все необходимые компоненты:

use libp2p::{
    core::upgrade,
    floodsub::{Floodsub, FloodsubEvent, Topic},
    futures::StreamExt,
    identity,
    mdns::{Mdns, MdnsEvent},
    mplex,
    noise::{Keypair, NoiseConfig, X25519Spec},
    swarm::{NetworkBehaviourEventProcess, Swarm, SwarmBuilder},
    tcp::TokioTcpConfig,
    NetworkBehaviour, PeerId, Transport,
};
use log::{error, info};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use tokio::{fs, io::AsyncBufReadExt, sync::mpsc};

Чтобы увидеть libp2p в действии, давайте запустим наше приложение рецептов. Мы начнем с определения некоторых констант и типов, которые нам понадобятся:

const STORAGE_FILE_PATH: &str = "./recipes.json";

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;

static KEYS: Lazy<identity::Keypair> = Lazy::new(|| identity::Keypair::generate_ed25519());
static PEER_ID: Lazy<PeerId> = Lazy::new(|| PeerId::from(KEYS.public()));
static TOPIC: Lazy<Topic> = Lazy::new(|| Topic::new("recipes"));

Мы будем хранить наши локальные рецепты в простом файле JSON с именем recipes.json, который, по ожиданиям приложения, будет находиться в той же папке, что и исполняемый файл. Мы также определяем вспомогательный тип для Result, который позволяет нам распространять произвольные ошибки.

Затем мы используем once_cell::Lazy для ленивой инициализации нескольких вещей. Прежде всего, мы используем его для генерации пары ключей и так называемого PeerId, полученного из открытого ключа. Мы также создаем Topic, который является еще одной ключевой концепцией libp2p.

Что все это значит? Если коротко, то PeerId — это просто уникальный идентификатор для конкретного пира в рамках всей p2p сети. Мы получаем его из пары ключей, чтобы обеспечить его уникальность. Кроме того, пара ключей позволяет нам безопасно общаться с остальной частью сети, гарантируя, что никто не сможет выдать себя за нас.

С другой стороны, Topic — это концепция из Floodsub, которая является реализацией интерфейса pub/sub для libp2p. Topic — это то, на что мы можем subscribe (подписаться) и отправлять сообщения, например, чтобы прослушивать только часть трафика в pub/sub сети.

Также нам понадобятся некоторые типы для рецепта:

type Recipes = Vec<Recipe>;

#[derive(Debug, Serialize, Deserialize)]
struct Recipe {
    id: usize,
    name: String,
    ingredients: String,
    instructions: String,
    public: bool,
}

И несколько типов для сообщений, которые мы планируем рассылать:

#[derive(Debug, Serialize, Deserialize)]
enum ListMode {
    ALL,
    One(String),
}

#[derive(Debug, Serialize, Deserialize)]
struct ListRequest {
    mode: ListMode,
}

#[derive(Debug, Serialize, Deserialize)]
struct ListResponse {
    mode: ListMode,
    data: Recipes,
    receiver: String,
}

enum EventType {
    Response(ListResponse),
    Input(String),
}

Рецепт довольно прост. У него есть ID, название, некоторые ингредиенты и инструкции по его выполнению. Кроме того, мы добавляем public флаг, чтобы мы могли различать, какими рецептами мы хотим поделиться, а какие оставить для себя.

Как упоминалось в начале, есть два способа получения списков от других пиров: от всех или от одного, что представлено перечислением ListMode.

Типы ListRequest и ListResponse являются просто обертками для этого типа и данных, отправляемых с их помощью.

Перечисление EventType различает ответ от другого пира и ввод от нас самих. Позже мы увидим, почему это различие имеет значение.

5. Создание клиента libp2p

Давайте приступим к написанию main функции для создания peer-to-peer сети.

#[tokio::main]
async fn main() {
    pretty_env_logger::init();

    info!("Peer Id: {}", PEER_ID.clone());
    let (response_sender, mut response_rcv) = mpsc::unbounded_channel();

    let auth_keys = Keypair::<X25519Spec>::new()
        .into_authentic(&KEYS)
        .expect("can create auth keys");

Мы инициализируем логирование и создаем асинхронный channel (канал) для связи между различными частями приложения. Позже мы будем использовать этот канал для отправки ответов от сетевого стека libp2p обратно в наше приложение для их обработки.

Кроме того, мы создаем некоторые ключи аутентификации для криптопротокола Noise, которые мы будем использовать для защиты трафика внутри сети. Для этого мы создаем новую пару ключей и подписываем ее нашими идентификационными ключами с помощью функции into_authentic.

Следующий шаг важен и включает в себя некоторые основные концепции libp2p: создание так называемого Transport.

    let transp = TokioTcpConfig::new()
        .upgrade(upgrade::Version::V1)
        .authenticate(NoiseConfig::xx(auth_keys).into_authenticated()) // XX Handshake паттерн, также существует IX и IK — только XX в настоящее время обеспечивает взаимодействие с другими реализациями libp2p (impls)
        .multiplex(mplex::MplexConfig::new())
        .boxed();

Транспорт — это набор сетевых протоколов, обеспечивающих ориентированную на соединение коммуникацию между пирами. В рамках одного приложения можно использовать несколько транспортов — например, TCP/IP и Websockets, или UDP одновременно для разных вариантов использования.

В этом примере мы будем использовать TCP в качестве основы, используя асинхронный TCP от Tokio. Как только TCP-соединение будет установлено, мы upgrade (обновим) его, чтобы использовать Noise для безопасной связи. Веб-примером этого может быть использование TLS поверх HTTP для создания безопасного соединения.

Мы используем NoiseConfig:xx handshake паттерн, который является одним из трех вариантов, потому что только он гарантирует совместимость с другими приложениями libp2p.

Что хорошо в libp2p, так это то, что мы можем написать Rust-клиент, а другой может написать JavaScript-клиент, и они все равно смогут легко общаться, если протоколы будут реализованы в обеих версиях библиотеки.

В конце мы также мультиплексируем транспорт, что позволяет нам мультиплексировать несколько подпотоков или соединений на одном и том же транспорте.

Довольно много теории! Но все это можно найти в документации libp2p. Это всего лишь один из многих способов создания p2p транспорта.

Следующее понятие — NetworkBehaviour. Это та часть libp2p, которая фактически определяет логику сети и всех пиров — например, что делать с входящими событиями и какие события отправлять.

    let mut behaviour = RecipeBehaviour {
        floodsub: Floodsub::new(PEER_ID.clone()),
        mdns: Mdns::new(Default::default())
            .await
            .expect("can create mdns"),
        response_sender,
    };

    behaviour.floodsub.subscribe(TOPIC.clone());

В этом случае, как упоминалось выше, мы будем использовать протокол FloodSub для работы с событиями. Мы также будем использовать mDNS — протокол для обнаружения других пиров в локальной сети. Мы также поместим сюда sender часть нашего канала, чтобы использовать ее для распространения событий обратно в основную часть приложения.

Тема (topic) FloodSub, которую мы создали ранее, теперь подписана на наше поведение (behaviour), что означает, что мы будем получать события и можем отправлять события по этой теме.

Мы почти закончили с настройкой libp2p. Последняя концепция, которая нам нужна, — это Swarm.

    let mut swarm = SwarmBuilder::new(transp, behaviour, PEER_ID.clone())
        .executor(Box::new(|fut| {
            tokio::spawn(fut);
        }))
        .build();

Swarm управляет соединениями, созданными с помощью транспорта, и выполняет созданное нами сетевое поведение, вызывая и получая события и предоставляя нам способ добраться до них извне.

Мы создаем Swarm с нашим транспортом, поведением и идентификатором пира. Часть executor просто говорит Swarm использовать для внутреннего запуска Tokio рантайм, но здесь мы также можем использовать другие асинхронные среды выполнения.

Осталось только запустить наш Swarm:

    Swarm::listen_on(
        &mut swarm,
        "/ip4/0.0.0.0/tcp/0"
            .parse()
            .expect("can get a local socket"),
    )
    .expect("swarm can be started");

Подобно запуску, например, TCP-сервера, мы просто вызываем listen_on с локальным IP, позволяя ОС выбрать для нас порт. Это запустит Swarm со всеми нашими настройками, но мы еще не определили никакой логики.

Давайте начнем с обработки пользовательского ввода.

6. Обработка ввода в libp2p

Для пользовательского ввода мы будем полагаться на старый добрый STDIN. Итак, перед вызовом Swarm::listen_on мы добавим:

    let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines();

Это определило асинхронный считыватель на STDIN, который читает поток построчно. Таким образом, если мы нажмем Enter, появится новое входящее сообщение.

Следующая часть — это создание нашего цикла событий, который будет слушать события от STDIN, от Swarm и от нашего канала ответа, определенного выше.

    loop {
        let evt = {
            tokio::select! {
                line = stdin.next_line() => Some(EventType::Input(line.expect("can get line").expect("can read line from stdin"))),
                response = response_rcv.recv() => Some(EventType::Response(response.expect("response exists"))),
                event = swarm.select_next_some() => {
                    info!("Unhandled Swarm Event: {:?}", event);
                    None
                },
            }
        };
        // ...
    }
}

Мы используем макрос Tokio select для ожидания нескольких асинхронных процессов, обрабатывая первый из них, который завершится. Мы ничего не делаем с событиями Swarm; они обрабатываются в нашем RecipeBehaviour, который мы рассмотрим позже, но нам все еще нужно вызывать swarm.select_next_some(), чтобы продвигать Swarm вперед.

Давайте добавим некоторую логику обработки событий вместо // …:

        if let Some(event) = evt {
            match event {
                EventType::Response(resp) => {
                   // ...
                }
                EventType::Input(line) => match line.as_str() {
                    "ls p" => handle_list_peers(&mut swarm).await,
                    cmd if cmd.starts_with("ls r") => handle_list_recipes(cmd, &mut swarm).await,
                    cmd if cmd.starts_with("create r") => handle_create_recipe(cmd).await,
                    cmd if cmd.starts_with("publish r") => handle_publish_recipe(cmd).await,
                    _ => error!("unknown command"),
                },
            }
        }

Если есть событие, мы сопоставляем его и смотрим, является ли оно событием Response или Input. Давайте пока рассмотрим только события Input.

Есть несколько вариантов. Мы поддерживаем следующие команды:

  • ls p выводит список всех известных пиров
  • ls r выводит список локальных рецептов
  • ls r {peerId} перечисляет опубликованные рецепты от определенного пира
  • ls r all перечисляет опубликованные рецепты от всех пиров
  • publish r {recipeId} публикует заданный рецепт
  • create r {recipeName}|{recipeIngredients}|{recipeInstructions} создает новый рецепт с заданными данными и увеличивающимся ID

Перечисление всех рецептов от пиров в данном случае означает отправку запроса на рецепты нашим пирам, ожидание их ответа и отображение результатов. В p2p сети это может занять некоторое время, поскольку некоторые пиры могут находиться на другом конце планеты, и мы не знаем, все ли из них вообще ответят нам. Это значительно отличается от отправки запроса, например, на HTTP-сервер.

Давайте сначала посмотрим на логику перечисления пиров:

async fn handle_list_peers(swarm: &mut Swarm<RecipeBehaviour>) {
    info!("Discovered Peers:");
    let nodes = swarm.behaviour().mdns.discovered_nodes();
    let mut unique_peers = HashSet::new();
    for peer in nodes {
        unique_peers.insert(peer);
    }
    unique_peers.iter().for_each(|p| info!("{}", p));
}

В этом случае мы можем использовать mDNS для предоставления нам всех обнаруженных нод, итерируя и отображая их.

Давайте рассмотрим создание и публикацию рецептов, прежде чем перейти к командам списка:

async fn handle_create_recipe(cmd: &str) {
    if let Some(rest) = cmd.strip_prefix("create r") {
        let elements: Vec<&str> = rest.split("|").collect();
        if elements.len() < 3 {
            info!("too few arguments - Format: name|ingredients|instructions");
        } else {
            let name = elements.get(0).expect("name is there");
            let ingredients = elements.get(1).expect("ingredients is there");
            let instructions = elements.get(2).expect("instructions is there");
            if let Err(e) = create_new_recipe(name, ingredients, instructions).await {
                error!("error creating recipe: {}", e);
            };
        }
    }
}

async fn handle_publish_recipe(cmd: &str) {
    if let Some(rest) = cmd.strip_prefix("publish r") {
        match rest.trim().parse::<usize>() {
            Ok(id) => {
                if let Err(e) = publish_recipe(id).await {
                    info!("error publishing recipe with id {}, {}", id, e)
                } else {
                    info!("Published Recipe with id: {}", id);
                }
            }
            Err(e) => error!("invalid id: {}, {}", rest.trim(), e),
        };
    }
}

В обоих случаях нам нужно проанализировать строку, чтобы получить данные, разделенные |, или заданный ID рецепта в случае publish, регистрируя ошибку, если заданные входные данные не являются действительными.

В случае create мы вызываем вспомогательную функцию create_new_recipe с заданными данными. Давайте проверим все вспомогательные функции, которые нам понадобятся для взаимодействия с нашим простым локальным JSON-хранилищем для рецептов:

async fn create_new_recipe(name: &str, ingredients: &str, instructions: &str) -> Result<()> {
    let mut local_recipes = read_local_recipes().await?;
    let new_id = match local_recipes.iter().max_by_key(|r| r.id) {
        Some(v) => v.id + 1,
        None => 0,
    };
    local_recipes.push(Recipe {
        id: new_id,
        name: name.to_owned(),
        ingredients: ingredients.to_owned(),
        instructions: instructions.to_owned(),
        public: false,
    });
    write_local_recipes(&local_recipes).await?;

    info!("Created recipe:");
    info!("Name: {}", name);
    info!("Ingredients: {}", ingredients);
    info!("Instructions:: {}", instructions);

    Ok(())
}

async fn publish_recipe(id: usize) -> Result<()> {
    let mut local_recipes = read_local_recipes().await?;
    local_recipes
        .iter_mut()
        .filter(|r| r.id == id)
        .for_each(|r| r.public = true);
    write_local_recipes(&local_recipes).await?;
    Ok(())
}

async fn read_local_recipes() -> Result<Recipes> {
    let content = fs::read(STORAGE_FILE_PATH).await?;
    let result = serde_json::from_slice(&content)?;
    Ok(result)
}

async fn write_local_recipes(recipes: &Recipes) -> Result<()> {
    let json = serde_json::to_string(&recipes)?;
    fs::write(STORAGE_FILE_PATH, &json).await?;
    Ok(())
}

Самыми основными строительными блоками являются read_local_recipes и write_local_recipes, которые просто читают и десериализуют или сериализуют и записывают рецепты из или в файл хранилища.

Помощник publish_recipe извлекает все рецепты из файла, ищет рецепт с заданным ID и устанавливает для своего public флага значение true.

При создании рецепта мы также получаем все рецепты из файла, добавляем новый рецепт в конце и записываем обратно все данные, перезаписывая файл. Это не суперэффективно, но просто и работает.

7. Отправка сообщений с помощью libp2p

Далее рассмотрим list команды и изучим, как мы можем отправлять сообщения другим пирам.

В команде list есть три возможных случая:

async fn handle_list_recipes(cmd: &str, swarm: &mut Swarm<RecipeBehaviour>) {
    let rest = cmd.strip_prefix("ls r ");
    match rest {
        Some("all") => {
            let req = ListRequest {
                mode: ListMode::ALL,
            };
            let json = serde_json::to_string(&req).expect("can jsonify request");
            swarm
                .behaviour_mut()
                .floodsub
                .publish(TOPIC.clone(), json.as_bytes());
        }
        Some(recipes_peer_id) => {
            let req = ListRequest {
                mode: ListMode::One(recipes_peer_id.to_owned()),
            };
            let json = serde_json::to_string(&req).expect("can jsonify request");
            swarm
                .behaviour_mut()
                .floodsub
                .publish(TOPIC.clone(), json.as_bytes());
        }
        None => {
            match read_local_recipes().await {
                Ok(v) => {
                    info!("Local Recipes ({})", v.len());
                    v.iter().for_each(|r| info!("{:?}", r));
                }
                Err(e) => error!("error fetching local recipes: {}", e),
            };
        }
    };
}

Мы разбираем входящую команду, удаляя часть ls r и проверяем, что осталось. Если в команде больше ничего нет, мы можем просто получить наши локальные рецепты и отобразить их с помощью помощников, определенных в предыдущем разделе.

Если мы встречаем ключевое слово all, мы создаем ListRequest с установленным ListMode::ALL, сериализуем его в JSON и, используя экземпляр FloodSub в нашем Swarm, публикуем его в ранее упомянутом Topic'е.

То же самое происходит, если в команде встречается ID пира, в этом случае мы просто отправим режим ListMode::One с этим ID. Мы могли бы проверить, является ли этот идентификатор пира валидным или даже обнаруженным нами (discovered), но давайте не будем усложнять: если нет никого, кто бы его слушал, ничего не произойдет.

Это все, что нам нужно сделать для отправки сообщений в сеть. Теперь вопрос, что происходит с этими сообщениями? Где они обрабатываются?

В случае p2p приложения помните, что мы одновременно являемся отправителем (Sender) и получателем (Receiver) событий, поэтому в нашей реализации нам нужно обрабатывать как исходящие, так и входящие события.

8. Ответы на сообщения с помощью libp2p

Наконец-то настал момент, когда в дело вступает наш RecipeBehaviour. Давайте определим его:

#[derive(NetworkBehaviour)]
struct RecipeBehaviour {
    floodsub: Floodsub,
    mdns: Mdns,
    #[behaviour(ignore)]
    response_sender: mpsc::UnboundedSender<ListResponse>,
}

Само поведение — это просто структура, но мы используем пользовательский (выводимый) макрос NetworkBehaviour из libp2p, поэтому нам не нужно вручную реализовывать все функции трейта самостоятельно.

Этот derive макрос реализует функции трейта NetworkBehaviour для всех членов структуры, которые не аннотированы с помощью behavior(ignore). Наш канал здесь игнорируется, потому что он не имеет прямого отношения к нашему поведению.

Осталось реализовать функцию inject_event для FloodsubEvent и MdnsEvent.

Давайте начнем с mDNS:

impl NetworkBehaviourEventProcess<MdnsEvent> for RecipeBehaviour {
    fn inject_event(&mut self, event: MdnsEvent) {
        match event {
            MdnsEvent::Discovered(discovered_list) => {
                for (peer, _addr) in discovered_list {
                    self.floodsub.add_node_to_partial_view(peer);
                }
            }
            MdnsEvent::Expired(expired_list) => {
                for (peer, _addr) in expired_list {
                    if !self.mdns.has_node(&peer) {
                        self.floodsub.remove_node_from_partial_view(&peer);
                    }
                }
            }
        }
    }
}

Функция inject_event вызывается, когда приходит событие для этого обработчика. На стороне mDNS есть только два события, Discovered и Expired, которые срабатывают, когда мы видим новый пир в сети или когда существующий пир исчезает. В обоих случаях мы либо добавляем его, либо удаляем из нашего «partial view» (частичного представления) FloodSub, которое представляет собой список нод, на которые мы распространяем наши сообщения.

Inject_event для событий pub/sub немного сложнее. Нам нужно реагировать на входящие ListRequest и ListResponse пейлоады. Если мы отправим ListRequest, пир, получивший запрос, получит свои локальные, опубликованные рецепты, а затем ему потребуется способ отправить их обратно.

Единственный способ отправить их обратно запрашивающему пиру — это опубликовать их в сети. Поскольку pub/sub — единственный механизм, который у нас есть, нам нужно реагировать как на входящие запросы, так и на входящие ответы.

Давайте посмотрим, как это работает:

impl NetworkBehaviourEventProcess<FloodsubEvent> for RecipeBehaviour {
    fn inject_event(&mut self, event: FloodsubEvent) {
        match event {
            FloodsubEvent::Message(msg) => {
                if let Ok(resp) = serde_json::from_slice::<ListResponse>(&msg.data) {
                    if resp.receiver == PEER_ID.to_string() {
                        info!("Response from {}:", msg.source);
                        resp.data.iter().for_each(|r| info!("{:?}", r));
                    }
                } else if let Ok(req) = serde_json::from_slice::<ListRequest>(&msg.data) {
                    match req.mode {
                        ListMode::ALL => {
                            info!("Received ALL req: {:?} from {:?}", req, msg.source);
                            respond_with_public_recipes(
                                self.response_sender.clone(),
                                msg.source.to_string(),
                            );
                        }
                        ListMode::One(ref peer_id) => {
                            if peer_id == &PEER_ID.to_string() {
                                info!("Received req: {:?} from {:?}", req, msg.source);
                                respond_with_public_recipes(
                                    self.response_sender.clone(),
                                    msg.source.to_string(),
                                );
                            }
                        }
                    }
                }
            }
            _ => (),
        }
    }
}

Мы сопоставляем входящее сообщение, пытаясь десериализовать его в запрос или ответ. В случае ответа мы просто печатаем ответ с ID вызывающего пира, который мы получаем с помощью msg.source. Когда мы получаем входящий запрос, нам нужно различать случаи ALL и One.

В случае One мы проверяем, совпадает ли заданный peer ID с нашим — действительно ли запрос предназначен для нас. Если это так, мы возвращаем наши опубликованные рецепты, что также является нашим ответом в случае ALL.

В обоих случаях мы вызываем хелпер response_with_public_recipes:

fn respond_with_public_recipes(sender: mpsc::UnboundedSender<ListResponse>, receiver: String) {
    tokio::spawn(async move {
        match read_local_recipes().await {
            Ok(recipes) => {
                let resp = ListResponse {
                    mode: ListMode::ALL,
                    receiver,
                    data: recipes.into_iter().filter(|r| r.public).collect(),
                };
                if let Err(e) = sender.send(resp) {
                    error!("error sending response via channel, {}", e);
                }
            }
            Err(e) => error!("error fetching local recipes to answer ALL request, {}", e),
        }
    });
}

В этом вспомогательном методе мы используем spawn у Tokio для асинхронного выполнения future, который считывает все локальные рецепты, создает ListResponse из данных и отправляет эти данные через channel_sender в наш цикл событий, где мы обрабатываем это следующим образом (обновите код, где // …):

                EventType::Response(resp) => {
                    let json = serde_json::to_string(&resp).expect("can jsonify response");
                    swarm
                        .behaviour_mut()
                        .floodsub
                        .publish(TOPIC.clone(), json.as_bytes());
                }

Если мы замечаем «внутренне» отправленное событие через Response, мы сериализуем его в JSON и отправляем в сеть.

9. Тестирование с помощью libp2p

Linux системы (важно)

  • Если возникли ошибки во время сборки, нужно установить дополнительные зависимости:
sudo apt-get update && sudo apt-get upgrade && sudo apt-get install -y pkg-config build-essential libudev-dev
  • Если используете VS Code с rust-analyzer и у вас возникают ошибки при сборке, нужно удалить папку target и проверить локальный пакет и все его зависимости на наличие ошибок:
rm -rf target
cargo check

На этом реализация закончена. Теперь давайте протестируем ее.

Чтобы убедиться, что наша реализация работает, запустим приложение в нескольких терминалах с помощью этой команды:

RUST_LOG=info cargo run

Имейте в виду, что приложение ожидает наличия файла recipes.json в директории, из которой вы его запускаете.

Когда приложение запустилось, мы получаем следующий лог, печатающий наш идентификатор пира:

INFO  rust_peer_to_peer_example > Peer Id: 12D3KooWDc1FDabQzpntvZRWeDZUL351gJRy3F4E8VN5Gx2pBCU2

Теперь нам нужно нажать Enter, чтобы запустить цикл событий.

При вводе ls p мы получаем список обнаруженных нами пиров:

ls p
 INFO  rust_peer_to_peer_example > Discovered Peers:
 INFO  rust_peer_to_peer_example > 12D3KooWCK6X7mFk9HeWw69WF1ueWa3XmphZ2Mu7ZHvEECj5rrhG
 INFO  rust_peer_to_peer_example > 12D3KooWLGN85pv5XTDALGX5M6tRgQtUGMWXWasWQD6oJjMcEENA

С помощью ls r мы получаем локальные рецепты:

ls r
 INFO  rust_peer_to_peer_example > Local Recipes (3)
 INFO  rust_peer_to_peer_example > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", public: true }
 INFO  rust_peer_to_peer_example > Recipe { id: 1, name: " Tea", ingredients: "Tea, Water", instructions: "Boil Water, add tea", public: false }
 INFO  rust_peer_to_peer_example > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", public: true }

Вызов ls r all запускает отправку запроса другим пирам и возвращает их рецепты:

ls r all
 INFO  rust_peer_to_peer_example > Response from 12D3KooWCK6X7mFk9HeWw69WF1ueWa3XmphZ2Mu7ZHvEECj5rrhG:
 INFO  rust_peer_to_peer_example > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", public: true }
 INFO  rust_peer_to_peer_example > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", public: true }

То же самое произойдет, если мы используем ls r с ID пира:

ls r 12D3KooWCK6X7mFk9HeWw69WF1ueWa3XmphZ2Mu7ZHvEECj5rrhG
 INFO  rust_peer_to_peer_example > Response from 12D3KooWCK6X7mFk9HeWw69WF1ueWa3XmphZ2Mu7ZHvEECj5rrhG:
 INFO  rust_peer_to_peer_example > Recipe { id: 0, name: " Coffee", ingredients: "Coffee", instructions: "Make Coffee", public: true }
 INFO  rust_peer_to_peer_example > Recipe { id: 2, name: " Carrot Cake", ingredients: "Carrots, Cake", instructions: "Make Carrot Cake", public: true }

Это работает! Вы также можете попробовать это с огромным количеством клиентов в одной сети.

Полный код примера можно найти на GitHub.

10. Заключение

В этой статье мы разобрали, как создать небольшое децентрализованное сетевое приложение с использованием Rust и libp2p.

Если вы имеете опыт работы в вебе, многие сетевые концепции будут несколько знакомы, но создание p2p приложения все равно требует принципиально иного подхода к проектированию и созданию.

Библиотека libp2p является достаточно зрелой, и, благодаря популярности Rust на криптосцене, существует развивающаяся и богатая экосистема библиотек для создания мощных децентрализованных приложений.

11. Доп. комментарий и материалы на эту тему

Будет справедливо подчеркнуть, что этот пример слишком упрощен:

  1. Он использует только mDNS для обнаружения пиров, поэтому он никогда не выйдет за пределы локальной сети
  2. Использование FloodSub для ответов на конкретный запрос является настоящим излишеством — ответ должен прийти на одну ноду, но он усиливается по всему оверлею (учитывая и без того неэффективную репликацию сообщений FloodSub)

Поэтому можно попробовать еще пару вещей:

  1. Включить bootstrap ноды (как уже используется в примерах libp2p)
  2. Использовать RequestReply NetworkBehaviour для отправки ответов на запрос списка только заинтересованным нодам (может потребоваться включить запрашивающего в список broadcast)
  3. Используйте более эффективный GossipSub вместо FloodSub — в основном тот же интерфейс, так что не так много изменений

Автор: https://blog.logrocket.com/libp2p-tutorial-build-a-peer-to-peer-app-in-rust/#comment-4773

Official Tutorials: https://docs.rs/libp2p/latest/libp2p/tutorials/index.html

Official Examples: https://github.com/libp2p/rust-libp2p/tree/master/examples

Libp2p Docs (Concepts): https://docs.libp2p.io/concepts/

На этом всё! ❤️