Monitor

Note

In the future, we aim to create a more flexible caching solution, similar to the one found in web3.py (https://github.com/ethereum/web3.py/blob/main/web3/utils/caching.py).

The Monitor class handles transaction caching and observes events emitted by Account instances.

from pypergraph import DagAccount, Monitor

account = DagAccount()
monitor = Monitor(account=account, state_storage_file_path="/full/path/state_storage.json")

Caching

Transactions

Transactions are added to the cache like this:

account = DagAccount()
monitor = Monitor(account=account, state_storage_file_path="/full/path/state_storage.json")
# Send an amount of 50000 with a fee of 200000; `secret` is a placeholder for your own address and seed phrase.
pending_tx = await account.transfer(secret.to_address, 50000, 200000)
# Add the transaction to the cache and monitor it for state changes.
await monitor.add_to_mem_pool_monitor(pending_tx)

After a transaction is added to the cache, it is monitored for state changes until it is confirmed. The caching mechanism relies on the StateStorageDB, which is also used for keyring storage operations (keyring data is stored under the key "pypergraph-vault"). Transactions are cached under a key of the form "pypergraph-network-{network_id}-mempool" (e.g., "pypergraph-network-mainnet-mempool" for mainnet).
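
The key format above can be reproduced with a small helper. This is a minimal sketch for illustration only; mempool_key is a hypothetical name and not part of the pypergraph API.

```python
def mempool_key(network_id: str) -> str:
    """Build the storage key under which a network's cached transactions live.

    Hypothetical helper: it only mirrors the documented key format.
    """
    return f"pypergraph-network-{network_id}-mempool"


print(mempool_key("mainnet"))  # pypergraph-network-mainnet-mempool
```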

Memory Pool Store Content
{
  "pypergraph-vault": {
    "version": 1,
    "ciphertext": "1ae7f92f88e5982b282e5cdbd783bc46aade307703b86bfde06b0802430ef9e65910c7b3bcc6028b1804e3086501a5ab12ddcf1b79a86dae1069ebe9ef66b88aabaa4ad6e9deeb5bb028c3d7276368949d18242a6fe05c064730054a3a8edabf8f0162237753b1376a3e984b1ecf96d37d59ad48ea0f21e47eb101161ede7ba2a7522a6ceba01ac493915eede03f5be3a81a29470faf56547defc1967b337114ada16c0403d5f9fd60741c10ac58c8d143de842ddf6d40c15ce01e1adf80d3dfdaf8b376cc00245b6f55b5960512e3319849be0ca6dc29887126103e3458a3d295f37d2ecf36935b087a6a6f54deaf41e6d9d932f8a8cc30e4d06401c2ea4bb7590edb7991ef5b48bc6eeeeec18098e58a3c1337994110f7898f65916d86ee94836314595c4928084bb78c830527bfe650d654b71a0c5254321fad810ccaee0a8d32928ef111330b43fcc08e67668ca251157923749b74e55d01cbc1184316c7fe171c1dfab6bca923b9b1d74da7fca52904d23823364de95e53b33bf7b1a11f4f1e2da7ebf2ef908a938f7f8a4f93c7ae5a516a4cd4454b11d7355bb99ea5d1f397c1ec970ef34736f2b8fc5eec9cfef184ac959e7dfecd3d98406cf2914f515afa8254b9a926226508281d112214d9222f805f8420847dbc2dec80dfb776d284da84a8106b8f6f5bdf370c67e6595dbdf9caba86",
    "salt": "12b6621029ee7ce650c978c9b934215747771e8e6c37ad18085e84cf2bd451cd",
    "nonce": "96bf4577c5d262226148a400",
    "hmac": "1598d5d3c1d8844620e2dd302b819e3053c3739144a3851c2b8dd7b642f42c62",
    "kdf_params": {
      "algorithm": "argon2id",
      "time_cost": 3,
      "memory_cost": 65536,
      "parallelism": 1
    }
  },
  "pypergraph-network-testnet-mempool": [
    {
      "hash": "a123...",
      "sender": "DAG0...",
      "receiver": "DAG5...",
      "amount": 50000,
      "ordinal": 60,
      "status": "GLOBAL_STATE_PENDING",
      "pending": true,
      "pending_msg": "Will confirm shortly...",
      "timestamp": 1744052539670,
      "fee": 200000
    },
    {
      "hash": "f123...",
      "sender": "DAG0...",
      "receiver": "DAG1...",
      "amount": 50000,
      "ordinal": 61,
      "status": "GLOBAL_STATE_PENDING",
      "pending": true,
      "pending_msg": "Will confirm shortly...",
      "timestamp": 1744052589929,
      "fee": 200000
    }
  ],
  "pypergraph-network-integrationnet-mempool": []
}
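
To inspect a store like the one above outside of the Monitor, the JSON can be read with the standard library. The sketch below assumes the layout shown above; pending_by_network is a hypothetical helper, not a pypergraph function, and the sample data is a trimmed-down stand-in for a real state storage file.

```python
import json

KEY_PREFIX = "pypergraph-network-"
KEY_SUFFIX = "-mempool"


def pending_by_network(state: dict) -> dict:
    """Map each network ID to the hashes of its cached pending transactions."""
    result = {}
    for key, value in state.items():
        # Skip unrelated entries such as "pypergraph-vault".
        if not (key.startswith(KEY_PREFIX) and key.endswith(KEY_SUFFIX)):
            continue
        network_id = key[len(KEY_PREFIX):-len(KEY_SUFFIX)]
        result[network_id] = [tx["hash"] for tx in value if tx.get("pending")]
    return result


# Trimmed-down stand-in for the contents of state_storage.json.
sample = json.loads("""
{
  "pypergraph-vault": {"version": 1},
  "pypergraph-network-testnet-mempool": [
    {"hash": "a123...", "pending": true},
    {"hash": "f123...", "pending": true}
  ],
  "pypergraph-network-integrationnet-mempool": []
}
""")

print(pending_by_network(sample))
```

Matching on the key prefix and suffix keeps the vault entry out of the result without hard-coding the list of networks.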

Event Observer

Events are emitted and observed using RxPy. The Monitor provides three methods for subscribing to network changes, account login and logout events, and transaction mem_pool updates.


Transaction Memory Pool

Transaction state changes are reported in DagWalletUpdate.

DagWalletUpdate

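=====================  ========================  ==========================================================================
Key                    Type                      Description
=====================  ========================  ==========================================================================
pending_has_confirmed  bool, False (default)     True if a pending transaction has been confirmed.
trans_txs              List[PendingTransaction]  A list of all pending transactions.
tx_changed             bool, False (default)     True if any change occurred (e.g. a transaction was dropped or confirmed).
=====================  ========================  ==========================================================================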
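
As a reading aid, the fields in the table can be pictured as a small data class. This is a stand-in written for illustration, with transactions simplified to plain dicts; the actual DagWalletUpdate and PendingTransaction classes in pypergraph may be defined differently.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class WalletUpdateSketch:
    """Illustrative stand-in mirroring the documented DagWalletUpdate keys."""
    pending_has_confirmed: bool = False
    trans_txs: List[dict] = field(default_factory=list)
    tx_changed: bool = False


def summarize(update: WalletUpdateSketch) -> str:
    """Turn an update into a short status line."""
    if not update.tx_changed:
        return "no change"
    state = "confirmed" if update.pending_has_confirmed else "changed"
    return f"{state}, {len(update.trans_txs)} still pending"


print(summarize(WalletUpdateSketch()))
print(summarize(WalletUpdateSketch(True, [{"hash": "f123..."}], True)))
```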

Subscribe to mem_pool Updates

import asyncio

from rx import of  # RxPY 3; with RxPY 4 use "from reactivex import of"
from pypergraph import DagAccount, Monitor

account = DagAccount()
monitor = Monitor(account, state_storage_file_path="state_storage.json")

def safe_mem_pool_process_event(observable):
    # Called for every mem_pool update (replace with your own logic)
    print(f"Monitor :: Transaction Memory Pool: {observable}")
    return of(observable)  # Emit the event downstream

mem_pool_sub = monitor.subscribe_mem_pool(safe_mem_pool_process_event)
account.connect('integrationnet')
account.login_with_seed_phrase(secret.mnemo)
pending_tx = await account.transfer(secret.to_address, 50000, 200000)
await monitor.add_to_mem_pool_monitor(pending_tx)
await asyncio.sleep(90)  # Give the network time to confirm the transaction
account.logout()
mem_pool_sub.dispose()

The pending transactions will be monitored until all of them are confirmed by the network.
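
The example above waits a fixed 90 seconds. One possible alternative is to wait on an asyncio.Event that the subscription callback sets once an update reports no remaining pending transactions. The sketch below is self-contained and simulates the update instead of using a real Monitor. It assumes that an update with an empty trans_txs means everything has settled, that updates can be read like a dict with the DagWalletUpdate keys, and that the callback runs on the event loop thread; none of this is confirmed by the source.

```python
import asyncio


async def wait_until_settled(all_confirmed: asyncio.Event, timeout: float) -> bool:
    """Return True if the event was set before the timeout, else False."""
    try:
        await asyncio.wait_for(all_confirmed.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


async def demo() -> bool:
    all_confirmed = asyncio.Event()

    def on_update(update: dict) -> None:
        # Assumed shape: a dict carrying the DagWalletUpdate keys.
        if update["tx_changed"] and not update["trans_txs"]:
            all_confirmed.set()

    # Simulate the monitor reporting that nothing is pending any more.
    asyncio.get_running_loop().call_later(
        0.01,
        on_update,
        {"pending_has_confirmed": True, "trans_txs": [], "tx_changed": True},
    )
    return await wait_until_settled(all_confirmed, timeout=1.0)


print(asyncio.run(demo()))
```

With a real subscription, the callback would still return of(update) as in the example above; the timeout keeps the program from waiting forever if a transaction is never confirmed.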


Network Changes

DagTokenNetwork has a class variable, _network_change, which is a BehaviorSubject that emits updates with the following structure:

Observable Event

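========  =====================================  ========================================
Key       Value                                  Description
========  =====================================  ========================================
"module"  "network"                              ID of the emitting Python module.
"type"    "network_change"                       The type of event emitted by the module.
"event"   {"network_id": "integrationnet", ...}  New network settings.
========  =====================================  ========================================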
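
A BehaviorSubject differs from a plain subject in that it remembers its latest value and hands it to every new subscriber straight away. The pure-Python stand-in below illustrates that behaviour with events shaped like the table above. It is not the RxPy implementation, and the initial "mainnet" value is made up for the example.

```python
from typing import Callable, List


class LatestValueSubject:
    """Tiny stand-in for a BehaviorSubject: it stores the latest value and
    hands it to each new subscriber immediately."""

    def __init__(self, initial: dict) -> None:
        self._value = initial
        self._observers: List[Callable[[dict], None]] = []

    def subscribe(self, observer: Callable[[dict], None]) -> None:
        self._observers.append(observer)
        observer(self._value)  # replay the current value on subscribe

    def on_next(self, value: dict) -> None:
        self._value = value
        for observer in self._observers:
            observer(value)


received = []
subject = LatestValueSubject(
    {"module": "network", "type": "network_change", "event": {"network_id": "mainnet"}}
)
subject.subscribe(lambda e: received.append(e["event"]["network_id"]))
subject.on_next(
    {"module": "network", "type": "network_change", "event": {"network_id": "integrationnet"}}
)
print(received)  # ['mainnet', 'integrationnet']
```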

Subscribe to Network Changes

import asyncio

from rx import of  # RxPY 3; with RxPY 4 use "from reactivex import of"
from pypergraph import DagAccount, Monitor

account = DagAccount()
monitor = Monitor(account, state_storage_file_path="state_storage.json")

def safe_network_process_event(observable: dict):
    # Simulate event processing (replace with your logic)
    print(f"Monitor :: Injected callable network event subscription: {observable}")
    return of(observable)  # Emit the event downstream

network_sub = monitor.subscribe_network(safe_network_process_event)
account.connect('integrationnet')
await asyncio.sleep(1)  # Wait a bit for the update before disposing
network_sub.dispose()

Account Events

Observable Event

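========  ===================  =====================================
Key       Value                Description
========  ===================  =====================================
"module"  "account"            ID of the emitting Python module.
"event"   "login" or "logout"  Whether the account logged in or out.
========  ===================  =====================================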

Subscribe to Account Events

import asyncio

from rx import of  # RxPY 3; with RxPY 4 use "from reactivex import of"
from pypergraph import DagAccount, Monitor

account = DagAccount()
monitor = Monitor(account, state_storage_file_path="state_storage.json")

def safe_account_process_event(observable):
    if observable["event"] == "logout":
        print("Monitor :: Injected callable account event: logout signal received.")
    elif observable["event"] == "login":
        print("Monitor :: Injected callable account event: login signal received.")
    else:
        print(f"Monitor :: Unknown signal received by injected callable account event: {observable}")
    return of(observable)  # Emit the event downstream

account_sub = monitor.subscribe_account(safe_account_process_event)
account.login_with_seed_phrase(secret.mnemo)
await asyncio.sleep(1)  # Wait a bit for the update
account.logout()
await asyncio.sleep(1)
account_sub.dispose()
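
Network and account events both carry a "module" key, so a single function can route either kind. The sketch below relies only on the event shapes documented above; describe_event is a hypothetical helper, not part of pypergraph.

```python
def describe_event(event: dict) -> str:
    """Return a short description of a network or account event."""
    module = event.get("module")
    if module == "network" and event.get("type") == "network_change":
        return f"network changed to {event['event']['network_id']}"
    if module == "account" and event.get("event") in ("login", "logout"):
        return f"account {event['event']}"
    return f"unrecognized event: {event!r}"


print(describe_event({"module": "account", "event": "login"}))
print(describe_event({
    "module": "network",
    "type": "network_change",
    "event": {"network_id": "integrationnet"},
}))
```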

Get Pending and Confirmed Transactions

The Monitor class provides methods to retrieve pending and confirmed transactions from the cache and the block explorer.

from pypergraph import DagAccount, Monitor

account = DagAccount()
account.login_with_seed_phrase(secret.mnemo)
monitor = Monitor(account, state_storage_file_path="state_storage.json")
# Fetch up to 10 of the latest transactions for the logged-in address.
latest_txs = await monitor.get_latest_transactions(address=account.address, limit=10, search_after=None)
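
The limit and search_after parameters suggest cursor-style paging. The sketch below shows how such paging is commonly driven. It runs against a fake in-memory data source, and it assumes that search_after takes the hash of the last transaction already seen, which the source does not confirm. fetch_all and fake_fetch_page are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional

FetchPage = Callable[[int, Optional[str]], Awaitable[List[dict]]]


async def fetch_all(fetch_page: FetchPage, limit: int) -> List[dict]:
    """Collect pages until one comes back shorter than `limit`."""
    collected: List[dict] = []
    cursor: Optional[str] = None
    while True:
        page = await fetch_page(limit, cursor)
        collected.extend(page)
        if len(page) < limit:
            return collected
        cursor = page[-1]["hash"]  # assumption: the cursor is the last hash seen


# Stand-in data source so the sketch runs without a network connection.
FAKE_TXS = [{"hash": f"h{i}"} for i in range(5)]


async def fake_fetch_page(limit: int, search_after: Optional[str]) -> List[dict]:
    start = 0
    if search_after is not None:
        start = [tx["hash"] for tx in FAKE_TXS].index(search_after) + 1
    return FAKE_TXS[start:start + limit]


print(len(asyncio.run(fetch_all(fake_fetch_page, limit=2))))  # 5
```

With a real Monitor, fetch_page would wrap a call to monitor.get_latest_transactions(address=..., limit=..., search_after=...), adapted to whatever the returned transaction objects actually look like.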