Monitor¶
Note
In the future, we aim to create a more flexible caching solution, similar to the one found in web3.py <https://github.com/ethereum/web3.py/blob/main/web3/utils/caching.py>_.
The Monitor class handles transaction caching and observes events emitted by Account instances.
account = DagAccount()
monitor = Monitor(account=account, state_storage_path="/full/path/state_storage.json")
Caching¶
Transactions¶
Transactions are added to cache like this:
account = DagAccount()
monitor = Monitor(account=account, state_storage_path="/full/path/state_storage.json")
pending_tx = await account.transfer(secret.to_address, 50000, 200000)
await monitor.add_to_mem_pool_monitor(pending_tx) # Add transaction to cache and monitor for state changes.
After adding a transaction to the cache, it will be monitored for state changes until the transaction is confirmed.
The caching mechanism relies on the StateStorageDB, which is also used for keyring storage operations (keyring data is registered with the key pypergraph-vault).
Transactions are cached with the key format "pypergraph-network-{network_id}-mempool" (e.g., "pypergraph-network-mainnet-mempool" for mainnet).
Memory Pool Store Content
{
"pypergraph-vault": {
"version": 1,
"ciphertext": "1ae7f92f88e5982b282e5cdbd783bc46aade307703b86bfde06b0802430ef9e65910c7b3bcc6028b1804e3086501a5ab12ddcf1b79a86dae1069ebe9ef66b88aabaa4ad6e9deeb5bb028c3d7276368949d18242a6fe05c064730054a3a8edabf8f0162237753b1376a3e984b1ecf96d37d59ad48ea0f21e47eb101161ede7ba2a7522a6ceba01ac493915eede03f5be3a81a29470faf56547defc1967b337114ada16c0403d5f9fd60741c10ac58c8d143de842ddf6d40c15ce01e1adf80d3dfdaf8b376cc00245b6f55b5960512e3319849be0ca6dc29887126103e3458a3d295f37d2ecf36935b087a6a6f54deaf41e6d9d932f8a8cc30e4d06401c2ea4bb7590edb7991ef5b48bc6eeeeec18098e58a3c1337994110f7898f65916d86ee94836314595c4928084bb78c830527bfe650d654b71a0c5254321fad810ccaee0a8d32928ef111330b43fcc08e67668ca251157923749b74e55d01cbc1184316c7fe171c1dfab6bca923b9b1d74da7fca52904d23823364de95e53b33bf7b1a11f4f1e2da7ebf2ef908a938f7f8a4f93c7ae5a516a4cd4454b11d7355bb99ea5d1f397c1ec970ef34736f2b8fc5eec9cfef184ac959e7dfecd3d98406cf2914f515afa8254b9a926226508281d112214d9222f805f8420847dbc2dec80dfb776d284da84a8106b8f6f5bdf370c67e6595dbdf9caba86",
"salt": "12b6621029ee7ce650c978c9b934215747771e8e6c37ad18085e84cf2bd451cd",
"nonce": "96bf4577c5d262226148a400",
"hmac": "1598d5d3c1d8844620e2dd302b819e3053c3739144a3851c2b8dd7b642f42c62",
"kdf_params": {
"algorithm": "argon2id",
"time_cost": 3,
"memory_cost": 65536,
"parallelism": 1
}
},
"pypergraph-network-testnet-mempool": [
{
"hash": "a123...",
"sender": "DAG0...",
"receiver": "DAG5...",
"amount": 50000,
"ordinal": 60,
"status": "GLOBAL_STATE_PENDING",
"pending": true,
"pending_msg": "Will confirm shortly...",
"timestamp": 1744052539670,
"fee": 200000
},
{
"hash": "f123...",
"sender": "DAG0...",
"receiver": "DAG1...",
"amount": 50000,
"ordinal": 61,
"status": "GLOBAL_STATE_PENDING",
"pending": true,
"pending_msg": "Will confirm shortly...",
"timestamp": 1744052589929,
"fee": 200000
}
],
"pypergraph-network-integrationnet-mempool": []
}
Event Observer¶
Events are emitted and observed using RxPy. There’s three ways methods for easily subscribe to network_changes,
account login and logout and transaction mem_pool updates.
Transaction Memory Pool¶
Transactions state changes are updated in DagWalletUpdate.
DagWalletUpdate
Key |
Type |
Description |
|---|---|---|
|
|
If a pending has confirmed. |
|
|
A list of all pending transactions. |
|
|
If any change occurred (e.g. transaction dropped, confirmed, etc.) |
Subscribe to mem_pool Updates
from pypergraph import DagAccount
account = DagAccount()
monitor = Monitor(account, state_storage_file_path="state_storage.json")
def safe_mem_pool_process_event(observable):
print(f"Monitor :: Transaction Memory Pool: {observable}")
return of(observable)
mem_pool_sub = monitor.subscribe_mem_pool(safe_mem_pool_process_event)
account.connect('integrationnet')
account.login_with_seed_phrase(secret.mnemo)
pending_tx = await account.transfer(secret.to_address, 50000, 200000)
await monitor.add_to_mem_pool_monitor(pending_tx)
await asyncio.sleep(90)
account.logout()
mem_pool_sub.dispose()
The pending transactions will be monitored until all is confirmed by the network.
Network Changes¶
The DagTokenNetwork has a class variable _network_change BehaviorSubject that emits updates with the following structure:
Observable Event
Key |
Value |
Description |
|---|---|---|
|
|
ID of the emitting Python module. |
|
|
The type of event emitted by the module. |
|
|
New network settings. |
Subscribe to Network Changes
from pypergraph import DagAccount
account = DagAccount()
monitor = Monitor(account, state_storage_file_path="state_storage.json")
def safe_network_process_event(observable: dict):
# Simulate event processing (replace with your logic)
print(f"Monitor :: Injected callable network event subscription: {observable}")
return of(observable) # Emit the event downstream
network_sub = monitor.subscribe_network(safe_network_process_event)
account.connect('integrationnet')
network_sub.dispose()
asyncio.sleep(1)
Account Events¶
Observable Event
Key |
Value |
Description |
|---|---|---|
|
|
ID of the emitting Python module. |
|
|
Subscribe to Account Events
from pypergraph import DagAccount, Monitor
account = DagAccount()
monitor = Monitor(account, state_storage_file_path="state_storage.json")
def safe_account_process_event(observable):
if observable["event"] == "logout":
print("Monitor :: Injected callable account event: logout signal received.")
elif observable["event"] == "login":
print("Monitor :: Injected callable account event: login signal received.")
else:
print(f"Monitor :: Unknown signal received by injected callable account event: {observable}")
return of(observable)
account_sub = monitor.subscribe_account(safe_account_process_event)
account.login_with_seed_phrase(secret.mnemo)
await asyncio.sleep(1) # Wait a bit for update
account.logout()
await asyncio.sleep(1)
account_sub.dispose()
Get Pending and Confirmed Transactions¶
The Monitor class provides methods to retrieve pending and confirmed transactions from the cache and the block explorer.
from pypergraph import DagAccount, Monitor
account = DagAccount()
account.login_with_seed_phrase(secret.mnemo)
monitor = Monitor(account, state_storage_file_path="state_storage.json")
lst = await monitor.get_latest_transactions(address=account.address, limit=10, search_after=None)