Commit 34d59b0b by Baruch Sterin

fixes to pyabc kill mechanism

parent 02081dba
......@@ -12,23 +12,35 @@ from contextlib import contextmanager, nested
import pyabc
def wait_with_timeout(p, timeout):
def popen_and_wait_with_timeout(timeout,cmd, *args, **kwargs):
""" Wait for a subprocess.Popen object to terminate, or until timeout (in seconds) expires. """
if timeout <= 0:
timeout = None
t = threading.Thread(target=lambda: p.wait())
t.start()
t.join(timeout)
if t.is_alive():
p.kill()
t.join()
return p.returncode
p = None
t = None
try:
p = subprocess.Popen(cmd, *args, **kwargs)
if timeout <= 0:
timeout = None
t = threading.Thread(target=lambda: p.communicate())
t.start()
t.join(timeout)
finally:
if p is not None and p.poll() is None:
p.kill()
if t is not None and t.is_alive():
t.join()
if p is not None:
return p.returncode
return -1
@contextmanager
def replace_sys_argv(argv):
......@@ -74,13 +86,11 @@ def run_reachx_cmd(effort, timeout):
'qua_ffix -effort %d -L %s'%(effort, cygpath(tmplog_name)),
'quit'
]
cmd = ["jabc", "-c", " ; ".join(cmdline)]
p = subprocess.Popen(cmd, shell=False, stdout=sys.stdout, stderr=sys.stderr)
rc = wait_with_timeout(p,timeout)
rc = popen_and_wait_with_timeout(timeout, cmd, shell=False, stdout=sys.stdout, stderr=sys.stderr)
if rc != 0:
# jabc failed or stopped. Write a status file to update the status to unknown
with open(tmplog_name, "w") as f:
......
......@@ -4,9 +4,9 @@
SystemName [ABC: Logic synthesis and verification system.]
PackageName [Signal handling utilities.]
PackageName []
Synopsis [Signal handling utilities.]
Synopsis []
Author [Baruch Sterin]
......@@ -18,19 +18,15 @@
***********************************************************************/
#include <main.h>
#include <stdlib.h>
#include <signal.h>
#include "abc_global.h"
#include "hashGen.h"
#include "utilSignal.h"
#ifndef _MSC_VER
#ifdef _MSC_VER
#define remove _remove
#else
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>
#endif
ABC_NAMESPACE_IMPL_START
......@@ -38,478 +34,36 @@ ABC_NAMESPACE_IMPL_START
/// DECLARATIONS ///
////////////////////////////////////////////////////////////////////////
static Hash_Gen_t* watched_pid_hash = NULL;
static Hash_Gen_t* watched_tmp_files_hash = NULL;
static sigset_t old_procmask;
////////////////////////////////////////////////////////////////////////
/// FUNCTION DEFINITIONS ///
////////////////////////////////////////////////////////////////////////
/**Function*************************************************************
Synopsis []
Description [Kills all watched child processes and remove all watched termporary files.]
SideEffects []
SeeAlso []
***********************************************************************/
void Util_SignalCleanup()
{
int i;
Hash_Gen_Entry_t* pEntry;
// kill all watched child processes
Hash_GenForEachEntry(watched_pid_hash, pEntry, i)
{
pid_t pid = (pid_t)(ABC_PTRINT_T)pEntry->key;
pid_t ppid = (pid_t)(ABC_PTRINT_T)pEntry->data;
if (getpid() == ppid)
{
kill(pid, SIGINT);
}
}
// remove watched temporary files
Hash_GenForEachEntry(watched_tmp_files_hash, pEntry, i)
{
const char* fname = (const char*)pEntry->key;
pid_t ppid = (pid_t)(ABC_PTRINT_T)pEntry->data;
if( getpid() == ppid )
{
remove(fname);
}
}
}
/**Function*************************************************************
Synopsis []
Description [Sets up data structures needed for cleanup in signal handler.]
SideEffects []
SeeAlso []
***********************************************************************/
void Util_SignalStartHandler()
{
watched_pid_hash = Hash_GenAlloc(100, Hash_DefaultHashFuncInt, Hash_DefaultCmpFuncInt, 0);
watched_tmp_files_hash = Hash_GenAlloc(100, Hash_DefaultHashFuncStr, (Hash_GenCompFunction_t)strcmp, 1);
}
/**Function*************************************************************
Synopsis []
Description [Frees data structures used for clean up in signal handler.]
SideEffects []
SeeAlso []
***********************************************************************/
void Util_SignalResetHandler()
{
sigset_t procmask, old_procmask;
sigemptyset(&procmask);
sigaddset(&procmask, SIGINT);
sigprocmask(SIG_BLOCK, &procmask, &old_procmask);
Hash_GenFree(watched_pid_hash);
watched_pid_hash = Hash_GenAlloc(100, Hash_DefaultHashFuncInt, Hash_DefaultCmpFuncInt, 0);
Hash_GenFree(watched_tmp_files_hash);
watched_tmp_files_hash = Hash_GenAlloc(100, Hash_DefaultHashFuncStr, (Hash_GenCompFunction_t)strcmp, 1);
sigprocmask(SIG_SETMASK, &old_procmask, NULL);
}
void Util_SignalStopHandler()
{
Hash_GenFree(watched_pid_hash);
watched_pid_hash = NULL;
Hash_GenFree(watched_tmp_files_hash);
watched_tmp_files_hash = NULL;
}
/**Function*************************************************************
Synopsis []
Description [Blocks SIGINT. For use when updating watched processes and temporary files to prevent race conditions with the signal handler.]
SideEffects []
SeeAlso []
***********************************************************************/
static int nblocks = 0;
void Util_SignalBlockSignals()
{
sigset_t procmask;
assert(nblocks==0);
nblocks ++ ;
sigemptyset(&procmask);
sigaddset(&procmask, SIGINT);
sigprocmask(SIG_BLOCK, &procmask, &old_procmask);
}
/**Function*************************************************************
Synopsis []
Description [Unblocks SIGINT after a call to Util_SignalBlockSignals.]
SideEffects []
SeeAlso []
***********************************************************************/
void Util_SignalUnblockSignals()
{
assert( nblocks==1);
nblocks--;
sigprocmask(SIG_SETMASK, &old_procmask, NULL);
}
static void watch_tmp_file(const char* fname)
{
if( watched_tmp_files_hash != NULL )
{
Hash_GenWriteEntry(watched_tmp_files_hash, Extra_UtilStrsav(fname), (void*)getpid() );
}
}
static void unwatch_tmp_file(const char* fname)
{
if ( watched_tmp_files_hash )
{
assert( Hash_GenExists(watched_tmp_files_hash, (void*)fname) );
Hash_GenRemove(watched_tmp_files_hash, (void*)fname);
assert( !Hash_GenExists(watched_tmp_files_hash, (void*)fname) );
}
}
/**Function*************************************************************
Synopsis []
Description [Adds a process id to the list of processes that should be killed in a signal handler.]
SideEffects []
SeeAlso []
***********************************************************************/
void Util_SignalAddChildPid(int pid)
{
if ( watched_pid_hash )
{
Hash_GenWriteEntry(watched_pid_hash, (void*)pid, (void*)getpid());
}
}
/**Function*************************************************************
Synopsis []
Description [Removes a process id from the list of processes that should be killed in a signal handler.]
SideEffects []
SeeAlso []
***********************************************************************/
void Util_SignalRemoveChildPid(int pid)
{
if ( watched_pid_hash )
{
Hash_GenRemove(watched_pid_hash, (void*)pid);
}
}
// a dummy signal hanlder to make sure that SIGCHLD and SIGINT will cause sigsuspend() to return
static void null_sig_handler(int signum)
{
}
// enusre that sigsuspend() returns when signal signum occurs -- sigsuspend() does not return if a signal is ignored
static void replace_sighandler(int signum, struct sigaction* old_sa, int replace_dfl)
{
sigaction(signum, NULL, old_sa);
if( old_sa->sa_handler == SIG_IGN || old_sa->sa_handler==SIG_DFL && replace_dfl)
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = null_sig_handler;
sigaction(signum, &sa, old_sa);
}
}
//
static int do_waitpid(pid_t pid, sigset_t* old_procmask)
{
int status;
struct sigaction sigint_sa;
struct sigaction sigchld_sa;
sigset_t waitmask;
// ensure SIGINT and SIGCHLD are not blocked during sigsuspend()
memcpy(&waitmask, old_procmask, sizeof(sigset_t));
sigdelset(&waitmask, SIGINT);
sigdelset(&waitmask, SIGCHLD);
// ensure sigsuspend() returns if SIGINT or SIGCHLD occur, and save the current settings for SIGCHLD and SIGINT
replace_sighandler(SIGINT, &sigint_sa, 0);
replace_sighandler(SIGCHLD, &sigchld_sa, 1);
for(;;)
{
int rc;
// wait for a signal -- returns if SIGINT or SIGCHLD (or any other signal that is unblocked and not ignored) occur
sigsuspend(&waitmask);
// check if pid has terminated
rc = waitpid(pid, &status, WNOHANG);
// stop if terminated or some other error occurs
if( rc > 0 || rc == -1 && errno!=EINTR )
{
break;
}
}
// process is dead, should no longer be watched
Util_SignalRemoveChildPid(pid);
// restore original behavior of SIGINT and SIGCHLD
sigaction(SIGINT, &sigint_sa, NULL);
sigaction(SIGCHLD, &sigchld_sa, NULL);
return status;
}
static int do_system(const char* cmd, sigset_t* old_procmask)
{
int pid;
pid = fork();
if (pid == -1)
{
// fork failed
return -1;
}
else if( pid == 0)
{
// child process
sigprocmask(SIG_SETMASK, old_procmask, NULL);
execl("/bin/sh", "sh", "-c", cmd, NULL);
_exit(127);
}
Util_SignalAddChildPid(pid);
return do_waitpid(pid, old_procmask);
}
/**Function*************************************************************
Synopsis []
Description [Replaces system() with a function that allows SIGINT to interrupt.]
SideEffects []
SeeAlso []
***********************************************************************/
int Util_SignalSystem(const char* cmd)
{
int status;
sigset_t procmask;
sigset_t old_procmask;
// if signal handler is not installed, run the original system()
if ( ! watched_pid_hash && ! watched_tmp_files_hash )
return system(cmd);
// block SIGINT and SIGCHLD
sigemptyset(&procmask);
sigaddset(&procmask, SIGINT);
sigaddset(&procmask, SIGCHLD);
sigprocmask(SIG_BLOCK, &procmask, &old_procmask);
// call the actual function
status = do_system(cmd, &old_procmask);
// restore signal block mask
sigprocmask(SIG_SETMASK, &old_procmask, NULL);
return status;
}
/**Function*************************************************************
Synopsis []
Description []
SideEffects []
SeeAlso []
***********************************************************************/
////////////////////////////////////////////////////////////////////////
/// END OF FILE ///
////////////////////////////////////////////////////////////////////////
#else /* #ifndef _MSC_VER */
#include "abc_global.h"
ABC_NAMESPACE_IMPL_START
void Util_SignalCleanup()
{
}
void Util_SignalStartHandler()
{
}
void Util_SignalResetHandler()
{
}
void Util_SignalStopHandler()
{
}
void Util_SignalBlockSignals()
{
}
void Util_SignalUnblockSignals()
{
}
void watch_tmp_file(const char* fname)
{
}
void unwatch_tmp_file(const char* fname)
{
}
void Util_SignalAddChildPid(int pid)
{
}
void Util_SignalRemoveChildPid(int pid)
{
}
#ifndef ABC_PYTHON_EMBED
int Util_SignalSystem(const char* cmd)
{
return system(cmd);
}
#endif /* #ifdef _MSC_VER */
int tmpFile(const char* prefix, const char* suffix, char** out_name);
/**Function*************************************************************
Synopsis []
Description [Create a temporary file and add it to the list of files to be cleaned up in the signal handler.]
SideEffects []
SeeAlso []
***********************************************************************/
int Util_SignalTmpFile(const char* prefix, const char* suffix, char** out_name)
{
int fd;
Util_SignalBlockSignals();
fd = tmpFile(prefix, suffix, out_name);
if ( fd != -1 )
{
watch_tmp_file( *out_name );
}
Util_SignalUnblockSignals();
return fd;
return tmpFile(prefix, suffix, out_name);
}
/**Function*************************************************************
Synopsis []
Description [Remove a temporary file (and remove it from the watched files list.]
SideEffects []
SeeAlso []
***********************************************************************/
void Util_SignalTmpFileRemove(const char* fname, int fLeave)
{
Util_SignalBlockSignals();
unwatch_tmp_file(fname);
if (! fLeave)
{
remove(fname);
}
Util_SignalUnblockSignals();
}
#endif /* ABC_PYTHON_EMBED */
ABC_NAMESPACE_IMPL_END
////////////////////////////////////////////////////////////////////////
/// END OF FILE ///
////////////////////////////////////////////////////////////////////////
......@@ -44,14 +44,7 @@ ABC_NAMESPACE_HEADER_START
////////////////////////////////////////////////////////////////////////
/*=== utilSignal.c ==========================================================*/
extern void Util_SignalCleanup();
extern void Util_SignalStartHandler();
extern void Util_SignalResetHandler();
extern void Util_SignalStopHandler();
extern void Util_SignalBlockSignals();
extern void Util_SignalUnblockSignals();
extern void Util_SignalAddChildPid(int pid);
extern void Util_SignalRemoveChildPid(int pid);
extern int Util_SignalTmpFile(const char* prefix, const char* suffix, char** out_name);
extern void Util_SignalTmpFileRemove(const char* fname, int fLeave);
extern int Util_SignalSystem(const char* cmd);
......
......@@ -61,4 +61,21 @@ pyabc.tgz : $(PROG) $(ABC_PYTHON_SRC:_wrap.c=.py) $(ABC_PYTHON_FILES_PREFIX)/abc
--out=$@ \
$(ABC_PYTHON_OPTIONS)
PYABC_INSTALL_TARGET ?= $(shell date +%Y-%m-%d_%H-%M.%N_${USER})
PYABC_INSTALL_TARGET := $(PYABC_INSTALL_TARGET)
PYABC_INSTALL_DIR ?= /hd/common/pyabc/builds/pyabc_builds/
.PHONY: zzz
pyabc_install_target: pyabc_extension_bdist
mkdir -p "$(PYABC_INSTALL_DIR)/$(PYABC_INSTALL_TARGET)"
tar \
--directory="$(PYABC_INSTALL_DIR)/$(PYABC_INSTALL_TARGET)" \
--show-transformed-names \
--transform='s#^.*/##g' \
-xvzf "$(ABC_PYTHON_FILES_PREFIX)/dist/pyabc-1.0.linux-x86_64.tar.gz"
find "$(PYABC_INSTALL_DIR)/$(PYABC_INSTALL_TARGET)/"* -type d | xargs rmdir
echo "Installed at $(PYABC_INSTALL_DIR)/$(PYABC_INSTALL_TARGET)"
endif
......@@ -23,9 +23,18 @@
%{
#include <main.h>
#include <utilCex.h>
#include <stdlib.h>
#include <signal.h>
#include "utilSignal.h"
#include <sys/prctl.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
int n_ands()
{
......@@ -160,6 +169,56 @@ int n_phases()
return pNtk ? Abc_NtkPhaseFrameNum(pNtk) : 1;
}
Abc_Cex_t* _cex_get()
{
Abc_Frame_t* pAbc = Abc_FrameGetGlobalFrame();
Abc_Cex_t* pCex = Abc_FrameReadCex(pAbc);
if ( ! pCex )
{
return NULL;
}
return Abc_CexDup( pCex, -1 );
}
void _cex_put(Abc_Cex_t* pCex)
{
Abc_Frame_t* pAbc = Abc_FrameGetGlobalFrame();
if ( pCex )
{
pCex = Abc_CexDup(pCex, -1);
}
Abc_FrameSetCex( Abc_CexDup(pCex, -1) );
}
void _cex_free(Abc_Cex_t* pCex)
{
Abc_CexFree(pCex);
}
int _cex_n_regs(Abc_Cex_t* pCex)
{
return pCex->nRegs;
}
int _cex_n_pis(Abc_Cex_t* pCex)
{
return pCex->nPis;
}
int _cex_get_po(Abc_Cex_t* pCex)
{
return pCex->iPo;
}
int _cex_get_frame(Abc_Cex_t* pCex)
{
return pCex->iFrame;
}
static PyObject* pyabc_internal_python_command_callback = 0;
void pyabc_internal_set_command_callback( PyObject* callback )
......@@ -170,6 +229,8 @@ void pyabc_internal_set_command_callback( PyObject* callback )
pyabc_internal_python_command_callback = callback;
}
PyThreadState *_save;
static int pyabc_internal_abc_command_callback(Abc_Frame_t * pAbc, int argc, char ** argv)
{
int i;
......@@ -183,6 +244,8 @@ static int pyabc_internal_abc_command_callback(Abc_Frame_t * pAbc, int argc, cha
if ( !pyabc_internal_python_command_callback )
return 0;
Py_BLOCK_THREADS
args = PyList_New(argc);
for( i=0 ; i<argc ; i++ )
......@@ -196,19 +259,30 @@ static int pyabc_internal_abc_command_callback(Abc_Frame_t * pAbc, int argc, cha
if ( !res )
{
Py_UNBLOCK_THREADS
return -1;
}
lres = PyInt_AsLong(res);
Py_DECREF(res);
Py_UNBLOCK_THREADS
return lres;
}
int run_command(char* cmd)
{
Abc_Frame_t* pAbc = Abc_FrameGetGlobalFrame();
return Cmd_CommandExecute(pAbc, cmd);
int rc;
Py_UNBLOCK_THREADS
rc = Cmd_CommandExecute(pAbc, cmd);
Py_BLOCK_THREADS
return rc;
}
void pyabc_internal_register_command( char * sGroup, char * sName, int fChanges )
......@@ -218,45 +292,212 @@ void pyabc_internal_register_command( char * sGroup, char * sName, int fChanges
Cmd_CommandAdd( pAbc, sGroup, sName, (void*)pyabc_internal_abc_command_callback, fChanges);
}
static void sigint_handler(int signum)
static int sigchld_pipe_fd = -1;
static void sigchld_handler(int signum)
{
while( write(sigchld_pipe_fd, "", 1) == -1 && errno==EINTR )
;
}
static void install_sigchld_handler(int sigchld_fd)
{
Util_SignalCleanup();
_exit(1);
sigchld_pipe_fd = sigchld_fd;
signal(SIGCHLD, sigchld_handler);
}
void add_child_pid(int pid)
static int sigint_pipe_fd = -1;
static void sigint_handler(int signum)
{
Util_SignalAddChildPid(pid);
unsigned char tmp = (unsigned char)signum;
while( write(sigint_pipe_fd, &tmp, 1) == -1 && errno==EINTR )
;
}
void remove_child_pid(int pid)
static void install_sigint_handler(int sigint_fd)
{
Util_SignalRemoveChildPid(pid);
sigint_pipe_fd = sigint_fd;
signal(SIGINT, sigint_handler);
// try to catch other signals that ask the process to terminate
signal(SIGABRT, sigint_handler);
signal(SIGQUIT, sigint_handler);
signal(SIGTERM, sigint_handler);
// try to ensure cleanup on exceptional conditions
signal(SIGBUS, sigint_handler);
signal(SIGILL, sigint_handler);
signal(SIGSEGV, sigint_handler);
// try to ensure cleanup before being killed due to resource limit
signal(SIGXCPU, sigint_handler);
signal(SIGXFSZ, sigint_handler);
}
sigset_t old_procmask;
static int nblocks = 0;
void block_sigint()
{
Util_SignalBlockSignals();
sigset_t procmask;
assert(nblocks==0);
nblocks ++ ;
sigemptyset(&procmask);
sigaddset(&procmask, SIGINT);
sigprocmask(SIG_BLOCK, &procmask, &old_procmask);
}
void restore_sigint_block()
void unblock_sigint()
{
Util_SignalUnblockSignals();
assert( nblocks==1);
nblocks--;
sigprocmask(SIG_SETMASK, &old_procmask, NULL);
}
void reset_sigint_handler()
static PyObject* pyabc_internal_system_callback = 0;
static PyObject* pyabc_internal_tmpfile_callback = 0;
static PyObject* pyabc_internal_tmpfile_remove_callback = 0;
int Util_SignalSystem(const char* cmd)
{
Util_SignalResetHandler();
PyObject* arglist;
PyObject* res;
long lres;
if ( !pyabc_internal_system_callback )
return -1;
Py_BLOCK_THREADS
arglist = Py_BuildValue("(O)", PyString_FromString(cmd));
Py_INCREF(arglist);
res = PyEval_CallObject( pyabc_internal_system_callback, arglist );
Py_DECREF(arglist);
if ( !res )
{
Py_UNBLOCK_THREADS
return -1;
}
lres = PyInt_AsLong(res);
Py_DECREF(res);
Py_UNBLOCK_THREADS
return lres;
}
int Util_SignalTmpFile(const char* prefix, const char* suffix, char** out_name)
{
char* str;
Py_ssize_t size;
PyObject* arglist;
PyObject* res;
*out_name = NULL;
if ( !pyabc_internal_tmpfile_callback )
return 0;
Py_BLOCK_THREADS
arglist = Py_BuildValue("(ss)", prefix, suffix);
Py_INCREF(arglist);
res = PyEval_CallObject( pyabc_internal_tmpfile_callback, arglist );
Py_DECREF(arglist);
if ( !res )
{
Py_UNBLOCK_THREADS
return -1;
}
PyString_AsStringAndSize(res, &str, &size);
*out_name = ABC_ALLOC(char, size+1);
strcpy(*out_name, str);
Py_DECREF(res);
Py_UNBLOCK_THREADS
return open(*out_name, O_WRONLY);
}
void Util_SignalTmpFileRemove(const char* fname, int fLeave)
{
PyObject* arglist;
PyObject* res;
if ( !pyabc_internal_tmpfile_remove_callback )
return;
Py_BLOCK_THREADS
arglist = Py_BuildValue("(si)", fname, fLeave);
Py_INCREF(arglist);
res = PyEval_CallObject( pyabc_internal_tmpfile_remove_callback, arglist );
Py_DECREF(arglist);
Py_XDECREF(res);
Py_UNBLOCK_THREADS
}
void pyabc_internal_set_util_callbacks( PyObject* system_callback, PyObject* tmpfile_callback, PyObject* tmpfile_remove_callback )
{
Py_XINCREF(system_callback);
Py_XDECREF(pyabc_internal_system_callback);
pyabc_internal_system_callback = system_callback;
Py_XINCREF(tmpfile_callback);
Py_XDECREF(pyabc_internal_tmpfile_callback);
pyabc_internal_tmpfile_callback = tmpfile_callback;
Py_XINCREF(tmpfile_remove_callback);
Py_XDECREF(pyabc_internal_tmpfile_remove_callback);
pyabc_internal_tmpfile_remove_callback = tmpfile_remove_callback;
}
PyObject* _wait_no_hang()
{
int status;
int pid;
pid = wait3(&status, WNOHANG, NULL);
return Py_BuildValue("(iii)", pid, status, errno);
}
int _posix_kill(int pid, int signum)
{
return kill(pid, signum);
}
void _set_death_signal()
{
prctl(PR_SET_PDEATHSIG, SIGINT);
}
%}
%init
%{
Abc_Start();
Util_SignalStartHandler();
signal(SIGINT, sigint_handler);
%}
int n_ands();
......@@ -281,18 +522,442 @@ int cex_frame();
int n_phases();
Abc_Cex_t* _cex_get();
void _cex_put(Abc_Cex_t* pCex);
void _cex_free(Abc_Cex_t* pCex);
int _cex_n_regs(Abc_Cex_t* pCex);
int _cex_n_pis(Abc_Cex_t* pCex);
int _cex_get_po(Abc_Cex_t* pCex);
int _cex_get_frame(Abc_Cex_t* pCex);
void pyabc_internal_set_command_callback( PyObject* callback );
void pyabc_internal_register_command( char * sGroup, char * sName, int fChanges );
void install_sigchld_handler(int sigint_fd);
void install_sigint_handler(int sigint_fd);
void block_sigint();
void restore_sigint_block();
void add_child_pid(int pid);
void remove_child_pid(int pid);
void reset_sigint_handler();
void unblock_sigint();
void pyabc_internal_set_util_callbacks( PyObject* system_callback, PyObject* tmpfile_callback, PyObject* tmpfile_remove_callback );
PyObject* _wait_no_hang();
void _set_death_signal();
int _posix_kill(int pid, int signum);
void _set_death_signal();
%pythoncode
%{
class _Cex(object):
def __init__(self, pCex):
self.pCex = pCex
def __del__(self):
_cex_free(self.pCex)
def n_regs(self):
return _cex_n_regs(self.pCex)
def n_pis(self):
return _cex_n_pis(self.pCex)
def get_po(self):
return _cex_get_po(self.pCex)
def get_frame(self):
return _cex_get_frame(self.pCex)
def cex_get():
cex = _cex_get()
if cex is None:
return None
return _Cex(_cex_get())
def cex_put(cex):
assert cex is not None
assert cex.pCex is not None
return _cex_put(cex.pCex)
import threading
import select
import signal
import tempfile
import os
import errno
import sys, traceback
import subprocess
_active_lock = threading.Lock()
_die_flag = False
_active_pids = set()
_active_temp_files = set()
_terminated_pids_cond = threading.Condition(_active_lock)
_terminated_pids = {}
def add_temp_file(fname):
with _active_lock:
_active_temp_files.add(fname)
def remove_temp_file(fname):
with _active_lock:
_active_temp_files.remove(fname)
_old_os_wait3 = os.wait3
_select_select = select.select
def _retry_select(fd):
while True:
try:
rrdy,_,_ = _select_select([fd],[],[])
if fd in rrdy:
return
except select.error as e:
if e[0] == errno.EINTR:
continue
raise
def _retry_read(fd):
while True:
try:
return fd.read(1)
except OSError as e:
if e.errno == errno.EINTR:
continue
raise
def _retry_wait():
while True:
pid, status, e = _wait_no_hang()
if pid>0:
return pid, status
elif pid==0:
return 0,0
elif pid == -1 and e == errno.ECHILD:
return 0,0
elif pid==-1 and e != errno.EINTR:
raise OSError(e, 'unknown error in wait3()')
def _sigint_wait_thread_func(fd):
global _die_flag
while True:
_retry_select(fd)
_retry_read(fd)
with _active_lock:
if _die_flag:
os._exit(-1)
_die_flag = True
for pid in _active_pids:
rc = _posix_kill(pid, signal.SIGINT)
for fname in _active_temp_files:
os.remove(fname)
os._exit(-1)
def _child_wait_thread_func(fd):
while True:
_retry_select(fd)
rc = _retry_read(fd)
with _active_lock:
while True:
pid, status = _retry_wait()
if pid==0:
break
if pid in _active_pids:
_active_pids.remove(pid)
_terminated_pids[pid] = status
_terminated_pids_cond.notifyAll()
_sigint_pipe_read_fd = -1
_sigint_pipe_write_fd = -1
_sigchld_pipe_read_fd = -1
_sigchld_pipe_write_fd = -1
def _start_threads():
global _sigint_pipe_read_fd, _sigint_pipe_write_fd
_sigint_pipe_read_fd, _sigint_pipe_write_fd = os.pipe()
sigint_read = os.fdopen(_sigint_pipe_read_fd, "r", 0 )
sigint_wait_thread = threading.Thread(target=_sigint_wait_thread_func, name="SIGINT wait thread", args=(sigint_read,))
sigint_wait_thread.setDaemon(True)
sigint_wait_thread.start()
install_sigint_handler(_sigint_pipe_write_fd)
global _sigchld_pipe_read_fd, _sigchld_pipe_write_fd
_sigchld_pipe_read_fd, _sigchld_pipe_write_fd = os.pipe()
sigchld_read = os.fdopen(_sigchld_pipe_read_fd, "r", 0 )
child_wait_thread = threading.Thread(target=_child_wait_thread_func, name="child process wait thread", args=(sigchld_read,))
child_wait_thread.setDaemon(True)
child_wait_thread.start()
install_sigchld_handler(_sigchld_pipe_write_fd)
_close_on_fork = []
def close_on_fork(fd):
_close_on_fork.append(fd)
def after_fork():
_set_death_signal()
global _close_on_fork
for fd in _close_on_fork:
os.close(fd)
_close_on_fork = []
os.close(_sigint_pipe_read_fd)
os.close(_sigint_pipe_write_fd)
os.close(_sigchld_pipe_read_fd)
os.close(_sigchld_pipe_write_fd)
global _active_lock
_active_lock = threading.Lock()
global _terminated_pids_cond
_terminated_pids_cond = threading.Condition(_active_lock)
global _terminated_pids
_terminated_pids = {}
global _active_pids
_active_pids = set()
global _active_temp_files
_active_temp_files = set()
_start_threads()
class _sigint_block_section(object):
def __init__(self):
self.blocked = False
def __enter__(self):
block_sigint()
self.blocked = True
def __exit__(self, type, value, traceback):
self.release()
def release(self):
if self.blocked:
self.blocked = False
unblock_sigint()
_old_os_fork = os.fork
def _fork():
ppid = os.getpid()
with _sigint_block_section() as cs:
with _active_lock:
if _die_flag:
os._exit(-1)
pid = _old_os_fork()
if pid == 0:
after_fork()
if pid > 0:
_active_pids.add(pid)
return pid
def _waitpid(pid, options=0):
while True:
with _active_lock:
if pid in _terminated_pids:
status = _terminated_pids[pid]
del _terminated_pids[pid]
return pid, status
if options==os.WNOHANG:
return 0, 0
_terminated_pids_cond.wait()
def _wait(options=0):
while True:
with _active_lock:
for pid, status in _terminated_pids.iteritems():
del _terminated_pids[pid]
return pid, status
if options==os.WNOHANG:
return 0, 0
_terminated_pids_cond.wait()
_old_os_kill = os.kill
def _kill(pid, sig):
with _active_lock:
if pid in _terminated_pids:
return None
return _old_os_kill(pid,sig)
os.kill = _kill
os.fork = _fork
os.wait = _wait
os.waitpid = _waitpid
def _split_command_line(cmd):
args = []
i=0
while i<len(cmd):
while i<len(cmd) and cmd[i] in [' ','\t','\f']:
i += 1
if i >= len(cmd):
break
arg = []
in_quotes = None
while i<len(cmd):
if not in_quotes and cmd[i] in ['\'','\"','\'']:
in_quotes = cmd[i]
elif in_quotes and cmd[i]==in_quotes:
in_quotes = None
elif cmd[i] == '\\' and i<(len(cmd)+1):
i += 1
if cmd[i]=='\\':
arg.append('\\')
elif cmd[i]=='\'':
arg.append('\'')
elif cmd[i]=='\"':
arg.append('\'')
elif cmd[i]=='\"':
arg.append('\"')
elif cmd[i]=='a':
arg.append('\a')
elif cmd[i]=='b':
arg.append('\b')
elif cmd[i]=='n':
arg.append('\n')
elif cmd[i]=='f':
arg.append('\f')
elif cmd[i]=='r':
arg.append('\r')
elif cmd[i]=='t':
arg.append('\t')
elif cmd[i]=='v':
arg.append('\v')
else:
arg.append(cmd[i])
elif not in_quotes and cmd[i] in [' ','\t','\f']:
break
else:
arg.append(cmd[i])
i += 1
args.append( "".join(arg) )
return args
def system(cmd):
args = _split_command_line(cmd)
if args[-2] == '>':
with open(args[-1],'w') as fout:
p = subprocess.Popen(args[:-2], stdout=fout)
rc = p.wait()
return rc
else:
p = subprocess.Popen(args)
return p.wait()
def tmpfile(prefix, suffix):
with _active_lock:
with tempfile.NamedTemporaryFile(delete=False, prefix=prefix, suffix=suffix) as file:
_active_temp_files.add(file.name)
return file.name
def tmpfile_remove(fname, leave):
with _active_lock:
os.remove(fname)
_active_temp_files.remove(fname)
pyabc_internal_set_util_callbacks( system, tmpfile,tmpfile_remove )
_start_threads()
_registered_commands = {}
def _cmd_callback(args):
......@@ -309,7 +974,8 @@ def _cmd_callback(args):
return res
except Exception, e:
print "Python error: ", e
import traceback
traceback.print_exc()
except SystemExit, se:
pass
......@@ -322,14 +988,11 @@ def add_abc_command(fcmd, group, cmd, change):
_registered_commands[ cmd ] = fcmd
pyabc_internal_register_command( group, cmd, change)
import sys
import optparse
import os.path
import __main__
xxx = {}
def cmd_python(cmd_args):
global __main__
usage = "usage: %prog [options] <Python files>"
......@@ -345,7 +1008,7 @@ def cmd_python(cmd_args):
return 0
if options.cmd:
exec options.cmd in __main__.__dict__
exec options.cmd in xxx
return 0
scripts_dir = os.getenv('ABC_PYTHON_SCRIPTS', ".")
......@@ -353,12 +1016,12 @@ def cmd_python(cmd_args):
for fname in args[1:]:
if os.path.isabs(fname):
execfile(fname, __main__.__dict__)
execfile(fname, xxx)
else:
for d in scripts_dirs:
fname = os.path.join(scripts_dir, fname)
if os.path.exists(fname):
execfile(fname, __main__.__dict__)
execfile(fname, xxx)
break
return 0
......
......@@ -15,7 +15,6 @@ Caveats:
1. Global variables in the parent process are not affected by the child processes.
2. The functions can only return simple types, see the pickle module for details
3. Signals are currently not handled correctly
Usage:
......@@ -91,47 +90,6 @@ from contextlib import contextmanager
import pyabc
def _waitpid(pid, flags):
while True:
try:
res = os.waitpid(pid, flags)
return res
except OSError as e:
if e.errno != errno.EINTR:
raise
def _wait():
while True:
try:
pid,rc = os.wait()
return pid, rc
except OSError as e:
if e.errno != errno.EINTR:
raise
except Exceptions as e:
raise
class _sigint_critical_section(object):
def __init__(self):
self.blocked = False
def __enter__(self):
self.acquire()
return self
def __exit__(self, type, value, traceback):
self.release()
def acquire(self):
if not self.blocked:
self.blocked = True
pyabc.block_sigint()
def release(self):
if self.blocked:
self.blocked = False
pyabc.restore_sigint_block()
class _splitter(object):
def __init__(self, funcs):
......@@ -144,18 +102,19 @@ class _splitter(object):
return len(self.fds) == 0
def cleanup(self):
# close pipes and kill child processes
for pid,(i,fd) in self.fds.iteritems():
os.close(fd)
os.kill( pid, signal.SIGINT )
try:
os.kill( pid, signal.SIGINT )
except Exception as e:
print >>sys.stderr, 'exception while trying to kill pid=%d: '%pid, e
raise
with _sigint_critical_section() as cs:
# wait for termination and update result
for pid, _ in self.fds.iteritems():
_waitpid( pid, 0 )
pyabc.remove_child_pid(pid)
self.results[pid] = None
for pid, _ in self.fds.iteritems():
os.waitpid( pid, 0 )
self.results[pid] = None
self.fds = {}
......@@ -179,22 +138,20 @@ class _splitter(object):
try:
with _sigint_critical_section() as cs:
# create child process
pid = os.fork()
if pid == 0:
# child process:
pyabc.reset_sigint_handler()
cs.release()
os.close(pr)
rc = self.child( pw, f)
os._exit(rc)
else:
# parent process:
pyabc.add_child_pid(pid)
os.close(pw)
return (pid, pr)
# create child process
pid = os.fork()
if pid == 0:
# child process:
os.close(pr)
pyabc.close_on_fork(pw)
rc = self.child( pw, f)
os._exit(rc)
else:
# parent process:
os.close(pw)
return (pid, pr)
finally:
if os.getpid() != parentpid:
......@@ -209,12 +166,9 @@ class _splitter(object):
def get_next_result(self):
# wait for the next child process to terminate
pid, rc = _wait()
pid, rc = os.wait()
assert pid in self.fds
with _sigint_critical_section() as cs:
pyabc.remove_child_pid(pid)
# retrieve the pipe file descriptor1
i, fd = self.fds[pid]
del self.fds[pid]
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment