인공지능/자연어 처리

vllm 통해 reasoning path 데이터 만들기

이게될까 2025. 3. 24. 22:11
728x90
728x90

지금 데이터를 늘리는 작업을 진행해서... 

문서와 질문 그리고 정답을 통해 정답이 추론되는 과정을 만들려고 합니다.

import jsonlines
import json
import time
from typing import Any, Optional

import torch
from transformers import AutoTokenizer  # AutoModelForCausalLM 사용 안 함

# vllm 임포트 (원래 주석처리 되어있던 부분을 활성화)
from vllm import LLM, SamplingParams

# from huggingface_hub import login
# login("만약 access 필요한 모델이면, 토큰 발급받고 여기에 입력하삼!")

import base64
import time
from typing import Any
from typing import Optional

import openai
from openai import OpenAI
from multiprocessing.pool import ThreadPool
from tqdm import tqdm

import os

from setproctitle import setproctitle
setproctitle("")
os.environ["CUDA_VISIBLE_DEVICES"] = "0, 1, 2, 3"

각종 임포트와 GPU 몇 개 쓴느지 작성하기... 

서버 사용하니 내가 뭐 하는지도 작성해주고요

 


class HuggingFaceChatCompletionSampler():
    """
    기존 HuggingFace 모델을 사용했던 생성 부분을 vllm 기반으로 변경한 클래스입니다.
    이제 vllm의 LLM.generate()를 사용하여 응답을 생성합니다.
    """
    def __init__(
        self,
        model: str = "LGAI-EXAONE/EXAONE-3.5-32B-Instruct", 
        max_new_tokens: int = 1024,
    ):
        self.model_name = model
        self.max_new_tokens = max_new_tokens
        self.temperature = 0.6

        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name, use_fast=False)
        self.llm = LLM(self.model_name, tensor_parallel_size=1,  device="cuda")

    def _handle_text(self, text: str):
        return {"type": "text", "text": text}

    def _pack_message(self, role: str, content: Any):
        return {"role": str(role), "content": content}

    def __call__(self, message_list) -> list[str]:

        sampling_params = SamplingParams(
            max_tokens=self.max_new_tokens,
            temperature=self.temperature,
            top_p  = 0.9
            #stop = ["</think>","정답: ", "질문: ", "문서: ", "# 추론 과정: ", "# 예시: "] # 여긴 이전 모델에서 너무 길게 만들어서 냅뒀는데 이번엔 지웠습니다.
        )

        responses = self.llm.generate(message_list, sampling_params)
        #print(responses)
        return [response.outputs[0].text for response in responses]

vllm을 통해서 생성을 진행합니다.

이게 엄청 빠르더라고요...

 

def load_jsonl(file_path):
    data = []
    with jsonlines.open(file_path, mode='r') as reader:
        for obj in reader:
            data.append(obj)
    return data


def save_jsonl(data, file_path):
    """
    Save a list of dictionaries to a file in JSON Lines format.

    :param data: List of dictionaries to be saved.
    :param file_path: The path to the file where data will be written.
    """
    with open(file_path, 'w', encoding='utf-8') as f:
        for entry in data:
            json_line = json.dumps(entry, ensure_ascii=False)
            f.write(json_line + '\n')

여긴 기본적인 저장과 읽기 입니다.

 

def process_file(file_path, output_dir, model_name, multithreading=False):
    data = load_jsonl(file_path)

    message_list = [
        f"# 질문: {row['messages_ko']}\n"
        f"# 문서: {row['document_ko']}\n"
        f"# 정답: {row['answers_ko']}\n[|assistant|]"
        for row in data
    ]
    
    results = sampler(message_list)
    
    os.makedirs(output_dir, exist_ok=True)

    for row, result in zip(data, results):
        row['model'] = result

    output_file_path = os.path.join(output_dir, f'all_{model_name.split("/")[-1]}.jsonl')
    save_jsonl(data, output_file_path)
    print(f"Results saved to {output_file_path}")

이제 여기서 data 쌓아서 주면 알아서 vllm에서 배치로 진행합니다.

 

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Run chat completion with a specified model.")
    parser.add_argument('--model', type=str, required=True, help='Model name to use for completion')
    args = parser.parse_args()

    sampler = HuggingFaceChatCompletionSampler(model=args.model)

    directories = []  # ✅ 올바른 파일 경로 유지
    for file_path in directories:
        output_dir = os.path.join('results', os.path.basename(file_path).split('.')[0]) 
        process_file(file_path, output_dir, args.model, multithreading=False)

 

이제 메인 함수 입니다.

 

이렇게 보면 별 거 없는데...

 

이제 멀티턴은 머리가 아파서 GPT 시켜봤습니다.

def process_file(file_path, output_dir, model_name, multithreading=False):
    data = load_jsonl(file_path)
    prompts = []
    prompt_meta = []

    for row_idx, row in enumerate(data):
        messages_ko = row["messages_ko"]
        messages_en = row.get("messages_en", None)
        document = row["document_ko"]
        answers_ko = row["answers_ko"]

        updated_messages_ko = []
        i = 0
        while i < len(messages_ko):
            user_msg = messages_ko[i]
            updated_messages_ko.append({"content": user_msg["content"], "role": "user"})

            if i + 1 < len(messages_ko) and messages_ko[i + 1]["role"] == "assistant":
                gold_answer = messages_ko[i + 1]["content"]

                # reasoning 생성용 prompt 구성
                history = ""
                for m in updated_messages_ko:
                    prefix = "[|user|]" if m["role"] == "user" else "[|assistant|]"
                    history += f"{prefix}{m['content']}\n"

                prompt = (
                    "[|system|]You are EXAONE model from LG AI Research, a helpful assistant.[|endofturn|]\n"
                    f"{history}"
                    f"# 질문: {user_msg['content']}\n"
                    f"# 문서: {document}\n"
                    f"# 정답: {gold_answer}\n[|assistant|]"
                )

                prompts.append(prompt)
                prompt_meta.append((row_idx, len(updated_messages_ko)))  # assistant 위치 저장
                updated_messages_ko.append({"content": "", "role": "assistant"})  # 빈 자리
                i += 2
            else:
                i += 1

        # 마지막 user 질문 → answers_ko로 reasoning 생성
        last_user_msg = messages_ko[-1]["content"]
        history = ""
        for m in updated_messages_ko:
            prefix = "[|user|]" if m["role"] == "user" else "[|assistant|]"
            history += f"{prefix}{m['content']}\n"

        final_prompt = (
            "[|system|]You are EXAONE model from LG AI Research, a helpful assistant.[|endofturn|]\n"
            f"{history}"
            f"# 질문: {last_user_msg}\n"
            f"# 문서: {document}\n"
            f"# 정답: {answers_ko}\n[|assistant|]"
        )
        prompts.append(final_prompt)
        prompt_meta.append((row_idx, "final"))

        # 반영
        row["messages_ko"] = updated_messages_ko
        data[row_idx] = row

    # 모델 추론
    generations = sampler(prompts)

    # 결과 반영
    for gen, (row_idx, loc) in zip(generations, prompt_meta):
        if loc == "final":
            data[row_idx]["model"] = gen
        else:
            data[row_idx]["messages_ko"][loc]["content"] = gen

    # 저장
    os.makedirs(output_dir, exist_ok=True)
    save_path = os.path.join(output_dir, f"multi_turn_{model_name.split('/')[-1]}.jsonl")
    save_jsonl(data, save_path)
    print(f"✅ 저장 완료: {save_path}")

 

 

이렇게 진행됩니다!

728x90