독학 연구소
공부한 내용을 정리합니다.
머신러닝/딥러닝 (17)
오토인코더(AutoEncoder)

오토인코더(AutoEncoder, AE)

오토인코더는 인코더(encoder)디코더(decoder)로 이루어진 신경망입니다. 인코더는 고차원의 입력 데이터를 저차원의 표현 벡터(representation vector)로 압축하며, 디코더는 압축을 해제하여 복원합니다. 즉 신경망의 출력은 입력 데이터에 대해 재구성된 것입니다. 이러한 입력과 출력 간의 손실을 최소화하는 방향으로 학습하며 레이블없는 데이터를 사용하는 비지도학습(unsupervised learning)입니다.

 

 

군집화(Clustering)

인코더의 출력은 고차원의 입력 데이터에 대한 압축되어진 특징이라고 할 수 있습니다. 이를 2차원 벡터로 하여 산점도로 표현하면 다음과 같습니다.

 

 

이는 비슷한 특징을 가진 데이터 간에 군집을 이루며 분류되는 것입니다.

 

이러한 특징을 이용하여 입력 데이터에 대해 재구성하는 것이 디코더의 역할입니다.

 

 

구현

오토인코더 모델을 구현합니다,

 

MNIST 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras import datasets

(x_train, y_train), (x_test, y_test) = datasets.mnist.load_data()

x_train = x_train.reshape(-1, 28, 28, 1)
x_test = x_test.reshape(-1, 28, 28, 1)

print(x_train.shape)
print(x_test.shape)

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf


class Encoder:
    def __init__(self, input, training):
        with tf.name_scope('layer'):
            conv1 = tf.layers.conv2d(input, 32, 3, 1, padding='same')
            conv1 = tf.layers.batch_normalization(conv1, training=training)
            conv1 = tf.nn.leaky_relu(conv1)
            
            conv2 = tf.layers.conv2d(conv1, 64, 3, 2, padding='same')
            conv2 = tf.layers.batch_normalization(conv2, training=training)
            conv2 = tf.nn.leaky_relu(conv2)
        
            conv3 = tf.layers.conv2d(conv2, 64, 3, 2, padding='same')
            conv3 = tf.layers.batch_normalization(conv3, training=training)
            conv3 = tf.nn.leaky_relu(conv3)
        
            conv4 = tf.layers.conv2d(conv3, 64, 3, 1, padding='same')
            conv4 = tf.layers.batch_normalization(conv4, training=training)
            conv4 = tf.nn.leaky_relu(conv4)
            
            flat = tf.layers.flatten(conv4)
        
        with tf.name_scope('output'):
            self.output = tf.layers.dense(flat, 2)

class Decoder:
    def __init__(self, input, training):
        with tf.name_scope('layer'):
            fc = tf.layers.dense(input, 3136)
            reshape = tf.reshape(fc, [-1, 7, 7, 64])
            
            conv_tran1 = tf.layers.conv2d_transpose(reshape, 64, 3, 1, padding='same')
            conv_tran1 = tf.layers.batch_normalization(conv_tran1, training=training)
            conv_tran1 = tf.nn.leaky_relu(conv_tran1)
            
            conv_tran2 = tf.layers.conv2d_transpose(conv_tran1, 64, 3, 2, padding='same')
            conv_tran2 = tf.layers.batch_normalization(conv_tran2, training=training)
            conv_tran2 = tf.nn.leaky_relu(conv_tran2)
            
            conv_tran3 = tf.layers.conv2d_transpose(conv_tran2, 64, 3, 2, padding='same')
            conv_tran3 = tf.layers.batch_normalization(conv_tran3, training=training)
            conv_tran3 = tf.nn.leaky_relu(conv_tran3)
            
            conv_tran4 = tf.layers.conv2d_transpose(conv_tran3, 32, 3, 1, padding='same')
            conv_tran4 = tf.layers.batch_normalization(conv_tran4, training=training)
            conv_tran4 = tf.nn.leaky_relu(conv_tran4)
        
        with tf.name_scope('output'):
            self.output = tf.layers.conv2d_transpose(conv_tran4, 1, 3, 1, padding='same', activation=tf.nn.sigmoid)
            
class Model:
    def __init__(self, lr=1e-3):
        tf.reset_default_graph()
        
        with tf.name_scope('input'):
            self.input = tf.placeholder(tf.float32, [None, 28, 28, 1])
            self.training = tf.placeholder(tf.bool)

        with tf.name_scope('preprocessing'):
            input_norm = self.input / 255.0
            
        self.encoder = Encoder(input_norm, self.training)
        self.decoder = Decoder(self.encoder.output, self.training)
        
        with tf.name_scope('loss'):
            self.loss = tf.losses.mean_squared_error(input_norm, self.decoder.output)
        
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        with tf.name_scope('optimizer'):
            train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)
            self.train_op = tf.group([train_op, update_ops])
        
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())

    def encode(self, x):
        return self.sess.run(self.encoder.output, {self.input: x, self.training: False})  
    
    def decode(self, x):
        return self.sess.run(self.decoder.output, {self.input: x, self.training: False})
        
    def train(self, x_train, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l = []
            idx = np.random.permutation(np.arange(data_size))
            _x_train = x_train[idx]
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch = _x_train[si:ei, :, :]
                
                _, loss = self.sess.run([self.train_op, self.loss], {self.input: x_batch, self.training: True})
                t_l.append(loss)
                
            print('epoch:', e + 1, '/ loss:', np.mean(t_l))   

 

모델을 학습합니다.

model = Model(lr=5e-4)
model.train(x_train, epochs=10)
epoch: 1 / loss: 0.050720453
epoch: 2 / loss: 0.044929303
epoch: 3 / loss: 0.043423332
epoch: 4 / loss: 0.042490218
epoch: 5 / loss: 0.04188228
epoch: 6 / loss: 0.041391663
epoch: 7 / loss: 0.041031186
epoch: 8 / loss: 0.040655673
epoch: 9 / loss: 0.04040248
epoch: 10 / loss: 0.040120468

 

테스트 데이터로 2차원 표현 벡터를 산점도로 나타내고 무작위 데이터의 포인트를 확인합니다.

import matplotlib.pyplot as plt

encoder_output = model.encode(x_test)

plt.figure(figsize=(16, 8))
plt.scatter(encoder_output[:, 0], encoder_output[:, 1], c=y_test, cmap=plt.get_cmap('Paired'), s=10)
plt.colorbar()
plt.title('Encoder output')

n = 4
idx = np.random.choice(range(len(x_test)), n)

encoder_output = model.encode(x_test[idx])
decoder_output = model.decode(x_test[idx])

plt.scatter(encoder_output[:, 0], encoder_output[:, 1], c='black', s=100)
plt.show()

fig, axs = plt.subplots(n, 2, constrained_layout=True, sharey=True, figsize=(5, 5))

for i in range(n):
    axs[i][0].imshow(x_test[idx][i], aspect='auto', cmap='gray')
    axs[i][0].axis('off')
    axs[i][1].imshow(decoder_output[i], aspect='auto', cmap='gray')
    axs[i][1].axis('off')
    
    if i == 0:
        axs[i][0].set_title('True')
        axs[i][1].set_title('Prediction')
    
plt.show()

 

군집화가 명확하지 않은 영역에 대한 복원은 잘 이루어지지 않는 것을 볼 수 있습니다.

 

 

 

'머신러닝 > 딥러닝' 카테고리의 다른 글

드롭아웃(Dropout)  (0) 2021.02.09
가중치 규제(Weight Regularization)  (0) 2021.02.09
배치 정규화(Batch Normalization)  (0) 2021.02.09
가중치 초기화(Weight Initialization)  (0) 2021.02.08
LSTM(Long Short-Term Memory)  (0) 2021.02.03
  Comments,     Trackbacks
드롭아웃(Dropout)

 

2020/11/13 - [머신러닝/딥러닝] - 합성곱 신경망(Convolutional Neural Network)

 

드롭아웃(Dropout)

과대적합을 억제하는 기법으로 무작위로 일부 뉴런 혹은 유닛을 비활성화합니다. 이에 따라 일부 유닛을 과도하게 의존하는 것을 방지하는 것입니다. 이러한 드롭아웃은 학습시에 적용하고 테스트에는 모든 유닛을 활성화합니다.

 

 

 

구현

합성곱 신경망 모델에 드롭아웃을 적용합니다.

 

패션 MNIST 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras import datasets

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

print('data shape:', x_train.shape)

print('target shape:', y_train.shape)
print('target label:', np.unique(y_train, return_counts=True))
data shape: (60000, 28, 28)
target shape: (60000,)
target label: (array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=uint8), array([6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000],
      dtype=int64))

 

데이터 형태를 변형합니다.

x_train = x_train.reshape(60000, 28, 28, 1)
x_test = x_test.reshape(10000, 28, 28, 1)

 

학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf

class Model:
    def __init__(self, lr=1e-3):
        tf.reset_default_graph()
        
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 28, 28, 1])
            self.y = tf.placeholder(tf.int64)
            self.dropout_rate = tf.placeholder(tf.float32)

        with tf.name_scope('preprocessing'):
            x_norm = self.x / 255.0
            y_onehot = tf.one_hot(self.y, 10)
        
        with tf.name_scope('layer'):
            conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)
            pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')
            conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)
            pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')
            flat = tf.layers.flatten(pool2)
            dropout = tf.nn.dropout(flat, self.dropout_rate)
            fc = tf.layers.dense(dropout, 64, tf.nn.relu)
            logits = tf.layers.dense(fc, 10)
        
        with tf.name_scope('output'):
            self.predict = tf.argmax(tf.nn.softmax(logits), 1)

        with tf.name_scope('accuracy'):
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.to_int64(self.predict), self.y), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=y_onehot, logits=logits)
            self.loss = tf.reduce_mean(cross_entropy)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/cnn_fashion_mnist/dropout_train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/cnn_fashion_mnist/dropout_val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())
    
    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch, self.dropout_rate: 0.5})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val, self.dropout_rate: 1.0})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / train_loss:', np.mean(t_l), '/ train_acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y, self.dropout_rate: 1.0})

 

드롭아웃층은 유닛을 유지하는 비율을 지정합니다.(tf.nn.dropout)

with tf.name_scope('input'):
    self.x = tf.placeholder(tf.float32, [None, 28, 28, 1])
    self.y = tf.placeholder(tf.int64)
    self.dropout_rate = tf.placeholder(tf.float32)

...

with tf.name_scope('layer'):
    conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)
    pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')
    conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)
    pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')
    flat = tf.layers.flatten(pool2)
    dropout = tf.nn.dropout(flat, self.dropout_rate)
    fc = tf.layers.dense(dropout, 64, tf.nn.relu)
    logits = tf.layers.dense(fc, 10)

 

비율을 0.3으로 지정한다면 무작위 유닛 70%을 비활성화합니다. 테스트 및 검증에는 1.0으로 지정하여 모든 유닛을 활성화합니다.

 

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=20)
model.score(x_test, y_test)
epoch: 1  / train_loss: 0.54586285 / train_acc: 0.8024167  / val_loss: 0.3937745 / val_acc: 0.8516667
epoch: 2  / train_loss: 0.37997714 / train_acc: 0.861  / val_loss: 0.32430208 / val_acc: 0.88058335
epoch: 3  / train_loss: 0.33082 / train_acc: 0.8768542  / val_loss: 0.3169227 / val_acc: 0.88075
epoch: 4  / train_loss: 0.3031651 / train_acc: 0.8883333  / val_loss: 0.28263652 / val_acc: 0.8961667
epoch: 5  / train_loss: 0.27801484 / train_acc: 0.89666665  / val_loss: 0.2777464 / val_acc: 0.896
epoch: 6  / train_loss: 0.26787764 / train_acc: 0.89902085  / val_loss: 0.28193024 / val_acc: 0.8936667
epoch: 7  / train_loss: 0.25405225 / train_acc: 0.904875  / val_loss: 0.26641348 / val_acc: 0.90258336
epoch: 8  / train_loss: 0.24449031 / train_acc: 0.9074375  / val_loss: 0.26794034 / val_acc: 0.89975
epoch: 9  / train_loss: 0.23362926 / train_acc: 0.91216666  / val_loss: 0.25061563 / val_acc: 0.90791667
epoch: 10  / train_loss: 0.2289409 / train_acc: 0.9132917  / val_loss: 0.25986716 / val_acc: 0.90566665
epoch: 11  / train_loss: 0.21823755 / train_acc: 0.9184375  / val_loss: 0.2457858 / val_acc: 0.9076667
epoch: 12  / train_loss: 0.21376282 / train_acc: 0.918375  / val_loss: 0.25176075 / val_acc: 0.9073333
epoch: 13  / train_loss: 0.21100517 / train_acc: 0.92108333  / val_loss: 0.252951 / val_acc: 0.90783334
epoch: 14  / train_loss: 0.20125681 / train_acc: 0.9237083  / val_loss: 0.25745368 / val_acc: 0.90758336
epoch: 15  / train_loss: 0.19526783 / train_acc: 0.9266667  / val_loss: 0.24359514 / val_acc: 0.9130833
epoch: 16  / train_loss: 0.19302467 / train_acc: 0.9270833  / val_loss: 0.23960693 / val_acc: 0.91116667
epoch: 17  / train_loss: 0.18864958 / train_acc: 0.92808336  / val_loss: 0.24460652 / val_acc: 0.91275
epoch: 18  / train_loss: 0.18488875 / train_acc: 0.9294583  / val_loss: 0.23526335 / val_acc: 0.9159167
epoch: 19  / train_loss: 0.17939131 / train_acc: 0.932125  / val_loss: 0.2448088 / val_acc: 0.9115
epoch: 20  / train_loss: 0.17674284 / train_acc: 0.93214583  / val_loss: 0.23607413 / val_acc: 0.9155833

0.9164

 

에포크에 대한 정확도와 손실 함수의 그래프는 다음과 같습니다. (주황: 학습, 파랑: 검증, 빨강: 드롭아웃 학습, 하늘: 드롭아웃 검증)

 

 

드롭아웃을 적용하여 과대적합이 발생하지 않으며 성능이 소폭 향상되었습니다. 

 

 

  Comments,     Trackbacks
가중치 규제(Weight Regularization)

 

2020/11/13 - [머신러닝/딥러닝] - 합성곱 신경망(Convolutional Neural Network)

 

가중치 규제(Weight Regularization)

가중치의 값이 커지지 않도록 제한하는 기법으로 과대적합(overfitting)을 억제합니다. 손실 함수에 norm을 추가하는 것으로 대표적으로 L1 규제와 L2 규제가 있습니다.

 

L1 규제는 손실 함수에 L1 norm을 추가하는 것입니다.

 

$ L + \alpha \left| \left| w \right| \right|_1 $

 

L1 norm은 가중치의 절대값을 더한 값이며,  $ \alpha $ 는 규제의 양을 조절하는 파라미터입니다.

 

$ \left| \left| w \right| \right|_1 = \displaystyle \sum_{i=1}^n \left| w_i \right| $

 

경사 하강법을 이용한 업데이트 식은 다음과 같습니다.

 

$\begin{aligned}
w' & = w + { \partial \over \partial w } L + \color{ red }{ { \partial \over \partial w } \alpha \ \displaystyle \sum_{i=1}^n \left| w_i \right| } \\
& = w + { \partial \over \partial w } L + \color{ red }{ \alpha \ sign(w) }
\end{aligned}$

 

$ \left| w \right| $ 을 미분하면 부호만 남습니다. $ sign(w) $ 은 $ w $ 의 부호라는 의미입니다.

 

 

L2 규제는 손실 함수에 L2 norm을 추가하는 것입니다.

 

$ L + \alpha \ \left| \left| w \right| \right|_2 $

 

L2 norm은 가중치의 제곱을 더한 값이며,  $ \alpha $ 는 규제의 양을 조절하는 파라미터입니다.

 

$ \left| \left| w \right| \right|_2 = { 1 \over 2 } \displaystyle \sum_{i=1}^n \left| w_i \right|^2 $

 

경사 하강법을 이용한 업데이트 식은 다음과 같습니다.

$\begin{aligned}
w' & = w + { \partial \over \partial w } L + \color{ red }{ { \partial \over \partial w } { 1 \over 2 } \alpha \ \displaystyle \sum_{i=1}^n \left| w_i \right|^2 }\\
& = w + { \partial \over \partial w } L + \color{ red }{ \alpha \ w }
\end{aligned}$

 

 

텐서플로에서 제공하는 규제를 이용할 수 있습니다. (tf.contrib.layers.l2_regularizer)

import tensorflow as tf

tf.reset_default_graph()

x = tf.placeholder(tf.float32, [None, 1])

reg1 = tf.contrib.layers.l1_regularizer(scale=0.1)
reg2 = tf.contrib.layers.l2_regularizer(scale=0.1)

fc1 = tf.layers.dense(x, 2, kernel_regularizer=reg1)
fc2 = tf.layers.dense(x, 2, kernel_regularizer=reg2)

vars = tf.trainable_variables()
norm = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    w1 = sess.run(vars[0])
    w2 = sess.run(vars[2])
    norm_ = sess.run(norm, {x: [[1.]]})

    print('weights')
    print(w1)
    print(w2)
    print()
    
    print('L1 norm')
    print(norm_[0], np.sum(abs(w1)) * 0.1)
    print('L2 norm')
    print(norm_[1], np.sum(np.square(w2))* 0.5 * 0.1)
weights
[[-0.5047436  1.1104265]]
[[-0.5299379   0.07715666]]

L1 norm
0.16151701 0.16151701211929323
L2 norm
0.014339368 0.014339368045330049

 

신경망의 손실 함수에 추가할 norm은 규제가 적용된 가중치의 합으로 구합니다.

import tensorflow as tf

tf.reset_default_graph()

x = tf.placeholder(tf.float32, [None, 1])

reg = tf.contrib.layers.l2_regularizer(scale=0.1)

fc1 = tf.layers.dense(x, 2, kernel_regularizer=reg)
fc2 = tf.layers.dense(x, 2, kernel_regularizer=reg)

vars = tf.trainable_variables()
norm = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    w1 = sess.run(vars[0])
    w2 = sess.run(vars[2])
    fc1_ = np.sum(np.square(w1))* 0.5 * 0.1
    fc2_ = np.sum(np.square(w2))* 0.5 * 0.1
    norm_ = sess.run(norm, {x: [[1.]]})

    print('weights')
    print(w1)
    print(w2)
    print()
    
    print('norm')
    print(np.sum(norm_))
    print(fc1_ + fc2_)
weights
[[0.26062596 0.3645985 ]]
[[-0.8904719 -0.8322536]]

norm
0.08432221
0.08432220667600632

 

 

구현

합성곱 신경망 모델에 L2 규제를 적용합니다.

 

패션 MNIST 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras import datasets

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

print('data shape:', x_train.shape)

print('target shape:', y_train.shape)
print('target label:', np.unique(y_train, return_counts=True))
data shape: (60000, 28, 28)
target shape: (60000,)
target label: (array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=uint8), array([6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000],
      dtype=int64))

 

데이터 형태를 변형합니다.

x_train = x_train.reshape(60000, 28, 28, 1)
x_test = x_test.reshape(10000, 28, 28, 1)

 

학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf

class Model:
    def __init__(self, lr=1e-3, reg=False, lambda_reg=0.01, path_name=''):
        tf.reset_default_graph()
        
        regularizer=None
        if reg:
            regularizer = tf.contrib.layers.l2_regularizer(scale=lambda_reg)
        
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 28, 28, 1])
            self.y = tf.placeholder(tf.int64)

        with tf.name_scope('preprocessing'):
            x_norm = self.x / 255.0
            y_onehot = tf.one_hot(self.y, 10)
            
        with tf.name_scope('layer'):
            conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)
            pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')
            conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)
            pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')
            flat = tf.layers.flatten(pool2)

            fc = tf.layers.dense(flat, 64, tf.nn.relu, kernel_regularizer=regularizer)
            logits = tf.layers.dense(fc, 10, kernel_regularizer=regularizer)
            
        with tf.name_scope('output'):
            self.predict = tf.argmax(tf.nn.softmax(logits), 1)

        with tf.name_scope('accuracy'):
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.to_int64(self.predict), self.y), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=y_onehot, logits=logits)
            self.loss = tf.reduce_mean(cross_entropy)
            if reg:
                norm = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
                self.loss += tf.reduce_sum(norm)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/cnn_fashion_mnist/' + path_name + 'train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/cnn_fashion_mnist/' + path_name + 'val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())
    
    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / train_loss:', np.mean(t_l), '/ train_acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y})

 

규제를 정의합니다. 

regularizer=None
if reg:
    regularizer = tf.contrib.layers.l2_regularizer(scale=lambda_reg)

 

규제를 적용할 층에 매개변수로 지정합니다.

with tf.name_scope('layer'):
    conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)
    pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')
    conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)
    pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')
    flat = tf.layers.flatten(pool2)

    ## regularizer ##
    fc = tf.layers.dense(flat, 64, tf.nn.relu, kernel_regularizer=regularizer)
    logits = tf.layers.dense(fc, 10, kernel_regularizer=regularizer)

 

손실 함수에 norm을 추가합니다.

with tf.name_scope('loss'):
    cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=y_onehot, logits=logits)
    self.loss = tf.reduce_mean(cross_entropy)
    if reg:
        norm = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
        self.loss += tf.reduce_sum(norm)

 

