인공지능/공부

센서신호, FFT, STFT data를 통해 하중 예측하기 -4 마지막

이게될까 2023. 12. 5. 14:22
728x90
728x90

2023.11.30 - [인공지능/공부] - 센서신호, FFT, STFT data를 통해 하중 예측하기 -3 스케쥴러

 

센서신호, FFT, STFT data를 통해 하중 예측하기 -3 스케쥴러

2023.11.30 - [인공지능/공부] - 센서신호, FFT, STFT data를 통해 하중 예측하기 -2 LSTM 센서신호, FFT, STFT data를 통해 하중 예측하기 -2 LSTM 2023.11.30 - [인공지능/공부] - 센서신호, FFT, STFT data를 통해 하중

yoonschallenge.tistory.com

이전 코드와 달라진 것은 batchNormalization이 추가되었다. 이 함수의 역할은 밑에 사진과 같다.

sigmoid를 사용하다보면 널리 퍼져있는 값들은 무시하는 것이 컸다. 그래서 그 값들을 무시하지 않고, 최대한 다 사용하기 위해 위와 같이 이동시켜준다고 생각하면 편하다. 스케줄러도 내 코드에 맞게 파라미터들을 고쳤다. 5시간 걸릴 학습이 1시간이면 끝나기 때문에 최대한 잘 고친 코드이다.

A_scan_amp_train_sclae = (A_scan_amp_train-np.min(A_scan_amp_train))/np.max(A_scan_amp_train-np.min(A_scan_amp_train))
A_scan_amp_test_sclae = (A_scan_amp_test-np.min(A_scan_amp_train))/np.max(A_scan_amp_train-np.min(A_scan_amp_train))
FFT_magnitude_train_sclae = (FFT_magnitude_train-np.min(FFT_magnitude_train))/np.max(FFT_magnitude_train-np.min(FFT_magnitude_train))
FFT_magnitude_test_sclae = (FFT_magnitude_test-np.min(FFT_magnitude_train))/np.max(FFT_magnitude_train-np.min(FFT_magnitude_train))
STFT_ArbMag_train_sclae = (STFT_ArbMag_train-np.min(STFT_ArbMag_train))/np.max(STFT_ArbMag_train-np.min(STFT_ArbMag_train))
STFT_ArbMag_test_sclae = (STFT_ArbMag_test-np.min(STFT_ArbMag_train))/np.max(STFT_ArbMag_train-np.min(STFT_ArbMag_train))

스케일링도 고쳤다. 이 값에 대한 결과는 밑과 같다.

728x90

 

최대를 1, 최소를 0으로 맞춰준 것이다. test는 트레인과 맞춰줘야 하므로 train에 사용된 값을 그대로 사용하였다.

 

initial_learning_rate = 0.05
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=63,
    decay_rate=0.90,
    staircase=True)

한 epoch당 learning rate를 90%로 만드는 것으로 했더니 확실하게 저점을 찾아갔다.

input_2d = tf.keras.Input(shape=(1836, 100, 1))
x = tf.keras.layers.Conv2D(32, (3, 3), activation='sigmoid', padding = 'same')(input_2d)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x = BatchNormalization()(x)
x = tf.keras.layers.Conv2D(64, (3, 3), activation='sigmoid', padding = 'same')(x)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x = BatchNormalization()(x)
x = tf.keras.layers.Conv2D(128, (3, 3), activation='sigmoid', padding = 'same')(x)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x = BatchNormalization()(x)
x = tf.keras.layers.Conv2D(256, (3, 3), activation='sigmoid', padding = 'same')(x)
x = BatchNormalization()(x)
x = tf.keras.layers.Conv2D(256, (3, 3), activation='sigmoid', padding = 'same')(x)
x = GlobalAveragePooling2D()(x)
x = tf.keras.Model(inputs=input_2d, outputs=x)

# 1D 데이터(예: 벡터)를 위한 입력 경로
input_1d = tf.keras.Input(shape=(10001,1))
y = LSTM(36, return_sequences=True)(input_1d)
y = BatchNormalization()(y)
y = LSTM(64)(y)  # 마지막 LSTM 층에서는 return_sequences를 False로 설정 (기본값)
y = BatchNormalization()(y)
y =  tf.keras.layers.Dense(128, activation='sigmoid', kernel_regularizer = l2(0.05))(y)
y = tf.keras.Model(inputs=input_1d, outputs=y)

input_scalar = tf.keras.Input(shape=(3504,1))
z = LSTM(36, return_sequences=True)(input_scalar)
z = BatchNormalization()(z)
z = LSTM(64)(z)  # 마지막 LSTM 층에서는 return_sequences를 False로 설정 (기본값)
z = BatchNormalization()(z)
z =  tf.keras.layers.Dense(128, activation='sigmoid', kernel_regularizer = l2(0.05))(z)

z = tf.keras.Model(inputs=input_scalar, outputs=z)

# 두 경로의 결합
combined = tf.keras.layers.concatenate([x.output, y.output, z.output])

# 추가 레이어 및 최종 예측 레이어
final_layer = Dense(128, activation='sigmoid', kernel_regularizer = l2(0.05))(combined)
final_layer = BatchNormalization()(final_layer)
#final_layer = Dropout (0.3)(final_layer)
final_layer = Dense(8, activation='sigmoid', kernel_regularizer = l2(0.05))(final_layer)
final_output = Dense(1)(final_layer)

# 최종 모델
model = tf.keras.Model(inputs=[x.input, y.input, z.input], outputs=final_output)

# 모델 컴파일
model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = lr_schedule), loss='mse', metrics=[r_squared])

네트워크는 크게 달라진 점이 없다.

from tensorflow.keras.models import load_model

# 저장된 모델 파일 경로
saved_model_path = "best_model.h5"

# 모델 불러오기
loaded_model = load_model(saved_model_path, custom_objects={'r_squared': r_squared})

# 불러온 모델을 사용한 예측 또는 평가
# 예를 들어, 테스트 데이터셋에 대한 평가를 수행할 수 있습니다.
#loss, rsquare = loaded_model.evaluate([STFT_ArbMag_test, A_scan_amp_test, FFT_magnitude_test])
#print("Test Loss:", loss)
#print("Test rsquare:", rsquare)

이걸로 모델 읽어서 

batchsize = 21
yhat =[]
for i in range (0,252,batchsize) :
    pred = loaded_model.predict([STFT_ArbMag_test_sclae[i:i+batchsize,:,:,:], A_scan_amp_test_sclae[i:i+batchsize,:,:], FFT_magnitude_test_sclae[i:i+batchsize,:,:]])
    print(loaded_model.evaluate([STFT_ArbMag_test_sclae[i:i+batchsize,:,:,:], A_scan_amp_test_sclae[i:i+batchsize,:,:], FFT_magnitude_test_sclae[i:i+batchsize,:,:]],y_test[i:i+batchsize]))
    yhat.extend(pred)

한번에 다 못 읽으니 배치 나눠서 평가하고

yhat_array = np.concatenate(yhat, axis=0)
r2_value = r_squared_numpy(np.array(y_test), yhat_array)
print("R-squared:", r2_value)
plt.plot(y_test,label = 'real')
plt.plot(yhat_array, label = 'pred')
plt.legend()

그에 대한 그래프 확인하면 끝!

728x90