Merge pull request #374 from GallenShao/batch

prevent task from starving
This commit is contained in:
GallenShao 2019-03-17 22:07:30 +08:00 committed by GitHub
commit c908f0a95e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 1 deletions

View File

@ -444,24 +444,36 @@ class TaskMgr(threading.Thread):
if task.at_same_time: if task.at_same_time:
# parallel tasks # parallel tasks
if not self.has_waiting(task.subtask_list):
continue
workers = self.find_proper_workers(task.subtask_list) workers = self.find_proper_workers(task.subtask_list)
if len(workers) == 0: if len(workers) == 0:
continue return None, None
else: else:
for i in range(len(workers)): for i in range(len(workers)):
task.subtask_list[i].worker = workers[i] task.subtask_list[i].worker = workers[i]
return task, task.subtask_list return task, task.subtask_list
else: else:
# traditional tasks # traditional tasks
has_waiting = False
for sub_task in task.subtask_list: for sub_task in task.subtask_list:
if sub_task.status == WAITING: if sub_task.status == WAITING:
has_waiting = True
workers = self.find_proper_workers([sub_task]) workers = self.find_proper_workers([sub_task])
if len(workers) > 0: if len(workers) > 0:
sub_task.worker = workers[0] sub_task.worker = workers[0]
return task, [sub_task] return task, [sub_task]
if has_waiting:
return None, None
return None, None return None, None
def has_waiting(self, sub_task_list):
for sub_task in sub_task_list:
if sub_task.status == WAITING:
return True
return False
def find_proper_workers(self, sub_task_list, all_res=False): def find_proper_workers(self, sub_task_list, all_res=False):
nodes = self.get_all_nodes() nodes = self.get_all_nodes()
if nodes is None or len(nodes) == 0: if nodes is None or len(nodes) == 0: