From eae99bf275677165dd400444723595b86575fbb9 Mon Sep 17 00:00:00 2001 From: zhuyj17 Date: Mon, 30 Jul 2018 17:16:26 +0800 Subject: [PATCH 1/2] Use script to execute command --- src/master/testTaskCtrler.py | 4 ++-- src/worker/taskcontroller.py | 38 ++++++++++++++++++++++++------------ 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/master/testTaskCtrler.py b/src/master/testTaskCtrler.py index 0ea7d81..5b52992 100644 --- a/src/master/testTaskCtrler.py +++ b/src/master/testTaskCtrler.py @@ -10,7 +10,7 @@ def run(): channel = grpc.insecure_channel('localhost:50051') stub = rpc_pb2_grpc.WorkerStub(channel) - comm = rpc_pb2.Command(commandLine=r"echo \"s\" | awk '{print \"test\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' + comm = rpc_pb2.Command(commandLine="echo \"s\" | awk '{print \"test\\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="", stdoutRedirectPath="") img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet") @@ -18,7 +18,7 @@ def run(): mnt = rpc_pb2.Mount(localPath="",remotePath="") clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt]) - task = rpc_pb2.Task(id="test",username="root",instanceid=0,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=10) + task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=0,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=10,token="test") response = stub.process_task(task) print("Batch client received: " + str(response.status)+" "+response.message) diff --git a/src/worker/taskcontroller.py b/src/worker/taskcontroller.py index a1bc028..74c5a65 100755 --- a/src/worker/taskcontroller.py +++ b/src/worker/taskcontroller.py @@ -14,7 +14,7 @@ from concurrent import futures import grpc #from utils.log import logger #from utils import env -import json,lxc,subprocess,threading,os,time +import json,lxc,subprocess,threading,os,time,traceback from utils import imagemgr from protos import rpc_pb2, rpc_pb2_grpc @@ -152,19 +152,31 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") def excute_task(self,taskid,instanceid,envs,lxcname,pkgpath,command,ip): - cmd = "lxc-attach -n " + lxcname - for envkey,envval in envs.items(): - cmd = cmd + " -v %s=%s" % (envkey,envval) - cmd = cmd + " -- /bin/bash -c " + "\"cd " + pkgpath + ";" + command + "\"" - logger.info('run task with command - %s' % cmd) - ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) - logger.info(ret) - if ret.returncode == 0: - #call master rpc function to tell the taskmgr - pass + lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs" + scriptname = "batch_job.sh" + try: + scriptfile = open(lxcfspath+"/root/"+scriptname,"w") + scriptfile.write("#!/bin/bash\n") + scriptfile.write("cd "+str(pkgpath)+"\n") + scriptfile.write(command) + scriptfile.close() + except Exception as err: + logger.error(traceback.format_exc()) + logger.error("Fail to write script file with taskid(%s) instanceid(%s)" % (str(taskid),str(instanceid))) else: - #call master rpc function to tell the wrong - pass + cmd = "lxc-attach -n " + lxcname + for envkey,envval in envs.items(): + cmd = cmd + " -v %s=%s" % (envkey,envval) + cmd = cmd + " -- /bin/bash \"" + "/root/" + scriptname + "\"" + logger.info('run task with command - %s' % cmd) + ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) + logger.info(ret) + if ret.returncode == 0: + #call master rpc function to tell the taskmgr + pass + else: + #call master rpc function to tell the wrong + pass #umount oss here From 060d9d49f616aaeff3ec5ebafccc9a62cdd0d790 Mon Sep 17 00:00:00 2001 From: zhuyj17 Date: Tue, 31 Jul 2018 17:23:15 +0800 Subject: [PATCH 2/2] add RedirectPath --- src/master/testTaskCtrler.py | 2 +- src/worker/taskcontroller.py | 32 +++++++++++++++++++++++++++----- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/src/master/testTaskCtrler.py b/src/master/testTaskCtrler.py index 5b52992..907703b 100644 --- a/src/master/testTaskCtrler.py +++ b/src/master/testTaskCtrler.py @@ -11,7 +11,7 @@ def run(): stub = rpc_pb2_grpc.WorkerStub(channel) comm = rpc_pb2.Command(commandLine="echo \"s\" | awk '{print \"test\\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' - paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="", stdoutRedirectPath="") + paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="tmp/", stdoutRedirectPath="tmp/") img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet") inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0) diff --git a/src/worker/taskcontroller.py b/src/worker/taskcontroller.py index 74c5a65..1716eb6 100755 --- a/src/worker/taskcontroller.py +++ b/src/worker/taskcontroller.py @@ -90,6 +90,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): username = request.username lxcname = '%s-batch-%s-%s' % (username,taskid,str(instanceid)) instance_type = request.cluster.instance + outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath] # acquire ip [status, ip] = self.acquire_ip() @@ -145,17 +146,32 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): #mount oss here - thread = threading.Thread(target = self.excute_task, args=(taskid,instanceid,envs,lxcname,pkgpath,command,ip)) + thread = threading.Thread(target = self.excute_task, args=(taskid,instanceid,envs,lxcname,pkgpath,command,outpath,ip)) thread.setDaemon(True) thread.start() return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") - def excute_task(self,taskid,instanceid,envs,lxcname,pkgpath,command,ip): - lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs" + def write_output(self,content,path): + dirpath = path[:path.rfind("/")] + if not os.path.isdir(dirpath): + logger.info("Output directory doesn't exist. Create (%s)" % dirpath) + os.makedirs(dirpath) + try: + outfile = open(path,"w") + outfile.write(content.decode(encoding="utf-8")) + outfile.close() + except Exception as err: + logger.error(traceback.format_exc()) + logger.error("Fail to write to path(%s)" % path) + else: + logger.info("Succeed to writing to %s" % path) + + def excute_task(self,taskid,instanceid,envs,lxcname,pkgpath,command,outpath,ip): + lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs/" scriptname = "batch_job.sh" try: - scriptfile = open(lxcfspath+"/root/"+scriptname,"w") + scriptfile = open(lxcfspath+"root/"+scriptname,"w") scriptfile.write("#!/bin/bash\n") scriptfile.write("cd "+str(pkgpath)+"\n") scriptfile.write(command) @@ -169,8 +185,14 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): cmd = cmd + " -v %s=%s" % (envkey,envval) cmd = cmd + " -- /bin/bash \"" + "/root/" + scriptname + "\"" logger.info('run task with command - %s' % cmd) - ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) + ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True) logger.info(ret) + if outpath[0][-1] == "/": + outpath[0] += "stdout.txt" + if outpath[1][-1] == "/": + outpath[1] += "stderr.txt" + self.write_output(ret.stdout,lxcfspath+outpath[0]) + self.write_output(ret.stderr,lxcfspath+outpath[1]) if ret.returncode == 0: #call master rpc function to tell the taskmgr pass