August 6, 2022

Пишем свой примитивный блокчейн

Всем привет! С вами Тёма!

Сегодня я вместе с вами напишу собственный примитивный блокчейн на Rust, а делать мы его будем опираясь на данную статью ТЫК

Начнем

В этом руководстве мы создадим очень простой блокчейн с нуля, используя Rust. Наш блокчейн не будет особенно эффективным, безопасным или надежным, но он поможет нам понять, как можно простым способом реализовать некоторые из фундаментальных концепций, лежащих в основе широко известных идей

Мы не будем вдаваться в подробности каждой концепции и реализация будет иметь некоторые серьезные недостатки, но наша цель состоит в том, чтобы создать что-то, с чем вы сможете поиграть, применить к своим собственным идеям и изучить, чтобы лучше познакомиться как с Rust, так и с технологией блокчейна в целом

Основное внимание будет уделено технической части — то есть тому, как реализовать некоторые концепции и как они сочетаются друг с другом. Мы не будем объяснять, что такое блокчейн, и не будем касаться майнинга, консенсуса и т.п., помимо того, что необходимо для этого руководства. В основном нас будет интересовать реализация идей при помощи Rust

Кроме того, мы не будем создавать криптовалюту или аналогичную систему. Наш дизайн намного проще: каждая нода в сети может добавлять данные в блокчейн, добывая блок локально, а затем транслируя этот блок другим нодам

Пока блок валидный (позже мы узнаем, что это значит), каждая нода добавляет его в свою цепочку, и наш фрагмент данных становится частью децентрализованной, защищенной от несанкционированного доступа, неразрушимой сети!

Мы будем делать упрощенный и несколько надуманный дизайн, который довольно быстро столкнется с проблемами эффективности и надежности при масштабировании. Но так как мы просто делаем это как упражнение, чтобы учиться, то это совершенно нормально

Настройка проекта

Сразу прописываем в консоль следующие команды

cargo new rust-blockchain-example
cd rust-blockchain-example

Затем отредактируйте файл Cargo.toml

