Commit 4119eb3d by lvzhengyang

setup env and the fifo case

parents
.venv/
.venv_pkgs/
__pycache__/
# Case: eth_fifo
## Launch Simulation
`python3 tool_scripts/launch_sim.py`
\ No newline at end of file
//////////////////////////////////////////////////////////////////////
//// ////
//// eth_fifo.v ////
//// ////
//// This file is part of the Ethernet IP core project ////
//// http://www.opencores.org/project,ethmac ////
//// ////
//// Author(s): ////
//// - Igor Mohor (igorM@opencores.org) ////
//// ////
//// All additional information is avaliable in the Readme.txt ////
//// file. ////
//// ////
//////////////////////////////////////////////////////////////////////
//// ////
//// Copyright (C) 2001 Authors ////
//// ////
//// This source file may be used and distributed without ////
//// restriction provided that this copyright statement is not ////
//// removed from the file and that any derivative work contains ////
//// the original copyright notice and the associated disclaimer. ////
//// ////
//// This source file is free software; you can redistribute it ////
//// and/or modify it under the terms of the GNU Lesser General ////
//// Public License as published by the Free Software Foundation; ////
//// either version 2.1 of the License, or (at your option) any ////
//// later version. ////
//// ////
//// This source is distributed in the hope that it will be ////
//// useful, but WITHOUT ANY WARRANTY; without even the implied ////
//// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ////
//// PURPOSE. See the GNU Lesser General Public License for more ////
//// details. ////
//// ////
//// You should have received a copy of the GNU Lesser General ////
//// Public License along with this source; if not, download it ////
//// from http://www.opencores.org/lgpl.shtml ////
//// ////
//////////////////////////////////////////////////////////////////////
//
// CVS Revision History
//
// $Log: not supported by cvs2svn $
// Revision 1.3 2002/04/22 13:45:52 mohor
// Generic ram or Xilinx ram can be used in fifo (selectable by setting
// ETH_FIFO_XILINX in eth_defines.v).
//
// Revision 1.2 2002/03/25 13:33:04 mohor
// When clear and read/write are active at the same time, cnt and pointers are
// set to 1.
//
// Revision 1.1 2002/02/05 16:44:39 mohor
// Both rx and tx part are finished. Tested with wb_clk_i between 10 and 200
// MHz. Statuses, overrun, control frame transmission and reception still need
// to be fixed.
//
//
`include "ethmac_defines.v"
`include "timescale.v"
module eth_fifo (data_in, data_out, clk, reset, write, read, clear,
almost_full, full, almost_empty, empty, cnt);
parameter DATA_WIDTH = 32;
parameter DEPTH = 8;
parameter CNT_WIDTH = 4;
input clk;
input reset;
input write;
input read;
input clear;
input [DATA_WIDTH-1:0] data_in;
output [DATA_WIDTH-1:0] data_out;
output almost_full;
output full;
output almost_empty;
output empty;
output [CNT_WIDTH-1:0] cnt;
`ifdef ETH_FIFO_XILINX
`else
`ifdef ETH_ALTERA_ALTSYNCRAM
`else
reg [DATA_WIDTH-1:0] fifo [0:DEPTH-1];
reg [DATA_WIDTH-1:0] data_out;
`endif
`endif
reg [CNT_WIDTH-1:0] cnt;
reg [CNT_WIDTH-2:0] read_pointer;
reg [CNT_WIDTH-2:0] write_pointer;
always @ (posedge clk or posedge reset)
begin
if(reset)
cnt <= 0;
else
if(clear)
cnt <= { {(CNT_WIDTH-1){1'b0}}, read^write};
else
if(read ^ write)
if(read)
cnt <= cnt - 1;
else
cnt <= cnt + 1;
end
always @ (posedge clk or posedge reset)
begin
if(reset)
read_pointer <= 0;
else
if(clear)
read_pointer <= { {(CNT_WIDTH-2){1'b0}}, read};
else
if(read & ~empty)
read_pointer <= read_pointer + 1'b1;
end
always @ (posedge clk or posedge reset)
begin
if(reset)
write_pointer <= 0;
else
if(clear)
write_pointer <= { {(CNT_WIDTH-2){1'b0}}, write};
else
if(write & ~full)
write_pointer <= write_pointer + 1'b1;
end
assign empty = ~(|cnt);
assign almost_empty = cnt == 1;
assign full = cnt == DEPTH;
assign almost_full = &cnt[CNT_WIDTH-2:0];
`ifdef ETH_FIFO_XILINX
xilinx_dist_ram_16x32 fifo
( .data_out(data_out),
.we(write & ~full),
.data_in(data_in),
.read_address( clear ? {CNT_WIDTH-1{1'b0}} : read_pointer),
.write_address(clear ? {CNT_WIDTH-1{1'b0}} : write_pointer),
.wclk(clk)
);
`else // !ETH_FIFO_XILINX
`ifdef ETH_ALTERA_ALTSYNCRAM
altera_dpram_16x32 altera_dpram_16x32_inst
(
.data (data_in),
.wren (write & ~full),
.wraddress (clear ? {CNT_WIDTH-1{1'b0}} : write_pointer),
.rdaddress (clear ? {CNT_WIDTH-1{1'b0}} : read_pointer ),
.clock (clk),
.q (data_out)
); //exemplar attribute altera_dpram_16x32_inst NOOPT TRUE
`else // !ETH_ALTERA_ALTSYNCRAM
always @ (posedge clk)
begin
if(write & clear)
fifo[0] <= data_in;
else
if(write & ~full)
fifo[write_pointer] <= data_in;
end
always @ (posedge clk)
begin
if(clear)
data_out <= fifo[0];
else
data_out <= fifo[read_pointer];
end
`endif // !ETH_ALTERA_ALTSYNCRAM
`endif // !ETH_FIFO_XILINX
endmodule
//////////////////////////////////////////////////////////////////////
//// ////
//// timescale.v ////
//// ////
//// This file is part of the Ethernet IP core project ////
//// http://www.opencores.org/project,ethmac ////
//// ////
//// Author(s): ////
//// - Igor Mohor (igorM@opencores.org) ////
//// ////
//// All additional information is avaliable in the Readme.txt ////
//// file. ////
//// ////
//////////////////////////////////////////////////////////////////////
//// ////
//// Copyright (C) 2001 Authors ////
//// ////
//// This source file may be used and distributed without ////
//// restriction provided that this copyright statement is not ////
//// removed from the file and that any derivative work contains ////
//// the original copyright notice and the associated disclaimer. ////
//// ////
//// This source file is free software; you can redistribute it ////
//// and/or modify it under the terms of the GNU Lesser General ////
//// Public License as published by the Free Software Foundation; ////
//// either version 2.1 of the License, or (at your option) any ////
//// later version. ////
//// ////
//// This source is distributed in the hope that it will be ////
//// useful, but WITHOUT ANY WARRANTY; without even the implied ////
//// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR ////
//// PURPOSE. See the GNU Lesser General Public License for more ////
//// details. ////
//// ////
//// You should have received a copy of the GNU Lesser General ////
//// Public License along with this source; if not, download it ////
//// from http://www.opencores.org/lgpl.shtml ////
//// ////
//////////////////////////////////////////////////////////////////////
//
// CVS Revision History
//
// $Log: not supported by cvs2svn $
// Revision 1.2 2001/10/19 11:36:31 mohor
// Log file added.
//
//
//
`timescale 1ns / 1ns
+incdir+{{ CASE_DIR }}/testcases/{{ TEST_CASE }}
{{ CASE_DIR }}/testcases/{{ TEST_CASE }}/tb.sv
\ No newline at end of file
`timescale 1ns / 1ps
module tb_eth_fifo;
// --------------------------------------------------------------------
// 1. Parameters & Signals
// --------------------------------------------------------------------
parameter DATA_WIDTH = 32;
parameter DEPTH = 8;
parameter CNT_WIDTH = 4;
logic clk;
logic reset;
logic write;
logic read;
logic clear;
logic [DATA_WIDTH-1:0] data_in;
wire [DATA_WIDTH-1:0] data_out;
wire almost_full;
wire full;
wire almost_empty;
wire empty;
wire [CNT_WIDTH-1:0] cnt;
// --------------------------------------------------------------------
// 2. DUT Instantiation
// --------------------------------------------------------------------
eth_fifo #(
.DATA_WIDTH (DATA_WIDTH),
.DEPTH (DEPTH),
.CNT_WIDTH (CNT_WIDTH)
) u_dut (
.clk (clk),
.reset (reset),
.write (write),
.read (read),
.clear (clear),
.data_in (data_in),
.data_out (data_out),
.almost_full (almost_full),
.full (full),
.almost_empty (almost_empty),
.empty (empty),
.cnt (cnt)
);
// --------------------------------------------------------------------
// 3. Clock Generation
// --------------------------------------------------------------------
initial begin
clk = 0;
forever #5 clk = ~clk; // 100MHz clock
end
// --------------------------------------------------------------------
// 4. Test Logic
// --------------------------------------------------------------------
initial begin
// Initialize signals
reset = 1;
write = 0;
read = 0;
clear = 0;
data_in = 0;
// Hold reset for a few cycles
repeat (3) @(posedge clk);
reset <= 0;
$display("--- Reset Complete. Starting Random Traffic ---");
// Run simulation for 100 cycles
repeat (1000) begin
@(posedge clk);
drive_random_stimulus();
end
// End simulation
repeat (5) @(posedge clk);
$display("--- Test Complete ---");
$finish;
end
// --------------------------------------------------------------------
// 5. Stimulus Driver Task
// --------------------------------------------------------------------
task drive_random_stimulus();
// Temporary variables for randomization
bit rand_write;
bit rand_read;
bit rand_clear;
bit [DATA_WIDTH-1:0] rand_data;
// 1. Randomize raw values
// Give clear a low probability (5%) to allow FIFO to fill/drain
rand_clear = ($urandom_range(0, 99) < 5);
rand_write = $urandom_range(0, 1);
rand_read = $urandom_range(0, 1);
rand_data = $urandom();
// 2. Apply Constraints (The critical part)
// CONSTRAINT 1: Never write to a full FIFO
// Note: We check 'full' which is the status from the PREVIOUS cycle
if (full) begin
rand_write = 0;
end
// CONSTRAINT 2: Never read from an empty FIFO
if (empty) begin
rand_read = 0;
end
// CONSTRAINT 3: no clear && read
if (clear) begin
rand_read = 0;
end
// Note: If Clear is asserted, we allow Write to pass through if generated
// (Testing the "Write-Through-Clear" feature), but we ensure we
// don't violate the Full check above.
// 3. Drive signals synchronously (using Non-Blocking Assignment)
write <= rand_write;
read <= rand_read;
clear <= rand_clear;
data_in <= rand_data;
// 4. Monitor/Debug Output (Optional, for visibility)
// We print what we are *attempting* to do in this cycle
if (reset == 0) begin
$display("Time: %0t | Cnt: %0d | Full: %b Empty: %b | Driving -> Wr: %b Rd: %b Clr: %b Data: %h",
$time, cnt, full, empty, rand_write, rand_read, rand_clear, rand_data);
end
endtask
// --------------------------------------------------------------------
// 6. Output Monitor (Checks results after clock edge)
// --------------------------------------------------------------------
always @(posedge clk) begin
if (!reset) begin
// Slight delay to allow NBA updates to settle for display logic
// (purely for printing convenience, not logic)
#1;
if (read && !empty)
$display(" >> READ ACK: Data Out = %h", data_out);
end
end
endmodule
\ No newline at end of file
+incdir+{{ CASE_DIR }}/testcases/{{ TEST_CASE }}
{{ CASE_DIR }}/testcases/{{ TEST_CASE }}/tb.sv
\ No newline at end of file
`timescale 1ns / 1ps
module tb_eth_fifo;
// --------------------------------------------------------------------
// 1. Parameters & Signals
// --------------------------------------------------------------------
parameter DATA_WIDTH = 32;
parameter DEPTH = 8;
parameter CNT_WIDTH = 4;
logic clk;
logic reset;
logic write;
logic read;
logic clear;
logic [DATA_WIDTH-1:0] data_in;
wire [DATA_WIDTH-1:0] data_out;
wire almost_full;
wire full;
wire almost_empty;
wire empty;
wire [CNT_WIDTH-1:0] cnt;
// --------------------------------------------------------------------
// 2. DUT Instantiation
// --------------------------------------------------------------------
eth_fifo #(
.DATA_WIDTH (DATA_WIDTH),
.DEPTH (DEPTH),
.CNT_WIDTH (CNT_WIDTH)
) u_dut (
.clk (clk),
.reset (reset),
.write (write),
.read (read),
.clear (clear),
.data_in (data_in),
.data_out (data_out),
.almost_full (almost_full),
.full (full),
.almost_empty (almost_empty),
.empty (empty),
.cnt (cnt)
);
// --------------------------------------------------------------------
// 3. Clock Generation
// --------------------------------------------------------------------
initial begin
clk = 0;
forever #5 clk = ~clk; // 100MHz clock
end
// --------------------------------------------------------------------
// 4. Test Logic
// --------------------------------------------------------------------
initial begin
// Initialize signals
reset = 1;
write = 0;
read = 0;
clear = 0;
data_in = 0;
// Hold reset for a few cycles
repeat (3) @(posedge clk);
reset <= 0;
$display("--- Reset Complete. Starting Random Traffic ---");
// Run simulation for 100 cycles
repeat (1000) begin
@(posedge clk);
drive_random_stimulus();
end
// End simulation
repeat (5) @(posedge clk);
$display("--- Test Complete ---");
$finish;
end
// --------------------------------------------------------------------
// 5. Stimulus Driver Task
// --------------------------------------------------------------------
task drive_random_stimulus();
// Temporary variables for randomization
bit rand_write;
bit rand_read;
bit rand_clear;
bit [DATA_WIDTH-1:0] rand_data;
// 1. Randomize raw values
// Give clear a low probability (5%) to allow FIFO to fill/drain
// rand_clear = ($urandom_range(0, 99) < 5);
rand_clear = ($urandom_range(0, 99) < 40);
rand_write = $urandom_range(0, 1);
rand_read = $urandom_range(0, 1);
rand_data = $urandom();
// 2. Apply Constraints (The critical part)
// CONSTRAINT 1: Never write to a full FIFO
// Note: We check 'full' which is the status from the PREVIOUS cycle
if (full) begin
rand_write = 0;
end
// CONSTRAINT 2: Never read from an empty FIFO
if (empty) begin
rand_read = 0;
end
// Note: If Clear is asserted, we allow Write to pass through if generated
// (Testing the "Write-Through-Clear" feature), but we ensure we
// don't violate the Full check above.
// 3. Drive signals synchronously (using Non-Blocking Assignment)
write <= rand_write;
read <= rand_read;
clear <= rand_clear;
data_in <= rand_data;
// 4. Monitor/Debug Output (Optional, for visibility)
// We print what we are *attempting* to do in this cycle
if (reset == 0) begin
$display("Time: %0t | Cnt: %0d | Full: %b Empty: %b | Driving -> Wr: %b Rd: %b Clr: %b Data: %h",
$time, cnt, full, empty, rand_write, rand_read, rand_clear, rand_data);
end
endtask
// --------------------------------------------------------------------
// 6. Output Monitor (Checks results after clock edge)
// --------------------------------------------------------------------
always @(posedge clk) begin
if (!reset) begin
// Slight delay to allow NBA updates to settle for display logic
// (purely for printing convenience, not logic)
#1;
if (read && !empty)
$display(" >> READ ACK: Data Out = %h", data_out);
end
end
endmodule
\ No newline at end of file
+incdir+{{ CASE_DIR }}/testcases/{{ TEST_CASE }}
{{ CASE_DIR }}/testcases/{{ TEST_CASE }}/tb.sv
\ No newline at end of file
`timescale 1ns/1ps
module tb_eth_fifo;
// Parameters
parameter DATA_WIDTH = 32;
parameter DEPTH = 8;
parameter CNT_WIDTH = 4;
// Signals
logic clk;
logic reset;
logic write;
logic read;
logic clear;
logic [DATA_WIDTH-1:0] data_in;
// Outputs
logic [DATA_WIDTH-1:0] data_out;
logic almost_full, full;
logic almost_empty, empty;
logic [CNT_WIDTH-1:0] cnt;
// Instantiate the DUT (Device Under Test)
eth_fifo #(
.DATA_WIDTH(DATA_WIDTH),
.DEPTH(DEPTH),
.CNT_WIDTH(CNT_WIDTH)
) u_dut (.*); // wildcard connection since names match
// 1. Clock Generation
initial begin
clk = 0;
forever #5 clk = ~clk; // 10ns period
end
// 2. Test Sequence
initial begin
// Trace generation (compatible with Verilator)
$dumpfile("dump.vcd");
$dumpvars(0, tb_eth_fifo);
// Initialize Inputs
reset = 1;
write = 0;
read = 0;
clear = 0;
data_in = 0;
// Hold Reset
#20;
reset = 0;
#10;
$display("--- Starting Tests ---");
// TEST 1: Fill the FIFO
$display("Writing Data...");
for (int i=0; i<DEPTH; i++) begin
@(posedge clk);
write <= 1;
data_in <= 32'hA0 + i;
end
@(posedge clk);
write <= 0; // Stop writing
@(negedge clk); // Check flags safely away from edge
if (full) $display("[PASS] FIFO is Full");
else $display("[FAIL] FIFO should be Full but cnt=%0d", cnt);
// TEST 2: Empty the FIFO
#20;
$display("Reading Data...");
for (int i=0; i<DEPTH; i++) begin
@(posedge clk);
read <= 1;
end
@(posedge clk);
read <= 0;
@(negedge clk);
if (empty) $display("[PASS] FIFO is Empty");
else $display("[FAIL] FIFO should be Empty but cnt=%0d", cnt);
#50;
$finish;
end
endmodule
// +tree forces coverage to ONLY be collected for this instance and its children
// Syntax: +tree <instance_path> <level>
+tree tb_eth_fifo.u_dut 0
+incdir+{{ CASE_DIR }}/rtl/
{{ CASE_DIR }}/rtl/ethmac_defines.v
{{ CASE_DIR }}/rtl/timescale.v
{{ CASE_DIR }}/rtl/eth_fifo.v
\ No newline at end of file
{{ CASE_ABS_DIR }}/fpv_tb/fifo_props.sv
\ No newline at end of file
import os
from datetime import datetime
import random
import sys
sys.path.append("../..")
from utils import execute_bash_command
from jinja2 import Environment, FileSystemLoader
if __name__ == "__main__":
RANDOM_SEED = random.randint(1, 0xFFFFFFFF)
# TAG = datetime.now().strftime("%Y%m%d_%H%M%S")
TAG = RANDOM_SEED
WAVEFORM_NAME = "waveform"
TB_TOP = "tb_eth_fifo"
TEST_LIST = ["sanity_test.0", "random_test.0"]
CASE_DIR = os.getcwd()
jinja_env = Environment(loader=FileSystemLoader('./tool_scripts'))
for TEST_CASE in TEST_LIST:
sim_dir = f"{CASE_DIR}/sim/{TEST_CASE}/{TAG}"
os.makedirs(sim_dir)
os.makedirs(f"{sim_dir}/csrc")
ucli_tcl_temp = jinja_env.get_template('ucli.tcl.jinja')
ucli_tcl = ucli_tcl_temp.render(
WAVEFORM_NAME = WAVEFORM_NAME,
TB_TOP = TB_TOP,
)
with open(f"{sim_dir}/ucli.tcl", "w") as f:
f.write(ucli_tcl)
f.close()
sim_sh_temp = jinja_env.get_template('sim.sh.jinja')
sim_sh = sim_sh_temp.render(
CASE_DIR = CASE_DIR,
TEST_CASE = TEST_CASE,
TAG = TAG,
TB_FILES = "tb.sv",
WAVEFORM_NAME = WAVEFORM_NAME,
RANDOM_SEED = RANDOM_SEED,
)
with open(f"{sim_dir}/sim.sh", "w") as f:
f.write(sim_sh)
f.close()
dut_f_temp = jinja_env.get_template('dut.f.jinja')
dut_f = dut_f_temp.render(
CASE_DIR = CASE_DIR,
)
with open(f"{sim_dir}/dut.f", "w") as f:
f.write(dut_f)
f.close()
jinja_env_case = Environment(loader=FileSystemLoader(f"testcases/{TEST_CASE}"))
tb_f_temp = jinja_env_case.get_template('tb.f.jinja')
tb_f = tb_f_temp.render(
CASE_DIR = CASE_DIR,
TEST_CASE = TEST_CASE,
)
with open(f"{sim_dir}/tb.f", "w") as f:
f.write(tb_f)
f.close()
ret = execute_bash_command(f"cd {sim_dir} && chmod +x sim.sh && ./sim.sh", verbose=True)
# mrg coverage
cov_dirs_flag = ""
for TEST_CASE in TEST_LIST:
cov_dirs_flag += f"-dir {CASE_DIR}/sim/{TEST_CASE}/{TAG}/simv.vdb "
rpt_dir = f"{CASE_DIR}/sim/cov_rpt/{TAG}"
os.makedirs(rpt_dir, exist_ok=True)
ret = execute_bash_command(f"urg -full64 {cov_dirs_flag} -dbname {rpt_dir}/merged.vdb -format both -report {rpt_dir}", verbose=True)
#!/bin/bash
## RUN on GPU server
workdir=$(pwd)
ssh eda-02 "\
source /eda-tools/eda-software/synopsys/source-scripts/bash_eda02 && \
cd $workdir && python3 tool_scripts/launch_sim.py \
"
#!/bin/bash
vcs -full64 -l vcs.log -sverilog +v2k -debug_access+all -kdb \
-cm line+cond+fsm+tgl+branch+assert \
-P ${VERDI_HOME}/share/PLI/VCS/linux64/novas.tab ${VERDI_HOME}/share/PLI/VCS/linux64/pli.a \
-cm_hier {{ CASE_DIR }}/tool_scripts/cov_config.cfg \
-cm_dir {{ CASE_DIR }}/sim/{{ TEST_CASE }}/{{ TAG }}/simv.vdb \
-o {{ CASE_DIR }}/sim/{{ TEST_CASE }}/{{ TAG }}/simv \
-l {{ CASE_DIR }}/sim/{{ TEST_CASE }}/{{ TAG }}/compile.log \
-f {{ CASE_DIR }}/sim/{{ TEST_CASE }}/{{ TAG }}/dut.f \
-f {{ CASE_DIR }}/sim/{{ TEST_CASE }}/{{ TAG }}/tb.f
pushd {{ CASE_DIR }}/sim/{{ TEST_CASE }}/{{ TAG }}
if [ -f "simv" ]; then
./simv \
-cm line+cond+fsm+tgl+branch+assert \
-cm_dir {{ CASE_DIR }}/sim/{{ TEST_CASE }}/{{ TAG }}/simv.vdb \
-ucli -i ucli.tcl -l sim.log +ntb_random_seed {{ RANDOM_SEED }}
if [ -f {{ WAVEFORM_NAME }}.fsdb ]; then
fsdb2vcd -keep_last_time {{ WAVEFORM_NAME }}.fsdb -o {{ WAVEFORM_NAME }}.vcd
else
echo "SIMULATION FAILED, WAVEFORM NOT GENERATED!!!"
fi
else
echo "COMPILATION FAILED!!!"
fi
popd
read_verilog -Irtl rtl/eth_fifo.v
synth -top eth_fifo
# use DFF without enable ports
dfflegalize -cell $_DFF_PP0_ 0 -cell $_DFF_P_ 0
abc -g AND,NAND,OR,NOR,XOR
clean
# makes the name consistant between netlist.v and netlist.json
rename -enumerate
write_verilog -noexpr -noattr results/netlist.v
write_json results/netlist.json
fsdbDumpfile "{{ WAVEFORM_NAME }}.fsdb"
fsdbDumpvars 0 "{{ TB_TOP }}" "+all" "+mda" "+struct" "+parameter"
fsdbDumpSVA
run
quit
\ No newline at end of file
#!/bin/bash
### NOTE: run ONLY ONCE
./.venv_pkgs/uv venv .venv --python .venv_pkgs/python/bin/python3
./.venv_pkgs/uv pip install -r requirements/requirements_torch.txt --no-index --find-links .venv_pkgs/wheels/
./.venv_pkgs/uv pip install -r requirements/requirements_PyG.txt --no-index --find-links .venv_pkgs/wheels/
torch-cluster==1.6.3
torch-scatter==2.1.2
torch-sparse==0.6.18
torch-spline-conv==1.2.2
torch-geometric==2.6.1
gputil==1.4.0
numpy==2.1.1
progress==1.6
pyparsing==3.1.4
python-dateutil==2.9.0.post0
torch==2.3.0
#!/bin/bash
### MUST be used by `source setup_env.sh`
conda deactivate
source .venv/bin/activate
module load /tools/cluster-modulefiles/git/2.31.1
import subprocess
import sys
import os
import threading
from typing import Dict
import tomli as tomllib
from pathlib import Path
def execute_bash_command(command: str, verbose: bool=False) -> Dict:
"""
Executes a bash command.
Args:
command (str): The bash command to execute.
verbose (bool): If True, streams stdout/stderr to the terminal in real-time.
If False, runs silently.
Returns:
dict: A dictionary containing 'stdout', 'stderr', and 'return_code'.
"""
try:
# --- CASE 1: Silent Mode (Simpler & Faster) ---
if not verbose:
result = subprocess.run(
command,
shell=True,
capture_output=True, # Captures both stdout and stderr
text=True,
executable='/bin/bash'
)
return {
"stdout": result.stdout,
"stderr": result.stderr,
"return_code": result.returncode
}
# --- CASE 2: Verbose Mode (Real-time Streaming) ---
else:
# We use Popen to access the streams while the process is running
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
executable='/bin/bash'
)
# Lists to store the full output for the return value
stdout_captured = []
stderr_captured = []
def read_stream(stream, capture_list, is_stderr):
"""Helper to read a stream line-by-line, print it, and capture it."""
for line in stream:
capture_list.append(line)
if is_stderr:
# Print to stderr and flush immediately
print(line, end='', file=sys.stderr, flush=True)
else:
# Print to stdout and flush immediately
print(line, end='', file=sys.stdout, flush=True)
# Start a thread to read STDERR in the background.
# This prevents deadlocks if stderr fills up while we are reading stdout.
stderr_thread = threading.Thread(
target=read_stream,
args=(process.stderr, stderr_captured, True)
)
stderr_thread.start()
# Read STDOUT in the main thread
read_stream(process.stdout, stdout_captured, False)
# Wait for the stderr thread to finish (it ends when the process closes stderr)
stderr_thread.join()
# Wait for the actual process to finish and get exit code
return_code = process.wait()
return {
"stdout": "".join(stdout_captured),
"stderr": "".join(stderr_captured),
"return_code": return_code
}
except Exception as e:
return {
"stdout": "",
"stderr": f"Python Script Error: {str(e)}",
"return_code": -1
}
def load_config(case_name: str) -> Dict:
config_file = f"cases/{case_name}/config.toml"
with open(config_file, 'rb') as f:
config = tomllib.load(f)
f.close()
case_dir = f"cases/{case_name}"
case_abs_dir = os.path.abspath(case_dir)
config["case_abs_dir"] = Path(case_abs_dir)
config["case_name"] = case_name
if config["reset_activate"] == "high":
config["reset_expr"] = config["reset"]
elif config["reset_activate"] == "low":
config["reset_expr"] = f"~{config['reset']}"
else:
raise ValueError("Invalid reset_activate value, should be 'high' or 'low'")
data_dir = f"cases/{case_name}/results"
os.makedirs(data_dir, exist_ok=True)
return config
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment