zhzluke96
update
bed01bd
import logging
import sys
from functools import lru_cache
import torch
from modules import config
logger = logging.getLogger(__name__)
if sys.platform == "darwin":
from modules.devices import mac_devices
def has_mps() -> bool:
if sys.platform != "darwin":
return False
else:
return mac_devices.has_mps
def get_cuda_device_id():
return (
int(config.runtime_env_vars.device_id)
if config.runtime_env_vars.device_id is not None
and config.runtime_env_vars.device_id.isdigit()
else 0
) or torch.cuda.current_device()
def get_cuda_device_string():
if config.runtime_env_vars.device_id is not None:
return f"cuda:{config.runtime_env_vars.device_id}"
return "cuda"
def get_available_gpus() -> list[tuple[int, int]]:
"""
Get the list of available GPUs and their free memory.
:return: A list of tuples where each tuple contains (GPU index, free memory in bytes).
"""
available_gpus = []
for i in range(torch.cuda.device_count()):
props = torch.cuda.get_device_properties(i)
free_memory = props.total_memory - torch.cuda.memory_reserved(i)
available_gpus.append((i, free_memory))
return available_gpus
def get_memory_available_gpus(min_memory=2048):
available_gpus = get_available_gpus()
memory_available_gpus = [
gpu for gpu, free_memory in available_gpus if free_memory > min_memory
]
return memory_available_gpus
def get_target_device_id_or_memory_available_gpu():
memory_available_gpus = get_memory_available_gpus()
device_id = get_cuda_device_id()
if device_id not in memory_available_gpus:
if len(memory_available_gpus) != 0:
logger.warning(
f"Device {device_id} is not available or does not have enough memory. will try to use {memory_available_gpus}"
)
config.runtime_env_vars.device_id = str(memory_available_gpus[0])
else:
logger.warning(
f"Device {device_id} is not available or does not have enough memory. Using CPU instead."
)
return "cpu"
return get_cuda_device_string()
def get_optimal_device_name():
if config.runtime_env_vars.use_cpu == "all":
return "cpu"
if torch.cuda.is_available():
return get_target_device_id_or_memory_available_gpu()
if has_mps():
return "mps"
return "cpu"
def get_optimal_device():
return torch.device(get_optimal_device_name())
def get_device_for(task):
if (
task in config.runtime_env_vars.use_cpu
or "all" in config.runtime_env_vars.use_cpu
):
return cpu
return get_optimal_device()
def torch_gc():
try:
if torch.cuda.is_available():
with torch.cuda.device(get_cuda_device_string()):
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
if has_mps():
mac_devices.torch_mps_gc()
except Exception as e:
logger.error(f"Error in torch_gc", exc_info=True)
cpu: torch.device = torch.device("cpu")
device: torch.device = None
dtype: torch.dtype = torch.float32
dtype_dvae: torch.dtype = torch.float32
dtype_vocos: torch.dtype = torch.float32
dtype_gpt: torch.dtype = torch.float32
dtype_decoder: torch.dtype = torch.float32
def reset_device():
global device
global dtype
global dtype_dvae
global dtype_vocos
global dtype_gpt
global dtype_decoder
if config.runtime_env_vars.use_cpu is None:
config.runtime_env_vars.use_cpu = []
if "all" in config.runtime_env_vars.use_cpu and not config.runtime_env_vars.no_half:
logger.warning(
"Cannot use half precision with CPU, using full precision instead"
)
config.runtime_env_vars.no_half = True
if not config.runtime_env_vars.no_half:
dtype = torch.float16
dtype_dvae = torch.float16
dtype_vocos = torch.float16
dtype_gpt = torch.float16
dtype_decoder = torch.float16
logger.info("Using half precision: torch.float16")
else:
dtype = torch.float32
dtype_dvae = torch.float32
dtype_vocos = torch.float32
dtype_gpt = torch.float32
dtype_decoder = torch.float32
logger.info("Using full precision: torch.float32")
if "all" in config.runtime_env_vars.use_cpu:
device = cpu
else:
device = get_optimal_device()
logger.info(f"Using device: {device}")
@lru_cache
def first_time_calculation():
"""
just do any calculation with pytorch layers - the first time this is done it allocaltes about 700MB of memory and
spends about 2.7 seconds doing that, at least wih NVidia.
"""
x = torch.zeros((1, 1)).to(device, dtype)
linear = torch.nn.Linear(1, 1).to(device, dtype)
linear(x)
x = torch.zeros((1, 1, 3, 3)).to(device, dtype)
conv2d = torch.nn.Conv2d(1, 1, (3, 3)).to(device, dtype)
conv2d(x)