LLM 上层设施 trl 微调

本次的目的是使用微调技术增强模型的性能。

DataWhale 提供的代码推理使用了 vLLM 加速,这里没有使用。而且推理是使用简单的逐个推理,没有使用批量推理。因此速度会显著的慢一些。

Task 3 将使用 trl 微调来优化模型的表现。此外在推理时,我们还使用多路投票的方式来提高准确率。

Parameter Efficient Fine-Tuning

PEFT 是一种参数高效的微调方法,它通过在原始模型的基础上添加一个小的线性层来实现微调。这种方法的优势在于,它不需要重新训练整个模型,而只需要训练一个小的线性层,因此可以大大减少训练时间。

PEFT 有许多不同的注入方法和注入点。例如最常用的 LoRA 技术,即在两层之间插入一个小的线性层。

假设 是原本的网络(可以为一层或多层), 是新的线性层,那么 LoRA 的计算公式如下:

其中, 是可学习的参数, 是一个超参数。 都是不满秩的矩阵,这样一方面可以使得模型区提取新的特征而非完全覆盖原有的特征,另一方面可以减少参数量。

除了 LoRA 之外,还有许多的微调方式,这里仅举一例。具体请参考 Huggingface PEFT 的 conceptual guide 部分

RLHF (Reinforcement Learning for Human Feedback)

RLHF 是一种基于强化学习的人类反馈微调方法。它通过与人类交互,根据人类的反馈来调整模型的参数。这种方法的优势在于,可以根据人类的反馈来调整模型的参数,从而更好地适应人类的需求。

RLHF 有三个阶段:

  1. SFT (Supervised Fine-Tuning) 阶段:在这个阶段,模型会根据人类提供的标签进行微调。这个阶段是我们通常的微调过程。
  2. Reward Learning 阶段:在这个阶段,模型会根据人类的反馈来学习奖励函数。这个阶段是 RLHF 的核心。不过,这个阶段通常是在产品上线后进行的,因为需要大量的人类反馈。
  3. PPO (Proximal Policy Optimization) 阶段:综合考虑 SFT 和 Reward Learning 阶段的结果,进一步微调模型。

做 demo 时,只要完成 SFT 阶段即可。

使用 trl 进行 SFT 微调

微调技术刚开始时,没有统一的标准,后来出现了peft库,但是peft库的使用还是比较复杂。后来出现了更加简单的trl库,trl库是基于peft库的封装,使用起来更加简单。

我们同时还会使用 Accelerate 库来加速,可参考我之前的简介

我先在自己的笔记本电脑上进行了开发,确保模型能跑通后再放到了服务器上。所以前半段使用的模型是 0.5B 的小模型,后半段使用的是 7B 的模型。

此外我同时参考了 Huggingface 的资料和 wandb 提供的资料。相比 Huggingface 的,wandb 提供的这一篇更加详细且具体。

准备工作

先从阿里竞赛主页获取训练集,然后解析。

from dataclasses import dataclass, field

@dataclass
class QuestionItem:
    question: str
    options: list[str]
    answer: str | None = field(default=None)

@dataclass
class Entry:
    id: str
    problem: str = field(default="")
    questions: list[QuestionItem] = field(default_factory=list)

def parse_file(file_path: str) -> list[Entry]:
    with open(file_path, "r") as f:
        lines = f.readlines()
    entries = []
    import json
    for line in lines:
        entry = json.loads(line)
        questions = []
        for question in entry["questions"]:
            questions.append(QuestionItem(**question))
        entries.append(Entry(problem=entry["problem"], questions=questions, id=entry["id"]))
    return entries

加载模型,由于我们需要大量进行 tokenization,所以使用了 tokenizer 的 fast 版本,它与一般的 tokenizer 相比,速度更快。不过因为 fast tokenizer 有一个 rust 的 backend 绑定,所以一般的 tokenizer 被留了下来。不过一般直接用 fast tokenizer 就可以了。

model_path = "../Qwen2-0.5B"

from transformers import Qwen2ForCausalLM, Qwen2TokenizerFast

