Commit b1db6cb1 by nzy

add new evaluation. TODO plot results

parent c32b630e
from codecritic.evaluation.metric import group_results, score_pass_at_k
import argparse
from functools import partial
from pathlib import Path
import codecritic.evaluation.metric as metric
from codecritic.utils.json import load_jsonl, save_jsonl


# Removed: the old APPS-grouped pass@k evaluation.
def eval():
    # compute pass@k
    eval_result_path = result_dir / "passk.jsonl"
    # results = load_jsonl(score_path)
    groups = group_results(results, apps_path)
    eval_results = [score_pass_at_k(groups, k, home_path.stem) for k in range(1, 16)]
    save_jsonl(eval_results, eval_result_path)
    pprint.pp(eval_results)
# Added: the new per-sample evaluation built on codecritic.evaluation.metric.
def eval(samples_path):
    # sample files are named "<model>-<testset>-....jsonl"
    model, testset = samples_path.stem.split("-")[:2]

    def f(item):
        # tag every metric row with the model/testset parsed from the filename
        item["model"] = model
        item["testset"] = testset
        return item

    samples = load_jsonl(samples_path)
    ks = list(range(1, 17))

    results = []
    results.extend(metric.pass_at_k(samples, ks))
    results.extend(metric.top_at_k(samples, ks, metric.postive_and_negative))
    results.extend(metric.top_at_k(samples, ks, metric.positive_only))

    for i in range(4):
        threshold = 0.5 + i * 0.1
        score_func = partial(metric.pos_neg_filter_uncertain, threshold=threshold)
        # partial objects carry no __name__, which top_at_k records as the score label
        score_func.__name__ = f"pos_neg_filter_uncertain@{threshold:.1f}"
        results.extend(metric.top_at_k(samples, ks, score_func))

    return list(map(f, results))
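# Illustration only (values are made up): each row returned by eval() looks like
#   {"k": 4, "pass@k": 0.37, "score_func": "positive_only", "model": "qwen", "testset": "apps"}
# so downstream plotting can group rows by (model, testset, score_func) and plot pass@k against k.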
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--sample_dir",
        type=str,
        default=None,
        help="Path to the directory containing samples. If not provided, cached results are used.",
    )
    parser.add_argument("--out_dir", type=str, help="path/to/output_dir")
    parser.add_argument(
        "--score_func",
        type=str,
        default="all",
        choices=["all", "posonly", "posneg", "posneg_filter"],
        help="Select the scoring function to use. Default: 'all'.",
    )
    parser.add_argument("--plot", type=str, help="path/to/plot")
    args = parser.parse_args()

    outdir = Path(args.out_dir)

    if args.sample_dir:
        for samples_path in Path(args.sample_dir).glob("*.jsonl"):
            out_path = outdir / (samples_path.stem + "-eval.jsonl")
            if not out_path.exists():
                eval_results = eval(samples_path)
                save_jsonl(eval_results, out_path)

    for out_path in outdir.glob("*.jsonl"):
        # TODO: plot the per-file evaluation results (see commit message).
        pass
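# Usage sketch (the script name and directory paths below are hypothetical; the flags are the
# ones defined above):
#
#   python eval_samples.py --sample_dir data/samples --out_dir data/eval
#
# Every "<model>-<testset>-*.jsonl" file in --sample_dir is scored once and written to
# "<stem>-eval.jsonl" in --out_dir. --score_func and --plot are parsed but not wired up yet;
# plotting the results is the TODO from the commit message.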
# Additional Experiment:
# Does reasoning really work? Let's verify step by step.
from codecritic.dataset.code import extract_code, code_template
from codecritic.dataset.utils import SPLITTER, mk_message
from codecritic.dataset.verify import mk_critic_verify


COV_PROMPT = "Please verify your code step by step using Markdown code blocks. After each step, explain whether it's correct or not, and if not, explain the issue."

COV_EXAMPLE = """\
**Example RETURN FORMAT**

```python
def add_numbers(a, b):
    return a + b

result = add_numbers(5, '10')
```

1. **Code:**
```python
def add_numbers(a, b):
    return a + b
```
**Explanation:** Correct. This defines a function `add_numbers` that takes two arguments and returns their sum.

2. **Code:**
```python
result = add_numbers(5, '10')
```
**Explanation:** Incorrect. The second argument is a string (`'10'`), which will cause a TypeError when trying to add it to an integer.
"""

CORRECT_PROMPT = "Your code is correct."
INCORRECT_PROMPT = "Your code is incorrect."


def mk_cov_prompt(is_correct, splitter, mode):
    """Build the chain-of-verification user turn (plus an optional assistant prefix)."""
    if mode == "train":
        anchor = CORRECT_PROMPT if is_correct else INCORRECT_PROMPT
    elif mode == "test":
        anchor = ""
    else:
        raise ValueError(f"Invalid mode: {mode}. Expected 'train' or 'test'.")

    turn1 = {"role": "user", "content": "\n".join([anchor, COV_PROMPT, COV_EXAMPLE])}

    if splitter:
        turn2 = {
            "role": "assistant",
            "content": "Here's a step-by-step verification of the code." + SPLITTER,
        }
        return [turn1, turn2]
    else:
        return [turn1]
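# For example, mk_cov_prompt(True, splitter=False, mode="test") returns a single user turn whose
# content is COV_PROMPT followed by COV_EXAMPLE (the correctness anchor is empty at test time);
# with mode="train" the anchor sentence is prepended, and a truthy `splitter` additionally
# appends an assistant turn ending in SPLITTER.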
def convert_preference_to_vot_prompt(item, splitter, mode):
    message = item["messages"][0]["content"]
    chosen = item["chosen"]["content"]
    rejected = item["rejected"]["content"]

    chosen = code_template.format(extract_code(chosen))
    rejected = code_template.format(extract_code(rejected))

    messages1 = mk_message(message, chosen) + mk_cov_prompt(True, splitter, mode)
    messages2 = mk_message(message, rejected) + mk_cov_prompt(False, splitter, mode)

    return (
        {"messages": messages1, "eval_result": True, "problem_id": item["problem_id"]},
        {"messages": messages2, "eval_result": False, "problem_id": item["problem_id"]},
    )


def convert_sft_to_vot_prompt(item, splitter, mode):
    question = item["messages"][0]["content"]
    response = item["messages"][1]["content"]
    code = code_template.format(extract_code(response))
    messages = mk_message(question, code) + mk_cov_prompt(item["eval_result"], splitter, mode)
    return {"messages": messages, "eval_result": item["eval_result"], "problem_id": item["problem_id"]}


def convert_cov_to_cov_dataset(item, mode):
    # Shorten the CoV request turn to just COV_PROMPT and, for training,
    # append the ground-truth verification turns.
    item["messages"][2]["content"] = COV_PROMPT
    if mode == "train":
        item["messages"] += mk_critic_verify(item["eval_result"])
    return item
from difflib import unified_diff
import re

from codecritic.dataset.code import extract_code
from codecritic.dataset.verify import mk_critic_verify


# QwQ doesn't follow my instructions, but it outputs *really* reasonable explanations.
SYS_PROMPT = """
You are an AI code reviewer tasked with analyzing code solutions to programming problems. You will be given a problem description, a code solution, and information about the solution's correctness. If the solution is incorrect, you will also be provided with a diff showing the differences between the given solution and a correct one.

Your task is to analyze the provided code *step-by-step*, reasoning through its logic and identifying potential issues. Initially, approach the analysis as if you don't know the final judgement of its correctness. However, your final conclusion about the code's correctness must align with the provided information.

Output your reasoning process within a markdown code block using the following format:

```Rationale
[Your step-by-step reasoning here. Explain what the code does line by line and discuss possible errors.]
```

Do not simulate the code's runtime behavior, mentally execute it with specific inputs, or predict its output.

Finally, based on your analysis, state your conclusion about the code's correctness (either "Yes" or "No") using the following format:

Final Answer:
(Yes or No)
"""

USER_PROMPT = """
Problem:
{problem}

Code:
{code}

Correctness (Yes or No): {correctness}

Diff (only if Correctness is "No"):
{diff}
"""
def transform_preference_to_qwq_prompt(item):
    assert all(len(item[x]) == 1 for x in ["messages", "chosen", "rejected"])
    problem = item["messages"][0]["content"]
    chosen_code = item["chosen"][0]["content"]
    rejected_code = item["rejected"][0]["content"]

    diff = "".join(unified_diff(
        extract_code(rejected_code).splitlines(keepends=True),
        extract_code(chosen_code).splitlines(keepends=True),
        fromfile="incorrect.py",
        tofile="correct.py",
        n=1,
    ))

    sys_message = {"role": "system", "content": SYS_PROMPT}
    chosen_message = {
        "role": "user",
        "content": USER_PROMPT.format(
            problem=problem, code=chosen_code, correctness="Yes", diff=""
        ),
    }
    rejected_message = {
        "role": "user",
        "content": USER_PROMPT.format(
            problem=problem, code=rejected_code, correctness="No", diff=diff
        ),
    }

    return (
        {
            "messages": [sys_message, chosen_message],
            "eval_result": True,
            "raw": item["messages"] + item["chosen"],
        },
        {
            "messages": [sys_message, rejected_message],
            "eval_result": False,
            "raw": item["messages"] + item["rejected"],
        },
    )
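# For intuition (toy snippets, not from the dataset), difflib.unified_diff with n=1 yields the
# compact patch that gets pasted into USER_PROMPT, e.g. for a one-line fix:
#
#   --- incorrect.py
#   +++ correct.py
#   @@ -1,2 +1,2 @@
#    def add(a, b):
#   -    return a - b
#   +    return a + b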
# The system prompt asks for a "```Rationale" block; match case-insensitively since model
# outputs vary in capitalization.
rationale_pattern = re.compile(r"```rationale(.+?)```", flags=re.DOTALL | re.IGNORECASE)


def extract_rationale(text):
    rationale = [match.strip() for match in rationale_pattern.findall(text)]
    if len(rationale) < 1:
        return ""
    elif len(rationale) > 1:
        print("warning: multiple rationales")
        return "\n".join(rationale)
    else:
        return rationale[0]
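# Illustration: extract_rationale("```Rationale\nThe loop is off by one.\n```\nFinal Answer:\nNo")
# returns "The loop is off by one.".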
def transform_qwqout_to_trainset(item):
    messages = item["raw"]
    rationale = item["messages"][-1]["content"]
    response = [
        {"role": "user", "content": "Please analyze your code step by step."},
        {"role": "assistant", "content": rationale},
    ]
    response += mk_critic_verify(item["eval_result"])
    return {
        "question": messages,
        "response": response,
        "eval_result": item["eval_result"],
    }
import sys
import numpy as np
from datasets import load_dataset
from sklearn import metrics
from collections import defaultdict
# ... collapsed, unchanged in this commit: def estimate_pass_at_k(...) ...
# Removed: the old APPS-difficulty-grouped evaluation helpers.
def group_results(results, apps_path):
    """
    Output
    {
        "interview": {
            problem_id: [
                {"problem_id": problem_id, "eval_result": True, ...},
                ...
            ],
            ...
        },
        ...
    }
    """
    dataset = load_dataset(apps_path)

    groups = defaultdict(lambda: defaultdict(list))
    for item in results:
        problem_id = item["problem_id"]
        split, idx = problem_id.split("_")
        difficulty = dataset[split][int(idx)]["difficulty"]
        groups[difficulty][problem_id].append(item)

    if "score" in results[0]:
        for difficulty, problem in groups.items():
            for problem_id, lst in problem.items():
                sorted_lst = sorted(lst, key=lambda x: x["score"], reverse=True)
                problem[problem_id] = sorted_lst

    return groups


def pass_at_k(groups, k):
    result = {"strategy": "pass@k", "k": k}
    for difficulty, problems in groups.items():
        num_samples, num_correct = [], []
        for lst in problems.values():
            num_samples.append(len(lst))
            num_correct.append(sum(item["eval_result"] for item in lst))
        pass_at_k = np.mean(estimate_pass_at_k(num_samples, num_correct, k))
        result[difficulty] = pass_at_k
    return result


def score_pass_at_k(groups, k, strategy):
    result = {"strategy": strategy, "k": k}
    for difficulty, problems in groups.items():
        num_samples, num_correct = 0, 0
        for lst in problems.values():
            num_samples += 1
            num_correct += any(item["eval_result"] for item in lst[:k])
        pass_at_k = num_correct / num_samples
        result[difficulty] = pass_at_k
    return result


# Added: the new sample-level metrics used by the evaluation script above.
def pass_at_k(samples, ks: list[int]):
    # group by task_id
    grouped = defaultdict(list)
    for sample in samples:
        grouped[sample["task_id"]].append(sample)

    num_samples, num_correct = [], []
    for task_id, group in grouped.items():
        num_samples.append(len(group))
        num_correct.append(sum(x["pass"] for x in group))
    assert len(set(num_samples)) == 1, "Groups don't have same size"

    results = []
    for k in ks:
        pass_k = np.mean(estimate_pass_at_k(num_samples, num_correct, k))
        results.append({"k": k, "pass@k": pass_k, "score_func": "random"})
    return results


def positive_only(item):
    return item["positive_score"]


def postive_and_negative(item):
    pos = item["positive_score"]
    neg = item["negative_score"]
    return pos / (pos + neg)


def pos_neg_filter_uncertain(item, threshold):
    pos = item["positive_score"]
    neg = item["negative_score"]
    if (pos + neg) < threshold:
        return 0
    else:
        return pos / (pos + neg)


def top_at_k(samples, ks: list[int], score_func):
    # group by task_id
    grouped = defaultdict(list)
    for sample in samples:
        grouped[sample["task_id"]].append(sample)

    num_samples, first_pass_indices = [], []
    for task_id, group in grouped.items():
        num_samples.append(len(group))
        scored_group = [(score_func(item), item) for item in group]
        sorted_group = sorted(scored_group, key=lambda x: x[0], reverse=True)

        # index of the highest-ranked passing sample (sys.maxsize if none pass)
        first_pass_idx = sys.maxsize
        for idx, (_, item) in enumerate(sorted_group):
            if item["pass"]:
                first_pass_idx = idx
                break
        first_pass_indices.append(first_pass_idx)
    assert len(set(num_samples)) == 1, "Groups don't have same size"

    # functools.partial objects have no __name__, so fall back to their repr
    score_name = getattr(score_func, "__name__", str(score_func))
    results = []
    for k in ks:
        top_k = sum(1 for x in first_pass_indices if x < k) / len(first_pass_indices)
        results.append({"k": k, "pass@k": top_k, "score_func": score_name})
    return results


def auroc(samples, score_func):
    y = np.array([1 if x["pass"] else 0 for x in samples])
    pred = np.array([score_func(x) for x in samples])
    fpr, tpr, thresholds = metrics.roc_curve(y, pred)
    roc_auc = metrics.auc(fpr, tpr)
    return roc_auc, fpr, tpr
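# The estimate_pass_at_k helper used above is collapsed in this diff. For reference, a minimal
# sketch of the standard unbiased estimator from the Codex/HumanEval paper, which the collapsed
# helper presumably mirrors (the name and placement here are ours, not the repo's):
def estimate_pass_at_k_reference(num_samples, num_correct, k):
    """Per-task unbiased pass@k: 1 - C(n - c, k) / C(n, k)."""

    def estimator(n, c, k):
        if n - c < k:
            return 1.0
        # numerically stable running product instead of explicit binomial coefficients
        return 1.0 - np.prod(1.0 - k / np.arange(n - c + 1, n + 1))

    return np.array([estimator(int(n), int(c), k) for n, c in zip(num_samples, num_correct)])
# e.g. estimate_pass_at_k_reference([4, 4], [1, 2], 2) -> array([0.5, 0.83333333])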