Loading [MathJax]/jax/output/CommonHTML/jax.js
독학 연구소
공부한 내용을 정리합니다.
PPO(Proximal Policy Optimization)

PPO(Proximal Policy Optimization)

논문 Proximal Policy Optimization Algorithms 으로 소개되었습니다.

 

액터-크리틱과 같은 on-policy 알고리즘들의 단점은 학습하기 위해서는 해당 정책을 통해 발생된 샘플이 필요하며 한번 사용된 샘플은 폐기하기 때문에 데이터 사용의 효율성이 떨어집니다. 또한 정책 그래디언트 기반의 알고리즘들이 갖을 수 있는 문제는 정책이 큰 폭으로 변하여 학습이 불안정할 수 있다는 것입니다.

 

PPO 는 확률적 정책을 갖는 정책 그래디언트 알고리즘으로 정책의 변화를 제한하면서 목적 함수를 최대화하고자 하는 아이디어입니다. 이는 TRPO(Trust Region Policy Optimization) 를 기반한 것으로 정책의 변화를 제한하기 때문에 안정적이고 이에 따라 이전 정책과 현재 정책은 근사하여 이전 정책으로부터 발생된 데이터를 재사용할 수 있습니다. 이에 따라 샘플링된 데이터를 이용하여 여러번의 에폭을 반복하여 최적화 과정을 수행한다는 것입니다.

 

 

목적 함수

TRPO 논문의 Sample-Based Estimation of the Objective and Constraint 챕터에서 제안한 대체 목적 함수(surrogate objective function)를 개선합니다.

 

해당 챕터에서는 몬테카를로 시뮬레이션을 이용해 목적 및 제약 함수를 근사하는 방법론을 설명합니다.

 

maximizeθ^Et[πθ(at|st)πθold(at|st)ˆAt]

subject to ^Et[KL[πθold(|st),πθ(|st)]]δ

 

하지만 이는 구현하기 어려우며 현실적으로 제약 조건 대신 페널티를 사용하는 것을 제안합니다.

 

maximizeθ^Et[πθ(at|st)πθold(at|st)ˆAtβKL[πθold(|st),πθ(|st)]]

 

이것 역시 계수 β 를 선택하기 어렵다는 단점 때문에 개선이 필요합니다.

 

 

먼저 두 정책의 확률 비율을 다음과 같이 표기합니다.

 

rt(θ)=πθ(at|st)πθold(at|st)

 

목적 함수의 기본 형태는 다음과 같습니다.

 

LCPI(θ)=ˆE[πθ(at|st)πθold(at|st)ˆAt]=ˆE[rt(θ)ˆAt]

 

위 식을 그대로 사용한다면 과도한 정책 업데이트가 될 것입니다. 따라서 정책의 변화를 클리핑(clipping)하여 최종적으로 다음과 같은 목적 함수를 제안합니다.

 

LCLIP(θ)=ˆE[min(rt(θ)ˆAt,clip(rt(θ),1ϵ,1+ϵ)ˆAt)]

 

클리핑이란 다음과 같이 범위를 제한하는 것입니다.

 

 

위에서 알아본 KL 발산에 페널티를 주는 방식도 대체 목적 함수로 포함합니다.

 

LKLPEN(θ)=ˆE[πθ(at|st)πθold(at|st)ˆAtβKL[πθold(|st),πθ(|st)]]

 

이는 클리핑 방식보다 성능은 낮지만 중요한 기준(important baseline)입니다.

 

 

사용할 수 있는 대체 목적 함수(surrogate objective function)는 다음과 같습니다.

 

No clipping or penalty:Lt(θ)=rt(θ)ˆAtClipping:Lt(θ)=min(rt(θ)ˆAt,clip(rt(θ),1ϵ,1+ϵ))ˆAtKL penalty (fixed or adaptive):Lt(θ)=rt(θ)ˆAtβKL[πθold,πθ]

 

 

대체 함수에 대한 실험 결과는 클리핑 방식이 가장 우수한 성능을 내었습니다.

 

 

 

GAE(Generalized Advantage Estimator)

PPO 는 기존 어드밴티지 추정 방식에서 확장하는 방법인 GAE 를 도입합니다.

 

먼저 n-스텝 관계식을 이용하여 어드밴티지 추정값 δV 을 구하면 다음과 같습니다.

 

ˆA(1)t:=δVt=V(st)+rt+γV(st+1)ˆA(2)t:=δVt+γδVt+1=V(st)+rt+γrt+1+γ2V(st+2)ˆA(3)t:=δVt+γδVt+1+γ2δVt+2=V(st)+rt+γrt+1+γ2rt+2+γ3V(st+3)

 

ˆA(k)t:=k1l=0γlδVt+l=V(st)+rt+γrt+1++γk1rt+k1+γkV(st+k)

 

k 가 커질수록 γkV(st+k) 는 더욱 감쇠되어 V(st) 에 따른 편향(bias)에 영향을 주지 못할 것입니다. 따라서 k 가 무한으로 커질 때 다음 식을 얻을 수 있습니다. 

 

ˆA()t=l=0γlδVt+l=V(st)+l=0γlrt+l

 

이는 몬테카를로 방식으로 편향은 없지만 큰 분산(variance)을 갖습니다. 따라서 n-스텝 어드밴티지 추정 방식으로 분산과 편향을 조절할 수 있습니다.

 

또한 이를 확장하는 GAE 방법은 기하학적 가중치 평균으로 정의합니다.

 

ˆAGAE(γ,λ)t:=(1λ)(ˆA(1)t+λˆA(2)t+λ2ˆA(3)t+)=(1λ)(δVt+λ(δVt+γδVt+1)+λ2(δVt+δVt+1+γ2δVt+2+)=(1λ)(δVt(1+λ+λ2+)+γδVt+1(λ+λ2+λ3+)+γ2δVt+2(λ2+λ3+λ4+)+)=(1λ)(δVt(11λ)+γλVt+1(λ1λ)+γ2δVt+2(λ21λ)+)=l=0(γλ)lδVt+1

 

λ=0λ=1 의 경우 다음과 같습니다.

 

GAE(γ,0):ˆAt:=δt=rt+γV(st+1)V(st)

GAE(γ,1):ˆAt:=l=0γlδt+1=l=0γlrt+1V(st)

 

λ=0 일 경우 1-스텝 어드밴티지 추정이며 λ=1 일 경우 몬테카를로 방식의 어드밴티지 추정이 됩니다. 따라서 0<λ<1 의 범위로 지정하며 일반적으로 γ 보다 작은 λ 를 설정했을 때 좋은 성능을 보여주고 있습니다.

 

 

알고리즘

 

 

 

구현

알고리즘 테스트 환경은 OpenAI Gym  Pendulum-v0 입니다.

 

 

Pendulum 는 연속적인 행동 공간을 갖는 문제로 진자를 흔들어서 12시 방향으로 세워서 유지하는 문제입니다. 

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('Pendulum-v0')

print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.shape[0])
print('act bound:', env.action_space.low[0], env.action_space.high[0])

 

상태 정보는 3차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동 또한 [-2, 2] 의 범위의 연속적인 공간을 갖습니다.

obs: 3
act: 1
act bound: -2.0 2.0

 

 

액터 신경망을 정의합니다.

