Source code for training_model.one_file_train

"""Main file for model training"""
import functools
import gc
import json
import logging
import os
import shutil
import subprocess
from contextlib import contextmanager
from tempfile import TemporaryDirectory
from typing import Callable, Dict, Generator, Tuple

# from typing import Any, Dict, List, Union
# import numpy as np
import requests
import torch
from datasets import Dataset
from hydra.utils import get_original_cwd
from omegaconf import DictConfig
from peft import LoraConfig, PeftModel, get_peft_model
from requests.auth import HTTPBasicAuth
from torch import Tensor

# from torch.nn import CrossEntropyLoss
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
)
from trl import SFTConfig, SFTTrainer

import wandb

from .grpo_train import grpo_train
from .logging_config import configure_logging
from .utils import dataset_to_json, tokens_init


@contextmanager
def change_dir(destination: str) -> Generator[None, None, None]:
    """Run the body of a ``with`` statement inside another working directory.

    Args:
        destination (str): Directory to switch into while the context is active

    Yields:
        None: Control is handed to the ``with`` body once the process has
            changed into ``destination``
    """
    previous_dir = os.getcwd()
    os.chdir(destination)
    try:
        yield
    finally:
        # Always go back to where the caller started, even if the body raised.
        os.chdir(previous_dir)
def generate_prompt(tokenizer: AutoTokenizer, data_point: Dict[str, str]) -> str:
    """Render one conversation sample through the tokenizer's chat template.

    Args:
        tokenizer (AutoTokenizer): Tokenizer providing ``apply_chat_template``
        data_point (Dict[str, str]): Sample with the keys ``system``, ``user``
            and ``bot``

    Returns:
        str: The templated, untokenized conversation text
    """
    # (chat role, key in data_point) in the order the conversation is built.
    role_sources = (("system", "system"), ("user", "user"), ("assistant", "bot"))
    conversation = [
        {"role": role, "content": data_point[source_key]}
        for role, source_key in role_sources
    ]
    return tokenizer.apply_chat_template(conversation, tokenize=False)
def tokenize(
    tokenizer: AutoTokenizer | Callable, cutoff_len: int, prompt: str
) -> Dict[str, torch.Tensor]:
    """Encode a prompt, truncating and padding it to a fixed length.

    Args:
        tokenizer (AutoTokenizer | Callable): Callable that encodes text
        cutoff_len (int): Length every sequence is truncated/padded to
        prompt (str): Text to encode

    Returns:
        Dict[str, torch.Tensor]: Whatever the tokenizer returns for the prompt
    """
    encode_options = {
        "truncation": True,
        "max_length": cutoff_len,
        "padding": "max_length",
        "return_tensors": None,
        "add_special_tokens": True,
    }
    return tokenizer(prompt, **encode_options)
def generate_and_tokenize_prompt(
    data_point: Dict[str, str],
    tokenizer: AutoTokenizer,
    cutoff: int,
    should_add_prompt: bool = False,
) -> Dict[str, str] | Dict[str, Tensor]:
    """Build the chat prompt for a sample and, unless only text is wanted, tokenize it.

    Args:
        data_point (Dict[str, str]): Dictionary containing conversation data
        tokenizer (AutoTokenizer): Hugging Face tokenizer
        cutoff (int): Maximum sequence length used when tokenizing
        should_add_prompt (bool): Used for GRPO; when True the raw prompt text
            is returned under the key ``"prompt"`` instead of token data

    Returns:
        Dict[str, str] | Dict[str, Tensor]: ``{"prompt": <text>}`` when
            ``should_add_prompt`` is True, otherwise the tokenizer output for
            the prompt
    """
    full_prompt = generate_prompt(tokenizer, data_point)
    if should_add_prompt:
        # Only the text is needed here, so skip the tokenizer call entirely
        # instead of computing an encoding that would be thrown away.
        return {"prompt": full_prompt}
    return tokenize(tokenizer, cutoff, full_prompt)
def data_preparation(
    cfg: DictConfig, tokenizer: AutoTokenizer, should_add_prompt: bool = False
) -> Tuple[Dataset, Dataset]:
    """Load the raw JSON datasets and turn them into model-ready splits.

    Args:
        cfg (DictConfig): Configuration object; reads ``paths.data_dir``,
            ``model.dataset_name`` and ``other.cutoff_len``
        tokenizer (AutoTokenizer): Hugging Face tokenizer
        should_add_prompt (bool): Forwarded to ``generate_and_tokenize_prompt``

    Returns:
        Tuple[Dataset, Dataset]: Tuple containing train and validation datasets
    """
    from datasets import load_dataset

    def _read_json(path: str):
        with open(path, "r", encoding="utf-8") as handle:
            return json.load(handle)

    # The test split is read first, then the training split.
    test_records = _read_json(
        os.path.join(get_original_cwd(), cfg.paths.data_dir, "test_ru.json")
    )
    train_records = _read_json(
        os.path.join(get_original_cwd(), cfg.model.dataset_name)
    )

    # The converted JSON files only need to exist while the splits are built.
    with TemporaryDirectory() as scratch_dir:
        split_files = {
            "train": os.path.join(scratch_dir, "train.json"),
            "test": os.path.join(scratch_dir, "test.json"),
        }
        dataset_to_json(train_records, split_files["train"])
        dataset_to_json(test_records, split_files["test"])
        splits = load_dataset("json", data_files=split_files)

        preprocess = functools.partial(
            generate_and_tokenize_prompt,
            tokenizer=tokenizer,
            cutoff=cfg.other.cutoff_len,
            should_add_prompt=should_add_prompt,
        )
        train_data = splits["train"].map(preprocess)
        val_data = splits["test"].map(preprocess)
    return train_data, val_data
def model_merge_for_converting(cfg: DictConfig, steps: int, save_path: str) -> None:
    """Fold a saved adapter checkpoint into the base model and write it out.

    Args:
        cfg (DictConfig): Configuration object; reads ``model.model_name`` and
            ``model.new_model``
        steps (int): Step number selecting ``checkpoint-<steps>`` under
            ``cfg.model.new_model``
        save_path (str): Directory receiving the merged model and tokenizer
    """
    base_model_name = cfg.model.model_name
    checkpoint_dir = f"{cfg.model.new_model}/checkpoint-{steps}"

    base_model = AutoModelForCausalLM.from_pretrained(
        base_model_name, torch_dtype="auto", device_map="auto"
    )
    tokenizer = AutoTokenizer.from_pretrained(base_model_name)

    merged = PeftModel.from_pretrained(base_model, checkpoint_dir).merge_and_unload()
    merged.save_pretrained(save_path)
    tokenizer.save_pretrained(save_path)

    # Drop every reference to the weights before asking CUDA to free its cache.
    del base_model, merged
    gc.collect()
    torch.cuda.empty_cache()
    logging.info("Model merged")
