change the way to write stdout and stderr to file
This commit is contained in:
parent
30b2b88eb8
commit
3bb6feb220
|
@ -11,7 +11,7 @@ def run():
|
||||||
stub = rpc_pb2_grpc.WorkerStub(channel)
|
stub = rpc_pb2_grpc.WorkerStub(channel)
|
||||||
|
|
||||||
comm = rpc_pb2.Command(commandLine="echo \"s\" | awk '{print \"test\\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
|
comm = rpc_pb2.Command(commandLine="echo \"s\" | awk '{print \"test\\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
|
||||||
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="tmp/", stdoutRedirectPath="tmp/")
|
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/test1/test2/", stdoutRedirectPath="")
|
||||||
|
|
||||||
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
|
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
|
||||||
inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0)
|
inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0)
|
||||||
|
|
|
@ -91,6 +91,9 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
lxcname = '%s-batch-%s-%s' % (username,taskid,str(instanceid))
|
lxcname = '%s-batch-%s-%s' % (username,taskid,str(instanceid))
|
||||||
instance_type = request.cluster.instance
|
instance_type = request.cluster.instance
|
||||||
outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath]
|
outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath]
|
||||||
|
for i in range(len(outpath)):
|
||||||
|
if outpath[i] == "":
|
||||||
|
outpath[i] = "/root/nfs/"
|
||||||
|
|
||||||
# acquire ip
|
# acquire ip
|
||||||
[status, ip] = self.acquire_ip()
|
[status, ip] = self.acquire_ip()
|
||||||
|
@ -152,20 +155,35 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
|
|
||||||
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
||||||
|
|
||||||
def write_output(self,content,path):
|
def write_output(self,lxcname,tmpfilename,content,lxcfspath,filepath):
|
||||||
dirpath = path[:path.rfind("/")]
|
|
||||||
if not os.path.isdir(dirpath):
|
|
||||||
logger.info("Output directory doesn't exist. Create (%s)" % dirpath)
|
|
||||||
os.makedirs(dirpath)
|
|
||||||
try:
|
try:
|
||||||
outfile = open(path,"w")
|
outfile = open(lxcfspath+"/root/output_tmp.txt","w")
|
||||||
outfile.write(content.decode(encoding="utf-8"))
|
outfile.write(content.decode(encoding="utf-8"))
|
||||||
outfile.close()
|
outfile.close()
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
logger.error("Fail to write to path(%s)" % path)
|
msg = "Fail to write to path(%s)" % (lxcfspath+"/root/output_tmp.txt")
|
||||||
else:
|
logger.error(msg)
|
||||||
logger.info("Succeed to writing to %s" % path)
|
return [False,msg]
|
||||||
|
logger.info("Succeed to writing to %s" % (lxcfspath+"/root/output_tmp.txt"))
|
||||||
|
|
||||||
|
cmd = "lxc-attach -n " + lxcname + " -- mv %s %s"
|
||||||
|
ret = subprocess.run(cmd % ("/root/output_tmp.txt","/root/nfs/"+tmpfilename),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
|
||||||
|
if ret.returncode != 0:
|
||||||
|
msg = "Fail to move output_tmp.txt to nfs/%s" % tmpfilename
|
||||||
|
logger.error(msg)
|
||||||
|
return [False,msg]
|
||||||
|
logger.info("Succeed to moving output_tmp to nfs/%s" % tmpfilename)
|
||||||
|
|
||||||
|
if "/root/nfs/"+tmpfilename == filepath:
|
||||||
|
return [True,""]
|
||||||
|
ret = subprocess.run(cmd % ("/root/nfs/"+tmpfilename,filepath),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
|
||||||
|
if ret.returncode != 0:
|
||||||
|
msg = ret.stdout.decode(encoding="utf-8")
|
||||||
|
logger.error(msg)
|
||||||
|
return [False,msg]
|
||||||
|
logger.info("Succeed to moving nfs/%s to %s" % (tmpfilename,filepath))
|
||||||
|
return [True,""]
|
||||||
|
|
||||||
def excute_task(self,taskid,instanceid,envs,lxcname,pkgpath,command,outpath,ip):
|
def excute_task(self,taskid,instanceid,envs,lxcname,pkgpath,command,outpath,ip):
|
||||||
lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs/"
|
lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs/"
|
||||||
|
@ -187,12 +205,14 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
logger.info('run task with command - %s' % cmd)
|
logger.info('run task with command - %s' % cmd)
|
||||||
ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
|
ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
|
||||||
logger.info(ret)
|
logger.info(ret)
|
||||||
|
stdoutname = str(taskid)+"-"+str(instanceid)+"-stdout.txt"
|
||||||
|
stderrname = str(taskid)+"-"+str(instanceid)+"-stderr.txt"
|
||||||
if outpath[0][-1] == "/":
|
if outpath[0][-1] == "/":
|
||||||
outpath[0] += "stdout.txt"
|
outpath[0] += stdoutname
|
||||||
if outpath[1][-1] == "/":
|
if outpath[1][-1] == "/":
|
||||||
outpath[1] += "stderr.txt"
|
outpath[1] += stderrname
|
||||||
self.write_output(ret.stdout,lxcfspath+outpath[0])
|
[success,msg] = self.write_output(lxcname,stdoutname,ret.stdout,lxcfspath,outpath[0])
|
||||||
self.write_output(ret.stderr,lxcfspath+outpath[1])
|
[success,msg] = self.write_output(lxcname,stderrname,ret.stderr,lxcfspath,outpath[1])
|
||||||
if ret.returncode == 0:
|
if ret.returncode == 0:
|
||||||
#call master rpc function to tell the taskmgr
|
#call master rpc function to tell the taskmgr
|
||||||
pass
|
pass
|
||||||
|
|
Loading…
Reference in New Issue