transformers_Trainer

2025616

10:46

Trainer

 

class transformers.Trainer   <source>

(
    model: typing.Union[transformers.modeling_utils.PreTrainedModel, torch.nn.modules.module.Module, NoneType] = None,
    args: TrainingArguments = None,
    data_collator: typing.Optional[transformers.data.data_collator.DataCollator] = None,
    train_dataset: typing.Union[torch.utils.data.dataset.Dataset, torch.utils.data.dataset.IterableDataset, ForwardRef('datasets.Dataset'), NoneType] = None,
    eval_dataset: typing.Union[torch.utils.data.dataset.Dataset, dict[str, torch.utils.data.dataset.Dataset], ForwardRef('datasets.Dataset'), NoneType] = None,
    processing_class: typing.Union[transformers.tokenization_utils_base.PreTrainedTokenizerBase, transformers.image_processing_utils.BaseImageProcessor, transformers.feature_extraction_utils.FeatureExtractionMixin, transformers.processing_utils.ProcessorMixin, NoneType] = None,
    model_init: typing.Optional[typing.Callable[[], transformers.modeling_utils.PreTrainedModel]] = None,
    compute_loss_func: typing.Optional[typing.Callable] = None,
    compute_metrics: typing.Optional[typing.Callable[[transformers.trainer_utils.EvalPrediction], dict]] = None,
    callbacks: typing.Optional[list[transformers.trainer_callback.TrainerCallback]] = None,
    optimizers: tuple = (None, None),
    optimizer_cls_and_kwargs: typing.Optional[tuple[type[torch.optim.optimizer.Optimizer], dict[str, typing.Any]]] = None,
    preprocess_logits_for_metrics: typing.Optional[typing.Callable[[torch.Tensor, torch.Tensor], torch.Tensor]] = None,
)

Parameters

  • model (PreTrainedModel or torch.nn.Module, optional) — The model to train, evaluate or use for predictions. If not provided, a model_init must be passed.
    Trainer is optimized to work with the PreTrainedModel provided by the library. You can still use your own models defined as torch.nn.Module as long as they work the same way as the 🤗 Transformers models.
  • args (TrainingArguments, optional) — The arguments to tweak for training. Will default to a basic instance of TrainingArguments with the output_dir set to a directory named tmp_trainer in the current directory if not provided.
  • data_collator (DataCollator, optional) — The function to use to form a batch from a list of elements of train_dataset or eval_dataset. Will default to default_data_collator() if no processing_class is provided, an instance of DataCollatorWithPadding otherwise if the processing_class is a feature extractor or tokenizer.
  • train_dataset (Union[torch.utils.data.Dataset, torch.utils.data.IterableDataset, datasets.Dataset], optional) — The dataset to use for training. If it is a Dataset, columns not accepted by the model.forward() method are automatically removed.
    Note that if it’s a torch.utils.data.IterableDataset with some randomization and you are training in a distributed fashion, your iterable dataset should either use an internal attribute generator that is a torch.Generator for the randomization that must be identical on all processes (and the Trainer will manually set the seed of this generator at each epoch) or have a set_epoch() method that internally sets the seed of the RNGs used.
  • eval_dataset (Union[torch.utils.data.Dataset, Dict[str, torch.utils.data.Dataset], datasets.Dataset], optional) — The dataset to use for evaluation. If it is a Dataset, columns not accepted by the model.forward() method are automatically removed. If it is a dictionary, it will evaluate on each dataset, prepending the dictionary key to the metric name.
  • processing_class (PreTrainedTokenizerBase or BaseImageProcessor or FeatureExtractionMixin or ProcessorMixin, optional) — Processing class used to process the data. If provided, will be used to automatically process the inputs for the model, and it will be saved along the model to make it easier to rerun an interrupted training or reuse the fine-tuned model. This supersedes the tokenizer argument, which is now deprecated.
  • model_init (Callable[[], PreTrainedModel], optional) — A function that instantiates the model to be used. If provided, each call to train() will start from a new instance of the model as given by this function.
    The function may have zero arguments, or a single one containing the optuna/Ray Tune/SigOpt trial object, to be able to choose different architectures according to hyperparameters (such as layer count, sizes of inner layers, dropout probabilities, etc.).
  • compute_loss_func (Callable, optional) — A function that accepts the raw model outputs, labels, and the number of items in the entire accumulated batch (batch_size * gradient_accumulation_steps) and returns the loss. For example, see the default loss function used by Trainer.
  • compute_metrics (Callable[[EvalPrediction], Dict], optional) — The function that will be used to compute metrics at evaluation. Must take an EvalPrediction and return a dictionary mapping strings to metric values. Note: when passing TrainingArguments with batch_eval_metrics set to True, your compute_metrics function must take a boolean compute_result argument. This will be triggered after the last eval batch to signal that the function needs to calculate and return the global summary statistics rather than accumulating the batch-level statistics.
  • callbacks (List of TrainerCallback, optional) — A list of callbacks to customize the training loop. Will add those to the list of default callbacks detailed in here.
    If you want to remove one of the default callbacks used, use the Trainer.remove_callback() method.
  • optimizers (Tuple[torch.optim.Optimizer, torch.optim.lr_scheduler.LambdaLR], optional, defaults to (None, None)) — A tuple containing the optimizer and the scheduler to use. Will default to an instance of AdamW on your model and a scheduler given by get_linear_schedule_with_warmup() controlled by args.
  • optimizer_cls_and_kwargs (Tuple[Type[torch.optim.Optimizer], Dict[str, Any]], optional) — A tuple containing the optimizer class and keyword arguments to use. Overrides optim and optim_args in args. Incompatible with the optimizers argument.
    Unlike optimizers, this argument avoids the need to place model parameters on the correct devices before initializing the Trainer.
  • preprocess_logits_for_metrics (Callable[[torch.Tensor, torch.Tensor], torch.Tensor], optional) — A function that preprocesses the logits right before caching them at each evaluation step. Must take two tensors, the logits and the labels, and return the logits once processed as desired. The modifications made by this function will be reflected in the predictions received by compute_metrics.
    Note that the labels (second parameter) will be None if the dataset does not have them.
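
The compute_result contract described for compute_metrics with batch_eval_metrics can be sketched without any library dependency. The snippet below is only an illustration: FakeEvalPrediction is a hypothetical stand-in for transformers.EvalPrediction, AccuracyAccumulator is a made-up helper, and plain Python lists replace tensors. It shows the accumulate-then-summarize pattern, not Trainer's internals.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakeEvalPrediction:
    """Hypothetical stand-in for transformers.EvalPrediction (plain lists, no tensors)."""
    predictions: List[int]
    label_ids: List[int]


class AccuracyAccumulator:
    """Accumulates batch-level counts and reports accuracy on the last batch."""

    def __init__(self) -> None:
        self.correct = 0
        self.total = 0

    def __call__(self, eval_pred: FakeEvalPrediction, compute_result: bool = False) -> dict:
        # Accumulate statistics for the current batch.
        for pred, label in zip(eval_pred.predictions, eval_pred.label_ids):
            self.correct += int(pred == label)
            self.total += 1
        if not compute_result:
            return {}
        # Last batch: return the global summary and reset for the next evaluation.
        result = {"accuracy": self.correct / self.total}
        self.correct, self.total = 0, 0
        return result


compute_metrics = AccuracyAccumulator()
batches = [([1, 0], [1, 1]), ([0, 0], [0, 0])]  # (predictions, labels) per eval batch
for index, (preds, labels) in enumerate(batches):
    is_last = index == len(batches) - 1
    metrics = compute_metrics(FakeEvalPrediction(preds, labels), compute_result=is_last)
print(metrics)  # {'accuracy': 0.75}
```

Keeping the counters on a callable object is one way to carry state between batches; a closure or module-level variables would work as well.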

 

 

Several important Trainer methods that can be subclassed and overridden:

method

description

get_train_dataloader()

create a training DataLoader

By default, the train dataloader uses random sampling.

 

Returns the training ~torch.utils.data.DataLoader.

 

Will use no sampler if train_dataset does not implement __len__, a random sampler (adapted to distributed training if necessary) otherwise.

 

Subclass and override this method if you want to inject some custom behavior.

 

get_eval_dataloader()

    create an evaluation DataLoader

    Parameters

    • eval_dataset (str or torch.utils.data.Dataset, optional) — If a str, will use self.eval_dataset[eval_dataset] as the evaluation dataset. If a Dataset, will override self.eval_dataset and must implement __len__. If it is a Dataset, columns not accepted by the model.forward() method are automatically removed.

     

    Returns the evaluation ~torch.utils.data.DataLoader.

    Subclass and override this method if you want to inject some custom behavior.

     

     

     

get_test_dataloader()

create a test DataLoader

log()

log information about the training process

create_optimizer_and_scheduler()

create an optimizer and learning rate scheduler (can also be separately customized with create_optimizer() and create_scheduler() if they weren’t passed in __init__)

compute_loss()

compute the loss of a batch of training inputs

 

How the loss is computed by Trainer. By default, all models return the loss in the first element.

 

training_step()

perform the training step

prediction_step()

perform the prediction and test step

evaluate()

evaluate the model and return the evaluation metric

predict()

make a prediction (with metrics if labels are available) on the test set

create_optimizer()

Setup the optimizer.

 

We provide a reasonable default that works well. If you want to use something else, you can pass a tuple in the Trainer’s init through optimizers, or subclass and override this method in a subclass.

create_optimizer_and_scheduler(num_training_steps: int)

Setup the optimizer and the learning rate scheduler.

 

We provide a reasonable default that works well. If you want to use something else, you can pass a tuple in the Trainer’s init through optimizers, or subclass and override this method (or create_optimizer and/or create_scheduler) in a subclass.

evaluate(
    eval_dataset: typing.Union[torch.utils.data.dataset.Dataset, dict[str, torch.utils.data.dataset.Dataset], NoneType] = None,
    ignore_keys: typing.Optional[list[str]] = None,
    metric_key_prefix: str = 'eval',
)

    Overriding the evaluate function:

    Parameters

    • eval_dataset (Union[Dataset, Dict[str, Dataset]], optional) — Pass a dataset if you wish to override self.eval_dataset. If it is a Dataset, columns not accepted by the model.forward() method are automatically removed. If it is a dictionary, it will evaluate on each dataset, prepending the dictionary key to the metric name. Datasets must implement the __len__ method.
      If you pass a dictionary with names of datasets as keys and datasets as values, evaluate will run separate evaluations on each dataset. This can be useful to monitor how training affects other datasets or simply to get a more fine-grained evaluation. When used with load_best_model_at_end, make sure metric_for_best_model references exactly one of the datasets. If you, for example, pass in {"data1": data1, "data2": data2} for two datasets data1 and data2, you could specify metric_for_best_model="eval_data1_loss" for using the loss on data1 and metric_for_best_model="eval_data2_loss" for the loss on data2.
    • ignore_keys (List[str], optional) — A list of keys in the output of your model (if it is a dictionary) that should be ignored when gathering predictions.
    • metric_key_prefix (str, optional, defaults to "eval") — An optional prefix to be used as the metrics key prefix. For example the metrics “bleu” will be named “eval_bleu” if the prefix is “eval” (default)

     

    Runs evaluation and returns metrics.

    The calling script will be responsible for providing a method to compute metrics, as they are task-dependent (pass it to the init compute_metrics argument).

    You can also subclass and override this method to inject custom behavior.
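
The naming rule for a dictionary of evaluation datasets (dictionary key prepended to the metric name, after the metric_key_prefix) can be illustrated with a tiny helper. The function metric_name below is hypothetical and exists only for this sketch; it reproduces the names quoted above, such as "eval_data1_loss".

```python
def metric_name(dataset_name: str, metric: str, metric_key_prefix: str = "eval") -> str:
    """Hypothetical helper: build the metric key for one dataset of an eval-dataset dict."""
    return f"{metric_key_prefix}_{dataset_name}_{metric}"


# Placeholder datasets; only the dictionary keys matter for the naming.
eval_datasets = {"data1": ["..."], "data2": ["..."]}
names = [metric_name(name, "loss") for name in eval_datasets]
print(names)  # ['eval_data1_loss', 'eval_data2_loss']
```

Either of these names is what metric_for_best_model would have to reference when load_best_model_at_end is used.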

     

     

evaluation_loop(
    dataloader: DataLoader,
    description: str,
    prediction_loss_only: typing.Optional[bool] = None,
    ignore_keys: typing.Optional[list[str]] = None,
    metric_key_prefix: str = 'eval',
)

 

Both Trainer.evaluate() and Trainer.predict() call this function.

 

Prediction/evaluation loop, shared by Trainer.evaluate() and Trainer.predict().

 

Works both with or without labels.

 

Trainer.train()

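The main work happens in the _inner_training_loop function:

def _inner_training_loop(
    self, batch_size=None, args=None, resume_from_checkpoint=None, trial=None, ignore_keys_for_eval=None
):

# 1. Initialization: set up the training environment and handle automatic batch-size adjustment and the DeepSpeed configuration.

# 2. Get the train dataloader and compute key variables such as the number of training steps and epochs.

# Get the training dataloader
train_dataloader = self.get_train_dataloader()   # get_train_dataloader() returns:

    return self._get_dataloader(
        dataset=self.train_dataset,
        description="Training",
        batch_size=self._train_batch_size,
        sampler_fn=self._get_train_sampler,
        is_training=True,
    )

self._get_dataloader uses self.data_collator as the DataLoader's collate function. The data_collator is a post-processing step that runs after each batch has been drawn: at that point a batch is a list of batch_size items from the dataset, each produced by the __getitem__ method of the custom dataset class, and the collator forms the final batch from that list.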
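
To make the role of the collator concrete, here is a dependency-free sketch. ToyDataset and pad_collator are hypothetical names for illustration, and plain lists stand in for tensors; the real DataCollatorWithPadding works on tokenizer outputs and returns tensors.

```python
from typing import Dict, List


class ToyDataset:
    """Hypothetical map-style dataset: __getitem__ defines what one item looks like."""

    def __init__(self, rows: List[List[int]]) -> None:
        self.rows = rows

    def __len__(self) -> int:
        return len(self.rows)

    def __getitem__(self, index: int) -> Dict[str, List[int]]:
        return {"input_ids": self.rows[index]}


def pad_collator(features: List[Dict[str, List[int]]], pad_id: int = 0) -> Dict[str, List[List[int]]]:
    """Hypothetical collator: pad every field to the longest item in the batch."""
    batch: Dict[str, List[List[int]]] = {}
    for key in features[0]:
        max_len = max(len(item[key]) for item in features)
        batch[key] = [item[key] + [pad_id] * (max_len - len(item[key])) for item in features]
    return batch


dataset = ToyDataset([[5, 6, 7], [8], [9, 10]])
items = [dataset[i] for i in range(2)]  # what the loader gathers for batch_size = 2
batch = pad_collator(items)             # post-processing into one batch
print(batch)  # {'input_ids': [[5, 6, 7], [8, 0, 0]]}
```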

 

# Compute key variables such as the number of training steps and epochs
total_train_batch_size = self._train_batch_size * args.gradient_accumulation_steps * args.world_size
(
    num_train_epochs,
    num_update_steps_per_epoch,
    num_examples,
    num_train_samples,
    epoch_based,
    len_dataloader,
    max_steps,
) = self.set_initial_training_values(args, train_dataloader, total_train_batch_size)

 

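# 3. Handle debug options and initialize the optimizer and learning-rate scheduler, with support for special cases such as DeepSpeed/FSDP.

# Create the optimizer and learning-rate scheduler
if not delay_optimizer_creation:      # this calls create_optimizer_and_scheduler()
    self.create_optimizer_and_scheduler(num_training_steps=max_steps)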

 

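# 4. Wrap the model for distributed training (e.g. DDP/FSDP) and let Accelerator handle device placement.

# Wrap the model (e.g. DDP, FSDP, DeepSpeed)
model = self._wrap_model(self.model_wrapped)

# Prepare the model with Accelerator (e.g. for multi-GPU distributed training)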
if use_accelerator_prepare:
    if self.is_fsdp_enabled:
        self.model = unwrap_model(self.model, recursive=True)
    if delay_optimizer_creation:
        self._fsdp_qlora_plugin_updates()
        if self.accelerator.mixed_precision != "fp8":
            self.model = self.accelerator.prepare(self.model)
    self.create_optimizer_and_scheduler(num_training_steps=max_steps)

# Prepare the optimizer and scheduler
if use_accelerator_prepare:
    self.model.train()
    if hasattr(self.lr_scheduler, "step"):
        model, self.optimizer = self.accelerator.prepare(self.model, self.optimizer)
    else:
        model, self.optimizer, self.lr_scheduler = self.accelerator.prepare(
            self.model, self.optimizer, self.lr_scheduler
        )

 

# 5. Restore the model, optimizer, and training state from a checkpoint.

# Load the model checkpoint
if resume_from_checkpoint is not None:
    if self.is_deepspeed_enabled:
        deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint)
    elif is_sagemaker_mp_enabled() or self.is_fsdp_enabled:
        self._load_from_checkpoint(resume_from_checkpoint, self.model_wrapped)

# Load the optimizer and scheduler states (check if saved optimizer or scheduler states exist)
self._load_optimizer_and_scheduler(resume_from_checkpoint)
self._load_scaler(resume_from_checkpoint)

 

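# 6. Run the training loop: forward pass, backward pass, gradient clipping, and optimizer updates, with periodic logging and model saving.

# important: at this point:
# self.model         is the Transformers Model
# self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model),
# FSDP(Transformers Model), Dynamo Optimized Module(Transformers Model) etc.
# So at this point self.model is the plain Transformers model, and self.model_wrapped must be one of the wrappers listed above (or the model itself when no wrapping applies).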

 

# Initialize the training-state variables
self.state.epoch = 0                # current epoch (starts at 0)
start_time = time.time()            # training start time (used to compute the total elapsed time)
epochs_trained = 0                  # epochs already trained (used to skip completed epochs when resuming)
steps_trained_in_current_epoch = 0  # steps already trained in the current epoch (used to skip processed batches when resuming)
steps_trained_progress_bar = None   # progress-bar control variable (starts as None)

# Check whether training is being resumed from a checkpoint.
# When resuming, Trainer automatically skips the epochs that were already trained, so there is no need to subtract them from the epoch count by hand.
if resume_from_checkpoint is not None and os.path.isfile(
    os.path.join(resume_from_checkpoint, TRAINER_STATE_NAME)
):
    # Load the saved training state (global_step, epoch, best metric, etc.)
    self.state = TrainerState.load_from_json(os.path.join(resume_from_checkpoint, TRAINER_STATE_NAME))

    # Check that the training arguments match those saved in the checkpoint (guards against configuration conflicts)
    self.compare_trainer_and_checkpoint_args(self.args, self.state)

    # Restore the callback state (e.g. the EarlyStopping patience counter)
    self._load_callback_state()

    # Number of completed epochs (global_step // update steps per epoch)
    epochs_trained = int(self.state.global_step // num_update_steps_per_epoch)

    # Whether to skip already-trained data (controlled by the ignore_data_skip argument)
    if not args.ignore_data_skip:
        # Steps already trained in the current epoch (accounting for gradient accumulation)
        steps_trained_in_current_epoch = self.state.global_step % (num_update_steps_per_epoch)
        steps_trained_in_current_epoch *= args.gradient_accumulation_steps
    else:
        steps_trained_in_current_epoch = 0  # do not skip data (restart the current epoch from its beginning)
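
The resume arithmetic can be checked with concrete numbers. The helper resume_position below is a hypothetical name for this sketch; it mirrors the expressions for epochs_trained and steps_trained_in_current_epoch using plain integers.

```python
def resume_position(global_step: int,
                    num_update_steps_per_epoch: int,
                    gradient_accumulation_steps: int,
                    ignore_data_skip: bool = False):
    """Hypothetical helper: return (epochs_trained, batches to skip in the current epoch)."""
    epochs_trained = global_step // num_update_steps_per_epoch
    if ignore_data_skip:
        steps_to_skip = 0
    else:
        # Optimizer updates done in the current epoch, converted back to micro-batches.
        steps_to_skip = (global_step % num_update_steps_per_epoch) * gradient_accumulation_steps
    return epochs_trained, steps_to_skip


# Checkpoint saved at global_step=250, with 100 updates per epoch and 4 accumulation steps.
print(resume_position(250, 100, 4))                         # (2, 200)
print(resume_position(250, 100, 4, ignore_data_skip=True))  # (2, 0)
```

In this example two full epochs are skipped, and the first 200 batches of the third epoch are skipped unless ignore_data_skip is set.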

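# Update the references held by the callback handler (so callbacks see the latest model, optimizer, etc.)
for attr in ("model", "optimizer", "lr_scheduler"):
    setattr(self.callback_handler, attr, getattr(self, attr))
self.callback_handler.train_dataloader = train_dataloader  # attach the train dataloader

# Initialize the training-state references (links the Trainer object with the max steps / number of epochs)
self.state.init_training_references(self, max_steps, num_train_epochs, trial)

# Initialize the loss-tracking variables
tr_loss = torch.tensor(0.0, device=args.device)  # loss kept as a tensor to avoid TPU synchronization issues
self._total_loss_scalar = 0.0                   # accumulated scalar loss (used for logging)
self._globalstep_last_logged = self.state.global_step  # step at which logging last happened

# Zero the model gradients
model.zero_grad()

# Initialize the other training variables
grad_norm: Optional[float] = None  # gradient norm (recorded when clipping)
learning_rate = None               # current learning rate (updated dynamically)

# Fire the train-begin callback (e.g. custom logging or initialization)
self.control = self.callback_handler.on_train_begin(args, self.state, self.control)

# If eval_on_start is set, evaluate the model before training starts; this evaluation goes through _evaluate()
if args.eval_on_start:
    self._evaluate(trial, ignore_keys_for_eval, skip_scheduler=True)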

 

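About the _evaluate() function: _evaluate() first calls evaluate():

metrics = self.evaluate(ignore_keys=ignore_keys_for_eval)

In other words, a custom (overridden) evaluate function also affects Trainer's own evaluation during training.

Next, once the metrics are available, the scheduler is stepped here if the learning-rate scheduler is a ReduceLROnPlateau and the scheduler update is not skipped. ReduceLROnPlateau is special because it depends on a validation metric: it adjusts the learning rate dynamically from an external metric (such as the validation loss).

Trigger condition: scheduler.step(metrics) is called during the evaluation phase rather than in the training step, because the learning rate has to be adjusted according to the validation metric (for example, lowered when the metric stops improving).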
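
The metric-driven behaviour can be sketched with a simplified pure-Python stand-in. ToyPlateauScheduler below is hypothetical and is not torch's ReduceLROnPlateau (it omits thresholds, cooldown, and minimum learning rate); it only shows why step() needs the evaluation metric as an argument.

```python
class ToyPlateauScheduler:
    """Simplified stand-in for a plateau scheduler in "min" mode (not torch's implementation)."""

    def __init__(self, lr: float, factor: float = 0.5, patience: int = 1) -> None:
        self.lr = lr
        self.factor = factor
        self.patience = patience
        self.best = float("inf")
        self.bad_evals = 0

    def step(self, metric: float) -> None:
        # Track whether the monitored metric improved at this evaluation.
        if metric < self.best:
            self.best = metric
            self.bad_evals = 0
        else:
            self.bad_evals += 1
        # Lower the learning rate once the metric has stalled for more than `patience` evaluations.
        if self.bad_evals > self.patience:
            self.lr *= self.factor
            self.bad_evals = 0


scheduler = ToyPlateauScheduler(lr=0.1, factor=0.5, patience=1)
history = []
for eval_loss in [1.0, 0.8, 0.9, 0.85, 0.7]:  # one value per evaluation run
    scheduler.step(eval_loss)
    history.append(scheduler.lr)
print(history)  # [0.1, 0.1, 0.1, 0.05, 0.05]
```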

 

About the evaluate() function:

def evaluate(
    self,
    eval_dataset: Optional[Union[Dataset, dict[str, Dataset]]] = None,
    ignore_keys: Optional[list[str]] = None,
    metric_key_prefix: str = "eval",
) -> dict[str, float]:

    # This calls get_eval_dataloader(), which returns:
    #
    #     return self._get_dataloader(
    #         dataset=eval_dataset,
    #         description="Evaluation",
    #         batch_size=self.args.eval_batch_size,
    #         sampler_fn=self._get_eval_sampler,
    #         dataloader_key=dataloader_key,
    #     )
    #
    # _get_dataloader is the same function that get_train_dataloader() uses, so by default
    # the evaluation data goes through the same data_collator as the training data.
    eval_dataloader = self.get_eval_dataloader(eval_dataset)

    # The main work is done by evaluation_loop()
    output = eval_loop(
        eval_dataloader,
        description="Evaluation",
        # If there is no method for computing metrics, only compute the loss
        prediction_loss_only=True if self.compute_metrics is None else None,
        ignore_keys=ignore_keys,  # output keys to ignore
        metric_key_prefix=metric_key_prefix,  # metric prefix
    )

 

 

# Start training

# Loop over epochs
for epoch in range(epochs_trained, num_train_epochs):
    epoch_dataloader = train_dataloader  # dataloader for the current epoch

    # Total number of steps in the current epoch (accounting for gradient accumulation)
    steps_in_epoch = (
        len(epoch_dataloader) if len_dataloader is not None
        else args.max_steps * args.gradient_accumulation_steps
    )

    # Callback: epoch-begin event
    self.control = self.callback_handler.on_epoch_begin(args, self.state, self.control)

    # If resuming and this is the first epoch after the resume point
    if (epoch == epochs_trained and
        resume_from_checkpoint is not None and
        steps_trained_in_current_epoch == 0):
        self._load_rng_state(resume_from_checkpoint)  # load the RNG state

    rng_to_sync = False
    steps_skipped = 0

    # Skip the steps that were already trained (when resuming)
    if steps_trained_in_current_epoch > 0:
        epoch_dataloader = skip_first_batches(epoch_dataloader, steps_trained_in_current_epoch)
        steps_skipped = steps_trained_in_current_epoch
        steps_trained_in_current_epoch = 0
        rng_to_sync = True  # the RNG state needs to be synced

    step = -1
    epoch_iterator = iter(epoch_dataloader)  # the iterator is created here

    # Work out how the batches are split across gradient-accumulation cycles
    remainder = steps_in_epoch % args.gradient_accumulation_steps
    if remainder == 0:
        remainder = args.gradient_accumulation_steps

    # Total number of updates (parameter updates = total steps / gradient accumulation steps)
    total_updates = steps_in_epoch // args.gradient_accumulation_steps + int(
        remainder < args.gradient_accumulation_steps
    )

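# Loop over updates

total_updates is the total number of parameter updates needed within one epoch. It is computed as:

total_updates = steps_in_epoch // args.gradient_accumulation_steps + int(remainder < args.gradient_accumulation_steps)

  • steps_in_epoch: total number of steps in the current epoch (i.e. the number of batches in the dataloader).
  • args.gradient_accumulation_steps: number of gradient accumulation steps (e.g. with 4, the parameters are updated once every 4 batches).
  • remainder: the remainder of steps_in_epoch divided by gradient_accumulation_steps (reset to gradient_accumulation_steps when it is 0).

Computation logic

  • If steps_in_epoch is divisible by gradient_accumulation_steps (remainder = 0):
    total_updates = steps_in_epoch // gradient_accumulation_steps
  • If it is not divisible (remainder > 0):
    total_updates = (steps_in_epoch // gradient_accumulation_steps) + 1
    (because the leftover steps need one extra update)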
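
A worked example of this arithmetic, as a minimal sketch. The function update_schedule is a hypothetical helper that repeats the remainder and total_updates expressions with plain integers and also lists how many batches each update cycle receives.

```python
def update_schedule(steps_in_epoch: int, gradient_accumulation_steps: int):
    """Hypothetical helper: return (total_updates, batches consumed by each update)."""
    remainder = steps_in_epoch % gradient_accumulation_steps
    if remainder == 0:
        remainder = gradient_accumulation_steps
    total_updates = steps_in_epoch // gradient_accumulation_steps + int(
        remainder < gradient_accumulation_steps
    )
    # Every update uses a full accumulation window except possibly the last one.
    batches_per_update = [
        gradient_accumulation_steps if update_step != total_updates - 1 else remainder
        for update_step in range(total_updates)
    ]
    return total_updates, batches_per_update


print(update_schedule(10, 4))  # (3, [4, 4, 2])
print(update_schedule(8, 4))   # (2, [4, 4])
```

With 10 batches and 4 accumulation steps, the last update only accumulates the 2 leftover batches.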

 

    for _ in range(total_updates):  # loop over parameter-update cycles
        update_step += 1

        # Number of batches in this update cycle (the last cycle may have fewer)
        num_batches = (args.gradient_accumulation_steps
                      if update_step != (total_updates - 1)
                      else remainder)

        # Fetch the batches for this cycle
        batch_samples, num_items_in_batch = self.get_batch_samples(
            epoch_iterator, num_batches, args.device
        )

 

        # Loop over batches

        for i, inputs in enumerate(batch_samples):  # loop over micro-batches
            step += 1

            # Decide whether to sync gradients (accumulation window complete, or last step of the epoch)
            do_sync_step = ((step + 1) % args.gradient_accumulation_steps == 0
                            or (step + 1) == steps_in_epoch)

            # Set the gradient-sync state (used for mixed precision / distributed training)
            self.accelerator.gradient_state._set_sync_gradients(do_sync_step)

            # Count the input tokens seen so far (optional)
            if self.args.include_num_input_tokens_seen:
                main_input_name = getattr(self.model, "main_input_name", "input_ids")
                if main_input_name in inputs:
                    input_tokens = inputs[main_input_name].numel()
                    # With multiple GPUs, aggregate the token counts from all devices
                    self.state.num_input_tokens_seen += self.accelerator.gather(
                        torch.tensor(input_tokens, device=self.args.device)
                    ).sum().item()

            # Sync the RNG state when resuming
            if rng_to_sync:
                self._load_rng_state(resume_from_checkpoint)
                rng_to_sync = False

            # Skip already-trained steps (when resuming)
            if steps_trained_in_current_epoch > 0:
                steps_trained_in_current_epoch -= 1
                continue

            # Callback: step-begin event
            if step % args.gradient_accumulation_steps == 0:
                self.control = self.callback_handler.on_step_begin(args, self.state, self.control)

            # Gradient-accumulation context (gradients are not synced except on the last micro-batch)
            context = (
                functools.partial(self.accelerator.no_sync, model=model)
                if (i != len(batch_samples) - 1 and
                    self.accelerator.distributed_type != DistributedType.DEEPSPEED)
                else contextlib.nullcontext
            )

            # Run the training step (forward + backward)
            with context():
                tr_loss_step = self.training_step(model, inputs, num_items_in_batch)  # training step for a single batch

            # Handle NaN/Inf losses (for debugging): add a fallback value instead of the bad step loss
            if (args.logging_nan_inf_filter and
                not is_torch_xla_available() and
                (torch.isnan(tr_loss_step) or torch.isinf(tr_loss_step))):
                tr_loss = tr_loss + tr_loss / (1 + self.state.global_step - self._globalstep_last_logged)
            else:
                tr_loss = tr_loss + tr_loss_step  # accumulate the loss
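
The effect of the NaN/Inf filter can be seen with plain floats. In the sketch below, accumulate_loss is a hypothetical helper and the step counter is simplified (one update per step, logging last happened at step 0); it only mirrors the two branches of the accumulation.

```python
import math


def accumulate_loss(tr_loss: float, step_loss: float, global_step: int,
                    last_logged_step: int, nan_inf_filter: bool = True) -> float:
    """Hypothetical helper mirroring the two branches of the loss accumulation."""
    if nan_inf_filter and (math.isnan(step_loss) or math.isinf(step_loss)):
        # Bad step loss: add tr_loss / (1 + steps since last log) instead of NaN/Inf.
        return tr_loss + tr_loss / (1 + global_step - last_logged_step)
    return tr_loss + step_loss


tr_loss = 0.0
for global_step, step_loss in enumerate([2.0, 4.0, float("nan")]):
    tr_loss = accumulate_loss(tr_loss, step_loss, global_step, last_logged_step=0)
print(tr_loss)  # 8.0, the running sum stays finite despite the NaN step
```

Without the filter, a single NaN step would make the accumulated loss NaN until the next logging reset.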

            # Accumulate FLOPs (floating-point operations)
            self.current_flos += float(self.floating_point_ops(inputs))

            # Update the parameters on gradient-sync steps
            if do_sync_step:
                # Gradient clipping (guards against exploding gradients)
                if args.max_grad_norm is not None and args.max_grad_norm > 0:
                    if is_sagemaker_mp_enabled() and args.fp16:
                        _grad_norm = self.optimizer.clip_master_grads(args.max_grad_norm)
                    elif self.use_apex:
                        _grad_norm = nn.utils.clip_grad_norm_(
                            amp.master_params(self.optimizer), args.max_grad_norm)
                    else:
                        _grad_norm = self.accelerator.clip_grad_norm_(
                            model.parameters(), args.max_grad_norm)

                # Callback: before the optimizer step
                self.control = self.callback_handler.on_pre_optimizer_step(args, self.state, self.control)

                # Parameter update
                self.optimizer.step()

                # Callback: after the optimizer step
                self.control = self.callback_handler.on_optimizer_step(args, self.state, self.control)

                # Learning-rate scheduling (for schedulers other than ReduceLROnPlateau)
                learning_rate = self._get_learning_rate()
                if not self.accelerator.optimizer_step_was_skipped:
                    if not isinstance(self.lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                        self.lr_scheduler.step()

                # Zero the gradients and update the training state
                model.zero_grad()
                self.state.global_step += 1
                self.state.epoch = epoch + (step + 1 + steps_skipped) / steps_in_epoch

                # Callback: step-end event
                self.control = self.callback_handler.on_step_end(args, self.state, self.control)

                # After each step, call _maybe_log_save_evaluate. It only runs when do_sync_step is True,
                # i.e. on the step that actually performs the parameter update.
                self._maybe_log_save_evaluate(
                    tr_loss, grad_norm, model, trial, epoch,
                    ignore_keys_for_eval, start_time, learning_rate=learning_rate
                )
            else:
                # Callback: substep-end event (during gradient accumulation)
                self.control = self.callback_handler.on_substep_end(args, self.state, self.control)
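
Which steps end up being sync steps can be listed with a small simulation. The function sync_steps is a hypothetical helper that applies the do_sync_step condition to every step index of one epoch.

```python
def sync_steps(steps_in_epoch: int, gradient_accumulation_steps: int) -> list:
    """Hypothetical helper: return the step indices on which do_sync_step is True."""
    synced = []
    for step in range(steps_in_epoch):
        do_sync_step = ((step + 1) % gradient_accumulation_steps == 0
                        or (step + 1) == steps_in_epoch)
        if do_sync_step:
            synced.append(step)
    return synced


print(sync_steps(10, 4))  # [3, 7, 9]
```

Steps 3 and 7 close a full accumulation window, and step 9 syncs because it is the last step of the epoch. All other steps only fire on_substep_end.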

 

            # Early-termination check (e.g. triggered by the EarlyStopping callback)
            if self.control.should_epoch_stop or self.control.should_training_stop:
                if is_torch_xla_available():  # TPU-specific handling
                    xm.mark_step()
                break  # leave the inner loop

        # Check the stop condition again so that the outer loop is exited as well
        if self.control.should_epoch_stop or self.control.should_training_stop:
            if is_torch_xla_available():
                xm.mark_step()
            break

    # Empty-data check
    if step < 0:
        logger.warning("No training data available!")
        self.control.should_training_stop = True

    # Callback: epoch-end event
    self.control = self.callback_handler.on_epoch_end(args, self.state, self.control)

    # After each epoch, call _maybe_log_save_evaluate. Internally it calls _evaluate() and _save_checkpoint().
    # _save_checkpoint() calls self.save_model() and also runs:
    #
    #     if not self.args.save_only_model:
    #         # Save optimizer and scheduler
    #         self._save_optimizer_and_scheduler(output_dir)
    #         self._save_scaler(output_dir)
    #         # Save RNG state
    #         self._save_rng_state(output_dir)
    #
    # save_model() in turn calls _save().
    self._maybe_log_save_evaluate(
        tr_loss, grad_norm, model, trial, epoch,
        ignore_keys_for_eval, start_time, learning_rate=learning_rate
    )

    # TPU debug output (optional)
    if DebugOption.TPU_METRICS_DEBUG in self.args.debug:
        if is_torch_xla_available():
            xm.master_print(met.metrics_report())

    # Global termination check
    if self.control.should_training_stop:
        break

 

# 7. Load the best model (if configured), compute the final training metrics, release resources, and return the result.

 

 

 

 

 

Key function during evaluation: evaluation_loop, the main model-evaluation function

def evaluation_loop(
        self,
        dataloader: DataLoader,
        description: str,
        prediction_loss_only: Optional[bool] = None,
        ignore_keys: Optional[list[str]] = None,
        metric_key_prefix: str = "eval",
    ) -> EvalLoopOutput:
    """
    Prediction/evaluation loop, shared by Trainer.evaluate() and Trainer.predict().
    Works both with or without labels.
    """

    # Main evaluation loop: iterate over the batches
    for step, inputs in enumerate(dataloader):
        # Update the number of observed examples
        observed_batch_size = find_batch_size(inputs)
        if observed_batch_size is not None:
            observed_num_examples += observed_batch_size
            # Determine the batch size dynamically (for cases where it is not fixed)
            if batch_size is None:
                batch_size = observed_batch_size

        # Run the prediction step to get the losses, logits, and labels
        losses, logits, labels = self.prediction_step(model, inputs, prediction_loss_only, ignore_keys=ignore_keys)

 

# About self.prediction_step: depending on the situation, it computes and returns three key values: the loss, the logits, and the labels. has_labels indicates whether the inputs contain labels.

has_labels = False if len(self.label_names) == 0 else all(inputs.get(k) is not None for k in self.label_names)

self.label_names = default_label_names if self.args.label_names is None else self.args.label_names

# In other words, has_labels is True when label_names was specified manually or the model's forward function has a label parameter, and the inputs actually contain those keys.

If labels are present, they are extracted from the inputs and detached.

# loss_without_labels is True when the model's forward method has a return_loss=True parameter.

if has_labels or loss_without_labels:
    labels = nested_detach(tuple(inputs.get(name) for name in self.label_names))

# If there are labels, call self.compute_loss to compute the loss and the outputs

if has_labels or loss_without_labels:
    with self.compute_loss_context_manager():
        loss, outputs = self.compute_loss(model, inputs, return_outputs=True)

    # About self.compute_loss:
    #
    #     def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
    #
    # This function calls the user-defined loss function self.compute_loss_func (if one was provided)
    # and applies label smoothing through self.label_smoother. If the inputs contain no labels, the
    # model's output dictionary must contain the key "loss"; otherwise an error is raised.

    loss = loss.detach().mean()

    if isinstance(outputs, dict):
        logits = tuple(v for k, v in outputs.items() if k not in ignore_keys + ["loss"])
    else:
        logits = outputs[1:]
else:        # without labels, just run the model's forward pass
    loss = None
    with self.compute_loss_context_manager():
        outputs = model(**inputs)
    if isinstance(outputs, dict):
        logits = tuple(v for k, v in outputs.items() if k not in ignore_keys)
    else:
        logits = outputs
    # TODO: this needs to be fixed and made cleaner later.
    if self.args.past_index >= 0:
        self._past = outputs[self.args.past_index - 1]
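
The two ways logits are pulled out of the model outputs can be tried on toy data. In this sketch extract_logits is a hypothetical helper and the outputs are plain Python values rather than tensors or ModelOutput objects; it covers the dict case, the tuple case, and the no-loss case.

```python
def extract_logits(outputs, ignore_keys=(), has_loss: bool = True):
    """Hypothetical helper: keep everything except the loss and the ignored keys."""
    ignore = list(ignore_keys) + (["loss"] if has_loss else [])
    if isinstance(outputs, dict):
        return tuple(v for k, v in outputs.items() if k not in ignore)
    # Tuple outputs: the loss, when present, is the first element.
    return outputs[1:] if has_loss else outputs


dict_outputs = {"loss": 0.3, "logits": [0.1, 0.9], "past_key_values": "cache"}
from_dict = extract_logits(dict_outputs, ignore_keys=["past_key_values"])
from_tuple = extract_logits((0.3, [0.1, 0.9]))
print(from_dict)   # ([0.1, 0.9],)
print(from_tuple)  # ([0.1, 0.9],)
```

This is why a custom model that returns tuples has to put the loss first: everything after index 0 is treated as logits.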

 

 

 

 


        main_input_name = getattr(self.model, "main_input_name", "input_ids")
        inputs_decode = (
            self._prepare_input(inputs[main_input_name]) if "inputs" in args.include_for_metrics else None
        )

    # Compute the metrics
    if (
        self.compute_metrics is not None
        and all_preds is not None
        and all_labels is not None
        and not self.args.batch_eval_metrics
    ):
        eval_set_kwargs["losses"] = all_losses if "loss" in args.include_for_metrics else None
        eval_set_kwargs["inputs"] = all_inputs if "inputs" in args.include_for_metrics else None
        # Compute the metrics on the merged results of all batches. all_preds and all_labels correspond to the
        # logits and labels returned by self.prediction_step, concatenated over all evaluation batches.
        metrics = self.compute_metrics(
            EvalPrediction(predictions=all_preds, label_ids=all_labels, **eval_set_kwargs)
        )
    elif metrics is None:
        metrics = {}  # start from an empty metrics dict

    # Convert the metric values to plain Python types (so they are JSON-serializable)
    metrics = denumpify_detensorize(metrics)

    # Add the loss to the metrics
    if isinstance(all_losses, list) and all_losses:
        metrics[f"{metric_key_prefix}_loss"] = np.concatenate(all_losses).mean().item()
    elif isinstance(all_losses, np.ndarray):
        metrics[f"{metric_key_prefix}_loss"] = all_losses.mean().item()
    if hasattr(self, "jit_compilation_time"):
        metrics[f"{metric_key_prefix}_jit_compilation_time"] = self.jit_compilation_time
    if hasattr(self, "model_preparation_time"):
        metrics[f"{metric_key_prefix}_model_preparation_time"] = self.model_preparation_time

    # Prefix all metric keys (e.g. "eval_loss")
    for key in list(metrics.keys()):
        if not key.startswith(f"{metric_key_prefix}_"):
            metrics[f"{metric_key_prefix}_{key}"] = metrics.pop(key)

    # Return the evaluation result (predictions, labels, metrics, and number of samples)
    return EvalLoopOutput(predictions=all_preds, label_ids=all_labels, metrics=metrics, num_samples=num_samples)
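
The key-prefixing step at the end of evaluation_loop is easy to try in isolation. The function prefix_metrics below is a hypothetical wrapper around the same loop, working on a copy of the input dictionary.

```python
def prefix_metrics(metrics: dict, metric_key_prefix: str = "eval") -> dict:
    """Hypothetical wrapper: add the prefix to every key that does not already have it."""
    metrics = dict(metrics)  # work on a copy so the caller's dict is untouched
    for key in list(metrics.keys()):
        if not key.startswith(f"{metric_key_prefix}_"):
            metrics[f"{metric_key_prefix}_{key}"] = metrics.pop(key)
    return metrics


prefixed = prefix_metrics({"accuracy": 0.9, "eval_loss": 0.25})
print(prefixed)  # {'eval_loss': 0.25, 'eval_accuracy': 0.9}
```

So a compute_metrics function can return either "accuracy" or "eval_accuracy"; both end up under the same prefixed key.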

 

 

Key function during training: training_step, which trains on a single batch

    Core computation: loss = self.compute_loss(model, inputs, num_items_in_batch=num_items_in_batch), i.e. computing the loss

 

 

 

 

When you need to use a custom model with Trainer

 When using it with your own model, make sure:

  • your model always returns tuples or subclasses of ModelOutput
  • your model can compute the loss if a labels argument is provided and that loss is returned as the first element of the tuple (if your model returns tuples)
  • your model can accept multiple label arguments (use label_names in TrainingArguments to indicate their name to the Trainer) but none of them should be named "label"

 

 

 
