def start_task():
    """Manual smoke test: submit one hard-coded task to a local worker.

    Opens an insecure gRPC channel to ``localhost:50051``, sends a single
    ``TaskInfo`` through the worker's ``start_task`` RPC and prints the
    status and message of the reply.
    """
    worker = rpc_pb2_grpc.WorkerStub(grpc.insecure_channel('localhost:50051'))

    # | awk '{print \"test\\\"\\n\"}'
    shell_command = rpc_pb2.Command(
        commandLine="ls /root;sleep 5;ls /root",
        packagePath="/root",
        envVars={'test1': '10', 'test2': '20'},
    )
    # Both redirect paths are sent as the "/root/nfs/batch_{jobid}/" placeholder.
    parameters = rpc_pb2.Parameters(
        command=shell_command,
        stderrRedirectPath="/root/nfs/batch_{jobid}/",
        stdoutRedirectPath="/root/nfs/batch_{jobid}/",
    )
    request = rpc_pb2.TaskInfo(
        taskid="test",
        username="root",
        vnodeid=1,
        parameters=parameters,
        timeout=20,
        token="test",
    )

    reply = worker.start_task(request)
    status_text = str(reply.status)
    print("Batch client received: " + status_text + " " + reply.message)


if __name__ == '__main__':
    # Only the start_task check is enabled; the run(), stop_vnode() and
    # stop_task() checks are left switched off.
    #for i in range(10):
    #run()
    start_task()
    #stop_vnode()
    #time.sleep(4)
    #stop_task()
def start_task(self, request, context):
    """Handle the ``start_task`` RPC by running the task on a background thread.

    The request fields are unpacked, ``taskid`` and ``vnodeid`` are added to
    the task's environment variables, and ``execute_task`` is started on a
    daemon thread.

    Returns:
        An ``ACCEPTED`` reply, sent immediately; the task outcome is reported
        later by ``execute_task`` through ``add_msg``.
    """
    taskid = request.taskid
    username = request.username
    vnodeid = request.vnodeid
    task_command = request.parameters.command

    # Copy the env map so the incoming request message is not modified.
    envs = dict(task_command.envVars)
    envs['taskid'] = str(taskid)
    envs['vnodeid'] = str(vnodeid)

    outpath = [request.parameters.stdoutRedirectPath, request.parameters.stderrRedirectPath]
    # NOTE(review): stop_task builds the container name with a trailing
    # "-<token>" and this method does not -- confirm which form matches the
    # name the vnode container is actually created with.
    lxcname = '%s-batch-%s-%s' % (username, taskid, str(vnodeid))

    thread = threading.Thread(
        target=self.execute_task,
        args=(username, taskid, vnodeid, envs, lxcname, task_command.packagePath,
              task_command.commandLine, request.timeout, outpath, request.token),
        daemon=True,
    )
    thread.start()

    return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED, message="")


def stop_task(self, request, context):
    """Handle the ``stop_task`` RPC: kill the container of every listed task.

    Runs ``lxc-stop -k`` once per entry of ``request.taskmsgs``.  A failing
    or missing ``lxc-stop`` is logged and the remaining entries are still
    processed.

    Returns:
        An ``ACCEPTED`` reply, regardless of the individual results.
    """
    for msg in request.taskmsgs:
        lxcname = '%s-batch-%s-%s-%s' % (msg.username, msg.taskid, str(msg.vnodeid), msg.token)
        logger.info("Stop the task with lxc:" + lxcname)
        try:
            # Argument list, no shell: the name is built from request fields
            # and must not be interpreted as shell syntax.
            ret = subprocess.run(["lxc-stop", "-k", "-n", lxcname],
                                 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except OSError:
            logger.error(traceback.format_exc())
            continue
        if ret.returncode != 0:
            logger.error("lxc-stop failed for %s: %s"
                         % (lxcname, ret.stdout.decode(encoding="utf-8", errors="replace")))
    return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED, message="")


