Commit c401feaf by nanziyuan

step4: clean codes

parent c72d23fc
@@ -13,23 +13,39 @@ from transformers import AutoTokenizer
 def run_server(api_port, cuda_device, rm_inference_yaml_path, llamafactory_path):
     env = os.environ.copy()
-    env["API_PORT"] = api_port
-    env["CUDA_VISIBLE_DEVICES"] = cuda_device
+    env["API_PORT"] = str(api_port)
+    env["CUDA_VISIBLE_DEVICES"] = str(cuda_device)
     server_process = subprocess.Popen(
         ["llamafactory-cli", "api", rm_inference_yaml_path],
         stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
         env=env,
         cwd=llamafactory_path,
-        shell=True
+        text=True
     )
+    for line in server_process.stdout:
+        if "start output" in line:
+            break  # TODO
     print(
         f"Started server with PID {server_process.pid} on port {api_port} and CUDA device {cuda_device}"
     )
     return server_process
 
 
-def stop_server(server_processes):
+def start_servers(llamafactory_path, inference_cfg_path):
+    cuda_devices = os.environ["CUDA_VISIBLE_DEVICES"].split(",")
+    gpu_num = len(cuda_devices)
+    server_processes = [
+        run_server(8000 + i, cuda_devices[i], inference_cfg_path, llamafactory_path)
+        for i in range(gpu_num)
+    ]
+    time.sleep(10)  # Wait for the servers to start (adjust the sleep time as needed)
+    return server_processes
+
+
+def stop_servers(server_processes):
     for server_process in server_processes:
         server_process.terminate()
         server_process.wait()
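
Note on the startup logic above: the rewritten code still waits with a fixed time.sleep(10) in start_servers and scans stdout for a hard-coded "start output" marker (hence the TODO). A more robust option is to poll each API port until it accepts connections. The snippet below is only a minimal sketch of that idea; wait_until_ready is a hypothetical helper that is not part of this repository, and the loopback host and timeout values are arbitrary.

    import socket
    import time

    def wait_until_ready(port, host="127.0.0.1", timeout=300):
        """Hypothetical helper: block until host:port accepts a TCP connection."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                with socket.create_connection((host, port), timeout=2):
                    return  # server is up
            except OSError:
                time.sleep(2)  # not listening yet, retry
        raise TimeoutError(f"server on port {port} did not start within {timeout}s")

    # e.g. in start_servers, instead of time.sleep(10):
    # for i in range(gpu_num):
    #     wait_until_ready(8000 + i)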
@@ -62,8 +78,7 @@ def preprocess_dataset(model_path, test_dataset, gpu_num):
     return result
 
 
-def reward_model_inference(args):
-    item, api_port = args
+def test_reward_model(item, api_port):
     server_url = f"http://0.0.0.0:{api_port}/v1/score/evaluation"
     score = get_rewards_from_server(server_url, item["format_str"])[0]
     return {
@@ -74,34 +89,29 @@ def reward_model_inference(args):
     }
 
 
-def mutli_process_reward_model_inference(
-    test_path, model_path, inference_cfg_path, result_path, llamafactory_path
-):
-    cuda_devices = os.environ["CUDA_VISIBLE_DEVICES"].split(",")
-    gpu_num = len(cuda_devices)
-    test_dataset = preprocess_dataset(model_path, load_jsonl(test_path), gpu_num)
-    server_processes = [
-        run_server(8000 + i, cuda_devices[i], inference_cfg_path, llamafactory_path)
-        for i in range(gpu_num)
-    ]
-    time.sleep(300)  # Wait for the servers to start (adjust the sleep time as needed)
-    results = thread_map(reward_model_inference, test_dataset)
-    stop_server(server_processes)
-    save_jsonl(results, result_path)
-    return results
-
-
 if __name__ == "__main__":
     cfg = read_config(["orm_testmodel"])
     orm_test_model = cfg["orm_testmodel"]
-    results = mutli_process_reward_model_inference(
-        cfg["dataset"]["minimal_test_path"],
-        cfg["orm"][orm_test_model]["model_path"],
-        cfg["orm"][orm_test_model]["inference_yaml_path"],
-        cfg["orm"][orm_test_model]["minimal_test_score_path"],
-        cfg["llamafactory_path"]
+
+    raw_test_dataset = load_jsonl(cfg["dataset"]["minimal_test_path"])
+
+    servers = start_servers(
+        cfg["llamafactory_path"],
+        cfg["orm"][orm_test_model]["inference_yaml_path"],
     )
-    groups = group_results(results)
-    eval_results = [score_pass_at_k(groups, k, orm_test_model) for k in range(16)]
+    test_dataset = preprocess_dataset(
+        cfg["orm"][orm_test_model]["model_path"],
+        raw_test_dataset,
+        len(servers)
+    )
+    results = thread_map(lambda arg: test_reward_model(*arg),
+                         test_dataset,
+                         max_workers=len(servers))
+    save_jsonl(results, cfg["orm"][orm_test_model]["minimal_test_score_path"])
+    stop_servers(servers)
+
+    groups = group_results(results, cfg["apps"])
+    eval_results = [score_pass_at_k(groups, k, orm_test_model) for k in range(1, 32)]
     save_jsonl(eval_results, cfg["orm"][orm_test_model]["eval_result_path"])
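
Because the thread_map call unpacks each element with *arg, preprocess_dataset(model_path, dataset, gpu_num) is evidently expected to return (item, api_port) pairs that spread the work across the per-GPU servers. Its body is not part of this diff, so the following is only an assumed illustration of such a round-robin assignment.

    # Assumed shape, for illustration only: pair each preprocessed item with
    # the port of one of the servers started on 8000, 8001, ...
    def assign_ports(items, gpu_num):
        return [(item, 8000 + i % gpu_num) for i, item in enumerate(items)]

    # Each pair is then consumed as test_reward_model(item, api_port)
    # through the arg-unpacking lambda passed to thread_map.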
from tqdm import tqdm

from utils import load_jsonl, save_jsonl, read_config
from utils_metric import group_results, score_pass_at_k
from step4_test_reward_model import preprocess_dataset, test_reward_model


if __name__ == "__main__":
    cfg = read_config(["orm_testmodel"])
    orm_test_model = cfg["orm_testmodel"]

    raw_test_dataset = load_jsonl(cfg["dataset"]["minimal_test_path"])
    model_path = cfg["orm"][orm_test_model]["model_path"]
    test_dataset = preprocess_dataset(model_path, raw_test_dataset, 1)
    results = [test_reward_model(*arg) for arg in tqdm(test_dataset)]
    save_jsonl(results, cfg["orm"][orm_test_model]["minimal_test_score_path"])
    # results = load_jsonl(result_path)
    groups = group_results(results, cfg["apps"])
    eval_results = [score_pass_at_k(groups, k, orm_test_model) for k in range(1, 32)]
    save_jsonl(eval_results, cfg["orm"][orm_test_model]["eval_result_path"])
    print(eval_results)
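
This single-process variant imports the cleaned-up helpers from step4_test_reward_model, preprocesses the dataset as one split, and never launches a server itself, so it presumably expects a single llamafactory-cli api instance already listening on port 8000 (the base port used by run_server). One way to provide that, reusing the functions from the diff above, could look like the sketch below.

    # Sketch only: bring up one scoring server before running the script,
    # then shut it down afterwards. Reuses run_server / stop_servers and the
    # same config keys read elsewhere in this commit.
    from utils import read_config
    from step4_test_reward_model import run_server, stop_servers

    cfg = read_config(["orm_testmodel"])
    orm_test_model = cfg["orm_testmodel"]
    server = run_server(
        8000,                                              # base port from start_servers
        "0",                                               # single visible GPU
        cfg["orm"][orm_test_model]["inference_yaml_path"],
        cfg["llamafactory_path"],
    )
    # ... run the test script, then:
    stop_servers([server])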
from tqdm import tqdm
import requests
import json

from utils import load_jsonl, extract_code, save_jsonl, read_config
from utils_metric import group_results, score_pass_at_k
from transformers import AutoTokenizer


def get_rewards_from_server(server_url: str, messages: list[str]):
    """
    Gets reward scores from the API server.
    """
    headers = {"Content-Type": "application/json"}
    payload = {"model": "model", "messages": messages}
    response = requests.post(server_url, json=payload, headers=headers)
    rewards = json.loads(response.text)["scores"]
    return rewards


def reward_model_inference(item):
    server_url = "http://0.0.0.0:8000/v1/score/evaluation"
    score = get_rewards_from_server(server_url, item["format_str"])[0]
    return {
        "problem_id": item["problem_id"],
        "messages": item["messages"],
        "eval_result": item["eval_result"],
        "score": score,
    }


def preprocess_dataset(model_path, test_dataset):
    "apply chat_template and split the dataset to different gpu"
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    result = []
    for i, item in enumerate(test_dataset):
        messages = item["messages"]
        messages[-1]["content"] = extract_code(messages[-1]["content"])
        # https://github.com/hiyouga/LLaMA-Factory/blob/a45f3f5461e2936b9e119eda2ef4d8c7a4131740/tests/data/test_template.py#L58
        # llama factory's template should match tokenizer's `apply_chat_template`.
        item["format_str"] = [tokenizer.apply_chat_template(messages, tokenize=False)]
        result.append(item)
    return result


if __name__ == "__main__":
    cfg = read_config(["orm_testmodel"])
    orm_test_model = cfg["orm_testmodel"]

    test_path = cfg["dataset"]["minimal_test_path"]
    model_path = cfg["orm"][orm_test_model]["model_path"]
    result_path = cfg["orm"][orm_test_model]["minimal_test_score_path"]
    eval_result_path = cfg["orm"][orm_test_model]["eval_result_path"]

    test_dataset = preprocess_dataset(model_path, load_jsonl(test_path))
    results = [reward_model_inference(x) for x in tqdm(test_dataset)]
    save_jsonl(results, result_path)
    # results = load_jsonl(result_path)
    groups = group_results(results, cfg["apps"])
    eval_results = [score_pass_at_k(groups, k, orm_test_model) for k in range(1, 32)]
    save_jsonl(eval_results, eval_result_path)
    print(eval_results)
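
All three entry points finish the same way: group_results(results, cfg["apps"]) groups the scored samples, and score_pass_at_k is evaluated for k = 1..31. Both helpers live in utils_metric and are not shown on this page, so the sketch below is only one plausible reading of a score-based pass@k: per problem, pick the highest-reward candidate among the first k samples and count it as a pass if its eval_result is true.

    # Illustrative only -- the real group_results / score_pass_at_k live in
    # utils_metric and may be defined differently. Assumes each result dict
    # carries the keys written by test_reward_model:
    # "problem_id", "eval_result", "score".
    from collections import defaultdict

    def group_by_problem(results):
        groups = defaultdict(list)
        for r in results:
            groups[r["problem_id"]].append(r)
        return groups

    def best_of_k_accuracy(groups, k):
        hits = 0
        for samples in groups.values():
            top = max(samples[:k], key=lambda r: r["score"])  # best-scored of first k
            hits += bool(top["eval_result"])
        return hits / len(groups)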