DDPG(Deep Deterministic Policy Gradient)
논문 Continuous control with deep reinforcement learning 으로 소개되었습니다.
DQN 은 off-policy 알고리즘으로 experience replay, target network 기법을 통해 좋은 성능을 내지만 연속적인 행동 공간(continuous action space)을 갖는 환경에서는 적용할 수 없으며 정책 그래디언트 기반의 액터-크리틱 방법론은 분산(variance)은 감소시켰지만 on-policy 알고리즘이므로 데이터간에 상관관계(correlation)가 크기 때문에 편향(bias)을 갖는 단점이 있습니다.
DDPG 는 확정적 정책(deterministic policy)을 취하는 정책 그래디언트 알고리즘으로 off-policy 이면서 연속적인 행동 공간을 갖는 환경에서도 적용이 가능한 것이 특징입니다. 또한 액터-크리틱 방법론은 액터 신경망에서 평균과 표준 편차를 출력함으로써 가우시안 분포를 갖는 확률 밀도 함수를 추정하였는데 DDPG 는 행동 그 자체를 추정하자는 아이디어인 것입니다.
확정적 정책을 갖는 알고리즘에는 $Q$ 를 학습하는 DQN, SARSA 등이 있습니다. DDPG 또한 $Q$ 를 학습하며 연속적인 행동 공간을 갖는 환경에도 적용할 수 있는 DQN 이라고 할 수 있습니다. 하지만 연속적인 행동 공간에서 $Q$ 를 학습하기 위해서는 정책 그래디언트에 근거한 접근을 이용하기 때문에 정책 그래디언트 기반의 방법론인 것입니다.
DDPG 알고리즘은 DQN 에서 적용하는 기법인 experience replay, target network 와 추가적으로 탐색을 위한 noise 기법을 이용합니다.
타겟 네트워크(Target Network)
일반 신경망과 동일한 구조의 타겟 신경망을 두는 기법입니다. 업데이트 계산을 위한 타겟은 이 타겟 신경망으로부터 구하는데 이와 같이 하는 이유는 동일한 신경망으로부터 타겟과 예측을 구한다면 매번 다른 값이 나올 수 있기 때문입니다.
DQN 은 일정 주기마다 일반 신경망의 파라미터를 타겟 신경망으로 한 번에 복사하여 타겟 신경망을 일정 주기동안 고정하지만 DDPG 는 소프트 업데이트(soft update) 방식으로 매 학습마다 파라미터값을 조금씩 업데이트합니다. 따라서 타겟 신경망의 파라미터값이 고정되지 않고 계속 변한다는 특징이 있습니다.
파라미터 업데이트 식은 다음과 같습니다.
$\begin{aligned}
& \theta^{Q'} \leftarrow \tau \theta^Q + (1 - \tau)\theta^{Q'} & \scriptstyle{ \text{ Critic } } \\
& \theta^{\mu'} \leftarrow \tau \theta^\mu + (1 - \tau)\theta^{\mu'} & \scriptstyle{ \text{ Actor } }
\end{aligned}$
이를 구현하면 다음과 같습니다.
TAU = 0.01
x = 0.54123
x_target = 0.01
for i in range(1000):
x_target = TAU * x + (1 - TAU) * x_target
print(i, x_target)
0 0.015312300000000001
1 0.020571477
2 0.025778062230000003
3 0.030932581607700002
...
996 0.5412063641292437
997 0.5412066004879513
998 0.5412068344830718
999 0.5412070661382411
OU 노이즈(Ornstein-Uhlenbeck Noise)
연속적인 행동 공간을 갖는 환경에서의 주된 과제는 탐색(exporation)입니다. DDPG 는 확정적 정책으로 탐색을 위한 무작위성이 없기 때문에 OU 노이즈를 이용하여 이를 해결하고자 합니다.
노이즈를 생성하는 식은 다음과 같습니다.
$dx_t = \theta (\mu - x_t) dt + \sigma dW_t$
$\mu$ 는 평균, $\theta$ 는 평균에 회귀하는 것, $\sigma$ 는 표준 편차, $W_t$ 는 평균이 0이고 분산이 1인 가우시안 화이트 노이즈를 의미합니다.
OU 노이즈를 구현하면 다음과 같습니다.
import matplotlib.pyplot as plt
import numpy as np
class OUNoise:
def __init__(self,action_dimension,mu=0, theta=0.15, sigma=0.3):
self.action_dimension = action_dimension
self.mu = mu
self.theta = theta
self.sigma = sigma
self.state = np.ones(self.action_dimension) * self.mu
self.reset()
def reset(self):
self.state = np.ones(self.action_dimension) * self.mu
def noise(self):
x = self.state
dx = self.theta * (self.mu - x) + self.sigma * np.random.randn(len(x))
self.state = x + dx
return self.state
ou1 = OUNoise(1, sigma=0.4)
ou2 = OUNoise(1, sigma=0.3)
ou3 = OUNoise(1, sigma=0.2)
ou4 = OUNoise(1, sigma=0.1)
multi_ou = OUNoise(3, sigma=0.2)
r = []
for i in range(100):
r.append((ou1.noise(), ou2.noise(), ou3.noise(), ou4.noise(), multi_ou.noise()))
r = np.array(r)
plt.figure(figsize=(12, 6))
plt.plot(r[:, 0], label='sigma=0.4')
plt.plot(r[:, 1], label='sigma=0.3')
plt.plot(r[:, 2], label='sigma=0.2')
plt.plot(r[:, 3], label='sigma=0.1')
plt.title('OU Noise')
plt.ylabel('N')
plt.xlabel('time step')
plt.legend(loc='upper right')
plt.show()
plt.figure(figsize=(12, 6))
plt.plot(r[:, 4].tolist())
plt.title('OU Noise - multiple output')
plt.ylabel('N')
plt.xlabel('time step')
plt.show()
액터 신경망에서 출력된 행동 $\mu(s_t|\theta^\mu_t)$ 에 노이즈 $\mathcal{N}$ 를 추가하는 것입니다.
$\begin{aligned}
& \mu'(s_t) = \mu(s_t|\theta^\mu_t) + \mathcal{N} & \scriptstyle{ \text{ Train } } \\
& \mu'(s_t) = \mu(s_t|\theta^\mu_t) & \scriptstyle{ \text{ Play } }
\end{aligned}$
또한 노이즈는 탐색을 위한 것으로 학습시에만 필요한 것입니다.
목적 함수
액터 신경망의 목표는 리턴의 기댓값을 최대로 만드는 최적의 정책을 구하는 것입니다.
따라서 목적 함수는 다음과 같이 리턴의 기댓값인 가치 함수로부터 시작합니다.
$J(\theta) = V^{\mu_\theta}$
증명을 통해 재구성된 목적 함수의 그래디언트 식은 다음과 같습니다.
$\begin{aligned}
\nabla_{\theta^\mu} J \approx & \mathbb{E}_{s_t \backsim \rho^\beta} [ \nabla_{\theta^\mu} Q(s, a \vert \theta^Q) \vert_{s=s_t, a=\mu(s_t|\theta^\mu)}] & \\
=& \mathbb{E}_{s_t \backsim \rho^\beta} [ \nabla_{a} Q(s, a \vert \theta^Q) \vert_{s=s_t, a=\mu(s_t)} \nabla_{\theta_\mu}\mu(s|\theta^\mu) \vert_{s=s_t}]
\end{aligned}$
${\partial Q(s,a|\theta^Q) \over \partial \theta^\mu}$ 에서 체인 룰을 이용해 ${\partial Q(s,a|\theta^Q) \over \partial a} {\partial a \over \partial \theta^\mu}$ 로 유도된 것으로 이를 통해 액터 신경망의 파라미터 $\theta^{\mu}$ 를 학습할 수 있는 것입니다.
크리틱 신경망의 손실함수는 TD 타겟과 예측의 오차를 계산합니다.
$L(\theta^{Q}) = {\rm E}_{s_t \backsim \rho^\beta , a_t \backsim \beta , r_t \backsim E} [(Q(s_t, a_t \vert \theta^Q)-y_t)^2]$
TD 타겟을 계산하는 식은 다음과 같으며
$y_t = r(s_t, a_t) + \gamma Q(s_{t+1}, \mu(s_{t+1}|\theta^{\mu})|\theta^{Q}$
타겟 신경망을 이용한 계산으로 고치면 다음과 같습니다.
$y_t = r(s_t, a_t) + \gamma \color{red} {Q'(s_{t+1}, \mu'(s_{t+1}|\theta^{\mu'})|\theta^{Q'}}$
알고리즘
구현
알고리즘 테스트 환경은 OpenAI Gym 의 BipedalWalker-v2 입니다.
BipedalWalker 은 로봇을 목표지점까지 걷게하는 문제입니다.
환경으로부터 받을 수 있는 정보를 출력합니다.
import gym
env = gym.make('BipedalWalker-v3')
print('obs', env.observation_space.shape[0])
print('act', env.action_space.shape[0])
print('act low', env.action_space.low)
print('act high', env.action_space.high)
상태 정보는 24차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동은 4차원의 벡터로 각 요소는 [-1, 1] 의 범위의 연속적인 공간을 갖습니다.
obs 24
act 4
act low -1.0
act high 1.0
액터 신경망을 정의합니다.
class Actor:
def __init__(self, scope):
with tf.variable_scope(scope):
with tf.name_scope('input'):
self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
with tf.variable_scope('layer'):
self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
self.fc2 = tf.layers.dense(self.fc1, 256, tf.nn.relu)
with tf.name_scope('out'):
self.a = tf.layers.dense(self.fc2, N_A, tf.nn.tanh) * A_BOUND[1]
self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)
액터 신경망은 확정적 정책으로 상태를 입력으로 받으면 바로 행동을 출력합니다.
self.a = tf.layers.dense(self.fc2, N_A, tf.nn.tanh) * A_BOUND[1]
...
def get_action(self, s):
action = self.sess.run(self.actor.a, {self.actor.s: s[np.newaxis, :]})[0]
return action
출력으로 tanh 함수를 사용하여 출력의 범위를 [-1, 1] 로 하는 것입니다. 만약 환경이 더 큰 범위를 갖는다면 뒤에 상수를 곱함으로써 범위를 조정할 수 있습니다.
크리틱 신경망을 정의합니다.
class Critic:
def __init__(self, scope):
with tf.variable_scope(scope):
with tf.name_scope('input'):
self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
with tf.variable_scope('layer'):
self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
concat = tf.concat([self.a, self.fc1], axis=-1)
self.fc2 = tf.layers.dense(concat, 256, tf.nn.relu)
with tf.name_scope('out'):
self.q = tf.layers.dense(self.fc2, 1)
self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)
액터 신경망으로부터 전달받은 행동층과 결합하여 최종적으로 $Q$ 를 출력합니다.
with tf.name_scope('input'):
self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
with tf.variable_scope('layer'):
self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
concat = tf.concat([self.a, self.fc1], axis=-1)
self.fc2 = tf.layers.dense(concat, 256, tf.nn.relu)
with tf.name_scope('out'):
self.q = tf.layers.dense(self.fc2, 1)
일반 신경망과 타겟 신경망을 생성합니다.
class DDPG:
def __init__(self, load=False):
self.actor = Actor('actor/eval')
self.critic = Critic('critic/eval')
self.target_actor = Actor('actor/target')
self.target_critic = Critic('critic/target')
TD 타겟을 입력으로 받습니다.
with tf.name_scope('input'):
self.td_target = tf.placeholder(tf.float32, [None, 1])
크리틱 신경망의 손실 함수를 정의합니다.
with tf.name_scope('c_loss'):
self.c_loss = tf.losses.mean_squared_error(self.td_target, self.critic.q)
$Q$ 타겟과 추정 $Q$ 의 평균 제곱 오차(mean squared error, MSE)를 계산합니다.
손실값을 최소화하는 방향으로 학습하도록 설정합니다.
with tf.name_scope('c_optimizer'):
self.c_train_op = tf.train.AdamOptimizer(LR_C).minimize(self.c_loss)
그래디언트 계산을 정의합니다.
with tf.name_scope('grads'):
self.q_grads = tf.gradients(self.critic.q, self.critic.a)
self.a_grads = tf.gradients(-self.actor.a, self.actor.params, self.q_grads)
목적함수 그래디언트 식은 다음과 같습니다.
$\nabla_{\theta^\mu} J \approx \mathbb{E}_{s_t \backsim \rho^\beta} [ \nabla_{a} Q(s, a \vert \theta^Q) \vert_{s=s_t, a=\mu(s_t)} \nabla_{\theta_\mu}\mu(s|\theta^\mu) \vert_{s=s_t}]$
즉 ${\partial Q(s,a|\theta^Q) \over \partial a} {\partial a \over \partial \theta^\mu}$ 를 계산하는 것으로 크리틱 신경망의 파라미터 $\theta^Q$ 에 의한 행동에 대한 $Q$ 에 대한 그래디언트 ${\partial Q(s,a|\theta^Q) \over \partial a}$ 를 계산하고 이는 최종적으로 계산하고자 하는 ${\partial a \over \partial \theta^\mu}$ 그라디언트 계산에 초기값으로 곱해지는 것입니다. 또한 최대화를 해야하므로 마이너스 부호를 붙입니다.(tf.gradients)
그래디언트와 적용할 파라미터를 설정합니다.(tf.train.Optimizer)
with tf.name_scope('a_optimizer'):
self.a_train_op = tf.train.AdamOptimizer(LR_A).apply_gradients(zip(self.a_grads, self.actor.params))
크리틱 신경망에서 행동에 대한 $Q$의 그래디언트를 계산하고 이를 통해 액터 신경망의 파라미터를 업데이트하는 것입니다.
타겟 신경망에 대한 소프트 업데이트에 대한 부분입니다.
self.update_op = [tf.assign(t_p, TAU * t_p + (1 - TAU) * e_p) for t_p, e_p in zip(self.target_actor.params + self.target_critic.params, self.actor.params + self.critic.params)]
...
def update_target(self):
self.sess.run(self.update_op)
환경이 갖는 행동 크기만큼 OU 노이즈 객체를 생성합니다.
N_A = env.action_space.shape[0]
...
self.ou = OUNoise(N_A)
이는 각 노이즈가 다른 궤적을 갖는 것을 의미합니다.
에이전트와 환경의 상호작용입니다.
agent = DDPG()
for i in range(MAX_EPI + 1):
agent.ou.reset()
state = env.reset()
done = False
while not done:
action = agent.get_action(state)
noise = agent.ou.noise()
action = np.clip(action + noise, *A_BOUND)
next_state, reward, done, _ = env.step(action)
agent.replay_memory.append((state, action, reward, done, next_state))
if len(agent.replay_memory) >= MAX_MEM * 0.1:
agent.train()
state = next_state
매 에피소드마다 OU 노이즈를 초기화하여 노이즈의 무작위성을 보장합니다.
agent.ou.reset()
액터 신경망으로부터 출력된 행동에 노이즈를 추가하고 행동 범위를 클리핑합니다.
action = agent.get_action(state)
noise = agent.ou.noise()
action = np.clip(action + noise, *A_BOUND)
이를 통해 확정적 정책에 대해 탐색 문제를 해소합니다.
$\mu'(s_t) = \mu(s_t|\theta^\mu_t) + \mathcal{N}$
환경으로부터 받은 정보를 리플레이 메모리에 저장하고 일정 크기부터 업데이트를 진행합니다.
MAX_MEM = 50000
...
next_state, reward, done, _ = env.step(action)
agent.replay_memory.append((state, action, reward, done, next_state))
if len(agent.replay_memory) >= MAX_MEM * 0.1:
agent.train()
신경망 업데이트에 대한 부분입니다.
def train(self):
minibatch = random.sample(self.replay_memory, BATCH_SIZE)
minibatch = np.array(minibatch)
a_ = self.sess.run(self.target_actor.a, {self.target_actor.s: minibatch[:, 4].tolist()})
target_q = self.sess.run(self.target_critic.q, {self.target_critic.s: minibatch[:, 4].tolist(),
self.target_critic.a: a_})
td_target = minibatch[:, 2] + GAMMA * target_q.squeeze() * ~minibatch[:, 3].astype(np.bool)
self.sess.run(self.c_train_op, {self.critic.s: minibatch[:, 0].tolist(),
self.critic.a: minibatch[:, 1].tolist(),
self.td_target: td_target[:, np.newaxis]})
self.sess.run(self.a_train_op, {self.actor.s: minibatch[:, 0].tolist(),
self.critic.s: minibatch[:, 0].tolist(),
self.critic.a: minibatch[:, 1].tolist()})
self.update_target()
리플레이 메모리에 저장된 샘플을 배치 사이즈만큼 랜덤하게 추출합니다.
minibatch = random.sample(self.replay_memory, BATCH_SIZE)
minibatch = np.array(minibatch)
타겟 신경망으로부터 다음 상태에 대한 $Q'(s_{i+1}, \mu'(s_{i+1}|\theta^{\mu'})$ 를 구합니다.
a_ = self.sess.run(self.target_actor.a, {self.target_actor.s: minibatch[:, 4].tolist()})
target_q = self.sess.run(self.target_critic.q, {self.target_critic.s: minibatch[:, 4].tolist(), self.target_critic.a: a_})
TD 타겟인 $r_i + \gamma Q'(s_{i+1}, \mu'(s_{i+1}|\theta^{\mu'})|\theta^{Q'})$ 을 계산합니다.
q_target = minibatch[:, 2] + GAMMA * target_q.squeeze() * ~minibatch[:, 3].astype(np.bool)
크리틱 신경망과 액터 신경망을 각각 업데이트합니다.
self.sess.run(self.c_train_op, {self.critic.s: minibatch[:, 0].tolist(),
self.critic.a: minibatch[:, 1].tolist(),
self.q_target: q_target[:, np.newaxis]})
self.sess.run(self.a_train_op, {self.actor.s: minibatch[:, 0].tolist(),
self.critic.s: minibatch[:, 0].tolist(),
self.critic.a: minibatch[:, 1].tolist()})
크리틱 신경망의 파라미터는 타겟과 예측에 대한 오차를 최소화하는 방향으로 업데이트하며 액터 신경망의 파라미터는 크리틱 신경망으로부터 계산된 $Q$ 그래디언트를 이용하여 목적 함수가 최대가 되도록 행동을 수정하는 방향으로 업데이트합니다.
일반 신경망의 파라미터를 타겟 신경망의 파라미터로 복제합니다.
self.update_target()
1000번의 에피소드 동안 학습한 결과입니다.
저장된 모델을 불러와 테스트합니다.
INFO:tensorflow:Restoring parameters from ./tmp/ddpg_bipedalwalker/model/model
episode: 0 reward: 190.82911344149113
episode: 1 reward: 32.018837167782266
episode: 2 reward: 66.25328426062981
episode: 3 reward: 186.63262380705766
'머신러닝 > 강화학습' 카테고리의 다른 글
PPO(Proximal Policy Optimization) (0) | 2020.12.07 |
---|---|
A3C(Asynchronous Advantage Actor-Critic) (0) | 2020.11.30 |
액터-크리틱(Actor-Critic) (0) | 2020.11.27 |
정책 그래디언트(Policy Gradient) (0) | 2020.11.24 |
DQN(Deep Q-Networks) (2) (0) | 2020.11.24 |