import os import logging import socket logger = logging.getLogger(__name__) def resolve_address(address: str) -> str: """Resolve hostname to IP address for TigerBeetle client. TigerBeetle Python client doesn't handle DNS resolution properly, so we need to resolve hostnames to IPs before connecting. """ if ":" in address: host, port = address.rsplit(":", 1) else: host, port = address, "3000" try: ip = socket.gethostbyname(host) resolved = f"{ip}:{port}" logger.info(f"Resolved {address} to {resolved}") return resolved except socket.gaierror as e: logger.warning(f"Failed to resolve {host}: {e}, using original address") return address class TigerBeetleClient: """ Lazy-initialized TigerBeetle client singleton. Connection is only established on first actual use. IMPORTANT: TigerBeetle Python client requires io_uring. Docker container must run with: --security-opt seccomp=unconfined """ _instance = None _initialized = False def __new__(cls): if cls._instance is None: cls._instance = super(TigerBeetleClient, cls).__new__(cls) cls._instance._client = None cls._instance._initialized = False return cls._instance def _ensure_connected(self): """Lazy initialization - connect only when needed.""" if self._initialized: return self._client is not None self._initialized = True self.cluster_id = int(os.getenv("TB_CLUSTER_ID", "0")) raw_address = os.getenv("TB_ADDRESS", "127.0.0.1:3000") self.replica_addresses = resolve_address(raw_address) try: import tigerbeetle as tb self._client = tb.ClientSync( cluster_id=self.cluster_id, replica_addresses=self.replica_addresses ) logger.info(f"Connected to TigerBeetle cluster {self.cluster_id} at {self.replica_addresses}") return True except Exception as e: logger.error(f"Failed to connect to TigerBeetle: {e}") self._client = None return False def close(self): if self._client: self._client.close() logger.info("TigerBeetle client closed.") def create_accounts(self, accounts): """Create accounts in TigerBeetle.""" if not self._ensure_connected(): logger.error("TigerBeetle client not available.") return [Exception("TigerBeetle not connected")] try: errors = self._client.create_accounts(accounts) if errors: for error in errors: if hasattr(error, 'error'): logger.error(f"Error creating account: {error.error.name}") else: logger.error(f"Error creating account: {error}") return errors except Exception as e: logger.error(f"Exception during account creation: {e}") return [e] def create_transfers(self, transfers): """Create transfers in TigerBeetle.""" if not self._ensure_connected(): logger.error("TigerBeetle client not available.") return [Exception("TigerBeetle not connected")] try: errors = self._client.create_transfers(transfers) if errors: for error in errors: if hasattr(error, 'error'): logger.error(f"Error creating transfer: {error.error.name}") else: logger.error(f"Error creating transfer: {error}") return errors except Exception as e: logger.error(f"Exception during transfer creation: {e}") return [e] def lookup_accounts(self, account_ids: list[int]): """Look up accounts in TigerBeetle.""" if not self._ensure_connected(): logger.error("TigerBeetle client not available.") return [] try: accounts = self._client.lookup_accounts(account_ids) return accounts except Exception as e: logger.error(f"Exception during account lookup: {e}") return [] def get_account_transfers(self, account_id: int, limit: int = 50): """Get transfers for an account from TigerBeetle.""" if not self._ensure_connected(): logger.error("TigerBeetle client not available.") return [] try: import tigerbeetle as tb account_filter = tb.AccountFilter( account_id=account_id, timestamp_min=0, timestamp_max=0, limit=limit, flags=tb.AccountFilterFlags.CREDITS | tb.AccountFilterFlags.DEBITS | tb.AccountFilterFlags.REVERSED, ) transfers = self._client.get_account_transfers(account_filter) return transfers except Exception as e: logger.error(f"Exception during get_account_transfers: {e}") return [] # Singleton instance (lazy - doesn't connect until first use) tigerbeetle_client = TigerBeetleClient()