Commit bdc03b92 by lvzhengyang

implement the gen_data, train, and mine_properties executables by gemini 3.1 pro

parent 3483b343
# Property Embedding & Mining for Dynamic RTL
This repository uses **Property Embeddings** to learn representations of dynamic circuit behavior. It maps both RTL variables and logical operators into a continuous fuzzy-logic space, so the model can reason about a circuit's Boolean satisfiability geometrically and generate high-quality SystemVerilog Assertions (SVAs) zero-shot on unseen designs.
## Methodology: Property Alignment
Instead of heuristically predicting branch-hit rates or toggles, this model uses a **Property Alignment** training objective.
Variables are projected to $V \in [0, 1]^d$. Logical operators are replaced with differentiable continuous operators (t-norms), listed below; a minimal PyTorch sketch follows the list:
- $V_{\sim A} = 1 - V_A$
- $V_{A \land B} = V_A \circ V_B$
- $V_{A \implies B} = 1 - V_A + V_A \circ V_B$
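For intuition, these operators act directly on embedding tensors. The sketch below is illustrative only, assuming $\circ$ denotes the elementwise (Hadamard) product; the actual implementation lives in `property_encoder.py`.
```python
import torch

def v_not(v_a: torch.Tensor) -> torch.Tensor:
    # ~A: elementwise complement
    return 1.0 - v_a

def v_and(v_a: torch.Tensor, v_b: torch.Tensor) -> torch.Tensor:
    # A && B: product t-norm (elementwise product)
    return v_a * v_b

def v_implies(v_a: torch.Tensor, v_b: torch.Tensor) -> torch.Tensor:
    # A |-> B: 1 - a + a*b (a Reichenbach-style fuzzy implication)
    return 1.0 - v_a + v_a * v_b
```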
The network is trained using three formal objectives (an illustrative sketch follows the list):
1. **Trace Alignment Loss:** The length of a property's embedding is trained to match the probability that the property holds in simulation.
2. **Implication Loss:** If $A \implies B$, $V_A$ must be geometrically covered by $V_B$.
3. **Conflict Loss:** If $A$ and $B$ are mutually exclusive, their intersection must be empty.
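As a rough sketch of how these objectives can be written down (the exact formulations are defined in `property_trainer.py`; the mean-activation "size" measure below is an assumption made purely for illustration):
```python
import torch

def trace_alignment_loss(v_p: torch.Tensor, p_true: torch.Tensor) -> torch.Tensor:
    # The "size" of the property embedding should match the empirical
    # probability of the property holding in the simulated trace.
    return (v_p.mean(dim=-1) - p_true).pow(2).mean()

def implication_loss(v_a: torch.Tensor, v_b: torch.Tensor) -> torch.Tensor:
    # If A implies B, V_A should be geometrically covered by V_B:
    # penalize every dimension where V_A exceeds V_B.
    return torch.relu(v_a - v_b).mean()

def conflict_loss(v_a: torch.Tensor, v_b: torch.Tensor) -> torch.Tensor:
    # Mutually exclusive properties should have an (approximately) empty
    # fuzzy intersection under the product t-norm.
    return (v_a * v_b).mean()
```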
## Environment Setup
Requirements:
- Python 3.11+
- PyTorch
- PyTorch Geometric
- `tomli` (for parsing config files)
- (Optional but recommended) Yosys for dataset building
## Pipeline Usage
The repository is structured around three executable entry points.
### Step 1: Data Generation
Before training, you must parse your RTL into Control/Data Flow Graphs (CDFGs) and run logic simulations to extract dynamic traces.
Define your design in `cases/<case_name>/config.toml` (specify clock, reset, and RTL sources).
Then run:
```bash
python gen_data.py --case <case_name>
```
This generates `graphs.npz` and `labels.npz` under `results/<case_name>/dataset/`; the later stages also expect a `meta.json` with node metadata in the same directory.
### Step 2: Training the Property Embeddings
Train the Graph Neural Network (GNN) and the PropertyEncoder using the strict formal logic objectives.
```bash
python train.py --case <case_name> --num_epochs 20 --w_imp 1.0 --w_conf 1.0 --w_trace 1.0
```
This outputs a checkpoint (e.g. `property_encoder_last.pth`) to `results/<case_name>/property/<exp_id>/`, where `<exp_id>` defaults to `property`.
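The checkpoint is a plain dictionary with a single `state_dict` entry holding the PropertyEncoder weights. A minimal loading sketch (the `node_dim` value below is a placeholder and must match the node encoder's hidden dimension):
```python
import torch
from property_encoder import PropertyEncoder

ckpt = torch.load(
    "results/<case_name>/property/property/property_encoder_last.pth",
    map_location="cpu",
)
prop_encoder = PropertyEncoder(node_dim=128)  # placeholder; match your node encoder
prop_encoder.load_state_dict(ckpt["state_dict"])
```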
### Step 3: Zero-Shot Property Mining
Once trained, the model can mine properties for new, unseen designs without running additional logic simulations. It geometrically analyzes the learned embeddings to discover Equivalent, Conflicting, and Implied signals.
```bash
python mine_properties.py \
--case <case_name> \
--ckpt results/<case_name>/property/property/property_encoder_last.pth \
--threshold 0.95 \
--top_k 100
```
*(Note: Replace `--ckpt` path with your actual checkpoint path).*
**Features of `mine_properties.py`:**
- Automatically wraps generated SystemVerilog Assertions in the correct `clock` and `reset` blocks as defined in the case's `config.toml`.
- `--threshold`: Only outputs properties with a confidence score higher than this value (0.0 to 1.0).
- `--top_k`: Limits the output to the highest-confidence properties (set to 0 for unlimited).
- *Coming Soon:* Composite property mining (`--composite`).
Output is saved by default to `results/<case_name>/mined_properties.sv`.
## File Structure
- `gen_data.py`: Entry point for graph extraction and trace generation.
- `train.py`: Entry point for Property Alignment training.
- `mine_properties.py`: Entry point for SVA generation.
- `property_encoder.py`: Defines the differentiable fuzzy logic operators.
- `property_trainer.py`: Defines the Implication, Conflict, and Trace alignment losses.
- `property_expr.py`: AST definitions for dynamic logic expressions.
#!/usr/bin/env python3
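# Property-alignment evaluation: load a trained PropertyEncoder checkpoint, build
# candidate properties over the design's signal nodes, and report averaged losses
# on the validation split as a JSON line.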
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
import torch
from torch_geometric.loader import DataLoader
from property_expr import generate_properties
from property_encoder import PropertyEncoder
from property_trainer import PropertyTrainer, PropertyLossConfig, PropertyCache
REPO_ROOT = Path(__file__).resolve().parent
MODEL_EXAMPLE_SRC = REPO_ROOT / "model_example" / "src"
def _select_device(device_arg: str) -> torch.device:
if device_arg.startswith("cuda") and torch.cuda.is_available():
return torch.device(device_arg)
return torch.device("cpu")
def _load_meta(data_dir: Path) -> dict:
meta_path = data_dir / "meta.json"
if not meta_path.exists():
raise FileNotFoundError(f"meta.json not found in {data_dir}")
return json.loads(meta_path.read_text())
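# Select graph nodes whose operator tag (the first comma-separated field of the
# node name) marks them as design signals, e.g. Input, Output, Wire, Reg.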
def _get_var_indices(meta: dict, signal_ops: list[str]) -> list[int]:
node_names = meta.get("node_names", [])
var_indices = []
for idx, name in enumerate(node_names):
op = name.split(",")[0] if name else ""
if op in signal_ops:
var_indices.append(idx)
return var_indices
def main() -> int:
parser = argparse.ArgumentParser(description="Property-alignment evaluation")
parser.add_argument("--case", required=True, help="Case name under cases/")
parser.add_argument("--data_dir", default="", help="Dataset dir (default: results/<case>/dataset)")
parser.add_argument("--graph_npz_name", default="graphs.npz", type=str)
parser.add_argument("--label_npz_name", default="labels.npz", type=str)
parser.add_argument("--checkpoint", required=True, help="Property encoder checkpoint path")
parser.add_argument("--device", default="cuda", type=str)
parser.add_argument("--num_rounds", default=20, type=int)
parser.add_argument("--model", default="default_shared", type=str)
parser.add_argument("--eval_seq_len", default=50, type=int)
parser.add_argument("--batch_size", default=1, type=int)
parser.add_argument("--max_props", default=64, type=int)
parser.add_argument("--max_pairs", default=512, type=int)
parser.add_argument("--signal_ops", default="Input,Output,Wire,Reg", type=str)
parser.add_argument("--include_ops", default="VAR,NOT,AND,OR,XOR,IMPLIES,EQUIV", type=str)
parser.add_argument("--cache_dir", default="", type=str)
parser.add_argument("--no_cache", action="store_true")
args = parser.parse_args()
data_dir = Path(args.data_dir) if args.data_dir else (REPO_ROOT / "results" / args.case / "dataset")
graph_npz = data_dir / args.graph_npz_name
label_npz = data_dir / args.label_npz_name
os.chdir(REPO_ROOT)
sys.path.insert(0, str(MODEL_EXAMPLE_SRC))
from npz_parser import NpzParser # noqa: E402
from model_arch import Model_default, Model_shared # noqa: E402
model_factory = {
"default_shared": Model_shared,
"default": Model_default,
}
if args.model not in model_factory:
raise ValueError(f"Model not supported: {args.model}")
device = _select_device(args.device)
meta = _load_meta(data_dir)
signal_ops = [s.strip() for s in args.signal_ops.split(",") if s.strip()]
var_indices = _get_var_indices(meta, signal_ops)
include_ops = tuple(s.strip() for s in args.include_ops.split(",") if s.strip())
props = generate_properties(var_indices, max_props=args.max_props, include_ops=include_ops)
dataset = NpzParser(str(data_dir), str(graph_npz), str(label_npz))
_, val_dataset = dataset.get_dataset(split_with_design=True)
if args.batch_size != 1:
raise ValueError("Property evaluation assumes batch_size=1 for correct indexing.")
val_loader = DataLoader(val_dataset, batch_size=args.batch_size, shuffle=False, drop_last=False)
node_encoder = model_factory[args.model](num_rounds=args.num_rounds)
prop_encoder = PropertyEncoder(node_dim=node_encoder.dim_hidden)
checkpoint = torch.load(args.checkpoint, map_location="cpu")
prop_encoder.load_state_dict(checkpoint["state_dict"])
trainer = PropertyTrainer(node_encoder=node_encoder, prop_encoder=prop_encoder, device=device, freeze_node_encoder=True)
loss_cfg = PropertyLossConfig()
cache = None
if not args.no_cache:
cache_dir = Path(args.cache_dir) if args.cache_dir else (REPO_ROOT / "results" / args.case / "property" / "eval_cache")
cache = PropertyCache(cache_dir)
val_losses = []
for batch in val_loader:
batch = batch.to(device)
losses = trainer.eval_step(batch, props, loss_cfg, args.eval_seq_len, args.max_pairs, cache=cache)
val_losses.append(losses)
def avg(key, items):
return sum(x[key] for x in items) / max(1, len(items))
log = {
"val_total": avg("total", val_losses),
"val_dir": avg("loss_dir", val_losses),
"val_len": avg("loss_len", val_losses),
"val_conf": avg("loss_conf", val_losses),
}
print(json.dumps(log))
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
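# Property-alignment test: mirrors the evaluation script, but reports metrics
# under test_* keys and caches intermediate results in test_cache.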
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
import torch
from torch_geometric.loader import DataLoader
from property_expr import generate_properties
from property_encoder import PropertyEncoder
from property_trainer import PropertyTrainer, PropertyLossConfig, PropertyCache
REPO_ROOT = Path(__file__).resolve().parent
MODEL_EXAMPLE_SRC = REPO_ROOT / "model_example" / "src"
def _select_device(device_arg: str) -> torch.device:
if device_arg.startswith("cuda") and torch.cuda.is_available():
return torch.device(device_arg)
return torch.device("cpu")
def _load_meta(data_dir: Path) -> dict:
meta_path = data_dir / "meta.json"
if not meta_path.exists():
raise FileNotFoundError(f"meta.json not found in {data_dir}")
return json.loads(meta_path.read_text())
def _get_var_indices(meta: dict, signal_ops: list[str]) -> list[int]:
node_names = meta.get("node_names", [])
var_indices = []
for idx, name in enumerate(node_names):
op = name.split(",")[0] if name else ""
if op in signal_ops:
var_indices.append(idx)
return var_indices
def main() -> int:
parser = argparse.ArgumentParser(description="Property-alignment test")
parser.add_argument("--case", required=True, help="Case name under cases/")
parser.add_argument("--data_dir", default="", help="Dataset dir (default: results/<case>/dataset)")
parser.add_argument("--graph_npz_name", default="graphs.npz", type=str)
parser.add_argument("--label_npz_name", default="labels.npz", type=str)
parser.add_argument("--checkpoint", required=True, help="Property encoder checkpoint path")
parser.add_argument("--device", default="cuda", type=str)
parser.add_argument("--num_rounds", default=20, type=int)
parser.add_argument("--model", default="default_shared", type=str)
parser.add_argument("--eval_seq_len", default=50, type=int)
parser.add_argument("--batch_size", default=1, type=int)
parser.add_argument("--max_props", default=64, type=int)
parser.add_argument("--max_pairs", default=512, type=int)
parser.add_argument("--signal_ops", default="Input,Output,Wire,Reg", type=str)
parser.add_argument("--include_ops", default="VAR,NOT,AND,OR,XOR,IMPLIES,EQUIV", type=str)
parser.add_argument("--cache_dir", default="", type=str)
parser.add_argument("--no_cache", action="store_true")
args = parser.parse_args()
data_dir = Path(args.data_dir) if args.data_dir else (REPO_ROOT / "results" / args.case / "dataset")
graph_npz = data_dir / args.graph_npz_name
label_npz = data_dir / args.label_npz_name
os.chdir(REPO_ROOT)
sys.path.insert(0, str(MODEL_EXAMPLE_SRC))
from npz_parser import NpzParser # noqa: E402
from model_arch import Model_default, Model_shared # noqa: E402
model_factory = {
"default_shared": Model_shared,
"default": Model_default,
}
if args.model not in model_factory:
raise ValueError(f"Model not supported: {args.model}")
device = _select_device(args.device)
meta = _load_meta(data_dir)
signal_ops = [s.strip() for s in args.signal_ops.split(",") if s.strip()]
var_indices = _get_var_indices(meta, signal_ops)
include_ops = tuple(s.strip() for s in args.include_ops.split(",") if s.strip())
props = generate_properties(var_indices, max_props=args.max_props, include_ops=include_ops)
dataset = NpzParser(str(data_dir), str(graph_npz), str(label_npz))
_, test_dataset = dataset.get_dataset(split_with_design=True)
if args.batch_size != 1:
raise ValueError("Property testing assumes batch_size=1 for correct indexing.")
test_loader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False, drop_last=False)
node_encoder = model_factory[args.model](num_rounds=args.num_rounds)
prop_encoder = PropertyEncoder(node_dim=node_encoder.dim_hidden)
checkpoint = torch.load(args.checkpoint, map_location="cpu")
prop_encoder.load_state_dict(checkpoint["state_dict"])
trainer = PropertyTrainer(node_encoder=node_encoder, prop_encoder=prop_encoder, device=device, freeze_node_encoder=True)
loss_cfg = PropertyLossConfig()
cache = None
if not args.no_cache:
cache_dir = Path(args.cache_dir) if args.cache_dir else (REPO_ROOT / "results" / args.case / "property" / "test_cache")
cache = PropertyCache(cache_dir)
test_losses = []
for batch in test_loader:
batch = batch.to(device)
losses = trainer.eval_step(batch, props, loss_cfg, args.eval_seq_len, args.max_pairs, cache=cache)
test_losses.append(losses)
def avg(key, items):
return sum(x[key] for x in items) / max(1, len(items))
log = {
"test_total": avg("total", test_losses),
"test_dir": avg("loss_dir", test_losses),
"test_len": avg("loss_len", test_losses),
"test_conf": avg("loss_conf", test_losses),
}
print(json.dumps(log))
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
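# Property-alignment training: trains the GNN node encoder (optionally frozen)
# together with the PropertyEncoder against the implication, conflict, and
# trace-alignment objectives, then saves property_encoder_last.pth under
# results/<case>/property/<exp_id>/.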
from __future__ import annotations
import argparse
import json
import os
import sys
import time
from pathlib import Path
import torch
from torch_geometric.loader import DataLoader
from property_expr import generate_properties
from property_encoder import PropertyEncoder
from property_trainer import PropertyTrainer, PropertyLossConfig, PropertyCache
REPO_ROOT = Path(__file__).resolve().parent
MODEL_EXAMPLE_SRC = REPO_ROOT / "model_example" / "src"
def _select_device(device_arg: str) -> torch.device:
if device_arg.startswith("cuda") and torch.cuda.is_available():
return torch.device(device_arg)
return torch.device("cpu")
def _load_meta(data_dir: Path) -> dict:
meta_path = data_dir / "meta.json"
if not meta_path.exists():
raise FileNotFoundError(f"meta.json not found in {data_dir}")
return json.loads(meta_path.read_text())
def _get_var_indices(meta: dict, signal_ops: list[str]) -> list[int]:
node_names = meta.get("node_names", [])
var_indices = []
for idx, name in enumerate(node_names):
op = name.split(",")[0] if name else ""
if op in signal_ops:
var_indices.append(idx)
return var_indices
def main() -> int:
parser = argparse.ArgumentParser(description="Property-alignment training")
parser.add_argument("--case", required=True, help="Case name under cases/")
parser.add_argument(
"--data_dir", default="", help="Dataset dir (default: results/<case>/dataset)"
)
parser.add_argument("--graph_npz_name", default="graphs.npz", type=str)
parser.add_argument("--label_npz_name", default="labels.npz", type=str)
parser.add_argument("--num_epochs", default=20, type=int)
parser.add_argument("--lr", default=1e-4, type=float)
parser.add_argument("--device", default="cuda", type=str)
parser.add_argument("--num_rounds", default=20, type=int)
parser.add_argument("--model", default="default_shared", type=str)
parser.add_argument("--train_seq_len", default=50, type=int)
parser.add_argument("--eval_seq_len", default=50, type=int)
parser.add_argument("--batch_size", default=1, type=int)
parser.add_argument("--max_props", default=64, type=int)
parser.add_argument("--max_pairs", default=512, type=int)
parser.add_argument("--freeze_node_encoder", action="store_true")
parser.add_argument("--signal_ops", default="Input,Output,Wire,Reg", type=str)
parser.add_argument(
"--include_ops", default="VAR,NOT,AND,OR,XOR,IMPLIES,EQUIV", type=str
)
parser.add_argument("--cache_dir", default="", type=str)
parser.add_argument("--no_cache", action="store_true")
parser.add_argument("--exp_id", default="property", type=str)
parser.add_argument("--w_imp", default=1.0, type=float)
parser.add_argument("--w_conf", default=1.0, type=float)
parser.add_argument("--w_trace", default=1.0, type=float)
args = parser.parse_args()
data_dir = (
Path(args.data_dir)
if args.data_dir
else (REPO_ROOT / "results" / args.case / "dataset")
)
graph_npz = data_dir / args.graph_npz_name
label_npz = data_dir / args.label_npz_name
os.chdir(REPO_ROOT)
sys.path.insert(0, str(MODEL_EXAMPLE_SRC))
from npz_parser import NpzParser # noqa: E402
from model_arch import Model_default, Model_shared # noqa: E402
model_factory = {
"default_shared": Model_shared,
"default": Model_default,
}
if args.model not in model_factory:
raise ValueError(f"Model not supported: {args.model}")
device = _select_device(args.device)
meta = _load_meta(data_dir)
signal_ops = [s.strip() for s in args.signal_ops.split(",") if s.strip()]
var_indices = _get_var_indices(meta, signal_ops)
include_ops = tuple(s.strip() for s in args.include_ops.split(",") if s.strip())
props = generate_properties(
var_indices, max_props=args.max_props, include_ops=include_ops
)
dataset = NpzParser(str(data_dir), str(graph_npz), str(label_npz))
train_dataset, val_dataset = dataset.get_dataset(split_with_design=True)
if args.batch_size != 1:
raise ValueError("Property training assumes batch_size=1 for correct indexing.")
train_loader = DataLoader(
train_dataset, batch_size=args.batch_size, shuffle=True, drop_last=True
)
val_loader = DataLoader(
val_dataset, batch_size=args.batch_size, shuffle=False, drop_last=False
)
node_encoder = model_factory[args.model](num_rounds=args.num_rounds)
prop_encoder = PropertyEncoder(node_dim=node_encoder.dim_hidden)
trainer = PropertyTrainer(
node_encoder=node_encoder,
prop_encoder=prop_encoder,
device=device,
lr=args.lr,
freeze_node_encoder=args.freeze_node_encoder,
)
loss_cfg = PropertyLossConfig(
weight_imp=args.w_imp,
weight_trace=args.w_trace,
weight_conf=args.w_conf,
)
exp_dir = REPO_ROOT / "results" / args.case / "property" / args.exp_id
exp_dir.mkdir(parents=True, exist_ok=True)
cache = None
if not args.no_cache:
cache_dir = Path(args.cache_dir) if args.cache_dir else (exp_dir / "cache")
cache = PropertyCache(cache_dir)
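# One training pass and one validation pass per epoch; per-batch losses are
# averaged and emitted as a JSON line.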
for epoch in range(args.num_epochs):
train_losses = []
for batch in train_loader:
batch = batch.to(device)
losses = trainer.train_step(
batch, props, loss_cfg, args.train_seq_len, args.max_pairs, cache=cache
)
train_losses.append(losses)
val_losses = []
for batch in val_loader:
batch = batch.to(device)
losses = trainer.eval_step(
batch, props, loss_cfg, args.eval_seq_len, args.max_pairs, cache=cache
)
val_losses.append(losses)
def avg(key, items):
return sum(x[key] for x in items) / max(1, len(items))
log = {
"epoch": epoch,
"train_total": avg("total", train_losses),
"val_total": avg("total", val_losses),
"train_imp": avg("loss_imp", train_losses),
"train_trace": avg("loss_trace", train_losses),
"train_conf": avg("loss_conf", train_losses),
"val_imp": avg("loss_imp", val_losses),
"val_trace": avg("loss_trace", val_losses),
"val_conf": avg("loss_conf", val_losses),
}
print(json.dumps(log))
ckpt_path = exp_dir / "property_encoder_last.pth"
torch.save({"state_dict": trainer.prop_encoder.state_dict()}, ckpt_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())