diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index aea04da..6d41492 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -3,6 +3,7 @@ import time import string import random import json +from functools import wraps # must import logger after initlogging, ugly from utils.log import logger @@ -17,11 +18,17 @@ from utils import env class Task(): - def __init__(self, info): + def __init__(self, info, priority): self.info = info self.status = WAITING self.instance_list = [] self.token = '' + # priority the bigger the better + # self.priority the smaller the better + self.priority = int(time.time()) / 60 / 60 - priority + + def __lt__(self, other): + return self.priority < other.priority class TaskReporter(MasterServicer): @@ -45,6 +52,7 @@ class TaskMgr(threading.Thread): self.thread_stop = False self.jobmgr = None self.task_queue = [] + self.lazy_append_list = [] self.lazy_delete_list = [] self.task_queue_lock = threading.Lock() self.user_containers = {} @@ -65,10 +73,11 @@ class TaskMgr(threading.Thread): # self.nodes_info_update_interval = 30 # (s) - def queue_lock(self, f): - def new_f(*args, **kwargs): + def queue_lock(f): + @wraps(f) + def new_f(self, *args, **kwargs): self.task_queue_lock.acquire() - result = f(args, kwargs) + result = f(self, *args, **kwargs) self.task_queue_lock.release() return result return new_f @@ -77,7 +86,7 @@ class TaskMgr(threading.Thread): def run(self): self.serve() while not self.thread_stop: - self.clean_up_finished_task() + self.sort_out_task_queue() task, instance_id, worker = self.task_scheduler() if task is not None and worker is not None: self.task_processor(task, instance_id, worker) @@ -179,10 +188,15 @@ class TaskMgr(threading.Thread): @queue_lock - def clean_up_finished_task(self): + def sort_out_task_queue(self): while self.lazy_delete_list: task = self.lazy_delete_list.pop(0) self.task_queue.remove(task) + if self.lazy_append_list: + while self.lazy_append_list: + task = self.lazy_append_list.pop(0) + self.task_queue.append(task) + self.task_queue = sorted(self.task_queue, key=lambda x: x.priority) def task_processor(self, task, instance_id, worker_ip): @@ -222,7 +236,7 @@ class TaskMgr(threading.Thread): # return task, worker def task_scheduler(self): - # simple FIFO + # simple FIFO with priority self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue)) # nodes = self.get_all_nodes() @@ -338,11 +352,9 @@ class TaskMgr(threading.Thread): self.jobmgr = jobmgr - # user: username - # task: a json string # save the task information into database # called when jobmgr assign task to taskmgr - def add_task(self, username, taskid, json_task): + def add_task(self, username, taskid, json_task, task_priority=1): # decode json string to object defined in grpc self.logger.info('[taskmgr add_task] receive task %s' % taskid) image_dict = { @@ -373,12 +385,13 @@ class TaskMgr(threading.Thread): cpu = int(json_task['cpuSetting']), memory = int(json_task['memorySetting']), disk = int(json_task['diskSetting']), - gpu = int(json_task['gpuSetting']))))) + gpu = int(json_task['gpuSetting'])))), + priority=task_priority) if 'mapping' in json_task: task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'], remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir']) for mapping_key in json_task['mapping']]) - self.task_queue.append(task) + self.lazy_append_list.append(task) # user: username