Source code for d3rlpy.dataset

import numpy as np
import torch
import h5py

from .gpu import Device


def _safe_size(array):
    """ Returns the number of entries along the first axis.

    Args:
        array (list, tuple, numpy.ndarray or torch.Tensor): container of
            step-wise data.

    Returns:
        int: ``len(array)`` for lists and tuples, ``array.shape[0]`` for
            arrays and tensors.

    Raises:
        ValueError: if ``array`` is none of the supported container types.

    """
    # array-like containers expose their length through the leading dimension
    if isinstance(array, (np.ndarray, torch.Tensor)):
        return array.shape[0]
    if isinstance(array, (list, tuple)):
        return len(array)
    raise ValueError


def _device_to_string(device):
    """ Converts a device object into a PyTorch device string.

    Args:
        device: object providing ``get_id()``, or ``None``.

    Returns:
        str: ``'cpu:0'`` when ``device`` is ``None``, otherwise ``'cuda:<id>'``
            where ``<id>`` is the value returned by ``device.get_id()``.

    """
    return 'cpu:0' if device is None else 'cuda:%d' % device.get_id()


def _numpy_to_tensor(ndarray, device):
    """ Converts array-like data into ``torch.Tensor``.

    Args:
        ndarray (numpy.ndarray, list or torch.Tensor): data to convert.
            A ``torch.Tensor`` is returned untouched. Other inputs are passed
            through ``numpy.asarray`` first, so a list of equally-shaped
            arrays is accepted as well.
        device: object providing ``get_id()``, or ``None`` for CPU.

    Returns:
        torch.Tensor: tensor placed on the requested device. ``float64``
            input is stored as ``float32``.

    Raises:
        ValueError: if the dtype of the data is not one of ``float32``,
            ``float64``, ``int32`` or ``uint8``.

    """
    # return if ndarray is already torch.Tensor
    if isinstance(ndarray, torch.Tensor):
        return ndarray

    # accept plain sequences too; this is a no-op for ndarray inputs
    ndarray = np.asarray(ndarray)

    # float64 is deliberately mapped to float32
    dtype_table = {
        np.dtype(np.float32): torch.float32,
        np.dtype(np.float64): torch.float32,
        np.dtype(np.int32): torch.int32,
        np.dtype(np.uint8): torch.uint8,
    }
    try:
        dtype = dtype_table[ndarray.dtype]
    except KeyError:
        # previously this fell through with ``dtype`` unbound
        raise ValueError('unsupported dtype for tensor conversion: %s' %
                         ndarray.dtype) from None

    return torch.tensor(ndarray,
                        dtype=dtype,
                        device=_device_to_string(device))


def _to_episodes(observation_shape, action_size, observations, actions,
                 rewards, terminals):
    """ Splits step-wise data into a list of ``Episode`` objects.

    An episode is closed at every index whose terminal flag is truthy; the
    flagged step itself is the last step of that episode. Steps after the
    last truthy flag are not included in any episode.

    Args:
        observation_shape (tuple): observation shape.
        action_size (int): dimension of action-space.
        observations: sliceable container of observations.
        actions: sliceable container of actions.
        rewards: sliceable container of rewards.
        terminals: indexable container of terminal flags.

    Returns:
        list(Episode): episodes in the order they appear in the data.

    """
    episodes = []
    start = 0
    for step in range(_safe_size(observations)):
        if not terminals[step]:
            continue
        # slice is inclusive of the terminal step
        stop = step + 1
        episodes.append(
            Episode(observation_shape=observation_shape,
                    action_size=action_size,
                    observations=observations[start:stop],
                    actions=actions[start:stop],
                    rewards=rewards[start:stop]))
        start = stop
    return episodes


