diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index ae4e82a..70e9105 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -62,7 +62,7 @@ class BatchJob(object): task_name = self.user + '_' + self.job_id + '_' + task_idx return task_name, self.raw_job_info["tasks"][task_idx] return '', None - + # a task has finished def finish_task(self, task_idx): pass @@ -137,7 +137,7 @@ class JobMgr(threading.Thread): if not task_info: return False else: - task_priority = self.job_priority + task_priority = job.job_priority self.taskmgr.add_task(job.user, task_name, task_info, task_priority) return True @@ -155,4 +155,3 @@ class JobMgr(threading.Thread): # a task has finished def report(self, task): pass - diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 6d41492..07936d8 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -374,8 +374,8 @@ class TaskMgr(threading.Thread): commandLine = json_task['command'], packagePath = json_task['srcAddr'], envVars = {}), - stderrRedirectPath = json_task['stdErrRedPth'], - stdoutRedirectPath = json_task['stdOutRedPth']), + stderrRedirectPath = json_task.get('stdErrRedPth',""), + stdoutRedirectPath = json_task.get('stdOutRedPth',"")), cluster = Cluster( image = Image( name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'], diff --git a/src/master/testTaskCtrler.py b/src/master/testTaskCtrler.py index 09899e0..be36ef3 100644 --- a/src/master/testTaskCtrler.py +++ b/src/master/testTaskCtrler.py @@ -10,15 +10,15 @@ def run(): channel = grpc.insecure_channel('localhost:50051') stub = rpc_pb2_grpc.WorkerStub(channel) - comm = rpc_pb2.Command(commandLine="echo \"stestsfdsf\\ntewtgsdgfdsgret\newarsafsda\" > /root/test.txt;ls /root;sleep 100", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' + comm = rpc_pb2.Command(commandLine="python3 CNN.py", packagePath="/root/nfs/17flowers", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="") - img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet") - inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0) + img = rpc_pb2.Image(name="17flowers", type=rpc_pb2.Image.PRIVATE, owner="root") + inst = rpc_pb2.Instance(cpu=4, memory=4000, disk=8000, gpu=2) mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="") clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[]) - task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=600,token="test") + task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token="test") response = stub.process_task(task) print("Batch client received: " + str(response.status)+" "+response.message) @@ -35,5 +35,5 @@ def stop_task(): if __name__ == '__main__': run() - time.sleep(2) - stop_task() + #time.sleep(4) + #stop_task() diff --git a/src/utils/gputools.py b/src/utils/gputools.py index 01aad8e..03f5bef 100644 --- a/src/utils/gputools.py +++ b/src/utils/gputools.py @@ -45,6 +45,8 @@ def nvidia_smi(): return ret.stdout.decode('utf-8').split('\n') except subprocess.CalledProcessError: return None + except Exception as e: + return None def get_gpu_driver_version(): diff --git a/src/worker/monitor.py b/src/worker/monitor.py index 0fe7f26..99170ed 100755 --- a/src/worker/monitor.py +++ b/src/worker/monitor.py @@ -544,7 +544,7 @@ class Collector(threading.Thread): workerinfo['gpuinfo'] = self.collect_gpuinfo() workerinfo['diskinfo'] = self.collect_diskinfo() workerinfo['running'] = True - #time.sleep(self.interval) + time.sleep(self.interval) if self.test: break # print(self.etcdser.getkey('/meminfo/total')) diff --git a/src/worker/taskcontroller.py b/src/worker/taskcontroller.py index 05a8362..3f0a7aa 100755 --- a/src/worker/taskcontroller.py +++ b/src/worker/taskcontroller.py @@ -15,7 +15,7 @@ import grpc #from utils.log import logger #from utils import env import json,lxc,subprocess,threading,os,time,traceback -from utils import imagemgr,etcdlib +from utils import imagemgr,etcdlib,gputools from worker import ossmounter from protos import rpc_pb2, rpc_pb2_grpc @@ -80,6 +80,12 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): self.free_ips.append(i) logger.info("Free ip addresses pool %s" % str(self.free_ips)) + self.gpu_lock = threading.Lock() + self.gpu_status = {} + gpus = gputools.get_gpu_status() + for gpu in gpus: + self.gpu_status[gpu['id']] = "" + self.start_report() logger.info('TaskController init success') @@ -102,6 +108,42 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): logger.info(str(self.free_ips)) self.lock.release() + def add_gpu_device(self, lxcname, gpu_need): + if gpu_need < 1: + return [True, ""] + self.gpu_lock.acquire() + use_gpus = [] + for gpuid in self.gpu_status.keys(): + if self.gpu_status[gpuid] == "": + use_gpus.append(gpuid) + if len(use_gpus) < gpu_need: + self.gpu_lock.release() + return [False, "No free GPUs"] + for gpuid in use_gpus: + self.gpu_status[gpuid] = lxcname + try: + gputools.add_device(lxcname, "/dev/nvidiactl") + gputools.add_device(lxcname, "/dev/nvidia-uvm") + for gpuid in use_gpus: + gputools.add_device(lxcname,"/dev/nvidia"+str(gpuid)) + logger.info("Add gpu:"+str(gpuid) +" to lxc:"+str(lxcname)) + except Exception as e: + logger.error(traceback.format_exc()) + for gpuid in use_gpus: + self.gpu_status[gpuid] = "" + self.gpu_lock.release() + return [False, "Error occurs when adding gpu device."] + + self.gpu_lock.release() + return [True, ""] + + def release_gpu_device(self, lxcname): + self.gpu_lock.acquire() + for gpuid in self.gpu_status.keys(): + if self.gpu_status[gpuid] == lxcname: + self.gpu_status[gpuid] = "" + self.gpu_lock.release() + #mount_oss def mount_oss(self, datapath, mount_info): self.mount_lock.acquire() @@ -170,6 +212,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): if outpath[i] == "": outpath[i] = "/root/nfs/" timeout = request.timeout + gpu_need = int(request.cluster.instance.gpu) # acquire ip [status, ip] = self.acquire_ip() @@ -225,9 +268,19 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): if not container.start(): logger.error('start container %s failed' % lxcname) self.release_ip(ip) + self.imgmgr.deleteFS(lxcname) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container") - else: - logger.info('start container %s success' % lxcname) + + logger.info('start container %s success' % lxcname) + + #add GPU + [success, msg] = self.add_gpu_device(lxcname,gpu_need) + if not success: + logger.error("Fail to add gpu device. " + msg) + container.stop() + self.release_ip(ip) + self.imgmgr.deleteFS(lxcname) + return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg) thread = threading.Thread(target = self.execute_task, args=(username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_list)) thread.setDaemon(True) @@ -252,6 +305,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): if ret.returncode != 0: msg = "Fail to move output_tmp.txt to nfs/%s" % tmpfilename logger.error(msg) + logger.error(ret.stdout) return [False,msg] logger.info("Succeed to moving output_tmp to nfs/%s" % tmpfilename) @@ -337,6 +391,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): logger.info("release ip address %s" % ip) self.release_ip(ip) + self.release_gpu_device(lxcname) #umount oss self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_info) @@ -344,6 +399,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): def stop_tasks(self, request, context): for msg in request.taskmsgs: lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.instanceid),msg.token) + logger.info("Stop the task with lxc:"+lxcname) subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") diff --git a/src/worker/worker.py b/src/worker/worker.py index 5977a5a..fb4e324 100755 --- a/src/worker/worker.py +++ b/src/worker/worker.py @@ -220,7 +220,7 @@ class Worker(object): if status: # master has know the worker so we start send heartbeat package if value=='ok': - self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 3) + self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60) else: logger.error("get key %s failed, master may be crashed" % self.addr) self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)