import ast
import hashlib
from collections import namedtuple
from typing import Any, Literal, Union, Optional, TYPE_CHECKING
from urllib.parse import urlparse
import scalecodec
from async_substrate_interface.utils import (
hex_to_bytes,
)
from bittensor_wallet import Keypair
from bittensor_wallet.errors import KeyFileError, PasswordError
from scalecodec import ss58_decode, is_valid_ss58_address as _is_valid_ss58_address
from bittensor.core import settings
from bittensor.core.settings import SS58_FORMAT
from bittensor.utils.btlogging import logging
from .registration import torch, use_torch
from .version import version_checking, check_version, VersionCheckError
if TYPE_CHECKING:
from bittensor_wallet import Wallet
# Redundant aliases: each line rebinds a name imported above to itself, so none
# of them changes any binding at runtime.
# NOTE(review): presumably these mark the imports as deliberate re-exports of
# this module (and keep linters from flagging them as unused) — confirm before
# removing any of them.
logging = logging
torch = torch
use_torch = use_torch
version_checking = version_checking
check_version = check_version
VersionCheckError = VersionCheckError
ss58_decode = ss58_decode
hex_to_bytes = hex_to_bytes
# Conversion factor between the two balance units (kept as a float).
RAOPERTAO = 1e9

# Largest values representable in unsigned 16-bit and 64-bit integers.
U16_MAX = 2**16 - 1
U64_MAX = 2**64 - 1

# Result record produced by `unlock_key`: a success flag plus an error message.
UnlockStatus = namedtuple("UnlockStatus", ("success", "message"))
[docs]
class Certificate(str):
    """String subclass representing a certificate.

    The constructor accepts either a plain string, which is used as-is, or a
    dict with an ``"algorithm"`` entry (an integer code point) and a
    ``"public_key"`` entry whose first element is an iterable of integer code
    points. The dict form is flattened to ``chr(algorithm)`` followed by one
    character per public-key code point.
    """

    def __new__(cls, data: Union[str, dict]):
        if not isinstance(data, dict):
            return str.__new__(cls, data)

        # Read the key first, then the algorithm, then join the characters.
        key_code_points = data["public_key"][0]
        algorithm_char = chr(data["algorithm"])
        return str.__new__(cls, algorithm_char + "".join(map(chr, key_code_points)))
def _decode_hex_identity_dict(info_dictionary: dict[str, Any]) -> dict[str, Any]:
    # TODO why does this exist alongside `decode_hex_identity_dict`?
    """
    Builds a new dict in which tuple-of-int values are decoded to text.

    For each entry, a dict value is first replaced by its first inner value. If the result is a
    tuple it is converted with ``bytes(...)`` and decoded; anything else is copied through unchanged.
    Entries whose bytes cannot be decoded are reported with ``print`` and left out of the result.

    Args:
        info_dictionary(dict[str, Any]): The identity fields to decode. It is not modified.

    Returns:
        A new dictionary with the decoded values.
    """
    result = {}
    for field, raw in info_dictionary.items():
        payload = next(iter(raw.values())) if isinstance(raw, dict) else raw
        if not isinstance(payload, tuple):
            result[field] = payload
            continue
        try:
            result[field] = bytes(payload).decode()
        except UnicodeDecodeError:
            print(f"Could not decode: {field}: {payload}")
    return result
[docs]
def ss58_to_vec_u8(ss58_address: str) -> list[int]:
    """
    Converts an ss58 address into a list of integer byte values.

    Args:
        ss58_address(str): The address to convert.

    Returns:
        One integer per byte of ``ss58_address_to_bytes(ss58_address)``.
    """
    # Iterating a bytes object already yields ints, so list() is enough.
    return list(ss58_address_to_bytes(ss58_address))
[docs]
def strtobool(val: str) -> Union[bool, Literal["==SUPRESS=="]]:
    """
    Interprets a string as a boolean, ignoring case.

    'y', 'yes', 't', 'true', 'on' and '1' map to True;
    'n', 'no', 'f', 'false', 'off' and '0' map to False.

    Raises:
        ValueError: If ``val`` is none of the values above.
    """
    truthy = ("y", "yes", "t", "true", "on", "1")
    falsy = ("n", "no", "f", "false", "off", "0")

    lowered = val.lower()
    if lowered in truthy:
        return True
    if lowered in falsy:
        return False
    raise ValueError("invalid truth value %r" % (lowered,))
