October 6, 2022

ЭМУЛЯТОР БИРЖЕВОГО СТАКАНА ЦЕН

В прошлой статье мы создали инструмент для выгрузки и обработки данных стакана. И закрыли вопрос касаемо, где брать эти данные. Теперь, когда данные выгружены в виде своего рода датасетов, мы можем написать эмулятор, который будет транслировать эти данные.

Существует разное множество баз данных для работы с временными рядами, к примеру InfluxDB и ей подобные. Я же, решил, что мне будет удобней работать с ElasticDB.

Если посмотреть схематично на реализацию, то она будет состоять из следующих компонентов:
— Конфиг для docker-compose для деплоя Elasticsearch и Kibana (второе необязательно)
— Скрипт загрузки данных из файлов в базу
— WebSocketServer который собственно и будет транслировать наши данные из бд
— WebSockerClient для удобства и мониторинга

Первым делом задеплоим докер контейнер с Elasticsearch. В него мы загрузим все данные из файлов по иструментам, которые подготавливали в прошлой статье.

docker-compose.ymlversion: '3.7'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.4.0
    container_name: elasticsearch
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536
    cap_add:
      - IPC_LOCK
    volumes:
      - elasticsearch-data:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
      - 9300:9300

  kibana:
    container_name: kibana
    image: docker.elastic.co/kibana/kibana:7.4.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch

volumes:
  elasticsearch-data:
    driver: local

После того как контейнер соберется и запустится, можно загрузить в бд наши подготовленные данные, для этого создайте python скрипт со следующим содержанием.

run_elastic.pyimport datetime
import os
import sys
from pprint import pprint as pp
from memory_profiler import profile
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk, bulk
from dotenv import load_dotenv
load_dotenv()

class Parser:
    es = Elasticsearch(os.environ["DB"])
    symbol = None
    iterator = 1
    bulk_data = list()

    def __init__(self, symbol):
        self.symbol = symbol

    def add(self, data: list, send: int = 0):
        self.bulk_data.append({
                "_op_type": "index",
                "_index": self.symbol,
                "_source": data,
            })
        if len(self.bulk_data) == 1000 or send == 1:
            bulk(self.es, self.bulk_data)
            self.bulk_data = list()
        self.iterator+=1

    def prepare(self, fl: str):
        with open(fl, "r") as f:
            i = 0; _data = dict({"ts":0,"a":[],"b":[],"delay":0})
            for n in f:
                if i==0: i+=1; continue
                if (i%1000000)==0: print(i)
                nd = n.split(",")
                if int(nd[0]) == _data["ts"]:
                    if nd[1] == 'a':
                        _data['a'].append( [float(nd[2]), float(nd[3])] )
                    elif nd[1] == 'b':
                        _data['b'].append( [float(nd[2]), float(nd[3])] )
                elif not _data["ts"]:
                    if nd[1] == 'a':
                        _data['a'].append( [float(nd[2]), float(nd[3])] )
                    elif nd[1] == 'b':
                        _data['b'].append( [float(nd[2]), float(nd[3])] )
                    _data["ts"] = int(nd[0])
                    _data['delay'] = float(nd[4].replace("\n", ""))
                else:
                    self.add(_data)
                    _data = dict({"ts":0,"a":[],"b":[],"delay":0})
                i+=1
            if _data: self.add(_data,1); del _data

name = os.environ["SYMBOL"]
p = Parser(name)

directory = os.environ["PATH_TO_FILES"]
for n in  os.listdir(directory):
    nd = os.path.join(directory,n)
    if os.path.isdir(nd) and name.upper() in n:
        for f in os.listdir(nd):
            print(nd + "/"+ f)
            s = datetime.datetime.now()
            p.prepare(nd + "/"+ f)
            delay = datetime.datetime.now() - s
            print("Time execute: " + str(delay))
            print("Rows in DB: " + str(p.iterator))

И сразу же файл конфига

.envDB="http://localhost:9200"
PATH_TO_FILES="Полный путь до директории с файлами"
SYMBOL="Наименование инструмента, к примеру atomusdt"
WS_URL="ws://localhost:5678"
WS_HOST="127.0.0.1"
WS_PORT="5678"
EMU_SYMBOL="atomusdt"
EMU_FROM="Дата и время с которого будет трансляция к примеру 01.01.2022 10:29:20.221"

Теперь это можно запустить, и загрузить все файлы в бд. Это может занят достаточно продолжительное время, поэтому наберитесь терпения.

