diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index d4290fe..f7b9ce4 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -32,7 +32,8 @@ class TaskReporter(MasterServicer): self.taskmgr = taskmgr def report(self, request, context): - self.taskmgr.on_task_report(request) + for task_report in request.taskmsgs: + self.taskmgr.on_task_report(task_report) return Reply(status=Reply.ACCEPTED, message='') @@ -107,6 +108,7 @@ class TaskMgr(threading.Thread): self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu instance['status'] = report.instanceStatus + instance['error_msg'] = report.errmsg instance['last_update_time'] = time.time() if report.instanceStatus == COMPLETED: @@ -114,6 +116,8 @@ class TaskMgr(threading.Thread): elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT: if instance['try_count'] > task.info.maxRetryCount: self.check_task_completed(task) + elif report.instanceStatus == OUTPUTERROR: + task_failed(task) def check_task_completed(self, task): @@ -128,21 +132,39 @@ class TaskMgr(threading.Thread): failed = True else: return - if self.jobmgr is None: - self.logger.error('[check_task_completed] jobmgr is None!') - return + if instance['status'] == OUTPUTERROR: + failed = True + break + if failed: - # TODO tell jobmgr task failed - task.status = FAILED - self.jobmgr.report(task) + self.task_failed(task) + else: + self.task_completed(task) + + + def task_completed(self, task): + task.status = COMPLETED + + if self.jobmgr is None: + self.logger.error('[task_completed] jobmgr is None!') else: - # TODO tell jobmgr task completed - task.status = COMPLETED self.jobmgr.report(task) self.logger.info('task %s completed' % task.info.id) self.task_queue.remove(task) + def task_failed(self, task): + task.status = FAILED + + if self.jobmgr is None: + self.logger.error('[task_failed] jobmgr is None!') + else: + self.jobmgr.report(task) + self.logger.info('task %s failed' % task.info.id) + self.task_queue.remove(task) + + + def task_processor(self, task, instance_id, worker_ip): task.status = RUNNING diff --git a/src/master/testTaskMgr.py b/src/master/testTaskMgr.py index fefb058..23207f8 100644 --- a/src/master/testTaskMgr.py +++ b/src/master/testTaskMgr.py @@ -154,7 +154,7 @@ def report(taskid, instanceid, status, token): master_port = env.getenv('BATCH_MASTER_PORT') channel = grpc.insecure_channel('%s:%s' % ('0.0.0.0', master_port)) stub = MasterStub(channel) - response = stub.report(TaskMsg(taskid=taskid, instanceid=instanceid, instanceStatus=status, token=token)) + response = stub.report(ReportMsg(taskmsgs=TaskMsg(taskid=taskid, instanceid=instanceid, instanceStatus=status, token=token))) def stop():