diff --git a/src/master/testTaskCtrler.py b/src/master/testTaskCtrler.py index 907703b..1aa5932 100644 --- a/src/master/testTaskCtrler.py +++ b/src/master/testTaskCtrler.py @@ -11,7 +11,7 @@ def run(): stub = rpc_pb2_grpc.WorkerStub(channel) comm = rpc_pb2.Command(commandLine="echo \"s\" | awk '{print \"test\\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' - paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="tmp/", stdoutRedirectPath="tmp/") + paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/test1/test2/", stdoutRedirectPath="") img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet") inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0) diff --git a/src/worker/taskcontroller.py b/src/worker/taskcontroller.py index 1716eb6..174f1ce 100755 --- a/src/worker/taskcontroller.py +++ b/src/worker/taskcontroller.py @@ -91,6 +91,9 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): lxcname = '%s-batch-%s-%s' % (username,taskid,str(instanceid)) instance_type = request.cluster.instance outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath] + for i in range(len(outpath)): + if outpath[i] == "": + outpath[i] = "/root/nfs/" # acquire ip [status, ip] = self.acquire_ip() @@ -152,20 +155,35 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") - def write_output(self,content,path): - dirpath = path[:path.rfind("/")] - if not os.path.isdir(dirpath): - logger.info("Output directory doesn't exist. Create (%s)" % dirpath) - os.makedirs(dirpath) + def write_output(self,lxcname,tmpfilename,content,lxcfspath,filepath): try: - outfile = open(path,"w") + outfile = open(lxcfspath+"/root/output_tmp.txt","w") outfile.write(content.decode(encoding="utf-8")) outfile.close() except Exception as err: logger.error(traceback.format_exc()) - logger.error("Fail to write to path(%s)" % path) - else: - logger.info("Succeed to writing to %s" % path) + msg = "Fail to write to path(%s)" % (lxcfspath+"/root/output_tmp.txt") + logger.error(msg) + return [False,msg] + logger.info("Succeed to writing to %s" % (lxcfspath+"/root/output_tmp.txt")) + + cmd = "lxc-attach -n " + lxcname + " -- mv %s %s" + ret = subprocess.run(cmd % ("/root/output_tmp.txt","/root/nfs/"+tmpfilename),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) + if ret.returncode != 0: + msg = "Fail to move output_tmp.txt to nfs/%s" % tmpfilename + logger.error(msg) + return [False,msg] + logger.info("Succeed to moving output_tmp to nfs/%s" % tmpfilename) + + if "/root/nfs/"+tmpfilename == filepath: + return [True,""] + ret = subprocess.run(cmd % ("/root/nfs/"+tmpfilename,filepath),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) + if ret.returncode != 0: + msg = ret.stdout.decode(encoding="utf-8") + logger.error(msg) + return [False,msg] + logger.info("Succeed to moving nfs/%s to %s" % (tmpfilename,filepath)) + return [True,""] def excute_task(self,taskid,instanceid,envs,lxcname,pkgpath,command,outpath,ip): lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs/" @@ -187,12 +205,14 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): logger.info('run task with command - %s' % cmd) ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True) logger.info(ret) + stdoutname = str(taskid)+"-"+str(instanceid)+"-stdout.txt" + stderrname = str(taskid)+"-"+str(instanceid)+"-stderr.txt" if outpath[0][-1] == "/": - outpath[0] += "stdout.txt" + outpath[0] += stdoutname if outpath[1][-1] == "/": - outpath[1] += "stderr.txt" - self.write_output(ret.stdout,lxcfspath+outpath[0]) - self.write_output(ret.stderr,lxcfspath+outpath[1]) + outpath[1] += stderrname + [success,msg] = self.write_output(lxcname,stdoutname,ret.stdout,lxcfspath,outpath[0]) + [success,msg] = self.write_output(lxcname,stderrname,ret.stderr,lxcfspath,outpath[1]) if ret.returncode == 0: #call master rpc function to tell the taskmgr pass