Skip to content

Index

GCPKmsAccount

Bases: BaseModel

Account implementation using Google Cloud KMS.

Source code in web3_google_hsm/accounts/gcp_kms_account.py
class GCPKmsAccount(BaseModel):
    """Account implementation using Google Cloud KMS."""

    # Public fields
    key_path: str = Field(default="")

    # Private attributes
    _client: kms.KeyManagementServiceClient = PrivateAttr()
    _cached_public_key: bytes | None = PrivateAttr(default=None)
    _settings: BaseConfig = PrivateAttr()

    def __init__(self, config: BaseConfig | None = None, credentials: dict | None = None, **data: Any):
        """
        Initialize GCP KMS Account with either config or credentials.
        If neither is provided, uses Google SDK default auth mechanism.

        Args:
            config: BaseConfig instance for environment-based configuration
            credentials: Dictionary containing GCP credentials
            **data: Additional data passed to BaseModel

        Raises:
            ValueError: If both config and credentials are provided
        """

        super().__init__(**data)

        if isinstance(credentials, dict):
            credentials, _ = load_credentials_from_dict(credentials)
        # Initialize client based on provided auth method
        self._client = (
            kms.KeyManagementServiceClient(credentials=credentials) if credentials else kms.KeyManagementServiceClient()  # type: ignore
        )

        # Initialize settings if config is provided, otherwise None
        self._settings = config or BaseConfig.from_env()

        # Set key path based on config or credentials
        self.key_path = self._get_key_version_path()

    def _get_key_version_path(self) -> str:
        """Get the full path to the key version in Cloud KMS."""
        return self._client.crypto_key_version_path(
            self._settings.project_id,
            self._settings.location_id,
            self._settings.key_ring_id,
            self._settings.key_id,
            "1",  # Using version 1
        )

    @property
    def public_key(self) -> bytes:
        """Get public key bytes from KMS."""
        if self._cached_public_key is None:
            response = self._client.get_public_key({"name": self.key_path})
            if not response.pem:
                msg = "No PEM data in response"
                raise ValueError(msg)

            self._cached_public_key = extract_public_key_bytes(response.pem)
        return self._cached_public_key

    @cached_property
    def address(self) -> ChecksumAddress:
        """Get Ethereum address derived from public key."""
        return to_checksum_address(keccak(self.public_key)[-20:].hex().lower())

    @classmethod
    def create_eth_key(
        cls,
        project_id: str,
        location_id: str,
        key_ring_id: str,
        key_id: str,
        retention_days: int = 365,
    ) -> kms.CryptoKey:
        """
        Creates a new Ethereum signing key in Cloud KMS backed by Cloud HSM.

        Args:
            project_id: Google Cloud project ID
            location_id: Cloud KMS location (e.g. 'us-east1')
            key_ring_id: ID of the Cloud KMS key ring
            key_id: ID of the key to create
            retention_days: Days to retain key versions before destruction (default: 365)

        Returns:
            CryptoKey: Created Cloud KMS key

        Raises:
            Exception: If key creation fails
        Reference:
            https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/kms/snippets/create_key_hsm.py
        """
        try:
            client = kms.KeyManagementServiceClient()
            key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)

            # Configure for Ethereum signing
            purpose = kms.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_SIGN
            algorithm = kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_SECP256K1_SHA256
            protection_level = kms.ProtectionLevel.HSM

            key = {
                "purpose": purpose,
                "version_template": {
                    "algorithm": algorithm,
                    "protection_level": protection_level,
                },
                "destroy_scheduled_duration": duration_pb2.Duration().FromTimedelta(
                    datetime.timedelta(days=retention_days)
                ),
            }

            return client.create_crypto_key(
                request={"parent": key_ring_name, "crypto_key_id": key_id, "crypto_key": key}
            )
        except Exception as e:
            msg = f"Failed to create key: {e}"
            raise Exception(msg) from e

    def _sign_raw_hash(self, msghash: bytes) -> bytes | None:
        """Sign a message hash using KMS."""
        try:
            response = self._client.asymmetric_sign(request={"name": self.key_path, "digest": {"sha256": msghash}})
            return response.signature
        except Exception as e:
            msg = f"Signing error: {e}"
            raise Exception(msg) from e

    def sign_message(self, message: str | bytes) -> Signature:
        """
        Sign a message with the GCP KMS key.

        Args:
            message: Message to sign (str or bytes)

        Returns:
            Signature: The v, r, s components of the signature

        Raises:
            TypeError: If message is not str or bytes
            ValueError: If message hash length is invalid
            SignatureError: If signature verification fails
            Exception: If signing fails

        Example:
            ```{ .python .copy }
                account = GCPKmsAccount()
                print(f"GCP KMS Account address: {account.address}")
                message = "Hello Ethereum!"
                # Sign the message
                signed_message = account.sign_message(message)
            ```
        """
        # Convert message to SignableMessage format
        if isinstance(message, str):
            if message.startswith("0x"):
                hash_message = encode_defunct(hexstr=message)
            else:
                hash_message = encode_defunct(text=message)
        elif isinstance(message, bytes):
            hash_message = encode_defunct(primitive=message)
        else:
            msg = f"Unsupported message type: {type(message)}"
            raise TypeError(msg)

        # Sign message hash
        msghash = _hash_eip191_message(hash_message)
        if len(msghash) != MSG_HASH_LENGTH:
            msg = "Invalid message hash length"
            raise ValueError(msg)

        der_signature = self._sign_raw_hash(msghash)
        if not der_signature:
            msg = "Failed to sign message"
            raise Exception(msg)

        # Try both v values (27 and 28) to find the correct one
        for v_value in (27, 28):
            sig_dict = convert_der_to_rsv(der_signature, v_value)
            signature = Signature(v=sig_dict["v"], r=sig_dict["r"], s=sig_dict["s"])

            # Verify the signature
            recovered = Account.recover_message(hash_message, vrs=(signature.v, signature.r, signature.s))

            if recovered.lower() == self.address.lower():
                return signature

        msg = "Failed to create valid signature"
        raise SignatureError(msg)

    def sign_transaction(self, transaction: Transaction) -> bytes | None:
        """
        Sign an EIP-155 transaction.

        Args:
            transaction: Transaction to sign

        Returns:
            bytes | None: Serialized signed transaction or None if signing fails
        """
        # Create unsigned transaction dictionary
        unsigned_tx = {
            "nonce": transaction.nonce,
            "gasPrice": transaction.gas_price,
            "gas": transaction.gas_limit,
            "to": transaction.to,
            "value": transaction.value,
            "data": transaction.data,
            "chainId": transaction.chain_id,
        }

        # Convert to UnsignedTransaction and get hash
        unsigned_tx_obj = serializable_unsigned_transaction_from_dict(unsigned_tx)  # type: ignore
        msg_hash = unsigned_tx_obj.hash()

        # Sign the transaction hash
        der_signature = self._sign_raw_hash(msg_hash)
        if not der_signature:
            return None

        # Calculate v value based on chain ID
        v_base = (2 * transaction.chain_id + 35) if transaction.chain_id else 27
        sig_dict = convert_der_to_rsv(der_signature, v_base)

        # Create RLP serializable fields
        rlp_data = [
            transaction.nonce,
            transaction.gas_price,
            transaction.gas_limit,
            bytes.fromhex(transaction.to[2:]),  # Convert address to bytes
            transaction.value,
            bytes.fromhex(transaction.data[2:] if transaction.data.startswith("0x") else transaction.data),
            sig_dict["v"],
            int.from_bytes(sig_dict["r"], "big"),
            int.from_bytes(sig_dict["s"], "big"),
        ]

        # RLP encode the transaction and ensure it returns bytes
        encoded_tx = cast(bytes, rlp.encode(rlp_data))

        # Verify the signature
        recovered = Account.recover_transaction(encoded_tx)
        if recovered.lower() != self.address.lower():
            # Try with v + 1
            rlp_data[6] = sig_dict["v"] + 1  # Update v value
            encoded_tx = cast(bytes, rlp.encode(rlp_data))

            # Verify again
            recovered = Account.recover_transaction(encoded_tx)
            if recovered.lower() != self.address.lower():
                msg = "Failed to create valid signature"
                raise SignatureError(msg)

        return encoded_tx