Следующим шагом будет реализация вебсокет сервера, это сам механизм отвечающий за трансляцию данных.

ws_server.pyfrom elasticsearch import Elasticsearch
from datetime import datetime, timezone
from pprint import pprint as pp
import asyncio
import random
import websockets
import json
import sys
import threading
from dotenv import load_dotenv
load_dotenv()
es = Elasticsearch()


class Emulator:
    data = None
    ts_from = None
    index = None
    delta = 60

    def __init__(self,symbol, dt):
        self.ts_from = int(datetime.strptime(dt, "%d.%m.%Y %H:%M:%S.%f").replace(tzinfo=timezone.utc).timestamp() * 1000)
        self.index = symbol

        self._getData()

    def _ts_to(self, ts_from):
        return ts_from + self.delta * 1000

    def _getData(self):
        q = {
            "query": {
                "range": {
                    "ts": {
                        "gte": self.ts_from,
                        "lt": self._ts_to(self.ts_from)
                    }
                }
            },
            "size": 1000,
        }
        resp = es.search(index=self.index, body=q)

        if resp['hits']['total']['value'] > 0:
            self.data = [hit['_source'] for hit in resp['hits']['hits']]
        print("Got %d Hits" % resp['hits']['total']['value'])
        print("In self.data: %d"% len(self.data))
    
    def _append(self, ts_from):
        q = {
            "query": {
                "range": {
                    "ts": {
                        "gte": ts_from,
                        "lt": self._ts_to(ts_from)
                    }
                }
            },
            "size": 1000,
        }
        resp = es.search(index=self.index, body=q)

        if resp['hits']['total']['value'] > 0:
            self.data.extend([hit['_source'] for hit in resp['hits']['hits']])
        print("Got %d Hits" % resp['hits']['total']['value'])
        print("In self.data after append: %d"% len(self.data))

    async def translation(self, websocket, path):
        while True:
            if self.data:
                current = self.data.pop(0)
                sys.stdout.write("\rCurrent length: %d "% len(self.data))    
                sys.stdout.flush()
                await websocket.send(json.dumps(current))
                if len(self.data) < 500:
                    threading.Thread(target=self._append, args=[self.data[len(self.data)-1]["ts"]]).start()

            await asyncio.sleep(int(current['delay'])/1000)
    
    def run(self):
        start_server = websockets.serve(self.translation, '127.0.0.1', 5678)
        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()


em = Emulator("atomusdt","01.01.2022 10:29:20.221")
em.run()

Для начала работы его достаточно запустить, особых настроек у него нет. Стоит упомянуть, что как таковых данных в бд большое множество и было бы не целесообразно выгружать все, поэтому в данном коде предусмотрена динамическая подгрузка данных при достижении определенного лимита, а так же реализована задержка, чтобы максимально приблизиться к боевым условиям.

После запуска websocket сервера, при подключении к нему клиента, он начнет транслировать аск и бид цены, для подключения можно использоваться сторонние программы для работы с сокетами, либо проверить через плагины для хром браузера.

Но для полноты картины напишем клиент для сервера.

ws_client.pyimport asyncio
import websockets
from pprint import pprint as pp
import json
import os
import sys
from collections import OrderedDict
from dotenv import load_dotenv
load_dotenv()

class WSClient:
    asks = dict()
    bids = dict()
    best_bid = None
    best_ask = None

    async def connect(self):
        uri = os.environ['WS_URL']
        async with websockets.connect(uri) as websocket:
            while 1:
                obj = await websocket.recv()
                obj = json.loads(obj)
                
                best_bid = .0
                for n in obj['b']: 
                    if best_bid < n[0] and n[1] > .0:
                        best_bid = n[0]
                self.best_bid = best_bid

                best_ask = 999999999
                for n in obj['a']: 
                    if best_ask > n[0] and n[1] > .0:
                        best_ask = n[0]
                self.best_ask = best_ask

                self._pp()

    def _pp(self):
        sys.stdout.write("\rASK: %f BID: %f SPREAD: %f\t\t\t"% (self.best_ask,self.best_bid, (self.best_ask-self.best_bid)))
        sys.stdout.flush()

try:
    ws = WSClient()
    asyncio.run(ws.connect())
except KeyboardInterrupt:
    print("\n Stop")

Теперь запустив клиент и сервер, можно посмотреть как работает наша сборка

Отображение сервер и клиента из консоли

Или проверив работу запустив клиента в виде плагина chrome

Отображение из плагина Simple Web Socket Plugin