diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py
index b8729d3..6d41492 100644
--- a/src/master/taskmgr.py
+++ b/src/master/taskmgr.py
@@ -3,6 +3,7 @@ import time
 import string
 import random
 import json
+from functools import wraps
 
 # must import logger after initlogging, ugly
 from utils.log import logger
@@ -17,11 +18,17 @@ from utils import env
 
 
 class Task():
-    def __init__(self, info):
+    def __init__(self, info, priority):
         self.info = info
         self.status = WAITING
         self.instance_list = []
         self.token = ''
+        # priority: the bigger the better
+        # self.priority: the smaller the better
+        self.priority = int(time.time()) / 60 / 60 - priority
+
+    def __lt__(self, other):
+        return self.priority < other.priority
 
 
 class TaskReporter(MasterServicer):
@@ -45,6 +52,9 @@ class TaskMgr(threading.Thread):
         self.thread_stop = False
         self.jobmgr = None
         self.task_queue = []
+        self.lazy_append_list = []
+        self.lazy_delete_list = []
+        self.task_queue_lock = threading.Lock()
         self.user_containers = {}
         self.scheduler_interval = scheduler_interval
 
@@ -57,14 +67,26 @@
         self.nodemgr = nodemgr
         self.monitor_fetcher = monitor_fetcher
         self.cpu_usage = {}
+        self.gpu_usage = {}
         # self.all_nodes = None
         # self.last_nodes_info_update_time = 0
         # self.nodes_info_update_interval = 30 # (s)
 
 
+    def queue_lock(f):
+        @wraps(f)
+        def new_f(self, *args, **kwargs):
+            self.task_queue_lock.acquire()
+            result = f(self, *args, **kwargs)
+            self.task_queue_lock.release()
+            return result
+        return new_f
+
+
     def run(self):
         self.serve()
         while not self.thread_stop:
+            self.sort_out_task_queue()
             task, instance_id, worker = self.task_scheduler()
             if task is not None and worker is not None:
                 self.task_processor(task, instance_id, worker)
@@ -107,6 +129,7 @@
 
         if instance['status'] == RUNNING and report.instanceStatus != RUNNING:
             self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
+            self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
         instance['status'] = report.instanceStatus
         instance['error_msg'] = report.errmsg
 
@@ -150,7 +173,7 @@
         else:
             self.jobmgr.report(task)
         self.logger.info('task %s completed' % task.info.id)
-        self.task_queue.remove(task)
+        self.lazy_delete_list.append(task)
 
 
     def task_failed(self, task):
@@ -161,9 +184,20 @@
         else:
             self.jobmgr.report(task)
         self.logger.info('task %s failed' % task.info.id)
-        self.task_queue.remove(task)
+        self.lazy_delete_list.append(task)
 
 
+    @queue_lock
+    def sort_out_task_queue(self):
+        while self.lazy_delete_list:
+            task = self.lazy_delete_list.pop(0)
+            self.task_queue.remove(task)
+        if self.lazy_append_list:
+            while self.lazy_append_list:
+                task = self.lazy_append_list.pop(0)
+                self.task_queue.append(task)
+            self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
+
 
     def task_processor(self, task, instance_id, worker_ip):
         task.status = RUNNING
@@ -179,6 +213,7 @@
         instance['worker'] = worker_ip
 
         self.cpu_usage[worker_ip] += task.info.cluster.instance.cpu
+        self.gpu_usage[worker_ip] += task.info.cluster.instance.gpu
         username = task.info.username
         container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token
         if not username in self.user_containers.keys():
@@ -201,7 +236,7 @@
         # return task, worker
 
 
     def task_scheduler(self):
-        # simple FIFO
+        # simple FIFO with priority
        self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))
         # nodes = self.get_all_nodes()
@@ -217,6 +252,8 @@
             # self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key]))
 
         for task in self.task_queue:
+            if task in self.lazy_delete_list:
+                continue
             worker = self.find_proper_worker(task)
 
             for index, instance in enumerate(task.instance_list):
@@ -231,6 +268,7 @@
                     instance['status'] = FAILED
                     instance['token'] = ''
                     self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
+                    self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
                     self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
 
                 if worker is not None:
@@ -259,9 +297,10 @@
                 continue
             if task.info.cluster.instance.memory > worker_info['memory']:
                 continue
-            # if task.info.cluster.instance.disk > worker_info['disk']:
-            #     continue
-            if task.info.cluster.instance.gpu > worker_info['gpu']:
+            # try not to assign non-gpu task to a worker with gpu
+            if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0:
+                continue
+            if task.info.cluster.instance.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']:
                 continue
             return worker_ip
         return None
@@ -289,7 +328,7 @@
         info['cpu'] = len(worker_info['cpuconfig'])
         info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (Mb)
         info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb)
-        info['gpu'] = 0 # not support yet
+        info['gpu'] = len(worker_info['gpuinfo'])
         return info
 
 
@@ -301,15 +340,21 @@
             return 0
 
 
+    def get_gpu_usage(self, worker_ip):
+        try:
+            return self.gpu_usage[worker_ip]
+        except:
+            self.gpu_usage[worker_ip] = 0
+            return 0
+
+
     def set_jobmgr(self, jobmgr):
         self.jobmgr = jobmgr
 
 
-    # user: username
-    # task: a json string
     # save the task information into database
     # called when jobmgr assign task to taskmgr
-    def add_task(self, username, taskid, json_task):
+    def add_task(self, username, taskid, json_task, task_priority=1):
         # decode json string to object defined in grpc
         self.logger.info('[taskmgr add_task] receive task %s' % taskid)
         image_dict = {
@@ -340,16 +385,18 @@
                     cpu = int(json_task['cpuSetting']),
                     memory = int(json_task['memorySetting']),
                     disk = int(json_task['diskSetting']),
-                    gpu = int(json_task['gpuSetting'])))))
+                    gpu = int(json_task['gpuSetting'])))),
+            priority=task_priority)
 
         if 'mapping' in json_task:
             task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'], remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir']) for mapping_key in json_task['mapping']])
 
-        self.task_queue.append(task)
+        self.lazy_append_list.append(task)
 
 
     # user: username
     # get the information of a task, including the status, task description and other information
+    @queue_lock
     def get_task(self, taskid):
         for task in self.task_queue:
             if task.info.id == taskid:
diff --git a/src/utils/gputools.py b/src/utils/gputools.py
index 303bb7b..01aad8e 100644
--- a/src/utils/gputools.py
+++ b/src/utils/gputools.py
@@ -1,5 +1,8 @@
 import lxc
 import subprocess
+import os
+import signal
+from utils.log import logger
 
 
 # Note: keep physical device id always the same as the virtual device id
@@ -37,64 +40,79 @@ def remove_device(container_name, device_path):
 # +-----------------------------------------------------------------------------+
 # 
 
 def nvidia_smi():
-    try:
-        ret = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
-        return ret.stdout.decode('utf-8').split('\n')
-    except subprocess.CalledProcessError:
-        return None
+    try:
+        ret = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
+        return ret.stdout.decode('utf-8').split('\n')
+    except subprocess.CalledProcessError:
+        return None
 
 
def get_gpu_driver_version():
-    output = nvidia_smi()
-    if not output:
-        return None
-    else:
-        return output[2].split()[-2]
+    output = nvidia_smi()
+    if not output:
+        return None
+    else:
+        return output[2].split()[-2]
 
 
def get_gpu_status():
-    output = nvidia_smi()
-    if not output:
-        return []
-    interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
-    status_list = []
-    for index in range(7, interval_index, 3):
-        status = {}
-        status['id'] = output[index].split()[1]
-        sp = output[index+1].split()
-        status['fan'] = sp[1]
-        status['memory'] = sp[8]
-        status['memory_max'] = sp[10]
-        status['util'] = sp[12]
-        status_list.append(status)
-    return status_list
+    output = nvidia_smi()
+    if not output:
+        return []
+    interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
+    status_list = []
+    for index in range(7, interval_index, 3):
+        status = {}
+        status['id'] = output[index].split()[1]
+        sp = output[index+1].split()
+        status['fan'] = sp[1]
+        status['memory'] = sp[8]
+        status['memory_max'] = sp[10]
+        status['util'] = sp[12]
+        status_list.append(status)
+    return status_list
 
 
def get_gpu_processes():
-    output = nvidia_smi()
-    if not output:
-        return []
-    interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
-    process_list = []
-    for index in range(interval_index + 5, len(output)):
-        sp = output[index].split()
-        if len(sp) != 7:
-            break
-        process = {}
-        process['gpu'] = sp[1]
-        process['pid'] = sp[2]
-        process['name'] = sp[4]
-        process['memory'] = sp[5]
-        process['container'] = get_container_name_by_pid(sp[2])
-        process_list.append(process)
-    return process_list
+    output = nvidia_smi()
+    if not output:
+        return []
+    interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
+    process_list = []
+    for index in range(interval_index + 5, len(output)):
+        sp = output[index].split()
+        if len(sp) != 7:
+            break
+        process = {}
+        process['gpu'] = sp[1]
+        process['pid'] = sp[2]
+        process['name'] = sp[4]
+        process['memory'] = sp[5]
+        process['container'] = get_container_name_by_pid(sp[2])
+        process_list.append(process)
+    return process_list
 
 
def get_container_name_by_pid(pid):
-    with open('/proc/%s/cgroup' % pid) as f:
-        content = f.readlines()[0].strip().split('/')
-        if content[1] != 'lxc':
-            return 'host'
-        else:
-            return content[2]
-    return None
\ No newline at end of file
+    with open('/proc/%s/cgroup' % pid) as f:
+        content = f.readlines()[0].strip().split('/')
+        if content[1] != 'lxc':
+            return 'host'
+        else:
+            return content[2]
+    return None
+
+
+def clean_up_processes_in_gpu(gpu_id):
+    logger.info('[gputools] start clean up processes in gpu %s' % gpu_id)
+    processes = get_gpu_processes()
+    for process in [p for p in processes if str(p['gpu']) == str(gpu_id)]:
+        logger.info('[gputools] find process %s running in gpu %s' % (process['pid'], process['gpu']))
+        if process['container'] == 'host':
+            logger.warning('[gputools] find process of host, ignored')
+        else:
+            logger.warning('[gputools] find process of container [%s], killed' % process['container'])
+            try:
+                os.kill(int(process['pid']), signal.SIGKILL)
+            except OSError:
+                continue
diff --git a/src/worker/monitor.py b/src/worker/monitor.py
index a69c905..0fe7f26 100755
--- a/src/worker/monitor.py
+++ b/src/worker/monitor.py
@@ -19,7 +19,7 @@ Design:Monitor mainly consists of three parts: Collectors, Master_Collector and
 
 import subprocess,re,os,psutil,math,sys
 import time,threading,json,traceback,platform
-from utils import env, etcdlib
+from utils import env, etcdlib, gputools
 import lxc
 import xmlrpc.client
 from datetime import datetime
@@ -481,6 +481,10 @@ class Collector(threading.Thread):
             info[idx][key] = val
         return [cpuset, info]
 
+    # collect gpu used information
+    def collect_gpuinfo(self):
+        return gputools.get_gpu_status()
+
     # collect disk used information
     def collect_diskinfo(self):
         global workercinfo
@@ -537,6 +541,7 @@
         [cpuinfo,cpuconfig] = self.collect_cpuinfo()
         workerinfo['cpuinfo'] = cpuinfo
         workerinfo['cpuconfig'] = cpuconfig
+        workerinfo['gpuinfo'] = self.collect_gpuinfo()
         workerinfo['diskinfo'] = self.collect_diskinfo()
         workerinfo['running'] = True
         #time.sleep(self.interval)
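
Reviewer note (illustrative sketch, not part of the patch): how the new Task priority value in taskmgr.py behaves. The _Task class below is a simplified stand-in for the real Task, using the same formula and __lt__ as the diff.

import time

class _Task:
    def __init__(self, priority):
        # Same formula as the patch: submission time in hours minus the
        # user-supplied priority, so older tasks and higher-priority tasks
        # both end up with a smaller value and sort toward the queue head.
        self.priority = int(time.time()) / 60 / 60 - priority

    def __lt__(self, other):
        return self.priority < other.priority

low, high = _Task(priority=1), _Task(priority=5)
# sorted() uses __lt__, so the task submitted with the bigger priority
# (smaller computed self.priority) comes first, as in sort_out_task_queue().
assert sorted([low, high])[0] is high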
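Likewise, a sketch (same names as the patch, but simplified) of the lazy append/delete pattern: task_completed(), task_failed() and add_task() only park tasks in side lists, and sort_out_task_queue() folds them into task_queue once per scheduler pass under the queue lock, so the list is never mutated while task_scheduler() is iterating it.

import threading
from functools import wraps

def queue_lock(f):
    # Same idea as the patch's decorator; a with-statement is used here so
    # the lock is also released if the wrapped method raises.
    @wraps(f)
    def new_f(self, *args, **kwargs):
        with self.task_queue_lock:
            return f(self, *args, **kwargs)
    return new_f

class _QueueOwner:
    def __init__(self):
        self.task_queue_lock = threading.Lock()
        self.task_queue = []
        self.lazy_append_list = []
        self.lazy_delete_list = []

    @queue_lock
    def sort_out_task_queue(self):
        # Fold pending deletions and additions into the queue, then order
        # by the computed priority (smaller value = scheduled earlier).
        while self.lazy_delete_list:
            self.task_queue.remove(self.lazy_delete_list.pop(0))
        while self.lazy_append_list:
            self.task_queue.append(self.lazy_append_list.pop(0))
        self.task_queue.sort(key=lambda t: t.priority)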
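Finally, a small sketch of the GPU accounting now done in find_proper_worker(): worker_info['gpu'] is the card count the worker reports (len of the gpuinfo parsed from nvidia-smi) and gpu_usage[worker_ip] counts cards already reserved for running instances. gpu_fits below is a hypothetical standalone rewrite of the two new checks, not a function that exists in the patch.

def gpu_fits(task_gpu, worker_total_gpu, worker_used_gpu):
    # Keep CPU-only tasks off GPU workers so the cards stay available.
    if task_gpu == 0 and worker_total_gpu > 0:
        return False
    # A GPU task fits only if enough unreserved cards remain.
    return task_gpu + worker_used_gpu <= worker_total_gpu

assert gpu_fits(0, 0, 0)       # CPU task on a CPU-only worker
assert not gpu_fits(0, 2, 0)   # CPU task steered away from a GPU worker
assert gpu_fits(1, 2, 1)       # one card still free
assert not gpu_fits(2, 2, 1)   # would oversubscribe the worker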