def _to_transitions(observation_shape, action_size, observations, actions,
                    rewards):
    """ Builds a doubly-linked list of ``Transition`` objects.

    Every pair of consecutive steps ``(i, i + 1)`` becomes one transition, so
    ``N`` steps give ``N - 1`` transitions. Only the last transition has its
    terminal flag set to ``1.0``.

    Args:
        observation_shape (tuple): observation shape.
        action_size (int): dimension of action-space.
        observations: indexable container of observations.
        actions: indexable container of actions.
        rewards: indexable container of rewards.

    Returns:
        list(Transition): transitions in step order, each linked to its
            neighbours through ``prev_transition`` and ``next_transition``.

    """
    transitions = []
    # index of the step that starts the final transition
    last_index = _safe_size(observations) - 2
    previous = None
    for step in range(last_index + 1):
        following = step + 1
        current = Transition(observation_shape=observation_shape,
                             action_size=action_size,
                             observation=observations[step],
                             action=actions[step],
                             reward=rewards[step],
                             next_observation=observations[following],
                             next_action=actions[following],
                             next_reward=rewards[following],
                             terminal=1.0 if step == last_index else 0.0,
                             prev_transition=previous)

        # complete the forward link of the preceding transition
        if previous is not None:
            previous.next_transition = current

        transitions.append(current)
        previous = current
    return transitions


def _stack_frames(transition, n_frames):
    """ Stacks the last ``n_frames`` image observations along channels.

    The chain of ``prev_transition`` pointers is followed backwards starting
    at ``transition``. The newest frame fills the last channel slot and older
    frames fill the preceding slots. Slots for which no earlier transition
    exists stay zero, except that the stacked next observation additionally
    receives the earliest available observation in the slot just before the
    earliest next observation.

    Args:
        transition: transition whose ``observation`` is a 3-dimensional
            ``numpy.ndarray`` or ``torch.Tensor`` with channels first.
        n_frames (int): number of frames to stack; must be larger than 1.

    Returns:
        tuple: stacked observation and stacked next observation, both with
            shape ``(n_frames * C, ...)`` and the type, dtype (and device for
            tensors) of ``transition.observation``.

    """
    assert len(transition.observation.shape) == 3
    assert n_frames > 1
    assert isinstance(transition.observation, (np.ndarray, torch.Tensor))

    newest = transition.observation
    n_channels = newest.shape[0]
    stacked_shape = (n_frames * n_channels, ) + tuple(newest.shape[1:])

    # allocate zero-filled buffers of the same kind as the input
    if isinstance(newest, np.ndarray):
        observation = np.zeros(stacked_shape, dtype=newest.dtype)
        next_observation = np.zeros(stacked_shape, dtype=newest.dtype)
    else:
        observation = torch.zeros(stacked_shape,
                                  dtype=newest.dtype,
                                  device=newest.device)
        next_observation = torch.zeros(stacked_shape,
                                       dtype=newest.dtype,
                                       device=newest.device)

    # fill slots from the newest (last) towards the oldest (first)
    cursor = transition
    for slot in reversed(range(n_frames)):
        begin = slot * n_channels
        end = begin + n_channels
        observation[begin:end] = cursor.observation
        next_observation[begin:end] = cursor.next_observation

        older = cursor.prev_transition
        if older is None:
            # the chain ended: the earliest observation still belongs to the
            # history of the next observation when a free slot is left
            if slot > 0:
                next_observation[begin - n_channels:begin] = cursor.observation
            break
        cursor = older

    return observation, next_observation


