Fix thread lock bug and add task priority
This commit is contained in:
parent
cb7f476ec6
commit
6f937eb6b3
|
@ -3,6 +3,7 @@ import time
|
||||||
import string
|
import string
|
||||||
import random
|
import random
|
||||||
import json
|
import json
|
||||||
|
from functools import wraps
|
||||||
|
|
||||||
# must import logger after initlogging, ugly
|
# must import logger after initlogging, ugly
|
||||||
from utils.log import logger
|
from utils.log import logger
|
||||||
|
@ -17,11 +18,17 @@ from utils import env
|
||||||
|
|
||||||
|
|
||||||
class Task():
|
class Task():
|
||||||
def __init__(self, info):
|
def __init__(self, info, priority):
|
||||||
self.info = info
|
self.info = info
|
||||||
self.status = WAITING
|
self.status = WAITING
|
||||||
self.instance_list = []
|
self.instance_list = []
|
||||||
self.token = ''
|
self.token = ''
|
||||||
|
# priority: a larger value means the task is scheduled sooner
|
||||||
|
# self.priority: a smaller value sorts earlier in the task queue
|
||||||
|
self.priority = int(time.time()) / 60 / 60 - priority
|
||||||
|
|
||||||
|
def __lt__(self, other):
|
||||||
|
return self.priority < other.priority
|
||||||
|
|
||||||
|
|
||||||
class TaskReporter(MasterServicer):
|
class TaskReporter(MasterServicer):
|
||||||
|
@ -45,6 +52,7 @@ class TaskMgr(threading.Thread):
|
||||||
self.thread_stop = False
|
self.thread_stop = False
|
||||||
self.jobmgr = None
|
self.jobmgr = None
|
||||||
self.task_queue = []
|
self.task_queue = []
|
||||||
|
self.lazy_append_list = []
|
||||||
self.lazy_delete_list = []
|
self.lazy_delete_list = []
|
||||||
self.task_queue_lock = threading.Lock()
|
self.task_queue_lock = threading.Lock()
|
||||||
self.user_containers = {}
|
self.user_containers = {}
|
||||||
|
@ -65,10 +73,11 @@ class TaskMgr(threading.Thread):
|
||||||
# self.nodes_info_update_interval = 30 # (s)
|
# self.nodes_info_update_interval = 30 # (s)
|
||||||
|
|
||||||
|
|
||||||
def queue_lock(self, f):
|
def queue_lock(f):
|
||||||
def new_f(*args, **kwargs):
|
@wraps(f)
|
||||||
|
def new_f(self, *args, **kwargs):
|
||||||
self.task_queue_lock.acquire()
|
self.task_queue_lock.acquire()
|
||||||
result = f(args, kwargs)
|
result = f(self, *args, **kwargs)
|
||||||
self.task_queue_lock.release()
|
self.task_queue_lock.release()
|
||||||
return result
|
return result
|
||||||
return new_f
|
return new_f
|
||||||
|
@ -77,7 +86,7 @@ class TaskMgr(threading.Thread):
|
||||||
def run(self):
|
def run(self):
|
||||||
self.serve()
|
self.serve()
|
||||||
while not self.thread_stop:
|
while not self.thread_stop:
|
||||||
self.clean_up_finished_task()
|
self.sort_out_task_queue()
|
||||||
task, instance_id, worker = self.task_scheduler()
|
task, instance_id, worker = self.task_scheduler()
|
||||||
if task is not None and worker is not None:
|
if task is not None and worker is not None:
|
||||||
self.task_processor(task, instance_id, worker)
|
self.task_processor(task, instance_id, worker)
|
||||||
|
@ -179,10 +188,15 @@ class TaskMgr(threading.Thread):
|
||||||
|
|
||||||
|
|
||||||
@queue_lock
|
@queue_lock
|
||||||
def clean_up_finished_task(self):
|
def sort_out_task_queue(self):
|
||||||
while self.lazy_delete_list:
|
while self.lazy_delete_list:
|
||||||
task = self.lazy_delete_list.pop(0)
|
task = self.lazy_delete_list.pop(0)
|
||||||
self.task_queue.remove(task)
|
self.task_queue.remove(task)
|
||||||
|
if self.lazy_append_list:
|
||||||
|
while self.lazy_append_list:
|
||||||
|
task = self.lazy_append_list.pop(0)
|
||||||
|
self.task_queue.append(task)
|
||||||
|
self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
|
||||||
|
|
||||||
|
|
||||||
def task_processor(self, task, instance_id, worker_ip):
|
def task_processor(self, task, instance_id, worker_ip):
|
||||||
|
@ -222,7 +236,7 @@ class TaskMgr(threading.Thread):
|
||||||
|
|
||||||
# return task, worker
|
# return task, worker
|
||||||
def task_scheduler(self):
|
def task_scheduler(self):
|
||||||
# simple FIFO
|
# simple FIFO with priority
|
||||||
self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))
|
self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))
|
||||||
|
|
||||||
# nodes = self.get_all_nodes()
|
# nodes = self.get_all_nodes()
|
||||||
|
@ -338,11 +352,9 @@ class TaskMgr(threading.Thread):
|
||||||
self.jobmgr = jobmgr
|
self.jobmgr = jobmgr
|
||||||
|
|
||||||
|
|
||||||
# user: username
|
|
||||||
# task: a json string
|
|
||||||
# save the task information into database
|
# save the task information into database
|
||||||
# called when jobmgr assign task to taskmgr
|
# called when jobmgr assign task to taskmgr
|
||||||
def add_task(self, username, taskid, json_task):
|
def add_task(self, username, taskid, json_task, task_priority=1):
|
||||||
# decode json string to object defined in grpc
|
# decode json string to object defined in grpc
|
||||||
self.logger.info('[taskmgr add_task] receive task %s' % taskid)
|
self.logger.info('[taskmgr add_task] receive task %s' % taskid)
|
||||||
image_dict = {
|
image_dict = {
|
||||||
|
@ -373,12 +385,13 @@ class TaskMgr(threading.Thread):
|
||||||
cpu = int(json_task['cpuSetting']),
|
cpu = int(json_task['cpuSetting']),
|
||||||
memory = int(json_task['memorySetting']),
|
memory = int(json_task['memorySetting']),
|
||||||
disk = int(json_task['diskSetting']),
|
disk = int(json_task['diskSetting']),
|
||||||
gpu = int(json_task['gpuSetting'])))))
|
gpu = int(json_task['gpuSetting'])))),
|
||||||
|
priority=task_priority)
|
||||||
if 'mapping' in json_task:
|
if 'mapping' in json_task:
|
||||||
task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
|
task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
|
||||||
remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
|
remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
|
||||||
for mapping_key in json_task['mapping']])
|
for mapping_key in json_task['mapping']])
|
||||||
self.task_queue.append(task)
|
self.lazy_append_list.append(task)
|
||||||
|
|
||||||
|
|
||||||
# user: username
|
# user: username
|
||||||
|
|
Loading…
Reference in New Issue