규제를 적용하지 않은 모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=20)
model.score(x_test, y_test)
epoch: 1  / train_loss: 0.4899995 / train_acc: 0.82308334  / val_loss: 0.34418163 / val_acc: 0.87675
epoch: 2  / train_loss: 0.32420996 / train_acc: 0.8821458  / val_loss: 0.32503137 / val_acc: 0.8814167
epoch: 3  / train_loss: 0.2795372 / train_acc: 0.89872915  / val_loss: 0.27930364 / val_acc: 0.8980833
epoch: 4  / train_loss: 0.24964015 / train_acc: 0.9086667  / val_loss: 0.25830066 / val_acc: 0.90525
epoch: 5  / train_loss: 0.22372074 / train_acc: 0.91735417  / val_loss: 0.25123677 / val_acc: 0.9059167
epoch: 6  / train_loss: 0.20405148 / train_acc: 0.92397916  / val_loss: 0.23442969 / val_acc: 0.91391665
epoch: 7  / train_loss: 0.18366154 / train_acc: 0.93202084  / val_loss: 0.25422248 / val_acc: 0.906
epoch: 8  / train_loss: 0.16686963 / train_acc: 0.93939584  / val_loss: 0.24268445 / val_acc: 0.91508335
epoch: 9  / train_loss: 0.1534329 / train_acc: 0.94364583  / val_loss: 0.24112262 / val_acc: 0.91525
epoch: 10  / train_loss: 0.13780946 / train_acc: 0.9493542  / val_loss: 0.23617037 / val_acc: 0.91541666
epoch: 11  / train_loss: 0.12407387 / train_acc: 0.9548333  / val_loss: 0.26025593 / val_acc: 0.91616666
epoch: 12  / train_loss: 0.112011366 / train_acc: 0.9585417  / val_loss: 0.2636378 / val_acc: 0.916
epoch: 13  / train_loss: 0.10216832 / train_acc: 0.96141666  / val_loss: 0.2987651 / val_acc: 0.91066664
epoch: 14  / train_loss: 0.09138283 / train_acc: 0.9656875  / val_loss: 0.29418647 / val_acc: 0.9174167
epoch: 15  / train_loss: 0.08280988 / train_acc: 0.96975  / val_loss: 0.31967795 / val_acc: 0.91425
epoch: 16  / train_loss: 0.075191446 / train_acc: 0.97283334  / val_loss: 0.3187853 / val_acc: 0.9095
epoch: 17  / train_loss: 0.06787912 / train_acc: 0.9746042  / val_loss: 0.3280637 / val_acc: 0.91525
epoch: 18  / train_loss: 0.06372369 / train_acc: 0.9764583  / val_loss: 0.34051234 / val_acc: 0.91608334
epoch: 19  / train_loss: 0.05584117 / train_acc: 0.9784167  / val_loss: 0.36320615 / val_acc: 0.9105
epoch: 20  / train_loss: 0.053313486 / train_acc: 0.9807708  / val_loss: 0.42804903 / val_acc: 0.9059167

0.8992

 

10 에포크 이후에 과대적합이 발생합니다.

 

 

규제의 양을 조절하는 파라미터 값 0.0001, 0.001, 0.01을 비교합니다.

 

파라미터 값을 0.0001로 지정한 경우에는 규제의 강도가 미비하여 과대적합이 발생합니다.

model = Model(reg=True, lambda_reg=0.0001, path_name='weight_decay_0.0001_')
model.train(x_train, y_train, x_val, y_val, epochs=20)
model.score(x_test, y_test)
epoch: 1  / train_loss: 0.51481104 / train_acc: 0.8163125  / val_loss: 0.41753772 / val_acc: 0.85225
epoch: 2  / train_loss: 0.345886 / train_acc: 0.8766875  / val_loss: 0.32372278 / val_acc: 0.88841665
epoch: 3  / train_loss: 0.30285904 / train_acc: 0.89360416  / val_loss: 0.31370842 / val_acc: 0.89208335
epoch: 4  / train_loss: 0.27255118 / train_acc: 0.9043125  / val_loss: 0.28951547 / val_acc: 0.90108335
epoch: 5  / train_loss: 0.24855728 / train_acc: 0.91485417  / val_loss: 0.29960057 / val_acc: 0.89675
epoch: 6  / train_loss: 0.23295376 / train_acc: 0.92170835  / val_loss: 0.28628847 / val_acc: 0.90316665
epoch: 7  / train_loss: 0.21696827 / train_acc: 0.9271875  / val_loss: 0.2921477 / val_acc: 0.902
epoch: 8  / train_loss: 0.20461519 / train_acc: 0.9319792  / val_loss: 0.27835524 / val_acc: 0.90925
epoch: 9  / train_loss: 0.1896936 / train_acc: 0.939875  / val_loss: 0.3034753 / val_acc: 0.90358335
epoch: 10  / train_loss: 0.17822354 / train_acc: 0.94325  / val_loss: 0.3268171 / val_acc: 0.9
epoch: 11  / train_loss: 0.17124838 / train_acc: 0.94602084  / val_loss: 0.2925508 / val_acc: 0.91108334
epoch: 12  / train_loss: 0.1610266 / train_acc: 0.95104164  / val_loss: 0.32277325 / val_acc: 0.90291667
epoch: 13  / train_loss: 0.15321763 / train_acc: 0.9546458  / val_loss: 0.31398556 / val_acc: 0.907
epoch: 14  / train_loss: 0.1440719 / train_acc: 0.95783335  / val_loss: 0.32070822 / val_acc: 0.90833336
epoch: 15  / train_loss: 0.13773532 / train_acc: 0.960125  / val_loss: 0.33933806 / val_acc: 0.90608335
epoch: 16  / train_loss: 0.13126785 / train_acc: 0.96283334  / val_loss: 0.34860352 / val_acc: 0.90891665
epoch: 17  / train_loss: 0.12526983 / train_acc: 0.96520835  / val_loss: 0.3467158 / val_acc: 0.909
epoch: 18  / train_loss: 0.119342156 / train_acc: 0.9675625  / val_loss: 0.34521878 / val_acc: 0.9098333
epoch: 19  / train_loss: 0.11426294 / train_acc: 0.9703125  / val_loss: 0.38304567 / val_acc: 0.90875
epoch: 20  / train_loss: 0.11378552 / train_acc: 0.9705625  / val_loss: 0.37654424 / val_acc: 0.91125

0.9094

 

 

파라미터 값을 0.001로 지정한 경우에는 과대적합이 발생하지 않으며 성능이 소폭 향상되었습니다.

model = Model(reg=True, lambda_reg=0.001, path_name='weight_decay_0.001_')
model.train(x_train, y_train, x_val, y_val, epochs=20)
model.score(x_test, y_test)
epoch: 1  / train_loss: 0.55039114 / train_acc: 0.8218333  / val_loss: 0.4208436 / val_acc: 0.87133336
epoch: 2  / train_loss: 0.39176953 / train_acc: 0.87941664  / val_loss: 0.37457174 / val_acc: 0.88558334
epoch: 3  / train_loss: 0.3536052 / train_acc: 0.8917292  / val_loss: 0.3548428 / val_acc: 0.89208335
epoch: 4  / train_loss: 0.3287368 / train_acc: 0.90052086  / val_loss: 0.34722483 / val_acc: 0.8950833
epoch: 5  / train_loss: 0.31021914 / train_acc: 0.907625  / val_loss: 0.35307658 / val_acc: 0.89208335
epoch: 6  / train_loss: 0.29850483 / train_acc: 0.9120625  / val_loss: 0.3384591 / val_acc: 0.89933336
epoch: 7  / train_loss: 0.28659913 / train_acc: 0.9167917  / val_loss: 0.31821793 / val_acc: 0.9065833
epoch: 8  / train_loss: 0.27779076 / train_acc: 0.9196042  / val_loss: 0.34279007 / val_acc: 0.8980833
epoch: 9  / train_loss: 0.26766524 / train_acc: 0.9232708  / val_loss: 0.3185545 / val_acc: 0.9059167
epoch: 10  / train_loss: 0.25809973 / train_acc: 0.9271042  / val_loss: 0.31635404 / val_acc: 0.90716666
epoch: 11  / train_loss: 0.2516942 / train_acc: 0.9291667  / val_loss: 0.33354875 / val_acc: 0.9030833
epoch: 12  / train_loss: 0.24395902 / train_acc: 0.93291664  / val_loss: 0.3235069 / val_acc: 0.90716666
epoch: 13  / train_loss: 0.23825896 / train_acc: 0.93379164  / val_loss: 0.32964057 / val_acc: 0.9026667
epoch: 14  / train_loss: 0.23224322 / train_acc: 0.93604165  / val_loss: 0.34270942 / val_acc: 0.90216666
epoch: 15  / train_loss: 0.22589256 / train_acc: 0.93854165  / val_loss: 0.32007968 / val_acc: 0.91108334
epoch: 16  / train_loss: 0.21986204 / train_acc: 0.94083333  / val_loss: 0.32736585 / val_acc: 0.90816665
epoch: 17  / train_loss: 0.2155721 / train_acc: 0.9426875  / val_loss: 0.3209575 / val_acc: 0.912
epoch: 18  / train_loss: 0.211696 / train_acc: 0.94416666  / val_loss: 0.3179196 / val_acc: 0.91366667
epoch: 19  / train_loss: 0.20734118 / train_acc: 0.94677085  / val_loss: 0.32665992 / val_acc: 0.91225
epoch: 20  / train_loss: 0.2022699 / train_acc: 0.9475833  / val_loss: 0.31761256 / val_acc: 0.91358334

0.9106

 

 

파라미터 값을 0.01로 지정한 경우에는 규제의 강도가 커서 과소적합이 발생합니다.

model = Model(reg=True, lambda_reg=0.01, path_name='weight_decay_0.01_')
model.train(x_train, y_train, x_val, y_val, epochs=20)
model.score(x_test, y_test)
epoch: 1  / train_loss: 0.7216794 / train_acc: 0.8040625  / val_loss: 0.58291274 / val_acc: 0.83925
epoch: 2  / train_loss: 0.50784343 / train_acc: 0.8576667  / val_loss: 0.49030805 / val_acc: 0.8609167
epoch: 3  / train_loss: 0.4646731 / train_acc: 0.8681667  / val_loss: 0.4526889 / val_acc: 0.87375
epoch: 4  / train_loss: 0.43478292 / train_acc: 0.8775833  / val_loss: 0.47555774 / val_acc: 0.8635833
epoch: 5  / train_loss: 0.4173219 / train_acc: 0.8821458  / val_loss: 0.41809207 / val_acc: 0.8805
epoch: 6  / train_loss: 0.40076128 / train_acc: 0.88591665  / val_loss: 0.4433061 / val_acc: 0.87191665
epoch: 7  / train_loss: 0.38866675 / train_acc: 0.89079165  / val_loss: 0.43044075 / val_acc: 0.8745
epoch: 8  / train_loss: 0.3777382 / train_acc: 0.89391667  / val_loss: 0.39097238 / val_acc: 0.88958335
epoch: 9  / train_loss: 0.36583894 / train_acc: 0.8960417  / val_loss: 0.3819606 / val_acc: 0.89225
epoch: 10  / train_loss: 0.35933015 / train_acc: 0.8998333  / val_loss: 0.4025547 / val_acc: 0.8825833
epoch: 11  / train_loss: 0.35212287 / train_acc: 0.90122914  / val_loss: 0.37651968 / val_acc: 0.89225
epoch: 12  / train_loss: 0.34527814 / train_acc: 0.9032292  / val_loss: 0.37082198 / val_acc: 0.8933333
epoch: 13  / train_loss: 0.33904266 / train_acc: 0.9048125  / val_loss: 0.3604129 / val_acc: 0.89525
epoch: 14  / train_loss: 0.33481276 / train_acc: 0.9063333  / val_loss: 0.35880318 / val_acc: 0.89675
epoch: 15  / train_loss: 0.33108407 / train_acc: 0.90708333  / val_loss: 0.3690926 / val_acc: 0.8955
epoch: 16  / train_loss: 0.32683825 / train_acc: 0.9093958  / val_loss: 0.36419964 / val_acc: 0.8979167
epoch: 17  / train_loss: 0.3242014 / train_acc: 0.908125  / val_loss: 0.35681537 / val_acc: 0.8965833
epoch: 18  / train_loss: 0.31887272 / train_acc: 0.9120625  / val_loss: 0.3517346 / val_acc: 0.8995
epoch: 19  / train_loss: 0.31730837 / train_acc: 0.9111875  / val_loss: 0.3501565 / val_acc: 0.89933336
epoch: 20  / train_loss: 0.3142373 / train_acc: 0.9133125  / val_loss: 0.34369817 / val_acc: 0.90216666

