Commit fe472dec by Yaoyu Zhu

fix pickle issues and add ray debugging approach in README

parent ad08907b
@@ -66,24 +66,19 @@ BrokenPipeError: [Errno 32] Broken pipe
#### Run hangs
Tried gdb/strace; it didn't pan out and led me down the wrong path:
~~ssh in to debug: `ps -ef | grep <username>` to get the main process id `<pid>`, then `gdb -p <pid>` and `info threads` showed two ways the main thread hangs (these later turned out to be the same issue; the second appears because some other thread is stuck in write()):~~
~~1. `* 1 Thread 0x7ff93de75740 (LWP 1123511) "ray::TaskRunner" 0x00007ff93da4fa17 in write () from /lib64/libpthread.so.0`~~
~~2. `* 1 Thread 0x7fdbc010a740 (LWP 127034) "ray::TaskRunner" 0x00007fdbbfce3da6 in do_futex_wait.constprop () from /lib64/libpthread.so.0`~~
In the end, gdb debugging suggested the hang happens while writing logs (not fully confirmed; this is GPT o3's reading). The rough method: configure gdb to log its output to a file, run `thread apply all bt full`, then grep the output file for the relevant keywords.
The deciding clue: the call stack was in Python IO code rather than Ray's C++ internals, and the file descriptor was fairly large (200-something), whereas sockets, pipes, and files opened in Python usually either have an fd below 10 (stdio) or are allocated explicitly via socket()/pipe().
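The logging setup looks roughly like this inside gdb (a sketch; the exact `set logging` option names vary a bit across gdb versions, and `/tmp/allbt.txt` is just an example path):

```
(gdb) set logging file /tmp/allbt.txt
(gdb) set logging on
(gdb) thread apply all bt full
(gdb) set logging off
```

Afterwards grep `/tmp/allbt.txt` for `write`, the fd number, or suspicious file paths.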
Then you apparently need to run (inside gdb):
```
(gdb) catch syscall write
(gdb) commands
silent
printf ">>> write(fd=%d, len=%d)\n", $rdi, $rdx
set $blen = $rdx
eval "x/%ds $rsi", $blen
continue
end
```
Debugging through the `ray dashboard` is what finally surfaced the problem.
The rough method: after submitting the job through Slurm, forward the remote ray dashboard port to the login node, e.g.
```bash
ssh -L 8266:localhost:49931 zhuyaoyu@r8l40-a02 -N
```
Here 8266 is the local port and can be anything; 49931 is the remote ray dashboard port, which my script sets to `(slurm jobid) + 40000`.
You can check that it worked on the login node with `ss -ltnp | grep :8266`.
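The port scheme above can be sketched as a small shell fragment (hypothetical; inside a real job `SLURM_JOB_ID` is set by Slurm, here it is hard-coded for illustration):

```shell
SLURM_JOB_ID=9931                     # example job id, for illustration only
DASH_PORT=$((SLURM_JOB_ID + 40000))   # scheme used in my script: jobid + 40000
echo "dashboard port: $DASH_PORT"
```

With job id 9931 this yields the 49931 used in the ssh example above.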
Once that works, find `localhost:8266` under `Forwarded Address` in VS Code's `PORTS` panel and click it; the ray dashboard opens in your local browser.
Then, in the dashboard's `Jobs` tab, open the job id of your run, scroll down to the task table, and look at which tasks are still `RUNNING` while the job is hung; that tells you where it is stuck.
Alternatively: `strace -f -tt -T -e write -p <PID>`
It turned out to be stuck in `single_compute_score` in `verl/workers/reward_manager/prime.py`; the pickled payload was presumably too large (the biggest around `29M`), inflated by competitive-programming problems whose reference IO holds tens to hundreds of thousands of test cases.
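The payload-size diagnosis is easy to reproduce: pickle the argument tuple and measure it. A toy illustration (the data shapes here are fabricated; the real references reached ~29 MB):

```python
import pickle

# Fabricated reference mimicking a judge problem with tens of thousands of IO cases
reference = {
    "inputs":  [f"{i} {i + 1}\n" for i in range(50_000)],
    "outputs": [f"{2 * i + 1}\n" for i in range(50_000)],
}
# Same tuple shape as (task, completion, reference, task_extra_info)
payload = ("code", "def solve(): ...", reference, None)

size = len(pickle.dumps(payload))
print(f"pickle payload: {size / 1e6:.2f} MB")
```

Even this toy payload pickles to well over a megabyte, and every `single_compute_score` call ships one of these through IPC.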
@@ -25,18 +25,31 @@ from verl import DataProto
from verl.utils.reward_score import default_compute_score
from verl.workers.reward_manager import register
import random
import pickle
import ray
@ray.remote(num_cpus=1)
def eval_wrapper(fn, payload_ref):
    return fn(*payload_ref)

# Note: in the sandbox scenario the task here just submits an API call, so
# batching should clearly beat the per-sample approach below.
async def single_compute_score(evaluation_func, completion, reference, task, task_extra_info, executor, timeout=30.0):
    loop = asyncio.get_running_loop()
    # sometimes (completion, reference, task, task_extra_info) is large, especially a `reference` containing large-size IO
    # payload_size = len(pickle.dumps((task, completion, reference, task_extra_info)))
    # print(f"[single_compute_score] eval={evaluation_func} pickle={payload_size} bytes")
    try:
        # Run the evaluation on a Ray worker instead of in the local executor
        # future = loop.run_in_executor(executor, partial(evaluation_func, task, completion, reference, task_extra_info))
        obj_ref = eval_wrapper.remote(evaluation_func, (task, completion, reference, task_extra_info))
        future = loop.run_in_executor(None, ray.get, obj_ref)
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        # Log only a small sample of timeouts to avoid flooding stdout
        if random.randint(0, 199) == 0:
            print(f"[Timeout] Task timeout: {completion[:80]}")
        return None  # Default value for timed-out rows
    except Exception as e:
        print(f"[Error] Task failed: {e}, completion: {completion[:80]}")
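For reference, the await-with-timeout pattern above can be exercised without Ray by running the scorer in the default executor. A minimal self-contained sketch (`slow_score` is a made-up stand-in for `evaluation_func`, not verl's real scorer):

```python
import asyncio
from functools import partial

def slow_score(task, completion, reference, extra):
    # Stand-in scorer: a real one would run the reference IO tests
    return 1.0 if completion == reference else 0.0

async def score_with_timeout(fn, completion, reference, task, extra, timeout=2.0):
    loop = asyncio.get_running_loop()
    # Default (None) executor = a shared ThreadPoolExecutor
    future = loop.run_in_executor(None, partial(fn, task, completion, reference, extra))
    try:
        return await asyncio.wait_for(future, timeout=timeout)
    except asyncio.TimeoutError:
        return None  # same default as the timed-out branch above

result = asyncio.run(score_with_timeout(slow_score, "42", "42", "demo", None))
print(result)  # 1.0
```

Note that cancelling the future on timeout does not kill the underlying thread; that is one reason the real code offloads scoring to a separate Ray worker process instead.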
......