class MDPDataset:
    """ Markov-Decision Process Dataset class.

    MDPDataset is designed for reinforcement learning datasets to use them
    like supervised learning datasets.

    .. code-block:: python

        from d3rlpy.dataset import MDPDataset

        # 1000 steps of observations with shape of (100,)
        observations = np.random.random((1000, 100))
        # 1000 steps of actions with shape of (4,)
        actions = np.random.random((1000, 4))
        # 1000 steps of rewards
        rewards = np.random.random(1000)
        # 1000 steps of terminal flags
        terminals = np.random.randint(2, size=1000)

        dataset = MDPDataset(observations, actions, rewards, terminals)

    The MDPDataset object automatically splits the given data into list of
    :class:`d3rlpy.dataset.Episode` objects.
    Furthermore, the MDPDataset object behaves like a list in order to use with
    scikit-learn utilities.

    .. code-block:: python

        # returns the number of episodes
        len(dataset)

        # access to the first episode
        episode = dataset[0]

        # iterate through all episodes
        for episode in dataset:
            pass

    Args:
        observations (numpy.ndarray or list(numpy.ndarray)): N-D array. If the
            observation is a vector, the shape should be
            `(N, dim_observation)`. If the observation is an image, the shape
            should be `(N, C, H, W)`.
        actions (numpy.ndarray): N-D array. If the action-space is
            continuous, the shape should be `(N, dim_action)`. If the
            action-space is discrete, the shape should be `(N,)`.
        rewards (numpy.ndarray): array of scalar rewards.
        terminals (numpy.ndarray): array of binary terminal flags.
        discrete_action (bool): flag to use the given actions as discrete
            action-space actions.
        as_tensor (bool): flag to hold observations as ``torch.Tensor``.
        device (d3rlpy.gpu.Device or int): gpu device or device id for tensors.

    """
    def __init__(self,
                 observations,
                 actions,
                 rewards,
                 terminals,
                 discrete_action=False,
                 as_tensor=False,
                 device=None):
        # data type option
        self._as_tensor = as_tensor
        if isinstance(device, int):
            self._device = Device(device)
        else:
            self._device = device

        # numpy to PyTorch conversion
        if as_tensor:
            self._observations = _numpy_to_tensor(observations, self._device)
        else:
            self._observations = observations

        # rewards and terminals are always stored as flat arrays
        self._rewards = np.asarray(rewards).reshape(-1)
        self._terminals = np.asarray(terminals).reshape(-1)

        self.discrete_action = discrete_action
        if discrete_action:
            self._actions = np.asarray(actions).reshape(-1)
        else:
            self._actions = np.asarray(actions)

        # episodes are built lazily on first access
        self._episodes = None

    @property
    def observations(self):
        """ Returns the observations.

        Returns:
            numpy.ndarray, list(numpy.ndarray) or torch.Tensor: array of
                observations.

        """
        return self._observations

    @property
    def actions(self):
        """ Returns the actions.

        Returns:
            numpy.ndarray: array of actions.

        """
        return self._actions

    @property
    def rewards(self):
        """ Returns the rewards.

        Returns:
            numpy.ndarray: array of rewards

        """
        return self._rewards

    @property
    def terminals(self):
        """ Returns the terminal flags.

        Returns:
            numpy.ndarray: array of terminal flags.

        """
        return self._terminals

    @property
    def as_tensor(self):
        """ Returns the flag to hold observations as ``torch.Tensor``.

        Returns:
            bool: flag to hold observations as ``torch.Tensor``.

        """
        return self._as_tensor

    @property
    def device(self):
        """ Returns the gpu device for tensors.

        Returns:
            d3rlpy.gpu.Device: gpu device.

        """
        return self._device

    @property
    def episodes(self):
        """ Returns the episodes.

        The episodes are built on the first access and cached afterwards.

        Returns:
            list(d3rlpy.dataset.Episode):
                list of :class:`d3rlpy.dataset.Episode` objects.

        """
        if self._episodes is None:
            self.build_episodes()
        return self._episodes

    def size(self):
        """ Returns the number of episodes in the dataset.

        Returns:
            int: the number of episodes.

        """
        return len(self.episodes)

    def get_action_size(self):
        """ Returns dimension of action-space.

        If `discrete_action=True`, the return value will be the maximum index
        +1 in the given actions.

        Returns:
            int: dimension of action-space.

        """
        if self.discrete_action:
            return int(np.max(self._actions) + 1)
        return self._actions.shape[1]

    def get_observation_shape(self):
        """ Returns observation shape.

        Returns:
            tuple: observation shape.

        """
        return self._observations[0].shape

    def is_action_discrete(self):
        """ Returns `discrete_action` flag.

        Returns:
            bool: `discrete_action` flag.

        """
        return self.discrete_action

    def compute_stats(self):
        """ Computes statistics of the dataset.

        .. code-block:: python

            stats = dataset.compute_stats()

            # return statistics
            stats['return']['mean']
            stats['return']['std']
            stats['return']['min']
            stats['return']['max']

            # reward statistics
            stats['reward']['mean']
            stats['reward']['std']
            stats['reward']['min']
            stats['reward']['max']

            # action (only with continuous control actions)
            stats['action']['mean']
            stats['action']['std']
            stats['action']['min']
            stats['action']['max']

            # observation (only with numpy.ndarray observations)
            stats['observation']['mean']
            stats['observation']['std']
            stats['observation']['min']
            stats['observation']['max']

        The ``return``, ``reward`` and ``action`` entries additionally hold a
        ``histogram`` key.

        Returns:
            dict: statistics of the dataset.

        """
        episode_returns = [
            episode.compute_return() for episode in self.episodes
        ]

        stats = {
            'return': {
                'mean': np.mean(episode_returns),
                'std': np.std(episode_returns),
                'min': np.min(episode_returns),
                'max': np.max(episode_returns),
                'histogram': np.histogram(episode_returns, bins=20)
            },
            'reward': {
                'mean': np.mean(self._rewards),
                'std': np.std(self._rewards),
                'min': np.min(self._rewards),
                'max': np.max(self._rewards),
                'histogram': np.histogram(self._rewards, bins=20)
            }
        }

        action_size = self.get_action_size()

        # only for continuous control task
        if not self.discrete_action:
            # calculate histogram on each dimension
            hists = [
                np.histogram(self.actions[:, i], bins=20)
                for i in range(action_size)
            ]
            stats['action'] = {
                'mean': np.mean(self.actions, axis=0),
                'std': np.std(self.actions, axis=0),
                'min': np.min(self.actions, axis=0),
                'max': np.max(self.actions, axis=0),
                'histogram': hists
            }
        else:
            # count frequency of discrete actions
            freqs = [(self.actions == i).sum() for i in range(action_size)]
            stats['action'] = {'histogram': [freqs, np.arange(action_size)]}

        # avoid large copy when observations are huge data.
        if isinstance(self._observations, np.ndarray):
            stats['observation'] = {
                'mean': np.mean(self.observations, axis=0),
                'std': np.std(self.observations, axis=0),
                'min': np.min(self.observations, axis=0),
                'max': np.max(self.observations, axis=0),
            }

        return stats

    def clip_reward(self, low=None, high=None):
        """ Clips rewards in the given range.

        Args:
            low (float): minimum value. If None, clipping is not performed on
                lower edge.
            high (float): maximum value. If None, clipping is not performed on
                upper edge.

        """
        self._rewards = np.clip(self._rewards, low, high)
        # rebuild Episode objects only when they have been built already;
        # otherwise the lazy build will pick up the clipped rewards
        if self._episodes is not None:
            self.build_episodes()

    def append(self, observations, actions, rewards, terminals):
        """ Appends new data.

        Args:
            observations (numpy.ndarray or list(numpy.ndarray)): N-D array.
            actions (numpy.ndarray): actions.
            rewards (numpy.ndarray): rewards.
            terminals (numpy.ndarray): terminals.

        """
        # computed once; for discrete actions the size involves a full scan
        observation_shape = self.get_observation_shape()
        action_size = self.get_action_size()

        # validation
        for observation, action in zip(observations, actions):
            assert observation.shape == observation_shape
            if self.discrete_action:
                assert int(action) < action_size
            else:
                assert action.shape == (action_size, )

        # append observations
        if isinstance(self._observations, list):
            self._observations += list(observations)
        elif isinstance(self._observations, torch.Tensor):
            observations = _numpy_to_tensor(observations, self._device)
            self._observations = torch.cat([self._observations, observations],
                                           dim=0)
        else:
            self._observations = np.vstack([self._observations, observations])

        # append actions
        if self.discrete_action:
            self._actions = np.hstack([self._actions, actions])
        else:
            self._actions = np.vstack([self._actions, actions])

        # append rests
        self._rewards = np.hstack([self._rewards, rewards])
        self._terminals = np.hstack([self._terminals, terminals])

        # Extend the episode cache only when it exists. If the episodes have
        # not been built yet, the lazy build on first access covers the
        # appended data, and ``None += list`` would raise TypeError.
        if self._episodes is not None:
            self._episodes += _to_episodes(
                observation_shape=observation_shape,
                action_size=action_size,
                observations=observations,
                actions=actions,
                rewards=rewards,
                terminals=terminals)

    def extend(self, dataset):
        """ Extend dataset by another dataset.

        Args:
            dataset (d3rlpy.dataset.MDPDataset): dataset.

        """
        assert self.is_action_discrete() == dataset.is_action_discrete()
        assert self.get_observation_shape() == dataset.get_observation_shape()
        assert self.get_action_size() == dataset.get_action_size()
        self.append(dataset.observations, dataset.actions, dataset.rewards,
                    dataset.terminals)

    def dump(self, fname):
        """ Saves dataset as HDF5.

        Args:
            fname (str): file path.

        """
        # make sure if data is numpy.ndarray or list
        if isinstance(self._observations, torch.Tensor):
            observations = self._observations.cpu().numpy()
        else:
            observations = self._observations

        with h5py.File(fname, 'w') as f:
            f.create_dataset('observations', data=observations)
            f.create_dataset('actions', data=self._actions)
            f.create_dataset('rewards', data=self._rewards)
            f.create_dataset('terminals', data=self._terminals)
            f.create_dataset('discrete_action', data=self.discrete_action)
            f.flush()

    @classmethod
    def load(cls, fname, as_tensor=False, device=None):
        """ Loads dataset from HDF5.

        .. code-block:: python

            import numpy as np
            from d3rlpy.dataset import MDPDataset

            dataset = MDPDataset(np.random.random((10, 4)),
                                 np.random.random((10, 2)),
                                 np.random.random(10),
                                 np.random.randint(2, size=10))

            # save as HDF5
            dataset.dump('dataset.h5')

            # load from HDF5
            new_dataset = MDPDataset.load('dataset.h5')

        Args:
            fname (str): file path.
            as_tensor (bool): flag to hold observations as ``torch.Tensor``.
            device (d3rlpy.gpu.Device or int): gpu device or device id for
                tensor.

        Returns:
            MDPDataset: dataset built from the stored arrays.

        """
        with h5py.File(fname, 'r') as f:
            observations = f['observations'][()]
            actions = f['actions'][()]
            rewards = f['rewards'][()]
            terminals = f['terminals'][()]
            discrete_action = f['discrete_action'][()]

        dataset = cls(observations=observations,
                      actions=actions,
                      rewards=rewards,
                      terminals=terminals,
                      discrete_action=discrete_action,
                      as_tensor=as_tensor,
                      device=device)

        return dataset

    def build_episodes(self):
        """ Builds episode objects.

        This method will be internally called when accessing the episodes
        property at the first time.

        """
        self._episodes = _to_episodes(
            observation_shape=self.get_observation_shape(),
            action_size=self.get_action_size(),
            observations=self._observations,
            actions=self._actions,
            rewards=self._rewards,
            terminals=self._terminals)

    def __len__(self):
        return self.size()

    def __getitem__(self, index):
        return self.episodes[index]

    def __iter__(self):
        return iter(self.episodes)
