Source code for pypergraph.network.api.layer_0_api

import logging
from typing import List, Dict, Any, Union, Optional, Literal

from prometheus_client.parser import text_string_to_metric_families

from pypergraph.core.cross_platform.di.rest_client import RESTClient, HttpxClient
from pypergraph.core.cross_platform.rest_api_client import RestAPIClient
from pypergraph.network.models.network import PeerInfo, TotalSupply
from pypergraph.network.models.account import Balance
from pypergraph.network.models.network import Ordinal
from pypergraph.network.models.node_param import (
    SignedUpdateNodeParameters,
    NodeParametersInfo,
    UpdateNodeParameters,
)
from pypergraph.network.models.snapshot import SignedGlobalIncrementalSnapshot
from pypergraph.network.models.transaction import TransactionReference
from pypergraph.network.models.node_collateral import (
    NodeCollateralsInfo,
    SignedCreateNodeCollateral,
    SignedWithdrawNodeCollateral,
)
from pypergraph.network.models.delegated_stake import (
    DelegatedStakesInfo,
    SignedCreateDelegatedStake,
    SignedWithdrawDelegatedStake,
)


def _handle_metrics(response: str) -> List[Dict[str, Any]]:
    """
    Parse Prometheus metrics output from a text response.

    :param response: Prometheus text output.
    :return: List of dictionaries with metric details.
    """
    metrics = []
    for family in text_string_to_metric_families(response):
        for sample in family.samples:
            metrics.append(
                {
                    "name": sample[0],
                    "labels": sample[1],
                    "value": sample[2],
                    "type": family.type,
                }
            )
    return metrics


[docs] class L0Api: def __init__( self, host: str, client: Optional[RESTClient] = None, timeout: int = 25 ): if not host: logging.warning("L0Api | ML0 :: Layer 0 API object not set.") self._host = host self.client = client or HttpxClient(timeout=timeout)
[docs] def config(self, host: Optional[str] = None, client: Optional[RESTClient] = None): """Reconfigure the RestAPIClient.""" if host: self._host = host if client: self.client = client
async def _make_request( self, method: str, endpoint: str, params: Dict[str, Any] = None, payload: Dict[str, Any] = None, ) -> Union[Dict, List, str]: """ Helper function to create a new RestAPIClient instance and make a request. """ async with RestAPIClient(base_url=self._host, client=self.client) as client: return await client.request( method=method, endpoint=endpoint, params=params, payload=payload )
[docs] async def get_cluster_info(self) -> List[PeerInfo]: result = await self._make_request("GET", "/cluster/info") return PeerInfo.process_cluster_peers(data=result)
[docs] async def get_metrics(self) -> List[Dict[str, Any]]: """ Get metrics from the L0 endpoint. :return: Prometheus output as a list of dictionaries. """ response = await self._make_request("GET", "/metrics") return _handle_metrics(response)
[docs] async def get_total_supply(self) -> TotalSupply: result = await self._make_request("GET", "/dag/total-supply") return TotalSupply(**result)
[docs] async def get_address_balance(self, address: str) -> Balance: result = await self._make_request("GET", f"/dag/{address}/balance") return Balance(**result, meta=result.get("meta"))
[docs] async def get_latest_snapshot(self) -> SignedGlobalIncrementalSnapshot: result = await self._make_request("GET", "/global-snapshots/latest") return SignedGlobalIncrementalSnapshot(**result)
[docs] async def get_latest_snapshot_ordinal(self) -> Ordinal: result = await self._make_request("GET", "/global-snapshots/latest/ordinal") return Ordinal(**result)
[docs] async def post_state_channel_snapshot(self, address: str, snapshot: dict): # TODO: How to test this? return await self._make_request( "POST", f"/state-channel/{address}/snapshot", payload=snapshot )
[docs] async def get_delegated_stake_last_reference( self, address: str ) -> TransactionReference: result = await self._make_request( "GET", f"/delegated-stakes/last-reference/{address}" ) return await TransactionReference(**result)
[docs] async def get_delegated_stakes_info(self, address) -> DelegatedStakesInfo: result = await self._make_request("GET", f"/delegated-stakes/{address}/info") return DelegatedStakesInfo(**result)
[docs] async def put_delegated_stake_withdrawal(self, tx: SignedWithdrawDelegatedStake): return await self._make_request( "PUT", "/delegated-stakes", payload=tx.model_dump() )
[docs] async def post_delegated_stake(self, tx: SignedCreateDelegatedStake): return await self._make_request( "POST", "/delegated-stakes", payload=tx.model_dump() )
[docs] async def get_delegated_stakes_reward_info(self): return await self._make_request("GET", "delegated-stakes/rewards-info")
[docs] async def get_node_collateral_last_reference( self, address: str ) -> TransactionReference: result = await self._make_request( "GET", f"/node-collateral/last-reference/{address}" ) return TransactionReference(**result)
[docs] async def get_node_collaterals_info(self, address) -> NodeCollateralsInfo: result = await self._make_request("GET", f"/node-collateral/{address}/info") return NodeCollateralsInfo(**result)
[docs] async def put_node_collateral_withdrawal(self, tx: SignedWithdrawNodeCollateral): return await self._make_request( "PUT", "/node-collateral", payload=tx.model_dump() )
[docs] async def post_node_collateral(self, tx: SignedCreateNodeCollateral): return await self._make_request( "POST", "/node-collateral", payload=tx.model_dump() )
[docs] async def post_node_parameters(self, tx: SignedUpdateNodeParameters): """Register validator parameters for delegated staking""" return await self._make_request("POST", "/node-params", payload=tx.model_dump())
@staticmethod def _get_node_parameters_search_and_sort_path_and_params( search_name_or_id: Optional[str], sort: Optional[ Literal[ "name", "peerID", "address", "totalAddressesAssigned", "totalAmountDelegated", ] ], sort_order: Optional[Literal["ASC", "DESC"]], ) -> Dict: params = {} if isinstance(search_name_or_id, str): params["search"] = search_name_or_id if sort in ( "name", "peerID", "address", "totalAddressesAssigned", "totalAmountDelegated", ): params["sort"] = sort if sort_order in ("ASC", "DESC"): params["sortOrder"] = sort_order return {"params": params}
[docs] async def get_node_parameters( self, search_name_or_id: Optional[str] = None, sort: Optional[ Literal[ "name", "peerID", "address", "totalAddressesAssigned", "totalAmountDelegated", ] ] = None, sort_order: Optional[Literal["ASC", "DESC"]] = None, ): request = self._get_node_parameters_search_and_sort_path_and_params( search_name_or_id, sort, sort_order ) result = await self._make_request( "GET", "/node-params", params=request["params"] ) return NodeParametersInfo.process_node_parameters(**result)
[docs] async def get_node_parameters_by_address(self, address): result = await self._make_request("GET", f"/node-params/{address}") return UpdateNodeParameters(**result)