머신러닝/강화학습

PPO(Proximal Policy Optimization)

독학자78 2020. 12. 7. 16:17

PPO(Proximal Policy Optimization)

논문 Proximal Policy Optimization Algorithms 으로 소개되었습니다.

 

액터-크리틱과 같은 on-policy 알고리즘들의 단점은 학습하기 위해서는 해당 정책을 통해 발생된 샘플이 필요하며 한번 사용된 샘플은 폐기하기 때문에 데이터 사용의 효율성이 떨어집니다. 또한 정책 그래디언트 기반의 알고리즘들이 갖을 수 있는 문제는 정책이 큰 폭으로 변하여 학습이 불안정할 수 있다는 것입니다.

 

PPO 는 확률적 정책을 갖는 정책 그래디언트 알고리즘으로 정책의 변화를 제한하면서 목적 함수를 최대화하고자 하는 아이디어입니다. 이는 TRPO(Trust Region Policy Optimization) 를 기반한 것으로 정책의 변화를 제한하기 때문에 안정적이고 이에 따라 이전 정책과 현재 정책은 근사하여 이전 정책으로부터 발생된 데이터를 재사용할 수 있습니다. 이에 따라 샘플링된 데이터를 이용하여 여러번의 에폭을 반복하여 최적화 과정을 수행한다는 것입니다.

 

 

목적 함수

TRPO 논문의 Sample-Based Estimation of the Objective and Constraint 챕터에서 제안한 대체 목적 함수(surrogate objective function)를 개선합니다.

 

해당 챕터에서는 몬테카를로 시뮬레이션을 이용해 목적 및 제약 함수를 근사하는 방법론을 설명합니다.

 

$\underset {\theta} {\text{maximize}} \; \quad \hat {\mathbb{E}_t} \left[ {\pi_{\theta}(a_t | s_t) \over \pi_{\theta_{\text{old}}} ( a_t | s_t )} \hat A_t \right]$

$\text{subject to } \quad \hat {\mathbb{E}_t} [KL[ \pi_{\theta_{\text{old}}} (\cdot|s_t), \pi_{\theta}(\cdot|s_t) ]] \le \delta$

 

하지만 이는 구현하기 어려우며 현실적으로 제약 조건 대신 페널티를 사용하는 것을 제안합니다.

 

$\underset {\theta} {\text{maximize}} \; \quad \hat {\mathbb{E}_t} \left[ {\pi_{\theta}(a_t | s_t) \over \pi_{\theta_{\text{old}}} ( a_t | s_t )} \hat A_t - \beta KL[ \pi_{\theta_{\text{old}}} (\cdot|s_t), \pi_{\theta}(\cdot|s_t) ] \right]$

 

이것 역시 계수 $\beta$ 를 선택하기 어렵다는 단점 때문에 개선이 필요합니다.

 

 

먼저 두 정책의 확률 비율을 다음과 같이 표기합니다.

 

$r_t(\theta) = { \pi_\theta(a_t|s_t) \over \pi_{\theta_{\text{old}}} (a_t|s_t)}$

 

목적 함수의 기본 형태는 다음과 같습니다.

 

$L^{CPI}(\theta) = \hat {\mathbb{E}} [{ \pi_\theta(a_t|s_t) \over \pi_{\theta_{\text{old}}} (a_t|s_t)} \hat A_t ] = \hat {\mathbb{E}} [r_t(\theta) \hat A_t]$

 

위 식을 그대로 사용한다면 과도한 정책 업데이트가 될 것입니다. 따라서 정책의 변화를 클리핑(clipping)하여 최종적으로 다음과 같은 목적 함수를 제안합니다.

 

$L^{CLIP}(\theta) = \hat {\mathbb{E}} [\text{min} (r_t(\theta) \hat A_t, \text{clip} ( r_t(\theta), 1-\epsilon, 1+\epsilon ) \hat A_t )]$

 

클리핑이란 다음과 같이 범위를 제한하는 것입니다.

 

 

