독학 연구소
공부한 내용을 정리합니다.
단어 임베딩 (1)
순환 신경망(Recurrent Neural Network) (3)

 

2021/01/04 - [머신러닝/딥러닝] - 순환 신경망(Recurrent Neural Network) (2)

 

단어 임베딩(Word Embedding)

신경망에 텍스트 데이터를 처리하는 경우에는 텍스트를 숫자화해야 하는데, 이는 원-핫 인코딩으로 전처리할 수 있습니다.

from tensorflow.keras.utils import to_categorical

word = {
    'man': 0,
    'woman': 1
}

to_categorical(list(word.values()))
array([[1., 0.],
       [0., 1.]], dtype=float32)

 

하지만 텍스트 데이터를 원-핫 인코딩할 경우 단어 사이의 관계를 나타내지 못하게됩니다. 이러한 문제를 해결하는 것이 단어 임베딩(word embedding)입니다.

 

 

케라스에서 제공하는 임베딩층을 통해 동작을 확인합니다. (tf.keras.layers.Embedding)

from tensorflow.keras.layers import Embedding

embedding = tf.keras.layers.Embedding(input_dim=2, output_dim=3)
output = embedding(tf.constant([0, 1, 2, 3, 0, 1]))

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(output.eval())
[[-0.04695427  0.04983367 -0.04970566]
 [ 0.03672746  0.00194643 -0.00861054]
 [ 0.          0.          0.        ]
 [ 0.          0.          0.        ]
 [-0.04695427  0.04983367 -0.04970566]
 [ 0.03672746  0.00194643 -0.00861054]]

 

output_dim은 출력 길이이고, input_dim은 처리할 단어의 개수입니다. 위와 같이 단어의 개수를 2로 지정하면 먼저 입력된 0, 1은 정상적으로 실수값을 출력을 하는 반면에 2, 3은 0을 출력하는 것을 볼 수 있습니다.

 

 

구현

다대일 순환 신경망 모델을 구현합니다.

 

IMDB 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras.datasets import imdb

(x_train, y_train), (x_test, y_test) = imdb.load_data(skip_top=20, num_words=1000)

 

skip_top 은 빈도수 상위 20개를 생략하는 것이고, num_words 는 데이터를 이루는 단어의 개수를 지정합니다.

 

어휘 사전을 불러옵니다.

word_to_index = imdb.get_word_index()
index_to_word = { v: k for k, v in word_to_index.items() }

index_to_word
{34701: 'fawn',
 52006: 'tsukino',
 52007: 'nunnery',
 16816: 'sonja',
 63951: 'vani',
 1408: 'woods',
 16115: 'spiders',
 2345: 'hanging',
 2289: 'woody',
 52008: 'trawling',
 ...
 }

 

2는 어휘 사전에 없는 단어로 생략합니다.

for i in range(len(x_train)):
    x_train[i] = [w for w in x_train[i] if w > 2]
    
for i in range(len(x_test)):
    x_test[i] = [w for w in x_test[i] if w > 2]
    
x_train[0]
[22, 43, 65, 66, 36, 25, 43, 50, 35, 39, 38, 50, 22, 22, 71, 87, 43, 38, 76, 22, 62, 66, 33, 38, 25, 51, 36, 48, 25, 33, 22, 28, 77, 52, 82, 36, 71, 43, 26, 46, 88, 98, 32, 56, 26, 22, 21, 26, 30, 51, 36, 28, 92, 25, 65, 38, 88, 32, 32]

 

샘플의 길이를 동일하게 조정합니다. (tf.keras.preprocessing.sequence.pad_sequences)

from tensorflow.keras.preprocessing import sequence

x_train = sequence.pad_sequences(x_train, 100)
x_test = sequence.pad_sequences(x_test, 100)

x_train.shape
(25000, 100)

 

전처리를 끝내고 학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

print(x_train.shape)
print(x_val.shape)
(20000, 100, 100)
(5000, 100, 100)

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Embedding

class Model:
    def __init__(self, lr=1e-3):
        tf.reset_default_graph()
        
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 100])
            self.y = tf.placeholder(tf.float32, [None, 1])

        with tf.name_scope('layer'):
            ebd = Embedding(1000, 32)
            output = ebd(self.x)
            cell = tf.nn.rnn_cell.BasicRNNCell(32)
            outputs, state = tf.nn.dynamic_rnn(cell, output, dtype=tf.float32)
                       
        with tf.name_scope('output'):
            self.output = tf.layers.dense(state, 1, tf.nn.sigmoid)
            
        with tf.name_scope('accuracy'):
            self.predict = tf.cast(tf.greater(self.output, tf.constant([0.5])), dtype=tf.float32)
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(self.y, self.predict), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.keras.losses.binary_crossentropy(y_true=self.y, y_pred=self.output)
            self.loss = tf.reduce_mean(cross_entropy)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.AdamOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/rnn_imdb_2/train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/rnn_imdb_2/val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())

    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs, batch_size=32):
        data_size = len(x_train)
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / loss:', np.mean(t_l), '/ acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y})

 

타겟 데이터의 형태를 변형합니다.

y_train = y_train.reshape(-1, 1)
y_val = y_val.reshape(-1, 1)
y_test = y_test.reshape(-1, 1)

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=5)
model.score(x_test, y_test)
epoch: 1  / loss: 0.5451177 / acc: 0.72195  / val_loss: 0.45631307 / val_acc: 0.7916
epoch: 2  / loss: 0.4257674 / acc: 0.81515  / val_loss: 0.4409481 / val_acc: 0.806
epoch: 3  / loss: 0.42866415 / acc: 0.8042  / val_loss: 0.49786302 / val_acc: 0.764
epoch: 4  / loss: 0.37565607 / acc: 0.8398  / val_loss: 0.45572335 / val_acc: 0.7998
epoch: 5  / loss: 0.3322474 / acc: 0.8655  / val_loss: 0.45722726 / val_acc: 0.8138

0.8172

 

 

  Comments,     Trackbacks