Unverified Commit 91564014 by Yucheng Wang Committed by GitHub

Merge pull request #23 from TILOS-AI-Institute/plc_client-open-source

Plc client open source runnable on CT
parents 1751d2a7 d2b1e651
......@@ -9,3 +9,4 @@ CodeElements/Plc_client/plc_client_os.py
CodeElements/Plc_client/__pycache__/*
CodeElements/Plc_client/proto_reader.py
CodeElements/Plc_client/plc_client.py
CodeElements/Plc_client/failed_proxy_plc/*
\ No newline at end of file
......@@ -85,5 +85,11 @@ $$
Notice a smoothing range can be set for congestion. This is only applied to congestion due to net routing which by counting adjacent cells and adding the averaged congestion to these adjacent cells. More details are provided in the document above.
## Placement Util
**Disclaimer: We DO NOT own the content of placement_util_os.py. All rights belong to Google Authors. This is a modified version of placement_util.py and we are including in the repo for the sake of testing. Original Code can be viewed [here](https://github.com/google-research/circuit_training/blob/main/circuit_training/environment/placement_util.py)**.
## Observation Extractor
**Disclaimer: We DO NOT own the content of observation_extractor_os.py. All rights belong to Google Authors. This is a modified version of observation_extractor.py and we are including in the repo for the sake of testing. Original Code can be viewed [here](https://github.com/google-research/circuit_training/blob/main/circuit_training/environment/observation_extractor.py)**.
# coding=utf-8
# Copyright 2021 The Circuit Training Team Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Coordinate descent placer library."""
import os
import time
from typing import Callable, Dict, Optional, List, Text, Tuple
from absl import logging
from Plc_client import placement_util_os as placement_util
from Plc_client import plc_client as plc_client
import numpy as np
NS_ORIENTATIONS = ['N', 'FN', 'S', 'FS']
EW_ORIENTATIONS = ['E', 'FE', 'W', 'FW']
class CoordinateDescentPlacer(object):
"""Coordinate descent algorithm to place nodes."""
def __init__(self,
plc: plc_client.PlacementCost,
cost_fn: Callable[[plc_client.PlacementCost],
Tuple[float, Dict[Text, float]]],
epochs: int = 10,
use_stdcell_placer: bool = False,
stdcell_placer: Text = 'fd',
node_order: Text = 'random',
accept_bad_stdcell_moves: bool = False,
stdcell_place_every_n_macros: int = 10,
optimize_only_orientation: bool = False,
cell_search_prob: float = 1.0,
k_distance_bounded_search: bool = True,
k_distance_bound: Optional[int] = None) -> None:
"""Creates a CoordinateDescentPlacer.
Args:
plc: The placement cost object.
cost_fn: The cost function that gets the plc and returns cost and info.
epochs: Number of epochs (iterations) in coordinate descend algorithm.
use_stdcell_placer: If True, places stdcells using stdcell placer.
stdcell_placer: Standad cell placer.
node_order: Order of nodes to place using coordinate descent. Choose
random, descending_size_macro_first, random_macro_first.
accept_bad_stdcell_moves: If True, accept stdcell moves even if it leads
to a higher cost.
stdcell_place_every_n_macros: Run stdcell placement for every n macros. If
None, run stdcell placement once after all macros are placed.
optimize_only_orientation: If True, only search for best orientation of
the hard macros.
cell_search_prob: The probability to include a neighborhood cell to
search. When it is 1.0, descents at the steepest direction.'
k_distance_bounded_search: If True, only search best locations within k
grid distance from current placed location. Does not apply to FD stdcell
placer.
k_distance_bound: If k_distance_bounded_search is True, only search within
a neighborhood of at most k_distance_bound grid distance. If not
spesified, it is set to max(cols, rows) // 3.
"""
self.plc = plc
self.cost_fn = cost_fn
self._epochs = epochs
self._node_order = node_order
self._stdcell_place_every_n_macros = stdcell_place_every_n_macros
self._cell_search_prob = cell_search_prob
self._cols, self._rows = self.plc.get_grid_num_columns_rows()
self._k_distance_bound = k_distance_bound or max(self._cols,
self._rows) // 3
self._use_stdcell_placer = use_stdcell_placer
self._stdcell_placer = stdcell_placer
self._accept_bad_stdcell_moves = accept_bad_stdcell_moves
self._optimize_only_orientation = optimize_only_orientation
self._k_distance_bounded_search = k_distance_bounded_search
if self._cell_search_prob < 0 or self._cell_search_prob > 1:
raise ValueError(f'{self._cell_search_prob} should be between 0 and 1.')
# Turn off incremental cost calculation if placing stdcells.
if self._use_stdcell_placer:
plc.set_use_incremental_cost(False)
# Get legal node orientations.
self._node_to_ori = {}
for node in self.plc.get_macro_indices():
if not self.plc.is_node_soft_macro(node):
# TODO(wenjiej): Find orientation when a node is not placed initially.
# Needed only when running CD from an empty grid.
assert self.plc.is_node_placed(node)
cur_ori = self.plc.get_macro_orientation(node)
if cur_ori in NS_ORIENTATIONS:
self._node_to_ori[node] = NS_ORIENTATIONS
elif cur_ori in EW_ORIENTATIONS:
self._node_to_ori[node] = EW_ORIENTATIONS
else:
raise ValueError(f'Unexpected orientation {cur_ori} for node {node}.')
if self._use_stdcell_placer:
plc.allow_hard_macros_over_std_cells(True)
# If node order is random, will shuffle node orders for each iteration.
self._ordered_node_indices = placement_util.get_ordered_node_indices(
self._node_order, self.plc)
# Exclude fixed macros with pre-determined locations.
self._ordered_node_indices = [
m for m in self._ordered_node_indices if not self.plc.is_node_fixed(m)
]
self._soft_macro_indices = [
m for m in self._ordered_node_indices if self.plc.is_node_soft_macro(m)
]
if self._use_stdcell_placer:
# Only include hard macros in self._ordered_node_indices.
self._ordered_node_indices = [
i for i in self._ordered_node_indices
if not self.plc.is_node_soft_macro(i)
]
logging.info('Total number of ordered nodes: %d',
len(self._ordered_node_indices))
logging.info('ordered_node_indices: %s', self._ordered_node_indices)
logging.info('Cost of initial placement: %s', self.report_cost())
def find_best_location(self, node: int, mask: List[int],
locations: List[int]) -> Optional[int]:
"""Given a soft macro, search the best location."""
best_loc = None
best_cost = float('inf')
for loc in locations:
assert mask[loc] == 1
self.plc.place_node(node, loc)
new_cost, _ = self.cost_fn(self.plc)
self.plc.unplace_node(node)
if new_cost < best_cost:
best_loc = loc
best_cost = new_cost
return best_loc
def find_best_location_orientation(
self, node: int, locations: List[int],
orientations: List[Text]) -> Tuple[Optional[int], Optional[Text]]:
"""Given a hard macro, search the best location and orientation."""
assert orientations
best_loc = None
best_ori = None
best_cost = float('inf')
for loc in locations:
for ori in orientations:
self.plc.place_node(node, loc)
self.plc.update_macro_orientation(node, ori)
new_cost, _ = self.cost_fn(self.plc)
self.plc.unplace_node(node)
if new_cost < best_cost:
best_loc = loc
best_ori = ori
best_cost = new_cost
return best_loc, best_ori
def find_best_orientation(self, node: int,
orientations: List[Text]) -> Optional[Text]:
"""Given a hard macro, search the best orientation."""
assert orientations
best_ori = None
best_cost = float('inf')
for ori in orientations:
self.plc.update_macro_orientation(node, ori)
new_cost, _ = self.cost_fn(self.plc)
if new_cost < best_cost:
best_ori = ori
best_cost = new_cost
return best_ori
def _get_row_col_from_cell(self, cell: int) -> Tuple[int, int]:
return cell // self._cols, cell % self._cols
def _get_cell_from_row_col(self, row: int, col: int) -> int:
return int(row * self._cols + col)
def _k_distance_bounded_locations(self, curr: int, k: int,
locations: List[int]) -> List[int]:
"""Find k grid distance bounded locations from current cell."""
curr_row, curr_col = self._get_row_col_from_cell(curr)
bounded = []
for c in locations:
if c == curr:
# Always include current location to search.
bounded.append(c)
continue
row, col = self._get_row_col_from_cell(c)
if abs(row - curr_row) + abs(col - curr_col) <= k:
if np.random.random() <= self._cell_search_prob:
bounded.append(c)
return bounded
def place_node(self, node: int) -> None:
"""Given a node, greedily place the node on the best location wrt cost."""
if not self.plc.is_node_soft_macro(node):
orientations = self._node_to_ori[node]
if self._optimize_only_orientation:
# Placing and unplacing macros cause wiered problems in FD.
# See cl/316830807. Avoid unplacing for orientation optimization.
best_ori = self.find_best_orientation(node, orientations)
self.plc.update_macro_orientation(node, best_ori)
return
# Unplace the node from its current location to prepare placing node.
curr_cell = self.plc.get_grid_cell_of_node(node)
self.plc.unplace_node(node)
mask = self.plc.get_node_mask(node)
locations = [i for i, m in enumerate(mask) if m > 0]
if not locations:
# FD or DP are run between macro moves (_stdcell_place_every_n_macros).
# They may place stdcells in a way that invalidates prior macro locations.
# Stay with previous macro locations in this case.
locations = [curr_cell]
logging.info(
'Cannot find feasible locations for node %d. '
'Use its current location %d.', node, curr_cell)
if self._k_distance_bounded_search:
k = self._k_distance_bound
# Increase search scope until there is at least one feasible location.
while True:
bounded = self._k_distance_bounded_locations(curr_cell, k, locations)
if bounded:
locations = bounded
break
else:
k += self._k_distance_bound
if self.plc.is_node_soft_macro(node):
best_loc = self.find_best_location(node, mask, locations)
self.plc.place_node(node, best_loc)
else:
best_loc, best_ori = self.find_best_location_orientation(
node, locations, orientations)
self.plc.place_node(node, best_loc)
self.plc.update_macro_orientation(node, best_ori)
def place_stdcells(self) -> None:
"""Place stdcells."""
logging.info('Place stdcells using %s', self._stdcell_placer)
old_cost, _ = self.cost_fn(self.plc)
old_coordinates = [
self.plc.get_node_location(m) for m in self._soft_macro_indices
]
if self._stdcell_placer == 'fd':
# Use default FD schedule.
# Use current stdcell location to incrementally change stdcell locations
# between iterations.
placement_util.fd_placement_schedule(self.plc, use_current_loc=True)
else:
raise ValueError(
f'stdcell placer {self._stdcell_placer} is not supported')
new_cost, _ = self.cost_fn(self.plc)
if new_cost > old_cost and not self._accept_bad_stdcell_moves:
logging.info('Bad stdcell placement moves not accepted.')
# Revert to old node coordinates.
for i, (x, y) in enumerate(old_coordinates):
self.plc.update_node_coords(self._soft_macro_indices[i], x, y)
def optimize(self, epoch: int) -> None:
"""Performs one iteration (epoch) of coordinate descent on all nodes."""
logging.info('Starts optimization in epoch %d.', epoch)
start_time = time.time()
node_indices = self._ordered_node_indices
if self._node_order == 'random':
np.random.shuffle(node_indices)
for i, node in enumerate(node_indices):
if i % 25 == 0:
logging.info('Number of nodes placed by CD: %d', i)
self.place_node(node)
if (self._use_stdcell_placer and self._stdcell_place_every_n_macros and
(i + 1) % self._stdcell_place_every_n_macros == 0):
self.place_stdcells()
# Always run stdcell placement after all macros are placed.
if self._use_stdcell_placer:
self.place_stdcells()
logging.info('One iteration of coordinate descent takes %f seconds.',
(time.time() - start_time))
def report_cost(self) -> Text:
proxy_cost, info = self.cost_fn(self.plc)
wirelength = info['wirelength']
congestion = info['congestion']
density = info['density']
return ('(Objective cost, wirelength, congestion, density): ' +
'({:.4f}, {:.4f}, {:.4f}, {:.4f}'.format(proxy_cost, wirelength,
congestion, density))
def place(self) -> None:
"""Place all nodes using coordinate descent algorithm for some iterations."""
# Run stdcell placement at the beginning of the optimization loop if needed.
# Use stdcell locations from initial placement.
if self._use_stdcell_placer:
self.place_stdcells()
prev_cost, _ = self.cost_fn(self.plc)
for i in range(self._epochs):
self.optimize(i)
logging.info('Cost after %d epochs: %s', i + 1, self.report_cost())
curr_cost, _ = self.cost_fn(self.plc)
if (prev_cost - curr_cost) / prev_cost < 1e-3:
break
prev_cost = curr_cost
def save_placement(self, output_dir: Text, plc_filename: Text) -> None:
"""Saves a placement with current plc."""
proxy_cost, info = self.cost_fn(self.plc)
wirelength = info['wirelength']
congestion = info['congestion']
density = info['density']
plc_filename_with_cost = 'cost_{:.4f}_w_{:.4f}_c_{:.4f}_d_{:.4f}_{}'.format(
proxy_cost, wirelength, congestion, density, plc_filename)
output_plc_file = os.path.join(output_dir, plc_filename_with_cost)
placement_util.save_placement(self.plc, output_plc_file)
# TODO(wenjiej): Enable saving plc view.
# placement_util.save_as_svg(self.plc, f'{output_plc_file}.svg')
\ No newline at end of file
# coding=utf-8
# Copyright 2021 The Circuit Training Team Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Circuit training Environmnet with gin config."""
import datetime
import math
import os
from typing import Any, Callable, Dict, Text, Tuple, Optional
from absl import logging
from Plc_client import coordinate_descent_placer as cd_placer
from Plc_client import observation_config
from Plc_client import observation_extractor_os as observation_extractor
from Plc_client import placement_util_os as placement_util
from Plc_client import plc_client as plc_client
from Plc_client import plc_client_os as plc_client_os
import gin
import gym
import numpy as np
import tensorflow as tf
from inspect import currentframe, getframeinfo
# from tf_agents.environments import suite_gym
# from tf_agents.environments import wrappers
ObsType = Dict[Text, np.ndarray]
InfoType = Dict[Text, float]
class InfeasibleActionError(ValueError):
"""An infeasible action were passed to the env."""
def __init__(self, action, mask):
"""Initialize an infeasible action error.
Args:
action: Infeasible action that was performed.
mask: The mask associated with the current observation. mask[action] is
`0` for infeasible actions.
"""
ValueError.__init__(self, action, mask)
self.action = action
self.mask = mask
def __str__(self):
return 'Infeasible action (%s) when the mask is (%s)' % (self.action,
self.mask)
@gin.configurable
def cost_info_function(
plc: plc_client.PlacementCost,
done: bool,
wirelength_weight: float = 1.0,
density_weight: float = 0.5,
congestion_weight: float = 0.5) -> Tuple[float, Dict[Text, float]]:
"""Returns the RL cost and info.
Args:
plc: Placement cost object.
done: Set if it is the terminal step.
wirelength_weight: Weight of wirelength in the reward function.
density_weight: Weight of density in the reward function.
congestion_weight: Weight of congestion in the reward function used only for
legalizing the placement in greedy std cell placer.
Returns:
The RL cost.
Raises:
ValueError: When the cost mode is not supported.
Notes: we found the default congestion and density weights more stable.
"""
proxy_cost = 0.0
if not done:
return proxy_cost, {
'wirelength': -1.0,
'congestion': -1.0,
'density': -1.0,
}
wirelength = -1.0
congestion = -1.0
density = -1.0
if wirelength_weight > 0.0:
wirelength = plc.get_cost()
proxy_cost += wirelength_weight * wirelength
if congestion_weight > 0.0:
congestion = plc.get_congestion_cost()
proxy_cost += congestion_weight * congestion
if density_weight > 0.0:
density = plc.get_density_cost()
proxy_cost += density_weight * density
info = {
'wirelength': wirelength,
'congestion': congestion,
'density': density,
}
return proxy_cost, info
@gin.configurable
class CircuitEnv(object):
"""Defines the CircuitEnv class."""
INFEASIBLE_REWARD = -1.0
def __init__(
self,
_plc_client = None,
netlist_file: Text = '',
init_placement: Text = '',
_plc = None,
create_placement_cost_fn: Callable[
..., plc_client.PlacementCost] = placement_util.create_placement_cost,
std_cell_placer_mode: Text = 'fd',
cost_info_fn: Callable[[plc_client.PlacementCost, bool],
Tuple[float, Dict[Text,
float]]] = cost_info_function,
global_seed: int = 0,
is_eval: bool = False,
save_best_cost: bool = False,
output_plc_file: Text = '',
make_soft_macros_square: bool = True,
cd_finetune: bool = False,
cd_plc_file: Text = 'ppo_cd_placement.plc',
train_step: Optional[tf.Variable] = None,
unplace_all_nodes_in_init: bool = True):
"""Creates a CircuitEnv.
Args:
netlist_file: Path to the input netlist file.
init_placement: Path to the input inital placement file, used to read grid
and canas size.
create_placement_cost_fn: A function that given the netlist and initial
placement file create the placement_cost object.
std_cell_placer_mode: Options for fast std cells placement: `fd` (uses the
force-directed algorithm).
cost_info_fn: The cost function that given the plc object returns the RL
cost.
global_seed: Global seed for initializing env features. This seed should
be the same across actors. Not used currently.
is_eval: If set, save the final placement in output_dir.
save_best_cost: Boolean, if set, saves the palcement if its cost is better
than the previously saved palcement.
output_plc_file: The path to save the final placement.
make_soft_macros_square: If True, make the shape of soft macros square
before using analytical std cell placers like FD.
cd_finetune: If True, runs coordinate descent to finetune macro
orientations. Supposed to run in eval only, not training.
cd_plc_file: Name of the CD fine-tuned plc file, the file will be save in
the same dir as output_plc_file
train_step: A tf.Variable indicating the training step, only used for
saving plc files in the evaluation.
unplace_all_nodes_in_init: Unplace all nodes after initialization.
"""
del global_seed
if not netlist_file:
raise ValueError('netlist_file must be provided.')
self.netlist_file = netlist_file
self._std_cell_placer_mode = std_cell_placer_mode
self._cost_info_fn = cost_info_fn
self._is_eval = is_eval
self._save_best_cost = save_best_cost
self._output_plc_file = output_plc_file
self._output_plc_dir = os.path.dirname(output_plc_file)
self._make_soft_macros_square = make_soft_macros_square
self._cd_finetune = cd_finetune
self._cd_plc_file = cd_plc_file
self._train_step = train_step
self._plc = create_placement_cost_fn(plc_client=_plc,
netlist_file=netlist_file, init_placement=init_placement)
# We call ObservationExtractor before unplace_all_nodes, so we use the
# inital placement in the static features (location_x and location_y).
# This results in better placements.
self._observation_config = observation_config.ObservationConfig()
self._observation_extractor = observation_extractor.ObservationExtractor(
plc=self._plc)
# print(getframeinfo(currentframe()).lineno, '\n', np.array(self._plc.get_node_mask(13333)).reshape(35,33))
if self._make_soft_macros_square:
# It is better to make the shape of soft macros square before using
# analytical std cell placers like FD.
self._plc.make_soft_macros_square()
self._grid_cols, self._grid_rows = self._plc.get_grid_num_columns_rows()
self._canvas_width, self._canvas_height = self._plc.get_canvas_width_height(
)
self._hard_macro_indices = [
m for m in self._plc.get_macro_indices()
if not self._plc.is_node_soft_macro(m)
]
self._num_hard_macros = len(self._hard_macro_indices)
# sorted by size
self._sorted_node_indices = placement_util.get_ordered_node_indices(
mode='descending_size_macro_first', plc=self._plc)
# print(len(self._sorted_node_indices))
self._sorted_soft_macros = self._sorted_node_indices[self._num_hard_macros:]
# print(len(self._sorted_soft_macros))
# Generate a map from actual macro_index to its position in
# self.macro_indices. Needed because node adjacency matrix is in the same
# node order of plc.get_macro_indices.
self._macro_index_to_pos = {}
for i, macro_index in enumerate(self._plc.get_macro_indices()):
self._macro_index_to_pos[macro_index] = i
# Padding for mapping the placement canvas on the agent canvas.
rows_pad = self._observation_config.max_grid_size - self._grid_rows
cols_pad = self._observation_config.max_grid_size - self._grid_cols
self._up_pad = rows_pad // 2
self._right_pad = cols_pad // 2
self._low_pad = rows_pad - self._up_pad
self._left_pad = cols_pad - self._right_pad
self._saved_cost = np.inf
self._current_actions = []
self._current_node = 0
self._done = False
self._current_mask = self._get_mask()
# print(getframeinfo(currentframe()).lineno, '\n', np.array(self._plc.get_node_mask(0)).reshape(35,33))
self._infeasible_state = False
if unplace_all_nodes_in_init:
# TODO(b/223026568) Remove unplace_all_nodes from init
self._plc.unplace_all_nodes()
logging.warning('* Unplaced all Nodes in init *')
logging.info('***Num node to place***:%s', self._num_hard_macros)
@property
def observation_space(self) -> gym.spaces.Space:
"""Env Observation space."""
return self._observation_config.observation_space
@property
def action_space(self) -> gym.spaces.Space:
return gym.spaces.Discrete(self._observation_config.max_grid_size**2)
@property
def environment_name(self) -> Text:
return self.netlist_file
def get_static_obs(self):
"""Get the static observation for the environment.
Static observations are invariant across steps on the same netlist, such as
netlist metadata and the adj graphs. This should only be used for
generalized RL.
Returns:
Numpy array representing the observation
"""
return self._observation_extractor.get_static_features()
def get_cost_info(self,
done: bool = False) -> Tuple[float, Dict[Text, float]]:
return self._cost_info_fn(plc=self._plc, done=done) # pytype: disable=wrong-keyword-args # trace-all-classes
def _get_mask(self) -> np.ndarray:
"""Gets the node mask for the current node.
Returns:
List of 0s and 1s indicating if action is feasible or not.
"""
if self._done:
# reset the board
mask = np.zeros(self._observation_config.max_grid_size**2, dtype=np.int32)
else:
node_index = self._sorted_node_indices[self._current_node]
mask = np.asarray(self._plc.get_node_mask(node_index), dtype=np.int32)
# print("current node mask: \n", mask.reshape(35, 33))
mask = np.reshape(mask, [self._grid_rows, self._grid_cols])
pad = ((self._up_pad, self._low_pad), (self._right_pad, self._left_pad))
mask = np.pad(mask, pad, mode='constant', constant_values=0)
return np.reshape(
mask, (self._observation_config.max_grid_size**2,)).astype(np.int32)
def _get_obs(self) -> ObsType:
"""Returns the observation."""
if self._current_node > 0:
previous_node_sorted = self._sorted_node_indices[self._current_node - 1]
previous_node_index = self._macro_index_to_pos[previous_node_sorted]
else:
previous_node_index = -1
if self._current_node < self._num_hard_macros:
current_node_sorted = self._sorted_node_indices[self._current_node]
current_node_index = self._macro_index_to_pos[current_node_sorted]
else:
current_node_index = 0
# YW EDIT: get observation
res = self._observation_extractor.get_all_features(
previous_node_index=previous_node_index,
current_node_index=current_node_index,
mask=self._current_mask)
# self._plc.set_routes_per_micron(44, 44) Does not do anything
# logging.info("####### Observation Extractor #######")
# print(res)
# logging.info("####### End #######")
return res
def _run_cd(self):
"""Runs coordinate descent to finetune the current placement."""
# CD only modifies macro orientation.
# Plc modified by CD will be reset at the end of the episode.
def cost_fn(plc):
return self._cost_info_fn(plc=plc, done=True) # pytype: disable=wrong-keyword-args # trace-all-classes
cd = cd_placer.CoordinateDescentPlacer(
plc=self._plc,
cost_fn=cost_fn,
use_stdcell_placer=True,
optimize_only_orientation=True)
cd.place()
def _save_placement(self, cost: float) -> None:
"""Saves the current placement.
Args:
cost: the current placement cost.
Raises:
IOError: If we cannot write the placement to file.
"""
if not self._save_best_cost or (cost < self._saved_cost and
(math.fabs(cost - self._saved_cost) /
(cost) > 5e-3)):
user_comments = ''
if self._train_step:
user_comments = f'Train step : {self._train_step.numpy()}'
placement_util.save_placement(self._plc, self._output_plc_file,
user_comments)
ts = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
ppo_snapshot_file = os.path.join(
self._output_plc_dir,
f'snapshot_ppo_opt_placement_timestamp_{ts}_cost_{cost:.4f}.plc')
placement_util.save_placement(self._plc, ppo_snapshot_file, user_comments)
self._saved_cost = cost
# Only runs CD if this is the best RL placement seen so far.
if self._cd_finetune:
self._run_cd()
cost = self._cost_info_fn(plc=self._plc, done=True)[0] # pytype: disable=wrong-keyword-args # trace-all-classes
cd_plc_file = os.path.join(self._output_plc_dir, self._cd_plc_file)
placement_util.save_placement(self._plc, cd_plc_file, user_comments)
cd_snapshot_file = os.path.join(
self._output_plc_dir,
f'snapshot_ppo_cd_placement_timestamp_{ts}_cost_{cost:.4f}.plc')
placement_util.save_placement(self._plc, cd_snapshot_file,
user_comments)
def call_analytical_placer_and_get_cost(self):
"""Calls analytical placer.
Calls analystical placer and evaluates cost when all nodes are placed. Also,
saves the placement file for eval if all the macros are placed by RL.
Returns:
A tuple for placement cost and info.
"""
if self._done:
self.analytical_placer()
# Only evaluates placement cost when all nodes are placed.
# All samples in the episode receive the same reward equal to final cost.
# This is realized by setting intermediate steps cost as zero, and
# propagate the final cost with discount factor set to 1 in replay buffer.
cost, info = self._cost_info_fn(self._plc, self._done)
# We only save placement if all nodes by placed RL, because the dreamplace
# mix-sized placement may not be legal.
if self._current_node == self._num_hard_macros and self._is_eval:
self._save_placement(cost)
return -cost, info
def reset(self) -> ObsType:
"""Restes the environment.
Returns:
An initial observation.
"""
self._plc.unplace_all_nodes()
self._current_actions = []
self._current_node = 0
self._done = False
self._current_mask = self._get_mask()
return self._get_obs()
def translate_to_original_canvas(self, action: int) -> int:
"""Translates a raw location to real one in the original canvas."""
up_pad = (self._observation_config.max_grid_size - self._grid_rows) // 2
right_pad = (self._observation_config.max_grid_size - self._grid_cols) // 2
a_i = action // self._observation_config.max_grid_size - up_pad
a_j = action % self._observation_config.max_grid_size - right_pad
if 0 <= a_i < self._grid_rows or 0 <= a_j < self._grid_cols:
action = a_i * self._grid_cols + a_j
else:
raise InfeasibleActionError(action, self._current_mask)
return action
def place_node(self, node_index: int, action: int) -> None:
self._plc.place_node(node_index, self.translate_to_original_canvas(action))
def analytical_placer(self) -> None:
if self._std_cell_placer_mode == 'fd':
placement_util.fd_placement_schedule(self._plc)
else:
raise ValueError('%s is not a supported std_cell_placer_mode.' %
(self._std_cell_placer_mode))
def step(self, action: int) -> Tuple[ObsType, float, bool, Any]:
"""Steps the environment.
Args:
action: The action to take (should be a list of size 1).
Returns:
observation, reward, done, and info.
Raises:
RuntimeError: action taken after episode was done
InfeasibleActionError: bad action taken (action is not in feasible
actions)
"""
if self._done:
raise RuntimeError('Action taken after episode is done.')
action = int(action)
self._current_actions.append(action)
if self._current_mask[action] == 0:
raise InfeasibleActionError(action, self._current_mask)
node_index = self._sorted_node_indices[self._current_node]
self.place_node(node_index, action)
self._current_node += 1
self._done = (self._current_node == self._num_hard_macros)
self._current_mask = self._get_mask()
if not self._done and not np.any(self._current_mask):
# Please note that _infeasible_state is not reset in reset function so,
# the caller of step() is responsible for resetting it.
self._infeasible_state = True
logging.info('Actions took before becoming infeasible: %s',
self._current_actions)
info = {
'wirelength': -1.0,
'congestion': -1.0,
'density': -1.0,
}
return self.reset(), self.INFEASIBLE_REWARD, True, info
cost, info = self.call_analytical_placer_and_get_cost()
return self._get_obs(), cost, self._done, info
# def create_circuit_environment(*args, **kwarg) -> wrappers.ActionClipWrapper:
# """Create an `CircuitEnv` wrapped as a Gym environment.
# Args:
# *args: Arguments.
# **kwarg: keyworded Arguments.
# Returns:
# PyEnvironment used for training.
# """
# env = CircuitEnv(*args, **kwarg)
# return wrappers.ActionClipWrapper(suite_gym.wrap_env(env))
# coding=utf-8
# Copyright 2021 The Circuit Training Team Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A class to store the observation shape and sizes."""
from typing import Dict, List, Optional, Text, Tuple, Union
import gin
import gym
import numpy as np
import tensorflow as tf
TensorType = Union[np.ndarray, tf.Tensor]
FeatureKeyType = Union[List[Text], Tuple[Text, ...]]
HARD_MACRO = 1
SOFT_MACRO = 2
PORT_CLUSTER = 3
NETLIST_METADATA = (
'normalized_num_edges',
'normalized_num_hard_macros',
'normalized_num_soft_macros',
'normalized_num_port_clusters',
'horizontal_routes_per_micron',
'vertical_routes_per_micron',
'macro_horizontal_routing_allocation',
'macro_vertical_routing_allocation',
'grid_cols',
'grid_rows',
)
GRAPH_ADJACENCY_MATRIX = ('sparse_adj_i', 'sparse_adj_j', 'sparse_adj_weight',
'edge_counts')
NODE_STATIC_FEATURES = (
'macros_w',
'macros_h',
'node_types',
)
STATIC_OBSERVATIONS = (
NETLIST_METADATA + GRAPH_ADJACENCY_MATRIX + NODE_STATIC_FEATURES)
INITIAL_DYNAMIC_OBSERVATIONS = (
'locations_x',
'locations_y',
'is_node_placed',
)
DYNAMIC_OBSERVATIONS = (
'locations_x',
'locations_y',
'is_node_placed',
'current_node',
'mask',
)
ALL_OBSERVATIONS = STATIC_OBSERVATIONS + DYNAMIC_OBSERVATIONS
INITIAL_OBSERVATIONS = STATIC_OBSERVATIONS + INITIAL_DYNAMIC_OBSERVATIONS
@gin.configurable
class ObservationConfig(object):
"""A class that contains shared configs for observation."""
# The default numbers are the maximum number of nodes, edges, and grid size
# on a set of TPU blocks.
# Large numbers may cause GPU/TPU OOM during training.
def __init__(self,
max_num_nodes: int = 5000,
max_num_edges: int = 28400,
max_grid_size: int = 128):
self.max_num_edges = max_num_edges
self.max_num_nodes = max_num_nodes
self.max_grid_size = max_grid_size
@property
def observation_space(self) -> gym.spaces.Space:
"""Env Observation space."""
return gym.spaces.Dict({
'normalized_num_edges':
gym.spaces.Box(low=0, high=1, shape=(1,)),
'normalized_num_hard_macros':
gym.spaces.Box(low=0, high=1, shape=(1,)),
'normalized_num_soft_macros':
gym.spaces.Box(low=0, high=1, shape=(1,)),
'normalized_num_port_clusters':
gym.spaces.Box(low=0, high=1, shape=(1,)),
'horizontal_routes_per_micron':
gym.spaces.Box(low=0, high=100, shape=(1,)),
'vertical_routes_per_micron':
gym.spaces.Box(low=0, high=100, shape=(1,)),
'macro_horizontal_routing_allocation':
gym.spaces.Box(low=0, high=100, shape=(1,)),
'macro_vertical_routing_allocation':
gym.spaces.Box(low=0, high=100, shape=(1,)),
'sparse_adj_weight':
gym.spaces.Box(low=0, high=100, shape=(self.max_num_edges,)),
'sparse_adj_i':
gym.spaces.Box(
low=0,
high=self.max_num_nodes - 1,
shape=(self.max_num_edges,),
dtype=np.int32),
'sparse_adj_j':
gym.spaces.Box(
low=0,
high=self.max_num_nodes - 1,
shape=(self.max_num_edges,),
dtype=np.int32),
'edge_counts':
gym.spaces.Box(
low=0,
high=self.max_num_edges - 1,
shape=(self.max_num_nodes,),
dtype=np.int32),
'node_types':
gym.spaces.Box(
low=0, high=3, shape=(self.max_num_nodes,), dtype=np.int32),
'is_node_placed':
gym.spaces.Box(
low=0, high=1, shape=(self.max_num_nodes,), dtype=np.int32),
'macros_w':
gym.spaces.Box(low=0, high=1, shape=(self.max_num_nodes,)),
'macros_h':
gym.spaces.Box(low=0, high=1, shape=(self.max_num_nodes,)),
'locations_x':
gym.spaces.Box(low=0, high=1, shape=(self.max_num_nodes,)),
'locations_y':
gym.spaces.Box(low=0, high=1, shape=(self.max_num_nodes,)),
'grid_cols':
gym.spaces.Box(low=0, high=1, shape=(1,)),
'grid_rows':
gym.spaces.Box(low=0, high=1, shape=(1,)),
'current_node':
gym.spaces.Box(
low=0, high=self.max_num_nodes - 1, shape=(1,), dtype=np.int32),
'mask':
gym.spaces.Box(
low=0, high=1, shape=(self.max_grid_size**2,), dtype=np.int32),
})
def _to_dict(
flatten_obs: TensorType,
keys: FeatureKeyType,
observation_config: Optional[ObservationConfig] = None
) -> Dict[Text, TensorType]:
"""Unflatten the observation to a dictionary."""
if observation_config:
obs_space = observation_config.observation_space
else:
obs_space = ObservationConfig().observation_space
splits = [obs_space[k].shape[0] for k in keys]
splitted_obs = tf.split(flatten_obs, splits, axis=-1)
return {k: o for o, k in zip(splitted_obs, keys)}
def _flatten(dict_obs: Dict[Text, TensorType],
keys: FeatureKeyType) -> TensorType:
out = [np.asarray(dict_obs[k]) for k in keys]
return np.concatenate(out, axis=-1)
def flatten_static(dict_obs: Dict[Text, TensorType]) -> TensorType:
return _flatten(dict_obs=dict_obs, keys=STATIC_OBSERVATIONS)
def flatten_dynamic(dict_obs: Dict[Text, TensorType]) -> TensorType:
return _flatten(dict_obs=dict_obs, keys=DYNAMIC_OBSERVATIONS)
def flatten_all(dict_obs: Dict[Text, TensorType]) -> TensorType:
return _flatten(dict_obs=dict_obs, keys=ALL_OBSERVATIONS)
def flatten_initial(dict_obs: Dict[Text, TensorType]) -> TensorType:
return _flatten(dict_obs=dict_obs, keys=INITIAL_OBSERVATIONS)
def to_dict_static(
flatten_obs: TensorType,
observation_config: Optional[ObservationConfig] = None
) -> Dict[Text, TensorType]:
"""Convert the flattend numpy array of static observations back to a dict.
Args:
flatten_obs: a numpy array of static observations.
observation_config: Optional observation config.
Returns:
A dict representation of the observations.
"""
return _to_dict(
flatten_obs=flatten_obs,
keys=STATIC_OBSERVATIONS,
observation_config=observation_config)
def to_dict_dynamic(
flatten_obs: TensorType,
observation_config: Optional[ObservationConfig] = None
) -> Dict[Text, TensorType]:
"""Convert the flattend numpy array of dynamic observations back to a dict.
Args:
flatten_obs: a numpy array of dynamic observations.
observation_config: Optional observation config.
Returns:
A dict representation of the observations.
"""
return _to_dict(
flatten_obs=flatten_obs,
keys=DYNAMIC_OBSERVATIONS,
observation_config=observation_config)
def to_dict_all(
flatten_obs: TensorType,
observation_config: Optional[ObservationConfig] = None
) -> Dict[Text, TensorType]:
"""Convert the flattend numpy array of observations back to a dict.
Args:
flatten_obs: a numpy array of observations.
observation_config: Optional observation config.
Returns:
A dict representation of the observations.
"""
return _to_dict(
flatten_obs=flatten_obs,
keys=ALL_OBSERVATIONS,
observation_config=observation_config)
\ No newline at end of file
# coding=utf-8
# Copyright 2021 The Circuit Training Team Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This class extracts features from observations."""
from typing import Dict, Optional, Text, Tuple
from Plc_client import observation_config as observation_config_lib
from Plc_client import plc_client
import gin
import numpy as np
@gin.configurable
class ObservationExtractor(object):
"""Extracts observation features from plc."""
EPSILON = 1E-6
def __init__(self,
plc: plc_client.PlacementCost,
observation_config: Optional[
observation_config_lib.ObservationConfig] = None,
default_location_x: float = 0.5,
default_location_y: float = 0.5):
self.plc = plc
self._observation_config = (
observation_config or observation_config_lib.ObservationConfig())
self._default_location_x = default_location_x
self._default_location_y = default_location_y
self.width, self.height = self.plc.get_canvas_width_height()
self.num_cols, self.num_rows = self.plc.get_grid_num_columns_rows()
self.grid_width = self.width / self.num_cols
self.grid_height = self.height / self.num_rows
# Since there are too many I/O ports, we have to cluster them together to
# make it manageable for the model to process. The ports that are located in
# the same grid cell are clustered togheter.
self.adj_vec, grid_cell_of_clustered_ports_vec = self.plc.get_macro_and_clustered_port_adjacency(
)
self.clustered_port_locations_vec = [
self._get_clustered_port_locations(i)
for i in grid_cell_of_clustered_ports_vec
]
# Extract static features.
self._features = self._extract_static_features()
# done
def _extract_static_features(self) -> Dict[Text, np.ndarray]:
"""Static features that are invariant across training steps."""
features = dict()
self._extract_num_macros(features)
self._extract_technology_info(features)
self._extract_node_types(features)
self._extract_macro_size(features)
self._extract_macro_and_port_adj_matrix(features)
self._extract_canvas_size(features)
self._extract_grid_size(features)
self._extract_initial_node_locations(features)
self._extract_normalized_static_features(features)
return features
# done
def _extract_normalized_static_features(
self, features: Dict[Text, np.ndarray]) -> None:
"""Normalizes static features."""
self._add_netlist_metadata(features)
self._normalize_adj_matrix(features)
self._pad_adj_matrix(features)
self._pad_macro_static_features(features)
self._normalize_macro_size_by_canvas(features)
self._normalize_grid_size(features)
self._normalize_locations_by_canvas(features)
self._replace_unplace_node_location(features)
self._pad_macro_dynamic_features(features)
# done
def _extract_num_macros(self, features: Dict[Text, np.ndarray]) -> None:
features['num_macros'] = np.asarray([len(self.plc.get_macro_indices())
]).astype(np.int32)
# done
def _extract_technology_info(self, features: Dict[Text, np.ndarray]) -> None:
"""Extracts Technology-related information."""
routing_resources = {
'horizontal_routes_per_micron':
self.plc.get_routes_per_micron()[0],
'vertical_routes_per_micron':
self.plc.get_routes_per_micron()[1],
'macro_horizontal_routing_allocation':
self.plc.get_macro_routing_allocation()[0],
'macro_vertical_routing_allocation':
self.plc.get_macro_routing_allocation()[0],
}
for k in routing_resources:
features[k] = np.asarray([routing_resources[k]]).astype(np.float32)
# done
def _extract_initial_node_locations(self, features: Dict[Text,
np.ndarray]) -> None:
"""Extracts initial node locations."""
locations_x = []
locations_y = []
is_node_placed = []
for macro_idx in self.plc.get_macro_indices():
x, y = self.plc.get_node_location(macro_idx)
locations_x.append(x)
locations_y.append(y)
is_node_placed.append(1 if self.plc.is_node_placed(macro_idx) else 0)
for x, y in self.clustered_port_locations_vec:
locations_x.append(x)
locations_y.append(y)
is_node_placed.append(1)
features['locations_x'] = np.asarray(locations_x).astype(np.float32)
features['locations_y'] = np.asarray(locations_y).astype(np.float32)
features['is_node_placed'] = np.asarray(is_node_placed).astype(np.int32)
# done
def _extract_node_types(self, features: Dict[Text, np.ndarray]) -> None:
"""Extracts node types."""
types = []
for macro_idx in self.plc.get_macro_indices():
if self.plc.is_node_soft_macro(macro_idx):
types.append(observation_config_lib.SOFT_MACRO)
else:
types.append(observation_config_lib.HARD_MACRO)
for _ in range(len(self.clustered_port_locations_vec)):
types.append(observation_config_lib.PORT_CLUSTER)
features['node_types'] = np.asarray(types).astype(np.int32)
def _extract_macro_size(self, features: Dict[Text, np.ndarray]) -> None:
"""Extracts macro sizes."""
macros_w = []
macros_h = []
for macro_idx in self.plc.get_macro_indices():
if self.plc.is_node_soft_macro(macro_idx):
# Width and height of soft macros are set to zero.
width = 0
height = 0
else:
width, height = self.plc.get_node_width_height(macro_idx)
macros_w.append(width)
macros_h.append(height)
for _ in range(len(self.clustered_port_locations_vec)):
macros_w.append(0)
macros_h.append(0)
features['macros_w'] = np.asarray(macros_w).astype(np.float32)
features['macros_h'] = np.asarray(macros_h).astype(np.float32)
# done
def _extract_macro_and_port_adj_matrix(
self, features: Dict[Text, np.ndarray]) -> None:
"""Extracts adjacency matrix."""
num_nodes = len(self.plc.get_macro_indices()) + len(
self.clustered_port_locations_vec)
assert num_nodes * num_nodes == len(self.adj_vec)
sparse_adj_i = []
sparse_adj_j = []
sparse_adj_weight = []
edge_counts = np.zeros((self._observation_config.max_num_nodes,),
dtype=np.int32) # issue with determine max_num_nodes
for i in range(num_nodes):
for j in range(i + 1, num_nodes):
weight = self.adj_vec[i + num_nodes * j]
if weight > 0:
sparse_adj_i.append(i)
sparse_adj_j.append(j)
sparse_adj_weight.append(weight)
edge_counts[i] += 1
edge_counts[j] += 1
features['sparse_adj_i'] = np.asarray(sparse_adj_i).astype(np.int32)
features['sparse_adj_j'] = np.asarray(sparse_adj_j).astype(np.int32)
features['sparse_adj_weight'] = np.asarray(sparse_adj_weight).astype(
np.float32)
features['edge_counts'] = edge_counts
# if not enough edges
# print("edge_counts ", np.sum(features['edge_counts'])) # 16624
# done
def _extract_canvas_size(self, features: Dict[Text, np.ndarray]) -> None:
features['canvas_width'] = np.asarray([self.width])
features['canvas_height'] = np.asarray([self.height])
# done
def _extract_grid_size(self, features: Dict[Text, np.ndarray]) -> None:
features['grid_cols'] = np.asarray([self.num_cols]).astype(np.float32)
features['grid_rows'] = np.asarray([self.num_rows]).astype(np.float32)
# done
def _get_clustered_port_locations(
self, grid_cell_index: int) -> Tuple[float, float]:
"""Returns clustered port locations.
This function returns an approximation location of the ports in a grid
cell. Depending on the cell location in the canvas, the approximation
differs.
Args:
grid_cell_index: The index of the grid cell where the cluster port is
located.
Returns:
A tuple of float: Approximate x, y location of the port cluster in the
grid cell in the same unit as canvas width and height (micron).
"""
col = grid_cell_index % self.num_cols
row = grid_cell_index // self.num_cols
if col == 0 and row == 0:
return 0, 0
elif col == 0 and row == self.num_rows - 1:
return 0, self.height
elif col == self.num_cols - 1 and row == 0:
return self.width, 0
elif col == self.num_cols - 1 and row == self.num_rows - 1:
return self.width, self.height
elif col == 0:
return 0, (row + 0.5) * self.grid_height
elif col == self.num_cols - 1:
return self.width, (row + 0.5) * self.grid_height
elif row == 0:
return (col + 0.5) * self.grid_width, 0
elif row == self.num_rows - 1:
return (col + 0.5) * self.grid_width, self.height
else:
return (col + 0.5) * self.grid_width, (row + 0.5) * self.grid_height
def _add_netlist_metadata(self, features: Dict[Text, np.ndarray]) -> None:
"""Adds netlist metadata info."""
features['normalized_num_edges'] = np.asarray([
np.sum(features['sparse_adj_weight']) /
self._observation_config.max_num_edges
]).astype(np.float32)
features['normalized_num_hard_macros'] = np.asarray([
np.sum(
np.equal(features['node_types'],
observation_config_lib.HARD_MACRO).astype(np.float32)) /
self._observation_config.max_num_nodes
]).astype(np.float32)
features['normalized_num_soft_macros'] = np.asarray([
np.sum(
np.equal(features['node_types'],
observation_config_lib.SOFT_MACRO).astype(np.float32)) /
self._observation_config.max_num_nodes
]).astype(np.float32)
features['normalized_num_port_clusters'] = np.asarray([
np.sum(
np.equal(features['node_types'],
observation_config_lib.PORT_CLUSTER).astype(np.float32)) /
self._observation_config.max_num_nodes
]).astype(np.float32)
def _normalize_adj_matrix(self, features: Dict[Text, np.ndarray]) -> None:
"""Normalizes adj matrix weights."""
mean_weight = np.mean(features['sparse_adj_weight'])
features['sparse_adj_weight'] = (
features['sparse_adj_weight'] /
(mean_weight + ObservationExtractor.EPSILON)).astype(np.float32)
def _pad_1d_tensor(self, tensor: np.ndarray, pad_size: int) -> np.ndarray:
if (pad_size - tensor.shape[0]) < 0:
print("padding not applied", pad_size, tensor.shape[0])
return np.pad(
tensor, (0, 0),
mode='constant',
constant_values=0)
else:
return np.pad(
tensor, (0, pad_size - tensor.shape[0]),
mode='constant',
constant_values=0)
def _pad_adj_matrix(self, features: Dict[Text, np.ndarray]) -> None:
"""Pads indices and weights with zero to make their shape known."""
for var in ['sparse_adj_i', 'sparse_adj_j', 'sparse_adj_weight']:
features[var] = self._pad_1d_tensor(
features[var], self._observation_config.max_num_edges)
def _pad_macro_static_features(self, features: Dict[Text,
np.ndarray]) -> None:
"""Pads macro features to make their shape knwon."""
for var in [
'macros_w',
'macros_h',
'node_types',
]:
features[var] = self._pad_1d_tensor(
features[var], self._observation_config.max_num_nodes)
def _pad_macro_dynamic_features(self, features: Dict[Text,
np.ndarray]) -> None:
"""Pads macro features to make their shape knwon."""
for var in [
'locations_x',
'locations_y',
'is_node_placed',
]:
features[var] = self._pad_1d_tensor(
features[var], self._observation_config.max_num_nodes)
def _normalize_grid_size(self, features: Dict[Text, np.ndarray]) -> None:
features['grid_cols'] = (features['grid_cols'] /
self._observation_config.max_grid_size).astype(
np.float32)
features['grid_rows'] = (features['grid_rows'] /
self._observation_config.max_grid_size).astype(
np.float32)
def _normalize_macro_size_by_canvas(self, features: Dict[Text,
np.ndarray]) -> None:
"""Normalizes macro sizes with the canvas size."""
features['macros_w'] = (
features['macros_w'] /
(features['canvas_width'] + ObservationExtractor.EPSILON)).astype(
np.float32)
features['macros_h'] = (
features['macros_h'] /
(features['canvas_height'] + ObservationExtractor.EPSILON)).astype(
np.float32)
def _normalize_locations_by_canvas(self, features: Dict[Text,
np.ndarray]) -> None:
"""Normalizes locations with the canvas size."""
features['locations_x'] = (
features['locations_x'] /
(features['canvas_width'] + ObservationExtractor.EPSILON)).astype(
np.float32)
features['locations_y'] = (
features['locations_y'] /
(features['canvas_height'] + ObservationExtractor.EPSILON)).astype(
np.float32)
def _replace_unplace_node_location(self, features: Dict[Text,
np.ndarray]) -> None:
"""Replace the location of the unplaced macros with a constant."""
is_node_placed = np.equal(features['is_node_placed'], 1)
features['locations_x'] = np.where(
is_node_placed,
features['locations_x'],
self._default_location_x * np.ones_like(features['locations_x']),
).astype(np.float32)
features['locations_y'] = np.where(
is_node_placed,
features['locations_y'],
self._default_location_y * np.ones_like(features['locations_y']),
).astype(np.float32)
def get_static_features(self) -> Dict[Text, np.ndarray]:
return {
key: self._features[key]
for key in observation_config_lib.STATIC_OBSERVATIONS
}
def get_initial_features(self) -> Dict[Text, np.ndarray]:
return {
key: self._features[key]
for key in observation_config_lib.INITIAL_OBSERVATIONS
}
def _update_dynamic_features(self, previous_node_index: int,
current_node_index: int,
mask: np.ndarray) -> None:
"""Updates the dynamic features."""
if previous_node_index >= 0:
x, y = self.plc.get_node_location(
self.plc.get_macro_indices()[previous_node_index])
self._features['locations_x'][previous_node_index] = (
x / (self.width + ObservationExtractor.EPSILON))
self._features['locations_y'][previous_node_index] = (
y / (self.height + ObservationExtractor.EPSILON))
self._features['is_node_placed'][previous_node_index] = 1
self._features['mask'] = mask.astype(np.int32)
self._features['current_node'] = np.asarray([current_node_index
]).astype(np.int32)
def get_dynamic_features(self, previous_node_index: int,
current_node_index: int,
mask: np.ndarray) -> Dict[Text, np.ndarray]:
self._update_dynamic_features(previous_node_index, current_node_index, mask)
return {
key: self._features[key]
for key in observation_config_lib.DYNAMIC_OBSERVATIONS
if key in self._features
}
def get_all_features(self, previous_node_index: int, current_node_index: int,
mask: np.ndarray) -> Dict[Text, np.ndarray]:
features = self.get_static_features()
features.update(
self.get_dynamic_features(
previous_node_index=previous_node_index,
current_node_index=current_node_index,
mask=mask))
return features
......@@ -44,7 +44,7 @@ def nodes_of_types(plc: plc_client.PlacementCost,
yield i
i += 1
# done
def get_node_xy_coordinates(
plc: plc_client.PlacementCost) -> Dict[int, Tuple[float, float]]:
"""Returns all node x,y coordinates (canvas) in a dict."""
......@@ -54,7 +54,7 @@ def get_node_xy_coordinates(
node_coords[node_index] = plc.get_node_location(node_index)
return node_coords
# done
def get_macro_orientations(plc: plc_client.PlacementCost) -> Dict[int, int]:
"""Returns all macros' orientations in a dict."""
macro_orientations = dict()
......@@ -62,7 +62,7 @@ def get_macro_orientations(plc: plc_client.PlacementCost) -> Dict[int, int]:
macro_orientations[node_index] = plc.get_macro_orientation(node_index)
return macro_orientations
# done
def restore_node_xy_coordinates(
plc: plc_client.PlacementCost,
node_coords: Dict[int, Tuple[float, float]]) -> None:
......@@ -70,13 +70,13 @@ def restore_node_xy_coordinates(
if not plc.is_node_fixed(node_index):
plc.update_node_coords(node_index, coords[0], coords[1])
# done
def restore_macro_orientations(plc: plc_client.PlacementCost,
macro_orientations: Dict[int, int]) -> None:
for node_index, orientation in macro_orientations.items():
plc.update_macro_orientation(node_index, orientation)
#
# done
def extract_attribute_from_comments(attribute: str,
filenames: List[str]) -> Optional[str]:
"""Parses the files' comments section, tries to extract the attribute.
......@@ -104,7 +104,7 @@ def extract_attribute_from_comments(attribute: str,
break
return None
#done
# done
def get_blockages_from_comments(
filenames: Union[str, List[str]]) -> Optional[List[List[float]]]:
"""Returns list of blockages if they exist in the file's comments section."""
......@@ -171,7 +171,7 @@ def extract_sizes_from_comments(
if canvas_width and canvas_height and grid_cols and grid_rows:
return canvas_width, canvas_height, grid_cols, grid_rows
# done
def fix_port_coordinates(plc: plc_client.PlacementCost) -> None:
"""Find all ports and fix their coordinates.
......@@ -182,12 +182,13 @@ def fix_port_coordinates(plc: plc_client.PlacementCost) -> None:
# print("node to fix:", node)
plc.fix_node_coord(node)
# done
# The routing capacities are calculated based on the public information about
# 7nm technology (https://en.wikichip.org/wiki/7_nm_lithography_process)
# with an arbitary, yet reasonable, assumption of 18% of the tracks for
# the power grids.
def create_placement_cost(
plc_client: None,
netlist_file: str,
init_placement: Optional[str] = None,
overlap_threshold: float = 4e-3,
......@@ -300,20 +301,21 @@ def get_node_type_counts(plc: plc_client.PlacementCost) -> Dict[str, int]:
counts['HARD_MACRO'] += 1
if node_type == 'MACRO_PIN':
ref_id = plc.get_ref_node_id(node_index)
# print("ref_id: ", ref_id)
if plc.is_node_soft_macro(ref_id):
counts['SOFT_MACRO_PIN'] += 1
else:
counts['HARD_MACRO_PIN'] += 1
return counts
# done
def make_blockage_text(plc: plc_client.PlacementCost) -> str:
ret = ''
for blockage in plc.get_blockages():
ret += 'Blockage : {}\n'.format(' '.join([str(b) for b in blockage]))
return ret
# done
def save_placement(plc: plc_client.PlacementCost,
filename: str,
user_comments: str = '') -> None:
......@@ -372,9 +374,12 @@ def save_placement(plc: plc_client.PlacementCost,
if user_comments:
info += '\nUser comments:\n' + user_comments + '\n'
info += '\nnode_index x y orientation fixed'
# print(info)
return plc.save_placement(filename, info)
# TODO: plc.optimize_stdcells
def fd_placement_schedule(plc: plc_client.PlacementCost,
num_steps: Tuple[int, ...] = (100, 100, 100),
io_factor: float = 1.0,
......@@ -415,7 +420,7 @@ def fd_placement_schedule(plc: plc_client.PlacementCost,
log_scale_conns, use_sizes, io_factor, num_steps,
max_move_distance, attract_factor, repel_factor)
# not tested
def get_ordered_node_indices(mode: str,
plc: plc_client.PlacementCost,
exclude_fixed_nodes: bool = True) -> List[int]:
......@@ -456,7 +461,7 @@ def get_ordered_node_indices(mode: str,
ordered_indices = [m for m in ordered_indices if not plc.is_node_fixed(m)]
return ordered_indices
# done
def extract_parameters_from_comments(
filename: str) -> Tuple[float, float, int, int]:
"""Parses the file's comments section, tries to extract canvas/grid sizes.
......@@ -476,6 +481,7 @@ def extract_parameters_from_comments(
fp_re = re.search(
r'FP bbox: \{([\d\.]+) ([\d\.]+)\} \{([\d\.]+) ([\d\.]+)\}', line)
if fp_re:
# NOTE: first two argument contains origin coord but not used
canvas_width = float(fp_re.group(3))
canvas_height = float(fp_re.group(4))
continue
......@@ -494,7 +500,7 @@ def extract_parameters_from_comments(
break
return canvas_width, canvas_height, grid_cols, grid_rows
# done
def get_routing_resources() -> Dict[str, float]:
"""Currently we only use default parameter settings.
......@@ -512,6 +518,7 @@ def get_routing_resources() -> Dict[str, float]:
}
# done
def nodes_of_types(plc: plc_client.PlacementCost, type_list: List[str]):
"""Yields the index of a node of certain types."""
i = 0
......@@ -523,7 +530,7 @@ def nodes_of_types(plc: plc_client.PlacementCost, type_list: List[str]):
yield i
i += 1
# done
def num_nodes_of_type(plc, node_type):
"""Returns number of node of a particular type."""
count = 0
......@@ -532,6 +539,7 @@ def num_nodes_of_type(plc, node_type):
return count
# not tested
def extract_blockages_from_tcl(filename: str,
block_name: str,
canvas_width: float,
......@@ -606,7 +614,7 @@ def extract_blockages_from_tcl(filename: str,
index += 1
return blockages
# done
def get_ascii_picture(vect: List[Any],
cols: int,
rows: int,
......@@ -636,7 +644,7 @@ def get_ascii_picture(vect: List[Any],
ret_str += ' -' + '-' * 2 * cols + '\n'
return ret_str
# done
def get_hard_macro_density_map(plc: plc_client.PlacementCost) -> List[float]:
"""Returns the placement density map for hard macros only."""
# Unplaces all standard cells and soft macros, so that grid cell density
......@@ -660,7 +668,7 @@ def get_hard_macro_density_map(plc: plc_client.PlacementCost) -> List[float]:
plc.set_canvas_boundary_check(check_boundary)
return hard_macro_density
# done
def save_placement_with_info(plc: plc_client.PlacementCost,
filename: str,
user_comments: str = '') -> None:
......@@ -755,8 +763,9 @@ def save_placement_with_info(plc: plc_client.PlacementCost,
info += '\nnode_index x y orientation fixed'
return plc.save_placement(filename, info)
# done
def create_placement_cost_using_common_arguments(
plc_client:None,
netlist_file: str,
init_placement: Optional[str] = None,
canvas_width: Optional[float] = None,
......@@ -847,7 +856,7 @@ def create_placement_cost_using_common_arguments(
return plc
# done, but need verify
def get_node_locations(plc: plc_client.PlacementCost) -> Dict[int, int]:
"""Returns all node grid locations (macros and stdcells) in a dict."""
node_locations = dict()
......@@ -866,7 +875,7 @@ def get_node_ordering_by_size(plc: plc_client.PlacementCost) -> List[int]:
node_areas[i] = w * h
return sorted(node_areas, key=node_areas.get, reverse=True)
# not tested
def grid_locations_near(plc: plc_client.PlacementCost,
start_grid_index: int) -> Iterator[int]:
"""Yields node indices closest to the start_grid_index."""
......@@ -894,7 +903,7 @@ def grid_locations_near(plc: plc_client.PlacementCost,
continue
yield int(new_col + new_row * cols)
# not tested
def place_near(plc: plc_client.PlacementCost, node_index: int,
location: int) -> bool:
"""Places a node (legally) closest to the given location.
......@@ -913,7 +922,7 @@ def place_near(plc: plc_client.PlacementCost, node_index: int,
return True
return False
# not tested
def disconnect_high_fanout_nets(plc: plc_client.PlacementCost,
max_allowed_fanouts: int = 500) -> None:
high_fanout_nets = []
......@@ -925,7 +934,7 @@ def disconnect_high_fanout_nets(plc: plc_client.PlacementCost,
high_fanout_nets.append(i)
plc.disconnect_nets(high_fanout_nets)
# not tested
def legalize_placement(plc: plc_client.PlacementCost) -> bool:
"""Places the nodes to legal positions snapping to grid cells."""
# Unplace all except i/o's.
......@@ -963,10 +972,44 @@ def main():
test_netlist_dir = './Plc_client/test/'+'ariane'
netlist_file = os.path.join(test_netlist_dir,'netlist.pb.txt')
init_placement = os.path.join(test_netlist_dir,'initial.plc')
plc = create_placement_cost(netlist_file=netlist_file, init_placement=init_placement)
plc = create_placement_cost_using_common_arguments(netlist_file=netlist_file, init_placement=init_placement,
grid_cols=10, grid_rows=10, congestion_smooth_range=2.0, overlap_threshold=0.004, use_incremental_cost=False)
plc = create_placement_cost(plc_client=plc_client, netlist_file=netlist_file, init_placement=init_placement)
# plc = create_placement_cost_using_common_arguments(netlist_file=netlist_file, init_placement=init_placement,
# grid_cols=10, grid_rows=10, congestion_smooth_range=2.0, overlap_threshold=0.004, use_incremental_cost=False)
print(make_blockage_text(plc))
# save_placement(plc, "save_test", 'this is a comment')
# plc.nodes_of_types()
# node_xy_coordinates
NODE_XY_DICT = {}
for i in nodes_of_types(plc, ['MACRO', 'macro', 'STDCELL', 'PORT']):
NODE_XY_DICT[i] = (100, 100)
restore_node_xy_coordinates(plc, NODE_XY_DICT)
# print(get_node_xy_coordinates(plc))
# macro_orientation
MACRO_ORIENTATION = {}
for i in nodes_of_types(plc, ['MACRO', 'macro']):
MACRO_ORIENTATION[i] = "S"
restore_macro_orientations(plc, MACRO_ORIENTATION)
# print(get_macro_orientations(plc))
fix_port_coordinates(plc)
# write out new plc
save_placement(plc, "save_test", 'this is a comment')
# needs more testing
print(get_node_locations(plc))
# num_nodes_of_type
print("num_nodes_of_type 'MACRO':", num_nodes_of_type(plc, "MACRO"))
# get_hard_macro_density_map
print("get_hard_macro_density_map: \n", get_hard_macro_density_map(plc))
print("get_hard_macro_density_map in ASCII: \n", get_ascii_picture(get_hard_macro_density_map(plc), *plc.get_grid_num_columns_rows()))
print()
if __name__ == '__main__':
main()
\ No newline at end of file
"""Open-Sourced PlacementCost client class."""
from ast import Assert
import os, io
from platform import node
import re
import math
from typing import Text, Tuple
from typing import Text, Tuple, overload
from absl import logging
from collections import namedtuple
import matplotlib.pyplot as plt
......@@ -49,6 +49,7 @@ class PlacementCost(object):
self.FLAG_UPDATE_CONGESTION = True
self.FLAG_UPDATE_MACRO_ADJ = True
self.FLAG_UPDATE_MACRO_AND_CLUSTERED_PORT_ADJ = True
self.FLAG_UPDATE_NODE_MASK = True
# Check netlist existance
assert os.path.isfile(self.netlist_file)
......@@ -86,6 +87,9 @@ class PlacementCost(object):
self.hard_macros_to_inpins = {}
self.soft_macros_to_inpins = {}
# Placed macro
self.placed_macro = []
# unknown
self.use_incremental_cost = False
# blockage
......@@ -101,12 +105,13 @@ class PlacementCost(object):
self.grid_row = 10
# initialize congestion map
# TODO recompute after new gridding
self.V_routing_cong = [0] * self.grid_col * self.grid_row
self.H_routing_cong = [0] * self.grid_col * self.grid_row
self.V_macro_routing_cong = [0] * self.grid_col * self.grid_row
self.H_macro_routing_cong = [0] * self.grid_col * self.grid_row
# initial grid mask
self.global_node_mask = [0] * self.grid_col * self.grid_row
self.V_routing_cong = [0] * (self.grid_col * self.grid_row)
self.H_routing_cong = [0] * (self.grid_col * self.grid_row)
self.V_macro_routing_cong = [0] * (self.grid_col * self.grid_row)
self.H_macro_routing_cong = [0] * (self.grid_col * self.grid_row)
# initial grid mask, flatten before output
self.node_mask = np.array([1] * (self.grid_col * self.grid_row))\
.reshape(self.grid_row, self.grid_col)
# store module/component count
self.ports_cnt = len(self.port_indices)
self.hard_macro_cnt = len(self.hard_macro_indices)
......@@ -238,7 +243,7 @@ class PlacementCost(object):
# [MACRO_NAME]/[PIN_NAME]
soft_macro_name = node_name.rsplit('/', 1)[0]
# soft macro pin
soft_macro_pin = self.SoftMacroPin(name=node_name,
soft_macro_pin = self.SoftMacroPin(name=node_name,ref_id=None,
x = attr_dict['x'][1],
y = attr_dict['y'][1],
macro_name = attr_dict['macro_name'][1])
......@@ -290,7 +295,7 @@ class PlacementCost(object):
# [MACRO_NAME]/[PIN_NAME]
hard_macro_name = node_name.rsplit('/', 1)[0]
# hard macro pin
hard_macro_pin = self.HardMacroPin(name=node_name,
hard_macro_pin = self.HardMacroPin(name=node_name,ref_id=None,
x = attr_dict['x'][1],
y = attr_dict['y'][1],
x_offset = attr_dict['x_offset'][1],
......@@ -347,6 +352,9 @@ class PlacementCost(object):
# mapping connection degree to each macros
self.__update_connection()
# all hard macros are placed on canvas initially
self.__update_init_placed_node()
def __read_plc(self, plc_pth: str):
"""
Plc file Parser
......@@ -486,6 +494,8 @@ class PlacementCost(object):
self.FLAG_UPDATE_DENSITY = True
self.FLAG_UPDATE_WIRELENGTH = True
self.FLAG_UPDATE_NODE_MASK = True
# extracted information from .plc file
info_dict = self.__read_plc(plc_pth)
......@@ -562,18 +572,20 @@ class PlacementCost(object):
for macro_idx in (self.hard_macro_indices + self.soft_macro_indices):
macro = self.modules_w_pins[macro_idx]
macro_name = macro.get_name()
macro_type = macro.get_type()
if macro_type == "MACRO":
if not self.is_node_soft_macro(macro_idx):
if macro_name in self.hard_macros_to_inpins.keys():
pin_names = self.hard_macros_to_inpins[macro_name]
else:
return
elif macro_type == "macro":
print("[ERROR UPDATE CONNECTION] MACRO not found")
exit(1)
# use is_node_soft_macro()
elif self.is_node_soft_macro(macro_idx):
if macro_name in self.soft_macros_to_inpins.keys():
pin_names = self.soft_macros_to_inpins[macro_name]
else:
return
print("[ERROR UPDATE CONNECTION] MACRO not found")
exit(1)
for pin_name in pin_names:
pin = self.modules_w_pins[self.mod_name_to_indices[pin_name]]
......@@ -581,9 +593,16 @@ class PlacementCost(object):
if inputs:
for k in inputs.keys():
if macro_type == "MACRO" or macro_type == "macro":
if self.get_node_type(macro_idx) == "MACRO":
weight = pin.get_weight()
macro.add_connections(inputs[k], weight)
def __update_init_placed_node(self):
"""
Place all hard macros on node mask initially
"""
for macro_idx in (self.hard_macro_indices + self.soft_macro_indices):
self.placed_macro.append(macro_idx)
def get_cost(self) -> float:
"""
......@@ -619,6 +638,36 @@ class PlacementCost(object):
def get_soft_macro_pins_count(self) -> int:
return self.soft_macro_pins_cnt
def __get_pin_position(self, pin_idx):
"""
* PORT = its own position
* MACRO PIN = ref position + offset position87654321
"""
try:
assert (self.modules_w_pins[pin_idx].get_type() == 'MACRO_PIN' or\
self.modules_w_pins[pin_idx].get_type() == 'PORT')
except Exception:
print("[ERROR PIN POSITION] Not a MACRO PIN")
exit(1)
ref_node_idx = self.get_ref_node_id(pin_idx)
if ref_node_idx == -1:
if self.modules_w_pins[pin_idx].get_type() == 'PORT':
return self.modules_w_pins[pin_idx].get_pos()
else:
# cannot be 'MACRO'
exit(1)
# print("ref_node_idx", ref_node_idx)
ref_node = self.modules_w_pins[ref_node_idx]
ref_node_x, ref_node_y = ref_node.get_pos()
pin_node = self.modules_w_pins[pin_idx]
pin_node_x_offset, pin_node_y_offset = pin_node.get_offset()
return (ref_node_x + pin_node_x_offset, ref_node_y + pin_node_y_offset)
def get_wirelength(self) -> float:
"""
Proxy HPWL computation
......@@ -626,7 +675,7 @@ class PlacementCost(object):
# NOTE: in pb.txt, netlist input count exceed certain threshold will be ommitted
total_hpwl = 0.0
for mod in self.modules_w_pins:
for mod_idx, mod in enumerate(self.modules_w_pins):
norm_fact = 1.0
curr_type = mod.get_type()
# bounding box data structure
......@@ -636,8 +685,8 @@ class PlacementCost(object):
# NOTE: connection only defined on PORT, soft/hard macro pins
if curr_type == "PORT" and mod.get_sink():
# add source position
x_coord.append(mod.get_pos()[0])
y_coord.append(mod.get_pos()[1])
x_coord.append(self.__get_pin_position(mod_idx)[0])
y_coord.append(self.__get_pin_position(mod_idx)[1])
for sink_name in mod.get_sink():
for sink_pin in mod.get_sink()[sink_name]:
# retrieve indx in modules_w_pins
......@@ -645,12 +694,12 @@ class PlacementCost(object):
# retrieve sink object
sink = self.modules_w_pins[sink_idx]
# retrieve location
x_coord.append(sink.get_pos()[0])
y_coord.append(sink.get_pos()[1])
elif curr_type == "macro_pin" or curr_type == "MACRO_PIN":
x_coord.append(self.__get_pin_position(sink_idx)[0])
y_coord.append(self.__get_pin_position(sink_idx)[1])
elif curr_type == "MACRO_PIN":
# add source position
x_coord.append(mod.get_pos()[0])
y_coord.append(mod.get_pos()[1])
x_coord.append(self.__get_pin_position(mod_idx)[0])
y_coord.append(self.__get_pin_position(mod_idx)[1])
if mod.get_sink():
if mod.get_weight() != 0:
......@@ -659,11 +708,10 @@ class PlacementCost(object):
for sink_name in input_list:
# retrieve indx in modules_w_pins
input_idx = self.mod_name_to_indices[sink_name]
# retrieve input object
input = self.modules_w_pins[input_idx]
# retrieve location
x_coord.append(input.get_pos()[0])
y_coord.append(input.get_pos()[1])
# print(self.__get_pin_position(input_idx))
x_coord.append(self.__get_pin_position(input_idx)[0])
y_coord.append(self.__get_pin_position(input_idx)[1])
if x_coord:
if norm_fact != 1.0:
......@@ -733,6 +781,9 @@ class PlacementCost(object):
def get_congestion_cost(self):
#return max(self.get_H_congestion_cost(), self.get_V_congestion_cost())
# TODO need to test if cong is smaller than 5
if self.FLAG_UPDATE_CONGESTION:
self.get_routing()
return self.abu(self.V_routing_cong + self.H_routing_cong, 0.05)
def __get_grid_cell_location(self, x_pos, y_pos):
......@@ -745,14 +796,71 @@ class PlacementCost(object):
col = math.floor(x_pos / self.grid_width)
return row, col
def __overlap_area(self, block_i, block_j):
def __get_grid_location_position(self, col:int, row:int):
"""
private function for getting x y coord from grid cell row/col
"""
self.grid_width = float(self.width/self.grid_col)
self.grid_height = float(self.height/self.grid_row)
x_pos = self.grid_width * col + self.grid_width / 2
y_pos = self.grid_height * row + self.grid_height / 2
return x_pos, y_pos
def __get_grid_cell_position(self, grid_cell_idx:int):
"""
private function for getting x y coord from grid cell row/col
"""
row = grid_cell_idx // self.grid_col
col = grid_cell_idx % self.grid_col
assert row * self.grid_col + col == grid_cell_idx
return self.__get_grid_location_position(col, row)
def __place_node_mask(self,
grid_cell_idx:int,
mod_width:float,
mod_height:float
):
"""
private function for updating node mask after a placement
"""
row = grid_cell_idx // self.grid_col
col = grid_cell_idx % self.grid_col
assert row * self.grid_col + col == grid_cell_idx
hor_pad, ver_pad = self.__node_pad_cell(mod_width=mod_width,
mod_height=mod_height)
self.node_mask[ row - ver_pad:row + ver_pad + 1,
col - hor_pad:col + hor_pad + 1] = 0
def __unplace_node_mask(self, grid_cell_idx:int):
"""
private function for updating node mask after unplacing a node
"""
row = grid_cell_idx // self.grid_col
col = grid_cell_idx % self.grid_col
assert row * self.grid_col + col == grid_cell_idx
pass
def __overlap_area(self, block_i, block_j, return_pos=False):
"""
private function for computing block overlapping
"""
x_diff = min(block_i.x_max, block_j.x_max) - max(block_i.x_min, block_j.x_min)
y_diff = min(block_i.y_max, block_j.y_max) - max(block_i.y_min, block_j.y_min)
x_min_max = min(block_i.x_max, block_j.x_max)
x_max_min = max(block_i.x_min, block_j.x_min)
y_min_max = min(block_i.y_max, block_j.y_max)
y_max_min = max(block_i.y_min, block_j.y_min)
x_diff = x_min_max - x_max_min
y_diff = y_min_max - y_max_min
if x_diff >= 0 and y_diff >= 0:
return x_diff * y_diff
if return_pos:
return x_diff * y_diff, (x_min_max, y_min_max), (x_max_min, y_max_min)
else:
return x_diff * y_diff
return 0
def __overlap_dist(self, block_i, block_j):
......@@ -888,6 +996,8 @@ class PlacementCost(object):
# Flag updates
self.FLAG_UPDATE_CONGESTION = True
self.FLAG_UPDATE_DENSITY = True
self.FLAG_UPDATE_NODE_MASK = True
self.__reset_node_mask()
self.FLAG_UPDATE_MACRO_AND_CLUSTERED_PORT_ADJ = True
self.grid_width = float(self.width/self.grid_col)
......@@ -911,6 +1021,8 @@ class PlacementCost(object):
# Flag updates
self.FLAG_UPDATE_CONGESTION = True
self.FLAG_UPDATE_DENSITY = True
self.FLAG_UPDATE_NODE_MASK = True
self.__reset_node_mask()
self.FLAG_UPDATE_MACRO_AND_CLUSTERED_PORT_ADJ = True
self.V_routing_cong = [0] * self.grid_col * self.grid_row
......@@ -1292,7 +1404,6 @@ class PlacementCost(object):
self.FLAG_UPDATE_CONGESTION = False
for mod in self.modules_w_pins:
norm_fact = 1.0
curr_type = mod.get_type()
# bounding box data structure
node_gcells = set()
......@@ -1312,12 +1423,13 @@ class PlacementCost(object):
# retrieve sink object
sink = self.modules_w_pins[sink_idx]
# retrieve grid location
node_gcells.add(self.__get_grid_cell_location(*(sink.get_pos())))
node_gcells.add(self.__get_grid_cell_location(*(self.__get_pin_position(sink_idx))))
elif (curr_type == "macro_pin" or curr_type == "MACRO_PIN") and mod.get_sink():
elif curr_type == "MACRO_PIN" and mod.get_sink():
# add source position
node_gcells.add(self.__get_grid_cell_location(*(mod.get_pos())))
source_gcell = self.__get_grid_cell_location(*(mod.get_pos()))
mod_idx = self.mod_name_to_indices[mod.get_name()]
node_gcells.add(self.__get_grid_cell_location(*(self.__get_pin_position(mod_idx))))
source_gcell = self.__get_grid_cell_location(*(self.__get_pin_position(mod_idx)))
if mod.get_weight() > 1:
weight = mod.get_weight()
......@@ -1328,10 +1440,10 @@ class PlacementCost(object):
sink_idx = self.mod_name_to_indices[sink_name]
# retrieve sink object
sink = self.modules_w_pins[sink_idx]
# retrieve grid location
node_gcells.add(self.__get_grid_cell_location(*(sink.get_pos())))
# retrieve grid location
node_gcells.add(self.__get_grid_cell_location(*(self.__get_pin_position(sink_idx))))
elif curr_type == "MACRO":
elif curr_type == "MACRO" and self.is_node_hard_macro(self.mod_name_to_indices[mod.get_name()]):
module_h = mod.get_height()
module_w = mod.get_width()
module_x, module_y = mod.get_pos()
......@@ -1412,35 +1524,131 @@ class PlacementCost(object):
self.H_routing_cong = temp_H_routing_cong
def is_node_soft_macro(self, node_idx) -> bool:
return self.get_node_type(node_idx) == "soft_macro"
"""
Return None or return ref_id
"""
try:
return node_idx in self.soft_macro_indices
except IndexError:
print("[ERROR INDEX OUT OF RANGE] Can not process index at {}".format(node_idx))
exit(0)
def is_node_hard_macro(self, node_idx) -> bool:
return self.get_node_type(node_idx) == "hard_macro"
"""
Return None or return ref_id
"""
try:
return node_idx in self.hard_macro_indices
except IndexError:
print("[ERROR INDEX OUT OF RANGE] Can not process index at {}".format(node_idx))
exit(0)
def get_node_name(self, node_idx: int) -> str:
return self.indices_to_mod_name[node_idx]
def get_node_mask(self, node_idx: int, node_name: str) -> list:
def _get_node_mask(self, node_idx: int, node_name: str=None) -> list:
"""
Return Grid_col x Grid_row:
1 == placable
0 == unplacable
(100, 100) => 5
(99, 99) => 0
(100, 99) => 1
(99, 100) => 4
Placement Constraint:
- center @ grid cell
- no overlapping other macro
- no OOB
"""
if self.FLAG_UPDATE_NODE_MASK:
self.__update_node_mask()
module = self.modules_w_pins[node_idx]
temp_node_mask = np.array([0] * (self.grid_col * self.grid_row))\
.reshape(self.grid_row, self.grid_col)
if module.get_placed_flag():
pass
else:
hor_pad, ver_pad = self.__node_pad_cell(mod_width=module.get_width(),
mod_height=module.get_height())
# row, along y-axis, height
for i in range(ver_pad, self.grid_row - ver_pad):
for j in range(hor_pad, self.grid_col - hor_pad):
cell_region = self.node_mask[i - ver_pad : i + ver_pad + 1,
j - hor_pad : j + hor_pad + 1]
if (cell_region == 1).all():
temp_node_mask[i][j] = 1
return temp_node_mask.flatten()
def get_node_mask(self, node_idx: int, node_name: str=None) -> list:
"""
"""
mod = self.modules_w_pins[node_idx]
canvas_block = Block(x_max=self.width,
y_max=self.height,
x_min=0,
y_min=0)
mod_w = mod.get_width()
mod_h = mod.get_height()
temp_node_mask = np.array([1] * (self.grid_col * self.grid_row))\
.reshape(self.grid_row, self.grid_col)
self.grid_width = float(self.width/self.grid_col)
self.grid_height = float(self.height/self.grid_row)
# print(self.grid_col, self.grid_row)
# print(mod_w*mod_h)
for i in range(self.grid_row):
for j in range(self.grid_col):
# try every location
# construct block based on current module
temp_x = j * self.grid_width + (self.grid_width/2)
temp_y = i * self.grid_height + (self.grid_height/2)
mod_block = Block(
x_max=temp_x + (mod_w/2),
y_max=temp_y + (mod_h/2),
x_min=temp_x - (mod_w/2),
y_min=temp_y - (mod_h/2)
)
# check OOB
if abs(self.__overlap_area(
block_i=canvas_block, block_j=mod_block) - (mod_w*mod_h)) > 1e-8:
# print(i, j, self.__overlap_area(
# block_i=canvas_block, block_j=mod_block))
temp_node_mask[i][j] = 0
else:
for pmod_idx in self.placed_macro:
pmod = self.modules_w_pins[pmod_idx]
if not pmod.get_placed_flag():
continue
p_x, p_y = pmod.get_pos()
p_w = pmod.get_width()
p_h = pmod.get_height()
pmod_block = Block(
x_max=p_x + (p_w/2) + 1,
y_max=p_y + (p_h/2) + 1,
x_min=p_x - (p_w/2) - 1,
y_min=p_y - (p_h/2) - 1
)
# overlap with placed module
if self.__overlap_area(block_i=pmod_block, block_j=mod_block) > 0:
temp_node_mask[i][j] = 0
return temp_node_mask.flatten()
def get_node_type(self, node_idx: int) -> str:
"""
Return Vertical/Horizontal Macro Allocation
Return node type
"""
try:
return self.modules_w_pins[node_idx].get_type()
......@@ -1449,6 +1657,25 @@ class PlacementCost(object):
print("[INDEX OUT OF RANGE WARNING] Can not process index at {}".format(node_idx))
return None
def get_node_width_height(self, node_idx: int):
"""
Return node dimension
"""
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO', 'macro', 'STDCELL', 'PORT']
except AssertionError:
print("[ERROR NODE FIXED] Found {}. Only 'MACRO', 'macro', 'STDCELL'".format(mod.get_type())
+"'PORT' are considered to be fixable nodes")
exit(1)
except Exception:
print("[ERROR NODE FIXED] Could not find module by node index")
exit(1)
return mod.get_width(), mod.get_height()
def make_soft_macros_square(self):
pass
......@@ -1599,58 +1826,310 @@ class PlacementCost(object):
return macro_adj, sorted(cell_location)
def is_node_fixed(self):
pass
def is_node_fixed(self, node_idx: int):
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO', 'STDCELL', 'PORT']
except AssertionError:
print("[ERROR NODE FIXED] Found {}. Only 'MACRO', 'STDCELL'".format(mod.get_type())
+"'PORT' are considered to be fixable nodes")
exit(1)
except Exception:
print("[ERROR NODE FIXED] Could not find module by node index")
exit(1)
return mod.get_fix_flag()
def optimize_stdcells(self):
pass
def update_node_coords(self):
pass
def update_node_coords(self, node_idx, x_pos, y_pos):
"""
Update Node location if node is 'MACRO', 'STDCELL', 'PORT'
"""
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO', 'STDCELL', 'PORT']
except AssertionError:
print("[ERROR NODE LOCATION] Found {}. Only 'MACRO', 'macro', 'STDCELL'".format(mod.get_type())
+"'PORT' are considered to be placable nodes")
exit(1)
except Exception:
print("[ERROR NODE LOCATION] Could not find module by node index")
exit(1)
mod.set_pos(x_pos, y_pos)
def update_macro_orientation(self, node_idx, orientation):
"""
Update macro orientation if node is 'MACRO'
"""
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO']
except AssertionError:
print("[ERROR MACRO ORIENTATION] Found {}. Only 'MACRO'".format(mod.get_type())
+" are considered to be ORIENTED")
exit(1)
except Exception:
print("[ERROR MACRO ORIENTATION] Could not find module by node index")
exit(1)
mod.set_orientation(orientation)
def update_port_sides(self):
"""
Define Port "Side" by its location on canvas
"""
pass
def snap_ports_to_edges(self):
pass
def get_node_location(self, node_idx):
pass
"""
Return Node location if node is 'MACRO', 'STDCELL', 'PORT'
"""
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO', 'STDCELL', 'PORT']
except AssertionError:
print("[ERROR NODE LOCATION] Found {}. Only 'MACRO', 'STDCELL'".format(mod.get_type())
+"'PORT' are considered to be placable nodes")
exit(1)
except Exception:
print("[ERROR NODE PLACED] Could not find module by node index")
exit(1)
return mod.get_pos()
def get_grid_cell_of_node(self, node_idx):
""" if grid_cell at grid crossing, break-tie to upper right
"""
return self.modules_w_pins(node_idx).get_location()
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO']
except AssertionError:
print("[ERROR NODE LOCATION] Found {}. Only 'MACRO'".format(mod.get_type())
+" can be called")
exit(1)
except Exception:
print("[ERROR NODE LOCATION] Could not find module by node index")
exit(1)
row, col = self.__get_grid_cell_location(*mod.get_pos())
def update_macro_orientation(self, node_idx, orientation):
pass
return row * self.grid_col + col
def get_macro_orientation(self):
pass
def get_macro_orientation(self, node_idx):
mod = None
def unfix_node_coord(self):
"""In case plc is loaded with fixed macros
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO']
except AssertionError:
print("[ERROR MACRO ORIENTATION] Found {}. Only 'MACRO'".format(mod.get_type())
+" are considered to be ORIENTED")
exit(1)
except Exception:
print("[ERROR MACRO ORIENTATION] Could not find module by node index")
exit(1)
return mod.get_orientation()
def unfix_node_coord(self, node_idx):
"""
pass
Unfix a module
"""
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO', 'STDCELL', 'PORT']
except AssertionError:
print("[ERROR UNFIX NODE] Found {}. Only 'MACRO', 'STDCELL'".format(mod.get_type())
+"'PORT' are considered to be fixable nodes")
exit(1)
except Exception:
print("[ERROR UNFIX NODE] Could not find module by node index")
exit(1)
self.modules_w_pins[node_idx].set_fix_flag(False)
def fix_node_coord(self, node_idx):
"""
Fix a module
"""
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO', 'STDCELL', 'PORT']
except AssertionError:
print("[ERROR FIX NODE] Found {}. Only 'MACRO', 'STDCELL'"\
.format(mod.get_type())
+"'PORT' are considered to be fixable nodes")
exit(1)
except Exception:
print("[ERROR FIX NODE] Could not find module by node index")
exit(1)
self.modules_w_pins[node_idx].set_fix_flag(True)
def unplace_all_nodes(self):
pass
def __update_node_mask(self):
"""
TODO: should we reload the placed node?
NOTE: NOT USED
"""
return
self.node_mask = np.array([1] * (self.grid_col * self.grid_row)).\
reshape(self.grid_row, self.grid_col)
self.FLAG_UPDATE_NODE_MASK = False
for pmod_idx in self.placed_macro:
pmod = self.modules_w_pins[pmod_idx]
if not pmod.get_placed_flag():
continue
p_x, p_y = pmod.get_pos()
prow, pcol = self.__get_grid_cell_location(p_x, p_y)
c_idx = prow * self.grid_col + pcol
self.__place_node_mask(c_idx, pmod.get_width(), pmod.get_height())
def __reset_node_mask(self):
"""
Internal function for reseting node mask
* All four sides cannot be used for placement
"""
self.node_mask = np.array([1] * (self.grid_col * self.grid_row)).\
reshape(self.grid_row, self.grid_col)
def __node_pad_cell(self, mod_width, mod_height):
"""
Internal function for computing how much cells we need for padding
This is to avoid overlapping on placement
"""
self.grid_width = float(self.width/self.grid_col)
self.grid_height = float(self.height/self.grid_row)
cell_hor = math.ceil(((mod_width/2) - (self.grid_width/2)) / self.grid_width)
cell_ver = math.ceil(((mod_height/2) - (self.grid_height/2)) / self.grid_height)
return cell_hor, cell_ver
def place_node(self, node_idx, grid_cell_idx):
pass
"""
Place the node into the center of the given grid_cell
"""
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO', 'STDCELL', 'PORT']
except AssertionError:
print("[ERROR PLACE NODE] Found {}. Only 'MACRO', 'STDCELL'"\
.format(mod.get_type())
+"'PORT' are considered to be placable nodes")
exit(1)
except Exception:
print("[ERROR PLACE NODE] Could not find module by node index")
try:
assert grid_cell_idx <= self.grid_col * self.grid_row - 1
except AssertionError:
print("[WARNING PLACE NODE] Invalid Location. No node placed.")
# TODO: add check valid clause
if not mod.get_fix_flag():
mod.set_pos(*self.__get_grid_cell_position(grid_cell_idx))
self.placed_macro.append(self.mod_name_to_indices[mod.get_name()])
mod.set_placed_flag(True)
# update flag
self.FLAG_UPDATE_CONGESTION = True
self.FLAG_UPDATE_DENSITY = True
# self.FLAG_UPDATE_NODE_MASK = True
self.FLAG_UPDATE_WIRELENGTH = True
self.__place_node_mask(grid_cell_idx, mod_width=mod.get_width(), mod_height=mod.get_height())
def can_place_node(self, node_idx, grid_cell_idx):
return self.get_node_mask(node_idx=node_idx)[grid_cell_idx]
def unplace_node(self, node_idx):
# update node_mask
pass
"""
Set the node's ifPlaced flag to False if not fixed node
"""
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO', 'STDCELL', 'PORT']
except AssertionError:
print("[ERROR UNPLACE NODE] Found {}. Only 'MACRO', 'STDCELL'".format(mod.get_type())
+"'PORT' are considered to be placable nodes")
exit(1)
except Exception:
print("[ERROR UNPLACE NODE] Could not find module by node index")
exit(1)
if not mod.get_fix_flag():
mod.set_placed_flag(True)
self.placed_macro.remove(node_idx)
# update flag
self.FLAG_UPDATE_CONGESTION = True
self.FLAG_UPDATE_DENSITY = True
# self.FLAG_UPDATE_NODE_MASK = True # placeholder
self.FLAG_UPDATE_WIRELENGTH = True
else:
print("[WARNING UNPLACE NODE] Trying to unplace a fixed node")
def unplace_all_nodes(self):
"""
Set all ifPlaced flag to False except for fixed nodes
"""
for mod_idx in sorted(self.port_indices + self.hard_macro_indices + self.soft_macro_indices):
mod = self.modules_w_pins[mod_idx]
if mod.get_fix_flag():
continue
if mod.get_placed_flag():
mod.set_placed_flag(False)
self.placed_macro = []
# update flag
self.FLAG_UPDATE_CONGESTION = True
self.FLAG_UPDATE_DENSITY = True
# self.FLAG_UPDATE_NODE_MASK = True
self.FLAG_UPDATE_WIRELENGTH = True
self.__reset_node_mask()
def is_node_placed(self, node_idx):
pass
mod = None
try:
mod = self.modules_w_pins[node_idx]
assert mod.get_type() in ['MACRO', 'macro', 'STDCELL', 'PORT']
except AssertionError:
print("[ERROR NODE PLACED] Found {}. Only 'MACRO', 'STDCELL',".format(mod.get_type())
+"'PORT' are considered to be placable nodes")
exit(1)
except Exception:
print("[ERROR NODE PLACED] Could not find module by node index")
exit(1)
mod = self.modules_w_pins[node_idx]
return mod.get_placed_flag()
def disconnect_nets(self):
pass
......@@ -1661,7 +2140,7 @@ class PlacementCost(object):
return self.netlist_file
def get_blockages(self):
pass
return self.blockages
def create_blockage(self, minx, miny, maxx, maxy, blockage_rate):
self.blockages.append([minx, miny, maxx, maxy, blockage_rate])
......@@ -1678,8 +2157,32 @@ class PlacementCost(object):
return self.mod_name_to_indices[pin.get_macro_name()]
return -1
def save_placement(self):
pass
def save_placement(self, filename, info):
"""
When writing out info line-by-line, add a "#" at front
"""
with open(filename, 'w+') as f:
for line in info.split('\n'):
f.write("# " + line + '\n')
# if first, no \newline
HEADER = True
for mod_idx in sorted(self.hard_macro_indices + self.soft_macro_indices + self.port_indices):
# [node_index] [x] [y] [orientation] [fixed]
mod = self.modules_w_pins[mod_idx]
if HEADER:
f.write("{} {:g} {:g} {} {}".format(mod_idx,
*mod.get_pos(),
mod.get_orientation() if mod.get_orientation() else "-",
"1" if mod.get_fix_flag() else "0"))
HEADER = False
else:
f.write("\n{} {:g} {:g} {} {}".format(mod_idx,
*mod.get_pos(),
mod.get_orientation() if mod.get_orientation() else "-",
"1" if mod.get_fix_flag() else "0"))
def display_canvas( self,
annotate=True,
......@@ -1709,22 +2212,35 @@ class PlacementCost(object):
# Construct module blocks
for mod in self.modules_w_pins:
if mod.get_type() == 'PORT':
if mod.get_type() == 'PORT' and mod.get_placed_flag():
plt.plot(*mod.get_pos(),'ro', markersize=PORT_SIZE)
elif mod.get_type() == 'MACRO':
ax.add_patch(Rectangle((mod.get_pos()[0] - mod.get_width()/2, mod.get_pos()[1] - mod.get_height()/2),\
mod.get_width(), mod.get_height(),\
alpha=0.5, zorder=1000, facecolor='b', edgecolor='darkblue'))
if annotate:
ax.annotate(mod.get_name(), mod.get_pos(), color='r', weight='bold', fontsize=FONT_SIZE, ha='center', va='center')
elif mod.get_type() == 'MACRO' and mod.get_placed_flag():
if not self.is_node_soft_macro(self.mod_name_to_indices[mod.get_name()]):
# hard macro
ax.add_patch(Rectangle((mod.get_pos()[0] - mod.get_width()/2, mod.get_pos()[1] - mod.get_height()/2),\
mod.get_width(), mod.get_height(),\
alpha=0.5, zorder=1000, facecolor='b', edgecolor='darkblue'))
if annotate:
ax.annotate(mod.get_name(), mod.get_pos(), wrap=True,color='r', weight='bold', fontsize=FONT_SIZE, ha='center', va='center')
else:
# soft macro
ax.add_patch(Rectangle((mod.get_pos()[0] - mod.get_width()/2, mod.get_pos()[1] - mod.get_height()/2),\
mod.get_width(), mod.get_height(),\
alpha=0.5, zorder=1000, facecolor='y'))
if annotate:
ax.annotate(mod.get_name(), mod.get_pos(), wrap=True,color='r', weight='bold', fontsize=FONT_SIZE, ha='center', va='center')
elif mod.get_type() == 'MACRO_PIN':
plt.plot(*mod.get_pos(),'bo', markersize=PIN_SIZE)
elif mod.get_type() == 'macro':
ax.add_patch(Rectangle((mod.get_pos()[0] - mod.get_width()/2, mod.get_pos()[1] - mod.get_height()/2),\
mod.get_width(), mod.get_height(),\
alpha=0.5, zorder=1000, facecolor='y'))
if annotate:
ax.annotate(mod.get_name(), mod.get_pos(), wrap=True,color='r', weight='bold', fontsize=FONT_SIZE, ha='center', va='center')
pin_idx = self.mod_name_to_indices[mod.get_name()]
macro_idx = self.get_ref_node_id(pin_idx)
macro = self.modules_w_pins[macro_idx]
if macro.get_placed_flag():
plt.plot(*self.__get_pin_position(pin_idx),'bo', markersize=PIN_SIZE)
# elif mod.get_type() == 'macro' :
# ax.add_patch(Rectangle((mod.get_pos()[0] - mod.get_width()/2, mod.get_pos()[1] - mod.get_height()/2),\
# mod.get_width(), mod.get_height(),\
# alpha=0.5, zorder=1000, facecolor='y'))
# if annotate:
# ax.annotate(mod.get_name(), mod.get_pos(), wrap=True,color='r', weight='bold', fontsize=FONT_SIZE, ha='center', va='center')
plt.show()
plt.close('all')
......@@ -1740,9 +2256,14 @@ class PlacementCost(object):
self.connection = {} # [module_name] => edge degree
self.fix_flag = True
self.placement = 0 # needs to be updated
self.orientation = None
self.ifPlaced = True
def get_name(self):
return self.name
def get_orientation(self):
return self.orientation
def add_connection(self, module_name):
# NOTE: assume PORT names does not contain slash
......@@ -1813,6 +2334,12 @@ class PlacementCost(object):
def get_fix_flag(self):
return self.fix_flag
def set_placed_flag(self, ifPlaced):
self.ifPlaced = ifPlaced
def get_placed_flag(self):
return self.ifPlaced
class SoftMacro:
def __init__(self, name, width, height, x = 0.0, y = 0.0):
self.name = name
......@@ -1860,7 +2387,7 @@ class PlacementCost(object):
return self.x, self.y
def get_type(self):
return "macro"
return "MACRO"
def get_connection(self):
return self.connection
......@@ -1892,11 +2419,18 @@ class PlacementCost(object):
def get_fix_flag(self):
return self.fix_flag
def set_placed_flag(self, ifPlaced):
self.ifPlaced = ifPlaced
def get_placed_flag(self):
return self.ifPlaced
class SoftMacroPin:
def __init__( self, name,
x = 0.0, y = 0.0,
macro_name = "", weight = 1.0):
def __init__(self, name, ref_id,
x = 0.0, y = 0.0,
macro_name = "", weight = 1.0):
self.name = name
self.ref_id = ref_id
self.x = float(x)
self.y = float(y)
self.x_offset = 0.0 # not used
......@@ -1908,6 +2442,12 @@ class PlacementCost(object):
def set_weight(self, weight):
self.weight = weight
def set_ref_id(self, ref_id):
self.ref_id = ref_id
def get_ref_id(self):
return self.ref_id
def get_weight(self):
return self.weight
......@@ -1956,7 +2496,7 @@ class PlacementCost(object):
return self.weight
def get_type(self):
return "macro_pin"
return "MACRO_PIN"
class HardMacro:
def __init__(self, name, width, height,
......@@ -2037,13 +2577,20 @@ class PlacementCost(object):
def get_fix_flag(self):
return self.fix_flag
def set_placed_flag(self, ifPlaced):
self.ifPlaced = ifPlaced
def get_placed_flag(self):
return self.ifPlaced
class HardMacroPin:
def __init__(self, name,
def __init__(self, name, ref_id,
x = 0.0, y = 0.0,
x_offset = 0.0, y_offset = 0.0,
macro_name = "", weight = 1.0):
self.name = name
self.ref_id = ref_id
self.x = float(x)
self.y = float(y)
self.x_offset = float(x_offset)
......@@ -2053,6 +2600,12 @@ class PlacementCost(object):
self.sink = {}
self.ifPlaced = True
def set_ref_id(self, ref_id):
self.ref_id = ref_id
def get_ref_id(self):
return self.ref_id
def set_weight(self, weight):
self.weight = weight
......
import numpy as np
import sys,os,traceback
import pandas as pd
import sys
import os
import traceback
import argparse
import math
import re
from random import randrange
from absl import flags
from absl.flags import argparse_flags
from absl import app
from Plc_client import plc_client_os as plc_client_os
from Plc_client import placement_util_os as placement_util
from Plc_client import observation_extractor_os as observation_extractor
from Plc_client import environment_os as environment
from Plc_client import observation_config
try:
from Plc_client import plc_client as plc_client
except ImportError:
......@@ -28,10 +38,10 @@ Example:
--height 356.640\
--col 35\
--row 33\
--rpmh 10\
--rpmv 10\
--marh 5\
--marv 5\
--rpmh 70.330\
--rpmv 74.510\
--marh 51.790\
--marv 51.790\
--smooth 2
$ python3 -m Plc_client.plc_client_os_test --netlist ./Plc_client/test/ariane133/netlist.pb.txt\
......@@ -45,6 +55,18 @@ Example:
--marh 5\
--marv 5\
--smooth 2
$ python3 -m Plc_client.plc_client_os_test --netlist ./Plc_client/test/0P2M0m/netlist.pb.txt\
--plc ./Plc_client/test/0P2M0m/initial.plc\
--width 500\
--height 500\
--col 5\
--row 5\
--rpmh 10\
--rpmv 10\
--marh 5\
--marv 5\
--smooth 2
Todo:
* Clean up code
......@@ -53,6 +75,7 @@ Todo:
"""
class PlacementCostTest():
""" Canvas Setting Reference Table
......@@ -81,11 +104,10 @@ class PlacementCostTest():
- GRID_ROW = 5
"""
def __init__(self, NETLIST_PATH, PLC_PATH=None,
width=0, height=0,
column=0, row=0, rpmv=10, rpmh=10,
marh=10, marv=10, smooth=1) -> None:
width=0, height=0,
column=0, row=0, rpmv=10, rpmh=10,
marh=10, marv=10, smooth=1) -> None:
self.NETLIST_PATH = NETLIST_PATH
self.PLC_PATH = PLC_PATH
self.CANVAS_WIDTH = width
......@@ -100,15 +122,14 @@ class PlacementCostTest():
self.MARV = marv
self.SMOOTH = smooth
def test_metadata(self):
print("############################ TEST METADATA ############################")
# Google's Binary Executable
self.plc = plc_client.PlacementCost(self.NETLIST_PATH)
# Open-sourced Implementation
self.plc_os = plc_client_os.PlacementCost(netlist_file=self.NETLIST_PATH,
macro_macro_x_spacing = 50,
macro_macro_y_spacing = 50)
macro_macro_x_spacing=50,
macro_macro_y_spacing=50)
# NOTE: must set canvas before restoring placement, otherwise OOB error
self.plc.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
......@@ -120,9 +141,9 @@ class PlacementCostTest():
print("[PLC FILE FOUND] Loading info from .plc file")
self.plc_os.set_canvas_boundary_check(False)
self.plc_os.restore_placement(self.PLC_PATH,
ifInital=True,
ifValidate=True,
ifReadComment=False)
ifInital=True,
ifValidate=True,
ifReadComment=False)
self.plc.set_canvas_boundary_check(False)
self.plc.restore_placement(self.PLC_PATH)
else:
......@@ -130,7 +151,7 @@ class PlacementCostTest():
try:
assert int(self.plc_os.get_area()) == int(self.plc.get_area())
self.plc.set_routes_per_micron(1.0, 2.0)
self.plc_os.set_routes_per_micron(1.0, 2.0)
assert self.plc.get_routes_per_micron() == self.plc_os.get_routes_per_micron()
......@@ -141,86 +162,98 @@ class PlacementCostTest():
self.plc.set_congestion_smooth_range(2.0)
self.plc_os.set_congestion_smooth_range(2.0)
assert self.plc.get_congestion_smooth_range() == self.plc_os.get_congestion_smooth_range()
assert self.plc.get_congestion_smooth_range(
) == self.plc_os.get_congestion_smooth_range()
self.plc.set_macro_routing_allocation(3.0, 4.0)
self.plc_os.set_macro_routing_allocation(3.0, 4.0)
assert self.plc.get_macro_routing_allocation() == self.plc_os.get_macro_routing_allocation()
assert self.plc.get_macro_routing_allocation(
) == self.plc_os.get_macro_routing_allocation()
except Exception as e:
_, _, tb = sys.exc_info()
traceback.print_tb(tb)
tb_info = traceback.extract_tb(tb)
_, line, _, text = tb_info[-1]
print('[METADATA ERROR] at line {} in statement {}'\
.format(line, text))
print('[METADATA ERROR] at line {} in statement {}'
.format(line, text))
exit(1)
# test get_macro_adjacency
plc_macroadj = self.plc.get_macro_adjacency()
plc_macroadj = np.array(plc_macroadj).reshape(int(math.sqrt(len(plc_macroadj))),\
int(math.sqrt(len(plc_macroadj))))
plc_macroadj = np.array(plc_macroadj).reshape(int(math.sqrt(len(plc_macroadj))),
int(math.sqrt(len(plc_macroadj))))
plcos_macroadj = self.plc_os.get_macro_adjacency()
plcos_macroadj = np.array(plcos_macroadj).reshape(int(math.sqrt(len(plcos_macroadj))),\
int(math.sqrt(len(plcos_macroadj))))
plcos_macroadj = np.array(plcos_macroadj).reshape(int(math.sqrt(len(plcos_macroadj))),
int(math.sqrt(len(plcos_macroadj))))
try:
assert(np.sum(np.nonzero(plc_macroadj - plcos_macroadj)) == 0)
except Exception as e:
print("[MACRO ADJ ERROR] Mismatched found -- {}".format(str(e)))
exit(1)
# test get_macro_and_clustered_port_adjacency
plc_clusteradj, plc_cell = self.plc.get_macro_and_clustered_port_adjacency()
plc_clusteradj = np.array(plc_clusteradj).reshape(int(math.sqrt(len(plc_clusteradj))),\
int(math.sqrt(len(plc_clusteradj))))
plc_clusteradj = np.array(plc_clusteradj).reshape(int(math.sqrt(len(plc_clusteradj))),
int(math.sqrt(len(plc_clusteradj))))
plcos_clusteradj, plcos_cell = self.plc_os.get_macro_and_clustered_port_adjacency()
plcos_clusteradj = np.array(plcos_clusteradj).reshape(int(math.sqrt(len(plcos_clusteradj))),\
int(math.sqrt(len(plcos_clusteradj))))
plcos_clusteradj = np.array(plcos_clusteradj).reshape(int(math.sqrt(len(plcos_clusteradj))),
int(math.sqrt(len(plcos_clusteradj))))
try:
for plc_adj, plcos_adj in zip(plc_clusteradj, plcos_clusteradj):
assert(np.sum(np.nonzero(plc_adj - plcos_adj)) == 0)
except Exception as e:
print("[MACRO AND CLUSTERED PORT ADJ ERROR] Mismatched found -- {}".format(str(e)))
print(
"[MACRO AND CLUSTERED PORT ADJ ERROR] Mismatched found -- {}".format(str(e)))
exit(1)
print(" +++++++++++++++++++++++++++")
print(" +++ TEST METADATA: PASS +++")
print(" +++++++++++++++++++++++++++")
def view_canvas(self):
def view_canvas(self, ifInital, ifReadComment):
print("############################ VIEW CANVAS ############################")
self.plc_os = plc_client_os.PlacementCost(netlist_file=self.NETLIST_PATH,
macro_macro_x_spacing = 50,
macro_macro_y_spacing = 50)
self.plc.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
self.plc.set_placement_grid(self.GRID_COL, self.GRID_ROW)
macro_macro_x_spacing=50,
macro_macro_y_spacing=50)
self.plc_os.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
self.plc_os.set_placement_grid(self.GRID_COL, self.GRID_ROW)
if self.PLC_PATH:
print("[PLC FILE FOUND] Loading info from .plc file")
self.plc_os.set_canvas_boundary_check(False)
self.plc_os.restore_placement(self.PLC_PATH,
ifInital=ifInital,
ifValidate=False,
ifReadComment=ifReadComment)
# show canvas
self.plc_os.display_canvas()
self.plc_os.display_canvas(annotate=False, amplify=False)
def test_proxy_cost(self):
print("############################ TEST PROXY COST ############################")
# Google's Binary Executable
self.plc = plc_client.PlacementCost(self.NETLIST_PATH)
# Open-sourced Implementation
self.plc_os = plc_client_os.PlacementCost(netlist_file=self.NETLIST_PATH,
macro_macro_x_spacing = 50,
macro_macro_y_spacing = 50)
macro_macro_x_spacing=50,
macro_macro_y_spacing=50)
self.plc.get_overlap_threshold()
print("overlap_threshold default", self.plc.get_overlap_threshold())
if self.PLC_PATH:
print("[PLC FILE FOUND] Loading info from .plc file")
self.plc_os.set_canvas_boundary_check(False)
self.plc_os.restore_placement(self.PLC_PATH,
ifInital=True,
ifValidate=True,
ifReadComment=False)
ifInital=True,
ifValidate=True,
ifReadComment=False)
self.plc.set_canvas_boundary_check(False)
# self.plc.restore_placement(self.PLC_PATH)
self.plc.restore_placement(self.PLC_PATH)
else:
print("[PLC FILE MISSING] Using only netlist info")
......@@ -238,10 +271,10 @@ class PlacementCostTest():
self.plc_os.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
self.plc_os.set_placement_grid(self.GRID_COL, self.GRID_ROW)
# TODO: [IGNORE] Setting blockage has no effect on proxy cost computation
# TODO: [IGNORE] create_blockage must be defined BEFORE set_canvas_size and set_placement_grid in order to be considered on the canvas
if False:
self.plc.create_blockage(0, 0, 400, 400, 1)
self.plc.create_blockage(0, 0, 200, 200, 1)
self.plc.create_blockage(0.0, 100.0, 300.0, 300.0, 1.0)
self.plc.create_blockage(300, 0, 500, 200, 1)
print(self.plc.get_blockages())
print(self.plc.make_soft_macros_square())
print(self.plc.set_use_incremental_cost(True))
......@@ -249,27 +282,41 @@ class PlacementCostTest():
# HPWL
try:
assert int(self.plc_os.get_wirelength()) == int(self.plc.get_wirelength())
assert int(self.plc_os.get_wirelength()) == int(
self.plc.get_wirelength())
assert abs(self.plc.get_cost() - self.plc_os.get_cost()) <= 1e-3
print("#[INFO WIRELENGTH] Matched irelength cost -- GL {}, OS {}".format(
str(self.plc.get_cost()), self.plc_os.get_cost()))
except Exception as e:
print("[WIRELENGTH ERROR] Discrepancies found when computing wirelength -- {}, {}".format(str(self.plc.get_cost()), self.plc_os.get_cost()))
print("[ERROR WIRELENGTH] Discrepancies found when computing wirelength -- GL {}, OS {}".format(
str(self.plc.get_cost()), self.plc_os.get_cost()))
exit(1)
# Density
try:
assert int(sum(self.plc_os.get_grid_cells_density())) == int(sum(self.plc.get_grid_cells_density()))
assert int(self.plc_os.get_density_cost()) == int(self.plc.get_density_cost())
assert int(sum(self.plc_os.get_grid_cells_density())) == int(
sum(self.plc.get_grid_cells_density()))
assert int(self.plc_os.get_density_cost()) == int(
self.plc.get_density_cost())
print("#[INFO DENSITY] Matched density cost -- GL {}, OS {}".format(
str(self.plc.get_density_cost()), self.plc_os.get_density_cost()))
except Exception as e:
print("[DENSITY ERROR] Discrepancies found when computing density -- {}, {}".format(str(self.plc.get_density_cost()), self.plc_os.get_density_cost()))
print("[ERROR DENSITY] Discrepancies found when computing density -- GL {}, OS {}".format(
str(self.plc.get_density_cost()), self.plc_os.get_density_cost()))
exit(1)
# Congestion
try:
assert abs(sum(self.plc_os.get_horizontal_routing_congestion()) - sum(self.plc.get_horizontal_routing_congestion())) < 1e-3
assert abs(sum(self.plc_os.get_vertical_routing_congestion()) - sum(self.plc.get_vertical_routing_congestion())) < 1e-3
assert abs(self.plc.get_congestion_cost() - self.plc_os.get_congestion_cost()) < 1e-3
# NOTE: [IGNORE] grid-wise congestion not tested because
# miscellaneous implementation differences.
assert abs(self.plc.get_congestion_cost() -
self.plc_os.get_congestion_cost()) < 1e-3
print("#[INFO CONGESTION] Matched congestion cost -- GL {}, OS {}".format(
str(self.plc.get_congestion_cost()), self.plc_os.get_congestion_cost()))
except Exception as e:
print("[CONGESTION ERROR] Discrepancies found when computing congestion -- {}".format(str(e)))
print("[ERROR CONGESTION] Discrepancies found when computing congestion -- GL {}, OS {}"
.format(str(self.plc.get_congestion_cost()),
str(self.plc_os.get_congestion_cost())))
exit(1)
print(" +++++++++++++++++++++++++++++")
......@@ -280,51 +327,79 @@ class PlacementCostTest():
# Google's Binary Executable
self.plc = plc_client.PlacementCost(self.NETLIST_PATH)
self.plc_os = plc_client_os.PlacementCost(netlist_file=self.NETLIST_PATH,
macro_macro_x_spacing = 50,
macro_macro_y_spacing = 50)
print("****************** miscellaneous ******************")
macro_macro_x_spacing=50,
macro_macro_y_spacing=50)
self.plc.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
self.plc.set_placement_grid(self.GRID_COL, self.GRID_ROW)
self.plc_util = placement_util.create_placement_cost(
plc_client=plc_client,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH
)
self.plc_os.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
self.plc_os.set_placement_grid(self.GRID_COL, self.GRID_ROW)
NODE_IDX = 0
print("get_macro_indices", self.plc.get_macro_indices(), self.plc_os.get_macro_indices())
print("get_node_name", self.plc.get_node_name(NODE_IDX))
print("get_node_location", self.plc.get_node_location(NODE_IDX))
print("get_grid_cell_of_node", self.plc.get_grid_cell_of_node(NODE_IDX))
print("get_node_location", self.plc.get_node_location(NODE_IDX))
print("get_macro_orientation", self.plc.get_macro_orientation(NODE_IDX))
print("is_node_placed", self.plc.is_node_placed(NODE_IDX))
print("get_source_filename", self.plc.get_source_filename())
print("get_blockages", self.plc.get_blockages())
print("get_ref_node_id", self.plc.get_ref_node_id(NODE_IDX), self.plc.get_ref_node_id(NODE_IDX))
print("get_node_mask\n", np.array(self.plc.get_node_mask(NODE_IDX)).reshape((4,4)))
print("can_place_node", self.plc.can_place_node(0, 1))
self.plc_os.display_canvas()
self.unplace_node()
print(np.flip(np.array(self.plc_util.get_node_mask(0)).reshape(35, 33), axis=0))
print(np.flip(np.array(self.plc.get_node_mask(0)).reshape(35, 33), axis=0))
print("****************** miscellaneous ******************")
# self.plc.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
# self.plc.set_placement_grid(self.GRID_COL, self.GRID_ROW)
# self.plc_os.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
# self.plc_os.set_placement_grid(self.GRID_COL, self.GRID_ROW)
# NODE_IDX = 22853
# print("get_macro_indices", self.plc.get_macro_indices())
# print("get_node_name", self.plc.get_node_name(NODE_IDX))
# print("get_node_type", self.plc.get_node_type(NODE_IDX))
# print("get_node_location", self.plc.get_node_location(NODE_IDX))
# print("get_grid_cell_of_node", self.plc.get_grid_cell_of_node(NODE_IDX))
# print("get_macro_orientation", self.plc.get_macro_orientation(NODE_IDX))
# print("is_node_placed", self.plc.is_node_placed(NODE_IDX))
# print("get_source_filename", self.plc.get_source_filename())
# print("get_blockages", self.plc.get_blockages())
# print("get_ref_node_id", self.plc.get_ref_node_id(NODE_IDX), self.plc.get_ref_node_id(NODE_IDX))
# print("get_node_mask\n", np.array(self.plc.get_node_mask(NODE_IDX)).reshape((4,4)))
# print("can_place_node", self.plc.can_place_node(0, 1))
print("***************************************************")
def test_proxy_congestion(self):
# Google's API
self.plc = plc_client.PlacementCost(self.NETLIST_PATH)
self.plc_os = plc_client_os.PlacementCost(self.NETLIST_PATH)
# set rpm
self.plc.set_routes_per_micron(10, 10)
self.plc_os.set_routes_per_micron(10, 10)
self.plc.set_routes_per_micron(self.RPMH, self.RPMV)
self.plc_os.set_routes_per_micron(self.RPMH, self.RPMV)
self.plc.set_macro_routing_allocation(10, 10)
self.plc_os.set_macro_routing_allocation(10, 10)
self.plc.set_macro_routing_allocation(self.MARH, self.MARV)
self.plc_os.set_macro_routing_allocation(self.MARH, self.MARV)
self.plc.set_congestion_smooth_range(0.0)
self.plc_os.set_congestion_smooth_range(0.0)
self.plc.set_congestion_smooth_range(2.0)
self.plc_os.set_congestion_smooth_range(2.0)
self.plc.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
self.plc.set_placement_grid(self.GRID_COL, self.GRID_ROW)
self.plc_os.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
self.plc_os.set_placement_grid(self.GRID_COL, self.GRID_ROW)
if self.PLC_PATH:
print("[PLC FILE FOUND] Loading info from .plc file")
self.plc_os.set_canvas_boundary_check(False)
self.plc_os.restore_placement(self.PLC_PATH,
ifInital=True,
ifValidate=True,
ifReadComment=False)
self.plc.set_canvas_boundary_check(False)
self.plc.restore_placement(self.PLC_PATH)
else:
print("[PLC FILE MISSING] Using only netlist info")
temp_gl_h = np.array(self.plc.get_horizontal_routing_congestion())
temp_os_h = np.array(self.plc_os.get_horizontal_routing_congestion())
temp_os_h = np.array(self.plc_os.get_horizontal_routing_congestion())
print(temp_gl_h.reshape(self.GRID_COL, self.GRID_ROW))
print(temp_os_h.reshape(self.GRID_COL, self.GRID_ROW))
......@@ -333,7 +408,7 @@ class PlacementCostTest():
temp_gl_v = np.array(self.plc.get_vertical_routing_congestion())
temp_os_v = np.array(self.plc_os.get_vertical_routing_congestion())
print(temp_gl_v.reshape(self.GRID_COL, self.GRID_ROW))
print(temp_os_v.reshape(self.GRID_COL, self.GRID_ROW))
......@@ -347,71 +422,385 @@ class PlacementCostTest():
else:
print("MATCHED!!! **********************")
###################################################################### EXTRACT ROUTING CONGESTION
# EXTRACT ROUTING CONGESTION
self.plc.set_macro_routing_allocation(0, 0)
self.plc_os.set_macro_routing_allocation(0, 0)
temp_gl_h_rt = np.array(self.plc.get_horizontal_routing_congestion())
temp_os_h_rt = np.array(self.plc_os.get_horizontal_routing_congestion())
temp_os_h_rt = np.array(
self.plc_os.get_horizontal_routing_congestion())
temp_gl_v_rt = np.array(self.plc.get_vertical_routing_congestion())
temp_os_v_rt = np.array(self.plc_os.get_vertical_routing_congestion())
temp_gl_h_mc = (temp_gl_h - temp_gl_h_rt).reshape(self.GRID_COL, self.GRID_ROW)
temp_os_h_mc = (temp_os_h - temp_os_h_rt).reshape(self.GRID_COL, self.GRID_ROW)
# H Macro congesiton = H Total congestion - H Routing congestion
temp_gl_h_mc = (temp_gl_h - temp_gl_h_rt)
temp_os_h_mc = (temp_os_h - temp_os_h_rt)
temp_gl_v_mc = (temp_gl_v - temp_gl_v_rt).reshape(self.GRID_COL, self.GRID_ROW)
temp_os_v_mc = (temp_os_v - temp_os_v_rt).reshape(self.GRID_COL, self.GRID_ROW)
# V Macro congesiton = V Total congestion - V Routing congestion
temp_gl_v_mc = (temp_gl_v - temp_gl_v_rt)
temp_os_v_mc = (temp_os_v - temp_os_v_rt)
# print("GL H MACRO Congestion", (temp_gl_h_mc).reshape(self.GRID_COL, self.GRID_ROW))
# print("OS H MACRO Congestion", (temp_os_h_mc).reshape(self.GRID_COL, self.GRID_ROW))
print("H MACRO Congestion DIFF", np.where(abs(temp_gl_h_mc - temp_os_h_mc) > 1e-5))
print("H MACRO Congestion DIFF", np.where(
abs(temp_gl_h_mc - temp_os_h_mc) > 1e-5))
# print("GL V MACRO Congestion", (temp_gl_v_mc).reshape(self.GRID_COL, self.GRID_ROW))
# print("OS V MACRO Congestion", (temp_os_v_mc).reshape(self.GRID_COL, self.GRID_ROW))
print("V MACRO Congestion DIFF", np.where(abs(temp_gl_v_mc - temp_os_v_mc) > 1e-5))
print("V MACRO Congestion DIFF", np.where(
abs(temp_gl_v_mc - temp_os_v_mc) > 1e-5))
print("H Routing Congestion DIFF", np.where(
abs(temp_gl_h_rt - temp_os_h_rt) > 1e-5))
print("V Routing Congestion DIFF", np.where(
abs(temp_gl_v_rt - temp_os_v_rt) > 1e-5))
####################################################################### BY ENTRY
# self.plc_os.display_canvas()
# BY ENTRY
print("**************BY ENTRY DIFF")
print(temp_gl_h_mc[0][6], temp_os_h_mc[0][6])
CELL_IDX = 0
r = (CELL_IDX // 35)
c = int(CELL_IDX % 35)
print(r,c)
print(temp_gl_h_rt[CELL_IDX], temp_os_h_rt[CELL_IDX])
print(temp_gl_v_rt[CELL_IDX], temp_os_v_rt[CELL_IDX])
def test_placement_util(self, keep_save_file=False):
"""
* Read same input, perturb placement and orientation, write to new .plc
"""
print(
"############################ TEST PLACEMENT UTIL ############################")
try:
assert self.PLC_PATH
except AssertionError:
print("[ERROR PLACEMENT UTIL TEST] Facilitate required .plc file")
self.plc_util = placement_util.create_placement_cost(
plc_client=plc_client,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH
)
self.plc_util_os = placement_util.create_placement_cost(
plc_client=plc_client_os,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH
)
# node_xy_coordinates
NODE_XY_DICT = {}
for i in placement_util.nodes_of_types(self.plc_util_os, ['MACRO', 'STDCELL', 'PORT']):
NODE_XY_DICT[i] = (randrange(int(self.plc_util.get_canvas_width_height()[0])),
randrange(int(self.plc_util.get_canvas_width_height()[1])))
# macro_orientation
MACRO_ORIENTATION = {}
for i in placement_util.nodes_of_types(self.plc_util_os, ['MACRO']):
MACRO_ORIENTATION[i] = "S"
# ********************** plc_client_os **********************
# placement_util.restore_node_xy_coordinates(self.plc_util_os, NODE_XY_DICT)
placement_util.restore_macro_orientations(
self.plc_util_os, MACRO_ORIENTATION)
# fix ports
placement_util.fix_port_coordinates(self.plc_util_os)
# write out new plc
placement_util.save_placement(
self.plc_util_os, "save_test_os.plc", 'this is a comment')
# ********************** plc_client **********************
# placement_util.restore_node_xy_coordinates(self.plc_util, NODE_XY_DICT)
placement_util.restore_macro_orientations(
self.plc_util, MACRO_ORIENTATION)
# fix ports
placement_util.fix_port_coordinates(self.plc_util)
# write out new plc
placement_util.save_placement(
self.plc_util, "save_test_gl.plc", 'this is a comment')
# This is only for node information, line-by-line test
try:
with open('save_test_gl.plc') as f1, open('save_test_os.plc') as f2:
for idx, (line1, line2) in enumerate(zip(f1, f2)):
if line1.strip() != line2.strip():
if not re.match(r"(# )\w+", line1.strip()):
print("PLC MISMATCH (GL, OS)\n",
line1.strip(), "\n", line2.strip())
raise AssertionError("false")
except AssertionError:
print("[ERROR PLACEMENT UTIL] Saved PLC Discrepency found at line {}".format(
str(idx)))
# if keep plc file for detailed comparison
if not keep_save_file:
os.remove('save_test_gl.plc')
os.remove('save_test_os.plc')
print(" +++++++++++++++++++++++++++++++++")
print(" +++ TEST PLACEMENT UTIL: PASS +++")
print(" +++++++++++++++++++++++++++++++++")
def test_observation_extractor(self):
"""
plc = placement_util.create_placement_cost(
netlist_file=netlist_file, init_placement='')
plc.set_canvas_size(300, 200)
plc.set_placement_grid(9, 4)
plc.unplace_all_nodes()
# Manually adds I/O port locations, this step is not needed for real
# netlists.
plc.update_node_coords('P0', 0.5, 100) # Left
plc.update_node_coords('P1', 150, 199.5) # Top
plc.update_port_sides()
plc.snap_ports_to_edges()
self.extractor = observation_extractor.ObservationExtractor(
plc=plc, observation_config=self._observation_config)
"""
print("############################ TEST OBSERVATION EXTRACTOR ############################")
try:
assert self.PLC_PATH
except AssertionError:
print("[ERROR OBSERVATION EXTRACTOR TEST] Facilitate required .plc file")
# Using the default edge/node
self._observation_config = observation_config.ObservationConfig(
max_num_edges=28400, max_num_nodes=5000, max_grid_size=128)
self.plc_util = placement_util.create_placement_cost(
plc_client=plc_client,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH
)
self.plc_util.unplace_all_nodes()
self.plc_util_os = placement_util.create_placement_cost(
plc_client=plc_client_os,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH
)
self.plc_util_os.unplace_all_nodes()
if self.PLC_PATH:
print("[PLC FILE FOUND] Loading info from .plc file")
self.plc_os.set_canvas_boundary_check(False)
self.plc_os.restore_placement(self.PLC_PATH,
ifInital=True,
ifValidate=True,
ifReadComment=False)
self.plc.set_canvas_boundary_check(False)
self.plc.restore_placement(self.PLC_PATH)
else:
print("[PLC FILE MISSING] Using only netlist info")
self.extractor = observation_extractor.ObservationExtractor(
plc=self.plc_util, observation_config=self._observation_config
)
self.extractor_os = observation_extractor.ObservationExtractor(
plc=self.plc_util_os, observation_config=self._observation_config
)
# Static features that are invariant across training steps
static_feature_gl = self.extractor._extract_static_features()
static_feature_os = self.extractor_os._extract_static_features()
for feature_gl, feature_os in zip(static_feature_gl, static_feature_os):
assert (static_feature_gl[feature_gl] ==
static_feature_os[feature_os]).all()
print(" ++++++++++++++++++++++++++++++++++++++++")
print(" +++ TEST OBSERVATION EXTRACTOR: PASS +++")
print(" ++++++++++++++++++++++++++++++++++++++++")
def test_place_node(self):
print("############################ TEST PLACE NODE ############################")
self.plc_util = placement_util.create_placement_cost(
plc_client=plc_client,
netlist_file=self.NETLIST_PATH,
init_placement=None
)
self.plc_util_os = placement_util.create_placement_cost(
plc_client=plc_client_os,
netlist_file=self.NETLIST_PATH,
init_placement=None
)
self.plc_util.set_routes_per_micron(self.RPMH, self.RPMV)
self.plc_util_os.set_routes_per_micron(self.RPMH, self.RPMV)
self.plc_util.set_macro_routing_allocation(self.MARH, self.MARV)
self.plc_util_os.set_macro_routing_allocation(self.MARH, self.MARV)
self.plc_util.set_congestion_smooth_range(self.SMOOTH)
self.plc_util_os.set_congestion_smooth_range(self.SMOOTH)
self.plc_util.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
self.plc_util.set_placement_grid(self.GRID_COL, self.GRID_ROW)
self.plc_util_os.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
self.plc_util_os.set_placement_grid(self.GRID_COL, self.GRID_ROW)
ordered_node_gl = placement_util.get_ordered_node_indices(
mode='descending_size_macro_first', plc=self.plc_util)
ordered_node_os = placement_util.get_ordered_node_indices(
mode='descending_size_macro_first', plc=self.plc_util_os)
assert (np.array(ordered_node_gl) == np.array(ordered_node_os)).all()
# Initialize Placement
self.plc_util_os.unplace_all_nodes()
self.plc_util.unplace_all_nodes()
NODE_TO_PLACE_IDX = 0
CELL_TO_PLACE_IDX = 6
print("MASK FOR PLACING FIRST NODE:")
self.plc_util_os.display_canvas(annotate=False)
print(np.flip(np.array(self.plc_util_os.get_node_mask(
NODE_TO_PLACE_IDX)).reshape(self.GRID_ROW, self.GRID_COL), axis=0))
print(np.flip(np.array(self.plc_util.get_node_mask(NODE_TO_PLACE_IDX)).reshape(
self.GRID_ROW, self.GRID_COL), axis=0))
self.plc_util_os.place_node(NODE_TO_PLACE_IDX, CELL_TO_PLACE_IDX)
self.plc_util.place_node(NODE_TO_PLACE_IDX, CELL_TO_PLACE_IDX)
# place node NODE_TO_PLACE_IDX @ position CELL_TO_PLACE_IDX
NODE_TO_PLACE_IDX = 1
CELL_TO_PLACE_IDX = 18
print("MASK FOR PLACING SECOND NODE:")
print(np.flip(np.array(self.plc_util_os.get_node_mask(
NODE_TO_PLACE_IDX)).reshape(self.GRID_ROW, self.GRID_COL), axis=0))
print(np.flip(np.array(self.plc_util.get_node_mask(NODE_TO_PLACE_IDX)).reshape(
self.GRID_ROW, self.GRID_COL), axis=0))
self.plc_util_os.place_node(NODE_TO_PLACE_IDX, CELL_TO_PLACE_IDX)
self.plc_util.place_node(NODE_TO_PLACE_IDX, CELL_TO_PLACE_IDX)
self.plc_util_os.display_canvas(annotate=False)
def test_environment(self):
print("############################ TEST ENVIRONMENT ############################")
# Source: https://github.com/google-research/circuit_training/blob/d5e454e5bcd153a95d320f664af0d1b378aace7b/circuit_training/environment/environment_test.py#L39
def random_action(mask):
valid_actions, = np.nonzero(mask.flatten())
if len(valid_actions): # pylint: disable=g-explicit-length-test
return np.random.choice(valid_actions)
# If there is no valid choice, then `[0]` is returned which results in an
# infeasable action ending the episode.
return 0
env = environment.CircuitEnv(
_plc=plc_client,
create_placement_cost_fn=placement_util.create_placement_cost,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH)
self.plc_util = placement_util.create_placement_cost(
plc_client=plc_client,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH
)
# print(np.array2string(env._current_mask.reshape(128, 128)), sep=']')
env_os = environment.CircuitEnv(
_plc=plc_client_os,
create_placement_cost_fn=placement_util.create_placement_cost,
netlist_file=self.NETLIST_PATH,
init_placement=self.PLC_PATH)
# print(np.array(env_os._plc.get_node_mask(13333)).reshape(33,35))
# print(np.array(env._plc.get_node_mask(13333)).reshape(33,35))
assert (env_os._get_mask() == env._get_mask()).all()
# TODO DISCREPENCY FOUND
obs_gl = env._get_obs()
obs_os = env_os._get_obs()
env_os.reset()
env.reset()
for feature_gl, feature_os in zip(obs_gl, obs_os):
if not (obs_gl[feature_gl] == obs_os[feature_os]).all():
print(feature_gl, feature_os)
print(np.where(obs_gl[feature_gl] != obs_os[feature_os]))
print(" ++++++++++++++++++++++++++++++")
print(" +++ TEST ENVIRONMENT: PASS +++")
print(" ++++++++++++++++++++++++++++++")
def parse_flags(argv):
parser = argparse_flags.ArgumentParser(description='An argparse + app.run example')
parser = argparse_flags.ArgumentParser(
description='An argparse + app.run example')
parser.add_argument("--netlist", required=True,
help="Path to netlist in pb.txt")
help="Path to netlist in pb.txt")
parser.add_argument("--plc", required=False,
help="Path to plc in .plc")
help="Path to plc in .plc")
parser.add_argument("--width", type=float, required=True,
help="Canvas width")
help="Canvas width")
parser.add_argument("--height", type=float, required=True,
help="Canvas height")
help="Canvas height")
parser.add_argument("--col", type=int, required=True,
help="Grid column")
help="Grid column")
parser.add_argument("--row", type=int, required=True,
help="Grid row")
help="Grid row")
parser.add_argument("--rpmh", type=float, default=10, required=False,
help="Grid row")
help="Grid row")
parser.add_argument("--rpmv", type=float, default=10, required=False,
help="Grid row")
help="Grid row")
parser.add_argument("--marh", type=float, default=10, required=False,
help="Grid row")
help="Grid row")
parser.add_argument("--marv", type=float, default=10, required=False,
help="Grid row")
help="Grid row")
parser.add_argument("--smooth", type=float, default=1, required=False,
help="Grid row")
help="Grid row")
return parser.parse_args(argv[1:])
def main(args):
if args.plc:
PCT = PlacementCostTest(args.netlist, args.plc, args.width, args.height,
args.col, args.row, args.rpmv, args.rpmv,
args.marh, args.marv, args.smooth)
PCT = PlacementCostTest(NETLIST_PATH=args.netlist,
PLC_PATH=args.plc,
width=args.width,
height=args.height,
column=args.col,
row=args.row,
rpmv=args.rpmv,
rpmh=args.rpmh,
marh=args.marh,
marv=args.marv,
smooth=args.smooth)
else:
PCT = PlacementCostTest(args.netlist, args.width, args.height,
args.col, args.row, args.rpmv, args.rpmv,
args.marh, args.marv, args.smooth)
PCT.test_metadata()
PCT = PlacementCostTest(NETLIST_PATH=args.netlist,
width=args.width,
height=args.height,
column=args.col,
row=args.row,
rpmv=args.rpmv,
rpmh=args.rpmh,
marh=args.marh,
marv=args.marv,
smooth=args.smooth)
# PCT.test_metadata()
PCT.test_proxy_cost()
# PCT.test_placement_util(keep_save_file=False)
# PCT.test_place_node()
# PCT.test_miscellaneous()
# PCT.test_observation_extractor()
# PCT.view_canvas()
# PCT.test_proxy_congestion()
# PCT.test_environment()
if __name__ == '__main__':
app.run(main, flags_parser=parse_flags)
\ No newline at end of file
app.run(main, flags_parser=parse_flags)
......@@ -23,13 +23,13 @@ node {
attr {
key: "x"
value {
f: 399
f: 300
}
}
attr {
key: "y"
value {
f: 399
f: 300
}
}
attr {
......@@ -44,7 +44,7 @@ node {
attr {
key: "height"
value {
f: 50
f: 200
}
}
attr {
......@@ -74,7 +74,7 @@ node {
attr {
key: "width"
value {
f: 50
f: 200
}
}
}
......@@ -96,31 +96,31 @@ node {
attr {
key: "x_offset"
value {
f: -25
f: -75
}
}
attr {
key: "y_offset"
value {
f: -25
f: -75
}
}
attr {
key: "x"
value {
f: 374
f: 0
}
}
attr {
key: "y"
value {
f: 374
f: 0
}
}
attr {
key: "weight"
value {
f: 1000
f: 1
}
}
}
......@@ -141,31 +141,31 @@ node {
attr {
key: "x_offset"
value {
f: -25
f: -75
}
}
attr {
key: "y_offset"
value {
f: 25
f: 75
}
}
attr {
key: "x"
value {
f: 374
f: 0
}
}
attr {
key: "y"
value {
f: 424
f: 0
}
}
attr {
key: "weight"
value {
f: 1000
f: 1
}
}
}
......@@ -186,25 +186,25 @@ node {
attr {
key: "x_offset"
value {
f: 25
f: 75
}
}
attr {
key: "y_offset"
value {
f: -25
f: -75
}
}
attr {
key: "x"
value {
f: 75
f: 0
}
}
attr {
key: "y"
value {
f: 25
f: 0
}
}
}
......@@ -225,25 +225,25 @@ node {
attr {
key: "x_offset"
value {
f: 25
f: 75
}
}
attr {
key: "y_offset"
value {
f: 25
f: 75
}
}
attr {
key: "x"
value {
f: 75
f: 0
}
}
attr {
key: "y"
value {
f: 75
f: 0
}
}
}
\ No newline at end of file
# Placement file for Circuit Training
# Source input file(s) : ./output_ariane_NanGate45/22cols_30rows/g500_ub5_nruns10_c5_r3_v3_rc1/netlist.pb.txt
# This file : ./output_ariane_NanGate45/22cols_30rows/g500_ub5_nruns10_c5_r3_v3_rc1/initial.plc
# Original initial placement :
# Date : 2022-08-26 08:27:04
# Columns : 22 Rows : 30
# Width : 1357.360 Height : 1356.880
# Area (stdcell+macros) : 1448306.937872215
# Wirelength : 3264054.817
# Wirelength cost : 0.0540
# Congestion cost : 0.8487
# Density cost : 0.6405
# Fake net cost : 0.0000
# 90% Congestion metric: (0.01363636363636364, 0.0006827604141139862)
# Project : unset_project
# Block : unset_block
# Routes per micron, hor : 11.285 ver : 12.605
# Routes used by macros, hor : 7.143 ver : 8.339
# Smoothing factor : 0
# Use incremental cost : False
#
# To view this file (most options are default):
# viewer_binary --netlist_file ./output_ariane_NanGate45/22cols_30rows/g500_ub5_nruns10_c5_r3_v3_rc1/netlist.pb.txt --canvas_width 1357.36 --canvas_height 1356.88 --grid_cols 22 --grid_rows=30 --init_placement ./output_ariane_NanGate45/22cols_30rows/g500_ub5_nruns10_c5_r3_v3_rc1/initial.plc --project unset_project --block_name unset_block --congestion_smooth_range 0 --overlap_threshold 0 --noboundary_check
# or you can simply run:
# viewer_binary --init_placement ./output_ariane_NanGate45/22cols_30rows/g500_ub5_nruns10_c5_r3_v3_rc1/initial.plc
#
#
#
# Source input file(s) : environment/test_data/ariane/ariane.txt
# This file : environment/test_data/ariane/init.plc
# Date : 2022-03-13 09:30:00
# Columns : 50 Rows : 50
# Width : 1599.99 Height : 1598.8
# Area : 1244102.4819999968
# Wirelength : 0.0
# Wirelength cost : 0.0
# Congestion cost : 0.0
# Block : ariane
# Routes per micron, hor : 70.33 ver : 74.51
# Routes used by macros, hor : 51.79 ver : 51.79
# Smoothing factor : 2
# Overlap threshold : 0.004
#
#
#
# Counts of node types:
# HARD_MACROs : 133
# HARD_MACRO_PINs : 7847
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment