Commit d8388353 by Werner Duvaud

Initial commit

MIT License
Copyright (c) 2020 Werner Duvaud
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
<p>
<img src="https://img.shields.io/badge/licence-MIT-green">
<img src="https://img.shields.io/badge/dependencies-up%20to%20date-brightgreen">
<a href="https://github.com/psf/black"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg"></a>
</p>
# MuZero General
A flexible, commented and [documented](https://github.com/werner-duvaud/muzero-general/wiki/MuZero-Documentation) implementation of MuZero based on the Google DeepMind [paper](https://arxiv.org/abs/1911.08265) and the associated [pseudocode](https://arxiv.org/src/1911.08265v1/anc/pseudocode.py).
It is designed to be easily adaptable to any game or reinforcement learning environment (such as [gym](https://github.com/openai/gym)). You only need to edit the game file with the parameters and the game class (see the sketch below). Please refer to the documentation and the tutorial.
MuZero is a model-based reinforcement learning algorithm, the successor of AlphaZero. It learns to master games without knowing the rules: it only knows the available actions and learns to play and master the game from experience. It is at least as efficient as similar algorithms such as [AlphaZero](https://arxiv.org/abs/1712.01815), [SimPLe](https://arxiv.org/abs/1903.00374) and [World Models](https://arxiv.org/abs/1803.10122).
It uses [PyTorch](https://github.com/pytorch/pytorch) and [Ray](https://github.com/ray-project/ray) to run self-play on multiple threads. A synchronous mode (easier for debugging) will be released. Full GPU support is included.
The code has three parts: muzero.py with the entry class, self_play.py with the replay buffer and the MCTS classes, and network.py with the neural networks and the shared storage classes.
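To add your own game, create a file in ./games/ containing a MuZeroConfig class (copy the one from games/cartpole.py and adjust observation_shape, action_space and the hyperparameters) and a Game wrapper exposing step, reset, close and render. Below is a minimal sketch, assuming a discrete-action gym environment; the file name and MountainCar-v0 are only examples and are not shipped with this repository:
```python
# games/mountaincar.py (hypothetical example)
import gym
import numpy


class Game:
    """Game wrapper around a discrete-action gym environment."""

    def __init__(self, seed=None):
        self.env = gym.make("MountainCar-v0")
        if seed is not None:
            self.env.seed(seed)

    def step(self, action):
        """Apply action; return the new observation, the reward and a done flag."""
        observation, reward, done, _ = self.env.step(action)
        return numpy.array(observation).flatten(), reward, done

    def reset(self):
        """Reset the environment and return the initial observation."""
        return self.env.reset()

    def close(self):
        self.env.close()

    def render(self):
        self.env.render()
        input("Press enter to take a step ")
```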
## Games already implemented with pretrained network available
* Lunar Lander
* Cartpole
## Getting started
### Installation
```bash
cd muzero-general
pip install -r requirements.txt
```
### Training
Edit the end of muzero.py:
```python
muzero = MuZero("cartpole")
muzero.train()
```
Then run:
```bash
python muzero.py
```
### Testing
Edit the end of muzero.py:
```python
muzero = MuZero("cartpole")
muzero.load_model()
muzero.test()
```
Then run:
```bash
python muzero.py
```
## Coming soon
* [ ] Convolutional / Atari mode
* [ ] Performance tracking
* [ ] Synchronous mode
* [ ] [Open spiel](https://github.com/deepmind/open_spiel) integration
* [ ] Checkers game
* [ ] TensorFlow mode
## Authors
* Werner Duvaud
* Aurèle Hainaut
* Paul Lenoir
import gym
import numpy
import torch
class MuZeroConfig:
def __init__(self):
self.seed = 0 # Seed for numpy, torch and the game
### Game
self.observation_shape = 4 # Dimensions of the game observation
self.action_space = [i for i in range(2)] # Fixed list of all possible actions
### Self-Play
self.num_actors = 10 # Number of simultaneous threads self-playing to feed the replay buffer
self.max_moves = 500 # Maximum number of moves if game is not finished before
self.num_simulations = 50 # Number of future moves self-simulated
self.discount = 0.997 # Chronological discount of the reward
# Root prior exploration noise
self.root_dirichlet_alpha = 0.25
self.root_exploration_fraction = 0.25
# UCB formula
self.pb_c_base = 19652
self.pb_c_init = 1.25
# If we already have some information about which values occur in the environment, we can use them to initialize the rescaling
# This is not strictly necessary, but establishes identical behaviour to AlphaZero in board games
self.min_known_bound = None
self.max_known_bound = None
### Network
self.encoding_size = 32
self.hidden_size = 64
# Training
self.results_path = "./pretrained" # Path to store the model weights
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Automatically use GPU instead of CPU if available
self.training_steps = 400 # Total number of training steps (i.e. weight updates, one per batch)
self.batch_size = 128 # Number of parts of games to train on at each training step
self.num_unroll_steps = 5 # Number of game moves to keep for every batch element
self.test_interval = 20 # Number of training steps before evaluating the network on the game to track the performance
self.test_episodes = 2 # Number of games played to evaluate the network
self.checkpoint_interval = 10 # Number of training steps before using the model for self-playing
self.window_size = 1000 # Number of self-play games to keep in memory (in the replay buffer)
self.td_steps = 10 # Number of steps in the future to take into account for calculating the target value
self.weight_decay = 1e-4 # L2 weights regularization
self.momentum = 0.9
# Exponential learning rate schedule
self.lr_init = 0.005 # Initial learning rate
self.lr_decay_rate = 0.1
self.lr_decay_steps = 3500
def visit_softmax_temperature_fn(self, num_moves, trained_steps):
"""
Parameter to alter the visit count distribution to ensure that the action selection becomes greedier as training progresses.
The smaller it is, the more likely the best action (i.e. the one with the highest visit count) is chosen.
Returns:
Positive float.
"""
if trained_steps < 0.5 * self.training_steps:
return 1.0
elif trained_steps < 0.75 * self.training_steps:
return 0.5
else:
return 0.25
class Game:
"""Game wrapper.
"""
def __init__(self, seed=None):
self.env = gym.make("CartPole-v1")
if seed is not None:
self.env.seed(seed)
def step(self, action):
"""Apply action to the game.
Args:
action : action of the action_space to take.
Returns:
The new observation, the reward and a boolean if the game has ended.
"""
observation, reward, done, _ = self.env.step(action)
return numpy.array(observation).flatten(), reward, done
def reset(self):
"""Reset the game for a new game.
Returns:
Initial observation of the game.
"""
return self.env.reset()
def close(self):
"""Properly close the game.
"""
self.env.close()
def render(self):
"""Display the game observation.
"""
self.env.render()
input("Press enter to take a step ")
import gym
import numpy
import torch
class MuZeroConfig:
def __init__(self):
self.seed = 0 # Seed for numpy, torch and the game
### Game
self.observation_shape = 8 # Dimensions of the game observation
self.action_space = [i for i in range(4)] # Fixed list of all possible actions
### Self-Play
self.num_actors = 10 # Number of simultaneous threads self-playing to feed the replay buffer
self.max_moves = 100 # Maximum number of moves if game is not finished before
self.num_simulations = 50 # Number of future moves self-simulated
self.discount = 0.997 # Chronological discount of the reward
# Root prior exploration noise
self.root_dirichlet_alpha = 0.25
self.root_exploration_fraction = 0.25
# UCB formula
self.pb_c_base = 19652
self.pb_c_init = 1.25
# If we already have some information about which values occur in the environment, we can use them to initialize the rescaling
# This is not strictly necessary, but establishes identical behaviour to AlphaZero in board games
self.min_known_bound = None
self.max_known_bound = None
### Network
self.encoding_size = 16
self.hidden_size = 8
# Training
self.results_path = "./pretrained" # Path to store the model weights
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Automatically use GPU instead of CPU if available
self.training_steps = 700 # Total number of training steps (i.e. weight updates, one per batch)
self.batch_size = 128 # Number of parts of games to train on at each training step
self.num_unroll_steps = 50 # Number of game moves to keep for every batch element
self.test_interval = 20 # Number of training steps before evaluating the network on the game to track the performance
self.test_episodes = 2 # Number of games played to evaluate the network
self.checkpoint_interval = 20 # Number of training steps before using the model for self-playing
self.window_size = 1000 # Number of self-play games to keep in memory (in the replay buffer)
self.td_steps = 50 # Number of steps in the future to take into account for calculating the target value
self.weight_decay = 1e-4 # L2 weights regularization
self.momentum = 0.9
# Exponential learning rate schedule
self.lr_init = 0.005 # Initial learning rate
self.lr_decay_rate = 0.01
self.lr_decay_steps = 3500
def visit_softmax_temperature_fn(self, num_moves, trained_steps):
"""
Parameter to alter the visit count distribution to ensure that the action selection becomes greedier as training progresses.
The smaller it is, the more likely the best action (i.e. the one with the highest visit count) is chosen.
Returns:
Positive float.
"""
if trained_steps < 0.25 * self.training_steps:
return 1000
elif trained_steps < 0.5 * self.training_steps:
return 1
elif trained_steps < 0.75 * self.training_steps:
return 0.5
else:
return 0.1
class Game:
"""Game wrapper.
"""
def __init__(self, seed=None):
self.env = gym.make("LunarLander-v2")
if seed is not None:
self.env.seed(seed)
def step(self, action):
"""Apply action to the game.
Args:
action : action of the action_space to take.
Returns:
The new observation, the reward and a boolean if the game has ended.
"""
observation, reward, done, _ = self.env.step(action)
return numpy.array(observation).flatten(), reward, done
def reset(self):
"""Reset the game for a new game.
Returns:
Initial observation of the game.
"""
return self.env.reset()
def close(self):
"""Properly close the game.
"""
self.env.close()
def render(self):
"""Display the game observation.
"""
self.env.render()
input("Press enter to take a step ")
import importlib
import os
import time
import numpy
import ray
import torch
import network
import self_play
class MuZero:
"""
Main class to manage MuZero.
Args:
game_name (str): Name of the game module; it should match the name of a .py file in the "./games" directory.
Example:
>>> muzero = MuZero("cartpole")
>>> muzero.train()
>>> muzero.test()
"""
def __init__(self, game_name):
self.game_name = game_name
# Load the game and the config from the module with the game name
try:
game_module = importlib.import_module("games." + self.game_name)
self.config = game_module.MuZeroConfig()
self.Game = game_module.Game
except Exception as err:
print(
'{} is not a supported game name, try "cartpole" or refer to the documentation for adding a new game.'.format(
self.game_name
)
)
raise err
# Fix random generator seed for reproducibility
# TODO: check if results do not change from one run to another
numpy.random.seed(self.config.seed)
torch.manual_seed(self.config.seed)
self.best_model = network.Network(
self.config.observation_shape,
len(self.config.action_space),
self.config.encoding_size,
self.config.hidden_size,
)
def train(self):
# Initialize and launch components that work simultaneously
ray.init()
model = self.best_model
model.train()
storage = network.SharedStorage.remote(model)
replay_buffer = self_play.ReplayBuffer.remote(self.config)
for seed in range(self.config.num_actors):
self_play.run_selfplay.remote(
self.Game,
self.config,
storage,
replay_buffer,
model,
self.config.seed + seed,
)
# Initialize network for training
model = model.to(self.config.device)
optimizer = torch.optim.SGD(
model.parameters(),
lr=self.config.lr_init,
momentum=self.config.momentum,
weight_decay=self.config.weight_decay,
)
# Wait for replay buffer to be non-empty
while ray.get(replay_buffer.length.remote()) == 0:
time.sleep(0.1)
# Training loop
best_test_rewards = None
for training_step in range(self.config.training_steps):
model.train()
storage.set_training_step.remote(training_step)
# Make the model available for self-play
if training_step % self.config.checkpoint_interval == 0:
storage.set_weights.remote(model.state_dict())
# Update learning rate
lr = self.config.lr_init * self.config.lr_decay_rate ** (
training_step / self.config.lr_decay_steps
)
for param_group in optimizer.param_groups:
param_group["lr"] = lr
# Train on a batch.
batch = ray.get(
replay_buffer.sample_batch.remote(
self.config.num_unroll_steps, self.config.td_steps
)
)
loss = network.update_weights(optimizer, model, batch, self.config)
# Test the current model and save it on disk if it is the best
if training_step % self.config.test_interval == 0:
total_test_rewards = self.test(model=model, render=False)
if best_test_rewards is None or sum(total_test_rewards) >= sum(
best_test_rewards
):
self.best_model = model
best_test_rewards = total_test_rewards
self.save_model()
print(
"Training step: {}\nBuffer Size: {}\nLearning rate: {}\nLoss: {}\nLast test score: {}\nBest sest score: {}\n".format(
training_step,
ray.get(replay_buffer.length.remote()),
lr,
loss,
str(total_test_rewards),
str(best_test_rewards),
)
)
# Finally, save the latest network in the shared storage and end the self-play
storage.set_weights.remote(model.state_dict())
ray.shutdown()
def test(self, model=None, render=True):
if not model:
model = self.best_model
model.to(self.config.device)
test_rewards = []
game = self.Game()
model.eval()
with torch.no_grad():
for _ in range(self.config.test_episodes):
observation = game.reset()
done = False
total_reward = 0
while not done:
if render:
game.render()
root = self_play.MCTS(self.config).run(model, observation, False)
action = self_play.select_action(root, temperature=0)
observation, reward, done = game.step(action)
total_reward += reward
test_rewards.append(total_reward)
return test_rewards
def save_model(self, model=None, path=None):
if not model:
model = self.best_model
if not path:
path = os.path.join(self.config.results_path, self.game_name)
torch.save(model.state_dict(), path)
def load_model(self, path=None):
if not path:
path = os.path.join(self.config.results_path, self.game_name)
self.best_model = network.Network(
self.config.observation_shape,
len(self.config.action_space),
self.config.encoding_size,
self.config.hidden_size,
)
try:
self.best_model.load_state_dict(torch.load(path))
except FileNotFoundError:
print("There is no model saved in {}.".format(path))
if __name__ == "__main__":
# Load the game and the parameters from ./games/file_name.py
muzero = MuZero("cartpole")
muzero.load_model()
muzero.train()
muzero.test()
import ray
import torch
class Network(torch.nn.Module):
def __init__(self, input_size, action_space_n, encoding_size, hidden_size):
super().__init__()
self.action_space_n = action_space_n
self.representation_network = FullyConnectedNetwork(
input_size, [], encoding_size
)
self.dynamics_state_network = FullyConnectedNetwork(
encoding_size + self.action_space_n, [hidden_size], encoding_size
)
self.dynamics_reward_network = FullyConnectedNetwork(
encoding_size + self.action_space_n, [hidden_size], 1
)
self.prediction_actor_network = FullyConnectedNetwork(
encoding_size, [], self.action_space_n, activation=None
)
self.prediction_value_network = FullyConnectedNetwork(
encoding_size, [], 1, activation=None
)
def prediction(self, state):
actor_logit = self.prediction_actor_network(state)
value = self.prediction_value_network(state)
return actor_logit, value
def representation(self, observation):
return self.representation_network(observation)
def dynamics(self, state, action):
action_one_hot = (
torch.zeros((action.shape[0], self.action_space_n))
.to(action.device)
.float()
)
action_one_hot.scatter_(1, action.long(), 1.0)
x = torch.cat((state, action_one_hot), dim=1)
next_state = self.dynamics_state_network(x)
reward = self.dynamics_reward_network(x)
return next_state, reward
def initial_inference(self, observation):
state = self.representation(observation)
actor_logit, value = self.prediction(state)
return (
value,
torch.zeros(len(observation)).to(observation.device),
actor_logit,
state,
)
def recurrent_inference(self, hidden_state, action):
state, reward = self.dynamics(hidden_state, action)
actor_logit, value = self.prediction(state)
return value, reward, actor_logit, state
def update_weights(optimizer, model, batch, config):
observation_batch, action_batch, target_reward, target_value, target_policy = batch
observation_batch = torch.tensor(observation_batch).float().to(config.device)
action_batch = torch.tensor(action_batch).float().to(config.device).unsqueeze(-1)
target_value = torch.tensor(target_value).float().to(config.device)
target_reward = torch.tensor(target_reward).float().to(config.device)
target_policy = torch.tensor(target_policy).float().to(config.device)
value, reward, policy_logits, hidden_state = model.initial_inference(
observation_batch
)
predictions = [(value, reward, policy_logits)]
for action_i in range(config.num_unroll_steps):
value, reward, policy_logits, hidden_state = model.recurrent_inference(
hidden_state, action_batch[:, action_i]
)
predictions.append((value, reward, policy_logits))
loss = 0
for i, prediction in enumerate(predictions):
value, reward, policy_logits = prediction
loss += loss_function(
value.squeeze(-1),
reward.squeeze(-1),
policy_logits,
target_value[:, i],
target_reward[:, i],
target_policy[:, i, :],
)
# Scale gradient by number of unroll steps (See paper Training appendix)
loss = loss.mean() / config.num_unroll_steps
# Optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
return loss.item()
def loss_function(
value, reward, policy_logits, target_value, target_reward, target_policy
):
# TODO: paper promotes cross entropy instead of MSE
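# Per-sample loss: (value - target_value)^2 + (reward - target_reward)^2
# + cross-entropy between the softmax of the policy logits and the target policy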
value_loss = torch.nn.MSELoss(reduction="none")(value, target_value)
reward_loss = torch.nn.MSELoss(reduction="none")(reward, target_reward)
policy_loss = -(torch.log_softmax(policy_logits, dim=1) * target_policy).sum(1)
return value_loss + reward_loss + policy_loss
class FullyConnectedNetwork(torch.nn.Module):
def __init__(
self, input_size, layers_sizes, output_size, activation=torch.nn.Tanh()
):
super(FullyConnectedNetwork, self).__init__()
layers_sizes.insert(0, input_size)
layers = []
if 1 < len(layers_sizes):
for i in range(len(layers_sizes) - 1):
layers.extend(
[
torch.nn.Linear(layers_sizes[i], layers_sizes[i + 1]),
torch.nn.ReLU(),
]
)
layers.append(torch.nn.Linear(layers_sizes[-1], output_size))
if activation:
layers.append(activation)
self.layers = torch.nn.ModuleList(layers)
def forward(self, x):
for layer in self.layers:
x = layer(x)
return x
@ray.remote
class SharedStorage:
def __init__(self, model):
self.training_step = 0
self.model = model
def get_weights(self):
return self.model.state_dict()
def set_weights(self, weights):
return self.model.load_state_dict(weights)
def set_training_step(self, training_step):
self.training_step = training_step
def get_training_step(self):
return self.training_step
import math
import numpy
import ray
import torch
@ray.remote # (num_gpus=1) # Uncomment num_gpus and model.to(config.device) to self-play on GPU
def run_selfplay(Game, config, shared_storage, replay_buffer, model, seed):
"""
Function which runs simultaneously in multiple threads, continuously playing games and saving them to the replay buffer.
"""
# model.to(config.device) # Uncomment this line and num_gpus in the decorator to self-play on GPU
with torch.no_grad():
while True:
# Initialize a self-play
model.load_state_dict(ray.get(shared_storage.get_weights.remote()))
game = Game(seed)
done = False
game_history = GameHistory(config.discount)
# Self-play with actions based on the Monte Carlo tree search at each move
observation = game.reset()
game_history.observation_history.append(observation)
while not done and len(game_history.history) < config.max_moves:
root = MCTS(config).run(model, observation, True)
temperature = config.visit_softmax_temperature_fn(
num_moves=len(game_history.history),
trained_steps=ray.get(shared_storage.get_training_step.remote()),
)
action = select_action(root, temperature)
observation, reward, done = game.step(action)
game_history.observation_history.append(observation)
game_history.rewards.append(reward)
game_history.history.append(action)
game_history.store_search_statistics(root, config.action_space)
game.close()
# Save the game history
replay_buffer.save_game.remote(game_history)
# Game independent
class MCTS:
"""
Core Monte Carlo Tree Search algorithm.
To decide on an action, we run N simulations, always starting at the root of
the search tree and traversing the tree according to the UCB formula until we
reach a leaf node.
"""
def __init__(self, config):
self.config = config
def run(self, model, observation, add_exploration_noise):
"""
At the root of the search tree we use the representation function to obtain a hidden state given the current observation.
We then run a Monte Carlo Tree Search using only action sequences and the model learned by the network.
"""
root = Node(0)
observation = (
torch.from_numpy(observation).to(self.config.device).float().unsqueeze(0)
)
_, expected_reward, policy_logits, hidden_state = model.initial_inference(
observation
)
root.expand(
self.config.action_space, expected_reward, policy_logits, hidden_state
)
if add_exploration_noise:
root.add_exploration_noise(
dirichlet_alpha=self.config.root_dirichlet_alpha,
exploration_fraction=self.config.root_exploration_fraction,
)
min_max_stats = MinMaxStats(
self.config.min_known_bound, self.config.max_known_bound
)
for _ in range(self.config.num_simulations):
node = root
search_path = [node]
while node.expanded():
action, node = self.select_child(node, min_max_stats)
last_action = action
search_path.append(node)
# Inside the search tree we use the dynamics function to obtain the next hidden state given an action and the previous hidden state
parent = search_path[-2]
value, reward, policy_logits, hidden_state = model.recurrent_inference(
parent.hidden_state,
torch.tensor([[last_action]]).to(parent.hidden_state.device),
)
node.expand(self.config.action_space, reward, policy_logits, hidden_state)
self.backpropagate(search_path, value.item(), min_max_stats)
return root
def select_child(self, node, min_max_stats):
"""
Select the child with the highest UCB score.
"""
_, action, child = max(
(self.ucb_score(node, child, min_max_stats), action, child)
for action, child in node.children.items()
)
return action, child
def ucb_score(self, parent, child, min_max_stats):
"""
The score for a node is based on its value, plus an exploration bonus based on the prior.
"""
pb_c = (
math.log(
(parent.visit_count + self.config.pb_c_base + 1) / self.config.pb_c_base
)
+ self.config.pb_c_init
)
pb_c *= math.sqrt(parent.visit_count) / (child.visit_count + 1)
prior_score = pb_c * child.prior
value_score = min_max_stats.normalize(child.value())
return prior_score + value_score
def backpropagate(self, search_path, value, min_max_stats):
"""
At the end of a simulation, we propagate the evaluation all the way up the tree to the root.
"""
for node in reversed(search_path):
# Always the same player; the other players' minds should be modeled in the network, because the environment does not always act in the worst way for the agent
node.value_sum += value # if node.to_play == to_play else -value
node.visit_count += 1
min_max_stats.update(node.value())
value = node.reward + self.config.discount * value
def select_action(node, temperature, random=False):
"""
Select an action according to the visit count distribution and the temperature.
The temperature is changed dynamically with the visit_softmax_temperature function in the config.
"""
visit_counts = numpy.array(
[[child.visit_count, action] for action, child in node.children.items()]
).T
if temperature == 0:
action_pos = numpy.argmax(visit_counts[0])
else:
# See paper Data Generation appendix
visit_count_distribution = visit_counts[0] ** (1 / temperature)
visit_count_distribution = visit_count_distribution / sum(
visit_count_distribution
)
action_pos = numpy.random.choice(
len(visit_counts[1]), p=visit_count_distribution
)
if random:
action_pos = numpy.random.choice(len(visit_counts[1]))
return visit_counts[1][action_pos]
class Node:
def __init__(self, prior):
self.visit_count = 0
self.to_play = -1
self.prior = prior
self.value_sum = 0
self.children = {}
self.hidden_state = None
self.reward = 0
def expanded(self):
return len(self.children) > 0
def value(self):
if self.visit_count == 0:
return 0
return self.value_sum / self.visit_count
def expand(self, actions, reward, policy_logits, hidden_state):
"""
We expand a node using the value, reward and policy prediction obtained from the neural network.
"""
self.reward = reward
self.hidden_state = hidden_state
policy = {a: math.exp(policy_logits[0][a]) for a in actions}
policy_sum = sum(policy.values())
for action, p in policy.items():
self.children[action] = Node(p / policy_sum)
def add_exploration_noise(self, dirichlet_alpha, exploration_fraction):
"""
At the start of each search, we add dirichlet noise to the prior of the root to encourage the search to explore new actions.
"""
actions = list(self.children.keys())
noise = numpy.random.dirichlet([dirichlet_alpha] * len(actions))
frac = exploration_fraction
for a, n in zip(actions, noise):
self.children[a].prior = self.children[a].prior * (1 - frac) + n * frac
class GameHistory:
"""
Store only the useful information of a self-play game.
"""
def __init__(self, discount):
self.observation_history = []
self.history = []
self.rewards = []
self.child_visits = []
self.root_values = []
self.discount = discount
def store_search_statistics(self, root, action_space):
sum_visits = sum(child.visit_count for child in root.children.values())
self.child_visits.append(
[
root.children[a].visit_count / sum_visits if a in root.children else 0
for a in action_space
]
)
self.root_values.append(root.value())
@ray.remote
class ReplayBuffer:
# Store a list of game histories and generate batches
def __init__(self, config):
self.window_size = config.window_size
self.batch_size = config.batch_size
self.buffer = []
def save_game(self, game_history):
if len(self.buffer) > self.window_size:
self.buffer.pop(0)
self.buffer.append(game_history)
def sample_batch(self, num_unroll_steps, td_steps):
observation_batch, action_batch, reward_batch, value_batch, policy_batch = (
[],
[],
[],
[],
[],
)
for _ in range(self.batch_size):
game_history = self.sample_game()
game_pos = self.sample_position(game_history)
actions = game_history.history[game_pos : game_pos + num_unroll_steps]
# Repeat the last action so that every "actions" list has the same length
actions.extend(
[actions[-1] for _ in range(num_unroll_steps - len(actions) + 1)]
)
observation_batch.append(game_history.observation_history[game_pos])
action_batch.append(actions)
value, reward, policy = self.make_target(
game_history, game_pos, num_unroll_steps, td_steps
)
reward_batch.append(reward)
value_batch.append(value)
policy_batch.append(policy)
return observation_batch, action_batch, reward_batch, value_batch, policy_batch
def sample_game(self):
"""
Sample game from buffer either uniformly or according to some priority.
"""
# TODO: sample with a probability linked to the difference between the real and the predicted value (see paper appendix Training)
return self.buffer[numpy.random.choice(range(len(self.buffer)))]
def sample_position(self, game):
"""
Sample position from game either uniformly or according to some priority.
"""
# TODO: according to some priority
return numpy.random.choice(range(len(game.rewards)))
def make_target(self, game, state_index, num_unroll_steps, td_steps):
"""
The value target is the discounted root value of the search tree td_steps into the future, plus the discounted sum of all rewards until then.
"""
target_values, target_rewards, target_policies = [], [], []
for current_index in range(state_index, state_index + num_unroll_steps + 1):
bootstrap_index = current_index + td_steps
if bootstrap_index < len(game.root_values):
value = game.root_values[bootstrap_index] * game.discount ** td_steps
else:
value = 0
for i, reward in enumerate(game.rewards[current_index:bootstrap_index]):
value += reward * game.discount ** i
if current_index < len(game.root_values):
target_values.append(value)
target_rewards.append(game.rewards[current_index])
target_policies.append(game.child_visits[current_index])
else:
# States past the end of games are treated as absorbing states
target_values.append(0)
target_rewards.append(0)
# Uniform policy to give the tensor a valid dimension
target_policies.append(
[
1 / len(game.child_visits[0])
for _ in range(len(game.child_visits[0]))
]
)
return target_values, target_rewards, target_policies
def length(self):
return len(self.buffer)
class MinMaxStats:
"""
A class that holds the min-max values of the tree.
"""
def __init__(self, min_value_bound, max_value_bound):
self.maximum = max_value_bound if max_value_bound else -float("inf")
self.minimum = min_value_bound if min_value_bound else float("inf")
def update(self, value):
self.maximum = max(self.maximum, value)
self.minimum = min(self.minimum, value)
def normalize(self, value):
if self.maximum > self.minimum:
# We normalize only when we have set the maximum and minimum values
return (value - self.minimum) / (self.maximum - self.minimum)
return value