当前位置: 首页 > news >正文

强化学习 - Trust Region Policy Optimization (TRPO)

什么是机器学习

Trust Region Policy OptimizationTRPO)是一种策略梯度方法,用于解决强化学习问题。TRPO旨在通过限制策略更新的大小,提高训练的稳定性。这样可以防止在参数空间中迅速迭代导致过大的更新,从而保持策略在相邻状态上的相似性。

以下是一个使用 Python 和 TensorFlow/Keras 实现简单的 TRPO 的示例。在这个例子中,我们将使用 OpenAI GymCartPole 环境。

import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam
import gym# 定义TRPO Agent
class TRPOAgent:def __init__(self, state_size, action_size):self.state_size = state_sizeself.action_size = action_sizeself.gamma = 0.99  # 折扣因子self.lmbda = 0.95  # GAE(Generalized Advantage Estimation)的参数self.learning_rate = 0.001self.clip_ratio = 0.2# 构建演员(Actor)网络self.actor = self.build_actor()def build_actor(self):state_input = Input(shape=(self.state_size,))dense1 = Dense(64, activation='tanh')(state_input)dense2 = Dense(64, activation='tanh')(dense1)output = Dense(self.action_size, activation='softmax')(dense2)model = Model(inputs=state_input, outputs=output)model.compile(loss=self.trpo_loss, optimizer=Adam(lr=self.learning_rate))return modeldef get_action(self, state):state = np.reshape(state, [1, self.state_size])action_prob = self.actor.predict(state)[0]action = np.random.choice(self.action_size, p=action_prob)return action, action_probdef trpo_loss(self, y_true, y_pred):advantage = tf.placeholder(tf.float32, shape=(None, 1))old_policy_prob = tf.placeholder(tf.float32, shape=(None, self.action_size))new_policy_prob = y_predratio = tf.exp(tf.log(new_policy_prob + 1e-10) - tf.log(old_policy_prob + 1e-10))surrogate_loss = -tf.reduce_mean(ratio * advantage)kl_divergence = tf.reduce_sum(old_policy_prob * (tf.log(old_policy_prob + 1e-10) - tf.log(new_policy_prob + 1e-10)), axis=1)mean_kl_divergence = tf.reduce_mean(kl_divergence)grads = tf.gradients(surrogate_loss, self.actor.trainable_weights)grads_flatten = tf.concat([tf.reshape(g, [-1]) for g in grads], axis=0)fisher_vector_product = tf.gradients(mean_kl_divergence, self.actor.trainable_weights)fisher_vector_product_flatten = tf.concat([tf.reshape(fvp, [-1]) for fvp in fisher_vector_product], axis=0)fisher_vector_product_gradient = tf.reduce_sum(grads_flatten * fisher_vector_product_flatten)conjugate_gradient_step_direction = tf.placeholder(tf.float32, shape=(None,))conjugate_gradient_step = tf.gradients(fisher_vector_product_gradient, self.actor.trainable_weights)conjugate_gradient_step = tf.concat([tf.reshape(cgs, [-1]) for cgs in conjugate_gradient_step], axis=0)conjugate_gradient_step = conjugate_gradient_step / (conjugate_gradient_step @ conjugate_gradient_step_direction) * conjugate_gradient_step_directionflat_actor_gradients = tf.placeholder(tf.float32, shape=(None,))flat_fisher_vector_product = tf.placeholder(tf.float32, shape=(None,))kl_divergence_surrogate_loss_gradients = tf.gradients(mean_kl_divergence, self.actor.trainable_weights, -flat_actor_gradients)kl_divergence_surrogate_loss_gradients_flatten = tf.concat([tf.reshape(kldsg, [-1]) for kldsg in kl_divergence_surrogate_loss_gradients], axis=0)fisher_vector_product_surrogate_loss_gradients = tf.gradients(fisher_vector_product_gradient, self.actor.trainable_weights, flat_fisher_vector_product)fisher_vector_product_surrogate_loss_gradients_flatten = tf.concat([tf.reshape(fvpslg, [-1]) for fvpslg in fisher_vector_product_surrogate_loss_gradients], axis=0)grads_surrogate_loss_gradients = kl_divergence_surrogate_loss_gradients_flatten + fisher_vector_product_surrogate_loss_gradients_flattenconjugate_gradient_step_direction_result = np.zeros_like(flat_actor_gradients.shape)for _ in range(10):  # 通过共轭梯度法求解方程组feed_dict = {advantage: np.zeros((1, 1)),old_policy_prob: np.zeros((1, self.action_size)),flat_actor_gradients: np.zeros_like(conjugate_gradient_step_direction_result),flat_fisher_vector_product: conjugate_gradient_step_direction_result,}for i, placeholder in enumerate(self.actor._feed_input_tensors):feed_dict[placeholder] = np.zeros((1, *self.actor._feed_input_shapes[i][1:]))conjugate_gradient_step_direction_result = fisher_vector_product_gradients_result = grads_surrogate_loss_gradients_result = np.zeros_like(conjugate_gradient_step_direction_result)for _ in range(10):  # 通过逐步迭代求解共轭梯度方向feed_dict[conjugate_gradient_step_direction] = conjugate_gradient_step_direction_resultfeed_dict[flat_actor_gradients] = conjugate_gradient_step_direction_resultcgsdr, fvpgdr, gslgdr = tf.keras.backend.get_session().run([conjugate_gradient_step_direction, fisher_vector_product_gradients, grads_surrogate_loss_gradients],feed_dict=feed_dict)alpha = conjugate_gradient_step_direction_result @ conjugate_gradient_step / (cgsdr @ fisher_vector_product_gradients)conjugate_gradient_step_direction_result += alpha * cgsdrfisher_vector_product_gradients_result += alpha * fvpgdrgrads_surrogate_loss_gradients_result += alpha * gslgdrresidual = fisher_vector_product_gradients_result - grads_surrogate_loss_gradients_resultbeta = residual @ fisher_vector_product_gradients_result / (fisher_vector_product_gradients_result @ fisher_vector_product_gradients_result)conjugate_gradient_step_direction_result -= beta * residualreturn surrogate_lossdef compute_advantages(self, rewards, values, dones):advantages = np.zeros_like(rewards, dtype=np.float32)running_add = 0for t in reversed(range(len(rewards))):running_add = running_add * self.gamma * (1 - dones[t]) + rewards[t]advantages[t] = running_add - values[t]running_add = values[t] + advantages[t] * self.gamma * self.lmbdareturn advantagesdef train(self, states, actions, rewards, values, dones):states = np.vstack(states)actions = np.vstack(actions)rewards = np.vstack(rewards)values = np.vstack(values)dones = np.vstack(dones)old_policy_prob = self.actor.predict(states)advantages = self.compute_advantages(rewards, values, dones)# 计算共轭梯度方向feed_dict = {self.actor.input: states,self.actor.output: old_policy_prob,self.actor.sample_weights[0]: advantages,}flat_actor_gradients_result, flat_fisher_vector_product_result = tf.keras.backend.get_session().run([tf.concat([tf.reshape(grad, [-1]) for grad in tf.gradients(self.trpo_loss, self.actor.trainable_weights)], axis=0),tf.concat([tf.reshape(grad, [-1]) for grad in tf.gradients(tf.reduce_sum(tf.gradients(self.trpo_loss, self.actor.trainable_weights) @tf.gradients(self.trpo_loss, self.actor.trainable_weights), axis=1), self.actor.trainable_weights)], axis=0)],feed_dict=feed_dict)# 计算步长conjugate_gradient_step_direction_result = np.zeros_like(flat_actor_gradients_result)for _ in range(10):  # 通过共轭梯度法求解方程组feed_dict = {self.actor.input: states,self.actor.output: old_policy_prob,self.actor.sample_weights[0]: advantages,tf.placeholder(tf.float32, shape=(None,)): conjugate_gradient_step_direction_result,tf.placeholder(tf.float32, shape=(None,)): flat_fisher_vector_product_result,}flat_actor_gradients_result, flat_fisher_vector_product_result = tf.keras.backend.get_session().run([tf.concat([tf.reshape(grad, [-1]) for grad in tf.gradients(self.trpo_loss, self.actor.trainable_weights)], axis=0),tf.concat([tf.reshape(grad, [-1]) for grad in tf.gradients(tf.reduce_sum(tf.gradients(self.trpo_loss, self.actor.trainable_weights) @tf.gradients(self.trpo_loss, self.actor.trainable_weights), axis=1), self.actor.trainable_weights)], axis=0)],feed_dict=feed_dict)alpha = conjugate_gradient_step_direction_result @ flat_actor_gradients_result / (conjugate_gradient_step_direction_result @ flat_fisher_vector_product_result)conjugate_gradient_step_direction_result += alpha * flat_actor_gradients_result# 利用共轭梯度方向更新参数feed_dict = {self.actor.input: states,self.actor.output: old_policy_prob,self.actor.sample_weights[0]: advantages,tf.placeholder(tf.float32, shape=(None,)): conjugate_gradient_step_direction_result,}new_actor_parameters = tf.keras.backend.get_session().run([tf.concat([tf.reshape(param - alpha * grad, [-1]) for param, grad in zip(self.actor.trainable_weights, tf.gradients(self.trpo_loss, self.actor.trainable_weights))], axis=0)for alpha in [1e-3, 1e-2, 1e-1, 1e0, 1e1, 1e2]],feed_dict=feed_dict)new_actor_parameters = [np.reshape(new_param, param.shape) for new_param, param in zip(new_actor_parameters[0], self.actor.get_weights())]self.actor.set_weights(new_actor_parameters)# 初始化环境和Agent
env = gym.make('CartPole-v1')
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
agent = TRPOAgent(state_size, action_size)# 训练TRPO Agent
num_episodes = 500
for episode in range(num_episodes):state = env.reset()total_reward = 0states, actions, rewards, values, dones = [], [], [], [], []for time in range(500):  # 限制每个episode的步数,防止无限循环# env.render()  # 如果想可视化训练过程,可以取消注释此行action, action_prob = agent.get_action(state)next_state, reward, done, _ = env.step(action)total_reward += rewardvalue = agent.actor.predict(np.reshape(state, [1, state_size]))[0]states.append(state)actions.append(action)rewards.append(reward)values.append(value)dones.append(done)state = next_stateif done:print("Episode: {}, Total Reward: {}".format(episode + 1, total_reward))agent.train(states, actions, rewards, values, dones)break# 关闭环境
env.close()

