인공지능/자연어 처리

seq2seq 번역 모델 만들기

이게될까 2024. 5. 3. 00:53
728x90
728x90
import tensorflow as tf

import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from sklearn.model_selection import train_test_split

import unicodedata
import re
import numpy as np
import os
import io
import time

필요한 라이브러리들 불러오기

def unicode_to_ascii(s): # 유니 코드를 아스키 코드로 바꾼다. 
    return ''.join(c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn')


def preprocess_sentence(w):
    w = unicode_to_ascii(w.lower().strip())

    # 단어와 단어 뒤에 오는 구두점(.)사이에 공백을 생성합니다.
    # 예시: "he is a boy." => "he is a boy ."
    # 참고:- https://stackoverflow.com/questions/3645931/python-padding-punctuation-with-white-spaces-keeping-punctuation
    w = re.sub(r"([?.!,¿])", r" \1 ", w)
    w = re.sub(r'[" "]+', " ", w)

    # (a-z, A-Z, ".", "?", "!", ",")을 제외한 모든 것을 공백으로 대체합니다.
    # w = re.sub(r"[^a-zA-Z?.!,¿]+", " ", w)

    w = w.strip()

    # 모델이 예측을 시작하거나 중단할 때를 알게 하기 위해서
    # 문장에 start와 end 토큰을 추가합니다.
    # 디코더의 첫번째 input은 start다. 
    # end 뒤에는 끝이 나게 된다. 
    w = '<start> ' + w + ' <end>'
    return w

전처리 및 토큰을 붙여줍니다.

# 1. 문장에 있는 억양을 제거합니다.
# 2. 불필요한 문자를 제거하여 문장을 정리합니다.
# 3. 다음과 같은 형식으로 문장의 쌍을 반환합니다: [영어, 스페인어]
def create_dataset(path, num_examples):
    ens = []
    spas = []
    lines = io.open(path, encoding='UTF-8').read().strip().split('\n')
    for l in lines[:num_examples]:
        word_pairs = [preprocess_sentence(w) for w in l.split('\t')[:2]]
        en, spa = word_pairs
        ens.append(en)
        spas.append(spa)
    return ens, spas

path_to_file = '/kaggle/input/machinetranslation/kor.txt'
en, kor = create_dataset(path_to_file, None)

print(en[-1])
print(kor[-1])

print(len(en)) # 5890
print(len(kor)) # 5890
<start> doubtless there exists in this world precisely the right woman for any given man to marry and vice versa; but when you consider that a human being has the opportunity of being acquainted with only a few hundred people , and out of the few hundred that there are but a dozen or less whom he knows intimately , and out of the dozen , one or two friends at most , it will easily be seen , when we remember the number of millions who inhabit this world , that probably , since the earth was created , the right man has never yet met the right woman . <end>
<start> 의심의 여지 없이 세상에는 어떤 남자이든 정확히 딱 알맞는 여자와 결혼하거나 그 반대의 상황이 존재하지 . 그런데 인간이 수백 명의 사람만 알고 지내는 사이가 될 기회를 갖는다고 생각해 보면 , 또 그 수백 명 중 열여 명 쯤 이하만 잘 알 수 있고 , 그리고 나서 그 열여 명 중에 한두 명만 친구가 될 수 있다면 , 그리고 또 만일 우리가 이 세상에 살고 있는 수백만 명의 사람들만 기억하고 있다면 , 딱 맞는 남자는 지구가 생겨난 이래로 딱 맞는 여자를 단 한번도 만난 적이 없을 수도 있을 거라는 사실을 쉽게 눈치챌 수 있을 거야 . <end>
5890
5890
 

한글과 영어를 나눠주고 필요없는 것 버리기, 시작과 마지막에 토큰을 붙여줍니다.

def tokenize(lang):
    lang_tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    lang_tokenizer.fit_on_texts(lang)
    tensor = lang_tokenizer.texts_to_sequences(lang)
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor,padding='post')
    return tensor, lang_tokenizer

