from abc import ABCMeta, abstractmethod
from typing import Any, List, Optional, Union
from typing_extensions import Protocol
import numpy as np
class _ActionProtocol(Protocol):
    """Structural interface of the algorithm objects passed to explorers.

    The explorers in this module only rely on the three members declared
    here, so any object exposing them can be used as ``algo``.
    """

    def predict(self, x: Union[np.ndarray, List[Any]]) -> np.ndarray:
        """Returns the actions the algorithm selects for observations ``x``.

        The epsilon-greedy explorers use this output as the greedy action.
        """

    def sample_action(self, x: Union[np.ndarray, List[Any]]) -> np.ndarray:
        """Returns actions sampled by the algorithm for observations ``x``.

        The noise-based explorer perturbs this output.
        """

    @property
    def action_size(self) -> Optional[int]:
        """Action size of the algorithm; typed as optional, so may be ``None``.

        The epsilon-greedy explorers pass it to ``np.random.randint`` as the
        exclusive upper bound of the random action index.
        """
class Explorer(metaclass=ABCMeta):
    """Abstract base class of exploration strategies.

    Concrete explorers implement :meth:`sample`, which turns the output of
    an algorithm into the action that is actually returned to the caller.
    """

    @abstractmethod
    def sample(
        self, algo: _ActionProtocol, x: np.ndarray, step: int
    ) -> np.ndarray:
        """Returns the action to take for the given observation.

        Args:
            algo: algorithm.
            x: observation.
            step: current environment step.

        Returns:
            action chosen by the exploration strategy.

        """
class ConstantEpsilonGreedy(Explorer):
    """:math:`\\epsilon`-greedy explorer whose :math:`\\epsilon` never changes.

    Args:
        epsilon (float): the constant :math:`\\epsilon`, i.e. the threshold a
            uniform draw from ``[0, 1)`` must fall below for the random
            action to be used.

    """

    _epsilon: float

    def __init__(self, epsilon: float):
        self._epsilon = epsilon

    def sample(
        self, algo: _ActionProtocol, x: np.ndarray, step: int
    ) -> np.ndarray:
        """Returns :math:`\\epsilon`-greedy action.

        Args:
            algo: algorithm.
            x: observation; ``x.shape[0]`` is used as the number of actions
                to draw.
            step: current environment step (not used by this explorer).

        Returns:
            :math:`\\epsilon`-greedy action.

        """
        exploit = algo.predict(x)
        batch_size = x.shape[0]
        # Random integer actions in [0, action_size), one per observation.
        explore = np.random.randint(algo.action_size, size=batch_size)
        # Independent uniform draw per observation decides which one is used.
        take_random = np.random.random(batch_size) < self._epsilon
        return np.where(take_random, explore, exploit)
class LinearDecayEpsilonGreedy(Explorer):
    """:math:`\\epsilon`-greedy explorer with a linearly decaying :math:`\\epsilon`.

    :math:`\\epsilon` moves linearly from ``start_epsilon`` at step 0 to
    ``end_epsilon`` at step ``duration`` and stays at ``end_epsilon``
    afterwards.

    Args:
        start_epsilon (float): the beginning :math:`\\epsilon`.
        end_epsilon (float): the end :math:`\\epsilon`.
        duration (int): the scheduling duration.

    """

    _start_epsilon: float
    _end_epsilon: float
    _duration: int

    def __init__(
        self,
        start_epsilon: float = 1.0,
        end_epsilon: float = 0.1,
        duration: int = 1000000,
    ):
        self._start_epsilon = start_epsilon
        self._end_epsilon = end_epsilon
        self._duration = duration

    def sample(
        self, algo: _ActionProtocol, x: np.ndarray, step: int
    ) -> np.ndarray:
        """Returns :math:`\\epsilon`-greedy action.

        Args:
            algo: algorithm.
            x: observation; ``x.shape[0]`` is used as the number of actions
                to draw.
            step: current environment step, used to look up the decayed
                :math:`\\epsilon`.

        Returns:
            :math:`\\epsilon`-greedy action.

        """
        exploit = algo.predict(x)
        batch_size = x.shape[0]
        # Random integer actions in [0, action_size), one per observation.
        explore = np.random.randint(algo.action_size, size=batch_size)
        # Draw first, then look up epsilon, matching the evaluation order of
        # a single ``draws < epsilon`` comparison expression.
        draws = np.random.random(batch_size)
        take_random = draws < self.compute_epsilon(step)
        return np.where(take_random, explore, exploit)

    def compute_epsilon(self, step: int) -> float:
        """Returns decayed :math:`\\epsilon`.

        Args:
            step: current environment step.

        Returns:
            ``end_epsilon`` once ``step`` has reached ``duration``, otherwise
            the linear interpolation between ``start_epsilon`` and
            ``end_epsilon``.

        """
        duration = self._duration
        floor = self._end_epsilon
        if step >= duration:
            return floor
        span = self._start_epsilon - floor
        remaining = 1.0 - step / duration
        return span * remaining + floor
class NormalNoise(Explorer):
    """Explorer that perturbs actions with normally distributed noise.

    Args:
        mean (float): mean of the noise.
        std (float): standard deviation of the noise.

    """

    _mean: float
    _std: float

    def __init__(self, mean: float = 0.0, std: float = 0.1):
        self._mean = mean
        self._std = std

    def sample(
        self, algo: _ActionProtocol, x: np.ndarray, step: int
    ) -> np.ndarray:
        """Returns action with noise injection.

        Args:
            algo: algorithm.
            x: observation.
            step: current environment step (not used by this explorer).

        Returns:
            action with noise injection, clipped to ``[-1.0, 1.0]``.

        """
        base_action = algo.sample_action(x)
        # One independent noise sample per action element.
        perturbation = np.random.normal(
            self._mean, self._std, size=base_action.shape
        )
        perturbed = base_action + perturbation
        return np.clip(perturbed, -1.0, 1.0)