독학 연구소
공부한 내용을 정리합니다.
합성곱 신경망(Convolutional Neural Network)

 

2020/11/12 - [머신러닝/강화학습] - 소프트맥스 회귀(Softmax Regression) (2)

 

합성곱 신경망(Convolutional Neural Network)

일반적인 신경망은 완전 연결층(fully connected layer)의 구조를 가지고 있습니다. 하지만 이는 이미지 처리에 사용하기에는 한계가 있습니다. 이미지와 같은 입력 데이터는 채널을 포함하여 3차원 텐서로 표현됩니다. 이를 완전 연결층에서 처리하기 위해서는 1차원 텐서인 벡터로 변환하여 사용해야 하는데 이는 공간적 구조 정보가 유실된 것이라고 할 수 있으며 좋은 성능을 내지 못합니다.

 

이러한 문제에 대하여 개선한 것이 합성곱 신경망(convolutional neural network)입니다. 합성곱 연산을 통해 이미지의 공간적 구조 정보를 보존하여 좋은 성능을 내는 것입니다.

 

 

신경망의 구조는 합성곱 연산을 수행하는 합성곱층, 풀링 연산을 수행하는 풀링층, 플래튼층, 완전 연결층 등으로 구성할 수 있습니다.

 

 

커널(Kernel)

합성곱 신경망의 가중치(weight)이며 합성곱 연산을 수행하는 행렬을 필터(filter) 또는 커널(kernel)이라고 합니다. 

 

일반 신경망에서 3x3 이미지를 입력받아 4개의 출력을 한다면 가중치는 36개의 가중치가 필요로 합니다.

 

 

합성곱 신경망에서 가중치의 개수는 커널의 원소로 다음 커널은 $w_0, w_1, w_2, w_3$ 4개의 가중치만을 갖습니다.

 

 

 

스트라이드(Stride)

커널이 미끄러지는 간격을 의미합니다.

 

다음 그림은 입력 이미지 5x5, 커널 사이즈 3x3, 스트라이드 2의 합성곱 연산 과정입니다.

 

 

왼쪽에서 오른쪽으로, 위쪽에서 아래쪽으로 2칸씩 이동하며 연산을 수행합니다.

 

 

합성곱 연산(Convolution operation)

합성곱 연산은 입력 데이터(이미지)의 특징을 추출하는 역할을 합니다.

 

입력과 커널 간의 합성곱 연산의 개념은 다음과 같습니다.

 

 

$0*0 + 1*1 + 3*2 + 4*3 = 19$

$0*1 + 1*2 + 2*4 + 3*5 = 25$

$3*0 + 4*1 + 2*6 + 7*3 = 37$

$4*0 + 5*1 + 2*7 + 8*3 = 43$

 

import tensorflow as tf
import numpy as np

x = tf.placeholder(tf.float32, [None, 3, 3, 1])

init = tf.constant_initializer([0, 1, 2, 3])
conv = tf.contrib.layers.conv2d(x, 1, 2, 1, weights_initializer=init, padding='valid')

flatten = tf.layers.flatten(conv)

fc = tf.layers.dense(flatten, 1, tf.nn.relu)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    
    input_data = np.array([[0,1,2], [3,4,5], [6,7,8]])
    input_data = input_data.reshape(1, 3, 3, 1)
    output, _ = sess.run([conv, fc], {x: input_data})
    
    print(output)
[[[[19.]
   [25.]]

  [[37.]
   [43.]]]]

 

입력과 커널간에 합성곱 수행 과정으로 스트라이드 만큼 왼쪽에서 오른쪽, 위쪽에서 아래쪽으로 건너가며 연산하여 특성 맵(feature map)을 생성합니다. 이러한 특성 맵은 이미지의 특징 정보를 담고 있습니다.

 

 

여기서 입력 데이터는 3채널을 갖는 7x7x3 의 3차원 텐서입니다. 또한 커널의 사이즈는 3x3 이며 개수는 2개 이므로 출력되는 특성 맵의 개수는 2개가 됩니다.

 

 

패딩(Padding)

합성곱 연산을 수행하기 전에 입력 이미지의 양 끝에 원소를 채우는 것을 의미합니다. 이 때 원소는 주로 0으로 채우는 제로 패딩(zero padding)을 이용합니다. 또한 패딩은 입력 행렬 원소의 연산 참여도와 연산 후 얻는 출력(특성 맵)의 크기에 영향을 줍니다.

 

밸리드 패딩(valid padding)은 패딩을 사용하지 않는 것입니다.

 

 

패딩을 사용하지 않으므로 연산 횟수가 적어 속도가 빠르며 입력 이미지 원소의 연산 참여도가 다르다는 특징이 있습니다. 연산 참여도란 합성곱 연산을 위해 커널과 맵핑되는 원소의 참여도를 말하는 것으로 양 끝 원소의 연산 참여도가 낮습니다.

 

 

풀 패딩(full padding)은 입력 데이터의 모든 원소에 대해 동일한 연산 참여도를 갖도록 패딩을 채우는 것입니다.

 