class Actor:
    def __init__(self):
        with tf.name_scope('actor'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
                self.adv = tf.placeholder(tf.float32, name='advantage')

            self.pi, self.params = self.build_net('pi')
            self.old_pi, self.old_params = self.build_net('old_pi', False)
            
            with tf.name_scope('action'):
                self.sample = self.pi.sample()
                self.action = tf.clip_by_value(self.sample, *A_BOUND)
            
            with tf.name_scope('loss'):
                self.ratio = tf.exp(self.pi.log_prob(self.a) - self.old_pi.log_prob(self.a))
                self.clip_ratio = tf.clip_by_value(self.ratio, 1-E, 1+E)

                # No clipping or penalty
    #                 self.loss = tf.reduce_mean(self.ratio * self.adv)

                # KL penalty (fixed or adaptive)
    #                 kl = tf.reduce_mean(tf.distributions.kl_divergence(old_pi, pi))
    #                 self.loss = self.ratio * self.adv - BETA * kl

                # Clipping
                self.loss = tf.reduce_mean(tf.minimum(self.ratio * self.adv, self.clip_ratio * self.adv))

            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)
            
    def build_net(self, scope, trainable=True):
        with tf.variable_scope(scope):
            with tf.variable_scope('layer'):
                fc1 = tf.layers.dense(self.s, 256, tf.nn.relu, trainable=trainable)
                fc2 = tf.layers.dense(fc1, 128, tf.nn.relu, trainable=trainable)
            with tf.variable_scope('output'):
                mu = tf.layers.dense(fc2, N_A, tf.nn.tanh) 
                sigma = tf.layers.dense(fc2, N_A, tf.nn.softplus)
                pi = tf.distributions.Normal(mu * A_BOUND[1], sigma)
        params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope)
        return pi, params

 

확률 분포에 해당하는 현재 정책과 이전 정책을 구분하여 생성합니다.

self.pi, self.params = self.build_net('pi')
self.old_pi, self.old_params = self.build_net('old_pi', False)

 

이전 정책에 해당 하는 파라미터는 학습하지 않을 것이므로 trainable=False 로 설정합니다.

 

현재 정책과 이전 정책의 확률 비율을 계산합니다.

self.ratio = tf.exp(tf.log(pi_probs) - tf.log(old_probs))

 

위에서 구한 비율을 클리핑합니다.

self.clip_ratio = tf.clip_by_value(self.ratio, 1-E, 1+E)

 

손실 함수를 정의합니다.

self.loss = tf.reduce_mean(tf.minimum(self.ratio * self.adv, self.clip_ratio * self.adv))

 

다음 식을 계산하는 것으로 비율과 클리핑된 비율 각각에 어드밴티지를 곱한 것 중 작은 값들에 대한 평균을 구합니다.

 

LCLIP(θ)=ˆE[min(rt(θ)ˆAt,clip(rt(θ),1ϵ,1+ϵ)ˆAt)]

 

 

마이너스를 붙여 손실값이 최대화가 되는 방향으로 학습하도록 설정합니다.

with tf.name_scope('optimizer'):
    self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)

 

크리틱 신경망을 정의합니다.

class Critic:
    def __init__(self):
        with tf.variable_scope('critic'):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.td_target = tf.placeholder(tf.float32, [None, 1], name='td_target')
            with tf.variable_scope('layer'):
                fc1 = tf.layers.dense(self.s, 256, tf.nn.relu)
                fc2 = tf.layers.dense(fc1, 128, tf.nn.relu)
            with tf.variable_scope('output'):
                self.v = tf.layers.dense(fc2, 1)
            with tf.name_scope('td_error'):
                self.td_error = self.td_target - self.v
            with tf.name_scope('loss'):
                self.loss = tf.reduce_mean(tf.square(self.td_error))
            with tf.name_scope('optimizer'):
                self.train_op = tf.train.AdamOptimizer(LR_C).minimize(self.loss)

 

크리틱 신경망의 구조는 바닐라 액터-크리틱 구조와 동일합니다.

 

 

에이전트와 환경의 상호작용입니다.

agent = PPO()

for i in range(MAX_EPI + 1):
    memory = []
    
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        next_state, reward, done, _ = env.step(action)

        if done:
            reward = -1
        
        memory.append((state, action, reward, done, next_state))

        if done or len(memory) == BATCH_SIZE:
            agent.train(memory)
            memory = []

        state = next_state

 

매 에피소드마다 메모리를 초기화하고 배치 사이즈만큼 s,a,r,d,s 를 메모리에 저장합니다. 에피소드가 종료되거나 메모리 사이즈가 배치 사이즈와 같아지면 업데이트를 진행합니다.

 

 

신경망을 업데이트합니다.

