Merge pull request #320 from GallenShao/batch

update taskmgr according to rpc.proto
This commit is contained in:
GallenShao 2018-08-05 20:58:59 +08:00 committed by GitHub
commit 2d80d2505a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 10 deletions

View File

@ -32,7 +32,8 @@ class TaskReporter(MasterServicer):
self.taskmgr = taskmgr
def report(self, request, context):
self.taskmgr.on_task_report(request)
for task_report in request.taskmsgs:
self.taskmgr.on_task_report(task_report)
return Reply(status=Reply.ACCEPTED, message='')
@ -107,6 +108,7 @@ class TaskMgr(threading.Thread):
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
instance['status'] = report.instanceStatus
instance['error_msg'] = report.errmsg
instance['last_update_time'] = time.time()
if report.instanceStatus == COMPLETED:
@ -114,6 +116,8 @@ class TaskMgr(threading.Thread):
elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT:
if instance['try_count'] > task.info.maxRetryCount:
self.check_task_completed(task)
elif report.instanceStatus == OUTPUTERROR:
task_failed(task)
def check_task_completed(self, task):
@ -128,21 +132,39 @@ class TaskMgr(threading.Thread):
failed = True
else:
return
if self.jobmgr is None:
self.logger.error('[check_task_completed] jobmgr is None!')
return
if instance['status'] == OUTPUTERROR:
failed = True
break
if failed:
# TODO tell jobmgr task failed
task.status = FAILED
self.jobmgr.report(task)
self.task_failed(task)
else:
# TODO tell jobmgr task completed
self.task_completed(task)
def task_completed(self, task):
task.status = COMPLETED
if self.jobmgr is None:
self.logger.error('[task_completed] jobmgr is None!')
else:
self.jobmgr.report(task)
self.logger.info('task %s completed' % task.info.id)
self.task_queue.remove(task)
def task_failed(self, task):
task.status = FAILED
if self.jobmgr is None:
self.logger.error('[task_failed] jobmgr is None!')
else:
self.jobmgr.report(task)
self.logger.info('task %s failed' % task.info.id)
self.task_queue.remove(task)
def task_processor(self, task, instance_id, worker_ip):
task.status = RUNNING

View File

@ -154,7 +154,7 @@ def report(taskid, instanceid, status, token):
master_port = env.getenv('BATCH_MASTER_PORT')
channel = grpc.insecure_channel('%s:%s' % ('0.0.0.0', master_port))
stub = MasterStub(channel)
response = stub.report(TaskMsg(taskid=taskid, instanceid=instanceid, instanceStatus=status, token=token))
response = stub.report(ReportMsg(taskmsgs=TaskMsg(taskid=taskid, instanceid=instanceid, instanceStatus=status, token=token)))
def stop():