# Source code for ubdcc_dcn.DepthCacheNode
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/ubdcc-dcn/ubdcc_dcn/DepthCacheNode.py
#
# Project website: https://github.com/oliver-zehentleitner/unicorn-binance-depth-cache-cluster
# Github: https://github.com/oliver-zehentleitner/unicorn-binance-depth-cache-cluster
# Documentation: https://oliver-zehentleitner.github.io/unicorn-binance-depth-cache-cluster
# PyPI: https://pypi.org/project/ubdcc-dcn
#
# License: MIT
# https://github.com/oliver-zehentleitner/unicorn-binance-depth-cache-cluster/blob/master/LICENSE
#
# Author: Oliver Zehentleitner
#
# Copyright (c) 2024-2026, Oliver Zehentleitner (https://about.me/oliver-zehentleitner)
# All rights reserved.
import queue
from functools import partial
from .RestEndpoints import RestEndpoints
from ubdcc_shared_modules.AccountGroups import get_account_group
from ubdcc_shared_modules.ServiceBase import ServiceBase
from unicorn_binance_local_depth_cache import BinanceLocalDepthCacheManager, DepthCacheNotFound
from unicorn_binance_local_depth_cache.manager import __version__ as ubldc_version
class DepthCacheNode(ServiceBase):
    """DepthCache node (DCN) service.

    Owns one `BinanceLocalDepthCacheManager` per (exchange, update_interval)
    pair and keeps the set of locally running DepthCaches in sync with the
    responsibilities assigned by the mgmt service. Stream-restart events and
    credential changes are forwarded to mgmt once per main-loop iteration.
    """

    def __init__(self, cwd=None, mgmt_port=None):
        # Thread-safe queue: UBLDC on_restart callbacks fire from their
        # manager thread; the async main loop drains the queue and forwards
        # to mgmt. Must be initialized BEFORE super().__init__() because
        # ServiceBase.__init__() calls self.app.start() which blocks and
        # synchronously runs main() → _drain_restart_queue(). Any attribute
        # set after super().__init__() is never assigned.
        self._restart_queue: queue.Queue = queue.Queue()
        super().__init__(app_name="ubdcc-dcn", cwd=cwd, mgmt_port=mgmt_port)

    def _on_stream_restart(self, exchange: str, market: str, timestamp: float) -> None:
        """Thread-safe: invoked from UBLDC's manager thread on every stream restart."""
        self._restart_queue.put((exchange, market, timestamp))

    def _ldcs_for_account_group(self, account_group: str):
        """Yield every running BinanceLocalDepthCacheManager whose exchange maps
        to the given account_group."""
        for exchange, by_interval in self.app.data.get('depthcache_instances', {}).items():
            if get_account_group(exchange) != account_group:
                continue
            for ldc in by_interval.values():
                if ldc is not None:
                    yield ldc

    async def _sync_credentials(self) -> None:
        """For every account_group currently used by this DCN, pull the assigned
        credential from mgmt. If the credential id differs from the one we last
        propagated, hot-swap the UBRA in all affected UBLDCs via
        `set_credentials()`. Runs once per main-loop iteration after the
        mgmt sync."""
        cache = self.app.data['credential_id_by_account_group']
        used_account_groups = {get_account_group(exch)
                               for exch in self.app.data.get('depthcache_instances', {})}
        used_account_groups.discard(None)
        for account_group in used_account_groups:
            credential = await self.app.ubdcc_assign_credentials(account_group=account_group)
            new_id = credential.get('id') if credential else None
            if cache.get(account_group) == new_id:
                # Same credential we already propagated — nothing to do.
                continue
            api_key = credential.get('api_key') if credential else None
            api_secret = credential.get('api_secret') if credential else None
            for ldc in self._ldcs_for_account_group(account_group):
                try:
                    ldc.set_credentials(api_key=api_key, api_secret=api_secret)
                except Exception as error_msg:
                    # Best-effort: a failed hot-swap on one instance must not
                    # prevent propagation to the remaining instances.
                    self.app.stdout_msg(f"set_credentials() failed for '{account_group}': "
                                        f"{error_msg}", log="error")
            cache[account_group] = new_id
            if new_id is not None:
                self.app.stdout_msg(f"Assigned credential '{new_id}' for account_group "
                                    f"'{account_group}'.", log="info")
            else:
                self.app.stdout_msg(f"No credential available for account_group "
                                    f"'{account_group}' — using public rate limits.",
                                    log="info")

    async def main(self):
        """Main service loop: register with mgmt, then reconcile local
        DepthCaches against the responsibilities assigned by mgmt until
        shutdown is requested."""
        self.app.data['depthcache_instances'] = {}
        self.app.data['local_depthcaches'] = []
        self.app.data['responsibilities'] = []
        self.app.data['credential_id_by_account_group'] = {}
        await self.start_rest_server(endpoints=RestEndpoints)
        self.app.set_status_running()
        await self.app.register_or_restart(ubldc_version=ubldc_version)
        self.db_init()
        while self.app.is_shutdown() is False:
            await self.app.sleep()
            await self.app.ubdcc_node_sync()
            await self._drain_restart_queue()
            await self._sync_credentials()
            self.app.data['responsibilities'] = self.db.get_dcn_responsibilities()
            self.app.stdout_msg(f"Local DepthCaches: {self.app.data['local_depthcaches']}", log="debug", stdout=False)
            self.app.stdout_msg(f"Responsibilities: {self.app.data['responsibilities']}", log="debug", stdout=False)
            for dc in self.app.data['responsibilities']:
                if self.app.is_shutdown() is True:
                    break
                if dc not in self.app.data['local_depthcaches']:
                    # Create DC
                    self.app.stdout_msg(f"Adding local DC: {dc}", log="info")
                    by_interval = self.app.data['depthcache_instances'].setdefault(dc['exchange'], {})
                    if by_interval.get(dc['update_interval']) is None:
                        # Bind the exchange now; UBLDC supplies (market,
                        # timestamp) when the callback fires.
                        on_restart = partial(self._on_stream_restart, dc['exchange'])
                        kwargs = {"exchange": dc['exchange'], "on_restart": on_restart}
                        if dc['update_interval'] is not None:
                            kwargs['depth_cache_update_interval'] = dc['update_interval']
                        # The UBRA is managed by UBLDC itself (no ubra_manager
                        # kwarg); credentials are propagated immediately below
                        # so the initial snapshot already runs with an
                        # authenticated key when one is assigned.
                        by_interval[dc['update_interval']] = BinanceLocalDepthCacheManager(**kwargs)
                        await self._sync_credentials()
                    # Always create the DepthCache — also for the first market
                    # on a freshly created manager instance. (Previously the
                    # first market of a new instance skipped this call and was
                    # reported as "running" without ever being created.)
                    by_interval[dc['update_interval']].create_depthcache(
                        markets=dc['market'],
                        refresh_interval=dc['refresh_interval']
                    )
                    await self.app.ubdcc_update_depthcache_distribution(exchange=dc['exchange'],
                                                                        market=dc['market'],
                                                                        status="running")
                    self.app.data['local_depthcaches'].append(dc)
                    await self.app.ubdcc_node_sync()
            stop_depthcaches = {}
            # Iterate over a snapshot: entries are removed from
            # 'local_depthcaches' inside the loop, and mutating the list
            # being iterated would silently skip the element that follows
            # each removal.
            for dc in list(self.app.data['local_depthcaches']):
                if self.app.is_shutdown() is True:
                    break
                if dc not in self.app.data['responsibilities']:
                    # Stop DC
                    self.app.stdout_msg(f"Removing local DC: {dc}", log="info")
                    # Nested setdefault keeps markets already collected for
                    # other update_intervals of the same exchange intact.
                    intervals = stop_depthcaches.setdefault(dc['exchange'], {})
                    intervals.setdefault(dc['update_interval'], {'markets': []})['markets'].append(dc['market'])
                    self.app.data['local_depthcaches'].remove(dc)
            for exchange in stop_depthcaches:
                for update_interval in stop_depthcaches[exchange]:
                    try:
                        self.app.data['depthcache_instances'][exchange][update_interval].stop_depthcache(
                            markets=stop_depthcaches[exchange][update_interval]['markets']
                        )
                    except DepthCacheNotFound as error_msg:
                        self.app.stdout_msg(f"DepthCache not found: {error_msg}", log="error")
        self.app.stdout_msg("Stopping all DepthCache instances ...", log="error")
        # Stop every manager instance directly — not only those that still
        # have an entry in 'local_depthcaches' — so no UBLDC manager thread
        # survives shutdown and no instance is stopped more than once.
        for by_interval in self.app.data['depthcache_instances'].values():
            for ldc in by_interval.values():
                if ldc is not None:
                    ldc.stop_manager()

    async def _drain_restart_queue(self) -> None:
        """
        Drain pending stream-restart events from UBLDC's on_restart callback
        and forward each one to mgmt. Runs once per main-loop iteration.
        """
        while True:
            try:
                exchange, market, timestamp = self._restart_queue.get_nowait()
            except queue.Empty:
                return
            await self.app.ubdcc_update_depthcache_distribution(
                exchange=exchange,
                market=market,
                last_restart_time=timestamp,
            )