Fix some bugs
This commit is contained in:
parent
0c14f3684d
commit
b71138b3a7
|
@ -113,6 +113,7 @@ class SubTask():
|
|||
self.status_reason = ''
|
||||
self.try_count = 0
|
||||
self.worker = None
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def waiting_for_retry(self,reason=""):
|
||||
self.try_count += 1
|
||||
|
@ -120,7 +121,7 @@ class SubTask():
|
|||
if self.status == FAILED:
|
||||
self.root_task.status = FAILED
|
||||
self.failed_reason = reason
|
||||
|
||||
self.root_task.failed_reason = reason
|
||||
|
||||
class TaskReporter(MasterServicer):
|
||||
|
||||
|
@ -197,6 +198,19 @@ class TaskMgr(threading.Thread):
|
|||
return new_f
|
||||
return lock
|
||||
|
||||
def subtask_lock(f):
|
||||
@wraps(f)
|
||||
def new_f(self, subtask, *args, **kwargs):
|
||||
subtask.lock.acquire()
|
||||
try:
|
||||
result = f(self, subtask, *args, **kwargs)
|
||||
except Exception as err:
|
||||
subtask.lock.release()
|
||||
raise err
|
||||
subtask.lock.release()
|
||||
return result
|
||||
return new_f
|
||||
|
||||
def run(self):
|
||||
self.serve()
|
||||
while not self.thread_stop:
|
||||
|
@ -272,7 +286,10 @@ class TaskMgr(threading.Thread):
|
|||
self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu
|
||||
return [True, '']
|
||||
|
||||
@subtask_lock
|
||||
def stop_vnode(self, subtask):
|
||||
if not subtask.vnode_started:
|
||||
return [True, ""]
|
||||
try:
|
||||
self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
|
||||
channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
|
||||
|
@ -484,11 +501,15 @@ class TaskMgr(threading.Thread):
|
|||
|
||||
sub_task.status = report.subTaskStatus
|
||||
sub_task.status_reason = report.errmsg
|
||||
sub_task.task_started = False
|
||||
|
||||
if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT:
|
||||
self.clear_sub_task(sub_task)
|
||||
sub_task.waiting_for_retry(report.errmsg)
|
||||
self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg)
|
||||
if sub_task.status == WAITING:
|
||||
self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg)
|
||||
elif report.subTaskStatus == OUTPUTERROR:
|
||||
self.clear_sub_task(sub_task)
|
||||
sub_task.status = FAILED
|
||||
task.status = FAILED
|
||||
task.failed_reason = report.errmsg
|
||||
|
@ -506,7 +527,7 @@ class TaskMgr(threading.Thread):
|
|||
self.logger.info('task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list])))
|
||||
if self.check_task_completed(task):
|
||||
continue
|
||||
self.logger.info("test")
|
||||
self.logger.info('schedule task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list])))
|
||||
|
||||
if task.at_same_time:
|
||||
# parallel tasks
|
||||
|
@ -689,7 +710,7 @@ class TaskMgr(threading.Thread):
|
|||
timeout = int(json_task['expTime'])
|
||||
# commands are executed in all vnodes / only excuted in the first vnode
|
||||
# if in traditional mode, commands will be executed in all vnodes
|
||||
) if (not 'atSameTime' in json_task.keys() or json_task['runon'] == 'all' or vnode_index == 0) else None
|
||||
) if (json_task['runon'] == 'all' or vnode_index == 0) else None
|
||||
} for vnode_index in range(int(json_task['vnodeCount']))])
|
||||
|
||||
if task.at_same_time:
|
||||
|
|
Loading…
Reference in New Issue