record start/end time of every vnode

This commit is contained in:
Gallen 2019-03-24 20:15:11 +08:00
parent a3c6a9b867
commit 90eebc7531
1 changed files with 15 additions and 6 deletions

View File

@ -85,6 +85,8 @@ class SubTask():
self.max_retry_count = max_retry_count
self.vnode_started = False
self.task_started = False
self.start_at = 0
self.end_at = 0
self.status = WAITING
self.status_reason = ''
self.try_count = 0
@ -202,9 +204,8 @@ class TaskMgr(threading.Thread):
except Exception as err:
self.logger.warning(str(err))
if self.lazy_append_list:
while self.lazy_append_list:
task = self.lazy_append_list.pop(0)
self.task_queue.append(task)
self.task_queue.extend(self.lazy_append_list)
self.lazy_append_list.clear()
self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
def start_vnode(self, subtask):
@ -220,6 +221,7 @@ class TaskMgr(threading.Thread):
subtask.status_reason = str(e)
return [False, e]
subtask.vnode_started = True
subtask.start_at = time.time()
self.cpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.cpu
self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu
return [True, '']
@ -237,6 +239,7 @@ class TaskMgr(threading.Thread):
subtask.status_reason = str(e)
return [False, e]
subtask.vnode_started = False
subtask.end_at = time.time()
self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu
self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu
return [True, '']
@ -396,7 +399,7 @@ class TaskMgr(threading.Thread):
for sub_task in task.subtask_list:
if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING):
return False
self.logger.info('task %s completed %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list])))
self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list])))
if task.at_same_time and task.status == FAILED:
self.jobmgr.report(task.username,task.id,"failed","",task.subtask_list[0].max_retry_count+1)
else:
@ -432,6 +435,7 @@ class TaskMgr(threading.Thread):
sub_task.status = FAILED
if task.at_same_time:
task.status = FAILED
self.clear_sub_task(sub_task)
elif report.subTaskStatus == COMPLETED:
self.clear_sub_task(sub_task)
@ -650,8 +654,11 @@ class TaskMgr(threading.Thread):
return True
# user: username
# get the information of a task, including the status, task description and other information
@queue_lock
def get_task_list(self):
return self.task_queue.copy()
@queue_lock
def get_task(self, taskid):
for task in self.task_queue:
@ -659,9 +666,11 @@ class TaskMgr(threading.Thread):
return task
return None
def set_jobmgr(self, jobmgr):
self.jobmgr = jobmgr
# get names of all the batch containers of the user
def get_user_batch_containers(self,username):
return []