위에서 알아본 KL 발산에 페널티를 주는 방식도 대체 목적 함수로 포함합니다.

 

$L^{KLPEN}(\theta) = \hat {\mathbb{E}} \left[{ \pi_\theta(a_t|s_t) \over \pi_{\theta_{old}} (a_t|s_t)} \hat A_t - \beta KL[\pi_{\theta_{old}} (\cdot | s_t), \pi_\theta(\cdot | s_t) ]\right]$

 

이는 클리핑 방식보다 성능은 낮지만 중요한 기준(important baseline)입니다.

 

 

사용할 수 있는 대체 목적 함수(surrogate objective function)는 다음과 같습니다.

 

$\begin{aligned}   

& \text{No clipping or penalty:} && L_t(\theta) = r_t(\theta) \hat A_t \\   

& \text{Clipping:} && L_t(\theta) = \text{min} ( r_t(\theta) \hat A_t, \text{clip} ( r_t(\theta), 1-\epsilon, 1+\epsilon )) \hat A_t \\   

& \text{KL penalty (fixed or adaptive):} && L_t(\theta) = r_t(\theta) \hat A_t - \beta KL[\pi_{\theta_{\text{old}}}, \pi_{\theta}] \\

\end{aligned}$

 

 

대체 함수에 대한 실험 결과는 클리핑 방식이 가장 우수한 성능을 내었습니다.

 

 

 

GAE(Generalized Advantage Estimator)

PPO 는 기존 어드밴티지 추정 방식에서 확장하는 방법인 GAE 를 도입합니다.

 

먼저 n-스텝 관계식을 이용하여 어드밴티지 추정값 $\delta^V$ 을 구하면 다음과 같습니다.

 

\begin{aligned}    
\hat A^{(1)}_t := & \;\delta^V_t \qquad\qquad\qquad\quad\; = -V(s_t) + r_t + \gamma V(s_{t+1}) \\    
\hat A^{(2)}_t := & \;\delta^V_t + \gamma \delta^V_{t+1} \qquad\quad\;\;\; = -V(s_t) + r_t + \gamma r_{t+1} + \gamma^2 V(s_{t+2}) \\    
\hat A^{(3)}_t := & \;\delta^V_t + \gamma \delta^V_{t+1} + \gamma^2 \delta^V_{t+2} = -V(s_t) + r_t + \gamma r_{t+1} + \gamma^2 r_{t+2} + \gamma^3 V(s_{t+3})
\end{aligned}

 

$\hat A^{(k)}_t := \displaystyle \sum_{l=0}^{k-1} \gamma^l \delta^V_{t+l} = -V(s_t) + r_t + \gamma r_{t+1} + \cdots + \gamma^{k-1} r_{t+k-1} + \color {red} {\gamma^k V(s_{t+k})}$

 

$k$ 가 커질수록 $\gamma^k V(s_{t+k})$ 는 더욱 감쇠되어 $-V(s_t)$ 에 따른 편향(bias)에 영향을 주지 못할 것입니다. 따라서 $k$ 가 무한으로 커질 때 다음 식을 얻을 수 있습니다. 

 

$\hat A^{(\infty)}_t = \displaystyle \sum_{l=0}^\infty \gamma^l \delta^V_{t+l} = -V(s_t) + \displaystyle \sum_{l=0}^\infty \gamma^l r_{t+l}$

 

이는 몬테카를로 방식으로 편향은 없지만 큰 분산(variance)을 갖습니다. 따라서 n-스텝 어드밴티지 추정 방식으로 분산과 편향을 조절할 수 있습니다.

 

또한 이를 확장하는 GAE 방법은 기하학적 가중치 평균으로 정의합니다.

 

$\begin{aligned}    

