从代码学习深度强化学习 – 模型预测控制 PyTorch版

文章目录

前言
核心组件剖析

1. 基础设置
2. 通过神经网络学习环境模型

2.1 真实环境
2.2 要学习的环境模型 (Ensemble Dynamics Model)

3. 基于模型的规划与控制

3.1 虚拟环境 (FakeEnv)
3.2 交叉熵方法 (Cross-Entropy Method, CEM)

4. PETS 算法整体流程

4.1 经验回放与主类定义
4.2 实验与结果

总结


前言

在深度强化学习(DRL)的广阔天地中,算法大致可以分为两大流派:无模型(Model-Free)和基于模型(Model-Based)的方法。我们熟知的 DQN、REINFORCE 以及 Actor-Critic 等都属于无模型方法,它们直接学习从状态到动作的映射(策略)或价值函数,而无需构建对环境动态的理解。

与之相对,基于模型的方法则试图先通过神经网络等工具,从历史数据中学习一个“环境模型”(Environment Model),这个模型能够预测在给定状态下执行某个动作后,环境会转换到什么新的状态以及会产生多少奖励。然后,智能体可以利用这个学习到的模型进行规划和决策,甚至在“脑海中”进行虚拟的训练,从而大大提高数据利用效率。

模型预测控制(Model Predictive Control, MPC) 就是一种经典且强大的基于模型的控制方法。它的核心思想并不在于学习一个显式的策略网络,而是在每一个决策点,利用环境模型进行前瞻性规划。具体来说,MPC 会在当前状态下,生成一系列候选的未来动作序列,然后使用环境模型来预测每个序列可能带来的累积奖励。最后,它会选择最优的那个动作序列,并执行其中的第一个动作。执行完毕后,在新的状态下,它会重复整个规划过程。

本文将要介绍的 带有轨迹采样的概率集成(Probabilistic Ensembles with Trajectory Sampling, PETS) 算法,正是一种先进的基于模型的强化学习算法,它将 MPC 与深度学习巧妙地结合起来。PETS 的关键在于:

学习概率环境模型:使用神经网络集成(Ensemble)来学习一个概率性的环境模型,这不仅能预测未来,还能量化预测的不确定性。
利用模型进行规划:在学习到的模型之上,使用一种名为“交叉熵方法”(Cross-Entropy Method, CEM)的优化算法来高效地选择动作。

接下来,让我们深入代码,一步步揭开 PETS 的神秘面纱,看看它是如何通过 PyTorch 实现的。

完整代码:下载链接

核心组件剖析

模型预测控制方法主要包含两个部分:1. 根据历史数据学习环境模型 2. 在和真实环境交互过程中用环境模型来选择动作。我们将围绕这两大核心,逐步解析代码。

1. 基础设置

首先,我们导入所有必要的库,并设置计算设备。

# 导入包
import numpy as np
from scipy.stats import truncnorm
import gym
import itertools
import torch
import torch.nn as nn
import torch.nn.functional as F
import collections
import matplotlib.pyplot as plt

"""
为了搭建这样一个较为复杂的模型,我们定义模型中每一层的构造。在定义时就必须考虑每一层都是一个集成。
"""
device = torch.device("cuda") if torch.cuda.is_available() else torch.device(
    "cpu")

2. 通过神经网络学习环境模型

PETS 算法的第一步,也是最关键的一步,是学习一个能够准确模拟真实环境动态的模型。

2.1 真实环境

我们选择 Pendulum-v1 作为我们的实验环境,这是一个经典的连续控制任务。

# env
env_name = 'Pendulum-v1'
env = gym.make(env_name)

2.2 要学习的环境模型 (Ensemble Dynamics Model)

在强化学习中,环境本身具有随机性,我们称之为 偶然不确定性 (Aleatoric Uncertainty)。此外,当模型因为见过的相关数据较少而对预测没有信心时,会产生 认知不确定性 (Epistemic Uncertainty)

PETS 通过构建一个概率性的神经网络集成来同时应对这两种不确定性:

概率性输出:模型输出一个高斯分布的均值和方差,而不是一个确定的值,以此来捕捉偶然不确定性。
网络集成 (Ensemble):我们同时训练多个结构相同但参数初始化和训练数据采样不同的神经网络。在预测时,这些网络会给出不同的结果,结果的差异就体现了认知不确定性。

辅助模块:Swish激活函数与权重初始化

在构建主模型之前,我们先定义一些辅助模块,包括 Swish 激活函数和一个特殊的权重初始化函数。

class Swish(nn.Module):
    ''' Swish激活函数 '''
    def __init__(self):
        super(Swish, self).__init__()

    def forward(self, x):
        return x * torch.sigmoid(x)