0.9005

 

과소적합으로 학습과 검증의 학습 곡선의 간격이 가까운 것을 확인할 수 있습니다.

 

 

 

'머신러닝 > 딥러닝' 카테고리의 다른 글

오토인코더(AutoEncoder)  (0) 2021.02.16
드롭아웃(Dropout)  (0) 2021.02.09
배치 정규화(Batch Normalization)  (0) 2021.02.09
가중치 초기화(Weight Initialization)  (0) 2021.02.08
LSTM(Long Short-Term Memory)  (0) 2021.02.03
  Comments,     Trackbacks
배치 정규화(Batch Normalization)

 

2020/11/12 - [머신러닝/강화학습] - 소프트맥스 회귀(Softmax Regression) (2)

 

배치 정규화(Batch Normalization)

입력 데이터의 분포를 평균이 0, 표준편차가 1인 정규 분포로 만들어 주는 것으로, 활성화 함수의 앞 혹은 뒤에서 수행하여 층의 활성화값을 편향되지 않게 적절히 분포하도록 조정하는 것입니다. 이에 따라 신경망의 학습 속도를 개선하며 가중치 초기화에 영향을 덜 받습니다. 또한 과대적합(overfitting)을 억제하는 효과가 있습니다.

 

 

다음은 입력 데이터의 분포를 평균이 0, 표준편차가 1인 정규화를 나타냅니다.

 

$\begin{aligned} 

\mu_B^{\ } & \leftarrow { 1 \over n } \displaystyle \sum_{i=1}^n x_i \\

\sigma_B^2 & \leftarrow { 1 \over n } \displaystyle \sum_{i=1}^n ( x_i - \mu_B^{\ } )^2 \\

\hat x_i & \leftarrow { { x_i - \mu_B^{\ } } \over \sqrt{ \sigma_B^2 + \varepsilon } }

\end{aligned}$

 

미니배치 $ B $ 는 입력 데이터의 집합을 의미합니다.

 

$ B = \left\{ x_1, x_2, \cdots, x_n \right\} $

 

아주 작은 값인 엡실론 $ \varepsilon $ 은 0으로 나누는 것을 방지합니다.

 

 

정규화 이후에는 확대(scale)이동(shift)을 수행합니다.

 

$ y_i \leftarrow \gamma \hat x_i + \beta $

 

$ \gamma $ 는 확대, $ \beta $ 는 이동을 의미하며 역전파를 통해 학습하는 신경망의 파라미터입니다.

 

 

구현

소프트맥스 회귀 모델에 배치 정규화를 적용합니다.

 

패션 MNIST 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras import datasets

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

print('data shape:', x_train.shape)

print('target shape:', y_train.shape)
print('target label:', np.unique(y_train, return_counts=True))
data shape: (60000, 28, 28)
target shape: (60000,)
target label: (array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=uint8), array([6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000],
      dtype=int64))

 

학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf

class Model:
    def __init__(self, lr=1e-3, momentum=0.9):
        tf.reset_default_graph()
        
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 28, 28])
            self.y = tf.placeholder(tf.int64)
            self.training = tf.placeholder(tf.bool)

        with tf.name_scope('preprocessing'):
            x_norm = self.x / 255.0
            y_onehot = tf.one_hot(self.y, 10)
            
        with tf.name_scope('layer'):
            flat = tf.layers.flatten(x_norm)
            fc = tf.layers.dense(flat, 128)
            batch_norm = tf.layers.batch_normalization(fc, momentum=momentum, training=self.training)
            a = tf.nn.relu(batch_norm)
            logits = tf.layers.dense(a, 10)
            
        with tf.name_scope('output'):
            self.predict = tf.argmax(tf.nn.softmax(logits), 1)

        with tf.name_scope('accuracy'):
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.to_int64(self.predict), self.y), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=y_onehot, logits=logits)
            self.loss = tf.reduce_mean(cross_entropy)
        
        update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
        with tf.name_scope('optimizer'):
            train_op = tf.train.GradientDescentOptimizer(lr).minimize(self.loss)
            self.train_op = tf.group([train_op, update_ops])

#         with tf.control_dependencies(update_ops):
#             with tf.name_scope('optimizer'):
#                 self.train_op = tf.train.GradientDescentOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/softmax-regression_fashion_mnist/batch_norm_train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/softmax-regression_fashion_mnist/batch_norm_val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())
    
    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch, self.training: True})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val, self.training: False})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / train_loss:', np.mean(t_l), '/ train_acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y, self.training: False})

 

입력 데이터는 완전 연결층을 통한 선형 출력 뒤에 배치 정규화층을 거쳐 렐루 함수로 입력됩니다. (tf.layers.batch_normalization)

# with tf.name_scope('layer'):
#    flat = tf.layers.flatten(x_norm)
#    fc = tf.layers.dense(flat, 128, tf.nn.relu)
#    logits = tf.layers.dense(fc, 10)

with tf.name_scope('layer'):
    flat = tf.layers.flatten(x_norm)
    fc = tf.layers.dense(flat, 128, tf.nn.relu)
    batch_norm = tf.layers.batch_normalization(fc, momentum=momentum, training=self.training)
    a = tf.nn.relu(batch_norm)
    logits = tf.layers.dense(a, 10)

 

다음은 moving_mean 과 moving_var 에 대한 업데이트를 수행하기 위한 것입니다. 

update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
with tf.name_scope('optimizer'):
    train_op = tf.train.GradientDescentOptimizer(lr).minimize(self.loss)
    self.train_op = tf.group([train_op, update_ops])

 

추론시에도 입력되는 데이터에 대해 학습 데이터의 통계를 이용하는 것입니다.

 

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=200)
model.score(x_test, y_test)
epoch: 1  / train_loss: 1.1277851 / train_acc: 0.6495625  / val_loss: 0.78220785 / val_acc: 0.75175
epoch: 2  / train_loss: 0.73689634 / train_acc: 0.7668333  / val_loss: 0.65835 / val_acc: 0.78533334
epoch: 3  / train_loss: 0.6495253 / train_acc: 0.78889585  / val_loss: 0.5982603 / val_acc: 0.80341667
epoch: 4  / train_loss: 0.59939104 / train_acc: 0.80097914  / val_loss: 0.55921036 / val_acc: 0.8124167
epoch: 5  / train_loss: 0.5659029 / train_acc: 0.8105417  / val_loss: 0.5313488 / val_acc: 0.8205

...

epoch: 100  / train_loss: 0.2811836 / train_acc: 0.902875  / val_loss: 0.34466657 / val_acc: 0.87416667
epoch: 101  / train_loss: 0.28024608 / train_acc: 0.90358335  / val_loss: 0.34613973 / val_acc: 0.87616664
epoch: 102  / train_loss: 0.28024498 / train_acc: 0.90272915  / val_loss: 0.34177572 / val_acc: 0.87658334
epoch: 103  / train_loss: 0.27824426 / train_acc: 0.90414584  / val_loss: 0.34291297 / val_acc: 0.8750833
epoch: 104  / train_loss: 0.2787735 / train_acc: 0.9031875  / val_loss: 0.34284526 / val_acc: 0.87575
epoch: 105  / train_loss: 0.2761775 / train_acc: 0.9042708  / val_loss: 0.34198853 / val_acc: 0.87625

...

epoch: 195  / train_loss: 0.20221558 / train_acc: 0.9321875  / val_loss: 0.3365637 / val_acc: 0.8795
epoch: 196  / train_loss: 0.20303316 / train_acc: 0.93095833  / val_loss: 0.33854735 / val_acc: 0.87941664
epoch: 197  / train_loss: 0.20070541 / train_acc: 0.931125  / val_loss: 0.3392148 / val_acc: 0.87991667
epoch: 198  / train_loss: 0.19906428 / train_acc: 0.93375  / val_loss: 0.33758998 / val_acc: 0.87916666
epoch: 199  / train_loss: 0.19856991 / train_acc: 0.932375  / val_loss: 0.33869392 / val_acc: 0.87883335
epoch: 200  / train_loss: 0.19747803 / train_acc: 0.9331458  / val_loss: 0.3381471 / val_acc: 0.87908334

0.8772

 

에포크에 대한 정확도와 손실 함수의 그래프는 다음과 같습니다.(주황: 학습, 파랑: 검증, 빨강: 배치 정규화 학습, 하늘: 배치 정규화 검증)

 

 

배치 정규화를 적용하지 않은 모델과 비교하여 학습 속도가 개선된 것을 볼 수 있습니다.

 

 

  Comments,     Trackbacks
가중치 초기화(Weight Initialization)

가중치 초기화(Weight  Initialization)

과대적합(Overfitting)을 억제하는 기법으로 은닉층을 이루는 가중치의 초기값을 0이 아닌 최대한 작은 값에서 시작하는 것입니다. 이는 활성화 함수에 따라 다른 방법을 사용합니다.

 

활성화 함수로 시그모이드(Sigmoid)를 사용하는 경우에 권장하는 방법으로 Xavier 초기화 방법이 있습니다. Xavier는 표준편차가 $ \sqrt{ 1 \over n } $ 인 정규분포를 이용합니다. ($n$ 은 앞 계층의 노드 수)

 

활성화 함수로 렐루(ReLU)를 사용하는 경우에는 He 초기화 방법이 있습니다. He는 표준편차가 $ \sqrt{ 2 \over n } $ 인 정규분포를 이용합니다. 2배의 계수로 인해 Xavier 보다 더 넓게 분포할 것입니다.

 

 

구현

가중치 초기화 방법에 따른 활성화 분포를 비교를 하기 위한 시각화를 구현합니다.

import numpy as np
import matplotlib.pyplot as plt

plt.rc('figure', figsize=(10, 3))

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def relu(x):
    return np.maximum(x, 0)

def weight_initialize(node_num, std=1):
    return np.random.randn(node_num, node_num) * std

def forward(title, init, activation_function, hidden_layer_size=5):
    activations = []
    
    x = np.random.randn(1000, 100)
    for i in range(hidden_layer_size):
        w = init
        z = np.dot(x, w)
        a = activation_function(z)
        activations.append(a)
        x = a
    
    fig, axs = plt.subplots(1, len(activations), constrained_layout=True, sharey=True)
    plt.ylim(0, 10000)
    for i, layer in enumerate(activations):
        axs[i].hist(layer.flatten(), 20, range=(0, 1))
        axs[i].set_xlabel('{}-layer'.format(str(i+1)))
    fig.suptitle(title)

 

시그모이드 함수를 사용하는 경우에 표준편차 1, 0.01, $ \sqrt{ 1 \over n } $ (Xavier) 초기화에 따른 분포를 비교합니다.