def train(self, batch):
    # state action reward done next_state
    batch = np.array(batch)

    v_pred = self.get_v(np.append(batch[:, 0].tolist(), batch[-1, 4][np.newaxis, :], 0))

    g = 0
    gae = []
    td_target = []
    for t in reversed(range(len(batch))):
        delta = batch[t, 2] + GAMMA * v_pred[t+1] * ~np.array(batch[t, 3]).astype(bool) - v_pred[t]
        g = delta + GAMMA * GAE_LAMDA * g
        gae.insert(0, g)
        td_target.insert(0, v_pred[t] + g)

    for _ in range(EPOCH):
        self.sess.run(self.critic.train_op, {self.critic.s: batch[:, 0].tolist(),
                                             self.critic.td_target: np.vstack(td_target)})
        self.sess.run(self.actor.train_op, {self.actor.s: batch[:, 0].tolist(),
                                            self.actor.a: batch[:, 1],
                                            self.actor.adv: np.vstack(gae)})

    self.update_pi()

 

먼저 계산에 사용하기 위한 상태 가치를 추정합니다.

def get_v(self, s):
    v = self.sess.run(self.critic.v, {self.critic.s: s})
    return v

...

v_pred = self.get_v(np.append(batch[:, 0].tolist(), batch[-1, 4][np.newaxis, :], 0))

 

마지막 상태 뒤에 마지막 다음 상태를 추가한 것은 계산의 편의성을 위한 것입니다.

 

 

GAE 와 TD 타겟을 계산합니다.

g = 0
gae = []
td_target = []
for t in reversed(range(len(batch))):
    delta = batch[t, 2] + GAMMA * v_pred[t+1] * ~np.array(batch[t, 3]).astype(bool) - v_pred[t]
    g = delta + GAMMA * GAE_LAMDA * g
    gae.insert(0, g)
    td_target.insert(0, v_pred[t] + g)

 

GAE 를 이용한 TD 오차와 타겟 계산은 기존의 액터-크리틱 방식과 차이가 있습니다. TD 오차 계산은 배치 크기만큼 저장된 샘플 데이터를 이용하지만 n-스텝이 아닌 1-스텝으로 계산합니다.

 

δt=rt+γV(st+1)V(st)

 

δt+1=rt+1+γV(st+2)V(st+1)1-스텝 계산δt+1=rt+γrt+1+γ2V(st+2)V(st+1)n-스텝 계산 

 

 

또한 TD 오차를 이용하여 역방향으로 GAE 를 계산합니다.

 

AGAEt=δt+(γλ)AGAEt+1

 

이 또한 tt+1 의 재귀적인 관계로 감쇠를 반복하는 계산을 수행합니다.

 

TD 타겟을 재계산합니다. GAE 를 각 상태별 추정된 가치에 더하여 구할 수 있습니다. 이는 추정된 어드밴티지가 GAE 방식으로 재계산되었기 때문입니다.

 

yt=AGAEt+V(st)

 

 

설정한 에폭만큼 반복하여 신경망을 업데이트합니다.

for _ in range(EPOCH):
    self.sess.run(self.critic.train_op, {self.critic.s: batch[:, 0].tolist(),
                                         self.critic.td_target: np.vstack(td_target)})
    self.sess.run(self.actor.train_op, {self.actor.s: batch[:, 0].tolist(),
                                        self.actor.a: batch[:, 1],
                                        self.actor.adv: np.vstack(gae)})

 

신경망 업데이트가 완료되면 현재 정책의 파라미터를 이전 정책의 파라미터로 복사합니다.

self._update_pi = [op.assign(p) for op, p in zip(self.actor.old_params, self.actor.params)]

...

def update_pi(self):
    self.sess.run(self._update_pi)

...

self.update_pi()

 

 

1000번의 에피소드 동안 학습한 결과입니다.

 

 

저장된 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/ppo_pendulum/model/model
episode: 0 / reward: -259.1
episode: 1 / reward: -127.0
episode: 2 / reward: -550.9
episode: 3 / reward: -254.3
episode: 4 / reward: -251.1
episode: 5 / reward: -134.4

 

 

 

  Comments,     Trackbacks