diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index b5e7936..e5993e9 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -344,7 +344,7 @@ class TaskMgr(threading.Thread): vnode_info.vnode.network.CopyFrom(networkinfo) placed_workers.append(sub_task.worker) - [success,msg] = self.start_vnode(sub_task) + [success, msg] = self.start_vnode(sub_task) if not success: sub_task.waiting_for_retry() sub_task.worker = None @@ -361,7 +361,8 @@ class TaskMgr(threading.Thread): continue task_info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8)) - if self.start_task(sub_task): + [success, msg] = self.start_task(sub_task) + if success: sub_task.status = RUNNING else: sub_task.waiting_for_retry() @@ -373,6 +374,7 @@ class TaskMgr(threading.Thread): def clear_sub_task(self, sub_task): if sub_task.task_started: self.stop_task(sub_task) + #pass if sub_task.vnode_started: self.stop_vnode(sub_task) #pass @@ -458,7 +460,7 @@ class TaskMgr(threading.Thread): return None, None - def find_proper_workers(self, sub_task_list): + def find_proper_workers(self, sub_task_list, all_res=False): nodes = self.get_all_nodes() if nodes is None or len(nodes) == 0: self.logger.warning('[task_scheduler] running nodes not found') @@ -478,7 +480,7 @@ class TaskMgr(threading.Thread): for worker_ip, worker_info in nodes: #logger.info(worker_info) #logger.info(self.get_cpu_usage(worker_ip)) - if needs.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']: + if needs.cpu + (not all_res) * self.get_cpu_usage(worker_ip) > worker_info['cpu']: continue elif needs.memory > worker_info['memory']: continue @@ -487,7 +489,7 @@ class TaskMgr(threading.Thread): # try not to assign non-gpu task to a worker with gpu #if needs['gpu'] == 0 and worker_info['gpu'] > 0: #continue - elif needs.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']: + elif needs.gpu + (not all_res) * self.get_gpu_usage(worker_ip) > worker_info['gpu']: continue else: worker_info['cpu'] -= needs.cpu @@ -548,6 +550,7 @@ class TaskMgr(threading.Thread): def add_task(self, username, taskid, json_task, task_priority=1): # decode json string to object defined in grpc self.logger.info('[taskmgr add_task] receive task %s' % taskid) + image_dict = { "private": Image.PRIVATE, "base": Image.BASE, @@ -596,7 +599,24 @@ class TaskMgr(threading.Thread): # if in traditional mode, commands will be executed in all vnodes ) if (not 'atSameTime' in json_task.keys() or json_task['runon'] == 'all' or vnode_index == 0) else None } for vnode_index in range(int(json_task['vnodeCount']))]) + + if task.at_same_time: + workers = self.find_proper_workers(task.subtask_list, all_res=True) + if len(workers) == 0: + task.status = FAILED + # tell jobmgr + self.jobmgr.report(username,taskid,"failed","Resources needs exceed limits") + return False + else: + for sub_task in task.subtask_list: + workers = self.find_proper_workers([sub_task], all_res=True) + if len(workers) == 0: + task.status = FAILED + # tell jobmgr + self.jobmgr.report(username,taskid,"failed","Resources needs exceed limits") + return False self.lazy_append_list.append(task) + return True # user: username diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html index 5fc9db8..465d015 100644 --- a/web/templates/batch/batch_create.html +++ b/web/templates/batch/batch_create.html @@ -41,7 +41,7 @@