address cached property

Get Ethereum address derived from public key.

public_key property

Get public key bytes from KMS.

__init__(config=None, credentials=None, **data)

Initialize GCP KMS Account with either config or credentials. If neither is provided, uses Google SDK default auth mechanism.

Parameters:

Name Type Description Default
config BaseConfig | None

BaseConfig instance for environment-based configuration

None
credentials dict | None

Dictionary containing GCP credentials

None
**data Any

Additional data passed to BaseModel

{}

Raises:

Type Description
ValueError

If both config and credentials are provided

Source code in web3_google_hsm/accounts/gcp_kms_account.py
def __init__(self, config: BaseConfig | None = None, credentials: dict | None = None, **data: Any):
    """
    Initialize GCP KMS Account with either config or credentials.
    If neither is provided, uses Google SDK default auth mechanism.

    Args:
        config: BaseConfig instance for environment-based configuration
        credentials: Dictionary containing GCP credentials
        **data: Additional data passed to BaseModel

    Raises:
        ValueError: If both config and credentials are provided
    """

    super().__init__(**data)

    if isinstance(credentials, dict):
        credentials, _ = load_credentials_from_dict(credentials)
    # Initialize client based on provided auth method
    self._client = (
        kms.KeyManagementServiceClient(credentials=credentials) if credentials else kms.KeyManagementServiceClient()  # type: ignore
    )

    # Initialize settings if config is provided, otherwise None
    self._settings = config or BaseConfig.from_env()

    # Set key path based on config or credentials
    self.key_path = self._get_key_version_path()

