Merge branch 'batch' of https://github.com/unias/docklet into batch

This commit is contained in:
Firmlyzhu 2019-03-29 15:10:56 +08:00
commit 80e15ae0a0
1 changed files with 15 additions and 6 deletions

View File

@ -85,6 +85,8 @@ class SubTask():
self.max_retry_count = max_retry_count self.max_retry_count = max_retry_count
self.vnode_started = False self.vnode_started = False
self.task_started = False self.task_started = False
self.start_at = 0
self.end_at = 0
self.status = WAITING self.status = WAITING
self.status_reason = '' self.status_reason = ''
self.try_count = 0 self.try_count = 0
@ -202,9 +204,8 @@ class TaskMgr(threading.Thread):
except Exception as err: except Exception as err:
self.logger.warning(str(err)) self.logger.warning(str(err))
if self.lazy_append_list: if self.lazy_append_list:
while self.lazy_append_list: self.task_queue.extend(self.lazy_append_list)
task = self.lazy_append_list.pop(0) self.lazy_append_list.clear()
self.task_queue.append(task)
self.task_queue = sorted(self.task_queue, key=lambda x: x.priority) self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
def start_vnode(self, subtask): def start_vnode(self, subtask):
@ -220,6 +221,7 @@ class TaskMgr(threading.Thread):
subtask.status_reason = str(e) subtask.status_reason = str(e)
return [False, e] return [False, e]
subtask.vnode_started = True subtask.vnode_started = True
subtask.start_at = time.time()
self.cpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.cpu self.cpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.cpu
self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu
return [True, ''] return [True, '']
@ -237,6 +239,7 @@ class TaskMgr(threading.Thread):
subtask.status_reason = str(e) subtask.status_reason = str(e)
return [False, e] return [False, e]
subtask.vnode_started = False subtask.vnode_started = False
subtask.end_at = time.time()
self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu
self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu
return [True, ''] return [True, '']
@ -396,7 +399,7 @@ class TaskMgr(threading.Thread):
for sub_task in task.subtask_list: for sub_task in task.subtask_list:
if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING): if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING):
return False return False
self.logger.info('task %s completed %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list])))
if task.at_same_time and task.status == FAILED: if task.at_same_time and task.status == FAILED:
self.jobmgr.report(task.username,task.id,"failed","",task.subtask_list[0].max_retry_count+1) self.jobmgr.report(task.username,task.id,"failed","",task.subtask_list[0].max_retry_count+1)
else: else:
@ -432,6 +435,7 @@ class TaskMgr(threading.Thread):
sub_task.status = FAILED sub_task.status = FAILED
if task.at_same_time: if task.at_same_time:
task.status = FAILED task.status = FAILED
self.clear_sub_task(sub_task)
elif report.subTaskStatus == COMPLETED: elif report.subTaskStatus == COMPLETED:
self.clear_sub_task(sub_task) self.clear_sub_task(sub_task)
@ -650,8 +654,11 @@ class TaskMgr(threading.Thread):
return True return True
# user: username @queue_lock
# get the information of a task, including the status, task description and other information def get_task_list(self):
return self.task_queue.copy()
@queue_lock @queue_lock
def get_task(self, taskid): def get_task(self, taskid):
for task in self.task_queue: for task in self.task_queue:
@ -659,9 +666,11 @@ class TaskMgr(threading.Thread):
return task return task
return None return None
def set_jobmgr(self, jobmgr): def set_jobmgr(self, jobmgr):
self.jobmgr = jobmgr self.jobmgr = jobmgr
# get names of all the batch containers of the user # get names of all the batch containers of the user
def get_user_batch_containers(self,username): def get_user_batch_containers(self,username):
return [] return []