From 90eebc75318d9a6b39a2eb5586d5adb647ab1cf7 Mon Sep 17 00:00:00 2001 From: Gallen Date: Sun, 24 Mar 2019 20:15:11 +0800 Subject: [PATCH] record start/end time of every vnode --- src/master/taskmgr.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 43e0063..d320a67 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -85,6 +85,8 @@ class SubTask(): self.max_retry_count = max_retry_count self.vnode_started = False self.task_started = False + self.start_at = 0 + self.end_at = 0 self.status = WAITING self.status_reason = '' self.try_count = 0 @@ -202,9 +204,8 @@ class TaskMgr(threading.Thread): except Exception as err: self.logger.warning(str(err)) if self.lazy_append_list: - while self.lazy_append_list: - task = self.lazy_append_list.pop(0) - self.task_queue.append(task) + self.task_queue.extend(self.lazy_append_list) + self.lazy_append_list.clear() self.task_queue = sorted(self.task_queue, key=lambda x: x.priority) def start_vnode(self, subtask): @@ -220,6 +221,7 @@ class TaskMgr(threading.Thread): subtask.status_reason = str(e) return [False, e] subtask.vnode_started = True + subtask.start_at = time.time() self.cpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.cpu self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu return [True, ''] @@ -237,6 +239,7 @@ class TaskMgr(threading.Thread): subtask.status_reason = str(e) return [False, e] subtask.vnode_started = False + subtask.end_at = time.time() self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu return [True, ''] @@ -396,7 +399,7 @@ class TaskMgr(threading.Thread): for sub_task in task.subtask_list: if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING): return False - self.logger.info('task %s completed %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) + self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list]))) if task.at_same_time and task.status == FAILED: self.jobmgr.report(task.username,task.id,"failed","",task.subtask_list[0].max_retry_count+1) else: @@ -432,6 +435,7 @@ class TaskMgr(threading.Thread): sub_task.status = FAILED if task.at_same_time: task.status = FAILED + self.clear_sub_task(sub_task) elif report.subTaskStatus == COMPLETED: self.clear_sub_task(sub_task) @@ -650,8 +654,11 @@ class TaskMgr(threading.Thread): return True - # user: username - # get the information of a task, including the status, task description and other information + @queue_lock + def get_task_list(self): + return self.task_queue.copy() + + @queue_lock def get_task(self, taskid): for task in self.task_queue: @@ -659,9 +666,11 @@ class TaskMgr(threading.Thread): return task return None + def set_jobmgr(self, jobmgr): self.jobmgr = jobmgr + # get names of all the batch containers of the user def get_user_batch_containers(self,username): return []