PPO(Proximal Policy Optimization)
논문 Proximal Policy Optimization Algorithms 으로 소개되었습니다.
액터-크리틱과 같은 on-policy 알고리즘들의 단점은 학습하기 위해서는 해당 정책을 통해 발생된 샘플이 필요하며 한번 사용된 샘플은 폐기하기 때문에 데이터 사용의 효율성이 떨어집니다. 또한 정책 그래디언트 기반의 알고리즘들이 갖을 수 있는 문제는 정책이 큰 폭으로 변하여 학습이 불안정할 수 있다는 것입니다.
PPO 는 확률적 정책을 갖는 정책 그래디언트 알고리즘으로 정책의 변화를 제한하면서 목적 함수를 최대화하고자 하는 아이디어입니다. 이는 TRPO(Trust Region Policy Optimization) 를 기반한 것으로 정책의 변화를 제한하기 때문에 안정적이고 이에 따라 이전 정책과 현재 정책은 근사하여 이전 정책으로부터 발생된 데이터를 재사용할 수 있습니다. 이에 따라 샘플링된 데이터를 이용하여 여러번의 에폭을 반복하여 최적화 과정을 수행한다는 것입니다.
목적 함수
TRPO 논문의 Sample-Based Estimation of the Objective and Constraint 챕터에서 제안한 대체 목적 함수(surrogate objective function)를 개선합니다.
해당 챕터에서는 몬테카를로 시뮬레이션을 이용해 목적 및 제약 함수를 근사하는 방법론을 설명합니다.
maximizeθ^Et[πθ(at|st)πθold(at|st)ˆAt]
subject to ^Et[KL[πθold(⋅|st),πθ(⋅|st)]]≤δ
하지만 이는 구현하기 어려우며 현실적으로 제약 조건 대신 페널티를 사용하는 것을 제안합니다.
maximizeθ^Et[πθ(at|st)πθold(at|st)ˆAt−βKL[πθold(⋅|st),πθ(⋅|st)]]
이것 역시 계수 β 를 선택하기 어렵다는 단점 때문에 개선이 필요합니다.
먼저 두 정책의 확률 비율을 다음과 같이 표기합니다.
rt(θ)=πθ(at|st)πθold(at|st)
목적 함수의 기본 형태는 다음과 같습니다.
LCPI(θ)=ˆE[πθ(at|st)πθold(at|st)ˆAt]=ˆE[rt(θ)ˆAt]
위 식을 그대로 사용한다면 과도한 정책 업데이트가 될 것입니다. 따라서 정책의 변화를 클리핑(clipping)하여 최종적으로 다음과 같은 목적 함수를 제안합니다.
LCLIP(θ)=ˆE[min(rt(θ)ˆAt,clip(rt(θ),1−ϵ,1+ϵ)ˆAt)]
클리핑이란 다음과 같이 범위를 제한하는 것입니다.

위에서 알아본 KL 발산에 페널티를 주는 방식도 대체 목적 함수로 포함합니다.
LKLPEN(θ)=ˆE[πθ(at|st)πθold(at|st)ˆAt−βKL[πθold(⋅|st),πθ(⋅|st)]]
이는 클리핑 방식보다 성능은 낮지만 중요한 기준(important baseline)입니다.
사용할 수 있는 대체 목적 함수(surrogate objective function)는 다음과 같습니다.
No clipping or penalty:Lt(θ)=rt(θ)ˆAtClipping:Lt(θ)=min(rt(θ)ˆAt,clip(rt(θ),1−ϵ,1+ϵ))ˆAtKL penalty (fixed or adaptive):Lt(θ)=rt(θ)ˆAt−βKL[πθold,πθ]
대체 함수에 대한 실험 결과는 클리핑 방식이 가장 우수한 성능을 내었습니다.

