독학 연구소
공부한 내용을 정리합니다.
DDPG(Deep Deterministic Policy Gradient)

DDPG(Deep Deterministic Policy Gradient)

논문 Continuous control with deep reinforcement learning 으로 소개되었습니다.

 

DQN 은 off-policy 알고리즘으로 experience replay, target network 기법을 통해 좋은 성능을 내지만 연속적인 행동 공간(continuous action space)을 갖는 환경에서는 적용할 수 없으며 정책 그래디언트 기반의 액터-크리틱 방법론은 분산(variance)은 감소시켰지만 on-policy 알고리즘이므로 데이터간에 상관관계(correlation)가 크기 때문에 편향(bias)을 갖는 단점이 있습니다.

 

DDPG 는 확정적 정책(deterministic policy)을 취하는 정책 그래디언트 알고리즘으로 off-policy 이면서 연속적인 행동 공간을 갖는 환경에서도 적용이 가능한 것이 특징입니다. 또한 액터-크리틱 방법론은 액터 신경망에서 평균과 표준 편차를 출력함으로써 가우시안 분포를 갖는 확률 밀도 함수를 추정하였는데 DDPG 는 행동 그 자체를 추정하자는 아이디어인 것입니다.

 

확정적 정책을 갖는 알고리즘에는 $Q$ 를 학습하는 DQN, SARSA 등이 있습니다. DDPG 또한 $Q$ 를 학습하며 연속적인 행동 공간을 갖는 환경에도 적용할 수 있는 DQN 이라고 할 수 있습니다. 하지만 연속적인 행동 공간에서 $Q$ 를 학습하기 위해서는 정책 그래디언트에 근거한 접근을 이용하기 때문에 정책 그래디언트 기반의 방법론인 것입니다.

 

DDPG 알고리즘은 DQN 에서 적용하는 기법인 experience replay, target network 와 추가적으로 탐색을 위한 noise 기법을 이용합니다.

 

 

타겟 네트워크(Target Network)

일반 신경망과 동일한 구조의 타겟 신경망을 두는 기법입니다. 업데이트 계산을 위한 타겟은 이 타겟 신경망으로부터 구하는데 이와 같이 하는 이유는 동일한 신경망으로부터 타겟과 예측을 구한다면 매번 다른 값이 나올 수 있기 때문입니다.

 

DQN 은 일정 주기마다 일반 신경망의 파라미터를 타겟 신경망으로 한 번에 복사하여 타겟 신경망을 일정 주기동안 고정하지만 DDPG 는 소프트 업데이트(soft update) 방식으로 매 학습마다 파라미터값을 조금씩 업데이트합니다. 따라서 타겟 신경망의 파라미터값이 고정되지 않고 계속 변한다는 특징이 있습니다.

 

파라미터 업데이트 식은 다음과 같습니다.

 

$\begin{aligned}
& \theta^{Q'} \leftarrow \tau \theta^Q + (1 - \tau)\theta^{Q'} & \scriptstyle{ \text{ Critic } } \\
& \theta^{\mu'} \leftarrow \tau \theta^\mu + (1 - \tau)\theta^{\mu'} & \scriptstyle{ \text{ Actor } } 
\end{aligned}$

 

 

이를 구현하면 다음과 같습니다.

TAU = 0.01

x = 0.54123
x_target = 0.01

for i in range(1000):
    x_target = TAU * x + (1 - TAU) * x_target
    print(i, x_target)
0 0.015312300000000001
1 0.020571477
2 0.025778062230000003
3 0.030932581607700002

...

996 0.5412063641292437
997 0.5412066004879513
998 0.5412068344830718
999 0.5412070661382411

 

 

OU 노이즈(Ornstein-Uhlenbeck Noise)

연속적인 행동 공간을 갖는 환경에서의 주된 과제는 탐색(exporation)입니다. DDPG 는 확정적 정책으로 탐색을 위한 무작위성이 없기 때문에 OU 노이즈를 이용하여 이를 해결하고자 합니다.

 

노이즈를 생성하는 식은 다음과 같습니다.

 

$dx_t = \theta (\mu - x_t) dt + \sigma dW_t$

 

$\mu$ 는 평균, $\theta$ 는 평균에 회귀하는 것, $\sigma$ 는 표준 편차, $W_t$ 는 평균이 0이고 분산이 1인 가우시안 화이트 노이즈를 의미합니다.

 

OU 노이즈를 구현하면 다음과 같습니다.

import matplotlib.pyplot as plt
import numpy as np

class OUNoise:
    def __init__(self,action_dimension,mu=0, theta=0.15, sigma=0.3):
        self.action_dimension = action_dimension
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.state = np.ones(self.action_dimension) * self.mu
        self.reset()

    def reset(self):
        self.state = np.ones(self.action_dimension) * self.mu

    def noise(self):
        x = self.state
        dx = self.theta * (self.mu - x) + self.sigma * np.random.randn(len(x))
        self.state = x + dx
        return self.state

ou1 = OUNoise(1, sigma=0.4)
ou2 = OUNoise(1, sigma=0.3)
ou3 = OUNoise(1, sigma=0.2)
ou4 = OUNoise(1, sigma=0.1)

multi_ou = OUNoise(3, sigma=0.2)

r = []

for i in range(100):
    r.append((ou1.noise(), ou2.noise(), ou3.noise(), ou4.noise(), multi_ou.noise()))

r = np.array(r)

plt.figure(figsize=(12, 6))
plt.plot(r[:, 0], label='sigma=0.4')
plt.plot(r[:, 1], label='sigma=0.3')
plt.plot(r[:, 2], label='sigma=0.2')
plt.plot(r[:, 3], label='sigma=0.1')
plt.title('OU Noise')
plt.ylabel('N')
plt.xlabel('time step')
plt.legend(loc='upper right')
plt.show()

plt.figure(figsize=(12, 6))
plt.plot(r[:, 4].tolist())
plt.title('OU Noise - multiple output')
plt.ylabel('N')
plt.xlabel('time step')
plt.show()

 

액터 신경망에서 출력된 행동 $\mu(s_t|\theta^\mu_t)$ 에 노이즈 $\mathcal{N}$ 를 추가하는 것입니다.

 

$\begin{aligned}
& \mu'(s_t) = \mu(s_t|\theta^\mu_t) + \mathcal{N} & \scriptstyle{ \text{ Train } } \\
& \mu'(s_t) = \mu(s_t|\theta^\mu_t) & \scriptstyle{ \text{ Play } }
\end{aligned}$

 

또한 노이즈는 탐색을 위한 것으로 학습시에만 필요한 것입니다.

 

 

목적 함수

액터 신경망의 목표는 리턴의 기댓값을 최대로 만드는 최적의 정책을 구하는 것입니다.

 

따라서 목적 함수는 다음과 같이 리턴의 기댓값인 가치 함수로부터 시작합니다.

 

$J(\theta) = V^{\mu_\theta}$

 

증명을 통해 재구성된 목적 함수의 그래디언트 식은 다음과 같습니다.

 

$\begin{aligned} 
\nabla_{\theta^\mu} J \approx & \mathbb{E}_{s_t \backsim \rho^\beta} [ \nabla_{\theta^\mu} Q(s, a \vert \theta^Q) \vert_{s=s_t, a=\mu(s_t|\theta^\mu)}] & \\
=& \mathbb{E}_{s_t \backsim \rho^\beta} [ \nabla_{a} Q(s, a \vert \theta^Q) \vert_{s=s_t, a=\mu(s_t)} \nabla_{\theta_\mu}\mu(s|\theta^\mu) \vert_{s=s_t}] 
\end{aligned}$

 

${\partial Q(s,a|\theta^Q) \over \partial \theta^\mu}$ 에서 체인 룰을 이용해 ${\partial Q(s,a|\theta^Q) \over \partial a} {\partial a \over \partial \theta^\mu}$ 로 유도된 것으로 이를 통해 액터 신경망의 파라미터 $\theta^{\mu}$ 를 학습할 수 있는 것입니다.

 

크리틱 신경망의 손실함수는 TD 타겟과 예측의 오차를 계산합니다.

 

$L(\theta^{Q}) = {\rm E}_{s_t \backsim \rho^\beta , a_t \backsim \beta , r_t \backsim E} [(Q(s_t, a_t \vert \theta^Q)-y_t)^2]$

 

TD 타겟을 계산하는 식은 다음과 같으며

 

$y_t = r(s_t, a_t) + \gamma Q(s_{t+1}, \mu(s_{t+1}|\theta^{\mu})|\theta^{Q}$

 

타겟 신경망을 이용한 계산으로 고치면 다음과 같습니다.

 

$y_t = r(s_t, a_t) + \gamma \color{red} {Q'(s_{t+1}, \mu'(s_{t+1}|\theta^{\mu'})|\theta^{Q'}}$

 

 

알고리즘

 

 

 

구현

알고리즘 테스트 환경은 OpenAI GymBipedalWalker-v2 입니다.

 

 

BipedalWalker 은 로봇을 목표지점까지 걷게하는 문제입니다.

 

환경으로부터 받을 수 있는 정보를 출력합니다.

import gym

env = gym.make('BipedalWalker-v3')

print('obs', env.observation_space.shape[0])
print('act', env.action_space.shape[0])
print('act low', env.action_space.low)
print('act high', env.action_space.high)

 

상태 정보는 24차원의 벡터로 연속적인 공간을 갖으며 에이전트가 취할 수 있는 행동은 4차원의 벡터로 각 요소는 [-1, 1] 의 범위의 연속적인 공간을 갖습니다.

obs 24
act 4
act low -1.0
act high 1.0

 

 

액터 신경망을 정의합니다.

class Actor:
    def __init__(self, scope):
        with tf.variable_scope(scope):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
            with tf.variable_scope('layer'):
                self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
                self.fc2 = tf.layers.dense(self.fc1, 256, tf.nn.relu)
            with tf.name_scope('out'):
                self.a = tf.layers.dense(self.fc2, N_A, tf.nn.tanh) * A_BOUND[1]
        self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)

 

액터 신경망은 확정적 정책으로 상태를 입력으로 받으면 바로 행동을 출력합니다.

self.a = tf.layers.dense(self.fc2, N_A, tf.nn.tanh) * A_BOUND[1]

...

def get_action(self, s):
    action = self.sess.run(self.actor.a, {self.actor.s: s[np.newaxis, :]})[0]
    return action

 

출력으로 tanh 함수를 사용하여 출력의 범위를 [-1, 1] 로 하는 것입니다. 만약 환경이 더 큰 범위를 갖는다면 뒤에 상수를 곱함으로써 범위를 조정할 수 있습니다.

 

 

크리틱 신경망을 정의합니다.

class Critic:
    def __init__(self, scope):
        with tf.variable_scope(scope):
            with tf.name_scope('input'):
                self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
                self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
            with tf.variable_scope('layer'):
                self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
                concat = tf.concat([self.a, self.fc1], axis=-1)
                self.fc2 = tf.layers.dense(concat, 256, tf.nn.relu)
            with tf.name_scope('out'):
                self.q = tf.layers.dense(self.fc2, 1)
        self.params = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope)

 

액터 신경망으로부터 전달받은 행동층과 결합하여 최종적으로 $Q$ 를 출력합니다.

with tf.name_scope('input'):
    self.s = tf.placeholder(tf.float32, [None, N_S], name='state')
    self.a = tf.placeholder(tf.float32, [None, N_A], name='action')
with tf.variable_scope('layer'):
    self.fc1 = tf.layers.dense(self.s, 512, tf.nn.relu)
    concat = tf.concat([self.a, self.fc1], axis=-1)
    self.fc2 = tf.layers.dense(concat, 256, tf.nn.relu)
with tf.name_scope('out'):
    self.q = tf.layers.dense(self.fc2, 1)

 

일반 신경망과 타겟 신경망을 생성합니다. 

class DDPG:
    def __init__(self, load=False):
        self.actor = Actor('actor/eval')
        self.critic = Critic('critic/eval')
        self.target_actor = Actor('actor/target')
        self.target_critic = Critic('critic/target')

 

TD 타겟을 입력으로 받습니다.

with tf.name_scope('input'):
    self.td_target = tf.placeholder(tf.float32, [None, 1])

 

크리틱 신경망의 손실 함수를 정의합니다.

with tf.name_scope('c_loss'):
    self.c_loss = tf.losses.mean_squared_error(self.td_target, self.critic.q)

 

$Q$ 타겟과 추정 $Q$ 의 평균 제곱 오차(mean squared error, MSE)를 계산합니다.

 

손실값을 최소화하는 방향으로 학습하도록 설정합니다.

with tf.name_scope('c_optimizer'):
    self.c_train_op = tf.train.AdamOptimizer(LR_C).minimize(self.c_loss)

 

그래디언트 계산을 정의합니다.

with tf.name_scope('grads'):
    self.q_grads = tf.gradients(self.critic.q, self.critic.a)
    self.a_grads = tf.gradients(-self.actor.a, self.actor.params, self.q_grads)

 

목적함수 그래디언트 식은 다음과 같습니다.

 

$\nabla_{\theta^\mu} J \approx \mathbb{E}_{s_t \backsim \rho^\beta} [ \nabla_{a} Q(s, a \vert \theta^Q) \vert_{s=s_t, a=\mu(s_t)} \nabla_{\theta_\mu}\mu(s|\theta^\mu) \vert_{s=s_t}]$

 

즉 ${\partial Q(s,a|\theta^Q) \over \partial a} {\partial a \over \partial \theta^\mu}$ 를 계산하는 것으로 크리틱 신경망의 파라미터 $\theta^Q$ 에 의한 행동에 대한 $Q$ 에 대한 그래디언트 ${\partial Q(s,a|\theta^Q) \over \partial a}$ 를 계산하고 이는 최종적으로 계산하고자 하는 ${\partial a \over \partial \theta^\mu}$ 그라디언트 계산에 초기값으로 곱해지는 것입니다. 또한 최대화를 해야하므로 마이너스 부호를 붙입니다.(tf.gradients)

 

그래디언트와 적용할 파라미터를 설정합니다.(tf.train.Optimizer)

with tf.name_scope('a_optimizer'):
    self.a_train_op = tf.train.AdamOptimizer(LR_A).apply_gradients(zip(self.a_grads, self.actor.params))

 

크리틱 신경망에서 행동에 대한 $Q$의 그래디언트를 계산하고 이를 통해 액터 신경망의 파라미터를 업데이트하는 것입니다.

 

 

타겟 신경망에 대한 소프트 업데이트에 대한 부분입니다.

self.update_op = [tf.assign(t_p, TAU * t_p + (1 - TAU) * e_p) for t_p, e_p in zip(self.target_actor.params + self.target_critic.params, self.actor.params + self.critic.params)]

...

def update_target(self):
    self.sess.run(self.update_op)

 

환경이 갖는 행동 크기만큼 OU 노이즈 객체를 생성합니다.

N_A = env.action_space.shape[0]

...

self.ou = OUNoise(N_A)

 

이는 각 노이즈가 다른 궤적을 갖는 것을 의미합니다.

 

 

에이전트와 환경의 상호작용입니다.

agent = DDPG()

for i in range(MAX_EPI + 1):
    agent.ou.reset()
    
    state = env.reset()
    done = False
    while not done:
        action = agent.get_action(state)
        noise = agent.ou.noise()
        action = np.clip(action + noise, *A_BOUND)
        next_state, reward, done, _ = env.step(action)

        agent.replay_memory.append((state, action, reward, done, next_state))
        
        if len(agent.replay_memory) >= MAX_MEM * 0.1:
            agent.train()
        
        state = next_state

 

매 에피소드마다 OU 노이즈를 초기화하여 노이즈의 무작위성을 보장합니다.

agent.ou.reset()

 

액터 신경망으로부터 출력된 행동에 노이즈를 추가하고 행동 범위를 클리핑합니다.

action = agent.get_action(state)
noise = agent.ou.noise()
action = np.clip(action + noise, *A_BOUND)

 

이를 통해 확정적 정책에 대해 탐색 문제를 해소합니다.

 

$\mu'(s_t) = \mu(s_t|\theta^\mu_t) + \mathcal{N}$

 

 

환경으로부터 받은 정보를 리플레이 메모리에 저장하고 일정 크기부터 업데이트를 진행합니다.

MAX_MEM = 50000

...

next_state, reward, done, _ = env.step(action)
        
agent.replay_memory.append((state, action, reward, done, next_state))

if len(agent.replay_memory) >= MAX_MEM * 0.1:
    agent.train()

 

신경망 업데이트에 대한 부분입니다.

def train(self):
    minibatch = random.sample(self.replay_memory, BATCH_SIZE)
    minibatch = np.array(minibatch)

    a_ = self.sess.run(self.target_actor.a, {self.target_actor.s: minibatch[:, 4].tolist()})
    target_q = self.sess.run(self.target_critic.q, {self.target_critic.s: minibatch[:, 4].tolist(),
                                              self.target_critic.a: a_})

    td_target = minibatch[:, 2] + GAMMA * target_q.squeeze() * ~minibatch[:, 3].astype(np.bool)

    self.sess.run(self.c_train_op, {self.critic.s: minibatch[:, 0].tolist(),
                                    self.critic.a: minibatch[:, 1].tolist(),
                                    self.td_target: td_target[:, np.newaxis]})

    self.sess.run(self.a_train_op, {self.actor.s: minibatch[:, 0].tolist(),
                                    self.critic.s: minibatch[:, 0].tolist(),
                                    self.critic.a: minibatch[:, 1].tolist()})

    self.update_target()

 

리플레이 메모리에 저장된 샘플을 배치 사이즈만큼 랜덤하게 추출합니다.

minibatch = random.sample(self.replay_memory, BATCH_SIZE)
minibatch = np.array(minibatch)

 

타겟 신경망으로부터 다음 상태에 대한 $Q'(s_{i+1}, \mu'(s_{i+1}|\theta^{\mu'})$ 를 구합니다.

a_ = self.sess.run(self.target_actor.a, {self.target_actor.s: minibatch[:, 4].tolist()})
target_q = self.sess.run(self.target_critic.q, {self.target_critic.s: minibatch[:, 4].tolist(), self.target_critic.a: a_})

 

TD 타겟인 $r_i + \gamma Q'(s_{i+1}, \mu'(s_{i+1}|\theta^{\mu'})|\theta^{Q'})$ 을 계산합니다.

q_target = minibatch[:, 2] + GAMMA * target_q.squeeze() * ~minibatch[:, 3].astype(np.bool)

 

크리틱 신경망과 액터 신경망을 각각 업데이트합니다.

self.sess.run(self.c_train_op, {self.critic.s: minibatch[:, 0].tolist(),
                                self.critic.a: minibatch[:, 1].tolist(),
                                self.q_target: q_target[:, np.newaxis]})

self.sess.run(self.a_train_op, {self.actor.s: minibatch[:, 0].tolist(),
                                self.critic.s: minibatch[:, 0].tolist(),
                                self.critic.a: minibatch[:, 1].tolist()})

 

크리틱 신경망의 파라미터는 타겟과 예측에 대한 오차를 최소화하는 방향으로 업데이트하며 액터 신경망의 파라미터는 크리틱 신경망으로부터 계산된 $Q$ 그래디언트를 이용하여 목적 함수가 최대가 되도록 행동을 수정하는 방향으로 업데이트합니다.

 

일반 신경망의 파라미터를 타겟 신경망의 파라미터로 복제합니다.

self.update_target()

 

 

1000번의 에피소드 동안 학습한 결과입니다.

 

 

저장된 모델을 불러와 테스트합니다.

INFO:tensorflow:Restoring parameters from ./tmp/ddpg_bipedalwalker/model/model
episode: 0 reward: 190.82911344149113
episode: 1 reward: 32.018837167782266
episode: 2 reward: 66.25328426062981
episode: 3 reward: 186.63262380705766

 

 

 

'머신러닝 > 강화학습' 카테고리의 다른 글

PPO(Proximal Policy Optimization)  (0) 2020.12.07
A3C(Asynchronous Advantage Actor-Critic)  (0) 2020.11.30
액터-크리틱(Actor-Critic)  (0) 2020.11.27
정책 그래디언트(Policy Gradient)  (0) 2020.11.24
DQN(Deep Q-Networks) (2)  (0) 2020.11.24
  Comments,     Trackbacks