"""Run codes.""" # pylint: disable=line-too-long, broad-exception-caught, invalid-name, missing-function-docstring, too-many-instance-attributes, missing-class-docstring # ruff: noqa: E501 import os import platform import random import time from dataclasses import asdict, dataclass, field from pathlib import Path from textwrap import dedent # from types import SimpleNamespace import gradio as gr import psutil from about_time import about_time from ctransformers import AutoModelForCausalLM from dl_hf_model import dl_hf_model from loguru import logger from examples_list import examples_list url = "https://huggingface.co/TheBloke/llama-2-13B-Guanaco-QLoRA-GGML/blob/main/llama-2-13b-guanaco-qlora.ggmlv3.q4_K_S.bin" # 8.14G LLM = None if "forindo" in platform.node(): # deploy 70b model locally # url = "https://huggingface.co/TheBloke/llama-2-70b-Guanaco-QLoRA-GGML/blob/main/llama-2-70b-guanaco-qlora.ggmlv3.q3_K_S.bin" # 29.7G # model_loc = "/home/mu2018/github/langchain-llama-2-70b-guanaco-qlora-ggml/models/llama-2-70b-guanaco-qlora.ggmlv3.q3_K_S.bin" _ = """ url = "https://huggingface.co/TheBloke/StableBeluga2-70B-GGML/blob/main/stablebeluga2-70b.ggmlv3.q3_K_S.bin" try: model_loc, file_size = dl_hf_model(url) logger.info(f"done load llm {model_loc=} {file_size=}G") except Exception as exc_: logger.error(exc_) raise SystemExit(1) from exc_ # """ model_loc = "models/stablebeluga2-70b.ggmlv3.q3_K_S.bin" assert Path(model_loc).exists(), f"Make sure {model_loc=} exists." else: try: logger.debug(f" dl {url}") model_loc, file_size = dl_hf_model(url) logger.info(f"done load llm {model_loc=} {file_size=}G") except Exception as exc_: logger.error(exc_) raise SystemExit(1) from exc_ # raise SystemExit(0) # Prompt template: Guanaco # {past_history} prompt_template = """You are a helpful assistant. Let's think step by step. ### Human: {question} ### Assistant:""" human_prefix = "### Human" ai_prefix = "### Assistant" stop_list = [f"{human_prefix}:"] if "beluga" in model_loc.lower(): prompt_template = dedent( """ ### System: You are Stable Beluga, an AI that follows instructions extremely well. Help as much as you can. Let's think step by step. ### User: {question} ### Assistant: """ ).lstrip() human_prefix = "### User" ai_prefix = "### Assistant" stop_list = [f"{human_prefix}:"] _ = psutil.cpu_count(logical=False) - 1 cpu_count: int = int(_) if _ else 1 logger.debug(f"{cpu_count=}") logger.debug(f"{model_loc=}") LLM = AutoModelForCausalLM.from_pretrained( model_loc, model_type="llama", threads=cpu_count, ) os.environ["TZ"] = "Asia/Shanghai" try: time.tzset() # type: ignore # pylint: disable=no-member except Exception: # Windows logger.warning("Windows, cant run time.tzset()") @dataclass class GenerationConfig: temperature: float = 0.7 top_k: int = 50 top_p: float = 0.9 repetition_penalty: float = 1.0 max_new_tokens: int = 512 seed: int = 42 reset: bool = False stream: bool = True threads: int = cpu_count stop: list[str] = field(default_factory=lambda: stop_list) def generate( question: str, llm=LLM, config: GenerationConfig = GenerationConfig(), ): """Run model inference, will return a Generator if streaming is true.""" # _ = prompt_template.format(question=question) # print(_) prompt = prompt_template.format(question=question) return llm( prompt, **asdict(config), ) logger.debug(f"{asdict(GenerationConfig())=}") def user(user_message, history): # return user_message, history + [[user_message, None]] if history is None: history = [] history.append([user_message, None]) return user_message, history # keep user_message def user1(user_message, history): # return user_message, history + [[user_message, None]] if history is None: history = [] history.append([user_message, None]) return "", history # clear user_message def bot_(history): user_message = history[-1][0] resp = random.choice(["How are you?", "I love you", "I'm very hungry"]) bot_message = user_message + ": " + resp history[-1][1] = "" for character in bot_message: history[-1][1] += character time.sleep(0.02) yield history history[-1][1] = resp yield history def bot(history): user_message = "" try: user_message = history[-1][0] except Exception as exc: logger.error(exc) response = [] logger.debug(f"{user_message=}") with about_time() as atime: # type: ignore flag = 1 prefix = "" then = time.time() logger.debug("about to generate") config = GenerationConfig(reset=True) for elm in generate(user_message, config=config): if flag == 1: logger.debug("in the loop") prefix = f"({time.time() - then:.2f}s) " flag = 0 print(prefix, end="", flush=True) logger.debug(f"{prefix=}") print(elm, end="", flush=True) # logger.debug(f"{elm}") response.append(elm) history[-1][1] = prefix + "".join(response) yield history _ = ( f"(time elapsed: {atime.duration_human}, " # type: ignore f"{atime.duration/len(''.join(response)):.2f}s/char)" # type: ignore ) history[-1][1] = "".join(response) + f"\n{_}" yield history def predict_api(prompt): logger.debug(f"{prompt=}") try: # user_prompt = prompt config = GenerationConfig( temperature=0.2, top_k=10, top_p=0.9, repetition_penalty=1.0, max_new_tokens=512, # adjust as needed seed=42, reset=True, # reset history (cache) stream=False, # threads=cpu_count, # stop=prompt_prefix[1:2], ) response = generate( prompt, config=config, ) logger.debug(f"api: {response=}") except Exception as exc: logger.error(exc) response = f"{exc=}" # bot = {"inputs": [response]} # bot = [(prompt, response)] return response css = """ .importantButton { background: linear-gradient(45deg, #7e0570,#5d1c99, #6e00ff) !important; border: none !important; } .importantButton:hover { background: linear-gradient(45deg, #ff00e0,#8500ff, #6e00ff) !important; border: none !important; } .disclaimer {font-variant-caps: all-small-caps; font-size: xx-small;} .xsmall {font-size: x-small;} """ logger.info("start block") with gr.Blocks( title=f"{Path(model_loc).name}", # theme=gr.themes.Soft(text_size="sm", spacing_size="sm"), theme=gr.themes.Glass(text_size="sm", spacing_size="sm"), css=css, ) as block: # buff_var = gr.State("") with gr.Accordion("🎈 Info", open=False): gr.Markdown( f"""