update taskmgr according to rpc.proto

This commit is contained in:
Gallen 2018-08-05 20:57:49 +08:00
parent 09ef26c30e
commit 1aa5995095
2 changed files with 32 additions and 10 deletions

View File

@ -32,7 +32,8 @@ class TaskReporter(MasterServicer):
self.taskmgr = taskmgr self.taskmgr = taskmgr
def report(self, request, context): def report(self, request, context):
self.taskmgr.on_task_report(request) for task_report in request.taskmsgs:
self.taskmgr.on_task_report(task_report)
return Reply(status=Reply.ACCEPTED, message='') return Reply(status=Reply.ACCEPTED, message='')
@ -107,6 +108,7 @@ class TaskMgr(threading.Thread):
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
instance['status'] = report.instanceStatus instance['status'] = report.instanceStatus
instance['error_msg'] = report.errmsg
instance['last_update_time'] = time.time() instance['last_update_time'] = time.time()
if report.instanceStatus == COMPLETED: if report.instanceStatus == COMPLETED:
@ -114,6 +116,8 @@ class TaskMgr(threading.Thread):
elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT: elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT:
if instance['try_count'] > task.info.maxRetryCount: if instance['try_count'] > task.info.maxRetryCount:
self.check_task_completed(task) self.check_task_completed(task)
elif report.instanceStatus == OUTPUTERROR:
task_failed(task)
def check_task_completed(self, task): def check_task_completed(self, task):
@ -128,21 +132,39 @@ class TaskMgr(threading.Thread):
failed = True failed = True
else: else:
return return
if self.jobmgr is None: if instance['status'] == OUTPUTERROR:
self.logger.error('[check_task_completed] jobmgr is None!') failed = True
return break
if failed: if failed:
# TODO tell jobmgr task failed self.task_failed(task)
task.status = FAILED
self.jobmgr.report(task)
else: else:
# TODO tell jobmgr task completed self.task_completed(task)
def task_completed(self, task):
task.status = COMPLETED task.status = COMPLETED
if self.jobmgr is None:
self.logger.error('[task_completed] jobmgr is None!')
else:
self.jobmgr.report(task) self.jobmgr.report(task)
self.logger.info('task %s completed' % task.info.id) self.logger.info('task %s completed' % task.info.id)
self.task_queue.remove(task) self.task_queue.remove(task)
def task_failed(self, task):
task.status = FAILED
if self.jobmgr is None:
self.logger.error('[task_failed] jobmgr is None!')
else:
self.jobmgr.report(task)
self.logger.info('task %s failed' % task.info.id)
self.task_queue.remove(task)
def task_processor(self, task, instance_id, worker_ip): def task_processor(self, task, instance_id, worker_ip):
task.status = RUNNING task.status = RUNNING

View File

@ -154,7 +154,7 @@ def report(taskid, instanceid, status, token):
master_port = env.getenv('BATCH_MASTER_PORT') master_port = env.getenv('BATCH_MASTER_PORT')
channel = grpc.insecure_channel('%s:%s' % ('0.0.0.0', master_port)) channel = grpc.insecure_channel('%s:%s' % ('0.0.0.0', master_port))
stub = MasterStub(channel) stub = MasterStub(channel)
response = stub.report(TaskMsg(taskid=taskid, instanceid=instanceid, instanceStatus=status, token=token)) response = stub.report(ReportMsg(taskmsgs=TaskMsg(taskid=taskid, instanceid=instanceid, instanceStatus=status, token=token)))
def stop(): def stop():