GAE(Generalized Advantage Estimator)
PPO 는 기존 어드밴티지 추정 방식에서 확장하는 방법인 GAE 를 도입합니다.
먼저 n-스텝 관계식을 이용하여 어드밴티지 추정값 δV 을 구하면 다음과 같습니다.
ˆA(1)t:=δVt=−V(st)+rt+γV(st+1)ˆA(2)t:=δVt+γδVt+1=−V(st)+rt+γrt+1+γ2V(st+2)ˆA(3)t:=δVt+γδVt+1+γ2δVt+2=−V(st)+rt+γrt+1+γ2rt+2+γ3V(st+3)
ˆA(k)t:=k−1∑l=0γlδVt+l=−V(st)+rt+γrt+1+⋯+γk−1rt+k−1+γkV(st+k)
k 가 커질수록 γkV(st+k) 는 더욱 감쇠되어 −V(st) 에 따른 편향(bias)에 영향을 주지 못할 것입니다. 따라서 k 가 무한으로 커질 때 다음 식을 얻을 수 있습니다.
ˆA(∞)t=∞∑l=0γlδVt+l=−V(st)+∞∑l=0γlrt+l
이는 몬테카를로 방식으로 편향은 없지만 큰 분산(variance)을 갖습니다. 따라서 n-스텝 어드밴티지 추정 방식으로 분산과 편향을 조절할 수 있습니다.
또한 이를 확장하는 GAE 방법은 기하학적 가중치 평균으로 정의합니다.
ˆAGAE(γ,λ)t:=(1−λ)(ˆA(1)t+λˆA(2)t+λ2ˆA(3)t+…)=(1−λ)(δVt+λ(δVt+γδVt+1)+λ2(δVt+δVt+1+γ2δVt+2+…)=(1−λ)(δVt(1+λ+λ2+…)+γδVt+1(λ+λ2+λ3+…)+γ2δVt+2(λ2+λ3+λ4+…)+…)=(1−λ)(δVt(11−λ)+γλVt+1(λ1−λ)+γ2δVt+2(λ21−λ)+…)=∞∑l=0(γλ)lδVt+1
λ=0 과 λ=1 의 경우 다음과 같습니다.
GAE(γ,0):ˆAt:=δt=rt+γV(st+1)−V(st)
GAE(γ,1):ˆAt:=∞∑l=0γlδt+1=∞∑l=0γlrt+1−V(st)
λ=0 일 경우 1-스텝 어드밴티지 추정이며 λ=1 일 경우 몬테카를로 방식의 어드밴티지 추정이 됩니다. 따라서 0<λ<1 의 범위로 지정하며 일반적으로 γ 보다 작은 λ 를 설정했을 때 좋은 성능을 보여주고 있습니다.
알고리즘

구현
알고리즘 테스트 환경은 OpenAI Gym 의 Pendulum-v0 입니다.

Pendulum 는 연속적인 행동 공간을 갖는 문제로 진자를 흔들어서 12시 방향으로 세워서 유지하는 문제입니다.
환경으로부터 받을 수 있는 정보를 출력합니다.
import gym
env = gym.make('Pendulum-v0')
print('obs:', env.observation_space.shape[0])
print('act:', env.action_space.shape[0])
print('act bound:', env.action_space.low[0], env.action_space.high[0])
상태 정보는 3차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동 또한 [-2, 2] 의 범위의 연속적인 공간을 갖습니다.
obs: 3
act: 1
act bound: -2.0 2.0
액터 신경망을 정의합니다.
class Actor:
def __init__(self):
with tf.name_scope('actor'):
with tf.name_scope('input'):
self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
self.adv = tf.placeholder(tf.float32, name='advantage')
self.pi, self.params = self.build_net('pi')
self.old_pi, self.old_params = self.build_net('old_pi', False)
with tf.name_scope('action'):
self.sample = self.pi.sample()
self.action = tf.clip_by_value(self.sample, *A_BOUND)
with tf.name_scope('loss'):
self.ratio = tf.exp(self.pi.log_prob(self.a) - self.old_pi.log_prob(self.a))
self.clip_ratio = tf.clip_by_value(self.ratio, 1-E, 1+E)
# No clipping or penalty
# self.loss = tf.reduce_mean(self.ratio * self.adv)
# KL penalty (fixed or adaptive)
# kl = tf.reduce_mean(tf.distributions.kl_divergence(old_pi, pi))
# self.loss = self.ratio * self.adv - BETA * kl
# Clipping
self.loss = tf.reduce_mean(tf.minimum(self.ratio * self.adv, self.clip_ratio * self.adv))
with tf.name_scope('optimizer'):
self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)
def build_net(self, scope, trainable=True):
with tf.variable_scope(scope):
with tf.variable_scope('layer'):
fc1 = tf.layers.dense(self.s, 256, tf.nn.relu, trainable=trainable)
fc2 = tf.layers.dense(fc1, 128, tf.nn.relu, trainable=trainable)
with tf.variable_scope('output'):
mu = tf.layers.dense(fc2, N_A, tf.nn.tanh)
sigma = tf.layers.dense(fc2, N_A, tf.nn.softplus)
pi = tf.distributions.Normal(mu * A_BOUND[1], sigma)
params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope)
return pi, params
확률 분포에 해당하는 현재 정책과 이전 정책을 구분하여 생성합니다.
self.pi, self.params = self.build_net('pi')
self.old_pi, self.old_params = self.build_net('old_pi', False)
이전 정책에 해당 하는 파라미터는 학습하지 않을 것이므로 trainable=False 로 설정합니다.
현재 정책과 이전 정책의 확률 비율을 계산합니다.
self.ratio = tf.exp(tf.log(pi_probs) - tf.log(old_probs))
위에서 구한 비율을 클리핑합니다.
self.clip_ratio = tf.clip_by_value(self.ratio, 1-E, 1+E)
손실 함수를 정의합니다.
self.loss = tf.reduce_mean(tf.minimum(self.ratio * self.adv, self.clip_ratio * self.adv))
다음 식을 계산하는 것으로 비율과 클리핑된 비율 각각에 어드밴티지를 곱한 것 중 작은 값들에 대한 평균을 구합니다.
LCLIP(θ)=ˆE[min(rt(θ)ˆAt,clip(rt(θ),1−ϵ,1+ϵ)ˆAt)]
마이너스를 붙여 손실값이 최대화가 되는 방향으로 학습하도록 설정합니다.
with tf.name_scope('optimizer'):
self.train_op = tf.train.AdamOptimizer(LR_A).minimize(-self.loss)
크리틱 신경망을 정의합니다.
class Critic:
def __init__(self):
with tf.variable_scope('critic'):
with tf.name_scope('input'):
self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
self.td_target = tf.placeholder(tf.float32, [None, 1], name='td_target')
with tf.variable_scope('layer'):
fc1 = tf.layers.dense(self.s, 256, tf.nn.relu)
fc2 = tf.layers.dense(fc1, 128, tf.nn.relu)
with tf.variable_scope('output'):
self.v = tf.layers.dense(fc2, 1)
with tf.name_scope('td_error'):
self.td_error = self.td_target - self.v
with tf.name_scope('loss'):
self.loss = tf.reduce_mean(tf.square(self.td_error))
with tf.name_scope('optimizer'):
self.train_op = tf.train.AdamOptimizer(LR_C).minimize(self.loss)
크리틱 신경망의 구조는 바닐라 액터-크리틱 구조와 동일합니다.
에이전트와 환경의 상호작용입니다.
agent = PPO()
for i in range(MAX_EPI + 1):
memory = []
state = env.reset()
done = False
while not done:
action = agent.get_action(state)
next_state, reward, done, _ = env.step(action)
if done:
reward = -1
memory.append((state, action, reward, done, next_state))
if done or len(memory) == BATCH_SIZE:
agent.train(memory)
memory = []
state = next_state
매 에피소드마다 메모리를 초기화하고 배치 사이즈만큼 s,a,r,d,s′ 를 메모리에 저장합니다. 에피소드가 종료되거나 메모리 사이즈가 배치 사이즈와 같아지면 업데이트를 진행합니다.
신경망을 업데이트합니다.
def train(self, batch):
# state action reward done next_state
batch = np.array(batch)
v_pred = self.get_v(np.append(batch[:, 0].tolist(), batch[-1, 4][np.newaxis, :], 0))
g = 0
gae = []
td_target = []
for t in reversed(range(len(batch))):
delta = batch[t, 2] + GAMMA * v_pred[t+1] * ~np.array(batch[t, 3]).astype(bool) - v_pred[t]
g = delta + GAMMA * GAE_LAMDA * g
gae.insert(0, g)
td_target.insert(0, v_pred[t] + g)
for _ in range(EPOCH):
self.sess.run(self.critic.train_op, {self.critic.s: batch[:, 0].tolist(),
self.critic.td_target: np.vstack(td_target)})
self.sess.run(self.actor.train_op, {self.actor.s: batch[:, 0].tolist(),
self.actor.a: batch[:, 1],
self.actor.adv: np.vstack(gae)})
self.update_pi()
먼저 계산에 사용하기 위한 상태 가치를 추정합니다.
def get_v(self, s):
v = self.sess.run(self.critic.v, {self.critic.s: s})
return v
...
v_pred = self.get_v(np.append(batch[:, 0].tolist(), batch[-1, 4][np.newaxis, :], 0))
마지막 상태 뒤에 마지막 다음 상태를 추가한 것은 계산의 편의성을 위한 것입니다.
GAE 와 TD 타겟을 계산합니다.
g = 0
gae = []
td_target = []
for t in reversed(range(len(batch))):
delta = batch[t, 2] + GAMMA * v_pred[t+1] * ~np.array(batch[t, 3]).astype(bool) - v_pred[t]
g = delta + GAMMA * GAE_LAMDA * g
gae.insert(0, g)
td_target.insert(0, v_pred[t] + g)
GAE 를 이용한 TD 오차와 타겟 계산은 기존의 액터-크리틱 방식과 차이가 있습니다. TD 오차 계산은 배치 크기만큼 저장된 샘플 데이터를 이용하지만 n-스텝이 아닌 1-스텝으로 계산합니다.
δt=rt+γV(st+1)−V(st)
δt+1=rt+1+γV(st+2)−V(st+1)1-스텝 계산δt+1=rt+γrt+1+γ2V(st+2)−V(st+1)n-스텝 계산
또한 TD 오차를 이용하여 역방향으로 GAE 를 계산합니다.
AGAEt=δt+(γλ)AGAEt+1
이 또한 t 와 t+1 의 재귀적인 관계로 감쇠를 반복하는 계산을 수행합니다.
TD 타겟을 재계산합니다. GAE 를 각 상태별 추정된 가치에 더하여 구할 수 있습니다. 이는 추정된 어드밴티지가 GAE 방식으로 재계산되었기 때문입니다.
yt=AGAEt+V(st)
설정한 에폭만큼 반복하여 신경망을 업데이트합니다.
for _ in range(EPOCH):
self.sess.run(self.critic.train_op, {self.critic.s: batch[:, 0].tolist(),
self.critic.td_target: np.vstack(td_target)})
self.sess.run(self.actor.train_op, {self.actor.s: batch[:, 0].tolist(),
self.actor.a: batch[:, 1],
self.actor.adv: np.vstack(gae)})
신경망 업데이트가 완료되면 현재 정책의 파라미터를 이전 정책의 파라미터로 복사합니다.
self._update_pi = [op.assign(p) for op, p in zip(self.actor.old_params, self.actor.params)]
...
def update_pi(self):
self.sess.run(self._update_pi)
...
self.update_pi()
1000번의 에피소드 동안 학습한 결과입니다.

저장된 모델을 불러와 테스트합니다.
INFO:tensorflow:Restoring parameters from ./tmp/ppo_pendulum/model/model
episode: 0 / reward: -259.1
episode: 1 / reward: -127.0
episode: 2 / reward: -550.9
episode: 3 / reward: -254.3
episode: 4 / reward: -251.1
episode: 5 / reward: -134.4

'머신러닝 > 강화학습' 카테고리의 다른 글
DDPG(Deep Deterministic Policy Gradient) (0) | 2020.11.30 |
---|---|
A3C(Asynchronous Advantage Actor-Critic) (0) | 2020.11.30 |
액터-크리틱(Actor-Critic) (0) | 2020.11.27 |
정책 그래디언트(Policy Gradient) (0) | 2020.11.24 |
DQN(Deep Q-Networks) (2) (0) | 2020.11.24 |