从代码学习深度强学习 – Dyna-Q 算法 PyTorch版

文章目录

前言
Dyna-Q 算法详解

核心思想

算法伪代码

实验环境:悬崖漫步 (The Environment: Cliff Walking)

环境代码实现

代码实现

Dyna-Q Agent
训练主函数

实验与结果分析

可视化代码
结果分析

总结与思考


前言

在强化学习(Reinforcement Learning, RL)的广阔天地中,智能体(Agent)通过与环境(Environment)的交互来学习如何做出最优决策。根据智能体是否学习环境的模型,RL算法可以分为两大类:无模型的强化学习(Model-Free RL)基于模型的强化学习(Model-Based RL)

无模型RL:不尝试理解环境的动态变化,而是直接从与环境交互采样到的数据中学习策略或价值函数。我们熟悉的Q-learning、Sarsa、DQN等都属于这一类。它们通常更通用,但学习效率(即样本复杂度)较低。
基于模型RL:尝试构建一个环境模型,这个模型可以预测状态转移的概率和奖励。一旦有了模型,智能体就可以在“脑海中”进行推演和规划,而无需与真实环境进行昂贵的交互。

Dyna-Q算法正是基于模型RL领域一个经典且基础的算法。它巧妙地将无模型的Q-learning与基于模型的规划(Planning)结合起来,旨在大幅提升学习效率。它的核心思想非常直观:智能体不仅从与真实世界的交互中学习,还会利用这些经验来构建一个内部世界模型。然后,它可以在这个模型中进行“想象”或“排练”,生成大量模拟经验,从而加速学习过程。

本文将从理论和代码两个层面,深入剖明Dyna-Q算法。我们将首先理解其背后的工作原理,然后通过一个经典的“悬崖漫步”环境,一步步用Python(基于NumPy)实现整个算法,并最终通过实验结果来直观感受其学习效率的提升。

:本文作为系列博客的一部分,虽然标题中含有“PyTorch”,但Dyna-Q作为一种表格型方法,使用NumPy实现更为直接清晰。在后续更复杂的算法中,我们将看到PyTorch在处理函数近似、神经网络等方面的强大威力。

完整代码:下载链接

Dyna-Q 算法详解

Dyna-Q算法的框架非常优雅,它在标准的Q-learning流程中加入了“模型学习”和“规划”两个环节。

核心思想

我们可以通过下面这张图来理解Dyna-Q的整个交互和学习循环:

