Loading [MathJax]/extensions/TeX/boldsymbol.js
독학 연구소
공부한 내용을 정리합니다.
머신러닝/강화학습 (16)
PPO(Proximal Policy Optimization)

PPO(Proximal Policy Optimization)

논문 Proximal Policy Optimization Algorithms 으로 소개되었습니다.

 

액터-크리틱과 같은 on-policy 알고리즘들의 단점은 학습하기 위해서는 해당 정책을 통해 발생된 샘플이 필요하며 한번 사용된 샘플은 폐기하기 때문에 데이터 사용의 효율성이 떨어집니다. 또한 정책 그래디언트 기반의 알고리즘들이 갖을 수 있는 문제는 정책이 큰 폭으로 변하여 학습이 불안정할 수 있다는 것입니다.

 

PPO 는 확률적 정책을 갖는 정책 그래디언트 알고리즘으로 정책의 변화를 제한하면서 목적 함수를 최대화하고자 하는 아이디어입니다. 이는 TRPO(Trust Region Policy Optimization) 를 기반한 것으로 정책의 변화를 제한하기 때문에 안정적이고 이에 따라 이전 정책과 현재 정책은 근사하여 이전 정책으로부터 발생된 데이터를 재사용할 수 있습니다. 이에 따라 샘플링된 데이터를 이용하여 여러번의 에폭을 반복하여 최적화 과정을 수행한다는 것입니다.

 

 

목적 함수

TRPO 논문의 Sample-Based Estimation of the Objective and Constraint 챕터에서 제안한 대체 목적 함수(surrogate objective function)를 개선합니다.

 

해당 챕터에서는 몬테카를로 시뮬레이션을 이용해 목적 및 제약 함수를 근사하는 방법론을 설명합니다.

 

maximizeθ^Et[πθ(at|st)πθold(at|st)ˆAt]

subject to ^Et[KL[πθold(|st),πθ(|st)]]δ

 

하지만 이는 구현하기 어려우며 현실적으로 제약 조건 대신 페널티를 사용하는 것을 제안합니다.

 

maximizeθ^Et[πθ(at|st)πθold(at|st)ˆAtβKL[πθold(|st),πθ(|st)]]

 

이것 역시 계수 β 를 선택하기 어렵다는 단점 때문에 개선이 필요합니다.

 

 

먼저 두 정책의 확률 비율을 다음과 같이 표기합니다.

 

rt(θ)=πθ(at|st)πθold(at|st)

 

목적 함수의 기본 형태는 다음과 같습니다.

 

LCPI(θ)=ˆE[πθ(at|st)πθold(at|st)ˆAt]=ˆE[rt(θ)ˆAt]

 

위 식을 그대로 사용한다면 과도한 정책 업데이트가 될 것입니다. 따라서 정책의 변화를 클리핑(clipping)하여 최종적으로 다음과 같은 목적 함수를 제안합니다.

 

LCLIP(θ)=ˆE[min(rt(θ)ˆAt,clip(rt(θ),1ϵ,1+ϵ)ˆAt)]

 

클리핑이란 다음과 같이 범위를 제한하는 것입니다.

 

 

위에서 알아본 KL 발산에 페널티를 주는 방식도 대체 목적 함수로 포함합니다.

 

LKLPEN(θ)=ˆE[πθ(at|st)πθold(at|st)ˆAtβKL[πθold(|st),πθ(|st)]]

 

이는 클리핑 방식보다 성능은 낮지만 중요한 기준(important baseline)입니다.

 

 

사용할 수 있는 대체 목적 함수(surrogate objective function)는 다음과 같습니다.

 

No clipping or penalty:Lt(θ)=rt(θ)ˆAtClipping:Lt(θ)=min(rt(θ)ˆAt,clip(rt(θ),1ϵ,1+ϵ))ˆAtKL penalty (fixed or adaptive):Lt(θ)=rt(θ)ˆAtβKL[πθold,πθ]

 

 

대체 함수에 대한 실험 결과는 클리핑 방식이 가장 우수한 성능을 내었습니다.

 

 

 

GAE(Generalized Advantage Estimator)

PPO 는 기존 어드밴티지 추정 방식에서 확장하는 방법인 GAE 를 도입합니다.

 

먼저 n-스텝 관계식을 이용하여 어드밴티지 추정값 δV 을 구하면 다음과 같습니다.

 

ˆA(1)t:=δVt=V(st)+rt+γV(st+1)ˆA(2)t:=δVt+γδVt+1=V(st)+rt+γrt+1+γ2V(st+2)ˆA(3)t:=δVt+γδVt+1+γ2δVt+2=V(st)+rt+γrt+1+γ2rt+2+γ3V(st+3)

 

ˆA(k)t:=k1l=0γlδVt+l=V(st)+rt+γrt+1++γk1rt+k1+γkV(st+k)

 

k 가 커질수록 γkV(st+k) 는 더욱 감쇠되어 V(st) 에 따른 편향(bias)에 영향을 주지 못할 것입니다. 따라서 k 가 무한으로 커질 때 다음 식을 얻을 수 있습니다. 

 

ˆA()t=l=0γlδVt+l=V(st)+l=0γlrt+l

 

이는 몬테카를로 방식으로 편향은 없지만 큰 분산(variance)을 갖습니다. 따라서 n-스텝 어드밴티지 추정 방식으로 분산과 편향을 조절할 수 있습니다.

 

또한 이를 확장하는 GAE 방법은 기하학적 가중치 평균으로 정의합니다.

 

ˆAGAE(γ,λ)t:=(1λ)(ˆA(1)t+λˆA(2)t+λ2ˆA(3)t+)=(1λ)(δVt+λ(δVt+γδVt+1)+λ2(δVt+δVt+1+γ2δVt+2+)=(1λ)(δVt(1+λ+λ2+)+γδVt+1(λ+λ2+λ3+)+γ2δVt+2(λ2+λ3+λ4+)+)=(1λ)(δVt(11λ)+γλVt+1(λ1λ)+γ2δVt+2(λ21λ)+)=l=0(γλ)lδVt+1

 

λ=0λ=1 의 경우 다음과 같습니다.

 

GAE(γ,0):ˆAt:=δt=rt+γV(st+1)V(st)

GAE(γ,1):ˆAt:=l=0γlδt+1=l=0γlrt+1V(st)

 

λ=0 일 경우 1-스텝 어드밴티지 추정이며 λ=1 일 경우 몬테카를로 방식의 어드밴티지 추정이 됩니다. 따라서 0<λ<1 의 범위로 지정하며 일반적으로 γ 보다 작은 λ 를 설정했을 때 좋은 성능을 보여주고 있습니다.

 

 

알고리즘

 

 

 

구현

알고리즘 테스트 환경은 OpenAI Gym  Pendulum-v0 입니다.

 

 

Pendulum 는 연속적인 행동 공간을 갖는 문제로 진자를 흔들어서 12시 방향으로 세워서 유지하는 문제입니다. 

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('Pendulum-v0')

print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.shape[0])
print('act bound:', env.action_space.low[0], env.action_space.high[0])

 

상태 정보는 3차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동 또한 [-2, 2] 의 범위의 연속적인 공간을 갖습니다.

obs: 3
act: 1
act bound: -2.0 2.0

 

 

액터 신경망을 정의합니다.

class Actor:
    def __init__(self):
        with tf.name_scope('actor'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
                self.adv = tf.placeholder(tf.float32, name='advantage')

            self.pi, self.params = self.build_net('pi')
            self.old_pi, self.old_params = self.build_net('old_pi', False)
            
            with tf.name_scope('action'):
                self.sample = self.pi.sample()
                self.action = tf.clip_by_value(self.sample, *A_BOUND)
            
            with tf.name_scope('loss'):
                self.ratio = tf.exp(self.pi.log_prob(self.a) - self.old_pi.log_prob(self.a))
                self.clip_ratio = tf.clip_by_value(self.ratio, 1-E, 1+E)

                # No clipping or penalty
    #                 self.loss = tf.reduce_mean(self.ratio * self.adv)

                # KL penalty (fixed or adaptive)
    #                 kl = tf.reduce_mean(tf.distributions.kl_divergence(old_pi, pi))
    #                 self.loss = self.ratio * self.adv - BETA * kl

                # Clipping
                self.loss = tf.reduce_mean(tf.minimum(self.ratio * self.adv, self.clip_ratio * self.adv))

            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)
            
    def build_net(self, scope, trainable=True):
        with tf.variable_scope(scope):
            with tf.variable_scope('layer'):
                fc1 = tf.layers.dense(self.s, 256, tf.nn.relu, trainable=trainable)
                fc2 = tf.layers.dense(fc1, 128, tf.nn.relu, trainable=trainable)
            with tf.variable_scope('output'):
                mu = tf.layers.dense(fc2, N_A, tf.nn.tanh) 
                sigma = tf.layers.dense(fc2, N_A, tf.nn.softplus)
                pi = tf.distributions.Normal(mu * A_BOUND[1], sigma)
        params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope)
        return pi, params

 

확률 분포에 해당하는 현재 정책과 이전 정책을 구분하여 생성합니다.

self.pi, self.params = self.build_net('pi')
self.old_pi, self.old_params = self.build_net('old_pi', False)

 

이전 정책에 해당 하는 파라미터는 학습하지 않을 것이므로 trainable=False 로 설정합니다.

 

현재 정책과 이전 정책의 확률 비율을 계산합니다.

self.ratio = tf.exp(tf.log(pi_probs) - tf.log(old_probs))

 

위에서 구한 비율을 클리핑합니다.

self.clip_ratio = tf.clip_by_value(self.ratio, 1-E, 1+E)

 

손실 함수를 정의합니다.

self.loss = tf.reduce_mean(tf.minimum(self.ratio * self.adv, self.clip_ratio * self.adv))

 

다음 식을 계산하는 것으로 비율과 클리핑된 비율 각각에 어드밴티지를 곱한 것 중 작은 값들에 대한 평균을 구합니다.

 

LCLIP(θ)=ˆE[min(rt(θ)ˆAt,clip(rt(θ),1ϵ,1+ϵ)ˆAt)]

 

 

마이너스를 붙여 손실값이 최대화가 되는 방향으로 학습하도록 설정합니다.

with tf.name_scope('optimizer'):
    self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)

 

크리틱 신경망을 정의합니다.

class Critic:
    def __init__(self):
        with tf.variable_scope('critic'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.td_target = tf.placeholder(tf.float32, [None, 1], name='td_target')
            with tf.variable_scope('layer'):
                fc1 = tf.layers.dense(self.s, 256, tf.nn.relu)
                fc2 = tf.layers.dense(fc1, 128, tf.nn.relu)
            with tf.variable_scope('output'):
                self.v = tf.layers.dense(fc2, 1)
            with tf.name_scope('td_error'):
                self.td_error = self.td_target - self.v
            with tf.name_scope('loss'):
                self.loss = tf.reduce_mean(tf.square(self.td_error))
            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_C).minimize(self.loss)

 

크리틱 신경망의 구조는 바닐라 액터-크리틱 구조와 동일합니다.

 

 

에이전트와 환경의 상호작용입니다.

agent = PPO()

for i in range(MAX_EPI + 1):
    memory = []
    
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        next_state, reward, done, _ = env.step(action)

        if done:
            reward = -1
        
        memory.append((state, action, reward, done, next_state))

        if done or len(memory) == BATCH_SIZE:
            agent.train(memory)
            memory = []

        state = next_state

 

매 에피소드마다 메모리를 초기화하고 배치 사이즈만큼 s,a,r,d,s 를 메모리에 저장합니다. 에피소드가 종료되거나 메모리 사이즈가 배치 사이즈와 같아지면 업데이트를 진행합니다.

 

 

신경망을 업데이트합니다.

def train(self, batch):
    # state action reward done next_state
    batch = np.array(batch)

    v_pred = self.get_v(np.append(batch[:, 0].tolist(), batch[-1, 4][np.newaxis, :], 0))

    g = 0
    gae = []
    td_target = []
    for t in reversed(range(len(batch))):
        delta = batch[t, 2] + GAMMA * v_pred[t+1] * ~np.array(batch[t, 3]).astype(bool) - v_pred[t]
        g = delta + GAMMA * GAE_LAMDA * g
        gae.insert(0, g)
        td_target.insert(0, v_pred[t] + g)

    for _ in range(EPOCH):
        self.sess.run(self.critic.train_op, {self.critic.s: batch[:, 0].tolist(),
                                             self.critic.td_target: np.vstack(td_target)})
        self.sess.run(self.actor.train_op, {self.actor.s: batch[:, 0].tolist(),
                                            self.actor.a: batch[:, 1],
                                            self.actor.adv: np.vstack(gae)})

    self.update_pi()

 

먼저 계산에 사용하기 위한 상태 가치를 추정합니다.

def get_v(self, s):
    v = self.sess.run(self.critic.v, {self.critic.s: s})
    return v

...

v_pred = self.get_v(np.append(batch[:, 0].tolist(), batch[-1, 4][np.newaxis, :], 0))

 

마지막 상태 뒤에 마지막 다음 상태를 추가한 것은 계산의 편의성을 위한 것입니다.

 

 

GAE 와 TD 타겟을 계산합니다.

g = 0
gae = []
td_target = []
for t in reversed(range(len(batch))):
    delta = batch[t, 2] + GAMMA * v_pred[t+1] * ~np.array(batch[t, 3]).astype(bool) - v_pred[t]
    g = delta + GAMMA * GAE_LAMDA * g
    gae.insert(0, g)
    td_target.insert(0, v_pred[t] + g)

 

GAE 를 이용한 TD 오차와 타겟 계산은 기존의 액터-크리틱 방식과 차이가 있습니다. TD 오차 계산은 배치 크기만큼 저장된 샘플 데이터를 이용하지만 n-스텝이 아닌 1-스텝으로 계산합니다.

 

δt=rt+γV(st+1)V(st)

 

δt+1=rt+1+γV(st+2)V(st+1)1-스텝 계산δt+1=rt+γrt+1+γ2V(st+2)V(st+1)n-스텝 계산 

 

 

또한 TD 오차를 이용하여 역방향으로 GAE 를 계산합니다.

 

AGAEt=δt+(γλ)AGAEt+1

 

이 또한 tt+1 의 재귀적인 관계로 감쇠를 반복하는 계산을 수행합니다.

 

TD 타겟을 재계산합니다. GAE 를 각 상태별 추정된 가치에 더하여 구할 수 있습니다. 이는 추정된 어드밴티지가 GAE 방식으로 재계산되었기 때문입니다.

 

yt=AGAEt+V(st)

 

 

설정한 에폭만큼 반복하여 신경망을 업데이트합니다.

for _ in range(EPOCH):
    self.sess.run(self.critic.train_op, {self.critic.s: batch[:, 0].tolist(),
                                         self.critic.td_target: np.vstack(td_target)})
    self.sess.run(self.actor.train_op, {self.actor.s: batch[:, 0].tolist(),
                                        self.actor.a: batch[:, 1],
                                        self.actor.adv: np.vstack(gae)})

 

신경망 업데이트가 완료되면 현재 정책의 파라미터를 이전 정책의 파라미터로 복사합니다.

self._update_pi = [op.assign(p) for op, p in zip(self.actor.old_params, self.actor.params)]

...

def update_pi(self):
    self.sess.run(self._update_pi)

...

self.update_pi()

 

 

1000번의 에피소드 동안 학습한 결과입니다.

 

 

저장된 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/ppo_pendulum/model/model
episode: 0 / reward: -259.1
episode: 1 / reward: -127.0
episode: 2 / reward: -550.9
episode: 3 / reward: -254.3
episode: 4 / reward: -251.1
episode: 5 / reward: -134.4

 

 

 

  Comments,     Trackbacks
DDPG(Deep Deterministic Policy Gradient)

DDPG(Deep Deterministic Policy Gradient)

논문 Continuous control with deep reinforcement learning 으로 소개되었습니다.

 

DQN 은 off-policy 알고리즘으로 experience replay, target network 기법을 통해 좋은 성능을 내지만 연속적인 행동 공간(continuous action space)을 갖는 환경에서는 적용할 수 없으며 정책 그래디언트 기반의 액터-크리틱 방법론은 분산(variance)은 감소시켰지만 on-policy 알고리즘이므로 데이터간에 상관관계(correlation)가 크기 때문에 편향(bias)을 갖는 단점이 있습니다.

 

DDPG 는 확정적 정책(deterministic policy)을 취하는 정책 그래디언트 알고리즘으로 off-policy 이면서 연속적인 행동 공간을 갖는 환경에서도 적용이 가능한 것이 특징입니다. 또한 액터-크리틱 방법론은 액터 신경망에서 평균과 표준 편차를 출력함으로써 가우시안 분포를 갖는 확률 밀도 함수를 추정하였는데 DDPG 는 행동 그 자체를 추정하자는 아이디어인 것입니다.

 