[dependencies]
chrono = "0.4"
sha2 = "0.9.8"
serde = {version = "1.0", features = ["derive"] }
serde_json = "1.0"
libp2p = { version = "0.39", features = ["tcp-tokio", "mdns"] }
tokio = { version = "1.0", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread", "sync", "time"] }
hex = "0.4"
once_cell = "1.5"
log = "0.4"
pretty_env_logger = "0.4"

Мы используем libp2p в качестве нашей p2p сети и Tokio в качестве рантайма

Мы будем использовать библиотеку sha2 для нашего хеширования sha256 и hex крейт для преобразования двоичных хэшей в читаемый шестнадцатеричный код

Кроме того, на самом деле есть еще и утилиты, такие как serde для JSON, log и pretty_env_logger для ведения логов, Once_cell для статической инициализации и chrono для временных меток

Основы блокчейна

Для начала в файл main.rs мы импортируем сразу все необходимые компоненты

use chrono::prelude::*;
use libp2p::{
    core::upgrade,
    futures::StreamExt,
    mplex,
    noise::{Keypair, NoiseConfig, X25519Spec},
    swarm::{Swarm, SwarmBuilder},
    tcp::TokioTcpConfig,
    Transport,
};
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::time::Duration;
use tokio::{
    io::{stdin, AsyncBufReadExt, BufReader},
    select, spawn,
    sync::mpsc,
    time::sleep,
};

mod p2p;

Теперь определим структуры для нашей цепочки блоков:

pub struct App {
    pub blocks: Vec<Block>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Block {
    pub id: u64,
    pub hash: String,
    pub previous_hash: String,
    pub timestamp: i64,
    pub data: String,
    pub nonce: u64,
}

Наша структура App по существу содержит состояние нашего приложения. В этом примере мы не будем сохранять блокчейн, поэтому он исчезнет, ​​как только мы остановим приложение

Это состояние представляет собой просто список блоков. Мы добавим новые блоки в конец этого списка, и это будет наш блокчейн

Фактически, логика сделает этот список блоков цепочкой блоков, где каждый блок ссылается на хэш предыдущего блока, который будет реализован в логике нашего приложения

Блок в нашем случае будет состоять из id, который представляет собой индекс. Затем хэш sha256 (расчет которого мы рассмотрим позже), хеш предыдущего блока, timestamp, данные, содержащиеся в блоке, и nonce, который мы также рассмотрим, когда будем говорить о добыче блока

Прежде чем мы перейдем к майнингу, давайте сначала реализуем некоторые функции проверки, которые нам нужны для базовых консенсусов, чтобы каждая нода знала какой блокчейн является правильным, в случае, если их будет несколько

Начнем с имплементации нашей структуры App:

impl App {
    fn new() -> Self {
        Self { blocks: vec![] }
    }

    fn genesis(&mut self) {
        let genesis_block = Block {
            id: 0,
            timestamp: Utc::now().timestamp(),
            previous_hash: String::from("genesis"),
            data: String::from("genesis!"),
            nonce: 2836,
            hash: "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43".to_string(),
        };
        self.blocks.push(genesis_block);
    }
...
}

Мы инициализируем наш блокчейн пустой цепочкой, но позже мы реализуем некоторую логику. При запуске мы запрашиваем у других нод их цепочку и, если она длиннее нашей, используем их. Это наши упрощенные критерии консенсуса

Функция genesis хардкодит первый блок в нашей цепочке блоков. Это «особый» блок, поскольку он не придерживается тех же правил, что и остальные блоки. Например, у него нет валидного previous_hash, так как перед ним просто не было блока

Нам нужно это, чтобы «bootstrap» (загружать) нашу ноду или всю сеть при запуске первой ноды. Цепочка должна с чего-то начинаться, и она начнется вот с этого

Блоки, блоки, блоки

Далее давайте добавим некоторые функции, позволяющие нам добавлять новые блоки в цепочку

impl App {
...
    fn try_add_block(&mut self, block: Block) {
        let latest_block = self.blocks.last().expect("there is at least one block");
        if self.is_block_valid(&block, latest_block) {
            self.blocks.push(block);
        } else {
            error!("could not add block - invalid");
        }
    }
...
}

Здесь мы извлекаем последний блок в цепочке и затем проверяем, действительно ли блок, который мы хотим добавить, валиден. Если нет, мы просто выкидываем ошибку

В нашем блокчейне мы не будем реализовывать настоящую обработку ошибок. Как вы увидите позже, если мы столкнемся с проблемами соревнования за блок между нодами и получим недопустимое состояние, то наша нода в просто сломается)

Я упомяну некоторые возможные решения этих проблем, но мы не будем их реализовывать здесь, у нас есть много чего другого для изучения

Далее давайте посмотрим на is_block_valid — основную часть нашей логики

const DIFFICULTY_PREFIX: &str = "00";

fn hash_to_binary_representation(hash: &[u8]) -> String {
    let mut res: String = String::default();
    for c in hash {
        res.push_str(&format!("{:b}", c));
    }
    res
}

impl App {
...
    fn is_block_valid(&self, block: &Block, previous_block: &Block) -> bool {
        if block.previous_hash != previous_block.hash {
            warn!("block with id: {} has wrong previous hash", block.id);
            return false;
        } else if !hash_to_binary_representation(&hex::decode(&block.hash).expect("can decode from hex"),).starts_with(DIFFICULTY_PREFIX){
            warn!("block with id: {} has invalid difficulty", block.id);
            return false;
        } else if block.id != previous_block.id + 1 {
            warn!(
                "block with id: {} is not the next block after the latest: {}",
                block.id, previous_block.id
            );
            return false;
        } else if hex::encode(calculate_hash(
            block.id,
            block.timestamp,
            &block.previous_hash,
            &block.data,
            block.nonce,
        )) != block.hash
        {
            warn!("block with id: {} has invalid hash", block.id);
            return false;
        }
        true
    }
...
}

Сначала мы определяем константу DIFFICULTY_PREFIX. Это основа нашей очень упрощенной системы майнинга. По сути, при майнинге блока, человек, занимающийся майнингом, должен хешировать данные для блока (в нашем случае с помощью SHA256) и найти хеш, который в двоичном виде начинается с "00". Это также обозначает нашу «difficulty» в сети

Как вы можете себе представить, время на поиск подходящего хэша значительно увеличивается, если нам нужны три, четыре, пять или даже 20 нулей в начале. В «настоящем» блокчейне эта сложность будет сетевым атрибутом, который согласовывается между нодами на основе алгоритма консенсуса и на основе хеш-мощности сети, поэтому сеть может гарантировать создание нового блока в определенном количестве времени

Мы не будем заниматься этим здесь. Для простоты мы просто захардкодить его двумя ведущими нулями. Это не займет много времени для вычислений на обычном компьютере, поэтому нам не нужно беспокоиться о том, чтобы ждать слишком долго при тестировании

Далее у нас есть вспомогательная функция, которая является простым двоичным представлением заданного массива байтов в виде строки. Это используется для удобной проверки соответствия хэша нашему условию DIFFICULTY_PREFIX. Есть, очевидно, гораздо более элегантные и быстрые способы сделать это, но это просто и работает для нашего случая

Теперь к логике проверки блока. Это важно, потому что это гарантирует, что наш блокчейн придерживается своих свойств и его трудно подделать. Сложность изменения чего-либо возрастает с каждым блоком, поскольку вам придется пересчитывать (т. е. повторно майнить) прошлую часть цепочки, чтобы снова получить действительную цепочку. Это достаточно дорого, поэтому не имеет смысла делать в реальном блокчейне

Есть несколько практических правил, которым вы должны следовать:

  • previous_hash должен совпадать с хэшем последнего блока в цепочке
  • hash должен начинаться с нашего DIFFICULTY_PREFIX (т. е. двух нулей), что указывало бы на то, что он был добыт правильно
  • id должен быть последним ID, увеличенным на 1
  • Хэш должен быть действительно правильным; хеширование данных блока должно дать нам хэш блока (в противном случае вы могли бы просто создать случайный хеш, начинающийся с 001)

Если мы подумаем об этом как о распределенной системе, вы можете заметить, что здесь могут возникнуть проблемы. Что, если две ноды одновременно майнят блок на с id блока 5? Они оба создадут блок с id 6, а предыдущий блок укажет на блок с id 5 и нам будут отправлены оба блока. Мы бы их валидировали и добавили бы первый пришедший, а второй бы выбрасывали при валидации, так как у нас уже есть блок с ID 6

Это неотъемлемая проблема такой системы и причина, по которой между нодами должен существовать алгоритм консенсуса, чтобы решить, какие блоки (то есть, какую цепочку) согласовать и использовать

В мире блокчейна есть более изощренные подходы к решению этой проблемы. Например, если бы мы отправляли наши данные как «транзакции» другим нодам, а нода добывали бы блоки с набором транзакций, это было бы несколько легче. Но тогда все будут майнить все время и побеждать будет самый быстрый. Итак это создаст дополнительные, но менее серьезные проблемы, которые нам придется исправлять

В любом случае, наш простой подход будет работать для нашей локальной тестовой сети

Какую цепь использовать?

Теперь, когда мы можем проверить блок, давайте реализуем логику для проверки всей цепочки:

impl App {
...
    fn is_chain_valid(&self, chain: &[Block]) -> bool {
        for i in 0..chain.len() {
            if i == 0 {
                continue;
            }
            let first = chain.get(i - 1).expect("has to exist");
            let second = chain.get(i).expect("has to exist");
            if !self.is_block_valid(second, first) {
                return false;
            }
        }
        true
    }
...
}

Игнорируя генезис блок, мы просто проходим все блоки и проверяем их. Если один блок не проходит проверку, мы теряем всю цепочку

В App остался еще один метод, который поможет нам выбрать, какую цепочку использовать:

impl App {
...
    // We always choose the longest valid chain
    fn choose_chain(&mut self, local: Vec<Block>, remote: Vec<Block>) -> Vec<Block> {
        let is_local_valid = self.is_chain_valid(&local);
        let is_remote_valid = self.is_chain_valid(&remote);

        if is_local_valid && is_remote_valid {
            if local.len() >= remote.len() {
                local
            } else {
                remote
            }
        } else if is_remote_valid && !is_local_valid {
            remote
        } else if !is_remote_valid && is_local_valid {
            local
        } else {
            panic!("local and remote chains are both invalid");
        }
    }
}

Это происходит, если мы запрашиваем у другой ноды его цепочку, чтобы определить, является ли она «лучше» (согласно нашему алгоритму консенсуса), чем наша локальная

Нашим критерием является просто длина цепочки. В реальных системах обычно больше факторов, таких как сложность и многие другие параметры. В нашем случае мы делаем так: если одна (правильная) цепочка длиннее другой, мы берем эту цепочку

Мы проверяем как нашу локальную, так и чужую цепочку и берем более длинную. Мы также сможем использовать эту функциональность во время запуска, когда мы запрашиваем у других нод их цепочку и поскольку наша включает только генезис блок, мы сразу же начнем с цепочки «согласовано»

Майнинг

Чтобы закончить нашу логику, связанную с блокчейном, давайте реализуем нашу базовую систему майнинга

impl Block {
    pub fn new(id: u64, previous_hash: String, data: String) -> Self {
        let now = Utc::now();
        let (nonce, hash) = mine_block(id, now.timestamp(), &previous_hash, &data);
        Self {
            id,
            hash,
            timestamp: now.timestamp(),
            previous_hash,
            data,
            nonce,
        }
    }
}

Когда создается новый блок, мы вызываем mine_block, который возвращает nonce и hash. Затем мы можем создать блок при помощи timestamp, заданных данных, идентификатора, предыдущего хеша, нового хеша и nonce

Мы говорили обо всех вышеперечисленных полях, кроме nonce. Чтобы понять, что это такое, давайте посмотрим на функцию mine_block:

fn mine_block(id: u64, timestamp: i64, previous_hash: &str, data: &str) -> (u64, String) {
    info!("mining block...");
    let mut nonce = 0;

    loop {
        if nonce % 100000 == 0 {
            info!("nonce: {}", nonce);
        }
        let hash = calculate_hash(id, timestamp, previous_hash, data, nonce);
        let binary_hash = hash_to_binary_representation(&hash);
        if binary_hash.starts_with(DIFFICULTY_PREFIX) {
            info!(
                "mined! nonce: {}, hash: {}, binary hash: {}",
                nonce,
                hex::encode(&hash),
                binary_hash
            );
            return (nonce, hex::encode(hash));
        }
        nonce += 1;
    }
}

После объявления о том, что мы собираемся добыть блок, мы устанавливаем nonce 0

Затем мы запускаем бесконечный цикл, который увеличивает nonce на каждом шаге. Внутри цикла, помимо регистрации каждой 100000-й итерации для приблизительного индикатора прогресса, мы вычисляем хэш данных блока с помощью calculate_hash, который мы проверим далее

Затем мы используем наш помощник hash_to_binary_representation и проверяем, соответствует ли вычисленный хэш нашим критериям сложности (два нолика в начале)

Если это так, мы регистрируем это и возвращаем nonce, увеличивающее число, где это произошло, и хэш (в шестнадцатеричной системе счистления). В противном случае мы увеличиваем nonce и идем снова

По сути, мы отчаянно пытаемся найти часть данных — в данном случае nonce и число, которые вместе с данными нашего блока, хешированными с помощью SHA256, дадут нам хеш, начинающийся с двух нулей

Нам нужно записать этот nonce в наш блок, чтобы другие ноды могли проверить наш хеш, поскольку nonce хэшируется вместе с данными блока. Например, если бы нам потребовалось 52342 итерации для вычисления подходящего хеша (начинающегося с двух нулей), nonce будет равен 52341 (на 1 меньше, так как он начинается с 0)

Давайте посмотрим на утилиту для фактического создания хэша SHA256

fn calculate_hash(id: u64, timestamp: i64, previous_hash: &str, data: &str, nonce: u64) -> Vec<u8> {
    let data = serde_json::json!({
        "id": id,
        "previous_hash": previous_hash,
        "data": data,
        "timestamp": timestamp,
        "nonce": nonce
    });
    let mut hasher = Sha256::new();
    hasher.update(data.to_string().as_bytes());
    hasher.finalize().as_slice().to_owned()
}

Это довольно просто. Мы создаем JSON-представление наших данных блока, используя текущий nonce, и пропускаем его через хэш SHA256 sha2, возвращая Vec<u8>

По сути, это вся логика нашего блокчейна. У нас есть структура данных блокчейна: список блоков. У нас есть блоки, которые указывают на предыдущий блок. Они должны иметь увеличивающийся id и хэш, соответствующий нашим правилам майнинга

Если мы просим получить новые блоки от других нод, мы проверяем их и, если они в порядке, добавляем их в цепочку. Если мы получаем полный блокчейн от другой ноды, мы также проверяем его и, если он длиннее нашего (т. е. содержит больше блоков в нем), мы заменяем им нашу собственную цепочку

Как вы можете себе представить, поскольку каждая нода реализует именно эту логику, блоки и согласованные цепочки могут быстро распространяться по сети, и сеть сходится к одному и тому же состоянию (как с вышеупомянутыми ограничениями обработки ошибок в нашем простом случае)

Основы P2P

Начнем с создания файла p2p.rs, который будет содержать большую часть логики одноранговой связи, которую мы будем использовать в нашем приложении

Теперь сразу импортируем все необходимые модули

use super::{App, Block};
use libp2p::{
    floodsub::{Floodsub, FloodsubEvent, Topic},
    identity,
    mdns::{Mdns, MdnsEvent},
    swarm::{NetworkBehaviourEventProcess, Swarm},
    NetworkBehaviour, PeerId,
};
use log::{error, info};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use tokio::sync::mpsc;

Здесь мы снова определяем некоторые основные структуры данных и константы, которые нам понадобятся:

pub static KEYS: Lazy<identity::Keypair> = Lazy::new(identity::Keypair::generate_ed25519);
pub static PEER_ID: Lazy<PeerId> = Lazy::new(|| PeerId::from(KEYS.public()));
pub static CHAIN_TOPIC: Lazy<Topic> = Lazy::new(|| Topic::new("chains"));
pub static BLOCK_TOPIC: Lazy<Topic> = Lazy::new(|| Topic::new("blocks"));

#[derive(Debug, Serialize, Deserialize)]
pub struct ChainResponse {
    pub blocks: Vec<Block>,
    pub receiver: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct LocalChainRequest {
    pub from_peer_id: String,
}

pub enum EventType {
    LocalChainResponse(ChainResponse),
    Input(String),
    Init,
}

#[derive(NetworkBehaviour)]
pub struct AppBehaviour {
    pub floodsub: Floodsub,
    pub mdns: Mdns,
    #[behaviour(ignore)]
    pub response_sender: mpsc::UnboundedSender<ChainResponse>,
    #[behaviour(ignore)]
    pub init_sender: mpsc::UnboundedSender<bool>,
    #[behaviour(ignore)]
    pub app: App,
}

Начиная сверху, мы определяем пару ключей и производный идентификатор ноды. Это просто встроенные функции libp2p для идентификации клиента в сети

Затем мы определяем две так называемые topics: chains и blocks. Мы будем использовать протокол FloodSub - простой протокол публикации/подписки, для связи между нодами

У этого метода есть преимущество в том, что его очень просто настроить и использовать, но недостаток в том, что нам нужно транслировать каждую часть информации. Таким образом, даже если мы просто хотим ответить на «запрос для нашей цепочки» одного клиента, этот клиент отправит этот запрос всем нодам, к которым он подключен в сети, и мы также отправим наш ответ всем им

Это не проблема с точки зрения грамотности, но с точки зрения эффективности это явно ужасно. Это может быть реализовано с помощью простой модели запроса/ответа «point-to-point», которую поддерживает libp2p, но это просто еще больше усложнит этот и без того сложный пример. Если вам интересно, вы можете ознакомиться с документацией libp2p

Мы также могли бы использовать более эффективный GossipSub вместо FloodSub. Но, опять же, настроить его не так удобно, да и производительность нас сейчас особо не интересует. Если вам интересно изучить это, то вы можете ознакомьтесь с официальной документацией

В любом случае, topics — это каналы, на которые можно подписаться. Мы можем подписаться на «chains» (цепочки) и использовать их для отправки нашего локального блокчейна другим нодам и для получения их. То же самое верно и для «blocks», которые мы будем использовать для трансляции и получения новых блоков

Далее у нас есть концепция ChainResponse, содержащая список блоков и получателя. Это структура, которую мы ожидаем. Если кто-то отправит нам свой локальный блокчейн и использует для отправки им наш chain

LocalChainRequest — это то, что запускает это взаимодействие. Если мы отправим LocalChainRequest с peer_id другой ноды в сети, то это приведет к тому, что они отправят нам свою цепочку обратно, как мы увидим позже

Для обработки входящих сообщений, отложенной инициализации и ввода с клавиатуры пользователем клиента мы определяем перечисление EventType, которое поможет отправлять события в приложение, чтобы синхронизировать состояние нашего приложения с входящим и исходящим сетевым трафиком

Наконец, ядром функциональности P2P является наш AppBehaviour, который реализует NetworkBehaviour, концепцию libp2p для реализации децентрализованного сетевого стека

AppBehaviour содержит наш экземпляр FloodSub для связи pub/sub и экземпляр Mdns, что позволит нам автоматически находить другие ноды в нашей локальной сети (но не за ее пределами)

Мы также добавляем наш App к этому поведению, а также каналы для отправки событий, для инициализации и для обмена запросами/ответами между частями приложения. Мы увидим это в действии позже

Инициализация AppBehaviour также довольно проста:

impl AppBehaviour {
    pub async fn new(
        app: App,
        response_sender: mpsc::UnboundedSender<ChainResponse>,
        init_sender: mpsc::UnboundedSender<bool>,
    ) -> Self {
        let mut behaviour = Self {
            app,
            floodsub: Floodsub::new(*PEER_ID),
            mdns: Mdns::new(Default::default())
                .await
                .expect("can create mdns"),
            response_sender,
            init_sender,
        };
        behaviour.floodsub.subscribe(CHAIN_TOPIC.clone());
        behaviour.floodsub.subscribe(BLOCK_TOPIC.clone());

        behaviour
    }
}

Обработка входящих сообщений

Во-первых, мы реализуем обработчики данных, поступающих с других нод

Мы начнем с событий Mdns, так как они в просты:

impl NetworkBehaviourEventProcess<MdnsEvent> for AppBehaviour {
    fn inject_event(&mut self, event: MdnsEvent) {
        match event {
            MdnsEvent::Discovered(discovered_list) => {
                for (peer, _addr) in discovered_list {
                    self.floodsub.add_node_to_partial_view(peer);
                }
            }
            MdnsEvent::Expired(expired_list) => {
                for (peer, _addr) in expired_list {
                    if !self.mdns.has_node(&peer) {
                        self.floodsub.remove_node_from_partial_view(&peer);
                    }
                }
            }
        }
    }
}

Если мы обнаружен новую ноду, мы добавляем ее в наш список нод FloodSub, чтобы мы могли общаться. По истечении срока его действия мы снова удаляем его

Более интересна реализация NetworkBehaviour у нашего коммуникационного протокола FloodSub

impl NetworkBehaviourEventProcess<FloodsubEvent> for AppBehaviour {
    fn inject_event(&mut self, event: FloodsubEvent) {
        if let FloodsubEvent::Message(msg) = event {
            if let Ok(resp) = serde_json::from_slice::<ChainResponse>(&msg.data) {
                if resp.receiver == PEER_ID.to_string() {
                    info!("Response from {}:", msg.source);
                    resp.blocks.iter().for_each(|r| info!("{:?}", r));

                    self.app.blocks = self.app.choose_chain(self.app.blocks.clone(), resp.blocks);
                }
            } else if let Ok(resp) = serde_json::from_slice::<LocalChainRequest>(&msg.data) {
                info!("sending local chain to {}", msg.source.to_string());
                let peer_id = resp.from_peer_id;
                if PEER_ID.to_string() == peer_id {
                    if let Err(e) = self.response_sender.send(ChainResponse {
                        blocks: self.app.blocks.clone(),
                        receiver: msg.source.to_string(),
                    }) {
                        error!("error sending response via channel, {}", e);
                    }
                }
            } else if let Ok(block) = serde_json::from_slice::<Block>(&msg.data) {
                info!("received new block from {}", msg.source.to_string());
                self.app.try_add_block(block);
            }
        }
    }
}

Для входящих событий, таких как FloodsubEvent::Message, мы проверяем, соответствует ли payload любой из наших ожидаемых структур данных

Если это ChainResponse, это означает, что мы получили локальную цепочку блоков от другой ноды

Мы проверяем, действительно ли мы являемся получателем указанного фрагмента данных, и, если да, регистрируем входящий блокчейн и пытаемся выполнить наш консенсус. Если он действителен и длиннее нашей цепочки, мы заменяем ею нашу цепочку. В противном случае мы сохраняем собственную цепочку

Если входящие данные являются LocalChainRequest, то мы проверяем, являемся ли мы теми, от кого им нужна цепочка, проверяя from_peer_id. Если это так, мы просто отправляем им JSON-версию нашего локального блокчейна. Фактическая часть отправки находится в другой части кода, но сейчас мы просто отправляем ее через наш канал событий для ответов

Наконец, если это входящий Block, то это означает, что кто-то еще добыл блок и хочет, чтобы мы добавили его в нашу локальную цепочку. Проверяем, действителен ли блок и, если да, добавляем его

Собираем все вместе

Отлично! Теперь давайте соединим все это вместе и добавим несколько команд для взаимодействия пользователя с приложением

Вернемся в main.rs и допишем вот это

#[tokio::main]
async fn main() {
    pretty_env_logger::init();

    info!("Peer Id: {}", p2p::PEER_ID.clone());
    let (response_sender, mut response_rcv) = mpsc::unbounded_channel();
    let (init_sender, mut init_rcv) = mpsc::unbounded_channel();

    let auth_keys = Keypair::<X25519Spec>::new()
        .into_authentic(&p2p::KEYS)
        .expect("can create auth keys");

    let transp = TokioTcpConfig::new()
        .upgrade(upgrade::Version::V1)
        .authenticate(NoiseConfig::xx(auth_keys).into_authenticated())
        .multiplex(mplex::MplexConfig::new())
        .boxed();

    let behaviour = p2p::AppBehaviour::new(App::new(), response_sender, init_sender.clone()).await;

    let mut swarm = SwarmBuilder::new(transp, behaviour, *p2p::PEER_ID)
        .executor(Box::new(|fut| {
            spawn(fut);
        }))
        .build();

    let mut stdin = BufReader::new(stdin()).lines();

    Swarm::listen_on(
        &mut swarm,
        "/ip4/0.0.0.0/tcp/0"
            .parse()
            .expect("can get a local socket"),
    )
    .expect("swarm can be started");

    spawn(async move {
        sleep(Duration::from_secs(1)).await;
        info!("sending init event");
        init_sender.send(true).expect("can send init event");
    });

Это очень много кода, но в основном он просто настраивает то, о чем мы уже говорили. Мы инициализируем ведение логов и два наших канала событий для инициализации и ответов

Затем мы инициализируем нашу пару ключей, libp2p transport, поведение и Swarm libp2p, который является сущностью, которая запускает наш сетевой стек

Мы также инициализируем буферизованный считыватель на stdin, чтобы мы могли читать входящие команды от пользователя и запускать наш Swarm

Наконец, мы создаем асинхронную сопрограмму, которая ждет секунду, а затем отправляет триггер инициализации на канал инициализации

Это сигнал, который мы будем использовать после запуска ноды, чтобы немного подождать, пока нода не будет запущена и подключена. Затем мы запрашиваем у другой ноды их текущую цепочку блоков, чтобы ускорить нас

Остальная часть main представляет собой интересную часть — часть, где мы обрабатываем события клавиатуры от пользователя, входящие данные и исходящие данные

loop {
        let evt = {
            select! {
                line = stdin.next_line() => Some(p2p::EventType::Input(line.expect("can get line").expect("can read line from stdin"))),
                response = response_rcv.recv() => {
                    Some(p2p::EventType::LocalChainResponse(response.expect("response exists")))
                },
                _init = init_rcv.recv() => {
                    Some(p2p::EventType::Init)
                }
                event = swarm.select_next_some() => {
                    info!("Unhandled Swarm Event: {:?}", event);
                    None
                },
            }
        };

        if let Some(event) = evt {
            match event {
                p2p::EventType::Init => {
                    let peers = p2p::get_list_peers(&swarm);
                    swarm.behaviour_mut().app.genesis();

                    info!("connected nodes: {}", peers.len());
                    if !peers.is_empty() {
                        let req = p2p::LocalChainRequest {
                            from_peer_id: peers
                                .iter()
                                .last()
                                .expect("at least one peer")
                                .to_string(),
                        };

                        let json = serde_json::to_string(&req).expect("can jsonify request");
                        swarm
                            .behaviour_mut()
                            .floodsub
                            .publish(p2p::CHAIN_TOPIC.clone(), json.as_bytes());
                    }
                }
                p2p::EventType::LocalChainResponse(resp) => {
                    let json = serde_json::to_string(&resp).expect("can jsonify response");
                    swarm
                        .behaviour_mut()
                        .floodsub
                        .publish(p2p::CHAIN_TOPIC.clone(), json.as_bytes());
                }
                p2p::EventType::Input(line) => match line.as_str() {
                    "ls p" => p2p::handle_print_peers(&swarm),
                    cmd if cmd.starts_with("ls c") => p2p::handle_print_chain(&swarm),
                    cmd if cmd.starts_with("create b") => p2p::handle_create_block(cmd, &mut swarm),
                    _ => error!("unknown command"),
                },
            }
        }
    }

Мы запускаем бесконечный цикл и используем Tokio select! макрос для запуска нескольких асинхронных функций

Это означает, что первый из этих исходов будет обработан сразу, а затем мы начнем цикл заново

Первый emitter событий — это наш буферизованный считыватель, который будет давать нам входные строки от пользователя. Если мы его получаем, то мы создаем EventType::Input со строкой

Затем мы прослушиваем канал ответа и канал инициализации, соответственно создавая их события

И если события приходят на swarm, это означает, что это события, которые не обрабатываются ни нашим поведением Mdns, ни нашим поведением FloodSub, и мы просто регистрируем их. В основном это шум, например подключение/отключение в нашем случае, но они полезны для отладки

Когда соответствующие события созданы, мы приступаем к их обработке

Для нашего события Init мы вызываем genesis() в нашем приложении, создавая наш genesis блок. Если мы подключены к нодам, то мы инициируем LocalChainRequest к последнему блоку в списке

Очевидно, что здесь имело бы смысл задавать вопросы нескольким нодам, а может быть, и несколько раз, и выбирать наилучшую (то есть самую длинную) цепочку ответов. Но для простоты мы просто спрашиваем и принимаем все, что нам присылают

Затем, если мы получаем событие LocalChainResponse, это означает, что что-то было отправлено по каналу ответа. Если вы помните, выше это произошло в нашем поведении FloodSub, когда мы отправили нашу локальную цепочку блоков обратно запрашивающей ноде. Здесь мы фактически отправляем входящий JSON в правильный топик FloodSub, поэтому он транслируется в сеть

Наконец, для пользовательского ввода у нас есть три команды:

  • ls p выводит список всех нод
  • ls c выводит локальную цепочку блоков
  • create b $data создает новый блок с строковым содержимым $data

Каждая команда вызывает одну из этих вспомогательных функций (это вставить в файл p2p.rs):

pub fn get_list_peers(swarm: &Swarm<AppBehaviour>) -> Vec<String> {
    info!("Discovered Peers:");
    let nodes = swarm.behaviour().mdns.discovered_nodes();
    let mut unique_peers = HashSet::new();
    for peer in nodes {
        unique_peers.insert(peer);
    }
    unique_peers.iter().map(|p| p.to_string()).collect()
}

pub fn handle_print_peers(swarm: &Swarm<AppBehaviour>) {
    let peers = get_list_peers(swarm);
    peers.iter().for_each(|p| info!("{}", p));
}

pub fn handle_print_chain(swarm: &Swarm<AppBehaviour>) {
    info!("Local Blockchain:");
    let pretty_json =
        serde_json::to_string_pretty(&swarm.behaviour().app.blocks).expect("can jsonify blocks");
    info!("{}", pretty_json);
}

pub fn handle_create_block(cmd: &str, swarm: &mut Swarm<AppBehaviour>) {
    if let Some(data) = cmd.strip_prefix("create b") {
        let behaviour = swarm.behaviour_mut();
        let latest_block = behaviour
            .app
            .blocks
            .last()
            .expect("there is at least one block");
        let block = Block::new(
            latest_block.id + 1,
            latest_block.hash.clone(),
            data.to_owned(),
        );
        let json = serde_json::to_string(&block).expect("can jsonify request");
        behaviour.app.blocks.push(block);
        info!("broadcasting new block");
        behaviour
            .floodsub
            .publish(BLOCK_TOPIC.clone(), json.as_bytes());
    }
}

Список клиентов и печать блокчейна довольно просты, а вот процесс создания блока интереснее

В этом случае мы используем Block::new для создания (и майнинга) нового блока. Как только это произойдет, мы преобразуем его в JSON и транслируем в сеть, чтобы другие могли добавить его в свою цепочку

Здесь мы бы поместили некоторую логику для r-попыток. Например, мы могли бы добавить его в очередь и посмотреть, распространится ли через некоторое время наш блок на широко согласованный блокчейн, и, если нет, то получить новую копию согласованной цепочки и снова добыть ее, чтобы получить ее там. Как упоминалось выше, этот дизайн, безусловно, не будет масштабироваться на множество нод, постоянно добывающих свои блоки, но это нормально для нас, так как мы учимся

Тестирование нашего Rust блокчейна

Мы можем запустить приложение, используя RUST_LOG=info cargo run. На самом деле лучше запускать несколько клиентов в разных окнах терминала

Например, мы можем запустить две ноды:

INFO  rust_blockchain_example > Peer Id: 12D3KooWJWbGzpdakrDroXuCKPRBqmDW8wYc1U3WzWEydVr2qZNv

и:

INFO  rust_blockchain_example > Peer Id: 12D3KooWSXGZJJEnh3tndGEVm6ACQ5pdaPKL34ktmCsUqkqSVTWX

Использование ls p во втором клиенте показывает нам подключение к первому:

INFO  rust_blockchain_example::p2p > Discovered Peers:
 INFO  rust_blockchain_example::p2p > 12D3KooWJWbGzpdakrDroXuCKPRBqmDW8wYc1U3WzWEydVr2qZNv

Затем мы можем использовать ls c для печати генезис блока:

INFO  rust_blockchain_example::p2p > Local Blockchain:
 INFO  rust_blockchain_example::p2p > [
  {
    "id": 0,
    "hash": "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43",
    "previous_hash": "genesis",
    "timestamp": 1636664658,
    "data": "genesis!",
    "nonce": 2836
  }
]

Так, вроде все понятно, давайте теперь создадим блок:

create b hello
 INFO  rust_blockchain_example      > mining block...
 INFO  rust_blockchain_example      > nonce: 0
 INFO  rust_blockchain_example      > mined! nonce: 62235, hash: 00008cf68da9f978aa080b7aad93fb4285e3c0dbd85fc21bc7e83e623f9fa922, binary hash: 0010001100111101101000110110101001111110011111000101010101000101111110101010110110010011111110111000010100001011110001111000000110110111101100010111111100001011011110001111110100011111011000101111111001111110101001100010
 INFO  rust_blockchain_example::p2p > broadcasting new block

На первой ноде мы видим это:

INFO  rust_blockchain_example::p2p > received new block from 12D3KooWSXGZJJEnh3tndGEVm6ACQ5pdaPKL34ktmCsUqkqSVTWX

И вызов ls c:

INFO  rust_blockchain_example::p2p > Local Blockchain:
 INFO  rust_blockchain_example::p2p > [
  {
    "id": 0,
    "hash": "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43",
    "previous_hash": "genesis",
    "timestamp": 1636664655,
    "data": "genesis!",
    "nonce": 2836
  },
  {
    "id": 1,
    "hash": "00008cf68da9f978aa080b7aad93fb4285e3c0dbd85fc21bc7e83e623f9fa922",
    "previous_hash": "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43",
    "timestamp": 1636664772,
    "data": " hello",
    "nonce": 62235
  }
]

Блок был добавлен!

Запустим третью ноду. Он должен автоматически получить эту обновленную цепочку, потому что она длиннее своей собственной (только блок генезиса)

INFO  rust_blockchain_example > Peer Id: 12D3KooWSDyn83pJD4eEg9dvYffceAEcbUkioQvSPY7aCi7J598q

 INFO  rust_blockchain_example > sending init event
 INFO  rust_blockchain_example::p2p > Discovered Peers:
 INFO  rust_blockchain_example      > connected nodes: 2
 INFO  rust_blockchain_example::p2p > Response from 12D3KooWSXGZJJEnh3tndGEVm6ACQ5pdaPKL34ktmCsUqkqSVTWX:
 INFO  rust_blockchain_example::p2p > Block { id: 0, hash: "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43", previous_hash: "genesis", timestamp: 1636664658, data: "genesis!", nonce: 2836 }
 INFO  rust_blockchain_example::p2p > Block { id: 1, hash: "00008cf68da9f978aa080b7aad93fb4285e3c0dbd85fc21bc7e83e623f9fa922", previous_hash: "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43", timestamp: 1636664772, data: " hello", nonce: 62235 }

После отправки события init мы запросили цепочку у второй ноды и получили ее

Вызов ls c показывает нам ту же цепочку:

INFO  rust_blockchain_example::p2p > Local Blockchain:
 INFO  rust_blockchain_example::p2p > [
  {
    "id": 0,
    "hash": "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43",
    "previous_hash": "genesis",
    "timestamp": 1636664658,
    "data": "genesis!",
    "nonce": 2836
  },
  {
    "id": 1,
    "hash": "00008cf68da9f978aa080b7aad93fb4285e3c0dbd85fc21bc7e83e623f9fa922",
    "previous_hash": "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43",
    "timestamp": 1636664772,
    "data": " hello",
    "nonce": 62235
  }
]

Создание блока тут так же работает

create b alsoworks
 INFO  rust_blockchain_example      > mining block...
 INFO  rust_blockchain_example      > nonce: 0
 INFO  rust_blockchain_example      > mined! nonce: 34855, hash: 0000e0bddf4e603da675b92b88e86e25692eaaa8ad20db6ecab5940bdee1fdfd, binary hash: 001110000010111101110111111001110110000011110110100110111010110111001101011100010001110100011011101001011101001101110101010101010100010101101100000110110111101110110010101011010110010100101111011110111000011111110111111101
 INFO  rust_blockchain_example::p2p > broadcasting new block

Нода 1:

INFO  rust_blockchain_example::p2p > received new block from 12D3KooWSDyn83pJD4eEg9dvYffceAEcbUkioQvSPY7aCi7J598q

ls c
 INFO  rust_blockchain_example::p2p > Local Blockchain:
 INFO  rust_blockchain_example::p2p > [
  {
    "id": 0,
    "hash": "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43",
    "previous_hash": "genesis",
    "timestamp": 1636664658,
    "data": "genesis!",
    "nonce": 2836
  },
  {
    "id": 1,
    "hash": "00008cf68da9f978aa080b7aad93fb4285e3c0dbd85fc21bc7e83e623f9fa922",
    "previous_hash": "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43",
    "timestamp": 1636664772,
    "data": " hello",
    "nonce": 62235
  },
  {
    "id": 2,
    "hash": "0000e0bddf4e603da675b92b88e86e25692eaaa8ad20db6ecab5940bdee1fdfd",
    "previous_hash": "00008cf68da9f978aa080b7aad93fb4285e3c0dbd85fc21bc7e83e623f9fa922",
    "timestamp": 1636664920,
    "data": " alsoworks",
    "nonce": 34855
  }
]

Нода 2:

INFO  rust_blockchain_example::p2p > received new block from 12D3KooWSDyn83pJD4eEg9dvYffceAEcbUkioQvSPY7aCi7J598q
ls c
 INFO  rust_blockchain_example::p2p > Local Blockchain:
 INFO  rust_blockchain_example::p2p > [
  {
    "id": 0,
    "hash": "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43",
    "previous_hash": "genesis",
    "timestamp": 1636664655,
    "data": "genesis!",
    "nonce": 2836
  },
  {
    "id": 1,
    "hash": "00008cf68da9f978aa080b7aad93fb4285e3c0dbd85fc21bc7e83e623f9fa922",
    "previous_hash": "0000f816a87f806bb0073dcf026a64fb40c946b5abee2573702828694d5b4c43",
    "timestamp": 1636664772,
    "data": " hello",
    "nonce": 62235
  },
  {
    "id": 2,
    "hash": "0000e0bddf4e603da675b92b88e86e25692eaaa8ad20db6ecab5940bdee1fdfd",
    "previous_hash": "00008cf68da9f978aa080b7aad93fb4285e3c0dbd85fc21bc7e83e623f9fa922",
    "timestamp": 1636664920,
    "data": " alsoworks",
    "nonce": 34855
  }
]

Отлично, все работает!

Весь код можно найти в гитхабе ТЫК

Надеюсь статья была интересной и понятной!

Все мои ресурсы - https://t.me/ortomich_links