真实交互 (Real Interaction):智能体在真实环境中执行一个动作,获得一个反馈(下一个状态、奖励)。
直接学习 (Direct RL):利用这次真实的经验 (s, a, r, s'),通过Q-learning更新一次Q值表。这是从真实世界中学习。
模型学习 (Model Learning):同样利用这次经验,更新内部的环境模型。对于一个确定的环境,模型学习非常简单,就是记录下“在状态s下执行动作a,会得到奖励r并转移到状态s’”。
规划/模拟 (Planning):这是Dyna-Q的核心。在完成一次真实交互后,算法会额外进行 N 次“规划”步骤。在每一步规划中,它会:

已经访问过的状态-动作对中随机抽取一个。
利用模型来预测这个状态-动作对会产生的奖励和下一个状态。
用这个模拟出的经验,再次通过Q-learning来更新Q值表。

通过这种方式,每一次与真实环境的交互都被“反复利用”,大大提高了数据的利用率和学习效率。

算法伪代码

下面是Dyna-Q算法的具体流程伪代码,它清晰地展示了上述步骤:

初始化 Q(s,a) 和模型 M(s,a)
循环 对每个序列 e = 1 → E:

得到初始状态 s
循环 对每一步 t = 1 → T:

(1) 用 ε-贪婪策略根据 Q 选择当前状态 s 下的动作 a
(2) 得到环境反馈的 r, s’
(3) (直接学习) Q(s, a) ← Q(s, a) + α[r + γ * max_a' Q(s', a') - Q(s, a)]
(4) (模型学习) M(s, a) ← r, s'
(5) (规划) 循环 n 次:

随机选择一个曾经访问过的状态 sm
采取一个曾经在状态 sm 下执行过的动作 am
通过模型得到 rm, s'm ← M(sm, am)
(间接学习) Q(sm, am) ← Q(sm, am) + α[rm + γ * max_a' Q(s'm, a') - Q(sm, am)]

s ← s’

循环结束

循环结束

值得注意的是,伪代码中 M(s, a) ← r, s' 的模型更新方式仅适用于确定性环境,即在状态s执行动作a,结果是唯一确定的。本文的实验环境“悬崖漫步”就是这样一个例子。

实验环境:悬崖漫步 (The Environment: Cliff Walking)

为了验证算法,我们需要一个实验环境。悬崖漫步(Cliff Walking)是一个非常经典的网格世界问题。

目标:智能体从左下角的起点(S)出发,移动到右下角的目标(G)。
规则

每走一步,奖励为 -1。
中间有一片区域是悬崖(Cliff)。如果智能体进入悬崖区域,会获得 -100 的巨大负奖励,并被传送回起点S,当前回合结束。
到达终点G,回合结束。

最优路径:沿着悬崖上方边缘行走,总步数为13步,总回报为-13。

环境代码实现

我们使用Python和NumPy来实现这个环境。

import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm
import random
import time


class CliffWalkingEnv:
    """
    悬崖漫步环境类
    
    这是一个经典的强化学习环境,智能体需要从起点走到终点,
    同时避免掉入悬崖。环境是一个矩形网格世界。
    """
    
    def __init__(self, ncol, nrow):
        """
        初始化悬崖漫步环境
        
        参数:
            ncol (int): 网格的列数,标量
            nrow (int): 网格的行数,标量
        """
        self.nrow = nrow  # 网格行数,标量
        self.ncol = ncol  # 网格列数,标量
        self.x = 0        # 当前智能体位置的横坐标(列索引),标量,取值范围[0, ncol-1]
        self.y = self.nrow - 1  # 当前智能体位置的纵坐标(行索引),标量,取值范围[0, nrow-1]
        
    def step(self, action):
        """
        执行一个动作,返回下一个状态、奖励和是否结束的标志
        
        参数:
            action (int): 动作编号,标量,取值范围[0, 3]
                         0: 上移, 1: 下移, 2: 左移, 3: 右移
        
        返回:
            next_state (int): 下一个状态编号,标量,取值范围[0, nrow*ncol-1]
            reward (int): 奖励值,标量,通常为-1或-100
            done (bool): 是否结束当前回合,标量布尔值
        """
        # 定义4种动作对应的坐标变化
        # change是一个4x2的列表,每行代表一个动作的坐标变化[dx, dy]
        # change[0]: 上移 [0, -1], change[1]: 下移 [0, 1]
        # change[2]: 左移 [-1, 0], change[3]: 右移 [1, 0]
        # 坐标系原点(0,0)定义在左上角
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]  # 形状: (4, 2)
        
        # 更新智能体位置,确保不超出边界
        # min和max函数确保坐标在有效范围内
        self.x = min(self.ncol - 1, max(0, self.x + change[action][0]))  # 新的x坐标,标量
        self.y = min(self.nrow - 1, max(0, self.y + change[action][1]))  # 新的y坐标,标量
        
        # 将二维坐标转换为一维状态编号
        # 状态编号 = 行索引 * 列数 + 列索引
        next_state = self.y * self.ncol + self.x  # 下一个状态的编号,标量
        
        # 默认奖励为-1(每步的代价)
        reward = -1  # 奖励值,标量
        done = False  # 是否结束标志,标量布尔值
        
        # 检查是否到达最后一行(悬崖行或目标行)且不在起点
        if self.y == self.nrow - 1 and self.x > 0:
            done = True  # 回合结束
            if self.x != self.ncol - 1:  # 如果不在目标位置(最右下角)
                reward = -100  # 掉入悬崖,给予大的负奖励
                
        return next_state, reward, done
    
    def reset(self):
        """
        重置环境到初始状态
        
        返回:
            initial_state (int): 初始状态编号,标量,值为0
        """
        # 将智能体位置重置到起点:左下角 (0, nrow-1)
        self.x = 0  # 重置x坐标为0,标量
        self.y = self.nrow - 1  # 重置y坐标为最后一行,标量
        
        # 返回初始状态的编号
        return self.y * self.ncol + self.x  # 初始状态编号,标量

代码实现

现在我们来实现Dyna-Q算法的核心部分,包括智能体类和训练主循环。

Dyna-Q Agent

这个类封装了算法的所有逻辑,包括Q-table、模型、动作选择和更新规则。

import numpy as np
import random


class DynaQ:
    """
    Dyna-Q算法实现类
    
    Dyna-Q算法结合了强化学习的Q-learning算法和基于模型的规划方法,
    通过学习环境模型来提高学习效率。
    """
    
    def __init__(self, 
                 ncol,        # 环境列数 (int)
                 nrow,        # 环境行数 (int)
                 epsilon,     # epsilon贪婪策略参数 (float, 0-1之间)
                 alpha,       # 学习率 (float, 0-1之间)
                 gamma,       # 折扣因子 (float, 0-1之间)
                 n_planning,  # 规划步数 (int)
                 n_action=4): # 动作空间大小 (int, 默认为4)
        """
        初始化Dyna-Q算法参数
        
        参数说明:
        - ncol: 环境网格的列数
        - nrow: 环境网格的行数  
        - epsilon: epsilon-贪婪策略中的探索概率
        - alpha: Q-learning的学习率
        - gamma: 未来奖励的折扣因子
        - n_planning: 每次真实经验后执行的规划次数
        - n_action: 可选动作的数量(通常为4: 上下左右)
        """
        # Q值表格,维度: [状态空间大小, 动作空间大小] = [nrow*ncol, n_action]
        self.Q_table = np.zeros([nrow * ncol, n_action])  
        
        # 动作空间大小,维度: (int)
        self.n_action = n_action  
        
        # 学习率,维度: (float)
        self.alpha = alpha  
        
        # 折扣因子,维度: (float)
        self.gamma = gamma  
        
        # epsilon贪婪策略参数,维度: (float)
        self.epsilon = epsilon  
        
        # 规划步数,维度: (int)
        self.n_planning = n_planning  
        
        # 环境模型字典,存储(状态,动作)到(奖励,下一状态)的映射
        # 键: (state, action) tuple,值: (reward, next_state) tuple
        # 维度: dict{(int, int): (float, int)}
        self.model = dict()  

    def take_action(self, state):
        """
        根据epsilon-贪婪策略选择动作
        
        参数:
        - state: 当前状态,维度: (int)
        
        返回:
        - action: 选择的动作,维度: (int)
        """
        # 生成0-1之间的随机数进行探索决策
        if np.random.random() < self.epsilon:
            # 探索:随机选择动作
            action = np.random.randint(self.n_action)
        else:
            # 利用:选择Q值最大的动作
            # self.Q_table[state]的维度: (n_action,)
            action = np.argmax(self.Q_table[state])
        return action

    def q_learning(self, s0, a0, r, s1):
        """
        执行Q-learning更新
        
        参数:
        - s0: 当前状态,维度: (int)
        - a0: 当前动作,维度: (int) 
        - r: 获得的奖励,维度: (float)
        - s1: 下一状态,维度: (int)
        """
        # 计算TD误差:r + γ*max(Q(s',a')) - Q(s,a)
        # self.Q_table[s1]的维度: (n_action,)
        # self.Q_table[s1].max()的维度: (float)
        td_error = r + self.gamma * self.Q_table[s1].max() - self.Q_table[s0, a0]
        
        # 更新Q值:Q(s,a) = Q(s,a) + α * TD_error
        # self.Q_table[s0, a0]的维度: (float)
        self.Q_table[s0, a0] += self.alpha * td_error

    def update(self, s0, a0, r, s1):
        """
        执行Dyna-Q的完整更新过程
        
        参数:
        - s0: 当前状态,维度: (int)
        - a0: 当前动作,维度: (int)
        - r: 获得的奖励,维度: (float)  
        - s1: 下一状态,维度: (int)
        """
        # 步骤1:使用真实经验进行Q-learning更新
        self.q_learning(s0, a0, r, s1)
        
        # 步骤2:将经验添加到环境模型中
        # 键(s0, a0)的维度: (int, int)
        # 值(r, s1)的维度: (float, int)
        self.model[(s0, a0)] = r, s1  
        
        # 步骤3:执行Q-planning,利用模型进行额外的学习
        for _ in range(self.n_planning):
            # 从模型中随机选择一个曾经经历过的状态-动作对
            # random.choice返回: ((state, action), (reward, next_state))
            # (s, a)的维度: (int, int)
            # (r, s_)的维度: (float, int)
            (s, a), (r, s_) = random.choice(list(self.model.items()))
            
            # 使用模型数据进行Q-learning更新
            self.q_learning(s, a, r, s_)

训练主函数

此函数负责设置超参数,并循环执行多个回合(episodes)来训练智能体。

import numpy as np
from tqdm import tqdm


def DynaQ_CliffWalking(n_planning):
    """
    使用Dyna-Q算法训练悬崖漫步环境的函数
    
    参数:
    - n_planning: 规划步数,每次真实交互后进行的模型学习次数,维度: (int)
    
    返回:
    - return_list: 每个episode的总回报列表,维度: (list[float]),长度为num_episodes
    """
    
    # 环境设置参数
    ncol = 12  # 环境网格列数,维度: (int)
    nrow = 4   # 环境网格行数,维度: (int)
    
    # 创建悬崖漫步环境实例
    # env对象提供reset(), step()等方法
    env = CliffWalkingEnv(ncol, nrow)
    
    # Dyna-Q算法超参数设置
    epsilon = 0.01  # epsilon贪婪策略探索概率,维度: (float)
    alpha = 0.1     # Q-learning学习率,维度: (float) 
    gamma = 0.9     # 折扣因子,维度: (float)
    
    # 创建Dyna-Q智能体实例
    # agent对象提供take_action(), update()等方法
    agent = DynaQ(ncol, nrow, epsilon, alpha, gamma, n_planning)
    
    # 训练参数设置
    num_episodes = 300  # 总训练episode数,维度: (int)
    
    # 记录每个episode总回报的列表,维度: (list[float]),最终长度为num_episodes
    return_list = []  
    
    # 将训练过程分为10个阶段,每个阶段显示一个进度条
    for i in range(10):  # 外层循环:进度条阶段索引,维度: (int),范围[0,9]
        
        # 创建tqdm进度条,每个进度条显示num_episodes/10个episode的进度
        # total: 当前进度条的总步数,维度: (int)
        with tqdm(total=int(num_episodes / 10),
                  desc='Iteration %d' % i) as pbar:
            
            # 内层循环:当前进度条阶段的episode训练
            # i_episode: 当前阶段内的episode索引,维度: (int),范围[0, num_episodes/10-1]
            for i_episode in range(int(num_episodes / 10)):  
                
                # 当前episode的累计回报,维度: (float)
                episode_return = 0
                
                # 重置环境,获取初始状态
                # state: 当前状态(网格位置编码),维度: (int),范围[0, ncol*nrow-1]
                state = env.reset()
                
                # episode结束标志,维度: (bool)
                done = False
                
                # 单个episode的交互循环
                while not done:
                    # 智能体根据当前状态选择动作
                    # action: 选择的动作,维度: (int),范围[0, n_action-1]
                    action = agent.take_action(state)
                    
                    # 环境执行动作,返回下一状态、奖励和结束标志
                    # next_state: 下一状态,维度: (int),范围[0, ncol*nrow-1]
                    # reward: 即时奖励,维度: (float)
                    # done: 是否结束,维度: (bool)
                    next_state, reward, done = env.step(action)
                    
                    # 累加当前步的奖励到episode总回报(不进行折扣)
                    episode_return += reward
                    
                    # 使用经验更新智能体(Q-learning + 模型学习 + 规划)
                    agent.update(state, action, reward, next_state)
                    
                    # 状态转移:将下一状态设为当前状态
                    state = next_state
                
                # 将当前episode的总回报添加到记录列表
                return_list.append(episode_return)
                
                # 每10个episode输出一次平均性能
                if (i_episode + 1) % 10 == 0:
                    # 计算当前episode在整个训练中的绝对编号
                    # 绝对episode编号,维度: (int)
                    current_episode = num_episodes / 10 * i + i_episode + 1
                    
                    # 计算最近10个episode的平均回报
                    # recent_avg_return: 最近10个episode平均回报,维度: (float)
                    recent_avg_return = np.mean(return_list[-10:])
                    
                    # 更新进度条显示信息
                    pbar.set_postfix({
            
                        'episode': '%d' % current_episode,      # 当前episode编号
                        'return': '%.3f' % recent_avg_return    # 最近平均回报
                    })
                
                # 进度条前进一步
                pbar.update(1)
    
    # 返回所有episode的回报记录列表
    # return_list维度: (list[float]),长度为num_episodes
    return return_list

实验与结果分析

我们将对比不同规划步数(n_planning)对算法性能的影响。我们设置了三组实验,规划步数分别为0, 2, 20。其中,n_planning=0 的情况就完全退化为了标准的Q-learning算法。

可视化代码

import numpy as np
import random
import time
import matplotlib.pyplot as plt


# 设置随机种子以确保实验可重复性
np.random.seed(0)    # 设置numpy随机种子,维度: (int)
random.seed(0)       # 设置python内置random模块随机种子,维度: (int)

# 定义不同规划步数的实验组
# n_planning_list: 规划步数列表,维度: (list[int]),长度为3
n_planning_list = [0, 2, 20]

# 对每个规划步数进行实验
for n_planning in n_planning_list:  # n_planning: 当前实验的规划步数,维度: (int)
    
    # 输出当前实验的规划步数信息
    print('Q-planning步数为:%d' % n_planning)
    
    # 暂停 0.5秒,便于观察输出信息
    # 延迟时间,维度: (float),单位:秒
    time.sleep(0.5)
    
    # 运行DynaQ算法训练,获取每个episode的回报
    # return_list: 当前规划步数下所有episode的回报列表,维度: (list[float]),长度为num_episodes
    return_list = DynaQ_CliffWalking(n_planning)
    
    # 生成对应的episode编号列表,用于绘图的x轴
    # episodes_list: episode编号列表,维度: (list[int]),长度与return_list相同
    episodes_list = list(range(len(return_list)))
    
    # 绘制当前规划步数的学习曲线
    plt.plot(episodes_list,          # x轴:episode编号,维度: (list[int])
             return_list,            # y轴:对应的回报值,维度: (list[float])
             label=str(n_planning) + ' planning steps')  # 图例标签,维度: (str)

# 设置图形显示属性
plt.legend()                        # 显示图例,标识不同规划步数的曲线
plt.xlabel('Episodes')              # 设置x轴标签:训练episode数
plt.ylabel('Returns')               # 设置y轴标签:episode回报值
plt.title('Dyna-Q on {}'.format('Cliff Walking'))  # 设置图形标题:Dyna-Q在悬崖漫步环境上的表现

# 显示绘制的对比图形
# 图形将显示三条曲线,分别对应0、2、20步规划的学习表现
plt.show()

结果分析

从上图的训练曲线中,我们可以清晰地看到:

n_planning = 0 (蓝色曲线):这是标准的Q-learning。它需要最多的回合数(大约200个episodes)才能收敛到最优策略附近。
n_planning = 2 (橙色曲线):每次真实交互后,进行2次规划。学习速度明显加快,大约在100个episodes后就基本收敛。
n_planning = 20 (绿色曲线):每次真实交互后,进行20次规划。学习速度最快,仅用不到50个episodes就收敛到了最优策略。

这个结果有力地证明了Dyna-Q算法的有效性。通过增加规划步数,智能体能够更充分地利用已有经验,从而显著降低算法的样本复杂度,即用更少的真实交互次数达到收敛。

当然,这并非在所有环境中都成立。本次实验的“悬崖漫步”环境是确定性的,这意味着我们的模型是完美的。在这样的理想情况下,规划步数越多,收敛速度就越快。


总结与思考

Dyna-Q算法为我们打开了基于模型强化学习的大门。它通过一个简单的模型和一个额外的规划步骤,实现了对真实经验的“再利用”,从而在样本效率上超越了传统的无模型方法。

然而,Dyna-Q的成功依赖于一个关键假设:我们能学到一个足够准确的环境模型。在本次的确定性网格世界中,这个假设是成立的。但如果环境变得更复杂,比如:

状态是连续的:如何表示和学习状态转移?
状态转移是随机的(Stochastic):在状态s执行动作a,可能会有多种可能的s’,每个都有一定概率。这时模型需要学习一个概率分布,这会引入模型误差。

在这些复杂场景下,学习一个准确的模型本身就成了一个巨大的挑战。如果模型不准确,基于这个错误模型进行的“规划”反而可能会误导智能体,使其学到次优甚至糟糕的策略。这正是基于模型强化学习需要面对的核心挑战,也催生了更多如Dyna-2、世界模型(World Models)等更先进的算法。

总而言之,Dyna-Q是一个承上启下的重要算法。它不仅是对Q-learning的有效改进,也为我们理解更复杂的模型与规划相结合的强化学习思想奠定了坚实的基础。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容