create_eth_key(project_id, location_id, key_ring_id, key_id, retention_days=365) classmethod

Creates a new Ethereum signing key in Cloud KMS backed by Cloud HSM.

Parameters:

Name Type Description Default
project_id str

Google Cloud project ID

required
location_id str

Cloud KMS location (e.g. 'us-east1')

required
key_ring_id str

ID of the Cloud KMS key ring

required
key_id str

ID of the key to create

required
retention_days int

Days to retain key versions before destruction (default: 365)

365

Returns:

Name Type Description
CryptoKey CryptoKey

Created Cloud KMS key

Raises:

Type Description
Exception

If key creation fails

Reference: https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/kms/snippets/create_key_hsm.py

Source code in web3_google_hsm/accounts/gcp_kms_account.py
@classmethod
def create_eth_key(
    cls,
    project_id: str,
    location_id: str,
    key_ring_id: str,
    key_id: str,
    retention_days: int = 365,
) -> kms.CryptoKey:
    """
    Creates a new Ethereum signing key in Cloud KMS backed by Cloud HSM.

    Args:
        project_id: Google Cloud project ID
        location_id: Cloud KMS location (e.g. 'us-east1')
        key_ring_id: ID of the Cloud KMS key ring
        key_id: ID of the key to create
        retention_days: Days to retain key versions before destruction (default: 365)

    Returns:
        CryptoKey: Created Cloud KMS key

    Raises:
        Exception: If key creation fails
    Reference:
        https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/kms/snippets/create_key_hsm.py
    """
    try:
        client = kms.KeyManagementServiceClient()
        key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)

        # Configure for Ethereum signing
        purpose = kms.CryptoKey.CryptoKeyPurpose.ASYMMETRIC_SIGN
        algorithm = kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.EC_SIGN_SECP256K1_SHA256
        protection_level = kms.ProtectionLevel.HSM

        key = {
            "purpose": purpose,
            "version_template": {
                "algorithm": algorithm,
                "protection_level": protection_level,
            },
            "destroy_scheduled_duration": duration_pb2.Duration().FromTimedelta(
                datetime.timedelta(days=retention_days)
            ),
        }

        return client.create_crypto_key(
            request={"parent": key_ring_name, "crypto_key_id": key_id, "crypto_key": key}
        )
    except Exception as e:
        msg = f"Failed to create key: {e}"
        raise Exception(msg) from e

