Source code for pypergraph.account.metagraph_client

import asyncio
from datetime import datetime

from typing import Any, Dict, List, Optional, Union

from pypergraph.network.shared.operations import allow_spend, token_lock
from pypergraph.network.models.transaction import (
    SignedTransaction,
    TransactionReference,
)
from pypergraph.network.metagraph_network import MetagraphTokenNetwork


[docs] class MetagraphTokenClient: """ Create a metagraph account from DagAccount. """ from pypergraph.account import DagAccount def __init__( self, account: DagAccount, metagraph_id: str, block_explorer_url: Optional[str] = None, l0_host: Optional[str] = None, currency_l1_host: Optional[str] = None, data_l1_host: Optional[str] = None, token_decimals: int = 8, ): self.account = account self.network = MetagraphTokenNetwork( metagraph_id=metagraph_id, l0_host=l0_host, currency_l1_host=currency_l1_host, data_l1_host=data_l1_host, network_id=account.network.connected_network.network_id, block_explorer=block_explorer_url or account.network.be_api._host, ) self.token_decimals = token_decimals @property def network_instance(self): return self.network @property def address(self): return self.account.address
[docs] async def get_transactions( self, limit: Optional[int] = None, search_after: Optional[str] = None ): """ Get paginated list of Block Explorer transaction objects. :param limit: Limit per page. :param search_after: Timestamp. :return: """ return await self.network.get_transactions_by_address( self.address, limit, search_after )
[docs] async def get_balance(self) -> int: """ Get Metagraph token balance for the active account. :return: Integer. """ return await self.get_balance_for(self.address)
[docs] async def get_balance_for(self, address: str) -> int: """ Get Metagraph token balance for the active account. :return: Integer. """ response = await self.network.get_address_balance(address) if response and isinstance(response.balance, (int, float)): return int(response.balance) return 0
[docs] async def get_fee_recommendation(self): # TODO: Fee api last_ref = await self.network.get_address_last_accepted_transaction_ref( self.address ) if not last_ref.get("hash"): return 0 last_tx = await self.network.get_pending_transaction(last_ref["hash"]) if not last_tx: return 0 return 1 / self.token_decimals
[docs] async def create_allow_spend( self, destination: str, amount: int, approvers: List[str], source: Optional[str] = None, fee: int = 0, currency_id: Optional[str] = None, valid_until_epoch: Optional[int] = None, ): """ Grants permission for another wallet or metagraph to spend up to a specified amount from the user’s wallet in a metagraph token or DAG. :param source: Wallet address signing the transaction. Address of logged in account, if left None :param destination: The destination address. This must be a Metagraph address. :param amount: The amount the destination address is allowed to spend. :param approvers: A list with single DAG address that can automatically approve the spend, can be Metagraph or wallet address. :param currency_id: The Metagraph ID used to identify the currency. For DAG, this parameter is left None. :param fee: Default 0. :param valid_until_epoch: The global snapshot epoch progress for which this is valid until. If not provided, the default value will be currentEpoch + 30. Minumum allowed value: currentEpoch + 5. Maximum allowed value: currentEpoch + 60. """ # TODO: check logged in and valid private key response = await allow_spend( destination=destination, amount=amount, approvers=approvers, source=source or self.account.key_trio.address, fee=fee, currency_id=currency_id or self.network.connected_network.metagraph_id, valid_until_epoch=valid_until_epoch, network=self.network, key_trio=self.account.key_trio, ) return response
[docs] async def create_token_lock( self, amount: int, fee: int = 0, unlock_epoch: int = None, source: str = None, currency_id: Optional[str] = None, ): """ Token locking is used for: Node collateral staking Delegated staking participation Governance requirements Time-based vesting or escrow models :param source: The wallet signing the transaction. The logged in account is the default if left empty. :param amount: The amount to lock. :param currency_id: The Metagraph identifier address for the currency to lock. Leave None, if currency is DAG. :param fee: The fee. Default when None is 0. :param unlock_epoch: The global snapshot epoch progress to unlock the tokens. If provided, must be greater than the current epoch. """ # TODO: check logged in and valid private key # this.account.assertAccountIsActive(); # this.account.assertValidPrivateKey(); # response = await token_lock( source=source, amount=amount, fee=fee, currency_id=currency_id or self.network.connected_network.metagraph_id, unlock_epoch=unlock_epoch, network=self.network, key_trio=self.account.key_trio, ) return response
[docs] async def transfer( self, to_address: str, amount: int, fee: int = 0, auto_estimate_fee: bool = False, ) -> Optional[Dict[str, Any]]: """ Transfer DAG from active account to another DAG address. Amount as integer with the number of decimals used by the Metagraph. :param to_address: DAG address. :param amount: Integer with the number of decimals used by the Metagraph. :param fee: Integer with the number of decimals used by the Metagraph. :param auto_estimate_fee: :return: Dictionary. """ # TODO: Fee api endpoint last_ref = await self.network.get_address_last_accepted_transaction_ref( self.address ) tx, hash_ = await self.account.generate_signed_transaction( to_address, amount, fee, last_ref ) tx_hash = await self.network.post_transaction(tx) if tx_hash: return { "timestamp": datetime.now(), "hash": tx_hash, "amount": amount, "receiver": to_address, "fee": fee, "sender": self.address, "ordinal": last_ref.ordinal, "pending": True, "status": "POSTED", }
[docs] async def wait_for_balance_change(self, initial_value: Optional[int] = None): """ Check if active account balance changes (around 2 minutes). :param initial_value: :return: False if check did not detect balance change, else True. """ if initial_value is None: initial_value = await self.get_balance() await self.wait(5) for _ in range(24): result = await self.get_balance() if result is not None and result != initial_value: return True await self.wait(5) return False
[docs] async def generate_batch_transactions( self, transfers: List[Dict[str, Any]], last_ref: Optional[Union[Dict[str, Any], TransactionReference]] = None, ): """ Takes a list of dictionaries and returns a list of signed transaction objects. :param transfers: List of dictionaries. :param last_ref: Lost hash and ordinal from DAG address. :return: """ if isinstance(last_ref, TransactionReference): last_ref = last_ref.model_dump() if not last_ref: last_ref = await self.network.get_address_last_accepted_transaction_ref( self.address ) last_ref = last_ref.model_dump() txns = [] for transfer in transfers: transaction, hash_ = await self.account.generate_signed_transaction( transfer["to_address"], transfer["amount"], transfer.get("fee", 0), last_ref, ) last_ref = {"hash": hash_, "ordinal": last_ref["ordinal"] + 1} txns.append(transaction) return txns
[docs] async def transfer_batch_transactions( self, transactions: List[SignedTransaction] ) -> List[Optional[str]]: """ Send a list of signed transaction objects from the active account. :param transactions: List of signed transactions. :return: List of transactions. """ hashes = [] for txn in transactions: tx_hash = await self.network.post_transaction(txn) hashes.append(tx_hash) return hashes
[docs] async def transfer_batch( self, transfers: List[Dict[str, Any]], last_ref: Optional[Union[Dict[str, Any], TransactionReference]] = None, ): """ Build and send a list of transactions from the active account. :param transfers: List of dictionaries. :param last_ref: Last ordinal and hash from active account. :return: List of transaction hashes. """ # Metagraph like PACA doesn't seem to support this, needs to wait for the transaction to appear txns = await self.generate_batch_transactions(transfers, last_ref) return await self.transfer_batch_transactions(txns)
[docs] async def wait(self, time_in_seconds: int = 5): """Wait for a number of seconds. :param time_in_seconds: Integer. """ await asyncio.sleep(time_in_seconds)