diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 6428355..3e7d06d 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -113,6 +113,7 @@ class SubTask(): self.status_reason = '' self.try_count = 0 self.worker = None + self.lock = threading.Lock() def waiting_for_retry(self,reason=""): self.try_count += 1 @@ -120,7 +121,7 @@ class SubTask(): if self.status == FAILED: self.root_task.status = FAILED self.failed_reason = reason - + self.root_task.failed_reason = reason class TaskReporter(MasterServicer): @@ -197,6 +198,19 @@ class TaskMgr(threading.Thread): return new_f return lock + def subtask_lock(f): + @wraps(f) + def new_f(self, subtask, *args, **kwargs): + subtask.lock.acquire() + try: + result = f(self, subtask, *args, **kwargs) + except Exception as err: + subtask.lock.release() + raise err + subtask.lock.release() + return result + return new_f + def run(self): self.serve() while not self.thread_stop: @@ -272,7 +286,10 @@ class TaskMgr(threading.Thread): self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu return [True, ''] + @subtask_lock def stop_vnode(self, subtask): + if not subtask.vnode_started: + return [True, ""] try: self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid)) channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port)) @@ -484,11 +501,15 @@ class TaskMgr(threading.Thread): sub_task.status = report.subTaskStatus sub_task.status_reason = report.errmsg + sub_task.task_started = False if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT: + self.clear_sub_task(sub_task) sub_task.waiting_for_retry(report.errmsg) - self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg) + if sub_task.status == WAITING: + self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg) elif report.subTaskStatus == OUTPUTERROR: + self.clear_sub_task(sub_task) sub_task.status = FAILED task.status = FAILED task.failed_reason = report.errmsg @@ -506,7 +527,7 @@ class TaskMgr(threading.Thread): self.logger.info('task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) if self.check_task_completed(task): continue - self.logger.info("test") + self.logger.info('schedule task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) if task.at_same_time: # parallel tasks @@ -689,7 +710,7 @@ class TaskMgr(threading.Thread): timeout = int(json_task['expTime']) # commands are executed in all vnodes / only excuted in the first vnode # if in traditional mode, commands will be executed in all vnodes - ) if (not 'atSameTime' in json_task.keys() or json_task['runon'] == 'all' or vnode_index == 0) else None + ) if (json_task['runon'] == 'all' or vnode_index == 0) else None } for vnode_index in range(int(json_task['vnodeCount']))]) if task.at_same_time: