diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 53c3366..6ea7469 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -248,21 +248,25 @@ class TaskMgr(threading.Thread): except Exception as e: self.logger.error('[task_processor] rpc error message: %s' % e) subtask.status_reason = str(e) + return [False, e] subtask.task_started = True + return [True, ''] def stop_task(self, subtask): try: self.logger.info('[task_processor] Stoping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid)) channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port)) stub = WorkerStub(channel) - response = stub.stop_stask(subtask.command_info) + response = stub.stop_task(subtask.command_info) if response.status != Reply.ACCEPTED: raise Exception(response.message) except Exception as e: self.logger.error('[task_processor] rpc error message: %s' % e) subtask.status = FAILED subtask.status_reason = str(e) + return [False, e] subtask.task_started = False + return [True, ''] @net_lock def acquire_task_ips(self, task): @@ -349,6 +353,8 @@ class TaskMgr(threading.Thread): # start tasks for sub_task in sub_task_list: task_info = sub_task.command_info + if task_info is None or sub_task.status == RUNNING: + continue task_info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8)) if self.start_task(sub_task): @@ -371,7 +377,7 @@ class TaskMgr(threading.Thread): for sub_task in task.subtask_list: if sub_task.status == RUNNING or sub_task.status == WAITING: return False - self.logger.info('task %s completed' % task.id) + self.logger.info('task %s completed %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) if task.at_same_time and task.status == FAILED: self.clear_sub_tasks(task.subtask_list) # TODO report to jobmgr @@ -387,8 +393,8 @@ class TaskMgr(threading.Thread): return sub_task = task.subtask_list[report.vnodeid] - if sub_task.token != report.token: - self.logger.warning('[on_task_report] wrong token') + if sub_task.command_info.token != report.token: + self.logger.warning('[on_task_report] wrong token, %s %s' % (sub_task.command_info.token, report.token)) return username = task.username # container_name = username + '-batch-' + task.info.id + '-' + str(report.instanceid) + '-' + report.token @@ -400,10 +406,14 @@ class TaskMgr(threading.Thread): sub_task.status = report.subTaskStatus sub_task.status_reason = report.errmsg - self.clear_sub_task(sub_task) - if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT: sub_task.waiting_for_retry() + elif report.subTaskStatus == OUTPUTERROR: + sub_task.status = FAILED + if task.at_same_time: + task.status = FAILED + elif report.subTaskStatus == COMPLETED: + self.clear_sub_task(sub_task) # return task, workers def task_scheduler(self): diff --git a/src/master/testTaskMgr.py b/src/master/testTaskMgr.py index f9b7854..3211e64 100644 --- a/src/master/testTaskMgr.py +++ b/src/master/testTaskMgr.py @@ -31,9 +31,21 @@ class SimulatedTaskController(WorkerServicer): def __init__(self, worker): self.worker = worker - def process_task(self, task, context): - print('[SimulatedTaskController] receive task [%s] instanceid [%d] token [%s]' % (task.id, task.instanceid, task.token)) - worker.process(task) + def start_vnode(self, vnodeinfo, context): + print('[SimulatedTaskController] start vnode, taskid [%s] vnodeid [%d]' % (vnodeinfo.taskid, vnodeinfo.vnodeid)) + return Reply(status=Reply.ACCEPTED,message="") + + def stop_vnode(self, vnodeinfo, context): + print('[SimulatedTaskController] stop vnode, taskid [%s] vnodeid [%d]' % (vnodeinfo.taskid, vnodeinfo.vnodeid)) + return Reply(status=Reply.ACCEPTED,message="") + + def start_task(self, taskinfo, context): + print('[SimulatedTaskController] start task, taskid [%s] vnodeid [%d] token [%s]' % (taskinfo.taskid, taskinfo.vnodeid, taskinfo.token)) + worker.process(taskinfo) + return Reply(status=Reply.ACCEPTED,message="") + + def stop_task(self, taskinfo, context): + print('[SimulatedTaskController] stop task, taskid [%s] vnodeid [%d] token [%s]' % (taskinfo.taskid, taskinfo.vnodeid, taskinfo.token)) return Reply(status=Reply.ACCEPTED,message="") @@ -54,13 +66,15 @@ class SimulatedWorker(threading.Thread): for task in self.tasks: seed = random.random() if seed < 0.25: - report(task.id, task.instanceid, RUNNING, task.token) + report(task.taskid, task.vnodeid, RUNNING, task.token) elif seed < 0.5: - report(task.id, task.instanceid, COMPLETED, task.token) + report(task.taskid, task.vnodeid, COMPLETED, task.token) self.tasks.remove(task) + break elif seed < 0.75: - report(task.id, task.instanceid, FAILED, task.token) + report(task.taskid, task.vnodeid, FAILED, task.token) self.tasks.remove(task) + break else: pass time.sleep(5) @@ -166,7 +180,7 @@ def report(taskid, instanceid, status, token): master_port = env.getenv('BATCH_MASTER_PORT') channel = grpc.insecure_channel('%s:%s' % ('0.0.0.0', master_port)) stub = MasterStub(channel) - response = stub.report(ReportMsg(taskmsgs=TaskMsg(taskid=taskid, username='root', vnodeid=instanceid, subTaskStatus=status, token=token))) + response = stub.report(ReportMsg(taskmsgs=[TaskMsg(taskid=taskid, username='root', vnodeid=instanceid, subTaskStatus=status, token=token)])) def stop():