import logging
from datetime import datetime
from typing import Optional, Union, Tuple, List
from typing_extensions import Self
from rx.subject import Subject
from pypergraph.account.models.key_trio import KeyTrio
from pypergraph.network.shared.operations import allow_spend, token_lock
from pypergraph.keystore import KeyStore
from pypergraph.network import DagTokenNetwork
from pypergraph.network.models.transaction import (
TransactionStatus,
TransactionReference,
SignedTransaction,
SignatureProof,
PendingTransaction,
)
[docs]
class DagAccount:
def __init__(self):
    """
    Create a logged-out account with its own network instance and session-change subject.
    """
    # No keys are held until one of the login_with_* methods is called.
    self.key_trio: Optional[KeyTrio] = None
    # Network instance later reconfigured in place by connect().
    self.network: DagTokenNetwork = DagTokenNetwork()
    # Subject on which login/logout events are emitted.
    self._session_change: Subject = Subject()
[docs]
def connect(
self,
network_id: Optional[str] = "mainnet",
be_url: Optional[str] = None,
l0_host: Optional[str] = None,
cl1_host: Optional[str] = None,
) -> "DagAccount":
"""
Configure the DagAccount network instance. Parameter 'network_id' can be used to change between 'testnet',
'integrationnet' or 'mainnet', without further parameter settings. Default: 'mainnet'.
:param network_id: 'mainnet', 'integrationnet', 'testnet' or any string value.
:param be_url: Block Explorer host URL.
:param l0_host: Layer 0 host URL.
:param cl1_host: Currency Layer 1 host URL.
:return: Configured DagAccount object.
"""
# self.network = DagTokenNetwork() This will stop monitor from emitting network changes
self.network.config(network_id, be_url, l0_host, cl1_host)
return self
@property
def address(self):
"""
Requires login. Get the DagAccount DAG address.
See: login_with_seed_phrase(words=), login_with_private_key(private_key=) and login_with_public_key(public_key=)
:return: DAG address.
"""
if not self.key_trio or not self.key_trio.address:
raise ValueError(
"DagAccount :: Need to login before calling methods on DagAccount."
)
return self.key_trio.address
@property
def public_key(self):
"""
Requires login. Get the DagAccount public key.
See: login_with_seed_phrase(words=), login_with_private_key(private_key=) and login_with_public_key(public_key=)
This method does not support transfer of data or currency, due to missing private key.
:return: Public key.
"""
if not self.key_trio or not self.key_trio.public_key:
raise ValueError(
"DagAccount :: Need to login before calling methods on DagAccount."
)
return self.key_trio.public_key
@property
def private_key(self):
"""
Requires login. Get the DagAccount private key.
See: login_with_seed_phrase(words=), login_with_private_key(private_key=) and login_with_public_key(public_key=)
:return: Private key.
"""
if not self.key_trio or not self.key_trio.private_key:
raise ValueError(
"DagAccount :: Need to login before calling methods on DagAccount."
)
return self.key_trio.private_key
[docs]
def login_with_seed_phrase(self, phrase: str):
    """
    Login with a 12 word seed phrase. A seed phrase or private key login is needed
    before transferring data or currency.

    The private key is derived from the phrase by KeyStore and the login is then
    completed by login_with_private_key.

    :param phrase: 12 word seed phrase.
    :return:
    """
    self.login_with_private_key(KeyStore.get_private_key_from_mnemonic(phrase))
[docs]
def login_with_private_key(self, private_key: str):
    """
    Login with a private key. A seed phrase or private key login is needed
    before transferring data or currency.

    The public key is derived from the private key, and the DAG address from
    the public key; all three are then stored as the active key trio.

    :param private_key: Private key.
    :return:
    """
    derived_public_key = KeyStore.get_public_key_from_private(private_key)
    self._set_keys_and_address(
        private_key,
        derived_public_key,
        KeyStore.get_dag_address_from_public_key(derived_public_key),
    )
[docs]
def login_with_public_key(self, public_key: str):
    """
    Login with a public key only. The stored key trio has no private key, so this
    login does not enable the account to transfer data or currency.
    See: login_with_seed_phrase(words=) or login_with_private_key(private_key=)

    :param public_key: Public key from which the DAG address is derived.
    :return:
    """
    self._set_keys_and_address(
        None, public_key, KeyStore.get_dag_address_from_public_key(public_key)
    )
