728x90
728x90
2024.05.03 - [인공지능/자연어 처리] - seq2seq 번역 모델 만들기
여기에 기존 번역 모델 (GRU를 사용한 번역기) 만드는 과정은 있습니다.
GPU를 사용해서 학습하는게 자꾸 오류가 떠서....
LSTM은 겨우 되는 코드를 찾았네요
BUFFER_SIZE = len(input_tensor_train)
BATCH_SIZE = 16
steps_per_epoch = len(input_tensor_train)//BATCH_SIZE
embedding_dim = 256
units = 512
vocab_inp_size = len(inp_lang.word_index)+1
vocab_tar_size = len(targ_lang.word_index)+1
dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
print('iteration = ',steps_per_epoch )
학습 돌릴 시간이 충분하지 않아서 배치 사이즈를 줄여 빠른 학습 속도를 보이겠습니다.
class Encoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
super(Encoder, self).__init__()
self.batch_sz = batch_sz
self.enc_units = enc_units
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
self.lstm = tf.keras.layers.LSTM(self.enc_units,
return_sequences=True,
return_state=True,
recurrent_initializer='glorot_uniform')
def call(self, x, hidden):
x = self.embedding(x)
output, h_state, c_state = self.lstm(x, initial_state=hidden)
return output, h_state
def initialize_hidden_state(self, batch_sz):
# LSTM의 hidden state와 cell state를 초기화합니다.
# 각각의 상태는 (batch_size, LSTM_units)의 크기를 가져야 합니다.
return [tf.zeros((batch_sz, self.enc_units)), tf.zeros((batch_sz, self.enc_units))]
class BahdanauAttention(tf.keras.layers.Layer):
def __init__(self, units):
super(BahdanauAttention, self).__init__()
self.W1 = tf.keras.layers.Dense(units) #q
self.W2 = tf.keras.layers.Dense(units) # k
self.V = tf.keras.layers.Dense(1) # v
def call(self, query, values):
query_with_time_axis = tf.expand_dims(query, 1)# 텐서 차원 추가[배치크기,1,특성 수]
score = self.V(tf.nn.tanh( # BahdanauAttention은 dot product 대신에 합연산 후 tanh를 진행한다.
self.W1(query_with_time_axis) + self.W2(values))) # attention score구하기 [배치크기,시퀀스 길이,1]
attention_weights = tf.nn.softmax(score, axis=1) # score 소프트 맥스하기
context_vector = attention_weights * values # score와 v 곱하기
context_vector = tf.reduce_sum(context_vector, axis=1) # score*v한 백터 합치기
return context_vector, attention_weights
class Decoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
super(Decoder, self).__init__()
self.batch_sz = batch_sz # 배치 크기
self.dec_units = dec_units # GRU 셀 수
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) # 임베딩 벡터로 전환
self.lstm = tf.keras.layers.LSTM(self.dec_units,
return_sequences=True,
return_state=True,
recurrent_initializer='glorot_uniform')
self.fc = tf.keras.layers.Dense(vocab_size) # 실제 아웃풋을 만들어내야 한다.
self.attention = BahdanauAttention(self.dec_units) # 디코더 유닛을 넣어 어텐션한다.
def call(self, x, hidden, enc_output): # 디코더 현재 인풋, 이전 satate, 인코더의 아웃풋 값들이 어텐션에 사용
context_vector, attention_weights = self.attention(hidden, enc_output)# 더코더의 현재 히든 스테이트와 인코더의 아웃풋 값을 어텐션
x = self.embedding(x) # 입력을 임베딩으로 전환
x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1) # 어텐션 출력 값과 임베딩 결합
output, state,cstate = self.lstm(x) # gru 진행
output = tf.reshape(output, (-1, output.shape[2])) # 출력 형태 조정
x = self.fc(output) # FCN 계산을 진행
return x, state, attention_weights
# encoder
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)
# encoder-decoder attention
attention_layer = BahdanauAttention(10)
# decoder
decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none') # 단어를 생성하는 것은 모든 보캡에 대해 loss를 구해야된다 -> 확률값을 비교해야된다. -> 연산이 너무 많다. -> 아주 작은 값도 크게 키워준다.?
def loss_function(real, pred):
# 'real' 텐서에서 0이 아닌 요소를 찾아 True, 0인 요소는 False인 마스크를 생성합니다.
# 여기서 0은 일반적으로 패딩된 값으로 간주되어 손실 계산에서 제외되어야 합니다.
mask = tf.math.logical_not(tf.math.equal(real, 0))
# 'loss_object'를 사용하여 실제 값과 예측 값 사이의 손실을 계산합니다.
# 'loss_object'는 이전에 정의되어야 하며, 일반적으로 텐서플로우의 손실 함수 중 하나를 사용합니다.
loss_ = loss_object(real, pred)
# 마스크를 손실 텐서의 데이터 타입으로 변환합니다.
mask = tf.cast(mask, dtype=loss_.dtype)
# 계산된 손실에 마스크를 적용하여 실제로 중요한 값들에 대해서만 손실을 계산합니다.
# 패딩된 부분(즉, 마스크에서 False인 부분)의 손실은 0이 되어 손실에 기여하지 않습니다.
loss_ *= mask
# 최종 손실값을 계산하기 위해 마스크가 적용된 손실의 평균을 구합니다.
# 여기서 tf.reduce_mean 함수는 텐서의 모든 요소에 대한 평균을 계산합니다.
return tf.reduce_mean(loss_)
def train_step(inp, targ, enc_hidden):
loss = 0
with tf.GradientTape() as tape:
# encoder
enc_output, enc_hidden = encoder(inp, enc_hidden)
dec_hidden = enc_hidden
# decoder
dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1)
# 교사 강요(teacher forcing) - 다음 입력으로 타겟을 피딩(feeding)합니다.
for t in range(1, targ.shape[1]):
# enc_output를 디코더에 전달합니다.
predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
loss += loss_function(targ[:, t], predictions) # 여기서 출력하면 인덱스 형태가 낭로 것이다.
# 교사 강요(teacher forcing)를 사용합니다.
dec_input = tf.expand_dims(targ[:, t], 1) # input을 prediction으로 두는게 아니라 label을 넣는다.
batch_loss = (loss / int(targ.shape[1]))
variables = encoder.trainable_variables + decoder.trainable_variables
gradients = tape.gradient(loss, variables)
optimizer.apply_gradients(zip(gradients, variables))
return batch_loss
여기는 이전 글 읽어 보시면 편하실텐데 LSTM의 출력, 입력 차원 조정해 준 것 말고는 없습니다.
# 이 파트가 매우 오래걸림
EPOCHS = 8
for epoch in range(EPOCHS):
start = time.time()
enc_hidden = encoder.initialize_hidden_state(BATCH_SIZE)
total_loss = 0
for (batch, (inp, targ)) in enumerate(dataset.take(steps_per_epoch)):
batch_loss = train_step(inp, targ, enc_hidden)
total_loss += batch_loss
if batch % 10 == 0:
print('Epoch {} / Batch {} / Loss {:.4f} / Time taken {} sec'.format(epoch + 1,
batch,
batch_loss.numpy(),
time.time() - start))
# 에포크가 2번 실행될때마다 모델 저장 (체크포인트)
#if (epoch + 1) % 2 == 0:
# checkpoint.save(file_prefix = checkpoint_prefix)
print('Epoch {} / Loss {:.4f}'.format(epoch + 1,
total_loss / steps_per_epoch))
print('Time taken for {} epoch {} sec\n'.format(epoch + 1,
time.time() - start))
학습은 1batch 당 kaggle GPU 사용해서 3~4초 정도 걸립니다.
def evaluate(sentence):
attention_plot = np.zeros((max_length_targ, max_length_inp))
sentence = preprocess_sentence(sentence)
inputs = [inp_lang.word_index[i] for i in sentence.split(' ')]
inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs],
maxlen=max_length_inp,
padding='post')
inputs = tf.convert_to_tensor(inputs)
batch_size = inputs.shape[0]
hidden = encoder.initialize_hidden_state(batch_size)
enc_out, enc_hidden = encoder(inputs, hidden) # 이 줄이 세 개의 값을 정확히 반환하도록 보장
dec_hidden = enc_hidden
dec_input = tf.expand_dims([targ_lang.word_index['<start>']], 0)
result = ''
for t in range(max_length_targ):
predictions, dec_hidden, attention_weights = decoder(dec_input,
dec_hidden,
enc_out)
attention_weights = tf.reshape(attention_weights, (-1, ))
attention_plot[t] = attention_weights.numpy()
predicted_id = tf.argmax(predictions[0]).numpy()
result += targ_lang.index_word[predicted_id] + ' '
if targ_lang.index_word[predicted_id] == '<end>':
return result, sentence, attention_plot
return result, sentence, attention_plot
여기도 크게 달라진 것은 없고, 차원정도 고쳤습니다.
def translate(sentence):
result, sentence, attention_plot = evaluate(sentence)
#print('Input: %s' % (sentence))
#print('Predicted translation: {}'.format(result))
return result
print(translate(kor[6]))
여기 수정하느라 애를 많이 먹었네요 ㅎㅎㅎ,,,,
def unicode_to_ascii2(s): # 유니 코드를 아스키 코드로 바꾼다.
return ''.join(c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn')
def preprocess_sentence2(w):
w = unicode_to_ascii(w.lower().strip())
# 단어와 단어 뒤에 오는 구두점(.)사이에 공백을 생성합니다.
# 예시: "he is a boy." => "he is a boy ."
# 참고:- https://stackoverflow.com/questions/3645931/python-padding-punctuation-with-white-spaces-keeping-punctuation
w = re.sub(r"([?.!,¿])", r" \1 ", w)
w = re.sub(r'[" "]+', " ", w)
# (a-z, A-Z, ".", "?", "!", ",")을 제외한 모든 것을 공백으로 대체합니다.
# w = re.sub(r"[^a-zA-Z?.!,¿]+", " ", w)
w = w.strip()
# 모델이 예측을 시작하거나 중단할 때를 알게 하기 위해서
# 문장에 start와 end 토큰을 추가합니다.
# 디코더의 첫번째 input은 start다.
# end 뒤에는 끝이 나게 된다.
return w
import random
def create_dataset2(path, num_examples):
ens = []
spas = []
# 파일을 읽고 라인으로 분할합니다.
with io.open(path, encoding='UTF-8') as file:
lines = file.read().strip().split('\n')
# num_examples가 None이 아니면, 랜덤한 라인을 선택합니다.
if num_examples is not None and num_examples < len(lines):
lines = random.sample(lines, num_examples)
# 선택된 각 라인에 대해 처리를 수행합니다.
for l in lines:
word_pairs = [preprocess_sentence2(w) for w in l.split('\t')[:2]]
en, spa = word_pairs
ens.append(en)
spas.append(spa)
return ens, spas
path_to_file = 'kor.txt'
en2, kor2 = create_dataset2(path_to_file, 500)
GPU는 빠르니까 랜덤하게 500개 뽑아서 BLEU score를 위해 데이터를 준비해줍니다.
qw=18
for qw in range (18,55,1):
print(translate(kor2[qw]))
print(en2[qw])
print(kor2[qw])
이 코드를 사용하여 번역되는 것을 확인해볼 수 있구요
import nltk
from nltk.translate.bleu_score import sentence_bleu
def clean_translation(translation):
return translation.replace('<end>', '').strip()
def calculate_sentence_bleu(reference, candidate):
reference_tokens = reference.split()
candidate_tokens = candidate.split()
return sentence_bleu([reference_tokens], candidate_tokens)
translated_sentences = [translate(sentence) for sentence in kor2]
cleaned_translations = [clean_translation(translation) for translation in translated_sentences]
# 개별 BLEU 점수 계산 및 전체 평균 BLEU 점수 계산
individual_bleu_scores = [calculate_sentence_bleu(ref, trans) for ref, trans in zip(en2, cleaned_translations)]
average_bleu_score = sum(individual_bleu_scores) / len(individual_bleu_scores)
#print("Individual BLEU Scores:", individual_bleu_scores)
print("Average BLEU Score:", average_bleu_score)
여기서 BLEU 스코어 평균이랑 개별 점수 각각 보실 수 있습니다!
728x90
'인공지능 > 자연어 처리' 카테고리의 다른 글
자연어 처리 10강 - Language Modeling with GPT (0) | 2024.06.03 |
---|---|
NLP Python - BERT for Question Answering, Tokenizer, Evaluate(f1 score) (32) | 2024.05.22 |
seq2seq 번역 모델 만들기 (0) | 2024.05.03 |
자연어 처리 python 양방향(Bidirectional) LSTM 진행하기 (0) | 2024.04.28 |
자연어 처리 python 실습 - 한to영 기계 번역 모델 학습 및 평가 (1) | 2024.04.26 |