sign_message(message)

Sign a message with the GCP KMS key.

Parameters:

Name Type Description Default
message str | bytes

Message to sign (str or bytes)

required

Returns:

Name Type Description
Signature Signature

The v, r, s components of the signature

Raises:

Type Description
TypeError

If message is not str or bytes

ValueError

If message hash length is invalid

SignatureError

If signature verification fails

Exception

If signing fails

Example
    account = GCPKmsAccount()
    print(f"GCP KMS Account address: {account.address}")
    message = "Hello Ethereum!"
    # Sign the message
    signed_message = account.sign_message(message)
Source code in web3_google_hsm/accounts/gcp_kms_account.py
def sign_message(self, message: str | bytes) -> Signature:
    """
    Sign a message with the GCP KMS key.

    Args:
        message: Message to sign (str or bytes)

    Returns:
        Signature: The v, r, s components of the signature

    Raises:
        TypeError: If message is not str or bytes
        ValueError: If message hash length is invalid
        SignatureError: If signature verification fails
        Exception: If signing fails

    Example:
        ```{ .python .copy }
            account = GCPKmsAccount()
            print(f"GCP KMS Account address: {account.address}")
            message = "Hello Ethereum!"
            # Sign the message
            signed_message = account.sign_message(message)
        ```
    """
    # Convert message to SignableMessage format
    if isinstance(message, str):
        if message.startswith("0x"):
            hash_message = encode_defunct(hexstr=message)
        else:
            hash_message = encode_defunct(text=message)
    elif isinstance(message, bytes):
        hash_message = encode_defunct(primitive=message)
    else:
        msg = f"Unsupported message type: {type(message)}"
        raise TypeError(msg)

    # Sign message hash
    msghash = _hash_eip191_message(hash_message)
    if len(msghash) != MSG_HASH_LENGTH:
        msg = "Invalid message hash length"
        raise ValueError(msg)

    der_signature = self._sign_raw_hash(msghash)
    if not der_signature:
        msg = "Failed to sign message"
        raise Exception(msg)

    # Try both v values (27 and 28) to find the correct one
    for v_value in (27, 28):
        sig_dict = convert_der_to_rsv(der_signature, v_value)
        signature = Signature(v=sig_dict["v"], r=sig_dict["r"], s=sig_dict["s"])

        # Verify the signature
        recovered = Account.recover_message(hash_message, vrs=(signature.v, signature.r, signature.s))

        if recovered.lower() == self.address.lower():
            return signature

    msg = "Failed to create valid signature"
    raise SignatureError(msg)

sign_transaction(transaction)

Sign an EIP-155 transaction.

Parameters:

Name Type Description Default
transaction Transaction

Transaction to sign

required

Returns:

Type Description
bytes | None

bytes | None: Serialized signed transaction or None if signing fails