# sigmoid, activations

node_num = 100

init = weight_initialize(node_num)
forward('std=1', init, sigmoid)

init = weight_initialize(node_num, 0.01)
forward('std=0.01', init, sigmoid)

xavier = (1 / np.sqrt(node_num))
init = weight_initialize(node_num, xavier)
forward('Xavier', init, sigmoid)

 

표준편차를 0.01로 하였을 경우, 분포가 0.5에 집중된 것을 볼 수 있습니다. 이는 많은 가중치를 갖는 의미가 없는 것입니다.

 

 

렐루 함수를 사용하는 경우에 표준편차 0.01, $ \sqrt{ 1 \over n } $ (Xavier), $ \sqrt{ 2 \over n } $ (He) 초기화에 따른 분포를 비교합니다.

# relu, activations

node_num = 100

init = weight_initialize(node_num, 0.01)
forward('std=0.01', init, relu)

xavier = (1 / np.sqrt(node_num))
init = weight_initialize(node_num, xavier)
forward('Xavier', init, relu)

he = (np.sqrt(2 / node_num))
init = weight_initialize(node_num, he)
forward('He', init, relu)

 

각 층의 분포를 살펴보면, 표준편차가 0.01인 경우에는 거의 학습이 이루어지지 않을 것입니다. 활성화 값은 0에 가까운 작은 값이며, 학습시에 역으로 전파되는 가중치의 기울기 역시 매우 작다는 것입니다. Xavier인 경우에도 층이 깊어지면서 활성화 값이 점점 0으로 편향되는 것을 볼 수 있습니다. 이와 같은 현상은 학습시에 기울기 소실(Gradient Vanishing) 문제를 일으키는 것입니다.

 

은닉층의 크기를 10으로 지정해보면 더욱 큰 차이를 나타냅니다.

node_num = 100

xavier = (1 / np.sqrt(node_num))
init = weight_initialize(node_num, xavier)
forward('Xavier', init, relu, 10)

he = (np.sqrt(2 / node_num))
init = weight_initialize(node_num, he)
forward('He', init, relu, 10)

 

이에 반해 He는 층이 깊어져도 고르게 분포되는 것을 볼 수 있습니다.

 

 

  Comments,     Trackbacks
LSTM(Long Short-Term Memory)

 

2021/01/12 - [머신러닝/딥러닝] - 순환 신경망(Recurrent Neural Network) (3)

 

TBPTT(Truncated Back-Propagation Through Time)

바닐라 순환 신경망의 역전파인 BPTT(back-propagation through time)는 기울기를 거슬러 전파할 때 동일한 가중치를 반복적으로 곱하는데, 이로 인해 기울기가 너무 작아지는 현상인 기울기 소실(vanishing gradient) 혹은 커지는 현상인 기울기 폭주(exploding gradient)가 발생할 수 있는 것입니다. 이를 방지하기 위해 기울기의 전파하는 타임 스텝의 수를 제한하는 것이 TBPTT 입니다.

 

 

 

LSTM(Long Short-Term Memory)

TBPTT 방법은 기울기가 타임 스텝 끝까지 전파되지 않아 전체적인 관계를 파악하기 어렵습니다. LSTM은 긴 시퀀스를 모델링하기 위해 고안된 것으로 기울기 소실 문제를 개선합니다. (여전히 기울기 폭주는 문제는 갖을 수 있습니다.)

 

다음은 LSTM의 셀 구조를 나타냅니다. (은닉 상태 $ h $, 셀 상태 $ c $)

 

 

이전 타임 스텝의 은닉층 $ h^{t-1} $ 과 입력 $ x^t $ 의 선형 계산값이 여러 갈래($ f^t $, $ i^t $, $ n^t $, $ o^t $)로 나뉘어지는데, $ f^t $ 는 시그모이드 함수를 통과한 값으로 이전 셀 상태 정보 $ c^{t-1} $ 의 몇 퍼센트를 기억할 것인지 계산합니다. $ i^t $, $ n^t $ 는 시그모이드 함수와 하이퍼볼릭 탄젠트 함수를 각각 통과한 값으로, 이를 곱하고 $ f^t $ 와 더하여 현재 셀 상태 $ c^t $ 를 계산합니다. 현재 은닉 상태 $ h^t $ 는 $ c^t $ 을 하이퍼볼릭 탄젠트 함수를 통과하고 시그모이드 함수를 통과한 $ o^t $ 와 곱하여 계산합니다.

 

텐서플로에서 제공하는 LSTM의 셀은 다음과 같습니다. (tf.nn.rnn_cell.LSTMCell)

import tensorflow as tf

tf.reset_default_graph()

x = tf.constant([[[0., 0.], [0.1, 0.1], [0.2, 0.2]]])

cell = tf.nn.rnn_cell.LSTMCell(4)

outputs, state = tf.nn.dynamic_rnn(cell, x, dtype=tf.float32)

fc = tf.layers.dense(state.h, 1, tf.nn.sigmoid)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(state))
    print(sess.run(state).c)
    print(sess.run(state).h)
    
    print(sess.run(state).c.shape)
    print(sess.run(state).h.shape)
    
    print(sess.run(outputs).shape)
#     print(sess.run(fc))
LSTMStateTuple(c=array([[0.01844714, 0.01318356, 0.08716054, 0.07302517]], dtype=float32), h=array([[0.00916587, 0.00682306, 0.04287135, 0.0369204 ]], dtype=float32))
[[0.01844714 0.01318356 0.08716054 0.07302517]]
[[0.00916587 0.00682306 0.04287135 0.0369204 ]]
(1, 4)
(1, 4)
(1, 3, 4)

 

상태의 출력은 셀 상태와 은닉 상태로 구성된 튜플입니다. 다대일 모델이라 가정하면, 위와 같이 마지막 타임 스텝의 은닉 상태를 완전 연결층과 결합합니다.

 

 

구현

다대일 순환 신경망 모델을 구현합니다.

 

IMDB 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras.datasets import imdb

(x_train, y_train), (x_test, y_test) = imdb.load_data(skip_top=20, num_words=1000)

 

skip_top 은 빈도수 상위 20개를 생략하는 것이고, num_words 는 데이터를 이루는 단어의 개수를 지정합니다.

 

어휘 사전을 불러옵니다.

word_to_index = imdb.get_word_index()
index_to_word = { v: k for k, v in word_to_index.items() }

index_to_word
{34701: 'fawn',
 52006: 'tsukino',
 52007: 'nunnery',
 16816: 'sonja',
 63951: 'vani',
 1408: 'woods',
 16115: 'spiders',
 2345: 'hanging',
 2289: 'woody',
 52008: 'trawling',
 ...
 }

 

2는 어휘 사전에 없는 단어로 생략합니다.

for i in range(len(x_train)):
    x_train[i] = [w for w in x_train[i] if w > 2]
    
for i in range(len(x_test)):
    x_test[i] = [w for w in x_test[i] if w > 2]
    
x_train[0]
[22, 43, 65, 66, 36, 25, 43, 50, 35, 39, 38, 50, 22, 22, 71, 87, 43, 38, 76, 22, 62, 66, 33, 38, 25, 51, 36, 48, 25, 33, 22, 28, 77, 52, 82, 36, 71, 43, 26, 46, 88, 98, 32, 56, 26, 22, 21, 26, 30, 51, 36, 28, 92, 25, 65, 38, 88, 32, 32]

 

샘플의 길이를 동일하게 조정합니다. (tf.keras.preprocessing.sequence.pad_sequences)

from tensorflow.keras.preprocessing import sequence

x_train = sequence.pad_sequences(x_train, 100)
x_test = sequence.pad_sequences(x_test, 100)

x_train.shape
(25000, 100)

 

전처리를 끝내고 학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

print(x_train.shape)
print(x_val.shape)
(20000, 100, 100)
(5000, 100, 100)

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Embedding

class Model:
    def __init__(self, lr=1e-3):
        tf.reset_default_graph()
        
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 100])
            self.y = tf.placeholder(tf.float32, [None, 1])

        with tf.name_scope('layer'):
            ebd = Embedding(1000, 32)
            output = ebd(self.x)
            cell = tf.nn.rnn_cell.LSTMCell(32)
            outputs, state = tf.nn.dynamic_rnn(cell, output, dtype=tf.float32)
                       
        with tf.name_scope('output'):
            self.output = tf.layers.dense(state.h, 1, tf.nn.sigmoid)
            
        with tf.name_scope('accuracy'):
            self.predict = tf.cast(tf.greater(self.output, tf.constant([0.5])), dtype=tf.float32)
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(self.y, self.predict), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.keras.losses.binary_crossentropy(y_true=self.y, y_pred=self.output)
            self.loss = tf.reduce_mean(cross_entropy)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/lstm_imdb/train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/lstm_imdb/val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())

    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / loss:', np.mean(t_l), '/ acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y})

 

타겟 데이터의 형태를 변형합니다.

y_train = y_train.reshape(-1, 1)
y_val = y_val.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=5)
model.score(x_test, y_test)
epoch: 1  / loss: 0.4271072 / acc: 0.80275  / val_loss: 0.35389245 / val_acc: 0.8508
epoch: 2  / loss: 0.3363528 / acc: 0.8554  / val_loss: 0.3609104 / val_acc: 0.8476
epoch: 3  / loss: 0.3165999 / acc: 0.86565  / val_loss: 0.3475892 / val_acc: 0.8526
epoch: 4  / loss: 0.30119175 / acc: 0.8721  / val_loss: 0.343326 / val_acc: 0.8588
epoch: 5  / loss: 0.28766757 / acc: 0.8763  / val_loss: 0.3521059 / val_acc: 0.8558

0.851

 

 

  Comments,     Trackbacks
순환 신경망(Recurrent Neural Network) (3)

 

2021/01/04 - [머신러닝/딥러닝] - 순환 신경망(Recurrent Neural Network) (2)

 

단어 임베딩(Word Embedding)

신경망에 텍스트 데이터를 처리하는 경우에는 텍스트를 숫자화해야 하는데, 이는 원-핫 인코딩으로 전처리할 수 있습니다.

from tensorflow.keras.utils import to_categorical

word = {
    'man': 0,
    'woman': 1
}

to_categorical(list(word.values()))
array([[1., 0.],
       [0., 1.]], dtype=float32)

 

하지만 텍스트 데이터를 원-핫 인코딩할 경우 단어 사이의 관계를 나타내지 못하게됩니다. 이러한 문제를 해결하는 것이 단어 임베딩(word embedding)입니다.

 

 

케라스에서 제공하는 임베딩층을 통해 동작을 확인합니다. (tf.keras.layers.Embedding)

from tensorflow.keras.layers import Embedding

embedding = tf.keras.layers.Embedding(input_dim=2, output_dim=3)
output = embedding(tf.constant([0, 1, 2, 3, 0, 1]))

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(output.eval())
[[-0.04695427  0.04983367 -0.04970566]
 [ 0.03672746  0.00194643 -0.00861054]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]
 [-0.04695427  0.04983367 -0.04970566]
 [ 0.03672746  0.00194643 -0.00861054]]

 

