728x90
728x90
import tensorflow as tf
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
from sklearn.model_selection import train_test_split
import unicodedata
import re
import numpy as np
import os
import io
import time
필요한 라이브러리들 불러오기
def unicode_to_ascii(s): # 유니 코드를 아스키 코드로 바꾼다.
return ''.join(c for c in unicodedata.normalize('NFD', s)
if unicodedata.category(c) != 'Mn')
def preprocess_sentence(w):
w = unicode_to_ascii(w.lower().strip())
# 단어와 단어 뒤에 오는 구두점(.)사이에 공백을 생성합니다.
# 예시: "he is a boy." => "he is a boy ."
# 참고:- https://stackoverflow.com/questions/3645931/python-padding-punctuation-with-white-spaces-keeping-punctuation
w = re.sub(r"([?.!,¿])", r" \1 ", w)
w = re.sub(r'[" "]+', " ", w)
# (a-z, A-Z, ".", "?", "!", ",")을 제외한 모든 것을 공백으로 대체합니다.
# w = re.sub(r"[^a-zA-Z?.!,¿]+", " ", w)
w = w.strip()
# 모델이 예측을 시작하거나 중단할 때를 알게 하기 위해서
# 문장에 start와 end 토큰을 추가합니다.
# 디코더의 첫번째 input은 start다.
# end 뒤에는 끝이 나게 된다.
w = '<start> ' + w + ' <end>'
return w
전처리 및 토큰을 붙여줍니다.
# 1. 문장에 있는 억양을 제거합니다.
# 2. 불필요한 문자를 제거하여 문장을 정리합니다.
# 3. 다음과 같은 형식으로 문장의 쌍을 반환합니다: [영어, 스페인어]
def create_dataset(path, num_examples):
ens = []
spas = []
lines = io.open(path, encoding='UTF-8').read().strip().split('\n')
for l in lines[:num_examples]:
word_pairs = [preprocess_sentence(w) for w in l.split('\t')[:2]]
en, spa = word_pairs
ens.append(en)
spas.append(spa)
return ens, spas
path_to_file = '/kaggle/input/machinetranslation/kor.txt'
en, kor = create_dataset(path_to_file, None)
print(en[-1])
print(kor[-1])
print(len(en)) # 5890
print(len(kor)) # 5890
<start> doubtless there exists in this world precisely the right woman for any given man to marry and vice versa; but when you consider that a human being has the opportunity of being acquainted with only a few hundred people , and out of the few hundred that there are but a dozen or less whom he knows intimately , and out of the dozen , one or two friends at most , it will easily be seen , when we remember the number of millions who inhabit this world , that probably , since the earth was created , the right man has never yet met the right woman . <end>
<start> 의심의 여지 없이 세상에는 어떤 남자이든 정확히 딱 알맞는 여자와 결혼하거나 그 반대의 상황이 존재하지 . 그런데 인간이 수백 명의 사람만 알고 지내는 사이가 될 기회를 갖는다고 생각해 보면 , 또 그 수백 명 중 열여 명 쯤 이하만 잘 알 수 있고 , 그리고 나서 그 열여 명 중에 한두 명만 친구가 될 수 있다면 , 그리고 또 만일 우리가 이 세상에 살고 있는 수백만 명의 사람들만 기억하고 있다면 , 딱 맞는 남자는 지구가 생겨난 이래로 딱 맞는 여자를 단 한번도 만난 적이 없을 수도 있을 거라는 사실을 쉽게 눈치챌 수 있을 거야 . <end>
5890
5890
한글과 영어를 나눠주고 필요없는 것 버리기, 시작과 마지막에 토큰을 붙여줍니다.
def tokenize(lang):
lang_tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
lang_tokenizer.fit_on_texts(lang)
tensor = lang_tokenizer.texts_to_sequences(lang)
tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor,padding='post')
return tensor, lang_tokenizer
토큰화 하고, 길이에 맞춰서 패딩을 진행하는 것 같네요
def load_dataset(path, num_examples=None):
# 전처리된 타겟 문장과 입력 문장 쌍을 생성합니다.
targ_lang, inp_lang = create_dataset(path, num_examples) # 인풋이 한국어가 된다.
input_tensor, inp_lang_tokenizer = tokenize(inp_lang)
target_tensor, targ_lang_tokenizer = tokenize(targ_lang)
return input_tensor, target_tensor, inp_lang_tokenizer, targ_lang_tokenizer
함수화 했던 내용을 다 진행하네요
# 언어 데이터셋을 아래의 크기로 제한하여 훈련과 검증을 수행합니다.
num_examples = 10000
input_tensor, target_tensor, inp_lang, targ_lang = load_dataset(path_to_file, num_examples)
# 타겟 텐서와 입력 텐서의 최대 길이를 계산합니다.
max_length_targ, max_length_inp = target_tensor.shape[1], input_tensor.shape[1]
# 훈련 집합과 검증 집합을 80대 20으로 분리합니다.
input_tensor_train, input_tensor_val, target_tensor_train, target_tensor_val = train_test_split(input_tensor, target_tensor, test_size=0.2)
# 훈련 집합과 검증 집합의 데이터 크기를 출력합니다.
print(len(input_tensor_train), len(target_tensor_train), len(input_tensor_val), len(target_tensor_val))
4712 4712 1178 1178
데이터를 나눠줍니다.
def convert(lang, tensor): # index -> word로 변환하는 단계를 거쳐야한다.
for t in tensor:
if t!=0:
print ("%d ----> %s" % (t, lang.index_word[t]))
print ("Input Language; index to word mapping")
convert(inp_lang, input_tensor_train[0])
print ()
print ("Target Language; index to word mapping")
convert(targ_lang, target_tensor_train[0])
Input Language; index to word mapping
1 ----> <start>
5 ----> 톰은
1734 ----> 바나나를
35 ----> 정말
237 ----> 좋아한다
3 ----> .
2 ----> <end>
Target Language; index to word mapping
1 ----> <start>
5 ----> tom
313 ----> loves
1393 ----> bananas
3 ----> .
2 ----> <end>
음 이건 사진 보면 더 이해가 편할수도 있겠네요
train 데이터엔 인덱스가 들어가 있으므로 convert 함수를 통해 인덱스를 단어로 바꿔주네요
BUFFER_SIZE = len(input_tensor_train)
BATCH_SIZE = 64
steps_per_epoch = len(input_tensor_train)//BATCH_SIZE
embedding_dim = 128
units = 512
vocab_inp_size = len(inp_lang.word_index)+1
vocab_tar_size = len(targ_lang.word_index)+1
dataset = tf.data.Dataset.from_tensor_slices((input_tensor_train, target_tensor_train)).shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE, drop_remainder=True)
print('iteration = ',steps_per_epoch )
iteration = 73
이제 각종 파라미터를 준비해줍니다.
# dataset 크기 출력
example_input_batch, example_target_batch = next(iter(dataset))
example_input_batch.shape, example_target_batch.shape
(TensorShape([64, 97]), TensorShape([64, 112]))
사이즈를 보면 배치 사이즈로 나와있는 것을 볼 수 있습니다. 한 iteration마다 저 데이터 셋이 사용되는 것 입니다.
class Encoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, enc_units, batch_sz):
super(Encoder, self).__init__()
self.batch_sz = batch_sz # 배치 사이즈
self.enc_units = enc_units # 인코더 유닛 수
# 유닛 수 : 여기선 GRU의 출력 공간 차원
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) # 임베딩 레이어 만들기
self.gru = tf.keras.layers.GRU(self.enc_units, #GRU 레이어 생성
return_sequences=True, # 모든 타임 스텝의 출력 반환
return_state=True, # 마지막 상태 반환
recurrent_initializer='glorot_uniform') # 가중치 초기화 방법
def call(self, x, hidden):
x = self.embedding(x) # 임베딩
output, state = self.gru(x, initial_state = hidden)# GRU 통과
return output, state
def initialize_hidden_state(self):
return tf.zeros((self.batch_sz, self.enc_units)) # 초기상태를 0으로 설정
class BahdanauAttention(tf.keras.layers.Layer):
def __init__(self, units):
super(BahdanauAttention, self).__init__()
self.W1 = tf.keras.layers.Dense(units) #q
self.W2 = tf.keras.layers.Dense(units) # k
self.V = tf.keras.layers.Dense(1) # v
def call(self, query, values):
query_with_time_axis = tf.expand_dims(query, 1)# 텐서 차원 추가[배치크기,1,특성 수]
score = self.V(tf.nn.tanh( # BahdanauAttention은 dot product 대신에 합연산 후 tanh를 진행한다.
self.W1(query_with_time_axis) + self.W2(values))) # attention score구하기 [배치크기,시퀀스 길이,1]
attention_weights = tf.nn.softmax(score, axis=1) # score 소프트 맥스하기
context_vector = attention_weights * values # score와 v 곱하기
context_vector = tf.reduce_sum(context_vector, axis=1) # score*v한 백터 합치기
return context_vector, attention_weights
class Decoder(tf.keras.Model):
def __init__(self, vocab_size, embedding_dim, dec_units, batch_sz):
super(Decoder, self).__init__()
self.batch_sz = batch_sz # 배치 크기
self.dec_units = dec_units # GRU 셀 수
self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) # 임베딩 벡터로 전환
self.gru = tf.keras.layers.GRU(self.dec_units,
return_sequences=True,
return_state=True,
recurrent_initializer='glorot_uniform')
self.fc = tf.keras.layers.Dense(vocab_size) # 실제 아웃풋을 만들어내야 한다.
self.attention = BahdanauAttention(self.dec_units) # 디코더 유닛을 넣어 어텐션한다.
def call(self, x, hidden, enc_output): # 디코더 현재 인풋, 이전 satate, 인코더의 아웃풋 값들이 어텐션에 사용
context_vector, attention_weights = self.attention(hidden, enc_output)# 더코더의 현재 히든 스테이트와 인코더의 아웃풋 값을 어텐션
x = self.embedding(x) # 입력을 임베딩으로 전환
x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1) # 어텐션 출력 값과 임베딩 결합
output, state = self.gru(x) # gru 진행
output = tf.reshape(output, (-1, output.shape[2])) # 출력 형태 조정
x = self.fc(output) # FCN 계산을 진행
return x, state, attention_weights
여기에는 주석을 다 달아놓긴 했는데 살짝 복잡한.....
# encoder
encoder = Encoder(vocab_inp_size, embedding_dim, units, BATCH_SIZE)
# 샘플 입력
sample_hidden = encoder.initialize_hidden_state()
sample_output, sample_hidden = encoder(example_input_batch, sample_hidden)
print ('Encoder output shape: (batch size, sequence length, units) {}'.format(sample_output.shape))
print ('Encoder Hidden state shape: (batch size, units) {}'.format(sample_hidden.shape))
# encoder-decoder attention
attention_layer = BahdanauAttention(10)
attention_result, attention_weights = attention_layer(sample_hidden, sample_output)
print("Attention result shape: (batch size, units) {}".format(attention_result.shape))
print("Attention weights shape: (batch_size, sequence_length, 1) {}".format(attention_weights.shape))
# decoder
decoder = Decoder(vocab_tar_size, embedding_dim, units, BATCH_SIZE)
sample_decoder_output, _, _ = decoder(tf.random.uniform((BATCH_SIZE, 1)),
sample_hidden, sample_output)
print ('Decoder output shape: (batch_size, vocab size) {}'.format(sample_decoder_output.shape))
Encoder output shape: (batch size, sequence length, units) (64, 97, 512)
Encoder Hidden state shape: (batch size, units) (64, 512)
Attention result shape: (batch size, units) (64, 512)
Attention weights shape: (batch_size, sequence_length, 1) (64, 97, 1)
Decoder output shape: (batch_size, vocab size) (64, 3211)
데이터 사이즈를 확인할 수 있습니다.
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none') # 단어를 생성하는 것은 모든 보캡에 대해 loss를 구해야된다 -> 확률값을 비교해야된다. -> 연산이 너무 많다. -> 아주 작은 값도 크게 키워준다.?
def loss_function(real, pred):
mask = tf.math.logical_not(tf.math.equal(real, 0))
loss_ = loss_object(real, pred)
mask = tf.cast(mask, dtype=loss_.dtype)
loss_ *= mask
return tf.reduce_mean(loss_)
checkpoint = tf.train.Checkpoint(optimizer=optimizer,
encoder=encoder,
decoder=decoder)
loss function과 optimizer등 지정해줍니다.
def train_step(inp, targ, enc_hidden):
loss = 0
with tf.GradientTape() as tape:
# encoder
enc_output, enc_hidden = encoder(inp, enc_hidden)
dec_hidden = enc_hidden
# decoder
dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1)
# 교사 강요(teacher forcing) - 다음 입력으로 타겟을 피딩(feeding)합니다.
for t in range(1, targ.shape[1]):
# enc_output를 디코더에 전달합니다.
predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
loss += loss_function(targ[:, t], predictions) # 여기서 출력하면 인덱스 형태가 낭로 것이다.
# 교사 강요(teacher forcing)를 사용합니다.
dec_input = tf.expand_dims(targ[:, t], 1) # input을 prediction으로 두는게 아니라 label을 넣는다.
batch_loss = (loss / int(targ.shape[1]))
variables = encoder.trainable_variables + decoder.trainable_variables
gradients = tape.gradient(loss, variables)
optimizer.apply_gradients(zip(gradients, variables))
return batch_loss
학습 함수를 정의해줍니다. 주석 추가 버전으로 하나 더 넣을게요
def train_step(inp, targ, enc_hidden):
loss = 0
# GradientTape를 사용하여 학습 동안 발생하는 연산을 기록합니다.
# 이를 통해 자동 미분을 사용하여 그래디언트를 계산할 수 있습니다.
with tf.GradientTape() as tape:
# 인코더를 실행하여 입력 데이터에 대한 출력과 숨겨진 상태를 얻습니다.
enc_output, enc_hidden = encoder(inp, enc_hidden)
dec_hidden = enc_hidden # 디코더의 초기 상태를 인코더의 최종 상태로 설정
# 디코더의 첫 입력 설정. 모든 예시에 대해 '<start>' 토큰을 사용
dec_input = tf.expand_dims([targ_lang.word_index['<start>']] * BATCH_SIZE, 1)
# 타겟 문장의 각 토큰에 대해 디코더를 반복 실행합니다.
# 교사 강요(Teacher Forcing)를 사용하여 현재 타겟 토큰을 다음 입력으로 제공합니다.
for t in range(1, targ.shape[1]): # 첫 번째 토큰은 이미 처리했으므로 1부터 시작
# 디코더를 실행하여 예측 결과와 디코더의 숨겨진 상태를 얻습니다.
predictions, dec_hidden, _ = decoder(dec_input, dec_hidden, enc_output)
# 손실을 계산하여 총 손실에 추가합니다.
loss += loss_function(targ[:, t], predictions)
# 다음 입력을 위해 현재 타겟 토큰을 디코더 입력으로 설정합니다.
dec_input = tf.expand_dims(targ[:, t], 1)
# 배치에 대한 평균 손실을 계산합니다.
batch_loss = (loss / int(targ.shape[1]))
# 학습 가능한 변수들(인코더와 디코더의 변수)을 가져옵니다.
variables = encoder.trainable_variables + decoder.trainable_variables
# 기록된 연산을 바탕으로 변수들의 그래디언트를 계산합니다.
gradients = tape.gradient(loss, variables)
# 계산된 그래디언트를 이용하여 모델의 변수를 업데이트합니다.
optimizer.apply_gradients(zip(gradients, variables))
return batch_loss
# 이 파트가 매우 오래걸림
EPOCHS = 1
for epoch in range(EPOCHS):
start = time.time()
enc_hidden = encoder.initialize_hidden_state()
total_loss = 0
for (batch, (inp, targ)) in enumerate(dataset.take(steps_per_epoch)):
batch_loss = train_step(inp, targ, enc_hidden)
total_loss += batch_loss
if batch % 10 == 0:
print('Epoch {} / Batch {} / Loss {:.4f} / Time taken {} sec'.format(epoch + 1,
batch,
batch_loss.numpy(),
time.time() - start))
# 에포크가 2번 실행될때마다 모델 저장 (체크포인트)
#if (epoch + 1) % 2 == 0:
# checkpoint.save(file_prefix = checkpoint_prefix)
print('Epoch {} / Loss {:.4f}'.format(epoch + 1,
total_loss / steps_per_epoch))
print('Time taken for {} epoch {} sec\n'.format(epoch + 1,
time.time() - start))
학습을 시켜주는데 거의 30~40분이....
def evaluate(sentence):
attention_plot = np.zeros((max_length_targ, max_length_inp))
sentence = preprocess_sentence(sentence)
inputs = [inp_lang.word_index[i] for i in sentence.split(' ')]
inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs],
maxlen=max_length_inp,
padding='post')
inputs = tf.convert_to_tensor(inputs)
result = ''
hidden = [tf.zeros((1, units))]
enc_out, enc_hidden = encoder(inputs, hidden)
dec_hidden = enc_hidden
dec_input = tf.expand_dims([targ_lang.word_index['<start>']], 0)
for t in range(max_length_targ):
predictions, dec_hidden, attention_weights = decoder(dec_input,
dec_hidden,
enc_out)
# 나중에 어텐션 가중치를 시각화하기 위해 어텐션 가중치를 저장합니다.
attention_weights = tf.reshape(attention_weights, (-1, ))
attention_plot[t] = attention_weights.numpy()
predicted_id = tf.argmax(predictions[0]).numpy()
result += targ_lang.index_word[predicted_id] + ' '
if targ_lang.index_word[predicted_id] == '<end>':
return result, sentence, attention_plot
# 예측된 ID를 모델에 다시 피드합니다.
dec_input = tf.expand_dims([predicted_id], 0)
return result, sentence, attention_plot
평가 함수! 이것도 주석 달아서
def evaluate(sentence):
# 어텐션 가중치를 기록하기 위한 행렬 초기화. 시각화할 때 사용합니다.
attention_plot = np.zeros((max_length_targ, max_length_inp))
# 입력 문장을 전처리합니다 (소문자화, 구두점 처리, 토큰 추가 등).
sentence = preprocess_sentence(sentence)
# 전처리된 문장을 단어 인덱스로 변환합니다.
inputs = [inp_lang.word_index[i] for i in sentence.split(' ')]
# 패딩을 추가하여 모든 입력 시퀀스의 길이를 일정하게 맞춥니다.
inputs = tf.keras.preprocessing.sequence.pad_sequences([inputs],
maxlen=max_length_inp,
padding='post')
# 텐서로 변환합니다.
inputs = tf.convert_to_tensor(inputs)
# 결과를 저장할 빈 문자열 초기화
result = ''
# 인코더 초기 상태 설정
hidden = [tf.zeros((1, units))]
# 인코더 실행
enc_out, enc_hidden = encoder(inputs, hidden)
# 디코더의 초기 상태를 인코더의 최종 상태로 설정
dec_hidden = enc_hidden
# 디코더의 첫 입력 설정 ('<start>' 토큰)
dec_input = tf.expand_dims([targ_lang.word_index['<start>']], 0)
# 타겟 문장의 최대 길이까지 반복
for t in range(max_length_targ):
# 디코더 실행
predictions, dec_hidden, attention_weights = decoder(dec_input,
dec_hidden,
enc_out)
# 어텐션 가중치를 저장하여 나중에 시각화에 사용
attention_weights = tf.reshape(attention_weights, (-1, ))
attention_plot[t] = attention_weights.numpy()
# 가장 확률이 높은 단어의 인덱스를 추출
predicted_id = tf.argmax(predictions[0]).numpy()
# 인덱스를 단어로 변환하여 결과 문자열에 추가
result += targ_lang.index_word[predicted_id] + ' '
# 만약 예측된 단어가 종료 토큰이면, 반복을 종료
if targ_lang.index_word[predicted_id] == '<end>':
return result, sentence, attention_plot
# 예측된 단어를 다음 입력으로 사용
dec_input = tf.expand_dims([predicted_id], 0)
# 최대 길이까지 번역이 완료되면 결과 반환
return result, sentence, attention_plot
def translate(sentence):
result, sentence, attention_plot = evaluate(sentence)
print('Input: %s' % (sentence))
print('Predicted translation: {}'.format(result))
번역 함수입니다!
이렇게 사용 가능합니다!
학습이 너무 덜 되었네요...
728x90
'인공지능 > 자연어 처리' 카테고리의 다른 글
NLP Python - BERT for Question Answering, Tokenizer, Evaluate(f1 score) (32) | 2024.05.22 |
---|---|
한->영 번역기 만들기 python 실습 - seq2seq, LSTM, GRU, BLEU score (0) | 2024.05.05 |
자연어 처리 python 양방향(Bidirectional) LSTM 진행하기 (0) | 2024.04.28 |
자연어 처리 python 실습 - 한to영 기계 번역 모델 학습 및 평가 (1) | 2024.04.26 |
자연어 처리 python 실습 - 한국어 기계 번역 데이터 수집 및 전처리 (0) | 2024.04.26 |