확정적 정책을 갖는 알고리즘에는 Q 를 학습하는 DQN, SARSA 등이 있습니다. DDPG 또한 Q 를 학습하며 연속적인 행동 공간을 갖는 환경에도 적용할 수 있는 DQN 이라고 할 수 있습니다. 하지만 연속적인 행동 공간에서 Q 를 학습하기 위해서는 정책 그래디언트에 근거한 접근을 이용하기 때문에 정책 그래디언트 기반의 방법론인 것입니다.

 

DDPG 알고리즘은 DQN 에서 적용하는 기법인 experience replay, target network 와 추가적으로 탐색을 위한 noise 기법을 이용합니다.

 

 

타겟 네트워크(Target Network)

일반 신경망과 동일한 구조의 타겟 신경망을 두는 기법입니다. 업데이트 계산을 위한 타겟은 이 타겟 신경망으로부터 구하는데 이와 같이 하는 이유는 동일한 신경망으로부터 타겟과 예측을 구한다면 매번 다른 값이 나올 수 있기 때문입니다.

 

DQN 은 일정 주기마다 일반 신경망의 파라미터를 타겟 신경망으로 한 번에 복사하여 타겟 신경망을 일정 주기동안 고정하지만 DDPG 는 소프트 업데이트(soft update) 방식으로 매 학습마다 파라미터값을 조금씩 업데이트합니다. 따라서 타겟 신경망의 파라미터값이 고정되지 않고 계속 변한다는 특징이 있습니다.

 

파라미터 업데이트 식은 다음과 같습니다.

 

θQτθQ+(1τ)θQ Critic θμτθμ+(1τ)θμ Actor 

 

 

이를 구현하면 다음과 같습니다.

TAU = 0.01

x = 0.54123
x_target = 0.01

for i in range(1000):
    x_target = TAU * x + (1 - TAU) * x_target
    print(i, x_target)
0 0.015312300000000001
1 0.020571477
2 0.025778062230000003
3 0.030932581607700002

...

996 0.5412063641292437
997 0.5412066004879513
998 0.5412068344830718
999 0.5412070661382411

 

 

OU 노이즈(Ornstein-Uhlenbeck Noise)

연속적인 행동 공간을 갖는 환경에서의 주된 과제는 탐색(exporation)입니다. DDPG 는 확정적 정책으로 탐색을 위한 무작위성이 없기 때문에 OU 노이즈를 이용하여 이를 해결하고자 합니다.

 

노이즈를 생성하는 식은 다음과 같습니다.

 

dxt=θ(μxt)dt+σdWt

 

μ 는 평균, θ 는 평균에 회귀하는 것, σ 는 표준 편차, Wt 는 평균이 0이고 분산이 1인 가우시안 화이트 노이즈를 의미합니다.

 

OU 노이즈를 구현하면 다음과 같습니다.

import matplotlib.pyplot as plt
import numpy as np

class OUNoise:
    def __init__(self,action_dimension,mu=0, theta=0.15, sigma=0.3):
        self.action_dimension = action_dimension
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.state = np.ones(self.action_dimension) * self.mu
        self.reset()

    def reset(self):
        self.state = np.ones(self.action_dimension) * self.mu

    def noise(self):
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * np.random.randn(len(x))
        self.state = x + dx
        return self.state

ou1 = OUNoise(1, sigma=0.4)
ou2 = OUNoise(1, sigma=0.3)
ou3 = OUNoise(1, sigma=0.2)
ou4 = OUNoise(1, sigma=0.1)

multi_ou = OUNoise(3, sigma=0.2)

r = []

for i in range(100):
    r.append((ou1.noise(), ou2.noise(), ou3.noise(), ou4.noise(), multi_ou.noise()))

r = np.array(r)

plt.figure(figsize=(12, 6))
plt.plot(r[:, 0], label='sigma=0.4')
plt.plot(r[:, 1], label='sigma=0.3')
plt.plot(r[:, 2], label='sigma=0.2')
plt.plot(r[:, 3], label='sigma=0.1')
plt.title('OU Noise')
plt.ylabel('N')
plt.xlabel('time step')
plt.legend(loc='upper right')
plt.show()

plt.figure(figsize=(12, 6))
plt.plot(r[:, 4].tolist())
plt.title('OU Noise - multiple output')
plt.ylabel('N')
plt.xlabel('time step')
plt.show()

 

액터 신경망에서 출력된 행동 μ(st|θμt) 에 노이즈 N 를 추가하는 것입니다.

 

μ(st)=μ(st|θμt)+N Train μ(st)=μ(st|θμt) Play 

 

또한 노이즈는 탐색을 위한 것으로 학습시에만 필요한 것입니다.

 

 

목적 함수

액터 신경망의 목표는 리턴의 기댓값을 최대로 만드는 최적의 정책을 구하는 것입니다.

 

따라서 목적 함수는 다음과 같이 리턴의 기댓값인 가치 함수로부터 시작합니다.

 

J(θ)=Vμθ

 

증명을 통해 재구성된 목적 함수의 그래디언트 식은 다음과 같습니다.

 

θμJEstρβ[θμQ(s,a|θQ)|s=st,a=μ(st|θμ)]=Estρβ[aQ(s,a|θQ)|s=st,a=μ(st)θμμ(s|θμ)|s=st]

 

Q(s,a|θQ)θμ 에서 체인 룰을 이용해 Q(s,a|θQ)aaθμ 로 유도된 것으로 이를 통해 액터 신경망의 파라미터 θμ 를 학습할 수 있는 것입니다.

 

크리틱 신경망의 손실함수는 TD 타겟과 예측의 오차를 계산합니다.

 

L(θQ)=Estρβ,atβ,rtE[(Q(st,at|θQ)yt)2]

 

TD 타겟을 계산하는 식은 다음과 같으며

 

yt=r(st,at)+γQ(st+1,μ(st+1|θμ)|θQ

 

타겟 신경망을 이용한 계산으로 고치면 다음과 같습니다.

 

yt=r(st,at)+γQ(st+1,μ(st+1|θμ)|θQ

 

 

알고리즘

 

 

 

구현

알고리즘 테스트 환경은 OpenAI GymBipedalWalker-v2 입니다.

 

 

BipedalWalker 은 로봇을 목표지점까지 걷게하는 문제입니다.

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('BipedalWalker-v3')

print('obs', env.observation_space.shape[0])
print('act', env.action_space.shape[0])
print('act low', env.action_space.low)
print('act high', env.action_space.high)

 

상태 정보는 24차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동은 4차원의 벡터로 각 요소는 [-1, 1] 의 범위의 연속적인 공간을 갖습니다.

obs 24
act 4
act low -1.0
act high 1.0

 

 

액터 신경망을 정의합니다.

class Actor:
    def __init__(self, scope):
        with tf.variable_scope(scope):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
            with tf.variable_scope('layer'):
                self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
                self.fc2 = tf.layers.dense(self.fc1, 256, tf.nn.relu)
            with tf.name_scope('out'):
                self.a = tf.layers.dense(self.fc2, N_A, tf.nn.tanh) * A_BOUND[1]
        self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)

 

액터 신경망은 확정적 정책으로 상태를 입력으로 받으면 바로 행동을 출력합니다.

self.a = tf.layers.dense(self.fc2, N_A, tf.nn.tanh) * A_BOUND[1]

...

def get_action(self, s):
    action = self.sess.run(self.actor.a, {self.actor.s: s[np.newaxis, :]})[0]
    return action

 

출력으로 tanh 함수를 사용하여 출력의 범위를 [-1, 1] 로 하는 것입니다. 만약 환경이 더 큰 범위를 갖는다면 뒤에 상수를 곱함으로써 범위를 조정할 수 있습니다.

 

 

크리틱 신경망을 정의합니다.

class Critic:
    def __init__(self, scope):
        with tf.variable_scope(scope):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
            with tf.variable_scope('layer'):
                self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
                concat = tf.concat([self.a, self.fc1], axis=-1)
                self.fc2 = tf.layers.dense(concat, 256, tf.nn.relu)
            with tf.name_scope('out'):
                self.q = tf.layers.dense(self.fc2, 1)
        self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)

 

액터 신경망으로부터 전달받은 행동층과 결합하여 최종적으로 Q 를 출력합니다.

with tf.name_scope('input'):
    self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
    self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
with tf.variable_scope('layer'):
    self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
    concat = tf.concat([self.a, self.fc1], axis=-1)
    self.fc2 = tf.layers.dense(concat, 256, tf.nn.relu)
with tf.name_scope('out'):
    self.q = tf.layers.dense(self.fc2, 1)

 

일반 신경망과 타겟 신경망을 생성합니다. 

class DDPG:
    def __init__(self, load=False):
        self.actor = Actor('actor/eval')
        self.critic = Critic('critic/eval')
        self.target_actor = Actor('actor/target')
        self.target_critic = Critic('critic/target')

 

TD 타겟을 입력으로 받습니다.

with tf.name_scope('input'):
    self.td_target = tf.placeholder(tf.float32, [None, 1])

 

크리틱 신경망의 손실 함수를 정의합니다.

with tf.name_scope('c_loss'):
    self.c_loss = tf.losses.mean_squared_error(self.td_target, self.critic.q)

 

Q 타겟과 추정 Q 평균 제곱 오차(mean squared error, MSE)를 계산합니다.

 

손실값을 최소화하는 방향으로 학습하도록 설정합니다.

with tf.name_scope('c_optimizer'):
    self.c_train_op = tf.train.AdamOptimizer(LR_C).minimize(self.c_loss)

 

그래디언트 계산을 정의합니다.

with tf.name_scope('grads'):
    self.q_grads = tf.gradients(self.critic.q, self.critic.a)
    self.a_grads = tf.gradients(-self.actor.a, self.actor.params, self.q_grads)

 

목적함수 그래디언트 식은 다음과 같습니다.

 

θμJEstρβ[aQ(s,a|θQ)|s=st,a=μ(st)θμμ(s|θμ)|s=st]

 

Q(s,a|θQ)aaθμ 를 계산하는 것으로 크리틱 신경망의 파라미터 θQ 에 의한 행동에 대한 Q 에 대한 그래디언트 Q(s,a|θQ)a 를 계산하고 이는 최종적으로 계산하고자 하는 aθμ 그라디언트 계산에 초기값으로 곱해지는 것입니다. 또한 최대화를 해야하므로 마이너스 부호를 붙입니다.(tf.gradients)

 

그래디언트와 적용할 파라미터를 설정합니다.(tf.train.Optimizer)

with tf.name_scope('a_optimizer'):
    self.a_train_op = tf.train.AdamOptimizer(LR_A).apply_gradients(zip(self.a_grads, self.actor.params))

 

크리틱 신경망에서 행동에 대한 Q의 그래디언트를 계산하고 이를 통해 액터 신경망의 파라미터를 업데이트하는 것입니다.

 

 

타겟 신경망에 대한 소프트 업데이트에 대한 부분입니다.

self.update_op = [tf.assign(t_p, TAU * t_p + (1 - TAU) * e_p) for t_p, e_p in zip(self.target_actor.params + self.target_critic.params, self.actor.params + self.critic.params)]

...

def update_target(self):
    self.sess.run(self.update_op)

 

환경이 갖는 행동 크기만큼 OU 노이즈 객체를 생성합니다.

N_A = env.action_space.shape[0]

...

self.ou = OUNoise(N_A)

 

이는 각 노이즈가 다른 궤적을 갖는 것을 의미합니다.

 

 

에이전트와 환경의 상호작용입니다.

agent = DDPG()

for i in range(MAX_EPI + 1):
    agent.ou.reset()
    
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        noise = agent.ou.noise()
        action = np.clip(action + noise, *A_BOUND)
        next_state, reward, done, _ = env.step(action)

        agent.replay_memory.append((state, action, reward, done, next_state))
        
        if len(agent.replay_memory) >= MAX_MEM * 0.1:
            agent.train()
        
        state = next_state

 

매 에피소드마다 OU 노이즈를 초기화하여 노이즈의 무작위성을 보장합니다.

agent.ou.reset()

 

액터 신경망으로부터 출력된 행동에 노이즈를 추가하고 행동 범위를 클리핑합니다.

action = agent.get_action(state)
noise = agent.ou.noise()
action = np.clip(action + noise, *A_BOUND)

 

이를 통해 확정적 정책에 대해 탐색 문제를 해소합니다.

 

μ(st)=μ(st|θμt)+N

 

 

환경으로부터 받은 정보를 리플레이 메모리에 저장하고 일정 크기부터 업데이트를 진행합니다.

MAX_MEM = 50000

...

next_state, reward, done, _ = env.step(action)
        
agent.replay_memory.append((state, action, reward, done, next_state))

if len(agent.replay_memory) >= MAX_MEM * 0.1:
    agent.train()

 

신경망 업데이트에 대한 부분입니다.

def train(self):
    minibatch = random.sample(self.replay_memory, BATCH_SIZE)
    minibatch = np.array(minibatch)

    a_ = self.sess.run(self.target_actor.a, {self.target_actor.s: minibatch[:, 4].tolist()})
    target_q = self.sess.run(self.target_critic.q, {self.target_critic.s: minibatch[:, 4].tolist(),
                                              self.target_critic.a: a_})

    td_target = minibatch[:, 2] + GAMMA * target_q.squeeze() * ~minibatch[:, 3].astype(np.bool)

    self.sess.run(self.c_train_op, {self.critic.s: minibatch[:, 0].tolist(),
                                    self.critic.a: minibatch[:, 1].tolist(),
                                    self.td_target: td_target[:, np.newaxis]})

    self.sess.run(self.a_train_op, {self.actor.s: minibatch[:, 0].tolist(),
                                    self.critic.s: minibatch[:, 0].tolist(),
                                    self.critic.a: minibatch[:, 1].tolist()})

    self.update_target()

 

리플레이 메모리에 저장된 샘플을 배치 사이즈만큼 랜덤하게 추출합니다.

minibatch = random.sample(self.replay_memory, BATCH_SIZE)
minibatch = np.array(minibatch)

 