\hat A_t^{GAE(\gamma, \lambda)} := & \;(1-\lambda) \left(\hat A^{(1)}_t + \lambda \hat A^{(2)}_t + \lambda^2 \hat A_t^{(3)} + \dots \right) \\   
= & (1 - \lambda)(\delta^V_t + \lambda(\delta^V_t + \gamma \delta^V_{t+1}) + \lambda^2 ( \delta^V_t + \delta^V_{t+1} + \gamma^2 \delta^V_{t+2} + \dots) \\ 

= & (1 - \lambda)(\delta^V_t (1 + \lambda + \lambda^2 + \dots) + \gamma \delta^V_{t+1}(\lambda + \lambda^2 + \lambda^3 + \dots) \\ 
& \qquad + \gamma^2 \delta^V_{t+2}(\lambda^2 + \lambda^3 + \lambda^4 + \dots) + \dots )  \\
= &  (1 - \lambda) \left( \delta^V_t ({1 \over 1 - \lambda}) + \gamma \lambda^V_{t+1} ({\lambda \over 1 - \lambda}) + \gamma^2 \delta^V_{t+2} ({\lambda^2 \over 1 - \lambda}) + \dots \right) \\ 
= & \displaystyle \sum_{l=0}^{\infty} (\gamma \lambda)^l \delta^V_{t+1}
\end{aligned}$

 

$\lambda = 0$ 과 $\lambda = 1$ 의 경우 다음과 같습니다.

 

$GAE(\gamma, 0) : \quad \hat A_t := \delta_t \qquad\quad\; = r_t + \gamma V(s_{t+1}) - V(s_t)$

$GAE(\gamma, 1) : \quad \hat A_t := \displaystyle \sum_{l=0}^{\infty} \gamma^l \delta_{t+1} = \displaystyle \sum_{l=0}^{\infty} \gamma^l r_{t+1} - V(s_t)$

 

$\lambda = 0$ 일 경우 1-스텝 어드밴티지 추정이며 $\lambda = 1$ 일 경우 몬테카를로 방식의 어드밴티지 추정이 됩니다. 따라서 $0 < \lambda < 1$ 의 범위로 지정하며 일반적으로 $\gamma$ 보다 작은 $\lambda$ 를 설정했을 때 좋은 성능을 보여주고 있습니다.

 

 

알고리즘

 

 

 

구현

알고리즘 테스트 환경은 OpenAI Gym  Pendulum-v0 입니다.

 

 

Pendulum 는 연속적인 행동 공간을 갖는 문제로 진자를 흔들어서 12시 방향으로 세워서 유지하는 문제입니다. 

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('Pendulum-v0')

print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.shape[0])
print('act bound:', env.action_space.low[0], env.action_space.high[0])

 

상태 정보는 3차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동 또한 [-2, 2] 의 범위의 연속적인 공간을 갖습니다.

obs: 3
act: 1
act bound: -2.0 2.0

 

 

액터 신경망을 정의합니다.

class Actor:
    def __init__(self):
        with tf.name_scope('actor'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
                self.adv = tf.placeholder(tf.float32, name='advantage')

            self.pi, self.params = self.build_net('pi')
            self.old_pi, self.old_params = self.build_net('old_pi', False)
            
            with tf.name_scope('action'):
                self.sample = self.pi.sample()
                self.action = tf.clip_by_value(self.sample, *A_BOUND)
            
            with tf.name_scope('loss'):
                self.ratio = tf.exp(self.pi.log_prob(self.a) - self.old_pi.log_prob(self.a))
                self.clip_ratio = tf.clip_by_value(self.ratio, 1-E, 1+E)

                # No clipping or penalty
    #                 self.loss = tf.reduce_mean(self.ratio * self.adv)

                # KL penalty (fixed or adaptive)
    #                 kl = tf.reduce_mean(tf.distributions.kl_divergence(old_pi, pi))
    #                 self.loss = self.ratio * self.adv - BETA * kl

                # Clipping
                self.loss = tf.reduce_mean(tf.minimum(self.ratio * self.adv, self.clip_ratio * self.adv))

            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)
            
    def build_net(self, scope, trainable=True):
        with tf.variable_scope(scope):
            with tf.variable_scope('layer'):
                fc1 = tf.layers.dense(self.s, 256, tf.nn.relu, trainable=trainable)
                fc2 = tf.layers.dense(fc1, 128, tf.nn.relu, trainable=trainable)
            with tf.variable_scope('output'):
                mu = tf.layers.dense(fc2, N_A, tf.nn.tanh) 
                sigma = tf.layers.dense(fc2, N_A, tf.nn.softplus)
                pi = tf.distributions.Normal(mu * A_BOUND[1], sigma)
        params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope)
        return pi, params

 

확률 분포에 해당하는 현재 정책과 이전 정책을 구분하여 생성합니다.

self.pi, self.params = self.build_net('pi')
self.old_pi, self.old_params = self.build_net('old_pi', False)

 

이전 정책에 해당 하는 파라미터는 학습하지 않을 것이므로 trainable=False 로 설정합니다.

 

현재 정책과 이전 정책의 확률 비율을 계산합니다.

self.ratio = tf.exp(tf.log(pi_probs) - tf.log(old_probs))

 

위에서 구한 비율을 클리핑합니다.

self.clip_ratio = tf.clip_by_value(self.ratio, 1-E, 1+E)

 

손실 함수를 정의합니다.

self.loss = tf.reduce_mean(tf.minimum(self.ratio * self.adv, self.clip_ratio * self.adv))

 

다음 식을 계산하는 것으로 비율과 클리핑된 비율 각각에 어드밴티지를 곱한 것 중 작은 값들에 대한 평균을 구합니다.

 

$L^{CLIP}(\theta) = \hat {\mathbb{E}} [min(r_t(\theta) \hat A_t, clip( r_t(\theta), 1-\epsilon, 1+\epsilon ) \hat A_t )]$

 

 

마이너스를 붙여 손실값이 최대화가 되는 방향으로 학습하도록 설정합니다.

with tf.name_scope('optimizer'):
    self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)

 

크리틱 신경망을 정의합니다.

class Critic:
    def __init__(self):
        with tf.variable_scope('critic'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.td_target = tf.placeholder(tf.float32, [None, 1], name='td_target')
            with tf.variable_scope('layer'):
                fc1 = tf.layers.dense(self.s, 256, tf.nn.relu)
                fc2 = tf.layers.dense(fc1, 128, tf.nn.relu)
            with tf.variable_scope('output'):
                self.v = tf.layers.dense(fc2, 1)
            with tf.name_scope('td_error'):
                self.td_error = self.td_target - self.v
            with tf.name_scope('loss'):
                self.loss = tf.reduce_mean(tf.square(self.td_error))
            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_C).minimize(self.loss)

 

크리틱 신경망의 구조는 바닐라 액터-크리틱 구조와 동일합니다.

 

 

에이전트와 환경의 상호작용입니다.

agent = PPO()

for i in range(MAX_EPI + 1):
    memory = []
    
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        next_state, reward, done, _ = env.step(action)

        if done:
            reward = -1
        
        memory.append((state, action, reward, done, next_state))

        if done or len(memory) == BATCH_SIZE:
            agent.train(memory)
            memory = []

        state = next_state

 

매 에피소드마다 메모리를 초기화하고 배치 사이즈만큼 $s, a, r, d, s'$ 를 메모리에 저장합니다. 에피소드가 종료되거나 메모리 사이즈가 배치 사이즈와 같아지면 업데이트를 진행합니다.

 

 

신경망을 업데이트합니다.

