source-code for model version v0.16_20250319_204732506_UTC - retrain-pipelines 0.1.1 (commit 058ff6b, verified)
| from unsloth import FastLanguageModel, \ | |
| is_bfloat16_supported, UnslothTrainer, \ | |
| UnslothTrainingArguments | |
| import torch | |
| import os | |
| import sys | |
| import gc | |
| import json | |
| import time | |
| import shutil | |
| import logging | |
| import traceback | |
| import subprocess | |
| import importlib.util | |
| from enum import Enum | |
| from io import StringIO | |
| from textwrap import dedent | |
| from datetime import datetime | |
| from contextlib import redirect_stdout | |
| import numpy as np | |
| import pandas as pd | |
| import polars as pl | |
| from polars.exceptions import ComputeError | |
| import matplotlib | |
| import matplotlib.pyplot as plt | |
| from jinja2 import Environment, FileSystemLoader | |
| from metaflow import FlowSpec, step, Parameter, JSONType, \ | |
| IncludeFile, current, metaflow_config as mf_config, \ | |
| resources, Flow, Task, card | |
| from metaflow.current import Current | |
| from metaflow.cards import Image, Table, Markdown, \ | |
| Artifact, get_cards | |
| from datasets import load_dataset, Dataset, DatasetDict | |
| from datasets.config import HF_DATASETS_CACHE, HF_CACHE_HOME | |
| from huggingface_hub import list_repo_commits | |
| from transformers import AutoTokenizer | |
| from transformers.utils import logging as hf_logging | |
| from retrain_pipelines import __version__ | |
| from retrain_pipelines.dataset.hf_utils import get_lazy_df, \ | |
| get_column_info, iterable_dataset_multi_buffer_sampler, \ | |
| push_dataset_version_to_hub | |
| from retrain_pipelines.dataset.tool_calls import \ | |
| get_unique_tools, count_tool_occurrences, \ | |
| plot_tools_occurences, column_words_stats, \ | |
| plot_words_count | |
| from retrain_pipelines.utils.hf_utils import \ | |
| get_new_repo_minor_version, push_files_to_hub_repo_branch | |
| from retrain_pipelines.utils import create_requirements | |
| class LocalServeReadinessEnum(Enum): | |
| """ | |
| tracking local-serve (infra-validation) | |
| status using a "3+"-states enum : | |
| - "-1" for "not applicable" | |
| (i.e. "model version not blessed"), | |
| - "0/1" bool for failure/success. | |
| """ | |
| NOT_APPLICABLE = -1 | |
| FAILURE = 0 | |
| FAILURE_NO_DOCKER = 2 | |
| SUCCESS = 1 | |
| class UnslothFuncCallFlow(FlowSpec): | |
| """ | |
| Training pipeline | |
| """ | |
| # @see https://github.com/unslothai/unsloth/wiki | |
| #--- flow parameters ------------------------------------------------------- | |
| RETRAIN_PIPELINE_TYPE = "mf_unsloth_func_call_litserve" | |
| # in order to share the config across subprocesses | |
| os.environ["retrain_pipeline_type"] = RETRAIN_PIPELINE_TYPE | |
| hf_dataset = Parameter( | |
| "hf_dataset", | |
| help="dict with 'repo_id' and 'commit_hash' keys. " + \ | |
| "if 'commit_hash is None, falls back to latest version " +\ | |
| "of the dataset available in parquet format.\n" + | |
| "Note that there are 3 required 'attributes' of type " + \ | |
| "str, list[str], list[str]", | |
| type=JSONType, | |
| default=dedent("""{ | |
| "repo_id": "Salesforce/xlam-function-calling-60k", | |
| "config_name": "", | |
| "commit_hash": "", | |
| "attributes": { | |
| "query_attr": "query", | |
| "answers_attr": "answers", | |
| "tools_attr": "tools" | |
| } | |
| }""").replace("'", '"').strip('"') | |
| ) | |
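| # Like any Metaflow JSONType parameter, 'hf_dataset' can be overridden at | |
| # launch time (illustrative invocation, flow file name assumed; the steps | |
| # expect all four top-level keys): | |
| #   python retraining_pipeline.py run --hf_dataset '{"repo_id": "Salesforce/xlam-function-calling-60k", "config_name": "", "commit_hash": "", "attributes": {"query_attr": "query", "answers_attr": "answers", "tools_attr": "tools"}}' | |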
| augmentation_rate = Parameter( | |
| "augmentation_rate", | |
| type=float, | |
| default=.05, | |
| help="proportion of records to be augmented "+\ | |
| "(x% of original dataset is created"+\ | |
| " as additional augmented datapoints), i.e. "+\ | |
| "truncated queries to serve as negative examples, "+\ | |
| "meaning they trigger no tool call "+\ | |
| "due to info incompleteness." | |
| ) | |
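| # e.g. with the default ~60k-record xlam dataset and the default rate of .05, | |
| # up to ~3,000 truncated-query negative examples get added (fewer if the | |
| # pool of eligible records is smaller, see the 'augment_data' step below). | |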
| hf_enrich_dataset = Parameter( | |
| "hf_enrich_dataset", | |
| help="dict with 'repo_id', 'config_name' and 'commit_hash', "+\ | |
| "query_attribute' and 'query_attribute_handler' keys. "+\ | |
| "if 'commit_hash is None, falls back to latest version "+\ | |
| "of the dataset available in parquet format."+\ | |
| "'query_attribute' depicts the dataset attribute "+\ | |
| "from which 'queries' are to be sampled."+\ | |
| "'query_attribute_handler' serves for attributes "+\ | |
| "that have complex structure, "+\ | |
| "other than 'string' datatype.", | |
| type=JSONType, | |
| # @see https://huggingface.co/datasets/google-research-datasets/natural_questions | |
| default=dedent("""{ | |
| "repo_id": "lighteval/natural_questions_clean", | |
| "config_name": "", | |
| "commit_hash": "", | |
| "query_attribute": "question", | |
| "query_attribute_handler": "lambda x: x" | |
| }""").replace("'", '"').strip('"') | |
| ) | |
| enrichment_rate = Parameter( | |
| "enrichment_rate", | |
| type=float, | |
| default=.1, | |
| help="proportion of records "+\ | |
| "to be added from the 'hf_enrich_dataset'"+\ | |
| "(x% of original dataset is sampled and"+\ | |
| " added as enriching datapoints), i.e. "+\ | |
| "queries to serve as negative examples, "+\ | |
| "due to their complete disconnexion "+\ | |
| "to tool calling situations." | |
| ) | |
| dataset_repo_id = Parameter( | |
| "dataset_repo_id", | |
| type=str, | |
| default="retrain-pipelines/func_calls", | |
| help="The 'repo_id' to be used " + \ | |
| "for the Hugging Face dataset version push " + \ | |
| "(will be created at runtime" + \ | |
| " if doesn't already exist)." | |
| ) | |
| hf_base_model = Parameter( | |
| "hf_base_model", | |
| help="dict with 'repo_id' and 'commit_hash' keys."+\ | |
| "if 'commit_hash is None, falls back "+\ | |
| "to latest available version of the model.", | |
| type=JSONType, | |
| default=dedent("""{ | |
| "repo_id": "unsloth/Qwen2.5-1.5B", | |
| "commit_hash": "" | |
| }""").replace("'", '"').strip('"') | |
| ) | |
| cpt_training_args = Parameter( | |
| "cpt_training_args", | |
| help="dict with `TrainingArguments` params "+\ | |
| "for the CPT job.", | |
| type=JSONType, | |
| default=dedent("""{ | |
| "warmup_ratio": 0.1, | |
| "num_train_epochs": 1 | |
| }""").replace("'", '"').strip('"') | |
| ) | |
| sft_training_args = Parameter( | |
| "sft_training_args", | |
| help="dict with `TrainingArguments` params "+\ | |
| "for the SFT job.", | |
| type=JSONType, | |
| default=dedent("""{ | |
| "warmup_ratio": 0.1, | |
| "num_train_epochs": 1 | |
| }""").replace("'", '"').strip('"') | |
| ) | |
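| # Note: besides standard `TrainingArguments` fields, both 'cpt_training_args' | |
| # and 'sft_training_args' accept an optional integer 'records_cap' entry ; | |
| # it caps the number of training records (handy for smoke runs) and is | |
| # stripped out before the `UnslothTrainingArguments` object gets built. | |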
| model_repo_id = Parameter( | |
| "model_repo_id", | |
| type=str, | |
| default="retrain-pipelines/function_caller", | |
| help="The 'repo_id' to be used " + \ | |
| "for the Hugging Face model version push " + \ | |
| "(will be created at runtime" + \ | |
| " if doesn't already exist)." | |
| ) | |
| default_pipeline_card_module_dir = \ | |
| os.path.dirname( | |
| importlib.util.find_spec( | |
| f"retrain_pipelines.pipeline_card."+ | |
| f"{RETRAIN_PIPELINE_TYPE}" | |
| ).origin) | |
| pipeline_card_artifacts_path = Parameter( | |
| "pipeline_card_artifacts_path", | |
| type=str, | |
| default=default_pipeline_card_module_dir, | |
| help="pipeline_card artifacts location "+\ | |
| "(i.e. dir hosting your optional " + \ | |
| " custom documentation files :" + \ | |
| " 'pipeline_card.py' and/or 'template.html'"+\ | |
| " and/or 'model_readme.py'"+\ | |
| " and/or 'model_readme_template.md'," +\ | |
| " and/or 'dataset_readme.py'"+\ | |
| " and/or 'dataset_readme_template.md' file), " +\ | |
| "if different from default." | |
| ) | |
| def copy_default_dataset_readme_module( | |
| target_dir: str, | |
| exists_ok: bool = False | |
| ) -> None: | |
| os.makedirs(target_dir, exist_ok=True) | |
| if ( | |
| not exists_ok and | |
| os.path.exists(os.path.join(target_dir, "dataset_readme.py")) | |
| ): | |
| print("File already exists. Skipping copy.") | |
| else: | |
| filefullname = os.path.join( | |
| UnslothFuncCallFlow.default_pipeline_card_module_dir, | |
| "dataset_readme.py" | |
| ) | |
| shutil.copy(filefullname, target_dir) | |
| print(filefullname) | |
| def copy_default_dataset_readme_template( | |
| target_dir: str, | |
| exists_ok: bool = False | |
| ) -> None: | |
| os.makedirs(target_dir, exist_ok=True) | |
| if ( | |
| not exists_ok and | |
| os.path.exists(os.path.join(target_dir, | |
| "dataset_readme_template.md")) | |
| ): | |
| print("File already exists. Skipping copy.") | |
| else: | |
| filefullname = os.path.join( | |
| UnslothFuncCallFlow.default_pipeline_card_module_dir, | |
| "dataset_readme_template.md") | |
| shutil.copy(filefullname, target_dir) | |
| print(filefullname) | |
| def copy_default_model_readme_module( | |
| target_dir: str, | |
| exists_ok: bool = False | |
| ) -> None: | |
| os.makedirs(target_dir, exist_ok=True) | |
| if ( | |
| not exists_ok and | |
| os.path.exists(os.path.join(target_dir, "model_readme.py")) | |
| ): | |
| print("File already exists. Skipping copy.") | |
| else: | |
| filefullname = os.path.join( | |
| UnslothFuncCallFlow.default_pipeline_card_module_dir, | |
| "model_readme.py" | |
| ) | |
| shutil.copy(filefullname, target_dir) | |
| print(filefullname) | |
| def copy_default_model_readme_template( | |
| target_dir: str, | |
| exists_ok: bool = False | |
| ) -> None: | |
| os.makedirs(target_dir, exist_ok=True) | |
| if ( | |
| not exists_ok and | |
| os.path.exists(os.path.join(target_dir, | |
| "model_readme_template.md")) | |
| ): | |
| print("File already exists. Skipping copy.") | |
| else: | |
| filefullname = os.path.join( | |
| UnslothFuncCallFlow.default_pipeline_card_module_dir, | |
| "model_readme_template.md") | |
| shutil.copy(filefullname, target_dir) | |
| print(filefullname) | |
| def copy_default_pipeline_card_module( | |
| target_dir: str, | |
| exists_ok: bool = False | |
| ) -> None: | |
| os.makedirs(target_dir, exist_ok=True) | |
| if ( | |
| not exists_ok and | |
| os.path.exists(os.path.join(target_dir, "pipeline_card.py")) | |
| ): | |
| print("File already exists. Skipping copy.") | |
| else: | |
| filefullname = os.path.join( | |
| UnslothFuncCallFlow.default_pipeline_card_module_dir, | |
| "pipeline_card.py" | |
| ) | |
| shutil.copy(filefullname, target_dir) | |
| print(filefullname) | |
| def copy_default_pipeline_card_html_template( | |
| target_dir: str, | |
| exists_ok: bool = False | |
| ) -> None: | |
| os.makedirs(target_dir, exist_ok=True) | |
| if ( | |
| not exists_ok and | |
| os.path.exists(os.path.join(target_dir, "template.html")) | |
| ): | |
| print("File already exists. Skipping copy.") | |
| else: | |
| filefullname = os.path.join( | |
| UnslothFuncCallFlow.default_pipeline_card_module_dir, | |
| "template.html") | |
| shutil.copy(filefullname, target_dir) | |
| print(filefullname) | |
| del RETRAIN_PIPELINE_TYPE | |
| #--------------------------------------------------------------------------- | |
| @step | |
| def start(self): | |
| print(f"{current.flow_name} - {current.run_id}") | |
| # GPU availability | |
| print(torch.cuda.get_device_name(0)) | |
| print(torch.__version__) | |
| self.engine = "gpu" if torch.cuda.is_available() else "cpu" | |
| # hf_dataset | |
| hf_dataset_dict = \ | |
| get_lazy_df( | |
| repo_id=self.hf_dataset["repo_id"], | |
| commit_hash=self.hf_dataset["commit_hash"], | |
| files_filter=( | |
| self.hf_dataset['config_name']+"/.*\\.parquet" | |
| if ( | |
| self.hf_dataset["config_name"] and | |
| "" < self.hf_dataset["config_name"] | |
| ) else ".*\\.parquet" | |
| ), | |
| hf_token=os.getenv("HF_TOKEN", None) | |
| ) | |
| try: | |
| print(hf_dataset_dict["repo_id"], ", ", | |
| hf_dataset_dict["commit_hash"], " - ", | |
| hf_dataset_dict["commit_datetime"], "\n", | |
| hf_dataset_dict["lazy_df"].explain()) | |
| except ComputeError as ex: | |
| if "HF_TOKEN" not in os.environ: | |
| print("Does the Hugging Face-hosted dataset " + | |
| "require authentication ?", | |
| file=sys.stderr, flush=True) | |
| raise ex | |
| self.hf_dataset_dict = hf_dataset_dict | |
| # hf_enrich_dataset | |
| print(self.hf_enrich_dataset) | |
| hf_enrich_dataset_dict = \ | |
| get_lazy_df( | |
| repo_id=self.hf_enrich_dataset["repo_id"], | |
| commit_hash=self.hf_enrich_dataset["commit_hash"], | |
| files_filter=( | |
| self.hf_enrich_dataset['config_name']+"/.*\\.parquet" | |
| if ( | |
| self.hf_enrich_dataset["config_name"] and | |
| "" < self.hf_enrich_dataset["config_name"] | |
| ) else ".*\\.parquet" | |
| ), | |
| hf_token=os.getenv("HF_TOKEN", None) | |
| ) | |
| print(' ; '.join(f"{k}: {hf_enrich_dataset_dict[k]}" | |
| for k in ['commit_hash', | |
| 'commit_datetime'])) | |
| self.hf_enrich_dataset_dict = hf_enrich_dataset_dict | |
| # hf_base_model | |
| hf_base_model_commits = list_repo_commits( | |
| repo_id=self.hf_base_model["repo_id"], | |
| revision=( | |
| None if (rev_commit_hash:=self.hf_base_model["commit_hash"]) == "" | |
| else rev_commit_hash | |
| ), | |
| repo_type="model", | |
| token=os.getenv("HF_TOKEN", None)) | |
| self.hf_base_model_dict = { | |
| "repo_id": self.hf_base_model["repo_id"], | |
| "commit_hash": hf_base_model_commits[0].commit_id, | |
| "commit_datetime": \ | |
| hf_base_model_commits[0].created_at | |
| } | |
| self.model_version_blessed = False | |
| self.current_blessed_run = None | |
| self.current_blessed_version_dict = None | |
| current.run.remove_tag("model_version_blessed") | |
| self.retrain_pipelines = f"retrain-pipelines {__version__}" | |
| self.retrain_pipeline_type = os.environ["retrain_pipeline_type"] | |
| self.serving_artifacts_local_folder = \ | |
| os.path.realpath(os.path.join( | |
| os.path.dirname(__file__), | |
| '..', '..', 'serving_artifacts', | |
| os.path.sep.join(current.run.path_components) | |
| )) | |
| if not os.path.exists(self.serving_artifacts_local_folder): | |
| os.makedirs(self.serving_artifacts_local_folder) | |
| self.unsloth_dir = os.path.join( | |
| self.serving_artifacts_local_folder, | |
| "Unsloth" | |
| ) | |
| print(f"unsloth_dir : {self.unsloth_dir}") | |
| self.cpt_model_dir = os.path.join( | |
| self.unsloth_dir, "cpt_model") | |
| self.sft_model_dir = os.path.join( | |
| self.unsloth_dir, "sft_model") | |
| self.next(self.eda) | |
| @step | |
| def eda(self): | |
| """ | |
| exploratory data analysis. | |
| """ | |
| ############################ | |
| # features and label # | |
| # basic counts # | |
| ############################ | |
| self.records_count = self.hf_dataset_dict["lazy_df"] \ | |
| .select(pl.len()).collect(engine=self.engine).item() | |
| self.data_schema = get_column_info( | |
| self.hf_dataset_dict["lazy_df"], engine=self.engine) | |
| ############################ | |
| ############################ | |
| # Answers # | |
| # tools count # | |
| ############################ | |
| struct_schema = pl.Struct([ | |
| pl.Field("name", | |
| pl.String | |
| ), | |
| pl.Field("arguments", | |
| pl.List(pl.String) # we retrieve the list of argument names | |
| # (without assigned values) | |
| ) | |
| ]) | |
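| # illustrative shape of one parsed 'answers' entry under this schema | |
| # (hypothetical tool) : | |
| #   {"name": "get_weather", "arguments": ["city", "date"]} | |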
| tool_answer_occurrences_df = \ | |
| count_tool_occurrences( | |
| self.hf_dataset_dict["lazy_df"], | |
| self.hf_dataset["attributes"]["answers_attr"], | |
| struct_schema) \ | |
| .collect(engine=self.engine) | |
| print(f"{tool_answer_occurrences_df['occurrences'].sum():,} " + | |
| f"query/tool-calls pairs") | |
| fig = plot_tools_occurences(tool_answer_occurrences_df, | |
| title_prefix="Dataset answers - ") | |
| self.answers_tools_count_fig = fig | |
| ############################ | |
| ############################ | |
| # Query # | |
| # words count # | |
| ############################ | |
| queries_max_length = self.hf_dataset_dict["lazy_df"].select( | |
| pl.col( | |
| self.hf_dataset["attributes"]["query_attr"] | |
| ).str.len_chars().max().alias("max_query_length") | |
| ).collect(engine=self.engine) | |
| print(f"longuest query counts " + | |
| f"{queries_max_length['max_query_length'][0]:,} characters") | |
| # queries length quartiles | |
| self.query_words_stats = \ | |
| column_words_stats( | |
| self.hf_dataset_dict["lazy_df"], | |
| self.hf_dataset["attributes"]["query_attr"] | |
| ).collect(engine=self.engine) | |
| print(self.query_words_stats.to_pandas().to_string(index=False)) | |
| print("Two thirds of the records have a query with less than " + | |
| f"{self.query_words_stats['q3'][0]} words.") | |
| fig = plot_words_count( | |
| self.hf_dataset_dict["lazy_df"], | |
| column_name=self.hf_dataset["attributes"]["query_attr"], | |
| engine=self.engine) | |
| self.words_count_fig = fig | |
| ############################ | |
| ############################ | |
| # hf_enrich_dataset # | |
| # Query words count # | |
| ############################ | |
| enrich_question_words_stats = \ | |
| column_words_stats( | |
| self.hf_enrich_dataset_dict['lazy_df'], | |
| self.hf_enrich_dataset["query_attribute"], | |
| column_attr_handler=eval( | |
| self.hf_enrich_dataset["query_attribute_handler"]) | |
| ).collect(engine=self.engine) | |
| print(enrich_question_words_stats.to_pandas() | |
| .to_string(index=False)) | |
| del enrich_question_words_stats | |
| ############################ | |
| self.next(self.augment_data) | |
| @step | |
| def augment_data(self): | |
| """ | |
| Add 'negative' examples, where | |
| queries do not trigger any tool call. | |
| To achieve that, we sample long user queries, | |
| truncate at half words count, and | |
| associate this to an empty list of tool-calls. | |
| """ | |
| """ | |
| We only consider: | |
| - records with the longest queries, | |
| i.e. queries in the last quartile | |
| of "queries with most word-counts" | |
| (this is to avoid 'truncated' queries | |
| getting really short) | |
| - records with answers consisting | |
| of a single tool-call | |
| (in order to minimize the risk | |
| that truncating actually yields | |
| a valid answer with | |
| one tool-call [or more]) | |
| Note on flow 'augmentation_rate': | |
| we add that many records (at most), | |
| as the quartile size permits. | |
| """ | |
| print("Sampling within the population with more than " + | |
| str(self.query_words_stats['q3'][0]) + | |
| " words (longest queries quartile) =>") | |
| samples_count = \ | |
| int(self.records_count * self.augmentation_rate) | |
| print(f"would represent {samples_count:,.0f} " + | |
| f"records to be sampled") | |
| eligible_records_df = \ | |
| self.hf_dataset_dict["lazy_df"].filter( | |
| pl.col( | |
| self.hf_dataset["attributes"]["query_attr"] | |
| ) | |
| .str.extract_all(r"\w+") | |
| .map_elements( | |
| lambda arr: len(arr), | |
| return_dtype=pl.Int16) | |
| .gt(self.query_words_stats['q3'][0]) | |
| & pl.col("answers") | |
| .map_elements( | |
| lambda x: len(json.loads(x)) == 1 | |
| if isinstance(x, str) | |
| else False, | |
| return_dtype=pl.Boolean) | |
| ) \ | |
| .collect(engine=self.engine) | |
| eligible_records_count = \ | |
| eligible_records_df.select(pl.len())["len"][0] | |
| print(f"eligible_records_count : " + | |
| f"{eligible_records_count:,.0f}") | |
| samples_count = min(samples_count, eligible_records_count) | |
| self.actual_augmentation_rate = \ | |
| samples_count / self.records_count | |
| print("actual augmentation rate : " + | |
| f"{self.actual_augmentation_rate:.1%}") | |
| sampled_records_df = eligible_records_df.sample( | |
| n=samples_count | |
| ) | |
| self.augmented_records_df = \ | |
| sampled_records_df.with_columns( | |
| pl.col("query") | |
| .map_elements( | |
| lambda query: | |
| " ".join( | |
| query.split()[ | |
| :len(query.split()) // 2]), | |
| return_dtype=pl.Utf8) | |
| .alias("truncated_query") | |
| ).select([ | |
| pl.col("truncated_query").alias("query"), | |
| pl.lit("[]").alias("answers") | |
| ]) | |
| print(self.augmented_records_df.height, | |
| self.augmented_records_df.columns) | |
| self.next(self.enrich_data) | |
| @step | |
| def enrich_data(self): | |
| """ | |
| Further enrich our dataset with 'negative' records from | |
| another dataset (can be a general-purpose text dataset), | |
| as specified by the flow 'hf_enrich_dataset' argument. | |
| """ | |
| """ | |
| Note: here we use the Hugging Face `datasets` library | |
| in 'streaming' mode for record sampling. | |
| """ | |
| hf_enrich_ds = load_dataset( | |
| path=self.hf_enrich_dataset["repo_id"], | |
| name=self.hf_enrich_dataset["config_name"], | |
| revision=self.hf_enrich_dataset_dict["commit_hash"], | |
| streaming=True) | |
| print(hf_enrich_ds["train"]) | |
| samples_count = \ | |
| int(self.records_count * self.enrichment_rate) | |
| print(f"Samplig {samples_count:,.0f} records") | |
| query_attribute_handler = \ | |
| eval(self.hf_enrich_dataset["query_attribute_handler"]) | |
| samples_iterator = iterable_dataset_multi_buffer_sampler( | |
| hf_enrich_ds["train"], | |
| total_samples=samples_count, | |
| attributes_selector=\ | |
| (lambda x:query_attribute_handler( | |
| x[self.hf_enrich_dataset["query_attribute"]])), | |
| buffer_size=3_000, | |
| num_passes=3, | |
| seed=None | |
| ) | |
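| # (the sampler above draws ~samples_count queries from the streaming | |
| #  dataset through shuffled in-memory buffers of 3,000 records, | |
| #  over 3 passes - see retrain_pipelines.dataset.hf_utils) | |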
| # Capitalize and add end punctuation if missing | |
| start_time = time.time() | |
| print("Starting sample enriching records, " + | |
| "this may take some time if the source dataset " + | |
| "has a complex structure..") | |
| samples_list = [ | |
| s.capitalize() + ("" if s[-1] in ".!?" else "?") | |
| for s in samples_iterator] | |
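| # e.g. "who wrote the declaration of independence" | |
| #      -> "Who wrote the declaration of independence?" | |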
| elapsed_time = time.time() - start_time | |
| print(f".. sampling completed " + | |
| f"({int(elapsed_time // 3_600)}h:" + | |
| f"{int((elapsed_time % 3_600) // 60)}m:" + | |
| f"{int(elapsed_time % 60)}s).") | |
| enriched_records_df = pl.DataFrame( | |
| {"query": samples_list, | |
| "answers": \ | |
| ["[]"] * \ | |
| len(samples_list)} | |
| ) | |
| self.enriched_records_df = enriched_records_df | |
| self.next(self.dataset_to_hub) | |
| @step | |
| def dataset_to_hub(self): | |
| """ | |
| Push the dataset version to the hub: | |
| - continued pre-training dataset | |
| - training and validation splits of the | |
| augmented and enriched | |
| supervised finetuning dataset | |
| - readme with versioning info | |
| """ | |
| ############################# | |
| # case of user-provided # | |
| # documentation artifact(s) # | |
| ############################# | |
| # note that user can provide either | |
| # 'pipeline_card.py' or 'template.html' | |
| # or 'dataset_readme.py' | |
| # or 'dataset_readme_template.md' | |
| # or 'model_readme.py' | |
| # or 'model_readme_template.md' | |
| # or any combination of those | |
| # when specifying custom | |
| # 'pipeline_card_artifacts_path' | |
| if ( | |
| "dataset_readme_template.md" in | |
| os.listdir(self.pipeline_card_artifacts_path) | |
| ): | |
| template_dir = self.pipeline_card_artifacts_path | |
| else: | |
| template_dir = os.path.dirname( | |
| importlib.util.find_spec( | |
| f"retrain_pipelines.pipeline_card."+ | |
| f"{os.getenv('retrain_pipeline_type')}" | |
| ).origin) | |
| print(f"template_dir : '{template_dir}'") | |
| ############################# | |
| if "dataset_readme.py" in os.listdir( | |
| self.pipeline_card_artifacts_path): | |
| from retrain_pipelines.utils import \ | |
| get_get_dataset_readme_content | |
| get_dataset_readme_content = \ | |
| get_get_dataset_readme_content( | |
| self.pipeline_card_artifacts_path) | |
| else: | |
| from retrain_pipelines.pipeline_card import \ | |
| get_dataset_readme_content | |
| ############################# | |
| ############################# | |
| # augmented & enriched # | |
| # finetuning dataset # | |
| ############################# | |
| merged_df = pl.concat([ | |
| # dataset | |
| self.hf_dataset_dict["lazy_df"].select([ | |
| self.hf_dataset["attributes"]["query_attr"], | |
| self.hf_dataset["attributes"]["answers_attr"] | |
| ]).collect(engine=self.engine), | |
| # truncated queries augmentation | |
| self.augmented_records_df, | |
| # enriching dataset | |
| self.enriched_records_df | |
| ]).sample( | |
| # shuffling | |
| fraction=1, | |
| shuffle=True, | |
| with_replacement=False | |
| ) | |
| merged_df = merged_df.sample(fraction=1, shuffle=True) | |
| merged_df.rechunk() | |
| print(("merged_df", f"{merged_df.shape[0]:,.0F}", | |
| merged_df.columns)) | |
| pandas_df = merged_df.to_pandas() | |
| train_size = int(0.8 * len(pandas_df)) | |
| print(f"validation : {len(pandas_df) - train_size}") | |
| sft_dataset = DatasetDict({ | |
| "train": Dataset.from_pandas(pandas_df[:train_size]), | |
| "validation": Dataset.from_pandas(pandas_df[train_size:]) | |
| }) | |
| ############################# | |
| ############################# | |
| # continued pre-training # | |
| # dataset # | |
| ############################# | |
| struct_schema = pl.Struct([ | |
| pl.Field("name", pl.String), | |
| pl.Field("description", pl.String), | |
| pl.Field( | |
| "parameters", | |
| pl.String # Use String to allow | |
| # for varying structures | |
| # (different tools indeed having | |
| # different sets of parameters | |
| # i.e. different parameters counts, | |
| # datatypes and names) | |
| # so parsing must be tolerant. | |
| ) | |
| ]) | |
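| # illustrative shape of one parsed tool entry under this schema | |
| # (hypothetical tool, 'parameters' kept as a raw JSON string) : | |
| #   {"name": "get_weather", "description": "Fetch current weather", | |
| #    "parameters": "{\"city\": {\"type\": \"str\"}}"} | |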
| unique_tools_df = get_unique_tools( | |
| self.hf_dataset_dict["lazy_df"], | |
| tools_attr_name=\ | |
| self.hf_dataset["attributes"]["tools_attr"], | |
| struct_schema=struct_schema | |
| ).collect(engine=self.engine) | |
| unique_tools_arrow_table = unique_tools_df.to_arrow() | |
| self.unique_tools_dataset = \ | |
| Dataset(unique_tools_arrow_table) | |
| print(self.unique_tools_dataset) | |
| ############################# | |
| ############################# | |
| # DatasetDict # | |
| # with multiple tables # | |
| ############################# | |
| dataset_dict = DatasetDict({ | |
| "continued_pre_training": \ | |
| self.unique_tools_dataset, | |
| "supervised_finetuning": sft_dataset | |
| }) | |
| print(dataset_dict, flush=True) | |
| ############################# | |
| ############################# | |
| # dataset README # | |
| # from template # | |
| ############################# | |
| commit_datetime = datetime.utcnow() | |
| new_dataset_version_label = get_new_repo_minor_version( | |
| repo_id=self.dataset_repo_id, | |
| repo_type="dataset", | |
| hf_token=os.getenv("HF_TOKEN", None)) | |
| readme_content = get_dataset_readme_content( | |
| template_folder=template_dir, | |
| hf_dataset_dict=self.hf_dataset_dict, | |
| hf_enrich_dataset_dict=self.hf_enrich_dataset_dict, | |
| dataset_dict=dataset_dict, | |
| augmentation_rate=self.actual_augmentation_rate, | |
| enrichment_rate=self.enrichment_rate, | |
| version_label=new_dataset_version_label, | |
| commit_datetime=commit_datetime, | |
| mf_flow_name=current.flow_name, | |
| mf_run_id=current.run.id, | |
| engine=self.engine | |
| ) | |
| ############################# | |
| dataset_commit_hash = push_dataset_version_to_hub( | |
| repo_id=self.dataset_repo_id, | |
| version_label=new_dataset_version_label, | |
| timestamp_str=commit_datetime.strftime( | |
| "%Y-%m-%d %H:%M:%S UTC"), | |
| dataset_dict=dataset_dict, | |
| dataset_readme_content=readme_content, | |
| hf_token=os.getenv("HF_TOKEN", None) | |
| ) | |
| if not dataset_commit_hash: | |
| raise Exception( | |
| "Failed to publish dataset version.") | |
| print(f"https://huggingface.co/datasets/{self.dataset_repo_id}" + | |
| f"/blob/{dataset_commit_hash}/README.md") | |
| self.dataset_commit_dict = { | |
| "repo_id": self.dataset_repo_id, | |
| "commit_hash": dataset_commit_hash, | |
| "version_label": new_dataset_version_label, | |
| "commit_datetime": commit_datetime, | |
| } | |
| self.next(self.continued_pre_training) | |
| @step | |
| def continued_pre_training(self): | |
| """ | |
| Gives the base model some additional intrinsic knowledge | |
| through continued pre-training. | |
| See unsloth.ai/blog/contpretraining | |
| """ | |
| from retrain_pipelines.model.hf_utils import \ | |
| plot_log_history | |
| ####################################### | |
| # base-model and associated tokenizer # | |
| # from Hub (or local cache) # | |
| ####################################### | |
| self.max_seq_length = 2048 | |
| model, tokenizer = FastLanguageModel.from_pretrained( | |
| model_name=self.hf_base_model_dict["repo_id"], | |
| revision=self.hf_base_model_dict["commit_hash"], | |
| max_seq_length=self.max_seq_length, | |
| dtype=None, | |
| load_in_4bit=False, | |
| # case of a gated or private base-model | |
| token=os.getenv("HF_TOKEN", None) | |
| ) | |
| ####################################### | |
| ####################################### | |
| # dataset prompt_template mapping # | |
| ####################################### | |
| tools_dataset = DatasetDict( | |
| {"train": self.unique_tools_dataset}) | |
| print(tools_dataset) | |
| tool_prompt_template = "tool: {}" | |
| def formatting_prompts_func(tools_batch): | |
| tools_batch = tools_batch["tool"] | |
| outputs = [] | |
| for tool in tools_batch: | |
| # Must add EOS_TOKEN, | |
| # otherwise generation will go on forever! | |
| text = tool_prompt_template.format(tool) + \ | |
| tokenizer.eos_token | |
| outputs.append(text) | |
| return { "tools" : outputs, } | |
| cpt_dataset = tools_dataset["train"].map( | |
| formatting_prompts_func, batched=True,) | |
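| # each mapped record roughly reads (illustrative ; the EOS token depends | |
| # on the tokenizer, e.g. '<|endoftext|>' for Qwen2.5 base) : | |
| #   'tool: {"name": ..., "description": ..., "parameters": ...}<|endoftext|>' | |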
| ####################################### | |
| ####################################### | |
| # PEFT adapter # | |
| # for continued pre-training # | |
| ####################################### | |
| model = FastLanguageModel.get_peft_model( | |
| model, | |
| r = 128, # any number >0 ; 8, 16, 32, 64, 128, 256 | |
| target_modules = ["q_proj", "k_proj", "v_proj", "o_proj", | |
| "gate_proj", "up_proj", "down_proj", | |
| # Add for continued pretraining | |
| "embed_tokens", "lm_head",], | |
| lora_alpha = 32, | |
| lora_dropout = 0, # Supports any, 0 is optimized | |
| bias = "none", # Supports any, "none" is optimized | |
| # True or "unsloth" for very long context | |
| use_gradient_checkpointing = "unsloth", | |
| use_rslora = True, # rank-stabilized LoRA | |
| loftq_config = None, # LoftQ | |
| #random_state = 3407, | |
| ) | |
| ####################################### | |
| ####################################### | |
| # cpt_trainer # | |
| ####################################### | |
| if ( | |
| "records_cap" in self.cpt_training_args and | |
| self.cpt_training_args["records_cap"] is not None and | |
| isinstance(self.cpt_training_args["records_cap"], int) | |
| ): | |
| cpt_dataset = cpt_dataset.take( | |
| self.cpt_training_args["records_cap"]) | |
| print(f"cpt_dataset : {cpt_dataset}") | |
| train_args = UnslothTrainingArguments( | |
| # https://huggingface.co/docs/transformers/main_classes/trainer#transformers.TrainingArguments.save_strategy | |
| per_device_train_batch_size=2, | |
| gradient_accumulation_steps=8, | |
| **{k: v for k, v in self.cpt_training_args.items() | |
| if k != "records_cap"}, | |
| # 2 to 10x smaller learning rate | |
| # for the embedding matrices | |
| learning_rate=5e-5, | |
| embedding_learning_rate=1e-5, | |
| fp16=not is_bfloat16_supported(), | |
| bf16=is_bfloat16_supported(), | |
| logging_steps=1, | |
| optim="adamw_8bit", | |
| weight_decay=0.01, | |
| lr_scheduler_type="linear", | |
| #seed=3407, | |
| output_dir=os.path.join( | |
| self.unsloth_dir, "outputs", "cpt"), | |
| save_total_limit = 2, | |
| report_to="tensorboard", | |
| logging_dir=os.path.join( | |
| self.sft_model_dir, | |
| "runs", "cpt") | |
| ) | |
| self.cpt_traces_file_fullname = os.path.join( | |
| self.unsloth_dir, "cpt_trainer_traces.txt") | |
| print("Training started. " + | |
| f"Check {self.cpt_traces_file_fullname} for live traces.", | |
| flush=True) | |
| trainer = UnslothTrainer( | |
| model=model, tokenizer=tokenizer, | |
| train_dataset=cpt_dataset, | |
| dataset_text_field="tools", | |
| max_seq_length=self.max_seq_length, | |
| dataset_num_proc=2, | |
| args=train_args, | |
| ) | |
| ####################################### | |
| ####################################### | |
| # Show current memory stats # | |
| ####################################### | |
| torch.cuda.ipc_collect() | |
| torch.cuda.empty_cache() | |
| _ = gc.collect() | |
| gpu_stats = torch.cuda.get_device_properties(0) | |
| self.start_gpu_memory = \ | |
| round(torch.cuda.max_memory_reserved() | |
| / 1024 / 1024 / 1024, 3) | |
| self.max_memory = \ | |
| round(gpu_stats.total_memory | |
| / 1024 / 1024 / 1024, 3) | |
| print(f"GPU = {gpu_stats.name}. " + | |
| f"Max memory = {self.max_memory} GB.") | |
| print(f"{self.start_gpu_memory} GB of memory reserved.") | |
| ####################################### | |
| with open(self.cpt_traces_file_fullname, 'w') as f: | |
| with redirect_stdout(f): | |
| hf_logging.set_verbosity_error() | |
| hf_logging.disable_progress_bar() | |
| trainer_stats = trainer.train() | |
| hf_logging.set_verbosity_info() | |
| hf_logging.enable_progress_bar() | |
| print(f"{trainer_stats.metrics['train_runtime']} " + | |
| f"seconds used for training " + | |
| f"({round(trainer_stats.metrics['train_runtime']/60, 2)}" + | |
| f" minutes).") | |
| self.cpt_log_history = trainer.state.log_history | |
| # print(self.cpt_log_history) | |
| self.cpt_log_history_fig = \ | |
| plot_log_history( | |
| self.cpt_log_history, | |
| title="Continued pretraining loss" | |
| ) | |
| model.save_pretrained_merged( | |
| save_directory=self.cpt_model_dir, | |
| tokenizer=tokenizer, | |
| save_method="lora" | |
| ) | |
| print(f"cpt_model_dir : {self.cpt_model_dir}\n") | |
| self.next(self.supervised_finetuning) | |
| @step | |
| def supervised_finetuning(self): | |
| """ | |
| Trains the model on tool-calling | |
| task specialization. | |
| """ | |
| from retrain_pipelines.model.hf_utils import \ | |
| plot_log_history | |
| torch.cuda.ipc_collect() | |
| torch.cuda.empty_cache() | |
| _ = gc.collect() | |
| model, tokenizer = FastLanguageModel.from_pretrained( | |
| model_name=self.cpt_model_dir, | |
| max_seq_length=self.max_seq_length, | |
| dtype=None, | |
| load_in_4bit=False, | |
| ) | |
| # !!!! bug fix BEGIN !!!! | |
| # otherwise, 'embed_tokens' and 'lm_head' | |
| # trained during CPT are "ignored", | |
| # i.e. not saved after SFT | |
| # (note that, alternatively, we could also | |
| # do this fix after sft-training and | |
| # just before saving ; | |
| # which would be equivalent to | |
| # freezing embeddings during finetuning | |
| # for better pretrained knowledge retention) | |
| # @see https://www.reddit.com/r/unsloth/comments/1dtzcd6/fastlanguagemodelpatch_peft_model_changing/ | |
| model.model.model.embed_tokens.modules_to_save.default.to( | |
| device="cuda:0", | |
| dtype=torch.float32, | |
| non_blocking=True) | |
| model.model.model.embed_tokens.modules_to_save.default \ | |
| .requires_grad_(True) | |
| model.model.lm_head.modules_to_save.default.to( | |
| device="cuda:0", | |
| dtype=torch.float32, | |
| non_blocking=True) | |
| model.model.lm_head.modules_to_save.default \ | |
| .requires_grad_(True) | |
| # !!!! bug fix END !!!! | |
| ####################################### | |
| # dataset prompt_template mapping # | |
| ####################################### | |
| # download from Hub (or get from local cache) | |
| queries_dataset = load_dataset( | |
| path=self.dataset_commit_dict["repo_id"], | |
| name="supervised_finetuning", | |
| revision=self.dataset_commit_dict["commit_hash"], | |
| token=os.getenv("HF_TOKEN", None)) | |
| print(f"HF_DATASETS_CACHE : {HF_DATASETS_CACHE}") # HF_CACHE_HOME | |
| self.sft_prompt_template = dedent(""" | |
| You specialize in generating tool calls. Given a query, your task is to return a list of tool calls based on your knowledge of known tools. | |
| Rules: | |
| 1. You can only use tools you know. Do not create new tools under any circumstances. | |
| 2. If a query does not match any known tool, return an empty list ([]). | |
| 3. If information is missing to use a known tool, do not attempt to use it. | |
| 4. Your response must always be a valid JSON array, and nothing else. | |
| Be precise and do not guess. | |
| # query: | |
| {} | |
| # response: | |
| {} | |
| """).strip() | |
| tokenizer.chat_template = self.sft_prompt_template | |
| EOS_TOKEN = tokenizer.eos_token | |
| def formatting_prompts_func(records): | |
| query = records["query"] | |
| tools = records["answers"] | |
| outputs = [] | |
| for query, tools in zip(query, tools): | |
| # Must add EOS_TOKEN, | |
| # otherwise your generation will go on forever | |
| text = self.sft_prompt_template.format(query, tools) \ | |
| + EOS_TOKEN | |
| outputs.append(text) | |
| return { "text" : outputs, } | |
| sft_train_dataset = queries_dataset["train"].map( | |
| formatting_prompts_func, batched=True) | |
| sft_valid_dataset = queries_dataset["validation"].map( | |
| formatting_prompts_func, batched=True,) | |
| ####################################### | |
| ####################################### | |
| # PEFT adapter # | |
| # for supervised finetuning # | |
| ####################################### | |
| # for cases where CPT has been merged into the overall model ; | |
| # otherwise, keep on training the current LoRA adapter | |
| # model = FastLanguageModel.get_peft_model( | |
| # model, | |
| # r = 128, # any number >0 ; 8, 16, 32, 64, 128, 256 | |
| # target_modules = ["q_proj", "k_proj", "v_proj", "o_proj", | |
| # "gate_proj", "up_proj", "down_proj"], | |
| # lora_alpha = 32, | |
| # lora_dropout = 0, # Supports any, but = 0 is optimized | |
| # bias = "none", # Supports any, but = "none" is optimized | |
| # # True or "unsloth" for very long context | |
| # use_gradient_checkpointing = "unsloth", | |
| # random_state = 3407, | |
| # use_rslora = True, # rank stabilized LoRA | |
| # loftq_config = None, # LoftQ | |
| # ) | |
| ####################################### | |
| ####################################### | |
| # sft_trainer # | |
| ####################################### | |
| split = sft_train_dataset.train_test_split( | |
| test_size=1000, | |
| #seed=42 | |
| ) | |
| train_dataset = split['train'] | |
| eval_dataset = split['test'] | |
| if ( | |
| "records_cap" in self.sft_training_args and | |
| self.sft_training_args["records_cap"] is not None and | |
| isinstance(self.sft_training_args["records_cap"], int) | |
| ): | |
| train_dataset = train_dataset.take( | |
| self.sft_training_args["records_cap"]) | |
| eval_dataset = eval_dataset.take( | |
| self.sft_training_args["records_cap"]) | |
| print(f"train_dataset : {train_dataset}") | |
| print(f"eval_dataset : {eval_dataset}") | |
| train_args = UnslothTrainingArguments( | |
| per_device_train_batch_size=2, | |
| gradient_accumulation_steps=8, | |
| **{k: v for k, v in self.sft_training_args.items() | |
| if k != "records_cap"}, | |
| per_device_eval_batch_size=2, | |
| eval_steps=200, | |
| eval_strategy="steps", | |
| do_eval=True, | |
| learning_rate=5e-5, | |
| # embedding_learning_rate=1e-5, # Optionally here | |
| fp16=not is_bfloat16_supported(), | |
| bf16=is_bfloat16_supported(), | |
| optim="adamw_8bit", | |
| weight_decay=0.00, | |
| lr_scheduler_type="linear", | |
| #seed=3407, | |
| output_dir=os.path.join( | |
| self.unsloth_dir, "outputs", "sft"), | |
| save_total_limit=2, | |
| logging_steps=1, | |
| report_to="tensorboard", | |
| logging_dir=os.path.join( | |
| self.sft_model_dir, | |
| "runs", "sft") | |
| ) | |
| self.sft_traces_file_fullname = os.path.join( | |
| self.unsloth_dir, "sft_trainer_traces.txt") | |
| print("Training started. " + | |
| f"Check {self.sft_traces_file_fullname} for live traces.", | |
| flush=True) | |
| trainer = UnslothTrainer( | |
| model=model, tokenizer=tokenizer, | |
| train_dataset=train_dataset, | |
| dataset_text_field="text", | |
| eval_dataset=eval_dataset, | |
| max_seq_length=self.max_seq_length, | |
| dataset_num_proc=8, | |
| args=train_args | |
| ) | |
| trainer.can_return_loss = True | |
| ####################################### | |
| ####################################### | |
| # Show current memory stats # | |
| ####################################### | |
| torch.cuda.ipc_collect() | |
| torch.cuda.empty_cache() | |
| _ = gc.collect() | |
| used_memory = \ | |
| round(torch.cuda.max_memory_reserved() | |
| /1024/1024/1024, 3) | |
| used_memory_for_lora = \ | |
| round(used_memory-self.start_gpu_memory, 3) | |
| used_percentage = \ | |
| round(used_memory/self.max_memory*100, 3) | |
| lora_percentage = \ | |
| round(used_memory_for_lora/self.max_memory*100, | |
| 3) | |
| print(f"Peak reserved memory = " + | |
| f"{used_memory} GB.") | |
| print(f"Peak reserved memory for " + | |
| f"training = {used_memory_for_lora} " + | |
| f"GB.") | |
| print(f"Peak reserved memory % of " + | |
| f"max memory = {used_percentage} %.") | |
| print(f"Peak reserved memory for training " + | |
| f"% of max memory = {lora_percentage} %.") | |
| ####################################### | |
| with open(self.sft_traces_file_fullname, 'w') as f: | |
| with redirect_stdout(f): | |
| hf_logging.set_verbosity_error() | |
| hf_logging.disable_progress_bar() | |
| trainer_stats = trainer.train() | |
| hf_logging.set_verbosity_info() | |
| hf_logging.enable_progress_bar() | |
| print(f"{trainer_stats.metrics['train_runtime']} " + | |
| f"seconds used for training " + | |
| f"({round(trainer_stats.metrics['train_runtime']/60, 2)}" + | |
| f" minutes).") | |
| self.sft_log_history = trainer.state.log_history | |
| self.sft_log_history_fig = \ | |
| plot_log_history( | |
| self.sft_log_history, | |
| title="Supervised finetuning loss" | |
| ) | |
| model.save_pretrained_merged( | |
| self.sft_model_dir, tokenizer, | |
| save_method = "lora" | |
| ) | |
| print(f"sft_model_dir : {self.sft_model_dir}\n") | |
| self.next(self.evaluate_model) | |
| @step | |
| def evaluate_model(self): | |
| """ | |
| Batch inference on the SFT validation dataset. | |
| """ | |
| from retrain_pipelines.model import \ | |
| infer_validation, compute_counts_n_metrics, \ | |
| plot_validation_completions | |
| torch.cuda.ipc_collect() | |
| torch.cuda.empty_cache() | |
| _ = gc.collect() | |
| ###################################################### | |
| # loading trained adapter # | |
| ###################################################### | |
| # Unsloth [and hf transformers under it], # | |
| # when loading both model & tokenizer at once # | |
| # (same as we did in prior tasks, but now # | |
| # with tokenizer.chat_template being set # | |
| # in the tokenizer config), forces on us some # | |
| # hard requirements on the chat_template format. # | |
| ###################################################### | |
| # load base from cache | |
| # (with base tokenizer, which we ignore) | |
| model, _ = FastLanguageModel.from_pretrained( | |
| model_name=self.hf_base_model_dict["repo_id"], | |
| revision=self.hf_base_model_dict["commit_hash"], | |
| max_seq_length=self.max_seq_length, | |
| dtype=None, | |
| load_in_4bit=False, | |
| # case of a gated or private base-model | |
| token=os.getenv("HF_TOKEN", None) | |
| ) | |
| model = FastLanguageModel.for_inference(model) | |
| # load our CPT+SFT trained & locally-saved adapter | |
| model.load_adapter(peft_model_id=self.sft_model_dir) | |
| # Separately load our (potentially trained &) | |
| # locally-saved adapter-tokenizer | |
| # (loading it below via HF and not Unsloth) | |
| tokenizer = AutoTokenizer.from_pretrained( | |
| pretrained_model_name_or_path=self.sft_model_dir | |
| ) | |
| ###################################################### | |
| ###################################################### | |
| # validation dataset # | |
| ###################################################### | |
| # download from Hub (or get from local cache) | |
| queries_dataset = load_dataset( | |
| path=self.dataset_commit_dict["repo_id"], | |
| name="supervised_finetuning", | |
| revision=self.dataset_commit_dict["commit_hash"], | |
| token=os.getenv("HF_TOKEN", None)) | |
| if ( | |
| "records_cap" in self.sft_training_args and | |
| self.sft_training_args["records_cap"] is not None and | |
| isinstance(self.sft_training_args["records_cap"], int) | |
| ): | |
| validation_data = queries_dataset["validation"].take( | |
| self.sft_training_args["records_cap"]) | |
| else: | |
| validation_data = queries_dataset["validation"] | |
| print(validation_data, flush=True) | |
| ###################################################### | |
| self.max_new_tokens = 400 | |
| start_time = time.time() | |
| validation_results = infer_validation( | |
| tokenizer=tokenizer, | |
| model=model, | |
| validation_data=validation_data, | |
| prompt_template=tokenizer.chat_template, | |
| batch_size=32, # 64, | |
| queries_attr_name=\ | |
| self.hf_dataset["attributes"]["query_attr"], | |
| answers_attr_name=\ | |
| self.hf_dataset["attributes"]["answers_attr"], | |
| max_new_tokens=self.max_new_tokens, | |
| device="cuda" | |
| ) | |
| print("infer_validation - Elapsed time: " + | |
| f"{(time.time() - start_time):.2f} seconds") | |
| self.validation_results = validation_results # <= to artifacts store | |
| eval_df = pl.LazyFrame(validation_results) | |
| records = eval_df.with_columns( | |
| (pl.col("answer") == pl.col("completion")) \ | |
| .alias("is_ground_truth_identical") | |
| ).collect() #engine=self.engine) | |
| print("perfect characters-match accuracy : " + | |
| str(records['is_ground_truth_identical'].mean())) | |
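| # per-record counts & set-overlap metrics between predicted and | |
| # ground-truth tool calls ; 'jaccard' is understood here as | |
| # |intersection| / |union| of the two tool-call sets | |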
| eval_metrics_df = compute_counts_n_metrics( | |
| eval_df, is_format_fault_tolerant=True) | |
| overall_metrics_df = eval_metrics_df.select([ | |
| pl.col("precision").mean(), | |
| pl.col("recall").mean(), | |
| pl.col("f1").mean(), | |
| pl.col("jaccard").mean() | |
| ]).collect() #engine=self.engine) | |
| self.perf_metrics = overall_metrics_df.row(0, named=True) | |
| print(self.perf_metrics) | |
| self.validation_completions_fig = \ | |
| plot_validation_completions( | |
| eval_metrics_df, engine=self.engine) | |
| del model | |
| del tokenizer | |
| torch.cuda.ipc_collect() | |
| torch.cuda.empty_cache() | |
| _ = gc.collect() | |
| self.next(self.model_version_blessing) | |
| @step | |
| def model_version_blessing(self): | |
| """ | |
| Comparing newly-retrained model version | |
| against best-performing predecessor. | |
| """ | |
| """ | |
| Note: for Hugging Face integrated pipelines, | |
| we compare against the latest commit of the main branch | |
| of the model repository there. | |
| When it comes to the local "mf_run_id" of the pipeline run | |
| having generated that best prior model version | |
| (retrieved from the yaml section of the HF model card metadata), | |
| we check against records of the herein ML-framework instance, | |
| as the "prior best version" of the model here being retrained | |
| may have originated from another instance | |
| than the one executing the current retraining | |
| (in which case, we simply don't include a "local" hyperlink | |
| in the model version pipeline_cards that will be | |
| produced later in the herein pipeline run). | |
| """ | |
| from retrain_pipelines.model.hf_utils import \ | |
| current_blessed_model_version_dict | |
| main_perf_metric_name = "jaccard" | |
| current_blessed_version_dict = \ | |
| current_blessed_model_version_dict( | |
| repo_id=self.model_repo_id, | |
| hf_token=os.getenv("HF_TOKEN", None) | |
| ) | |
| print("current_blessed_version_dict : " + | |
| str(current_blessed_version_dict)) | |
| if current_blessed_version_dict is None: | |
| print("case 'no prior blessed model version found" | |
| " => blessing.'") | |
| self.model_version_blessed = True | |
| elif ( | |
| main_perf_metric_name in | |
| current_blessed_version_dict["perf_metrics"] | |
| ): | |
| current_blessed_run_id = \ | |
| current_blessed_version_dict["mf_run_id"] | |
| current_blessed_metric_value = \ | |
| current_blessed_version_dict[ | |
| "perf_metrics"][main_perf_metric_name] | |
| self.model_version_blessed = ( | |
| self.perf_metrics[main_perf_metric_name] >= | |
| current_blessed_metric_value | |
| ) | |
| if not self.model_version_blessed: | |
| self.current_blessed_version_dict = \ | |
| current_blessed_version_dict | |
| for run in Flow(self.__class__.__name__): | |
| if str(run.id) == current_blessed_run_id: | |
| last_run_step = next(iter(run.steps())) | |
| last_task = next(iter(last_run_step.tasks())) | |
| # further filtering on successful runs that are | |
| # retraining of a prior version of the same model | |
| # (to minimize the risk that this was obtained | |
| # on another ML-framework instance) | |
| if ( | |
| last_task.successful and | |
| hasattr(last_task.artifacts, 'model_version_blessed') and | |
| last_task.artifacts.model_version_blessed.data and | |
| hasattr(last_task.artifacts, 'model_repo_id') and | |
| last_task.artifacts.model_repo_id.data == self.model_repo_id | |
| ): | |
| self.current_blessed_run = run | |
| break | |
| if not self.current_blessed_run: | |
| print( | |
| "Couldn't find blessed run " + | |
| f"{current_blessed_run_id} !\n" + | |
| "It seems that prior blessed run was " + | |
| "executed on another ML framework instance.", | |
| file=sys.stderr, flush=True) | |
| print("new : " + | |
| str(self.perf_metrics[main_perf_metric_name]) + | |
| " - previous best : " + | |
| str(current_blessed_metric_value) + | |
| " - model_version_blessing : " + | |
| str(self.model_version_blessed)) | |
| else: | |
| raise Exception( | |
| "Performance metric '" + | |
| main_perf_metric_name + | |
| "' can't be found in eval results " + | |
| "from blessed run " + | |
| str(current_blessed_version_dict[ | |
| "mf_run_id"]) + " !") | |
| # self.model_version_blessed = True ### DEBUG - DELETE ### | |
| self.next(self.model_to_hub) | |
| @step | |
| def model_to_hub(self): | |
| """ | |
| Push the model version to the hub, including | |
| readme with versioning info. | |
| """ | |
| ############################# | |
| # case of user-provided # | |
| # documentation artifact(s) # | |
| ############################# | |
| # note that user can provide either | |
| # 'pipeline_card.py' or 'template.html' | |
| # or 'dataset_readme.py' | |
| # or 'dataset_readme_template.md' | |
| # or 'model_readme.py' | |
| # or 'model_readme_template.md' | |
| # or any combination of those | |
| # when specifying custom | |
| # 'pipeline_card_artifacts_path' | |
| if ( | |
| "model_readme_template.md" in | |
| os.listdir(self.pipeline_card_artifacts_path) | |
| ): | |
| template_dir = self.pipeline_card_artifacts_path | |
| else: | |
| template_dir = os.path.dirname( | |
| importlib.util.find_spec( | |
| f"retrain_pipelines.pipeline_card."+ | |
| f"{os.getenv('retrain_pipeline_type')}" | |
| ).origin) | |
| print(f"template_dir : '{template_dir}'") | |
| ############################# | |
| if "model_readme.py" in os.listdir( | |
| self.pipeline_card_artifacts_path): | |
| from retrain_pipelines.utils import \ | |
| get_get_model_readme_content | |
| get_model_readme_content = \ | |
| get_get_model_readme_content( | |
| self.pipeline_card_artifacts_path) | |
| else: | |
| from retrain_pipelines.pipeline_card import \ | |
| get_model_readme_content | |
| ############################# | |
| from retrain_pipelines.model.hf_utils import \ | |
| push_model_version_to_hub | |
| ############################# | |
| # model README # | |
| # from template # | |
| ############################# | |
| commit_datetime = datetime.utcnow() | |
| new_model_version_label = get_new_repo_minor_version( | |
| repo_id=self.model_repo_id, | |
| repo_type="model", | |
| hf_token=os.getenv("HF_TOKEN", None)) | |
| readme_content = get_model_readme_content( | |
| template_folder=template_dir, | |
| model_repo_id=self.model_repo_id, | |
| base_model_dict=self.hf_base_model_dict, | |
| training_dataset_dict=self.dataset_commit_dict, | |
| version_label=new_model_version_label, | |
| commit_datetime=commit_datetime, | |
| perf_metrics=self.perf_metrics, | |
| mf_flow_name=current.flow_name, | |
| mf_run_id=current.run.id | |
| ) | |
| ############################# | |
| print("Pushing model version to HF hub " + | |
| ("(blessed). " if self.model_version_blessed | |
| else "(not blessed). ") + | |
| "May take a while..", | |
| flush=True) | |
| model_commit_hash = push_model_version_to_hub( | |
| repo_id=self.model_repo_id, | |
| model_version_blessed=\ | |
| self.model_version_blessed, | |
| version_label=new_model_version_label, | |
| timestamp_str=commit_datetime.strftime( | |
| "%Y-%m-%d %H:%M:%S UTC"), | |
| model_dir=self.sft_model_dir, | |
| model_readme_content=readme_content, | |
| hf_token=os.getenv("HF_TOKEN", None) | |
| ) | |
| if not model_commit_hash: | |
| raise Exception( | |
| "Failed to publish model version.") | |
| print("Push of model version to HF hub completed.", | |
| flush=True) | |
| print(f"https://huggingface.co/{self.model_repo_id}" + | |
| f"/blob/{model_commit_hash}/README.md") | |
| self.model_commit_dict = { | |
| "repo_id": self.model_repo_id, | |
| "commit_hash": model_commit_hash, | |
| "version_label": new_model_version_label, | |
| "commit_datetime": commit_datetime, | |
| } | |
| self.next(self.infra_validator) | |
| @step | |
| def infra_validator(self): | |
| """ | |
| If the trained model version is blessed, | |
| validate serving. | |
| """ | |
| """ | |
| Note that using an isolated virtual env | |
| (using the @conda task decorator) | |
| is advisable, so as not to embark the whole | |
| pipeline dependencies into the local server. | |
| We don't here, for educational purposes: | |
| it keeps things "simple" to grasp | |
| and avoids forcing conda | |
| (for instance miniconda) on the user as | |
| a virtual-environment management | |
| tool. | |
| """ | |
| """ | |
| Note: we load the base model from the HF cache | |
| (mounted as the /huggingface_hub_cache | |
| docker volume) and the adapter from a local dir | |
| (mounted as the /FuncCallAdapter docker volume). | |
| """ | |
| self.local_serve_is_ready = LocalServeReadinessEnum.NOT_APPLICABLE | |
| if self.model_version_blessed: | |
| from retrain_pipelines.utils.docker import \ | |
| env_has_docker | |
| if env_has_docker(): | |
| model_module_dir = \ | |
| os.path.dirname( | |
| importlib.util.find_spec( | |
| "retrain_pipelines.model." + | |
| os.getenv('retrain_pipeline_type') | |
| ).origin) | |
| # server & data-model & server-config modules artifacts | |
| files_to_copy = [ | |
| "litserve_server.py", | |
| "litserve_datamodel.py", | |
| "litserve_serverconfig.py", | |
| ".dockerignore" # docker context loading | |
| # at image-build time, | |
| # exclude model weights | |
| ] | |
| for filename in files_to_copy: | |
| shutil.copy( | |
| os.path.join(model_module_dir, "litserve", | |
| filename), | |
| os.path.join(self.serving_artifacts_local_folder, | |
| filename) | |
| ) | |
| # save dependencies as artifact | |
| create_requirements(self.serving_artifacts_local_folder, | |
| exclude=["cudf-polars-.*", "cuda-python", | |
| "nvidia-.*", "(py)?libcudf-.*", | |
| "nvtx", "rmm-.*", "litserve", | |
| ".*retrain-pipelines.*"] | |
| ) | |
| # server config yaml | |
| env = Environment(loader=FileSystemLoader( | |
| os.path.join(model_module_dir, "litserve"))) | |
| template = env.get_template( | |
| "litserve_serverconfig_template.yaml") | |
| server_config_data = { | |
| "port": "8000", | |
| "max_seq_length": self.max_seq_length, | |
| "max_new_token": self.max_new_tokens, | |
| "base_model": { | |
| "repo_id": self.hf_base_model_dict["repo_id"], | |
| "revision": self.hf_base_model_dict["commit_hash"] | |
| }, | |
| "adapters": [ | |
| { | |
| "name": "func_caller", | |
| "path": "/FuncCallAdapter" | |
| } | |
| ] | |
| } | |
| server_config_yaml = template.render(server_config_data) | |
| print(server_config_yaml) | |
| with open(os.path.join( | |
| self.serving_artifacts_local_folder, | |
| "litserve_serverconfig.yaml"), 'w' | |
| ) as output_file: | |
| output_file.write(server_config_yaml) | |
| # Dockerfile | |
| env = Environment(loader=FileSystemLoader( | |
| os.path.join(model_module_dir))) | |
| template = env.get_template( | |
| "Dockerfile.litserve_template") | |
| # Change CUDA version here from available list | |
| # @see https://hub.docker.com/r/nvidia/cuda/tags | |
| dockerfile_content = template.render( | |
| {"cuda_version": "12.0.0"}) | |
| with open(os.path.join( | |
| self.serving_artifacts_local_folder, | |
| "Dockerfile.litserve"), 'w' | |
| ) as output_file: | |
| output_file.write(dockerfile_content) | |
| os.environ["no_proxy"] = "localhost,127.0.0.1,0.0.0.0" | |
| ############################################ | |
| # actually deploy the inference service # | |
| ############################################ | |
| start_time = time.time() | |
| from retrain_pipelines.utils.docker import \ | |
| build_and_run_docker, print_container_log_tail, \ | |
| cleanup_docker | |
| from retrain_pipelines.model.litserve import \ | |
| endpoint_started, endpoint_is_ready | |
| self.port = 8765 | |
| HF_HUB_CACHE = os.path.realpath(os.path.expanduser( | |
| os.getenv( | |
| "HF_HUB_CACHE", | |
| os.path.join(os.getenv("HF_HOME", | |
| "~/.cache/huggingface"), | |
| "hub") | |
| ))) | |
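| # e.g. resolves to /home/<user>/.cache/huggingface/hub | |
| # when neither HF_HUB_CACHE nor HF_HOME is set | |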
| print(f"HF_HUB_CACHE : {HF_HUB_CACHE}") | |
| image_name = container_name = "litserve-model" | |
| serving_container = build_and_run_docker( | |
| image_name=image_name, image_tag="1.0", | |
| build_path=self.serving_artifacts_local_folder, | |
| dockerfile="Dockerfile.litserve", | |
| ports_publish_dict={'8000/tcp': self.port}, | |
| env_vars_dict={ | |
| "HF_HUB_CACHE": "/huggingface_hub_cache", | |
| "HF_TOKEN": os.getenv("HF_TOKEN") | |
| }, | |
| volumes_dict={ | |
| self.sft_model_dir: | |
| {"bind": "/FuncCallAdapter", | |
| "mode": "ro"}, | |
| HF_HUB_CACHE: | |
| {"bind": "/huggingface_hub_cache", | |
| "mode": "ro"} | |
| } | |
| ) | |
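| # Assuming the helper wraps the docker CLI/SDK, the above is roughly : | |
| #   docker build -t litserve-model:1.0 -f Dockerfile.litserve \ | |
| #       <serving_artifacts_local_folder> | |
| #   docker run -d --name litserve-model -p 8765:8000 \ | |
| #       -e HF_HUB_CACHE=/huggingface_hub_cache -e HF_TOKEN=$HF_TOKEN \ | |
| #       -v <sft_model_dir>:/FuncCallAdapter:ro \ | |
| #       -v <host HF hub cache>:/huggingface_hub_cache:ro \ | |
| #       litserve-model:1.0 | |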
| if not serving_container: | |
| print("failed spinning the LitServe container", | |
| file=sys.stderr) | |
| self.local_serve_is_ready = \ | |
| LocalServeReadinessEnum.FAILURE | |
| try: | |
| cleanup_docker( | |
| container_name=container_name, | |
| image_name=f"{image_name}:1.0", | |
| no_pruning=True # keep intermediate layers cached | |
| # so identical later re-runs | |
| # avoid a long full | |
| # image rebuild. | |
| ) | |
| except Exception as cleanup_ex: | |
| # fail silently | |
| pass | |
| else: | |
| print("Awaiting endpoint launch..") | |
| start_time = time.time() | |
| if not endpoint_started( | |
| container_name, port=self.port, timeout=10*60 | |
| ): | |
| print( | |
| f"The endpoint '{container_name}' " + | |
| f"did not start.") | |
| self.local_serve_is_ready = \ | |
| LocalServeReadinessEnum.FAILURE | |
| # health check on the spun-up endpoint | |
| elif endpoint_is_ready(port=self.port): | |
| self.local_serve_is_ready = \ | |
| LocalServeReadinessEnum.SUCCESS | |
| elapsed_time = time.time() - start_time | |
| print("deploy_local - Elapsed time: " + | |
| f"{elapsed_time:.2f} seconds") | |
| ############################################ | |
| else: | |
| # env doesn't have docker | |
| self.local_serve_is_ready = \ | |
| LocalServeReadinessEnum.FAILURE_NO_DOCKER | |
| if LocalServeReadinessEnum.SUCCESS == self.local_serve_is_ready: | |
| from retrain_pipelines.model.litserve.litserve_datamodel \ | |
| import Response | |
| import requests | |
| url = f"http://localhost:{self.port}/predict" | |
| headers = {"accept": "application/x-www-form-urlencoded"} | |
| try: | |
| start_time = time.time() | |
| data = { | |
| "adapter_name": "func_caller", | |
| "queries": '["Hello.", "Is 49 a perfect square?"]' | |
| } | |
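| # (equivalent manual check, since `data` is sent form-encoded : | |
| #   curl -X POST http://localhost:8765/predict \ | |
| #       -d 'adapter_name=func_caller' \ | |
| #       --data-urlencode 'queries=["Hello.", "Is 49 a perfect square?"]' ) | |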
| print(f"inference test - data: {data}") | |
| response = requests.post(url, headers=headers, data=data) | |
| parsed_response = Response(**{"output": response.json()}) | |
| elapsed_time = time.time() - start_time | |
| print("parsed_response ('func_caller' adapter ON) :" + | |
| str(parsed_response) + | |
| f"\t-\tElapsed time: {elapsed_time:.2f} seconds") | |
| start_time = time.time() | |
| data = { | |
| "queries": '["Hello.", "Is 49 a perfect square?"]' | |
| } | |
| print(f"inference test - data: {data}") | |
| response = requests.post(url, headers=headers, data=data) | |
| parsed_response = Response(**{"output": response.json()}) | |
| elapsed_time = time.time() - start_time | |
| print(f"parsed_response (no adapter) : {parsed_response}" + | |
| f"\t-\tElapsed time: {elapsed_time:.2f} seconds") | |
| except Exception as ex: | |
| print(ex, file=sys.stderr) | |
| traceback.print_tb(ex.__traceback__, file=sys.stderr) | |
| self.local_serve_is_ready = \ | |
| LocalServeReadinessEnum.FAILURE | |
| try: | |
| cleanup_docker( | |
| container_name=container_name, | |
| image_name=f"{image_name}:1.0", | |
| no_pruning=True # keep intermediate layers cached | |
| # so identical later re-runs | |
| # avoid a long full | |
| # image rebuild. | |
| ) | |
| except Exception as cleanup_ex: | |
| # fail silently | |
| pass | |
| self.next(self.pipeline_card) | |
| @card(id='default') | |
| @card(type='html', id='custom') | |
| @step | |
| def pipeline_card(self): | |
| import re | |
| import datetime | |
| import importlib.metadata | |
| ############################# | |
| # case of user-provided # | |
| # documentation artifact(s) # | |
| ############################# | |
| # note that user can provide either | |
| # 'pipeline_card.py' or 'template.html' | |
| # or 'dataset_readme.py' | |
| # or 'dataset_readme_template.md' | |
| # or 'model_readme.py' | |
| # or 'model_readme_template.md' | |
| # or any combination of those | |
| # when specifying custom | |
| # 'pipeline_card_artifacts_path' | |
| if "template.html" in os.listdir( | |
| self.pipeline_card_artifacts_path | |
| ): | |
| template_dir = self.pipeline_card_artifacts_path | |
| else: | |
| template_dir = os.path.dirname( | |
| importlib.util.find_spec( | |
| f"retrain_pipelines.pipeline_card."+ | |
| f"{os.getenv('retrain_pipeline_type')}" | |
| ).origin) | |
| ############################# | |
| if "pipeline_card.py" in os.listdir( | |
| self.pipeline_card_artifacts_path | |
| ): | |
| from retrain_pipelines.utils import get_get_html | |
| get_html = \ | |
| get_get_html(self.pipeline_card_artifacts_path) | |
| else: | |
| from retrain_pipelines.pipeline_card import \ | |
| get_html | |
| from retrain_pipelines.pipeline_card.helpers import \ | |
| mf_dag_svg | |
| ############################# | |
| ############################# | |
| ## "default" card ## | |
| ############################# | |
| self.metadata = { | |
| "name": "TabNet Model", | |
| "version": "1.0", | |
| "retrain_pipelines": f"retrain-pipelines {__version__}", | |
| "retrain_pipeline_type": os.environ["retrain_pipeline_type"], | |
| "description": "A PyTorch TabNet model retrained", | |
| "authors": [current.username], | |
| "tags": ["classification", "tabnet"], | |
| "license": "MIT License", | |
| "data_augmentation": [ | |
| { | |
| "name": "Augmentation", | |
| "description": "Truncating queries and " + \ | |
| "associate those to " + \ | |
| "no tool-call answers. " + \ | |
| "Intent being to instruct on " + \ | |
| "not hallucinating missing " + \ | |
| "tool-calls parameters values." | |
| }, | |
| { | |
| "name": "Enrichment", | |
| "description": "Addition of records " + \ | |
| "from an external data-source. " + \ | |
| "Here to instruct on no tool-call." | |
| } | |
| ], | |
| "references": [ | |
| { | |
| "title": "Base model", | |
| "link": f"https://hf.co/{self.hf_base_model_dict['repo_id']}" | |
| }, | |
| { | |
| "title": "Function-calling dataset", | |
| "link": f"https://hf.co/{self.hf_dataset_dict['repo_id']}" | |
| }, | |
| { | |
| "title": "Data-enrichment dataset", | |
| "link": f"https://hf.co/{self.hf_enrich_dataset_dict['repo_id']}" | |
| }, | |
| { | |
| "title": "Unsloth", | |
| "link": "https://unsloth.ai/blog/contpretraining" | |
| } | |
| ] | |
| } | |
| current.card['default'].append(Markdown( | |
| "model_version_blessed : **%s**" % str(self.model_version_blessed))) | |
| current.card['default'].append(Artifact( | |
| {"model_version_blessed": self.model_version_blessed})) | |
| current.card['default'].append( | |
| Image.from_matplotlib(self.sft_log_history_fig)) | |
| current.card['default'].append( | |
| Image.from_matplotlib(self.validation_completions_fig)) | |
| ############################# | |
| ############################# | |
| ## html "custom" card ## | |
| ############################# | |
| dt = datetime.datetime.now(tz=datetime.timezone.utc) | |
| formatted_dt = dt.strftime("%A %b %d %Y %I:%M:%S %p %Z") | |
| task_obj_python_cmd = f"metaflow.Task(" + \ | |
| f"\"{current.pathspec}\", " + \ | |
| f"attempt={str(current.retry_count)})" | |
| params={ | |
| 'template_dir': template_dir, | |
| 'title': f"{current.flow_name}", | |
| "subtitle": f"(flow run # {len(list(current.run.parent.runs()))}," + \ | |
| f" run_id: {str(current.run.id)} - {formatted_dt})", | |
| 'model_version_blessed': self.model_version_blessed, | |
| # 'current_blessed_run': self.current_blessed_run, | |
| 'current_blessed_model_commit_hash': ( | |
| self.current_blessed_version_dict["commit_hash"] | |
| if self.current_blessed_version_dict | |
| else None | |
| ), | |
| 'LocalServeReadinessEnum': LocalServeReadinessEnum, | |
| 'local_serve_is_ready': self.local_serve_is_ready, | |
| # EDA | |
| 'main_dataset_repo_id': self.hf_dataset['repo_id'], | |
| 'main_dataset_commit_hash': self.hf_dataset_dict['commit_hash'], | |
| 'main_dataset_commit_datetime': \ | |
| self.hf_dataset_dict['commit_datetime'], | |
| 'records_count': self.records_count, | |
| 'data_schema': self.data_schema, | |
| 'answers_tools_count_fig': self.answers_tools_count_fig, | |
| 'words_count_fig': self.words_count_fig, | |
| # model training | |
| 'dataset_repo_id': self.dataset_repo_id, | |
| 'dataset_version_label': self.dataset_commit_dict["version_label"], | |
| 'dataset_commit_datetime': self.dataset_commit_dict["commit_datetime"], | |
| 'dataset_commit_hash': self.dataset_commit_dict["commit_hash"], | |
| 'dataset_augmentation_rate': self.actual_augmentation_rate, | |
| 'dataset_enrichment_rate': self.enrichment_rate, | |
| 'model_repo_id': self.model_repo_id, | |
| 'model_version_label': self.model_commit_dict["version_label"], | |
| 'model_commit_datetime': self.model_commit_dict["commit_datetime"], | |
| 'model_commit_hash': self.model_commit_dict["commit_hash"], | |
| 'cpt_log_history_fig': self.cpt_log_history_fig, | |
| 'sft_log_history_fig': self.sft_log_history_fig, | |
| 'validation_completions_fig': self.validation_completions_fig, | |
| 'pipeline_parameters_dict': {"cpt": self.cpt_training_args, | |
| "sft": self.sft_training_args}, | |
| 'metrics_dict': self.perf_metrics, | |
| 'task_obj_python_cmd': task_obj_python_cmd, | |
| 'dag_svg': mf_dag_svg(self) | |
| } | |
| self.html = get_html(params) | |
| ############################# | |
| current | |
| ############################# | |
| self.next(self.pipeline_to_hub) | |
| @step | |
| def pipeline_to_hub(self): | |
| """ | |
| publish versioned source-code and pipeline-card | |
| for this run on the Hugging Face Hub. | |
| """ | |
| model_commit_datetime = \ | |
| self.model_commit_dict["commit_datetime"] | |
| timestamp_str = \ | |
| "{:%Y%m%d_%H%M%S}".format(model_commit_datetime) + \ | |
| "{:03d}".format(model_commit_datetime.microsecond//1000) + \ | |
| "_UTC" | |
| subfolder_name = \ | |
| "v" + self.model_commit_dict["version_label"] + \ | |
| "_" + timestamp_str | |
| commit_datetime = datetime.utcnow() | |
| ############################### | |
| # source-code # | |
| ############################### | |
| # We upload only herein file # | |
| # plus user-provided versions # | |
| # of the customizable ones # | |
| # (if any). # | |
| ############################### | |
| custom_source_files = [os.path.abspath(__file__)] | |
| if ( | |
| self.pipeline_card_artifacts_path != \ | |
| self.default_pipeline_card_module_dir | |
| ): | |
| candidate_source_files = [ | |
| "pipeline_card.py", | |
| "template.html", | |
| "dataset_readme.py", | |
| "dataset_readme_template.md", | |
| "model_readme.py", | |
| "model_readme_template.md" | |
| ] | |
| for candidate_source_file in candidate_source_files: | |
| file_fullpath = os.path.join( | |
| self.pipeline_card_artifacts_path, | |
| candidate_source_file) | |
| if os.path.exists(file_fullpath): | |
| custom_source_files.append(file_fullpath) | |
| source_code_commit_hash = \ | |
| push_files_to_hub_repo_branch( | |
| repo_id=self.model_repo_id, | |
| branch_name="retrain-pipelines_source-code", | |
| file_fullnames=custom_source_files, | |
| include_requirements_txt=True, | |
| path_in_repo=subfolder_name, | |
| commit_message=\ | |
| "source-code for model version " + \ | |
| subfolder_name + \ | |
| f"- retrain-pipelines {__version__}", | |
| repo_type="model", | |
| hf_token=os.getenv("HF_TOKEN", None) | |
| ) | |
| print(source_code_commit_hash) | |
| self.source_code_commit_dict = { | |
| "repo_id": self.model_repo_id, | |
| "branch_name": "retrain-pipelines_source-code", | |
| "commit_datetime": commit_datetime, | |
| "commit_hash": source_code_commit_hash | |
| } | |
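| # resulting layout on the HF Hub (illustrative, assuming | |
| # include_requirements_txt adds a requirements.txt file) : | |
| #   <model_repo_id> @ branch "retrain-pipelines_source-code" | |
| #       <subfolder_name>/ | |
| #           <this flow file> | |
| #           requirements.txt | |
| #           + any user-provided customizable artifacts | |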
| ############################### | |
| ############################### | |
| # pipeline-card # | |
| ############################### | |
| pipeline_card_fullname = None | |
| for run_step in current.run.steps(): | |
| task = list(run_step.tasks())[0] | |
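| # a task pathspec is "<flow>/<run_id>/<step>/<task_id>", | |
| # so path_components[2] is the step name | |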
| task_name = task.path_components[2] | |
| if "pipeline_card" == task_name: | |
| pipeline_card = get_cards( | |
| task, id='custom', type='html')[0] | |
| pipeline_card_fullname = os.path.realpath( | |
| os.path.join( | |
| task.metadata_dict.get("ds-root", None), | |
| mf_config.CARD_SUFFIX, pipeline_card.path | |
| )) | |
| print(pipeline_card_fullname) | |
| break | |
| pipeline_card_commit_hash = \ | |
| push_files_to_hub_repo_branch( | |
| repo_id=self.model_repo_id, | |
| branch_name="retrain-pipelines_pipeline-card", | |
| file_fullnames=[pipeline_card_fullname], | |
| path_in_repo=subfolder_name, | |
| commit_message=\ | |
| "pipeline-card for model version " + \ | |
| subfolder_name + \ | |
| f"- retrain-pipelines {__version__}", | |
| repo_type="model", | |
| hf_token=os.getenv("HF_TOKEN", None) | |
| ) | |
| print(pipeline_card_commit_hash) | |
| self.pipeline_card_commit_dict = { | |
| "repo_id": self.model_repo_id, | |
| "branch_name": "retrain-pipelines_pipeline-card", | |
| "commit_datetime": commit_datetime, | |
| "commit_hash": pipeline_card_commit_hash | |
| } | |
| ############################### | |
| self.next(self.deploy) | |
| @step | |
| def deploy(self): | |
| """ | |
| placeholder for the serving SDK deploy call | |
| (on the target production platform). | |
| Include any artifact you want; | |
| consider including the portable pipeline-card | |
| itself! | |
| """ | |
| if ( | |
| self.model_version_blessed and | |
| (self.local_serve_is_ready == LocalServeReadinessEnum.SUCCESS) | |
| ): | |
| pass # your code here | |
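| # purely illustrative sketch (hypothetical platform SDK, adapt to yours) : | |
| #   my_serving_sdk.deploy( | |
| #       artifacts_dir=self.serving_artifacts_local_folder, | |
| #       adapter_name="func_caller", | |
| #       pipeline_card=self.pipeline_card_commit_dict | |
| #   ) | |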
| self.next(self.load_test) | |
| @step | |
| def load_test(self): | |
| """ | |
| placeholder | |
| """ | |
| if ( | |
| self.model_version_blessed and | |
| (self.local_serve_is_ready == LocalServeReadinessEnum.SUCCESS) | |
| ): | |
| pass # your code here | |
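| # purely illustrative : hammer the deployed endpoint with any | |
| # HTTP load tool (locust, hey, k6, ...) or a simple requests loop, | |
| # and record latency / throughput against your acceptance thresholds | |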
| self.next(self.end) | |
| @step | |
| def end(self): | |
| pass | |
| if __name__ == "__main__": | |
| UnslothFuncCallFlow() | |