타겟 신경망으로부터 다음 상태에 대한 Q(si+1,μ(si+1|θμ) 를 구합니다.

a_ = self.sess.run(self.target_actor.a, {self.target_actor.s: minibatch[:, 4].tolist()})
target_q = self.sess.run(self.target_critic.q, {self.target_critic.s: minibatch[:, 4].tolist(), self.target_critic.a: a_})

 

TD 타겟인 ri+γQ(si+1,μ(si+1|θμ)|θQ) 을 계산합니다.

q_target = minibatch[:, 2] + GAMMA * target_q.squeeze() * ~minibatch[:, 3].astype(np.bool)

 

크리틱 신경망과 액터 신경망을 각각 업데이트합니다.

self.sess.run(self.c_train_op, {self.critic.s: minibatch[:, 0].tolist(),
                                self.critic.a: minibatch[:, 1].tolist(),
                                self.q_target: q_target[:, np.newaxis]})

self.sess.run(self.a_train_op, {self.actor.s: minibatch[:, 0].tolist(),
                                self.critic.s: minibatch[:, 0].tolist(),
                                self.critic.a: minibatch[:, 1].tolist()})

 

크리틱 신경망의 파라미터는 타겟과 예측에 대한 오차를 최소화하는 방향으로 업데이트하며 액터 신경망의 파라미터는 크리틱 신경망으로부터 계산된 Q 그래디언트를 이용하여 목적 함수가 최대가 되도록 행동을 수정하는 방향으로 업데이트합니다.

 

일반 신경망의 파라미터를 타겟 신경망의 파라미터로 복제합니다.

self.update_target()

 

 

1000번의 에피소드 동안 학습한 결과입니다.

 

 

저장된 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/ddpg_bipedalwalker/model/model
episode: 0 reward: 190.82911344149113
episode: 1 reward: 32.018837167782266
episode: 2 reward: 66.25328426062981
episode: 3 reward: 186.63262380705766

 

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

PPO(Proximal Policy Optimization)  (0) 2020.12.07
A3C(Asynchronous Advantage Actor-Critic)  (0) 2020.11.30
액터-크리틱(Actor-Critic)  (0) 2020.11.27
정책 그래디언트(Policy Gradient)  (0) 2020.11.24
DQN(Deep Q-Networks) (2)  (0) 2020.11.24
  Comments,     Trackbacks
A3C(Asynchronous Advantage Actor-Critic)

A3C(Asynchronous Advantage Actor-Critic)

논문 Asynchronous Methods for Deep Reinforcement Learning 으로 소개되었습니다.

 

A2C 는 온라인 학습과 어드밴티지를 통해 분산을 줄였지만 여전히 문제를 가지고 있습니다. (A2C 단일 에이전트) 학습에 사용하는 샘플 데이터들은 시간의 흐름에 따라 순차적인 것으로 데이터들이 서로 연관된 것입니다. 이러한 데이터간의 높은 상관관계(correlation)는 목적 함수의 그래디언트를 편향시키고 학습을 불안정하게 만들 수 있습니다.

 

A3C 는 비동기(asynchronous)라는 의미가 추가된 것으로 다중 에이전트를 통해 비동기적으로 공유 신경망을 학습하는 방법론입니다. 각 에이전트는 독립적인 환경에서 에피소드를 수행하며 얻은 샘플 데이터를 이용하여 공유 신경망의 파라미터를 업데이트하고 그 파라미터를 자신의 네트워크로 복제하는 과정을 통해 학습하는 것입니다.

 

 

이러한 다중 에이전트를 병렬적으로 운용함에 따라 수집되는 데이터의 다양성은 증가하게 되어 데이터간의 상관관계를 줄일 수 있다는 것입니다.

 

또한 목적 함수 그래디언트 계산에 엔트로피(entropy)를 추가하는데 이는 조기 수렴을 막아 탐색(exploration)이 향상된다는 것입니다.

 

 

엔트로피(Entropy)

먼저 정보량은 다음과 같이 정의됩니다.

 

h(x)=logp(x)

 

이는 빈번하게 일어나는 사건은 새로울 것이 없으므로 정보량은 적으며 반대로 빈번하지 않은 사건은 정보량이 많다는 것입니다.

 

 

엔트로피는 정보량의 기댓값으로 정의됩니다.

 

H(p)=Exp(x)[logp(x)]=xp(x)logp(x)dx

 

 

p(x) 가 다음과 같이 평균이 μ 이고 표준 편차가 σ 인 확률 밀도 함수라면

 

p(x)1σ2πexp((xμ)22σ2)

 

엔트로피는 다음과 같습니다.

 

H(p)=12(1+log(2πσ2))

 

이 경우 엔트로피는 분산 σ2 에만 영향을 받으며 분산이 커질수록 증가하는 것입니다. 분산이 커질수록 사건의 무작위성이 커지고 특정 사건의 발생 빈도수가 작아지기 때문에 정보량이 증가하는 것입니다.

 

 

이에 따라 엔트로피를 추가한 목적 함수의 그래디언트 식은 다음과 같습니다.

 

θJ(θ)=θlogπ(at|st;θ)(RtV(st;θv))+βθH(π(st;θ))

 

추가된 엔트로피에 의해 확률 분포는 영향을 받게 되는데 하이퍼 파라미터 β 탐색(exploration) 활용(exploitation) 사이를 제어할 수 있습니다. β 를 큰 값으로 하면 그래디언트 계산에 엔트로피의 영향력이 커져 무작위성이 증가하게 되고 β 를 작은 값으로 하면 그에 대한 영향력은 작아지게 되는 것입니다.

 

 

알고리즘

 

 

 

구현

알고리즘 테스트 환경은 OpenAI Gym  Pendulum-v0 입니다.

 

 

Pendulum 는 연속적인 행동 공간을 갖는 문제로 진자를 흔들어서 12시 방향으로 세워서 유지하는 문제입니다. 

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('Pendulum-v0')

print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.shape[0])
print('act bound:', env.action_space.low[0], env.action_space.high[0])

 

상태 정보는 3차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동 또한 [-2, 2] 의 범위의 연속적인 공간을 갖습니다.

obs: 3
act: 1
act bound: -2.0 2.0

 

 

신경망을 정의합니다.

def build_net(self, scope):
    with tf.variable_scope('actor'):
        a_fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
        a_fc2 = tf.layers.dense(a_fc1, 256, tf.nn.relu)
        mu = tf.layers.dense(a_fc2, N_A, tf.nn.tanh) * A_BOUND[1]
        sigma = tf.layers.dense(a_fc2, N_A, tf.nn.softplus)

    with tf.variable_scope('critic'):
        c_fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
        c_fc2 = tf.layers.dense(c_fc1, 256, tf.nn.relu)
        v = tf.layers.dense(c_fc2, 1)

    a_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope + '/actor')
    c_params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope + '/critic')

    return a_params, c_params, v, mu, sigma

 

액터 신경망의 출력층으로 확률 밀도 함수를 나타내는 평균과 표준 편차를 출력합니다.

mu = tf.layers.dense(a_fc2, N_A, tf.nn.tanh) * A_BOUND[1]
sigma = tf.layers.dense(a_fc2, N_A, tf.nn.softplus)

 

활성화 함수 tanh 은 [-1, 1] 의 범위를 갖기 때문에 환경이 갖는 행동 범위 [-2, 2] 로 조정합니다.

A_BOUND = [env.action_space.low, env.action_space.high]

...

mu = tf.layers.dense(a_fc2, N_A, tf.nn.tanh) * A_BOUND[1]

 

tanh 함수는 sigmoid 를 변환해서 나온 함수입니다.

 

 

표준 편차에는 활성화 함수 softplus 를 이용합니다.

sigma = tf.layers.dense(a_fc2, N_A, tf.nn.softplus)

 

softplus 함수는 부드러운 ReLU 라고 할 수 있습니다.

 

 

 

스코프를 통해 공유 신경망과 일반 신경망을 구분합니다.

if scope == 'global':
    with tf.variable_scope(scope):
        with tf.name_scope('input'):
            self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
        self.a_params, self.c_params = self.build_net(scope)[:2]
else:
    with tf.variable_scope(scope):
        with tf.name_scope('input'):
            self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
            self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
            self.td_target = tf.placeholder(tf.float32, [None, 1], name='td_target')

        self.a_params, self.c_params, self.v, self.mu, self.sigma = self.build_net(scope)

 

크리틱 신경망의 손실 함수를 정의합니다.

with tf.name_scope('td_error'):
    self.td_error = self.td_target - self.v
with tf.name_scope('c_loss'):
    self.c_loss = tf.reduce_mean(tf.square(self.td_error))

 

TD 오차를 계산한 후 제곱의 평균을 계산하는 것으로 계산된 TD 오차는 어드밴티지로 사용합니다.

 

 

실수 범위를 갖는 행동을 선택합니다.

with tf.name_scope('output'):
    self.dist = tf.distributions.Normal(self.mu, self.sigma)
    self.sample = self.dist.sample()
    self.action = tf.clip_by_value(self.sample, *A_BOUND)

 

출력층의 평균과 표준 편차로 가우시안 분포를 구합니다. (tf.distributions.Normal)

self.dist = tf.distributions.Normal(self.mu, self.sigma)

 

가우시안 분포로 부터 샘플을 선택하고 범위를 클리핑하여 행동을 구합니다.

self.sample = self.dist.sample()
self.action = tf.clip_by_value(self.sample, *A_BOUND)

 

 

액터 신경망의 손실 함수를 정의합니다.

with tf.name_scope('a_loss'):
    self.a_loss = self.dist.log_prob(self.a) * tf.stop_gradient(self.td_error) + self.dist.entropy() * ENTROPY_BETA

 

tf.stop_gradient 를 사용한 것은 공통된 입력층을 통해 입력받아 액터와 크리틱 신경망을 동시에 업데이트하기 때문에 TD 오차 계산과 관련된 크리틱의 파라미터에 대한 그래디언트 계산을 중지하는 것입니다.

 

 

공유 신경망과 일반 신경망간의 업데이트를 정의합니다.

with tf.name_scope('local_grad'):
    self.a_grads = tf.gradients(self.a_loss, self.a_params)
    self.c_grads = tf.gradients(self.c_loss, self.c_params)

with tf.name_scope('pull'):
    self.a_pull_op = [l_p.assign(g_p) for l_p, g_p in zip(self.a_params, globalAC.a_params)]
    self.c_pull_op = [l_p.assign(g_p) for l_p, g_p in zip(self.c_params, globalAC.c_params)]
with tf.name_scope('push'):
    self.a_push_op = tf.train.RMSPropOptimizer(LR_A).apply_gradients(zip(self.a_grads, globalAC.a_params))
    self.c_push_op = tf.train.RMSPropOptimizer(LR_C).apply_gradients(zip(self.c_grads, globalAC.c_params))
        
...

def update_global(self, feed_dict):
    self.sess.run([self.c_push_op, self.a_push_op], feed_dict)
    
def pull_global(self):
    self.sess.run([self.c_pull_op, self.a_pull_op])

 

일반 신경망의 그래디언트 계산을 정의합니다.(tf.gradients)

with tf.name_scope('local_grad'):
    self.a_grads = tf.gradients(-self.a_loss, self.a_params)
    self.c_grads = tf.gradients(self.c_loss, self.c_params)

 

글로벌 신경망의 파라미터를 일반 신경망의 파라미터로 복제하는 부분입니다.

with tf.name_scope('pull'):
    self.a_pull_op = [l_p.assign(g_p) for l_p, g_p in zip(self.a_params, globalAC.a_params)]
    self.c_pull_op = [l_p.assign(g_p) for l_p, g_p in zip(self.c_params, globalAC.c_params)]

 

그래디언트와 적용할 파라미터를 설정합니다.(tf.train.Optimizer)

with tf.name_scope('push'):
    self.a_push_op = tf.train.RMSPropOptimizer(LR_A).apply_gradients(zip(self.a_grads, globalAC.a_params))
    self.c_push_op = tf.train.RMSPropOptimizer(LR_C).apply_gradients(zip(self.c_grads, globalAC.c_params))

 

일반 신경망의 파라미터를 통해 그래디언트를 계산하고 글로벌 신경망의 파라미터를 업데이트하는 것입니다.

 

θ=θ+θJ(θ) 

 

쓰레드를 통해 독립적으로 에피소드를 진행하는 각 에이전트는 공유 파라미터를 이용해 업데이트를 수행하고 다시 일반 신경망의 파라미터로 복제함으로써 공유하는 것입니다.

 

 

쓰레드를 통해 다중 에이전트에 학습을 수행합니다.(tf.train.Coordinator)

sess = tf.Session(config=config)
coord = tf.train.Coordinator()

workers = []
with tf.device('/cpu:0'):
    globalAC = A3C('global')
    for i in range(16):
        name = 'worker_%i' % i
        workers.append(A3C(name, globalAC, sess, coord, max_epi))

sess.run(tf.global_variables_initializer())
        
worker_threads = []
for w in workers:
    t = threading.Thread(target=w.run)
    t.start()
    worker_threads.append(t)

coord.join(worker_threads)

 

 

1000번의 에피소드 동안 학습한 결과입니다.

 

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

PPO(Proximal Policy Optimization)  (0) 2020.12.07
DDPG(Deep Deterministic Policy Gradient)  (0) 2020.11.30
액터-크리틱(Actor-Critic)  (0) 2020.11.27
정책 그래디언트(Policy Gradient)  (0) 2020.11.24
DQN(Deep Q-Networks) (2)  (0) 2020.11.24
  Comments,     Trackbacks
액터-크리틱(Actor-Critic)

액터-크리틱(AC, Actor-Critic)

정책을 의미하는 액터와 가치 함수를 의미하는 크리틱을 같이 두는 방법론으로 크리틱은 가치 함수를 추정하며 정책을 평가하는 역할을 합니다.

 

 

정책 그래디언트 정리와 REINFORCE 알고리즘의 J(θ) 식은 다음과 같습니다.

 

J(θ)sμ(s)aqπ(s,a)π(a|s,θ) Policy Gradient Theorem =Eπ[Gtlnπ(a|s,θ)]REINFORCE

 

REINFORCE 알고리즘에서는 따로 가치망을 두지 않고 한 에피소드의 리턴 Gt 을 이용해서 정책을 평가하고 개선했습니다. 리턴이 양수라면 취했던 행동의 확률은 증가하고 음수라면 확률을 감소하는 방향으로 정책 파라미터를 업데이트하는 것입니다.

 

이러한 REINFORCE 알고리즘은 직관적이고 틀릴 수 없는 알고리즘이지만 episodic 환경에서만 사용할 수 있으며 리턴을 이용하므로 분산이 크다는 단점을 갖습니다. 따라서 유연하며 확장성이 좋은 액터-크리틱 방법론을 알아보도록 하겠습니다.

 

먼저 알아볼 알고리즘은 QAC 입니다.

 

Q 가 붙은 이유는 행동 가치 함수 Q(s,a) 를 학습하기 때문입니다. J(θ) 계산은 정책 그래디언트 정리의 qπ 가 아닌 추정 Qw 로 사용합니다. 이는 액터의 파라미터 θ 가 아닌 크리틱의 파라미터 w 라는 의미입니다. QAC 알고리즘은 이러한 Qw 를 이용해 정책을 평가하고 개선하는 것입니다.

 

 

QAC 알고리즘은 다음과 같습니다.

 

 

 

구현

QAC 알고리즘을 테스트할 환경은 OpenAI Gym  CartPole-v0 입니다.

 

 

CartPole 은 좌우로 카트를 이동하여 막대기가 쓰러지지 않도록 오래 유지하는 문제입니다. 

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('CartPole-v0')

print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.n)

 

상태 정보는 4차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동은 스칼라로 2개의 범위를 갖는 이산적인 공간입니다.

obs: 4
act: 2

 

 

액터 신경망을 정의합니다.

class Actor:
    def __init__(self):
        with tf.variable_scope('actor'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.a = tf.placeholder(tf.int32, name='action')
                self.q = tf.placeholder(tf.float32, name='q')

            with tf.variable_scope('layer'):
                self.fc = tf.layers.dense(self.s, 32, tf.nn.relu)

            with tf.variable_scope('output'):
                self.pi = tf.layers.dense(self.fc, N_A, tf.nn.softmax)

            with tf.name_scope('loss'):
                self.loss = -tf.log(self.pi[0, self.a]) * self.q[self.a]

            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_A).minimize(self.loss)

 

상태, 행동, Q(s,a) 를 입력으로 받습니다.

with tf.name_scope('input'):
    self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
    self.a = tf.placeholder(tf.int32, name='action')
    self.q = tf.placeholder(tf.float32, name='q')

 

출력층으로 입력으로 받은 상태에 대한 행동의 확률을 출력합니다.

with tf.variable_scope('output'):
    self.pi = tf.layers.dense(self.fc, N_A, tf.nn.softmax)

 

소프트맥스 활성화 함수를 이용하여 출력의 합이 1인 확률 분포를 출력하는 것입니다.

 

손실 함수를 정의합니다.

with tf.name_scope('loss'):
    self.loss = tf.log(self.pi[0, self.a]) * self.q[self.a]

 

해당 타임스텝에서 취한 행동에 대해서 추정 Q(s,a) 가 양수라면 취한 행동의 확률은 증가하고 음수이면 그 확률은 감소할 것입니다. 또한 log 함수를 통해 높은 확률을 갖는 행동에 대해서는 변화량이 적으며 낮은 확률을 갖는 행동의 변화량은 클 것입니다.

 

마이너스를 붙여 손실값이 최대가 되는 방향으로 학습하도록 설정합니다.

with tf.name_scope('optimizer'):
    self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)

 

크리틱 신경망을 정의합니다.

class Critic:
    def __init__(self):
        with tf.variable_scope('critic'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.a = tf.placeholder(tf.int32, name='action')
                self.td_target = tf.placeholder(tf.float32, name='td_target')

            with tf.variable_scope('layer'):
                self.fc = tf.layers.dense(self.s, 32, tf.nn.relu)

            with tf.variable_scope('output'):
                self.q = tf.layers.dense(self.fc, N_A)

            with tf.name_scope('loss'):
                self.loss = tf.losses.mean_squared_error(self.td_target, self.q[0, self.a])

            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_C).minimize(self.loss)

 

상태, 행동, TD 타겟을 입력으로 받습니다.

with tf.name_scope('input'):
    self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
    self.a = tf.placeholder(tf.int32, name='action')
    self.td_target = tf.placeholder(tf.float32, name='td_target')

 

출력층으로 입력으로 받은 상태에 대한 Q(s,a) 를 출력합니다. 

with tf.variable_scope('output'):
    self.q = tf.layers.dense(self.fc, N_A)

 

손실 함수를 정의합니다.

with tf.name_scope('loss'):
    self.loss = tf.losses.mean_squared_error(self.td_target, self.q[0, self.a])

 

타겟과 예측 사이의 평균 제곱 오차(mean suqared error, MSE) TD 타겟과 추정 Q(s,a) 간의 계산입니다.

 