토큰화 하고, 길이에 맞춰서 패딩을 진행하는 것 같네요

def load_dataset(path, num_examples=None):
    # 전처리된 타겟 문장과 입력 문장 쌍을 생성합니다.
    targ_lang, inp_lang = create_dataset(path, num_examples) # 인풋이 한국어가 된다.

    input_tensor, inp_lang_tokenizer = tokenize(inp_lang)
    target_tensor, targ_lang_tokenizer = tokenize(targ_lang)
    return input_tensor, target_tensor, inp_lang_tokenizer, targ_lang_tokenizer

함수화 했던 내용을 다 진행하네요

# 언어 데이터셋을 아래의 크기로 제한하여 훈련과 검증을 수행합니다.
num_examples = 10000
input_tensor, target_tensor, inp_lang, targ_lang = load_dataset(path_to_file, num_examples)

# 타겟 텐서와 입력 텐서의 최대 길이를 계산합니다.
max_length_targ, max_length_inp = target_tensor.shape[1], input_tensor.shape[1]

# 훈련 집합과 검증 집합을 80대 20으로 분리합니다.
input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor, target_tensor, test_size=0.2)

# 훈련 집합과 검증 집합의 데이터 크기를 출력합니다.
print(len(input_tensor_train), len(target_tensor_train), len(input_tensor_val), len(target_tensor_val))
4712 4712 1178 1178

데이터를 나눠줍니다.

def convert(lang, tensor): # index -> word로 변환하는 단계를 거쳐야한다.
    for t in tensor:
        if t!=0:
            print ("%d ----> %s" % (t, lang.index_word[t]))
            
print ("Input Language; index to word mapping")
convert(inp_lang, input_tensor_train[0])
print ()
print ("Target Language; index to word mapping")
convert(targ_lang, target_tensor_train[0])
Input Language; index to word mapping
1 ----> <start>
5 ----> 톰은
1734 ----> 바나나를
35 ----> 정말
237 ----> 좋아한다
3 ----> .
2 ----> <end>

Target Language; index to word mapping
1 ----> <start>
5 ----> tom
313 ----> loves
1393 ----> bananas
3 ----> .
2 ----> <end>

음 이건 사진 보면 더 이해가 편할수도 있겠네요

train 데이터엔 인덱스가 들어가 있으므로 convert 함수를 통해 인덱스를 단어로 바꿔주네요

BUFFER_SIZE = len(input_tensor_train)
BATCH_SIZE = 64
steps_per_epoch = len(input_tensor_train)//BATCH_SIZE
embedding_dim = 128
units = 512
vocab_inp_size = len(inp_lang.word_index)+1
vocab_tar_size = len(targ_lang.word_index)+1

dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)

print('iteration = ',steps_per_epoch )
iteration =  73
 

이제 각종 파라미터를 준비해줍니다.

# dataset 크기 출력
example_input_batch, example_target_batch = next(iter(dataset))
example_input_batch.shape, example_target_batch.shape
(TensorShape([64, 97]), TensorShape([64, 112]))
 

사이즈를 보면 배치 사이즈로 나와있는 것을 볼 수 있습니다. 한 iteration마다 저 데이터 셋이 사용되는 것 입니다.

class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
        super(Encoder, self).__init__()
        self.batch_sz = batch_sz # 배치 사이즈
        self.enc_units = enc_units # 인코더 유닛 수
        # 유닛 수 : 여기선 GRU의 출력 공간 차원 
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) # 임베딩 레이어 만들기
        self.gru = tf.keras.layers.GRU(self.enc_units, #GRU 레이어 생성
                                   return_sequences=True, # 모든 타임 스텝의 출력 반환
                                   return_state=True, # 마지막 상태 반환
                                   recurrent_initializer='glorot_uniform') # 가중치 초기화 방법


    def call(self, x, hidden):
        x = self.embedding(x) # 임베딩
        output, state = self.gru(x, initial_state = hidden)# GRU 통과
        return output, state

    def initialize_hidden_state(self):
        return tf.zeros((self.batch_sz, self.enc_units)) # 초기상태를 0으로 설정
    
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units) #q
        self.W2 = tf.keras.layers.Dense(units) # k
        self.V = tf.keras.layers.Dense(1) # v

    def call(self, query, values):
        query_with_time_axis = tf.expand_dims(query, 1)# 텐서 차원 추가[배치크기,1,특성 수]
        score = self.V(tf.nn.tanh( # BahdanauAttention은 dot product 대신에 합연산 후 tanh를 진행한다.
            self.W1(query_with_time_axis) + self.W2(values))) # attention score구하기 [배치크기,시퀀스 길이,1]
        
        attention_weights = tf.nn.softmax(score, axis=1)  # score 소프트 맥스하기
        context_vector = attention_weights * values  # score와 v 곱하기
        context_vector = tf.reduce_sum(context_vector, axis=1) # score*v한 백터 합치기 

        return context_vector, attention_weights
    
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
        super(Decoder, self).__init__()
        self.batch_sz = batch_sz # 배치 크기
        self.dec_units = dec_units # GRU 셀 수 
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) # 임베딩 벡터로 전환
        self.gru = tf.keras.layers.GRU(self.dec_units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc = tf.keras.layers.Dense(vocab_size) # 실제 아웃풋을 만들어내야 한다.
        self.attention = BahdanauAttention(self.dec_units) # 디코더 유닛을 넣어 어텐션한다.

    def call(self, x, hidden, enc_output): # 디코더 현재 인풋, 이전 satate, 인코더의 아웃풋 값들이 어텐션에 사용
        context_vector, attention_weights = self.attention(hidden, enc_output)# 더코더의 현재 히든 스테이트와 인코더의 아웃풋 값을 어텐션
        x = self.embedding(x) # 입력을 임베딩으로 전환
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1) # 어텐션 출력 값과 임베딩 결합 
        output, state = self.gru(x) # gru 진행
        output = tf.reshape(output, (-1, output.shape[2])) # 출력 형태 조정 
        x = self.fc(output) # FCN 계산을 진행
        return x, state, attention_weights

여기에는 주석을 다 달아놓긴 했는데 살짝 복잡한.....

# encoder
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)

# 샘플 입력
sample_hidden = encoder.initialize_hidden_state()
sample_output, sample_hidden = encoder(example_input_batch, sample_hidden)
print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))
print ('Encoder Hidden state shape: (batch size, units) {}'.format(sample_hidden.shape))

# encoder-decoder attention
attention_layer = BahdanauAttention(10)
attention_result, attention_weights = attention_layer(sample_hidden, sample_output)

print("Attention result shape: (batch size, units) {}".format(attention_result.shape))
print("Attention weights shape: (batch_size, sequence_length, 1) {}".format(attention_weights.shape))

# decoder
decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)

sample_decoder_output, _, _ = decoder(tf.random.uniform((BATCH_SIZE, 1)),
                                      sample_hidden, sample_output)

print ('Decoder output shape: (batch_size, vocab size) {}'.format(sample_decoder_output.shape))
Encoder output shape: (batch size, sequence length, units) (64, 97, 512)
Encoder Hidden state shape: (batch size, units) (64, 512)
Attention result shape: (batch size, units) (64, 512)
Attention weights shape: (batch_size, sequence_length, 1) (64, 97, 1)
Decoder output shape: (batch_size, vocab size) (64, 3211)
 

데이터 사이즈를 확인할 수 있습니다.

optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none') # 단어를 생성하는 것은 모든 보캡에 대해 loss를 구해야된다 -> 확률값을 비교해야된다. -> 연산이 너무 많다. -> 아주 작은 값도 크게 키워준다.?

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_mean(loss_)

checkpoint = tf.train.Checkpoint(optimizer=optimizer,
                                 encoder=encoder,
                                 decoder=decoder)

loss function과 optimizer등 지정해줍니다.

