roslaunch: #1229 #2096 roslaunch no longer blocks on SIGKILL. Also implemented parallel killing of processes with 10 workers. The latter exposes a lot of race condition issues in shutdown with nodes.

This commit is contained in:
Ken Conley 2009-12-03 00:58:02 +00:00
parent a7be41a519
commit aa9d8c6504
5 changed files with 120 additions and 48 deletions

View File

@ -119,7 +119,7 @@ class ProcessMock(roslaunch.pmon.Process):
def __init__(self, package, name, args, env, respawn=False):
super(ProcessMock, self).__init__(package, name, args, env, respawn)
self.stopped = False
def stop(self):
def stop(self, errors):
self.stopped = True
class RespawnOnceProcessMock(ProcessMock):

View File

@ -42,6 +42,7 @@ import xmlrpclib
import roslib.network
import roslib.scriptutil
_ID = '/roslaunch-netapi'
def get_roslaunch_uris():
"""
@return: list of roslaunch XML-RPC URIs for roscore that's in
@ -50,7 +51,7 @@ def get_roslaunch_uris():
"""
try:
param_server = roslib.scriptutil.get_param_server()
code, msg, vals = param_server.getParam('/roslaunch-netapi', '/roslaunch/uris')
code, msg, vals = param_server.getParam(_ID, '/roslaunch/uris')
if code == 1 and vals:
return vals.values()
except socket.error: pass

View File

