Source code for bittensor.commands.utils

# The MIT License (MIT)
# Copyright © 2021 Yuma Rao

# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the “Software”), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all copies or substantial portions of
# the Software.

# THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
# THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

import sys
import os
import bittensor
import requests
from bittensor.utils.registration import torch
from bittensor.utils.balance import Balance
from bittensor.utils import U64_NORMALIZED_FLOAT, U16_NORMALIZED_FLOAT
from typing import List, Dict, Any, Optional, Tuple
from rich.prompt import Confirm, PromptBase
from dataclasses import dataclass
from . import defaults

console = bittensor.__console__


[docs] class IntListPrompt(PromptBase): """Prompt for a list of integers."""
[docs] def check_choice(self, value: str) -> bool: assert self.choices is not None # check if value is a valid choice or all the values in a list of ints are valid choices return ( value == "All" or value in self.choices or all( val.strip() in self.choices for val in value.replace(",", " ").split() ) )
[docs] def check_netuid_set( config: "bittensor.config", subtensor: "bittensor.subtensor", allow_none: bool = False, ): if subtensor.network != "nakamoto": all_netuids = [str(netuid) for netuid in subtensor.get_subnets()] if len(all_netuids) == 0: console.print(":cross_mark:[red]There are no open networks.[/red]") sys.exit() # Make sure netuid is set. if not config.is_set("netuid"): if not config.no_prompt: netuid = IntListPrompt.ask( "Enter netuid", choices=all_netuids, default=str(all_netuids[0]) ) else: netuid = str(defaults.netuid) if not allow_none else "None" else: netuid = config.netuid if isinstance(netuid, str) and netuid.lower() in ["none"] and allow_none: config.netuid = None else: if isinstance(netuid, list): netuid = netuid[0] try: config.netuid = int(netuid) except: raise ValueError('netuid must be an integer or "None" (if applicable)')
[docs] def check_for_cuda_reg_config(config: "bittensor.config") -> None: """Checks, when CUDA is available, if the user would like to register with their CUDA device.""" if torch and torch.cuda.is_available(): if not config.no_prompt: if config.pow_register.cuda.get("use_cuda") is None: # flag not set # Ask about cuda registration only if a CUDA device is available. cuda = Confirm.ask("Detected CUDA device, use CUDA for registration?\n") config.pow_register.cuda.use_cuda = cuda # Only ask about which CUDA device if the user has more than one CUDA device. if ( config.pow_register.cuda.use_cuda and config.pow_register.cuda.get("dev_id") is None ): devices: List[str] = [str(x) for x in range(torch.cuda.device_count())] device_names: List[str] = [ torch.cuda.get_device_name(x) for x in range(torch.cuda.device_count()) ] console.print("Available CUDA devices:") choices_str: str = "" for i, device in enumerate(devices): choices_str += " {}: {}\n".format(device, device_names[i]) console.print(choices_str) dev_id = IntListPrompt.ask( "Which GPU(s) would you like to use? Please list one, or comma-separated", choices=devices, default="All", ) if dev_id.lower() == "all": dev_id = list(range(torch.cuda.device_count())) else: try: # replace the commas with spaces then split over whitespace., # then strip the whitespace and convert to ints. dev_id = [ int(dev_id.strip()) for dev_id in dev_id.replace(",", " ").split() ] except ValueError: console.log( ":cross_mark:[red]Invalid GPU device[/red] [bold white]{}[/bold white]\nAvailable CUDA devices:{}".format( dev_id, choices_str ) ) sys.exit(1) config.pow_register.cuda.dev_id = dev_id else: # flag was not set, use default value. if config.pow_register.cuda.get("use_cuda") is None: config.pow_register.cuda.use_cuda = defaults.pow_register.cuda.use_cuda
[docs] def get_hotkey_wallets_for_wallet(wallet) -> List["bittensor.wallet"]: hotkey_wallets = [] hotkeys_path = wallet.path + "/" + wallet.name + "/hotkeys" try: hotkey_files = next(os.walk(os.path.expanduser(hotkeys_path)))[2] except StopIteration: hotkey_files = [] for hotkey_file_name in hotkey_files: try: hotkey_for_name = bittensor.wallet( path=wallet.path, name=wallet.name, hotkey=hotkey_file_name ) if ( hotkey_for_name.hotkey_file.exists_on_device() and not hotkey_for_name.hotkey_file.is_encrypted() ): hotkey_wallets.append(hotkey_for_name) except Exception: pass return hotkey_wallets
[docs] def get_coldkey_wallets_for_path(path: str) -> List["bittensor.wallet"]: try: wallet_names = next(os.walk(os.path.expanduser(path)))[1] return [bittensor.wallet(path=path, name=name) for name in wallet_names] except StopIteration: # No wallet files found. wallets = [] return wallets
[docs] def get_all_wallets_for_path(path: str) -> List["bittensor.wallet"]: all_wallets = [] cold_wallets = get_coldkey_wallets_for_path(path) for cold_wallet in cold_wallets: if ( cold_wallet.coldkeypub_file.exists_on_device() and not cold_wallet.coldkeypub_file.is_encrypted() ): all_wallets.extend(get_hotkey_wallets_for_wallet(cold_wallet)) return all_wallets
[docs] def filter_netuids_by_registered_hotkeys( cli, subtensor, netuids, all_hotkeys ) -> List[int]: netuids_with_registered_hotkeys = [] for wallet in all_hotkeys: netuids_list = subtensor.get_netuids_for_hotkey(wallet.hotkey.ss58_address) bittensor.logging.debug( f"Hotkey {wallet.hotkey.ss58_address} registered in netuids: {netuids_list}" ) netuids_with_registered_hotkeys.extend(netuids_list) if not cli.config.netuids: netuids = netuids_with_registered_hotkeys else: netuids = [netuid for netuid in netuids if netuid in cli.config.netuids] netuids.extend(netuids_with_registered_hotkeys) return list(set(netuids))
[docs] def normalize_hyperparameters( subnet: bittensor.SubnetHyperparameters, ) -> List[Tuple[str, str, str]]: """ Normalizes the hyperparameters of a subnet. Args: subnet: The subnet hyperparameters object. Returns: A list of tuples containing the parameter name, value, and normalized value. """ param_mappings = { "adjustment_alpha": U64_NORMALIZED_FLOAT, "min_difficulty": U64_NORMALIZED_FLOAT, "max_difficulty": U64_NORMALIZED_FLOAT, "difficulty": U64_NORMALIZED_FLOAT, "bonds_moving_avg": U64_NORMALIZED_FLOAT, "max_weight_limit": U16_NORMALIZED_FLOAT, "kappa": U16_NORMALIZED_FLOAT, "alpha_high": U16_NORMALIZED_FLOAT, "alpha_low": U16_NORMALIZED_FLOAT, "min_burn": Balance.from_rao, "max_burn": Balance.from_rao, } normalized_values: List[Tuple[str, str, str]] = [] subnet_dict = subnet.__dict__ for param, value in subnet_dict.items(): try: if param in param_mappings: norm_value = param_mappings[param](value) if isinstance(norm_value, float): norm_value = f"{norm_value:.{10}g}" else: norm_value = value except Exception as e: bittensor.logging.warning(f"Error normalizing parameter '{param}': {e}") norm_value = "-" normalized_values.append((param, str(value), str(norm_value))) return normalized_values
[docs] @dataclass class DelegatesDetails: name: str url: str description: str signature: str
[docs] @classmethod def from_json(cls, json: Dict[str, any]) -> "DelegatesDetails": return cls( name=json["name"], url=json["url"], description=json["description"], signature=json["signature"], )
[docs] def _get_delegates_details_from_github( requests_get, url: str ) -> Dict[str, DelegatesDetails]: response = requests_get(url) if response.status_code == 200: all_delegates: Dict[str, Any] = response.json() all_delegates_details = {} for delegate_hotkey, delegates_details in all_delegates.items(): all_delegates_details[delegate_hotkey] = DelegatesDetails.from_json( delegates_details ) return all_delegates_details else: return {}
[docs] def get_delegates_details(url: str) -> Optional[Dict[str, DelegatesDetails]]: try: return _get_delegates_details_from_github(requests.get, url) except Exception: return None # Fail silently