波波算法笔记

Bob Peng

如何不使用PEFT手写Lora微调

2025-03-31
如何不使用PEFT手写Lora微调

如何不使用PEFT,手写Lora微调?

前阵子的面试给了我很多启发,公司的业务为了效率都会套用框架,这是一件很简单的事情.但是会用不代表懂,你还真不一定理解内部运行原理.

我称其为 调参侠 哈哈.尽管很多时候,能用就行.但是算法工程师的基本素质还是需要具备的,这些内容在面试中经常涉及.

或许某一天代码和你,总得跑一个.

讲点理论与思路

首先要实现模型训练,明确思路 数据使用优化器来更新模型参数 ,必须要三个模块- 模型 数据 优化器

  • 模型:由于我们使用lora训练模型,须知,原理是AB小矩阵模拟大矩阵W更新,那么就需要修改模型结构,给原有层的前向传播附加一条通路,且冻结原有模型参数,只对AB矩阵进行更新。
  • 数据:明确输入与输出,使用分词器进行分词,目标数据集格式包含input_id、label、attention_mask。同时创建dataloader类用于数据一次一个batch传输。
  • 优化器:确定好优化器,loss计算、参数优化。配置训练参数。如:batch_size,epoch等。
  • 后续:需要进行模型的保存,如何将AB矩阵参数保存下来 模型,保存后的文件如何加载装配到base模型?怎么合并模型参数?

针对以上思路进行一步步构建代码!

模型(Model)

加载base模型

import torch  
import torch.nn as nn  
import torch.nn.functional as F  
import math  
from transformers import AutoModelForCausalLM, AutoTokenizer  
  
  
  
model_name = "/home/phb/phb/models/Qwen2.5-Coder-0.5B-Instruct"  
  
base_model = AutoModelForCausalLM.from_pretrained(model_name)  
tokenizer = AutoTokenizer.from_pretrained(model_name)  

编写Lora层,以及包装Loramodel

此类构造了一个LoraLinear线性层,对AB矩阵、rank、alpha进行初始化,注意哦,只包含lora的单条通路,与前向传播。

class LoraLinear(nn.Module):  
    def __init__(self,in_features,out_features,rank,alpha):  
        super().__init__()  
        self.rank=rank  
        self.scaling=alpha/rank  
  
        self.lora_A=nn.Parameter(torch.zeros(in_features,rank))  
        self.lora_B=nn.Parameter(torch.zeros(rank,out_features))  
  
        nn.init.zeros_(self.lora_B)  
        nn.init.kaiming_uniform_(self.lora_A,a=math.sqrt(5))  
    def forward(self,x):  
        return (self.scaling*(x@self.lora_A@self.lora_B))  
      

这一个类LoraLayer,我们需要把原有线性层修改为Lora层。所以类初始化传入了layer,rank、alpha。冻结原有线性层的参数(params),同时修改前向传播方法,给原线性层base_layer前向传播,加上了lora层的计算。

class LoraLayer(nn.Module):  
    def __init__(self,linear_layer,rank,alpha):  
        super().__init__()  
        self.in_features=linear_layer.in_features  
        self.out_features=linear_layer.out_features  
        self.base_layer=linear_layer  
        for param in self.base_layer.parameters():  
            param.requires_grad=False  
        self.lora=LoraLinear(self.in_features,self.out_features,rank,alpha)  
  
    def forward(self,x):  
        return self.base_layer(x)+self.lora(x)  

最终我们需要替换原模型(base_model)所有目标层(target_module)。也就是包装成Loramodel。这里多出了一个参数taget_module。需要遍历模型所有层,判断是否是目标层,是则进行lora层包装替换。

class LoraModel(nn.Module):  
    def __init__(self,base_model,rank,alpha,target_modules=["q_proj","v_proj"]):  
        super().__init__()  
  
        self.base_model=base_model  
  
        for param in self.base_model.parameters():  
            param.requires_grad=False  
          
        self._replace_layers(target_modules,rank,alpha)  
  
    def _replace_layers(self,target_modules,rank,alpha):  
        for name,module in self.base_model.named_modules():  
            if isinstance(module,nn.Linear) and any(target in name for target in target_modules):  
                parent_name = '.'.join(name.split('.')[:-1])  
                child_name=name.split('.')[-1]  
                if parent_name:  
                    parent = self.base_model.get_submodule(parent_name)  
                else:  
                    parent = self.base_model  
  
                setattr(parent, child_name, LoraLayer(module, rank, alpha))  
              
    def forward(self,*args,**kwargs):  
        return self.base_model(*args,**kwargs)  



