Source code for pypergraph.network.api.layer_1_api

import logging
from typing import List, Dict, Any, Union, Optional

from prometheus_client.parser import text_string_to_metric_families

from pypergraph.core.cross_platform.di.rest_client import RESTClient, HttpxClient
from pypergraph.core.cross_platform.rest_api_client import RestAPIClient
from pypergraph.network.models.allow_spend import AllowSpendReference, SignedAllowSpend
from pypergraph.network.models.network import PeerInfo
from pypergraph.network.models.token_lock import TokenLockReference, SignedTokenLock
from pypergraph.network.models.transaction import (
    PendingTransaction,
    SignedTransaction,
    TransactionReference,
)


def _handle_metrics(response: str) -> List[Dict[str, Any]]:
    """
    Parse Prometheus metrics output from a text response.

    :param response: Prometheus text output.
    :return: List of dictionaries with metric details.
    """
    metrics = []
    for family in text_string_to_metric_families(response):
        for sample in family.samples:
            metrics.append(
                {
                    "name": sample[0],
                    "labels": sample[1],
                    "value": sample[2],
                    "type": family.type,
                }
            )
    return metrics


[docs] class L1Api: def __init__( self, host: str, client: Optional[RESTClient] = None, timeout: int = 25 ): if not host: logging.warning("L1Api | ML1 :: Layer 1 API object not set.") self._host = host self.client = client or HttpxClient(timeout=timeout)
[docs] def config(self, host: Optional[str] = None, client: Optional[RESTClient] = None): """Reconfigure the RestAPIClient.""" if host: self._host = host if client: self.client = client
async def _make_request( self, method: str, endpoint: str, params: Dict[str, Any] = None, payload: Dict[str, Any] = None, ) -> Union[Dict, List, str]: """ Helper function to create a new RestAPIClient instance and make a request. """ async with RestAPIClient(base_url=self._host, client=self.client) as client: return await client.request( method=method, endpoint=endpoint, params=params, payload=payload )
[docs] async def get_cluster_info(self) -> List[PeerInfo]: result = await self._make_request("GET", "/cluster/info") return PeerInfo.process_cluster_peers(data=result)
[docs] async def get_metrics(self) -> List[Dict[str, Any]]: """ Get metrics from the L1 endpoint. :return: Prometheus output as a list of dictionaries. """ response = await self._make_request("GET", "/metrics") return _handle_metrics(response)
[docs] async def get_last_reference(self, address: str) -> TransactionReference: result = await self._make_request( "GET", f"/transactions/last-reference/{address}" ) return TransactionReference(**result)
[docs] async def get_pending_transaction(self, hash: str) -> PendingTransaction: result = await self._make_request("GET", f"/transactions/{hash}") return PendingTransaction(**result)
[docs] async def post_transaction(self, tx: SignedTransaction): return await self._make_request( "POST", "/transactions", payload=tx.model_dump() )
[docs] async def get_allow_spend_last_reference(self, address: str) -> AllowSpendReference: result = await self._make_request( "GET", f"/allow-spends/last-reference/{address}" ) return AllowSpendReference(**result)
[docs] async def post_allow_spend(self, tx: SignedAllowSpend): result = await self._make_request( "POST", "/allow-spends", payload=tx.model_dump() ) return result
[docs] async def get_token_lock_last_reference(self, address: str) -> TokenLockReference: result = await self._make_request( "GET", f"/token-locks/last-reference/{address}" ) return TokenLockReference(**result)
[docs] async def post_token_lock(self, tx: SignedTokenLock): result = await self._make_request( "POST", "/token-locks", payload=tx.model_dump(by_alias=True) ) return result