output_dim은 출력 길이이고, input_dim은 처리할 단어의 개수입니다. 위와 같이 단어의 개수를 2로 지정하면 먼저 입력된 0, 1은 정상적으로 실수값을 출력을 하는 반면에 2, 3은 0을 출력하는 것을 볼 수 있습니다.

 

 

구현

다대일 순환 신경망 모델을 구현합니다.

 

IMDB 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras.datasets import imdb

(x_train, y_train), (x_test, y_test) = imdb.load_data(skip_top=20, num_words=1000)

 

skip_top 은 빈도수 상위 20개를 생략하는 것이고, num_words 는 데이터를 이루는 단어의 개수를 지정합니다.

 

어휘 사전을 불러옵니다.

word_to_index = imdb.get_word_index()
index_to_word = { v: k for k, v in word_to_index.items() }

index_to_word
{34701: 'fawn',
 52006: 'tsukino',
 52007: 'nunnery',
 16816: 'sonja',
 63951: 'vani',
 1408: 'woods',
 16115: 'spiders',
 2345: 'hanging',
 2289: 'woody',
 52008: 'trawling',
 ...
 }

 

2는 어휘 사전에 없는 단어로 생략합니다.

for i in range(len(x_train)):
    x_train[i] = [w for w in x_train[i] if w > 2]
    
for i in range(len(x_test)):
    x_test[i] = [w for w in x_test[i] if w > 2]
    
x_train[0]
[22, 43, 65, 66, 36, 25, 43, 50, 35, 39, 38, 50, 22, 22, 71, 87, 43, 38, 76, 22, 62, 66, 33, 38, 25, 51, 36, 48, 25, 33, 22, 28, 77, 52, 82, 36, 71, 43, 26, 46, 88, 98, 32, 56, 26, 22, 21, 26, 30, 51, 36, 28, 92, 25, 65, 38, 88, 32, 32]

 

샘플의 길이를 동일하게 조정합니다. (tf.keras.preprocessing.sequence.pad_sequences)

from tensorflow.keras.preprocessing import sequence

x_train = sequence.pad_sequences(x_train, 100)
x_test = sequence.pad_sequences(x_test, 100)

x_train.shape
(25000, 100)

 

전처리를 끝내고 학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

print(x_train.shape)
print(x_val.shape)
(20000, 100, 100)
(5000, 100, 100)

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Embedding

class Model:
    def __init__(self, lr=1e-3):
        tf.reset_default_graph()
        
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 100])
            self.y = tf.placeholder(tf.float32, [None, 1])

        with tf.name_scope('layer'):
            ebd = Embedding(1000, 32)
            output = ebd(self.x)
            cell = tf.nn.rnn_cell.BasicRNNCell(32)
            outputs, state = tf.nn.dynamic_rnn(cell, output, dtype=tf.float32)
                       
        with tf.name_scope('output'):
            self.output = tf.layers.dense(state, 1, tf.nn.sigmoid)
            
        with tf.name_scope('accuracy'):
            self.predict = tf.cast(tf.greater(self.output, tf.constant([0.5])), dtype=tf.float32)
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(self.y, self.predict), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.keras.losses.binary_crossentropy(y_true=self.y, y_pred=self.output)
            self.loss = tf.reduce_mean(cross_entropy)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/rnn_imdb_2/train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/rnn_imdb_2/val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())

    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / loss:', np.mean(t_l), '/ acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y})

 

타겟 데이터의 형태를 변형합니다.

y_train = y_train.reshape(-1, 1)
y_val = y_val.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=5)
model.score(x_test, y_test)
epoch: 1  / loss: 0.5451177 / acc: 0.72195  / val_loss: 0.45631307 / val_acc: 0.7916
epoch: 2  / loss: 0.4257674 / acc: 0.81515  / val_loss: 0.4409481 / val_acc: 0.806
epoch: 3  / loss: 0.42866415 / acc: 0.8042  / val_loss: 0.49786302 / val_acc: 0.764
epoch: 4  / loss: 0.37565607 / acc: 0.8398  / val_loss: 0.45572335 / val_acc: 0.7998
epoch: 5  / loss: 0.3322474 / acc: 0.8655  / val_loss: 0.45722726 / val_acc: 0.8138

0.8172

 

 

  Comments,     Trackbacks
순환 신경망(Recurrent Neural Network) (2)

 

2020/11/16 - [머신러닝/딥러닝] - 순환 신경망(Recurrent Neural Network) (1)

 

구현

다대일 순환 신경망 모델을 구현합니다.

 

IMDB 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras.datasets import imdb

(x_train, y_train), (x_test, y_test) = imdb.load_data(skip_top=20, num_words=100)

 

skip_top 은 빈도수 상위 20개를 생략하는 것이고, num_words 는 데이터를 이루는 단어의 개수를 지정합니다.

 

데이터를 구성하는 정수 인코딩된 값들은 단어를 나타내는 것입니다.

x_train[0]
[2, 2, 22, 2, 43, 2, 2, 2, 2, 65, 2, 2, 66, 2, 2, 2, 36, 2, 2, 25, 2, 43, 2, 2, 50, 2, 2, 2, 35, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 39, 2, 2, 2, 2, 2, 2, 38, 2, 2, 2, 2, 50, 2, 2, 2, 2, 2, 2, 22, 2, 2, 2, 2, 2, 22, 71, 87, 2, 2, 43, 2, 38, 76, 2, 2, 2, 2, 22, 2, 2, 2, 2, 2, 2, 2, 2, 2, 62, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 66, 2, 33, 2, 2, 2, 2, 38, 2, 2, 25, 2, 51, 36, 2, 48, 25, 2, 33, 2, 22, 2, 2, 28, 77, 52, 2, 2, 2, 2, 82, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 36, 71, 43, 2, 2, 26, 2, 2, 46, 2, 2, 2, 2, 2, 2, 88, 2, 2, 2, 2, 98, 32, 2, 56, 26, 2, 2, 2, 2, 2, 2, 2, 22, 21, 2, 2, 26, 2, 2, 2, 30, 2, 2, 51, 36, 28, 2, 92, 25, 2, 2, 2, 65, 2, 38, 2, 88, 2, 2, 2, 2, 2, 2, 2, 2, 32, 2, 2, 2, 2, 2, 32]

 

어휘 사전을 불러옵니다.

word_to_index = imdb.get_word_index()
index_to_word = { v: k for k, v in word_to_index.items() }

index_to_word
{34701: 'fawn',
 52006: 'tsukino',
 52007: 'nunnery',
 16816: 'sonja',
 63951: 'vani',
 1408: 'woods',
 16115: 'spiders',
 2345: 'hanging',
 2289: 'woody',
 52008: 'trawling',
 ...
 }

 

2는 어휘 사전에 없는 단어로 생략합니다.

for i in range(len(x_train)):
    x_train[i] = [w for w in x_train[i] if w > 2]
    
for i in range(len(x_test)):
    x_test[i] = [w for w in x_test[i] if w > 2]
    
x_train[0]
[22, 43, 65, 66, 36, 25, 43, 50, 35, 39, 38, 50, 22, 22, 71, 87, 43, 38, 76, 22, 62, 66, 33, 38, 25, 51, 36, 48, 25, 33, 22, 28, 77, 52, 82, 36, 71, 43, 26, 46, 88, 98, 32, 56, 26, 22, 21, 26, 30, 51, 36, 28, 92, 25, 65, 38, 88, 32, 32]

 

정수값을 인덱스로 어휘 사전과 맵핑하기 위해서는 -3을 해야합니다.

for w in x_train[0]:
    print(index_to_word[w - 3], end=' ')
film just story really they you just there an from so there film film were great just so much film would really at so you what they if you at film have been good also they were just are out because them all up are film but are be what they have don't you story so because all all

 

각 샘플의 길이는 다르므로 전처리가 필요합니다.

print(len(x_train[0]))
print(len(x_train[1]))
59 
32

 

0으로 채우는 패딩을 이용합니다. (tf.keras.preprocessing.sequence.pad_sequences)

from tensorflow.keras.preprocessing import sequence

x_train = sequence.pad_sequences(x_train, 100)
x_test = sequence.pad_sequences(x_test, 100)

x_train.shape
(25000, 100)

 

정수값을 원-핫 인코딩합니다. (tf.keras.utils.to_categorical)

from tensorflow.keras.utils import to_categorical

x_train = to_categorical(x_train)
x_test = to_categorical(x_test)

x_train.shape
(25000, 100, 100)

 

전처리를 끝내고 학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

print(x_train.shape)
print(x_val.shape)
(20000, 100, 100)
(5000, 100, 100)

 

타겟 데이터의 형태를 변형합니다.

y_train = y_train.reshape(-1, 1)
y_val = y_val.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf

class Model:
    def __init__(self, lr=1e-3):
        tf.reset_default_graph()
        
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 100, 100])
            self.y = tf.placeholder(tf.float32, [None, 1])

        with tf.name_scope('layer'):

            cell = tf.nn.rnn_cell.BasicRNNCell(32)
            outputs, state = tf.nn.dynamic_rnn(cell, self.x, dtype=tf.float32)
                       
        with tf.name_scope('output'):
            self.output = tf.layers.dense(state, 1, tf.nn.sigmoid)
            
        with tf.name_scope('accuracy'):
            self.predict = tf.cast(tf.greater(self.output, tf.constant([0.5])), dtype=tf.float32)
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(self.y, self.predict), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.keras.losses.binary_crossentropy(y_true=self.y, y_pred=self.output)
            self.loss = tf.reduce_mean(cross_entropy)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/rnn_imdb/train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/rnn_imdb/val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())

    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / loss:', np.mean(t_l), '/ acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y})

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=10)
model.score(x_test, y_test)
epoch: 1  / loss: 0.6221659 / acc: 0.6369  / val_loss: 0.5833103 / val_acc: 0.6894
epoch: 2  / loss: 0.57779557 / acc: 0.69685  / val_loss: 0.58450437 / val_acc: 0.6992
epoch: 3  / loss: 0.56940305 / acc: 0.7049  / val_loss: 0.577118 / val_acc: 0.6984
epoch: 4  / loss: 0.56505644 / acc: 0.70675  / val_loss: 0.5694209 / val_acc: 0.7142
epoch: 5  / loss: 0.5640247 / acc: 0.7107  / val_loss: 0.5725659 / val_acc: 0.698
epoch: 6  / loss: 0.56029844 / acc: 0.7155  / val_loss: 0.55980057 / val_acc: 0.7198
epoch: 7  / loss: 0.55862373 / acc: 0.7167  / val_loss: 0.563976 / val_acc: 0.7166
epoch: 8  / loss: 0.56085664 / acc: 0.71315  / val_loss: 0.559803 / val_acc: 0.7206
epoch: 9  / loss: 0.5551869 / acc: 0.71845  / val_loss: 0.56608874 / val_acc: 0.7114
epoch: 10  / loss: 0.55442244 / acc: 0.7194  / val_loss: 0.58167785 / val_acc: 0.7012

0.69952

 

에포크에 대한 정확도와 손실 함수의 그래프는 다음과 같습니다.

 

 

 

  Comments,     Trackbacks
순환 신경망(Recurrent Neural Network) (1)

순차 데이터(Sequential Data)

샘플에 순서를 갖는 데이터를 말합니다. 순서를 갖는다는 것은 각 샘플이 독립적이지 않다는 것을 의미합니다.

 

