diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index b8729d3..9c3587b 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -45,6 +45,8 @@ class TaskMgr(threading.Thread): self.thread_stop = False self.jobmgr = None self.task_queue = [] + self.lazy_delete_list = [] + self.task_queue_lock = threading.Lock() self.user_containers = {} self.scheduler_interval = scheduler_interval @@ -62,9 +64,19 @@ class TaskMgr(threading.Thread): # self.nodes_info_update_interval = 30 # (s) + def queue_lock(self, f): + def new_f(*args, **kwargs): + self.task_queue_lock.acquire() + result = f(args, kwargs) + self.task_queue_lock.release() + return result + return new_f + + def run(self): self.serve() while not self.thread_stop: + self.clean_up_finished_task() task, instance_id, worker = self.task_scheduler() if task is not None and worker is not None: self.task_processor(task, instance_id, worker) @@ -150,7 +162,7 @@ class TaskMgr(threading.Thread): else: self.jobmgr.report(task) self.logger.info('task %s completed' % task.info.id) - self.task_queue.remove(task) + self.lazy_delete_list.append(task) def task_failed(self, task): @@ -161,9 +173,15 @@ class TaskMgr(threading.Thread): else: self.jobmgr.report(task) self.logger.info('task %s failed' % task.info.id) - self.task_queue.remove(task) + self.lazy_delete_list.append(task) + @queue_lock + def clean_up_finished_task(self): + while self.lazy_delete_list: + task = self.lazy_delete_list.pop(0) + self.task_queue.remove(task) + def task_processor(self, task, instance_id, worker_ip): task.status = RUNNING @@ -217,6 +235,8 @@ class TaskMgr(threading.Thread): # self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key])) for task in self.task_queue: + if task in self.lazy_delete_list: + continue worker = self.find_proper_worker(task) for index, instance in enumerate(task.instance_list): @@ -259,8 +279,9 @@ class TaskMgr(threading.Thread): continue if task.info.cluster.instance.memory > worker_info['memory']: continue - # if task.info.cluster.instance.disk > worker_info['disk']: - # continue + # try not to assign non-gpu task to a worker with gpu + if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0: + continue if task.info.cluster.instance.gpu > worker_info['gpu']: continue return worker_ip @@ -289,7 +310,7 @@ class TaskMgr(threading.Thread): info['cpu'] = len(worker_info['cpuconfig']) info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (Mb) info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb) - info['gpu'] = 0 # not support yet + info['gpu'] = len(worker_info['gpuinfo']) return info @@ -350,6 +371,7 @@ class TaskMgr(threading.Thread): # user: username # get the information of a task, including the status, task description and other information + @queue_lock def get_task(self, taskid): for task in self.task_queue: if task.info.id == taskid: diff --git a/src/utils/gputools.py b/src/utils/gputools.py index 303bb7b..01aad8e 100644 --- a/src/utils/gputools.py +++ b/src/utils/gputools.py @@ -1,5 +1,8 @@ import lxc import subprocess +import os +import signal +from utils.log import logger # Note: keep physical device id always the same as the virtual device id @@ -37,64 +40,79 @@ def remove_device(container_name, device_path): # +-----------------------------------------------------------------------------+ # def nvidia_smi(): - try: - ret = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) - return ret.stdout.decode('utf-8').split('\n') - except subprocess.CalledProcessError: - return None + try: + ret = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) + return ret.stdout.decode('utf-8').split('\n') + except subprocess.CalledProcessError: + return None def get_gpu_driver_version(): - output = nvidia_smi() - if not output: - return None - else: - return output[2].split()[-2] + output = nvidia_smi() + if not output: + return None + else: + return output[2].split()[-2] def get_gpu_status(): - output = nvidia_smi() - if not output: - return [] - interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0] - status_list = [] - for index in range(7, interval_index, 3): - status = {} - status['id'] = output[index].split()[1] - sp = output[index+1].split() - status['fan'] = sp[1] - status['memory'] = sp[8] - status['memory_max'] = sp[10] - status['util'] = sp[12] - status_list.append(status) - return status_list + output = nvidia_smi() + if not output: + return [] + interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0] + status_list = [] + for index in range(7, interval_index, 3): + status = {} + status['id'] = output[index].split()[1] + sp = output[index+1].split() + status['fan'] = sp[1] + status['memory'] = sp[8] + status['memory_max'] = sp[10] + status['util'] = sp[12] + status_list.append(status) + return status_list def get_gpu_processes(): - output = nvidia_smi() - if not output: - return [] - interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0] - process_list = [] - for index in range(interval_index + 5, len(output)): - sp = output[index].split() - if len(sp) != 7: - break - process = {} - process['gpu'] = sp[1] - process['pid'] = sp[2] - process['name'] = sp[4] - process['memory'] = sp[5] - process['container'] = get_container_name_by_pid(sp[2]) - process_list.append(process) - return process_list + output = nvidia_smi() + if not output: + return [] + interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0] + process_list = [] + for index in range(interval_index + 5, len(output)): + sp = output[index].split() + if len(sp) != 7: + break + process = {} + process['gpu'] = sp[1] + process['pid'] = sp[2] + process['name'] = sp[4] + process['memory'] = sp[5] + process['container'] = get_container_name_by_pid(sp[2]) + process_list.append(process) + return process_list def get_container_name_by_pid(pid): - with open('/proc/%s/cgroup' % pid) as f: - content = f.readlines()[0].strip().split('/') - if content[1] != 'lxc': - return 'host' - else: - return content[2] - return None \ No newline at end of file + with open('/proc/%s/cgroup' % pid) as f: + content = f.readlines()[0].strip().split('/') + if content[1] != 'lxc': + return 'host' + else: + return content[2] + return None + + +def clean_up_processes_in_gpu(gpu_id): + logger.info('[gputools] start clean up processes in gpu %d' % gpu_id) + processes = get_gpu_processes() + for process in [p for p in processes if p['gpu'] == gpu_id]: + logger.info('[gputools] find process %d running in gpu %d' % (process['pid'], process['gpu'])) + if process['container'] == 'host': + logger.warning('[gputools] find process of host, ignored') + else: + logger.warning('[gputools] find process of container [%s], killed' % process['container']) + try: + os.kill(process['pid'], signal.SIGKILL) + except OSError: + continue diff --git a/src/worker/monitor.py b/src/worker/monitor.py index a69c905..0fe7f26 100755 --- a/src/worker/monitor.py +++ b/src/worker/monitor.py @@ -19,7 +19,7 @@ Design:Monitor mainly consists of three parts: Collectors, Master_Collector and import subprocess,re,os,psutil,math,sys import time,threading,json,traceback,platform -from utils import env, etcdlib +from utils import env, etcdlib, gputools import lxc import xmlrpc.client from datetime import datetime @@ -481,6 +481,10 @@ class Collector(threading.Thread): info[idx][key] = val return [cpuset, info] + # collect gpu used information + def collect_gpuinfo(self): + return gputools.get_gpu_status() + # collect disk used information def collect_diskinfo(self): global workercinfo @@ -537,6 +541,7 @@ class Collector(threading.Thread): [cpuinfo,cpuconfig] = self.collect_cpuinfo() workerinfo['cpuinfo'] = cpuinfo workerinfo['cpuconfig'] = cpuconfig + workerinfo['gpuinfo'] = self.collect_gpuinfo() workerinfo['diskinfo'] = self.collect_diskinfo() workerinfo['running'] = True #time.sleep(self.interval)