import json
import pandas as pd
import torch
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq
import os
import swanlab

# Set the SwanLab project name
os.environ["SWANLAB_PROJECT"] = "qwen3-sft-dialog"

# Define the system prompt and the maximum sequence length
PROMPT = "你是一个对话助手,你需要根据用户的问题,给出相应的回答。"
MAX_LENGTH = 2048

# Update the SwanLab run configuration
swanlab.config.update({
    "model": "Qwen/Qwen3-0.5B",
    "prompt": PROMPT,
    "data_max_length": MAX_LENGTH,
})

# Convert the raw dataset to instruction/input/output JSONL format
def dataset_jsonl_transfer(origin_path, new_path):
    messages = []
    with open(origin_path, "r") as file:
        for line in file:
            data = json.loads(line)
            message = {
                "instruction": PROMPT,
                "input": data["question"],
                "output": data["answer"],
            }
            messages.append(message)
    with open(new_path, "w", encoding="utf-8") as file:
        for message in messages:
            file.write(json.dumps(message, ensure_ascii=False) + "\n")

# Preprocess one example: tokenize and build input_ids / attention_mask / labels
def process_func(example):
    instruction = tokenizer(
        f"<|im_start|>system\n{PROMPT}<|im_end|>\n<|im_start|>user\n{example['input']}<|im_end|>\n<|im_start|>assistant\n",
        add_special_tokens=False,
    )
    response = tokenizer(f"{example['output']}", add_special_tokens=False)
    input_ids = instruction["input_ids"] + response["input_ids"] + [tokenizer.pad_token_id]
    attention_mask = instruction["attention_mask"] + response["attention_mask"] + [1]
    labels = [-100] * len(instruction["input_ids"]) + response["input_ids"] + [tokenizer.pad_token_id]
    if len(input_ids) > MAX_LENGTH:
        input_ids = input_ids[:MAX_LENGTH]
        attention_mask = attention_mask[:MAX_LENGTH]
        labels = labels[:MAX_LENGTH]
    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}

# Inference helper
def predict(messages, model, tokenizer):
    device = "cuda"
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(device)
    generated_ids = model.generate(
        model_inputs.input_ids,
        max_new_tokens=MAX_LENGTH,
    )
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    return response

# Local model path
model_dir = "/tmp/workspace/model/.cache/huggingface/download/naive"

# Load the tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(model_dir, use_fast=False, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_dir, device_map="auto", torch_dtype=torch.bfloat16)
model.enable_input_require_grads()  # make inputs require grads; needed when gradient checkpointing is enabled

# Dataset paths
train_dataset_path = "/tmp/workspace/RussianEnglishDialogue/Dataset/format/train.jsonl"
test_dataset_path = "/tmp/workspace/RussianEnglishDialogue/Dataset/format/val.jsonl"
train_jsonl_new_path = "/tmp/workspace/RussianEnglishDialogue/Dataset/format/train_format.jsonl"
test_jsonl_new_path = "/tmp/workspace/RussianEnglishDialogue/Dataset/format/val_format.jsonl"

# Convert the dataset format (only if not already done)
if not os.path.exists(train_jsonl_new_path):
    dataset_jsonl_transfer(train_dataset_path, train_jsonl_new_path)
if not os.path.exists(test_jsonl_new_path):
    dataset_jsonl_transfer(test_dataset_path, test_jsonl_new_path)

# Load and process the training set
train_df = pd.read_json(train_jsonl_new_path, lines=True)
train_ds = Dataset.from_pandas(train_df)
train_dataset = train_ds.map(process_func, remove_columns=train_ds.column_names)

# Load and process the validation set
eval_df = pd.read_json(test_jsonl_new_path, lines=True)
eval_ds = Dataset.from_pandas(eval_df)
eval_dataset = eval_ds.map(process_func, remove_columns=eval_ds.column_names)

# Training arguments
args = TrainingArguments(
    output_dir="/root/autodl-tmp/output/Qwen3-0.5B",
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    gradient_accumulation_steps=4,
    eval_strategy="steps",
    eval_steps=100,
    logging_steps=10,
    num_train_epochs=2,
    save_steps=400,
    learning_rate=1e-4,
    save_on_each_node=True,
    gradient_checkpointing=True,
    report_to="swanlab",
    run_name="qwen3-0.5B",
)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=DataCollatorForSeq2Seq(tokenizer=tokenizer, padding=True),
)

# Start training
trainer.train()

# Test the model's outputs
test_df = pd.read_json(test_jsonl_new_path, lines=True)[:3]
test_text_list = []
for index, row in test_df.iterrows():
    instruction = row['instruction']
    input_value = row['input']
    messages = [
        {"role": "system", "content": f"{instruction}"},
        {"role": "user", "content": f"{input_value}"}
    ]
    response = predict(messages, model, tokenizer)
    response_text = f"""
    Question: {input_value}

    LLM: {response}
    """
    test_text_list.append(swanlab.Text(response_text))
    print(response_text)

# Log the test results and finish the experiment
swanlab.log({"Prediction": test_text_list})
swanlab.finish()

Explanation:


1. Environment and dependency imports

import json
import pandas as pd
import torch
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer, DataCollatorForSeq2Seq
import os
import swanlab

2. SwanLab project and global configuration

# Set the SwanLab project name
os.environ["SWANLAB_PROJECT"] = "qwen3-sft-dialog"
# Define the system prompt and the maximum sequence length
PROMPT = "你是一个对话助手,你需要根据用户的问题,给出相应的回答。"
MAX_LENGTH = 2048

# Update the SwanLab run configuration
swanlab.config.update({
    "model": "Qwen/Qwen3-0.5B",
    "prompt": PROMPT,
    "data_max_length": MAX_LENGTH,
})

3. Dataset format conversion

3.1 Assumed raw data format
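
Judging from the keys read in dataset_jsonl_transfer below, each line of the raw JSONL file is assumed to be a JSON object with question and answer fields. The line below is an illustrative example, not taken from the actual dataset:

{"question": "What is the capital of France?", "answer": "The capital of France is Paris."}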

3.2 Converting to the "instruction/input/output" format

def dataset_jsonl_transfer(origin_path, new_path):
    messages = []
    with open(origin_path, "r") as file:
        for line in file:
            data = json.loads(line)
            message = {
                "instruction": PROMPT,
                "input": data["question"],
                "output": data["answer"],
            }
            messages.append(message)
    with open(new_path, "w", encoding="utf-8") as file:
        for message in messages:
            file.write(json.dumps(message, ensure_ascii=False) + "\n")

4. Data preprocessing (tokenization & label construction)

In a dialogue / SFT (Supervised Fine-Tuning) setting, the "prompt", "user input", and "model output" segments have to be concatenated by hand to produce input_ids, attention_mask, and labels. The labels are built so that the loss only covers the "answer" tokens; the "prompt + user input" span is excluded from the loss. (A toy illustration of this masking follows the code below.)

def process_func(example):
    instruction = tokenizer(
        f"<|im_start|>system\n{PROMPT}<|im_end|>\n"
        f"<|im_start|>user\n{example['input']}<|im_end|>\n"
        f"<|im_start|>assistant\n",
        add_special_tokens=False,
    )
    response = tokenizer(f"{example['output']}", add_special_tokens=False)

    # Concatenate the instruction and response ids, appending one extra pad_token_id to mark the end of generation
    input_ids = instruction["input_ids"] + response["input_ids"] + [tokenizer.pad_token_id]
    attention_mask = instruction["attention_mask"] + response["attention_mask"] + [1]

    # labels: the instruction portion is all set to -100 (those positions are excluded from the loss);
    # only the response tokens are actually predicted, and the final pad_token_id also counts toward the loss (serving as an end-of-sequence marker).
    labels = [-100] * len(instruction["input_ids"]) + response["input_ids"] + [tokenizer.pad_token_id]

    # Truncate everything if the total length exceeds MAX_LENGTH
    if len(input_ids) > MAX_LENGTH:
        input_ids = input_ids[:MAX_LENGTH]
        attention_mask = attention_mask[:MAX_LENGTH]
        labels = labels[:MAX_LENGTH]
    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}
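
To make the label masking concrete, here is a toy illustration with made-up token ids (the numbers are hypothetical, not real vocabulary ids):

instruction_ids = [101, 102, 103]  # tokens of the system prompt + user input + assistant header
response_ids = [7, 8, 9]           # tokens of the answer
pad_id = 0                         # stands in for tokenizer.pad_token_id

input_ids = instruction_ids + response_ids + [pad_id]             # [101, 102, 103, 7, 8, 9, 0]
labels = [-100] * len(instruction_ids) + response_ids + [pad_id]  # [-100, -100, -100, 7, 8, 9, 0]
# Positions labeled -100 are ignored by the cross-entropy loss, so the model is
# only trained to predict the answer tokens plus the trailing pad/EOS token.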

5. Inference function

def predict(messages, model, tokenizer):
    device = "cuda"
    text = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=True
    )
    model_inputs = tokenizer([text], return_tensors="pt").to(device)
    generated_ids = model.generate(
        model_inputs.input_ids,
        max_new_tokens=MAX_LENGTH,
    )
    # Strip the input portion, keeping only the tokens the model newly generated
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    return response
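
For Qwen-style models, apply_chat_template renders roughly the same <|im_start|> layout that process_func builds by hand in section 4 (the exact rendering can differ slightly between model versions). A minimal sketch, assuming tokenizer is already loaded:

messages = [
    {"role": "system", "content": PROMPT},
    {"role": "user", "content": "你好"},
]
text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
# text looks approximately like:
# <|im_start|>system\n...<|im_end|>\n<|im_start|>user\n你好<|im_end|>\n<|im_start|>assistant\n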

6. Loading the pretrained model: model_dir

model_dir = "/tmp/workspace/model/.cache/huggingface/download/naive"

# Load the tokenizer and model
tokenizer = AutoTokenizer.from_pretrained(model_dir, use_fast=False, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(model_dir, device_map="auto", torch_dtype=torch.bfloat16)
model.enable_input_require_grads()  # make inputs require grads; needed when gradient checkpointing is enabled

7. Building the Dataset objects

# --- Training set ---
train_df = pd.read_json(train_jsonl_new_path, lines=True)
train_ds = Dataset.from_pandas(train_df)
train_dataset = train_ds.map(process_func, remove_columns=train_ds.column_names)

# --- Validation set ---
eval_df = pd.read_json(test_jsonl_new_path, lines=True)
eval_ds = Dataset.from_pandas(eval_df)
eval_dataset = eval_ds.map(process_func, remove_columns=eval_ds.column_names)
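
A quick sanity check on one processed example confirms the preprocessing did what we expect (illustrative; run after the .map calls above):

sample = train_dataset[0]
assert len(sample["input_ids"]) == len(sample["labels"])  # one label per token
print(tokenizer.decode(sample["input_ids"][:50]))         # peek at the rendered chat prefix
print(sum(l == -100 for l in sample["labels"]))           # number of masked prompt positions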

8. Training arguments (TrainingArguments)

args = TrainingArguments(
    output_dir="/root/autodl-tmp/output/Qwen3-0.5B",
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    gradient_accumulation_steps=4,
    eval_strategy="steps",
    eval_steps=100,
    logging_steps=10,
    num_train_epochs=2,
    save_steps=400,
    learning_rate=1e-4,
    save_on_each_node=True,
    gradient_checkpointing=True,
    report_to="swanlab",
    run_name="qwen3-0.5B",
)

Key parameters:

    • per_device_train_batch_size=1 with gradient_accumulation_steps=4 gives an effective batch size of 1 × 4 = 4: gradients from four forward/backward passes are accumulated before each optimizer step.
    • eval_strategy="steps" with eval_steps=100 runs evaluation on eval_dataset every 100 optimizer steps.
    • save_steps=400 writes a full checkpoint (model weights plus optimizer and scheduler state) every 400 steps.
    • gradient_checkpointing=True trades compute for memory by recomputing activations during the backward pass; this is why model.enable_input_require_grads() was called when the model was loaded.
    • report_to="swanlab" and run_name="qwen3-0.5B" send the training logs to the SwanLab project configured at the top of the script.

9. Initializing the Trainer and starting training

trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=DataCollatorForSeq2Seq(tokenizer=tokenizer, padding=True),
)

# Start training
trainer.train()
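
One detail worth noting: since the processed examples have different lengths, DataCollatorForSeq2Seq pads each batch dynamically. By default it pads input_ids with the tokenizer's pad token and labels with -100, so the padded positions stay out of the loss. A toy check (illustrative):

collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, padding=True)
batch = collator([train_dataset[0], train_dataset[1]])
print(batch["input_ids"].shape, batch["labels"].shape)  # both padded to the same length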

Training process:

  1. train_dataset is fed one example at a time (per_device_train_batch_size=1), and gradients are accumulated over 4 steps before each optimizer update.
  2. Every 100 steps, the model is evaluated on eval_dataset and the results are logged.
  3. Every 400 steps, the current training state (model weights, optimizer state, lr scheduler state, etc.) is saved to output_dir/checkpoint-400/, then checkpoint-800/, and so on.
  4. After 2 epochs, trainer.train() returns. Note that it does not automatically write a final standalone copy of the weights to output_dir; the latest weights live in the newest checkpoint-XXX folder, and you can call trainer.save_model(output_dir) (equivalent to model.save_pretrained(output_dir)) to export one.

10. Testing model outputs and reporting the results via SwanLab

After training completes, we run a simple inference pass over the first 3 examples of the same validation set to see how the model's answers differ from the reference answers, and also upload the predictions to SwanLab.

# Read the first three validation examples
test_df = pd.read_json(test_jsonl_new_path, lines=True)[:3]
test_text_list = []

for index, row in test_df.iterrows():
    instruction = row['instruction']
    input_value = row['input']
    messages = [
        {"role": "system",    "content": f"{instruction}"},
        {"role": "user",      "content": f"{input_value}"}
    ]
    response = predict(messages, model, tokenizer)
    response_text = f"""
    Question: {input_value}

    LLM: {response}
    """
    test_text_list.append(swanlab.Text(response_text))
    print(response_text)

11. "Where is the trained model stored?"
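
During training, everything is written under the output_dir passed to TrainingArguments, one checkpoint-XXX subfolder per save point; each checkpoint is a complete snapshot that can be reloaded directly. A minimal reload sketch (the checkpoint folder name below is illustrative; pick the newest checkpoint-XXX under your output_dir):

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

ckpt = "/root/autodl-tmp/output/Qwen3-0.5B/checkpoint-400"
model = AutoModelForCausalLM.from_pretrained(ckpt, device_map="auto", torch_dtype=torch.bfloat16)
# The tokenizer was not passed to the Trainer, so its files may not be inside the
# checkpoint folder; load it from the original model directory (model_dir) instead.
tokenizer = AutoTokenizer.from_pretrained(model_dir, use_fast=False, trust_remote_code=True)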


12. Summary

  1. Code flow

    • Set up the environment (SwanLab, the prompt, the maximum length)
    • Convert the raw QA data into JSONL with instruction / input / output fields
    • Define process_func, which concatenates the system/user/assistant segments and builds input_ids, attention_mask, labels
    • Load the pretrained model and tokenizer
    • Use Dataset.from_pandas + .map(process_func) to get datasets in the format the Hugging Face Trainer expects
    • Build TrainingArguments, specifying output_dir, the training hyperparameters, and SwanLab log reporting
    • Initialize the Trainer and call .train(); checkpoints are written under output_dir as training proceeds
    • Finally, run .generate() on the first three validation examples and upload the predictions to SwanLab
  2. Where the model is stored after training

    • The trained model (all of its checkpoints) is stored under the output_dir specified in TrainingArguments.

    • In this example, that is:

      /root/autodl-tmp/output/Qwen3-0.5B
      
    • Inside that folder you will find the checkpoint-XXX subfolders; each contains config.json, the model weights (pytorch_model.bin or model.safetensors, depending on your transformers version), and the optimizer/trainer state files.

Once training has finished, browsing the filesystem or running ls /root/autodl-tmp/output/Qwen3-0.5B will confirm exactly which subdirectory the model was saved in.