Commit 31fbe161 by Yaoyu Zhu

add analysis for are_equal_under_sympy timeout

parent 690c0419
...@@ -111,7 +111,9 @@ ssh -L 8266:localhost:49931 zhuyaoyu@r8l40-a02 -N
#### are_equal_under_sympy timeout
[TODO]
Cause unknown. **The tricky part: printing timings for every step inside are_equal_under_sympy shows nothing timing out, so presumably the time is lost somewhere inside the timeout_limit decorator.** I instrumented `verl/utils/py_functional.py` and found that `are_equal_under_sympy`'s timeout is set to 10s, yet every measured run of `_mp_target_wrapper` finishes within 1s (`are_equal_under_sympy` itself is even faster). However, the `process.join(timeout=seconds)` step in the outer `wrapper_mp` takes roughly 5~10s, and after a timeout error the limit appears to reset (10s -> 5s); the cause is still unclear.
Since this part is math-specific and not needed for now, it is left as-is for the time being.
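If someone picks this up later, a minimal standalone sketch (illustrative names, not repo code) can separate the child's own work time from the parent-side `start()`/`join()` cost; if the parent side dominates, the 5~10s lives in process creation/joining, not in the wrapped function:
```python
# Minimal standalone sketch (illustrative, not verl code): compare the
# child's own work time against the parent-side start()/join() cost.
import multiprocessing
import time


def child(q):
    t0 = time.time()
    result = sum(i * i for i in range(100_000))  # stand-in for are_equal_under_sympy
    q.put((result, time.time() - t0))  # report the child's own work time


if __name__ == "__main__":
    q = multiprocessing.Queue(maxsize=1)
    t0 = time.time()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    p.join(timeout=10)  # same pattern as wrapper_mp's process.join(timeout=seconds)
    parent_time = time.time() - t0
    _, child_time = q.get()
    # If parent_time >> child_time, the overhead is in process
    # creation/start/join, not in the wrapped function itself.
    print(f"child work: {child_time:.3f}s, parent start+join: {parent_time:.3f}s")
```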
#### Too many None values in reward_score
...@@ -188,15 +190,10 @@ original_print = builtins.print
def traced_print(*args, **kwargs):
    frame = inspect.currentframe().f_back
    depth = 0
-    while frame and depth < 15:
-        filename = os.path.basename(frame.f_code.co_filename)
-        lineno = frame.f_lineno
-        if depth == 0:
-            original_print(f"[PRINT @ {filename}:{lineno} DEPTH={depth}]", *args, **kwargs)
-            break
-        frame = frame.f_back
-        depth += 1
+    filename = os.path.basename(frame.f_code.co_filename)
+    lineno = frame.f_lineno
+    original_print(f"[PRINT @ {filename}:{lineno} DEPTH={depth}]", *args, **kwargs)
builtins.print = traced_print
```
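For reference, the simplified hook above needs `builtins`, `inspect`, and `os` in scope; a self-contained version (assembled here for illustration, not part of the commit) looks like this:
```python
# Self-contained version of the simplified print hook above
# (assembled for illustration; not part of the commit).
import builtins
import inspect
import os

original_print = builtins.print

def traced_print(*args, **kwargs):
    # Report only the immediate caller (DEPTH=0); the old frame-walking
    # loop was removed in this revision.
    frame = inspect.currentframe().f_back
    filename = os.path.basename(frame.f_code.co_filename)
    lineno = frame.f_lineno
    original_print(f"[PRINT @ {filename}:{lineno} DEPTH=0]", *args, **kwargs)

builtins.print = traced_print

print("hello")  # e.g. -> [PRINT @ example.py:19 DEPTH=0] hello
```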
...@@ -222,10 +219,10 @@ builtins.print = traced_print
"'val-core/numina_synthetic_amc/reward/mean@1': 0.3055555555555556, "
"'val-core/numina_aops_forum/reward/mean@1': 0.4074074074074074, "
"'val-core/numina_amc_aime/reward/mean@1': 0.5714285714285714, "
-"'val-core/codecontests/reward/mean@1': 0.1666966161396947, "
-"'val-core/codeforces/reward/mean@1': 0.11601288201378994, "
-"'val-core/apps/reward/mean@1': 0.20229991152882576, "
-"'val-aux/taco/reward/mean@1': 0.0732337588380123, "
+"'val-core/codecontests/reward/mean@1': 0.31737047114542555, "
+"'val-core/codeforces/reward/mean@1': 0.1810402046950137, "
+"'val-core/apps/reward/mean@1': 0.3451626427786451, "
+"'val-aux/taco/reward/mean@1': 0.17660730168104805, "
"'val-core/taco/reward/mean@2': 0.0, 'val-aux/taco/reward/std@2': 0.0, "
"'val-core/taco/reward/best@2/mean': 0.0, 'val-core/taco/reward/best@2/std': "
"0.0, 'val-aux/taco/reward/worst@2/mean': 0.0, "
...
set -x
export VERL_LOGGING_LEVEL=INFO
python3 -X faulthandler -u -m verl.trainer.main_ppo \
reward_model.sandbox_fusion.url=$SANDBOX_URL \
reward_model.sandbox_fusion.max_concurrent=128 \
reward_model.reward_manager=prime \
algorithm.adv_estimator=grpo \
data.train_files=$CURR_DIR/data/eurus2/train.parquet \
data.val_files=$CURR_DIR/data/eurus2/validation.parquet \
data.train_batch_size=128 \
data.max_prompt_length=1024 \
data.max_response_length=3072 \
data.return_raw_chat=True \
data.filter_overlong_prompts=True \
data.truncation='error' \
actor_rollout_ref.model.path=/share/collab/codemodel/models/Qwen2.5-7B-Instruct \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.model.use_remove_padding=True \
actor_rollout_ref.model.use_liger=True \
actor_rollout_ref.actor.use_dynamic_bsz=True \
actor_rollout_ref.actor.ppo_mini_batch_size=128 \
actor_rollout_ref.actor.ppo_max_token_len_per_gpu=12000 \
actor_rollout_ref.actor.fsdp_config.param_offload=False \
actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \
actor_rollout_ref.actor.use_kl_loss=False \
actor_rollout_ref.model.enable_gradient_checkpointing=True \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=32 \
actor_rollout_ref.rollout.tensor_model_parallel_size=4 \
actor_rollout_ref.rollout.name=sglang \
actor_rollout_ref.rollout.multi_turn.enable=True \
actor_rollout_ref.rollout.multi_turn.max_assistant_turns=5 \
actor_rollout_ref.rollout.multi_turn.tool_config_path=$CURR_DIR/examples/tir/sandbox_fusion_tool_config.yaml \
actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \
actor_rollout_ref.rollout.n=16 \
algorithm.use_kl_in_reward=False \
trainer.critic_warmup=0 \
trainer.logger=['console'] \
trainer.project_name='verl_example_sandbox_fusion' \
trainer.experiment_name='eurus2_sandbox_fusion' \
trainer.n_gpus_per_node=8 \
trainer.nnodes=1 \
trainer.save_freq=20 \
trainer.test_freq=1 \
trainer.total_epochs=15 $@
tools:
- class_name: "verl.tools.sandbox_fusion_tools.SandboxFusionTool"
config:
sandbox_fusion_url: "localhost:8080"
num_workers: 10
enable_global_rate_limit: true
rate_limit: 10
default_timeout: 30
default_language: "python"
tool_schema:
type: "function"
function:
name: "code_interpreter"
description: "A tool for executing code."
parameters:
type: "object"
properties:
code:
type: "string"
description: "The code to execute."
required: ["code"]
\ No newline at end of file
tools:
- class_name: "verl.tools.sandbox_fusion_tools.SandboxFusionTool"
config:
sandbox_fusion_url: "${SANDBOX_URL}"
num_workers: 10
enable_global_rate_limit: true
rate_limit: 10
default_timeout: 30
default_language: "python"
tool_schema:
type: "function"
function:
name: "code_interpreter"
description: "A tool for executing code."
parameters:
type: "object"
properties:
code:
type: "string"
description: "The code to execute."
required: ["code"]
\ No newline at end of file
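The `tool_schema` keys above mirror the OpenAI function-calling layout; a direct transcription of that schema as a Python dict (illustrative only, showing how a client would pass it in a `tools` list) would be:
```python
# Illustrative transcription of the YAML tool_schema above into the
# OpenAI-style function-calling dict its keys mirror (not verl code).
code_interpreter_tool = {
    "type": "function",
    "function": {
        "name": "code_interpreter",
        "description": "A tool for executing code.",
        "parameters": {
            "type": "object",
            "properties": {
                "code": {"type": "string", "description": "The code to execute."},
            },
            "required": ["code"],
        },
    },
}
```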
...@@ -24,6 +24,7 @@ from functools import wraps, lru_cache
from types import SimpleNamespace
from typing import Any, Callable, Dict, Iterator, Optional, Tuple
import cloudpickle
+import time

# --- Top-level helper for multiprocessing timeout ---
...@@ -33,10 +34,13 @@ def _mp_target_wrapper(target_func: Callable, mp_queue: multiprocessing.Queue, a
    """
    Internal wrapper function executed in the child process.
    Calls the original target function and puts the result or exception into the queue.
    """
+    # t0 = time.time()
    target_func = cloudpickle.loads(target_func)
+    # t1 = time.time()
    try:
        result = target_func(*args, **kwargs)
+        # t2 = time.time()
        mp_queue.put((True, result))  # Indicate success and put result
    except Exception as e:
        # Ensure the exception is pickleable for the queue
...@@ -48,8 +52,19 @@ def _mp_target_wrapper(target_func: Callable, mp_queue: multiprocessing.Queue, a
        except (pickle.PicklingError, TypeError):
            # Fallback if the original exception cannot be pickled
            mp_queue.put((False, RuntimeError(f"Original exception type {type(e).__name__} not pickleable: {e}")))
+    # finally:
+    #     t3 = time.time()
+    #     print(f'[_mp_target_wrapper] for {target_func.__name__}: total time {t3 - t0:.6f}, cloudpickle load time {t1 - t0:.6f}, target func time {t2 - t1:.6f}, put result time {t3 - t2:.6f}')

+# # Subprocess task: encapsulate the original function call
+# def _mp_task(func_blob, args_blob):
+#     func = cloudpickle.loads(func_blob)
+#     args, kwargs = cloudpickle.loads(args_blob)
+#     return func(*args, **kwargs)
+# pool = multiprocessing.Pool(processes=16)
# Renamed the function from timeout to timeout_limit
def timeout_limit(seconds: float, use_signals: bool = False):
    """
...@@ -105,13 +120,16 @@ def timeout_limit(seconds: float, use_signals: bool = False):
        # --- Multiprocessing based timeout (existing logic) ---
        @wraps(func)
        def wrapper_mp(*args, **kwargs):
-            # q = multiprocessing.Queue(maxsize=1)
-            # process = multiprocessing.Process(target=_mp_target_wrapper, args=(func, q, args, kwargs))
            q = multiprocessing.Queue(maxsize=2)
            func_blob = cloudpickle.dumps(func)
+            # t0 = time.time()
            process = multiprocessing.Process(target=_mp_target_wrapper, args=(func_blob, q, args, kwargs))
+            # t1 = time.time()
            process.start()
+            # t2 = time.time()
            process.join(timeout=seconds)
+            # t3 = time.time()
+            # print(f'In [timeout_limit], time cost is {t3 - t0:.2f} seconds, process creation: {t1 - t0:.2f} seconds, process start: {t2 - t1:.2f} seconds, join: {t3 - t2:.2f} seconds')
            if process.is_alive():
                process.terminate()
...@@ -139,6 +157,27 @@ def timeout_limit(seconds: float, use_signals: bool = False):
                q.close()
                q.join_thread()

+            # # Serialize the function and arguments (for compatibility with process pool communication)
+            # func_blob = cloudpickle.dumps(func)
+            # args_blob = cloudpickle.dumps((args, kwargs))
+            # # Time and execute the task
+            # t0 = time.time()
+            # result = None
+            # try:
+            #     # Use the process pool's apply_async with timeout mechanism
+            #     future = pool.apply_async(_mp_task, args=(func_blob, args_blob))
+            #     result = future.get(timeout=seconds)  # Timeout control
+            # except multiprocessing.TimeoutError:
+            #     print(f"Function {func.__name__} timed out after {seconds} seconds")
+            #     # No need to terminate the process after timeout (process pool will reuse it),
+            #     # but clean up the current task
+            #     future = None  # Discard the current task, the process pool can still be reused
+            # finally:
+            #     print(f"External timing: {time.time() - t0:.2f} seconds")
+            # return result

        return wrapper_mp
    return decorator
...
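The commented-out variant above swaps per-call `Process` spawning for a long-lived `multiprocessing.Pool`, where `apply_async(...).get(timeout=...)` provides the timeout. A minimal self-contained sketch of that approach (illustrative only, not the verl implementation):
```python
# Minimal sketch of the pool-based alternative commented out above
# (illustrative only; not the verl implementation).
import multiprocessing
import cloudpickle


def _run_blob(func_blob, args_blob):
    # Runs inside a long-lived worker: deserialize, then call the target.
    func = cloudpickle.loads(func_blob)
    args, kwargs = cloudpickle.loads(args_blob)
    return func(*args, **kwargs)


def call_with_timeout(pool, func, seconds, *args, **kwargs):
    blobs = (cloudpickle.dumps(func), cloudpickle.dumps((args, kwargs)))
    future = pool.apply_async(_run_blob, blobs)
    try:
        return future.get(timeout=seconds)
    except multiprocessing.TimeoutError:
        # The worker is not killed; it stays busy until the task finishes,
        # after which the pool reuses it.
        return None


if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as pool:
        print(call_with_timeout(pool, pow, 1.0, 2, 10))  # -> 1024
```
The trade-off: a timed-out task keeps occupying its worker until it actually finishes, so a pool can silently lose capacity under repeated timeouts, whereas the per-call `Process` variant can `terminate()` the runaway child.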