def write_output(self, lxcname, tmplogpath, filepath):
    """Move a task log file to the path requested by the user.

    The move is executed with ``mv`` inside the container ``lxcname``.

    Args:
        lxcname: name of the container to attach to.
        tmplogpath: current location of the log, relative to ``/root/nfs/``.
        filepath: requested destination.  An empty string, the literal
            placeholder ``/root/nfs/batch_{jobid}/``, or a path that
            normalises to the current location all mean "leave it in place".

    Returns:
        ``[True, ""]`` when nothing had to be done or the move succeeded,
        ``[False, message]`` when the move failed.
    """
    srcpath = "/root/nfs/" + tmplogpath
    if (filepath == "" or filepath == "/root/nfs/batch_{jobid}/"
            or os.path.abspath(srcpath) == os.path.abspath(filepath)):
        return [True, ""]
    try:
        # Argument list, no shell: filepath comes from the task request, so
        # spaces or shell metacharacters in it must be taken literally.
        ret = subprocess.run(["lxc-attach", "-n", lxcname, "--", "mv", srcpath, filepath],
                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    except OSError as err:
        # Without a shell, a missing lxc-attach binary raises instead of
        # producing a non-zero exit status; report it the same way.
        msg = str(err)
        logger.error(msg)
        return [False, msg]
    if ret.returncode != 0:
        msg = ret.stdout.decode(encoding="utf-8", errors="replace")
        logger.error(msg)
        return [False, msg]
    logger.info("Succeed to moving nfs/%s to %s" % (tmplogpath, filepath))
    return [True, ""]


def execute_task(self, username, taskid, vnodeid, envs, lxcname, pkgpath, command, timeout, outpath, token):
    """Run one batch task inside its container and report the outcome.

    Steps: write ``command`` as ``/root/batch_job.sh`` into the container's
    rootfs, run it with ``lxc-attach`` while capturing stdout/stderr into
    per-task log files, enforce the time limit, move the logs to the
    requested paths, and report the final status through ``add_msg``.

    Args:
        username: owner of the task; selects the log directory.
        taskid: task identifier; the part after the first ``_`` names the
            job directory (``batch_<jobid>``).
        vnodeid: index of the vnode running this task.
        envs: mapping of environment variables passed via ``lxc-attach -v``.
        lxcname: name of the container to run in.
        pkgpath: directory the script changes into before running ``command``.
        command: shell command line to execute.
        timeout: limit in seconds; ``0`` means ``MAX_RUNNING_TIME``.
        outpath: ``[stdout_destination, stderr_destination]``.
        token: passed through unchanged to ``add_msg``.

    Reports ``TIMEOUT``, ``OUTPUTERROR``, ``COMPLETED`` or ``FAILED``.  A
    failure to write the script or to launch ``lxc-attach`` is also reported
    as ``FAILED`` so the task does not end without a status.
    """
    scriptname = "batch_job.sh"
    scriptpath = "/var/lib/lxc/" + lxcname + "/rootfs/root/" + scriptname
    try:
        with open(scriptpath, "w") as scriptfile:
            scriptfile.write("#!/bin/bash\n")
            scriptfile.write("cd " + str(pkgpath) + "\n")
            scriptfile.write(command)
    except Exception:
        logger.error(traceback.format_exc())
        errmsg = "Fail to write script file with taskid(%s) vnodeid(%s)" % (str(taskid), str(vnodeid))
        logger.error(errmsg)
        self.add_msg(taskid, username, vnodeid, rpc_pb2.FAILED, token, errmsg)
        return

    try:
        job_id = taskid.split('_')[1]
    except Exception:
        logger.error(traceback.format_exc())
        job_id = "_none"
    jobdir = "batch_" + job_id
    # NOTE(review): write_output later looks for these logs under
    # /root/nfs/<jobdir>/ inside the container, so this host directory is
    # presumably what is mounted there -- confirm against the container config.
    logdir = "%s/global/users/%s/data/" % (self.fspath, username) + jobdir
    stdoutname = str(taskid) + "_" + str(vnodeid) + "_stdout.txt"
    stderrname = str(taskid) + "_" + str(vnodeid) + "_stderr.txt"

    stdoutfile = None
    stderrfile = None
    try:
        if not os.path.exists(logdir):
            logger.info("Directory:%s not exists, create it." % logdir)
            try:
                os.mkdir(logdir)
            except FileExistsError:
                # Another vnode of the same job created it in the meantime.
                pass
        stdoutfile = open(logdir + "/" + stdoutname, "w")
        stderrfile = open(logdir + "/" + stderrname, "w")
        logger.info("Create stdout(%s) and stderr(%s) file to log" % (stdoutname, stderrname))
    except Exception:
        # Keep going without log files: the child then inherits this
        # process's stdout/stderr.
        logger.error(traceback.format_exc())
        if stdoutfile is not None:
            stdoutfile.close()
        stdoutfile = None
        stderrfile = None

    # Argument list, no shell: env values come from the task request and
    # must not be split or interpreted as shell syntax.  It also means
    # kill() below hits lxc-attach itself rather than a wrapper shell.
    argv = ["lxc-attach", "-n", lxcname]
    for envkey, envval in envs.items():
        argv += ["-v", "%s=%s" % (envkey, envval)]
    argv += ["--", "/bin/bash", "/root/" + scriptname]
    logger.info('run task with command - %s' % " ".join(argv))

    limit = MAX_RUNNING_TIME if timeout == 0 else timeout
    timed_out = False
    try:
        try:
            p = subprocess.Popen(argv, stdout=stdoutfile, stderr=stderrfile)
        except OSError as err:
            logger.error(traceback.format_exc())
            self.add_msg(taskid, username, vnodeid, rpc_pb2.FAILED, token, str(err))
            return
        try:
            p.wait(timeout=limit)
        except subprocess.TimeoutExpired:
            timed_out = True
            p.kill()
            p.wait()  # reap the killed child
    finally:
        # Close the logs before write_output moves them.
        for handle in (stdoutfile, stderrfile):
            if handle is not None:
                handle.close()

    if timed_out:
        logger.info("Running time(%d) is out. Task(%s-%s-%s) will be killed." % (limit, str(taskid), str(vnodeid), token))
        self.add_msg(taskid, username, vnodeid, rpc_pb2.TIMEOUT, token, "Running time is out.")
        return

    [success1, msg1] = self.write_output(lxcname, jobdir + "/" + stdoutname, outpath[0])
    [success2, msg2] = self.write_output(lxcname, jobdir + "/" + stderrname, outpath[1])
    if not success1 or not success2:
        # Report the stdout failure first when both moves failed.
        msg = msg1 if not success1 else msg2
        logger.info("Output error on Task(%s-%s-%s)." % (str(taskid), str(vnodeid), token))
        self.add_msg(taskid, username, vnodeid, rpc_pb2.OUTPUTERROR, token, msg)
    elif p.returncode == 0:
        logger.info("Task(%s-%s-%s) completed." % (str(taskid), str(vnodeid), token))
        self.add_msg(taskid, username, vnodeid, rpc_pb2.COMPLETED, token, "")
    else:
        logger.info("Task(%s-%s-%s) failed." % (str(taskid), str(vnodeid), token))
        self.add_msg(taskid, username, vnodeid, rpc_pb2.FAILED, token, "")