Source code for d3rlpy.algos.qlearning.prdc

import dataclasses
from typing import Callable, Optional

import numpy as np
from sklearn.neighbors import NearestNeighbors
from typing_extensions import Self

from ...base import DeviceArg, LearnableConfig, register_learnable
from ...constants import ActionSpace, LoggingStrategy
from ...dataset import ReplayBufferBase
from ...logging import FileAdapterFactory, LoggerAdapterFactory
from ...metrics import EvaluatorProtocol
from ...models.builders import (
    create_continuous_q_function,
    create_deterministic_policy,
)
from ...models.encoders import EncoderFactory, make_encoder_field
from ...models.q_functions import QFunctionFactory, make_q_func_field
from ...optimizers.optimizers import OptimizerFactory, make_optimizer_field
from ...types import Shape
from ..utility import build_scalers_with_transition_picker
from .base import QLearningAlgoBase
from .torch.ddpg_impl import DDPGModules
from .torch.prdc_impl import PRDCImpl

__all__ = ["PRDCConfig", "PRDC"]


[docs]@dataclasses.dataclass() class PRDCConfig(LearnableConfig): r"""Config of PRDC algorithm. PRDC is an simple offline RL algorithm built on top of TD3. PRDC introduces Dataset Constraint (DC)-reguralized policy objective function. .. math:: J(\phi) = \mathbb{E}_{s \sim D} [\lambda Q(s, \pi(s)) - d^\beta_D(s, \pi(s))] where .. math:: \lambda = \frac{\alpha}{\frac{1}{N} \sum_(s_i, a_i) |Q(s_i, a_i)|} and :math:`d^\beta_\mathcal{D}(s,\pi(s))` is the DC loss, defined as .. math:: d^\beta_\mathcal{D}(s,\pi(s)) = \min_{\hat{s}, \hat{a} \sim D} [\| (\beta s) \oplus \pi(s) - (\beta \hat{s}) \oplus \hat{a} \|] References: * `Ran et al., Policy Regularization with Dataset Constraint for Offline Reinforcement Learning Learning. <https://arxiv.org/abs/2306.06569>`_ Args: observation_scaler (d3rlpy.preprocessing.ObservationScaler): Observation preprocessor. action_scaler (d3rlpy.preprocessing.ActionScaler): Action preprocessor. reward_scaler (d3rlpy.preprocessing.RewardScaler): Reward preprocessor. actor_learning_rate (float): Learning rate for a policy function. critic_learning_rate (float): Learning rate for Q functions. actor_optim_factory (d3rlpy.optimizers.OptimizerFactory): Optimizer factory for the actor. critic_optim_factory (d3rlpy.optimizers.OptimizerFactory): Optimizer factory for the critic. actor_encoder_factory (d3rlpy.models.encoders.EncoderFactory): Encoder factory for the actor. critic_encoder_factory (d3rlpy.models.encoders.EncoderFactory): Encoder factory for the critic. q_func_factory (d3rlpy.models.q_functions.QFunctionFactory): Q function factory. batch_size (int): Mini-batch size. gamma (float): Discount factor. tau (float): Target network synchronization coefficiency. n_critics (int): Number of Q functions for ensemble. target_smoothing_sigma (float): Standard deviation for target noise. target_smoothing_clip (float): Clipping range for target noise. alpha (float): :math:`\alpha` value. beta (float): :math:`\beta` value. update_actor_interval (int): Interval to update policy function described as `delayed policy update` in the paper. compile_graph (bool): Flag to enable JIT compilation and CUDAGraph. """ actor_learning_rate: float = 3e-4 critic_learning_rate: float = 3e-4 actor_optim_factory: OptimizerFactory = make_optimizer_field() critic_optim_factory: OptimizerFactory = make_optimizer_field() actor_encoder_factory: EncoderFactory = make_encoder_field() critic_encoder_factory: EncoderFactory = make_encoder_field() q_func_factory: QFunctionFactory = make_q_func_field() batch_size: int = 256 gamma: float = 0.99 tau: float = 0.005 n_critics: int = 2 target_smoothing_sigma: float = 0.2 target_smoothing_clip: float = 0.5 alpha: float = 2.5 beta: float = 2.0 update_actor_interval: int = 2
[docs] def create( self, device: DeviceArg = False, enable_ddp: bool = False ) -> "PRDC": return PRDC(self, device, enable_ddp)
@staticmethod def get_type() -> str: return "prdc"
[docs]class PRDC(QLearningAlgoBase[PRDCImpl, PRDCConfig]): _nbsr = NearestNeighbors(n_neighbors=1, algorithm="auto", n_jobs=-1) def inner_create_impl( self, observation_shape: Shape, action_size: int ) -> None: assert not self._config.compile_graph, ( "PRDC doesn't support compile_graph option because there is " "non-CUDA operation in the update logic." ) policy = create_deterministic_policy( observation_shape, action_size, self._config.actor_encoder_factory, device=self._device, enable_ddp=self._enable_ddp, ) targ_policy = create_deterministic_policy( observation_shape, action_size, self._config.actor_encoder_factory, device=self._device, enable_ddp=self._enable_ddp, ) q_funcs, q_func_forwarder = create_continuous_q_function( observation_shape, action_size, self._config.critic_encoder_factory, self._config.q_func_factory, n_ensembles=self._config.n_critics, device=self._device, enable_ddp=self._enable_ddp, ) targ_q_funcs, targ_q_func_forwarder = create_continuous_q_function( observation_shape, action_size, self._config.critic_encoder_factory, self._config.q_func_factory, n_ensembles=self._config.n_critics, device=self._device, enable_ddp=self._enable_ddp, ) actor_optim = self._config.actor_optim_factory.create( policy.named_modules(), lr=self._config.actor_learning_rate, compiled=self.compiled, ) critic_optim = self._config.critic_optim_factory.create( q_funcs.named_modules(), lr=self._config.critic_learning_rate, compiled=self.compiled, ) modules = DDPGModules( policy=policy, targ_policy=targ_policy, q_funcs=q_funcs, targ_q_funcs=targ_q_funcs, actor_optim=actor_optim, critic_optim=critic_optim, ) self._impl = PRDCImpl( observation_shape=observation_shape, action_size=action_size, modules=modules, q_func_forwarder=q_func_forwarder, targ_q_func_forwarder=targ_q_func_forwarder, gamma=self._config.gamma, tau=self._config.tau, target_smoothing_sigma=self._config.target_smoothing_sigma, target_smoothing_clip=self._config.target_smoothing_clip, alpha=self._config.alpha, beta=self._config.beta, update_actor_interval=self._config.update_actor_interval, compiled=self.compiled, nbsr=self._nbsr, device=self._device, )
[docs] def fit( self, dataset: ReplayBufferBase, n_steps: int, n_steps_per_epoch: int = 10000, experiment_name: Optional[str] = None, with_timestamp: bool = True, logging_steps: int = 500, logging_strategy: LoggingStrategy = LoggingStrategy.EPOCH, logger_adapter: LoggerAdapterFactory = FileAdapterFactory(), show_progress: bool = True, save_interval: int = 1, evaluators: Optional[dict[str, EvaluatorProtocol]] = None, callback: Optional[Callable[[Self, int, int], None]] = None, epoch_callback: Optional[Callable[[Self, int, int], None]] = None, ) -> list[tuple[int, dict[str, float]]]: observation_list = [] action_list = [] for episode in dataset.buffer.episodes: for i in range(episode.transition_count): transition = dataset.transition_picker(episode, i) observation_list.append( np.reshape(transition.observation, (1, -1)) ) action_list.append(np.reshape(transition.action, (1, -1))) observations = np.concatenate(observation_list, axis=0) actions = np.concatenate(action_list, axis=0) build_scalers_with_transition_picker(self, dataset) if self.observation_scaler and self.observation_scaler.built: observations = self.observation_scaler.transform_numpy(observations) if self.action_scaler and self.action_scaler.built: actions = self.action_scaler.transform_numpy(actions) self._nbsr.fit( np.concatenate( [np.multiply(observations, self._config.beta), actions], axis=1, ) ) return super().fit( dataset=dataset, n_steps=n_steps, n_steps_per_epoch=n_steps_per_epoch, logging_steps=logging_steps, logging_strategy=logging_strategy, experiment_name=experiment_name, with_timestamp=with_timestamp, logger_adapter=logger_adapter, show_progress=show_progress, save_interval=save_interval, evaluators=evaluators, callback=callback, epoch_callback=epoch_callback, )
[docs] def get_action_type(self) -> ActionSpace: return ActionSpace.CONTINUOUS
register_learnable(PRDCConfig)