def train(self, batch):
    # state action reward done next_state
    batch = np.array(batch)

    v_pred = self.get_v(np.append(batch[:, 0].tolist(), batch[-1, 4][np.newaxis, :], 0))

    g = 0
    gae = []
    td_target = []
    for t in reversed(range(len(batch))):
        delta = batch[t, 2] + GAMMA * v_pred[t+1] * ~np.array(batch[t, 3]).astype(bool) - v_pred[t]
        g = delta + GAMMA * GAE_LAMDA * g
        gae.insert(0, g)
        td_target.insert(0, v_pred[t] + g)

    for _ in range(EPOCH):
        self.sess.run(self.critic.train_op, {self.critic.s: batch[:, 0].tolist(),
                                             self.critic.td_target: np.vstack(td_target)})
        self.sess.run(self.actor.train_op, {self.actor.s: batch[:, 0].tolist(),
                                            self.actor.a: batch[:, 1],
                                            self.actor.adv: np.vstack(gae)})

    self.update_pi()

 

먼저 계산에 사용하기 위한 상태 가치를 추정합니다.

def get_v(self, s):
    v = self.sess.run(self.critic.v, {self.critic.s: s})
    return v

...

v_pred = self.get_v(np.append(batch[:, 0].tolist(), batch[-1, 4][np.newaxis, :], 0))

 

마지막 상태 뒤에 마지막 다음 상태를 추가한 것은 계산의 편의성을 위한 것입니다.

 

 

GAE 와 TD 타겟을 계산합니다.

g = 0
gae = []
td_target = []
for t in reversed(range(len(batch))):
    delta = batch[t, 2] + GAMMA * v_pred[t+1] * ~np.array(batch[t, 3]).astype(bool) - v_pred[t]
    g = delta + GAMMA * GAE_LAMDA * g
    gae.insert(0, g)
    td_target.insert(0, v_pred[t] + g)

 

GAE 를 이용한 TD 오차와 타겟 계산은 기존의 액터-크리틱 방식과 차이가 있습니다. TD 오차 계산은 배치 크기만큼 저장된 샘플 데이터를 이용하지만 n-스텝이 아닌 1-스텝으로 계산합니다.

 

$\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$

 

$\begin{aligned}   

& \delta_{t+1} = r_{t+1} + \gamma \color {red} {V(s_{t+2})} - V(s_{t+1})  & \scriptstyle {\text{1-스텝 계산}} \\
& \delta_{t+1} = r_t + \gamma r_{t+1} + \gamma^2 V(s_{t+2}) - V(s_{t+1}) & \scriptstyle {\text{n-스텝 계산}}
\end{aligned}$ 

 

 

또한 TD 오차를 이용하여 역방향으로 GAE 를 계산합니다.

 

$A^{GAE}_t = \delta_t + (\gamma \lambda)A^{GAE}_{t+1}$

 

이 또한 $t$ 와 $t\!+\!1$ 의 재귀적인 관계로 감쇠를 반복하는 계산을 수행합니다.

 

TD 타겟을 재계산합니다. GAE 를 각 상태별 추정된 가치에 더하여 구할 수 있습니다. 이는 추정된 어드밴티지가 GAE 방식으로 재계산되었기 때문입니다.

 

$y_t = A^{GAE}_t + V(s_t)$

 

 

설정한 에폭만큼 반복하여 신경망을 업데이트합니다.

for _ in range(EPOCH):
    self.sess.run(self.critic.train_op, {self.critic.s: batch[:, 0].tolist(),
                                         self.critic.td_target: np.vstack(td_target)})
    self.sess.run(self.actor.train_op, {self.actor.s: batch[:, 0].tolist(),
                                        self.actor.a: batch[:, 1],
                                        self.actor.adv: np.vstack(gae)})

 

신경망 업데이트가 완료되면 현재 정책의 파라미터를 이전 정책의 파라미터로 복사합니다.

self._update_pi = [op.assign(p) for op, p in zip(self.actor.old_params, self.actor.params)]

...

def update_pi(self):
    self.sess.run(self._update_pi)

...

self.update_pi()

 

 

1000번의 에피소드 동안 학습한 결과입니다.

 

 

저장된 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/ppo_pendulum/model/model
episode: 0 / reward: -259.1
episode: 1 / reward: -127.0
episode: 2 / reward: -550.9
episode: 3 / reward: -254.3
episode: 4 / reward: -251.1
episode: 5 / reward: -134.4