from abc import ABC, abstractmethod
from typing import Optional, List, Any, Dict
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
from eth_utils import keccak, to_checksum_address
from eth_keys import keys
from pydantic import BaseModel, Field, ConfigDict
[docs]
class EcdsaAccount(BaseModel, ABC):
tokens: Dict[str, dict] = Field(default_factory=dict)
wallet: Optional[ec.EllipticCurvePrivateKey] = None
assets: List[Any] = Field(default_factory=list)
bip44_index: Optional[int] = None
provider: Any = None
label: Optional[str] = None
model_config = ConfigDict(arbitrary_types_allowed=True)
@property
@abstractmethod
def decimals(self) -> int:
pass
@property
@abstractmethod
def network_id(self) -> str:
pass
@property
@abstractmethod
def has_token_support(self) -> bool:
pass
@property
@abstractmethod
def supported_assets(self) -> List[str]:
pass
[docs]
@abstractmethod
def verify_message(self, msg: str, signature: str, says_address: str) -> bool:
pass
[docs]
def get_decimals(self) -> int:
return self.decimals
[docs]
def get_label(self) -> str:
return self.label
[docs]
def create(self, private_key: Optional[str]):
if private_key:
# Convert hex private key to cryptography object
private_key_bytes = bytes.fromhex(private_key)
private_key_int = int.from_bytes(private_key_bytes, byteorder="big")
self.wallet = ec.derive_private_key(
private_value=private_key_int,
curve=ec.SECP256K1(),
backend=default_backend(),
)
else:
self.wallet = ec.generate_private_key(
curve=ec.SECP256K1(), backend=default_backend()
)
return self
[docs]
def save_token_info(self, address: str):
pass
[docs]
def get_web3_provider(self):
return self._provider
[docs]
def set_web3_provider(self, provider):
self._provider = provider
[docs]
def get_tokens(self) -> Optional[List[str]]:
return self.tokens.copy() if self.tokens else []
[docs]
def set_tokens(self, tokens: Dict[str, dict]):
if tokens:
self.tokens = tokens.copy()
[docs]
def get_bip44_index(self) -> Optional[int]:
return self.bip44_index
[docs]
def get_state(self) -> Dict[str, Any]:
result = {
"address": self.get_address(),
"supported_assets": self.supported_assets,
}
if self.label:
result["label"] = self.label
if self.tokens:
result["tokens"] = self.tokens
return result
[docs]
def get_network_id(self):
return self.network_id
[docs]
def serialize(self, include_private_key: bool = True) -> Dict[str, Any]:
result = {}
if include_private_key:
result["private_key"] = self.get_private_key()
if self.label:
result["label"] = self.label
if self.tokens:
result["tokens"] = self.tokens.copy()
if self.bip44_index is not None:
result["bip44_index"] = self.bip44_index
return result
[docs]
def deserialize(
self,
bip44_index: Optional[int] = None,
label: Optional[str] = None,
private_key: Optional[str] = None,
public_key: Optional[str] = None,
tokens: Optional[List[str]] = None,
):
self.label = label
self.bip44_index = bip44_index
self.tokens = tokens or self.tokens
if private_key:
# Convert hex private key to cryptography object
private_key_bytes = bytes.fromhex(private_key)
private_key_int = int.from_bytes(private_key_bytes, byteorder="big")
self.wallet = ec.derive_private_key(
private_value=private_key_int,
curve=ec.SECP256K1(),
backend=default_backend(),
)
else:
raise NotImplementedError(
"EcdsaAccount :: Wallet instance from public key isn't supported."
)
# TODO: This doesn't work since the library doens't seem to have any equivalent
return self
# TODO
# def sign_message(self, msg: str) -> str:
# private_key = self.get_private_key_buffer()
# msg_hash = eth_util.hash_personal_message(msg.encode())
#
# v, r, s = eth_util.ecsign(msg_hash, private_key)
#
# if not eth_util.is_valid_signature(v, r, s):
# raise ValueError("Sign-Verify failed")
#
# return eth_util.strip_hex_prefix(eth_util.to_rpc_sig(v, r, s))
[docs]
def recover_signed_msg_public_key(self, msg: str, signature: str) -> str:
# Compute the hash of the message in Ethereum's personal_sign format
msg_hash = keccak(text=f"\x19Ethereum Signed Message:\n{len(msg)}{msg}")
# Decode the signature (remove '0x' prefix if present)
signature_bytes = bytes.fromhex(
signature[2:] if signature.startswith("0x") else signature
)
v, r, s = signature_bytes[-1], signature_bytes[:32], signature_bytes[32:64]
# Recover the public key
try:
public_key = keys.ecdsa_recover(
msg_hash,
keys.Signature(
vrs=(v, int.from_bytes(r, "big"), int.from_bytes(s, "big"))
),
)
except Exception as e:
raise ValueError(f"EcdsaAccount :: Failed to recover public key: {e}")
# Return the public key in hexadecimal format
return public_key.to_hex()
[docs]
def get_address(self) -> str:
public_key = self.wallet.public_key()
public_bytes = public_key.public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
)
# Take keccak of everything except the first byte (0x04)
address = keccak(public_bytes[1:])[-20:]
return to_checksum_address("0x" + address.hex())
[docs]
def get_public_key(self) -> str:
public_key = self.wallet.public_key()
public_bytes = public_key.public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
)
return public_bytes.hex()
[docs]
def get_private_key(self) -> str:
private_bytes = self.wallet.private_numbers().private_value.to_bytes(32, "big")
return private_bytes.hex()
[docs]
def get_private_key_buffer(self):
return self.wallet.private_numbers().private_value.to_bytes(32, "big")