def init_weights(m):
    ''' 初始化模型权重 '''
    def truncated_normal_init(t, mean=0.0, std=0.01):
        # 用于初始化权重为截断正态分布。
        """
        首先,使用 torch.nn.init.normal_ 将张量 t 初始化为均值为 mean、标准差为 std 的正态分布值。
        然后,通过一个 while 循环,检查张量中的值是否满足条件:值不能小于 mean - 2 * std 或大于 mean + 2 * std。
        这是截断正态分布的核心逻辑,即只保留均值附近的值,超出范围的值会被重新初始化。
        如果有不满足条件的值(即 torch.sum(cond) 不为零),则使用 torch.where 将这些值重新初始化为正态分布的值,直到所有值都满足条件。
        """
        torch.nn.init.normal_(t, mean=mean, std=std)
        while True:
            cond = (t < mean - 2 * std) | (t > mean + 2 * std)
            if not torch.sum(cond):
                break
            t = torch.where(
                cond,
                torch.nn.init.normal_(torch.ones(t.shape, device=device),
                                      mean=mean,
                                      std=std), t)
        return t
    """
    如果 m 是 nn.Linear 或 FCLayer 类型(FCLayer 是一个自定义类,可能继承自 nn.Linear),则对 m.weight(权重)进行截断正态分布初始化。
    权重的初始化标准差为 1 / (2 * np.sqrt(m._input_dim)),其中 m._input_dim 是输入维度。
    对偏置 m.bias 进行初始化,将其所有值设置为 0.0。
    """
    if type(m) == nn.Linear or isinstance(m, FCLayer):
        truncated_normal_init(m.weight, std=1 / (2 * np.sqrt(m._input_dim)))
        m.bias.data.fill_(0.0)


class FCLayer(nn.Module):
    ''' 集成之后的全连接层 '''
    def __init__(self, input_dim, output_dim, ensemble_size, activation):
        """
        input_dim:输入特征的维度。
        output_dim:输出特征的维度。
        ensemble_size:集成的大小,表示有多少个独立的全连接层被集成在一起。这允许模型同时处理多个子任务或多个样本。
        activation:激活函数,用于在前向传播中对输出进行非线性变换。
        """
        super(FCLayer, self).__init__()
        self._input_dim, self._output_dim = input_dim, output_dim
        self.weight = nn.Parameter(
            torch.Tensor(ensemble_size, input_dim, output_dim).to(device))
        # 这意味着每个集成成员都有自己的权重矩阵。
        self._activation = activation
        self.bias = nn.Parameter(
            torch.Tensor(ensemble_size, output_dim).to(device))
        # 每个集成成员都有自己的偏置向量。

    def forward(self, x):
        return self._activation(
            torch.add(torch.bmm(x, self.weight), self.bias[:, None, :]))
        """
        使用 torch.bmm(x, self.weight) 进行批量矩阵乘法(Batch Matrix Multiplication)。
        x 的形状为 (ensemble_size, batch_size, input_dim),
        self.weight 的形状为 (ensemble_size, input_dim, output_dim),
        结果的形状为 (ensemble_size, batch_size, output_dim)。
        使用 torch.add 将偏置 self.bias 添加到矩阵乘法的结果中。
        偏置的形状为 (ensemble_size, output_dim),
        需要通过 self.bias[:, None, :] 扩展为 (ensemble_size, 1, output_dim),以便与结果的形状匹配。
        """
        # 输出张量的形状为 (ensemble_size, batch_size, output_dim),表示每个集成成员的输出。

FCLayer 是我们为集成模型设计的核心层。它的权重和偏置都包含一个 ensemble_size 维度,使得一次前向传播可以同时完成所有集成成员的计算,非常高效。

集成模型 (EnsembleModel) 与其训练管理 (EnsembleDynamicsModel)

现在我们用 FCLayer 搭建完整的集成模型 EnsembleModel。它接收状态和动作的拼接作为输入,输出下一状态变化量(delta state)和奖励的高斯分布参数(均值和方差)。

# 使用高斯分布的概率模型来定义一个集成模型
class EnsembleModel(nn.Module):
    ''' 环境模型集成 '''
    """
    EnsembleModel 继承自 PyTorch 的 nn.Module,是一个神经网络模型。
    state_dim 和 action_dim 分别表示状态和动作的维度。
    ensemble_size 是集成模型中模型的数量,默认为 5。
    learning_rate 是优化器的学习率,默认为 10 −3。
    模型的输出维度是状态维度和奖励维度之和的两倍,因为模型需要输出均值和方差。
    state_dim + 1 表示状态维度加上奖励维度(奖励通常是一个标量)。
    * 2 表示输出均值和方差。
    _max_logvar 和 _min_logvar 是方差的上下限,用于限制方差的范围,避免方差过大或过小。
    _max_logvar 是方差的最大值(对数形式),初始化为 0.5。
    _min_logvar 是方差的最小值(对数形式),初始化为 -10。
    模型由 5 层全连接层(FCLayer)组成,每层的激活函数如下:
    前 4 层使用 Swish 激活函数。
    最后一层使用 nn.Identity,即没有激活函数,直接输出。
    每一层的输入和输出维度如下:
    第一层:输入维度为 state_dim + action_dim(状态和动作拼接),输出维度为 200。
    中间三层:输入和输出维度均为 200。
    最后一层:输入维度为 200,输出维度为 _output_dim(均值和方差的总维度)。
    使用 init_weights 函数对模型的所有参数进行初始化。init_weights 是一个自定义函数,通常用于初始化神经网络的权重和偏置。
    使用 Adam 优化器,学习率由 learning_rate 参数指定。
    """
    def __init__(self,
                 state_dim,
                 action_dim,
                 ensemble_size=5,
                 learning_rate=1e-3):
        super(EnsembleModel, self).__init__()
        # 输出包括均值和方差,因此是状态与奖励维度之和的两倍
        self._output_dim = (state_dim + 1) * 2
        self._max_logvar = nn.Parameter((torch.ones(
            (1, self._output_dim // 2)).float() / 2).to(device),
                                        requires_grad=False)
        self._min_logvar = nn.Parameter((-torch.ones(
            (1, self._output_dim // 2)).float
© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容