Commit 7beec4a5 by Dinple

environment WIP

parent 7789080d
......@@ -85,5 +85,11 @@ $$
Notice a smoothing range can be set for congestion. This is only applied to congestion due to net routing which by counting adjacent cells and adding the averaged congestion to these adjacent cells. More details are provided in the document above.
## Placement Util
**Disclaimer: We DO NOT own the content of placement_util_os.py. All rights belong to Google Authors. This is a modified version of placement_util.py and we are including in the repo for the sake of testing. Original Code can be viewed [here](https://github.com/google-research/circuit_training/blob/main/circuit_training/environment/placement_util.py)**.
## Observation Extractor
**Disclaimer: We DO NOT own the content of observation_extractor_os.py. All rights belong to Google Authors. This is a modified version of observation_extractor.py and we are including in the repo for the sake of testing. Original Code can be viewed [here](https://github.com/google-research/circuit_training/blob/main/circuit_training/environment/observation_extractor.py)**.
# coding=utf-8
# Copyright 2021 The Circuit Training Team Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Coordinate descent placer library."""
import os
import time
from typing import Callable, Dict, Optional, List, Text, Tuple
from absl import logging
from Plc_client import placement_util_os as placement_util
from Plc_client import plc_client as plc_client
import numpy as np
NS_ORIENTATIONS = ['N', 'FN', 'S', 'FS']
EW_ORIENTATIONS = ['E', 'FE', 'W', 'FW']
class CoordinateDescentPlacer(object):
"""Coordinate descent algorithm to place nodes."""
def __init__(self,
plc: plc_client.PlacementCost,
cost_fn: Callable[[plc_client.PlacementCost],
Tuple[float, Dict[Text, float]]],
epochs: int = 10,
use_stdcell_placer: bool = False,
stdcell_placer: Text = 'fd',
node_order: Text = 'random',
accept_bad_stdcell_moves: bool = False,
stdcell_place_every_n_macros: int = 10,
optimize_only_orientation: bool = False,
cell_search_prob: float = 1.0,
k_distance_bounded_search: bool = True,
k_distance_bound: Optional[int] = None) -> None:
"""Creates a CoordinateDescentPlacer.
Args:
plc: The placement cost object.
cost_fn: The cost function that gets the plc and returns cost and info.
epochs: Number of epochs (iterations) in coordinate descend algorithm.
use_stdcell_placer: If True, places stdcells using stdcell placer.
stdcell_placer: Standad cell placer.
node_order: Order of nodes to place using coordinate descent. Choose
random, descending_size_macro_first, random_macro_first.
accept_bad_stdcell_moves: If True, accept stdcell moves even if it leads
to a higher cost.
stdcell_place_every_n_macros: Run stdcell placement for every n macros. If
None, run stdcell placement once after all macros are placed.
optimize_only_orientation: If True, only search for best orientation of
the hard macros.
cell_search_prob: The probability to include a neighborhood cell to
search. When it is 1.0, descents at the steepest direction.'
k_distance_bounded_search: If True, only search best locations within k
grid distance from current placed location. Does not apply to FD stdcell
placer.
k_distance_bound: If k_distance_bounded_search is True, only search within
a neighborhood of at most k_distance_bound grid distance. If not
spesified, it is set to max(cols, rows) // 3.
"""
self.plc = plc
self.cost_fn = cost_fn
self._epochs = epochs
self._node_order = node_order
self._stdcell_place_every_n_macros = stdcell_place_every_n_macros
self._cell_search_prob = cell_search_prob
self._cols, self._rows = self.plc.get_grid_num_columns_rows()
self._k_distance_bound = k_distance_bound or max(self._cols,
self._rows) // 3
self._use_stdcell_placer = use_stdcell_placer
self._stdcell_placer = stdcell_placer
self._accept_bad_stdcell_moves = accept_bad_stdcell_moves
self._optimize_only_orientation = optimize_only_orientation
self._k_distance_bounded_search = k_distance_bounded_search
if self._cell_search_prob < 0 or self._cell_search_prob > 1:
raise ValueError(f'{self._cell_search_prob} should be between 0 and 1.')
# Turn off incremental cost calculation if placing stdcells.
if self._use_stdcell_placer:
plc.set_use_incremental_cost(False)
# Get legal node orientations.
self._node_to_ori = {}
for node in self.plc.get_macro_indices():
if not self.plc.is_node_soft_macro(node):
# TODO(wenjiej): Find orientation when a node is not placed initially.
# Needed only when running CD from an empty grid.
assert self.plc.is_node_placed(node)
cur_ori = self.plc.get_macro_orientation(node)
if cur_ori in NS_ORIENTATIONS:
self._node_to_ori[node] = NS_ORIENTATIONS
elif cur_ori in EW_ORIENTATIONS:
self._node_to_ori[node] = EW_ORIENTATIONS
else:
raise ValueError(f'Unexpected orientation {cur_ori} for node {node}.')
if self._use_stdcell_placer:
plc.allow_hard_macros_over_std_cells(True)
# If node order is random, will shuffle node orders for each iteration.
self._ordered_node_indices = placement_util.get_ordered_node_indices(
self._node_order, self.plc)
# Exclude fixed macros with pre-determined locations.
self._ordered_node_indices = [
m for m in self._ordered_node_indices if not self.plc.is_node_fixed(m)
]
self._soft_macro_indices = [
m for m in self._ordered_node_indices if self.plc.is_node_soft_macro(m)
]
if self._use_stdcell_placer:
# Only include hard macros in self._ordered_node_indices.
self._ordered_node_indices = [
i for i in self._ordered_node_indices
if not self.plc.is_node_soft_macro(i)
]
logging.info('Total number of ordered nodes: %d',
len(self._ordered_node_indices))
logging.info('ordered_node_indices: %s', self._ordered_node_indices)
logging.info('Cost of initial placement: %s', self.report_cost())
def find_best_location(self, node: int, mask: List[int],
locations: List[int]) -> Optional[int]:
"""Given a soft macro, search the best location."""
best_loc = None
best_cost = float('inf')
for loc in locations:
assert mask[loc] == 1
self.plc.place_node(node, loc)
new_cost, _ = self.cost_fn(self.plc)
self.plc.unplace_node(node)
if new_cost < best_cost:
best_loc = loc
best_cost = new_cost
return best_loc
def find_best_location_orientation(
self, node: int, locations: List[int],
orientations: List[Text]) -> Tuple[Optional[int], Optional[Text]]:
"""Given a hard macro, search the best location and orientation."""
assert orientations
best_loc = None
best_ori = None
best_cost = float('inf')
for loc in locations:
for ori in orientations:
self.plc.place_node(node, loc)
self.plc.update_macro_orientation(node, ori)
new_cost, _ = self.cost_fn(self.plc)
self.plc.unplace_node(node)
if new_cost < best_cost:
best_loc = loc
best_ori = ori
best_cost = new_cost
return best_loc, best_ori
def find_best_orientation(self, node: int,
orientations: List[Text]) -> Optional[Text]:
"""Given a hard macro, search the best orientation."""
assert orientations
best_ori = None
best_cost = float('inf')
for ori in orientations:
self.plc.update_macro_orientation(node, ori)
new_cost, _ = self.cost_fn(self.plc)
if new_cost < best_cost:
best_ori = ori
best_cost = new_cost
return best_ori
def _get_row_col_from_cell(self, cell: int) -> Tuple[int, int]:
return cell // self._cols, cell % self._cols
def _get_cell_from_row_col(self, row: int, col: int) -> int:
return int(row * self._cols + col)
def _k_distance_bounded_locations(self, curr: int, k: int,
locations: List[int]) -> List[int]:
"""Find k grid distance bounded locations from current cell."""
curr_row, curr_col = self._get_row_col_from_cell(curr)
bounded = []
for c in locations:
if c == curr:
# Always include current location to search.
bounded.append(c)
continue
row, col = self._get_row_col_from_cell(c)
if abs(row - curr_row) + abs(col - curr_col) <= k:
if np.random.random() <= self._cell_search_prob:
bounded.append(c)
return bounded
def place_node(self, node: int) -> None:
"""Given a node, greedily place the node on the best location wrt cost."""
if not self.plc.is_node_soft_macro(node):
orientations = self._node_to_ori[node]
if self._optimize_only_orientation:
# Placing and unplacing macros cause wiered problems in FD.
# See cl/316830807. Avoid unplacing for orientation optimization.
best_ori = self.find_best_orientation(node, orientations)
self.plc.update_macro_orientation(node, best_ori)
return
# Unplace the node from its current location to prepare placing node.
curr_cell = self.plc.get_grid_cell_of_node(node)
self.plc.unplace_node(node)
mask = self.plc.get_node_mask(node)
locations = [i for i, m in enumerate(mask) if m > 0]
if not locations:
# FD or DP are run between macro moves (_stdcell_place_every_n_macros).
# They may place stdcells in a way that invalidates prior macro locations.
# Stay with previous macro locations in this case.
locations = [curr_cell]
logging.info(
'Cannot find feasible locations for node %d. '
'Use its current location %d.', node, curr_cell)
if self._k_distance_bounded_search:
k = self._k_distance_bound
# Increase search scope until there is at least one feasible location.
while True:
bounded = self._k_distance_bounded_locations(curr_cell, k, locations)
if bounded:
locations = bounded
break
else:
k += self._k_distance_bound
if self.plc.is_node_soft_macro(node):
best_loc = self.find_best_location(node, mask, locations)
self.plc.place_node(node, best_loc)
else:
best_loc, best_ori = self.find_best_location_orientation(
node, locations, orientations)
self.plc.place_node(node, best_loc)
self.plc.update_macro_orientation(node, best_ori)
def place_stdcells(self) -> None:
"""Place stdcells."""
logging.info('Place stdcells using %s', self._stdcell_placer)
old_cost, _ = self.cost_fn(self.plc)
old_coordinates = [
self.plc.get_node_location(m) for m in self._soft_macro_indices
]
if self._stdcell_placer == 'fd':
# Use default FD schedule.
# Use current stdcell location to incrementally change stdcell locations
# between iterations.
placement_util.fd_placement_schedule(self.plc, use_current_loc=True)
else:
raise ValueError(
f'stdcell placer {self._stdcell_placer} is not supported')
new_cost, _ = self.cost_fn(self.plc)
if new_cost > old_cost and not self._accept_bad_stdcell_moves:
logging.info('Bad stdcell placement moves not accepted.')
# Revert to old node coordinates.
for i, (x, y) in enumerate(old_coordinates):
self.plc.update_node_coords(self._soft_macro_indices[i], x, y)
def optimize(self, epoch: int) -> None:
"""Performs one iteration (epoch) of coordinate descent on all nodes."""
logging.info('Starts optimization in epoch %d.', epoch)
start_time = time.time()
node_indices = self._ordered_node_indices
if self._node_order == 'random':
np.random.shuffle(node_indices)
for i, node in enumerate(node_indices):
if i % 25 == 0:
logging.info('Number of nodes placed by CD: %d', i)
self.place_node(node)
if (self._use_stdcell_placer and self._stdcell_place_every_n_macros and
(i + 1) % self._stdcell_place_every_n_macros == 0):
self.place_stdcells()
# Always run stdcell placement after all macros are placed.
if self._use_stdcell_placer:
self.place_stdcells()
logging.info('One iteration of coordinate descent takes %f seconds.',
(time.time() - start_time))
def report_cost(self) -> Text:
proxy_cost, info = self.cost_fn(self.plc)
wirelength = info['wirelength']
congestion = info['congestion']
density = info['density']
return ('(Objective cost, wirelength, congestion, density): ' +
'({:.4f}, {:.4f}, {:.4f}, {:.4f}'.format(proxy_cost, wirelength,
congestion, density))
def place(self) -> None:
"""Place all nodes using coordinate descent algorithm for some iterations."""
# Run stdcell placement at the beginning of the optimization loop if needed.
# Use stdcell locations from initial placement.
if self._use_stdcell_placer:
self.place_stdcells()
prev_cost, _ = self.cost_fn(self.plc)
for i in range(self._epochs):
self.optimize(i)
logging.info('Cost after %d epochs: %s', i + 1, self.report_cost())
curr_cost, _ = self.cost_fn(self.plc)
if (prev_cost - curr_cost) / prev_cost < 1e-3:
break
prev_cost = curr_cost
def save_placement(self, output_dir: Text, plc_filename: Text) -> None:
"""Saves a placement with current plc."""
proxy_cost, info = self.cost_fn(self.plc)
wirelength = info['wirelength']
congestion = info['congestion']
density = info['density']
plc_filename_with_cost = 'cost_{:.4f}_w_{:.4f}_c_{:.4f}_d_{:.4f}_{}'.format(
proxy_cost, wirelength, congestion, density, plc_filename)
output_plc_file = os.path.join(output_dir, plc_filename_with_cost)
placement_util.save_placement(self.plc, output_plc_file)
# TODO(wenjiej): Enable saving plc view.
# placement_util.save_as_svg(self.plc, f'{output_plc_file}.svg')
\ No newline at end of file
# coding=utf-8
# Copyright 2021 The Circuit Training Team Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Circuit training Environmnet with gin config."""
import datetime
import math
import os
from typing import Any, Callable, Dict, Text, Tuple, Optional
from absl import logging
from Plc_client import coordinate_descent_placer as cd_placer
from Plc_client import observation_config
from Plc_client import observation_extractor_os as observation_extractor
from Plc_client import placement_util_os as placement_util
from Plc_client import plc_client as plc_client
from Plc_client import plc_client_os as plc_client_os
import gin
import gym
import numpy as np
import tensorflow as tf
# from tf_agents.environments import suite_gym
# from tf_agents.environments import wrappers
ObsType = Dict[Text, np.ndarray]
InfoType = Dict[Text, float]
class InfeasibleActionError(ValueError):
"""An infeasible action were passed to the env."""
def __init__(self, action, mask):
"""Initialize an infeasible action error.
Args:
action: Infeasible action that was performed.
mask: The mask associated with the current observation. mask[action] is
`0` for infeasible actions.
"""
ValueError.__init__(self, action, mask)
self.action = action
self.mask = mask
def __str__(self):
return 'Infeasible action (%s) when the mask is (%s)' % (self.action,
self.mask)
@gin.configurable
def cost_info_function(
plc: plc_client.PlacementCost,
done: bool,
wirelength_weight: float = 1.0,
density_weight: float = 0.5,
congestion_weight: float = 0.5) -> Tuple[float, Dict[Text, float]]:
"""Returns the RL cost and info.
Args:
plc: Placement cost object.
done: Set if it is the terminal step.
wirelength_weight: Weight of wirelength in the reward function.
density_weight: Weight of density in the reward function.
congestion_weight: Weight of congestion in the reward function used only for
legalizing the placement in greedy std cell placer.
Returns:
The RL cost.
Raises:
ValueError: When the cost mode is not supported.
Notes: we found the default congestion and density weights more stable.
"""
proxy_cost = 0.0
if not done:
return proxy_cost, {
'wirelength': -1.0,
'congestion': -1.0,
'density': -1.0,
}
wirelength = -1.0
congestion = -1.0
density = -1.0
if wirelength_weight > 0.0:
wirelength = plc.get_cost()
proxy_cost += wirelength_weight * wirelength
if congestion_weight > 0.0:
congestion = plc.get_congestion_cost()
proxy_cost += congestion_weight * congestion
if density_weight > 0.0:
density = plc.get_density_cost()
proxy_cost += density_weight * density
info = {
'wirelength': wirelength,
'congestion': congestion,
'density': density,
}
return proxy_cost, info
@gin.configurable
class CircuitEnv(object):
"""Defines the CircuitEnv class."""
INFEASIBLE_REWARD = -1.0
def __init__(
self,
netlist_file: Text = '',
init_placement: Text = '',
_plc = None,
create_placement_cost_fn: Callable[
..., plc_client.PlacementCost] = placement_util.create_placement_cost,
std_cell_placer_mode: Text = 'fd',
cost_info_fn: Callable[[plc_client.PlacementCost, bool],
Tuple[float, Dict[Text,
float]]] = cost_info_function,
global_seed: int = 0,
is_eval: bool = False,
save_best_cost: bool = False,
output_plc_file: Text = '',
make_soft_macros_square: bool = True,
cd_finetune: bool = False,
cd_plc_file: Text = 'ppo_cd_placement.plc',
train_step: Optional[tf.Variable] = None,
unplace_all_nodes_in_init: bool = True):
"""Creates a CircuitEnv.
Args:
netlist_file: Path to the input netlist file.
init_placement: Path to the input inital placement file, used to read grid
and canas size.
create_placement_cost_fn: A function that given the netlist and initial
placement file create the placement_cost object.
std_cell_placer_mode: Options for fast std cells placement: `fd` (uses the
force-directed algorithm).
cost_info_fn: The cost function that given the plc object returns the RL
cost.
global_seed: Global seed for initializing env features. This seed should
be the same across actors. Not used currently.
is_eval: If set, save the final placement in output_dir.
save_best_cost: Boolean, if set, saves the palcement if its cost is better
than the previously saved palcement.
output_plc_file: The path to save the final placement.
make_soft_macros_square: If True, make the shape of soft macros square
before using analytical std cell placers like FD.
cd_finetune: If True, runs coordinate descent to finetune macro
orientations. Supposed to run in eval only, not training.
cd_plc_file: Name of the CD fine-tuned plc file, the file will be save in
the same dir as output_plc_file
train_step: A tf.Variable indicating the training step, only used for
saving plc files in the evaluation.
unplace_all_nodes_in_init: Unplace all nodes after initialization.
"""
del global_seed
if not netlist_file:
raise ValueError('netlist_file must be provided.')
self.netlist_file = netlist_file
self._std_cell_placer_mode = std_cell_placer_mode
self._cost_info_fn = cost_info_fn
self._is_eval = is_eval
self._save_best_cost = save_best_cost
self._output_plc_file = output_plc_file
self._output_plc_dir = os.path.dirname(output_plc_file)
self._make_soft_macros_square = make_soft_macros_square
self._cd_finetune = cd_finetune
self._cd_plc_file = cd_plc_file
self._train_step = train_step
self._plc = create_placement_cost_fn(plc_client=_plc,
netlist_file=netlist_file, init_placement=init_placement)
# We call ObservationExtractor before unplace_all_nodes, so we use the
# inital placement in the static features (location_x and location_y).
# This results in better placements.
self._observation_config = observation_config.ObservationConfig()
self._observation_extractor = observation_extractor.ObservationExtractor(
plc=self._plc)
if self._make_soft_macros_square:
# It is better to make the shape of soft macros square before using
# analytical std cell placers like FD.
self._plc.make_soft_macros_square()
self._grid_cols, self._grid_rows = self._plc.get_grid_num_columns_rows()
self._canvas_width, self._canvas_height = self._plc.get_canvas_width_height(
)
self._hard_macro_indices = [
m for m in self._plc.get_macro_indices()
if not self._plc.is_node_soft_macro(m)
]
self._num_hard_macros = len(self._hard_macro_indices)
self._sorted_node_indices = placement_util.get_ordered_node_indices(
mode='descending_size_macro_first', plc=self._plc)
self._sorted_soft_macros = self._sorted_node_indices[self._num_hard_macros:]
# Generate a map from actual macro_index to its position in
# self.macro_indices. Needed because node adjacency matrix is in the same
# node order of plc.get_macro_indices.
self._macro_index_to_pos = {}
for i, macro_index in enumerate(self._plc.get_macro_indices()):
self._macro_index_to_pos[macro_index] = i
# Padding for mapping the placement canvas on the agent canvas.
rows_pad = self._observation_config.max_grid_size - self._grid_rows
cols_pad = self._observation_config.max_grid_size - self._grid_cols
self._up_pad = rows_pad // 2
self._right_pad = cols_pad // 2
self._low_pad = rows_pad - self._up_pad
self._left_pad = cols_pad - self._right_pad
self._saved_cost = np.inf
self._current_actions = []
self._current_node = 0
self._done = False
self._current_mask = self._get_mask()
self._infeasible_state = False
if unplace_all_nodes_in_init:
# TODO(b/223026568) Remove unplace_all_nodes from init
self._plc.unplace_all_nodes()
logging.warning('* Unplaced all Nodes in init *')
logging.info('***Num node to place***:%s', self._num_hard_macros)
@property
def observation_space(self) -> gym.spaces.Space:
"""Env Observation space."""
return self._observation_config.observation_space
@property
def action_space(self) -> gym.spaces.Space:
return gym.spaces.Discrete(self._observation_config.max_grid_size**2)
@property
def environment_name(self) -> Text:
return self.netlist_file
def get_static_obs(self):
"""Get the static observation for the environment.
Static observations are invariant across steps on the same netlist, such as
netlist metadata and the adj graphs. This should only be used for
generalized RL.
Returns:
Numpy array representing the observation
"""
return self._observation_extractor.get_static_features()
def get_cost_info(self,
done: bool = False) -> Tuple[float, Dict[Text, float]]:
return self._cost_info_fn(plc=self._plc, done=done) # pytype: disable=wrong-keyword-args # trace-all-classes
def _get_mask(self) -> np.ndarray:
"""Gets the node mask for the current node.
Returns:
List of 0s and 1s indicating if action is feasible or not.
"""
if self._done:
mask = np.zeros(self._observation_config.max_grid_size**2, dtype=np.int32)
else:
node_index = self._sorted_node_indices[self._current_node]
mask = np.asarray(self._plc.get_node_mask(node_index), dtype=np.int32)
mask = np.reshape(mask, [self._grid_rows, self._grid_cols])
pad = ((self._up_pad, self._low_pad), (self._right_pad, self._left_pad))
mask = np.pad(mask, pad, mode='constant', constant_values=0)
return np.reshape(
mask, (self._observation_config.max_grid_size**2,)).astype(np.int32)
def _get_obs(self) -> ObsType:
"""Returns the observation."""
if self._current_node > 0:
previous_node_sorted = self._sorted_node_indices[self._current_node - 1]
previous_node_index = self._macro_index_to_pos[previous_node_sorted]
else:
previous_node_index = -1
if self._current_node < self._num_hard_macros:
current_node_sorted = self._sorted_node_indices[self._current_node]
current_node_index = self._macro_index_to_pos[current_node_sorted]
else:
current_node_index = 0
# YW EDIT: get observation
res = self._observation_extractor.get_all_features(
previous_node_index=previous_node_index,
current_node_index=current_node_index,
mask=self._current_mask)
# self._plc.set_routes_per_micron(44, 44) Does not do anything
# logging.info("####### Observation Extractor #######")
# print(res)
# logging.info("####### End #######")
return res
def _run_cd(self):
"""Runs coordinate descent to finetune the current placement."""
# CD only modifies macro orientation.
# Plc modified by CD will be reset at the end of the episode.
def cost_fn(plc):
return self._cost_info_fn(plc=plc, done=True) # pytype: disable=wrong-keyword-args # trace-all-classes
cd = cd_placer.CoordinateDescentPlacer(
plc=self._plc,
cost_fn=cost_fn,
use_stdcell_placer=True,
optimize_only_orientation=True)
cd.place()
def _save_placement(self, cost: float) -> None:
"""Saves the current placement.
Args:
cost: the current placement cost.
Raises:
IOError: If we cannot write the placement to file.
"""
if not self._save_best_cost or (cost < self._saved_cost and
(math.fabs(cost - self._saved_cost) /
(cost) > 5e-3)):
user_comments = ''
if self._train_step:
user_comments = f'Train step : {self._train_step.numpy()}'
placement_util.save_placement(self._plc, self._output_plc_file,
user_comments)
ts = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
ppo_snapshot_file = os.path.join(
self._output_plc_dir,
f'snapshot_ppo_opt_placement_timestamp_{ts}_cost_{cost:.4f}.plc')
placement_util.save_placement(self._plc, ppo_snapshot_file, user_comments)
self._saved_cost = cost
# Only runs CD if this is the best RL placement seen so far.
if self._cd_finetune:
self._run_cd()
cost = self._cost_info_fn(plc=self._plc, done=True)[0] # pytype: disable=wrong-keyword-args # trace-all-classes
cd_plc_file = os.path.join(self._output_plc_dir, self._cd_plc_file)
placement_util.save_placement(self._plc, cd_plc_file, user_comments)
cd_snapshot_file = os.path.join(
self._output_plc_dir,
f'snapshot_ppo_cd_placement_timestamp_{ts}_cost_{cost:.4f}.plc')
placement_util.save_placement(self._plc, cd_snapshot_file,
user_comments)
def call_analytical_placer_and_get_cost(self):
"""Calls analytical placer.
Calls analystical placer and evaluates cost when all nodes are placed. Also,
saves the placement file for eval if all the macros are placed by RL.
Returns:
A tuple for placement cost and info.
"""
if self._done:
self.analytical_placer()
# Only evaluates placement cost when all nodes are placed.
# All samples in the episode receive the same reward equal to final cost.
# This is realized by setting intermediate steps cost as zero, and
# propagate the final cost with discount factor set to 1 in replay buffer.
cost, info = self._cost_info_fn(self._plc, self._done)
# We only save placement if all nodes by placed RL, because the dreamplace
# mix-sized placement may not be legal.
if self._current_node == self._num_hard_macros and self._is_eval:
self._save_placement(cost)
return -cost, info
def reset(self) -> ObsType:
"""Restes the environment.
Returns:
An initial observation.
"""
self._plc.unplace_all_nodes()
self._current_actions = []
self._current_node = 0
self._done = False
self._current_mask = self._get_mask()
return self._get_obs()
def translate_to_original_canvas(self, action: int) -> int:
"""Translates a raw location to real one in the original canvas."""
up_pad = (self._observation_config.max_grid_size - self._grid_rows) // 2
right_pad = (self._observation_config.max_grid_size - self._grid_cols) // 2
a_i = action // self._observation_config.max_grid_size - up_pad
a_j = action % self._observation_config.max_grid_size - right_pad
if 0 <= a_i < self._grid_rows or 0 <= a_j < self._grid_cols:
action = a_i * self._grid_cols + a_j
else:
raise InfeasibleActionError(action, self._current_mask)
return action
def place_node(self, node_index: int, action: int) -> None:
self._plc.place_node(node_index, self.translate_to_original_canvas(action))
def analytical_placer(self) -> None:
if self._std_cell_placer_mode == 'fd':
placement_util.fd_placement_schedule(self._plc)
else:
raise ValueError('%s is not a supported std_cell_placer_mode.' %
(self._std_cell_placer_mode))
def step(self, action: int) -> Tuple[ObsType, float, bool, Any]:
"""Steps the environment.
Args:
action: The action to take (should be a list of size 1).
Returns:
observation, reward, done, and info.
Raises:
RuntimeError: action taken after episode was done
InfeasibleActionError: bad action taken (action is not in feasible
actions)
"""
if self._done:
raise RuntimeError('Action taken after episode is done.')
action = int(action)
self._current_actions.append(action)
if self._current_mask[action] == 0:
raise InfeasibleActionError(action, self._current_mask)
node_index = self._sorted_node_indices[self._current_node]
self.place_node(node_index, action)
self._current_node += 1
self._done = (self._current_node == self._num_hard_macros)
self._current_mask = self._get_mask()
if not self._done and not np.any(self._current_mask):
# Please note that _infeasible_state is not reset in reset function so,
# the caller of step() is responsible for resetting it.
self._infeasible_state = True
logging.info('Actions took before becoming infeasible: %s',
self._current_actions)
info = {
'wirelength': -1.0,
'congestion': -1.0,
'density': -1.0,
}
return self.reset(), self.INFEASIBLE_REWARD, True, info
cost, info = self.call_analytical_placer_and_get_cost()
return self._get_obs(), cost, self._done, info
# def create_circuit_environment(*args, **kwarg) -> wrappers.ActionClipWrapper:
# """Create an `CircuitEnv` wrapped as a Gym environment.
# Args:
# *args: Arguments.
# **kwarg: keyworded Arguments.
# Returns:
# PyEnvironment used for training.
# """
# env = CircuitEnv(*args, **kwarg)
# return wrappers.ActionClipWrapper(suite_gym.wrap_env(env))
......@@ -8,6 +8,7 @@ from absl import app
from Plc_client import plc_client_os as plc_client_os
from Plc_client import placement_util_os as placement_util
from Plc_client import observation_extractor_os as observation_extractor
from Plc_client import environment_os as environment
from Plc_client import observation_config
try:
......@@ -484,6 +485,7 @@ class PlacementCostTest():
self.extractor = observation_extractor.ObservationExtractor(
plc=plc, observation_config=self._observation_config)
"""
print("############################ TEST OBSERVATION EXTRACTOR ############################")
try:
assert self.PLC_PATH
except AssertionError:
......@@ -527,7 +529,32 @@ class PlacementCostTest():
pass
def test_environment(self):
pass
print("############################ TEST ENVIRONMENT ############################")
# Source: https://github.com/google-research/circuit_training/blob/d5e454e5bcd153a95d320f664af0d1b378aace7b/circuit_training/environment/environment_test.py#L39
def random_action(mask):
valid_actions, = np.nonzero(mask.flatten())
if len(valid_actions): # pylint: disable=g-explicit-length-test
return np.random.choice(valid_actions)
# If there is no valid choice, then `[0]` is returned which results in an
# infeasable action ending the episode.
return 0
env = environment.CircuitEnv(
_plc=plc_client,
create_placement_cost_fn=placement_util.create_placement_cost,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH)
env_os = environment.CircuitEnv(
_plc=plc_client_os,
create_placement_cost_fn=placement_util.create_placement_cost,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH)
print(" ++++++++++++++++++++++++++++++")
print(" +++ TEST ENVIRONMENT: PASS +++")
print(" ++++++++++++++++++++++++++++++")
def parse_flags(argv):
parser = argparse_flags.ArgumentParser(description='An argparse + app.run example')
......@@ -584,7 +611,8 @@ def main(args):
PCT.test_proxy_cost()
# PCT.test_placement_util()
# PCT.test_miscellaneous()
PCT.test_observation_extractor()
# PCT.test_observation_extractor()
PCT.test_environment()
if __name__ == '__main__':
app.run(main, flags_parser=parse_flags)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment