新闻中心
值分布强化学习 —— C51
值分布强化学习是基于价值的强化学习算法,不同于传统方法仅建模累积回报期望值,它对整个分布Z(s,a)建模以保留分布信息。C51是其代表算法,将分布离散为51个支点,输出支点概率,通过投影贝尔曼更新处理分布范围问题,损失函数用KL散度,框架与DQN类似但输出和更新方式不同。
☞☞☞AI 智能聊天, 问答助手, AI 智能搜索, 免费无限量使用 DeepSeek R1 模型☜☜☜

值分布强化学习简介
首先需要声明的是,值分布强化学习(Distributional Reinforcement Learning)是一类基于价值的强化学习算法(value-based Reinforcement Learning)
经典的基于价值的强化学习方法使用期望值对累积回报进行建模,表示为价值函数 V(s) 或动作价值函数 Q(s,a)
而在这个建模过程中,完整的分布信息在很大程度上被丢失了
提出值分布强化学习就是想要解决分布信息丢失这个问题,对累积回报随机变量的整个分布 Z(s,a) 进行建模,而非只建模其期望
如果用公式表示:
Q(st,at)=EZ(st,at)=E[i=1∑∞γt+iR(st+i,at+i)]
值分布强化学习——C51算法
C51 简介
C51 算法来自 DeepMind 的 A Distributional Perspective on Reinforcement Learning 一文。在这篇文章中,作者首先说明传统 DQN 算法希望学习的 Q 是一个数值,其含义是未来奖励和的期望。而在值分布强化学习系列算法中,目标则由数值变为一个分布。在值分布强化学习中,目标也由数值 Q 变为随机变量 Z,这种改变可以使学到的内容是除了数值以外的更多信息,即整个分布。而模型返回的损失也转变为两个分布之间相似度的度量(metric)。
算法关键
参数化分布
简而言之,若分布取值范围为Vmin到Vmax,并均分为离散的N个点,每个等分支集为
{zi=Vmin+iΔz:0≤i 模型输出的每个值对应取当前支点的概率 通过上面的经过值分布贝尔曼操作符的作用后,新的随机变量的取值范围可能会超出第一个支点中离散化的支集的范围,如下图所示。 v2.2 修改相关字眼,加强搜索功能,重写找回密码功能,减少文件,增加学院功能,补给相关页面,修改相关表单字段名,更新图片新闻显示功能,修正租房搜索,增加BLOG,BBS文件夹,并修改频道设置和导航布局,去除相关ID扫描漏洞·全站设计考虑校园电子商务模式,人性化的设计,独特的校园式网络交易平台。 ·功能十分强大的后台管理界面,通过IE浏览器即可管理整个网 因此我们必须将贝尔曼操作符更新后的随机变量投影到离散化的支集上,即论文提到的投影贝尔曼更新。 简单来说,投影过程就是将更新后随机变量的值分配到与其相邻的支点上投影贝尔曼更新
多多校园交易网
0
查看详情
小结
C51算法与DQN相同点
- C51算法的框架依然是DQN算法
- 采样过程依然使用ϵ−greedy策略,这里贪婪是取期望贪婪
- 采用单独的目标网络
C51算法与DQN不同点
C51算法的卷积神经网络的输出不再是行为值函数,而是支点处的概率。
C51算法的损失函数不再是均方差和而是如上所述的KL散度
最后一个问题,该算法为什么叫C51呢?
这是因为在论文中,作者将随机变量的取值分成了51个支点类。
代码部分
导入依赖
In [1]from typing import Dict, List, Tupleimport gymfrom visualdl import LogWriterfrom tqdm import tqdm,trangeimport numpy as npimport paddleimport paddle.nn as nnimport paddle.nn.functional as Fimport paddle.optimizer as optimizer
需要用到的特殊算子
In [2]def index_add_(parent, axis, idx, child):
expend_dim = parent.shape[0]
idx_one_hot = F.one_hot(idx.cast("int64"), expend_dim)
child = paddle.expand_as(child.cast("float32").unsqueeze(-1), idx_one_hot)
output = parent + (idx_one_hot.cast("float32").multiply(child)).sum(axis).squeeze() return output
经验回放
In [3]class ReplayBuffer:
def __init__(self, obs_dim: int, size: int, batch_size: int = 32):
self.obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
self.next_obs_buf = np.zeros([size, obs_dim], dtype=np.float32)
self.acts_buf = np.zeros([size], dtype=np.float32)
self.rews_buf = np.zeros([size], dtype=np.float32)
self.done_buf = np.zeros([size], dtype=np.float32)
self.max_size, self.batch_size = size, batch_size
self.ptr, self.size, = 0, 0
def store(
self,
obs: np.ndarray,
act: np.ndarray,
rew: float,
next_obs: np.ndarray,
done: bool, ):
self.obs_buf[self.ptr] = obs
self.next_obs_buf[self.ptr] = next_obs
self.acts_buf[self.ptr] = act
self.rews_buf[self.ptr] = rew
self.done_buf[self.ptr] = done
self.ptr = (self.ptr + 1) % self.max_size
self.size = min(self.size + 1, self.max_size) def sample_batch(self):
idxs = np.random.choice(self.size, size=self.batch_size, replace=False) return dict(obs=self.obs_buf[idxs],
next_obs=self.next_obs_buf[idxs],
acts=self.acts_buf[idxs],
rews=self.rews_buf[idxs],
done=self.done_buf[idxs]) def __len__(self):
return self.size
定义网络结构
虽然我们的DQN可以学到游戏的整个分布,但在 CartPole-v0 这一环境中,我们还是选取期望作为决策的依据
与传统的 DQN 相比,我们的 C51 会在更新方式上有所不同。
In [4]class C51DQN(nn.Layer):
def __init__(
self,
in_dim: int,
out_dim: int,
atom_size: int,
support ):
# 初始化
super(C51DQN, self).__init__()
self.support = support
self.out_dim = out_dim
self.atom_size = atom_size
self.layers = nn.Sequential(
nn.Linear(in_dim, 128),
nn.ReLU(),
nn.Linear(128, 128),
nn.ReLU(),
nn.Linear(128, out_dim * atom_size)
) def forward(self, x):
dist = self.dist(x)
q = paddle.sum(dist * self.support, axis=2) return q
def dist(self, x):
q_atoms = self.layers(x).reshape([-1, self.out_dim, self.atom_size])
dist = F.softmax(q_atoms, axis=-1)
dist = dist.clip(min=float(1e-3)) # 避免 nan
return dist
Agent 中涉及到的参数设定
Attribute:env: gym 环境memory: 经验回放池的容量batch_size: 训练批次epsilon: 随机探索参数epsilonepsilon_decay: 随机探索参数epsilon衰减的步长max_epsilon: 随机探索参数epsilon上限min_epsilon: 随机探索参数epsilon下限target_update: 目标网络更新频率gamma: 衰减因子dqn: 训练模型dqn_target: 目标模型optimizer: 优化器transition: 转移向量,包含状态值state, 动作值action, 奖励reward, 次态next_state, (判断是否)结束回合donev_min: 离散支集的上限v_max: 离散支集的下限atom_size: 支点数量support: 支集模板(用于计算分布的向量模板)In [5]
class C51Agent:
def __init__(
self,
env: gym.Env,
memory_size: int,
batch_size: int,
target_update: int,
epsilon_decay: float,
max_epsilon: float = 1.0,
min_epsilon: float = 0.1,
gamma: float = 0.99, # C51 算法的参数
v_min: float = 0.0,
v_max: float = 200.0,
atom_size: int = 51,
log_dir: str = "./log"
):
obs_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
self.env = env
self.memory = ReplayBuffer(obs_dim, memory_size, batch_size)
self.batch_size = batch_size
self.epsilon = max_epsilon
self.epsilon_decay = epsilon_decay
self.max_epsilon = max_epsilon
self.min_epsilon = min_epsilon
self.target_update = target_update
self.gamma = gamma
self.v_min = v_min
self.v_max = v_max
self.atom_size = atom_size
self.support = paddle.linspace(
self.v_min, self.v_max, self.atom_size
) # 定义网络
self.dqn = C51DQN(
obs_dim, action_dim, atom_size, self.support
)
self.dqn_target = C51DQN(
obs_dim, action_dim, atom_size, self.support
)
self.dqn_target.load_dict(self.dqn.state_dict())
self.dqn_target.eval()
self.optimizer = optimizer.Adam(parameters=self.dqn.parameters())
self.transition = []
self.is_test = False
self.log_dir = log_dir
self.log_writer = LogWriter(logdir = self.log_dir, comment= "Categorical DQN") def select_action(self, state: np.ndarray):
if self.epsilon > np.random.random():
selected_action = self.env.action_space.sample() else:
selected_action = self.dqn(
paddle.to_tensor(state,dtype="float32"),
).argmax()
selected_action = selected_action.detach().numpy()
if not self.is_test:
self.transition = [state, selected_action]
return selected_action def step(self, action: np.ndarray):
next_state, reward, done, _ = self.env.step(int(action)) if not self.is_test:
self.transition += [reward, next_state, done]
self.memory.store(*se
lf.transition)
return next_state, reward, done def update_model(self):
samples = self.memory.sample_batch()
loss = self._compute_dqn_loss(samples)
self.optimizer.clear_grad()
loss.backward()
self.optimizer.step()
loss_show = loss return loss_show.numpy().item()
def train(self, num_frames: int, plotting_interval: int = 200):
self.is_test = False
state = self.env.reset()
update_cnt = 0
epsilons = []
losses = []
scores = []
score = 0
epsilon = 0
for frame_idx in trange(1, num_frames + 1):
action = self.select_action(state)
next_state, reward, done = self.step(action)
state = next_state
score += reward # 回合结束
if done:
epsilon += 1
state = self.env.reset()
self.log_writer.add_scalar("Reward", value=paddle.to_tensor(score), step=epsilon)
scores.append(score)
score = 0
if len(self.memory) >= self.batch_size:
loss = self.update_model()
self.log_writer.add_scalar("Loss", value=paddle.to_tensor(loss), step=frame_idx)
losses.append(loss)
update_cnt += 1
self.epsilon = max(
self.min_epsilon, self.epsilon - (
self.max_epsilon - self.min_epsilon
) * self.epsilon_decay
)
epsilons.append(self.epsilon) if update_cnt % self.target_update == 0:
self._target_hard_update()
self.env.close()
def test(self):
self.is_test = True
state = self.env.reset()
done = False
score = 0
frames = [] while not done:
frames.append(self.env.render(mode="rgb_array"))
action = self.select_action(state)
next_state, reward, done = self.step(int(action))
state = next_state
score += reward
print("score: ", score)
self.env.close()
return frames def _compute_dqn_loss(self, samples: Dict[str, np.ndarray]):
# 计算损失
state = paddle.to_tensor(samples["obs"],dtype="float32")
next_state = paddle.to_tensor(samples["next_obs"],dtype="float32")
action = paddle.to_tensor(samples["acts"],dtype="int64")
reward = paddle.to_tensor(samples["rews"].reshape([-1, 1]),dtype="float32")
done = paddle.to_tensor(samples["done"].reshape([-1, 1]),dtype="float32")
delta_z = float(self.v_max - self.v_min) / (self.atom_size - 1) with paddle.no_grad():
next_action = self.dqn_target(next_state).argmax(1)
next_dist = self.dqn_target.dist(next_state)
next_dist = next_dist[:self.batch_size,]
_next_dist = paddle.gather(next_dist, next_action, axis=1)
eyes = np.eye(_next_dist.shape[0], _next_dist.shape[1]).astype("float32")
eyes = np.repeat(eyes, _next_dist.shape[-1]).reshape(-1,_next_dist.shape[1],_next_dist.shape[-1])
eyes = paddle.to_tensor(eyes)
next_dist = _next_dist.multiply(eyes).sum(1)
t_z = reward + (1 - done) * self.gamma * self.support
t_z = t_z.clip(min=self.v_min, max=self.v_max)
b = (t_z - self.v_min) / delta_z
l = b.floor().cast("int64")
u = b.ceil().cast("int64")
offset = (
paddle.linspace( 0, (self.batch_size - 1) * self.atom_size, self.batch_size
).cast("int64")
.unsqueeze(1)
.expand([self.batch_size, self.atom_size])
)
proj_dist = paddle.zeros(next_dist.shape)
proj_dist = index_add_(
proj_dist.reshape([-1]), 0,
(l + offset).reshape([-1]),
(next_dist * (u.cast("float32") - b)).reshape([-1])
)
proj_dist = index_add_(
proj_dist.reshape([-1]), 0,
(u + offset).reshape([-1]),
(next_dist * (b - l.cast("float32"))).reshape([-1])
)
proj_dist = proj_dist.reshape(next_dist.shape)
dist = self.dqn.dist(state)
_dist = paddle.gather(dist[:self.batch_size,], action, axis=1)
eyes = np.eye(_dist.shape[0], _dist.shape[1]).astype("float32")
eyes = np.repeat(eyes, _dist.shape[-1]).reshape(-1,_dist.shape[1],_dist.shape[-1])
eyes = paddle.to_tensor(eyes)
dist_batch = _dist.multiply(eyes).sum(1)
log_p = paddle.log(dist_batch)
loss = -(proj_dist * log_p).sum(1).mean() return loss def _target_hard_update(self):
# 更新目标模型参数
self.dqn_target.load_dict(self.dqn.state_dict())
定义环境
In [6]env_id = "CartPole-v0"env = gym.make(env_id)
设定随机种子
In [7]seed = 777np.random.seed(seed) paddle.seed(seed) env.seed(seed)
[777]
超参数设定与训练
In [ ]num_frames = 20000memory_size = 1000batch_size = 32target_update = 200epsilon_decay = 1 / 2000# 训练agent = C51Agent(env, memory_size, batch_size, target_update, epsilon_decay) agent.train(num_frames)
0%| | 32/20000 [00:00<01:38, 202.97it/s]
训练结果展示(使用VisualDL查看log文件夹)

以上就是值分布强化学习 —— C51的详细内容,更多请关注其它相关文章!
# 科大
# 铁岭企业seo方法优化
# 河北seo网站优化电话
# seo原创文章格式
# 徐州网站建设与规划
# 佛山定制网站建设机构
# 抖音seo视频优化推广
# 保险行业营销推广途径
# 巨人导航seo综合查询
# 东莞网站建设哪家有优势
# 台州seo网站推广费用
# 的是
# ai
# 戛纳
# 开源
# 首款
# 系列产品
# 而在
# 交易网
# 贝尔
# 中文网
# type
# writer
# 为什么
相关栏目:
【
行业资讯67740 】
【
技术百科0 】
【
网络运营39195 】
相关推荐:
typescript怎么解析vue TypeScript在vue中的使用最新解读
固态硬盘如何区分好坏
阿里云盘的会员怎么用
sausage是什么意思
划水是什么意思
ai文件里无法找到链接文件要怎么解决步骤
j*a二数组怎么创建
什么是域名解析 域名解析中采用了什么
360n6锁屏壁纸怎么设置
苹果16会有哪些更新
360n5锁屏壁纸怎么设置
如何打开管理员命令提示符
typescript如何使用
如何将系统移到固态硬盘
手机拍电脑屏幕有条纹怎么解决
typescript要用什么工具
5g手机4g卡怎么没有网络
gs是什么意思
如何进入cmd命令行
汽车中控导航机power线是什么意思
hen是什么意思
春运抢票哪个平台好一点
j*a怎么让数组倒换
域名批量查询工具有哪些
一尺是多少厘米
反向春运抢票方式
bc是什么意思
推特是什么软件国内可以使用吗
苹果怎么没出5g手机
哪个牌子的折叠屏手机好
video是什么意思
联想的固态硬盘如何
typescript怎么写多个构造方法
如何使用程序编译 执行的命令
8寸照片尺寸多少厘米
春运抢票哪个平台好抢
typescript怎么拼接
华为的type-c接口是什么接口
夸克还原排版是什么意思
ai怎么找链接文件位置教程
夸克po什么意思
没网环境如何安装typescript
春运抢票还用取票吗
如何查看bash内置的命令
夸克转存中是什么意思
为什么有的夸克带电
分享一个稳定的ao3镜像网址
固态硬盘如何接主机
皓影混动仪表盘上power是什么意思
类似微信的聊天软件有哪些


2025-07-21
浏览次数:次
返回列表
lf.transition)
return next_state, reward, done def update_model(self):
samples = self.memory.sample_batch()
loss = self._compute_dqn_loss(samples)
self.optimizer.clear_grad()
loss.backward()
self.optimizer.step()
loss_show = loss return loss_show.numpy().item()
def train(self, num_frames: int, plotting_interval: int = 200):
self.is_test = False
state = self.env.reset()
update_cnt = 0
epsilons = []
losses = []
scores = []
score = 0
epsilon = 0
for frame_idx in trange(1, num_frames + 1):
action = self.select_action(state)
next_state, reward, done = self.step(action)
state = next_state
score += reward # 回合结束
if done:
epsilon += 1
state = self.env.reset()
self.log_writer.add_scalar("Reward", value=paddle.to_tensor(score), step=epsilon)
scores.append(score)
score = 0
if len(self.memory) >= self.batch_size:
loss = self.update_model()
self.log_writer.add_scalar("Loss", value=paddle.to_tensor(loss), step=frame_idx)
losses.append(loss)
update_cnt += 1
self.epsilon = max(
self.min_epsilon, self.epsilon - (
self.max_epsilon - self.min_epsilon
) * self.epsilon_decay
)
epsilons.append(self.epsilon) if update_cnt % self.target_update == 0:
self._target_hard_update()
self.env.close()
def test(self):
self.is_test = True
state = self.env.reset()
done = False
score = 0
frames = [] while not done:
frames.append(self.env.render(mode="rgb_array"))
action = self.select_action(state)
next_state, reward, done = self.step(int(action))
state = next_state
score += reward
print("score: ", score)
self.env.close()
return frames def _compute_dqn_loss(self, samples: Dict[str, np.ndarray]):
# 计算损失
state = paddle.to_tensor(samples["obs"],dtype="float32")
next_state = paddle.to_tensor(samples["next_obs"],dtype="float32")
action = paddle.to_tensor(samples["acts"],dtype="int64")
reward = paddle.to_tensor(samples["rews"].reshape([-1, 1]),dtype="float32")
done = paddle.to_tensor(samples["done"].reshape([-1, 1]),dtype="float32")
delta_z = float(self.v_max - self.v_min) / (self.atom_size - 1) with paddle.no_grad():
next_action = self.dqn_target(next_state).argmax(1)
next_dist = self.dqn_target.dist(next_state)
next_dist = next_dist[:self.batch_size,]
_next_dist = paddle.gather(next_dist, next_action, axis=1)
eyes = np.eye(_next_dist.shape[0], _next_dist.shape[1]).astype("float32")
eyes = np.repeat(eyes, _next_dist.shape[-1]).reshape(-1,_next_dist.shape[1],_next_dist.shape[-1])
eyes = paddle.to_tensor(eyes)
next_dist = _next_dist.multiply(eyes).sum(1)
t_z = reward + (1 - done) * self.gamma * self.support
t_z = t_z.clip(min=self.v_min, max=self.v_max)
b = (t_z - self.v_min) / delta_z
l = b.floor().cast("int64")
u = b.ceil().cast("int64")
offset = (
paddle.linspace( 0, (self.batch_size - 1) * self.atom_size, self.batch_size
).cast("int64")
.unsqueeze(1)
.expand([self.batch_size, self.atom_size])
)
proj_dist = paddle.zeros(next_dist.shape)
proj_dist = index_add_(
proj_dist.reshape([-1]), 0,
(l + offset).reshape([-1]),
(next_dist * (u.cast("float32") - b)).reshape([-1])
)
proj_dist = index_add_(
proj_dist.reshape([-1]), 0,
(u + offset).reshape([-1]),
(next_dist * (b - l.cast("float32"))).reshape([-1])
)
proj_dist = proj_dist.reshape(next_dist.shape)
dist = self.dqn.dist(state)
_dist = paddle.gather(dist[:self.batch_size,], action, axis=1)
eyes = np.eye(_dist.shape[0], _dist.shape[1]).astype("float32")
eyes = np.repeat(eyes, _dist.shape[-1]).reshape(-1,_dist.shape[1],_dist.shape[-1])
eyes = paddle.to_tensor(eyes)
dist_batch = _dist.multiply(eyes).sum(1)
log_p = paddle.log(dist_batch)
loss = -(proj_dist * log_p).sum(1).mean() return loss def _target_hard_update(self):
# 更新目标模型参数
self.dqn_target.load_dict(self.dqn.state_dict())