Merge pull request #350 from GallenShao/batch

add gpu support
Author: GallenShao (committed by GitHub)
Date:   2018-11-09 21:48:20 +08:00
Commit: 085efb0537
3 changed files with 133 additions and 63 deletions

View File: taskmgr.py

@@ -3,6 +3,7 @@ import time
 import string
 import random
 import json
+from functools import wraps
 
 # must import logger after initlogging, ugly
 from utils.log import logger
@@ -17,11 +18,17 @@ from utils import env
 
 class Task():
-    def __init__(self, info):
+    def __init__(self, info, priority):
         self.info = info
         self.status = WAITING
         self.instance_list = []
         self.token = ''
+        # priority: the bigger the better
+        # self.priority: the smaller the better
+        self.priority = int(time.time()) / 60 / 60 - priority
+
+    def __lt__(self, other):
+        return self.priority < other.priority
 
 
 class TaskReporter(MasterServicer):
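
A quick sketch of the ordering this produces (FakeTask and its values are hypothetical; the class is stubbed down to the priority computation): a larger priority argument yields a smaller self.priority, so the task sorts earlier, while tasks with equal priority fall back to submission-order FIFO. Note the unit coupling: one unit of user priority offsets one hour of queue age.

import time

class FakeTask:
    def __init__(self, priority, submit_hour):
        # same formula as Task.__init__, with the timestamp made explicit
        self.priority = submit_hour - priority

    def __lt__(self, other):
        return self.priority < other.priority

now = int(time.time()) / 60 / 60
urgent = FakeTask(priority=5, submit_hour=now)      # bigger priority argument
normal = FakeTask(priority=1, submit_hour=now)
later  = FakeTask(priority=1, submit_hour=now + 1)  # submitted an hour later

assert sorted([later, normal, urgent]) == [urgent, normal, later]
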
@@ -45,6 +52,9 @@ class TaskMgr(threading.Thread):
         self.thread_stop = False
         self.jobmgr = None
         self.task_queue = []
+        self.lazy_append_list = []
+        self.lazy_delete_list = []
+        self.task_queue_lock = threading.Lock()
         self.user_containers = {}
 
         self.scheduler_interval = scheduler_interval
@@ -57,14 +67,26 @@ class TaskMgr(threading.Thread):
         self.nodemgr = nodemgr
         self.monitor_fetcher = monitor_fetcher
         self.cpu_usage = {}
+        self.gpu_usage = {}
         # self.all_nodes = None
         # self.last_nodes_info_update_time = 0
         # self.nodes_info_update_interval = 30 # (s)
 
+    # decorator: serialize every access to task_queue and the lazy lists;
+    # released in a finally block so an exception cannot leak the lock
+    def queue_lock(f):
+        @wraps(f)
+        def new_f(self, *args, **kwargs):
+            self.task_queue_lock.acquire()
+            try:
+                return f(self, *args, **kwargs)
+            finally:
+                self.task_queue_lock.release()
+        return new_f
+
     def run(self):
         self.serve()
         while not self.thread_stop:
+            self.sort_out_task_queue()
             task, instance_id, worker = self.task_scheduler()
             if task is not None and worker is not None:
                 self.task_processor(task, instance_id, worker)
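
queue_lock lives in the class body and is applied to methods defined after it, so at class-definition time it receives the plain function and returns a wrapper that holds the lock for the whole call. A minimal runnable sketch of the same pattern (Demo and critical are stand-in names):

import threading
from functools import wraps

class Demo:
    def __init__(self):
        self.task_queue_lock = threading.Lock()

    def queue_lock(f):
        @wraps(f)
        def new_f(self, *args, **kwargs):
            with self.task_queue_lock:   # equivalent to acquire / try / finally release
                return f(self, *args, **kwargs)
        return new_f

    @queue_lock
    def critical(self):
        return self.task_queue_lock.locked()   # True: lock is held for the call

print(Demo().critical())   # True
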
@@ -107,6 +129,7 @@ class TaskMgr(threading.Thread):
 
         if instance['status'] == RUNNING and report.instanceStatus != RUNNING:
             self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
+            self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
         instance['status'] = report.instanceStatus
         instance['error_msg'] = report.errmsg
@@ -150,7 +173,7 @@ class TaskMgr(threading.Thread):
         else:
             self.jobmgr.report(task)
         self.logger.info('task %s completed' % task.info.id)
-        self.task_queue.remove(task)
+        self.lazy_delete_list.append(task)
 
     def task_failed(self, task):
@@ -161,9 +184,20 @@ class TaskMgr(threading.Thread):
         else:
             self.jobmgr.report(task)
         self.logger.info('task %s failed' % task.info.id)
-        self.task_queue.remove(task)
+        self.lazy_delete_list.append(task)
+
+    @queue_lock
+    def sort_out_task_queue(self):
+        while self.lazy_delete_list:
+            task = self.lazy_delete_list.pop(0)
+            self.task_queue.remove(task)
+        if self.lazy_append_list:
+            while self.lazy_append_list:
+                task = self.lazy_append_list.pop(0)
+                self.task_queue.append(task)
+            self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
 
     def task_processor(self, task, instance_id, worker_ip):
         task.status = RUNNING
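
The lazy lists exist because task_completed and task_failed can fire from the reporter thread while task_scheduler is iterating task_queue, so removals and appends are only recorded and then applied here, under the lock, at the top of each scheduling pass. A small illustration of the hazard being avoided (a plain list, not the real queue):

queue = ['a', 'b', 'c', 'd']
for t in queue:
    if t in ('a', 'b'):
        queue.remove(t)   # mutating while iterating shifts the iterator
print(queue)              # ['b', 'c', 'd'] -- 'b' was never visited, so it survived
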
@@ -179,6 +213,7 @@ class TaskMgr(threading.Thread):
         instance['worker'] = worker_ip
 
         self.cpu_usage[worker_ip] += task.info.cluster.instance.cpu
+        self.gpu_usage[worker_ip] += task.info.cluster.instance.gpu
         username = task.info.username
         container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token
         if not username in self.user_containers.keys():
@@ -201,7 +236,7 @@ class TaskMgr(threading.Thread):
     # return task, worker
     def task_scheduler(self):
-        # simple FIFO
+        # simple FIFO with priority
         self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))
         # nodes = self.get_all_nodes()
@@ -217,6 +252,8 @@ class TaskMgr(threading.Thread):
             # self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key]))
 
         for task in self.task_queue:
+            if task in self.lazy_delete_list:
+                continue
             worker = self.find_proper_worker(task)
 
             for index, instance in enumerate(task.instance_list):
@@ -231,6 +268,7 @@ class TaskMgr(threading.Thread):
                     instance['status'] = FAILED
                     instance['token'] = ''
                     self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
+                    self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
                     self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
 
                 if worker is not None:
@@ -259,9 +297,10 @@ class TaskMgr(threading.Thread):
                 continue
             if task.info.cluster.instance.memory > worker_info['memory']:
                 continue
-            # if task.info.cluster.instance.disk > worker_info['disk']:
-            #     continue
-            if task.info.cluster.instance.gpu > worker_info['gpu']:
+            # try not to assign non-gpu task to a worker with gpu
+            if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0:
+                continue
+            if task.info.cluster.instance.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']:
                 continue
             return worker_ip
         return None
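
Taken together, the two new checks mean a worker's schedulable gpu capacity is its physical count minus what the scheduler has already allocated, and gpu-equipped workers are reserved for gpu tasks. Worked through with hypothetical numbers:

worker_info = {'gpu': 2}   # worker reports two physical gpus
gpu_usage = 1              # one already allocated by the scheduler

task_gpu = 0               # cpu-only task
print(task_gpu == 0 and worker_info['gpu'] > 0)    # True: skip, keep gpu workers free

task_gpu = 2               # needs two gpus
print(task_gpu + gpu_usage > worker_info['gpu'])   # True: 2 + 1 > 2, skip this worker
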
@@ -289,7 +328,7 @@ class TaskMgr(threading.Thread):
         info['cpu'] = len(worker_info['cpuconfig'])
         info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (Mb)
         info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb)
-        info['gpu'] = 0 # not support yet
+        info['gpu'] = len(worker_info['gpuinfo'])
         return info
@@ -301,15 +340,21 @@ class TaskMgr(threading.Thread):
             return 0
 
+    def get_gpu_usage(self, worker_ip):
+        try:
+            return self.gpu_usage[worker_ip]
+        except KeyError:
+            self.gpu_usage[worker_ip] = 0
+            return 0
+
     def set_jobmgr(self, jobmgr):
         self.jobmgr = jobmgr
 
+    # user: username
+    # task: a json string
     # save the task information into database
     # called when jobmgr assign task to taskmgr
-    def add_task(self, username, taskid, json_task):
+    def add_task(self, username, taskid, json_task, task_priority=1):
         # decode json string to object defined in grpc
         self.logger.info('[taskmgr add_task] receive task %s' % taskid)
         image_dict = {
@@ -340,16 +385,18 @@ class TaskMgr(threading.Thread):
                 cpu = int(json_task['cpuSetting']),
                 memory = int(json_task['memorySetting']),
                 disk = int(json_task['diskSetting']),
-                gpu = int(json_task['gpuSetting'])))))
+                gpu = int(json_task['gpuSetting'])))),
+            priority=task_priority)
         if 'mapping' in json_task:
             task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
                                                   remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
                                             for mapping_key in json_task['mapping']])
-        self.task_queue.append(task)
+        self.lazy_append_list.append(task)
 
     # user: username
     # get the information of a task, including the status, task description and other information
+    @queue_lock
     def get_task(self, taskid):
         for task in self.task_queue:
             if task.info.id == taskid:

View File: utils/gputools.py

@@ -1,5 +1,8 @@
 import lxc
 import subprocess
+import os
+import signal
+from utils.log import logger
 
 # Note: keep physical device id always the same as the virtual device id
@@ -37,64 +40,79 @@ def remove_device(container_name, device_path):
 # +-----------------------------------------------------------------------------+
 #
 
 def nvidia_smi():
     try:
         ret = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
         return ret.stdout.decode('utf-8').split('\n')
     except subprocess.CalledProcessError:
         return None
 
 def get_gpu_driver_version():
     output = nvidia_smi()
     if not output:
         return None
     else:
         return output[2].split()[-2]
 
 def get_gpu_status():
     output = nvidia_smi()
     if not output:
         return []
     interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
     status_list = []
     for index in range(7, interval_index, 3):
         status = {}
         status['id'] = output[index].split()[1]
         sp = output[index+1].split()
         status['fan'] = sp[1]
         status['memory'] = sp[8]
         status['memory_max'] = sp[10]
         status['util'] = sp[12]
         status_list.append(status)
     return status_list
 
 def get_gpu_processes():
     output = nvidia_smi()
     if not output:
         return []
     interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
     process_list = []
     for index in range(interval_index + 5, len(output)):
         sp = output[index].split()
         if len(sp) != 7:
             break
         process = {}
         process['gpu'] = sp[1]
         process['pid'] = sp[2]
         process['name'] = sp[4]
         process['memory'] = sp[5]
         process['container'] = get_container_name_by_pid(sp[2])
         process_list.append(process)
     return process_list
 
 def get_container_name_by_pid(pid):
     with open('/proc/%s/cgroup' % pid) as f:
         content = f.readlines()[0].strip().split('/')
         if content[1] != 'lxc':
             return 'host'
         else:
             return content[2]
     return None
+def clean_up_processes_in_gpu(gpu_id):
+    logger.info('[gputools] start cleaning up processes in gpu %d' % gpu_id)
+    processes = get_gpu_processes()
+    # pids and gpu indexes are parsed from nvidia-smi as strings,
+    # so convert before comparing or signalling
+    for process in [p for p in processes if int(p['gpu']) == gpu_id]:
+        logger.info('[gputools] found process %s running in gpu %s' % (process['pid'], process['gpu']))
+        if process['container'] == 'host':
+            logger.warning('[gputools] process belongs to the host, ignored')
+        else:
+            logger.warning('[gputools] killing process of container [%s]' % process['container'])
+            try:
+                os.kill(int(process['pid']), signal.SIGKILL)
+            except OSError:
+                continue
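
These helpers screen-scrape the fixed-layout table nvidia-smi prints: get_gpu_status reads three rows per device starting at line 7, and get_gpu_processes reads the process table starting five lines past the blank separator, so both are tied to a particular driver's output format. A sketch of how a caller might consume them; the dict values shown are illustrative, not captured output:

from utils import gputools

for gpu in gputools.get_gpu_status():
    # e.g. {'id': '0', 'fan': '23%', 'memory': '712MiB',
    #       'memory_max': '11178MiB', 'util': '31%'}
    print(gpu['id'], gpu['util'])

for proc in gputools.get_gpu_processes():
    # fields are strings parsed from the table; 'container' comes from /proc/<pid>/cgroup
    print(proc['gpu'], proc['pid'], proc['name'], proc['container'])
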

View File: monitor.py

@@ -19,7 +19,7 @@ Design:Monitor mainly consists of three parts: Collectors, Master_Collector and
 
 import subprocess,re,os,psutil,math,sys
 import time,threading,json,traceback,platform
-from utils import env, etcdlib
+from utils import env, etcdlib, gputools
 import lxc
 import xmlrpc.client
 from datetime import datetime
@@ -481,6 +481,10 @@ class Collector(threading.Thread):
                 info[idx][key] = val
         return [cpuset, info]
 
+    # collect gpu used information
+    def collect_gpuinfo(self):
+        return gputools.get_gpu_status()
+
     # collect disk used information
     def collect_diskinfo(self):
         global workercinfo
@@ -537,6 +541,7 @@ class Collector(threading.Thread):
             [cpuinfo,cpuconfig] = self.collect_cpuinfo()
             workerinfo['cpuinfo'] = cpuinfo
             workerinfo['cpuconfig'] = cpuconfig
+            workerinfo['gpuinfo'] = self.collect_gpuinfo()
             workerinfo['diskinfo'] = self.collect_diskinfo()
             workerinfo['running'] = True
             #time.sleep(self.interval)
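
This last hunk closes the loop: the collector publishes gpuinfo for each worker, get_worker_resource_info in taskmgr turns it into a gpu count, and find_proper_worker checks demand plus tracked usage against that count. Compressed into one sketch (hypothetical values):

workerinfo = {'gpuinfo': [{'id': '0'}, {'id': '1'}]}   # from collect_gpuinfo()
capacity = len(workerinfo['gpuinfo'])                  # get_worker_resource_info
gpu_usage = 1                                          # TaskMgr.gpu_usage for this worker

need = 1
print(need + gpu_usage <= capacity)                    # True: the instance fits
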