Commit 2267b85e by lsy

Initial commit

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
*.local
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
fig/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
*.jpg
*.jpeg
.idea/
*.npy
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
runs/
prev_runs/
saved_models/
*.log
*.jpg
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
# DS Store
.DS_Store
# saved models
*.pth
*.pt
*.avi
# Learning Subgoal Representations with Slow Dynamics
We propose a slowness objective to effectively learn the subgoal representation
for goal-conditioned hierarchical reinforcement learning. [Our paper](https://openreview.net/pdf?id=wxRwhSdORKG) was accepted at ICLR 2021.
The Python dependencies are as follows:
* Python 3.6 or above
* [PyTorch](https://pytorch.org/)
* [Gym](https://gym.openai.com/)
* [Mujoco](https://www.roboti.us)
Run the code with ``python train_hier_sac.py``. The TensorBoard event files are saved in the ``runs`` folder and the
trained models are saved in the ``saved_models`` folder.
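To monitor training, point TensorBoard at the ``runs`` folder:
```
tensorboard --logdir runs
```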
import numpy as np
class her_sampler:
def __init__(self, replay_strategy, replay_k, threshold, future_step, dense_reward, direction_reward, low_reward_coeff):
self.replay_strategy = replay_strategy
self.replay_k = replay_k
if self.replay_strategy == 'future':
self.future_p = 1 - (1. / (1 + replay_k))
else:
self.future_p = 0
self.threshold = threshold
self.future_step = future_step
self.border_index = None
self.direction_reward = direction_reward
# the sparse/dense reward type below is not used when direction_reward is enabled
if not dense_reward:
self.reward_type = 'sparse'
else:
self.reward_type = 'dense'
self.reward_coeff = low_reward_coeff
def reward_func(self, state, goal, info=None):
assert state.shape == goal.shape
dist = np.linalg.norm(state - goal, axis=-1)
if self.reward_type == 'sparse':
return -(dist > self.threshold).astype(np.float32)
else:
return -dist * self.reward_coeff
def direction_reward_func(self, ag_next, goal, ag):
# l2 distance reward
assert ag.shape == goal.shape
dist = np.linalg.norm(ag + goal - ag_next, axis=-1)
return -dist
# # cosine distance reward
# a_direction = ag_next - ag # achieved direction
# cos_dist = np.sum(np.multiply(a_direction, goal), axis=1) / (
# (np.linalg.norm(a_direction, axis=1) * np.linalg.norm(goal, axis=1)) + 1e-6)
# return cos_dist
def sample_her_transitions(self, episode_batch, batch_size_in_transitions):
T = episode_batch['actions'].shape[1]
rollout_batch_size = episode_batch['actions'].shape[0]
batch_size = batch_size_in_transitions
# select which rollouts and which timesteps to be used
episode_idxs = np.random.randint(0, rollout_batch_size, batch_size)
t_samples = np.random.randint(T, size=batch_size)
transitions = {key: episode_batch[key][episode_idxs, t_samples].copy() for key in episode_batch.keys()}
# her idx
her_indexes = np.where(np.random.uniform(size=batch_size) < self.future_p)
# cheat in her for large step length
target_index = np.minimum(T, t_samples + self.future_step)
future_offset = np.random.uniform(size=batch_size) * (target_index - t_samples)
future_offset = future_offset.astype(int)
future_t = (t_samples + 1 + future_offset)[her_indexes]
# replace goal with achieved goal
future_ag = episode_batch['ag'][episode_idxs[her_indexes], future_t]
transitions['g'][her_indexes] = future_ag
# to get the params to re-compute reward
if not self.direction_reward:
transitions['r'] = np.expand_dims(
self.reward_func(transitions['ag_next'], transitions['g'],
None), 1)
else:
transitions['r'] = np.expand_dims(
self.direction_reward_func(transitions['ag_next'].copy(), transitions['g'].copy(),
transitions['ag'].copy()), 1)
transitions = {k: transitions[k].reshape(batch_size, *transitions[k].shape[1:]) for k in transitions.keys()}
return transitions
def sample_her_energy(self, episode_batch, batch_size_in_transitions, temperature=1.0):
T = episode_batch['actions'].shape[1]
rollout_batch_size = episode_batch['actions'].shape[0]
batch_size = batch_size_in_transitions
# select which rollouts and which timesteps to be used
energy_trajectory = episode_batch['e']
p_trajectory = np.power(energy_trajectory, 1 / (temperature + 1e-2))
p_trajectory = p_trajectory / p_trajectory.sum()
episode_idxs = np.random.choice(rollout_batch_size, size=batch_size, replace=True, p=p_trajectory.flatten())
t_samples = np.random.randint(T, size=batch_size)
transitions = {}
for key in episode_batch.keys():
if not key == 'e':
transitions[key] = episode_batch[key][episode_idxs, t_samples].copy()
# her idx
her_indexes = np.where(np.random.uniform(size=batch_size) < self.future_p)
# cheat in her for large step length
target_index = np.minimum(T, t_samples + self.future_step)
future_offset = np.random.uniform(size=batch_size) * (target_index - t_samples)
future_offset = future_offset.astype(int)
future_t = (t_samples + 1 + future_offset)[her_indexes]
# replace goal with achieved goal
future_ag = episode_batch['ag'][episode_idxs[her_indexes], future_t]
transitions['g'][her_indexes] = future_ag
# to get the params to re-compute reward
if not self.direction_reward:
transitions['r'] = np.expand_dims(
self.reward_func(transitions['ag_next'], transitions['g'],
None), 1)
else:
transitions['r'] = np.expand_dims(
self.direction_reward_func(transitions['ag_next'], transitions['g'],
transitions['ag']), 1)
transitions = {k: transitions[k].reshape(batch_size, *transitions[k].shape[1:]) for k in transitions.keys()}
return transitions
def adjust_replay_k(self):
if self.replay_k > 1:
self.replay_k -= 1
if self.replay_strategy == 'future':
self.future_p = 1 - (1. / (1 + self.replay_k))
else:
self.future_p = 0
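# A minimal usage sketch with toy shapes (all dimensions and argument values below
# are made up, not the ones used in training): build a dummy episode batch in the
# key/shape layout that sample_her_transitions expects, then draw a relabelled batch.
toy_T, toy_rollouts, toy_obs_dim, toy_goal_dim, toy_act_dim = 50, 4, 10, 2, 3
toy_episode_batch = {
    'obs': np.random.randn(toy_rollouts, toy_T, toy_obs_dim),
    'ag': np.random.randn(toy_rollouts, toy_T + 1, toy_goal_dim),   # achieved goals, one per step plus the final state
    'ag_next': np.random.randn(toy_rollouts, toy_T, toy_goal_dim),
    'g': np.random.randn(toy_rollouts, toy_T, toy_goal_dim),
    'actions': np.random.randn(toy_rollouts, toy_T, toy_act_dim),
}
toy_sampler = her_sampler(replay_strategy='future', replay_k=4, threshold=0.5,
                          future_step=15, dense_reward=False,
                          direction_reward=False, low_reward_coeff=0.1)
toy_batch = toy_sampler.sample_her_transitions(toy_episode_batch, batch_size_in_transitions=64)
# toy_batch['r'] has shape (64, 1); part of toy_batch['g'] has been relabelled with future achieved goals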
### Description
------------
Reimplementation of [Soft Actor-Critic Algorithms and Applications](https://arxiv.org/pdf/1812.05905.pdf) and a deterministic variant of SAC from [Soft Actor-Critic: Off-Policy Maximum Entropy Deep Reinforcement
Learning with a Stochastic Actor](https://arxiv.org/pdf/1801.01290.pdf).
Added another branch for [Soft Actor-Critic: Off-Policy Maximum Entropy Deep Reinforcement
Learning with a Stochastic Actor](https://arxiv.org/pdf/1801.01290.pdf) -> [SAC_V](https://github.com/pranz24/pytorch-soft-actor-critic/tree/SAC_V).
### Requirements
------------
* [mujoco-py](https://github.com/openai/mujoco-py)
* [TensorboardX](https://github.com/lanpa/tensorboardX)
* [PyTorch](http://pytorch.org/)
### Default Arguments and Usage
------------
### Usage
```
usage: main.py [-h] [--env-name ENV_NAME] [--policy POLICY] [--eval EVAL]
[--gamma G] [--tau G] [--lr G] [--alpha G]
[--automatic_entropy_tuning G] [--seed N] [--batch_size N]
[--num_steps N] [--hidden_size N] [--updates_per_step N]
[--start_steps N] [--target_update_interval N]
[--replay_size N] [--cuda]
```
(Note: There is no need to set the temperature (`--alpha`) if `--automatic_entropy_tuning` is True.)
#### For SAC
```
python main.py --env-name Humanoid-v2 --alpha 0.05
```
#### For SAC (Hard Update)
```
python main.py --env-name Humanoid-v2 --alpha 0.05 --tau 1 --target_update_interval 1000
```
#### For SAC (Deterministic, Hard Update)
```
python main.py --env-name Humanoid-v2 --policy Deterministic --tau 1 --target_update_interval 1000
```
### Arguments
------------
```
PyTorch Soft Actor-Critic Args
optional arguments:
-h, --help show this help message and exit
--env-name ENV_NAME Mujoco Gym environment (default: HalfCheetah-v2)
--policy POLICY Policy Type: Gaussian | Deterministic (default:
Gaussian)
--eval EVAL           Evaluates the policy every 10 episodes (default: True)
--gamma G discount factor for reward (default: 0.99)
--tau G target smoothing coefficient(τ) (default: 5e-3)
--lr G learning rate (default: 3e-4)
--alpha G Temperature parameter α determines the relative
importance of the entropy term against the reward
(default: 0.2)
--automatic_entropy_tuning G
Automatically adjust α (default: False)
--seed N random seed (default: 123456)
--batch_size N batch size (default: 256)
--num_steps N maximum number of steps (default: 1e6)
--hidden_size N hidden size (default: 256)
--updates_per_step N model updates per simulator step (default: 1)
--start_steps N Steps sampling random actions (default: 1e4)
--target_update_interval N
Interval (in number of updates) between value target updates (default: 1)
--replay_size N size of replay buffer (default: 1e6)
--cuda run on CUDA (default: False)
```
| Environment **(`--env-name`)**| Temperature **(`--alpha`)**|
| ---------------| -------------|
| HalfCheetah-v2| 0.2|
| Hopper-v2| 0.2|
| Walker2d-v2| 0.2|
| Ant-v2| 0.2|
| Humanoid-v2| 0.05|
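A minimal sketch of an argument parser matching the defaults documented above (the actual `main.py` may declare its arguments differently):
```
import argparse

parser = argparse.ArgumentParser(description='PyTorch Soft Actor-Critic Args')
parser.add_argument('--env-name', default='HalfCheetah-v2')
parser.add_argument('--policy', default='Gaussian', help='Gaussian | Deterministic')
parser.add_argument('--gamma', type=float, default=0.99)
parser.add_argument('--tau', type=float, default=0.005)
parser.add_argument('--lr', type=float, default=3e-4)
parser.add_argument('--alpha', type=float, default=0.2)
parser.add_argument('--seed', type=int, default=123456)
parser.add_argument('--batch_size', type=int, default=256)
parser.add_argument('--hidden_size', type=int, default=256)
parser.add_argument('--target_update_interval', type=int, default=1)
parser.add_argument('--replay_size', type=int, default=1000000)
parser.add_argument('--cuda', action='store_true')
args = parser.parse_args()
```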
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Normal
LOG_SIG_MAX = 2
LOG_SIG_MIN = -20
epsilon = 1e-6
# Initialize Policy weights
def weights_init_(m):
if isinstance(m, nn.Linear):
torch.nn.init.xavier_uniform_(m.weight, gain=1)
torch.nn.init.constant_(m.bias, 0)
class ValueNetwork(nn.Module):
def __init__(self, num_inputs, hidden_dim):
super(ValueNetwork, self).__init__()
self.linear1 = nn.Linear(num_inputs, hidden_dim)
self.linear2 = nn.Linear(hidden_dim, hidden_dim)
self.linear3 = nn.Linear(hidden_dim, 1)
self.apply(weights_init_)
def forward(self, state):
x = F.relu(self.linear1(state))
x = F.relu(self.linear2(x))
x = self.linear3(x)
return x
class QNetwork(nn.Module):
def __init__(self, num_inputs, num_actions, hidden_dim):
super(QNetwork, self).__init__()
# Q1 architecture
self.linear1 = nn.Linear(num_inputs + num_actions, hidden_dim)
self.linear2 = nn.Linear(hidden_dim, hidden_dim)
self.linear3 = nn.Linear(hidden_dim, 1)
# Q2 architecture
self.linear4 = nn.Linear(num_inputs + num_actions, hidden_dim)
self.linear5 = nn.Linear(hidden_dim, hidden_dim)
self.linear6 = nn.Linear(hidden_dim, 1)
self.apply(weights_init_)
def forward(self, state, action):
xu = torch.cat([state, action], 1)
x1 = F.relu(self.linear1(xu))
x1 = F.relu(self.linear2(x1))
x1 = self.linear3(x1)
x2 = F.relu(self.linear4(xu))
x2 = F.relu(self.linear5(x2))
x2 = self.linear6(x2)
return x1, x2
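# A quick shape check (toy dimensions): the twin critic returns one Q-value per
# sample from each head.
toy_critic = QNetwork(num_inputs=17, num_actions=6, hidden_dim=256)
toy_q1, toy_q2 = toy_critic(torch.randn(32, 17), torch.randn(32, 6))
# toy_q1.shape == toy_q2.shape == torch.Size([32, 1])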
class QNetwork_out(nn.Module):
def __init__(self, num_inputs, num_actions, hidden_dim):
super(QNetwork_out, self).__init__()
# Q1 architecture
self.linear1 = nn.Linear(num_inputs, hidden_dim)
self.linear2 = nn.Linear(hidden_dim, hidden_dim)
self.linear3 = nn.Linear(hidden_dim, num_actions)
self.apply(weights_init_)
def forward(self, state):
x1 = F.relu(self.linear1(state))
x1 = F.relu(self.linear2(x1))
x1 = self.linear3(x1)
return x1
class QNetwork_phi(nn.Module):
def __init__(self, num_inputs, num_actions, hidden_dim, abs_range, tanh_output):
super(QNetwork_phi, self).__init__()
# Q1 network
# obs encoder
obs_models = [nn.Linear(num_inputs-2, hidden_dim)]
obs_models += [nn.ReLU(), nn.Linear(hidden_dim, hidden_dim)]
obs_models += [nn.ReLU(), nn.Linear(hidden_dim, 2)]
self.obs_encoder = nn.Sequential(*obs_models)
# goal input
self.action_input = nn.Linear(num_actions+2, int(hidden_dim / 2))
self.dynamics_layer = nn.Linear(int(hidden_dim / 2) + 2, hidden_dim)
self.output_layer = nn.Linear(hidden_dim, 1)
# Q2 architecture
self.linear4 = nn.Linear(num_inputs + num_actions, hidden_dim)
self.linear5 = nn.Linear(hidden_dim, hidden_dim)
self.linear6 = nn.Linear(hidden_dim, 1)
self.tanh_output = tanh_output
self.abs_range = abs_range
self.apply(weights_init_)
def forward(self, state, action):
xu = torch.cat([state, action], 1)
x2 = F.relu(self.linear4(xu))
x2 = F.relu(self.linear5(x2))
x2 = self.linear6(x2)
state = state[:, :-2]
action = torch.cat([state[:, -2:], action], 1)
latent_s = self.obs_encoder(state)
if self.tanh_output:
latent_s = self.abs_range * torch.tanh(latent_s)
action_out = self.action_input(action)
action_out = F.relu(action_out)
x = torch.cat([latent_s, action_out], 1)
x = self.dynamics_layer(x)
x = F.relu(x)
x1 = self.output_layer(x)
return x1, x2
def phi(self, obs):
if len(obs.shape) == 1:
obs = obs.unsqueeze(0)
s = self.obs_encoder(obs)
if self.tanh_output:
s = self.abs_range * torch.tanh(s)
return s
class GaussianPolicy(nn.Module):
def __init__(self, num_inputs, num_actions, hidden_dim, action_space, goal_dim):
super(GaussianPolicy, self).__init__()
# self.linear1 = nn.Linear(num_inputs - goal_dim, hidden_dim)
# self.goal_input = nn.Linear(goal_dim, hidden_dim)
self.linear1 = nn.Linear(num_inputs, hidden_dim)
self.linear2 = nn.Linear(hidden_dim, hidden_dim)
self.goal_dim = goal_dim
self.mean_linear = nn.Linear(hidden_dim, num_actions)
self.log_std_linear = nn.Linear(hidden_dim, num_actions)
# add phi layer
# self.phi_layer1 = nn.Linear(hidden_dim, hidden_dim)
# self.phi_layer2 = nn.Linear(hidden_dim, 2)
# self.phi_layer3 = nn.Linear(hidden_dim, 2)
self.apply(weights_init_)
# action rescaling
if action_space is None:
self.action_scale = torch.tensor(1.)
self.action_bias = torch.tensor(0.)
else:
self.action_scale = torch.FloatTensor(
(action_space.high - action_space.low) / 2.)
self.action_bias = torch.FloatTensor(
(action_space.high + action_space.low) / 2.)
def forward(self, state):
# x = self.linear1(state[..., :-self.goal_dim]) + self.goal_input(state[..., -self.goal_dim:])
x = self.linear1(state)
x = F.relu(x)
x = F.relu(self.linear2(x))
mean = self.mean_linear(x)
log_std = self.log_std_linear(x)
log_std = torch.clamp(log_std, min=LOG_SIG_MIN, max=LOG_SIG_MAX)
return mean, log_std
def phi(self, state):
# note: this relies on the phi_layer1/phi_layer2 heads that are commented out in __init__ above
x = F.relu(self.linear1(state))
x = F.relu(self.phi_layer1(x))
phi = self.phi_layer2(x)
return phi
def sample(self, state):
mean, log_std = self.forward(state)
std = log_std.exp()
normal = Normal(mean, std)
x_t = normal.rsample() # for reparameterization trick (mean + std * N(0,1))
y_t = torch.tanh(x_t)
action = y_t * self.action_scale + self.action_bias
log_prob = normal.log_prob(x_t)
# Enforcing Action Bound
log_prob -= torch.log(self.action_scale * (1 - y_t.pow(2)) + epsilon)
log_prob = log_prob.sum(1, keepdim=True)
mean = torch.tanh(mean) * self.action_scale + self.action_bias
return action, log_prob, mean
def to(self, device):
self.action_scale = self.action_scale.to(device)
self.action_bias = self.action_bias.to(device)
return super(GaussianPolicy, self).to(device)
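# A minimal sampling sketch (toy dimensions and a unit Box action space): sample()
# returns a tanh-squashed action, its log-probability with the squashing correction,
# and the deterministic (tanh of the mean) action.
from gym import spaces
toy_space = spaces.Box(low=-1.0, high=1.0, shape=(6,), dtype='float32')
toy_policy = GaussianPolicy(num_inputs=17, num_actions=6, hidden_dim=256,
                            action_space=toy_space, goal_dim=2)
toy_action, toy_log_prob, toy_mean = toy_policy.sample(torch.randn(4, 17))
# toy_action.shape == (4, 6), toy_log_prob.shape == (4, 1)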
class DeterministicPolicy(nn.Module):
def __init__(self, num_inputs, num_actions, hidden_dim, action_space=None):
super(DeterministicPolicy, self).__init__()
self.linear1 = nn.Linear(num_inputs, hidden_dim)
self.linear2 = nn.Linear(hidden_dim, hidden_dim)
self.mean = nn.Linear(hidden_dim, num_actions)
self.noise = torch.Tensor(num_actions)
self.apply(weights_init_)
# action rescaling
if action_space is None:
self.action_scale = 1.
self.action_bias = 0.
else:
self.action_scale = torch.FloatTensor(
(action_space.high - action_space.low) / 2.)
self.action_bias = torch.FloatTensor(
(action_space.high + action_space.low) / 2.)
def forward(self, state):
x = F.relu(self.linear1(state))
x = F.relu(self.linear2(x))
mean = torch.tanh(self.mean(x)) * self.action_scale + self.action_bias
return mean
def sample(self, state):
mean = self.forward(state)
noise = self.noise.normal_(0., std=0.1)
noise = noise.clamp(-0.25, 0.25)
action = mean + noise
return action, torch.tensor(0.), mean
def to(self, device):
self.action_scale = self.action_scale.to(device)
self.action_bias = self.action_bias.to(device)
self.noise = self.noise.to(device)
return super(DeterministicPolicy, self).to(device)
import random
import numpy as np
class ReplayMemory:
def __init__(self, capacity):
self.capacity = capacity
self.buffer = []
self.position = 0
def push(self, state, action, reward, next_state, done, epoch):
if len(self.buffer) < self.capacity:
self.buffer.append(None)
self.buffer[self.position] = (state, action, reward, next_state, done, epoch+1)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
batch = random.sample(self.buffer, batch_size)
state, action, reward, next_state, done, _ = map(np.stack, zip(*batch))
return state, action, reward, next_state, done
def __len__(self):
return len(self.buffer)
def get_obs(self):
obs = [x[0] for x in self.buffer]
obs = np.array(obs)
obs_next = [x[3] for x in self.buffer]
obs_next = np.array(obs_next)
return obs.copy(), obs_next.copy()
def pri_sample(self, batch_size, temperature=1.):
tmp_buffer = np.array(self.buffer)
epoch = tmp_buffer[:, -1]
p_trajectory = np.power(epoch, 1 / (temperature + 1e-2))
p_trajectory = p_trajectory / p_trajectory.sum()
p_trajectory = p_trajectory.astype(np.float64)
idxs = np.random.choice(len(self.buffer), size=batch_size, replace=False, p=p_trajectory)
batch = [self.buffer[i] for i in idxs]
state, action, reward, next_state, done, _ = map(np.stack, zip(*batch))
return state, action, reward, next_state, done
def random_sample(self, batch_size):
idxs = np.random.randint(0, len(self.buffer), batch_size)
obs = [self.buffer[i][0] for i in idxs]
obs = np.array(obs)
obs_next = [self.buffer[i][3] for i in idxs]
obs_next = np.array(obs_next)
return obs, obs_next
class Array_ReplayMemory:
def __init__(self, capacity, env_params):
self.capacity = capacity
action_dim = env_params['real_goal_dim']
obs_dim = env_params['obs'] + env_params['goal']
# create the buffer to store info
self.buffers = {'obs': np.empty([capacity, obs_dim]),
'actions': np.empty([capacity, action_dim]),
'reward': np.empty([capacity]),
'next_obs': np.empty([capacity, obs_dim]),
'done': np.empty([capacity])
}
self.position = 0
self.current_size = 0
def push(self, state, action, reward, next_state, done, epoch):
self.buffers['obs'][self.position] = state
self.buffers['actions'][self.position] = action
self.buffers['reward'][self.position] = reward
self.buffers['next_obs'][self.position] = next_state
self.buffers['done'][self.position] = done
self.position = (self.position + 1) % self.capacity
self.current_size = min(self.current_size + 1, self.capacity)
def sample(self, batch_size):
idx = np.random.randint(0, self.current_size, batch_size)
state = self.buffers['obs'][idx]
action = self.buffers['actions'][idx]
reward = self.buffers['reward'][idx]
next_state = self.buffers['next_obs'][idx]
done = self.buffers['done'][idx]
return state, action, reward, next_state, done
def __len__(self):
return self.current_size
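# A minimal usage sketch with toy transitions: fill ReplayMemory and draw a uniform
# mini-batch (Array_ReplayMemory is used the same way, but needs env_params with
# 'obs', 'goal' and 'real_goal_dim' entries).
toy_memory = ReplayMemory(capacity=1000)
for toy_epoch in range(5):
    for _ in range(20):
        toy_memory.push(state=np.random.randn(10), action=np.random.randn(3),
                        reward=-1.0, next_state=np.random.randn(10),
                        done=0.0, epoch=toy_epoch)
toy_state, toy_action, toy_reward, toy_next, toy_done = toy_memory.sample(batch_size=32)
# toy_state.shape == (32, 10), toy_action.shape == (32, 3)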
import os
import torch
import torch.nn.functional as F
from torch.optim import Adam
from algos.sac.utils import soft_update, hard_update
from algos.sac.model import GaussianPolicy, QNetwork, DeterministicPolicy, QNetwork_phi
class SAC(object):
def __init__(self, num_inputs, action_space, args, pri_replay, goal_dim, gradient_flow_value, abs_range, tanh_output):
self.gamma = args.gamma
self.tau = args.tau
self.alpha = args.alpha
self.pri_replay = pri_replay
self.policy_type = args.policy
self.target_update_interval = args.target_update_interval
self.automatic_entropy_tuning = args.automatic_entropy_tuning
self.device = args.device
self.gradient_flow_value = gradient_flow_value
if not gradient_flow_value:
self.critic = QNetwork(num_inputs, action_space.shape[0], args.hidden_size).to(device=self.device)
self.critic_optim = Adam(self.critic.parameters(), lr=args.lr)
self.critic_target = QNetwork(num_inputs, action_space.shape[0], args.hidden_size).to(self.device)
hard_update(self.critic_target, self.critic)
else:
self.critic = QNetwork_phi(num_inputs, action_space.shape[0], args.hidden_size, abs_range, tanh_output).to(device=self.device)
self.critic_optim = Adam(self.critic.parameters(), lr=args.lr)
self.critic_target = QNetwork_phi(num_inputs, action_space.shape[0], args.hidden_size, abs_range, tanh_output).to(self.device)
hard_update(self.critic_target, self.critic)
if self.policy_type == "Gaussian":
# Target Entropy = −dim(A) (e.g. , -6 for HalfCheetah-v2) as given in the paper
if self.automatic_entropy_tuning is True:
self.target_entropy = -torch.prod(torch.Tensor(action_space.shape).to(self.device)).item()
self.log_alpha = torch.zeros(1, requires_grad=True, device=self.device)
self.alpha_optim = Adam([self.log_alpha], lr=args.lr)
self.policy = GaussianPolicy(num_inputs, action_space.shape[0], args.hidden_size, action_space, goal_dim).to(self.device)
self.policy_optim = Adam(self.policy.parameters(), lr=args.lr)
self.policy_target = GaussianPolicy(num_inputs, action_space.shape[0], args.hidden_size, action_space,
goal_dim).to(self.device)
hard_update(self.policy_target, self.policy)
else:
self.alpha = 0
self.automatic_entropy_tuning = False
self.policy = DeterministicPolicy(num_inputs, action_space.shape[0], args.hidden_size, action_space).to(self.device)
self.policy_optim = Adam(self.policy.parameters(), lr=args.lr)
def select_action(self, state, evaluate=False):
state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
if evaluate is False:
action, _, _ = self.policy.sample(state)
else:
_, _, action = self.policy.sample(state)
return action.detach().cpu().numpy()[0]
def update_parameters(self, memory, batch_size, env_params, hi_sparse, feature_data):
# Sample a batch from memory
if self.pri_replay:
state_batch, action_batch, reward_batch, next_state_batch, mask_batch = memory.pri_sample(batch_size=batch_size)
else:
state_batch, action_batch, reward_batch, next_state_batch, mask_batch = memory.sample(batch_size=batch_size)
state_batch = torch.FloatTensor(state_batch).to(self.device)
next_state_batch = torch.FloatTensor(next_state_batch).to(self.device)
action_batch = torch.FloatTensor(action_batch).to(self.device)
reward_batch = torch.FloatTensor(reward_batch).to(self.device).unsqueeze(1)
mask_batch = torch.FloatTensor(mask_batch).to(self.device).unsqueeze(1)
with torch.no_grad():
next_state_action, next_state_log_pi, _ = self.policy.sample(next_state_batch)
qf1_next_target, qf2_next_target = self.critic_target(next_state_batch, next_state_action)
min_qf_next_target = torch.min(qf1_next_target, qf2_next_target) - self.alpha * next_state_log_pi
# print("min_qf_target", min_qf_next_target.shape)
next_q_value = reward_batch + mask_batch * self.gamma * (min_qf_next_target)
if hi_sparse:
# clip target value
next_q_value = torch.clamp(next_q_value, -env_params['max_timesteps'], 0.)
qf1, qf2 = self.critic(state_batch, action_batch) # Two Q-functions to mitigate positive bias in the policy improvement step
# print("qf1", qf1.shape)
# print("next_q", next_q_value.shape)
qf1_loss = F.mse_loss(qf1, next_q_value) # JQ = 𝔼(st,at)~D[0.5(Q1(st,at) - r(st,at) - γ(𝔼st+1~p[V(st+1)]))^2]
qf2_loss = F.mse_loss(qf2, next_q_value) # JQ = 𝔼(st,at)~D[0.5(Q1(st,at) - r(st,at) - γ(𝔼st+1~p[V(st+1)]))^2]
pi, log_pi, _ = self.policy.sample(state_batch)
qf1_pi, qf2_pi = self.critic(state_batch, pi)
min_qf_pi = torch.min(qf1_pi, qf2_pi)
policy_loss = ((self.alpha * log_pi) - min_qf_pi).mean() # Jπ = 𝔼st∼D,εt∼N[α * logπ(f(εt;st)|st) − Q(st,f(εt;st))]
if feature_data is not None:
if self.gradient_flow_value:
obs, obs_next = self.critic.phi(feature_data[0]), self.critic.phi(feature_data[1])
min_dist = torch.clamp((obs - obs_next).pow(2).mean(dim=1), min=0.)
hi_obs, hi_obs_next = self.critic.phi(feature_data[2]), self.critic.phi(feature_data[3])
max_dist = torch.clamp(1 - (hi_obs - hi_obs_next).pow(2).mean(dim=1), min=0.)
representation_loss = (min_dist + max_dist).mean()
qf1_loss = qf1_loss * 0.1 + representation_loss
else:
obs, obs_next = self.policy.phi(feature_data[0]), self.policy.phi(feature_data[1])
min_dist = torch.clamp((obs - obs_next).pow(2).mean(dim=1), min=0.)
hi_obs, hi_obs_next = self.policy.phi(feature_data[2]), self.policy.phi(feature_data[3])
max_dist = torch.clamp(1 - (hi_obs - hi_obs_next).pow(2).mean(dim=1), min=0.)
representation_loss = (min_dist + max_dist).mean()
policy_loss += representation_loss
self.critic_optim.zero_grad()
qf1_loss.backward()
self.critic_optim.step()
self.critic_optim.zero_grad()
qf2_loss.backward()
self.critic_optim.step()
self.policy_optim.zero_grad()
policy_loss.backward()
self.policy_optim.step()
if self.automatic_entropy_tuning:
alpha_loss = -(self.log_alpha * (log_pi + self.target_entropy).detach()).mean()
self.alpha_optim.zero_grad()
alpha_loss.backward()
self.alpha_optim.step()
self.alpha = self.log_alpha.exp()
alpha_tlogs = self.alpha.clone() # For TensorboardX logs
else:
alpha_loss = torch.tensor(0.).to(self.device)
alpha_tlogs = torch.tensor(self.alpha) # For TensorboardX logs
soft_update(self.critic_target, self.critic, self.tau)
soft_update(self.policy_target, self.policy, self.tau)
return qf1_loss.item(), qf2_loss.item(), policy_loss.item(), alpha_loss.item(), alpha_tlogs.item()
# Save model parameters
def save_model(self, env_name, suffix="", actor_path=None, critic_path=None):
if not os.path.exists('models/'):
os.makedirs('models/')
if actor_path is None:
actor_path = "models/sac_actor_{}_{}".format(env_name, suffix)
if critic_path is None:
critic_path = "models/sac_critic_{}_{}".format(env_name, suffix)
print('Saving models to {} and {}'.format(actor_path, critic_path))
torch.save(self.policy.state_dict(), actor_path)
torch.save(self.critic.state_dict(), critic_path)
# Load model parameters
def load_model(self, actor_path, critic_path):
print('Loading models from {} and {}'.format(actor_path, critic_path))
if actor_path is not None:
self.policy.load_state_dict(torch.load(actor_path))
if critic_path is not None:
self.critic.load_state_dict(torch.load(critic_path))
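# A minimal construction sketch: the real arguments come from the training script's
# argparse; here a stand-in namespace with plausible values and a toy Box action
# space are used instead.
import numpy as np
from types import SimpleNamespace
from gym import spaces
toy_args = SimpleNamespace(gamma=0.99, tau=0.005, alpha=0.2, policy='Gaussian',
                           target_update_interval=1, automatic_entropy_tuning=True,
                           device=torch.device('cpu'), hidden_size=256, lr=3e-4)
toy_space = spaces.Box(low=-1.0, high=1.0, shape=(2,), dtype='float32')
toy_agent = SAC(num_inputs=32, action_space=toy_space, args=toy_args,
                pri_replay=False, goal_dim=2, gradient_flow_value=False,
                abs_range=20., tanh_output=True)
toy_action = toy_agent.select_action(np.random.randn(32))   # stochastic action, shape (2,)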
import math
import torch
def create_log_gaussian(mean, log_std, t):
quadratic = -((0.5 * (t - mean) / (log_std.exp())).pow(2))
l = mean.shape
log_z = log_std
z = l[-1] * math.log(2 * math.pi)
log_p = quadratic.sum(dim=-1) - log_z.sum(dim=-1) - 0.5 * z
return log_p
def logsumexp(inputs, dim=None, keepdim=False):
if dim is None:
inputs = inputs.view(-1)
dim = 0
s, _ = torch.max(inputs, dim=dim, keepdim=True)
outputs = s + (inputs - s).exp().sum(dim=dim, keepdim=True).log()
if not keepdim:
outputs = outputs.squeeze(dim)
return outputs
def soft_update(target, source, tau):
for target_param, param in zip(target.parameters(), source.parameters()):
target_param.data.copy_(target_param.data * (1.0 - tau) + param.data * tau)
def hard_update(target, source):
for target_param, param in zip(target.parameters(), source.parameters()):
target_param.data.copy_(param.data)
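# A tiny worked example: hard_update makes the target an exact copy; soft_update then
# moves it a fraction tau toward the (perturbed) source, i.e. Polyak averaging
# target <- (1 - tau) * target + tau * source.
import torch.nn as nn
toy_source, toy_target = nn.Linear(4, 4), nn.Linear(4, 4)
hard_update(toy_target, toy_source)          # weights now identical
with torch.no_grad():
    toy_source.weight.add_(1.0)              # change the source
soft_update(toy_target, toy_source, tau=0.05)
# toy_target.weight == 0.95 * old target weights + 0.05 * new source weights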
import numpy as np
class normalizer:
def __init__(self, size, eps=1e-2, default_clip_range=np.inf):
self.size = size
self.eps = eps
self.default_clip_range = default_clip_range
# some local information
self.local_sum = np.zeros(self.size, np.float32)
self.local_sumsq = np.zeros(self.size, np.float32)
self.local_count = np.zeros(1, np.float32)
# get the total sum sumsq and sum count
self.total_sum = np.zeros(self.size, np.float32)
self.total_sumsq = np.zeros(self.size, np.float32)
self.total_count = np.ones(1, np.float32)
# get the mean and std
self.mean = np.zeros(self.size, np.float32)
self.std = np.ones(self.size, np.float32)
# thread locker
# update the parameters of the normalizer
def update(self, v):
v = v.reshape(-1, self.size)
self.local_sum += v.sum(axis=0)
self.local_sumsq += (np.square(v)).sum(axis=0)
self.local_count[0] += v.shape[0]
def recompute_stats(self):
local_count = self.local_count.copy()
local_sum = self.local_sum.copy()
local_sumsq = self.local_sumsq.copy()
# reset
self.local_count[...] = 0
self.local_sum[...] = 0
self.local_sumsq[...] = 0
# update the total stuff
self.total_sum += local_sum
self.total_sumsq += local_sumsq
self.total_count += local_count
# calculate the new mean and std
self.mean = self.total_sum / self.total_count
self.std = np.sqrt(np.maximum(np.square(self.eps), (self.total_sumsq / self.total_count) - np.square(
self.total_sum / self.total_count)))
# normalize the observation
def normalize(self, v, clip_range=None):
# print('now normalize', v)
if clip_range is None:
clip_range = self.default_clip_range
# print((v - self.mean) / (self.std))
return np.clip((v - self.mean) / (self.std), -clip_range, clip_range)
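# A minimal usage sketch: accumulate statistics from a toy batch, refresh the running
# mean/std, then standardize (and clip) a new observation.
toy_norm = normalizer(size=3, default_clip_range=5.0)
toy_norm.update(np.random.randn(100, 3) * 2.0 + 1.0)
toy_norm.recompute_stats()
toy_normed = toy_norm.normalize(np.array([1.0, 0.0, -1.0]))   # roughly zero-mean, clipped to [-5, 5]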
import numpy as np
import torch
from torch import nn
class RandomPolicy(nn.Module):
def __init__(self, action_space, is_binary=False):
nn.Module.__init__(self)
self.action_space = action_space
self.is_binary = is_binary
self.discrete = ('n' in vars(self.action_space))
def random(self):
if self.discrete:
return np.random.randint(self.action_space.n)
else:
low = np.array(self.action_space.low)
high = np.array(self.action_space.high)
if self.is_binary:
return np.random.randint(3, size=self.action_space.shape) - 1
return np.random.random(size=self.action_space.shape) * (high - low) + low
def forward(self, obs, *args):
if isinstance(obs, dict): # goal conditioned environment
obs = obs['observation']
act = torch.Tensor(np.stack([self.random() for i in range(len(obs))], axis=0))
if self.discrete:
act = act.long()
return act
def reset(self, i):
pass
import numpy as np
# [reference] https://github.com/matthiasplappert/keras-rl/blob/master/rl/random.py
class RandomProcess(object):
def reset_states(self):
pass
class AnnealedGaussianProcess(RandomProcess):
def __init__(self, mu, sigma, sigma_min, n_steps_annealing):
self.mu = mu
self.sigma = sigma
self.n_steps = 0
if sigma_min is not None:
self.m = -float(sigma - sigma_min) / float(n_steps_annealing)
self.c = sigma
self.sigma_min = sigma_min
else:
self.m = 0.
self.c = sigma
self.sigma_min = sigma
@property
def current_sigma(self):
sigma = max(self.sigma_min, self.m * float(self.n_steps) + self.c)
return sigma
# Based on http://math.stackexchange.com/questions/1287634/implementing-ornstein-uhlenbeck-in-matlab
class OrnsteinUhlenbeckProcess(AnnealedGaussianProcess):
def __init__(self, theta, mu=0., sigma=1., dt=1e-2, x0=None, size=1, sigma_min=None, n_steps_annealing=1000):
super(OrnsteinUhlenbeckProcess, self).__init__(mu=mu, sigma=sigma, sigma_min=sigma_min,
n_steps_annealing=n_steps_annealing)
self.theta = theta
self.mu = mu
self.dt = dt
self.x0 = x0
self.size = size
self.reset_states()
def sample(self):
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.current_sigma * np.sqrt(
self.dt) * np.random.normal(size=self.size)
self.x_prev = x
self.n_steps += 1
return x
def reset_states(self):
self.x_prev = self.x0 if self.x0 is not None else np.zeros(self.size)
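# A minimal usage sketch: temporally correlated exploration noise for a 2-dimensional
# continuous action (the theta/sigma values here are illustrative, not tuned).
toy_noise = OrnsteinUhlenbeckProcess(theta=0.15, mu=0., sigma=0.2, size=2)
toy_samples = [toy_noise.sample() for _ in range(3)]   # each draw drifts from the previous one
toy_noise.reset_states()                               # restart from x0 (zeros by default)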
import sys
sys.path.append('../')
from gym.envs.registration import register
import gym
from goal_env.bitflip import BitFlipEnv
from goal_env.fourroom import FourRoom, FourRoom2, FourRoom3, FourRoom4
from goal_env.mountaincar import MountainCarEnv
from goal_env.plane import NaivePlane, NaivePlane2, NaivePlane3, NaivePlane4, NaivePlane5
from goal_env.goal_plane_env import GoalPlane
from goal_env.nchain import NChainEnv
register(
id='Bitflip-v0',
entry_point='goal_env.bitflip:BitFlipEnv',
kwargs={'num_bits': 11},
max_episode_steps=200,
reward_threshold=100.0,
nondeterministic=False,
)
N = 64
register(
id='NChain-v1',
entry_point='goal_env.nchain:NChainEnv',
kwargs={'n': N,
'slip': 0.1,
},
max_episode_steps=N+10,
)
register(
id='FourRoom-v0',
entry_point='goal_env.fourroom:FourRoom',
kwargs={'goal_type': 'fix_goal'},
max_episode_steps=200,
reward_threshold=100.0,
nondeterministic=False,
)
register(
id='FourRoom-v1',
entry_point='goal_env.fourroom:FourRoom2',
kwargs={'goal_type': 'fix_goal'},
max_episode_steps=200,
reward_threshold=100.0,
nondeterministic=False,
)
register(
id='FourRoom-v2',
entry_point='goal_env.fourroom:FourRoom3',
kwargs={'goal_type': 'fix_goal'},
max_episode_steps=200,
reward_threshold=100.0,
nondeterministic=False,
)
register(
id='FourRoom-v4',
entry_point='goal_env.fourroom:FourRoom4',
kwargs={'goal_type': 'fix_goal'},
max_episode_steps=200,
reward_threshold=100.0,
nondeterministic=False,
)
register(
id='mcar-v0',
entry_point='goal_env.mountaincar:MountainCarEnv',
kwargs={'goal_dim': 1},
max_episode_steps=200,
reward_threshold=100.0,
nondeterministic=False,
)
register(
id='Plane-v0',
entry_point='goal_env.plane:NaivePlane5',
)
register(
id='GoalPlane-v0',
entry_point='goal_env.goal_plane_env:GoalPlane',
max_episode_steps=50,
reward_threshold=195.0,
kwargs={
"env_name": "Plane-v0",
"maze_size": 15,
"action_size": 1,
"distance": 1.,
"start": (2.5, 2.5),
}
)
register(
id='GoalPlaneMid-v0',
entry_point='goal_env.goal_plane_env:GoalPlane',
max_episode_steps=50,
reward_threshold=195.0,
kwargs={
"env_name": "Plane-v0",
"type": "mid",
"maze_size": 15,
"action_size": 1,
"distance": 1.,
"start": (2.5, 2.5),
}
)
register(
id='GoalPlaneHard-v0',
entry_point='goal_env.goal_plane_env:GoalPlane',
max_episode_steps=50,
reward_threshold=195.0,
kwargs={
"env_name": "Plane-v0",
"type": "hard",
"maze_size": 15,
"action_size": 1,
"distance": 1.,
"start": (2.5, 2.5),
}
)
register(
id='GoalPlaneEasy-v0',
entry_point='goal_env.goal_plane_env:GoalPlane',
max_episode_steps=50,
reward_threshold=195.0,
kwargs={
"env_name": "Plane-v0",
"type": "easy",
"maze_size": 15,
"action_size": 1,
"distance": 1.,
"start": (2.5, 2.5),
}
)
register(
id='GoalPlaneTest-v0',
entry_point='goal_env.goal_plane_env:GoalPlane',
max_episode_steps=50,
reward_threshold=195.0,
kwargs={
"env_name": "Plane-v0",
"maze_size": 15,
"action_size": 1,
"distance": 1.,
"start": (2.5, 2.5),
"goals": (2.5, 12.5)
}
)
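# A minimal usage sketch: once the register() calls above have run, the
# goal-conditioned environments are created through the standard gym API.
if __name__ == '__main__':
    # FourRoom-v0 needs no external simulator, so it serves as a quick smoke test.
    demo_env = gym.make('FourRoom-v0')
    demo_obs = demo_env.reset()
    # demo_obs['observation'].shape == demo_obs['desired_goal'].shape == (121,)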
## copied from RL-Adventure2
import gym
import numpy as np
from gym import spaces
class BitFlipEnv(gym.Env):
def __init__(self, num_bits):
self.num_bits = num_bits
self.observation_space = {
'observation': spaces.Box(low=0, high=1, shape=(self.num_bits,)),
'desired_goal': spaces.Box(low=0, high=1, shape=(self.num_bits,)),
'achieved_goal': spaces.Box(low=0, high=1, shape=(self.num_bits,))
}
self.action_space = spaces.Discrete(self.num_bits)
def get_obs(self):
return {
"observation": np.copy(self.state),
"achieved_goal": np.copy(self.state),
"desired_goal": np.copy(self.target),
}
def reset(self):
self.done = False
self.num_steps = 0
self.state = np.random.randint(2, size=self.num_bits)
self.target = np.random.randint(2, size=self.num_bits)
return self.get_obs()
# return self.state, self.target
def step(self, action):
self.state[action] = 1 - self.state[action]
info = {'is_success': False}
# print(self.state, self.target)
if self.num_steps > self.num_bits + 1:
self.done = True
self.num_steps += 1
if np.sum(self.state == self.target) == self.num_bits:
self.done = True
info = {'is_success': True}
return self.get_obs(), 0, self.done, info
else:
return self.get_obs(), -1, self.done, info
def compute_reward(self, state, goal, info):
calcu = np.sum(state == goal, axis=1)
reward = np.where(calcu == self.num_bits, 0, -1)
return reward
def get_pairwise(self, state, target):
dist = self.num_bits - np.sum(state == target)
return dist
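# A minimal usage sketch: roll out one random-action episode of the bit-flip task.
if __name__ == '__main__':
    demo_env = BitFlipEnv(num_bits=11)
    demo_obs, demo_done = demo_env.reset(), False
    while not demo_done:
        demo_obs, demo_reward, demo_done, demo_info = demo_env.step(demo_env.action_space.sample())
    print('is_success:', demo_info['is_success'])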
## importance resampling
import gym
import numpy as np
from gym import spaces
class FourRoom(gym.Env):
def __init__(self, seed=None, goal_type='fix_goal'):
self.n = 11
self.map = np.array([
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0,
0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0,
0, 1, 1, 1, 1, 0, 0, 0, 1, 0, 0,
0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]).reshape((self.n, self.n))
self.goal_type = goal_type
self.goal = None
self.init()
def init(self):
self.observation_space = {
'observation': spaces.Box(low=0, high=1, shape=(self.n * self.n,), dtype=np.float32),
'desired_goal': spaces.Box(low=0, high=1, shape=(self.n * self.n,), dtype=np.float32),
'achieved_goal': spaces.Box(low=0, high=1, shape=(self.n * self.n,), dtype=np.float32)
}
self.observation_space['observation'].n = self.n
self.dx = [0, 1, 0, -1]
self.dy = [1, 0, -1, 0]
self.action_space = spaces.Discrete(len(self.dx))
self.reset()
def label2obs(self, x, y):
a = np.zeros((self.n * self.n,))
assert x < self.n and y < self.n
a[x * self.n + y] = 1
return a
def get_obs(self):
assert self.goal is not None
return {
'observation': self.label2obs(self.x, self.y),
'desired_goal': self.label2obs(*self.goal),
'achieved_goal': self.label2obs(self.x, self.y),
}
def reset(self):
condition = True
while condition:
self.x = np.random.randint(1, self.n)
self.y = np.random.randint(1, self.n)
condition = (self.map[self.x, self.y] == 0)
loc = np.where(self.map > 0.5)
assert len(loc) == 2
if self.goal_type == 'random':
goal_idx = np.random.randint(len(loc[0]))
elif self.goal_type == 'fix_goal':
goal_idx = 0
else:
raise NotImplementedError
self.goal = loc[0][goal_idx], loc[1][goal_idx]
self.done = False
return self.get_obs()
def step(self, action):
# assert not self.done
nx, ny = self.x + self.dx[action], self.y + self.dy[action]
info = {'is_success': False}
# before = self.get_obs().argmax()
if self.map[nx, ny]:
self.x, self.y = nx, ny
reward = -1
done = False
else:
reward = -1
done = False
if nx == self.goal[0] and ny == self.goal[1]:
reward = 0
info = {'is_success': True}
done = self.done = True
return self.get_obs(), reward, done, info
def compute_reward(self, state, goal, info):
state_obs = state.argmax(axis=1)
goal_obs = goal.argmax(axis=1)
reward = np.where(state_obs == goal_obs, 0, -1)
return reward
def restore(self, obs):
obs = obs.argmax()
self.x = obs // self.n
self.y = obs % self.n
def bfs_dist(self, state, goal):
# using bfs to search for shortest path
visited = {key: False for key in range(self.n * self.n)}
state_key = state.argmax()
goal_key = goal.argmax()
queue = []
visited[state_key] = True
queue.append(state_key)
dist = [-np.inf] * (self.n * self.n)
dist[state_key] = 0
while (queue):
par = queue.pop(0)
if par == goal_key:
break
x_par, y_par = par // self.n, par % self.n
for action in range(4):
x_child, y_child = x_par + self.dx[action], y_par + self.dy[action]
child = x_child * self.n + y_child
if self.map[x_child, y_child] == 0:
continue
if visited[child] == False:
visited[child] = True
queue.append(child)
dist[child] = dist[par] + 1
return dist[goal_key]
def get_pairwise(self, state, target):
dist = self.bfs_dist(state, target)
return dist
def all_states(self):
states = []
mask = []
for i in range(self.n):
for j in range(self.n):
self.x = i
self.y = j
states.append(self.get_obs())
if isinstance(states[-1], dict):
states[-1] = states[-1]['observation']
mask.append(self.map[self.x, self.y] > 0.5)
return np.array(states)[mask]
def all_edges(self):
A = np.zeros((self.n * self.n, self.n * self.n))
mask = []
for i in range(self.n):
for j in range(self.n):
mask.append(self.map[i, j] > 0.5)
if self.map[i][j]:
for a in range(4):
self.x = i
self.y = j
t = self.step(a)[0]
if isinstance(t, dict):
t = t['observation']
self.restore(t)
A[i * self.n + j, self.x * self.n + self.y] = 1
return A[mask][:, mask]
class FourRoom2(FourRoom):
def __init__(self, *args, **kwargs):
FourRoom.__init__(self, *args, **kwargs)
self.map = np.array([
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]).reshape((self.n, self.n))
class FourRoom3(FourRoom):
def __init__(self, *args, **kwargs):
FourRoom.__init__(self, *args, **kwargs)
self.n = 5
self.map = np.array([
0, 0, 0, 0, 0,
0, 1, 1, 1, 0,
0, 1, 1, 1, 0,
0, 1, 1, 1, 0,
0, 0, 0, 0, 0,
]).reshape((self.n, self.n))
self.init()
class FourRoom4(FourRoom):
def __init__(self, seed=None, *args, **kwargs):
FourRoom.__init__(self, *args, **kwargs)
self.n = 16
self.map = np.array([
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
]).reshape((self.n, self.n))
self.init()
if __name__ == '__main__':
a = FourRoom()
import gym
import copy
import numpy as np
import cv2
from collections import OrderedDict
class GoalPlane(gym.Env):
def __init__(self, env_name, type='random', maze_size=16., action_size=1., distance=0.1, start=None, goals=None):
super(GoalPlane, self).__init__()
self.env = gym.make(env_name)
self.maze_size = maze_size
self.action_size = action_size
self.action_space = gym.spaces.Box(
low=-action_size, high=action_size, shape=(2,), dtype='float32')
self.ob_space = gym.spaces.Box(
low=0., high=maze_size, shape=(2,), dtype='float32')
self.easy_goal_space = gym.spaces.Box(low=np.array([0., 0.]),
high=np.array([self.maze_size, self.maze_size / 2]) \
, dtype=np.float32)
self.mid_goal_space = gym.spaces.Box(low=np.array([self.maze_size / 2, self.maze_size / 2]), \
high=np.array([self.maze_size, self.maze_size]), dtype=np.float32)
self.hard_goal_space = gym.spaces.Box(low=np.array([0., self.maze_size * 0.65]), \
high=np.array([self.maze_size / 2, self.maze_size]), dtype=np.float32)
self.type = type
if self.type == 'random':
self.goal_space = self.ob_space
elif self.type == 'easy':
self.goal_space = self.easy_goal_space
elif self.type == 'mid':
self.goal_space = self.mid_goal_space
elif self.type == 'hard':
self.goal_space = self.hard_goal_space
self.distance = distance
self.goals = goals
self.start = start
self.observation_space = gym.spaces.Dict(OrderedDict({
'observation': self.ob_space,
'desired_goal': self.goal_space,
'achieved_goal': self.ob_space,
}))
self.goal = None
def compute_reward(self, achieved_goal, desired_goal, info):
reward = -np.linalg.norm(achieved_goal - desired_goal, axis=-1)
return reward
def change_mode(self, mode='mid'):
if mode == 'random':
self.goal_space = self.ob_space
elif mode == 'easy':
self.goal_space = self.easy_goal_space
elif mode == 'mid':
self.goal_space = self.mid_goal_space
elif mode == 'hard':
self.goal_space = self.hard_goal_space
def step(self, action):
assert self.goal is not None
observation, reward, done, info = self.env.step(np.array(action) / self.maze_size) # normalize action
observation = np.array(observation) * self.maze_size
out = {'observation': observation,
'desired_goal': self.goal,
'achieved_goal': observation}
reward = -np.linalg.norm(observation - self.goal, axis=-1)
info['is_success'] = (reward > -self.distance)
return out, reward, done, info
def reset(self):
if self.start is not None:
self.env.reset()
observation = np.array(self.start)
self.env.restore(observation / self.maze_size)
else:
observation = self.env.reset()
if self.goals is None:
condition = True
while condition: # note: goal should not be in the block
self.goal = self.goal_space.sample()
condition = self.env.check_inside(self.goal / self.maze_size)
else:
self.goal = np.array(self.goals)
out = {'observation': observation, 'desired_goal': self.goal}
out['achieved_goal'] = observation
return out
def render(self, mode='rgb_array'):
image = self.env.render(mode='rgb_array')
goal_loc = copy.copy(self.goal)
goal_loc[0] = goal_loc[0] / self.maze_size * image.shape[1]
goal_loc[1] = goal_loc[1] / self.maze_size * image.shape[0]
cv2.circle(image, (int(goal_loc[0]), int(goal_loc[1])), 10, (0, 255, 0), -1)
if mode == 'human':
cv2.imshow('image', image)
cv2.waitKey(2)
else:
return image
import math
import numpy as np
import gym
from gym import spaces
from gym.utils import seeding
class MountainCarEnv(gym.Env):
metadata = {
'render.modes': ['human', 'rgb_array'],
'video.frames_per_second': 30
}
def __init__(self, goal_dim=1):
self.min_position = -1.2
self.max_position = 0.6
self.max_speed = 0.07
self.goal_position = 0.5
self.force = 0.001
self.gravity = 0.0025
self.low = np.array([self.min_position, -self.max_speed])
self.high = np.array([self.max_position, self.max_speed])
self.viewer = None
self.goal_dim = goal_dim
self.action_space = spaces.Discrete(3)
self.observation_space = {
"achieved_goal": spaces.Box(self.low[:self.goal_dim], self.high[:self.goal_dim], dtype=np.float32),
"desired_goal": spaces.Box(self.low[:self.goal_dim], self.high[:self.goal_dim], dtype=np.float32),
"observation": spaces.Box(self.low, self.high, dtype=np.float32),
}
self.seed()
def get_obs(self):
return {
"achieved_goal": np.array(self.state)[:self.goal_dim],
"desired_goal": np.array([self.goal_position, 0][:self.goal_dim]),
"observation": np.array(self.state),
}
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def step(self, action):
assert self.action_space.contains(
action), "%r (%s) invalid" % (action, type(action))
info = {'is_success': False}
position, velocity = self.state
velocity += (action - 1) * self.force + \
math.cos(3 * position) * (-self.gravity)
velocity = np.clip(velocity, -self.max_speed, self.max_speed)
position += velocity
position = np.clip(position, self.min_position, self.max_position)
if (position == self.min_position and velocity < 0):
velocity = 0
done = bool(position >= self.goal_position)
reward = -1.0
if done:
reward = 0.0
info['is_success'] = True
self.state = (position, velocity)
return self.get_obs(), reward, done, info
# return np.array(self.state), reward, done, {}
def reset(self):
self.state = np.array([self.np_random.uniform(low=-0.6, high=-0.4), 0])
return self.get_obs()
# return np.array(self.state)
def _height(self, xs):
return np.sin(3 * xs) * .45 + .55
def render(self, mode='human'):
screen_width = 600
screen_height = 400
world_width = self.max_position - self.min_position
scale = screen_width / world_width
carwidth = 40
carheight = 20
if self.viewer is None:
from gym.envs.classic_control import rendering
self.viewer = rendering.Viewer(screen_width, screen_height)
xs = np.linspace(self.min_position, self.max_position, 100)
ys = self._height(xs)
xys = list(zip((xs - self.min_position) * scale, ys * scale))
self.track = rendering.make_polyline(xys)
self.track.set_linewidth(4)
self.viewer.add_geom(self.track)
clearance = 10
l, r, t, b = -carwidth / 2, carwidth / 2, carheight, 0
car = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])
car.add_attr(rendering.Transform(translation=(0, clearance)))
self.cartrans = rendering.Transform()
car.add_attr(self.cartrans)
self.viewer.add_geom(car)
frontwheel = rendering.make_circle(carheight / 2.5)
frontwheel.set_color(.5, .5, .5)
frontwheel.add_attr(rendering.Transform(
translation=(carwidth / 4, clearance)))
frontwheel.add_attr(self.cartrans)
self.viewer.add_geom(frontwheel)
backwheel = rendering.make_circle(carheight / 2.5)
backwheel.add_attr(rendering.Transform(
translation=(-carwidth / 4, clearance)))
backwheel.add_attr(self.cartrans)
backwheel.set_color(.5, .5, .5)
self.viewer.add_geom(backwheel)
flagx = (self.goal_position - self.min_position) * scale
flagy1 = self._height(self.goal_position) * scale
flagy2 = flagy1 + 50
flagpole = rendering.Line((flagx, flagy1), (flagx, flagy2))
self.viewer.add_geom(flagpole)
flag = rendering.FilledPolygon(
[(flagx, flagy2), (flagx, flagy2 - 10), (flagx + 25, flagy2 - 5)])
flag.set_color(.8, .8, 0)
self.viewer.add_geom(flag)
pos = self.state[0]
self.cartrans.set_translation(
(pos - self.min_position) * scale, self._height(pos) * scale)
self.cartrans.set_rotation(math.cos(3 * pos))
return self.viewer.render(return_rgb_array=mode == 'rgb_array')
def get_keys_to_action(self):
# control with left and right arrow keys
return {(): 1, (276,): 0, (275,): 2, (275, 276): 1}
def close(self):
if self.viewer:
self.viewer.close()
self.viewer = None
def compute_reward(self, state, goal):
'''
to be finished
:param state:
:param goal:
:return:
'''
def get_pairwise(self, state, target):
'''
to be finished
:param state:
:param target:
:return:
'''
from gym.envs.registration import register
import sys
print("path", sys.argv[0].split('/')[-1], "!!!")
if sys.argv[0].split('/')[-1] in ["train_ddpg.py", "visitation_plot.py", "vis_fetch.py"]:
from train_ddpg import args
elif sys.argv[0].split('/')[-1] == "train_hier_ddpg.py":
from train_hier_ddpg import args
elif sys.argv[0].split('/')[-1] == "train_hier_sac.py":
from train_hier_sac import args
elif sys.argv[0].split('/')[-1] == "train_hier_ppo.py":
from train_hier_ppo import args
elif sys.argv[0].split('/')[-1] == "train_covering.py":
from train_covering import args
else:
raise Exception("Unknown main file !!!")
robots = ['Point', 'Ant', 'Swimmer']
task_types = ['Maze', 'Maze1', 'Push', 'Fall', 'Block', 'BlockMaze']
all_name = [x + y for x in robots for y in task_types]
random_start = False
if args.image:
top_down = True
else:
top_down = False
for name_t in all_name:
# episode length
if name_t == "AntMaze":
max_timestep = 1000
else:
max_timestep = 500
for Test in ['', 'Test', 'Test1', 'Test2']:
if Test in ['Test', 'Test1', 'Test2']:
fix_goal = True
else:
if name_t == "AntBlock":
fix_goal = True
else:
fix_goal = False
goal_args = [[-5, -5], [5, 5]]
register(
id=name_t + Test + '-v0',
entry_point='goal_env.mujoco.create_maze_env:create_maze_env',
kwargs={'env_name': name_t, 'goal_args': goal_args, 'maze_size_scaling': 8, 'random_start': random_start},
max_episode_steps=max_timestep,
)
# v1 is the one we use in the main paper
register(
id=name_t + Test + '-v1',
entry_point='goal_env.mujoco.create_maze_env:create_maze_env',
kwargs={'env_name': name_t, 'goal_args': goal_args, 'maze_size_scaling': 4, 'random_start': random_start,
"fix_goal": fix_goal, "top_down_view": top_down, 'test':Test},
max_episode_steps=max_timestep,
)
register(
id=name_t + Test + '-v2',
entry_point='goal_env.mujoco.create_maze_env:create_maze_env',
kwargs={'env_name': name_t, 'goal_args': goal_args, 'maze_size_scaling': 2, 'random_start': random_start},
max_episode_steps=max_timestep,
)
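# A minimal sketch with a hypothetical helper: after this module has been imported
# from one of the recognized training scripts, the maze tasks (e.g. the 'AntMaze-v1'
# variant used in the main paper) can be built through gym; a working MuJoCo setup
# and the create_maze_env entry point above are required.
def _make_demo_env(env_id='AntMaze-v1'):
    import gym
    return gym.make(env_id)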
# Copyright 2018 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Wrapper for creating the ant environment in gym_mujoco."""
import math
import numpy as np
from gym import utils
from gym.envs.mujoco import mujoco_env
def q_inv(a):
return [a[0], -a[1], -a[2], -a[3]]
def q_mult(a, b):  # multiply two quaternions
w = a[0] * b[0] - a[1] * b[1] - a[2] * b[2] - a[3] * b[3]
i = a[0] * b[1] + a[1] * b[0] + a[2] * b[3] - a[3] * b[2]
j = a[0] * b[2] - a[1] * b[3] + a[2] * b[0] + a[3] * b[1]
k = a[0] * b[3] + a[1] * b[2] - a[2] * b[1] + a[3] * b[0]
return [w, i, j, k]
class AntEnv(mujoco_env.MujocoEnv, utils.EzPickle):
FILE = "ant.xml"
ORI_IND = 3
def __init__(self, file_path=None, expose_all_qpos=True,
expose_body_coms=None, expose_body_comvels=None, noisy_init=True):
self._expose_all_qpos = expose_all_qpos
self._expose_body_coms = expose_body_coms
self._expose_body_comvels = expose_body_comvels
self._body_com_indices = {}
self._body_comvel_indices = {}
self.noisy_init = noisy_init
self.full_obs = False
self.add_noise = False
mujoco_env.MujocoEnv.__init__(self, file_path, 10)
utils.EzPickle.__init__(self)
@property
def physics(self):
return self.model
def _step(self, a):
return self.step(a)
def step(self, a):
xposbefore = self.get_body_com("torso")[0]
self.do_simulation(a, self.frame_skip)
xposafter = self.get_body_com("torso")[0]
forward_reward = (xposafter - xposbefore) / self.dt
ctrl_cost = .5 * np.square(a).sum()
survive_reward = 1.0
reward = forward_reward - ctrl_cost + survive_reward
state = self.state_vector()
done = False
ob = self._get_obs()
return ob, reward, done, dict(
reward_forward=forward_reward,
reward_ctrl=-ctrl_cost,
reward_survive=survive_reward)
def _get_obs(self):
# No cfrc observation
if self._expose_all_qpos:
obs = np.concatenate([
self.data.qpos.flat[:15], # Ensures only ant obs.
self.data.qvel.flat[:14],
])
else:
obs = np.concatenate([
self.data.qpos.flat[2:15],
self.data.qvel.flat[:14],
])
if self._expose_body_coms is not None:
for name in self._expose_body_coms:
com = self.get_body_com(name)
if name not in self._body_com_indices:
indices = range(len(obs), len(obs) + len(com))
self._body_com_indices[name] = indices
obs = np.concatenate([obs, com])
if self._expose_body_comvels is not None:
for name in self._expose_body_comvels:
comvel = self.get_body_comvel(name)
if name not in self._body_comvel_indices:
indices = range(len(obs), len(obs) + len(comvel))
self._body_comvel_indices[name] = indices
obs = np.concatenate([obs, comvel])
if self.full_obs:
obs = np.concatenate([
self.data.qpos.flat,
self.data.qvel.flat,
np.clip(self.data.cfrc_ext, -1, 1).flat,
])
if self.add_noise:
obs = np.concatenate((obs, np.random.uniform(low=-1, high=1, size=20)))
return obs
def reset_model(self):
if self.noisy_init:
qpos = self.init_qpos + self.np_random.uniform(
size=self.model.nq, low=-.1, high=.1)
qvel = self.init_qvel + self.np_random.randn(self.model.nv) * .1
else:
qpos = self.init_qpos
qvel = self.init_qvel
# Set everything other than ant to original position and 0 velocity.
qpos[15:] = self.init_qpos[15:]
qvel[14:] = 0.
self.set_state(qpos, qvel)
return self._get_obs()
def viewer_setup(self):
# self.viewer.cam.distance = self.model.stat.extent
# self.viewer.cam.trackbodyid = 1
# self.viewer.cam.distance = self.model.stat.extent * 0.7
# self.viewer.cam.lookat[2] = 0.8925
# self.viewer.cam.elevation = 0
self.viewer.cam.trackbodyid = -1
self.viewer.cam.distance = 30
self.viewer.cam.elevation = -90
def get_ori(self):
ori = [0, 1, 0, 0]
rot = self.data.qpos[self.__class__.ORI_IND:self.__class__.ORI_IND + 4] # take the quaternion
ori = q_mult(q_mult(rot, ori), q_inv(rot))[1:3] # project onto x-y plane
ori = math.atan2(ori[1], ori[0])
return ori
def set_xy(self, xy):
qpos = np.copy(self.data.qpos)
qpos[0] = xy[0]
qpos[1] = xy[1]
qvel = self.data.qvel
self.set_state(qpos, qvel)
def get_xy(self):
return self.data.qpos[:2]
from .maze_env import MazeEnv
from .ant import AntEnv
class AntMazeEnv(MazeEnv):
MODEL_CLASS = AntEnv
<mujoco model="ant">
<compiler inertiafromgeom="true" angle="degree" coordinate="local"/>
<option timestep="0.02" integrator="RK4"/>
<custom>
<numeric name="init_qpos" data="0.0 0.0 0.55 1.0 0.0 0.0 0.0 0.0 1.0 0.0 -1.0 0.0 -1.0 0.0 1.0"/>
</custom>
<default>
<joint limited="true" armature="1" damping="1"/>
<geom condim="3" conaffinity="0" margin="0.01" friction="1 0.5 0.5" solref=".02 1" solimp=".8 .8 .01"
rgba="0.8 0.6 0.4 1" density="5.0"/>
</default>
<asset>
<texture type="skybox" builtin="gradient" width="100" height="100" rgb1="1 1 1" rgb2="0 0 0"/>
<texture name="texgeom" type="cube" builtin="flat" mark="cross" width="127" height="1278" rgb1="0.8 0.6 0.4"
rgb2="0.8 0.6 0.4" markrgb="1 1 1" random="0.01"/>
<texture name="texplane" type="2d" builtin="checker" rgb1="0 0 0" rgb2="0.8 0.8 0.8" width="100" height="100"/>
<material name='MatPlane' texture="texplane" shininess="1" texrepeat="60 60" specular="1" reflectance="0.5"/>
<material name='geom' texture="texgeom" texuniform="true"/>
</asset>
<worldbody>
<light directional="true" cutoff="100" exponent="1" diffuse="1 1 1" specular=".1 .1 .1" pos="0 0 1.3"
dir="-0 0 -1.3"/>
<geom name='floor' pos='0 0 0' size='40 40 40' type='plane' conaffinity='1' rgba='0.8 0.9 0.8 1' condim='3'/>
<body name="torso" pos="0 0 0.75">
<geom name="torso_geom" type="sphere" size="0.25" pos="0 0 0"/>
<joint name="root" type="free" limited="false" pos="0 0 0" axis="0 0 1" margin="0.01" armature="0"
damping="0"/>
<body name="front_left_leg" pos="0 0 0">
<geom name="aux_1_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 0.2 0.2 0.0"/>
<body name="aux_1" pos="0.2 0.2 0">
<joint name="hip_1" type="hinge" pos="0.0 0.0 0.0" axis="0 0 1" range="-30 30"/>
<geom name="left_leg_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 0.2 0.2 0.0"/>
<body pos="0.2 0.2 0">
<joint name="ankle_1" type="hinge" pos="0.0 0.0 0.0" axis="-1 1 0" range="30 70"/>
<geom name="left_ankle_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 0.4 0.4 0.0"/>
</body>
</body>
</body>
<body name="front_right_leg" pos="0 0 0">
<geom name="aux_2_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 -0.2 0.2 0.0"/>
<body name="aux_2" pos="-0.2 0.2 0">
<joint name="hip_2" type="hinge" pos="0.0 0.0 0.0" axis="0 0 1" range="-30 30"/>
<geom name="right_leg_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 -0.2 0.2 0.0"/>
<body pos="-0.2 0.2 0">
<joint name="ankle_2" type="hinge" pos="0.0 0.0 0.0" axis="1 1 0" range="-70 -30"/>
<geom name="right_ankle_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 -0.4 0.4 0.0"/>
</body>
</body>
</body>
<body name="back_leg" pos="0 0 0">
<geom name="aux_3_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 -0.2 -0.2 0.0"/>
<body name="aux_3" pos="-0.2 -0.2 0">
<joint name="hip_3" type="hinge" pos="0.0 0.0 0.0" axis="0 0 1" range="-30 30"/>
<geom name="back_leg_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 -0.2 -0.2 0.0"/>
<body pos="-0.2 -0.2 0">
<joint name="ankle_3" type="hinge" pos="0.0 0.0 0.0" axis="-1 1 0" range="-70 -30"/>
<geom name="third_ankle_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 -0.4 -0.4 0.0"/>
</body>
</body>
</body>
<body name="right_back_leg" pos="0 0 0">
<geom name="aux_4_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 0.2 -0.2 0.0"/>
<body name="aux_4" pos="0.2 -0.2 0">
<joint name="hip_4" type="hinge" pos="0.0 0.0 0.0" axis="0 0 1" range="-30 30"/>
<geom name="rightback_leg_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 0.2 -0.2 0.0"/>
<body pos="0.2 -0.2 0">
<joint name="ankle_4" type="hinge" pos="0.0 0.0 0.0" axis="1 1 0" range="30 70"/>
<geom name="fourth_ankle_geom" type="capsule" size="0.08" fromto="0.0 0.0 0.0 0.4 -0.4 0.0"/>
</body>
</body>
</body>
</body>
</worldbody>
<actuator>
<motor joint="hip_4" ctrlrange="-16.0 16.0" ctrllimited="true"/>
<motor joint="ankle_4" ctrlrange="-16.0 16.0" ctrllimited="true"/>
<motor joint="hip_1" ctrlrange="-16.0 16.0" ctrllimited="true"/>
<motor joint="ankle_1" ctrlrange="-16.0 16.0" ctrllimited="true"/>
<motor joint="hip_2" ctrlrange="-16.0 16.0" ctrllimited="true"/>
<motor joint="ankle_2" ctrlrange="-16.0 16.0" ctrllimited="true"/>
<motor joint="hip_3" ctrlrange="-16.0 16.0" ctrllimited="true"/>
<motor joint="ankle_3" ctrlrange="-16.0 16.0" ctrllimited="true"/>
</actuator>
<!--<actuator>-->
<!--<motor joint="hip_4" ctrlrange="-30.0 30.0" ctrllimited="true"/>-->
<!--<motor joint="ankle_4" ctrlrange="-30.0 30.0" ctrllimited="true"/>-->
<!--<motor joint="hip_1" ctrlrange="-30.0 30.0" ctrllimited="true"/>-->
<!--<motor joint="ankle_1" ctrlrange="-30.0 30.0" ctrllimited="true"/>-->
<!--<motor joint="hip_2" ctrlrange="-30.0 30.0" ctrllimited="true"/>-->
<!--<motor joint="ankle_2" ctrlrange="-30.0 30.0" ctrllimited="true"/>-->
<!--<motor joint="hip_3" ctrlrange="-30.0 30.0" ctrllimited="true"/>-->
<!--<motor joint="ankle_3" ctrlrange="-30.0 30.0" ctrllimited="true"/>-->
<!--</actuator>-->
</mujoco>
<mujoco>
<compiler angle="degree" coordinate="local" inertiafromgeom="true"/>
<option integrator="RK4" timestep="0.02"/>
<default>
<joint armature="0" damping="0" limited="false"/>
<geom conaffinity="0" condim="3" density="100" friction="1 0.5 0.5" margin="0" rgba="0.8 0.6 0.4 1"/>
</default>
<asset>
<texture builtin="gradient" height="100" rgb1="1 1 1" rgb2="0 0 0" type="skybox" width="100"/>
<texture builtin="flat" height="1278" mark="cross" markrgb="1 1 1" name="texgeom" random="0.01"
rgb1="0.8 0.6 0.4" rgb2="0.8 0.6 0.4" type="cube" width="127"/>
<texture builtin="checker" height="100" name="texplane" rgb1="0 0 0" rgb2="0.8 0.8 0.8" type="2d" width="100"/>
<material name="MatPlane" reflectance="0.5" shininess="1" specular="1" texrepeat="30 30" texture="texplane"/>
<material name="geom" texture="texgeom" texuniform="true"/>
</asset>
<worldbody>
<light cutoff="100" diffuse="1 1 1" dir="-0 0 -1.3" directional="true" exponent="1" pos="0 0 1.3"
specular=".1 .1 .1"/>
<geom conaffinity="1" condim="3" material="MatPlane" name="floor" pos="0 0 0" rgba="0.8 0.9 0.8 1"
size="40 40 40" type="plane"/>
<body name="torso" pos="0 0 0">
<geom name="pointbody" pos="0 0 0.5" size="0.5" type="sphere"/>
<geom name="pointarrow" pos="0.6 0 0.5" size="0.5 0.1 0.1" type="box"/>
<joint axis="1 0 0" name="ballx" pos="0 0 0" type="slide"/>
<joint axis="0 1 0" name="bally" pos="0 0 0" type="slide"/>
<joint axis="0 0 1" limited="false" name="rot" pos="0 0 0" type="hinge"/>
</body>
</worldbody>
<actuator>
<!-- Those are just dummy actuators for providing ranges -->
<motor ctrllimited="true" ctrlrange="-1 1" joint="ballx"/>
<motor ctrllimited="true" ctrlrange="-0.25 0.25" joint="rot"/>
</actuator>
</mujoco>
<mujoco model="swimmer">
<compiler inertiafromgeom="true" angle="degree" coordinate="local" />
<custom>
<numeric name="frame_skip" data="50" />
</custom>
<option timestep="0.001" density="4000" viscosity="0.1" collision="predefined" integrator="Euler" iterations="1000">
<flag warmstart="disable" />
</option>
<default>
<geom contype='1' conaffinity='1' condim='1' rgba='0.8 0.6 .4 1' material="geom" />
<!--<joint armature='1' />-->
</default>
<asset>
<texture type="skybox" builtin="gradient" width="100" height="100" rgb1="1 1 1" rgb2="0 0 0" />
<texture name="texgeom" type="cube" builtin="flat" mark="cross" width="127" height="1278" rgb1="0.8 0.6 0.4" rgb2="0.8 0.6 0.4" markrgb="1 1 1" random="0.01" />
<texture name="texplane" type="2d" builtin="checker" rgb1="0 0 0" rgb2="0.8 0.8 0.8" width="100" height="100" />
<material name='MatPlane' texture="texplane" shininess="1" texrepeat="30 30" specular="1" reflectance="0.5" />
<material name='geom' texture="texgeom" texuniform="true" />
</asset>
<worldbody>
<light directional="true" cutoff="100" exponent="1" diffuse="1 1 1" specular=".1 .1 .1" pos="0 0 1.3" dir="-0 0 -1.3" />
<geom name='floor' material="MatPlane" pos='0 0 -0.1' size='40 40 0.1' type='plane' conaffinity='1' rgba='0.8 0.9 0.8 1' condim='3' />
        <!-- ================= SWIMMER ================= -->
<body name="torso" pos="0 0 0">
<geom name="torso" type="capsule" fromto="1.5 0 0 0.5 0 0" size="0.1" density="1000" />
<joint pos="0 0 0" type="slide" name="slider1" axis="1 0 0" />
<joint pos="0 0 0" type="slide" name="slider2" axis="0 1 0" />
<joint name="rot" type="hinge" pos="0 0 0" axis="0 0 1" />
<body name="mid" pos="0.5 0 0">
<geom name="mid" type="capsule" fromto="0 0 0 -1 0 0" size="0.1" density="1000" />
<joint name="rot2" type="hinge" pos="0 0 0" axis="0 0 1" range="-100 100" limited="true" />
<body name="back" pos="-1 0 0">
<geom name="back" type="capsule" fromto="0 0 0 -1 0 0" size="0.1" density="1000" />
<joint name="rot3" type="hinge" pos="0 0 0" axis="0 0 1" range="-100 100" limited="true" />
</body>
</body>
</body>
</worldbody>
<actuator>
<motor joint="rot2" ctrllimited="true" ctrlrange="-50 50" />
<motor joint="rot3" ctrllimited="true" ctrlrange="-50 50" />
</actuator>
</mujoco>
from .ant_maze_env import AntMazeEnv
from .point_maze_env import PointMazeEnv
from .swimmer_maze_env import SwimmerMazeEnv
from collections import OrderedDict
import gym
import numpy as np
import copy
from gym import Wrapper
from gym.envs.registration import EnvSpec
class GoalWrapper(Wrapper):
def __init__(self, env, maze_size_scaling, random_start, low, high, fix_goal=True, top_down=False, test=None):
super(GoalWrapper, self).__init__(env)
ob_space = env.observation_space
self.maze_size_scaling = maze_size_scaling
row_num, col_num = len(self.env.MAZE_STRUCTURE), len(self.env.MAZE_STRUCTURE[0])
contain_r = [1 if "r" in row else 0 for row in self.env.MAZE_STRUCTURE]
row_r = contain_r.index(1)
col_r = self.env.MAZE_STRUCTURE[row_r].index("r")
y_low = (0.5 - row_r) * self.maze_size_scaling
x_low = (0.5 - col_r) * self.maze_size_scaling
y_high = (row_num - 1.5 - row_r) * self.maze_size_scaling
x_high = (col_num - 1.5 - col_r) * self.maze_size_scaling
self.maze_low = maze_low = np.array([x_low, y_low],
dtype=ob_space.dtype)
self.maze_high = maze_high = np.array([x_high, y_high],
dtype=ob_space.dtype)
print("maze_low, maze_high", self.maze_low, self.maze_high)
goal_low, goal_high = maze_low, maze_high
self.goal_space = gym.spaces.Box(low=goal_low, high=goal_high)
self.maze_space = gym.spaces.Box(low=maze_low, high=maze_high)
if self.env._maze_id == "Fall":
self.goal_dim = 3
else:
self.goal_dim = goal_low.size
print("goal_dim in create_maze", self.goal_dim)
self.distance_threshold = 1.5
print("distance threshold in create_maze", self.distance_threshold)
self.observation_space = gym.spaces.Dict(OrderedDict({
'observation': ob_space,
'desired_goal': self.goal_space,
'achieved_goal': self.goal_space,
}))
self.random_start = random_start
# fix goal
self.fix_goal = fix_goal
print("fix goal", self.fix_goal)
contain_g = [1 if "g" in row else 0 for row in self.env.MAZE_STRUCTURE]
if 1 in contain_g and self.fix_goal and test == "Test":
row = contain_g.index(1)
col = self.env.MAZE_STRUCTURE[row].index("g")
y = (row - row_r) * self.maze_size_scaling
x = (col - col_r) * self.maze_size_scaling
self.fix_goal_xy = np.array([x, y])
if env._maze_id == "Fall":
self.fix_goal_xy = np.concatenate((self.fix_goal_xy, [self.maze_size_scaling * 0.5 + 0.5]))
print("fix goal xy", self.fix_goal_xy)
elif test == "Test1":
if env._maze_id == "Push":
self.fix_goal_xy = np.array([-4, 0])
elif env._maze_id == "Maze1":
self.fix_goal_xy = np.array([8, 0])
else:
print("Unknown env", env._maze_id)
assert False
print("fix goal xy", self.fix_goal_xy)
elif test == "Test2":
if env._maze_id == "Push":
self.fix_goal_xy = np.array([-4, 4])
elif env._maze_id == "Maze1":
self.fix_goal_xy = np.array([8, 8])
else:
print("Unknown env", env._maze_id)
assert False
print("fix goal xy", self.fix_goal_xy)
else:
# get vacant rowcol
structure = self.env.MAZE_STRUCTURE
self.vacant_rowcol = []
for i in range(len(structure)):
for j in range(len(structure[0])):
if structure[i][j] not in [1, -1, 'r']:
self.vacant_rowcol.append((i, j))
self.reward_type = "dense"
self.top_down = top_down
def step(self, action):
observation, reward, _, info = self.env.step(action)
out = {'observation': observation,
'desired_goal': self.goal,
# 'achieved_goal': observation[..., 3:5]}
'achieved_goal': observation[..., :self.goal_dim]}
distance = np.linalg.norm(observation[..., :self.goal_dim] - self.goal[..., :self.goal_dim], axis=-1)
info['is_success'] = done = (distance < self.distance_threshold)
if self.reward_type == "sparse":
reward = -(distance > self.distance_threshold).astype(np.float32)
else:
            # normalization: scale the dense distance reward by 0.1
reward = -distance * 0.1
if self.top_down:
mask = np.array([0.0] * 2 + [1.0] * (out['observation'].shape[0] - 2))
out['observation'] = out['observation'] * mask
return out, reward, done, info
def reset(self):
if self.fix_goal:
self.goal = self.fix_goal_xy
else:
self.goal = self.goal_space.sample()
if self.env._maze_id == "Push":
while (self.env.old_invalid_goal(self.goal[:2])):
self.goal = self.goal_space.sample()
else:
while (self.env.invalid_goal(self.goal[:2])):
self.goal = self.goal_space.sample()
if self.env._maze_id == "Fall":
self.goal = np.concatenate((self.goal, [self.maze_size_scaling * 0.5 + 0.5]))
observation = self.env.reset(self.goal)
# random start a position without collision
if self.random_start:
xy = self.maze_space.sample()
while (self.env._is_in_collision(xy)):
xy = self.maze_space.sample()
self.env.wrapped_env.set_xy(xy)
observation = self.env._get_obs()
out = {'observation': observation, 'desired_goal': self.goal}
out['achieved_goal'] = observation[..., :self.goal_dim]
# out['achieved_goal'] = observation[..., 3:5]
if self.top_down:
# print("obs", out['observation'].shape)
mask = np.array([0.0] * 2 + [1.0] * (out['observation'].shape[0] - 2))
out['observation'] = out['observation'] * mask
return out
def create_maze_env(env_name=None, top_down_view=False, maze_size_scaling=4, random_start=True, goal_args=None,
fix_goal=True, test=None):
n_bins = 0
if env_name.startswith('Ego'):
n_bins = 8
env_name = env_name[3:]
if env_name.startswith('Ant'):
manual_collision = True
cls = AntMazeEnv
env_name = env_name[3:]
maze_size_scaling = maze_size_scaling
elif env_name.startswith('Point'):
cls = PointMazeEnv
manual_collision = True
env_name = env_name[5:]
maze_size_scaling = maze_size_scaling
elif env_name.startswith('Swimmer'):
cls = SwimmerMazeEnv
manual_collision = True
env_name = env_name[7:]
maze_size_scaling = maze_size_scaling
else:
assert False, 'unknown env %s' % env_name
observe_blocks = False
put_spin_near_agent = False
if env_name == 'Maze':
maze_id = 'Maze'
elif env_name == 'Maze1':
maze_id = 'Maze1'
maze_size_scaling = 4
elif env_name == 'Push':
maze_id = 'Push'
manual_collision = True
maze_size_scaling = 4
elif env_name == 'Fall':
maze_id = 'Fall'
elif env_name == 'Block':
maze_id = 'Block'
put_spin_near_agent = True
observe_blocks = True
elif env_name == 'BlockMaze':
maze_id = 'BlockMaze'
put_spin_near_agent = True
observe_blocks = True
else:
raise ValueError('Unknown maze environment %s' % env_name)
gym_mujoco_kwargs = {
'maze_id': maze_id,
'n_bins': n_bins,
'observe_blocks': observe_blocks,
'put_spin_near_agent': put_spin_near_agent,
'top_down_view': top_down_view,
'manual_collision': manual_collision,
'maze_size_scaling': maze_size_scaling,
}
gym_env = cls(**gym_mujoco_kwargs)
# gym_env.reset()
# goal_args = np.array(goal_args) / 8 * maze_size_scaling
return GoalWrapper(gym_env, maze_size_scaling, random_start, *goal_args, fix_goal=fix_goal, top_down=top_down_view, test=test)
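# Hedged usage sketch (assumption, not in the original file): create_maze_env builds
# the robot-plus-maze environment and wraps it in GoalWrapper, which exposes a
# goal-conditioned dict observation ('observation', 'desired_goal', 'achieved_goal'),
# a dense reward of -0.1 * distance, and success once the distance drops below 1.5.
# The goal_args values are forwarded as the wrapper's (low, high) arguments.
if __name__ == '__main__':
    # Parameter values here are illustrative; running this requires MuJoCo.
    env = create_maze_env('AntMaze', maze_size_scaling=8, random_start=True,
                          goal_args=[[-4, -4], [20, 20]], fix_goal=True, test="Test")
    obs = env.reset()
    print(obs['observation'].shape, obs['desired_goal'], obs['achieved_goal'])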
"""Adapted from rllab maze_env_utils.py."""
import numpy as np
import math
class Move(object):
X = 11
Y = 12
Z = 13
XY = 14
XZ = 15
YZ = 16
XYZ = 17
SpinXY = 18
def can_move_x(movable):
return movable in [Move.X, Move.XY, Move.XZ, Move.XYZ,
Move.SpinXY]
def can_move_y(movable):
return movable in [Move.Y, Move.XY, Move.YZ, Move.XYZ,
Move.SpinXY]
def can_move_z(movable):
return movable in [Move.Z, Move.XZ, Move.YZ, Move.XYZ]
def can_spin(movable):
return movable in [Move.SpinXY]
def can_move(movable):
return can_move_x(movable) or can_move_y(movable) or can_move_z(movable)
def construct_maze(maze_id='Maze'):
if maze_id == 'Maze':
structure = [
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
[1, 0, 'r', 0, 0, 1, 0, 0, 0, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
[1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1],
[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
[1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1],
[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
[1, 0, 0, 0, 0, 1, 0, 0, 0, 'g', 1],
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
]
elif maze_id == 'Maze1':
structure = [
[1, 1, 1, 1, 1],
[1, 'r', 0, 0, 1],
[1, 1, 1, 0, 1],
[1, 'g', 0, 0, 1],
[1, 1, 1, 1, 1],
]
elif maze_id == 'Push':
structure = [
[1, 1, 1, 1, 1],
[1, 0, 'r', 1, 1],
[1, 0, Move.XY, 0, 1],
[1, 1, 'g', 1, 1],
[1, 1, 1, 1, 1],
]
elif maze_id == 'Fall':
structure = [
[1, 1, 1, 1],
[1, 'r', 0, 1],
[1, 0, Move.YZ, 1],
[1, -1, -1, 1],
[1, 'g', 0, 1],
[1, 1, 1, 1],
]
elif maze_id == 'Block':
O = 'r'
structure = [
[1, 1, 1, 1, 1],
[1, O, 0, 0, 1],
[1, 0, 0, 0, 1],
[1, 0, 0, 'g', 1],
[1, 1, 1, 1, 1],
]
elif maze_id == 'BlockMaze':
O = 'r'
structure = [
[1, 1, 1, 1],
[1, O, 0, 1],
[1, 1, 0, 1],
[1, 'g', 0, 1],
[1, 1, 1, 1],
]
else:
raise NotImplementedError('The provided MazeId %s is not recognized' % maze_id)
return structure
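# Hedged reading aid (assumption, not stated in the original file): in the structures
# above, 1 is a wall block, 0 is free space, -1 marks a chasm cell (used by 'Fall'),
# 'r' is the robot start, 'g' is the goal cell, and Move.* values mark blocks that
# can be moved along the named axes (see the can_move_* helpers above).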
def line_intersect(pt1, pt2, ptA, ptB):
"""
Taken from https://www.cs.hmc.edu/ACM/lectures/intersections.html
this returns the intersection of Line(pt1,pt2) and Line(ptA,ptB)
"""
DET_TOLERANCE = 0.00000001
# the first line is pt1 + r*(pt2-pt1)
# in component form:
x1, y1 = pt1
x2, y2 = pt2
dx1 = x2 - x1
dy1 = y2 - y1
# the second line is ptA + s*(ptB-ptA)
x, y = ptA
xB, yB = ptB
dx = xB - x
dy = yB - y
DET = (-dx1 * dy + dy1 * dx)
if math.fabs(DET) < DET_TOLERANCE: return (0, 0, 0, 0, 0)
# now, the determinant should be OK
DETinv = 1.0 / DET
# find the scalar amount along the "self" segment
r = DETinv * (-dy * (x - x1) + dx * (y - y1))
# find the scalar amount along the input line
s = DETinv * (-dy1 * (x - x1) + dx1 * (y - y1))
# return the average of the two descriptions
xi = (x1 + r * dx1 + x + s * dx) / 2.0
yi = (y1 + r * dy1 + y + s * dy) / 2.0
return (xi, yi, 1, r, s)
def ray_segment_intersect(ray, segment):
"""
    Check if the ray originating from (x, y) with direction theta intersects the line segment (x1, y1) -- (x2, y2),
and return the intersection point if there is one
"""
(x, y), theta = ray
# (x1, y1), (x2, y2) = segment
pt1 = (x, y)
    ray_len = 1.0  # unit-length step along the ray direction
    pt2 = (x + ray_len * math.cos(theta), y + ray_len * math.sin(theta))
xo, yo, valid, r, s = line_intersect(pt1, pt2, *segment)
if valid and r >= 0 and 0 <= s <= 1:
return (xo, yo)
return None
def point_distance(p1, p2):
x1, y1 = p1
x2, y2 = p2
return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5
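# Hedged usage sketch (assumption, not part of the original file): the geometry
# helpers above work on 2D points; ray_segment_intersect returns the hit point
# or None when the ray misses the segment.
if __name__ == '__main__':
    seg = ((1.0, -1.0), (1.0, 1.0))                        # vertical segment at x = 1
    hit = ray_segment_intersect(((0.0, 0.0), 0.0), seg)    # ray along +x from the origin
    print(hit)                                             # approximately (1.0, 0.0)
    print(point_distance((0.0, 0.0), (3.0, 4.0)))          # 5.0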
"""Wrapper for creating the ant environment in gym_mujoco."""
import math
import numpy as np
from gym import utils
from gym.envs.mujoco import mujoco_env
class PointEnv(mujoco_env.MujocoEnv, utils.EzPickle):
FILE = "point.xml"
ORI_IND = 2
def __init__(self, file_path=None, expose_all_qpos=True):
self._expose_all_qpos = expose_all_qpos
self.add_noise = False
mujoco_env.MujocoEnv.__init__(self, file_path, 1)
utils.EzPickle.__init__(self)
@property
def physics(self):
return self.model
def _step(self, a):
return self.step(a)
def step(self, action):
action[0] = 0.2 * action[0]
qpos = np.copy(self.data.qpos)
qpos[2] += action[1]
ori = qpos[2]
# compute increment in each direction
dx = math.cos(ori) * action[0]
dy = math.sin(ori) * action[0]
# ensure that the robot is within reasonable range
qpos[0] = np.clip(qpos[0] + dx, -100, 100)
qpos[1] = np.clip(qpos[1] + dy, -100, 100)
qvel = self.data.qvel
self.set_state(qpos, qvel)
for _ in range(0, self.frame_skip):
self.sim.step()
next_obs = self._get_obs()
reward = 0
done = False
info = {}
return next_obs, reward, done, info
def _get_obs(self):
if self._expose_all_qpos:
obs = np.concatenate([
self.data.qpos.flat[:3], # Only point-relevant coords.
self.data.qvel.flat[:3]])
if self.add_noise:
obs = np.concatenate((obs, np.random.uniform(low=-2, high=2, size=20)))
return obs
return np.concatenate([
self.data.qpos.flat[2:3],
self.data.qvel.flat[:3]])
def reset_model(self):
qpos = self.init_qpos + self.np_random.uniform(
size=self.model.nq, low=-.1, high=.1)
qvel = self.init_qvel + self.np_random.randn(self.model.nv) * .1
# Set everything other than point to original position and 0 velocity.
qpos[3:] = self.init_qpos[3:]
qvel[3:] = 0.
self.set_state(qpos, qvel)
return self._get_obs()
def get_ori(self):
return self.data.qpos[self.__class__.ORI_IND]
def set_xy(self, xy):
qpos = np.copy(self.data.qpos)
qpos[0] = xy[0]
qpos[1] = xy[1]
qvel = self.data.qvel
self.set_state(qpos, qvel)
def get_xy(self):
qpos = np.copy(self.data.qpos)
return qpos[:2]
def viewer_setup(self):
# self.viewer.cam.trackbodyid = 1
# self.viewer.cam.distance = self.model.stat.extent * 0.7
# self.viewer.cam.lookat[2] = 0.8925
# self.viewer.cam.elevation = 0
self.viewer.cam.trackbodyid = -1
self.viewer.cam.distance = 60
self.viewer.cam.elevation = -90
from .maze_env import MazeEnv
from .point import PointEnv
class PointMazeEnv(MazeEnv):
MODEL_CLASS = PointEnv
import numpy as np
from gym import utils
from gym.envs.mujoco import mujoco_env
class SwimmerEnv(mujoco_env.MujocoEnv, utils.EzPickle):
ORI_IND = 2
FILE = "swimmer.xml"
def __init__(self, file_path=None, expose_all_qpos=True):
self._expose_all_qpos = expose_all_qpos
self.add_noise = False
mujoco_env.MujocoEnv.__init__(self, file_path, 4)
utils.EzPickle.__init__(self)
def _step(self, a):
return self.step(a)
def step(self, a):
ctrl_cost_coeff = 0.0001
xposbefore = self.sim.data.qpos[0]
self.do_simulation(a, self.frame_skip)
xposafter = self.sim.data.qpos[0]
reward_fwd = (xposafter - xposbefore) / self.dt
reward_ctrl = - ctrl_cost_coeff * np.square(a).sum()
reward = reward_fwd + reward_ctrl
ob = self._get_obs()
return ob, reward, False, dict(reward_fwd=reward_fwd, reward_ctrl=reward_ctrl)
def _get_obs(self):
qpos = self.sim.data.qpos
qvel = self.sim.data.qvel
# print("qpos", qpos)
# print("qvel", qvel)
return np.concatenate([qpos.flat, qvel.flat])
def reset_model(self):
self.set_state(
self.init_qpos + self.np_random.uniform(low=-.1, high=.1, size=self.model.nq),
self.init_qvel + self.np_random.uniform(low=-.1, high=.1, size=self.model.nv)
)
return self._get_obs()
def get_ori(self):
return self.data.qpos[self.__class__.ORI_IND]
def set_xy(self, xy):
qpos = np.copy(self.data.qpos)
qpos[0] = xy[0]
qpos[1] = xy[1]
qvel = self.data.qvel
self.set_state(qpos, qvel)
def get_xy(self):
qpos = np.copy(self.data.qpos)
return qpos[:2]
def viewer_setup(self):
# self.viewer.cam.trackbodyid = 1
# self.viewer.cam.distance = self.model.stat.extent * 0.7
# self.viewer.cam.lookat[2] = 0.8925
# self.viewer.cam.elevation = 0
self.viewer.cam.trackbodyid = -1
self.viewer.cam.distance = 60
self.viewer.cam.elevation = -90
from .maze_env import MazeEnv
from .swimmer import SwimmerEnv
class SwimmerMazeEnv(MazeEnv):
MODEL_CLASS = SwimmerEnv
# copied from openai gym
import gym
from gym import spaces
from gym.utils import seeding
import numpy as np
class NChainEnv(gym.Env):
"""n-Chain environment
This game presents moves along a linear chain of states, with two actions:
0) forward, which moves along the chain but returns no reward
1) backward, which returns to the beginning and has a small reward
The end of the chain, however, presents a large reward, and by moving
'forward' at the end of the chain this large reward can be repeated.
At each action, there is a small probability that the agent 'slips' and the
opposite transition is instead taken.
The observed state is the current state in the chain (0 to n-1).
This environment is described in section 6.1 of:
A Bayesian Framework for Reinforcement Learning by Malcolm Strens (2000)
http://ceit.aut.ac.ir/~shiry/lecture/machine-learning/papers/BRL-2000.pdf
"""
def __init__(self, n=5, slip=0.2, small=0.001, large=1.0):
self.n = n
self.n2 = bin(n-1)
print("n2", self.n2, len(self.n2)-2)
self.slip = slip # probability of 'slipping' an action
self.small = small # payout for 'backwards' action
self.large = large # payout at end of chain for 'forwards' action
self.state = 0 # Start at beginning of the chain
self.action_space = spaces.Box(low=-1., high=1., shape=(1,))
# self.observation_space = spaces.Discrete(self.n)
self.observation_space = spaces.Discrete(len(self.n2) - 2)
self.shuffle_order = np.arange(len(self.n2) - 2)
np.random.shuffle(self.shuffle_order)
self.seed()
target = np.zeros(n)
target[n-1] = 1
self.target = target
self.reward_type = "sparse"
self.visited_count = np.zeros(n)
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def step(self, action):
# print("action", action)
success = False
info = {}
assert self.action_space.contains(action)
if self.np_random.rand() < self.slip:
action = 0 - action # agent slipped, reverse action taken
if action < 0 and self.state > 0: # 'backwards': go back to the beginning, get small reward
reward = self.small
self.state -= 1
elif action > 0 and self.state < self.n - 1: # 'forwards': go up along the chain
reward = 0
self.state += 1
elif self.state == self.n - 1: # 'forwards': stay at the end of the chain, collect large reward
reward = self.large
success = True
else:
reward = 0
done = False
info["is_success"] = success
# print("state", self.state)
if self.visited_count[self.state] == 0:
self.visited_count[self.state] = 1
return self.get_obs(), reward, done, info
def reset(self):
self.state = 0
if self.visited_count[self.state] == 0:
self.visited_count[self.state] = 1.
return self.get_obs()
def get_obs(self):
new = np.zeros(len(self.n2) - 2)
# new[self.state] = 1
new2 = bin(self.state)
new2 = list(new2[2:])
new2.reverse()
for i, ele in enumerate(new2):
new[-(i+1)] = int(ele)
new = new[::-1]
# new = new[self.shuffle_order]
return {
"observation": np.copy(new),
"achieved_goal": np.copy(new),
"desired_goal": np.copy(self.target),
}
@property
def coverage(self):
return np.sum(self.visited_count) / self.n
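# Hedged usage sketch (assumption, not part of the original file): the chain state is
# returned as its binary encoding, so n=5 yields a 3-bit observation vector, while the
# desired goal stays a length-n one-hot vector for the terminal state.
if __name__ == '__main__':
    env = NChainEnv(n=5)
    obs = env.reset()
    print(obs['observation'], obs['desired_goal'])
    obs, rew, done, info = env.step(np.array([1.0], dtype=np.float32))
    print(obs['observation'], rew, info['is_success'], env.coverage)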
import gym
import numpy as np
import cv2
from gym import spaces
def line_intersection(line1, line2):
# calculate the intersection point
xdiff = (line1[0][0] - line1[1][0], line2[0][0] - line2[1][0])
    ydiff = (line1[0][1] - line1[1][1], line2[0][1] - line2[1][1])
def det(a, b):
return a[0] * b[1] - a[1] * b[0]
div = det(xdiff, ydiff)
if div == 0:
raise Exception('lines do not intersect')
d = (det(*line1), det(*line2))
x = det(d, xdiff) / div
y = det(d, ydiff) / div
return x, y
def check_cross(x0, y0, x1, y1):
x0 = np.array(x0)
y0 = np.array(y0)
x1 = np.array(x1)
y1 = np.array(y1)
return np.cross(x1 - x0, y0 - x0), np.cross(y0 - x0, y1 - x0)
def check_itersection(x0, y0, x1, y1):
EPS = 1e-10
def sign(x):
if x > EPS:
return 1
if x < -EPS:
return -1
return 0
f1, f2 = check_cross(x0, y0, x1, y1)
f3, f4 = check_cross(x1, y1, x0, y0)
if sign(f1) == sign(f2) and sign(f3) == sign(f4) and sign(f1) != 0 and sign(f3) != 0:
return True
return False
class PlaneBase(gym.Env):
def __init__(self, rects, R, is_render=False, size=512):
self.rects = rects
self.n = len(self.rects)
self.size = size
self.map = np.ones((size, size, 3), dtype=np.uint8) * 255
self.R = R
self.R2 = R ** 2
self.board = np.array(
[[0, 0],
[1, 1]],
dtype='float32')
self.action_space = gym.spaces.Box(
low=-R, high=R, shape=(2,), dtype='float32')
self.observation_space = gym.spaces.Box(
low=0., high=1., shape=(2,), dtype='float32')
if is_render:
cv2.namedWindow('image', cv2.WINDOW_NORMAL)
self.image_name = 'image'
for i in range(self.n):
for j in range(i + 1, self.n):
                if check_itersection(self.rects[i][0], self.rects[i][1], self.rects[j][0], self.rects[j][1]):
                    raise Exception("Rectangles intersect with each other")
for ((x0, y0), (x1, y1)) in rects:
x0, y0 = int(x0 * size), int(y0 * size)
x1, y1 = int(x1 * size), int(y1 * size)
cv2.rectangle(self.map, (x0, y0), (x1, y1), (0, 255, 0), 1)
ps = np.array([
[x0, y0],
[x1, y0],
[x1, y1],
[x0, y1],
], dtype=np.int32)
cv2.fillConvexPoly(self.map, ps, (127, 127, 127))
self.state = (0, 0)
self.reset()
def restore(self, obs):
self.state = (float(obs[0]), float(obs[1]))
def rect_lines(self, rect):
(x0, y0), (x1, y1) = rect
yield (x0, y0), (x1, y0)
yield (x1, y0), (x1, y1)
yield (x1, y1), (x0, y1)
yield (x0, y1), (x0, y0)
def l2dist(self, x, y):
return ((y[0] - x[0]) ** 2) + ((y[1] - x[1]) ** 2)
def check_inside(self, p):
EPS = 1e-10
for i in self.rects:
if p[0] > i[0][0] + EPS and p[0] < i[1][0] - EPS and p[1] > i[0][1] + EPS and p[1] < i[1][1] - EPS:
return True
return False
def step(self, action):
dx, dy = action
l = 0.0001
p = (self.state[0] + dx * l, self.state[1] + dy * l)
if self.check_inside(p) or p[0] > 1 or p[1] > 1 or p[0] < 0 or p[1] < 0:
return np.array(self.state), 0, False, {}
dest = (self.state[0] + dx, self.state[1] + dy)
md = self.l2dist(self.state, dest)
_dest = dest
line = (self.state, dest)
for i in list(self.rects) + [self.board]:
for l in self.rect_lines(i):
if check_itersection(self.state, dest, l[0], l[1]):
inter_point = line_intersection(line, l)
d = self.l2dist(self.state, inter_point)
if d < md:
md = d
_dest = inter_point
self.restore(_dest)
return np.array(self.state), -md, False, {}
def render(self, mode='human'):
image = self.map.copy()
x, y = self.state
x = int(x * self.size)
y = int(y * self.size)
cv2.circle(image, (x, y), 5, (255, 0, 255), -1)
if mode == 'human':
cv2.imshow('image', image)
cv2.waitKey(2)
else:
return image
def reset(self):
inside_rect = True
while inside_rect:
a, b = np.random.random(), np.random.random()
inside_rect = self.check_inside((a, b))
self.state = (a, b)
return np.array(self.state)
class NaivePlane(PlaneBase):
def __init__(self, is_render=True, R=300, size=512):
PlaneBase.__init__(self,
[
np.array([[128, 128], [300, 386]]) / 512,
np.array([[400, 400], [500, 500]]) / 512,
],
                           R, is_render=is_render, size=size)
class NaivePlane2(PlaneBase):
    # two rectangles
def __init__(self, is_render=True, R=300, size=512):
PlaneBase.__init__(self,
[
np.array([[64, 64], [256, 256]]) / 512,
np.array([[300, 128], [400, 500]]) / 512,
],
                           R, is_render=is_render, size=size)
class NaivePlane3(PlaneBase):
    # four rectangles
def __init__(self, is_render=True, R=300, size=512):
PlaneBase.__init__(self,
[
np.array([[64, 64], [192, 192]]) / 512,
np.array([[320, 64], [448, 192]]) / 512,
np.array([[320, 320], [448, 448]]) / 512,
np.array([[64, 320], [192, 448]]) / 512,
],
                           R, is_render=is_render, size=size)
class NaivePlane4(PlaneBase):
    # two rectangles
def __init__(self, is_render=True, R=300, size=512):
PlaneBase.__init__(self,
[
np.array([[64, 64], [192, 512]]) / 512,
np.array([[320, 64], [448, 512]]) / 512,
],
                           R, is_render=is_render, size=size)
class NaivePlane5(PlaneBase):
    # one rectangle
def __init__(self, is_render=False, R=300, size=512):
PlaneBase.__init__(self,
[
np.array([[0, 1. / 3], [2. / 3, 2. / 3]]),
],
                           R, is_render=is_render, size=size)
if __name__ == '__main__':
env = NaivePlane5()
obs = env.reset()
while True:
print(obs)
env.render()
while True:
try:
print('entering the dir (x, y)')
act = input().strip().split(' ')
act = float(act[0]) / 512, float(act[1]) / 512
break
except KeyboardInterrupt as e:
raise e
except:
continue
obs, reward, _, _ = env.step(act)
import cv2
import torch
import numpy as np
## This is used to store a video for remote visualization
def play(env, policy, video_path="tmp.avi", time_limit=500, device='cpu'):
out = None
obs = env.reset()
num = 0
rew = None
action = None
info = None
flag = False
while True:
img = env.unwrapped.render(mode='rgb_array')[:, :, ::-1].copy()
'''
if True and isinstance(obs, dict):
np.set_printoptions(precision=3)
achieved = (float(obs['achieved_goal'][0]), float(obs['achieved_goal'][1]))
desired = (float(obs['desired_goal'][0]), float(obs['desired_goal'][1]))
cv2.putText(img, " obs: {:.3f} {:.3f}".format(achieved[0], achieved[1]), (400,25), cv2.FONT_HERSHEY_SIMPLEX, 0.3, 255)
cv2.putText(img, "goal: {:.3f} {:.3f}".format(desired[0], desired[1]), (400,50), cv2.FONT_HERSHEY_SIMPLEX, 0.3, 255)
if rew is not None:
cv2.putText(img, "rew: {:.3f}".format(rew), (400,75), cv2.FONT_HERSHEY_SIMPLEX, 0.3, 255)
if action is not None:
action = [float(i) for i in action][:2]
cv2.putText(img, "rew: {:.3f} {:.3f}".format(action[0], action[1]), (400,100), cv2.FONT_HERSHEY_SIMPLEX, 0.3, 255)
if info is not None:
if 'is_success' in info:
cv2.putText(img, "success? {}".format(info['is_success']), (400,125), cv2.FONT_HERSHEY_SIMPLEX, 0.3, 255)
cv2.putText(img, "step {}".format(num), (400,150), cv2.FONT_HERSHEY_SIMPLEX, 0.3, 255)
flag = True
'''
if out is None:
out = cv2.VideoWriter(
video_path, cv2.VideoWriter_fourcc(*'XVID'), 20.0, (img.shape[1], img.shape[0]))
out.write(img)
if isinstance(obs, dict):
goal = torch.tensor(obs['desired_goal'], dtype=torch.float32).to(device)
obs = torch.tensor(obs['observation'], dtype=torch.float32).to(device)
action = policy(obs.unsqueeze(0), goal.unsqueeze(0))
if isinstance(action, torch.Tensor):
action = action.detach().cpu().numpy()
else:
action = policy(np.array(obs)[None]).action[0].detach().cpu().numpy()
obs, rew, done, info = env.step(action)
if done:
obs = env.reset()
num += 1
# assert not info['is_success']
flag = True
if not flag:
print(num, info, rew, done, env.goal, action)
if num == time_limit - 1:
break
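# Hedged usage sketch (assumption, not part of the original file): any callable that
# maps (observation, goal) tensors to an action can serve as `policy`; a random
# policy stands in here for a trained controller. The 'AntMaze1-v1' id and the
# goal_env registration are assumed from train_hier_sac.py.
if __name__ == '__main__':
    import gym
    from goal_env.mujoco import *      # assumed to register the maze environments
    env = gym.make('AntMaze1-v1')
    random_policy = lambda obs, goal: env.action_space.sample()
    play(env, random_policy, video_path='tmp.avi', time_limit=100)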
import torch
from torch import nn
import numpy as np
class L1(nn.Module):
def __init__(self):
nn.Module.__init__(self)
def forward(self, s, t):
if isinstance(s, np.ndarray):
s = torch.from_numpy(s).float()
if isinstance(t, np.ndarray):
t = torch.from_numpy(t).float()
out = torch.abs(s - t)
return out.view(out.size(0), -1).sum(dim=1)
class L2(nn.Module):
def __init__(self):
nn.Module.__init__(self)
def forward(self, s, t):
out = (s - t) ** 2
return (out.view(out.size(0), -1).sum(dim=1) + 1e-14) ** 0.5
class DotProd(nn.Module):
def __init__(self):
nn.Module.__init__(self)
def forward(self, s, t):
if isinstance(s, np.ndarray):
s = torch.from_numpy(s).float()
if isinstance(t, np.ndarray):
t = torch.from_numpy(t).float()
out = (s * t[:, None, :]).sum(dim=2)[:, 0]
return out
class MLPDist(nn.Module):
def __init__(self, inp_dim):
nn.Module.__init__(self)
self.dim = inp_dim
self.mlp = nn.Sequential(
nn.Linear(self.dim * 2, self.dim),
nn.ReLU(),
nn.Linear(self.dim, self.dim),
nn.ReLU(),
nn.Linear(self.dim, 1),
)
def forward(self, s, t):
if isinstance(s, np.ndarray):
s = torch.from_numpy(s).float()
if isinstance(t, np.ndarray):
t = torch.from_numpy(t).float()
out = self.mlp(torch.cat([s, t], dim=1))
return out.squeeze(-1)
class Distance(nn.Module):
def __init__(self, encoder, distance):
nn.Module.__init__(self)
self.encoder = encoder
self.metrics = distance
def forward(self, s, t):
s = self.encoder(s)
t = self.encoder(t)
return self.metrics(s, t)
class MultiEncoderDistance(nn.Module):
def __init__(self, encoder_s, encoder_t, distance):
nn.Module.__init__(self)
self.encoder_s = encoder_s
self.encoder_t = encoder_t
self.metrics = distance
def forward(self, s, t):
s = self.encoder_s(s)
t = self.encoder_t(t)
return self.metrics(s, t)
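# Hedged usage sketch (assumption, not part of the original file): Distance pairs an
# encoder with one of the metrics above and returns one scalar distance per row of
# the batch; the encoder here is an arbitrary stand-in.
if __name__ == '__main__':
    encoder = nn.Sequential(nn.Linear(8, 16), nn.ReLU(), nn.Linear(16, 2))
    dist = Distance(encoder, L2())
    s, t = torch.randn(4, 8), torch.randn(4, 8)
    print(dist(s, t).shape)    # torch.Size([4])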
import numpy as np
import gym
from arguments.arguments_hier_sac import get_args_ant, get_args_chain
from algos.hier_sac import hier_sac_agent
from goal_env.mujoco import *
import random
import torch
def get_env_params(env):
obs = env.reset()
# close the environment
params = {'obs': obs['observation'].shape[0], 'goal': obs['desired_goal'].shape[0],
'action': env.action_space.shape[0], 'action_max': env.action_space.high[0],
'max_timesteps': env._max_episode_steps}
return params
def launch(args):
# create the ddpg_agent
env = gym.make(args.env_name)
test_env = gym.make(args.test)
# if args.env_name == "AntPush-v1":
# test_env1 = gym.make("AntPushTest1-v1")
# test_env2 = gym.make("AntPushTest2-v1")
# elif args.env_name == "AntMaze1-v1":
# test_env1 = gym.make("AntMaze1Test1-v1")
# test_env2 = gym.make("AntMaze1Test2-v1")
# else:
test_env1 = test_env2 = None
print("test_env", test_env1, test_env2)
# set random seeds for reproduce
env.seed(args.seed)
if args.env_name != "NChain-v1":
env.env.env.wrapped_env.seed(args.seed)
test_env.env.env.wrapped_env.seed(args.seed)
random.seed(args.seed)
np.random.seed(args.seed)
torch.manual_seed(args.seed)
    if args.device != 'cpu':
torch.cuda.manual_seed(args.seed)
gym.spaces.prng.seed(args.seed)
# get the environment parameters
if args.env_name[:3] in ["Ant", "Poi", "Swi"]:
env.env.env.visualize_goal = args.animate
test_env.env.env.visualize_goal = args.animate
env_params = get_env_params(env)
env_params['max_test_timesteps'] = test_env._max_episode_steps
# create the ddpg agent to interact with the environment
sac_trainer = hier_sac_agent(args, env, env_params, test_env, test_env1, test_env2)
if args.eval:
if not args.resume:
print("random policy !!!")
# sac_trainer._eval_hier_agent(test_env)
# sac_trainer.vis_hier_policy()
# sac_trainer.cal_slow()
# sac_trainer.visualize_representation(100)
# sac_trainer.vis_learning_process()
# sac_trainer.picvideo('fig/final/', (1920, 1080))
else:
sac_trainer.learn()
# get the params
args = get_args_ant()
# args = get_args_chain()
# args = get_args_fetch()
# args = get_args_point()
if __name__ == '__main__':
launch(args)
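# Hedged usage note (assumption based on the attribute names used above and the
# README): the script is launched directly, e.g. `python train_hier_sac.py`, with
# the environment, seed, evaluation and resume behaviour controlled through the
# flags defined in arguments/arguments_hier_sac.py (args.env_name, args.seed,
# args.eval, args.resume, args.device).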