如何不使用PEFT手写Lora微调
编辑
6
2025-03-31

如何不使用PEFT,手写Lora微调?
❝
前阵子的面试给了我很多启发,公司的业务为了效率都会套用框架,这是一件很简单的事情.但是会用不代表懂,你还真不一定理解内部运行原理.
我称其为
调参侠
哈哈.尽管很多时候,能用就行.但是算法工程师的基本素质还是需要具备的,这些内容在面试中经常涉及.或许某一天代码和你,总得跑一个.
讲点理论与思路
首先要实现模型训练,明确思路 数据使用优化器来更新模型参数
,必须要三个模块- 模型
、 数据
、 优化器
。
- 模型:由于我们使用lora训练模型,须知,原理是AB小矩阵模拟大矩阵W更新,那么就需要修改模型结构,给原有层的前向传播附加一条通路,且冻结原有模型参数,只对AB矩阵进行更新。
- 数据:明确输入与输出,使用分词器进行分词,目标数据集格式包含input_id、label、attention_mask。同时创建dataloader类用于数据一次一个batch传输。
- 优化器:确定好优化器,loss计算、参数优化。配置训练参数。如:batch_size,epoch等。
- 后续:需要进行模型的保存,如何将AB矩阵参数保存下来 模型,保存后的文件如何加载装配到base模型?怎么合并模型参数?
针对以上思路进行一步步构建代码!
模型(Model)
加载base模型
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
from transformers import AutoModelForCausalLM, AutoTokenizer
model_name = "/home/phb/phb/models/Qwen2.5-Coder-0.5B-Instruct"
base_model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
编写Lora层,以及包装Loramodel
此类构造了一个LoraLinear线性层,对AB矩阵、rank、alpha进行初始化,注意哦,只包含lora的单条通路,与前向传播。
class LoraLinear(nn.Module):
def __init__(self,in_features,out_features,rank,alpha):
super().__init__()
self.rank=rank
self.scaling=alpha/rank
self.lora_A=nn.Parameter(torch.zeros(in_features,rank))
self.lora_B=nn.Parameter(torch.zeros(rank,out_features))
nn.init.zeros_(self.lora_B)
nn.init.kaiming_uniform_(self.lora_A,a=math.sqrt(5))
def forward(self,x):
return (self.scaling*(x@self.lora_A@self.lora_B))
这一个类LoraLayer,我们需要把原有线性层修改为Lora层。所以类初始化传入了layer,rank、alpha。冻结原有线性层的参数(params),同时修改前向传播方法,给原线性层base_layer前向传播,加上了lora层的计算。
class LoraLayer(nn.Module):
def __init__(self,linear_layer,rank,alpha):
super().__init__()
self.in_features=linear_layer.in_features
self.out_features=linear_layer.out_features
self.base_layer=linear_layer
for param in self.base_layer.parameters():
param.requires_grad=False
self.lora=LoraLinear(self.in_features,self.out_features,rank,alpha)
def forward(self,x):
return self.base_layer(x)+self.lora(x)
最终我们需要替换原模型(base_model)所有目标层(target_module)。也就是包装成Loramodel。这里多出了一个参数taget_module。需要遍历模型所有层,判断是否是目标层,是则进行lora层包装替换。
class LoraModel(nn.Module):
def __init__(self,base_model,rank,alpha,target_modules=["q_proj","v_proj"]):
super().__init__()
self.base_model=base_model
for param in self.base_model.parameters():
param.requires_grad=False
self._replace_layers(target_modules,rank,alpha)
def _replace_layers(self,target_modules,rank,alpha):
for name,module in self.base_model.named_modules():
if isinstance(module,nn.Linear) and any(target in name for target in target_modules):
parent_name = '.'.join(name.split('.')[:-1])
child_name=name.split('.')[-1]
if parent_name:
parent = self.base_model.get_submodule(parent_name)
else:
parent = self.base_model
setattr(parent, child_name, LoraLayer(module, rank, alpha))
def forward(self,*args,**kwargs):
return self.base_model(*args,**kwargs)
Lora_model=LoraModel(base_model,rank=4,alpha=16,target_modules=["q_proj","v_proj"])
至此模型结构已经修改完毕,可训练参数可查
def get_parameter_percentage(model):
trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
total = sum(p.numel() for p in model.parameters())
print(f"可训练参数: {trainable:,}")
print(f"总参数: {total:,}")
print(f"可训练参数占比: {100 * trainable / total:.2f}%")
可训练参数: 270,336
总参数: 494,303,104
可训练参数占比: 0.05%
数据(Datasets)
预处理数据
具体内部怎么处理字段就不写了,根据数据集来定。
def preprocess_function(examples):
xxx
xxx
xxx
return {
"input_ids": tokenized_inputs["input_ids"],
"attention_mask": tokenized_inputs["attention_mask"],
"labels": tokenized_outputs["input_ids"], # 模型需要 label
}
# 将数据转换为Dataset格式
dataset = datasets.Dataset.from_list(train_data)
tokenized_dataset = dataset.map(preprocess_function, batched=True).
最终的格式如下:
Dataset({
features: ['input_ids', 'attention_mask', 'labels'],
num_rows: 40
})
构造Dataloader
由于训练时,数据是一个批次进行计算的,所以不可以单纯用for,就有了loader
import random
class MyDataLoader:
def __init__(self, dataset, batch_size, shuffle=True):
self.dataset = dataset
self.batch_size = batch_size
self.shuffle = shuffle
self.index = 0
self.length = len(dataset)
self.indices = list(range(self.length))
random.shuffle(self.indices)
def __iter__(self):
self.index = 0
return self
def __next__(self):
if self.index >= self.length:
random.shuffle(self.indices)
raise StopIteration
# 获取当前batch的索引
batch_indices = self.indices[self.index:min(self.index + self.batch_size, self.length)]
# 构造batch数据
batch = {
'input_ids': torch.stack([torch.tensor(self.dataset[i]['input_ids']) for i in batch_indices]),
'attention_mask': torch.stack([torch.tensor(self.dataset[i]['attention_mask']) for i in batch_indices]),
'labels': torch.stack([torch.tensor(self.dataset[i]['labels']) for i in batch_indices])
}
self.index += self.batch_size
return batch
data_loader=MyDataLoader(tokenized_dataset,batch_size=2,shuffle=True)
优化器与训练配置
配置优化器,遍历可训练参数传入,转移模型权重,启动训练模式.
optimizer=torch.optim.AdamW(
[p for p in Lora_model.parameters() if p.requires_grad],
lr=2e-4
)
device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
Lora_model.to(device)
Lora_model.train()
前向传播,计算损失,反向传播,更新参数,重置梯度. 以此不断循环batch数据.
def train_lora(model,train_dataloader,nums_epochs):
for epoch in range(nums_epochs):
total_loss=0
for batch in data_loader:
input_ids=batch["input_ids"].to(device)
label=batch["labels"].to(device)
output=model(input_ids=input_ids,labels=label)
loss=output.loss
# causal llm前向传播自动计算交叉熵损失
loss.backward()
optimizer.step()
optimizer.zero_grad()
total_loss+=loss.item()
print(f"Epoch {epoch+1}, Average Loss: {total_loss/2}")
保存模型
同样遍历lora权重,字典保存.
def save_lora_weights(model, path):
"""保存LoRA权重"""
lora_state_dict = {}
for name, module in model.named_modules():
if isinstance(module, LoraLayer):
# 通过lora子模块访问权重
lora_A = module.lora.lora_A.data.cpu()
lora_B = module.lora.lora_B.data.cpu()
clean_name = name.replace('.base_layer', '')
lora_state_dict[f"{clean_name}.A"] = lora_A
lora_state_dict[f"{clean_name}.B"] = lora_B
torch.save(lora_state_dict, path)
save_lora_weights(Lora_model,"/home/phb/phb/project/cursor/handle/SFT/Lora/result/lora_weights.pt")
tokenizer.save_pretrained("/home/phb/phb/project/cursor/handle/SFT/Lora/result")
- 0
- 0
-
分享