손실값을 최소화하는 방향으로 학습하도록 설정합니다.

with tf.name_scope('optimizer'):
    self.train_op = tf.train.AdamOptimizer(LR_C).minimize(self.loss)

 

에이전트와 환경의 상호작용입니다.

num_episode = 200
for i in range(num_episode):
    state = env.reset()
    action = agent.get_action(state)
    done = False
    while not done:
        next_state, reward, done, _ = env.step(action)
        next_action = agent.get_action(next_state)
        
        agent.update_critic(done, reward, state, next_state, next_action, action)
        agent.update_actor(state, action)
        
        state = next_state
        action = next_action

 

에피소드를 진행하며 매 타임스텝마다 액터와 크리틱의 신경망을 업데이트합니다.

 

추정 Qt(s,a) 를 입력으로 하여 액터 신경망을 업데이트합니다.

def update_actor(self, state, action):
    q = self.get_q(state)
    self.sess.run(self.actor.train_op, {self.actor.s: state[np.newaxis, :], self.actor.a: action, self.actor.q: q})

def get_q(self, s):
    return self.sess.run(self.critic.q, {self.critic.s: s[np.newaxis, :]})[0]

 

크리틱 신경망을 업데이트합니다.

def update_critic(self, done, reward, state, next_state, next_action, action):
    if done:
        action_value = 0
    else:
        q = self.get_q(next_state)
        action_value = q[next_action]

    td_target = reward + GAMMA * action_value

    self.sess.run(self.critic.train_op, {self.critic.s: state[np.newaxis, :], self.critic.a: action, self.critic.td_target: td_target})

 

에피소드가 종료되지 않았다면 다음 상태와 다음 행동으로 추정 Q(s,a) 를 이용하여 1-스텝 TD 타겟을 계산합니다.

 

 

500번의 에피소드 동안 학습한 결과입니다.

 

 

저장된 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/qac_cartpole/model/model
episode: 0 / step: 146
episode: 1 / step: 200
episode: 2 / step: 150
episode: 3 / step: 31
episode: 4 / step: 180

 

 

어드밴티지 액터-크리틱(Advantage Actor-Critic, A2C)

QAC 는 추정 Q(s,a) 를 이용해서 정책을 평가하고 개선했습니다.

 

하지만 이보다 더욱 효과적인 방법이 있는데 바로 어드밴티지(advantage)를 이용하는 방법입니다.

 

A(s,a)=Qw(s,a)Vv(s)

 

상태 s 에서 행동 a 를 취했을 때의 추가되는 이득만 고려하는 것이라고 할 수 있습니다. 따라서 추정치의 분산을 작게 만드는 효과가 있습니다. 하지만 위의 식은 πθ, Vϕ, Qw 를 사용하기 때문에 번거롭습니다.

 

좀더 효율적으로 어드밴티지를 구할 수 있는데 다음 식을 통해 알아보겠습니다.

 

δ=r+γV(s)V(s)

 

δ 는 1-스텝 TD 오차이며 크리틱 신경망에서는 이 TD 오차를 감소하는 방향으로 학습한다는 것을 알아보았습니다. 또한 δ 를 어드밴티지로 사용할 수 있는데 이는 δ 의 기댓값이 A(s,a) 이기 때문입니다. 따라서 Qw 대신 r+γV(s) 를 이용해서 어드밴티지를 계산할 수 있는 것입니다. 

 

이러한 TD 오차 어드밴티지 방식을 TD Actor-Critic 이라고 하며 일반적으로 Advantage Actor-Critic 이라고 합니다.

 

 

A2C 알고리즘은 다음과 같습니다.

 

 

 

구현

A2C 알고리즘 테스트 환경은 OpenAI GymAcrobot-v1 입니다.

 

 

Acrobot 은 두 개의 관절과 두 개의 연결된 막대를 갖으며 주어진 높이까지 스윙하는 문제입니다. 한 에피소드에 500번의 타임스텝이 주어지며 단축된 스텝으로 클리어하는 것을 목표로 합니다.

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('Acrobot-v1')

print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.n)

 

상태 정보는 6차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동은 3개의 범위를 갖는 이산적인 공간입니다.

obs: 6
act: 3

 

 

액터 신경망을 정의합니다.

class Actor:
    def __init__(self):
        with tf.variable_scope('actor'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.a = tf.placeholder(tf.int32, name='action')
                self.adv = tf.placeholder(tf.float32, name='advantage')

            with tf.variable_scope('layer'):
                self.fc = tf.layers.dense(self.s, 64, tf.nn.relu)

            with tf.variable_scope('output'):
                self.pi = tf.layers.dense(self.fc, N_A, tf.nn.softmax)

            with tf.name_scope('loss'):
                probs = tf.reduce_sum(self.pi * tf.one_hot(self.a, N_A), axis=1, keep_dims=True)
                self.loss = -tf.log(probs) * self.adv

            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_A).minimize(self.loss)

 

상태, 행동, 어드밴티지를 입력으로 받습니다.

with tf.name_scope('input'):
    self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
    self.a = tf.placeholder(tf.int32, name='action')
    self.adv = tf.placeholder(tf.float32, name='advantage')

 

손실 함수를 정의합니다.

with tf.name_scope('loss'):
    probs = tf.reduce_sum(self.pi * tf.one_hot(self.a, N_A), axis=1, keep_dims=True)
    self.loss = tf.log(probs) * self.adv

 

여기서는 n-스텝 TD 를 계산하기 때문에 행동 시퀀스를 입력으로 받습니다. 이것은 정수 인코딩된 값이므로 원-핫 인코딩을 통해 소프트맥스 함수를 통해 출력된 행렬에서 해당 인덱스의 확률 값을 계산하는 것입니다. 또한 TD 오차 어드밴티지를 사용하여 손실값을 계산합니다.

 

위의 손실값이 최대화(maximize) 하는 방향으로 학습해야 하므로 - 를 취합니다.

with tf.name_scope('optimizer'):
    self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)

 

크리틱 신경망을 정의합니다.

class Critic:
    def __init__(self):
        with tf.variable_scope('critic'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.td_target = tf.placeholder(tf.float32, [None, 1], name='td_target')

            with tf.variable_scope('layer'):
                self.fc = tf.layers.dense(self.s, 64, tf.nn.relu)

            with tf.variable_scope('output'):
                self.v = tf.layers.dense(self.fc, 1)

            with tf.name_scope('td_error'):
                self.td_error = self.td_target - self.v
                
            with tf.name_scope('loss'):
                self.loss = tf.reduce_mean(tf.square(self.td_error))

            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_C).minimize(self.loss)

 

손실 함수를 정의합니다.

with tf.name_scope('td_error'):
    self.td_error = self.td_target - self.v

with tf.name_scope('loss'):
    self.loss = tf.reduce_mean(tf.square(self.td_error))

 

TD 오차를 계산한 후 제곱의 평균을 계산하는 것으로 먼저 계산된 TD 오차는 어드밴티지로 사용합니다.

 

에피소드를 진행하여 상태 행동 보상을 메모리에 저장합니다.

for i in range(MAX_EPI + 1):
    memory = []
    
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        next_state, reward, done, _ = env.step(action)
        
        memory.append((state, action, reward))
        
        ...

 

n-스텝의 TD 타겟을 계산하여 업데이트를 수행합니다.

T_MAX = 8

...

if done or len(memory) == T_MAX:
    memory = np.array(memory)

    if done:
        v_ = 0
    else:
        v_ = agent.get_v(next_state)

    td_target = []
    for r in reversed(memory[:, 2]):
        v_ = r + GAMMA * v_
        td_target.append(v_)
    td_target.reverse()
    
    td_error = agent.update_critic(memory[:, 0], td_target)
    agent.update_actor(memory[:, 0], memory[:, 1], td_error)

    memory = []

 

에피소드가 종료되었다면 다음 상태의 추정 가치 V(s) 는 0으로 하여 리턴 계산과 동일하며 에피소드가 종료되지 않았다면 다음 상태의 추정 가치로부터 계산을 시작하는 것입니다.

 

저장된 보상 시퀀스를 역방향으로 n-스텝 리턴 또는 TD 타겟을 계산합니다.

td_target = []
for r in reversed(memory[:, 2]):
    v_ = r + GAMMA * v_
    td_target.insert(0, v_)

 

TD 타겟을 이용하여 크리틱 신경망과 액터 신경망을 업데이트합니다.

td_error = agent.update_critic(memory[:, 0], td_target)
agent.update_actor(memory[:, 0], memory[:, 1], td_error)

memory = []

 

크리틱 신경망에서는 TD 타겟과 추정 V(s) 간의 오차를 감소하는 방향으로 학습하는 것이고 액터 신경망은 크리틱에서 계산된 오차를 이용하여 정책을 평가하고 개선하는 방향으로 학습하는 것입니다.

 

def update_critic(self, state, td_target):
    _, td_error = self.sess.run([self.critic.train_op, self.critic.td_error], {self.critic.s: state.tolist(), self.critic.td_target: np.vstack(td_target)})
    return td_error

def update_actor(self, state, action, td_error):
    self.sess.run(self.actor.train_op, {self.actor.s: state.tolist(), self.actor.a: action, self.actor.adv: td_error})

def get_v(self, s):
    return self.sess.run(self.critic.v, {self.critic.s: s[np.newaxis, :]})[0, 0]

 

 

300번의 에피소드 동안 학습한 결과입니다.

 

 

 

저장된 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/a2c_acrobot/model/model
episode: 0 / step: 117
episode: 1 / step: 84
episode: 2 / step: 143
episode: 3 / step: 124
episode: 4 / step: 114

 

 

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

DDPG(Deep Deterministic Policy Gradient)  (0) 2020.11.30
A3C(Asynchronous Advantage Actor-Critic)  (0) 2020.11.30
정책 그래디언트(Policy Gradient)  (0) 2020.11.24
DQN(Deep Q-Networks) (2)  (0) 2020.11.24
DQN(Deep Q-Networks) (1)  (0) 2020.11.20
  Comments,     Trackbacks
정책 그래디언트(Policy Gradient)

Value-Based vs Policy-Based

정책 기반의 방법론에 대해 알아볼 것입니다.


하지만 그전에 가치 기반 방법론이 갖는 문제를 몇 가지 살펴보도록 하겠습니다.

 

SARSA, Q-learning 과 같은 가치 기반 방법론은 행동 가치 함수인 Q 를 학습하고 greedy 정책을 통해 각 상태에서 행동 가치가 가장 높은 최선의 행동을 선택하는데 이를 확정적 정책(deterministic policy) 이라고 합니다.

 

결정론적 정책은 가위바위보 같은 게임에서 적용할 수 있을까요?

 

가위바위보에서 최적의 정책은 하나의 최선의 행동이 아닌 동일한 확률 분포를 갖는 정책일 것입니다.

 

이러한 문제에서 정책 기반 방법론은 확률적 정책(stochastic policy)을 취할 수 있습니다.

 

또 다른 문제는 환경이 연속적인 행동 공간(continuous action space)을 갖는 경우입니다.

 

행동 집합이 0<=x<=1 의 실수 범위를 갖는다면 무한에 가까운 행동의 범위에 대하여 모든 가치를 저장하기 어려울 것입니다.

 

위와 같은 문제에서도 정책 기반 방법론을 통해 적용할 수 있습니다. 

 

 

정책 그래디언트(Policy Gradient, PG)

정책 기반의 방법론으로 가치 함수 없이 행동을 선택할 수 있는 파라미터 기반의 정책(parameterized policy)을 직접 학습하는 방법입니다. 가치 기반 방법론과는 달리 πθ 을 근사하는 것이기 때문에 Q 를 보고 행동을 선택하는 것이 아닌 상태 s 에 대해 바로 행동을 선택합니다.

 

이러한 정책 그래디언트 방법론은 확률적 정책(stochastic policy)을 취할 수 있는데 환경이 이산적인 행동 공간(discrete action space)을 갖는다면 이를 상태-행동 쌍에 대해 파라미터화된 수치적 선호도로 확률을 나타낼 수 있으며 이는 소프트맥스 함수(softmax function)로 하여 출력의 합을 1로 하는 확률 분포를 만듭니다.

 

π(a|s,θ)eh(s,a,θ)beh(s,b,θ)

 

소프트맥스 함수를 구현하면 다음과 같습니다.

import numpy as np

def softmax(array): 
    r = [] 
    for z in array: 
        r.append(np.exp(z) / sum(np.exp(array))) 
    return r 

a = np.array([0.3, 2.6, 4.1]) 

a = softmax(a)
print(a)
print(sum(a))
[0.01796126464567195, 0.1791489306951451, 0.8028898046591829]
1.0

 

텐서플로에서는 활성화 함수 tf.nn.softmax 를 이용합니다.

import tensorflow as tf

x = tf.placeholder(tf.float32, [None, 1])

y = tf.layers.dense(x, 3, tf.nn.softmax)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    
    _y = sess.run(y, {x: [[0.1]]})
    
    print(_y)
    print(np.sum(_y))
[[0.35735703 0.3322687  0.3103743 ]]
1.0

 

 

또한 환경이 연속적인 행동 공간(continuous action space)을 갖는다면 정책 파라미터에 의해 추정된 연속 확률 분포(continuous probability distribution)에서 실수값의 행동을 선택할 수 있습니다. 그 연속 확률 분포를 나타내는 확률 밀도 함수(probability density function, PDF)에 대한 평균과 표준 편차는 정책 파라미터에 의한 것입니다.

 

가우시안 분포(gaussian distribution)를 나타내는 확률 밀도 함수는 다음과 같습니다.

 

p(x)1σ2πexp((xμ)22σ2)

 

확률 밀도 함수를 통해 가우시안 분포를 그리면 다음과 같습니다.

import numpy as np
import matplotlib.pyplot as plt

def pdf(x, mu, sigma):
    return 1 / (sigma * np.sqrt(2 * np.pi)) * np.exp(-(x - mu) ** 2 / (2 * sigma ** 2))

x = np.linspace(-5, 5, 100)

plt.plot(x, pdf(x, mu=0, sigma=1), x, pdf(x, mu=1, sigma=2))
plt.title('Gaussian distribution')
plt.xlabel('x')
plt.ylabel('probability density')
plt.show()

 

텐서플로에서는 가우시안 분포에 대한 함수 tf.distributions.Normal 를 이용할 수 있습니다.

import tensorflow as tf
import numpy as np

MU = 0.
SIGMA = 1.

def pdf(x):
    return 1 / (SIGMA * np.sqrt(2 * np.pi)) * np.exp(-(x - MU) ** 2 / (2 * SIGMA ** 2))

a = tf.placeholder(tf.float32)
dist = tf.distributions.Normal(tf.constant([MU]), tf.constant([SIGMA]))
sample = dist.sample()
prob = dist.prob(a)

x = -0.1

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print('action sample:', sess.run(sample))
    print('probability density:', sess.run(prob, {a: x}))
print('probability density:', pdf(x))
action sample: [0.16933359]
probability density: [0.39695257]
probability density: 0.3969525474770118

 

 

정책 그래디언트 정리(Policy Gradient Theorem)

목적 함수는 다음과 같이 정의됩니다. (episodic 문제라고 가정)

 

J(\boldsymbol{\theta}) \doteq v_{\pi_{\theta}}(s_0)

 

에피소드의 시작 상태의 가치를 나타내는 것으로 이러한 목적 함수를 최대화하는 방향으로 정책 파라미터를 학습하고자 하는 것입니다. (s_0 무작위하지 않은 특정 상태에서 시작한다고 가정하는 것을 의미)

 

\boldsymbol{\theta}_{t+1} = \boldsymbol{\theta}_t + \alpha \widehat { \nabla J(\boldsymbol{\theta}_t) }

 

하지만 파라미터 기반의 함수 근사(function approximation)하는 것은 어려움이 있습니다. 이는 행동 선택과 그 행동으로 인한 상태의 분포가 달라진다는 것인데 정책에 의해 영향을 받은 상태의 분포는 환경에 따라 변하는 함수이고 이는 일반적으로 모른다는 것입니다.

 

정책의 변화가 상태의 분포에 미치는 영향을 모르는 상황에서 어떻게 정책 파라미터를 학습할 수 있을까요?

 

이는 정책 그래디언트 정리라는 이론적인 해결책을 통해 다음 관계를 성립합니다.

 

\begin{aligned}     \nabla J(\boldsymbol{\theta}) & \propto \sum_s \mu(s) \sum_a q_{\pi}(s,a) \nabla \pi (a|s, \boldsymbol{\theta}) & \scriptstyle{ \text{ episodic } } \\    & = \sum_s \mu(s) \sum_a q_{\pi}(s, a) \nabla \pi(a|s, \boldsymbol{\theta}) & \scriptstyle{ \text{ continuous } }     \end{aligned}

 

 

