import base64
import json
import random
from decimal import Decimal
from typing import Tuple, Callable, Optional, Union, Literal, Dict, Any
import base58
import eth_keyfile
from bip32utils import BIP32Key
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import (
decode_dss_signature,
encode_dss_signature,
Prehashed,
)
from cryptography.hazmat.backends import default_backend
import hashlib
import eth_utils
from pypergraph.core.constants import PKCS_PREFIX
from pypergraph.network.models.transaction import Transaction, TransactionReference
from .kryo import Kryo
from .bip_helpers.bip32_helper import Bip32Helper
from .bip_helpers.bip39_helper import Bip39Helper
from .utils import normalize_object, serialize_brotli
from .v3_keystore import V3KeystoreCrypto, V3Keystore
from ..core.constants import BIP_44_PATHS, SECP256K1_ORDER
MIN_SALT = int(Decimal("1e8"))
[docs]
class KeyStore:
"""
Methods dealing with keys.
"""
PERSONAL_SIGN_PREFIX = "\u0019Constellation Signed Message:\n"
DATA_SIGN_PREFIX = "\u0019Constellation Signed Data:\n"
[docs]
@staticmethod
def prepare_tx(
amount: int,
to_address: str,
from_address: str,
last_ref: TransactionReference,
fee: int = 0,
) -> Tuple[Transaction, str]:
"""
Prepare a new transaction.
:param amount: Amount to send.
:param to_address: Destination DAG address.
:param from_address: Source DAG address.
:param last_ref: Dictionary with keys: ordinal, hash.
:param fee: Transaction fee.
:return: TransactionV2 object, sha512hash, rle.
"""
if to_address == from_address:
raise ValueError(
"KeyStore :: An address cannot send a transaction to itself"
)
if int(amount) < 1e-8:
raise ValueError("KeyStore :: Send amount must be greater than 1e-8")
if fee < 0:
raise ValueError("KeyStore :: Send fee must be greater or equal to zero")
# Create transaction
tx = Transaction(
source=from_address,
destination=to_address,
amount=amount,
fee=fee,
parent=last_ref,
salt=MIN_SALT + int(random.getrandbits(48)),
)
# Get encoded transaction
encoded_tx = tx.encoded
kryo = Kryo()
serialized_tx = kryo.serialize(msg=encoded_tx, set_references=False)
hash_value = hashlib.sha256(bytes.fromhex(serialized_tx)).hexdigest()
return tx, hash_value
[docs]
def encode_data(
self,
msg: dict,
prefix: Union[bool, str] = True,
encoding: Optional[
Union[Literal["base64"], Callable[[dict], str], None]
] = None,
) -> str:
"""
Encode custom data transaction for signing or signature verification.
:param msg: Dictionary (the content of 'value' in a SignedTransaction).
:param prefix: Enable or disable the default prefix '\u0019Constellation Signed Data:\n' to the encoded msg.
:param encoding: Can be None (default), 'base64' or a custom encoding function.
:return: Encoded data transaction.
"""
self._remove_nulls(msg)
if encoding:
if callable(encoding):
# Use custom encoding function
msg = encoding(msg)
elif encoding == "base64":
# Used in the VOTING and NFT metagraph example
encoded = json.dumps(msg, separators=(",", ":"))
msg = base64.b64encode(encoded.encode()).decode()
else:
raise ValueError("KeyStore :: Not a valid encoding method.")
else:
# Default: used in the TO-DO, SOCIAL and WATER AND ENERGY metagraph examples
msg = json.dumps(msg, separators=(",", ":"))
if prefix is True:
msg = f"{self.DATA_SIGN_PREFIX}{len(msg)}\n{msg}"
elif isinstance(prefix, str):
msg = f"{prefix}{len(msg)}\n{msg}"
return msg
def _serialize_data(
self, encoded_msg: str, serialization: Optional[Callable] = None
):
"""
Could be a way to add extra customization but since netmet is working on a signature library... :)
"""
if callable(serialization):
return serialization(encoded_msg)
return encoded_msg.encode("utf-8")
def _remove_nulls(self, obj):
def process_value(value):
if value is None:
return None
if isinstance(value, list):
return [process_value(v) for v in value if process_value(v) is not None]
if isinstance(value, dict):
return self._remove_nulls(value)
return value
return {
k: process_value(v) for k, v in obj.items() if process_value(v) is not None
}
[docs]
def data_sign(
self,
private_key,
msg: dict,
prefix: Union[bool, str] = True,
encoding: Optional[
Union[Literal["base64"], Callable[[dict], str], None]
] = None,
) -> Tuple[str, str]:
"""
Encode message according to serializeUpdate on your template module l1.
:param private_key:
:param msg: Dictionary (the content of 'value' in a SignedTransaction).
:param prefix: Enable or disable the default prefix '\u0019Constellation Signed Data:\n' to the encoded msg or inject custom string.
:param encoding: Can be None (default), 'base64' or a custom encoding function.
:return: signature, transaction hash.
"""
# 1. The TO-DO, SOCIAL and WATER AND ENERGY template doesn't add the signing prefix, it only needs the transaction to be formatted as string without spaces and None values:
# # encoded = json.dumps(tx_value, separators=(',', ':'))
# signature, hash_ = keystore.data_sign(pk, encoded, prefix=False) # Default encoding = "hex"
# 2. The VOTING and NFT template does use the dag4JS dataSign (prefix=True), the encoding (before data_sign) is done first by stringifying, then converting to base64:
# # encoded = json.dumps(tx_value, separators=(',', ':'))
# # encoded = base64.b64encode(encoded.encode()).decode()
# signature, hash_ = keystore.data_sign(pk, tx_value, prefix=True, encoding="base64") # Default prefix is True
# 3. The TO-DO, SOCIAL and WATER AND ENERGY template doesn't add the signing prefix, it only needs the transaction to be formatted as string without spaces and None values:
# # encoded = json.dumps(tx_value, separators=(',', ':'))
# signature, hash_ = keystore.data_sign(pk, encoded, prefix=False) # Default encoding = "hex"
# X. Inject a custom encoding function:
# def encode(msg: dict):
# return json.dumps(tx_value, separators=(',', ':'))
#
# signature, hash_ = keystore.data_sign(pk, tx_value, prefix=False, encoding=encode)
""" Encode """
msg = self.encode_data(encoding=encoding, prefix=prefix, msg=msg)
""" Serialize """
serialized = self._serialize_data(msg)
hash_ = hashlib.sha256(serialized).hexdigest()
""" Sign """
signature = self.sign(private_key, hash_)
return signature, hash_
[docs]
def verify_data(
self,
public_key: str,
encoded_msg: str,
signature: str,
):
"""
Verify a signature using the `cryptography` library.
:param public_key: Public key in hex format (64-byte uncompressed, no 0x04 prefix).
:param encoded_msg: Original message string to verify.
:param signature: Canonical DER signature in hex.
:return: True if valid, False otherwise.
"""
# Step 1: Replicate message preprocessing
serialized = encoded_msg.encode("utf-8")
# Compute SHA256 hash of the serialized message
sha256_hash_hex = hashlib.sha256(serialized).hexdigest()
# Compute SHA512 digest of the hex string's UTF-8 bytes and truncate
sha512_digest = hashlib.sha512(sha256_hash_hex.encode("utf-8")).digest()[:32]
# Step 2: Load public key from hex
public_key_bytes = bytes.fromhex(public_key)
if len(public_key_bytes) == 65:
public_key_bytes = public_key_bytes[1:] # Remove 04
if len(public_key_bytes) != 64:
raise ValueError("Public key must be 64 bytes (uncompressed SECP256k1).")
# Split into x and y coordinates (32 bytes each)
x = int.from_bytes(public_key_bytes[:32], byteorder="big")
y = int.from_bytes(public_key_bytes[32:], byteorder="big")
# Create public key object
public_numbers = ec.EllipticCurvePublicNumbers(x, y, ec.SECP256K1())
public_key = public_numbers.public_key(default_backend())
# Step 3: Verify the signature
try:
public_key.verify(
bytes.fromhex(signature),
sha512_digest,
ec.ECDSA(Prehashed(hashes.SHA256())), # Treat digest as SHA256-sized
)
return True
except InvalidSignature:
return False
[docs]
def personal_sign(self, msg, private_key) -> str:
# TODO: How is this used?
message = f"{self.PERSONAL_SIGN_PREFIX}{len(msg)}\n{msg}"
return self.sign(private_key, message)
[docs]
def brotli_sign(self, public_key: str, private_key: str, body: dict):
normalized_msg = normalize_object(body)
serialized_tx = serialize_brotli(body)
msg_hash = hashlib.sha256(serialized_tx).hexdigest()
signature = self.sign(private_key, msg_hash)
return {
"value": normalized_msg,
"proofs": [{"id": public_key, "signature": signature}],
}
[docs]
@staticmethod
def sign(private_key: str, msg: str) -> str:
"""
Create transaction signature using the `cryptography` library.
:param private_key: Private key in hex format.
:param msg: Transaction message (string).
:return: Canonical DER signature in hex.
"""
# Convert hex private key to cryptography object
private_key_bytes = bytes.fromhex(private_key)
private_key_int = int.from_bytes(private_key_bytes, byteorder="big")
private_key = ec.derive_private_key(
private_key_int, ec.SECP256K1(), default_backend()
)
# Prehash message with SHA-512 and truncate to 32 bytes
msg_digest = hashlib.sha512(msg.encode("utf-8")).digest()[:32]
# Sign deterministically (RFC 6979) and enforce canonical form
signature = private_key.sign(
msg_digest, ec.ECDSA(Prehashed(hashes.SHA256()))
) # Prehashed for raw digest
# Decode signature to (r, s) and enforce canonical `s`
r, s = decode_dss_signature(signature)
if s > SECP256K1_ORDER // 2:
s = SECP256K1_ORDER - s
# Re-encode as canonical DER signature
canonical_signature = encode_dss_signature(r, s)
return canonical_signature.hex()
[docs]
@staticmethod
def verify(public_key: str, msg: str, signature: str) -> bool:
"""
Verify is the signature is valid.
:param public_key:
:param msg: Hex format
:param signature:
:return: True or False
"""
# Compute SHA512 digest of the hex string's UTF-8 bytes and truncate
sha512_digest = hashlib.sha512(msg.encode("utf-8")).digest()[:32]
# Step 2: Load public key from hex
public_key_bytes = bytes.fromhex(public_key)
if len(public_key_bytes) == 65:
public_key_bytes = public_key_bytes[1:] # Remove 04
if len(public_key_bytes) != 64:
raise ValueError("Public key must be 64 bytes (uncompressed SECP256k1).")
# Split into x and y coordinates (32 bytes each)
x = int.from_bytes(public_key_bytes[:32], byteorder="big")
y = int.from_bytes(public_key_bytes[32:], byteorder="big")
# Create public key object
public_numbers = ec.EllipticCurvePublicNumbers(x, y, ec.SECP256K1())
public_key = public_numbers.public_key(default_backend())
# Step 3: Verify the signature
try:
public_key.verify(
bytes.fromhex(signature),
sha512_digest,
ec.ECDSA(Prehashed(hashes.SHA256())), # Treat digest as SHA256-sized
)
return True
except InvalidSignature:
return False
[docs]
@staticmethod
def validate_address(address: str) -> bool:
"""
Returns True if DAG address is valid, False if invalid.
:param address: DAG address.
:return: Boolean value.
"""
if not address:
return False
valid_len = len(address) == 40
valid_prefix = address.startswith("DAG")
valid_parity = address[3].isdigit() and 0 <= int(address[3]) < 10
base58_part = address[4:]
valid_base58 = (
len(base58_part) == 36
and base58_part == base58.b58encode(base58.b58decode(base58_part)).decode()
)
return valid_len and valid_prefix and valid_parity and valid_base58
[docs]
@staticmethod
def validate_mnemonic(phrase: str) -> bool:
"""
Returns True is phrase is valid, False if invalid.
:param phrase: String of words (default: 12).
:return: Boolean value.
"""
return Bip39Helper.validate_mnemonic(mnemonic_phrase=phrase)
[docs]
@staticmethod
def generate_mnemonic() -> str:
"""
:return: 12 word mnemonic phrase
"""
bip39 = Bip39Helper()
return bip39.mnemonic()
[docs]
def generate_private_key(self) -> str:
"""
Generates private key.
:return: Private key hex.
"""
return (
ec.generate_private_key(curve=ec.SECP256K1(), backend=default_backend())
.private_numbers()
.private_value.to_bytes(32, byteorder="big")
.hex()
)
[docs]
@staticmethod
def validate_private_key_keystore(data: dict) -> bool:
if not data:
return False
crypto = data.get("crypto", {})
kdfparams = crypto.get("kdfparams", {})
return all(
key in kdfparams and kdfparams[key] is not None
for key in ("salt", "n", "r", "p", "dklen")
)
[docs]
@staticmethod
async def encrypt_phrase(phrase: str, password: str) -> V3Keystore:
"""
Can be used to encrypt the phrase using password.
:param phrase:
:param password:
:return:
"""
return await V3KeystoreCrypto.encrypt_phrase(phrase=phrase, password=password)
[docs]
@staticmethod
async def decrypt_phrase(keystore: V3Keystore, password: str) -> str:
"""
Can be used to decrypt the phrase using password.
:param keystore:
:param password:
:return:
"""
return await V3KeystoreCrypto.decrypt_phrase(
keystore=keystore, password=password
)
[docs]
def encrypt_private_key(
self, password: str, private_key: Optional[str] = None
) -> Dict[str, Any]:
"""
Can be stored (written to disk) and transferred.
:param private_key:
:param password:
:return: Dictionary, use json.dumps()
"""
private_key = private_key or self.generate_private_key()
return eth_keyfile.create_keyfile_json(
private_key=bytes.fromhex(private_key),
password=password.encode("utf-8"), # This is right; should be bytes.
kdf="scrypt",
)
[docs]
def decrypt_private_key(self, data: dict, password: str):
if self.validate_private_key_keystore(data):
wallet = eth_keyfile.decode_keyfile_json(
raw_keyfile_json=data,
password=password.encode("utf-8"), # This is right; should be bytes.
)
return wallet.hex()
[docs]
@staticmethod
def get_master_key_from_mnemonic(
phrase: str, derivation_path: str = BIP_44_PATHS.CONSTELLATION_PATH.value
):
"""
Master key can be used to derive HD keys.
:param phrase:
:param derivation_path:
:return:
"""
bip32 = Bip32Helper()
return bip32.get_master_key_from_mnemonic(phrase, path=derivation_path)
[docs]
@staticmethod
def derive_account_from_master_key(master_key: BIP32Key, index: int) -> str:
"""
Derive HD private key from master key.
:param master_key:
:param index:
:return:
"""
account_key = master_key.ChildKey(index)
return account_key.PrivateKey().hex()
[docs]
@staticmethod
def get_extended_private_key_from_mnemonic(phrase: str):
# Extended keys can be used to derive child keys
bip39 = Bip39Helper()
bip32 = Bip32Helper()
if bip39.validate_mnemonic(phrase):
seed_bytes = bip39.get_seed_from_mnemonic(phrase)
root_key = bip32.get_root_key_from_seed(seed_bytes)
return root_key.ExtendedKey()
[docs]
@staticmethod
def get_private_key_from_mnemonic(
phrase: str, derivation_path=BIP_44_PATHS.CONSTELLATION_PATH.value
) -> str:
"""
Get private key from phrase. Returns the first account.
:param phrase:
:param derivation_path:
:return: Private key as hexadecimal string
"""
bip32 = Bip32Helper()
bip39 = Bip39Helper()
seed = bip39.get_seed_from_mnemonic(phrase)
private_key = bip32.get_private_key_from_seed(seed=seed, path=derivation_path)
return private_key.hex()
[docs]
@staticmethod
def get_public_key_from_private(private_key: str) -> str:
"""
:param private_key:
:return: Public key (Node ID)
"""
bip32 = Bip32Helper()
return bip32.get_public_key_from_private_hex(
private_key=bytes.fromhex(private_key)
)
[docs]
@staticmethod
def get_dag_address_from_public_key(public_key: str) -> str:
"""
:param public_key: The private key as a hexadecimal string.
:return: The DAG address corresponding to the public key (node ID).
"""
# TODO: Use utils.py
if len(public_key) == 128:
public_key = PKCS_PREFIX + "04" + public_key
elif len(public_key) == 130 and public_key[:2] == "04":
public_key = PKCS_PREFIX + public_key
else:
raise ValueError("KeyStore :: Not a valid public key.")
public_key = hashlib.sha256(bytes.fromhex(public_key)).hexdigest()
public_key = base58.b58encode(bytes.fromhex(public_key)).decode()
public_key = public_key[len(public_key) - 36 :]
check_digits = "".join([char for char in public_key if char.isdigit()])
check_digit = 0
for n in check_digits:
check_digit += int(n)
if check_digit >= 9:
check_digit = check_digit % 9
address = f"DAG{check_digit}{public_key}"
return address
[docs]
def get_dag_address_from_private_key(self, private_key: str):
public_key = self.get_public_key_from_private(private_key=private_key)
return self.get_dag_address_from_public_key(public_key=public_key)
[docs]
@staticmethod
def get_eth_address_from_public_key(public_key: str) -> str:
eth_address = eth_utils.keccak(bytes.fromhex(public_key))[-20:]
return "0x" + eth_address.hex()
[docs]
def get_eth_address_from_private_key(self, private_key: str) -> str:
public_key = self.get_public_key_from_private(private_key=private_key)[
2:
] # Removes the 04 prefix from public key
return self.get_eth_address_from_public_key(public_key=public_key)