def train(cfg: DictConfig) -> int:
    """Execute full training pipeline.

    Loads the quantized base model and tokenizer, wraps the model with a LoRA
    adapter, prepares the datasets, optionally runs SFT and/or GRPO training,
    then merges the adapter and saves model and tokenizer to
    ``cfg.paths.output_dir``.

    Args:
        cfg (DictConfig): Configuration object

    Returns:
        int: ``trainer.state.global_step`` of the SFT run, or 0 when SFT was
            not run
    """
    tokens_init(cfg)
    # The dtype may be configured as a string (looked up as an attribute of
    # torch) or as an already-resolved value.
    torch_dtype = (
        getattr(torch, cfg.model.torch_dtype)
        if isinstance(cfg.model.torch_dtype, str)
        else cfg.model.torch_dtype
    )
    # 4-bit NF4 quantization unless 8-bit loading is requested.
    if not cfg.model.use_8bit:
        bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch_dtype,
            bnb_4bit_use_double_quant=True,
        )
    else:
        # NOTE(review): torch_dtype is passed to BitsAndBytesConfig here rather
        # than to from_pretrained — confirm it has the intended effect.
        bnb_config = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_threshold=6.0,
            torch_dtype=torch_dtype,
        )
    model = AutoModelForCausalLM.from_pretrained(
        cfg.model.model_name,
        quantization_config=bnb_config,
        device_map="auto",
        use_cache=False,
    )
    logging.info("Model loaded")
    tokenizer = AutoTokenizer.from_pretrained(cfg.model.model_name)
    tokenizer.padding_side = "right"
    # Register a dedicated pad token when the tokenizer has none.
    if tokenizer.pad_token is None or tokenizer.pad_token_id is None:
        tokenizer.add_special_tokens({"pad_token": "<|pad|>"})
        tokenizer.pad_token = "<|pad|>"
        if tokenizer.pad_token_id is None:
            tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids(
                tokenizer.pad_token
            )
        # Keep the embedding matrix in sync with the tokenizer size.
        model.resize_token_embeddings(len(tokenizer))
    # LoRA adapters on the listed projection modules; lm_head is listed in
    # modules_to_save.
    peft_config = LoraConfig(
        r=cfg.model.lora_r,
        lora_alpha=cfg.model.lora_alpha,
        lora_dropout=cfg.model.lora_dropout,
        bias="none",
        task_type="CAUSAL_LM",
        target_modules=[
            "up_proj",
            "down_proj",
            "gate_proj",
            "k_proj",
            "q_proj",
            "v_proj",
            "o_proj",
        ],
        modules_to_save=["lm_head"],
        inference_mode=False,
    )
    model = get_peft_model(model, peft_config)
    train_data, val_data = data_preparation(cfg, tokenizer)
    logging.info("Data prepared")
    # Stays 0 unless the SFT branch below runs.
    global_steps = 0
    if cfg.training.use_sft:
        sft_config = SFTConfig(
            output_dir=cfg.model.new_model,
            max_seq_length=cfg.training.max_seq_length,
            # presumably skipped because data_preparation already tokenized
            # the samples — TODO confirm
            dataset_kwargs={"skip_prepare_dataset": True},
            packing=False,
            run_name=cfg.model.new_model,
            per_device_train_batch_size=cfg.training.per_device_train_batch_size,
            per_device_eval_batch_size=cfg.training.per_device_eval_batch_size,
            gradient_accumulation_steps=cfg.training.gradient_accumulation_steps,
            gradient_checkpointing=cfg.training.gradient_checkpointing,
            # max_steps=cfg.model.train_steps,
            optim=cfg.training.optim,
            num_train_epochs=cfg.training.num_train_epochs,
            eval_strategy="steps",
            eval_steps=cfg.training.eval_steps,
            logging_steps=cfg.training.logging_steps,
            warmup_steps=cfg.training.warmup_steps,
            logging_strategy="steps",
            learning_rate=cfg.training.learning_rate,
            fp16=cfg.training.fp16,
            bf16=cfg.training.bf16,
            weight_decay=cfg.training.weight_decay,
            neftune_noise_alpha=cfg.training.neftune_noise_alpha,
            gradient_checkpointing_kwargs={"use_reentrant": False},
            group_by_length=True,
            report_to="wandb",
            save_total_limit=cfg.training.save_total_limit,
            load_best_model_at_end=cfg.training.load_best,
        )
        # NOTE(review): model is already wrapped by get_peft_model above and
        # peft_config is passed again here — confirm the trainer does not
        # apply the adapter twice.
        trainer = SFTTrainer(
            model=model,
            train_dataset=train_data,
            eval_dataset=val_data,
            peft_config=peft_config,
            processing_class=tokenizer,
            args=sft_config,
        )
        trainer.train()
        global_steps: int = trainer.state.global_step
    if cfg.training.use_grpo:
        # NOTE(review): the GRPO run does not update global_steps, so a
        # GRPO-only configuration returns 0 — confirm callers that build a
        # checkpoint path from the return value expect that.
        grpo_train(
            model=model,
            tokenizer=tokenizer,
            cfg=cfg,
            data_preparing_func=data_preparation,
        )
    if not cfg.training.use_grpo and not cfg.training.use_sft:
        logging.warning("Model training not configured")
    else:
        logging.info("Model trained")
    # Merge the adapter into the base weights and save model plus tokenizer.
    merged_model = model.merge_and_unload()
    merged_model.save_pretrained(cfg.paths.output_dir)
    tokenizer.save_pretrained(cfg.paths.output_dir)
    logging.info("Model saved")
    # Drop references before collecting garbage and clearing the CUDA cache.
    del model, merged_model
    gc.collect()
    torch.cuda.empty_cache()
    return global_steps
