Use subprocess.popen to redirect task stdout and stderr to root/nfs
This commit is contained in:
parent
831b0ea94e
commit
d29508775f
|
@ -208,9 +208,6 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
|||
instance_type = request.cluster.instance
|
||||
mount_list = request.cluster.mount
|
||||
outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath]
|
||||
for i in range(len(outpath)):
|
||||
if outpath[i] == "":
|
||||
outpath[i] = "/root/nfs/"
|
||||
timeout = request.timeout
|
||||
gpu_need = int(request.cluster.instance.gpu)
|
||||
|
||||
|
@ -288,35 +285,16 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
|||
|
||||
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
||||
|
||||
def write_output(self,lxcname,tmpfilename,content,lxcfspath,filepath):
|
||||
try:
|
||||
outfile = open(lxcfspath+"/root/output_tmp.txt","w")
|
||||
outfile.write(content.decode(encoding="utf-8"))
|
||||
outfile.close()
|
||||
except Exception as err:
|
||||
logger.error(traceback.format_exc())
|
||||
msg = "Fail to write to path(%s)" % (lxcfspath+"/root/output_tmp.txt")
|
||||
logger.error(msg)
|
||||
return [False,msg]
|
||||
logger.info("Succeed to writing to %s" % (lxcfspath+"/root/output_tmp.txt"))
|
||||
|
||||
def write_output(self,lxcname,tmplogpath,filepath):
|
||||
cmd = "lxc-attach -n " + lxcname + " -- mv %s %s"
|
||||
ret = subprocess.run(cmd % ("/root/output_tmp.txt","/root/nfs/"+tmpfilename),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
|
||||
if ret.returncode != 0:
|
||||
msg = "Fail to move output_tmp.txt to nfs/%s" % tmpfilename
|
||||
logger.error(msg)
|
||||
logger.error(ret.stdout)
|
||||
return [False,msg]
|
||||
logger.info("Succeed to moving output_tmp to nfs/%s" % tmpfilename)
|
||||
|
||||
if os.path.abspath("/root/nfs/"+tmpfilename) == os.path.abspath(filepath):
|
||||
if filepath == "" or filepath == "/root/nfs"+tmplogpath or os.path.abspath("/root/nfs/"+tmplogpath) == os.path.abspath(filepath):
|
||||
return [True,""]
|
||||
ret = subprocess.run(cmd % ("/root/nfs/"+tmpfilename,filepath),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
|
||||
ret = subprocess.run(cmd % ("/root/nfs/"+tmplogpath,filepath),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
|
||||
if ret.returncode != 0:
|
||||
msg = ret.stdout.decode(encoding="utf-8")
|
||||
logger.error(msg)
|
||||
return [False,msg]
|
||||
logger.info("Succeed to moving nfs/%s to %s" % (tmpfilename,filepath))
|
||||
logger.info("Succeed to moving nfs/%s to %s" % (tmplogpath,filepath))
|
||||
return [True,""]
|
||||
|
||||
def execute_task(self,username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_info):
|
||||
|
@ -332,12 +310,33 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
|||
logger.error(traceback.format_exc())
|
||||
logger.error("Fail to write script file with taskid(%s) instanceid(%s)" % (str(taskid),str(instanceid)))
|
||||
else:
|
||||
try:
|
||||
job_id = taskid.split('_')[1]
|
||||
except Exception as e:
|
||||
logger.error(traceback.format_exc())
|
||||
job_id = "_none"
|
||||
jobdir = "batch_" + job_id
|
||||
logdir = "%s/global/users/%s/data/" % (self.fspath,username) + jobdir
|
||||
if not os.path.exists(logdir):
|
||||
logger.info("Directory:%s not exists, create it." % logdir)
|
||||
os.mkdir(logdir)
|
||||
stdoutname = str(taskid)+"_"+str(instanceid)+"_"+token+"_stdout.txt"
|
||||
stderrname = str(taskid)+"_"+str(instanceid)+"_"+token+"_stderr.txt"
|
||||
try:
|
||||
stdoutfile = open(logdir+"/"+stdoutname,"w")
|
||||
stderrfile = open(logdir+"/"+stderrname,"w")
|
||||
logger.info("Create stdout(%s) and stderr(%s) file to log" % (stdoutname, stderrname))
|
||||
except Exception as e:
|
||||
logger.error(traceback.format_exc())
|
||||
stdoutfile = None
|
||||
stderrfile = None
|
||||
|
||||
cmd = "lxc-attach -n " + lxcname
|
||||
for envkey,envval in envs.items():
|
||||
cmd = cmd + " -v %s=%s" % (envkey,envval)
|
||||
cmd = cmd + " -- /bin/bash \"" + "/root/" + scriptname + "\""
|
||||
logger.info('run task with command - %s' % cmd)
|
||||
p = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
|
||||
p = subprocess.Popen(cmd,stdout=stdoutfile,stderr=stderrfile, shell=True)
|
||||
#logger.info(p)
|
||||
if timeout == 0:
|
||||
to = MAX_RUNNING_TIME
|
||||
|
@ -351,17 +350,12 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
|||
logger.info("Running time(%d) is out. Task(%s-%s-%s) will be killed." % (timeout,str(taskid),str(instanceid),token))
|
||||
self.add_msg(taskid,username,instanceid,rpc_pb2.TIMEOUT,token,"Running time is out.")
|
||||
else:
|
||||
out,err = p.communicate()
|
||||
logger.info(out)
|
||||
logger.info(err)
|
||||
stdoutname = str(taskid)+"-"+str(instanceid)+"-stdout.txt"
|
||||
stderrname = str(taskid)+"-"+str(instanceid)+"-stderr.txt"
|
||||
if outpath[0][-1] == "/":
|
||||
if len(outpath[0]) > 0 and outpath[0][-1] == "/":
|
||||
outpath[0] += stdoutname
|
||||
if outpath[1][-1] == "/":
|
||||
if len(outpath[1]) > 0 and outpath[1][-1] == "/":
|
||||
outpath[1] += stderrname
|
||||
[success1,msg1] = self.write_output(lxcname,stdoutname,out,lxcfspath,outpath[0])
|
||||
[success2,msg2] = self.write_output(lxcname,stderrname,err,lxcfspath,outpath[1])
|
||||
[success1,msg1] = self.write_output(lxcname,jobdir+"/"+stdoutname,outpath[0])
|
||||
[success2,msg2] = self.write_output(lxcname,jobdir+"/"+stderrname,outpath[1])
|
||||
if not success1 or not success2:
|
||||
if not success1:
|
||||
msg = msg1
|
||||
|
|
Loading…
Reference in New Issue