diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index c8ca5b9..d87432a 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -213,6 +213,7 @@ class JobMgr(): 'tasks': list(all_tasks.keys()), 'tasks_vnodeCount': tasks_vnodeCount }) + res.sort(key=lambda x:x['create_time'],reverse=True) return res # user: username diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 0c5ec1a..2425b0d 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -80,7 +80,8 @@ class SubTask(): self.vnode_info = vnode_info self.vnode_info.vnodeid = idx self.command_info = command_info - self.command_info.vnodeid = idx + if self.command_info != None: + self.command_info.vnodeid = idx self.max_retry_count = max_retry_count self.vnode_started = False self.task_started = False @@ -147,10 +148,10 @@ class TaskMgr(threading.Thread): self.task_cidr = max(task_cidr,2) self.base_ip = ip_to_int(batch_net) self.free_nets = [] - for i in range((1 << self.task_cidr), (1 << (32-self.batch_cidr)) - 1): + for i in range(0, (1 << (32-self.batch_cidr)) - 1, (1 << self.task_cidr)): self.free_nets.append(i) - # self.logger.info("Free nets addresses pool %s" % str(self.free_nets)) - # self.logger.info("Each Batch Net CIDR:%s"%(str(self.task_cidr))) + self.logger.info("Free nets addresses pool %s" % str(self.free_nets)) + self.logger.info("Each Batch Net CIDR:%s"%(str(self.task_cidr))) def queue_lock(f): @wraps(f) @@ -282,7 +283,7 @@ class TaskMgr(threading.Thread): return self.free_nets.append(task.task_base_ip) task.task_base_ip = None - self.logger.error('[release task_net] %s' % str(e)) + #self.logger.error('[release task_net] %s' % str(e)) def setup_tasknet(self, task, workers=None): taskid = task.id @@ -309,7 +310,7 @@ class TaskMgr(threading.Thread): def task_processor(self, task, sub_task_list): task.status = RUNNING - # self.jobmgr.report(task.id,'running') + self.jobmgr.report(task.username, task.id, 'running') # properties for transactio @@ -356,6 +357,7 @@ class TaskMgr(threading.Thread): for sub_task in sub_task_list: task_info = sub_task.command_info if task_info is None or sub_task.status == RUNNING: + sub_task.status = RUNNING continue task_info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8)) @@ -372,24 +374,25 @@ class TaskMgr(threading.Thread): if sub_task.task_started: self.stop_task(sub_task) if sub_task.vnode_started: - self.stop_vnode(sub_task) - #pass + #self.stop_vnode(sub_task) + pass def check_task_completed(self, task): if task.status == RUNNING or task.status == WAITING: for sub_task in task.subtask_list: - if sub_task.status == RUNNING or sub_task.status == WAITING: + if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING): return False self.logger.info('task %s completed %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) if task.at_same_time and task.status == FAILED: self.clear_sub_tasks(task.subtask_list) - # TODO report to jobmgr - if self.jobmgr is None: - self.logger.error('[task_completed] jobmgr is None!') + self.jobmgr.report(task.username,task.id,"failed","",task.subtask_list[0].max_retry_count+1) else: - username = task.username - taskid = task.id - self.jobmgr.report(username,taskid,'finished') + self.jobmgr.report(task.username,task.id,'finished') + for sub_task in task.subtask_list: + if sub_task.command_info == None and sub_task.status == RUNNING: + self.clear_sub_task(sub_task) + self.release_task_ips(task) + self.remove_tasknet(task) self.lazy_delete_list.append(task) return True diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html index 359b7ba..b8d06bd 100644 --- a/web/templates/batch/batch_create.html +++ b/web/templates/batch/batch_create.html @@ -141,7 +141,7 @@ +'Task #' + task_number +'
' +'' - +'
' + +'
' +'
' +'
' +''