def _get_explorer_root_url_by_network_from_map(
    network: str, network_map: dict[str, dict[str, str]]
) -> Optional[dict[str, str]]:
    """
    Collects the explorer root url of every explorer that lists the given network.

    Args:
        network(str): The network to get the explorer urls for.
        network_map(dict[str, dict[str, str]]): Maps explorer name to a ``{network name: root url}`` dict.

    Returns:
        A dict mapping explorer name to its root url for ``network``.
        The dict is empty (it is never None) if no explorer lists the network.
    """
    return {
        explorer_name: roots_by_network[network]
        for explorer_name, roots_by_network in network_map.items()
        if network in roots_by_network
    }


def get_explorer_url_for_network(
    network: str, block_hash: str, network_map: dict[str, dict[str, str]]
) -> Optional[dict[str, str]]:
    """
    Returns the explorer urls for the given block hash and network.

    Args:
        network(str): The network to get the explorer url for.
        block_hash(str): The block hash to get the explorer url for.
        network_map(dict[str, dict[str, str]]): The network maps to get the explorer urls from.

    Returns:
        A dict with an ``"opentensor"`` and/or ``"taostats"`` entry holding the explorer url for the
        block hash. An explorer is only included if it has a root url for ``network``, so the dict is
        empty (it is never None) if the network is not known.
    """
    # Path segment each supported explorer puts between its root url and the hash.
    explorer_paths = {"opentensor": "query", "taostats": "extrinsic"}

    explorer_root_urls = _get_explorer_root_url_by_network_from_map(
        network, network_map
    )

    # Skip explorers without a root url for this network; formatting a missing
    # root would otherwise produce a bogus "None/..." url.
    return {
        explorer_name: f"{explorer_root_urls[explorer_name]}/{path}/{block_hash}"
        for explorer_name, path in explorer_paths.items()
        if explorer_name in explorer_root_urls
    }
[docs]
def ss58_address_to_bytes(ss58_address: str) -> bytes:
    """
    Converts a ss58 address to a bytes object.

    The address is decoded with ``scalecodec.ss58_decode`` using ``SS58_FORMAT``, and the
    resulting hex string is turned into bytes.
    """
    return bytes.fromhex(scalecodec.ss58_decode(ss58_address, SS58_FORMAT))
[docs]
def u16_normalized_float(x: int) -> float:
    """Divides ``x`` by ``U16_MAX`` as floats, so that ``U16_MAX`` maps to 1.0."""
    numerator, denominator = float(x), float(U16_MAX)
    return numerator / denominator
[docs]
def u64_normalized_float(x: int) -> float:
    """Divides ``x`` by ``U64_MAX`` as floats, so that ``U64_MAX`` maps to 1.0."""
    numerator, denominator = float(x), float(U64_MAX)
    return numerator / denominator
[docs]
def get_hash(content, encoding="utf-8"):
    """
    Returns the SHA3-256 digest of ``content`` as a hex string.

    Args:
        content: The text to hash; it is encoded with ``encoding`` before hashing.
        encoding: The text encoding to use. Defaults to "utf-8".

    Returns:
        The hexadecimal digest string.
    """
    encoded = content.encode(encoding)
    # Passing the data to the constructor is the same as a single update() call.
    return hashlib.sha3_256(encoded).hexdigest()
[docs]
def is_valid_ss58_address(address: str) -> bool:
    """
    Checks if the given address is a valid ss58 address.

    The address is accepted if it validates against ``SS58_FORMAT`` or against format 42.

    Args:
        address(str): The address to check.

    Returns:
        True if the address is a valid ss58 address for Bittensor, False otherwise.
    """
    try:
        primary = _is_valid_ss58_address(address, valid_ss58_format=SS58_FORMAT)
        if primary:
            return primary
        # Default substrate ss58 format (legacy)
        return _is_valid_ss58_address(address, valid_ss58_format=42)
    except IndexError:
        return False
