seq2seq 번역 모델 만들기
import tensorflow as tfimport matplotlib.pyplot as pltimport matplotlib.ticker as tickerfrom sklearn.model_selection import train_test_splitimport unicodedataimport reimport numpy as npimport osimport ioimport time필요한 라이브러리들 불러오기d
여기에 기존 번역 모델 (GRU를 사용한 번역기) 만드는 과정은 있습니다.
GPU를 사용해서 학습하는게 자꾸 오류가 떠서....
LSTM은 겨우 되는 코드를 찾았네요
BUFFER_SIZE = len(input_tensor_train)
steps_per_epoch = len(input_tensor_train)//BATCH_SIZE
embedding_dim = 256
units = 512
vocab_inp_size = len(inp_lang.word_index)+1
vocab_tar_size = len(targ_lang.word_index)+1
dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
print('iteration = ',steps_per_epoch )
학습 돌릴 시간이 충분하지 않아서 배치 사이즈를 줄여 빠른 학습 속도를 보이겠습니다.
class Encoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
super(Encoder, self).__init__()
self.batch_sz = batch_sz
self.enc_units = enc_units
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
self.lstm = tf.keras.layers.LSTM(self.enc_units,
def call(self, x, hidden):
x = self.embedding(x)
output, h_state, c_state = self.lstm(x, initial_state=hidden)
return output, h_state
def initialize_hidden_state(self, batch_sz):
# LSTM의 hidden state와 cell state를 초기화합니다.
# 각각의 상태는 (batch_size, LSTM_units)의 크기를 가져야 합니다.
return [tf.zeros((batch_sz, self.enc_units)), tf.zeros((batch_sz, self.enc_units))]
class BahdanauAttention(tf.keras.layers.Layer):
def __init__(self, units):
super(BahdanauAttention, self).__init__()
self.W1 = tf.keras.layers.Dense(units) #q
self.W2 = tf.keras.layers.Dense(units) # k
self.V = tf.keras.layers.Dense(1) # v
def call(self, query, values):
query_with_time_axis = tf.expand_dims(query, 1)# 텐서 차원 추가[배치크기,1,특성 수]
score = self.V(tf.nn.tanh( # BahdanauAttention은 dot product 대신에 합연산 후 tanh를 진행한다.
self.W1(query_with_time_axis) + self.W2(values))) # attention score구하기 [배치크기,시퀀스 길이,1]
attention_weights = tf.nn.softmax(score, axis=1) # score 소프트 맥스하기
context_vector = attention_weights * values # score와 v 곱하기
context_vector = tf.reduce_sum(context_vector, axis=1) # score*v한 백터 합치기
return context_vector, attention_weights
class Decoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
super(Decoder, self).__init__()
self.batch_sz = batch_sz # 배치 크기
self.dec_units = dec_units # GRU 셀 수
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) # 임베딩 벡터로 전환
self.lstm = tf.keras.layers.LSTM(self.dec_units,
self.fc = tf.keras.layers.Dense(vocab_size) # 실제 아웃풋을 만들어내야 한다.
self.attention = BahdanauAttention(self.dec_units) # 디코더 유닛을 넣어 어텐션한다.
def call(self, x, hidden, enc_output): # 디코더 현재 인풋, 이전 satate, 인코더의 아웃풋 값들이 어텐션에 사용
context_vector, attention_weights = self.attention(hidden, enc_output)# 더코더의 현재 히든 스테이트와 인코더의 아웃풋 값을 어텐션
x = self.embedding(x) # 입력을 임베딩으로 전환
x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1) # 어텐션 출력 값과 임베딩 결합
output, state,cstate = self.lstm(x) # gru 진행
output = tf.reshape(output, (-1, output.shape[2])) # 출력 형태 조정
x = self.fc(output) # FCN 계산을 진행
return x, state, attention_weights
# encoder
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)
# encoder-decoder attention
attention_layer = BahdanauAttention(10)
# decoder
decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none') # 단어를 생성하는 것은 모든 보캡에 대해 loss를 구해야된다 -> 확률값을 비교해야된다. -> 연산이 너무 많다. -> 아주 작은 값도 크게 키워준다.?
def loss_function(real, pred):
# 'real' 텐서에서 0이 아닌 요소를 찾아 True, 0인 요소는 False인 마스크를 생성합니다.
# 여기서 0은 일반적으로 패딩된 값으로 간주되어 손실 계산에서 제외되어야 합니다.
mask = tf.math.logical_not(tf.math.equal(real, 0))
# 'loss_object'를 사용하여 실제 값과 예측 값 사이의 손실을 계산합니다.
# 'loss_object'는 이전에 정의되어야 하며, 일반적으로 텐서플로우의 손실 함수 중 하나를 사용합니다.
loss_ = loss_object(real, pred)
# 마스크를 손실 텐서의 데이터 타입으로 변환합니다.
mask = tf.cast(mask, dtype=loss_.dtype)
# 계산된 손실에 마스크를 적용하여 실제로 중요한 값들에 대해서만 손실을 계산합니다.
# 패딩된 부분(즉, 마스크에서 False인 부분)의 손실은 0이 되어 손실에 기여하지 않습니다.
loss_ *= mask
# 최종 손실값을 계산하기 위해 마스크가 적용된 손실의 평균을 구합니다.
# 여기서 tf.reduce_mean 함수는 텐서의 모든 요소에 대한 평균을 계산합니다.
return tf.reduce_mean(loss_)
def train_step(inp, targ, enc_hidden):
loss = 0
with tf.GradientTape() as tape:
# encoder
enc_output, enc_hidden = encoder(inp, enc_hidden)
dec_hidden = enc_hidden
# decoder
dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1)
# 교사 강요(teacher forcing) - 다음 입력으로 타겟을 피딩(feeding)합니다.
for t in range(1, targ.shape[1]):
# enc_output를 디코더에 전달합니다.
predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
loss += loss_function(targ[:, t], predictions) # 여기서 출력하면 인덱스 형태가 낭로 것이다.
# 교사 강요(teacher forcing)를 사용합니다.
dec_input = tf.expand_dims(targ[:, t], 1) # input을 prediction으로 두는게 아니라 label을 넣는다.
batch_loss = (loss / int(targ.shape[1]))
variables = encoder.trainable_variables + decoder.trainable_variables
gradients = tape.gradient(loss, variables)
optimizer.apply_gradients(zip(gradients, variables))
return batch_loss
여기는 이전 글 읽어 보시면 편하실텐데 LSTM의 출력, 입력 차원 조정해 준 것 말고는 없습니다.
# 이 파트가 매우 오래걸림
for epoch in range(EPOCHS):
start = time.time()
enc_hidden = encoder.initialize_hidden_state(BATCH_SIZE)
total_loss = 0
for (batch, (inp, targ)) in enumerate(dataset.take(steps_per_epoch)):
batch_loss = train_step(inp, targ, enc_hidden)
total_loss += batch_loss
if batch % 10 == 0:
print('Epoch {} / Batch {} / Loss {:.4f} / Time taken {} sec'.format(epoch + 1,
time.time() - start))
# 에포크가 2번 실행될때마다 모델 저장 (체크포인트)
#if (epoch + 1) % 2 == 0:
# checkpoint.save(file_prefix = checkpoint_prefix)
print('Epoch {} / Loss {:.4f}'.format(epoch + 1,
total_loss / steps_per_epoch))
print('Time taken for {} epoch {} sec\n'.format(epoch + 1,
time.time() - start))
학습은 1batch 당 kaggle GPU 사용해서 3~4초 정도 걸립니다.
def evaluate(sentence):
attention_plot = np.zeros((max_length_targ, max_length_inp))
sentence = preprocess_sentence(sentence)
inputs = [inp_lang.word_index[i] for i in sentence.split(' ')]
inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs],
inputs = tf.convert_to_tensor(inputs)
batch_size = inputs.shape[0]
hidden = encoder.initialize_hidden_state(batch_size)
enc_out, enc_hidden = encoder(inputs, hidden) # 이 줄이 세 개의 값을 정확히 반환하도록 보장
dec_hidden = enc_hidden
dec_input = tf.expand_dims([targ_lang.word_index['<start>']], 0)
result = ''
for t in range(max_length_targ):
predictions, dec_hidden, attention_weights = decoder(dec_input,
attention_weights = tf.reshape(attention_weights, (-1, ))
attention_plot[t] = attention_weights.numpy()
predicted_id = tf.argmax(predictions[0]).numpy()
result += targ_lang.index_word[predicted_id] + ' '
if targ_lang.index_word[predicted_id] == '<end>':
return result, sentence, attention_plot
return result, sentence, attention_plot
여기도 크게 달라진 것은 없고, 차원정도 고쳤습니다.
def translate(sentence):
result, sentence, attention_plot = evaluate(sentence)
#print('Input: %s' % (sentence))
#print('Predicted translation: {}'.format(result))
return result
여기 수정하느라 애를 많이 먹었네요 ㅎㅎㅎ,,,,
def unicode_to_ascii2(s): # 유니 코드를 아스키 코드로 바꾼다.
return ''.join(c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn')
def preprocess_sentence2(w):
w = unicode_to_ascii(w.lower().strip())
# 단어와 단어 뒤에 오는 구두점(.)사이에 공백을 생성합니다.
# 예시: "he is a boy." => "he is a boy ."
# 참고:- https://stackoverflow.com/questions/3645931/python-padding-punctuation-with-white-spaces-keeping-punctuation
w = re.sub(r"([?.!,¿])", r" \1 ", w)
w = re.sub(r'[" "]+', " ", w)
# (a-z, A-Z, ".", "?", "!", ",")을 제외한 모든 것을 공백으로 대체합니다.
# w = re.sub(r"[^a-zA-Z?.!,¿]+", " ", w)
w = w.strip()
# 모델이 예측을 시작하거나 중단할 때를 알게 하기 위해서
# 문장에 start와 end 토큰을 추가합니다.
# 디코더의 첫번째 input은 start다.
# end 뒤에는 끝이 나게 된다.
return w
import random
def create_dataset2(path, num_examples):
ens = []
spas = []
# 파일을 읽고 라인으로 분할합니다.
with io.open(path, encoding='UTF-8') as file:
lines = file.read().strip().split('\n')
# num_examples가 None이 아니면, 랜덤한 라인을 선택합니다.
if num_examples is not None and num_examples < len(lines):
lines = random.sample(lines, num_examples)
# 선택된 각 라인에 대해 처리를 수행합니다.
for l in lines:
word_pairs = [preprocess_sentence2(w) for w in l.split('\t')[:2]]
en, spa = word_pairs
return ens, spas
path_to_file = 'kor.txt'
en2, kor2 = create_dataset2(path_to_file, 500)
GPU는 빠르니까 랜덤하게 500개 뽑아서 BLEU score를 위해 데이터를 준비해줍니다.
for qw in range (18,55,1):
이 코드를 사용하여 번역되는 것을 확인해볼 수 있구요
import nltk
from nltk.translate.bleu_score import sentence_bleu
def clean_translation(translation):
return translation.replace('<end>', '').strip()
def calculate_sentence_bleu(reference, candidate):
reference_tokens = reference.split()
candidate_tokens = candidate.split()
return sentence_bleu([reference_tokens], candidate_tokens)
translated_sentences = [translate(sentence) for sentence in kor2]
cleaned_translations = [clean_translation(translation) for translation in translated_sentences]
# 개별 BLEU 점수 계산 및 전체 평균 BLEU 점수 계산
individual_bleu_scores = [calculate_sentence_bleu(ref, trans) for ref, trans in zip(en2, cleaned_translations)]
average_bleu_score = sum(individual_bleu_scores) / len(individual_bleu_scores)
#print("Individual BLEU Scores:", individual_bleu_scores)
print("Average BLEU Score:", average_bleu_score)
여기서 BLEU 스코어 평균이랑 개별 점수 각각 보실 수 있습니다!
