147 lines
5.1 KiB
Python
147 lines
5.1 KiB
Python
import os
|
|
import logging
|
|
import socket
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def resolve_address(address: str) -> str:
|
|
"""Resolve hostname to IP address for TigerBeetle client.
|
|
|
|
TigerBeetle Python client doesn't handle DNS resolution properly,
|
|
so we need to resolve hostnames to IPs before connecting.
|
|
"""
|
|
if ":" in address:
|
|
host, port = address.rsplit(":", 1)
|
|
else:
|
|
host, port = address, "3000"
|
|
|
|
try:
|
|
ip = socket.gethostbyname(host)
|
|
resolved = f"{ip}:{port}"
|
|
logger.info(f"Resolved {address} to {resolved}")
|
|
return resolved
|
|
except socket.gaierror as e:
|
|
logger.warning(f"Failed to resolve {host}: {e}, using original address")
|
|
return address
|
|
|
|
|
|
class TigerBeetleClient:
|
|
"""
|
|
Lazy-initialized TigerBeetle client singleton.
|
|
Connection is only established on first actual use.
|
|
|
|
IMPORTANT: TigerBeetle Python client requires io_uring.
|
|
Docker container must run with: --security-opt seccomp=unconfined
|
|
"""
|
|
_instance = None
|
|
_initialized = False
|
|
|
|
def __new__(cls):
|
|
if cls._instance is None:
|
|
cls._instance = super(TigerBeetleClient, cls).__new__(cls)
|
|
cls._instance._client = None
|
|
cls._instance._initialized = False
|
|
return cls._instance
|
|
|
|
def _ensure_connected(self):
|
|
"""Lazy initialization - connect only when needed."""
|
|
if self._initialized:
|
|
return self._client is not None
|
|
|
|
self._initialized = True
|
|
self.cluster_id = int(os.getenv("TB_CLUSTER_ID", "0"))
|
|
raw_address = os.getenv("TB_ADDRESS", "127.0.0.1:3000")
|
|
self.replica_addresses = resolve_address(raw_address)
|
|
|
|
try:
|
|
import tigerbeetle as tb
|
|
self._client = tb.ClientSync(
|
|
cluster_id=self.cluster_id, replica_addresses=self.replica_addresses
|
|
)
|
|
logger.info(f"Connected to TigerBeetle cluster {self.cluster_id} at {self.replica_addresses}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to connect to TigerBeetle: {e}")
|
|
self._client = None
|
|
return False
|
|
|
|
def close(self):
|
|
if self._client:
|
|
self._client.close()
|
|
logger.info("TigerBeetle client closed.")
|
|
|
|
def create_accounts(self, accounts):
|
|
"""Create accounts in TigerBeetle."""
|
|
if not self._ensure_connected():
|
|
logger.error("TigerBeetle client not available.")
|
|
return [Exception("TigerBeetle not connected")]
|
|
|
|
try:
|
|
errors = self._client.create_accounts(accounts)
|
|
if errors:
|
|
for error in errors:
|
|
if hasattr(error, 'error'):
|
|
logger.error(f"Error creating account: {error.error.name}")
|
|
else:
|
|
logger.error(f"Error creating account: {error}")
|
|
return errors
|
|
except Exception as e:
|
|
logger.error(f"Exception during account creation: {e}")
|
|
return [e]
|
|
|
|
def create_transfers(self, transfers):
|
|
"""Create transfers in TigerBeetle."""
|
|
if not self._ensure_connected():
|
|
logger.error("TigerBeetle client not available.")
|
|
return [Exception("TigerBeetle not connected")]
|
|
|
|
try:
|
|
errors = self._client.create_transfers(transfers)
|
|
if errors:
|
|
for error in errors:
|
|
if hasattr(error, 'error'):
|
|
logger.error(f"Error creating transfer: {error.error.name}")
|
|
else:
|
|
logger.error(f"Error creating transfer: {error}")
|
|
return errors
|
|
except Exception as e:
|
|
logger.error(f"Exception during transfer creation: {e}")
|
|
return [e]
|
|
|
|
def lookup_accounts(self, account_ids: list[int]):
|
|
"""Look up accounts in TigerBeetle."""
|
|
if not self._ensure_connected():
|
|
logger.error("TigerBeetle client not available.")
|
|
return []
|
|
try:
|
|
accounts = self._client.lookup_accounts(account_ids)
|
|
return accounts
|
|
except Exception as e:
|
|
logger.error(f"Exception during account lookup: {e}")
|
|
return []
|
|
|
|
def get_account_transfers(self, account_id: int, limit: int = 50):
|
|
"""Get transfers for an account from TigerBeetle."""
|
|
if not self._ensure_connected():
|
|
logger.error("TigerBeetle client not available.")
|
|
return []
|
|
try:
|
|
import tigerbeetle as tb
|
|
account_filter = tb.AccountFilter(
|
|
account_id=account_id,
|
|
timestamp_min=0,
|
|
timestamp_max=0,
|
|
limit=limit,
|
|
flags=tb.AccountFilterFlags.CREDITS | tb.AccountFilterFlags.DEBITS | tb.AccountFilterFlags.REVERSED,
|
|
)
|
|
transfers = self._client.get_account_transfers(account_filter)
|
|
return transfers
|
|
except Exception as e:
|
|
logger.error(f"Exception during get_account_transfers: {e}")
|
|
return []
|
|
|
|
|
|
# Singleton instance (lazy - doesn't connect until first use)
|
|
tigerbeetle_client = TigerBeetleClient()
|