from typing import Optional, Dict, List
from rx.subject import BehaviorSubject
from pypergraph.core.cross_platform.di.rest_client import RESTClient
from pypergraph.network.models.account import Balance
from pypergraph.network.api import Layer0Api
from pypergraph.network.api import Layer1Api
from pypergraph.network.api import BlockExplorerApi
from pypergraph.network.models.transaction import (
PendingTransaction,
SignedTransaction,
TransactionReference,
)
from pypergraph.network.models.block_explorer import Snapshot, Transaction
from pypergraph.network.models.network import NetworkInfo
from pypergraph.core.exceptions import NetworkError
import logging
# Get a logger for this specific module
logger = logging.getLogger(__name__)
[docs]
class DagTokenNetwork:
def __init__(
self,
network_id: str = "mainnet",
l0_host: Optional[str] = None,
currency_l1_host: Optional[str] = None,
client: Optional[RESTClient] = None,
):
# Initialize connected network info
self.connected_network = NetworkInfo(
network_id=network_id, l0_host=l0_host, currency_l1_host=currency_l1_host
)
self.be_api = (
BlockExplorerApi(
host=self.connected_network.block_explorer_url, client=client
)
if self.connected_network.block_explorer_url
else None
)
self.l0_api = Layer0Api(
host=self.connected_network.l0_host
or f"https://l0-lb-{network_id}.constellationnetwork.io",
client=client,
)
self.cl1_api = Layer1Api(
host=self.connected_network.currency_l1_host
or f"https://l1-lb-{network_id}.constellationnetwork.io",
client=client,
)
self._network_change = BehaviorSubject(
{
"module": "network",
"type": "network_change",
"event": self.get_network(),
}
)
[docs]
def config(
self,
network_id: str = None,
block_explorer_url: Optional[str] = None,
l0_host: Optional[str] = None,
currency_l1_host: Optional[str] = None,
):
"""
Reconfigure the network; new configuration is applied only if different from the current one.
"""
new_info = NetworkInfo(
network_id=network_id,
block_explorer_url=block_explorer_url,
l0_host=l0_host,
currency_l1_host=currency_l1_host,
)
self.set_network(new_info)
[docs]
def set_network(self, network_info: NetworkInfo):
if network_info.network_id not in (
"mainnet",
"integrationnet",
"testnet",
None,
):
raise ValueError("DagTokenNetwork :: Invalid network id.")
if self.connected_network.__dict__ != network_info.__dict__:
self.connected_network = network_info
self.be_api.config(network_info.block_explorer_url) # Block Explorer
self.l0_api.config(network_info.l0_host)
self.cl1_api.config(network_info.currency_l1_host) # Currency layer
# Emit a network change event
self._network_change.on_next(
{
"module": "network",
"type": "network_change",
"event": self.get_network(),
}
)
[docs]
def get_network(self) -> Dict:
"""
Returns the DagTokenNetwork NetworkInfo object as dictionary.
:return: Serialized NetworkInfo object.
"""
return self.connected_network.__dict__
[docs]
async def get_address_balance(self, address: str) -> Balance:
return await self.l0_api.get_address_balance(address)
[docs]
async def get_address_last_accepted_transaction_ref(
self, address: str
) -> TransactionReference:
"""
Get the last transaction hash and ordinal from DAG address.
:param address:
:return: Object with ordinal and transaction hash.
"""
return await self.cl1_api.get_last_reference(address)
[docs]
async def get_pending_transaction(self, hash: str) -> PendingTransaction:
"""
Check if the given transaction is pending.
:param hash:
:return: Object if transaction is pending, else log error.
"""
try:
return await self.cl1_api.get_pending_transaction(hash)
except NetworkError as e:
# NOOP for 404 or other exceptions
if e.status == 404:
logger.debug("DagTokenNetwork :: No transaction pending.")
else:
logger.error(f"DagTokenNetwork :: {e}")
raise e
[docs]
async def get_transactions_by_address(
self,
address: str,
limit: Optional[int] = None,
search_after: Optional[str] = None,
) -> List[Transaction]:
"""
Get all address-specific transactions from a given timestamp.
:param address: DAG address.
:param limit: Limit per page.
:param search_after: Timestamp.
:return: List of BlockExplorerTransaction objects.
"""
try:
return await self.be_api.get_transactions_by_address(
address, limit, search_after
)
except Exception:
# NOOP for 404 or other exceptions
logger.info(f"DagTokenNetwork :: No transactions found for {address}.")
[docs]
async def get_transaction(self, hash: str) -> Transaction:
"""
Get the given transaction from the block explorer.
:param hash: Transaction hash.
:return: BlockExplorerTransaction object.
"""
try:
return await self.be_api.get_transaction(hash)
except Exception:
# NOOP for 404 or other exceptions
logger.info("DagTokenNetwork :: No transaction found.")
[docs]
async def post_transaction(self, tx: SignedTransaction) -> str:
"""
Post a signed transaction to layer 1.
:param tx: Signed transaction.
:return: Transaction hash.
"""
response = await self.cl1_api.post_transaction(tx)
# Support both data/meta format and object return format
return response.get("data", {}).get("hash") or response.get("hash")
[docs]
async def get_latest_snapshot(self) -> Snapshot:
"""
Get the latest snapshot from the block explorer.
:return: Snapshot object.
"""
response = await self.be_api.get_latest_snapshot()
return response
[docs]
async def post_delegate_stake(self, tx: dict) -> str:
"""
Delegate stake on L0.
I believe this is a one-time thing for node operators to make the node available for delegated stake?
:param tx: Signed transaction.
:return: Transaction hash.
"""
response = await self.l0_api.post_delegated_stake(tx)
# Support both data/meta format and object return format
return response.get("data", {}).get("hash") or response.get("hash")