2021/01/12 - [머신러닝/딥러닝] - 순환 신경망(Recurrent Neural Network) (3)
TBPTT(Truncated Back-Propagation Through Time)
바닐라 순환 신경망의 역전파인 BPTT(back-propagation through time)는 기울기를 거슬러 전파할 때 동일한 가중치를 반복적으로 곱하는데, 이로 인해 기울기가 너무 작아지는 현상인 기울기 소실(vanishing gradient) 혹은 커지는 현상인 기울기 폭주(exploding gradient)가 발생할 수 있는 것입니다. 이를 방지하기 위해 기울기의 전파하는 타임 스텝의 수를 제한하는 것이 TBPTT 입니다.
LSTM(Long Short-Term Memory)
TBPTT 방법은 기울기가 타임 스텝 끝까지 전파되지 않아 전체적인 관계를 파악하기 어렵습니다. LSTM은 긴 시퀀스를 모델링하기 위해 고안된 것으로 기울기 소실 문제를 개선합니다. (여전히 기울기 폭주는 문제는 갖을 수 있습니다.)
다음은 LSTM의 셀 구조를 나타냅니다. (은닉 상태 $ h $, 셀 상태 $ c $)
이전 타임 스텝의 은닉층 $ h^{t-1} $ 과 입력 $ x^t $ 의 선형 계산값이 여러 갈래($ f^t $, $ i^t $, $ n^t $, $ o^t $)로 나뉘어지는데, $ f^t $ 는 시그모이드 함수를 통과한 값으로 이전 셀 상태 정보 $ c^{t-1} $ 의 몇 퍼센트를 기억할 것인지 계산합니다. $ i^t $, $ n^t $ 는 시그모이드 함수와 하이퍼볼릭 탄젠트 함수를 각각 통과한 값으로, 이를 곱하고 $ f^t $ 와 더하여 현재 셀 상태 $ c^t $ 를 계산합니다. 현재 은닉 상태 $ h^t $ 는 $ c^t $ 을 하이퍼볼릭 탄젠트 함수를 통과하고 시그모이드 함수를 통과한 $ o^t $ 와 곱하여 계산합니다.
텐서플로에서 제공하는 LSTM의 셀은 다음과 같습니다. (tf.nn.rnn_cell.LSTMCell)
import tensorflow as tf
tf.reset_default_graph()
x = tf.constant([[[0., 0.], [0.1, 0.1], [0.2, 0.2]]])
cell = tf.nn.rnn_cell.LSTMCell(4)
outputs, state = tf.nn.dynamic_rnn(cell, x, dtype=tf.float32)
fc = tf.layers.dense(state.h, 1, tf.nn.sigmoid)
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
print(sess.run(state))
print(sess.run(state).c)
print(sess.run(state).h)
print(sess.run(state).c.shape)
print(sess.run(state).h.shape)
print(sess.run(outputs).shape)
# print(sess.run(fc))
LSTMStateTuple(c=array([[0.01844714, 0.01318356, 0.08716054, 0.07302517]], dtype=float32), h=array([[0.00916587, 0.00682306, 0.04287135, 0.0369204 ]], dtype=float32))
[[0.01844714 0.01318356 0.08716054 0.07302517]]
[[0.00916587 0.00682306 0.04287135 0.0369204 ]]
(1, 4)
(1, 4)
(1, 3, 4)
상태의 출력은 셀 상태와 은닉 상태로 구성된 튜플입니다. 다대일 모델이라 가정하면, 위와 같이 마지막 타임 스텝의 은닉 상태를 완전 연결층과 결합합니다.
구현
다대일 순환 신경망 모델을 구현합니다.
IMDB 데이터셋을 불러옵니다.
import numpy as np
from tensorflow.keras.datasets import imdb
(x_train, y_train), (x_test, y_test) = imdb.load_data(skip_top=20, num_words=1000)
skip_top 은 빈도수 상위 20개를 생략하는 것이고, num_words 는 데이터를 이루는 단어의 개수를 지정합니다.
어휘 사전을 불러옵니다.
word_to_index = imdb.get_word_index()
index_to_word = { v: k for k, v in word_to_index.items() }
index_to_word
{34701: 'fawn',
52006: 'tsukino',
52007: 'nunnery',
16816: 'sonja',
63951: 'vani',
1408: 'woods',
16115: 'spiders',
2345: 'hanging',
2289: 'woody',
52008: 'trawling',
...
}
2는 어휘 사전에 없는 단어로 생략합니다.
for i in range(len(x_train)):
x_train[i] = [w for w in x_train[i] if w > 2]
for i in range(len(x_test)):
x_test[i] = [w for w in x_test[i] if w > 2]
x_train[0]
[22, 43, 65, 66, 36, 25, 43, 50, 35, 39, 38, 50, 22, 22, 71, 87, 43, 38, 76, 22, 62, 66, 33, 38, 25, 51, 36, 48, 25, 33, 22, 28, 77, 52, 82, 36, 71, 43, 26, 46, 88, 98, 32, 56, 26, 22, 21, 26, 30, 51, 36, 28, 92, 25, 65, 38, 88, 32, 32]
샘플의 길이를 동일하게 조정합니다. (tf.keras.preprocessing.sequence.pad_sequences)
from tensorflow.keras.preprocessing import sequence
x_train = sequence.pad_sequences(x_train, 100)
x_test = sequence.pad_sequences(x_test, 100)
x_train.shape
(25000, 100)
전처리를 끝내고 학습 데이터의 20%를 검증 데이터로 분할합니다.
from sklearn.model_selection import train_test_split
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)
print(x_train.shape)
print(x_val.shape)
(20000, 100, 100)
(5000, 100, 100)
신경망을 정의합니다.
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Embedding
class Model:
def __init__(self, lr=1e-3):
tf.reset_default_graph()
with tf.name_scope('input'):
self.x = tf.placeholder(tf.float32, [None, 100])
self.y = tf.placeholder(tf.float32, [None, 1])
with tf.name_scope('layer'):
ebd = Embedding(1000, 32)
output = ebd(self.x)
cell = tf.nn.rnn_cell.LSTMCell(32)
outputs, state = tf.nn.dynamic_rnn(cell, output, dtype=tf.float32)
with tf.name_scope('output'):
self.output = tf.layers.dense(state.h, 1, tf.nn.sigmoid)
with tf.name_scope('accuracy'):
self.predict = tf.cast(tf.greater(self.output, tf.constant([0.5])), dtype=tf.float32)
self.accuracy = tf.reduce_mean(tf.cast(tf.equal(self.y, self.predict), dtype=tf.float32))
with tf.name_scope('loss'):
cross_entropy = tf.keras.losses.binary_crossentropy(y_true=self.y, y_pred=self.output)
self.loss = tf.reduce_mean(cross_entropy)
with tf.name_scope('optimizer'):
self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)
with tf.name_scope('summary'):
self.summary_loss = tf.placeholder(tf.float32)
self.summary_accuracy = tf.placeholder(tf.float32)
tf.summary.scalar('loss', self.summary_loss)
tf.summary.scalar('accuracy', self.summary_accuracy)
self.merge = tf.summary.merge_all()
self.train_writer = tf.summary.FileWriter('./tmp/lstm_imdb/train', tf.get_default_graph())
self.val_writer = tf.summary.FileWriter('./tmp/lstm_imdb/val', tf.get_default_graph())
self.sess = tf.Session()
self.sess.run(tf.global_variables_initializer())
def write_summary(self, tl, ta, vl, va, epoch):
train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
self.train_writer.add_summary(train_summary, epoch)
self.val_writer.add_summary(val_summary, epoch)
def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
data_size = len(x_train)
for e in range(epochs):
t_l, t_a = [], []
idx = np.random.permutation(np.arange(data_size))
_x_train, _y_train = x_train[idx], y_train[idx]
for i in range(0, data_size, batch_size):
si, ei = i, i + batch_size
if ei > data_size:
ei = data_size
x_batch, y_batch = _x_train[si:ei, :], _y_train[si:ei]
tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
t_l.append(tl)
t_a.append(ta)
vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})
self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
print('epoch:', e + 1, ' / loss:', np.mean(t_l), '/ acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
def score(self, x, y):
return self.sess.run(self.accuracy, {self.x: x, self.y: y})
타겟 데이터의 형태를 변형합니다.
y_train = y_train.reshape(-1, 1)
y_val = y_val.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)
모델을 학습하고 테스트합니다.
model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=5)
model.score(x_test, y_test)
epoch: 1 / loss: 0.4271072 / acc: 0.80275 / val_loss: 0.35389245 / val_acc: 0.8508
epoch: 2 / loss: 0.3363528 / acc: 0.8554 / val_loss: 0.3609104 / val_acc: 0.8476
epoch: 3 / loss: 0.3165999 / acc: 0.86565 / val_loss: 0.3475892 / val_acc: 0.8526
epoch: 4 / loss: 0.30119175 / acc: 0.8721 / val_loss: 0.343326 / val_acc: 0.8588
epoch: 5 / loss: 0.28766757 / acc: 0.8763 / val_loss: 0.3521059 / val_acc: 0.8558
0.851
'머신러닝 > 딥러닝' 카테고리의 다른 글
배치 정규화(Batch Normalization) (0) | 2021.02.09 |
---|---|
가중치 초기화(Weight Initialization) (0) | 2021.02.08 |
순환 신경망(Recurrent Neural Network) (3) (0) | 2021.01.12 |
순환 신경망(Recurrent Neural Network) (2) (0) | 2021.01.04 |
순환 신경망(Recurrent Neural Network) (1) (0) | 2020.11.16 |