def train_step(inp, targ, enc_hidden):
    loss = 0
    with tf.GradientTape() as tape:
        # encoder
        enc_output, enc_hidden = encoder(inp, enc_hidden)
        dec_hidden = enc_hidden
        
        # decoder
        dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1)

        # 교사 강요(teacher forcing) - 다음 입력으로 타겟을 피딩(feeding)합니다.
        for t in range(1, targ.shape[1]):
            # enc_output를 디코더에 전달합니다.
            predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
            loss += loss_function(targ[:, t], predictions) # 여기서 출력하면 인덱스 형태가 낭로 것이다.

            # 교사 강요(teacher forcing)를 사용합니다.
            dec_input = tf.expand_dims(targ[:, t], 1) # input을 prediction으로 두는게 아니라 label을 넣는다.

    batch_loss = (loss / int(targ.shape[1]))
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return batch_loss

학습 함수를 정의해줍니다. 주석 추가 버전으로 하나 더 넣을게요

def train_step(inp, targ, enc_hidden):
    loss = 0
    # GradientTape를 사용하여 학습 동안 발생하는 연산을 기록합니다. 
    # 이를 통해 자동 미분을 사용하여 그래디언트를 계산할 수 있습니다.
    with tf.GradientTape() as tape:
        # 인코더를 실행하여 입력 데이터에 대한 출력과 숨겨진 상태를 얻습니다.
        enc_output, enc_hidden = encoder(inp, enc_hidden)
        dec_hidden = enc_hidden  # 디코더의 초기 상태를 인코더의 최종 상태로 설정
        
        # 디코더의 첫 입력 설정. 모든 예시에 대해 '<start>' 토큰을 사용
        dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1)

        # 타겟 문장의 각 토큰에 대해 디코더를 반복 실행합니다.
        # 교사 강요(Teacher Forcing)를 사용하여 현재 타겟 토큰을 다음 입력으로 제공합니다.
        for t in range(1, targ.shape[1]):  # 첫 번째 토큰은 이미 처리했으므로 1부터 시작
            # 디코더를 실행하여 예측 결과와 디코더의 숨겨진 상태를 얻습니다.
            predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
            # 손실을 계산하여 총 손실에 추가합니다.
            loss += loss_function(targ[:, t], predictions)

            # 다음 입력을 위해 현재 타겟 토큰을 디코더 입력으로 설정합니다.
            dec_input = tf.expand_dims(targ[:, t], 1)

    # 배치에 대한 평균 손실을 계산합니다.
    batch_loss = (loss / int(targ.shape[1]))

    # 학습 가능한 변수들(인코더와 디코더의 변수)을 가져옵니다.
    variables = encoder.trainable_variables + decoder.trainable_variables
    # 기록된 연산을 바탕으로 변수들의 그래디언트를 계산합니다.
    gradients = tape.gradient(loss, variables)
    # 계산된 그래디언트를 이용하여 모델의 변수를 업데이트합니다.
    optimizer.apply_gradients(zip(gradients, variables))

    return batch_loss
# 이 파트가 매우 오래걸림

EPOCHS = 1

for epoch in range(EPOCHS):
    start = time.time()

    enc_hidden = encoder.initialize_hidden_state()
    total_loss = 0

    for (batch, (inp, targ)) in enumerate(dataset.take(steps_per_epoch)):
        batch_loss = train_step(inp, targ, enc_hidden)
        total_loss += batch_loss

        if batch % 10 == 0:
            print('Epoch {} / Batch {} / Loss {:.4f} / Time taken {} sec'.format(epoch + 1,
                                                                                 batch,
                                                                                 batch_loss.numpy(),
                                                                                 time.time() - start))
    # 에포크가 2번 실행될때마다 모델 저장 (체크포인트)
    #if (epoch + 1) % 2 == 0:
    #    checkpoint.save(file_prefix = checkpoint_prefix)

    print('Epoch {} / Loss {:.4f}'.format(epoch + 1,
                                          total_loss / steps_per_epoch))
    print('Time taken for {} epoch {} sec\n'.format(epoch + 1,
                                                    time.time() - start))

