Add gpu supports in taskcontroller
This commit is contained in:
parent
e5eb504390
commit
2dd0f42e4f
|
@ -15,7 +15,7 @@ import grpc
|
|||
#from utils.log import logger
|
||||
#from utils import env
|
||||
import json,lxc,subprocess,threading,os,time,traceback
|
||||
from utils import imagemgr,etcdlib
|
||||
from utils import imagemgr,etcdlib,gputools
|
||||
from worker import ossmounter
|
||||
from protos import rpc_pb2, rpc_pb2_grpc
|
||||
|
||||
|
@ -80,6 +80,12 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
|||
self.free_ips.append(i)
|
||||
logger.info("Free ip addresses pool %s" % str(self.free_ips))
|
||||
|
||||
self.gpu_lock = threading.Lock()
|
||||
self.gpu_status = {}
|
||||
gpus = gputools.get_gpu_status()
|
||||
for gpu in gpus:
|
||||
gpu_status[gpu['id']] = ""
|
||||
|
||||
self.start_report()
|
||||
logger.info('TaskController init success')
|
||||
|
||||
|
@ -102,6 +108,41 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
|||
logger.info(str(self.free_ips))
|
||||
self.lock.release()
|
||||
|
||||
def add_gpu_device(self, lxcname, gpu_need):
|
||||
if gpu_need < 0:
|
||||
return [True, ""]
|
||||
self.gpu_lock.acquire()
|
||||
use_gpus = []
|
||||
for gpuid in self.gpu_status.keys():
|
||||
if self.gpu_status[gpuid] == "":
|
||||
use_gpus.append(gpuid)
|
||||
if len(use_gpus) < gpu_need:
|
||||
self.gpu_lock.release()
|
||||
return [False, "No free GPUs"]
|
||||
for gpuid in use_gpus:
|
||||
self.gpu_status[gpuid] = lxcname
|
||||
try:
|
||||
gputools.add_device_node(lxcname, "/dev/nvidiactl")
|
||||
gputools.add_device_node(lxcname, "/dev/nvidia-uvm")
|
||||
for gpuid in use_gpus:
|
||||
gputools.add_device_node(lxcname,"/dev/nvidia"+str(gpuid))
|
||||
logger.info("Add gpu:"+str(gpuid) +" to lxc:"str(lxcname))
|
||||
except Exception as e:
|
||||
logger.error(traceback.format_exc())
|
||||
for gpuid in use_gpus:
|
||||
self.gpu_status[gpuid] = ""
|
||||
self.gpu_lock.release()
|
||||
return [False, "Error occurs when adding gpu device."]
|
||||
|
||||
self.gpu_lock.release()
|
||||
|
||||
def release_gpu_device(self, lxcname):
|
||||
self.gpu_lock.acquire()
|
||||
for gpuid in self.gpu_status.keys():
|
||||
if self.gpu_status[gpuid] == lxcname:
|
||||
self.gpu_status[gpuid] = ""
|
||||
self.gpu_lock.release()
|
||||
|
||||
#mount_oss
|
||||
def mount_oss(self, datapath, mount_info):
|
||||
self.mount_lock.acquire()
|
||||
|
@ -170,6 +211,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
|||
if outpath[i] == "":
|
||||
outpath[i] = "/root/nfs/"
|
||||
timeout = request.timeout
|
||||
gpu_need = int(request.cluster.instance.gpu)
|
||||
|
||||
# acquire ip
|
||||
[status, ip] = self.acquire_ip()
|
||||
|
@ -225,9 +267,19 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
|||
if not container.start():
|
||||
logger.error('start container %s failed' % lxcname)
|
||||
self.release_ip(ip)
|
||||
self.imgmgr.deleteFS(lxcname)
|
||||
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container")
|
||||
else:
|
||||
logger.info('start container %s success' % lxcname)
|
||||
|
||||
logger.info('start container %s success' % lxcname)
|
||||
|
||||
#add GPU
|
||||
[success, msg] = self.add_gpu_device(lxcname,gpu_need)
|
||||
if not success:
|
||||
logger.error("Fail to add gpu device.")
|
||||
container.stop()
|
||||
self.release_ip(ip)
|
||||
self.imgmgr.deleteFS(lxcname)
|
||||
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device.")
|
||||
|
||||
thread = threading.Thread(target = self.execute_task, args=(username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_list))
|
||||
thread.setDaemon(True)
|
||||
|
@ -337,6 +389,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
|||
|
||||
logger.info("release ip address %s" % ip)
|
||||
self.release_ip(ip)
|
||||
self.release_gpu_device(lxcname)
|
||||
|
||||
#umount oss
|
||||
self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_info)
|
||||
|
|
Loading…
Reference in New Issue