Commit e6be7ddf by Baruch Sterin

pyabc: allow returning large result from sub processes

parent e9286513
...@@ -755,6 +755,16 @@ def _retry_read(fd): ...@@ -755,6 +755,16 @@ def _retry_read(fd):
continue continue
raise raise
def _retry_os_read(fd):
while True:
try:
return os.read(fd, 1)
except OSError as e:
if e.errno == errno.EINTR:
continue
raise
def _retry_wait(): def _retry_wait():
while True: while True:
...@@ -817,6 +827,7 @@ def _child_wait_thread_func(fd): ...@@ -817,6 +827,7 @@ def _child_wait_thread_func(fd):
_active_pids.remove(pid) _active_pids.remove(pid)
_terminated_pids[pid] = status _terminated_pids[pid] = status
os.write(_wait_fd_write, "1")
_terminated_pids_cond.notifyAll() _terminated_pids_cond.notifyAll()
_sigint_pipe_read_fd = -1 _sigint_pipe_read_fd = -1
...@@ -825,8 +836,14 @@ _sigint_pipe_write_fd = -1 ...@@ -825,8 +836,14 @@ _sigint_pipe_write_fd = -1
_sigchld_pipe_read_fd = -1 _sigchld_pipe_read_fd = -1
_sigchld_pipe_write_fd = -1 _sigchld_pipe_write_fd = -1
wait_fd = -1
_wait_fd_write = -1
def _start_threads(): def _start_threads():
global wait_fd, _wait_fd_write
wait_fd, _wait_fd_write = os.pipe()
global _sigint_pipe_read_fd, _sigint_pipe_write_fd global _sigint_pipe_read_fd, _sigint_pipe_write_fd
_sigint_pipe_read_fd, _sigint_pipe_write_fd = os.pipe() _sigint_pipe_read_fd, _sigint_pipe_write_fd = os.pipe()
...@@ -865,6 +882,9 @@ def after_fork(): ...@@ -865,6 +882,9 @@ def after_fork():
_close_on_fork = [] _close_on_fork = []
os.close(wait_fd)
os.close(_wait_fd_write)
os.close(_sigint_pipe_read_fd) os.close(_sigint_pipe_read_fd)
os.close(_sigint_pipe_write_fd) os.close(_sigint_pipe_write_fd)
...@@ -934,6 +954,7 @@ def _waitpid(pid, options=0): ...@@ -934,6 +954,7 @@ def _waitpid(pid, options=0):
with _active_lock: with _active_lock:
if pid in _terminated_pids: if pid in _terminated_pids:
_retry_os_read(wait_fd)
status = _terminated_pids[pid] status = _terminated_pids[pid]
del _terminated_pids[pid] del _terminated_pids[pid]
return pid, status return pid, status
...@@ -950,6 +971,7 @@ def _wait(options=0): ...@@ -950,6 +971,7 @@ def _wait(options=0):
with _active_lock: with _active_lock:
for pid, status in _terminated_pids.iteritems(): for pid, status in _terminated_pids.iteritems():
_retry_os_read(wait_fd)
del _terminated_pids[pid] del _terminated_pids[pid]
return pid, status return pid, status
......
...@@ -82,20 +82,36 @@ Author: Baruch Sterin <sterin@berkeley.edu> ...@@ -82,20 +82,36 @@ Author: Baruch Sterin <sterin@berkeley.edu>
""" """
import os import os
import select
import fcntl
import errno import errno
import sys import sys
import pickle import cPickle as pickle
import signal import signal
import cStringIO
from contextlib import contextmanager from contextlib import contextmanager
import pyabc import pyabc
def _retry_select(rlist):
while True:
try:
rrdy,_,_ = select.select(rlist,[],[])
if rrdy:
return rrdy
except select.error as e:
if e[0] == errno.EINTR:
continue
raise
class _splitter(object): class _splitter(object):
def __init__(self, funcs): def __init__(self, funcs):
self.funcs = funcs self.funcs = funcs
self.pids = [] self.pids = []
self.fds = {} self.fds = {}
self.buffers = {}
self.results = {} self.results = {}
def is_done(self): def is_done(self):
...@@ -117,6 +133,7 @@ class _splitter(object): ...@@ -117,6 +133,7 @@ class _splitter(object):
self.results[pid] = None self.results[pid] = None
self.fds = {} self.fds = {}
self.buffers = {}
def child( self, fdw, f): def child( self, fdw, f):
# call function # call function
...@@ -133,6 +150,9 @@ class _splitter(object): ...@@ -133,6 +150,9 @@ class _splitter(object):
# create a pipe to communicate with the child process # create a pipe to communicate with the child process
pr,pw = os.pipe() pr,pw = os.pipe()
# set pr to be non-blocking
fcntl.fcntl(pr, fcntl.F_SETFL, os.O_NONBLOCK)
parentpid = os.getpid() parentpid = os.getpid()
rc = 1 rc = 1
...@@ -162,25 +182,55 @@ class _splitter(object): ...@@ -162,25 +182,55 @@ class _splitter(object):
pid, fd = self.fork_one(f) pid, fd = self.fork_one(f)
self.pids.append(pid) self.pids.append(pid)
self.fds[pid] = (i,fd) self.fds[pid] = (i,fd)
self.buffers[fd] = cStringIO.StringIO()
def communicate(self):
rlist = [ fd for _, (_,fd) in self.fds.iteritems() ]
rlist.append(pyabc.wait_fd)
stop = False
while not stop:
rrdy = _retry_select( rlist )
for fd in rrdy:
if fd == pyabc.wait_fd:
stop = True
continue
self.buffers[fd].write( os.read(fd, 16384) )
def get_next_result(self): def get_next_result(self):
# read from the pipes as needed, while waiting for the next child process to terminate
self.communicate()
# wait for the next child process to terminate # wait for the next child process to terminate
pid, rc = os.wait() pid, rc = os.wait()
assert pid in self.fds assert pid in self.fds
# retrieve the pipe file descriptor1 # retrieve the pipe file descriptor
i, fd = self.fds[pid] i, fd = self.fds[pid]
del self.fds[pid] del self.fds[pid]
assert pid not in self.fds # retrieve the buffer
buffer = self.buffers[fd]
# read result from file del self.buffers[fd]
with os.fdopen( fd, "r" ) as fin:
try: # fill the buffer
return (i,pickle.load(fin)) while True:
except EOFError, pickle.UnpicklingError: s = os.read(fd, 16384)
return (i, None) if not s:
break
buffer.write(s)
try:
return (i, pickle.loads(buffer.getvalue()))
except EOFError, pickle.UnpicklingError:
return (i, None)
@contextmanager @contextmanager
def _splitter_wrapper(funcs): def _splitter_wrapper(funcs):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment