LLM 上层设施 trl 微调
本次的目的是使用微调技术增强模型的性能。
DataWhale 提供的代码推理使用了 vLLM 加速,这里没有使用。而且推理是使用简单的逐个推理,没有使用批量推理。因此速度会显著的慢一些。
Task 3 将使用 trl 微调来优化模型的表现。此外在推理时,我们还使用多路投票的方式来提高准确率。
Parameter Efficient Fine-Tuning
PEFT 是一种参数高效的微调方法,它通过在原始模型的基础上添加一个小的线性层来实现微调。这种方法的优势在于,它不需要重新训练整个模型,而只需要训练一个小的线性层,因此可以大大减少训练时间。
PEFT 有许多不同的注入方法和注入点。例如最常用的 LoRA 技术,即在两层之间插入一个小的线性层。
假设 是原本的网络(可以为一层或多层), 是新的线性层,那么 LoRA 的计算公式如下:
其中, 和 是可学习的参数, 是一个超参数。 和 都是不满秩的矩阵,这样一方面可以使得模型区提取新的特征而非完全覆盖原有的特征,另一方面可以减少参数量。
除了 LoRA 之外,还有许多的微调方式,这里仅举一例。具体请参考 Huggingface PEFT 的 conceptual guide 部分。
RLHF (Reinforcement Learning for Human Feedback)
RLHF 是一种基于强化学习的人类反馈微调方法。它通过与人类交互,根据人类的反馈来调整模型的参数。这种方法的优势在于,可以根据人类的反馈来调整模型的参数,从而更好地适应人类的需求。
RLHF 有三个阶段:
- SFT (Supervised Fine-Tuning) 阶段:在这个阶段,模型会根据人类提供的标签进行微调。这个阶段是我们通常的微调过程。
- Reward Learning 阶段:在这个阶段,模型会根据人类的反馈来学习奖励函数。这个阶段是 RLHF 的核心。不过,这个阶段通常是在产品上线后进行的,因为需要大量的人类反馈。
- PPO (Proximal Policy Optimization) 阶段:综合考虑 SFT 和 Reward Learning 阶段的结果,进一步微调模型。
做 demo 时,只要完成 SFT 阶段即可。
使用 trl 进行 SFT 微调
微调技术刚开始时,没有统一的标准,后来出现了peft库,但是peft库的使用还是比较复杂。后来出现了更加简单的trl库,trl库是基于peft库的封装,使用起来更加简单。
我们同时还会使用 Accelerate 库来加速,可参考我之前的简介。
我先在自己的笔记本电脑上进行了开发,确保模型能跑通后再放到了服务器上。所以前半段使用的模型是 0.5B 的小模型,后半段使用的是 7B 的模型。
此外我同时参考了 Huggingface 的资料和 wandb 提供的资料。相比 Huggingface 的,wandb 提供的这一篇更加详细且具体。
准备工作
先从阿里竞赛主页获取训练集,然后解析。
from dataclasses import dataclass, field
@dataclass
class QuestionItem:
question: str
options: list[str]
answer: str | None = field(default=None)
@dataclass
class Entry:
id: str
problem: str = field(default="")
questions: list[QuestionItem] = field(default_factory=list)
def parse_file(file_path: str) -> list[Entry]:
with open(file_path, "r") as f:
lines = f.readlines()
entries = []
import json
for line in lines:
entry = json.loads(line)
questions = []
for question in entry["questions"]:
questions.append(QuestionItem(**question))
entries.append(Entry(problem=entry["problem"], questions=questions, id=entry["id"]))
return entries
加载模型,由于我们需要大量进行 tokenization,所以使用了 tokenizer 的 fast 版本,它与一般的 tokenizer 相比,速度更快。不过因为 fast tokenizer 有一个 rust 的 backend 绑定,所以一般的 tokenizer 被留了下来。不过一般直接用 fast tokenizer 就可以了。
model_path = "../Qwen2-0.5B"
from transformers import Qwen2ForCausalLM, Qwen2TokenizerFast
def get_model_and_tokenizer() -> tuple[Qwen2ForCausalLM, Qwen2TokenizerFast]:
model = Qwen2ForCausalLM.from_pretrained(model_path)
tokenizer = Qwen2TokenizerFast.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token
return model, tokenizer
注意,qwen 模型的 tokenizer 没有 pad token,所以我们将 pad token 设置为 eos token。
数据集构建
由于我们的数据集比较复杂,因此要使用 python 代码构建数据集,而不能直接加载。如果有现成的数据集,可以这样加载。
dataset = load_dataset("json", data_files="path/to/dataset.jsonl", split="train")
split 可以是 train, validation, test 之一。
不过我们要手动构建,使用 from_dict 方法。构建的代码见后文。
推理过程生成
这里可以用通过告知模型的题目和答案要求给出推理过程,然后再将推理过程加入 completion。
实现如下,首先向 QuestionItem 对象添加一个reasoning: str | None = field(default=None),方便后续代码编写。然后我们写好推理生成的 chain。
def reasoning_generation_chain() -> Chain:
# from transformers import pipeline
# generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
from langchain_community.llms.tongyi import Tongyi
model = Tongyi()
from langchain_huggingface import HuggingFacePipeline
prompt = ChatPromptTemplate.from_messages([
("system", '你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。我会向你提供答案,而你要给出逐步的解析来交会我如何得到答案。我是一个小学生,所以每一步的推理不要太复杂。'),
("user", """### 题目
{problem}
### 问题
{question}
{options}
### 答案
{answer}""")])
return prompt | model
然后生成推理过程。
def generate_reasoning_inplace(entries: list[Entry]) -> None:
chain = reasoning_generation_chain()
import time
def process_question(entry, question):
if question.reasoning is not None:
return
max_retries = 3
for attempt in range(max_retries):
try:
reasoning: str = chain.invoke({
"problem": entry.problem,
"question": question.question,
"options": format_options(question.options),
"answer": question.answer
})
question.reasoning = reasoning
break # Exit the loop if successful
except Exception as e:
if attempt < max_retries - 1:
time.sleep(1) # Optional: wait a bit before retrying
else:
print(f"Failed to process question.")
print(str(e))
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for entry in tqdm(entries):
for question in entry.questions:
futures.append(executor.submit(process_question, entry, question))
for future in tqdm(futures):
future.result()
然后再给构建的数据集加上推理过程。
def generate_reasoning_inplace(entries: list[Entry]) -> None:
chain = reasoning_generation_chain()
import time
def process_question(entry, question):
if question.reasoning is not None:
return
max_retries = 3
for attempt in range(max_retries):
try:
reasoning: str = chain.invoke({
"problem": entry.problem,
"question": question.question,
"options": format_options(question.options),
"answer": question.answer
})
question.reasoning = reasoning
break # Exit the loop if successful
except Exception as e:
if attempt < max_retries - 1:
time.sleep(1) # Optional: wait a bit before retrying
else:
print(f"Failed to process question.")
print(str(e))
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for entry in tqdm(entries):
for question in entry.questions:
futures.append(executor.submit(process_question, entry, question))
for future in tqdm(futures):
future.result()
这一步当然也可以用本地的模型完成。
这样,我们就构建好了数据集。
我们把数据集保存到本地,以便后续使用。
def create_train_dataset(entries: list[Entry]) -> Dataset:
dataset = []
for entry in entries:
for question in entry.questions:
if question.reasoning is None or question.answer is None:
continue
dataset.append({
"problem": entry.problem,
"question": question.question,
"options": format_options(question.options),
"reasoning": question.reasoning,
"answer": question.answer
})
return Dataset.from_list(dataset)
entries = parse_file("./round1_train_data.jsonl")
generate_reasoning_inplace(entries)
ds = create_train_dataset(entries)
ds.save_to_disk("./train_dataset")
之后就可以,
train_dataset = Dataset.load_from_disk("./train_dataset")
数据集 Encode
def encode(x):
head = r"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。请逐步分析问题,写出思考过程,并在最后一行输出答案,最后一行的格式为"答案是:A"或"答案是:B"或"答案是:C"或"答案是:D"等等。如果你做对了这个题目,你会获得的一亿奖金。题目如下:"""
problem = x["problem"]
question = x["question"]
reasoning = x["reasoning"]
answer = x["answer"]
full_text = f"""{head}
### 题目
{problem}
### 问题
{question}
{format_options(x['options'])}
### 分析过程
{reasoning}
### 答案
答案是:{answer}"""
encoded = tokenizer(full_text + tokenizer.eos_token, max_length=max_new_tokens, truncation=True, pad_to_multiple_of=8)
return encoded
train_dataset = train_dataset.map(
lambda x: encode(x),
batched=False
).remove_columns(["prompt", "completion"])
有些模型不把原始列删除也可以,不过推荐删除,只保留input_ids等直接输入模型等参数。
注意,我们使用的模型是 instruct 模型,instruct 模型在训练的数据集格式如下,
Some instruct
### Some Info
Text
### Other Info
Text
因此我们的格式和这个格式是一样的,这样模型会有更好的表现。
这一步和微调一起做。
微调配置
Peft 配置
这里使用 LoRA 的默认配置。
def get_peft_config() -> LoraConfig:
from peft.utils.constants import TRANSFORMERS_MODELS_TO_LORA_TARGET_MODULES_MAPPING
return LoraConfig(
task_type="casual_lm",
target_modules=TRANSFORMERS_MODELS_TO_LORA_TARGET_MODULES_MAPPING["qwen2"]
)
target_modules 是指定要注入的模块。Peft 里有一个专门的映射表,我们直接用就可以了。
对于其他的微调方法,如果有target_modules,也可以用相应的映射表。
一般来说,这个注入的模块会放在前反馈层。
Wandb 配置(可选)
Wandb 的配置是用环境变量实现的。
如果不了解 Wandb,可参考我的这篇文章。
os.environ["WANDB_PROJECT"] = "fine-tune-logic-inference"
训练参数配置
直接配置即可,
def get_training_args() -> SFTConfig:
return SFTConfig(
report_to="wandb",
per_device_train_batch_size=micro_batch,
learning_rate=2e-4,
lr_scheduler_type="cosine",
num_train_epochs=3,
gradient_accumulation_steps=2,
output_dir=output_path,
bf16=True,
remove_unused_columns=False,
)
训练
首先我们用 Accelerator 进行加速,并省去张量和模型的位置问题。然后创建 trainer,填入参数即可。
def finetune():
from accelerate import Accelerator
acc = Accelerator()
from accelerate.accelerator import AcceleratorState
AcceleratorState().deepspeed_plugin.deepspeed_config['train_micro_batch_size_per_gpu'] = micro_batch
model, tokenizer = get_model_and_tokenizer()
train_dataset = Dataset.load_from_disk("./train_dataset")
def encode(x):
head = r"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。请逐步分析问题,写出思考过程,并在最后一行输出答案,最后一行的格式为"答案是:A"或"答案是:B"或"答案是:C"或"答案是:D"等等。如果你做对了这个题目,你会获得的一亿奖金。题目如下:"""
problem = x["problem"]
question = x["question"]
reasoning = x["reasoning"]
answer = x["answer"]
full_text = f"""{head}
### 题目
{problem}
### 问题
{question}
{format_options(x['options'])}
### 分析过程
{reasoning}
### 答案
答案是:{answer}"""
encoded = tokenizer(full_text + tokenizer.eos_token, max_length=max_new_tokens, truncation=True, pad_to_multiple_of=8)
return encoded
train_dataset = train_dataset.map(
lambda x: encode(x),
batched=False
).remove_columns(["prompt", "completion"])
model, train_dataset = acc.prepare(
model, train_dataset
)
peft_config = get_peft_config()
training_args = get_training_args()
trainer = SFTTrainer(
model=model,
args=training_args,
train_dataset=train_dataset,
tokenizer=tokenizer,
peft_config=peft_config,
)
trainer.train()
trainer.save_model(output_path)
这里配了 deepspeed stage 0,为了多卡训练,当然也可以不做。
Qwen 的 tokenizer 不自动加 eos token,所以我们在 encode 的时候手动加上。
推理
加载模型
def get_pipeline() -> TextGenerationPipeline:
import torch
generator: TextGenerationPipeline = pipeline("text-generation", model=model_path, tokenizer=model_path, device="cuda", torch_dtype=torch.bfloat16)
from peft.config import PeftConfig
conf = PeftConfig.from_pretrained(adapter_path)
generator.model.add_adapter(conf)
return generator
数据处理
import json
class EntryEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Entry):
return obj.__dict__
if isinstance(obj, QuestionItem):
return obj.__dict__
if isinstance(obj, str):
return obj
return super().default(obj)
@dataclass
class QuestionItem:
question: str
options: list[str]
answer: str | None = field(default=None)
@dataclass
class Entry:
problem: str = field(default="")
questions: list[QuestionItem] = field(default_factory=list)
id: str = field(default="")
def parse_file(file_path: str) -> list[Entry]:
with open(file_path, "r") as f:
lines = f.readlines()
entries = []
import json
for line in lines:
entry = json.loads(line)
questions = []
for question in entry["questions"]:
questions.append(QuestionItem(**question))
entries.append(Entry(problem=entry["problem"], questions=questions, id=entry["id"]))
return entries
def format_options(options):
return '\n'.join(
[
f'{chr(ord("A") + i)}: {option}'
for i, option in enumerate(options)
]
)
推理
这里在生成时指定num_sequence=voter,来一次生成多个结果,然后再投票。
如果打开num_sequence,需要开do_sample或者开 beam search 等。
def generate_answer_inplace(entry: Entry, generator: TextGenerationPipeline):
for question in entry.questions:
got_answer = False
attempts = 2
while not got_answer and attempts > 0:
prompt_template = """你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。请逐步分析问题,写出思考过程,并在最后一行输出答案,最后一行的格式为"答案是:A"或"答案是:B"或"答案是:C"或"答案是:D"等等。题目如下:
### 题目:
{problem}
### 问题:
{question}
{options}
### 分析过程
"""
prompt = prompt_template.format(
**{
"problem": entry.problem,
"question": question.question,
"options": format_options(question.options)
}
)
voter = 3
result = generator(prompt, max_new_tokens=max_new_tokens, truncation=True, num_return_sequences=voter, do_sample=True)
# answer = result[0]["generated_text"].split("答案是:")[-1].strip()
counter = {}
with open("a.txt", "a") as f:
f.write("\n" + prompt + "\n" + "=" * 10)
for i in range(voter):
raw_response = result[i]["generated_text"]
raw_response = raw_response.removeprefix(prompt)
raw_response = raw_response.split("---")[0]
raw_response = raw_response.split("### 题目")[0]
raw_response = raw_response.split("### 问题")[0]
import re
# print("=" * 10)
# print(raw_response)
# print("=" * 10)
with open("a.txt", "a") as f:
f.write(f"\n voter {i} {'=' * 10}\n{raw_response}")
match = re.search(r"答案是:[A-Z]", raw_response)
if match:
answer = match.group()
answer = answer.split(":")[-1]
else:
answer = None
counter[answer] = counter.get(answer, 0) + 1
answer = max(counter, key=counter.get)
with open("a.txt", "a") as f:
f.write(str(counter) + "\n" + "=" * 10)
if answer:
question.answer = answer.split(":")[-1]
got_answer = True
else:
print("Failed to find answer in response.")
print("Setting answer to C.")
question.answer = "C"
attempts -= 1