bug fixed

This commit is contained in:
root 2019-03-05 21:12:33 +08:00
parent a50e1f5b83
commit 5eaf321097
2 changed files with 37 additions and 13 deletions

View File

@ -248,21 +248,25 @@ class TaskMgr(threading.Thread):
except Exception as e: except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e) self.logger.error('[task_processor] rpc error message: %s' % e)
subtask.status_reason = str(e) subtask.status_reason = str(e)
return [False, e]
subtask.task_started = True subtask.task_started = True
return [True, '']
def stop_task(self, subtask): def stop_task(self, subtask):
try: try:
self.logger.info('[task_processor] Stoping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid)) self.logger.info('[task_processor] Stoping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port)) channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
stub = WorkerStub(channel) stub = WorkerStub(channel)
response = stub.stop_stask(subtask.command_info) response = stub.stop_task(subtask.command_info)
if response.status != Reply.ACCEPTED: if response.status != Reply.ACCEPTED:
raise Exception(response.message) raise Exception(response.message)
except Exception as e: except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e) self.logger.error('[task_processor] rpc error message: %s' % e)
subtask.status = FAILED subtask.status = FAILED
subtask.status_reason = str(e) subtask.status_reason = str(e)
return [False, e]
subtask.task_started = False subtask.task_started = False
return [True, '']
@net_lock @net_lock
def acquire_task_ips(self, task): def acquire_task_ips(self, task):
@ -349,6 +353,8 @@ class TaskMgr(threading.Thread):
# start tasks # start tasks
for sub_task in sub_task_list: for sub_task in sub_task_list:
task_info = sub_task.command_info task_info = sub_task.command_info
if task_info is None or sub_task.status == RUNNING:
continue
task_info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8)) task_info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
if self.start_task(sub_task): if self.start_task(sub_task):
@ -371,7 +377,7 @@ class TaskMgr(threading.Thread):
for sub_task in task.subtask_list: for sub_task in task.subtask_list:
if sub_task.status == RUNNING or sub_task.status == WAITING: if sub_task.status == RUNNING or sub_task.status == WAITING:
return False return False
self.logger.info('task %s completed' % task.id) self.logger.info('task %s completed %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list])))
if task.at_same_time and task.status == FAILED: if task.at_same_time and task.status == FAILED:
self.clear_sub_tasks(task.subtask_list) self.clear_sub_tasks(task.subtask_list)
# TODO report to jobmgr # TODO report to jobmgr
@ -387,8 +393,8 @@ class TaskMgr(threading.Thread):
return return
sub_task = task.subtask_list[report.vnodeid] sub_task = task.subtask_list[report.vnodeid]
if sub_task.token != report.token: if sub_task.command_info.token != report.token:
self.logger.warning('[on_task_report] wrong token') self.logger.warning('[on_task_report] wrong token, %s %s' % (sub_task.command_info.token, report.token))
return return
username = task.username username = task.username
# container_name = username + '-batch-' + task.info.id + '-' + str(report.instanceid) + '-' + report.token # container_name = username + '-batch-' + task.info.id + '-' + str(report.instanceid) + '-' + report.token
@ -400,10 +406,14 @@ class TaskMgr(threading.Thread):
sub_task.status = report.subTaskStatus sub_task.status = report.subTaskStatus
sub_task.status_reason = report.errmsg sub_task.status_reason = report.errmsg
self.clear_sub_task(sub_task)
if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT: if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT:
sub_task.waiting_for_retry() sub_task.waiting_for_retry()
elif report.subTaskStatus == OUTPUTERROR:
sub_task.status = FAILED
if task.at_same_time:
task.status = FAILED
elif report.subTaskStatus == COMPLETED:
self.clear_sub_task(sub_task)
# return task, workers # return task, workers
def task_scheduler(self): def task_scheduler(self):

View File

@ -31,9 +31,21 @@ class SimulatedTaskController(WorkerServicer):
def __init__(self, worker): def __init__(self, worker):
self.worker = worker self.worker = worker
def process_task(self, task, context): def start_vnode(self, vnodeinfo, context):
print('[SimulatedTaskController] receive task [%s] instanceid [%d] token [%s]' % (task.id, task.instanceid, task.token)) print('[SimulatedTaskController] start vnode, taskid [%s] vnodeid [%d]' % (vnodeinfo.taskid, vnodeinfo.vnodeid))
worker.process(task) return Reply(status=Reply.ACCEPTED,message="")
def stop_vnode(self, vnodeinfo, context):
print('[SimulatedTaskController] stop vnode, taskid [%s] vnodeid [%d]' % (vnodeinfo.taskid, vnodeinfo.vnodeid))
return Reply(status=Reply.ACCEPTED,message="")
def start_task(self, taskinfo, context):
print('[SimulatedTaskController] start task, taskid [%s] vnodeid [%d] token [%s]' % (taskinfo.taskid, taskinfo.vnodeid, taskinfo.token))
worker.process(taskinfo)
return Reply(status=Reply.ACCEPTED,message="")
def stop_task(self, taskinfo, context):
print('[SimulatedTaskController] stop task, taskid [%s] vnodeid [%d] token [%s]' % (taskinfo.taskid, taskinfo.vnodeid, taskinfo.token))
return Reply(status=Reply.ACCEPTED,message="") return Reply(status=Reply.ACCEPTED,message="")
@ -54,13 +66,15 @@ class SimulatedWorker(threading.Thread):
for task in self.tasks: for task in self.tasks:
seed = random.random() seed = random.random()
if seed < 0.25: if seed < 0.25:
report(task.id, task.instanceid, RUNNING, task.token) report(task.taskid, task.vnodeid, RUNNING, task.token)
elif seed < 0.5: elif seed < 0.5:
report(task.id, task.instanceid, COMPLETED, task.token) report(task.taskid, task.vnodeid, COMPLETED, task.token)
self.tasks.remove(task) self.tasks.remove(task)
break
elif seed < 0.75: elif seed < 0.75:
report(task.id, task.instanceid, FAILED, task.token) report(task.taskid, task.vnodeid, FAILED, task.token)
self.tasks.remove(task) self.tasks.remove(task)
break
else: else:
pass pass
time.sleep(5) time.sleep(5)
@ -166,7 +180,7 @@ def report(taskid, instanceid, status, token):
master_port = env.getenv('BATCH_MASTER_PORT') master_port = env.getenv('BATCH_MASTER_PORT')
channel = grpc.insecure_channel('%s:%s' % ('0.0.0.0', master_port)) channel = grpc.insecure_channel('%s:%s' % ('0.0.0.0', master_port))
stub = MasterStub(channel) stub = MasterStub(channel)
response = stub.report(ReportMsg(taskmsgs=TaskMsg(taskid=taskid, username='root', vnodeid=instanceid, subTaskStatus=status, token=token))) response = stub.report(ReportMsg(taskmsgs=[TaskMsg(taskid=taskid, username='root', vnodeid=instanceid, subTaskStatus=status, token=token)]))
def stop(): def stop():