# Source code for d3rlpy.algos.awr

from typing import Any, Dict, List, Optional, Sequence, Union

import numpy as np

from ..argument_utility import (
    ActionScalerArg,
    EncoderArg,
    RewardScalerArg,
    ScalerArg,
    UseGPUArg,
    check_encoder,
    check_use_gpu,
)
from ..constants import IMPL_NOT_INITIALIZED_ERROR, ActionSpace
from ..dataset import TransitionMiniBatch, compute_lambda_return
from ..gpu import Device
from ..models.encoders import EncoderFactory
from ..models.optimizers import OptimizerFactory, SGDFactory
from .base import AlgoBase
from .torch.awr_impl import AWRBaseImpl, AWRImpl, DiscreteAWRImpl


class _AWRBase(AlgoBase):
    """Shared training logic for the Advantage-Weighted Regression algorithms.

    Subclasses supply the concrete implementation object (``_impl``). This
    base class computes the TD(lambda) returns, the standardised advantages
    and the clipped exponential weights, then drives the critic and actor
    updates over mini-batches sliced from the sampled batch.
    """

    _actor_learning_rate: float
    _critic_learning_rate: float
    _actor_optim_factory: OptimizerFactory
    _critic_optim_factory: OptimizerFactory
    _actor_encoder_factory: EncoderFactory
    _critic_encoder_factory: EncoderFactory
    _batch_size_per_update: int
    _n_actor_updates: int
    _n_critic_updates: int
    _lam: float
    _beta: float
    _max_weight: float
    _use_gpu: Optional[Device]
    _impl: Optional[AWRBaseImpl]

    def __init__(
        self,
        *,
        actor_learning_rate: float = 5e-5,
        critic_learning_rate: float = 1e-4,
        actor_optim_factory: OptimizerFactory = SGDFactory(momentum=0.9),
        critic_optim_factory: OptimizerFactory = SGDFactory(momentum=0.9),
        actor_encoder_factory: EncoderArg = "default",
        critic_encoder_factory: EncoderArg = "default",
        batch_size: int = 2048,
        n_frames: int = 1,
        gamma: float = 0.99,
        batch_size_per_update: int = 256,
        n_actor_updates: int = 1000,
        n_critic_updates: int = 200,
        lam: float = 0.95,
        beta: float = 1.0,
        max_weight: float = 20.0,
        use_gpu: UseGPUArg = False,
        scaler: ScalerArg = None,
        action_scaler: ActionScalerArg = None,
        reward_scaler: RewardScalerArg = None,
        impl: Optional[AWRBaseImpl] = None,
        **kwargs: Any
    ):
        """Stores the hyperparameters; all arguments are keyword-only.

        Args:
            actor_learning_rate: learning rate for the policy function.
            critic_learning_rate: learning rate for the value function.
            actor_optim_factory: optimizer factory for the actor.
            critic_optim_factory: optimizer factory for the critic.
            actor_encoder_factory: encoder factory (or its name) for the
                actor; resolved through ``check_encoder``.
            critic_encoder_factory: encoder factory (or its name) for the
                critic; resolved through ``check_encoder``.
            batch_size: amount of data handled per ``_update`` call.
            n_frames: number of frames, forwarded to the base class and to
                the lambda-return computation.
            gamma: discount factor.
            batch_size_per_update: size of each mini-batch sliced out of the
                batch for a single gradient step.
            n_actor_updates: actor update budget per ``_update`` call.
            n_critic_updates: critic update budget per ``_update`` call.
            lam: lambda used for the TD(lambda) return.
            beta: divisor applied to the advantages before exponentiation.
            max_weight: upper bound applied to the exponential weights.
            use_gpu: GPU flag, device ID or device; resolved through
                ``check_use_gpu``.
            scaler: observation preprocessor.
            action_scaler: action preprocessor.
            reward_scaler: reward preprocessor.
            impl: pre-built implementation object, if any.
            kwargs: extra arguments handed to the base class as a dict.

        """
        # batch_size in AWR has different semantics from Q-learning
        # algorithms: it is the amount of data per iteration, which _update
        # then cuts into mini-batches of batch_size_per_update.
        super().__init__(
            batch_size=batch_size,
            n_frames=n_frames,
            n_steps=1,
            gamma=gamma,
            scaler=scaler,
            action_scaler=action_scaler,
            reward_scaler=reward_scaler,
            kwargs=kwargs,
        )
        self._actor_learning_rate = actor_learning_rate
        self._critic_learning_rate = critic_learning_rate
        self._actor_optim_factory = actor_optim_factory
        self._critic_optim_factory = critic_optim_factory
        self._actor_encoder_factory = check_encoder(actor_encoder_factory)
        self._critic_encoder_factory = check_encoder(critic_encoder_factory)
        self._batch_size_per_update = batch_size_per_update
        self._n_actor_updates = n_actor_updates
        self._n_critic_updates = n_critic_updates
        self._lam = lam
        self._beta = beta
        self._max_weight = max_weight
        self._use_gpu = check_use_gpu(use_gpu)
        self._impl = impl

    def _compute_lambda_returns(self, batch: TransitionMiniBatch) -> np.ndarray:
        """Computes the TD(lambda) return of every transition in ``batch``.

        Args:
            batch: mini-batch whose ``transitions`` are evaluated one by one.

        Returns:
            lambda returns reshaped to a column, i.e. ``(-1, 1)``.

        """
        lambda_returns = [
            compute_lambda_return(
                transition=transition,
                algo=self,
                gamma=self._gamma,
                lam=self._lam,
                n_frames=self._n_frames,
            )
            for transition in batch.transitions
        ]
        return np.array(lambda_returns).reshape((-1, 1))

    def _compute_advantages(
        self, returns: np.ndarray, batch: TransitionMiniBatch
    ) -> np.ndarray:
        """Computes advantages standardised over the whole batch.

        Args:
            returns: target returns, one row per observation in ``batch``.
            batch: mini-batch providing the observations for the baseline.

        Returns:
            ``returns`` minus the predicted state values, shifted by the
            batch mean and divided by the batch standard deviation.

        """
        baselines = self.predict_value(batch.observations).reshape((-1, 1))
        advantages = returns - baselines
        adv_mean = np.mean(advantages)
        adv_std = np.std(advantages)
        # the small constant keeps the division finite when the standard
        # deviation is zero.
        return (advantages - adv_mean) / (adv_std + 1e-5)

    def _compute_clipped_weights(self, advantages: np.ndarray) -> np.ndarray:
        """Turns advantages into weights ``min(exp(adv / beta), max_weight)``.

        Args:
            advantages: advantage values.

        Returns:
            element-wise exponential weights capped at ``max_weight``.

        """
        weights = np.exp(advantages / self._beta)
        return np.minimum(weights, self._max_weight)

    def predict_value(  # pylint: disable=signature-differs
        self, x: Union[np.ndarray, List[Any]], *args: Any, **kwargs: Any
    ) -> np.ndarray:
        """Returns predicted state values.

        Extra positional and keyword arguments are accepted but ignored.

        Args:
            x: observations.

        Returns:
            predicted state values.

        """
        assert self._impl is not None, IMPL_NOT_INITIALIZED_ERROR
        return self._impl.predict_value(x)

    def _update(self, batch: TransitionMiniBatch) -> Dict[str, float]:
        """Runs the critic and actor updates for one sampled batch.

        The batch is cut into consecutive mini-batches of
        ``batch_size_per_update`` elements. The critic is updated first,
        then the actor, each sweeping over the mini-batches
        ``n_*_updates // n_steps_per_batch`` times.

        Args:
            batch: mini-batch of transitions.

        Returns:
            dictionary with the mean clipped weight (``weights``), the mean
            critic loss (``critic_loss``) and the mean actor loss
            (``actor_loss``).

        """
        assert self._impl is not None, IMPL_NOT_INITIALIZED_ERROR

        metrics: Dict[str, float] = {}

        # compute lambda return
        lambda_returns = self._compute_lambda_returns(batch)

        # calculate advantage
        advantages = self._compute_advantages(lambda_returns, batch)

        # compute weights
        clipped_weights = self._compute_clipped_weights(advantages)
        metrics["weights"] = np.mean(clipped_weights)

        # When batch_size is smaller than batch_size_per_update the integer
        # division yields 0, which previously raised ZeroDivisionError in
        # the loops below. Clamping to 1 treats the whole batch as a single
        # mini-batch instead (the slice simply stops at the end of the data).
        n_steps_per_batch = max(
            1, self.batch_size // self._batch_size_per_update
        )

        # update critic
        critic_loss_history = []
        for _ in range(self._n_critic_updates // n_steps_per_batch):
            for j in range(n_steps_per_batch):
                head_index = j * self._batch_size_per_update
                tail_index = head_index + self._batch_size_per_update
                observations = batch.observations[head_index:tail_index]
                returns = lambda_returns[head_index:tail_index]
                critic_loss = self._impl.update_critic(observations, returns)
                critic_loss_history.append(critic_loss)
        metrics["critic_loss"] = np.mean(critic_loss_history)

        # update actor
        actor_loss_history = []
        for _ in range(self._n_actor_updates // n_steps_per_batch):
            for j in range(n_steps_per_batch):
                head_index = j * self._batch_size_per_update
                tail_index = head_index + self._batch_size_per_update
                observations = batch.observations[head_index:tail_index]
                actions = batch.actions[head_index:tail_index]
                weights = clipped_weights[head_index:tail_index]
                actor_loss = self._impl.update_actor(
                    observations, actions, weights
                )
                actor_loss_history.append(actor_loss)
        metrics["actor_loss"] = np.mean(actor_loss_history)

        return metrics


class AWR(_AWRBase):
    r"""Advantage-Weighted Regression algorithm.

    AWR is an actor-critic algorithm that trains via supervised regression,
    and has shown strong performance in online and offline settings.

    The value function is trained as a supervised regression problem.

    .. math::

        L(\theta) = \mathbb{E}_{s_t, R_t \sim D} [(R_t - V(s_t|\theta))^2]

    where :math:`R_t` is approximated using TD(:math:`\lambda`) to mitigate
    the high variance issue.

    The policy function is also trained as a supervised regression problem.

    .. math::

        J(\phi) = \mathbb{E}_{s_t, a_t, R_t \sim D}
            [\log \pi(a_t|s_t, \phi)
                \exp (\frac{1}{B} (R_t - V(s_t|\theta)))]

    where :math:`B` is a constant factor.

    References:
        * `Peng et al., Advantage-Weighted Regression: Simple and Scalable
          Off-Policy Reinforcement Learning
          <https://arxiv.org/abs/1910.00177>`_

    Args:
        actor_learning_rate (float): learning rate for policy function.
        critic_learning_rate (float): learning rate for value function.
        actor_optim_factory (d3rlpy.models.optimizers.OptimizerFactory):
            optimizer factory for the actor.
        critic_optim_factory (d3rlpy.models.optimizers.OptimizerFactory):
            optimizer factory for the critic.
        actor_encoder_factory (d3rlpy.models.encoders.EncoderFactory or str):
            encoder factory for the actor.
        critic_encoder_factory (d3rlpy.models.encoders.EncoderFactory or str):
            encoder factory for the critic.
        batch_size (int): batch size per iteration.
        n_frames (int): the number of frames to stack for image observation.
        gamma (float): discount factor.
        batch_size_per_update (int): mini-batch size.
        n_actor_updates (int): actor gradient steps per iteration.
        n_critic_updates (int): critic gradient steps per iteration.
        lam (float): :math:`\lambda` for TD(:math:`\lambda`).
        beta (float): :math:`B` for weight scale.
        max_weight (float): :math:`w_{\text{max}}` for weight clipping.
        use_gpu (bool, int or d3rlpy.gpu.Device):
            flag to use GPU, device ID or device.
        scaler (d3rlpy.preprocessing.Scaler or str): preprocessor.
            The available options are ``['pixel', 'min_max', 'standard']``.
        action_scaler (d3rlpy.preprocessing.ActionScaler or str):
            action preprocessor. The available options are ``['min_max']``.
        reward_scaler (d3rlpy.preprocessing.RewardScaler or str):
            reward preprocessor. The available options are
            ``['clip', 'min_max', 'standard']``.
        impl (d3rlpy.algos.torch.awr_impl.AWRImpl): algorithm implementation.

    """

    _impl: Optional[AWRImpl]

    def _create_impl(
        self, observation_shape: Sequence[int], action_size: int
    ) -> None:
        """Builds the ``AWRImpl`` from the stored hyperparameters.

        Args:
            observation_shape: shape of a single observation.
            action_size: size of the action passed on to the implementation.

        """
        self._impl = AWRImpl(
            observation_shape=observation_shape,
            action_size=action_size,
            actor_learning_rate=self._actor_learning_rate,
            critic_learning_rate=self._critic_learning_rate,
            actor_optim_factory=self._actor_optim_factory,
            critic_optim_factory=self._critic_optim_factory,
            actor_encoder_factory=self._actor_encoder_factory,
            critic_encoder_factory=self._critic_encoder_factory,
            use_gpu=self._use_gpu,
            scaler=self._scaler,
            action_scaler=self._action_scaler,
            reward_scaler=self._reward_scaler,
        )
        self._impl.build()

    def get_action_type(self) -> ActionSpace:
        """Returns ``ActionSpace.CONTINUOUS``."""
        return ActionSpace.CONTINUOUS


class DiscreteAWR(_AWRBase):
    r"""Discrete version of Advantage-Weighted Regression algorithm.

    AWR is an actor-critic algorithm that trains via supervised regression,
    and has shown strong performance in online and offline settings.

    The value function is trained as a supervised regression problem.

    .. math::

        L(\theta) = \mathbb{E}_{s_t, R_t \sim D} [(R_t - V(s_t|\theta))^2]

    where :math:`R_t` is approximated using TD(:math:`\lambda`) to mitigate
    the high variance issue.

    The policy function is also trained as a supervised regression problem.

    .. math::

        J(\phi) = \mathbb{E}_{s_t, a_t, R_t \sim D}
            [\log \pi(a_t|s_t, \phi)
                \exp (\frac{1}{B} (R_t - V(s_t|\theta)))]

    where :math:`B` is a constant factor.

    References:
        * `Peng et al., Advantage-Weighted Regression: Simple and Scalable
          Off-Policy Reinforcement Learning
          <https://arxiv.org/abs/1910.00177>`_

    Args:
        actor_learning_rate (float): learning rate for policy function.
        critic_learning_rate (float): learning rate for value function.
        actor_optim_factory (d3rlpy.models.optimizers.OptimizerFactory):
            optimizer factory for the actor.
        critic_optim_factory (d3rlpy.models.optimizers.OptimizerFactory):
            optimizer factory for the critic.
        actor_encoder_factory (d3rlpy.models.encoders.EncoderFactory or str):
            encoder factory for the actor.
        critic_encoder_factory (d3rlpy.models.encoders.EncoderFactory or str):
            encoder factory for the critic.
        batch_size (int): batch size per iteration.
        n_frames (int): the number of frames to stack for image observation.
        gamma (float): discount factor.
        batch_size_per_update (int): mini-batch size.
        n_actor_updates (int): actor gradient steps per iteration.
        n_critic_updates (int): critic gradient steps per iteration.
        lam (float): :math:`\lambda` for TD(:math:`\lambda`).
        beta (float): :math:`B` for weight scale.
        max_weight (float): :math:`w_{\text{max}}` for weight clipping.
        use_gpu (bool, int or d3rlpy.gpu.Device):
            flag to use GPU, device ID or device.
        scaler (d3rlpy.preprocessing.Scaler or str): preprocessor.
            The available options are ``['pixel', 'min_max', 'standard']``.
        reward_scaler (d3rlpy.preprocessing.RewardScaler or str):
            reward preprocessor. The available options are
            ``['clip', 'min_max', 'standard']``.
        impl (d3rlpy.algos.torch.awr_impl.DiscreteAWRImpl):
            algorithm implementation.

    """

    _impl: Optional[DiscreteAWRImpl]

    def _create_impl(
        self, observation_shape: Sequence[int], action_size: int
    ) -> None:
        """Builds the ``DiscreteAWRImpl`` from the stored hyperparameters.

        Args:
            observation_shape: shape of a single observation.
            action_size: size of the action passed on to the implementation.

        """
        self._impl = DiscreteAWRImpl(
            observation_shape=observation_shape,
            action_size=action_size,
            actor_learning_rate=self._actor_learning_rate,
            critic_learning_rate=self._critic_learning_rate,
            actor_optim_factory=self._actor_optim_factory,
            critic_optim_factory=self._critic_optim_factory,
            actor_encoder_factory=self._actor_encoder_factory,
            critic_encoder_factory=self._critic_encoder_factory,
            use_gpu=self._use_gpu,
            scaler=self._scaler,
            # no action scaler is handed to the discrete implementation.
            action_scaler=None,
            reward_scaler=self._reward_scaler,
        )
        self._impl.build()

    def get_action_type(self) -> ActionSpace:
        """Returns ``ActionSpace.DISCRETE``."""
        return ActionSpace.DISCRETE