在这个例子中,我们定义了一个简单的TRPO Agent,包括演员(Actor)神经网络。在训练过程中,我们使用TRPO算法来更新演员网络的参数。请注意,TRPO算法的实现可能因问题的复杂性而有所不同,可能需要更多的技术和调整,如归一化奖励、使用更复杂的神经网络结构等。

相关文章:

  • 2、互信息(Mutual Information)
  • CSS探索浏览器兼容性
  • 【C++干货铺】C++中的IO流和文件操作
  • Java基础知识-异常
  • Delphi 7 IdHTTP POST 中文乱码得解决
  • k8s实例
  • 【Linux 基础】常用基础指令(上)
  • Impala依赖组件的客户端源码下载
  • 3d gaussian splatting笔记(paper部分翻译)
  • qt的main函数(程序启动入口)
  • 大模型+自动驾驶
  • 两个近期的计算机领域国际学术会议(软件工程、计算机安全):欢迎投稿
  • 第5章 (python深度学习——波斯美女)
  • 市场复盘总结 20240122
  • day23 其他事件(页面加载事件、页面滚动事件)
  • 【译】JS基础算法脚本:字符串结尾
  • Angular 响应式表单 基础例子
  • axios请求、和返回数据拦截,统一请求报错提示_012
  • CSS魔法堂:Absolute Positioning就这个样
  • eclipse(luna)创建web工程
  • Hibernate最全面试题
  • JAVA SE 6 GC调优笔记
  • MQ框架的比较
  • vuex 学习笔记 01
  • 不上全站https的网站你们就等着被恶心死吧
  • 第2章 网络文档
  • 分享一份非常强势的Android面试题
  • 高性能JavaScript阅读简记(三)
  • 工作手记之html2canvas使用概述
  • 排序算法学习笔记
  • 入门到放弃node系列之Hello Word篇
  • 手写双向链表LinkedList的几个常用功能
  • 小程序、APP Store 需要的 SSL 证书是个什么东西?
  • 正则与JS中的正则
  • Linux权限管理(week1_day5)--技术流ken
  • 如何用纯 CSS 创作一个菱形 loader 动画
  • (ibm)Java 语言的 XPath API
  • (篇九)MySQL常用内置函数
  • (十一)手动添加用户和文件的特殊权限
  • (五)关系数据库标准语言SQL
  • (原創) 如何動態建立二維陣列(多維陣列)? (.NET) (C#)
  • (转)scrum常见工具列表
  • .net core MVC 通过 Filters 过滤器拦截请求及响应内容
  • .net开源工作流引擎ccflow表单数据返回值Pop分组模式和表格模式对比
  • /var/spool/postfix/maildrop 下有大量文件
  • [ Linux ] Linux信号概述 信号的产生
  • [ vulhub漏洞复现篇 ] Hadoop-yarn-RPC 未授权访问漏洞复现
  • []error LNK2001: unresolved external symbol _m
  • [20160807][系统设计的三次迭代]
  • [AX]AX2012 R2 出差申请和支出报告
  • [BT]BUUCTF刷题第8天(3.26)
  • [bzoj1006]: [HNOI2008]神奇的国度(最大势算法)
  • [C#] 基于 yield 语句的迭代器逻辑懒执行
  • [C语言]一维数组二维数组的大小
  • [HDU] 1054 Strategic Game 入门树形DP