Lora_model=LoraModel(base_model,rank=4,alpha=16,target_modules=["q_proj","v_proj"])  

至此模型结构已经修改完毕,可训练参数可查

def get_parameter_percentage(model):  
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)  
    total = sum(p.numel() for p in model.parameters())  
      
    print(f"可训练参数: {trainable:,}")  
    print(f"总参数: {total:,}")  
    print(f"可训练参数占比: {100 * trainable / total:.2f}%")  



可训练参数: 270,336  
总参数: 494,303,104  
可训练参数占比: 0.05%  
  

数据(Datasets)

预处理数据

具体内部怎么处理字段就不写了,根据数据集来定。

def preprocess_function(examples):  
    xxx  
    xxx  
    xxx  
    return {  
        "input_ids": tokenized_inputs["input_ids"],  
        "attention_mask": tokenized_inputs["attention_mask"],  
        "labels": tokenized_outputs["input_ids"],  # 模型需要 label  
    }  
  
# 将数据转换为Dataset格式  
dataset = datasets.Dataset.from_list(train_data)  
tokenized_dataset = dataset.map(preprocess_function, batched=True).  

最终的格式如下:

Dataset({  
    features: ['input_ids', 'attention_mask', 'labels'],  
    num_rows: 40  
})  

构造Dataloader

由于训练时,数据是一个批次进行计算的,所以不可以单纯用for,就有了loader

import random  
class MyDataLoader:  
    def __init__(self, dataset, batch_size, shuffle=True):  
        self.dataset = dataset  
        self.batch_size = batch_size  
        self.shuffle = shuffle  
        self.index = 0  
        self.length = len(dataset)  
        self.indices = list(range(self.length))  
        random.shuffle(self.indices)  
      
    def __iter__(self):  
        self.index = 0  
        return self   
      
    def __next__(self):  
        if self.index >= self.length:  
  
            random.shuffle(self.indices)  
            raise StopIteration   
              
        # 获取当前batch的索引  
        batch_indices = self.indices[self.index:min(self.index + self.batch_size, self.length)]  
          
        # 构造batch数据  
        batch = {  
            'input_ids': torch.stack([torch.tensor(self.dataset[i]['input_ids']) for i in batch_indices]),  
            'attention_mask': torch.stack([torch.tensor(self.dataset[i]['attention_mask']) for i in batch_indices]),  
            'labels': torch.stack([torch.tensor(self.dataset[i]['labels']) for i in batch_indices])  
        }  
          
        self.index += self.batch_size  
        return batch  
data_loader=MyDataLoader(tokenized_dataset,batch_size=2,shuffle=True)  

优化器与训练配置

配置优化器,遍历可训练参数传入,转移模型权重,启动训练模式.

optimizer=torch.optim.AdamW(  
    [p for p in Lora_model.parameters() if p.requires_grad],  
    lr=2e-4  
)  
device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  
Lora_model.to(device)  
Lora_model.train()  

前向传播,计算损失,反向传播,更新参数,重置梯度. 以此不断循环batch数据.

def train_lora(model,train_dataloader,nums_epochs):  
    for epoch in range(nums_epochs):  
        total_loss=0  
        for batch in data_loader:  
            input_ids=batch["input_ids"].to(device)  
            label=batch["labels"].to(device)  
  
            output=model(input_ids=input_ids,labels=label)  
            loss=output.loss  
            # causal llm前向传播自动计算交叉熵损失  
  
            loss.backward()  
            optimizer.step()  
            optimizer.zero_grad()  
  
            total_loss+=loss.item()  
  
        print(f"Epoch {epoch+1}, Average Loss: {total_loss/2}")  
  
  

保存模型

同样遍历lora权重,字典保存.

def save_lora_weights(model, path):  
    """保存LoRA权重"""  
    lora_state_dict = {}  
    for name, module in model.named_modules():  
        if isinstance(module, LoraLayer):  
            # 通过lora子模块访问权重  
            lora_A = module.lora.lora_A.data.cpu()  
            lora_B = module.lora.lora_B.data.cpu()  
            clean_name = name.replace('.base_layer', '')   
            lora_state_dict[f"{clean_name}.A"] = lora_A  
            lora_state_dict[f"{clean_name}.B"] = lora_B  
    torch.save(lora_state_dict, path)  



save_lora_weights(Lora_model,"/home/phb/phb/project/cursor/handle/SFT/Lora/result/lora_weights.pt")  
tokenizer.save_pretrained("/home/phb/phb/project/cursor/handle/SFT/Lora/result")