[docs]classKeyringMonitor:def__init__(self,keyring_manager:Optional[KeyringManager]=None):self._scheduler=AsyncIOScheduler(asyncio.get_event_loop())self._keyring_manager=keyring_managerorKeyringManager()defevent_handler(event):"""Handles incoming keyring events."""ifnotisinstance(event,dict):logging.warning(f"⚠️ Unexpected event format: {event}")returnevent_type=event.get("type")ifevent_type=="lock":logging.debug("🔒 Vault locked!")elifevent_type=="unlock":logging.debug("🔓 Vault unlocked!")elifevent_type=="account_update":logging.info("🔄 Account updated:",event["data"])elifevent_type=="removed_account":logging.info("❌ Account removed:",event["data"])elifevent_type=="state_update":logging.debug(f"⚡ State updated, has {len(event['data']['wallets'])} wallet(s): {event['data']}")else:logging.warning(f"⚠️ Unknown event type: {event_type}")defstate_handler(state):"""Handles state changes."""# print(f"Wallet {'unlocked' if state['is_unlocked'] else 'locked'}: {len(state['wallets'])} wallets present")passdefsafe_event_processing(event):"""Processes an event safely, catching errors per event."""try:event_handler(event)returnof(event)# Ensure an observable is returnedexceptExceptionase:logging.error(f"🚨 Error processing event {event}: {e}",exc_info=True)# return of(None) # Send placeholder down the linereturnempty()# End the current stream entirely# Subscribing to state updatesself._keyring_manager._state_subject.pipe(ops.observe_on(self._scheduler),ops.distinct_until_changed(),ops.retry(3),# Retry on transient errorsops.catch(lambdae,src:of(None)),# Keep the stream alive after retries fail).subscribe(on_next=state_handler)# Subscribing to events safelyself._keyring_manager._event_subject.pipe(ops.observe_on(self._scheduler),ops.flat_map(safe_event_processing),# Ensures event processing continues).subscribe()
# Running the setup
[docs]asyncdefmain():keyring=KeyringManager(storage_file_path="key_storage.json")# monitor = KeyringMonitor(keyring)keyring._event_subject.on_next({"invalid":"error"})# Logs warning but doesn't crashawaitkeyring.login("super_S3cretP_Asswo0rd")awaitkeyring.logout()