def _is_valid_ed25519_pubkey(public_key: Union[str, bytes]) -> bool:
    """
    Checks if the given public_key is a valid ed25519 key.

    A string must be 64 or 66 characters long and a bytes object must be 32 bytes long; any other
    type is rejected. A key that passes these checks is valid if a ``Keypair`` built from it has
    an ss58 address.

    Args:
        public_key(Union[str, bytes]): The public_key to check.

    Returns:
        True if the public_key is a valid ed25519 key, False otherwise.
    """
    try:
        if isinstance(public_key, str):
            if len(public_key) not in (64, 66):
                raise ValueError("a public_key should be 64 or 66 characters")
        elif isinstance(public_key, bytes):
            if len(public_key) != 32:
                raise ValueError("a public_key should be 32 bytes")
        else:
            raise ValueError("public_key must be a string or bytes")

        return Keypair(public_key=public_key).ss58_address is not None
    except (ValueError, IndexError):
        # Covers both the shape checks above and failures while building the keypair.
        return False
[docs]
def is_valid_bittensor_address_or_public_key(address: Union[str, bytes]) -> bool:
    """
    Checks if the given address is a valid destination address.

    Bytes and "0x"-prefixed strings are checked as ed25519 public keys; any other string is
    checked as an ss58 address.

    Args:
        address(Union[str, bytes]): The address to check.

    Returns:
        True if the address is a valid destination address, False otherwise.
    """
    if isinstance(address, bytes):
        # Raw bytes can only be an ed25519 public key.
        return _is_valid_ed25519_pubkey(address)

    if not isinstance(address, str):
        # Invalid address type
        return False

    if address.startswith("0x"):
        # Hex string: check as an ed25519 public key.
        return _is_valid_ed25519_pubkey(address)

    # Assume ss58 address
    return is_valid_ss58_address(address)
[docs]
def decode_hex_identity_dict(info_dictionary) -> dict[str, Any]:
    """
    Decodes hex-encoded strings in a dictionary.

    The dictionary is updated in place and also returned. Each value that is a dict is replaced by
    its first inner value (None if the dict is empty), decoded to text if that value is a
    "0x"-prefixed string. A non-dict value under the key "additional" is treated as a list of
    entries, each a sequence of single-value dicts, and becomes a list of tuples of decoded values.
    Any other value is left untouched.

    A hex string whose bytes are not valid text is reported with ``print`` and replaced by None.
    Values that are not "0x"-prefixed strings are kept unchanged.

    Args:
        info_dictionary (dict): The dictionary containing hex-encoded strings to decode.

    Returns:
        dict: The same dictionary object, with decoded strings.

    Raises:
        ValueError: If a "0x"-prefixed string contains non-hexadecimal characters.

    Examples:
        input_dict = {
        ...     "name": {"value": "0x6a6f686e"},
        ...     "additional": [
        ...         [{"data": "0x64617461"}, {"data": "0x6b6579"}]
        ...     ]
        ... }
        decode_hex_identity_dict(input_dict)
        {'name': 'john', 'additional': [('data', 'key')]}
    """

    def decode_field(field: str, raw: Any) -> Any:
        """Decodes ``raw`` if it is a "0x" hex string; returns it unchanged otherwise."""
        if not (isinstance(raw, str) and raw.startswith("0x")):
            return raw
        try:
            return bytes.fromhex(raw[2:]).decode()
        except UnicodeDecodeError:
            # Best effort: report the field and carry on with None.
            print(f"Could not decode: {field}: {raw}")
            return None

    # Only existing keys are reassigned, so mutating while iterating is safe.
    for key, value in info_dictionary.items():
        if isinstance(value, dict):
            info_dictionary[key] = decode_field(key, next(iter(value.values()), None))
        elif key == "additional":
            info_dictionary[key] = [
                tuple(
                    decode_field(key, next(iter(sub_item.values()), None))
                    for sub_item in entry
                )
                for entry in value
            ]
    return info_dictionary