[docs]
def is_active(self):
"""
Check if any account is logged in.
:return:
"""
return self.key_trio is not None
[docs]
def logout(self):
"""
Logout the active account (delete key trio).
:return:
"""
self.key_trio = None
try:
self._session_change.on_next({"module": "account", "event": "logout"})
except Exception as e:
# logger.error(f"Error in network change handler: {e}")
print(f"Error in DagAccount session change handler: {e}")
def _set_keys_and_address(
    self, private_key: Optional[str], public_key: str, address: str
):
    """
    Store a new key trio for the account and emit a login event
    on the session-change subject.

    Notification is best effort: an exception raised while emitting the event
    is logged and not propagated, so the keys stay set either way.

    :param private_key: Private key, or None for a public-key-only login.
    :param public_key: Public key.
    :param address: DAG address.
    """
    self.key_trio = KeyTrio(
        private_key=private_key, public_key=public_key, address=address
    )
    try:
        self._session_change.on_next({"module": "account", "event": "login"})
    except Exception:
        # Report through logging (with traceback) instead of printing to stdout,
        # so library users control where the message goes.
        logging.exception("Error in DagAccount session change handler")
[docs]
async def get_balance(self):
"""
Get the balance for the active account.
:return:
"""
return await self.get_balance_for(self.address)
[docs]
async def get_balance_for(self, address: str):
"""
Get balance for a given DAG address. Returned as integer with 8 decimals.
:param address: DAG address.
:return: 0 or 8 decimal integer.
"""
response = await self.network.get_address_balance(address)
if response:
return int(response.balance)
return 0
[docs]
async def generate_signed_transaction(
self,
to_address: str,
amount: int,
fee: int = 0,
last_ref: Optional[Union[dict, TransactionReference]] = None,
) -> Tuple[SignedTransaction, str]:
"""
Generate a signed currency transaction from the currently active account.
:param to_address: Recipient DAG address.
:param amount: Integer with 8 decimals constituting the amount to transfer from the active account.
:param fee: (Optional) a minimum fee might be required if the active account is transaction limited.
:param last_ref: (Optional) The ordinal and hash of the last transaction from the active account.
:return: Signed transaction and the transaction hash.
"""
if isinstance(last_ref, dict):
last_ref = TransactionReference(**last_ref)
last_ref = (
last_ref
or await self.network.get_address_last_accepted_transaction_ref(
self.address
)
)
tx, hash_ = KeyStore.prepare_tx(
amount=amount,
to_address=to_address,
from_address=self.key_trio.address,
last_ref=last_ref,
fee=fee,
)
signature = KeyStore.sign(self.key_trio.private_key, hash_)
valid = KeyStore.verify(self.public_key, hash_, signature)
if not valid:
raise ValueError("Wallet :: Invalid signature.")
proof = SignatureProof(id=self.public_key[2:], signature=signature)
tx = SignedTransaction(value=tx, proofs=[proof])
return tx, hash_
[docs]
async def transfer(
self, to_address: str, amount: int, fee: int = 0, auto_estimate_fee=False
) -> Optional[PendingTransaction]:
"""
Build currency transaction, sign and transfer from the active account.
:param to_address: DAG address
:param amount: Integer with 8 decimals (e.g. 100000000 = 1 DAG)
:param fee: Integer with 8 decimals (e.g. 20000 = 0.0002 DAG)
:param auto_estimate_fee:
:return:
"""
# TODO: API fee estimate endpoint
last_ref = await self.network.get_address_last_accepted_transaction_ref(
self.address
)
signed_tx, hash_ = await self.generate_signed_transaction(
to_address, amount, fee, last_ref
)
tx_hash = await self.network.post_transaction(signed_tx)
if tx_hash:
pending_tx = PendingTransaction(
timestamp=int(datetime.now().timestamp() * 1000),
hash=tx_hash,
amount=amount,
receiver=to_address,
fee=fee,
sender=self.address,
ordinal=last_ref.ordinal,
pending=True,
status=TransactionStatus.POSTED,
)
return pending_tx
return None
[docs]
async def create_allow_spend(
self,
destination: str,
amount: int,
approvers: List[str],
source: Optional[str] = None,
fee: int = 0,
currency_id: Optional[str] = None,
valid_until_epoch: Optional[int] = None,
):
"""
Grants permission for another wallet or metagraph to spend up to a specified amount from the user’s wallet in a metagraph token or DAG.
:param source: Wallet address signing the transaction. Address of logged in account, if left None
:param destination: The destination address. This must be a Metagraph address.
:param amount: The amount the destination address is allowed to spend.
:param approvers: A list with single DAG address that can automatically approve the spend, can be Metagraph or wallet address.
:param currency_id: The Metagraph ID used to identify the currency. For DAG, this parameter is left None.
:param fee: Default 0.
:param valid_until_epoch: The global snapshot epoch progress for which this is valid until. If not provided, the default value will be currentEpoch + 30. Minumum allowed value: currentEpoch + 5. Maximum allowed value: currentEpoch + 60.
"""
# TODO: check logged in and valid private key
response = await allow_spend(
destination=destination,
amount=amount,
approvers=approvers,
source=source or self.key_trio.address,
fee=fee,
currency_id=currency_id or self.network.connected_network.metagraph_id,
valid_until_epoch=valid_until_epoch,
network=self.network,
key_trio=self.key_trio,
)
return response
[docs]
async def create_token_lock(
self,
amount: int,
fee: int = 0,
unlock_epoch: int = None,
source: Optional[str] = None,
currency_id: Optional[str] = None,
):
"""
Token locking is used for:
Node collateral staking
Delegated staking participation
Governance requirements
Time-based vesting or escrow models
:param source: The wallet signing the transaction. The logged in account is the default if left empty.
:param amount: The amount to lock.
:param currency_id: The Metagraph identifier address for the currency to lock. Leave None, if currency is DAG.
:param fee: The fee. Default when None is 0.
:param unlock_epoch: The global snapshot epoch progress to unlock the tokens. If provided, must be greater than the current epoch.
"""
# TODO: check logged in and valid private key
# this.account.assertAccountIsActive();
# this.account.assertValidPrivateKey();
#
response = await token_lock(
source=source or self.key_trio.address,
amount=amount,
fee=fee,
currency_id=currency_id,
unlock_epoch=unlock_epoch,
network=self.network,
key_trio=self.key_trio,
)
return response
[docs]
async def create_delegate_stake(self):
pass
[docs]
async def withdraw_delegate_stake(self):
pass
[docs]
async def set_node_parameters(self):
pass
[docs]
async def wait_for_checkpoint_accepted(self, hash: str):
"""
Check if transaction has been processed.
:param hash: Transaction hash.
:return: True if processed, False if not processed.
"""
txn = None
try:
txn = await self.network.get_pending_transaction(hash)
except Exception:
logging.debug("DagAccount :: No pending transaction.")
if txn and txn.get("status") == "Waiting":
return True
try:
await self.network.get_transaction(hash)
except Exception:
return False
return True
[docs]
async def wait_for_balance_change(self, initial_value: Optional[int] = None):
"""
Check if balance changed since initial value.
:param initial_value:
:return: True if balance changed, False if no change.
"""
if initial_value is None:
initial_value = await self.get_balance()
await self.wait(5)
for _ in range(24):
result = await self.get_balance()
if result is not None and result != initial_value:
return True
await self.wait(5)
return False
[docs]
async def generate_batch_transactions(
self,
transfers: List[dict],
last_ref: Optional[Union[dict, TransactionReference]] = None,
):
"""
Generate a batch of transactions to be transferred from the active account.
:param transfers: List of dictionaries, e.g. txn_data = [
{'to_address': to_address, 'amount': 10000000, 'fee': 200000},
{'to_address': to_address, 'amount': 5000000, 'fee': 200000},
{'to_address': to_address, 'amount': 2500000, 'fee': 200000},
{'to_address': to_address, 'amount': 1, 'fee': 200000}
]
:param last_ref: (Optional) Dictionary or with the account's last transaction hash and ordinal.
:return: List of transactions to be transferred (see: transfer_batch_transactions(transactions=))
"""
if isinstance(last_ref, dict):
last_ref = TransactionReference(**last_ref)
if not last_ref:
last_ref = await self.network.get_address_last_accepted_transaction_ref(
self.address
)
txns = []
for transfer in transfers:
transaction, hash_ = await self.generate_signed_transaction(
to_address=transfer["to_address"],
amount=transfer["amount"],
fee=transfer.get("fee", 0),
last_ref=last_ref,
)
last_ref = TransactionReference(ordinal=last_ref.ordinal + 1, hash=hash_)
txns.append(transaction)
return txns
[docs]
async def transfer_batch_transactions(self, transactions: List[SignedTransaction]):
"""
Send a batch (list) of signed currency transactions.
:param transactions: [SignedTransaction, ... ]
:return: List of transaction hashes.
"""
hashes = []
for txn in transactions:
hash_ = await self.network.post_transaction(txn)
hashes.append(hash_)
return hashes
[docs]
async def transfer_batch(
self,
transfers: List[dict],
last_ref: Optional[Union[dict, TransactionReference]] = None,
):
"""
Build and send $DAG currency transactions.
:param transfers: List of dictionaries, e.g. txn_data = [
{'to_address': to_address, 'amount': 10000000, 'fee': 200000},
{'to_address': to_address, 'amount': 5000000, 'fee': 200000},
{'to_address': to_address, 'amount': 2500000, 'fee': 200000},
{'to_address': to_address, 'amount': 1, 'fee': 200000}
]
:param last_ref: Dictionary with former ordinal and transaction hash, e.g.: {'ordinal': x, 'hash': y}.
:return:
"""
txns = await self.generate_batch_transactions(transfers, last_ref)
return await self.transfer_batch_transactions(txns)
[docs]
def get_eth_address(self) -> str:
# TODO
raise NotImplementedError("DagAccount :: Method not implemented.")
[docs]
async def wait(self, time: float = 5.0):
from asyncio import sleep
await sleep(time)