class Episode:
    """ Episode class.

    This class holds the data collected in a single episode. The data is
    lazily converted into a list of :class:`d3rlpy.dataset.Transition`
    objects on the first access to ``transitions``. The object also behaves
    like a list of transitions.

    .. code-block:: python

        # return the number of transitions
        len(episode)

        # access to the first transition
        transitions = episode[0]

        # iterate through all transitions
        for transition in episode:
            pass

    Args:
        observation_shape (tuple): observation shape.
        action_size (int): dimension of action-space.
        observations (numpy.ndarray, list(numpy.ndarray) or torch.Tensor):
            observations.
        actions (numpy.ndarray): actions.
        rewards (numpy.ndarray): scalar rewards.

    """
    def __init__(self, observation_shape, action_size, observations, actions,
                 rewards):
        self.observation_shape = observation_shape
        self.action_size = action_size
        self._observations = observations
        self._actions = actions
        self._rewards = rewards
        # filled by build_transitions() on demand
        self._transitions = None

    @property
    def observations(self):
        """ Returns the observations.

        Returns:
            numpy.ndarray, list(numpy.ndarray) or torch.Tensor: array of
                observations.

        """
        return self._observations

    @property
    def actions(self):
        """ Returns the actions.

        Returns:
            numpy.ndarray: array of actions.

        """
        return self._actions

    @property
    def rewards(self):
        """ Returns the rewards.

        Returns:
            numpy.ndarray: array of rewards.

        """
        return self._rewards

    @property
    def transitions(self):
        """ Returns the transitions, building them on the first access.

        Returns:
            list(d3rlpy.dataset.Transition):
                list of :class:`d3rlpy.dataset.Transition` objects.

        """
        if self._transitions is None:
            self.build_transitions()
        return self._transitions

    def build_transitions(self):
        """ Builds transition objects.

        This method will be internally called when accessing the transitions
        property at the first time.

        """
        self._transitions = _to_transitions(
            observation_shape=self.observation_shape,
            action_size=self.action_size,
            observations=self._observations,
            actions=self._actions,
            rewards=self._rewards)

    def size(self):
        """ Returns the number of transitions.

        Returns:
            int: the number of transitions.

        """
        return len(self.transitions)

    def get_observation_shape(self):
        """ Returns observation shape.

        Returns:
            tuple: observation shape.

        """
        return self.observation_shape

    def get_action_size(self):
        """ Returns dimension of action-space.

        Returns:
            int: dimension of action-space.

        """
        return self.action_size

    def compute_return(self):
        """ Computes sum of rewards.

        The reward stored at the first step is not part of the sum.

        .. math::

            R = \\sum_{i=1} r_i

        Returns:
            float: episode return.

        """
        rewards_after_first = self._rewards[1:]
        return np.sum(rewards_after_first)

    def __len__(self):
        return self.size()

    def __getitem__(self, index):
        return self.transitions[index]

    def __iter__(self):
        return iter(self.transitions)