학습을 시켜주는데 거의 30~40분이....

def evaluate(sentence):
    attention_plot = np.zeros((max_length_targ, max_length_inp))

    sentence = preprocess_sentence(sentence)

    inputs = [inp_lang.word_index[i] for i in sentence.split(' ')]
    inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs],
                                                         maxlen=max_length_inp,
                                                         padding='post')
    inputs = tf.convert_to_tensor(inputs)

    result = ''

    hidden = [tf.zeros((1, units))]
    enc_out, enc_hidden = encoder(inputs, hidden)

    dec_hidden = enc_hidden
    dec_input = tf.expand_dims([targ_lang.word_index['<start>']], 0)

    for t in range(max_length_targ):
        predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                             dec_hidden,
                                                             enc_out)

        # 나중에 어텐션 가중치를 시각화하기 위해 어텐션 가중치를 저장합니다.
        attention_weights = tf.reshape(attention_weights, (-1, ))
        attention_plot[t] = attention_weights.numpy()

        predicted_id = tf.argmax(predictions[0]).numpy()

        result += targ_lang.index_word[predicted_id] + ' '

        if targ_lang.index_word[predicted_id] == '<end>':
            return result, sentence, attention_plot

        # 예측된 ID를 모델에 다시 피드합니다.
        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence, attention_plot

평가 함수! 이것도 주석 달아서

def evaluate(sentence):
    # 어텐션 가중치를 기록하기 위한 행렬 초기화. 시각화할 때 사용합니다.
    attention_plot = np.zeros((max_length_targ, max_length_inp))

    # 입력 문장을 전처리합니다 (소문자화, 구두점 처리, 토큰 추가 등).
    sentence = preprocess_sentence(sentence)

    # 전처리된 문장을 단어 인덱스로 변환합니다.
    inputs = [inp_lang.word_index[i] for i in sentence.split(' ')]
    # 패딩을 추가하여 모든 입력 시퀀스의 길이를 일정하게 맞춥니다.
    inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs],
                                                           maxlen=max_length_inp,
                                                           padding='post')
    # 텐서로 변환합니다.
    inputs = tf.convert_to_tensor(inputs)

    # 결과를 저장할 빈 문자열 초기화
    result = ''

    # 인코더 초기 상태 설정
    hidden = [tf.zeros((1, units))]
    # 인코더 실행
    enc_out, enc_hidden = encoder(inputs, hidden)

    # 디코더의 초기 상태를 인코더의 최종 상태로 설정
    dec_hidden = enc_hidden
    # 디코더의 첫 입력 설정 ('<start>' 토큰)
    dec_input = tf.expand_dims([targ_lang.word_index['<start>']], 0)

    # 타겟 문장의 최대 길이까지 반복
    for t in range(max_length_targ):
        # 디코더 실행
        predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                             dec_hidden,
                                                             enc_out)

        # 어텐션 가중치를 저장하여 나중에 시각화에 사용
        attention_weights = tf.reshape(attention_weights, (-1, ))
        attention_plot[t] = attention_weights.numpy()

        # 가장 확률이 높은 단어의 인덱스를 추출
        predicted_id = tf.argmax(predictions[0]).numpy()

        # 인덱스를 단어로 변환하여 결과 문자열에 추가
        result += targ_lang.index_word[predicted_id] + ' '

        # 만약 예측된 단어가 종료 토큰이면, 반복을 종료
        if targ_lang.index_word[predicted_id] == '<end>':
            return result, sentence, attention_plot

        # 예측된 단어를 다음 입력으로 사용
        dec_input = tf.expand_dims([predicted_id], 0)

    # 최대 길이까지 번역이 완료되면 결과 반환
    return result, sentence, attention_plot

 

def translate(sentence):
    result, sentence, attention_plot = evaluate(sentence)

    print('Input: %s' % (sentence))
    print('Predicted translation: {}'.format(result))

번역 함수입니다!

이렇게 사용 가능합니다!

학습이 너무 덜 되었네요...

 

728x90