Source code for ubdcc_shared_modules.Database

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ¯\_(ツ)_/¯
#
# File: packages/ubdcc-mgmt/ubdcc_mgmt/Database.py
#
# Project website: https://github.com/oliver-zehentleitner/unicorn-binance-depth-cache-cluster
# Github: https://github.com/oliver-zehentleitner/unicorn-binance-depth-cache-cluster
# Documentation: https://oliver-zehentleitner.github.io/unicorn-binance-depth-cache-cluster
# PyPI: https://pypi.org/project/ubdcc-shared-modules
#
# License: MIT
# https://github.com/oliver-zehentleitner/unicorn-binance-depth-cache-cluster/blob/master/LICENSE
#
# Author: Oliver Zehentleitner
#
# Copyright (c) 2024-2026, Oliver Zehentleitner (https://about.me/oliver-zehentleitner)
# All rights reserved.

import threading
import time
import uuid


[docs] class Database: def __init__(self, app=None): self.app = app self.app.data['db'] = self self.data = {} self.data_lock = threading.Lock() self._init() def _init(self) -> bool: self.app.stdout_msg(f"Initiating Database ...", log="info") self.set(key="depthcaches", value={}) self.set(key="nodes", value={}) self.set(key="pods", value={}) self.set(key="credentials", value={}) self.set(key="timestamp", value=float()) if self.app.info['name'] == "ubdcc-mgmt": self.update_nodes() return True def _set_update_timestamp(self) -> bool: self.data['timestamp'] = self.app.get_unix_timestamp() return True
[docs] def is_empty(self) -> bool: if len(self.data['pods']) == 0 and \ len(self.data['depthcaches']) == 0: return True return False
[docs] def add_depthcache(self, exchange: str = None, market: str = None, desired_quantity: int = None, update_interval: int = None, refresh_interval: int = None) -> bool: if exchange is None or market is None: raise ValueError("Missing mandatory parameter: exchange, market") if desired_quantity is None or desired_quantity == "None": desired_quantity = 1 else: desired_quantity = int(desired_quantity) if update_interval is None or update_interval == "None": update_interval = None else: update_interval = int(update_interval) if refresh_interval is None or refresh_interval == "None": refresh_interval = None else: refresh_interval = int(refresh_interval) depthcache = {"CREATED_TIME": self.app.get_unix_timestamp(), "DESIRED_QUANTITY": desired_quantity, "DISTRIBUTION": {}, "DISTRIBUTION_CHANGES": 0, "EXCHANGE": exchange, "LAST_DISTRIBUTION_CHANGE": 0, "REFRESH_INTERVAL": refresh_interval, "MARKET": market, "UPDATE_INTERVAL": update_interval} with self.data_lock: if self.data['depthcaches'].get(exchange) is None: self.data['depthcaches'][exchange] = {} self.data['depthcaches'][exchange][market] = depthcache self._set_update_timestamp() return True
[docs] def add_depthcache_distribution(self, exchange: str = None, market: str = None, pod_uid: str = None, scheduled_start_time: float = None) -> bool: if exchange is None or market is None or pod_uid is None or scheduled_start_time is None: raise ValueError("Missing mandatory parameter: exchange, pod_uid, market, scheduled_start_time") distribution = {"CREATED_TIME": self.app.get_unix_timestamp(), "LAST_RESTART_TIME": 0, "POD_UID": pod_uid, "RESTARTS": 0, "SCHEDULED_START_TIME": scheduled_start_time, "STATUS": "starting"} with self.data_lock: self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid] = distribution self.data['depthcaches'][exchange][market]['DISTRIBUTION_CHANGES'] = \ self.data['depthcaches'][exchange][market].get('DISTRIBUTION_CHANGES', 0) + 1 self.data['depthcaches'][exchange][market]['LAST_DISTRIBUTION_CHANGE'] = \ self.app.get_unix_timestamp() self._set_update_timestamp() return True
[docs] def add_pod(self, name: str = None, uid: str = None, node: str = None, role: str = None, ip: str = None, api_port_rest: int = None, status: str = None, ubldc_version: str = None, version: str = None) -> bool: if uid is None: raise ValueError("Missing mandatory parameter: uid") pod = {"NAME": name, "UID": uid, "NODE": node, "ROLE": role, "IP": ip, "API_PORT_REST": api_port_rest, "LAST_SEEN": self.app.get_unix_timestamp(), "STATUS": status, "UBLDC_VERSION": ubldc_version, "VERSION": version} if ubldc_version is None: del pod['UBLDC_VERSION'] with self.data_lock: self.data['pods'][uid] = pod self._set_update_timestamp() return True
[docs] def add_credentials(self, account_group: str = None, api_key: str = None, api_secret: str = None) -> str: """Store a Binance API key pair for the given account group. Returns the generated credential id (UUID). Raises ValueError on invalid parameters or unknown account_group.""" if account_group is None or api_key is None or api_secret is None: raise ValueError("Missing mandatory parameter: account_group, api_key, api_secret") from .AccountGroups import is_valid_account_group if not is_valid_account_group(account_group): raise ValueError(f"Invalid account_group: {account_group}") credential_id = str(uuid.uuid4()) credential = {"ID": credential_id, "ACCOUNT_GROUP": account_group, "API_KEY": api_key, "API_SECRET": api_secret, "ADDED_TIMESTAMP": self.app.get_unix_timestamp(), "ASSIGNED_DCNS": []} with self.data_lock: if self.data.get('credentials') is None: self.data['credentials'] = {} self.data['credentials'][credential_id] = credential self._set_update_timestamp() self.rebalance_account_group(account_group) return credential_id
[docs] def rebalance_account_group(self, account_group: str = None) -> bool: """Redistribute all active DCN uids evenly (round-robin) over the credentials available for this account_group. Called whenever the credential set or the DCN population for this group changes. Returns True when the assignment actually changed.""" if account_group is None: raise ValueError("Missing mandatory parameter: account_group") with self.data_lock: credentials = self.data.get('credentials') or {} candidates = [c for c in credentials.values() if c['ACCOUNT_GROUP'] == account_group] dcn_uids = sorted(uid for uid, pod in self.data.get('pods', {}).items() if pod.get('ROLE') == "ubdcc-dcn") before = {c['ID']: list(c['ASSIGNED_DCNS']) for c in candidates} if not candidates: # no keys for this group — nothing to do (the credentials dict # is empty for this group anyway) return False for c in candidates: c['ASSIGNED_DCNS'] = [] for i, uid in enumerate(dcn_uids): candidates[i % len(candidates)]['ASSIGNED_DCNS'].append(uid) after = {c['ID']: list(c['ASSIGNED_DCNS']) for c in candidates} if before != after: self._set_update_timestamp() self.app.stdout_msg(f"Rebalanced credentials for '{account_group}': " f"{len(dcn_uids)} DCN(s) over {len(candidates)} key(s)", log="info") return True return False
[docs] def assign_credentials(self, uid: str = None, account_group: str = None) -> dict | None: """Assign (or return already-assigned) credential to a DCN uid. Load-balanced: picks credential with fewest ASSIGNED_DCNS. Returns full credential dict (incl. secret) or None if no keys available. """ if uid is None or account_group is None: raise ValueError("Missing mandatory parameter: uid, account_group") with self.data_lock: credentials = self.data.get('credentials') or {} candidates = [c for c in credentials.values() if c['ACCOUNT_GROUP'] == account_group] if not candidates: return None for c in candidates: if uid in c['ASSIGNED_DCNS']: return dict(c) chosen = min(candidates, key=lambda c: len(c['ASSIGNED_DCNS'])) chosen['ASSIGNED_DCNS'].append(uid) self._set_update_timestamp() return dict(chosen)
[docs] def delete_credentials(self, credential_id: str = None) -> bool: """Remove a stored credential. Returns True on success, False when the id was not found.""" if credential_id is None: raise ValueError("Missing mandatory parameter: credential_id") with self.data_lock: credentials = self.data.get('credentials') or {} if credential_id not in credentials: return False account_group = credentials[credential_id]['ACCOUNT_GROUP'] del credentials[credential_id] self._set_update_timestamp() self.rebalance_account_group(account_group) return True
[docs] def get_credentials(self, credential_id: str = None) -> dict | None: """Return the full credential record (incl. secret) for the given id, or None if not found. Internal use only — do not leak through public endpoints.""" with self.data_lock: credentials = self.data.get('credentials') or {} c = credentials.get(credential_id) return dict(c) if c else None
[docs] def get_credentials_list(self, reveal_secrets: bool = False) -> list: """Return list of credentials. With reveal_secrets=False (default) api_key/api_secret are masked for public consumption.""" result = [] with self.data_lock: credentials = self.data.get('credentials') or {} for c in credentials.values(): entry = {"id": c['ID'], "account_group": c['ACCOUNT_GROUP'], "added_timestamp": c['ADDED_TIMESTAMP'], "assigned_dcns": list(c['ASSIGNED_DCNS'])} if reveal_secrets: entry['api_key'] = c['API_KEY'] entry['api_secret'] = c['API_SECRET'] else: entry['api_key_preview'] = self._mask(c['API_KEY']) result.append(entry) return result
[docs] def release_credentials(self, uid: str = None) -> bool: """Remove uid from ASSIGNED_DCNS of all credentials (called when DCN pod disappears).""" if uid is None: raise ValueError("Missing mandatory parameter: uid") with self.data_lock: credentials = self.data.get('credentials') or {} changed = False for c in credentials.values(): if uid in c['ASSIGNED_DCNS']: c['ASSIGNED_DCNS'].remove(uid) changed = True if changed: self._set_update_timestamp() return changed
@staticmethod def _mask(secret: str) -> str: if not secret: return "" if len(secret) <= 6: return "*" * len(secret) return secret[:4] + "*" * (len(secret) - 6) + secret[-2:]
[docs] def delete(self, key: str = None) -> bool: with self.data_lock: if key in self.data: del self.data[key] self._set_update_timestamp() self.app.stdout_msg(f"DB entry deleted: {key}", log="debug", stdout=False) return True self.app.stdout_msg(f"DB entry {key} not found.", log="debug", stdout=False) return False
[docs] def delete_depthcache(self, exchange: str = None, market: str = None) -> bool: if exchange is None or market is None: raise ValueError("Missing mandatory parameter: exchange, market") with self.data_lock: try: del self.data["depthcaches"][exchange][market] except KeyError: return True self._set_update_timestamp() self.app.stdout_msg(f"DB depthcaches deleted: {exchange}, {market}", log="debug") return True
[docs] def delete_depthcache_distribution(self, exchange: str = None, market: str = None, pod_uid: str = None) -> bool: if exchange is None or market is None or pod_uid is None: raise ValueError("Missing mandatory parameter: exchange, pod_uid, market") with self.data_lock: del self.data["depthcaches"][exchange][market]['DISTRIBUTION'][pod_uid] self.data['depthcaches'][exchange][market]['DISTRIBUTION_CHANGES'] = \ self.data['depthcaches'][exchange][market].get('DISTRIBUTION_CHANGES', 0) + 1 self.data['depthcaches'][exchange][market]['LAST_DISTRIBUTION_CHANGE'] = \ self.app.get_unix_timestamp() self._set_update_timestamp() self.app.stdout_msg(f"DB depthcache distribution deleted: {exchange}, {market}, {pod_uid}", log="debug") return True
[docs] def delete_pod(self, uid: str = None) -> bool: if uid is None: raise ValueError("Missing mandatory parameter: uid") with self.data_lock: del self.data["pods"][uid] self._set_update_timestamp() self.app.stdout_msg(f"DB pod deleted: {uid}", log="debug", stdout=True) return True
[docs] def delete_old_pods(self) -> bool: old_pods = [] max_age = 60 with self.data_lock: for uid in self.data['pods']: if (time.time() - max_age) > self.data['pods'][uid]['LAST_SEEN']: old_pods.append(uid) for uid in old_pods: self.delete_pod(uid=uid) return True
[docs] def exists_depthcache(self, exchange: str = None, market: str = None) -> bool: if exchange is None or market is None: raise ValueError("Missing mandatory parameter: exchange, market") with self.data_lock: try: return market in self.data['depthcaches'][exchange] except KeyError: return False
[docs] def exists_pod(self, uid: str = None) -> bool: if uid is None: raise ValueError("Missing mandatory parameter: uid") return uid in self.data['pods']
[docs] def get(self, key: str = None): with self.data_lock: return self.data.get(key)
[docs] def get_all(self) -> dict: with self.data_lock: return self.data
[docs] def get_available_dcn_pods(self) -> dict: available_dcn_pods = {} for uid in self.data['pods']: if self.data['pods'][uid]['ROLE'] == "ubdcc-dcn": try: available_dcn_pods[uid] = self.data['nodes'][self.data['pods'][uid]['NODE']]['USAGE_CPU_PERCENT'] except KeyError: available_dcn_pods[uid] = 0 return available_dcn_pods
[docs] def get_backup_dict(self) -> dict: with self.data_lock: return self.app.sort_dict(input_dict=self.app.data['db'].data)
[docs] def get_best_dcn(self, available_pods: dict = None, excluded_pods: list = None) -> str | None: if available_pods is None: available_pods = self.get_available_dcn_pods() for uid in excluded_pods: try: del available_pods[uid] except KeyError: pass available_pods_uid: list = [uid for uid in available_pods.keys()] return self.app.get_dcn_uid_unused_longest_time(selection=available_pods_uid)
[docs] def get_dcn_responsibilities(self) -> list: with (self.data_lock): responsibilities = [] for exchange in self.data['depthcaches']: for market in self.data['depthcaches'][exchange]: for pod_uid in self.data['depthcaches'][exchange][market]['DISTRIBUTION']: if pod_uid == self.app.id['uid'] and \ self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid]['SCHEDULED_START_TIME'] < \ self.app.get_unix_timestamp(): responsibilities.append({"exchange": exchange, "market": market, "refresh_interval": self.data['depthcaches'][exchange][market]['REFRESH_INTERVAL'], "update_interval": self.data['depthcaches'][exchange][market]['UPDATE_INTERVAL']}) return responsibilities
[docs] def get_depthcache_list(self) -> dict: with self.data_lock: try: return self.data['depthcaches'] except KeyError: return {}
[docs] def get_depthcache_info(self, exchange: str = None, market: str = None) -> dict: if exchange is None or market is None: raise ValueError("Missing mandatory parameter: exchange, market") with self.data_lock: try: return self.data['depthcaches'][exchange][market] except KeyError: return {}
[docs] def get_pod_by_address(self, address: str = None) -> dict | None: if address is None: raise ValueError("Missing mandatory parameter: address") with self.data_lock: try: for uid in self.data['pods']: if self.data['pods'][uid]['IP'] == address: return self.data['pods'][uid] except KeyError: return None
[docs] def get_pod_by_uid(self, uid=None) -> dict | None: if uid is None: raise ValueError("Missing mandatory parameter: uid") with self.data_lock: try: return self.data['pods'][uid] except KeyError: return None
[docs] def get_responsible_dcn_addresses(self, exchange: str = None, market: str = None) -> list: with self.data_lock: responsible_dcn = [] try: for pod_uid in self.data['depthcaches'][exchange][market]['DISTRIBUTION']: if self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid]['STATUS'] == "running": responsible_dcn.append([self.data['pods'][pod_uid]['IP'], self.data['pods'][pod_uid]['API_PORT_REST'], pod_uid]) except KeyError: pass return responsible_dcn
[docs] def get_worst_dcn(self, available_pods: dict | None = None, excluded_pods: list = None) -> str | None: if available_pods is None: available_pods = self.get_available_dcn_pods() # Todo: Find worst pod (cpu or subscriptions or ...) worst_pod = None for uid in available_pods: if uid not in excluded_pods: worst_pod = uid return worst_pod
[docs] def replace_data(self, data: dict = None): with self.data_lock: self.data = data return True
[docs] def remove_orphaned_distribution_entries(self) -> bool: with self.data_lock: remove_distributions = [] for exchange in self.data['depthcaches']: for market in self.data['depthcaches'][exchange]: for pod_uid in self.data['depthcaches'][exchange][market]['DISTRIBUTION']: if self.exists_pod(uid=pod_uid) is False: remove_distributions.append({"exchange": exchange, "market": market, "pod_uid": pod_uid}) with self.data_lock: for item in remove_distributions: del self.data['depthcaches'][item['exchange']][item['market']]['DISTRIBUTION'][item['pod_uid']] self._set_update_timestamp() return True
[docs] def remove_orphaned_credential_assignments(self) -> bool: with self.data_lock: existing_pods = set(self.data.get('pods', {}).keys()) credentials = self.data.get('credentials') or {} changed = False for c in credentials.values(): new_list = [uid for uid in c['ASSIGNED_DCNS'] if uid in existing_pods] if len(new_list) != len(c['ASSIGNED_DCNS']): c['ASSIGNED_DCNS'] = new_list changed = True if changed: self._set_update_timestamp() return True
[docs] def revise(self) -> bool: start_time = time.time() self.app.stdout_msg(f"Revise the Database ...", log="info") self.update_nodes() self.delete_old_pods() self.remove_orphaned_distribution_entries() self.remove_orphaned_credential_assignments() self.rebalance_credential_assignments_if_needed() self.manage_distribution() run_time = time.time() - start_time self.app.stdout_msg(f"Database revised in {run_time} seconds!", log="info") return True
[docs] def rebalance_credential_assignments_if_needed(self) -> bool: """Detect DCN-population changes since the last revise() and rebalance affected account_groups. Keeps every active DCN exactly one assigned credential per account_group (or none if no key is configured).""" with self.data_lock: credentials = self.data.get('credentials') or {} current_dcns = {uid for uid, pod in self.data.get('pods', {}).items() if pod.get('ROLE') == "ubdcc-dcn"} groups_with_creds = {c['ACCOUNT_GROUP'] for c in credentials.values()} assigned_by_group: dict[str, set[str]] = {g: set() for g in groups_with_creds} for c in credentials.values(): assigned_by_group[c['ACCOUNT_GROUP']].update(c['ASSIGNED_DCNS']) changed = False for group in groups_with_creds: if assigned_by_group[group] != current_dcns: if self.rebalance_account_group(account_group=group): changed = True return changed
[docs] def manage_distribution(self) -> bool: add_distributions = [] remove_distributions = [] with self.data_lock: for exchange in self.data['depthcaches']: print(f"exchange={exchange}") for market in self.data['depthcaches'][exchange]: print(f"market={market}") existing_distribution = [] for pod_uid in self.data['depthcaches'][exchange][market]['DISTRIBUTION']: existing_distribution.append(pod_uid) existing_quantity = len(self.data['depthcaches'][exchange][market]['DISTRIBUTION']) desired_quantity = self.data['depthcaches'][exchange][market]['DESIRED_QUANTITY'] if existing_quantity < desired_quantity: add_quantity = desired_quantity - existing_quantity delayed_start_time: float = 300.0 for i in range(0, add_quantity): best_dcn = self.get_best_dcn(excluded_pods=existing_distribution) print(f"best_dcn={best_dcn}") if best_dcn is not None: if existing_quantity > 0: multiplikator = i + existing_quantity else: multiplikator = i scheduled_start_time = self.app.get_unix_timestamp() + float(multiplikator) * delayed_start_time add_distributions.append({"exchange": exchange, "market": market, "pod_uid": best_dcn, "scheduled_start_time": scheduled_start_time}) existing_distribution.append(best_dcn) print(f"ADDED: {add_distributions}") break elif existing_quantity > desired_quantity: remove_quantity = existing_quantity - desired_quantity exclude_dcn = [] available_pods = {} for uid in existing_distribution: available_pods[uid] = uid for _ in range(0, remove_quantity): worst_dcn = self.get_worst_dcn(available_pods=available_pods, excluded_pods=exclude_dcn) if worst_dcn is not None: exclude_dcn.append(worst_dcn) remove_distributions.append({"exchange": exchange, "market": market, "pod_uid": worst_dcn}) for item in add_distributions: self.add_depthcache_distribution(exchange=item['exchange'], market=item['market'], pod_uid=item['pod_uid'], scheduled_start_time=item['scheduled_start_time']) for item in remove_distributions: self.delete_depthcache_distribution(exchange=item['exchange'], market=item['market'], pod_uid=item['pod_uid']) return True
[docs] def set(self, key: str = None, value: dict | str | float | list | set | tuple = None) -> bool: with self.data_lock: self.data[key] = value self._set_update_timestamp() self.app.stdout_msg(f"DB entry added/updated: {key} = {value}", log="debug", stdout=False) return True
[docs] def update_nodes(self) -> bool: nodes = self.app.get_k8s_nodes() if nodes: self.set(key="nodes", value=nodes) self.app.stdout_msg(f"DB all nodes updated!", log="debug", stdout=False) return True else: self.app.stdout_msg(f"Timed update of the DB key 'nodes': Query of the k8s nodes was empty, no " f"update is performed!", log="error", stdout=True) return False
[docs] def update_depthcache(self, desired_quantity: int = None, exchange: str = None, refresh_interval: int = None, market: str = None, update_interval: int = None) -> bool: if exchange is None or market is None: raise ValueError("Missing mandatory parameter: exchange, market") with self.data_lock: if desired_quantity is not None: self.data['depthcaches'][exchange][market]['DESIRED_QUANTITY'] = desired_quantity self._set_update_timestamp() if update_interval is not None: self.data['depthcaches'][exchange][market]['UPDATE_INTERVAL'] = update_interval self._set_update_timestamp() if refresh_interval is not None: self.data['depthcaches'][exchange][market]['REFRESH_INTERVAL'] = refresh_interval self._set_update_timestamp() self.app.stdout_msg(f"DB depthcaches updated: {exchange}, {market}, {desired_quantity}, {update_interval}", log="debug") return True
[docs] def update_depthcache_distribution(self, exchange: str = None, market: str = None, pod_uid: str = None, last_restart_time: float = None, status: str = None) -> bool: if exchange is None or market is None or pod_uid is None: raise ValueError("Missing mandatory parameter: exchange, pod_uid, market") with self.data_lock: if last_restart_time is not None: try: dist = self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid] except KeyError: return False if last_restart_time != dist.get('LAST_RESTART_TIME'): dist['LAST_RESTART_TIME'] = last_restart_time dist['RESTARTS'] = dist.get('RESTARTS', 0) + 1 self._set_update_timestamp() if status is not None: try: self.data['depthcaches'][exchange][market]['DISTRIBUTION'][pod_uid]['STATUS'] = status except KeyError: return False self._set_update_timestamp() self.app.stdout_msg(f"DB depthcache distribution updated: {exchange}, {market}, {pod_uid}, {last_restart_time}," f" {status}", log="debug") return True
[docs] def update_pod(self, uid: str = None, node: str = None, ip: str = None, api_port_rest: int = None, status: str = None) -> bool: if uid is None: raise ValueError("Missing mandatory parameter: uid") with self.data_lock: self.data['pods'][uid]['LAST_SEEN'] = self.app.get_unix_timestamp() if api_port_rest is not None: self.data['pods'][uid]['API_PORT_REST'] = api_port_rest self._set_update_timestamp() if ip is not None: self.data['pods'][uid]['IP'] = ip self._set_update_timestamp() if node is not None: self.data['pods'][uid]['NODE'] = node self._set_update_timestamp() if status is not None: self.data['pods'][uid]['STATUS'] = status self._set_update_timestamp() self.app.stdout_msg(f"DB pod updated: {uid}", log="debug", stdout=False) return True