add thread lock to task_queue & add gpu info fetcher to monitor

Gallen 2018-11-01 15:31:44 +08:00
parent 16f7f6b547
commit 39831e9fb9
3 changed files with 100 additions and 55 deletions


@@ -45,6 +45,8 @@ class TaskMgr(threading.Thread):
        self.thread_stop = False
        self.jobmgr = None
        self.task_queue = []
        self.lazy_delete_list = []
        self.task_queue_lock = threading.Lock()
        self.user_containers = {}
        self.scheduler_interval = scheduler_interval
@@ -62,9 +64,19 @@ class TaskMgr(threading.Thread):
        # self.nodes_info_update_interval = 30 # (s)

    def queue_lock(f):
        # decorator: run the wrapped method while holding task_queue_lock
        def new_f(self, *args, **kwargs):
            self.task_queue_lock.acquire()
            result = f(self, *args, **kwargs)
            self.task_queue_lock.release()
            return result
        return new_f

    def run(self):
        self.serve()
        while not self.thread_stop:
            self.clean_up_finished_task()
            task, instance_id, worker = self.task_scheduler()
            if task is not None and worker is not None:
                self.task_processor(task, instance_id, worker)
@@ -150,7 +162,7 @@ class TaskMgr(threading.Thread):
        else:
            self.jobmgr.report(task)
        self.logger.info('task %s completed' % task.info.id)
        self.task_queue.remove(task)
        self.lazy_delete_list.append(task)

    def task_failed(self, task):
@@ -161,9 +173,15 @@ class TaskMgr(threading.Thread):
        else:
            self.jobmgr.report(task)
        self.logger.info('task %s failed' % task.info.id)
        self.task_queue.remove(task)
        self.lazy_delete_list.append(task)

    @queue_lock
    def clean_up_finished_task(self):
        # remove tasks marked for deletion from task_queue while holding the queue lock
        while self.lazy_delete_list:
            task = self.lazy_delete_list.pop(0)
            self.task_queue.remove(task)

    def task_processor(self, task, instance_id, worker_ip):
        task.status = RUNNING
@@ -217,6 +235,8 @@ class TaskMgr(threading.Thread):
        # self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key]))
        for task in self.task_queue:
            if task in self.lazy_delete_list:
                continue
            worker = self.find_proper_worker(task)

            for index, instance in enumerate(task.instance_list):
@@ -259,8 +279,9 @@ class TaskMgr(threading.Thread):
                continue
            if task.info.cluster.instance.memory > worker_info['memory']:
                continue
            # if task.info.cluster.instance.disk > worker_info['disk']:
            #     continue
            # try not to assign a non-GPU task to a worker that has GPUs
            if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0:
                continue
            if task.info.cluster.instance.gpu > worker_info['gpu']:
                continue
            return worker_ip
@@ -289,7 +310,7 @@ class TaskMgr(threading.Thread):
        info['cpu'] = len(worker_info['cpuconfig'])
        info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (MB)
        info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (MB)
        info['gpu'] = 0 # not supported yet
        info['gpu'] = len(worker_info['gpuinfo'])
        return info
@@ -350,6 +371,7 @@ class TaskMgr(threading.Thread):
    # user: username

    # get the information of a task, including the status, task description and other information
    @queue_lock
    def get_task(self, taskid):
        for task in self.task_queue:
            if task.info.id == taskid:
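
For context, a minimal standalone sketch of the locking pattern these changes introduce (names mirror the diff, but MiniTaskMgr and the toy string tasks are illustrative, not part of the commit): callbacks such as task_completed only append to lazy_delete_list, and clean_up_finished_task drains that list under task_queue_lock at the start of each scheduler pass, so task_queue is never removed from while it is being iterated.

import threading

def queue_lock(f):
    # same idea as the decorator in the diff: run the wrapped method with task_queue_lock held
    def new_f(self, *args, **kwargs):
        self.task_queue_lock.acquire()
        result = f(self, *args, **kwargs)
        self.task_queue_lock.release()
        return result
    return new_f

class MiniTaskMgr:
    def __init__(self):
        self.task_queue = ['task-a', 'task-b']
        self.lazy_delete_list = []
        self.task_queue_lock = threading.Lock()

    def task_completed(self, task):
        # may run from a worker callback thread: defer the removal instead of touching task_queue
        self.lazy_delete_list.append(task)

    @queue_lock
    def clean_up_finished_task(self):
        while self.lazy_delete_list:
            self.task_queue.remove(self.lazy_delete_list.pop(0))

mgr = MiniTaskMgr()
mgr.task_completed('task-a')
mgr.clean_up_finished_task()
print(mgr.task_queue)   # ['task-b']

Presumably the callbacks can skip the lock because a bare list append is atomic under CPython's GIL; only the code paths that walk or shrink task_queue take the decorator.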


@@ -1,5 +1,8 @@
import lxc
import subprocess
import os
import signal
from utils.log import logger
# Note: keep physical device id always the same as the virtual device id
@@ -37,64 +40,79 @@ def remove_device(container_name, device_path):
# +-----------------------------------------------------------------------------+
#
def nvidia_smi():
    # run nvidia-smi and return its output as a list of lines, or None if the command fails
    try:
        ret = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
        return ret.stdout.decode('utf-8').split('\n')
    except subprocess.CalledProcessError:
        return None
def get_gpu_driver_version():
    # the driver version is the second-to-last token on the third line of nvidia-smi output
    output = nvidia_smi()
    if not output:
        return None
    else:
        return output[2].split()[-2]
def get_gpu_status():
    output = nvidia_smi()
    if not output:
        return []
    # the first blank line separates the device table from the process table
    interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
    status_list = []
    # device rows start at line 7 and each GPU occupies 3 lines
    for index in range(7, interval_index, 3):
        status = {}
        status['id'] = output[index].split()[1]
        # the second line of each entry holds the fan, memory and utilization columns
        sp = output[index+1].split()
        status['fan'] = sp[1]
        status['memory'] = sp[8]
        status['memory_max'] = sp[10]
        status['util'] = sp[12]
        status_list.append(status)
    return status_list
def get_gpu_processes():
    output = nvidia_smi()
    if not output:
        return []
    interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
    process_list = []
    # the process table starts 5 lines below the blank separator line
    for index in range(interval_index + 5, len(output)):
        sp = output[index].split()
        # a process row splits into exactly 7 tokens; anything else marks the end of the table
        if len(sp) != 7:
            break
        process = {}
        process['gpu'] = sp[1]
        process['pid'] = sp[2]
        process['name'] = sp[4]
        process['memory'] = sp[5]
        process['container'] = get_container_name_by_pid(sp[2])
        process_list.append(process)
    return process_list
def get_container_name_by_pid(pid):
    # map a pid to its LXC container name via /proc/<pid>/cgroup, or 'host' for non-container processes
    with open('/proc/%s/cgroup' % pid) as f:
        content = f.readlines()[0].strip().split('/')
        if content[1] != 'lxc':
            return 'host'
        else:
            return content[2]
    return None
def clean_up_processes_in_gpu(gpu_id):
    logger.info('[gputools] start clean up processes in gpu %d' % gpu_id)
    processes = get_gpu_processes()
    # gpu index and pid are parsed from nvidia-smi output as strings, so convert before comparing/killing
    for process in [p for p in processes if int(p['gpu']) == gpu_id]:
        logger.info('[gputools] find process %s running in gpu %s' % (process['pid'], process['gpu']))
        if process['container'] == 'host':
            logger.warning('[gputools] find process of host, ignored')
        else:
            logger.warning('[gputools] find process of container [%s], killed' % process['container'])
            try:
                os.kill(int(process['pid']), signal.SIGKILL)
            except OSError:
                continue
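
A rough usage sketch of the new helpers (the import path utils.gputools matches the monitor change below; the print loops are only illustrative). When nvidia-smi exits with an error, both calls fall back to empty lists.

from utils import gputools

# one dict per GPU, with the fields parsed by get_gpu_status()
for gpu in gputools.get_gpu_status():
    print('gpu %s: util %s, memory %s / %s, fan %s' % (
        gpu['id'], gpu['util'], gpu['memory'], gpu['memory_max'], gpu['fan']))

# one dict per GPU process, including the owning LXC container (or 'host')
for proc in gputools.get_gpu_processes():
    print('pid %s on gpu %s in container %s uses %s' % (
        proc['pid'], proc['gpu'], proc['container'], proc['memory']))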


@@ -19,7 +19,7 @@ Design:Monitor mainly consists of three parts: Collectors, Master_Collector and
import subprocess,re,os,psutil,math,sys
import time,threading,json,traceback,platform
from utils import env, etcdlib
from utils import env, etcdlib, gputools
import lxc
import xmlrpc.client
from datetime import datetime
@@ -481,6 +481,10 @@ class Collector(threading.Thread):
                info[idx][key] = val
        return [cpuset, info]

    # collect gpu used information
    def collect_gpuinfo(self):
        return gputools.get_gpu_status()

    # collect disk used information
    def collect_diskinfo(self):
        global workercinfo
@@ -537,6 +541,7 @@ class Collector(threading.Thread):
            [cpuinfo,cpuconfig] = self.collect_cpuinfo()
            workerinfo['cpuinfo'] = cpuinfo
            workerinfo['cpuconfig'] = cpuconfig
            workerinfo['gpuinfo'] = self.collect_gpuinfo()
            workerinfo['diskinfo'] = self.collect_diskinfo()
            workerinfo['running'] = True
            #time.sleep(self.interval)
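
Putting the three files together, a short sketch (with made-up, illustrative values) of how the collector's new gpuinfo list becomes the GPU capacity that the scheduler compares against a task's request:

# what the collector would report for a worker with two GPUs (values are made up)
workerinfo = {'gpuinfo': [{'id': '0', 'util': '0%'}, {'id': '1', 'util': '23%'}]}

# taskmgr side: GPU capacity is simply the number of reported devices
gpu_capacity = len(workerinfo['gpuinfo'])      # 2

# find_proper_worker-style check for a task that requests one GPU
requested_gpu = 1
print(requested_gpu <= gpu_capacity)           # True -> this worker can host the task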