증명(Proof of Policy Gradient Theorem)

상태 가치 함수의 미분으로 시작합니다.

 

\begin{aligned}          \nabla v_{\pi}(s) & = \nabla \left[ \sum_a \pi_\theta(a|s) q_{\pi}(s, a) \right] & \\          & = \sum_a \Big[ \nabla \pi(a | s)q_{\pi}(s, a) + \pi(a|s) \color{red} {\nabla q_{\pi}(s, a)} \Big] & \scriptstyle{\text{곱의 법칙}} \\          & = \sum_a \Big[ \nabla \pi(a | s)q_{\pi}(s, a) + \pi(a|s) \color{red} {\nabla \sum_{s', r} p(s',r | s,a)(r + v_{\pi}(s')) } \Big] & \scriptstyle{q_{\pi} \text{ 를 } v_{\pi} \text{ 대한 식으로}} \\          & = \sum_a \Big[ \nabla \pi(a | s)q_{\pi}(s, a) + \pi(a|s) \sum_{s'} p(s' | s,a) \color{red} {\nabla v_{\pi}(s' )} \Big] & \scriptstyle{p(s',r | s,a) \text{ 와 } r \text{ 은 } \theta \text{ 의 함수가 아니므로 }} \\  & = \sum_a \Big[ \nabla \pi(a | s)q_{\pi}(s, a) + \pi(a|s) \sum_{s'} p(s' | s,a) \\   & \qquad\quad \sum_{a'} [ \nabla \pi(a' | s') q_{\pi}(s', a') + \pi(a'|s') \sum_{s'} p(s'' | s', a') \nabla v_{\pi}(s'') ] \Big] & \scriptstyle{ \text{ 이 과정을 무한히 반복 } } \\ & = \sum_{x \in S} \sum_{k=0}^\infty \text{Pr}( s \rightarrow x, k, \pi ) \sum_a \nabla \pi(a|x) q_{\pi} (x,a) \end{aligned}

 

Pr(s \rightarrow x, k, \pi) 는 정책 \pi 에 의해서 상태 s 에서 k 단계만에 전이할 확률을 나타냅니다.

 

\begin{aligned}      \nabla J(\boldsymbol{\theta}) & = \nabla v_{\pi}(s_0) \\      & = \sum_s \Big( \color{red} {\sum_{k=0}^\infty \text{Pr}( s_0 \rightarrow s, k, \pi )} \Big) \sum_a \nabla \pi(a|s) q_{\pi}(s,a) \\     & = \sum_s \color{red} {\eta (s)} \sum_a \nabla \pi(a|s) q_{\pi}(s,a) \\     & = \sum_s' \eta (s') \sum_s \color{red}{ \eta (s) \over \sum_{s'} \eta (s') } \sum_a \nabla \pi(a|s) q_{\pi}(s,a) \\   & = \sum_s' \eta (s') \sum_s \color{red}{ \mu(s) } \sum_a \nabla \pi(a|s) q_{\pi}(s, a) & \scriptstyle { \mu(s) \text{ 는 고정 분포(stationary distribution) } }\\   & \propto \sum_s \mu(s) \sum_a \nabla \pi (a|s) q_{\pi}(s, a) \end{aligned}  

 

\mu정책 \pi 을 따르는 고정 분포(stationary distribution)로 이를 on-policy distribution 라고 합니다.

 

episodic 문제의 경우 비례 상수 \propto 는 에피소드의 평균 길이이고 continous 문제의 경우 비례 상수는 1이 됨으로써 관계식은 등식으로 바뀌게 됩니다.

 

\begin{aligned}    \nabla J(\boldsymbol{\theta}) & \propto \sum_s \mu(s) \sum_a q_{\pi}(s,a) \nabla \pi (a|s, \boldsymbol{\theta}) & \scriptstyle{ \text{ episodic } } \\   & = \sum_s \mu(s) \sum_a q_{\pi}(s, a) \nabla \pi(a|s, \boldsymbol{\theta}) & \scriptstyle{ \text{ continuous } }    \end{aligned}

 

 

증명의 내용 중 일부 변동 사항은 다음 링크를 통해 확인할 수 있습니다.

 

Sutton and Barto, 2020; p325

Sutton and Barto, 2017; p269

 

 

REINFORCE(MC Policy Gradient)

정책 그래디언트를 기반으로 하며 기초가 되는 알고리즘입니다.

 

샘플 기반 그래디언트의 기댓값이 실제 그래디언트에 근사하는 것을 목적으로 다음과 같이 표현할 수 있습니다.

 

\begin{aligned} \nabla J(\boldsymbol{\theta}) & \propto \sum_s \mu(s) \sum_a q_{\pi}(s, a) \nabla \pi(a|s, \boldsymbol{\theta}) \\ & = \mathbb{E}_{\pi} \left[ q_{\pi}(S_t, a) \nabla \pi(a|S_t, \boldsymbol{\theta}) \right] \end{aligned}

 

또한 \nabla J(\theta) 는 다음과 같이 유도됩니다.

 

\begin{aligned}   \nabla J(\boldsymbol{\theta}) & = \mathbb{E}_{\pi} \left[ \color{red} {\sum_a \pi(a|S_t, \boldsymbol{\theta})} q_{\pi}(S_t, a) { \nabla \pi(a|S_t, \boldsymbol{\theta}) \over \pi(a|S_t, \boldsymbol{\theta})} \right] & \scriptstyle{ \text{ 행동 a 를 샘플 } A_t \thicksim \pi \text{ 로 대체 } } \\   & = \mathbb{E}_{\pi} \left[ \color{red}{q_{\pi}(S_t, A_t)} { \nabla \pi(A_t|S_t, \boldsymbol{\theta}) \over \pi(A_t|S_t, \boldsymbol{\theta})} \right] & \scriptstyle{ \mathbb{E}_{\pi}[ G_t | S_t, A_t ] \; = \; q_{\pi}(S_t, A_t) } \\  & = \mathbb{E}_{\pi} \left[ G_t \color{red}{ \nabla \pi(A_t|S_t, \boldsymbol{\theta}) \over \pi(A_t| S_t, \boldsymbol{\theta})} \right] & \scriptstyle{ \nabla \text{ln}x \; = \; { \nabla x \over x }} \\  & = \mathbb{E}_{\pi} \Big[ G_t \nabla \text{ln } \pi(A_t|S_t, \boldsymbol{\theta}) \Big]  \end{aligned} 

이처럼 REINFORCE 는 리턴 G_t 를 이용하기 때문에 MC policy gradient 라고도 합니다.

 

 

위에서 얻은 그래디언트를 통해 정책 파라미터는 다음과 같이 업데이트됩니다.

 

\boldsymbol{\theta}_{t+1} \doteq \boldsymbol{\theta}_t + \alpha \, G_t \nabla \text{ln } \pi(A_t|S_t, \boldsymbol{\theta_t})

 

파라미터의 벡터 \boldsymbol{\theta_{t+1}} 의 증가량은 리턴 G_t 에 의해 비례하고 행동이 선택될 확률에 반비례합니다. 행동이 선택될 확률에 반비례하지 않으면 자주 선택되는 행동이 유리할 것이고 최대의 리턴을 얻지 못하더라도 더 많이 선호될 수 있다는 것입니다.

 

 

알고리즘은 다음과 같습니다.

 

 

 

구현

REINFORCE 알고리즘 테스트 환경은 OpenAI Gym  CartPole-v0 입니다.

 

 

CartPole 은 좌우로 카트를 이동하여 막대기가 쓰러지지 않도록 오래 유지하는 문제입니다. 

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('CartPole-v0')

print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.n)

 

상태 정보는 4차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동은 스칼라로 2개의 범위를 갖는 이산적인 공간입니다.

obs: 4
act: 2

 

신경망을 정의합니다.

def build_net(self):
    with tf.name_scope('inputs'):
        self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
        self.a = tf.placeholder(tf.int32, name='action')
        self.g = tf.placeholder(tf.float32, name='return')

    with tf.variable_scope('layer'):
        self.fc = tf.layers.dense(self.s, 32, tf.nn.relu)

    with tf.variable_scope('output'):
        self.act_prob = tf.layers.dense(self.fc, N_A, tf.nn.softmax)

    with tf.name_scope('loss'):
        self.loss = tf.log(self.act_prob[0, self.a]) * self.g

    with tf.name_scope('optimizer'):
        self.train_op = tf.train.AdamOptimizer(LR).minimize(-self.loss)

 

상태, 행동, 리턴을 입력으로 받습니다.

with tf.name_scope('inputs'):
    self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
    self.a = tf.placeholder(tf.int32, name='action')
    self.g = tf.placeholder(tf.float32, name='return')

 

출력층으로 입력으로 받은 상태에 대한 행동의 확률을 출력합니다.

with tf.variable_scope('output'):
    self.act_prob = tf.layers.dense(self.fc, N_A, tf.nn.softmax)

 

소프트맥스 활성화 함수를 이용하여 출력의 합이 1인 확률 분포를 출력하는 것입니다.

 

출력된 확률은 np.random.choice 를 이용해서 행동을 선택할 수 있습니다.

def get_action(self, s):
    act_prob = self.sess.run(self.act_prob, {self.s: s[np.newaxis, :]})[0]
    return np.random.choice(range(N_A), p=act_prob)

 

손실 함수를 정의합니다.

with tf.name_scope('loss'):
    self.loss = tf.log(self.act_prob[0, self.a]) * self.g

 

해당 타임스텝에서 취한 행동에 대해서 리턴 값이 양수이면 취한 행동의 확률은 증가하고 음수이면 그 확률은 감소할 것입니다. 또한 log 함수를 통해 높은 확률을 갖는 행동에 대해서는 변화량이 작으며 낮은 확률을 갖는 행동의 변화량은 클 것입니다.

 

마이너스를 붙여 손실값이 최대가 되는 방향으로 학습하도록 설정합니다.

with tf.name_scope('optimizer'):
    self.train_op = tf.train.AdamOptimizer(LR).minimize(-self.loss)

 

에이전트와 환경의 상호작용입니다.

num_episode = 200
for i in range(num_episode):
    memory = []

    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        next_state, reward, done, _ = env.step(action)
        memory.append((state, action, reward))
        state = next_state
    
    agent.train(memory)

 

에이전트는 각 타임스텝의 상태, 행동, 보상을 메모리에 저장하며 에피소드가 종료되면 학습을 진행합니다.

 

역방향으로 계산하여 감쇠된 리턴은 각 상태에 대해 업데이트를 수행합니다.

def train(self, memory):
    g = 0
    for s, a, r in reversed(memory):
        g = r + GAMMA * g
        self.sess.run(self.train_op, {self.s: s[np.newaxis, :], self.a:a, self.g: g})

 

 

 

200번의 에피소드 동안 학습한 결과입니다.

 

 

저장된 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/reinforce_cartpole/model/model
episode: 0 / step: 123
episode: 1 / step: 200
episode: 2 / step: 179
episode: 3 / step: 144
episode: 4 / step: 200

 

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

A3C(Asynchronous Advantage Actor-Critic)  (0) 2020.11.30
액터-크리틱(Actor-Critic)  (0) 2020.11.27
DQN(Deep Q-Networks) (2)  (0) 2020.11.24
DQN(Deep Q-Networks) (1)  (0) 2020.11.20
함수 근사(Function Approximation)  (0) 2020.11.18
  Comments,     Trackbacks
DQN(Deep Q-Networks) (2)

Double DQN

논문 Deep Reinforcement Learning with Double Q-learning 으로 소개되었습니다.

 

DQN 에서 Q 의 가치가 과대 평가되는 문제를 해결하기 위한 아이디어입니다. 타겟을 계산하는데 있어 DQN 과 차이가 있는데 Double DQN 은 일반 네트워크 \theta 에서 행동 가치가 가장 높은 행동을 찾고 그 행동에 대한 가치는 타겟 네트워크 \theta^- 에서 구합니다.

 

Y_t^{\text{DQN}} \equiv R_{t+1} + \gamma {\underset{a} {\text{max}}}Q(S_{t+1}, a; \boldsymbol{\theta}_t^-)

 

Y_t^{\text{DoubleDQN}} \equiv R_{t+1} + \gamma Q(S_{t+1}, {\underset{a} {\text{argmax}}} Q(S_{t+1}, a; \boldsymbol{\theta}_t); \boldsymbol{\theta}_t^-)

 

 

이는 DQN 과 DDQN 의 Q 에 대해 최적 가치 V_*(s) 와의 오차를 비교한 것입니다.

 

DQN 은 취할 수 있는 행동의 수가 증가함에 따라 최대 행동 가치는 더욱 과대 평가되어 오차는 증가합니다.

 

 

Dueling DQN

논문 Dueling Network Architectures for Deep Reinforcement Learning 으로 소개되었습니다.

 

행동 가치 함수 Q 를 가치 함수 V 와 어드밴티지 함수 A 로 나눈다는 아이디어입니다. A 를 이용하여 상태별 행동 가치에 대해 평균을 제하고 V 를 더함으로써 행동 가치 함수 Q 를 나타내는 것입니다.

 

Q(s, a) = V(s) + A(s, a) - \frac{1}{|\mathcal{A}|}\sum_{a} A(s, a)

 

DQN vs Dueling DQN

 

 

구현

Double DQN 과 Dueling DQN 을 모두 포함하는 DDDQN 알고리즘을 구현합니다.

 

알고리즘 테스트 환경은 OpenAI GymLunarLander-v2 입니다.

 

 

LunarLander 는 드론을 깃발안에 안정적으로 착륙하는 문제입니다. 드론이 안정적인 착륙에 성공하면 좋은 보상을 받을 수 있으며 깃발 안쪽에 착륙한다면 더 큰 보상을 받습니다. 반대로 화면에서 사라지거나 착륙에 실패하면 나쁜 보상을 받게 됩니다.

 

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('LunarLander-v2')

print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.n)

 

상태 정보는 8차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동은 4개의 범위를 갖는 이산적인 공간입니다.

obs: 8
act: 4

 

신경망을 정의합니다.

class Network:
    def __init__(self, scope):
        with tf.variable_scope(scope):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S])
                self.q_target = tf.placeholder(tf.float32, [None, N_A])
            
            with tf.variable_scope('layer'):
                self.fc1 = tf.layers.dense(self.s, 256, tf.nn.relu)
                self.fc2 = tf.layers.dense(self.fc1, 128, tf.nn.relu)
                self.v = tf.layers.dense(self.fc2, 1)
                self.adv = tf.layers.dense(self.fc2, N_A)
                
            with tf.variable_scope('output'):
                self.q = self.v + (self.adv - tf.reduce_mean(self.adv, axis=1, keep_dims=True))
                
            with tf.name_scope('loss'):
                self.loss = tf.losses.mean_squared_error(self.q_target, self.q)
                
            with tf.name_scope('optimizer'):
                self.train_op = tf.train.RMSPropOptimizer(LR).minimize(self.loss)
        
            self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)

 

Dueling DQN 에서 제안하는 구조로 층을 구성합니다.

with tf.variable_scope('layer'):
    self.fc1 = tf.layers.dense(self.s, 256, tf.nn.relu)
    self.fc2 = tf.layers.dense(self.fc1, 128, tf.nn.relu)
    self.v = tf.layers.dense(self.fc2, 1)
    self.adv = tf.layers.dense(self.fc2, N_A)

 

두 번째 층에서 VA 의 층으로 연결되는 것을 볼 수 있습니다. 또한 A 층의 출력이 행동의 크기입니다.

 

V 와 평균을 제한 A 를 합하여 Q 를 나타냅니다.

with tf.variable_scope('out'):
    self.q = self.v + (self.adv - tf.reduce_mean(self.adv, axis=1, keep_dims=True))

 

손실 함수를 정의합니다.

with tf.name_scope('loss'):
    self.loss = tf.losses.mean_squared_error(self.q_target, self.q)

 

타겟과 예측 사이의 평균 제곱 오차(mean squared error, MSE)Q 타겟과 추정 Q 간의 계산입니다.

 

손실값을 최소화하는 방향으로 학습하도록 설정합니다.

with tf.name_scope('optimizer'):
    self.train_op = tf.train.RMSPropOptimizer(LR).minimize(self.loss)

 

\epsilon-greedy 정책을 정의합니다.

self.epsilons = np.linspace(self.epsilon, 0.1, 200000)

...

def decay_epsilon(self):
    self.epsilon = self.epsilons[min(self.train_cnt, 200000-1)]
        