Source code in web3_google_hsm/accounts/gcp_kms_account.py
def sign_transaction(self, transaction: Transaction) -> bytes | None:
    """
    Sign an EIP-155 transaction.

    Args:
        transaction: Transaction to sign

    Returns:
        bytes | None: Serialized signed transaction or None if signing fails
    """
    # Create unsigned transaction dictionary
    unsigned_tx = {
        "nonce": transaction.nonce,
        "gasPrice": transaction.gas_price,
        "gas": transaction.gas_limit,
        "to": transaction.to,
        "value": transaction.value,
        "data": transaction.data,
        "chainId": transaction.chain_id,
    }

    # Convert to UnsignedTransaction and get hash
    unsigned_tx_obj = serializable_unsigned_transaction_from_dict(unsigned_tx)  # type: ignore
    msg_hash = unsigned_tx_obj.hash()

    # Sign the transaction hash
    der_signature = self._sign_raw_hash(msg_hash)
    if not der_signature:
        return None

    # Calculate v value based on chain ID
    v_base = (2 * transaction.chain_id + 35) if transaction.chain_id else 27
    sig_dict = convert_der_to_rsv(der_signature, v_base)

    # Create RLP serializable fields
    rlp_data = [
        transaction.nonce,
        transaction.gas_price,
        transaction.gas_limit,
        bytes.fromhex(transaction.to[2:]),  # Convert address to bytes
        transaction.value,
        bytes.fromhex(transaction.data[2:] if transaction.data.startswith("0x") else transaction.data),
        sig_dict["v"],
        int.from_bytes(sig_dict["r"], "big"),
        int.from_bytes(sig_dict["s"], "big"),
    ]

    # RLP encode the transaction and ensure it returns bytes
    encoded_tx = cast(bytes, rlp.encode(rlp_data))

    # Verify the signature
    recovered = Account.recover_transaction(encoded_tx)
    if recovered.lower() != self.address.lower():
        # Try with v + 1
        rlp_data[6] = sig_dict["v"] + 1  # Update v value
        encoded_tx = cast(bytes, rlp.encode(rlp_data))

        # Verify again
        recovered = Account.recover_transaction(encoded_tx)
        if recovered.lower() != self.address.lower():
            msg = "Failed to create valid signature"
            raise SignatureError(msg)

    return encoded_tx

Signature

Bases: BaseModel

Represents an Ethereum signature with v, r, s components.

Source code in web3_google_hsm/types/ethereum_types.py
class Signature(BaseModel):
    """Represents an Ethereum signature with v, r, s components."""

    v: int = Field(..., description="Recovery identifier")
    r: bytes = Field(..., description="R component of signature")
    s: bytes = Field(..., description="S component of signature")

    @field_validator("r", "s")
    @classmethod
    def validate_length(cls, v: bytes) -> bytes:
        if len(v) != MSG_HASH_LENGTH:
            msg = f"Length must be 32 bytes, got {len(v)} bytes"
            raise ValueError(msg)
        return v

    @field_validator("v")
    @classmethod
    def validate_v(cls, v: int) -> int:
        if v < 0:
            msg = "v must be non-negative"
            raise ValueError(msg)
        return v

    def to_hex(self) -> str:
        """Convert signature to hex string."""
        return "0x" + (self.r + self.s + bytes([self.v])).hex()

    @classmethod
    def from_hex(cls, hex_str: str) -> "Signature":
        """Create signature from hex string."""
        if hex_str.startswith("0x"):
            hex_str = hex_str[2:]
        sig_bytes = bytes.fromhex(hex_str)
        if len(sig_bytes) != SIGNATURE_LENGTH:
            msg = f"Invalid signature length: {len(sig_bytes)}"
            raise ValueError(msg)
        return cls(v=sig_bytes[64], r=sig_bytes[0:32], s=sig_bytes[32:64])

from_hex(hex_str) classmethod

Create signature from hex string.

Source code in web3_google_hsm/types/ethereum_types.py
@classmethod
def from_hex(cls, hex_str: str) -> "Signature":
    """Create signature from hex string."""
    if hex_str.startswith("0x"):
        hex_str = hex_str[2:]
    sig_bytes = bytes.fromhex(hex_str)
    if len(sig_bytes) != SIGNATURE_LENGTH:
        msg = f"Invalid signature length: {len(sig_bytes)}"
        raise ValueError(msg)
    return cls(v=sig_bytes[64], r=sig_bytes[0:32], s=sig_bytes[32:64])

to_hex()

Convert signature to hex string.

Source code in web3_google_hsm/types/ethereum_types.py
def to_hex(self) -> str:
    """Convert signature to hex string."""
    return "0x" + (self.r + self.s + bytes([self.v])).hex()

SignatureError

Bases: Exception

Raised when there are issues with signing.

Source code in web3_google_hsm/exceptions.py
1
2
3
4
class SignatureError(Exception):
    """
    Raised when there are issues with signing.
    """

Transaction

Bases: BaseModel

Represents an Ethereum transaction.