class Transition:
    """ Transition class.

    This class holds the data between two consecutive time steps, which is
    usually used as inputs of loss calculation in reinforcement learning.

    Args:
        observation_shape (tuple): observation shape.
        action_size (int): dimension of action-space.
        observation (numpy.ndarray or torch.Tensor): observation at `t`.
        action (numpy.ndarray or int): action at `t`.
        reward (float): reward at `t`.
        next_observation (numpy.ndarray or torch.Tensor): observation at `t+1`.
        next_action (numpy.ndarray or int): action at `t+1`.
        next_reward (float): reward at `t+1`.
        terminal (int): terminal flag at `t+1`.
        prev_transition (d3rlpy.dataset.Transition): pointer to the previous
            transition.
        next_transition (d3rlpy.dataset.Transition): pointer to the next
            transition.

    """
    def __init__(self,
                 observation_shape,
                 action_size,
                 observation,
                 action,
                 reward,
                 next_observation,
                 next_action,
                 next_reward,
                 terminal,
                 prev_transition=None,
                 next_transition=None):
        # static description of the spaces
        self.observation_shape = observation_shape
        self.action_size = action_size

        # data at `t`
        self._observation = observation
        self._action = action
        self._reward = reward

        # data at `t+1`
        self._next_observation = next_observation
        self._next_action = next_action
        self._next_reward = next_reward
        self._terminal = terminal

        # links to the neighbouring transitions
        self._prev_transition = prev_transition
        self._next_transition = next_transition

    def get_observation_shape(self):
        """ Returns observation shape.

        Returns:
            tuple: observation shape.

        """
        return self.observation_shape

    def get_action_size(self):
        """ Returns dimension of action-space.

        Returns:
            int: dimension of action-space.

        """
        return self.action_size

    @property
    def observation(self):
        """ Returns observation at `t`.

        Returns:
            numpy.ndarray or torch.Tensor: observation at `t`.

        """
        return self._observation

    @property
    def action(self):
        """ Returns action at `t`.

        Returns:
            (numpy.ndarray or int): action at `t`.

        """
        return self._action

    @property
    def reward(self):
        """ Returns reward at `t`.

        Returns:
            float: reward at `t`.

        """
        return self._reward

    @property
    def next_observation(self):
        """ Returns observation at `t+1`.

        Returns:
            numpy.ndarray or torch.Tensor: observation at `t+1`.

        """
        return self._next_observation

    @property
    def next_action(self):
        """ Returns action at `t+1`.

        Returns:
            (numpy.ndarray or int): action at `t+1`.

        """
        return self._next_action

    @property
    def next_reward(self):
        """ Returns reward at `t+1`.

        Returns:
            float: reward at `t+1`.

        """
        return self._next_reward

    @property
    def terminal(self):
        """ Returns terminal flag at `t+1`.

        Returns:
            int: terminal flag at `t+1`.

        """
        return self._terminal

    @property
    def prev_transition(self):
        """ Returns pointer to the previous transition.

        ``None`` is returned when no previous transition has been set.

        Returns:
            d3rlpy.dataset.Transition: previous transition.

        """
        return self._prev_transition

    @prev_transition.setter
    def prev_transition(self, value):
        """ Sets the pointer to the previous transition.

        Args:
            value (d3rlpy.dataset.Transition): previous transition.

        """
        assert isinstance(value, Transition)
        self._prev_transition = value

    @property
    def next_transition(self):
        """ Returns pointer to the next transition.

        ``None`` is returned when no next transition has been set.

        Returns:
            d3rlpy.dataset.Transition: next transition.

        """
        return self._next_transition

    @next_transition.setter
    def next_transition(self, value):
        """ Sets the pointer to the next transition.

        Args:
            value (d3rlpy.dataset.Transition): next transition.

        """
        assert isinstance(value, Transition)
        self._next_transition = value
class TransitionMiniBatch:
    """ mini-batch of Transition objects.

    This class holds :class:`d3rlpy.dataset.Transition` objects in batched
    form for being passed to algorithms during fitting.

    If the observation is image, you can stack arbitrary frames via
    ``n_frames``.

    .. code-block:: python

        transition.observation.shape == (3, 84, 84)

        batch_size = len(transitions)

        # stack 4 frames
        batch = TransitionMiniBatch(transitions, n_frames=4)

        # 4 frames x 3 channels
        batch.observations.shape == (batch_size, 12, 84, 84)

    This is implemented by tracing previous transitions through
    ``prev_transition`` property.

    Args:
        transitions (list(d3rlpy.dataset.Transition)):
            mini-batch of transitions.
        n_frames (int): the number of frames to stack for image observation.

    """
    def __init__(self, transitions, n_frames=1):
        self._transitions = transitions

        # (observation, next_observation) per transition, frame-stacked when
        # requested and the observation is 3-dimensional
        frame_pairs = []
        for transition in transitions:
            if n_frames > 1 and len(transition.observation.shape) == 3:
                pair = _stack_frames(transition, n_frames)
            else:
                pair = (transition.observation, transition.next_observation)
            frame_pairs.append(pair)

        observations = [current for current, _ in frame_pairs]
        next_observations = [following for _, following in frame_pairs]

        # convert list to ndarray or torch.Tensor
        if isinstance(observations[0], torch.Tensor):
            self._observations = torch.stack(observations, dim=0)
            self._next_observations = torch.stack(next_observations, dim=0)
        else:
            self._observations = np.array(observations)
            self._next_observations = np.array(next_observations)

        # the remaining fields are always 2-dimensional numpy arrays
        batch_size = self.size()
        column = (batch_size, 1)
        rows = (batch_size, -1)
        self._actions = np.array([t.action for t in transitions]).reshape(rows)
        self._rewards = np.array([t.reward
                                  for t in transitions]).reshape(column)
        self._next_rewards = np.array([t.next_reward for t in transitions
                                       ]).reshape(column)
        self._next_actions = np.array([t.next_action
                                       for t in transitions]).reshape(rows)
        self._terminals = np.array([t.terminal
                                    for t in transitions]).reshape(column)

    @property
    def observations(self):
        """ Returns mini-batch of observations at `t`.

        Returns:
            numpy.ndarray or torch.Tensor: observations at `t`.

        """
        return self._observations

    @property
    def actions(self):
        """ Returns mini-batch of actions at `t`.

        Returns:
            numpy.ndarray: actions at `t`.

        """
        return self._actions

    @property
    def rewards(self):
        """ Returns mini-batch of rewards at `t`.

        Returns:
            numpy.ndarray: rewards at `t`.

        """
        return self._rewards

    @property
    def next_observations(self):
        """ Returns mini-batch of observations at `t+1`.

        Returns:
            numpy.ndarray or torch.Tensor: observations at `t+1`.

        """
        return self._next_observations

    @property
    def next_actions(self):
        """ Returns mini-batch of actions at `t+1`.

        Returns:
            numpy.ndarray: actions at `t+1`.

        """
        return self._next_actions

    @property
    def next_rewards(self):
        """ Returns mini-batch of rewards at `t+1`.

        Returns:
            numpy.ndarray: rewards at `t+1`.

        """
        return self._next_rewards

    @property
    def terminals(self):
        """ Returns mini-batch of terminal flags at `t+1`.

        Returns:
            numpy.ndarray: terminal flags at `t+1`.

        """
        return self._terminals

    @property
    def transitions(self):
        """ Returns transitions.

        Returns:
            d3rlpy.dataset.Transition: list of transitions.

        """
        return self._transitions

    def size(self):
        """ Returns size of mini-batch.

        Returns:
            int: mini-batch size.

        """
        return len(self._transitions)

    def __len__(self):
        return self.size()

    def __getitem__(self, index):
        return self._transitions[index]

    def __iter__(self):
        return iter(self._transitions)