LLM 上层设施 trl 微调
本次的目的是使用微调技术增强模型的性能。
DataWhale 提供的代码推理使用了 vLLM 加速,这里没有使用。而且推理是使用简单的逐个推理,没有使用批量推理。因此速度会显著的慢一些。
Task 3 将使用 trl
微调来优化模型的表现。此外在推理时,我们还使用多路投票的方式来提高准确率。
Parameter Efficient Fine-Tuning
PEFT 是一种参数高效的微调方法,它通过在原始模型的基础上添加一个小的线性层来实现微调。这种方法的优势在于,它不需要重新训练整个模型,而只需要训练一个小的线性层,因此可以大大减少训练时间。
PEFT 有许多不同的注入方法和注入点。例如最常用的 LoRA 技术,即在两层之间插入一个小的线性层。
假设 是原本的网络(可以为一层或多层), 是新的线性层,那么 LoRA 的计算公式如下:
其中, 和 是可学习的参数, 是一个超参数。 和 都是不满秩的矩阵,这样一方面可以使得模型区提取新的特征而非完全覆盖原有的特征,另一方面可以减少参数量。
除了 LoRA 之外,还有许多的微调方式,这里仅举一例。具体请参考 Huggingface PEFT 的 conceptual guide 部分。
RLHF (Reinforcement Learning for Human Feedback)
RLHF 是一种基于强化学习的人类反馈微调方法。它通过与人类交互,根据人类的反馈来调整模型的参数。这种方法的优势在于,可以根据人类的反馈来调整模型的参数,从而更好地适应人类的需求。
RLHF 有三个阶段:
- SFT (Supervised Fine-Tuning) 阶段:在这个阶段,模型会根据人类提供的标签进行微调。这个阶段是我们通常的微调过程。
- Reward Learning 阶段:在这个阶段,模型会根据人类的反馈来学习奖励函数。这个阶段是 RLHF 的核心。不过,这个阶段通常是在产品上线后进行的,因为需要大量的人类反馈。
- PPO (Proximal Policy Optimization) 阶段:综合考虑 SFT 和 Reward Learning 阶段的结果,进一步微调模型。
做 demo 时,只要完成 SFT 阶段即可。
使用 trl 进行 SFT 微调
微调技术刚开始时,没有统一的标准,后来出现了peft
库,但是peft
库的使用还是比较复杂。后来出现了更加简单的trl
库,trl
库是基于peft
库的封装,使用起来更加简单。
我们同时还会使用 Accelerate 库来加速,可参考我之前的简介。
我先在自己的笔记本电脑上进行了开发,确保模型能跑通后再放到了服务器上。所以前半段使用的模型是 0.5B 的小模型,后半段使用的是 7B 的模型。
此外我同时参考了 Huggingface 的资料和 wandb 提供的资料。相比 Huggingface 的,wandb 提供的这一篇更加详细且具体。
准备工作
先从阿里竞赛主页获取训练集,然后解析。
from dataclasses import dataclass, field
@dataclass
class QuestionItem:
question: str
options: list[str]
answer: str | None = field(default=None)
@dataclass
class Entry:
id: str
problem: str = field(default="")
questions: list[QuestionItem] = field(default_factory=list)
def parse_file(file_path: str) -> list[Entry]:
with open(file_path, "r") as f:
lines = f.readlines()
entries = []
import json
for line in lines:
entry = json.loads(line)
questions = []
for question in entry["questions"]:
questions.append(QuestionItem(**question))
entries.append(Entry(problem=entry["problem"], questions=questions, id=entry["id"]))
return entries
加载模型,由于我们需要大量进行 tokenization,所以使用了 tokenizer 的 fast 版本,它与一般的 tokenizer 相比,速度更快。不过因为 fast tokenizer 有一个 rust 的 backend 绑定,所以一般的 tokenizer 被留了下来。不过一般直接用 fast tokenizer 就可以了。
model_path = "../Qwen2-0.5B"
from transformers import Qwen2ForCausalLM, Qwen2TokenizerFast
def get_model_and_tokenizer() -> tuple[Qwen2ForCausalLM, Qwen2TokenizerFast]:
model = Qwen2ForCausalLM.from_pretrained(model_path)
tokenizer = Qwen2TokenizerFast.from_pretrained(model_path)
tokenizer.pad_token = tokenizer.eos_token
return model, tokenizer
注意,qwen 模型的 tokenizer 没有 pad token,所以我们将 pad token 设置为 eos token。
数据集构建
由于我们的数据集比较复杂,因此要使用 python 代码构建数据集,而不能直接加载。如果有现成的数据集,可以这样加载。
dataset = load_dataset("json", data_files="path/to/dataset.jsonl", split="train")
split 可以是 train, validation, test 之一。
不过我们要手动构建,使用 from_dict
方法。构建的代码见后文。
推理过程生成
这里可以用通过告知模型的题目和答案要求给出推理过程,然后再将推理过程加入 completion。
实现如下,首先向 QuestionItem
对象添加一个reasoning: str | None = field(default=None)
,方便后续代码编写。然后我们写好推理生成的 chain。
def reasoning_generation_chain() -> Chain:
# from transformers import pipeline
# generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
from langchain_community.llms.tongyi import Tongyi
model = Tongyi()
from langchain_huggingface import HuggingFacePipeline
prompt = ChatPromptTemplate.from_messages([
("system", '你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。我会向你提供答案,而你要给出逐步的解析来交会我如何得到答案。我是一个小学生,所以每一步的推理不要太复杂。'),
("user", """### 题目
{problem}
### 问题
{question}
{options}
### 答案
{answer}""")])
return prompt | model
然后生成推理过程。
def generate_reasoning_inplace(entries: list[Entry]) -> None:
chain = reasoning_generation_chain()
import time
def process_question(entry, question):
if question.reasoning is not None:
return
max_retries = 3
for attempt in range(max_retries):
try:
reasoning: str = chain.invoke({
"problem": entry.problem,
"question": question.question,
"options": format_options(question.options),
"answer": question.answer
})
question.reasoning = reasoning
break # Exit the loop if successful
except Exception as e:
if attempt < max_retries - 1:
time.sleep(1) # Optional: wait a bit before retrying
else:
print(f"Failed to process question.")
print(str(e))
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for entry in tqdm(entries):
for question in entry.questions:
futures.append(executor.submit(process_question, entry, question))
for future in tqdm(futures):
future.result()
然后再给构建的数据集加上推理过程。
def generate_reasoning_inplace(entries: list[Entry]) -> None:
chain = reasoning_generation_chain()
import time
def process_question(entry, question):
if question.reasoning is not None:
return
max_retries = 3
for attempt in range(max_retries):
try:
reasoning: str = chain.invoke({
"problem": entry.problem,
"question": question.question,
"options": format_options(question.options),
"answer": question.answer
})
question.reasoning = reasoning
break # Exit the loop if successful
except Exception as e:
if attempt < max_retries - 1:
time.sleep(1) # Optional: wait a bit before retrying
else:
print(f"Failed to process question.")
print(str(e))
with ThreadPoolExecutor(max_workers=5) as executor:
futures = []
for entry in tqdm(entries):
for question in entry.questions:
futures.append(executor.submit(process_question, entry, question))
for future in tqdm(futures):
future.result()
这一步当然也可以用本地的模型完成。
这样,我们就构建好了数据集。
我们把数据集保存到本地,以便后续使用。
def create_train_dataset(entries: list[Entry]) -> Dataset:
dataset = []
for entry in entries:
for question in entry.questions:
if question.reasoning is None or question.answer is None:
continue
dataset.append({
"problem": entry.problem,
"question": question.question,
"options": format_options(question.options),
"reasoning": question.reasoning,
"answer": question.answer
})
return Dataset.from_list(dataset)
entries = parse_file("./round1_train_data.jsonl")
generate_reasoning_inplace(entries)
ds = create_train_dataset(entries)
ds.save_to_disk("./train_dataset")
之后就可以,
train_dataset = Dataset.load_from_disk("./train_dataset")
数据集 Encode
def encode(x):
head = r"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。请逐步分析问题,写出思考过程,并在最后一行输出答案,最后一行的格式为"答案是:A"或"答案是:B"或"答案是:C"或"答案是:D"等等。如果你做对了这个题目,你会获得的一亿奖金。题目如下:"""
problem = x["problem"]
question = x["question"]
reasoning = x["reasoning"]
answer = x["answer"]
full_text = f"""{head}
### 题目
{problem}
### 问题
{question}
{format_options(x['options'])}
### 分析过程
{reasoning}
### 答案
答案是:{answer}"""
encoded = tokenizer(full_text + tokenizer.eos_token, max_length=max_new_tokens, truncation=True, pad_to_multiple_of=8)
return encoded
train_dataset = train_dataset.map(
lambda x: encode(x),
batched=False
).remove_columns(["prompt", "completion"])
有些模型不把原始列删除也可以,不过推荐删除,只保留input_ids
等直接输入模型等参数。
注意,我们使用的模型是 instruct 模型,instruct 模型在训练的数据集格式如下,
Some instruct
### Some Info
Text
### Other Info
Text
因此我们的格式和这个格式是一样的,这样模型会有更好的表现。
这一步和微调一起做。
微调配置
Peft 配置
这里使用 LoRA 的默认配置。
def get_peft_config() -> LoraConfig:
from peft.utils.constants import TRANSFORMERS_MODELS_TO_LORA_TARGET_MODULES_MAPPING
return LoraConfig(
task_type="casual_lm",
target_modules=TRANSFORMERS_MODELS_TO_LORA_TARGET_MODULES_MAPPING["qwen2"]
)
target_modules
是指定要注入的模块。Peft 里有一个专门的映射表,我们直接用就可以了。
对于其他的微调方法,如果有target_modules
,也可以用相应的映射表。
一般来说,这个注入的模块会放在前反馈层。
Wandb 配置(可选)
Wandb 的配置是用环境变量实现的。
如果不了解 Wandb,可参考我的这篇文章。
os.environ["WANDB_PROJECT"] = "fine-tune-logic-inference"
训练参数配置
直接配置即可,
def get_training_args() -> SFTConfig:
return SFTConfig(
report_to="wandb",
per_device_train_batch_size=micro_batch,
learning_rate=2e-4,
lr_scheduler_type="cosine",
num_train_epochs=3,
gradient_accumulation_steps=2,
output_dir=output_path,
bf16=True,
remove_unused_columns=False,
)
训练
首先我们用 Accelerator
进行加速,并省去张量和模型的位置问题。然后创建 trainer,填入参数即可。
def finetune():
from accelerate import Accelerator
acc = Accelerator()
from accelerate.accelerator import AcceleratorState
AcceleratorState().deepspeed_plugin.deepspeed_config['train_micro_batch_size_per_gpu'] = micro_batch
model, tokenizer = get_model_and_tokenizer()
train_dataset = Dataset.load_from_disk("./train_dataset")
def encode(x):
head = r"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。请逐步分析问题,写出思考过程,并在最后一行输出答案,最后一行的格式为"答案是:A"或"答案是:B"或"答案是:C"或"答案是:D"等等。如果你做对了这个题目,你会获得的一亿奖金。题目如下:"""
problem = x["problem"]
question = x["question"]
reasoning = x["reasoning"]
answer = x["answer"]
full_text = f"""{head}
### 题目
{problem}
### 问题
{question}
{format_options(x['options'])}
### 分析过程
{reasoning}
### 答案
答案是:{answer}"""
encoded = tokenizer(full_text + tokenizer.eos_token, max_length=max_new_tokens, truncation=True, pad_to_multiple_of=8)
return encoded
train_dataset = train_dataset.map(
lambda x: encode(x),
batched=False
).remove_columns(["prompt", "completion"])
model, train_dataset = acc.prepare(
model, train_dataset
)
peft_config = get_peft_config()
training_args = get_training_args()
trainer = SFTTrainer(
model=model,
args=training_args,
train_dataset=train_dataset,
tokenizer=tokenizer,
peft_config=peft_config,
)
trainer.train()
trainer.save_model(output_path)
这里配了 deepspeed stage 0,为了多卡训练,当然也可以不做。
Qwen 的 tokenizer 不自动加 eos token,所以我们在 encode 的时候手动加上。
推理
加载模型
def get_pipeline() -> TextGenerationPipeline:
import torch
generator: TextGenerationPipeline = pipeline("text-generation", model=model_path, tokenizer=model_path, device="cuda", torch_dtype=torch.bfloat16)
from peft.config import PeftConfig
conf = PeftConfig.from_pretrained(adapter_path)
generator.model.add_adapter(conf)
return generator
数据处理
import json
class EntryEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Entry):
return obj.__dict__
if isinstance(obj, QuestionItem):
return obj.__dict__
if isinstance(obj, str):
return obj
return super().default(obj)
@dataclass
class QuestionItem:
question: str
options: list[str]
answer: str | None = field(default=None)
@dataclass
class Entry:
problem: str = field(default="")
questions: list[QuestionItem] = field(default_factory=list)
id: str = field(default="")
def parse_file(file_path: str) -> list[Entry]:
with open(file_path, "r") as f:
lines = f.readlines()
entries = []
import json
for line in lines:
entry = json.loads(line)
questions = []
for question in entry["questions"]:
questions.append(QuestionItem(**question))
entries.append(Entry(problem=entry["problem"], questions=questions, id=entry["id"]))
return entries
def format_options(options):
return '\n'.join(
[
f'{chr(ord("A") + i)}: {option}'
for i, option in enumerate(options)
]
)
推理
这里在生成时指定num_sequence=voter
,来一次生成多个结果,然后再投票。
如果打开num_sequence
,需要开do_sample
或者开 beam search 等。
def generate_answer_inplace(entry: Entry, generator: TextGenerationPipeline):
for question in entry.questions:
got_answer = False
attempts = 2
while not got_answer and attempts > 0:
prompt_template = """你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。请逐步分析问题,写出思考过程,并在最后一行输出答案,最后一行的格式为"答案是:A"或"答案是:B"或"答案是:C"或"答案是:D"等等。题目如下:
### 题目:
{problem}
### 问题:
{question}
{options}
### 分析过程
"""
prompt = prompt_template.format(
**{
"problem": entry.problem,
"question": question.question,
"options": format_options(question.options)
}
)
voter = 3
result = generator(prompt, max_new_tokens=max_new_tokens, truncation=True, num_return_sequences=voter, do_sample=True)
# answer = result[0]["generated_text"].split("答案是:")[-1].strip()
counter = {}
with open("a.txt", "a") as f:
f.write("\n" + prompt + "\n" + "=" * 10)
for i in range(voter):
raw_response = result[i]["generated_text"]
raw_response = raw_response.removeprefix(prompt)
raw_response = raw_response.split("---")[0]
raw_response = raw_response.split("### 题目")[0]
raw_response = raw_response.split("### 问题")[0]
import re
# print("=" * 10)
# print(raw_response)
# print("=" * 10)
with open("a.txt", "a") as f:
f.write(f"\n voter {i} {'=' * 10}\n{raw_response}")
match = re.search(r"答案是:[A-Z]", raw_response)
if match:
answer = match.group()
answer = answer.split(":")[-1]
else:
answer = None
counter[answer] = counter.get(answer, 0) + 1
answer = max(counter, key=counter.get)
with open("a.txt", "a") as f:
f.write(str(counter) + "\n" + "=" * 10)
if answer:
question.answer = answer.split(":")[-1]
got_answer = True
else:
print("Failed to find answer in response.")
print("Setting answer to C.")
question.answer = "C"
attempts -= 1