[docs]
def validate_chain_endpoint(endpoint_url: str) -> tuple[bool, str]:
    """
    Validates if the provided endpoint URL is a valid WebSocket URL.

    Args:
        endpoint_url(str): The URL to check.

    Returns:
        ``(True, "")`` if the URL uses the ``ws`` or ``wss`` scheme and has a network location,
        otherwise ``(False, message)`` where ``message`` explains what is wrong.
    """
    url_parts = urlparse(endpoint_url)

    if url_parts.scheme in ("ws", "wss"):
        if url_parts.netloc:
            return True, ""
        return False, "Invalid URL passed as the endpoint"

    scheme_error = (
        f"Invalid URL or network name provided: ({endpoint_url}).\n"
        "Allowed network names are finney, test, local. "
        "Valid chain endpoints should use the scheme `ws` or `wss`.\n"
    )
    return False, scheme_error
[docs]
def unlock_key(wallet: "Wallet", unlock_type="coldkey") -> "UnlockStatus":
    """
    Attempts to decrypt a wallet's coldkey or hotkey

    Args:
        wallet: a Wallet object
        unlock_type: the key type, 'coldkey' or 'hotkey'

    Returns: UnlockStatus for success status of unlock, with error message if unsuccessful

    Raises:
        ValueError: If ``unlock_type`` is neither 'coldkey' nor 'hotkey'.
    """
    if unlock_type not in ("coldkey", "hotkey"):
        raise ValueError(
            f"Invalid unlock type provided: {unlock_type}. Must be 'coldkey' or 'hotkey'."
        )

    # Name of the wallet method that unlocks each key type.
    unlocker_names = {"coldkey": "unlock_coldkey", "hotkey": "unlock_hotkey"}
    unlocker = unlocker_names[unlock_type]

    try:
        getattr(wallet, unlocker)()
    except PasswordError:
        return UnlockStatus(
            False,
            f"The password used to decrypt your {unlock_type.capitalize()} keyfile is invalid.",
        )
    except KeyFileError:
        return UnlockStatus(
            False,
            f"{unlock_type.capitalize()} keyfile is corrupt, non-writable, or non-readable, or non-existent.",
        )
    return UnlockStatus(True, "")
[docs]
def determine_chain_endpoint_and_network(
    network: str,
) -> tuple[Optional[str], Optional[str]]:
    """Determines the chain endpoint and network from the passed network or chain_endpoint.

    A known network name (one listed in ``settings.NETWORKS``) resolves to its endpoint from
    ``settings.NETWORK_MAP``. Otherwise the argument is treated as an endpoint URL: if it is a valid
    ``ws``/``wss`` URL containing a recognised host substring, the matching network name and
    endpoint are returned. Anything else is reported as the ``"unknown"`` network with the argument
    itself as the endpoint.

    Arguments:
        network (str): The network flag. The choices are: ``finney`` (main network), ``archive`` (archive network
            +300 blocks), ``local`` (local running network), ``test`` (test network).

    Returns:
        tuple[Optional[str], Optional[str]]: The network and chain endpoint flag. If passed, overrides the
            ``network`` argument. ``(None, None)`` if ``network`` is None.
    """
    if network is None:
        return None, None

    if network in settings.NETWORKS:
        return network, settings.NETWORK_MAP[network]

    # validate_chain_endpoint returns an (is_valid, message) tuple. A non-empty
    # tuple is always truthy, so the flag has to be unpacked for the check to
    # reject anything. The result does not depend on the substring, so validate once.
    is_valid_endpoint, _ = validate_chain_endpoint(network)
    if is_valid_endpoint:
        substrings_map = {
            "entrypoint-finney.opentensor.ai": ("finney", settings.FINNEY_ENTRYPOINT),
            "test.finney.opentensor.ai": ("test", settings.FINNEY_TEST_ENTRYPOINT),
            "archive.chain.opentensor.ai": ("archive", settings.ARCHIVE_ENTRYPOINT),
            "subvortex": ("subvortex", settings.SUBVORTEX_ENTRYPOINT),
            # Local endpoints keep the exact URL that was passed in.
            "127.0.0.1": ("local", network),
            "localhost": ("local", network),
        }
        for substring, result in substrings_map.items():
            if substring in network:
                return result

    return "unknown", network