def get_model_and_tokenizer() -> tuple[Qwen2ForCausalLM, Qwen2TokenizerFast]:
    model = Qwen2ForCausalLM.from_pretrained(model_path)
    tokenizer = Qwen2TokenizerFast.from_pretrained(model_path)
    tokenizer.pad_token = tokenizer.eos_token
    return model, tokenizer

注意,qwen 模型的 tokenizer 没有 pad token,所以我们将 pad token 设置为 eos token。

数据集构建

由于我们的数据集比较复杂,因此要使用 python 代码构建数据集,而不能直接加载。如果有现成的数据集,可以这样加载。

dataset = load_dataset("json", data_files="path/to/dataset.jsonl", split="train")

split 可以是 train, validation, test 之一。

不过我们要手动构建,使用 from_dict 方法。构建的代码见后文。

推理过程生成

这里可以用通过告知模型的题目和答案要求给出推理过程,然后再将推理过程加入 completion。

实现如下,首先向 QuestionItem 对象添加一个reasoning: str | None = field(default=None),方便后续代码编写。然后我们写好推理生成的 chain。

def reasoning_generation_chain() -> Chain:
    # from transformers import pipeline
    # generator = pipeline("text-generation", model=model, tokenizer=tokenizer)
    from langchain_community.llms.tongyi import Tongyi
    model = Tongyi()
    from langchain_huggingface import HuggingFacePipeline
    prompt = ChatPromptTemplate.from_messages([
        ("system", '你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。我会向你提供答案,而你要给出逐步的解析来交会我如何得到答案。我是一个小学生,所以每一步的推理不要太复杂。'),
        ("user", """### 题目
{problem}

### 问题
{question}
{options}

### 答案
{answer}""")])
    return prompt | model

然后生成推理过程。

def generate_reasoning_inplace(entries: list[Entry]) -> None:
    chain = reasoning_generation_chain()
    import time
    def process_question(entry, question):
        if question.reasoning is not None:
            return
        max_retries = 3
        for attempt in range(max_retries):
            try:
                reasoning: str = chain.invoke({
                    "problem": entry.problem,
                    "question": question.question,
                    "options": format_options(question.options),
                    "answer": question.answer
                })
                question.reasoning = reasoning
                break  # Exit the loop if successful
            except Exception as e:
                if attempt < max_retries - 1:
                    time.sleep(1)  # Optional: wait a bit before retrying
                else:
                    print(f"Failed to process question.")
                    print(str(e))

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        for entry in tqdm(entries):
            for question in entry.questions:
                futures.append(executor.submit(process_question, entry, question))

        for future in tqdm(futures):
            future.result()

然后再给构建的数据集加上推理过程。

def generate_reasoning_inplace(entries: list[Entry]) -> None:
    chain = reasoning_generation_chain()
    import time
    def process_question(entry, question):
        if question.reasoning is not None:
            return
        max_retries = 3
        for attempt in range(max_retries):
            try:
                reasoning: str = chain.invoke({
                    "problem": entry.problem,
                    "question": question.question,
                    "options": format_options(question.options),
                    "answer": question.answer
                })
                question.reasoning = reasoning
                break  # Exit the loop if successful
            except Exception as e:
                if attempt < max_retries - 1:
                    time.sleep(1)  # Optional: wait a bit before retrying
                else:
                    print(f"Failed to process question.")
                    print(str(e))

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = []
        for entry in tqdm(entries):
            for question in entry.questions:
                futures.append(executor.submit(process_question, entry, question))

        for future in tqdm(futures):
            future.result()

这一步当然也可以用本地的模型完成。

这样,我们就构建好了数据集。

我们把数据集保存到本地,以便后续使用。

def create_train_dataset(entries: list[Entry]) -> Dataset:
    dataset = []
    for entry in entries:
        for question in entry.questions:
            if question.reasoning is None or question.answer is None:
                continue
            dataset.append({
                "problem": entry.problem,
                "question": question.question,
                "options": format_options(question.options),
                "reasoning": question.reasoning,
                "answer": question.answer
            })
    return Dataset.from_list(dataset)

entries = parse_file("./round1_train_data.jsonl")
generate_reasoning_inplace(entries)
ds = create_train_dataset(entries)
ds.save_to_disk("./train_dataset")