다음은 주식 데이터입니다.

 

 

일정 시간 간격으로 배치되어 현재의 정보가 과거의 정보와 연관된 것입니다. 이러한 순차 데이터는 특히 시계열 데이터(time series data)라고 합니다.

 

 

순환 신경망(Recurrent Neural Network, RNN)

완전 연결 신경망이나 합성곱 신경망은 이전의 샘플에 대해 고려하지 않기 때문에 순차 데이터를 처리하기에 적합하지 않습니다.

 

이를 위한 것이 순환 신경망(recurrent nueral network)으로 순환 구조를 통해 순차 데이터를 모델링할 수 있습니다. 순환 구조는 이전 타임 스텝의 출력을 현재 타임 스텝의 입력으로 사용하는 것인데, 신경망 내부에 상태를 저장하는 것이라고 할 수 있습니다. 또, 이러한 상태를 은닉 상태(hidden state)라고 합니다.

 

순환 신경망은 용도에 따라 입력과 출력의 길이를 다르게 설계할 수 있습니다. 텍스트 이진 분류와 같은 문제는 다대일(many-to-one)에 해당되며, 텍스트 생성, 번역기 등과 같은 문제는 다대다(many-to-many)에 해당됩니다.

 

순환 신경망의 뉴런을 셀(cell)이라고 하며, 하나의 셀을 타임 스텝으로 펼치면 다음과 같습니다.

 

이전 타임 스텝의 은닉 상태를 계속해서 다음 타임 스텝으로 전달함으로써 과거의 정보를 기억하고 업데이트하는 것이라고 할 수 있습니다.

 

 

하이퍼볼릭 탄젠트 함수(Hyperbolic Tangent Function)

비선형 활성화 함수 중 하나로 순환 신경망에서 셀의 출력인 은닉 상태의 활성화 함수로 사용합니다. 출력은 $ (-1, 1) $ 의 범위를 갖습니다.

 

$ tanh(x) =\frac { { e }^{ x }-{ e }^{ -x } }{ { e }^{ x }+{ e }^{ -x } } $

 

$ \frac{d}{dx}tanh(x)=1-tanh(x)^{2} $

 

import numpy as np
import matplotlib.pyplot as plt

def tanh(x):
    return (np.exp(x) - np.exp(-x)) / (np.exp(x) + np.exp(-x))

x = np.arange(-10, 10, 0.1)

plt.plot(x, tanh(x))
plt.title('Tanh Function')
plt.xlabel('x')
plt.ylabel('tanh(x)')
plt.show()

 

 

순전파(Forward-Propagation)

입력 데이터의 시퀀스 길이(타임 스텝)만큼 순환되는 출력(은닉상태)의 계산 과정을 알아봅니다.

 

셀을 이루는 가중치는 입력 데이터 $X$ 와 곱해지는 $ W_x $ 와 은닉 상태 $ H $ 와 곱해지는 $ W_h $ 그리고 편향 $ b $ 입니다.

 

 

현재 타임 스텝의 은닉 상태 $ H_t $ 에 대한 계산은 다음과 같습니다.

 

$ H_t = tanh(X_t W_x + H_{t-1} W_h + b) $

 

다음은 순환 신경망의 순전파 계산입니다. (시퀀스 길이 8, 입력 벡터 차원 2, 은닉층 크기 4)

import numpy as np

t = 8 
input_dim = 2
hidden_size = 4

Wx = np.random.random((hidden_size, input_dim))
Wh = np.random.random((hidden_size, hidden_size))
b = np.random.random((hidden_size,))

print('Wx shape:', np.shape(Wx))
print('Wh shape:', np.shape(Wh))
print('b shape:', np.shape(b))
print('total params:', Wx.shape[0] * Wx.shape[1] + Wh.shape[0] * Wh.shape[1] + b.shape[0])

hidden_states = []
hidden_state = np.zeros((hidden_size,))
input_ = np.random.random((t, input_dim))
# print('input:', input_)

for x in input_:
    hidden_state = np.tanh(np.dot(Wx, x) + np.dot(Wh, hidden_state) + b)
    hidden_states.append(list(hidden_state))
    print('timestep:', len(hidden_states), hidden_state)
    
# print(np.stack(hidden_states))
Wx shape: (4, 2)
Wh shape: (4, 4)
b shape: (4,)
total params: 28
timestep: 1 [0.84105377 0.8297238  0.91622967 0.67137979]
timestep: 2 [0.99128408 0.97884487 0.99519833 0.9828166 ]
timestep: 3 [0.99892693 0.99607506 0.99888751 0.99707515]
timestep: 4 [0.99853079 0.99417363 0.99869668 0.9965562 ]
timestep: 5 [0.99916201 0.9972471  0.99901747 0.99749684]
timestep: 6 [0.99737831 0.99456041 0.99775821 0.99475771]
timestep: 7 [0.99921775 0.99734956 0.99906598 0.99759901]
timestep: 8 [0.99661161 0.99204837 0.99747657 0.99395993]

 

케라스를 이용하여 파라미터 개수 정보를 확인합니다.

from keras.models import Sequential
from keras.layers import SimpleRNN

model = Sequential()
model.add(SimpleRNN(4, input_shape=(4, 2)))
model.summary()
____________________________________________________________________________________________________
Layer (type)                     Output Shape          Param       Connected to                     
====================================================================================================
simplernn_1 (SimpleRNN)          (None, 4)             28          simplernn_input_1[0][0]          
====================================================================================================
Total params: 28
Trainable params: 28
Non-trainable params: 0
____________________________________________________________________________________________________

 

 

구현

다대다 순환 신경망 모델을 구현합니다.

 

학습 데이터를 정의합니다.

char = {
    0: 'h',
    1: 'e',
    2: 'l',
    3: 'r',
    4: 'o'
}

x_train = [[[1, 0, 0, 0, 0],
            [0, 1, 0, 0, 0],
            [0, 0, 0, 1, 0],
            [0, 0, 0, 0, 1]],
           [[1, 0, 0, 0, 0],
            [0, 1, 0, 0, 0],
            [0, 0, 1, 0, 0],
            [0, 0, 1, 0, 0]],
           [[1, 0, 0, 0, 0],
            [0, 0, 0, 0, 1],
            [0, 0, 1, 0, 0],
            [0, 1, 0, 0, 0]]]

y_train = [[0, 1, 3, 4], # hero
           [0, 1, 2, 2], # hell
           [0, 4, 2, 1]] # hole

 

신경망을 정의합니다.

import tensorflow as tf
import numpy as np

class Model:
    def __init__(self):
        tf.reset_default_graph()
        
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 4, 5])
            self.y = tf.placeholder(tf.int32, [None, 4])
    
        with tf.name_scope('layer'):
            cell = tf.nn.rnn_cell.BasicRNNCell(4)
            outputs, state = tf.nn.dynamic_rnn(cell, self.x, dtype=tf.float32)
        
        with tf.name_scope('output'):
            self.logits = tf.layers.dense(outputs, 5)
            
        with tf.name_scope('loss'):
            weights = tf.ones([3, 4])
            self.loss = tf.contrib.seq2seq.sequence_loss(logits=self.logits, targets=self.y, weights=weights)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.AdamOptimizer(0.1).minimize(self.loss)
        
        self.sess = tf.Session()
        self.sess.run(tf.global_variables_initializer())
    
    def predict(self, x):
        logits = self.sess.run(self.logits, {self.x: x})
        return np.argmax(logits, 2)
        
    def train(self, x_train, y_train, epochs):
        for e in range(epochs):
            loss, _ = self.sess.run([self.loss, self.train_op], {self.x: x_train, self.y: y_train})
#             predict = self.predict(x=x_train)
#             print('epoch:', e + 1, '/ loss:', loss, '/ predict:', predict)

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, epochs=30)

predict = model.predict(x_train)
for i, word in enumerate(predict):
    print(i + 1, 'word: ', end='')
    for c in word:
        print(char[c], end='')
    print()
1 word: hero
2 word: hell
3 word: hole

 

 

  Comments,     Trackbacks
합성곱 신경망(Convolutional Neural Network)

 

2020/11/12 - [머신러닝/강화학습] - 소프트맥스 회귀(Softmax Regression) (2)

 

합성곱 신경망(Convolutional Neural Network)

일반적인 신경망은 완전 연결층(fully connected layer)의 구조를 가지고 있습니다. 하지만 이는 이미지 처리에 사용하기에는 한계가 있습니다. 이미지와 같은 입력 데이터는 채널을 포함하여 3차원 텐서로 표현됩니다. 이를 완전 연결층에서 처리하기 위해서는 1차원 텐서인 벡터로 변환하여 사용해야 하는데 이는 공간적 구조 정보가 유실된 것이라고 할 수 있으며 좋은 성능을 내지 못합니다.

 

이러한 문제에 대하여 개선한 것이 합성곱 신경망(convolutional neural network)입니다. 합성곱 연산을 통해 이미지의 공간적 구조 정보를 보존하여 좋은 성능을 내는 것입니다.

 

 

신경망의 구조는 합성곱 연산을 수행하는 합성곱층, 풀링 연산을 수행하는 풀링층, 플래튼층, 완전 연결층 등으로 구성할 수 있습니다.

 

 

커널(Kernel)

합성곱 신경망의 가중치(weight)이며 합성곱 연산을 수행하는 행렬을 필터(filter) 또는 커널(kernel)이라고 합니다. 

 

일반 신경망에서 3x3 이미지를 입력받아 4개의 출력을 한다면 가중치는 36개의 가중치가 필요로 합니다.

 

 

합성곱 신경망에서 가중치의 개수는 커널의 원소로 다음 커널은 $w_0, w_1, w_2, w_3$ 4개의 가중치만을 갖습니다.

 

 

 

스트라이드(Stride)

커널이 미끄러지는 간격을 의미합니다.

 

다음 그림은 입력 이미지 5x5, 커널 사이즈 3x3, 스트라이드 2의 합성곱 연산 과정입니다.

 

 

왼쪽에서 오른쪽으로, 위쪽에서 아래쪽으로 2칸씩 이동하며 연산을 수행합니다.

 

 

합성곱 연산(Convolution operation)

합성곱 연산은 입력 데이터(이미지)의 특징을 추출하는 역할을 합니다.

 

입력과 커널 간의 합성곱 연산의 개념은 다음과 같습니다.

 

 

$0*0 + 1*1 + 3*2 + 4*3 = 19$

$0*1 + 1*2 + 2*4 + 3*5 = 25$

$3*0 + 4*1 + 2*6 + 7*3 = 37$

$4*0 + 5*1 + 2*7 + 8*3 = 43$

 

import tensorflow as tf
import numpy as np

x = tf.placeholder(tf.float32, [None, 3, 3, 1])

init = tf.constant_initializer([0, 1, 2, 3])
conv = tf.contrib.layers.conv2d(x, 1, 2, 1, weights_initializer=init, padding='valid')

flatten = tf.layers.flatten(conv)

fc = tf.layers.dense(flatten, 1, tf.nn.relu)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    
    input_data = np.array([[0,1,2], [3,4,5], [6,7,8]])
    input_data = input_data.reshape(1, 3, 3, 1)
    output, _ = sess.run([conv, fc], {x: input_data})
    
    print(output)
