Source code for pypergraph.keystore.v3_keystore

import asyncio
import os
import uuid
import hashlib
import json

import eth_keyfile
from eth_hash.backends.pycryptodome import keccak256
from typing_extensions import TypedDict, NotRequired

from concurrent.futures import ThreadPoolExecutor
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from mnemonic import Mnemonic

_executor = ThreadPoolExecutor(max_workers=4)  # Adjust as needed


# Type definitions for type hinting
[docs] class KDFParamsPhrase(TypedDict): prf: str dklen: int salt: str c: int
[docs] class CipherParams(TypedDict): iv: str
[docs] class CryptoStruct(TypedDict): cipher: str ciphertext: str cipherparams: CipherParams kdf: str kdfparams: KDFParamsPhrase mac: str
[docs] class V3Keystore(TypedDict): crypto: CryptoStruct id: str version: int meta: NotRequired[str]
ENCRYPT = { "cipher": "aes-128-ctr", "kdf": "pbkdf2", "prf": "hmac-sha256", "dklen": 32, "c": 262144, "hash": hashes.SHA256().name, }
[docs] def type_check_jphrase(keystore: V3Keystore) -> bool: params = keystore.get("crypto", {}).get("kdfparams") if params and "salt" in params and "c" in params and "dklen" in params: return True raise TypeError("V3Keystore :: Invalid JSON Keystore format.")
[docs] def blake256(data: bytes) -> str: return hashlib.blake2b(data, digest_size=hashlib.blake2b().digest_size).hexdigest()
[docs] async def pbkdf2_async( passphrase: bytes, salt: bytes, iterations: int, keylen: int, digest: str ) -> bytes: loop = asyncio.get_running_loop() return await loop.run_in_executor( _executor, hashlib.pbkdf2_hmac, digest, passphrase, salt, iterations, keylen )
[docs] class V3KeystoreCrypto:
[docs] @staticmethod async def encrypt_phrase(phrase: str, password: str) -> V3Keystore: if not isinstance(phrase, str) or not isinstance(password, str): raise TypeError( "V3KeystoreCrypto :: Both phrase and password must be strings." ) mnemo = Mnemonic("english") if not mnemo.check(phrase): raise TypeError("V3KeystoreCrypto :: Invalid BIP39 phrase.") keystore_id = str(uuid.uuid4()) salt = os.urandom(32) iv = os.urandom(16) phrase_bytes = phrase.encode("utf-8") password_bytes = password.encode("utf-8") # Key derivation derived_key = await pbkdf2_async( password_bytes, salt, ENCRYPT["c"], ENCRYPT["dklen"], ENCRYPT["hash"] ) # AES-128-CTR encryption cipher = Cipher( algorithms.AES(derived_key[:16]), modes.CTR(iv), backend=default_backend() ) encryptor = cipher.encryptor() ciphertext = encryptor.update(phrase_bytes) + encryptor.finalize() # MAC calculation mac_data = derived_key[16:32] + ciphertext mac = keccak256(mac_data).hex() # Build the keystore structure return V3Keystore( **{ "crypto": { "cipher": ENCRYPT["cipher"], "ciphertext": ciphertext.hex(), "cipherparams": {"iv": iv.hex()}, "kdf": ENCRYPT["kdf"], "kdfparams": { "prf": ENCRYPT["prf"], "dklen": ENCRYPT["dklen"], "salt": salt.hex(), "c": ENCRYPT["c"], }, "mac": mac, }, "id": keystore_id, "version": 3, "meta": "stardust-collective/pypergraph", } )
[docs] @staticmethod async def decrypt_phrase(keystore: V3Keystore, password: str) -> str: type_check_jphrase(keystore) crypto = keystore["crypto"] kdfparams = crypto["kdfparams"] password_bytes = password.encode("utf-8") salt = bytes.fromhex(kdfparams["salt"]) ciphertext = bytes.fromhex(crypto["ciphertext"]) iv = bytes.fromhex(crypto["cipherparams"]["iv"]) # Key derivation derived_key = await pbkdf2_async( password_bytes, salt, kdfparams["c"], kdfparams["dklen"], ENCRYPT["hash"] ) # MAC verification mac_data = derived_key[16:32] + ciphertext calculated_mac = keccak256(mac_data).hex() if calculated_mac != crypto["mac"]: raise ValueError("V3KeystoreCrypto :: Invalid password.") try: # AES decryption cipher = Cipher( algorithms.AES(derived_key[:16]), modes.CTR(iv), backend=default_backend(), ) decryptor = cipher.decryptor() phrase_bytes = decryptor.update(ciphertext) + decryptor.finalize() return phrase_bytes.decode("utf-8") except Exception as e: raise ValueError(f"V3KeystoreCrypto :: Decryption failed: {str(e)}")
# Example usage
[docs] async def main(): phrase = ( "legal winner thank year wave sausage worth useful legal winner thank yellow" ) password = "securepassword123" # Encryption encrypted = await V3KeystoreCrypto.encrypt_phrase(phrase, password) print("Encrypted keystore:", json.dumps(encrypted, indent=2)) # Decryption decrypted_eth_keyfile = eth_keyfile.decode_keyfile_json( encrypted, "securepassword123".encode("utf-8") ) print("ETH Keyfile decrypted phrase:", decrypted_eth_keyfile) decrypted = await V3KeystoreCrypto.decrypt_phrase(encrypted, password) print("\nDecrypted phrase:", decrypted)
if __name__ == "__main__": asyncio.run(main())