Source code in web3_google_hsm/types/ethereum_types.py
class Transaction(BaseModel):
    """Represents an Ethereum transaction."""

    model_config = ConfigDict(arbitrary_types_allowed=True, populate_by_name=True)

    chain_id: int = Field(..., description="Chain ID", validation_alias="chainId")
    nonce: int = Field(..., ge=0, description="Transaction nonce")
    gas_price: int = Field(..., gt=0, description="Gas price in Wei", validation_alias="gasPrice")
    gas_limit: int = Field(..., gt=0, description="Gas limit", validation_alias="gas")
    to: str = Field(..., description="Recipient address")
    value: int = Field(..., ge=0, description="Transaction value in Wei")
    data: str = Field("0x", description="Transaction data")
    from_: str = Field(..., description="Sender address", validation_alias="from")
    signature: Signature | None = Field(None, description="Transaction signature")

    @field_validator("to", "from_")
    @classmethod
    def validate_address(cls, v: str) -> str:
        if not is_address(v):
            msg = "Invalid Ethereum address"
            raise ValueError(msg)
        return to_checksum_address(v)

    @field_validator("data")
    @classmethod
    def validate_hex(cls, v: str) -> str:
        if not v.startswith("0x"):
            v = "0x" + v
        try:
            bytes.fromhex(v[2:])
        except ValueError as error:
            msg = "Invalid hex string"
            raise ValueError(msg) from error
        return v

    def to_dict(self) -> dict:
        """Convert transaction to dictionary format for web3.py."""
        tx_dict = {
            "chainId": self.chain_id,
            "nonce": self.nonce,
            "gasPrice": self.gas_price,
            "gas": self.gas_limit,
            "to": self.to,
            "value": self.value,
            "data": self.data,
            "from": self.from_,
        }
        return tx_dict

    def to_transaction_dict(self) -> dict:
        """Convert to dictionary format suitable for signing."""
        tx_dict = {
            "chainId": self.chain_id,
            "nonce": self.nonce,
            "gasPrice": self.gas_price,
            "gas": self.gas_limit,
            "to": self.to,
            "value": self.value,
            "data": self.data,
        }
        # Add signature if present
        if self.signature:
            tx_dict.update(
                {
                    "v": self.signature.v,
                    "r": int.from_bytes(self.signature.r, "big"),
                    "s": int.from_bytes(self.signature.s, "big"),
                }
            )
        return tx_dict

    @classmethod
    def from_dict(cls, data: dict) -> "Transaction":
        """
        Create transaction from dictionary.

        Args:
            data: Transaction data dictionary.

        Returns:
            Transaction: A new transaction instance

        Raises:
            ValueError: If required fields are missing or invalid
        """
        tx_data = data.copy()

        # Handle signature if present
        signature = None
        if all(k in tx_data for k in ["v", "r", "s"]):
            r_value = tx_data.pop("r")
            s_value = tx_data.pop("s")

            # Convert hex strings to bytes if necessary
            if isinstance(r_value, str):
                r_value = bytes.fromhex(r_value[2:] if r_value.startswith("0x") else r_value)
            if isinstance(s_value, str):
                s_value = bytes.fromhex(s_value[2:] if s_value.startswith("0x") else s_value)

            signature = Signature(v=tx_data.pop("v"), r=r_value, s=s_value)
            tx_data["signature"] = signature

        return cls(**tx_data)

    def serialize_transaction(self) -> bytes:
        """
        Serialize a transaction to bytes.

        Returns:
            bytes: The serialized transaction

        Raises:
            SignatureError: If transaction is not signed or signature verification fails
        """
        if not self.signature:
            msg = "The transaction is not signed."
            raise SignatureError(msg)

        # Create transaction dict without 'from' and with proper signature format
        txn_data = self.to_dict()

        if "from" in txn_data:
            txn_data.pop("from")

        # Create unsigned transaction dict
        unsigned_txn = serializable_unsigned_transaction_from_dict(txn_data)
        signature = (self.signature.v, to_int(self.signature.r), to_int(self.signature.s))

        signed_txn = encode_transaction(unsigned_txn, signature)

        # Verify signature
        recovered = Account.recover_transaction(signed_txn)
        if self.from_ and recovered.lower() != self.from_.lower():
            msg = f"Recovered signer doesn't match sender! Expected: {self.from_}, got: {recovered}"
            raise SignatureError(msg)

        return signed_txn