[[[[19.]
   [25.]]

  [[37.]
   [43.]]]]

 

입력과 커널간에 합성곱 수행 과정으로 스트라이드 만큼 왼쪽에서 오른쪽, 위쪽에서 아래쪽으로 건너가며 연산하여 특성 맵(feature map)을 생성합니다. 이러한 특성 맵은 이미지의 특징 정보를 담고 있습니다.

 

 

여기서 입력 데이터는 3채널을 갖는 7x7x3 의 3차원 텐서입니다. 또한 커널의 사이즈는 3x3 이며 개수는 2개 이므로 출력되는 특성 맵의 개수는 2개가 됩니다.

 

 

패딩(Padding)

합성곱 연산을 수행하기 전에 입력 이미지의 양 끝에 원소를 채우는 것을 의미합니다. 이 때 원소는 주로 0으로 채우는 제로 패딩(zero padding)을 이용합니다. 또한 패딩은 입력 행렬 원소의 연산 참여도와 연산 후 얻는 출력(특성 맵)의 크기에 영향을 줍니다.

 

밸리드 패딩(valid padding)은 패딩을 사용하지 않는 것입니다.

 

 

패딩을 사용하지 않으므로 연산 횟수가 적어 속도가 빠르며 입력 이미지 원소의 연산 참여도가 다르다는 특징이 있습니다. 연산 참여도란 합성곱 연산을 위해 커널과 맵핑되는 원소의 참여도를 말하는 것으로 양 끝 원소의 연산 참여도가 낮습니다.

 

 

풀 패딩(full padding)은 입력 데이터의 모든 원소에 대해 동일한 연산 참여도를 갖도록 패딩을 채우는 것입니다.

 

따라서 패딩 방식중에 연산 횟수가 가장 많아 속도가 느립니다.

 

 

세임 패딩(same padding)은 출력의 크기가 입력과 같아지도록 패딩을 채우는 것으로 입력 데이터의 크기와 합성곱 연산을 수행하여 출력되는 특징 맵의 크기가 같습니다.

 

 

모든 패딩을 한 그림으로 보면 다음과 같습니다.

 

 

 

풀링(Pooling)

특성 맵의 크기를 줄이는 연산으로 신경망의 연산을 줄이는 역할을 합니다.

 

앞서 합성곱층의 커널과 비슷한 개념인 풀링 영역으로 입력으로 받은 특성 맵을 스트라이드 만큼 건너가며 연산을 수행합니다. 그 과정에서 맵핑되는 원소들에 대하여 최대 값을 가지는 원소를 선택하는 연산을 최대 풀링(max pooling), 원소들의 평균을 연산하는 것을 평균 풀링(average pooling)이라고 합니다.

 

다음은 풀 사이즈 2x2 , 스트라이드 2의 연산 결과를 나타냅니다. 

 

 

import tensorflow as tf
import numpy as np

x = tf.placeholder(tf.float32, [None, 4, 4, 1])

max_pooling = tf.nn.max_pool2d(x, ksize=2, strides=2, padding='VALID')
avg_pooling = tf.nn.avg_pool2d(x, ksize=2, strides=2, padding='VALID')

flatten = tf.layers.flatten(max_pooling + avg_pooling)
fc = tf.layers.dense(flatten, 1, tf.nn.relu)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    
    input_data = np.array([29, 15, 28, 184, 0, 100, 70, 38, 12, 12, 7, 2, 12, 12, 45, 6]).reshape(1, 4, 4, 1)
    
    max_pooling_output, avg_pooling_output, _ = sess.run([max_pooling, avg_pooling, fc], {x: input_data})
    
    print(max_pooling_output)
    print(avg_pooling_output)
[[[[100.]
   [184.]]

  [[ 12.]
   [ 45.]]]]
[[[[36.]
   [80.]]

  [[12.]
   [15.]]]]

 

 

구현

CNN 모델을 구현합니다.

 

패션 MNIST 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras import datasets

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

print('data shape:', x_train.shape)

print('target shape:', y_train.shape)
print('target label:', np.unique(y_train, return_counts=True))
data shape: (60000, 28, 28)
target shape: (60000,)
target label: (array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=uint8), array([6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000],
      dtype=int64))

 

학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

 

데이터는 28x28의 그레이스케일 이미지입니다.

x_train[0].shape
(28, 28)

 

또한 이미지의 구성값은 픽셀(fixel)이라고 하며 $ [0, 255] $ 의 범위를 갖습니다. 

import matplotlib.pyplot as plt

plt.imshow(x_train[0])
plt.title('Sample')
plt.colorbar()
plt.show()

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf

class Model:
    def __init__(self, lr=1e-3):
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 28, 28, 1])
            self.y = tf.placeholder(tf.int64)

        with tf.name_scope('preprocessing'):
            x_norm = self.x / 255.0
            y_onehot = tf.one_hot(self.y, 10)
            
        with tf.name_scope('layer'):
            conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)
            pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')
            conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)
            pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')
            flat = tf.layers.flatten(pool2)
            
            fc = tf.layers.dense(flat, 64, tf.nn.relu)
            logits = tf.layers.dense(fc, 10)
            
        with tf.name_scope('output'):
            self.predict = tf.argmax(tf.nn.softmax(logits), 1)

        with tf.name_scope('accuracy'):
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.to_int64(self.predict), self.y), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=y_onehot, logits=logits)
            self.loss = tf.reduce_mean(cross_entropy)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/cnn_fashion_mnist/train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/cnn_fashion_mnist/val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())
    
    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / train_loss:', np.mean(t_l), '/ train_acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y})

 

입력 데이터는 3차원 배열이며, 샘플은 2차원 배열로 28x28 이미지입니다.

self.x = tf.placeholder(tf.float32, [None, 28, 28])

 

입력 데이터의 픽셀값의 범위는 $ [0, 255] $ 을 갖기 때문에 255로 나누어 정규화합니다.

x_norm = self.x / 255.0

 

타겟 데이터는 정수 인코딩된 값입니다.

self.y = tf.placeholder(tf.int64)

 

원-핫 인코딩합니다.

y_onehot = tf.one_hot(self.y, 10)

 

합성곱층 > 풀링층 > 합성곱층 > 풀링층 > 플래튼층 > 완전 연결층

with tf.name_scope('layer'):
    conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)
    pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')
    conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)
    pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')
    flat = tf.layers.flatten(pool2)
    fc = tf.layers.dense(flat, 64, tf.nn.relu)
    logits = tf.layers.dense(fc, 10)

 

첫 번째 합성곱층으로 커널의 개수는 32개, 사이즈는 3x3, 밸리드 패딩, 활성화 함수는 렐루를 사용합니다.

conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)

 

첫 번째 풀링층은 최대 풀링 연산을 수행하며 풀 사이즈 2x2, 스트라이드 2, 밸리드 패딩을 사용합니다.

pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')

 

두 번째 합성곱층으로 커널의 개수는 64개, 사이즈는 3x3, 밸리드 패딩, 활성화 함수는 렐루를 사용합니다.

conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)

 

첫 번째 풀링층과 같습니다.

pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')

 

플래튼층으로 다차원 배열의 입력 데이터를 일렬로 펼칩니다.(tf.layers.flatten)

flat = tf.layers.flatten(pool2)

 

이러한 플래튼층은 가중치를 가지지 않으며 완전 연결층과 결합을 위한 것입니다.

fc = tf.layers.dense(flat, 64, tf.nn.relu)

 

미니 배치 경사 하강법으로 학습을 수행합니다.

def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
    data_size = len(x_train)
    for e in range(epochs):
        t_l, t_a = [], []

        idx = np.random.permutation(np.arange(data_size))
        _x_train, _y_train = x_train[idx], y_train[idx]

        for i in range(0, data_size, batch_size):
            si, ei = i, i + batch_size
            if ei > data_size:
                ei = data_size

            x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]

            tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
            t_l.append(tl)
            t_a.append(ta)

        vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})

        self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)

        print('epoch:', e + 1, ' / train_loss:', np.mean(t_l), '/ train_acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=20)
model.score(x_test, y_test)
epoch: 1  / train_loss: 0.4958783 / train_acc: 0.821  / val_loss: 0.3615158 / val_acc: 0.8693333
epoch: 2  / train_loss: 0.33009052 / train_acc: 0.8801042  / val_loss: 0.33994812 / val_acc: 0.87633336
epoch: 3  / train_loss: 0.2863968 / train_acc: 0.8957292  / val_loss: 0.2963421 / val_acc: 0.89033335
epoch: 4  / train_loss: 0.2505999 / train_acc: 0.9079583  / val_loss: 0.309857 / val_acc: 0.8883333
epoch: 5  / train_loss: 0.22701743 / train_acc: 0.916125  / val_loss: 0.28903735 / val_acc: 0.8929167
epoch: 6  / train_loss: 0.2048815 / train_acc: 0.9238542  / val_loss: 0.26345694 / val_acc: 0.9091667
epoch: 7  / train_loss: 0.18721287 / train_acc: 0.9300625  / val_loss: 0.2767538 / val_acc: 0.90166664
epoch: 8  / train_loss: 0.16769926 / train_acc: 0.93795836  / val_loss: 0.25725135 / val_acc: 0.9113333
epoch: 9  / train_loss: 0.1539154 / train_acc: 0.9421667  / val_loss: 0.26997244 / val_acc: 0.9085
epoch: 10  / train_loss: 0.13899541 / train_acc: 0.9480417  / val_loss: 0.25565076 / val_acc: 0.9151667
epoch: 11  / train_loss: 0.12614623 / train_acc: 0.95208335  / val_loss: 0.26323685 / val_acc: 0.9166667
epoch: 12  / train_loss: 0.11178255 / train_acc: 0.958  / val_loss: 0.28505012 / val_acc: 0.909
epoch: 13  / train_loss: 0.101788476 / train_acc: 0.96166664  / val_loss: 0.2979613 / val_acc: 0.9149167
epoch: 14  / train_loss: 0.09371626 / train_acc: 0.96510416  / val_loss: 0.3131187 / val_acc: 0.913
epoch: 15  / train_loss: 0.081892245 / train_acc: 0.9685  / val_loss: 0.31888083 / val_acc: 0.9144167
epoch: 16  / train_loss: 0.07641095 / train_acc: 0.9698542  / val_loss: 0.34911385 / val_acc: 0.91066664
epoch: 17  / train_loss: 0.06667025 / train_acc: 0.97522914  / val_loss: 0.37385282 / val_acc: 0.9115
epoch: 18  / train_loss: 0.063369825 / train_acc: 0.9763542  / val_loss: 0.35609087 / val_acc: 0.915
epoch: 19  / train_loss: 0.05810668 / train_acc: 0.9788333  / val_loss: 0.40346622 / val_acc: 0.91566664
epoch: 20  / train_loss: 0.05347745 / train_acc: 0.97985417  / val_loss: 0.411666 / val_acc: 0.90991664

0.9013

 

에포크에 대한 정확도와 손실 함수의 그래프는 다음과 같습니다.(주황: 학습, 파랑: 검증)

 

 

10 에포크 이후에는 과대적합이 일어나며, 이를 방지하기 위해서는 5~7 에포크 정도에 조기종료해야 합니다.

 

 

  Comments,     Trackbacks