def get_action(self, s):
    if np.random.rand() < self.epsilon:
        action = np.random.randint(N_A)
    else:
        q = self.sess.run(self.main.q, {self.main.s: s[np.newaxis, :]})
        action = np.argmax(q)
    return action
    
def train(self):
    ...

    self.train_cnt += 1

    self.decay_epsilon()

 

일반 신경망 업데이트 횟수가 200000번까지 \epsilon 을 감소시키며 그 이후부터는 \epsilon 을 0.1로 유지합니다.

 

 

일반 신경망을 업데이트합니다.

def train(self):
    minibatch = random.sample(self.replay_memory, BATCH_SIZE)
    minibatch = np.array(minibatch)

    q = self.sess.run(self.main.q, {self.main.s: minibatch[:, 4].tolist()})
    q_a = np.argmax(q, axis=1)

    target_q = self.sess.run(self.target.q, {self.target.s: minibatch[:, 4].tolist()})
    update_value = minibatch[:, 2] + GAMMA * target_q[range(len(minibatch)), q_a] * ~minibatch[:, 3].astype(np.bool)


    q_target = self.sess.run(self.main.q, {self.main.s: minibatch[:, 0].tolist()})
    q_target[range(len(minibatch)), minibatch[:, 1].tolist()] = update_value

    _, loss = self.sess.run([self.main.train_op, self.main.loss], {self.main.s: minibatch[:, 0].tolist(),
                                                                   self.main.q_target: q_target})

    self.train_cnt += 1

    self.decay_epsilon()

    return loss

 

Double DQN 에서 제안한 식으로 타겟을 계산합니다.

q = self.sess.run(self.main.q, {self.main.s: minibatch[:, 4].tolist()})
q_a = np.argmax(q, axis=1)

target_q = self.sess.run(self.target.q, {self.target.s: minibatch[:, 4].tolist()})
update_value = minibatch[:, 2] + GAMMA * target_q[range(len(minibatch)), q_a] * ~minibatch[:, 3].astype(np.bool)

 

일반 신경망의 Q_\theta 에서 최대의 행동 가치를 갖는 행동 시퀀스를 구하고 이를 이용하여 타겟 신경망의 Q_{\theta^-} 에서 행동 가치를 구하여 계산합니다.

 

Y_t^{\text{DoubleDQN}} \equiv R_{t+1} + \gamma Q(S_{t+1}, {\underset{a} {\text{argmax}}} Q(S_{t+1}, a; \boldsymbol{\theta}_t); \boldsymbol{\theta}_t^-)

 

 

1000번의 에피소드 동안 학습한 결과입니다.

 

 

저장된 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/dddqn_lunarlander/model/model
episode: 0 / reward 276.7014293397136
episode: 1 / reward 22.280288275624372
episode: 2 / reward 231.75701431710345
episode: 3 / reward 215.13574884255914
episode: 4 / reward 235.63575783693358
episode: 5 / reward 283.90393140514414
episode: 6 / reward 285.0946617743626
episode: 7 / reward -62.710477314100785
episode: 8 / reward -28.79188154959367
episode: 9 / reward 34.11406993609501

 

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

액터-크리틱(Actor-Critic)  (0) 2020.11.27
정책 그래디언트(Policy Gradient)  (0) 2020.11.24
DQN(Deep Q-Networks) (1)  (0) 2020.11.20
함수 근사(Function Approximation)  (0) 2020.11.18
Model-Free Control  (0) 2020.11.13
  Comments,     Trackbacks
DQN(Deep Q-Networks) (1)

DQN(Deep Q-Networks)

논문 Playing Atari with Deep Reinforcement Learning 으로 소개되었습니다.

 

Q-learning 을 기반으로 하며 연속된 상태 공간(continuous state space)을 가지는 환경의 문제를 해결하기 위한 아이디어입니다. Q 테이블에 모든 상태의 행동 가치를 저장하는 대신 신경망을 이용해서 Q_* 에 근사하는 것을 목적합니다. 

 

 

또한 안정적인 학습과 성능 향상을 위해 fixed Q-targets  experience replay 기법을 이용합니다.

 

 

손실 함수

벨만 최적 방정식을 기반으로 하여 각 상태에서 취할 수 있는 최적의 전략은 다음 상태 s' 와 다음 행동 a' 에 대한 r + \gamma Q^*(s',a') 의 기댓값을 최대화하는 행동을 선택하는 것입니다.

 

Q^*(s,a) = \mathbb{E}_{s'\thicksim \varepsilon} \left[ r + \gamma {\underset {a'} {\text{max}} } Q^*(s',a') | s,a \right]

 

 

value iteration 과 같은 과정을 무한히 반복하여 계산하면 Q_i 는 최적 Q^* 로 수렴하는 것입니다.

 

