
Natural Language Processing Python Practice - Implementing a Neural Language Model

이게될까 2024. 3. 1. 18:49
# Neural Language Modeling

# Build a neural language model using the Naver movie review (NSMC) dataset, then use it to generate review comments.

from google.colab import drive
drive.mount('/content/drive')

base_path = "./drive/MyDrive/fastcampus/practice"

import pandas as pd

df = pd.read_csv(f"{base_path}/data/nsmc/ratings_train.txt", sep='\t')

# positive/negative label ratio
df['label'].value_counts()

0    75173
1    74827
Name: label, dtype: int64

## Checking for missing data

# missing doc
sum(df['document'].isnull()) # 5

df = df[~df['document'].isnull()]

sum(df['document'].isnull()) # 0

This time, tokenization is done at the character (syllable) level!

# Tokenization

list("안녕하세요 반갑습니다")

['안', '녕', '하', '세', '요', ' ', '반', '갑', '습', '니', '다']

# tokenization
vocab_cnt_dict = {} 
for doc in df['document']:
    for token in list(doc): # split the document into syllables (characters)
        if token not in vocab_cnt_dict:
            vocab_cnt_dict[token] = 0
        vocab_cnt_dict[token] += 1
        
vocab_cnt_list = [(token, cnt) for token, cnt in vocab_cnt_dict.items()]
vocab_cnt_list[:10]

[('아', 47071),
 (' ', 987741),
 ('더', 9863),
 ('빙', 738),
 ('.', 241461),
 ('진', 18522),
 ('짜', 11514),
 ('증', 2160),
 ('나', 42976),
 ('네', 18756)]

len(vocab_cnt_list) # 3004 entries - far fewer than the ~40,000 word-level tokens!
top_vocabs = sorted(vocab_cnt_list, key=lambda tup:tup[1], reverse=True)

import matplotlib.pyplot as plt
import numpy as np

cnts = [cnt for _, cnt in top_vocabs]

plt.plot(range(len(cnts)), cnts)

[Plot: token counts sorted in descending order - a sharply long-tailed frequency distribution]

np.mean(cnts) # 1757.82

vocabs = [token for token, _ in vocab_cnt_list] # keep every syllable - no frequency cutoff!

len(vocabs) # 3004

## Adding special tokens

- UNK token: the unknown token, used for characters that are not in the vocabulary
- PAD token: the token used for padding
- SOS token: start-of-sentence token, placed at the beginning of a sentence
- EOS token: end-of-sentence token, placed at the end of a sentence

unk_token = '[UNK]'
unk_token in vocabs # False

pad_token = '[PAD]'
pad_token in vocabs

sos_token = '[SOS]'
sos_token in vocabs

eos_token = '[EOS]'
eos_token in vocabs

vocabs.insert(0, eos_token)
vocabs.insert(0, sos_token)
vocabs.insert(0, unk_token)
vocabs.insert(0, pad_token)

vocabs[:5]

['[PAD]', '[UNK]', '[SOS]', '[EOS]', '아']

idx_to_token = vocabs
token_to_idx = {token: i for i, token in enumerate(idx_to_token)}

class Tokenizer:
    def __init__(self,
                 vocabs,
                 use_padding=True,
                 max_padding=64,
                 pad_token='[PAD]',
                 unk_token='[UNK]',
                 sos_token='[SOS]',
                 eos_token='[EOS]'):
        self.idx_to_token = vocabs
        self.token_to_idx = {token: i for i, token in enumerate(self.idx_to_token)}
        
        self.use_padding = use_padding
        self.max_padding = max_padding
        
        self.pad_token = pad_token
        self.unk_token = unk_token
        self.sos_token = sos_token
        self.eos_token = eos_token
        
        self.unk_token_idx = self.token_to_idx[self.unk_token]
        self.pad_token_idx = self.token_to_idx[self.pad_token]
        self.sos_token_idx = self.token_to_idx[self.sos_token]
        self.eos_token_idx = self.token_to_idx[self.eos_token]
        
        
    def __call__(self, x):
        token_ids = [self.sos_token_idx] # start the sequence with the SOS token
        
        token_list = list(x) 
        
        for token in token_list:
            if token in self.token_to_idx:
                token_idx = self.token_to_idx[token]
            else:
                token_idx = self.unk_token_idx
            token_ids.append(token_idx)
            
        token_ids.append(self.eos_token_idx)
            
        if self.use_padding:
            token_ids.pop() # temporarily remove the EOS token
            token_ids = token_ids[:self.max_padding-1]
            n_pads = self.max_padding - len(token_ids) - 1
            token_ids = token_ids + [self.eos_token_idx] + [self.pad_token_idx] * n_pads # re-append EOS, then pad up to max_padding
        
        return token_ids
    
    def cvt_ids_to_tokens(self, ids): # convert token ids back to a list of tokens
        return [self.idx_to_token[idx] for idx in ids]
    
    def cvt_ids_to_str(self, ids): # convert token ids back to a string
        return ''.join([self.idx_to_token[idx] for idx in ids]) # concatenate all the syllables
        
tokenizer = Tokenizer(vocabs, use_padding=False)
sample = df['document'].iloc[0]
print(sample)

아 더빙.. 진짜 짜증나네요 목소리

tokenizer(sample)

[2, 4, 5, 6, 7, 8, 8, 5, 9, 10, 5, 10, 11, 12, 13, 14, 5, 15, 16, 17, 3]

tokenizer.cvt_ids_to_str(tokenizer(sample))

'[SOS]아 더빙.. 진짜 짜증나네요 목소리[EOS]'
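For comparison, here is what the same sample looks like with padding turned on. This is a quick sketch that assumes the cells above have been run; max_padding=16 is chosen here just to make the truncation visible.

padded_tokenizer = Tokenizer(vocabs, use_padding=True, max_padding=16)
padded_ids = padded_tokenizer(sample)
print(len(padded_ids))  # 16: the sequence is truncated to max_padding-1 tokens and EOS is re-appended
print(padded_tokenizer.cvt_ids_to_str(padded_ids))  # truncated text ending in [EOS]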

# DataLoader

import torch
from torch.utils.data import Dataset, DataLoader

# Split the data into train, valid, and test sets

train_valid_df = pd.read_csv(f"{base_path}/data/nsmc/ratings_train.txt", sep='\t')
test_df = pd.read_csv(f"{base_path}/data/nsmc/ratings_test.txt", sep='\t')

print(f"# of train valid samples: {len(train_valid_df)}")
print(f"# of test samples: {len(test_df)}")

# of train valid samples: 150000
# of test samples: 50000

train_valid_df = train_valid_df.sample(frac=1.)

train_ratio = 0.8

n_train = int(len(train_valid_df) * train_ratio)

train_df = train_valid_df[:n_train]
valid_df = train_valid_df[n_train:]

print(f"# of train samples: {len(train_df)}")
print(f"# of valid samples: {len(valid_df)}")
print(f"# of test samples: {len(test_df)}")

# of train samples: 120000
# of valid samples: 30000
# of test samples: 50000

# Subsample 1/100 of the data

train_df = train_df.sample(frac=0.01)
valid_df = valid_df.sample(frac=0.01)
test_df = test_df.sample(frac=0.01)

len(train_df) # 1200

class NSMCDataset(Dataset):
    
    def __init__(self, data_df, tokenizer=None):
        self.data_df = data_df
        self.tokenizer = tokenizer
        
    
    def __len__(self):
        return len(self.data_df)
    
    def __getitem__(self, idx):
        sample_raw = self.data_df.iloc[idx]
        sample = {}
        
        sample['doc'] = str(sample_raw['document'])
        
        if self.tokenizer is not None:
            sample['doc_ids'] = self.tokenizer(sample['doc'])  # the sentiment label is dropped - language modeling only needs the text
        
        return sample
        
def collate_fn(batch):
    keys = [key for key in batch[0].keys()]
    data = {key: [] for key in keys}

    for item in batch:
        for key in keys:
            data[key].append(item[key])
    return data
    
train_dataset = NSMCDataset(data_df=train_df, tokenizer=tokenizer)
valid_dataset = NSMCDataset(data_df=valid_df, tokenizer=tokenizer)
test_dataset = NSMCDataset(data_df=test_df, tokenizer=tokenizer)

# the training set needs to be shuffled
train_dataloader= DataLoader(train_dataset,
                             batch_size=128,
                             collate_fn=collate_fn,
                             shuffle=True)

valid_dataloader= DataLoader(valid_dataset,
                             batch_size=1024,
                             collate_fn=collate_fn,
                             shuffle=False)

test_dataloader= DataLoader(test_dataset,
                            batch_size=1024,
                            collate_fn=collate_fn,
                            shuffle=False)
                            
sample = next(iter(test_dataloader))
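A quick look at what one collated batch contains (the exact reviews depend on the random 1/100 subsample above):

print(sample.keys())           # dict_keys(['doc', 'doc_ids'])
print(sample['doc'][0])        # first raw review text in the batch
print(len(sample['doc_ids']))  # number of documents in this batch (at most 1024)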
# AutoregressiveSelfAttention model

import math
import torch
import torch.nn as nn
import torch.nn.functional as F


class AutoregressiveSelfAttention(nn.Module):

    def __init__(self,
                 vocab_size,
                 embed_dim):
        """
        Args:
            vocab_size (int): size of vocabulary.
            embed_dim (int): dimension of embedding.
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.output_dim = embed_dim

        self.embeddings = nn.Embedding(vocab_size, embed_dim, padding_idx=0)

        self.q_linear = nn.Linear(embed_dim, embed_dim)
        self.k_linear = nn.Linear(embed_dim, embed_dim)
        self.v_linear = nn.Linear(embed_dim, embed_dim)
        
    def forward(self, X, return_attention_score=False):
        """Causal (autoregressive) self-attention.
        Args:
            X (torch.Tensor): inputs, shape of (batch_size, sequence).
        Returns:
            torch.Tensor, contextualized token representations of shape (batch_size, sequence, embed_dim).
        """
        batch_size, seq_len = X.size()
        X = self.embeddings(X) # batch size x seq_len x embed_dim
        
        q, k, v = self.q_linear(X), self.k_linear(X), self.v_linear(X) # batch_size x seq_len x dim
        
        attention_score_raw = q @ k.transpose(-2,-1) / math.sqrt(self.embed_dim) # batch_size x seq_len x seq_len
        
        mask = torch.tril(torch.ones(seq_len,seq_len, device=X.device)) # seq_len x seq_len
        # causal mask: each position may only attend to itself and earlier positions
        attention_score_raw = attention_score_raw.masked_fill(mask==0.,-9e9)
        attention_score = torch.softmax(attention_score_raw, dim=2) # batch_size x seq_len x seq_len
        
        context = attention_score @ v # batch_size x seq_len x dim
        
        if return_attention_score:
            return context, attention_score
        return context
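Before wiring the module into a classifier head, a quick shape sanity check with random token ids (a minimal sketch, assuming the class and vocabs above are already defined):

sa = AutoregressiveSelfAttention(vocab_size=len(vocabs), embed_dim=32)
dummy = torch.randint(0, len(vocabs), (2, 10))  # batch_size=2, seq_len=10
ctx, score = sa(dummy, return_attention_score=True)
print(ctx.shape)    # torch.Size([2, 10, 32]) - one context vector per position
print(score.shape)  # torch.Size([2, 10, 10]) - attention weights between position pairs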

Masking restricts which positions each token is allowed to attend to.

seq_len=3

torch.ones(seq_len,seq_len)

tensor([[1., 1., 1.],
        [1., 1., 1.],
        [1., 1., 1.]])

mask = torch.tril(torch.ones(seq_len,seq_len)) # set everything above the diagonal to 0!

mask

tensor([[1., 0., 0.],
        [1., 1., 0.],
        [1., 1., 1.]])

x = torch.randn(2, 3,3)

x

tensor([[[-1.2015,  0.3968, -0.8582],
         [ 0.1763, -0.9854,  2.7532],
         [-0.3871,  0.0410, -0.6848]],

        [[ 0.3722,  0.0104, -0.6333],
         [ 0.2610, -1.3882, -1.2030],
         [-0.7213, -1.3855, -1.4519]]])

x.masked_fill(mask==0, -9e-9) # put -9e-9 wherever the mask is 0

tensor([[[-8.9851e-01, -9.0000e-09, -9.0000e-09],
         [ 4.9847e-01, -1.5322e+00, -9.0000e-09],
         [-4.2938e-01,  3.1852e-01,  1.2727e-01]],

        [[ 5.4362e-01, -9.0000e-09, -9.0000e-09],
         [ 3.5968e-01, -2.2000e-01, -9.0000e-09],
         [ 6.5648e-01, -3.5736e-01,  7.7671e-01]]])

torch.softmax(x.masked_fill(mask==0, -9e9), dim=2) # after softmax, the masked positions become 0

tensor([[[1.0000, 0.0000, 0.0000],
         [0.8840, 0.1160, 0.0000],
         [0.2059, 0.4349, 0.3592]],

        [[1.0000, 0.0000, 0.0000],
         [0.6410, 0.3590, 0.0000],
         [0.4015, 0.1457, 0.4528]]])

class Classifier(nn.Module):
    """A classifier, arbitary graph, on the top of sentence representation.
    Attributes:
        sr_model: A sentence representation module.
        input_dim: Input dimension of the classifier. Input_dim is set with sr_model output.
        output_dim: Output dimension of the model.
    """
    def __init__(self, sr_model, output_dim, vocab_size, embed_dim, **kwargs):
        """Initialization of the classifier.
        
        Args:
            sr_model (torch.nn.Module): A sentence representation module.
            output_dim (int): Output dimension of the model.
            vocab_size (int): The size of vocabulary.
            embed_dim (int): The word embedding dimension.
        """
        super().__init__()

        self.sr_model = sr_model(vocab_size=vocab_size,
                                 embed_dim=embed_dim,
                                 **kwargs)

        self.input_dim = self.sr_model.output_dim
        self.output_dim = output_dim
        self.fc = nn.Linear(self.input_dim, self.output_dim)

    def forward(self, x):
        return self.fc(self.sr_model(x))
        
model = Classifier(sr_model=AutoregressiveSelfAttention,
                   output_dim=len(vocabs),
                   vocab_size=len(vocabs),
                   embed_dim=128)
                   
# nn.Embedding(vocab_size, embed_dim, padding_idx=0)
# When padding_idx is set:
# - the embedding at padding_idx is initialized to zeros
# - the embedding at padding_idx gets no gradient, i.e. the padding embedding is never updated

model.sr_model.embeddings.weight[0]

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0.], grad_fn=<SelectBackward0>)
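To confirm that the padding embedding really receives no gradient, a small self-contained check (an illustration added here, not part of the original notebook):

emb = nn.Embedding(10, 4, padding_idx=0)
out = emb(torch.tensor([[0, 1, 2]])).sum()
out.backward()
print(emb.weight.grad[0])  # all zeros: the padding row gets no gradient
print(emb.weight.grad[1])  # non-zero gradient for a regular token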

# Training

use_cuda = True and torch.cuda.is_available()

if use_cuda:
    model.cuda()
print(f"use_cuda: {use_cuda}")

import torch.optim as optim
import numpy as np
from copy import deepcopy

optimizer = optim.Adam(params=model.parameters(), lr=0.01)
calc_loss = nn.CrossEntropyLoss(ignore_index=0) # no loss is computed on padding tokens -> everything after EOS can be ignored
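As an aside (not in the original notebook), here is a tiny illustration of what ignore_index=0 does: target positions equal to 0 (the [PAD] id) simply do not contribute to the loss.

logits = torch.randn(4, len(vocabs))     # 4 token positions
targets = torch.tensor([5, 7, 0, 0])     # the last two positions are padding
loss_all = nn.CrossEntropyLoss()(logits, targets)  # averaged over all 4 positions
loss_ignore = calc_loss(logits, targets)           # averaged over the 2 non-pad positions only
print(loss_all.item(), loss_ignore.item())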

tokenizer.use_padding = True
n_epoch = 100
global_i = 0

valid_loss_history = [] # [(global_i, valid_loss), ...]
train_loss_history = [] # [(global_i, train_loss), ...]

min_valid_loss = 9e+9
best_model = None
best_epoch_i = None

ema_train_loss = None

for epoch_i in range(n_epoch):
    model.train()
    
    for batch in train_dataloader:
        optimizer.zero_grad()
        x = torch.tensor(batch['doc_ids'])
        if use_cuda:
            x = x.cuda()
        
        y_pred = model(x[:, :-1]) # feed everything except the last token
        loss = calc_loss(y_pred.view(-1, len(vocabs)), x[:, 1:].flatten()) # targets are the same sequence shifted one step to the left (drop the first token)

        if global_i % 100 == 0:
            print(f"global_i: {global_i}, epoch_i: {epoch_i}, loss: {loss.item()}")
            
        train_loss_history.append((global_i, loss.item()))
            
        loss.backward()
        optimizer.step()
        global_i += 1
    
    model.eval()
    
    # validation
    valid_loss_list = []
    for batch in valid_dataloader:
        x = torch.tensor(batch['doc_ids'])
        if use_cuda:
            x = x.cuda()
        y_pred = model(x[:, :-1])
        loss = calc_loss(y_pred.view(-1, len(vocabs)), x[:, 1:].flatten())
        valid_loss_list.append(loss.item())

    
    valid_loss_mean = np.mean(valid_loss_list)
    valid_loss_history.append((global_i, valid_loss_mean.item()))
    
    
    if valid_loss_mean < min_valid_loss:
        min_valid_loss = valid_loss_mean
        best_epoch_i = epoch_i
        best_model = deepcopy(model)
        
    if epoch_i % 5 == 0:
        print("*"*30)
        print(f"valid_loss_mean: {valid_loss_mean}")
        print("*"*30)
print(f"best_epoch_i: {best_epoch_i}")

global_i: 0, epoch_i: 0, loss: 8.03954029083252
******************************
valid_loss_mean: 5.246993064880371
******************************
******************************
valid_loss_mean: 4.586403846740723
******************************
******************************
valid_loss_mean: 4.434481143951416
******************************
******************************
valid_loss_mean: 4.4632792472839355
******************************
******************************
valid_loss_mean: 4.568708419799805
******************************
******************************
valid_loss_mean: 4.708747863769531
******************************
******************************
valid_loss_mean: 4.889769554138184
******************************
******************************
valid_loss_mean: 5.009775638580322
******************************
...
******************************
valid_loss_mean: 6.882125377655029
******************************
best_epoch_i: 11

# Learning Curve

def calc_moving_average(arr, win_size=100):
    new_arr = []
    win = []
    
    for i, val in enumerate(arr):
        win.append(val)
        if len(win) > win_size:
            win.pop(0)
        
        new_arr.append(np.mean(win))
    return np.array(new_arr)
    
valid_loss_history = np.array(valid_loss_history)
train_loss_history =  np.array(train_loss_history)

plt.figure(figsize=(12,8))
plt.plot(train_loss_history[:,0],
         calc_moving_average(train_loss_history[:,1]), color='blue')

plt.plot(valid_loss_history[:,0],
         valid_loss_history[:,1], color='red')
plt.xlabel("step")
plt.ylabel("loss")

Overfitting! The validation loss bottoms out early (best epoch 11) and then keeps rising while the training loss keeps falling.

How do we score a sentence such as "오늘 날씨 어때"? Feed [SOS] and look up the probability the model assigns to '오늘'; feed '오늘' and look up the probability it assigns to '날씨'; do the same all the way to the last token '어때'. Filling in each of these looked-up probabilities is all we need to compute the sentence probability.
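In equation form, this is the chain-rule factorization that the code below implements:

P(w_1, ..., w_T) = P(w_1 | [SOS]) * P(w_2 | [SOS], w_1) * ... * P(w_T | [SOS], w_1, ..., w_{T-1})

i.e. the product of the per-token probabilities the model assigns at each step.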

# Computing the probability of a sentence

tokenizer.use_padding = False
def calc_sentence_prob(model, text):

    token_ids = torch.tensor(tokenizer(text)) # tokenize the text

    if use_cuda:
        token_ids = token_ids.cuda()

    token_ids = token_ids.unsqueeze(0)  # 1 x seq_len
    outputs = model(token_ids) # 1 x seq_len x vocab_size
    token_probs = torch.softmax(outputs, dim=2) # 1 x seq_len x vocab_size, a probability distribution over the vocabulary at each position

    # lookup token probs
    lookup_ids = token_ids[:, 1:] # the tokens that follow SOS, i.e. the actual next tokens
    probs_lookup = token_probs[:, :-1].gather(dim=2, index=lookup_ids.unsqueeze(2)).squeeze(2).detach().cpu().numpy()
    
    pr = probs_lookup.squeeze()
    ppl = -1. * np.sum(np.log(pr)) / len(pr)  # average negative log-probability per token
    return pr, ppl
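The gather call above is what picks out, at each position, the probability assigned to the token that actually comes next. A toy illustration of how gather works along dim=2 (not from the original notebook):

probs = torch.tensor([[[0.1, 0.7, 0.2],
                       [0.5, 0.3, 0.2]]])  # 1 x 2 x 3 (a vocabulary of 3 tokens)
next_ids = torch.tensor([[1, 0]])          # the token that actually follows at each position
picked = probs.gather(dim=2, index=next_ids.unsqueeze(2)).squeeze(2)
print(picked)  # tensor([[0.7000, 0.5000]])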
    
text = "마지막까지 개그하는 노잼 ㅋㅋ"
prob, ppl = calc_sentence_prob(model, text=text)

print(prob)

[6.0295984e-03 4.3006858e-01 3.8277218e-01 7.4049416e-07 6.0803515e-01
 1.9908132e-01 4.5632324e-04 1.5078409e-02 2.1003368e-06 2.5298268e-01
 4.7019348e-01 1.1000846e-04 4.7953269e-01 5.8012754e-01 3.3334929e-02
 8.6858481e-01 6.8226300e-02]

m = 1
for p in prob:
    m *= p  # multiply the per-token probabilities together -> sentence probability
print(m)

9.246196067501022e-30

This is the actual sentence probability, and it is an extremely small value: the sentence contains many tokens, and multiplying many numbers below 1 keeps shrinking the product.

Perplexity lets us look at the same quantity as a more readable number.

text = "마지막까지 개그하는 노잼 ㅋㅋ"
prob, ppl = calc_sentence_prob(model, text=text)


print(text)
print(ppl)

마지막까지 개그하는 노잼 ㅋㅋ
3.9325489717371322
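Strictly speaking, the value returned above is the average negative log-probability per token, i.e. the log of the perplexity; exponentiating it gives the perplexity in the usual sense (assuming the cells above were run):

print(np.exp(ppl))  # roughly 51 for the run above: on average the model is as uncertain as a uniform choice over ~51 tokens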

Generation!

Feed [SOS] together with the seed text, then repeatedly take the highest-probability next token (greedy decoding).

# Generating sentences

eos_token_id = token_to_idx[eos_token]
tokenizer.use_padding = False

def generate(model, seed_text, max_iter=10):
    token_ids = tokenizer(seed_text)

    token_ids = torch.tensor(token_ids[:-1]).unsqueeze(0) # 1 x seq_len (drop the trailing EOS)
    if use_cuda:
        token_ids = token_ids.cuda()

    for _ in range(max_iter): # cap the number of generated tokens
        outputs = model(token_ids)
        last_output_token_probs = outputs[:, -1] # 1 x vocab_size
        last_output_token = last_output_token_probs.argmax(dim=1)  # pick the most likely next token (greedy)
        
        if last_output_token.squeeze().item() == eos_token_id: # stop once EOS is generated
            break
        
        token_ids = torch.concat([token_ids, last_output_token.view(1,-1)], dim=1) # append the new token to the sequence
        
    text = tokenizer.cvt_ids_to_str(token_ids.flatten().detach().cpu().tolist())
    return text
    
generate(model, seed_text="평점", max_iter=10)

'[SOS]평점이 너무 낮아니야.'
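Greedy argmax decoding always produces the same continuation for the same seed. A minimal sketch of sampling from the predicted distribution instead (the temperature parameter and torch.multinomial are additions of mine, not part of the original notebook):

def generate_sampling(model, seed_text, max_iter=10, temperature=1.0):
    token_ids = tokenizer(seed_text)
    token_ids = torch.tensor(token_ids[:-1]).unsqueeze(0)  # 1 x seq_len, drop the trailing EOS
    if use_cuda:
        token_ids = token_ids.cuda()

    for _ in range(max_iter):
        outputs = model(token_ids)
        last_logits = outputs[:, -1] / temperature            # 1 x vocab_size
        probs = torch.softmax(last_logits, dim=1)
        next_token = torch.multinomial(probs, num_samples=1)  # sample instead of argmax
        if next_token.squeeze().item() == eos_token_id:
            break
        token_ids = torch.concat([token_ids, next_token.view(1,-1)], dim=1)

    return tokenizer.cvt_ids_to_str(token_ids.flatten().detach().cpu().tolist())

generate_sampling(model, seed_text="평점", max_iter=10)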

Scale up the parameters and add more modeling techniques, and the generated text gets better still!

 