from_dict(data) classmethod

Create transaction from dictionary.

Parameters:

Name Type Description Default
data dict

Transaction data dictionary.

required

Returns:

Name Type Description
Transaction Transaction

A new transaction instance

Raises:

Type Description
ValueError

If required fields are missing or invalid

Source code in web3_google_hsm/types/ethereum_types.py
@classmethod
def from_dict(cls, data: dict) -> "Transaction":
    """
    Create transaction from dictionary.

    Args:
        data: Transaction data dictionary.

    Returns:
        Transaction: A new transaction instance

    Raises:
        ValueError: If required fields are missing or invalid
    """
    tx_data = data.copy()

    # Handle signature if present
    signature = None
    if all(k in tx_data for k in ["v", "r", "s"]):
        r_value = tx_data.pop("r")
        s_value = tx_data.pop("s")

        # Convert hex strings to bytes if necessary
        if isinstance(r_value, str):
            r_value = bytes.fromhex(r_value[2:] if r_value.startswith("0x") else r_value)
        if isinstance(s_value, str):
            s_value = bytes.fromhex(s_value[2:] if s_value.startswith("0x") else s_value)

        signature = Signature(v=tx_data.pop("v"), r=r_value, s=s_value)
        tx_data["signature"] = signature

    return cls(**tx_data)

serialize_transaction()

Serialize a transaction to bytes.

Returns:

Name Type Description
bytes bytes

The serialized transaction

Raises:

Type Description
SignatureError

If transaction is not signed or signature verification fails

Source code in web3_google_hsm/types/ethereum_types.py
def serialize_transaction(self) -> bytes:
    """
    Serialize a transaction to bytes.

    Returns:
        bytes: The serialized transaction

    Raises:
        SignatureError: If transaction is not signed or signature verification fails
    """
    if not self.signature:
        msg = "The transaction is not signed."
        raise SignatureError(msg)

    # Create transaction dict without 'from' and with proper signature format
    txn_data = self.to_dict()

    if "from" in txn_data:
        txn_data.pop("from")

    # Create unsigned transaction dict
    unsigned_txn = serializable_unsigned_transaction_from_dict(txn_data)
    signature = (self.signature.v, to_int(self.signature.r), to_int(self.signature.s))

    signed_txn = encode_transaction(unsigned_txn, signature)

    # Verify signature
    recovered = Account.recover_transaction(signed_txn)
    if self.from_ and recovered.lower() != self.from_.lower():
        msg = f"Recovered signer doesn't match sender! Expected: {self.from_}, got: {recovered}"
        raise SignatureError(msg)

    return signed_txn

to_dict()

Convert transaction to dictionary format for web3.py.

Source code in web3_google_hsm/types/ethereum_types.py
def to_dict(self) -> dict:
    """Convert transaction to dictionary format for web3.py."""
    tx_dict = {
        "chainId": self.chain_id,
        "nonce": self.nonce,
        "gasPrice": self.gas_price,
        "gas": self.gas_limit,
        "to": self.to,
        "value": self.value,
        "data": self.data,
        "from": self.from_,
    }
    return tx_dict

to_transaction_dict()

Convert to dictionary format suitable for signing.

Source code in web3_google_hsm/types/ethereum_types.py
def to_transaction_dict(self) -> dict:
    """Convert to dictionary format suitable for signing."""
    tx_dict = {
        "chainId": self.chain_id,
        "nonce": self.nonce,
        "gasPrice": self.gas_price,
        "gas": self.gas_limit,
        "to": self.to,
        "value": self.value,
        "data": self.data,
    }
    # Add signature if present
    if self.signature:
        tx_dict.update(
            {
                "v": self.signature.v,
                "r": int.from_bytes(self.signature.r, "big"),
                "s": int.from_bytes(self.signature.s, "big"),
            }
        )
    return tx_dict