Q_{i+1}(s,a) = \mathbb{E}[ r + \gamma \text{max}_{a'} Q_i(s',a'|s,a ]

 

Q_i \rightarrow Q^* \text{ as } i \rightarrow \infty 

 

하지만 이는 선형(linear) 문제이며 신경망과 같은 비선형 function approximator 를 사용한다면 다음과 같은 손실 함수를 사용해야 합니다.

 

L_i(\theta_i) = \mathbb{E}_{s,a \thicksim p(\cdot)} \left[ (y_i - Q(s,a;\theta_i))^2 \right]

 

타겟과 예측의 오차를 계산하는 것으로 이를 최소화하는 방향으로 학습하는 것입니다.

 

 

타겟 y_i 는 이전 파라미터 \theta_{i-1} 에서 계산합니다.

 

y_i = \mathbb{E}_{s,a \thicksim p(\cdot)} \left[ ( r + \gamma {\underset {a'} {\text{max}}}Q(s',a'; \theta_{i-1})|s,a \right] 

 

 

손실 함수를 미분하면 다음과 같은 그래디언트 식을 얻을 수 있습니다.

 

\nabla_{\theta_i} L_i(\theta_i) = \mathbb{E}_{s,a \thicksim p(\cdot)} \left[ ( r + \gamma {\underset {a'} {\text{max}}}Q(s',a'; \theta_{i-1} - Q(s,a;\theta_i) ) \nabla_{\theta_i} Q(s,a;\theta_i) \right]

 

 

타겟 네트워크(Target Network)

위 식 L_i(\theta_i) 의 문제는 타겟 y_i 가 고정되어 있지 않다는 것인데 같은 신경망으로부터 타겟과 예측을 계산한다면 매번 다른 값이 나올 수 있기 때문에 안정적이지 못합니다. 따라서 동일한 구조의 타겟 신경망을 두어 안정적인 학습을 가능하게 하는 것입니다.

 

일반 신경망은 매 타임스텝마다 업데이트되며 타겟 신경망은 일정 주기마다 일반 신경망의 파라미터를 타겟 신경망의 파라미터로 한 번에 복사하게 됩니다. 즉 타겟 신경망을 일정 주기만큼 고정하는 것이라고 할 수 있습니다.

 

이를 통해 손실 함수를 다시 정의하면 다음과 같습니다.

 

L_i(\theta_i) = \mathbb{E}_{s,a \thicksim p(\cdot)} \left[ \left( r + \gamma {\underset {a'} {\text{max}}} Q(s',a'; \color {red} {\theta_i^-}) - Q(s,a;\theta_i) \right)^2 \right]

 

 

경험 리플레이(Experience replay)

이전 경험들을 반복하는 기법을 경험 리플레이(experience replay)라고 합니다. 

 

에피소드를 진행하면서 얻은 샘플 데이터를 리플레이 메모리(replay memory)에 저장하고 일정 개수만큼 무작위로 추출하여 미니배치(mini-batch)를 구성하고 업데이트를 수행합니다. 이에 따라 데이터간에 상관관계(correlation)를 줄여준다는 이점이 있습니다.

 

이러한 경험 리플레이 기법은 DQN, DDPG 과 같은 off-policy 알고리즘에만 사용할 수 있습니다.

 

또한 메모리안의 샘플 데이터는 큐(queue)를 이용한 FIFO(First In First Out) 구조로 저장하여 오래된 데이터가 먼저 폐기됩니다.

 

 

 

DQN 에 사용된 기법들의 사용유무에 대한 실험 결과는 다음과 같습니다.

 

 

 

알고리즘

 

 

 

구현

알고리즘 테스트 환경은 OpenAI GymCartPole-v0 입니다.

 

 

CartPole 은 좌우로 카트를 이동하여 막대기가 쓰러지지 않도록 오래 유지하는 문제입니다. 

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('CartPole-v0')

print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.n)

 

상태 정보는 4차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동은 스칼라로 2개의 범위를 갖는 이산적인 공간입니다.

obs: 4
act: 2

 

신경망을 정의합니다.

class Network:
    def __init__(self, scope):
        with tf.variable_scope(scope):
            with tf.name_scope('inputs'):
                self.s = tf.placeholder(tf.float32, [None, N_S])
                self.q_target = tf.placeholder(tf.float32, [None, N_A])
            
            with tf.variable_scope('layer'):
                self.fc = tf.layers.dense(self.s, 16, tf.nn.relu)
            
            with tf.variable_scope('out'):
                self.q = tf.layers.dense(self.fc, N_A)
                
            with tf.name_scope('loss'):
                self.loss = tf.losses.mean_squared_error(self.q_target, self.q)
                
            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR).minimize(self.loss)
        
            self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)

 

상태와 Q 타겟을 입력으로 받습니다.

with tf.name_scope('input'):
  self.s = tf.placeholder(tf.float32, [None, N_S])
  self.q_target = tf.placeholder(tf.float32, [None, N_A])

 

출력층으로 입력으로 받은 상태에 대한 Q 를 출력합니다.

with tf.variable_scope('output'):
    self.q = tf.layers.dense(self.fc, N_A)

 

손실 함수를 정의합니다.

with tf.name_scope('loss'):
    self.loss = tf.losses.mean_squared_error(self.q_target, self.q)

 

타겟과 예측 사이의 평균 제곱 오차(mean squared error, MSE)Q 타겟과 추정 Q 간의 계산입니다.

 

손실값을 최소화하는 방향으로 학습하도록 설정합니다.

with tf.name_scope('optimizer'):
    self.train_op = tf.train.AdamOptimizer(LR).minimize(self.loss)

 

\epsilon-greedy 정책을 정의합니다.

def decay_epsilon(self):
    if self.epsilon > 0.1:
        self.epsilon -= self.decay_rate
    return self.epsilon

def get_action(self, s):
    if np.random.rand() < self.epsilon:
        action = np.random.randint(N_A)
    else:
        q = self.sess.run(self.main.q, {self.main.s: s[np.newaxis, :]})
        action = np.argmax(q)
    return action

...

for i in range(MAX_EPI + 1):
    agent.decay_epsilon()
    
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        ...

 

감쇠 비율은 전체 에피소드 수에 대한 현재 에피소드의 비율입니다. 따라서 매 에피소드마다 엡실론 \epsilon 을 감쇠하여 랜덤한 행동과 최선의 행동 사이의 확률을 조정합니다. \epsilon 의 확률로 랜덤한 행동을 선택하고 1 - \epsilon 의 확률로 최선의 행동을 선택하게 됩니다.

 

 

에이전트와 환경의 상호작용입니다.

agent = DQN(epsilon=1, decay_rate=1/MAX_EPI)

for i in range(MAX_EPI + 1):
    epi_steps = 0

    agent.decay_epsilon()
    
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        
        next_state, reward, done, _ = env.step(action)
        
        if done:
        	reward = -1
        
        agent.replay_memory.append((state, action, reward, done, next_state))
        
        if len(agent.replay_memory) >= BATCH_SIZE:
            agent.train()
        
        if (agent.train_cnt + 1) % UPDATE == 0:
            agent.update_target()
            print('** update **')
        
        state = next_state
        epi_steps += 1

 

매 타임스텝마다 환경으로 받은 정보 s, a, r, d, s' 를 리플레이 메모리에 저장합니다.

for i in range(MAX_EPI + 1):
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)   
        next_state, reward, done, _ = env.step(action)
        agent.replay_memory.append((state, action, reward, done, next_state))

 

메모리의 크기가 일정 크기 이상이면 매 타임스텝마다 일반 신경망 업데이트를 수행합니다.

if len(agent.replay_memory) > agent.replay_memory * 0.1:
    agent.train()

 

일반 신경망을 업데이트합니다.

def train(self):
    minibatch = random.sample(self.replay_memory, BATCH_SIZE)
    minibatch = np.array(minibatch)

    target_q = self.sess.run(self.target.q, {self.target.s: minibatch[:, 4].tolist()})
    update_value = minibatch[:, 2] + GAMMA * np.max(target_q, axis=1) * ~minibatch[:, 3].astype(np.bool)

    q_target = self.sess.run(self.main.q, {self.main.s: minibatch[:, 0].tolist()})
    q_target[range(len(minibatch)), minibatch[:, 1].tolist()] = update_value

    _, loss = self.sess.run([self.main.train_op, self.main.loss], {self.main.s: minibatch[:, 0].tolist(),
                                                                   self.main.q_target: q_target})

    self.train_cnt += 1

    return loss

 

리플레이 메모리에 저장된 샘플을 배치 사이즈만큼 무작위로 추출합니다.

minibatch = random.sample(self.replay_memory, BATCH_SIZE)

 

계산의 편의를 위해 리스트를 numpy.ndarray 타입으로 변환합니다.

minibatch = np.array(minibatch)

 

다음 상태의 시퀀스를 입력으로 타겟 신경망의 Q(s';\theta^-) 를 구합니다. 

target_q = self.sess.run(self.target.q, {self.target.s: minibatch[:, 4].tolist()})

 

 

타겟인 r + \gamma {\underset{a'} {\text{maxQ}}}(s', a'; \theta^-) 를 계산합니다.

update_value = minibatch[:, 2] + GAMMA * np.max(target_q, axis=1) * ~minibatch[:, 3].astype(np.bool)

 

여기서 minibatch[:, 2] 는 보상의 시퀀스이며 np.max(target_q, axis=1) 은 최대 행동 가치 \underset{a'} {\text{max}} Q(s', a'; \theta^-) 시퀀스입니다. 또한 minibatch[:, 3] 은 에피소드 종료 여부를 나타내는 True/False 의 bool 자료형을 가지는 시퀀스이며 앞에 ~ 연산자는 numpy.ndarray 타입의 자료형에서 반전을 나타냅니다. 따라서 종료되지 않은 에피소드에 한해서만 \gamma \underset{a'} {\text{max}} Q(s', a'; \theta^-) 을 계산하라는 것입니다.

 

 

 

일반 신경망의 Q(s;\theta) 에 취한 행동에 대해 계산된 가치를 넣어 타겟의 형태를 완성합니다.

q_target = self.sess.run(self.main.q, {self.main.s: minibatch[:, 0].tolist()})
q_target[range(len(minibatch)), minibatch[:, 1].tolist()] = update_value

 

일반 신경망 업데이트를 수행합니다.

_, loss = self.sess.run([self.main.train_op, self.main.loss], {self.main.s: minibatch[:, 0].tolist(), self.main.q_target: q_target}

 

일반 신경망 업데이트 횟수가 타겟 네트워크 업데이트 주기에 걸리면 그 동안 학습된 일반 신경망의 파라미터 \theta 를 타겟 신경망 파라미터 \theta^- 로 복사합니다. 

self.update_op = [t_p.assign(m_p) for t_p, m_p in zip(self.target.params, self.main.params)]

...

def update_target(self):
    self.sess.run(self.update_op)

...

if (agent.train_cnt + 1) % UPDATE == 0:
    agent.update_target()

 

 

200번의 에피소드를 학습한 결과입니다.

 

 

저장한 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/dqn_cartpole/model/model
episode: 0 / step 200
episode: 1 / step 200
episode: 2 / step 200
episode: 3 / step 200
episode: 4 / step 200

 

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

정책 그래디언트(Policy Gradient)  (0) 2020.11.24
DQN(Deep Q-Networks) (2)  (0) 2020.11.24
함수 근사(Function Approximation)  (0) 2020.11.18
Model-Free Control  (0) 2020.11.13
소프트맥스 회귀(Softmax Regression) (2)  (0) 2020.11.12
  Comments,     Trackbacks
함수 근사(Function Approximation)

배경

강화학습의 환경에는 크게 두 가지의 상태 공간으로 나눌 수 있습니다.

 

어떤 환경에서 0~20의 상태를 가진다고 하면 이를 이산적 상태 공간(discrete state space) 이라고 하며 모든 상태를 테이블에 기록할 수 있습니다.

 

하지만 실수 범위 내에서 연속적인 값을 가질 수도 있는데 이를 연속적인 상태 공간(continuous state space) 이라고 하며 상태의 개수가 무한히 혹은 무수히 많으면 테이블에 모든 상태를 기록할 수 없습니다.

 

이와 같은 문제를 해결하기 위해 다른 접근법이 필요합니다.

 

 

함수(Function)

다음과 같은 함수가 있습니다.

 

f(x) = x + 3

 

이 함수에 입력으로 1을 넣으면 4, 3을 넣으면 6을 출력합니다.

 

f(1) = 4

 

f(3) = 6

 

상태별 가치를 테이블에 기록한다고 보면 s_1 의 가치는 4, s_3 의 가치는 6이 기록되어 있는 것이라고 볼 수 있습니다.

 

 

이번에는 a, b 변수를 가지는 함수가 있습니다.

 

f(x) = ax + b

 

 

이 함수에 s_{10} = 20, s_{20} = 30 을 기록한다면 다음과 같이 a, b 를 구할 수 있습니다.

 

f(10) = 10a + b = 20

 

f(20) = 20a + b = 30

 

a=1, b=10

 

 

s_{20} = 30s_{20} = 40 로 수정하고 싶다면 a, b 를 다음과 같이 업데이트하면 됩니다.

 

f(10) = 10a + b = 20


f(20) = 20a + b = 40

 

a=2, b=0

 

 

또한 입력으로 넣은 10, 20 외에는 모르는 상황이라고 가정해봅니다. 이런 상황에서도 함수에 30을 넣으면 60이라는 값이 나오게 될 것입니다. 즉 모르는 입력에 대해서도 값을 출력하는데 이것을 일반화(generalization) 이라고 합니다.

 

강화 학습의 관점에서 보면 가보지 않은 상태에 대해서도 어떤 일반화된 가치를 반환하는 것입니다. 가보지 않은 상태의 가치는 알 수 없는 테이블룩업 방식과 비교하면 굉장히 효율적인 방법론이라고 할 수 있습니다.

 

 

함수 근사(Function Approximation)

이러한 함수의 개념을 이용해서 실제 가치 함수에 근사하고자 하는 것입니다.

 

다음과 같이 빨간색으로 표시된 함수에 상태 s 를 넣으면 가치 함수 \hat v(s,w) 를 출력하거나 행동 가치 함수 \hat q(s,a,w) 를 출력하게 한다는 것입니다.

 

 

그리고 이 추정된 가치 함수는 실제 가치 함수에 근사한 것입니다.

 

\hat v(s, w) \approx v_\pi(s)


\hat q(s, a, w) \approx q_\pi(s,a)

 

또한 w 는 함수의 파라미터이며 학습해야할 대상으로 추정된 가치 함수와 실제 가치 함수의 오차를 최소화하기 위해서는 최적의 파라미터를 구해야할 것입니다.

 

어떤 방법으로 최적의 파라미터를 구할 수 있을까요?

 

대표적인 function approximator 로는 신경망(neural network)이 있으며 경사 하강법을 통해 파라미터를 업데이트하는 과정을 알아보겠습니다.

 

 

경사 하강법(Gradient Descent)

신경망을 학습하는 것은 목적 함수를 최소로 하는 방향으로 파라미터를 업데이트하는 것이라고 할 수 있습니다.

 

먼저 목적 함수를 정의해야 합니다.

 

실제 가치 v_\pi(S) 와 추정 가치 \hat v(S, w) 의 차이로 나타낼 수 있습니다.

 

J(w) = \mathbb{E}_\pi[(v_\pi(S) - \hat v(S, w))^2]

 

신경망에는 파라미터 벡터 w_1, w_2, \dots 가 있으며 각 파라미터가 J(w) 에 대한 기여율을 알아보기 위해 편미분을 합니다.

 

\nabla_wJ(w) = ({\partial J(w) \over \partial w_1}, {\partial J(w) \over \partial w_2}, \cdots, {\partial J(w) \over \partial w_n})

 

각 파라미터에 대해 편미분하여 벡터로 나타낸 것이고 이를 각 파라미터에 대한 그래디언트(gradient) 라고 합니다.

 

이 그래디언트를 조금씩 업데이트하기 위해 learning rate 혹은 step size 라고 하는 작은 상수 \alpha 를 곱하여 업데이트의 변화를 조절하며 각 파라미터에 대해 계산해보면 다음과 같습니다.

 

w'_1 = w_1 - a * {\partial J(w) \over \partial w_1}

 

w'_2 = w_2 - a * {\partial J(w) \over \partial w_2}

 

\dots

 

w'_n = w_n - a * {\partial J(w) \over \partial w_n}

 

이를 하나의 식으로 나타내면 다음과 같습니다.

 

w' = w - a * \nabla_w J(w)

 

이처럼 그라디언트를 계산해 파라미터를 업데이트하여 목적 함수를 최소화하는 과정을 경사 하강법이라고 합니다.

 

텐서플로(TensorFlow)파이토치(PyTorch) 같은 자동 미분 라이브러리를 이용하면 이러한 계산 과정은 옵티마이저(optimizer)를 통해 자동으로 계산되어 업데이트됩니다.

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

DQN(Deep Q-Networks) (2)  (0) 2020.11.24
DQN(Deep Q-Networks) (1)  (0) 2020.11.20
Model-Free Control  (0) 2020.11.13
소프트맥스 회귀(Softmax Regression) (2)  (0) 2020.11.12
Model-Free Prediction  (0) 2020.11.09
  Comments,     Trackbacks
Model-Free Control

행동 가치 함수(Action Value Function)

model-free 에서 가치 함수 V(s) 를 추정하는 방법에는 MC 와 TD 가 있었습니다. 하지만 이것만으로는 각 상태에서 어떤 행동을 선택해야 하는지 알 수 없습니다. 따라서 정책 개선 또는 제어(control)하기 위해서는 상태 가치 함수 V(s) 대신에 행동 가치 함수 Q(s,a) 를 구해야 합니다. Q(s,a) 를 이용하면 높은 가치를 가지는 행동을 선택할 수 있기 때문입니다. 또 이러한 행동 가치 함수를 이용한 제어를 행동 가치 기반 방법론이라고 합니다.

 

MC prediction 을 통해 Q(s,a) 를 구하고 이를 이용하여 greedy 정책을 생성합니다.

import time
import numpy as np
import gym

env = gym.make('FrozenLake-v0', is_slippery=False)

q_table = np.zeros((env.observation_space.n, env.action_space.n))

gamma = 0.99
alpha = 0.1

num_episode = 10000
for i in range(num_episode):
    memory = []
    
    state = env.reset()
    done = False
    while not done:
        action = env.action_space.sample()
        next_state, reward, done, _ = env.step(action)
        memory.append((state, action, reward))
        state = next_state
    
    g = 0
    visted = []
    for s, a, r in reversed(memory):
        if s in visted: # 1-visited
            continue
            
        g = r + gamma*g
        q_table[s, a] = q_table[s, a] + alpha*(g-q_table[s, a])
        visted.append(s)

state = env.reset()
done = False
while not done:
    env.render()
    time.sleep(0.2)
    action = np.argmax(q_table[state])
    
    state, reward, done, _ = env.step(action)

if reward == 1:
    print('success')
else:
    print('failure')
        
# print(q_table)
success

 

작은 환경의 경우에는 무작위 정책으로도 실제 가치(true value)에 근사할 만큼 충분한 탐색이 가능했습니다. 하지만 복잡한 환경에서는 무작위 정책으로는 모든 상태를 탐색하기 어려울 것입니다. 모든 상태를 탐색하지 않으면 실제 가치에 근사할 수 없게 됩니다.

 

 

입실론 그리디(\epsilon-Greedy)

무작위 행동과 최선의 행동 사이에 확률적으로 선택함으로써 지속적이고 충분한 탐색(exploration)이용(exploitaion)을 가능하게 하여 정책을 개선하는 방법입니다.

 

f(n)= \begin{cases} 1-\epsilon \text{ if } a^* = & \mbox{ $\underset{a} {argmax}$ q(s,a)} \\ \epsilon & \mbox{ otherwise} \end{cases}

 

\epsilon-greedy 의 \epsilon 는 무작위 행동을 선택할 확률을 나타내는 것이며 이를 점점 감소시키는 전략을 사용하는데 이를 decaying \epsilon-greedy 라고 합니다.

 

그 목적은 Q 테이블이 학습되지 않은 초기에는 랜덤한 행동을 선택할 확률을 높게하여 탐색을 더 많이 하고 그 확률을 점점 감소시키고 최선의 행동을 선택할 확률을 증가시켜 탐색 반경을 넓히는 것입니다.

 

단순하게는 전체 에피소드에 대한 현재 에피소드 비율을 빼줌으로써 구현할 수 있습니다.

import numpy as np
import matplotlib.pyplot as plt

num_episode = 100

start_epsilon = 1

def decay_epsilon(episode):
    e = start_epsilon - episode / num_episode
    return e

episode = np.arange(num_episode)
epsilon = decay_epsilon(episode)

plt.plot(epsilon)
plt.title('e-greedy')
plt.xlabel('episode')
plt.ylabel('e')
plt.show()

 

일반적으로 다음과 같이 \epsilon 에 최소값을 지정하는 방식을 많이 사용합니다.

import numpy as np
import matplotlib.pyplot as plt

num_episode = 100

start_epsilon = 1
end_epsilon = 0.1 

def decay_epsilon(episode):
    e = start_epsilon - episode / num_episode
    return e if e > end_epsilon else end_epsilon

epsilon = []
for i in range(num_episode):
    epsilon.append(decay_epsilon(i))

plt.plot(epsilon, 'red')
plt.title('e-greedy')
plt.xlabel('episode')
plt.ylabel('e')
plt.show()

 

 

MC Control

이러한 \epsilon-greedy 를 이용해서 정책을 개선해야 합니다.

 

그전에 DP 의 policy iteration 을 되새겨보면 어떤 정책 \pi 이 주어지고 정책 평가 단계에서 반복을 통해 수렴된 v_{\pi}(s) 를 구하고 정책 개선 단계에서 v_{\pi}(s) 를 이용해서 최선의 행동을 선택하는 greedy 정책을 생성하고 이 과정을 반복하여 최적의 정책 \pi_* 을 구하는 것이었습니다.

 

이것이 가능한 이유는 DP 는 MDP 모델이 완벽히 주어지며 가능한 모든 상태에 대하여 계산하기 때문입니다. 하지만 model-free 환경에서는 MDP 를 모르기 때문에 DP 의 방식과는 차이가 있습니다.

 

먼저 정책 평가 단계에서 Q(s,a) 를 수렴할 때까지 반복하지 않으며 정책 개선 단계에서 바로 greedy 정책을 생성하지 않습니다. 이는 \epsilon-greedy 를 사용하는 것과 관계가 있는 것입니다.

 

\epsilon-greedy 를 포함하는 policy iteration 과정을 보면 \epsilon 값이 1인 정책을 생성하고 이 정책은 랜덤한 행동만을 선택합니다. 또한 이 정책이 만들어내는 궤적을의 샘플을 이용하여 Q(s,a) 를 계산합니다. \epsilon 값이 0.9인 정책은 90% 확률로 무작위 행동과 10% 확률로 이전 정책(\epsilon=1)을 통해 계산된 Q(s,a) 의 최선의 행동을 선택합니다. 이를 반복하며 최종적으로 \epsilon 값이 0이 되면 greedy 정책이며 지금까지 계산된 Q(s,a) 를 이용하여 최선의 행동만을 선택만을 하게 됩니다.

 

 

구현

MC control 을 구현합니다.

import time
import matplotlib.pyplot as plt
import numpy as np
import gym

env = gym.make('FrozenLake-v0', is_slippery=False)

q_table = np.zeros((env.observation_space.n, env.action_space.n))

gamma = 0.99
alpha = 0.1

success = []
num_episode = 2000
for i in range(num_episode):
    memory = []
    
    e = 1 - i / num_episode
    
    state = env.reset()
    done = False
    while not done:
        if np.random.rand() < e:
            action = env.action_space.sample()
        else:
            action = np.argmax(q_table[state])
            
        next_state, reward, done, _ = env.step(action)
        memory.append((state, action, reward))
        state = next_state
    
    success.append(reward)
    
    g = 0
    visted = []
    for s, a, r in reversed(memory):
        if s in visted: # 1-visited
            continue
            
        g = r + gamma*g
        q_table[s, a] = q_table[s, a] + alpha*(g-q_table[s, a])
        visted.append(s)

plt.plot(range(num_episode), success)
plt.show()

print('success rate:', np.mean(success))

 

성공률이 높은 수치는 아니지만 \epsilon 값이 감소함에 따라 성공이 증가하는 것을 볼 수 있습니다.

 

 

TD Control

MC 대신 TD 를 이용할 수 있습니다. TD 는 종료되지 않는 환경에서도 온라인 학습이 가능하다는 장점을 가지고 있습니다.

 

TD 또한 V(s) 대신 Q(s,a) 를 구해야 하며 업데이트 방식에 따라 SARSA  Q-learning 으로 나누어집니다.

 

 

SARSA

SARSA 는 on-policy TD control 알고리즘으로 벨만 기대 방정식을 이용합니다.

 

Q(s,a) 에 대한 벨만 기대 방정식과 업데이트 식은 다음과 같습니다.

 

q_{\pi}(s,a) = \mathbb{E}_{\pi}[r_{t+1}+\gamma q_\pi(S_{t+1},A_{t+1}) | S_t=s, A_t=a]

 

Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \left[ R+\gamma Q(S_{t+1}, A_{t+1}) - Q(S_t, A_t) \right]

 

 

Q(S_t,A_t) 를 업데이트하기 위해서는 다음 상태 S_{t+1} 와 다음 행동 A_{t+1} 에 대한 정보가 있어야 합니다. A_{t+1} 은 현재의 정책이 다음 상태에 대해 선택한 행동입니다.

 

 

상태 S 에서 행동 A 를 취하면 보상 R 과 다음 상태 S 을 얻고 여기서 또 다음 상태에서 다음 행동 A 을 선택해서 SARSA 라는 이름이 지어진 것입니다.

 

 

SARSA 알고리즘은 다음과 같습니다.

 

 

 

구현

SARSA 알고리즘을 구현합니다.

import matplotlib.pyplot as plt
import numpy as np
import gym

env = gym.make('FrozenLake-v0', is_slippery=False)

num_episode = 2000
gamma = 0.99
alpha = 0.01

q_table = np.zeros((env.observation_space.n, env.action_space.n))
success = []

def get_action(e, state):
    if np.random.rand() < e:
        return env.action_space.sample()
    else:
        return np.argmax(q_table[state])

for i in range(num_episode):
    e = 1. / (i // 100 + 1)

    state = env.reset()
    action = get_action(e, state)

    done = False
    while not done:
        next_state, reward, done, _ = env.step(action)
        
        next_action = get_action(e, next_state)
        
        td_target = reward + gamma * q_table[next_state][next_action]
        q_table[state][action] = q_table[state][action] + alpha * (td_target - q_table[state][action])

        state = next_state
        action = next_action
    
    success.append(reward)

plt.bar(range(num_episode), success, color='red')
plt.show()

print('success rate:', np.mean(success))

 

 

Q-learning

Q-learning 은 off-policy TD control 알고리즘으로 벨만 최적 방정식을 이용합니다.

 

Q(s,a) 에 대한 벨만 최적 방정식과 업데이트 식은 다음과 같습니다.

 

q_*(s,a) = \mathbb{E} \left[ R_{t+1} + \gamma {\underset {a'} {max}} q_*(S_{t+1},a') | S_t=s A_t=a \right]

 

Q(S_t, A_t) \leftarrow Q(S_t, A_t) + \alpha \left[ R_{t+1} + \gamma {\underset {a} {max}}Q(S_{t+1} ,a) - Q(S_t, A_t) \right]

 

 

Q-learning 알고리즘은 다음과 같습니다.

 

 

 

구현

Q-learning 알고리즘을 구현합니다.

import matplotlib.pyplot as plt
import numpy as np
import gym

env = gym.make('FrozenLake-v0', is_slippery=False)

num_episode = 2000
gamma = 0.99
alpha = 0.01

q_table = np.zeros((env.observation_space.n, env.action_space.n))
success = []

def get_action(e, state):
    if np.random.rand() < e:
        return env.action_space.sample()
    else:
        return np.argmax(q_table[state])

for i in range(num_episode):
    e = 1. / (i // 100 + 1)

    state = env.reset()
    done = False
    while not done:
        action = get_action(e, state)
        next_state, reward, done, _ = env.step(action)
        
        td_target = reward + gamma * np.max(q_table[next_state])
        q_table[state][action] = q_table[state][action] + alpha * (td_target - q_table[state][action])
        
        state = next_state
    
    success.append(reward)

plt.plot(range(num_episode), success)
plt.show()

print('success rate:', np.mean(success))

 

 

On-policy vs Off-policy

TD 를 이용하는 제어 방법으로 off-policy TD control 알고리즘인 SARSA 와 on-policy TD control 알고리즘인 Q-learning 을 알아보았습니다.

 

on-policy 와 off-policy 는 어떤 차이점을 갖을까요?

 

on-policy 란 직접 배우는 것으로 해당 정책을 통해서 나온 경험을 이용해서 학습합니다. 또한 하나의 정책을 사용하여 계속해서 개선해 나가는 것이 특징입니다.

 

대표적인 on-policy 알고리즘으로는 SARSA, 1-visited MC, Actor-Critic, PPO 등이 있습니다.

 

 

off-policy 란 간접적으로 배우는 것이라고 할 수 있습니다. 간접적으로 배우기 때문에 행동 정책과 타겟 정책이 다를 수 있으며 다른 정책을 통해서 나온 경험을 이용해서 학습이 가능합니다. 따라서 과거의 경험을 사용할 수 있기 때문에 데이터 효율성이 좋다고 할 수 있습니다.

 

대표적인 off-policy 알고리즘으로는 Q-learning, every-visited MC, DQN, DDPG,  등이 있습니다.

 

 

SARSA vs Q-Learning

SARSA 는 벨만 기대 방정식, Q-learning 은 벨만 최적 방정식을 기반으로 했습니다.

 

q_{\pi}(s,a) = \mathbb{E}_{\pi}[r_{t+1}+\gamma q_\pi(S_{t+1},A_{t+1}) | S_t=s, A_t=a]

 

q_*(s,a) = \mathbb{E} \left[ R_{t+1} + \gamma {\underset {a'} {max}} q_*(S_{t+1},a') | S_t=s A_t=a \right]

 

두 식의 가장 큰 차이라면 정책의 유무라고 할 수 있습니다. 벨만 기대 방정식에서 다음 상태의 q_{\pi} 는 해당 정책을 따르는 행동 가치 함수로 해당 정책 \pi 에 의존적이며 현재 상태와 다음 상태 간의 상관관계(correlation)가 크다고 할 수 있습니다. 따라서 SARSA 를 on-policy 알고리즘이라고 하는 것입니다.

 

반면에 벨만 최적 방정식은 다음 상태의 q_* 에서 최대의 행동 가치를 이용합니다. 이는 정책 \pi 과 관련없으며 해당 환경의 샘플을 이용하여 업데이트를 반복하면 최적 q_* 에 수렴한다는 것입니다. 이 때문에 Q-learning 을 off-policy 알고리즘이라고 합니다.

 

 

다음과 같은 환경을 통해 on-policy 인 SARSA 와 off-policy 인 Q-learning 의 차이를 살펴볼 수 있습니다.

 

절벽으로 이동하면 -100 의 보상을 얻게되며 그 외에는 일괄적으로 -1 의 보상을 얻습니다.

 

 

구현

환경 정보를 불러오고 공통부분을 정의합니다.

# sarsa vs q-learning

from cliff_walking import CliffWalkingEnv
import numpy as np
import matplotlib.pyplot as plt

env = CliffWalkingEnv()

print('act:', env.action_space.n)
print('obs:', env.observation_space.n)

table = {
    'q-learning': np.zeros((env.observation_space.n, env.action_space.n)),
    'sarsa': np.zeros((env.observation_space.n, env.action_space.n))
}

result = {
    'q-learning': [],
    'sarsa': []
}

num_episode = 2000

decay_e = 1 / num_episode

def predict(state):
    if np.random.rand() < e:
        action = env.action_space.sample()
    else:
        action = np.argmax(q_table[state])
    return action

 

Q-learning 알고리즘으로 Q 테이블을 학습합니다.

# q-learning

q_table = table['q-learning']

gamma = 0.99
alpha = 0.1
e = 1

for i in range(num_episode):
    if e > 0.1:
        e -= decay_e
    
    state = env.reset()
    done = False
    while not done:
        action = predict(state)
            
        next_state, reward, done, _ = env.step(action)
        
        td_target = reward + gamma * np.max(q_table[next_state])
        q_table[state, action] += alpha * (td_target - q_table[state, action])
        
        state = next_state
    
#     print('episode:', i, 'reward', reward)
# print(q_table)

 

SARSA 알고리즘으로 Q 테이블을 학습합니다.

# sarsa

q_table = table['sarsa']

gamma = 0.99
alpha = 0.1
e = 1

for i in range(num_episode):
    if e > 0.1:
        e -= decay_e
    
    state = env.reset()
    action = predict(state)
    
    done = False
    while not done:      
        next_state, reward, done, _ = env.step(action)
        next_action = predict(next_state)
        
        td_target = reward + gamma * q_table[next_state, next_action]
        q_table[state, action] += alpha * (td_target - q_table[state, action])
        
        state = next_state
        action = next_action

#     print('episode:', i, 'reward', reward)
# print(q_table)

 

각 알고리즘을 통해 학습된 Q 테이블을 이용해서 플레이합니다.

# play

import time

for i in range(1):
    state = env.reset()
    done = False
    while not done:
        env.render()
        time.sleep(1)
        action = np.argmax(table['sarsa'][state])
#         action = np.argmax(table['q-learning'][state])
        state, reward, done, _ = env.step(action)
    print(state, reward)

 

Q-learning 은 최적의 경로를 통해 클리어하는 것을 볼 수 있습니다.

 

 

반면에 SARSA 는 절벽을 피해가는 경로를 이용합니다.

 

 

 

각 알고리즘의 테이블을 히트맵으로 확인해봅니다.

import matplotlib.pyplot as plt
import seaborn as sns

q_table = np.max(table['q-learning'], axis=1).reshape(4, 12)
sns.heatmap(np.around(q_table, 1), annot=True)
plt.title('Q-Learning', fontsize=20)
plt.show()

q_table = np.max(table['sarsa'], axis=1).reshape(4, 12)
sns.heatmap(np.around(q_table, 1), annot=True)
plt.title('SARSA', fontsize=20)
plt.show()

 

Q-learning 의 테이블은 최적의 경로의 상태들이 높은 가치로 기록되어 있습니다.

 

 

반대로 SARSA 의 테이블은 나쁜 보상을 주는 절벽에 근접한 상태일 수록 낮은 가치로 기록되어 있습니다.

 

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

DQN(Deep Q-Networks) (1)  (0) 2020.11.20
함수 근사(Function Approximation)  (0) 2020.11.18
소프트맥스 회귀(Softmax Regression) (2)  (0) 2020.11.12
Model-Free Prediction  (0) 2020.11.09
OpenAI Gym  (0) 2020.11.09
  Comments,     Trackbacks
소프트맥스 회귀(Softmax Regression) (2)

 

2020/11/10 - [머신러닝/딥러닝] - 소프트맥스 회귀(Softmax Regression) (1)

 

구현

소프트맥스 회귀 모델을 구현합니다.

 

패션 MNIST 데이터셋을 불러옵니다.

import numpy as np
from tensorflow.keras import datasets

(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

print('data shape:', x_train.shape)

print('target shape:', y_train.shape)
print('target label:', np.unique(y_train, return_counts=True))
data shape: (60000, 28, 28)
target shape: (60000,)
target label: (array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=uint8), array([6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000, 6000],
      dtype=int64))

 

학습 데이터의 20%를 검증 데이터로 분할합니다.

from sklearn.model_selection import train_test_split

x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.2)

 

데이터는 28x28 의 그레이스케일 이미지입니다.

x_train[0].shape
(28, 28)

 

또한 이미지의 구성값은 픽셀(fixel)이라고 하며 [0, 255] 의 범위를 갖습니다.

import matplotlib.pyplot as plt

plt.imshow(x_train[0])
plt.title('Sample')
plt.colorbar()
plt.show()

 

신경망을 정의합니다.

import numpy as np
import tensorflow as tf

class Model:
    def __init__(self, lr=1e-3):
        tf.reset_default_graph()
        
        with tf.name_scope('input'):
            self.x = tf.placeholder(tf.float32, [None, 28, 28])
            self.y = tf.placeholder(tf.int64)

        with tf.name_scope('preprocessing'):
            x_norm = self.x / 255.0
            y_onehot = tf.one_hot(self.y, 10)
            
        with tf.name_scope('layer'):
            flat = tf.layers.flatten(x_norm)
            fc = tf.layers.dense(flat, 128, tf.nn.relu)
            logits = tf.layers.dense(fc, 10)
            
        with tf.name_scope('output'):
            self.predict = tf.argmax(tf.nn.softmax(logits), 1)

        with tf.name_scope('accuracy'):
            self.accuracy = tf.reduce_mean(tf.cast(tf.equal(tf.to_int64(self.predict), self.y), dtype=tf.float32))    
        
        with tf.name_scope('loss'):
            cross_entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=y_onehot, logits=logits)
            self.loss = tf.reduce_mean(cross_entropy)
        
        with tf.name_scope('optimizer'):
            self.train_op = tf.train.GradientDescentOptimizer(lr).minimize(self.loss)

        with tf.name_scope('summary'):
            self.summary_loss = tf.placeholder(tf.float32)
            self.summary_accuracy = tf.placeholder(tf.float32)
            
            tf.summary.scalar('loss', self.summary_loss)
            tf.summary.scalar('accuracy', self.summary_accuracy)
            
            self.merge = tf.summary.merge_all()

        self.train_writer = tf.summary.FileWriter('./tmp/softmax-regression_fashion_mnist/train', tf.get_default_graph())
        self.val_writer = tf.summary.FileWriter('./tmp/softmax-regression_fashion_mnist/val', tf.get_default_graph())
        
        self.sess = tf.Session()
        
        self.sess.run(tf.global_variables_initializer())
    
    def write_summary(self, tl, ta, vl, va, epoch):
        train_summary = self.sess.run(self.merge, {self.summary_loss: tl, self.summary_accuracy: ta})
        val_summary = self.sess.run(self.merge, {self.summary_loss: vl, self.summary_accuracy: va})
        
        self.train_writer.add_summary(train_summary, epoch)
        self.val_writer.add_summary(val_summary, epoch)
    
    def train(self, x_train, y_train, x_val, y_val, epochs=100, batch_size=32):
        data_size = len(x_train)
        num_iter = data_size // batch_size
        
        for e in range(epochs):
            t_l, t_a = [], []
    
            idx = np.random.permutation(np.arange(data_size))
            _x_train, _y_train = x_train[idx], y_train[idx]
            
            for i in range(0, data_size, batch_size):
                si, ei = i, i + batch_size
                if ei > data_size:
                    ei = data_size
                
                x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]
                
                tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
                t_l.append(tl)
                t_a.append(ta)
                
            vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})
            
            self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)
            
            print('epoch:', e + 1, ' / train_loss:', np.mean(t_l), '/ train_acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)
    
    def score(self, x, y):
        return self.sess.run(self.accuracy, {self.x: x, self.y: y})

 

입력 데이터는 3차원 배열이며, 샘플은 2차원 배열로 28x28 이미지입니다. 

self.x = tf.placeholder(tf.float32, [None, 28, 28])

 

입력 데이터의 픽셀값의 범위는 [0, 255] 을 갖기 때문에 255로 나누어 정규화합니다.

x_norm = self.x / 255.0

 

타겟 데이터는 정수 인코딩된 값입니다.

self.y = tf.placeholder(tf.int64)

 

원-핫 인코딩합니다.

y_onehot = tf.one_hot(self.y, 10)

 

입력 데이터는 2차원 배열(28x28)로 완전 연결층과 결합을 위해 플래튼층을 통해 1차원 배열(28*28)로 변환합니다.(tf.layers.flatten)

with tf.name_scope('layer'):
    flat = tf.layers.flatten(x_norm)
    fc = tf.layers.dense(flat, 128, tf.nn.relu)
    logits = tf.layers.dense(fc, 10)

 

미니 배치 경사 하강법으로 학습을 수행합니다.

def train(self, x_train, y_train, x_val, y_val, epochs=100, batch_size=32):
    data_size = len(x_train)
    num_iter = data_size // batch_size

    for e in range(epochs):
        t_l, t_a = [], []

        idx = np.random.permutation(np.arange(data_size))
        _x_train, _y_train = x_train[idx], y_train[idx]

        for i in range(0, data_size, batch_size):
            si, ei = i, i + batch_size
            if ei > data_size:
                ei = data_size

            x_batch, y_batch = _x_train[si:ei, :, :], _y_train[si:ei]

            tl, ta, _ = self.sess.run([self.loss, self.accuracy, self.train_op], {self.x: x_batch, self.y: y_batch})
            t_l.append(tl)
            t_a.append(ta)

        vl, va = self.sess.run([self.loss, self.accuracy], {self.x: x_val, self.y: y_val})

        self.write_summary(np.mean(t_l), np.mean(t_a), vl, va, e)

        print('epoch:', e + 1, ' / train_loss:', np.mean(t_l), '/ train_acc:', np.mean(t_a), ' / val_loss:', vl, '/ val_acc:', va)

 

모델을 학습하고 테스트합니다.

model = Model()
model.train(x_train, y_train, x_val, y_val, epochs=1000)
model.score(x_test, y_test)
epoch: 10  / train_loss: 0.5628707 / train_acc: 0.8145625  / val_loss: 0.5578371 / val_acc: 0.8143333
epoch: 20  / train_loss: 0.48990545 / train_acc: 0.83385414  / val_loss: 0.49084783 / val_acc: 0.834
epoch: 30  / train_loss: 0.45731145 / train_acc: 0.84404165  / val_loss: 0.46129102 / val_acc: 0.83958334
epoch: 40  / train_loss: 0.43643084 / train_acc: 0.84952086  / val_loss: 0.44098717 / val_acc: 0.8476667
epoch: 50  / train_loss: 0.4209792 / train_acc: 0.85539585  / val_loss: 0.42816326 / val_acc: 0.851
epoch: 60  / train_loss: 0.4085188 / train_acc: 0.8597917  / val_loss: 0.4188299 / val_acc: 0.85466665
epoch: 70  / train_loss: 0.39790967 / train_acc: 0.8628333  / val_loss: 0.41054684 / val_acc: 0.857
epoch: 80  / train_loss: 0.3887072 / train_acc: 0.8653333  / val_loss: 0.40308595 / val_acc: 0.8591667
epoch: 90  / train_loss: 0.380518 / train_acc: 0.86833334  / val_loss: 0.39647287 / val_acc: 0.85966665
epoch: 100  / train_loss: 0.3732168 / train_acc: 0.8713125  / val_loss: 0.3912227 / val_acc: 0.86216664

...

epoch: 900  / train_loss: 0.1754578 / train_acc: 0.94079167  / val_loss: 0.32061088 / val_acc: 0.88925
epoch: 910  / train_loss: 0.17418073 / train_acc: 0.94147915  / val_loss: 0.32091013 / val_acc: 0.89016664
epoch: 920  / train_loss: 0.17276596 / train_acc: 0.9421458  / val_loss: 0.32165438 / val_acc: 0.88983333
epoch: 930  / train_loss: 0.17167586 / train_acc: 0.94277084  / val_loss: 0.32007438 / val_acc: 0.89016664
epoch: 940  / train_loss: 0.170372 / train_acc: 0.9423958  / val_loss: 0.322622 / val_acc: 0.88975
epoch: 950  / train_loss: 0.1690425 / train_acc: 0.943875  / val_loss: 0.3234346 / val_acc: 0.888
epoch: 960  / train_loss: 0.16783537 / train_acc: 0.9438125  / val_loss: 0.32234168 / val_acc: 0.8890833
epoch: 970  / train_loss: 0.1663011 / train_acc: 0.944125  / val_loss: 0.32178867 / val_acc: 0.89091665
epoch: 980  / train_loss: 0.16476133 / train_acc: 0.94545835  / val_loss: 0.32161164 / val_acc: 0.889
epoch: 990  / train_loss: 0.16388097 / train_acc: 0.94547915  / val_loss: 0.32247958 / val_acc: 0.8889167
epoch: 1000  / train_loss: 0.16279806 / train_acc: 0.9459375  / val_loss: 0.32239875 / val_acc: 0.891

0.8796

 

에포크에 대한 정확도와 손실 함수의 그래프는 다음과 같습니다.(주황: 학습, 파랑: 검증)

 

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

함수 근사(Function Approximation)  (0) 2020.11.18
Model-Free Control  (0) 2020.11.13
Model-Free Prediction  (0) 2020.11.09
OpenAI Gym  (0) 2020.11.09
동적 프로그래밍(Dynamic Programming)  (0) 2020.11.06
  Comments,     Trackbacks