之后就可以,

train_dataset = Dataset.load_from_disk("./train_dataset")

数据集 Encode

def encode(x):
        head = r"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。请逐步分析问题,写出思考过程,并在最后一行输出答案,最后一行的格式为"答案是:A"或"答案是:B"或"答案是:C"或"答案是:D"等等。如果你做对了这个题目,你会获得的一亿奖金。题目如下:"""
        problem = x["problem"]
        question = x["question"]
        reasoning = x["reasoning"]
        answer = x["answer"]
        full_text = f"""{head}
### 题目        
{problem}

### 问题
{question}
{format_options(x['options'])}

### 分析过程
{reasoning}

### 答案
答案是:{answer}"""
        encoded = tokenizer(full_text + tokenizer.eos_token, max_length=max_new_tokens, truncation=True, pad_to_multiple_of=8)
        return encoded
    train_dataset = train_dataset.map(
        lambda x: encode(x),
        batched=False
    ).remove_columns(["prompt", "completion"])

有些模型不把原始列删除也可以,不过推荐删除,只保留input_ids等直接输入模型等参数。

注意,我们使用的模型是 instruct 模型,instruct 模型在训练的数据集格式如下,

Some instruct
### Some Info
Text

### Other Info
Text

因此我们的格式和这个格式是一样的,这样模型会有更好的表现。

这一步和微调一起做。

微调配置

Peft 配置

这里使用 LoRA 的默认配置。

def get_peft_config() -> LoraConfig:
    from peft.utils.constants import TRANSFORMERS_MODELS_TO_LORA_TARGET_MODULES_MAPPING
    return LoraConfig(
        task_type="casual_lm",
        target_modules=TRANSFORMERS_MODELS_TO_LORA_TARGET_MODULES_MAPPING["qwen2"]
    )

target_modules 是指定要注入的模块。Peft 里有一个专门的映射表,我们直接用就可以了。

对于其他的微调方法,如果有target_modules,也可以用相应的映射表。

一般来说,这个注入的模块会放在前反馈层。

Wandb 配置(可选)

Wandb 的配置是用环境变量实现的。

如果不了解 Wandb,可参考我的这篇文章

os.environ["WANDB_PROJECT"] = "fine-tune-logic-inference"

训练参数配置

直接配置即可,

def get_training_args() -> SFTConfig:
    return SFTConfig(
        report_to="wandb",
        per_device_train_batch_size=micro_batch,
        learning_rate=2e-4,
        lr_scheduler_type="cosine",
        num_train_epochs=3,
        gradient_accumulation_steps=2,
        output_dir=output_path,
        bf16=True,
        remove_unused_columns=False,
    )

训练

首先我们用 Accelerator 进行加速,并省去张量和模型的位置问题。然后创建 trainer,填入参数即可。

def finetune():
    from accelerate import Accelerator
    acc = Accelerator()
    from accelerate.accelerator import AcceleratorState
    AcceleratorState().deepspeed_plugin.deepspeed_config['train_micro_batch_size_per_gpu'] = micro_batch
    model, tokenizer = get_model_and_tokenizer()
    train_dataset = Dataset.load_from_disk("./train_dataset")
    def encode(x):
        head = r"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。请逐步分析问题,写出思考过程,并在最后一行输出答案,最后一行的格式为"答案是:A"或"答案是:B"或"答案是:C"或"答案是:D"等等。如果你做对了这个题目,你会获得的一亿奖金。题目如下:"""
        problem = x["problem"]
        question = x["question"]
        reasoning = x["reasoning"]
        answer = x["answer"]
        full_text = f"""{head}
### 题目        
{problem}

### 问题
{question}
{format_options(x['options'])}

### 分析过程
{reasoning}

### 答案
答案是:{answer}"""
        encoded = tokenizer(full_text + tokenizer.eos_token, max_length=max_new_tokens, truncation=True, pad_to_multiple_of=8)
        return encoded
    train_dataset = train_dataset.map(
        lambda x: encode(x),
        batched=False
    ).remove_columns(["prompt", "completion"])
    model, train_dataset = acc.prepare(
        model, train_dataset
    )

    peft_config = get_peft_config()
    training_args = get_training_args()
    trainer = SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        tokenizer=tokenizer,
        peft_config=peft_config,
    )
    trainer.train()
    trainer.save_model(output_path)