따라서 패딩 방식중에 연산 횟수가 가장 많아 속도가 느립니다.

 

 

세임 패딩(same padding)은 출력의 크기가 입력과 같아지도록 패딩을 채우는 것으로 입력 데이터의 크기와 합성곱 연산을 수행하여 출력되는 특징 맵의 크기가 같습니다.

 

 

모든 패딩을 한 그림으로 보면 다음과 같습니다.

 

 

 

풀링(Pooling)

특성 맵의 크기를 줄이는 연산으로 신경망의 연산을 줄이는 역할을 합니다.

 

앞서 합성곱층의 커널과 비슷한 개념인 풀링 영역으로 입력으로 받은 특성 맵을 스트라이드 만큼 건너가며 연산을 수행합니다. 그 과정에서 맵핑되는 원소들에 대하여 최대 값을 가지는 원소를 선택하는 연산을 최대 풀링(max pooling), 원소들의 평균을 연산하는 것을 평균 풀링(average pooling)이라고 합니다.

 

다음은 풀 사이즈 2x2 , 스트라이드 2의 연산 결과를 나타냅니다. 

 

 

import tensorflow as tf
import numpy as np

x = tf.placeholder(tf.float32, [None, 4, 4, 1])

max_pooling = tf.nn.max_pool2d(x, ksize=2, strides=2, padding='VALID')
avg_pooling = tf.nn.avg_pool2d(x, ksize=2, strides=2, padding='VALID')

flatten = tf.layers.flatten(max_pooling + avg_pooling)
fc = tf.layers.dense(flatten, 1, tf.nn.relu)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    
    input_data = np.array([29, 15, 28, 184, 0, 100, 70, 38, 12, 12, 7, 2, 12, 12, 45, 6]).reshape(1, 4, 4, 1)
    
    max_pooling_output, avg_pooling_output, _ = sess.run([max_pooling, avg_pooling, fc], {x: input_data})
    
    print(max_pooling_output)
    print(avg_pooling_output)
[[[[100.]
   [184.]]

  [[ 12.]
   [ 45.]]]]
[[[[36.]
   [80.]]

  [[12.]
   [15.]]]]

 

 

구현

CNN 모델을 구현합니다.

 

패션 MNIST 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras import datasets

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

print('data shape:', x_train.shape)

print('target shape:', y_train.shape)
print('target label:', np.unique(y_train, return_counts=True))
data shape: (60000, 28, 28)
target shape: (60000,)
target label: (array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=uint8), array([6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000],
      dtype=int64))

 

학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

 

데이터는 28x28의 그레이스케일 이미지입니다.

x_train[0].shape
(28, 28)

 

또한 이미지의 구성값은 픽셀(fixel)이라고 하며 $ [0, 255] $ 의 범위를 갖습니다. 

import matplotlib.pyplot as plt

plt.imshow(x_train[0])
plt.title('Sample')
plt.colorbar()
plt.show()

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf

class Model:
    def __init__(self, lr=1e-3):
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 28, 28, 1])
            self.y = tf.placeholder(tf.int64)

        with tf.name_scope('preprocessing'):
            x_norm = self.x / 255.0
            y_onehot = tf.one_hot(self.y, 10)
            
        with tf.name_scope('layer'):
            conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)
            pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')
            conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)
            pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')
            flat = tf.layers.flatten(pool2)
            
            fc = tf.layers.dense(flat, 64, tf.nn.relu)
            logits = tf.layers.dense(fc, 10)
            
        with tf.name_scope('output'):
            self.predict = tf.argmax(tf.nn.softmax(logits), 1)

        with tf.name_scope('accuracy'):
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.to_int64(self.predict), self.y), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=y_onehot, logits=logits)
            self.loss = tf.reduce_mean(cross_entropy)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/cnn_fashion_mnist/train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/cnn_fashion_mnist/val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())
    
    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / train_loss:', np.mean(t_l), '/ train_acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y})

 

입력 데이터는 3차원 배열이며, 샘플은 2차원 배열로 28x28 이미지입니다.

self.x = tf.placeholder(tf.float32, [None, 28, 28])

 

입력 데이터의 픽셀값의 범위는 $ [0, 255] $ 을 갖기 때문에 255로 나누어 정규화합니다.

x_norm = self.x / 255.0

 

타겟 데이터는 정수 인코딩된 값입니다.

self.y = tf.placeholder(tf.int64)

 

원-핫 인코딩합니다.

y_onehot = tf.one_hot(self.y, 10)

 

합성곱층 > 풀링층 > 합성곱층 > 풀링층 > 플래튼층 > 완전 연결층

with tf.name_scope('layer'):
    conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)
    pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')
    conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)
    pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')
    flat = tf.layers.flatten(pool2)
    fc = tf.layers.dense(flat, 64, tf.nn.relu)
    logits = tf.layers.dense(fc, 10)

 

첫 번째 합성곱층으로 커널의 개수는 32개, 사이즈는 3x3, 밸리드 패딩, 활성화 함수는 렐루를 사용합니다.

conv1 = tf.layers.conv2d(x_norm, 32, [3, 3], padding='VALID', activation=tf.nn.relu)

 