def convert_to_gguf(
    model_path: str,
    outfile: str,
    python_exe: str,
    outtype: str,
    cfg: DictConfig,
) -> None:
    """Convert Hugging Face model to GGUF format.

    Runs ``convert_hf_to_gguf.py`` from the llama.cpp checkout configured in
    ``cfg.paths.llama_cpp_dir``. A non-zero exit code of the script is logged
    as an error and not re-raised.

    Args:
        model_path (str): Path to input model directory
        outfile (str): Output file path
        python_exe (str): Python executable used to run the conversion
            script; when empty, ``cfg.paths.venv_python_path`` is used
        outtype (str): Output type specification
        cfg (DictConfig): Configuration object

    Raises:
        FileNotFoundError: If the llama.cpp directory or the conversion
            script is missing
    """
    llama_cpp_dir = os.path.abspath(cfg.paths.llama_cpp_dir)
    conversion_script = os.path.join(llama_cpp_dir, "convert_hf_to_gguf.py")

    if not os.path.isdir(llama_cpp_dir):
        raise FileNotFoundError(f"llama.cpp directory not found: {llama_cpp_dir}")
    if not os.path.exists(conversion_script):
        raise FileNotFoundError(f"Conversion script missing: {conversion_script}")

    # Absolute paths are required because the script runs with cwd=llama_cpp_dir.
    model_path = os.path.normpath(os.path.abspath(model_path))
    outfile = os.path.normpath(os.path.abspath(outfile))
    # Honour the interpreter the caller asked for; the configured venv
    # interpreter is only a fallback for an empty argument.
    python_exe = os.path.normpath(python_exe or cfg.paths.venv_python_path)

    command = [
        python_exe,
        conversion_script,
        model_path,
        "--outfile",
        outfile,
        "--outtype",
        outtype,
    ]
    try:
        subprocess.run(command, check=True, cwd=llama_cpp_dir)
    except subprocess.CalledProcessError:
        logging.error(
            f"GGUF conversion failed. Check:\n"
            f"- llama.cpp exists at {cfg.paths.llama_cpp_dir}\n"
            f"- Conversion script exists: {os.path.join(cfg.paths.llama_cpp_dir, 'convert_hf_to_gguf.py')}\n"
            f"- Python executable: {python_exe}\n"
            f"- Model path: {model_path}"
        )
def quantize_model(
    model_path: str,
    outfile: str,
    qtype: str = "q4_0",
    llama_cpp_path: str = ".",
    quantized_path: str = "llama-quantize.exe",
) -> bool:
    """Quantize GGUF model using llama.cpp quantizer.

    Args:
        model_path (str): Path to input GGUF model
        outfile (str): Path for quantized output
        qtype (str): Quantization type (default: q4_0)
        llama_cpp_path (str): Path to llama.cpp directory
        quantized_path (str): Name of quantizer executable, relative to
            ``llama_cpp_path``

    Returns:
        bool: True if quantization succeeded, False if the executable is
            missing or exits with a non-zero code
    """
    llama_cpp_dir = os.path.abspath(llama_cpp_path)
    llama_quantize_path = os.path.join(llama_cpp_dir, quantized_path)
    # Absolute paths are required because the tool runs with cwd=llama_cpp_dir.
    model_path = os.path.abspath(model_path)
    outfile = os.path.abspath(outfile)

    if not os.path.exists(llama_quantize_path):
        logging.error(f"Error: llama-quantize.exe not found at {llama_quantize_path}")
        return False

    logging.info("Trying to quantize model...")
    command = [llama_quantize_path, model_path, outfile, qtype]
    logging.info(f"Running command: {command}")

    try:
        process = subprocess.run(
            command, check=True, capture_output=True, cwd=llama_cpp_dir
        )
    except subprocess.CalledProcessError as e:
        logging.error(f"Command failed with exit code {e.returncode}")
        # The output was captured, so it is lost unless it is logged here.
        if e.stderr:
            logging.error(f"Command stderr: {e.stderr.decode(errors='replace')}")
        return False

    logging.info("Model quantized")
    # The tool's output encoding is not guaranteed to be UTF-8; a strict decode
    # would raise after an otherwise successful quantization.
    logging.info(f"Command output: {process.stdout.decode(errors='replace')}")
    logging.info(
        f"Command stderr: {process.stderr.decode(errors='replace') if process.stderr else 'No stderr output.'}"
    )
    return True
