Source code for pypergraph.account.monitor

# TODO: wait_for
#       Storage path

import asyncio
import logging
import time
import json

# from dataclasses import dataclass
from typing import Dict, Union, List, Optional, Any, Callable

from pydantic import BaseModel, Field
from rx import operators as ops, of, empty, Observable
from rx.core.abc import Disposable
from rx.scheduler.eventloop import AsyncIOScheduler
from rx.subject import BehaviorSubject

from pypergraph.account.tests import secret
from pypergraph.core.cross_platform.state_storage_db import StateStorageDb
from pypergraph.network.models.network import NetworkInfo
from pypergraph.network.models.transaction import TransactionStatus, PendingTransaction
from pypergraph.network.models.block_explorer import Transaction

# Twelve minutes expressed in milliseconds (12 min * 60 s * 1000 ms). It is added
# to a pending transaction's millisecond timestamp and compared with the current
# time in milliseconds to decide when an unconfirmed transaction counts as dropped.
TWELVE_MINUTES = 12 * 60 * 1000


# @dataclass
# class WaitFor:
#     future: asyncio.Future
#     resolve: callable


class DagWalletMonitorUpdate(BaseModel):
    """
    Payload describing the outcome of one pass over the pending-transaction pool.

    ``Monitor`` publishes ``model_dump()`` of this model on its mem pool subject,
    so subscribers receive a plain dict with these three keys.
    """

    # True when at least one pending transaction was found by the network lookup.
    pending_has_confirmed: bool = False
    # Every pending transaction that was processed during the pass.
    trans_txs: List[PendingTransaction] = Field(default_factory=list)
    # True when the status of at least one pending transaction was modified.
    tx_changed: bool = False
