DDPG(Deep Deterministic Policy Gradient)
논문 Continuous control with deep reinforcement learning 으로 소개되었습니다.
DQN 은 off-policy 알고리즘으로 experience replay, target network 기법을 통해 좋은 성능을 내지만 연속적인 행동 공간(continuous action space)을 갖는 환경에서는 적용할 수 없으며 정책 그래디언트 기반의 액터-크리틱 방법론은 분산(variance)은 감소시켰지만 on-policy 알고리즘이므로 데이터간에 상관관계(correlation)가 크기 때문에 편향(bias)을 갖는 단점이 있습니다.
DDPG 는 확정적 정책(deterministic policy)을 취하는 정책 그래디언트 알고리즘으로 off-policy 이면서 연속적인 행동 공간을 갖는 환경에서도 적용이 가능한 것이 특징입니다. 또한 액터-크리틱 방법론은 액터 신경망에서 평균과 표준 편차를 출력함으로써 가우시안 분포를 갖는 확률 밀도 함수를 추정하였는데 DDPG 는 행동 그 자체를 추정하자는 아이디어인 것입니다.
확정적 정책을 갖는 알고리즘에는 Q 를 학습하는 DQN, SARSA 등이 있습니다. DDPG 또한 Q 를 학습하며 연속적인 행동 공간을 갖는 환경에도 적용할 수 있는 DQN 이라고 할 수 있습니다. 하지만 연속적인 행동 공간에서 Q 를 학습하기 위해서는 정책 그래디언트에 근거한 접근을 이용하기 때문에 정책 그래디언트 기반의 방법론인 것입니다.
DDPG 알고리즘은 DQN 에서 적용하는 기법인 experience replay, target network 와 추가적으로 탐색을 위한 noise 기법을 이용합니다.
타겟 네트워크(Target Network)
일반 신경망과 동일한 구조의 타겟 신경망을 두는 기법입니다. 업데이트 계산을 위한 타겟은 이 타겟 신경망으로부터 구하는데 이와 같이 하는 이유는 동일한 신경망으로부터 타겟과 예측을 구한다면 매번 다른 값이 나올 수 있기 때문입니다.
DQN 은 일정 주기마다 일반 신경망의 파라미터를 타겟 신경망으로 한 번에 복사하여 타겟 신경망을 일정 주기동안 고정하지만 DDPG 는 소프트 업데이트(soft update) 방식으로 매 학습마다 파라미터값을 조금씩 업데이트합니다. 따라서 타겟 신경망의 파라미터값이 고정되지 않고 계속 변한다는 특징이 있습니다.
파라미터 업데이트 식은 다음과 같습니다.
θQ′←τθQ+(1−τ)θQ′ Critic θμ′←τθμ+(1−τ)θμ′ Actor
이를 구현하면 다음과 같습니다.
TAU = 0.01
x = 0.54123
x_target = 0.01
for i in range(1000):
x_target = TAU * x + (1 - TAU) * x_target
print(i, x_target)
0 0.015312300000000001
1 0.020571477
2 0.025778062230000003
3 0.030932581607700002
...
996 0.5412063641292437
997 0.5412066004879513
998 0.5412068344830718
999 0.5412070661382411
OU 노이즈(Ornstein-Uhlenbeck Noise)
연속적인 행동 공간을 갖는 환경에서의 주된 과제는 탐색(exporation)입니다. DDPG 는 확정적 정책으로 탐색을 위한 무작위성이 없기 때문에 OU 노이즈를 이용하여 이를 해결하고자 합니다.
노이즈를 생성하는 식은 다음과 같습니다.
dxt=θ(μ−xt)dt+σdWt
μ 는 평균, θ 는 평균에 회귀하는 것, σ 는 표준 편차, Wt 는 평균이 0이고 분산이 1인 가우시안 화이트 노이즈를 의미합니다.
OU 노이즈를 구현하면 다음과 같습니다.
import matplotlib.pyplot as plt
import numpy as np
class OUNoise:
def __init__(self,action_dimension,mu=0, theta=0.15, sigma=0.3):
self.action_dimension = action_dimension
self.mu = mu
self.theta = theta
self.sigma = sigma
self.state = np.ones(self.action_dimension) * self.mu
self.reset()
def reset(self):
self.state = np.ones(self.action_dimension) * self.mu
def noise(self):
x = self.state
dx = self.theta * (self.mu - x) + self.sigma * np.random.randn(len(x))
self.state = x + dx
return self.state
ou1 = OUNoise(1, sigma=0.4)
ou2 = OUNoise(1, sigma=0.3)
ou3 = OUNoise(1, sigma=0.2)
ou4 = OUNoise(1, sigma=0.1)
multi_ou = OUNoise(3, sigma=0.2)
r = []
for i in range(100):
r.append((ou1.noise(), ou2.noise(), ou3.noise(), ou4.noise(), multi_ou.noise()))
r = np.array(r)
plt.figure(figsize=(12, 6))
plt.plot(r[:, 0], label='sigma=0.4')
plt.plot(r[:, 1], label='sigma=0.3')
plt.plot(r[:, 2], label='sigma=0.2')
plt.plot(r[:, 3], label='sigma=0.1')
plt.title('OU Noise')
plt.ylabel('N')
plt.xlabel('time step')
plt.legend(loc='upper right')
plt.show()
plt.figure(figsize=(12, 6))
plt.plot(r[:, 4].tolist())
plt.title('OU Noise - multiple output')
plt.ylabel('N')
plt.xlabel('time step')
plt.show()

액터 신경망에서 출력된 행동 μ(st|θμt) 에 노이즈 N 를 추가하는 것입니다.
μ′(st)=μ(st|θμt)+N Train μ′(st)=μ(st|θμt) Play
또한 노이즈는 탐색을 위한 것으로 학습시에만 필요한 것입니다.
목적 함수
액터 신경망의 목표는 리턴의 기댓값을 최대로 만드는 최적의 정책을 구하는 것입니다.
따라서 목적 함수는 다음과 같이 리턴의 기댓값인 가치 함수로부터 시작합니다.
J(θ)=Vμθ
증명을 통해 재구성된 목적 함수의 그래디언트 식은 다음과 같습니다.
∇θμJ≈Est∽ρβ[∇θμQ(s,a|θQ)|s=st,a=μ(st|θμ)]=Est∽ρβ[∇aQ(s,a|θQ)|s=st,a=μ(st)∇θμμ(s|θμ)|s=st]
∂Q(s,a|θQ)∂θμ 에서 체인 룰을 이용해 ∂Q(s,a|θQ)∂a∂a∂θμ 로 유도된 것으로 이를 통해 액터 신경망의 파라미터 θμ 를 학습할 수 있는 것입니다.
크리틱 신경망의 손실함수는 TD 타겟과 예측의 오차를 계산합니다.
L(θQ)=Est∽ρβ,at∽β,rt∽E[(Q(st,at|θQ)−yt)2]
TD 타겟을 계산하는 식은 다음과 같으며
yt=r(st,at)+γQ(st+1,μ(st+1|θμ)|θQ
타겟 신경망을 이용한 계산으로 고치면 다음과 같습니다.
yt=r(st,at)+γQ′(st+1,μ′(st+1|θμ′)|θQ′
알고리즘

구현
알고리즘 테스트 환경은 OpenAI Gym 의 BipedalWalker-v2 입니다.

BipedalWalker 은 로봇을 목표지점까지 걷게하는 문제입니다.
환경으로부터 받을 수 있는 정보를 출력합니다.
import gym
env = gym.make('BipedalWalker-v3')
print('obs', env.observation_space.shape[0])
print('act', env.action_space.shape[0])
print('act low', env.action_space.low)
print('act high', env.action_space.high)
상태 정보는 24차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동은 4차원의 벡터로 각 요소는 [-1, 1] 의 범위의 연속적인 공간을 갖습니다.
obs 24
act 4
act low -1.0
act high 1.0
액터 신경망을 정의합니다.
class Actor:
def __init__(self, scope):
with tf.variable_scope(scope):
with tf.name_scope('input'):
self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
with tf.variable_scope('layer'):
self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
self.fc2 = tf.layers.dense(self.fc1, 256, tf.nn.relu)
with tf.name_scope('out'):
self.a = tf.layers.dense(self.fc2, N_A, tf.nn.tanh) * A_BOUND[1]
self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)
액터 신경망은 확정적 정책으로 상태를 입력으로 받으면 바로 행동을 출력합니다.
self.a = tf.layers.dense(self.fc2, N_A, tf.nn.tanh) * A_BOUND[1]
...
def get_action(self, s):
action = self.sess.run(self.actor.a, {self.actor.s: s[np.newaxis, :]})[0]
return action
출력으로 tanh 함수를 사용하여 출력의 범위를 [-1, 1] 로 하는 것입니다. 만약 환경이 더 큰 범위를 갖는다면 뒤에 상수를 곱함으로써 범위를 조정할 수 있습니다.
크리틱 신경망을 정의합니다.
class Critic:
def __init__(self, scope):
with tf.variable_scope(scope):
with tf.name_scope('input'):
self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
with tf.variable_scope('layer'):
self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
concat = tf.concat([self.a, self.fc1], axis=-1)
self.fc2 = tf.layers.dense(concat, 256, tf.nn.relu)
with tf.name_scope('out'):
self.q = tf.layers.dense(self.fc2, 1)
self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)
액터 신경망으로부터 전달받은 행동층과 결합하여 최종적으로 Q 를 출력합니다.
with tf.name_scope('input'):
self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
with tf.variable_scope('layer'):
self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
concat = tf.concat([self.a, self.fc1], axis=-1)
self.fc2 = tf.layers.dense(concat, 256, tf.nn.relu)
with tf.name_scope('out'):
self.q = tf.layers.dense(self.fc2, 1)
일반 신경망과 타겟 신경망을 생성합니다.
class DDPG:
def __init__(self, load=False):
self.actor = Actor('actor/eval')
self.critic = Critic('critic/eval')
self.target_actor = Actor('actor/target')
self.target_critic = Critic('critic/target')
TD 타겟을 입력으로 받습니다.
with tf.name_scope('input'):
self.td_target = tf.placeholder(tf.float32, [None, 1])
크리틱 신경망의 손실 함수를 정의합니다.
with tf.name_scope('c_loss'):
self.c_loss = tf.losses.mean_squared_error(self.td_target, self.critic.q)
Q 타겟과 추정 Q 의 평균 제곱 오차(mean squared error, MSE)를 계산합니다.
손실값을 최소화하는 방향으로 학습하도록 설정합니다.
with tf.name_scope('c_optimizer'):
self.c_train_op = tf.train.AdamOptimizer(LR_C).minimize(self.c_loss)
그래디언트 계산을 정의합니다.
with tf.name_scope('grads'):
self.q_grads = tf.gradients(self.critic.q, self.critic.a)
self.a_grads = tf.gradients(-self.actor.a, self.actor.params, self.q_grads)
목적함수 그래디언트 식은 다음과 같습니다.
∇θμJ≈Est∽ρβ[∇aQ(s,a|θQ)|s=st,a=μ(st)∇θμμ(s|θμ)|s=st]
즉 ∂Q(s,a|θQ)∂a∂a∂θμ 를 계산하는 것으로 크리틱 신경망의 파라미터 θQ 에 의한 행동에 대한 Q 에 대한 그래디언트 ∂Q(s,a|θQ)∂a 를 계산하고 이는 최종적으로 계산하고자 하는 ∂a∂θμ 그라디언트 계산에 초기값으로 곱해지는 것입니다. 또한 최대화를 해야하므로 마이너스 부호를 붙입니다.(tf.gradients)
그래디언트와 적용할 파라미터를 설정합니다.(tf.train.Optimizer)
with tf.name_scope('a_optimizer'):
self.a_train_op = tf.train.AdamOptimizer(LR_A).apply_gradients(zip(self.a_grads, self.actor.params))
크리틱 신경망에서 행동에 대한 Q의 그래디언트를 계산하고 이를 통해 액터 신경망의 파라미터를 업데이트하는 것입니다.
타겟 신경망에 대한 소프트 업데이트에 대한 부분입니다.
self.update_op = [tf.assign(t_p, TAU * t_p + (1 - TAU) * e_p) for t_p, e_p in zip(self.target_actor.params + self.target_critic.params, self.actor.params + self.critic.params)]
...
def update_target(self):
self.sess.run(self.update_op)
환경이 갖는 행동 크기만큼 OU 노이즈 객체를 생성합니다.
N_A = env.action_space.shape[0]
...
self.ou = OUNoise(N_A)
이는 각 노이즈가 다른 궤적을 갖는 것을 의미합니다.

에이전트와 환경의 상호작용입니다.
agent = DDPG()
for i in range(MAX_EPI + 1):
agent.ou.reset()
state = env.reset()
done = False
while not done:
action = agent.get_action(state)
noise = agent.ou.noise()
action = np.clip(action + noise, *A_BOUND)
next_state, reward, done, _ = env.step(action)
agent.replay_memory.append((state, action, reward, done, next_state))
if len(agent.replay_memory) >= MAX_MEM * 0.1:
agent.train()
state = next_state
매 에피소드마다 OU 노이즈를 초기화하여 노이즈의 무작위성을 보장합니다.
agent.ou.reset()
액터 신경망으로부터 출력된 행동에 노이즈를 추가하고 행동 범위를 클리핑합니다.
action = agent.get_action(state)
noise = agent.ou.noise()
action = np.clip(action + noise, *A_BOUND)
이를 통해 확정적 정책에 대해 탐색 문제를 해소합니다.
μ′(st)=μ(st|θμt)+N
환경으로부터 받은 정보를 리플레이 메모리에 저장하고 일정 크기부터 업데이트를 진행합니다.
MAX_MEM = 50000
...
next_state, reward, done, _ = env.step(action)
agent.replay_memory.append((state, action, reward, done, next_state))
if len(agent.replay_memory) >= MAX_MEM * 0.1:
agent.train()
신경망 업데이트에 대한 부분입니다.
def train(self):
minibatch = random.sample(self.replay_memory, BATCH_SIZE)
minibatch = np.array(minibatch)
a_ = self.sess.run(self.target_actor.a, {self.target_actor.s: minibatch[:, 4].tolist()})
target_q = self.sess.run(self.target_critic.q, {self.target_critic.s: minibatch[:, 4].tolist(),
self.target_critic.a: a_})
td_target = minibatch[:, 2] + GAMMA * target_q.squeeze() * ~minibatch[:, 3].astype(np.bool)
self.sess.run(self.c_train_op, {self.critic.s: minibatch[:, 0].tolist(),
self.critic.a: minibatch[:, 1].tolist(),
self.td_target: td_target[:, np.newaxis]})
self.sess.run(self.a_train_op, {self.actor.s: minibatch[:, 0].tolist(),
self.critic.s: minibatch[:, 0].tolist(),
self.critic.a: minibatch[:, 1].tolist()})
self.update_target()
리플레이 메모리에 저장된 샘플을 배치 사이즈만큼 랜덤하게 추출합니다.
minibatch = random.sample(self.replay_memory, BATCH_SIZE)
minibatch = np.array(minibatch)
타겟 신경망으로부터 다음 상태에 대한 Q′(si+1,μ′(si+1|θμ′) 를 구합니다.
a_ = self.sess.run(self.target_actor.a, {self.target_actor.s: minibatch[:, 4].tolist()})
target_q = self.sess.run(self.target_critic.q, {self.target_critic.s: minibatch[:, 4].tolist(), self.target_critic.a: a_})
TD 타겟인 ri+γQ′(si+1,μ′(si+1|θμ′)|θQ′) 을 계산합니다.
q_target = minibatch[:, 2] + GAMMA * target_q.squeeze() * ~minibatch[:, 3].astype(np.bool)
크리틱 신경망과 액터 신경망을 각각 업데이트합니다.
self.sess.run(self.c_train_op, {self.critic.s: minibatch[:, 0].tolist(),
self.critic.a: minibatch[:, 1].tolist(),
self.q_target: q_target[:, np.newaxis]})
self.sess.run(self.a_train_op, {self.actor.s: minibatch[:, 0].tolist(),
self.critic.s: minibatch[:, 0].tolist(),
self.critic.a: minibatch[:, 1].tolist()})
크리틱 신경망의 파라미터는 타겟과 예측에 대한 오차를 최소화하는 방향으로 업데이트하며 액터 신경망의 파라미터는 크리틱 신경망으로부터 계산된 Q 그래디언트를 이용하여 목적 함수가 최대가 되도록 행동을 수정하는 방향으로 업데이트합니다.
일반 신경망의 파라미터를 타겟 신경망의 파라미터로 복제합니다.
self.update_target()
1000번의 에피소드 동안 학습한 결과입니다.

저장된 모델을 불러와 테스트합니다.
INFO:tensorflow:Restoring parameters from ./tmp/ddpg_bipedalwalker/model/model
episode: 0 reward: 190.82911344149113
episode: 1 reward: 32.018837167782266
episode: 2 reward: 66.25328426062981
episode: 3 reward: 186.63262380705766

'머신러닝 > 강화학습' 카테고리의 다른 글
PPO(Proximal Policy Optimization) (0) | 2020.12.07 |
---|---|
A3C(Asynchronous Advantage Actor-Critic) (0) | 2020.11.30 |
액터-크리틱(Actor-Critic) (0) | 2020.11.27 |
정책 그래디언트(Policy Gradient) (0) | 2020.11.24 |
DQN(Deep Q-Networks) (2) (0) | 2020.11.24 |