Source code for d3rlpy.online.buffers

from abc import ABCMeta, abstractmethod
from typing import Generic, List, Optional, TypeVar, Sequence

import numpy as np
import gym

from ..dataset import Episode, Transition, TransitionMiniBatch
from .utility import get_action_size_from_env

T = TypeVar("T")


class FIFOQueue(Generic[T]):
    """Fixed-capacity FIFO queue backed by a ring buffer.

    Once ``maxlen`` items are stored, every further ``append`` overwrites the
    oldest stored item. Random access of this queue object is O(1).

    Indices address storage slots, not insertion order: until the queue
    wraps around, slot ``i`` holds the ``i``-th appended item; afterwards the
    most recently appended items occupy the slots that were overwritten last.

    Args:
        maxlen: maximum number of items held at the same time.

    """

    _maxlen: int
    _buffer: List[Optional[T]]
    _cursor: int
    _size: int

    def __init__(self, maxlen: int):
        self._maxlen = maxlen
        self._buffer = [None for _ in range(maxlen)]
        self._cursor = 0  # slot written by the next append()
        self._size = 0  # number of occupied slots, never exceeds maxlen

    def append(self, item: T) -> None:
        """Store ``item``, overwriting the oldest entry when the queue is full.

        Args:
            item: item to store.

        """
        self._buffer[self._cursor] = item
        # advance the write position and wrap around at the end of storage
        self._cursor = (self._cursor + 1) % self._maxlen
        self._size = min(self._size + 1, self._maxlen)

    def __getitem__(self, index: int) -> T:
        """Return the item stored in slot ``index``.

        Negative indices count back from ``len(self)``.

        Args:
            index: slot index in ``[-len(self), len(self))``.

        Returns:
            the stored item.

        Raises:
            IndexError: if ``index`` is outside the occupied range. Raising
                ``IndexError`` (rather than failing an assertion) also lets
                ``for item in queue`` and ``list(queue)`` terminate normally.

        """
        size = self._size
        if index < 0:
            index += size
        if not 0 <= index < size:
            raise IndexError("FIFOQueue index out of range")
        item = self._buffer[index]
        # occupied slots only hold appended items; this narrows Optional[T]
        assert item is not None
        return item

    def __len__(self) -> int:
        """Return the number of items currently stored."""
        return self._size


class Buffer(metaclass=ABCMeta):
    """Abstract interface of an experience buffer.

    Concrete buffers implement :meth:`append`, :meth:`append_episode`,
    :meth:`sample` and :meth:`size`. ``len(buffer)`` delegates to
    :meth:`size`.

    """

    @abstractmethod
    def append(
        self,
        observation: np.ndarray,
        action: np.ndarray,
        reward: float,
        terminal: float,
    ) -> None:
        """Record one environment step in the buffer.

        A truthy ``terminal`` marks the last step of the current episode.

        Args:
            observation (numpy.ndarray): observation.
            action (numpy.ndarray or int): action.
            reward (float): reward.
            terminal (bool or float): terminal flag.

        """
        # NOTE(review): earlier documentation stated that Monte-Carlo returns
        # are computed over the whole episode once ``terminal`` is set; the
        # ReplayBuffer in this module appends each transition immediately
        # instead -- confirm which contract implementations must follow.

    @abstractmethod
    def append_episode(self, episode: Episode) -> None:
        """Add the contents of an ``Episode`` object to the buffer.

        Args:
            episode: episode.

        """

    @abstractmethod
    def sample(
        self,
        batch_size: int,
        n_frames: int = 1,
        n_steps: int = 1,
        gamma: float = 0.99,
    ) -> TransitionMiniBatch:
        """Draw a mini-batch of transitions from the buffer.

        For image observations, several consecutive frames can be stacked
        into one observation through ``n_frames``.

        .. code-block:: python

            buffer.observation_shape == (3, 84, 84)

            # stack 4 frames
            batch = buffer.sample(batch_size=32, n_frames=4)

            batch.observations.shape == (32, 12, 84, 84)

        Args:
            batch_size: mini-batch size.
            n_frames: the number of frames to stack for image observation.
            n_steps: the number of steps before the next observation.
            gamma: discount factor used in N-step return calculation.

        Returns:
            mini-batch.

        """

    @abstractmethod
    def size(self) -> int:
        """Report how many elements have been appended to the buffer.

        Returns:
            the number of elements in buffer.

        """

    def __len__(self) -> int:
        # ``len(buffer)`` is an alias of ``buffer.size()``
        return self.size()


class ReplayBuffer(Buffer):
    """Standard Replay Buffer.

    Transitions are kept in a ``FIFOQueue`` of capacity ``maxlen``.

    Args:
        maxlen (int): the maximum number of transitions kept in the buffer.
        env (gym.Env): gym-like environment to extract shape information.
        episodes (list(d3rlpy.dataset.Episode)): list of episodes to
            initialize buffer

    """

    _prev_observation: Optional[np.ndarray]
    _prev_action: Optional[np.ndarray]
    _prev_reward: float
    _prev_transition: Optional[Transition]
    _transitions: FIFOQueue[Transition]
    _observation_shape: Sequence[int]
    _action_size: int

    def __init__(
        self,
        maxlen: int,
        env: gym.Env,
        episodes: Optional[List[Episode]] = None,
    ):
        self._reset_episode_cache()
        self._transitions = FIFOQueue(maxlen=maxlen)

        # shape information comes from the environment
        self._observation_shape = env.observation_space.shape
        self._action_size = get_action_size_from_env(env)

        # seed the buffer with the given episodes, if any
        for episode in episodes or ():
            self.append_episode(episode)

    def _reset_episode_cache(self) -> None:
        # temporary cache holding the previous step of the episode that is
        # currently being recorded
        self._prev_observation = None
        self._prev_action = None
        self._prev_reward = 0.0
        self._prev_transition = None

    def append(
        self,
        observation: np.ndarray,
        action: np.ndarray,
        reward: float,
        terminal: float,
    ) -> None:
        """Record one environment step.

        The first step after construction or after a terminal step is only
        cached. Every later step builds a ``Transition`` from the cached
        step to this one, links it to the preceding transition and stores
        it. A truthy ``terminal`` clears the cache afterwards.

        Args:
            observation (numpy.ndarray): observation.
            action (numpy.ndarray or int): action.
            reward (float): reward.
            terminal (bool or float): terminal flag.

        """
        # validation (assert-based, so it is skipped under ``python -O``)
        assert observation.shape == self._observation_shape
        if isinstance(action, np.ndarray):
            assert action.shape[0] == self._action_size
        else:
            action = int(action)
            assert action < self._action_size

        if self._prev_observation is not None:
            # a boolean flag is stored as 1.0 / 0.0
            if isinstance(terminal, bool):
                terminal = float(terminal)

            previous = self._prev_transition
            step = Transition(
                observation_shape=self._observation_shape,
                action_size=self._action_size,
                observation=self._prev_observation,
                action=self._prev_action,
                reward=self._prev_reward,
                next_observation=observation,
                next_action=action,
                next_reward=reward,
                terminal=terminal,
                prev_transition=previous,
            )

            # forward link from the preceding transition to the new one
            if previous:
                previous.next_transition = step

            self._transitions.append(step)
            self._prev_transition = step

        if terminal:
            # nothing is carried over into the next episode
            self._reset_episode_cache()
        else:
            self._prev_observation = observation
            self._prev_action = action
            self._prev_reward = reward

    def append_episode(self, episode: Episode) -> None:
        """Store every transition of ``episode``, in order.

        Args:
            episode: episode whose observation shape and action size must
                match the ones extracted from the environment.

        """
        assert episode.get_observation_shape() == self._observation_shape
        assert episode.get_action_size() == self._action_size
        for item in episode.transitions:
            self._transitions.append(item)

    def sample(
        self,
        batch_size: int,
        n_frames: int = 1,
        n_steps: int = 1,
        gamma: float = 0.99,
    ) -> TransitionMiniBatch:
        """Draw ``batch_size`` stored transitions at random.

        Positions are drawn with ``numpy.random.choice`` using its default
        settings (sampling with replacement), so a batch may contain the
        same transition more than once.

        Args:
            batch_size: mini-batch size.
            n_frames: passed through to ``TransitionMiniBatch``.
            n_steps: passed through to ``TransitionMiniBatch``.
            gamma: passed through to ``TransitionMiniBatch``.

        Returns:
            mini-batch.

        """
        stored = self._transitions
        picks = np.random.choice(len(stored), batch_size)
        batch = [stored[pick] for pick in picks]
        return TransitionMiniBatch(batch, n_frames, n_steps, gamma)

    def size(self) -> int:
        """Return the number of transitions currently stored."""
        return len(self._transitions)