class Monitor:
    """
    Monitors events and stores states.

    Pending transactions are kept in a ``StateStorageDb`` cache under a
    per-network key. :meth:`poll_pending_txs` re-checks them against the network
    and publishes the outcome on an internal ``BehaviorSubject``; the
    ``subscribe_*`` methods attach callbacks to the mem pool, account and
    network event streams.
    """

    # Seconds to wait before polling again while transactions are still pending.
    POLL_INTERVAL_SECONDS = 10

    def __init__(self, account, state_storage_file_path: str):
        """
        Monitors events and stores states.

        :param account: DagAccount()
        :param state_storage_file_path: Full path and filename to storage (with file extension).
        """
        self.account = account
        self._scheduler = AsyncIOScheduler(asyncio.get_event_loop())
        self._mem_pool_change = BehaviorSubject(DagWalletMonitorUpdate().model_dump())
        self.last_timer = 0.0
        self.pending_timer = 0.0
        # self.wait_for_map: Dict[str, WaitFor] = {}
        self.cache_utils = StateStorageDb(file_path=state_storage_file_path)
        self.cache_utils.set_prefix("pypergraph-")
        # Strong references to fire-and-forget tasks; asyncio itself only keeps
        # weak references, so an unreferenced task may be garbage collected.
        self._background_tasks = set()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _mem_pool_key(network_id: str) -> str:
        """Build the cache key under which the pending pool of a network is stored."""
        return f"network-{network_id.lower()}-mempool"

    @staticmethod
    def _to_pending_tx(raw) -> PendingTransaction:
        """Rebuild a ``PendingTransaction`` from a cached dict or JSON string."""
        if not isinstance(raw, dict):
            raw = json.loads(raw)
        return PendingTransaction(**raw)

    @staticmethod
    def _empty_result() -> Dict[str, Any]:
        """Return the result ``process_pending_txs`` reports when nothing was processed."""
        return {
            "pending_txs": [],
            "tx_changed": False,
            "trans_txs": [],
            "pending_has_confirmed": False,
            "pool_count": 0,
        }

    @staticmethod
    def _log_and_complete(error, source):
        """``ops.catch`` handler: log the stream error and complete the stream."""
        # Pass the exception itself: this handler does not run inside an
        # ``except`` block, so ``exc_info=True`` would have no traceback to log.
        logging.error(f"Monitor :: {error}", exc_info=error)
        return empty()

    def _subscribe(self, source, callback: Callable[[Any], Observable]) -> Disposable:
        """Route ``source`` through ``callback`` on the asyncio scheduler and subscribe."""
        return source.pipe(
            ops.observe_on(self._scheduler),
            ops.flat_map(callback),
            ops.catch(self._log_and_complete),
        ).subscribe()

    def _spawn(self, coro) -> asyncio.Task:
        """Schedule ``coro`` as a background task and hold a reference until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _poll_after(self, delay_seconds: float) -> None:
        """Sleep for ``delay_seconds`` and then run one more poll."""
        await asyncio.sleep(delay_seconds)
        await self.poll_pending_txs()

    # ------------------------------------------------------------ subscriptions

    def subscribe_mem_pool(self, callback: Callable[[Any], Observable]) -> Disposable:
        """
        Listen for mem pool updates.

        Event = DagWalletMonitorUpdate(...).model_dump(), i.e. a dict with the keys
        "pending_has_confirmed", "trans_txs" and "tx_changed".

        :param callback: Receives each event and returns an Observable.
        :return: Subscription; call ``dispose()`` on it to unsubscribe.
        """
        return self._subscribe(self._mem_pool_change, callback)

    def subscribe_account(self, callback: Callable[[Any], Observable]) -> Disposable:
        """
        Listen for account events like login and logout.

        Event = {"module": "account", "event": "logout"}

        :param callback: Receives each event and returns an Observable.
        :return: Subscription; call ``dispose()`` on it to unsubscribe.
        """
        return self._subscribe(self.account._session_change, callback)

    def subscribe_network(self, callback: Callable[[Any], Observable]) -> Disposable:
        """
        Listen for network events like network_change.

        Event = {
            "module": "network",
            "type": "network_change",
            "event": self.get_network(),
        }

        :param callback: Receives each event and returns an Observable.
        :return: Subscription; call ``dispose()`` on it to unsubscribe.
        """
        return self._subscribe(self.account.network._network_change, callback)

    # ----------------------------------------------------------- mem pool cache

    async def set_to_mem_pool_monitor(self, pool: List[PendingTransaction]):
        """
        Replace the cached pending pool of the current network with ``pool``.

        :param pool: Transactions to store; each one is stored as ``model_dump()``.
        """
        network_info = self.account.network.get_network()
        key = self._mem_pool_key(network_info["network_id"])
        await self.cache_utils.set(key, [tx.model_dump() for tx in pool])

    async def get_mem_pool_from_monitor(
        self, address: Optional[str] = None
    ) -> List[PendingTransaction]:
        """
        Load the cached pending pool of the current network.

        Cached entries may be dicts or JSON strings; both are accepted. If the
        cache cannot be read or decoded a warning is logged and an empty list
        is returned.

        :param address: Address to filter on; defaults to ``self.account.address``.
        :return: Transactions without a receiver, or whose receiver or sender
            equals ``address``. No filtering is applied when ``address`` is falsy.
        """
        address = address or self.account.address
        network_info = self.account.network.get_network()
        try:
            key = self._mem_pool_key(network_info["network_id"])
            cached = await self.cache_utils.get(key) or []
            txs = [self._to_pending_tx(raw) for raw in cached]
        except Exception as e:
            logging.warning(f"Monitor :: {e}, will return empty list.", exc_info=True)
            return []

        return [
            tx
            for tx in txs
            if not address
            or not tx.receiver
            or tx.receiver == address
            or tx.sender == address
        ]

    async def add_to_mem_pool_monitor(self, value: Union[PendingTransaction, str]):
        """
        Add a transaction to the cached pending pool and start polling.

        :param value: A ``PendingTransaction`` or a transaction hash string.
        :return: ``model_dump()`` of the tracked transaction.
        :raises ValueError: If ``value`` is neither a ``PendingTransaction`` nor a string.
        """
        network_info = NetworkInfo(**self.account.network.get_network())
        # Same lower-cased key as the getter/setter, so entries written here are
        # found again by get_mem_pool_from_monitor().
        key = self._mem_pool_key(network_info.network_id)

        # Get cached payload or initialize empty list. Entries written by this
        # method are JSON strings while set_to_mem_pool_monitor() writes dicts,
        # so decode with the helper that understands both.
        cached = await self.cache_utils.get(key)
        payload = [
            self._to_pending_tx(raw)
            for raw in (cached if isinstance(cached, list) else [])
        ]

        # Create transaction object
        if isinstance(value, str):
            tx = PendingTransaction(hash=value, timestamp=int(time.time() * 1000))
        elif isinstance(value, PendingTransaction):
            tx = value
        else:
            raise ValueError("Monitor :: Must be PendingTransaction or hash.")

        # Check for existing transaction
        if not any(p.hash == tx.hash for p in payload):
            payload.append(tx)
            serialized = [p.model_dump_json(indent=2) for p in payload]
            await self.cache_utils.set(key, serialized)
            self.last_timer = int(time.time() * 1000)
            self.pending_timer = 1000

        self._spawn(self.poll_pending_txs())
        return tx.model_dump()

    # ------------------------------------------------------------------ polling

    async def poll_pending_txs(self):
        """
        Run one pass over the pending pool and publish the outcome.

        While transactions remain pending the pool is written back to the cache
        and another pass is scheduled after ``POLL_INTERVAL_SECONDS``. When the
        pool was non-empty but nothing remains pending, the cache is cleared.
        The update is published immediately in every case. Errors are logged,
        never raised.
        """
        try:
            current_time = int(time.time() * 1000)
            # NOTE(review): pending_timer is only ever 0.0 or 1000 in this class,
            # so this guard can only trigger when last_timer lies in the future.
            # Confirm the intended interval.
            if current_time - self.last_timer + 1000 < self.pending_timer:
                logging.debug("Monitor :: Canceling extra timer.")
                return

            pending_result = await self.process_pending_txs()
            pending_txs = pending_result["pending_txs"]

            if pending_txs:
                await self.set_to_mem_pool_monitor(pending_txs)
                self.pending_timer = 1000
                self.last_timer = current_time
                # Schedule the next pass instead of sleeping here, so that the
                # update below reaches subscribers without the poll delay.
                self._spawn(self._poll_after(self.POLL_INTERVAL_SECONDS))
            elif pending_result["pool_count"] > 0:
                await self.set_to_mem_pool_monitor([])

            self._mem_pool_change.on_next(
                DagWalletMonitorUpdate(
                    tx_changed=pending_result["tx_changed"],
                    trans_txs=pending_result["trans_txs"],
                    pending_has_confirmed=pending_result["pending_has_confirmed"],
                ).model_dump()
            )
            logging.debug(
                f"Monitor :: Memory pool updated: {self._mem_pool_change.value}"
            )
        except Exception as e:
            logging.error(f"Monitor :: {e}", exc_info=True)

    async def process_pending_txs(self) -> Dict[str, Any]:
        """
        Check every cached pending transaction against the network.

        :return: Dict with the keys
            "pending_txs" (transactions that stay in the pool),
            "tx_changed" (a status was modified),
            "trans_txs" (every transaction that was processed),
            "pending_has_confirmed" (at least one lookup returned a transaction),
            "pool_count" (size of the pool that was loaded).
            If the pool cannot be loaded the error is logged and an empty result
            with ``pool_count == 0`` is returned, so callers can always
            subscript the return value.
        """
        try:
            pool = await self.get_mem_pool_from_monitor()
            trans_txs = []
            next_pool = []
            pending_has_confirmed = False
            tx_changed = False

            for pending_tx in pool:
                try:
                    tx_hash = pending_tx.hash

                    try:
                        be_tx = await self.account.network.get_transaction(tx_hash)
                        if be_tx:
                            pending_tx.timestamp = int(
                                be_tx.timestamp.timestamp() * 1000
                            )
                            pending_has_confirmed = True
                            tx_changed = True
                            pending_tx.pending = False
                            pending_tx.status = TransactionStatus.CONFIRMED.value
                            pending_tx.pending_msg = "Confirmed"
                            # if tx_hash in self.wait_for_map:
                            #     self.wait_for_map[tx_hash].resolve(True)
                            #     del self.wait_for_map[tx_hash]
                        elif (
                            pending_tx.status != "CHECKPOINT_ACCEPTED"
                            and pending_tx.status
                            != TransactionStatus.GLOBAL_STATE_PENDING.value
                            and pending_tx.timestamp + TWELVE_MINUTES
                            < int(time.time() * 1000)
                        ):
                            # Not found and older than twelve minutes: give up on it.
                            pending_tx.status = TransactionStatus.DROPPED.value
                            pending_tx.pending = False
                            tx_changed = True
                        else:
                            if (
                                pending_tx.status
                                != TransactionStatus.GLOBAL_STATE_PENDING.value
                            ):
                                pending_tx.status = (
                                    TransactionStatus.GLOBAL_STATE_PENDING.value
                                )
                                pending_tx.pending_msg = "Will confirm shortly..."
                                tx_changed = True
                            elif not pending_tx.status:
                                # NOTE(review): only reachable if the
                                # GLOBAL_STATE_PENDING value itself is falsy.
                                pending_tx.status = "UNKNOWN"
                                pending_tx.pending_msg = "Transaction not found..."
                            next_pool.append(pending_tx)
                    except Exception as e:
                        # NOTE(review): a transaction whose lookup raises is still
                        # reported in trans_txs but is not re-queued in next_pool.
                        # Confirm this is intended.
                        logging.error(f"Monitor :: {e}", exc_info=True)

                    trans_txs.append(pending_tx)
                except Exception as e:
                    logging.error(f"Monitor :: {e}", exc_info=True)

            return {
                "pending_txs": next_pool,
                "tx_changed": tx_changed,
                "trans_txs": trans_txs,
                "pending_has_confirmed": pending_has_confirmed,
                "pool_count": len(pool),
            }
        except Exception as e:
            logging.error(f"Monitor :: {e}", exc_info=True)
            return self._empty_result()

    # async def wait_for_transaction(self, hash: str) -> asyncio.Future:
    #     """Execute function after transaction has finished."""
    #     # TODO
    #     if hash not in self.wait_for_map:
    #         loop = asyncio.get_event_loop()
    #         future = loop.create_future()
    #         self.wait_for_map[hash] = WaitFor(
    #             future=future,
    #             resolve=lambda result: future.set_result(result)
    #         )
    #     return self.wait_for_map[hash].future

    def start_monitor(self):
        """Start polling the pending pool in a background task (needs a running loop)."""
        self._spawn(self.poll_pending_txs())

    async def get_latest_transactions(
        self,
        address: str,
        limit: Optional[int] = None,
        search_after: Optional[str] = None,
    ) -> List[Union[PendingTransaction, Transaction]]:
        """
        Return still-pending transactions followed by the network's transactions.

        :param address: Address whose transactions are requested from the network.
        :param limit: Passed through to ``get_transactions_by_address``.
        :param search_after: Passed through to ``get_transactions_by_address``.
        :return: Pending transactions first, then the transactions returned by
            the network (if any).
        """
        c_txs = await self.account.network.get_transactions_by_address(
            address, limit, search_after
        )
        pending_result = await self.process_pending_txs()
        pending_transactions = list(pending_result["pending_txs"])
        return pending_transactions + c_txs if c_txs else pending_transactions
async def main():
    """
    Manual end-to-end demo of ``Monitor``.

    Subscribes to mem pool, network and account events, connects to
    "integrationnet", logs in with the seed phrase from ``secret``, sends a
    transfer, tracks it in the mem pool monitor and prints the latest
    transactions. All subscriptions are disposed on exit, including when a
    step raises.
    """
    from pypergraph import DagAccount

    account = DagAccount()
    monitor = Monitor(account, state_storage_file_path="state_storage.json")

    def safe_network_process_event(observable: dict):
        """Print a network event and pass it downstream unchanged."""
        # Simulate event processing (replace with your logic)
        print(f"Monitor :: Injected callable network event subscription: {observable}")
        return of(observable)  # Emit the event downstream

    def safe_account_process_event(observable):
        """Print which account event arrived and pass it downstream unchanged."""
        if observable["event"] == "logout":
            print("Monitor :: Injected callable account event: logout signal received.")
        elif observable["event"] == "login":
            print("Monitor :: Injected callable account event: login signal received.")
        else:
            print(
                f"Monitor :: Unknown signal received by injected callable account event: {observable}"
            )
        return of(observable)

    def safe_mem_pool_process_event(observable):
        """Print a mem pool update and pass it downstream unchanged."""
        print(f"Observable: {observable}")
        return of(observable)

    mem_pool_sub = monitor.subscribe_mem_pool(safe_mem_pool_process_event)
    network_sub = monitor.subscribe_network(safe_network_process_event)
    # monitor.start_monitor()
    account_sub = monitor.subscribe_account(safe_account_process_event)

    try:
        account.connect("integrationnet")
        account.login_with_seed_phrase(secret.mnemo)
        pending_tx = await account.transfer(secret.to_address, 50000, 200000)
        await monitor.add_to_mem_pool_monitor(pending_tx)
        txs = await monitor.get_latest_transactions(address=account.address, limit=20)
        print(txs)
        network_sub.dispose()
        await asyncio.sleep(120)
        account.logout()
        await asyncio.sleep(1)
    finally:
        # Release the subscriptions even when login, transfer or monitoring fails.
        mem_pool_sub.dispose()
        account_sub.dispose()
        network_sub.dispose()
if __name__ == "__main__":
    # Script entry point: run the demo coroutine to completion.
    asyncio.run(main())