첫 번째 풀링층은 최대 풀링 연산을 수행하며 풀 사이즈 2x2, 스트라이드 2, 밸리드 패딩을 사용합니다.

pool1 = tf.layers.max_pooling2d(conv1, [2, 2], [2, 2], padding='VALID')

 

두 번째 합성곱층으로 커널의 개수는 64개, 사이즈는 3x3, 밸리드 패딩, 활성화 함수는 렐루를 사용합니다.

conv2 = tf.layers.conv2d(pool1, 64, [3, 3], padding='VALID', activation=tf.nn.relu)

 

첫 번째 풀링층과 같습니다.

pool2 = tf.layers.max_pooling2d(conv2, [2, 2], [2, 2], padding='VALID')

 

플래튼층으로 다차원 배열의 입력 데이터를 일렬로 펼칩니다.(tf.layers.flatten)

flat = tf.layers.flatten(pool2)

 

이러한 플래튼층은 가중치를 가지지 않으며 완전 연결층과 결합을 위한 것입니다.

fc = tf.layers.dense(flat, 64, tf.nn.relu)

 

미니 배치 경사 하강법으로 학습을 수행합니다.

def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
    data_size = len(x_train)
    for e in range(epochs):
        t_l, t_a = [], []

        idx = np.random.permutation(np.arange(data_size))
        _x_train, _y_train = x_train[idx], y_train[idx]

        for i in range(0, data_size, batch_size):
            si, ei = i, i + batch_size
            if ei > data_size:
                ei = data_size

            x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]

            tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
            t_l.append(tl)
            t_a.append(ta)

        vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})

        self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)

        print('epoch:', e + 1, ' / train_loss:', np.mean(t_l), '/ train_acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=20)
model.score(x_test, y_test)
epoch: 1  / train_loss: 0.4958783 / train_acc: 0.821  / val_loss: 0.3615158 / val_acc: 0.8693333
epoch: 2  / train_loss: 0.33009052 / train_acc: 0.8801042  / val_loss: 0.33994812 / val_acc: 0.87633336
epoch: 3  / train_loss: 0.2863968 / train_acc: 0.8957292  / val_loss: 0.2963421 / val_acc: 0.89033335
epoch: 4  / train_loss: 0.2505999 / train_acc: 0.9079583  / val_loss: 0.309857 / val_acc: 0.8883333
epoch: 5  / train_loss: 0.22701743 / train_acc: 0.916125  / val_loss: 0.28903735 / val_acc: 0.8929167
epoch: 6  / train_loss: 0.2048815 / train_acc: 0.9238542  / val_loss: 0.26345694 / val_acc: 0.9091667
epoch: 7  / train_loss: 0.18721287 / train_acc: 0.9300625  / val_loss: 0.2767538 / val_acc: 0.90166664
epoch: 8  / train_loss: 0.16769926 / train_acc: 0.93795836  / val_loss: 0.25725135 / val_acc: 0.9113333
epoch: 9  / train_loss: 0.1539154 / train_acc: 0.9421667  / val_loss: 0.26997244 / val_acc: 0.9085
epoch: 10  / train_loss: 0.13899541 / train_acc: 0.9480417  / val_loss: 0.25565076 / val_acc: 0.9151667
epoch: 11  / train_loss: 0.12614623 / train_acc: 0.95208335  / val_loss: 0.26323685 / val_acc: 0.9166667
epoch: 12  / train_loss: 0.11178255 / train_acc: 0.958  / val_loss: 0.28505012 / val_acc: 0.909
epoch: 13  / train_loss: 0.101788476 / train_acc: 0.96166664  / val_loss: 0.2979613 / val_acc: 0.9149167
epoch: 14  / train_loss: 0.09371626 / train_acc: 0.96510416  / val_loss: 0.3131187 / val_acc: 0.913
epoch: 15  / train_loss: 0.081892245 / train_acc: 0.9685  / val_loss: 0.31888083 / val_acc: 0.9144167
epoch: 16  / train_loss: 0.07641095 / train_acc: 0.9698542  / val_loss: 0.34911385 / val_acc: 0.91066664
epoch: 17  / train_loss: 0.06667025 / train_acc: 0.97522914  / val_loss: 0.37385282 / val_acc: 0.9115
epoch: 18  / train_loss: 0.063369825 / train_acc: 0.9763542  / val_loss: 0.35609087 / val_acc: 0.915
epoch: 19  / train_loss: 0.05810668 / train_acc: 0.9788333  / val_loss: 0.40346622 / val_acc: 0.91566664
epoch: 20  / train_loss: 0.05347745 / train_acc: 0.97985417  / val_loss: 0.411666 / val_acc: 0.90991664

0.9013

 

에포크에 대한 정확도와 손실 함수의 그래프는 다음과 같습니다.(주황: 학습, 파랑: 검증)

 

 

10 에포크 이후에는 과대적합이 일어나며, 이를 방지하기 위해서는 5~7 에포크 정도에 조기종료해야 합니다.

 

 

  Comments,     Trackbacks