def copy_data(
    file: str,
    gguf_directory: str = "custom-model",
    destination: str = r"T:\lm-studio\models\game-model",
) -> None:
    """Move a file from the current working directory into the model store.

    Args:
        file (str): Name of the file, relative to the current working directory
        gguf_directory (str): Subdirectory of ``destination`` to place it in
        destination (str): Root destination directory
    """
    source_path = os.path.join(os.getcwd(), file)
    target_path = os.path.join(destination, gguf_directory, file)

    # Create the target folder (and any missing parents) before moving.
    target_dir = os.path.dirname(target_path)
    os.makedirs(target_dir, exist_ok=True)
    shutil.move(source_path, target_path)
def train_pipeline(cfg: DictConfig) -> None:
    """Execute complete training pipeline including conversion and quantization.

    Trains the model, merges the resulting checkpoint into a temporary
    directory, converts it to GGUF, quantizes it and moves the quantized file
    to ``cfg.paths.final_weights_path``. Wandb is finished and GPU memory is
    released whether or not the pipeline succeeds.

    Args:
        cfg (DictConfig): Configuration object

    Raises:
        RuntimeError: If the GGUF conversion produced no output file or the
            quantization step fails
    """
    try:
        steps = train(cfg)
        with TemporaryDirectory() as merged_model_dir:
            model_merge_for_converting(cfg, steps, merged_model_dir)

            outfile = cfg.model.outfile
            gguf_path = os.path.join(merged_model_dir, outfile)
            convert_to_gguf(
                model_path=merged_model_dir,
                outfile=gguf_path,
                python_exe=cfg.paths.venv_python_path,
                outtype="f16",
                cfg=cfg,
            )
            # convert_to_gguf only logs a failed conversion, so check for the
            # file itself instead of reporting success and failing later with
            # a misleading quantization error.
            if not os.path.exists(gguf_path):
                raise RuntimeError("GGUF conversion failed")
            logging.info(f"Converted to GGUF: {outfile}")

            # The quantized file is written relative to the current working
            # directory, which is where copy_data picks it up from.
            quantized_file = outfile
            if not quantize_model(
                model_path=gguf_path,
                outfile=quantized_file,
                qtype=cfg.model.qtype,
                llama_cpp_path=os.path.abspath(cfg.paths.llama_cpp_dir),
                quantized_path=cfg.paths.quantized_path,
            ):
                raise RuntimeError("Quantization failed")

            copy_data(
                quantized_file,
                cfg.model.gguf_directory,
                cfg.paths.final_weights_path,
            )
            # Normally the move leaves nothing behind; clean up if it did.
            if os.path.exists(quantized_file):
                os.remove(quantized_file)
                logging.info(f"Removed intermediate file: {quantized_file}")
        logging.info("Training pipeline completed")
    finally:
        wandb.finish()
        logging.info("Wandb finished")
        gc.collect()
        torch.cuda.empty_cache()
def main_train(data_dir: str, cfg: DictConfig) -> None:
    """Run the training pipeline, then export the test set for evaluation.

    Args:
        data_dir (str): Directory containing ``test_ru.json``
        cfg (DictConfig): Configuration object; ``testing.output_test_file``
            is where the test set is written
    """
    train_pipeline(cfg)

    test_set_path = os.path.join(data_dir, "test_ru.json")
    with open(test_set_path, "r", encoding="utf-8") as source:
        test_records = json.load(source)
    dataset_to_json(test_records, cfg.testing.output_test_file)
def post_new_dataset(timeout: float = 60.0) -> None:
    """Upload new dataset version to remote server.

    Posts ``../data/dataset_ru.json`` (relative to the current working
    directory) as a multipart file upload and logs the JSON response.

    Args:
        timeout (float): Seconds to wait for the server before giving up;
            without it an unresponsive server would block forever
    """
    url = "https://dataset.ser13volk.me/dataset_ru"
    dataset_path = os.path.join("../data", "dataset_ru.json")
    with open(dataset_path, "rb") as f:
        files = {"file": f}
        # NOTE(review): credentials are hard-coded with an empty password —
        # confirm this is intended.
        response = requests.post(
            url,
            files=files,
            auth=HTTPBasicAuth("admin", ""),
            timeout=timeout,
        )
    logging.info(response.json())
if __name__ == "__main__":
    # Script entry point: set up debug logging, then start training.
    configure_logging(logging.DEBUG)
    # NOTE(review): main_train is declared as main_train(data_dir, cfg) with no
    # defaults, so this zero-argument call raises TypeError as written; the
    # data directory and configuration have to be supplied here.
    main_train()
    # post_new_dataset()