@ -194,10 +194,13 @@ class LocalProcess(Process):
return logfileout, logfileerr
## start the process.
## @throws FatalProcessLaunch: if process cannot be started and it
## is not likely to ever succeed
def start(self):
"""
Start the process.
@raise FatalProcessLaunch: if process cannot be started and it
is not likely to ever succeed
"""
super(LocalProcess, self).start()
try:
self.lock.acquire()
@ -254,9 +257,11 @@ executable permission. This is often caused by a bad launch-prefix."""%(msg, ' '
finally:
self.lock.release()
## @param self
## @return bool: True if process is still running
def is_alive(self):
"""
@return: True if process is still running
@rtype: bool
"""
if not self.started: #not started yet
return True
if self.stopped or self.popen is None:
@ -266,9 +271,11 @@ executable permission. This is often caused by a bad launch-prefix."""%(msg, ' '
return False
return True
## @param self
## @return str: human-readable description of exit state
def get_exit_description(self):
"""
@return: human-readable description of exit state
@rtype: str
"""
# #973: include location of output location in message
if self.exit_code is not None:
if self.exit_code:
@ -284,9 +291,13 @@ executable permission. This is often caused by a bad launch-prefix."""%(msg, ' '
else:
return 'process has died'
## kill UNIX process
## @param self
def _stop_unix(self):
def _stop_unix(self, errors):
"""
UNIX implementation of process killing
@param errors: error messages. stop() will record messages into this list.
@type errors: [str]
"""
self.exit_code = self.popen.poll()
if self.exit_code is not None:
_logger.debug("process[%s].stop(): process has already returned %s", self.name, self.exit_code)
@ -294,9 +305,11 @@ executable permission. This is often caused by a bad launch-prefix."""%(msg, ' '
self.popen = None
self.stopped = True
return
pid = self.popen.pid
pgid = os.getpgid(pid)
_logger.info("process[%s]: killing os process with pid[%s] pgid[%s]", self.name, pid, pgid)
try:
# Start with SIGINT and escalate from there.
_logger.info("[%s] sending SIGINT to pgid [%s]", self.name, pgid)
@ -321,12 +334,14 @@ executable permission. This is often caused by a bad launch-prefix."""%(msg, ' '
retcode = self.popen.poll()
if retcode is None:
printerrlog("[%s] escalating to SIGKILL"%self.name)
errors.append("process[%s, pid %s]: required SIGKILL. May still be running."%(self.name, pid))
try:
os.killpg(pgid, signal.SIGKILL)
_logger.info("[%s] sent SIGKILL to pgid [%s]"%(self.name, pgid))
# #2096: don't block on SIGKILL, because this results in more orphaned processes overall
#self.popen.wait()
os.wait()
_logger.info("process[%s]: SIGKILL killed", self.name)
#os.wait()
_logger.info("process[%s]: sent SIGKILL", self.name)
except OSError, e:
if e.args[0] == 3:
printerrlog("no [%s] process with pid [%s]"%(self.name, pid))
@ -340,11 +355,15 @@ executable permission. This is often caused by a bad launch-prefix."""%(msg, ' '
finally:
self.popen = None
def stop(self, errors=[]):
"""
Stop the process. Record any significant error messages in the errors parameter
## Stop the process
## @param self
def stop(self):
super(LocalProcess, self).stop()
@param errors: error messages. stop() will record messages into this list.
@type errors: [str]
"""
super(LocalProcess, self).stop(errors)
self.lock.acquire()
try:
try:
@ -354,7 +373,7 @@ executable permission. This is often caused by a bad launch-prefix."""%(msg, ' '
return
#NOTE: currently POSIX-only. Need to add in Windows code once I have a test environment:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/347462
self._stop_unix()
self._stop_unix(errors)
except:
#traceback.print_exc()
_logger.error("[%s] EXCEPTION %s", self.name, traceback.format_exc())

View File

@ -202,7 +202,13 @@ class Process(object):
def is_alive(self):
return False
def stop(self):
def stop(self, errors=[]):
"""
Stop the process. Record any significant error messages in the errors parameter
@param errors: error messages. stop() will record messages into this list.
@type errors: [str]
"""
pass
def get_exit_description(self):
@ -395,7 +401,8 @@ class ProcessMonitor(Thread):
p = self.get_process(name)
if p:
try:
p.stop()
# no need to accumulate errors, so pass in []
p.stop([])
except:
logger.error(traceback.format_exc())
return True
@ -535,7 +542,8 @@ class ProcessMonitor(Thread):
for d in dead:
try:
self.unregister(d)
d.stop()
# stop process, don't accumulate errors
d.stop([])
# save process data to dead list
plock.acquire()
@ -558,7 +566,8 @@ class ProcessMonitor(Thread):
if self.is_shutdown:
break
printlog("[%s] restarting process"%r.name)
r.stop()
# stop process, don't accumulate errors
r.stop([])
r.start()
except:
traceback.print_exc()
@ -573,43 +582,49 @@ class ProcessMonitor(Thread):
# this is already true entering, but go ahead and make sure
self.is_shutdown = True
# killall processes on run exit
import Queue
q = Queue.Queue()
q.join()
try:
self.plock.acquire()
procs = self.procs[:]
# make copy of core_procs for threadsafe usage
core_procs = self.core_procs[:]
logger.info("ProcessMonitor._post_run %s: remaining procs are %s"%(self, self.procs))
# enqueue all non-core procs in reverse order for parallel kill
# #526/885: ignore core procs
[q.put(p) for p in reversed(self.procs) if not p in core_procs]
finally:
self.plock.release()
logger.info("ProcessMonitor._post_run %s: remaining procs are %s"%(self, procs))
# kill in reverse order
for p in reversed(procs):
# #526/885: ignore core procs
if p in core_procs:
continue
try:
logger.info("ProcessMonitor exit: killing %s", p.name)
printlog("[%s] killing on exit"%p.name)
p.stop()
except:
traceback.print_exc()
logger.error(traceback.format_exc())
# use 10 workers
killers = []
for i in range(10):
t = _ProcessKiller(q, i)
killers.append(t)
t.start()
# wait for workers to finish
q.join()
shutdown_errors = []
# accumulate all the shutdown errors
for t in killers:
shutdown_errors.extend(t.errors)
del killers[:]
# #526/885: kill core procs last
# we don't want to parallelize this as the master has to be last
for p in reversed(core_procs):
try:
logger.info("ProcessMonitor exit: killing %s", p.name)
printlog("[%s] killing on exit"%p.name)
p.stop()
except:
traceback.print_exc()
logger.error(traceback.format_exc())
_kill_process(p, shutdown_errors)
# delete everything except dead_list
logger.info("ProcessMonitor exit: cleaning up data structures and signals")
try:
self.plock.acquire()
del procs[:]
del core_procs[:]
del self.procs[:]
del self.core_procs[:]
@ -621,3 +636,40 @@ class ProcessMonitor(Thread):
reacquire_signals.clear()
logger.info("ProcessMonitor exit: pmon has shutdown")
self.done = True
if shutdown_errors:
printerrlog("Shutdown errors:\n"+'\n'.join([" * %s"%e for e in shutdown_errors]))
def _kill_process(p, errors):
"""
Routine for kill Process p with appropriate logging to screen and logfile
@param p: process to kill
@type p: Process
@param errors: list of error messages from killed process
@type errors: [str]
"""
try:
logger.info("ProcessMonitor exit: killing %s", p.name)
printlog("[%s] killing on exit"%p.name)
# we accumulate errors from each process so that we can print these at the end
p.stop(errors)
except:
traceback.print_exc()
logger.error(traceback.format_exc())
class _ProcessKiller(Thread):
def __init__(self, q, i):
Thread.__init__(self, name="ProcessKiller-%s"%i)
self.q = q
self.errors = []
def run(self):
q = self.q
while not q.empty():
_kill_process(q.get(), self.errors)
q.task_done()

View File

@ -242,7 +242,7 @@ class SSHChildROSLaunchProcess(roslaunch.server.ChildROSLaunchProcess):
return False
return True
def stop(self):
def stop(self, errors=[]):
try:
self.lock.acquire()
if not self.ssh: