Article Reading

本次阅读三篇论文,分别是 1509.06461 1511.05952 1511.06581,分别是优先经验回放、Double DQN 和 Dual DQN

Double DQN

Double DQN主要解决的问题是在贪心策略做最大化的时候出现的最大化偏差问题,什么是最大化偏差呢?

最大化偏差(过估计)

正常情况下理想的最优策略下的状态价值函数和动作价值函数应该满足:

即使这里的估计是无偏的,即满足

也不代表它的二阶矩就是0(证明是把上面那个式子里面的正的部分和负的部分分开再求平方),假设

在这种情况下,会有:(平均值原理)

这个下界是紧下界。

最大化偏差问题会导致很多模型的训练出现训练不稳定或者就训练不起来的情况。

Double DQN如何解决最大化偏差问题

Double DQN选择使用两个不同的网络来进行学习。首先是有一个 Target 网络用于对当前的动作进行评估,再用一个 Q-Network 来学习当前的价值。每过一段时间就用 Q网络来更新target网络。

这个时候的更新表达式如下:

$$
T_t^{\mathrm{DoubleQ}} \equiv R_{t +1} + \gamma Q(S_{t +1},\mathop{\mathrm{argmax}}a Q(S{t + 1},a:\theta_t):\theta_t’ )
$$

和之前DQN里面的比较只有这里的那个 从原来直接求原网络的最大变成了现在的另一个网络的argmax。

为什么work

因为在Double DQN的情况下,满足表达式:
$$
|Q’t(s,argmax_a Q_t(s,a) - V(s))| = 0

Q_t(s,a_1) = V_
(s) + \sqrt{C\frac{m - 1}{m}}
$$

注意这里下标是1

以及对于大于1的情况下:

满足前面的那个假设

优先经验回放

优先经验回放着眼于解决某些TD-Error很小的样本会被回放多次,而某些误差很大的样本由于是等概率采样,被选出的概率会比较小,导致整体的学习比较慢。

解决方案

首先是使用 作为排序的依据,每次在获得一个新的动作的时候,把当前的 priority 变成当前的最大值加一,保证新进来的样本几乎可以确定被选择一次。

然后每次训练之后,根据训练后的TD-Error更新训练样本的Priority,再根据 Priority 算出一个概率,依据概率采样出对应的样本进行训练

抽取概率计算

直接使用权重做概率来抽取会导致整个抽取的行为严重依赖当前的训练状态,所以论文中提供了两种方法来解决这种问题,我选择的是每次直接进行排序,然后计算小p,即 然后每个被抽取的概率就是:

再按照这个来抽样。

重要性采样

由于优先经验回放采样的TD-Error来排序,破坏了样本的随机性,所以需要加上重要性采样来进行优化,即令:

这里的 是buffer的大小, 滚动更新

Dual Network

对偶网络的提出背景是,在某些状态的情况下,选取什么动作其实是对整个过程没有什么影响的,这个时候状态才是最重要的,那么就可以定义一个叫做 动作优势函数的东西,相当于把动作价值函数拆分成两个部分,一个是状态的价值,一个是选取这个动作带来的优势:

然后去分别学习这两个部分

网络结构对比图如下:

上面那个是DQN下面那个是Dual DQN

代码实现以及运行效果

这里的代码都是在上一篇的基础上做的改进

Double DQN

在上一篇的基础上,只需要修改loss的计算,改成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def calculate_loss(self, states, actions, rewards, next_states, dones):
"""
Calculates the loss for a batch of experiences.

Args:
states (torch.Tensor): Batch of states.
actions (torch.Tensor): Batch of actions.
rewards (torch.Tensor): Batch of rewards.
next_states (torch.Tensor): Batch of next states.
dones (torch.Tensor): Batch of done flags.

Returns:
torch.Tensor: Loss value.
"""
tmp = self.q_network(states)
rewards = rewards.to(self.device)
q_values = tmp[range(states.shape[0]), actions.long()]
q_predict_action_output = self.q_network(next_states).detach()
q_predict_action = q_predict_action_output.argmax(dim=1)
target_output = self.gamma * self.target_network(next_states)
target_output = target_output[range(states.shape[0]), q_predict_action]
default = rewards + target_output
target = torch.where(dones.to(self.device), rewards, default).to(self.device).detach()
return F.mse_loss(target, q_values)

就可以了

实验效果

在pong这个游戏下表现还是不错的

Loss 曲线

Reward 曲线

优先经验回放

这里需要修改一下经验回放数组,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
import random
import torch


class Transition(object):
'''
the order is frame action reward next_frame done
'''
def __init__(self, frame, action, reward, next_frame, done, priority=0, index=0):
self.priority = priority
self.index = index
self.content = (frame, action, reward, next_frame, done)

def __lt__(self, other):
return self.priority < other.priority

def __gt__(self, other):
return self.priority > other.priority

def __str__(self):
return f'priority: {self.priority}, index: {self.index}'


class MPriorityQueue:
def __init__(self, max_size=10000):
self.heap = []
self.max_size = max_size

def push(self, item):
self.heap.append(item)
self._heapify_up(len(self.heap) - 1)
while (len(self.heap) > self.max_size):
self.pop()

def pop(self):
if len(self.heap) == 0:
return None
if len(self.heap) == 1:
return self.heap.pop()
root = self.heap[0]
self.heap[0] = self.heap.pop()
self._heapify_down(0)
return root

def update_key(self, index: int, new_key):
if index < 0 or index >= len(self.heap):
return
old_key = self.heap[index].priority
self.heap[index].priority = new_key
if new_key > old_key:
self._heapify_down(index)

def _heapify_up(self, index):
parent_index = (index - 1) // 2
if index > 0 and self.heap[index] > self.heap[parent_index]: # 自定义对象的比较关系
self.heap[index], self.heap[parent_index] = self.heap[parent_index], self.heap[index]
self._heapify_up(parent_index)

def _heapify_down(self, index):
left_child_index = 2 * index + 1
right_child_index = 2 * index + 2
largest = index

if (left_child_index < len(self.heap) and
self.heap[left_child_index] > self.heap[largest]):
largest = left_child_index

if (right_child_index < len(self.heap) and
self.heap[right_child_index] > self.heap[largest]):
largest = right_child_index

if largest != index:
self.heap[index], self.heap[largest] = self.heap[largest], self.heap[index]
self._heapify_down(largest)


class SortedList:
def __init__(self, max_size=10000):
self.heap = []
self.max_size = max_size

def push(self, item):
self.heap.append(item)
if len(self.heap) > 2 * self.max_size:
self.sort()

def sort(self):
print('sort')
self.heap.sort(key=lambda x: x.priority, reverse=True)
while (len(self.heap) > self.max_size):
self.pop()

def pop(self):
if len(self.heap) == 0:
return None
return self.heap.pop()

def update_key(self, index: list, new_key: list):
for idx, key in zip(index, new_key):
for item in self.heap:
if item.index == idx:
item.priority = key
break

class PriorityReplayBuffer:
"""
A replay buffer class for storing and sampling experiences for reinforcement learning.

Args:
size (int): The maximum size of the replay buffer.

Attributes:
size (int): The maximum size of the replay buffer.
buffer (list): A list to store the experiences.
cur (int): The current index in the buffer.
device (torch.device): The device to use for tensor operations.

Methods:
__len__(): Returns the number of experiences in the buffer.
transform(lazy_frame): Transforms a lazy frame into a tensor.
push(state, action, reward, next_state, done): Adds an experience to the buffer.
sample(batch_size): Samples a batch of experiences from the buffer.

"""

def __init__(self, size, alpha):
self.alpha = alpha
self.size = size
self.cur = 0
self.max_priority = 0
self.device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
self.q = SortedList(max_size=size)

def __len__(self):
return len(self.q.heap)

def transform(self, lazy_frame):
state = torch.from_numpy(lazy_frame.__array__()[None] / 255).float()
return state.to(self.device)

def push(self, state, action, reward, next_state, done):
"""
Adds an experience to the replay buffer.

Args:
state (numpy.ndarray): The current state.
action (int): The action taken.
reward (float): The reward received.
next_state (numpy.ndarray): The next state.
done (bool): Whether the episode is done.

"""
trans = Transition(frame=state, action=action, reward=reward, next_frame=next_state, done=done,
priority=self.max_priority + 1, index=self.cur)
self.max_priority += 1
self.q.push(trans)
self.cur += 1

def get_index(self, batch_size):
self.weight = [1 / ((i + 1) ** self.alpha) for i in range(len(self.q.heap))]
t = sum(self.weight)
self.weight = [w / t for w in self.weight]
# self.weight = [1 / (len(self.q.heap)) for _ in range(len(self.q.heap))]
return random.choices(range(len(self.q.heap)), self.weight, k=batch_size)

def get_weight(self, index):
max_prob = max(self.weight)
ret = [self.weight[i] / max_prob for i in index]
return ret

def sample(self, batch_size, index):
"""
Samples a batch of experiences from the replay buffer.

Args:
batch_size (int): The size of the batch to sample.

Returns:
tuple: A tuple containing the batch of states, actions, rewards, next states, and dones.

"""
states, actions, rewards, next_states, dones = [], [], [], [], []
for idx in index:
frame, action, reward, next_frame, done = self.q.heap[idx].content
state = self.transform(frame)
next_state = self.transform(next_frame)
state = torch.squeeze(state, 0)
next_state = torch.squeeze(next_state, 0)
states.append(state)
actions.append(action)
rewards.append(reward)
next_states.append(next_state)
dones.append(done)
return (torch.stack(states).to(self.device), torch.tensor(actions).to(self.device),
torch.tensor(rewards).to(self.device),
torch.stack(next_states).to(self.device), torch.tensor(dones).to(self.device))

def update_priority(self, index, td_error):
self.q.update_key(index, td_error)
self.max_priority = max(self.max_priority, max(td_error))


if __name__ == '__main__':
sl = SortedList()
for i in range(10):
t = Transition(priority=i, index=i)
sl.push(t)
sl.update_key([5, 2], [10, 20])
for trans in sl.heap:
print(trans)
t = Transition(priority=-10, index=-1)
sl.push(t)
t = Transition(priority=-2, index=-2)
sl.push(t)
sl.update_key([-2, 2], [30, 20])
sl.sort()
for trans in sl.heap:
print(trans)

实验效果


图中的红线是没有加权的经验回放的,蓝线是加了的,可以看出训练要快一些

不过这个加了权的经验回放即使在参数相同的情况下也有train不起来的情况,如下图:

黄色的参数是一样的,不过在很前面的时候就已经loss爆炸了,所以这个还是依赖网络初始化的时候的初始值的。

开完会感觉最后那个是因为没有归一化导致值太大了

个人的思考

关于Dual DQN

在Dual DQN中虽然理论上是学习一个状态价值函数和一个动作有势函数,不过就网络结构来看,它上下两个部分学到的到底是什么我认为是不好说的。下周如果没有安排我会考虑把上面的那个一个的state改成两个,如果还能够取得不错的效果说明他的理论并不能解释为什么他的网络结构效果好。