这里配了 deepspeed stage 0,为了多卡训练,当然也可以不做。

Qwen 的 tokenizer 不自动加 eos token,所以我们在 encode 的时候手动加上。

推理

加载模型

def get_pipeline() -> TextGenerationPipeline:
    import torch
    generator: TextGenerationPipeline =  pipeline("text-generation", model=model_path, tokenizer=model_path, device="cuda", torch_dtype=torch.bfloat16)
    from peft.config import PeftConfig
    conf = PeftConfig.from_pretrained(adapter_path)
    generator.model.add_adapter(conf)
    return generator

数据处理

import json

class EntryEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Entry):
            return obj.__dict__
        if isinstance(obj, QuestionItem):
            return obj.__dict__
        if isinstance(obj, str):
            return obj
        return super().default(obj)

@dataclass
class QuestionItem:
    question: str
    options: list[str]
    answer: str | None = field(default=None)

@dataclass
class Entry:
    problem: str = field(default="")
    questions: list[QuestionItem] = field(default_factory=list)
    id: str = field(default="")

def parse_file(file_path: str) -> list[Entry]:
    with open(file_path, "r") as f:
        lines = f.readlines()
    entries = []
    import json
    for line in lines:
        entry = json.loads(line)
        questions = []
        for question in entry["questions"]:
            questions.append(QuestionItem(**question))
        entries.append(Entry(problem=entry["problem"], questions=questions, id=entry["id"]))
    return entries

def format_options(options):
    return '\n'.join(
        [
            f'{chr(ord("A") + i)}: {option}'
            for i, option in enumerate(options)
        ]
    )

推理

这里在生成时指定num_sequence=voter,来一次生成多个结果,然后再投票。

如果打开num_sequence,需要开do_sample或者开 beam search 等。

def generate_answer_inplace(entry: Entry, generator: TextGenerationPipeline):
    for question in entry.questions:
        got_answer = False
        attempts = 2
        while not got_answer and attempts > 0:
            prompt_template = """你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。每个问题都保证能通过一系列基于形式逻辑的推理(包括同一律,矛盾律,排中律的使用等)得到确定的答案。请逐步分析问题,写出思考过程,并在最后一行输出答案,最后一行的格式为"答案是:A"或"答案是:B"或"答案是:C"或"答案是:D"等等。题目如下:
### 题目:
{problem}

### 问题:
{question}
{options}

### 分析过程
"""
            prompt = prompt_template.format(
                **{
                    "problem": entry.problem,
                    "question": question.question,
                    "options": format_options(question.options)
                }
            )
            voter = 3
            result = generator(prompt, max_new_tokens=max_new_tokens, truncation=True, num_return_sequences=voter, do_sample=True)
            # answer = result[0]["generated_text"].split("答案是:")[-1].strip()
            counter = {}
            with open("a.txt", "a") as f:
                f.write("\n" + prompt + "\n" + "=" * 10)
            for i in range(voter):
                raw_response = result[i]["generated_text"]
                raw_response = raw_response.removeprefix(prompt)
                raw_response = raw_response.split("---")[0]
                raw_response = raw_response.split("### 题目")[0]
                raw_response = raw_response.split("### 问题")[0]
                import re
                # print("=" * 10)
                # print(raw_response)
                # print("=" * 10)
                with open("a.txt", "a") as f:
                    f.write(f"\n voter {i}  {'=' * 10}\n{raw_response}")

                match = re.search(r"答案是:[A-Z]", raw_response)
                if match:
                    answer = match.group()
                    answer = answer.split(":")[-1]
                else:
                    answer = None
                counter[answer] = counter.get(answer, 0) + 1
            answer = max(counter, key=counter.get)
            with open("a.txt", "a") as f:
                f.write(str(counter) + "\n" + "=" * 10)
            if answer:
                question.answer = answer.split(":")[-1]
                got_answer = True
            else:
                print("Failed to find answer in response.")
                print("Setting answer to C.")
                question.answer = "C"
            attempts -= 1