ЭМУЛЯТОР БИРЖЕВОГО СТАКАНА ЦЕН
В прошлой статье мы создали инструмент для выгрузки и обработки данных стакана. И закрыли вопрос касаемо, где брать эти данные. Теперь, когда данные выгружены в виде своего рода датасетов, мы можем написать эмулятор, который будет транслировать эти данные.
Существует разное множество баз данных для работы с временными рядами, к примеру InfluxDB и ей подобные. Я же, решил, что мне будет удобней работать с ElasticDB.
Если посмотреть схематично на реализацию, то она будет состоять из следующих компонентов:
— Конфиг для docker-compose для деплоя Elasticsearch и Kibana (второе необязательно)
— Скрипт загрузки данных из файлов в базу
— WebSocketServer который собственно и будет транслировать наши данные из бд
— WebSockerClient для удобства и мониторинга
Первым делом задеплоим докер контейнер с Elasticsearch. В него мы загрузим все данные из файлов по иструментам, которые подготавливали в прошлой статье.
docker-compose.ymlversion: '3.7' services: elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:7.4.0 container_name: elasticsearch environment: - xpack.security.enabled=false - discovery.type=single-node ulimits: memlock: soft: -1 hard: -1 nofile: soft: 65536 hard: 65536 cap_add: - IPC_LOCK volumes: - elasticsearch-data:/usr/share/elasticsearch/data ports: - 9200:9200 - 9300:9300 kibana: container_name: kibana image: docker.elastic.co/kibana/kibana:7.4.0 environment: - ELASTICSEARCH_HOSTS=http://elasticsearch:9200 ports: - 5601:5601 depends_on: - elasticsearch volumes: elasticsearch-data: driver: local
После того как контейнер соберется и запустится, можно загрузить в бд наши подготовленные данные, для этого создайте python скрипт со следующим содержанием.
run_elastic.pyimport datetime import os import sys from pprint import pprint as pp from memory_profiler import profile from elasticsearch import Elasticsearch from elasticsearch.helpers import parallel_bulk, bulk from dotenv import load_dotenv load_dotenv() class Parser: es = Elasticsearch(os.environ["DB"]) symbol = None iterator = 1 bulk_data = list() def __init__(self, symbol): self.symbol = symbol def add(self, data: list, send: int = 0): self.bulk_data.append({ "_op_type": "index", "_index": self.symbol, "_source": data, }) if len(self.bulk_data) == 1000 or send == 1: bulk(self.es, self.bulk_data) self.bulk_data = list() self.iterator+=1 def prepare(self, fl: str): with open(fl, "r") as f: i = 0; _data = dict({"ts":0,"a":[],"b":[],"delay":0}) for n in f: if i==0: i+=1; continue if (i%1000000)==0: print(i) nd = n.split(",") if int(nd[0]) == _data["ts"]: if nd[1] == 'a': _data['a'].append( [float(nd[2]), float(nd[3])] ) elif nd[1] == 'b': _data['b'].append( [float(nd[2]), float(nd[3])] ) elif not _data["ts"]: if nd[1] == 'a': _data['a'].append( [float(nd[2]), float(nd[3])] ) elif nd[1] == 'b': _data['b'].append( [float(nd[2]), float(nd[3])] ) _data["ts"] = int(nd[0]) _data['delay'] = float(nd[4].replace("\n", "")) else: self.add(_data) _data = dict({"ts":0,"a":[],"b":[],"delay":0}) i+=1 if _data: self.add(_data,1); del _data name = os.environ["SYMBOL"] p = Parser(name) directory = os.environ["PATH_TO_FILES"] for n in os.listdir(directory): nd = os.path.join(directory,n) if os.path.isdir(nd) and name.upper() in n: for f in os.listdir(nd): print(nd + "/"+ f) s = datetime.datetime.now() p.prepare(nd + "/"+ f) delay = datetime.datetime.now() - s print("Time execute: " + str(delay)) print("Rows in DB: " + str(p.iterator))
.envDB="http://localhost:9200" PATH_TO_FILES="Полный путь до директории с файлами" SYMBOL="Наименование инструмента, к примеру atomusdt" WS_URL="ws://localhost:5678" WS_HOST="127.0.0.1" WS_PORT="5678" EMU_SYMBOL="atomusdt" EMU_FROM="Дата и время с которого будет трансляция к примеру 01.01.2022 10:29:20.221"
Теперь это можно запустить, и загрузить все файлы в бд. Это может занят достаточно продолжительное время, поэтому наберитесь терпения.
Следующим шагом будет реализация вебсокет сервера, это сам механизм отвечающий за трансляцию данных.
ws_server.pyfrom elasticsearch import Elasticsearch from datetime import datetime, timezone from pprint import pprint as pp import asyncio import random import websockets import json import sys import threading from dotenv import load_dotenv load_dotenv() es = Elasticsearch() class Emulator: data = None ts_from = None index = None delta = 60 def __init__(self,symbol, dt): self.ts_from = int(datetime.strptime(dt, "%d.%m.%Y %H:%M:%S.%f").replace(tzinfo=timezone.utc).timestamp() * 1000) self.index = symbol self._getData() def _ts_to(self, ts_from): return ts_from + self.delta * 1000 def _getData(self): q = { "query": { "range": { "ts": { "gte": self.ts_from, "lt": self._ts_to(self.ts_from) } } }, "size": 1000, } resp = es.search(index=self.index, body=q) if resp['hits']['total']['value'] > 0: self.data = [hit['_source'] for hit in resp['hits']['hits']] print("Got %d Hits" % resp['hits']['total']['value']) print("In self.data: %d"% len(self.data)) def _append(self, ts_from): q = { "query": { "range": { "ts": { "gte": ts_from, "lt": self._ts_to(ts_from) } } }, "size": 1000, } resp = es.search(index=self.index, body=q) if resp['hits']['total']['value'] > 0: self.data.extend([hit['_source'] for hit in resp['hits']['hits']]) print("Got %d Hits" % resp['hits']['total']['value']) print("In self.data after append: %d"% len(self.data)) async def translation(self, websocket, path): while True: if self.data: current = self.data.pop(0) sys.stdout.write("\rCurrent length: %d "% len(self.data)) sys.stdout.flush() await websocket.send(json.dumps(current)) if len(self.data) < 500: threading.Thread(target=self._append, args=[self.data[len(self.data)-1]["ts"]]).start() await asyncio.sleep(int(current['delay'])/1000) def run(self): start_server = websockets.serve(self.translation, '127.0.0.1', 5678) asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever() em = Emulator("atomusdt","01.01.2022 10:29:20.221") em.run()
Для начала работы его достаточно запустить, особых настроек у него нет. Стоит упомянуть, что как таковых данных в бд большое множество и было бы не целесообразно выгружать все, поэтому в данном коде предусмотрена динамическая подгрузка данных при достижении определенного лимита, а так же реализована задержка, чтобы максимально приблизиться к боевым условиям.
После запуска websocket сервера, при подключении к нему клиента, он начнет транслировать аск и бид цены, для подключения можно использоваться сторонние программы для работы с сокетами, либо проверить через плагины для хром браузера.
Но для полноты картины напишем клиент для сервера.
ws_client.pyimport asyncio import websockets from pprint import pprint as pp import json import os import sys from collections import OrderedDict from dotenv import load_dotenv load_dotenv() class WSClient: asks = dict() bids = dict() best_bid = None best_ask = None async def connect(self): uri = os.environ['WS_URL'] async with websockets.connect(uri) as websocket: while 1: obj = await websocket.recv() obj = json.loads(obj) best_bid = .0 for n in obj['b']: if best_bid < n[0] and n[1] > .0: best_bid = n[0] self.best_bid = best_bid best_ask = 999999999 for n in obj['a']: if best_ask > n[0] and n[1] > .0: best_ask = n[0] self.best_ask = best_ask self._pp() def _pp(self): sys.stdout.write("\rASK: %f BID: %f SPREAD: %f\t\t\t"% (self.best_ask,self.best_bid, (self.best_ask-self.best_bid))) sys.stdout.flush() try: ws = WSClient() asyncio.run(ws.connect()) except KeyboardInterrupt: print("\n Stop")
Теперь запустив клиент и сервер, можно посмотреть как работает наша сборка