Merge pull request #317 from FirmlyReality/batch

Batch
This commit is contained in:
Yujian Zhu 2018-07-31 17:24:54 +08:00 committed by GitHub
commit 220b4308e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 18 deletions

View File

@ -10,15 +10,15 @@ def run():
channel = grpc.insecure_channel('localhost:50051')
stub = rpc_pb2_grpc.WorkerStub(channel)
comm = rpc_pb2.Command(commandLine=r"echo \"s\" | awk '{print \"test\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="", stdoutRedirectPath="")
comm = rpc_pb2.Command(commandLine="echo \"s\" | awk '{print \"test\\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="tmp/", stdoutRedirectPath="tmp/")
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0)
mnt = rpc_pb2.Mount(localPath="",remotePath="")
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt])
task = rpc_pb2.Task(id="test",username="root",instanceid=0,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=10)
task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=0,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=10,token="test")
response = stub.process_task(task)
print("Batch client received: " + str(response.status)+" "+response.message)

View File

@ -14,7 +14,7 @@ from concurrent import futures
import grpc
#from utils.log import logger
#from utils import env
import json,lxc,subprocess,threading,os,time
import json,lxc,subprocess,threading,os,time,traceback
from utils import imagemgr
from protos import rpc_pb2, rpc_pb2_grpc
@ -90,6 +90,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
username = request.username
lxcname = '%s-batch-%s-%s' % (username,taskid,str(instanceid))
instance_type = request.cluster.instance
outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath]
# acquire ip
[status, ip] = self.acquire_ip()
@ -145,26 +146,59 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
#mount oss here
thread = threading.Thread(target = self.excute_task, args=(taskid,instanceid,envs,lxcname,pkgpath,command,ip))
thread = threading.Thread(target = self.excute_task, args=(taskid,instanceid,envs,lxcname,pkgpath,command,outpath,ip))
thread.setDaemon(True)
thread.start()
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
def excute_task(self,taskid,instanceid,envs,lxcname,pkgpath,command,ip):
    """Run ``command`` inside the container ``lxcname`` via ``lxc-attach``.

    The command is executed by ``/bin/bash -c`` inside the container after a
    ``cd`` into ``pkgpath``.  stdout and stderr are captured together and the
    completed-process object is logged.

    Args:
        taskid: Not used by this body.
        instanceid: Not used by this body.
        envs: Mapping of environment variable names to values; each pair is
            passed to ``lxc-attach`` as ``-v NAME=VALUE``.
        lxcname: Name of the container to attach to.
        pkgpath: Directory (string) to ``cd`` into before running the command.
        command: Shell command line (string) to run inside the container.
        ip: Not used by this body.
    """
    # Build an argument vector instead of one shell string: the task command
    # and env values then reach the container's bash verbatim, rather than
    # being re-parsed (quotes, `$`, backticks) by a shell on the host.
    argv = ["lxc-attach", "-n", lxcname]
    for envkey, envval in envs.items():
        argv += ["-v", "%s=%s" % (envkey, envval)]
    argv += ["--", "/bin/bash", "-c", "cd " + pkgpath + ";" + command]
    logger.info('run task with command - %s' % " ".join(argv))
    try:
        ret = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    except OSError:
        # Without a shell, a missing/unrunnable lxc-attach raises instead of
        # yielding a non-zero exit status; log it rather than kill the thread
        # silently.
        logger.error(traceback.format_exc())
        return
    logger.info(ret)
    if ret.returncode == 0:
        #call master rpc function to tell the taskmgr
        pass
def write_output(self,content,path):
    """Write captured process output to ``path`` as UTF-8 text.

    The parent directory of ``path`` is created when it does not exist.  Any
    failure (directory creation, decoding, writing) is logged and swallowed,
    so this method never raises to its caller.

    Args:
        content: Bytes to write; decoded as UTF-8 before writing.
        path: Destination file path.
    """
    # os.path.dirname handles paths without any "/" (-> "") and paths whose
    # only "/" is the leading one (-> "/"), which manual slicing on rfind()
    # got wrong.
    dirpath = os.path.dirname(path)
    try:
        if dirpath and not os.path.isdir(dirpath):
            logger.info("Output directory doesn't exist. Create (%s)" % dirpath)
            # exist_ok: another task thread may create the directory between
            # the isdir() check and this call.
            os.makedirs(dirpath, exist_ok=True)
        # Explicit encoding so non-ASCII output does not depend on the
        # process locale; the context manager closes the file on error too.
        with open(path, "w", encoding="utf-8") as outfile:
            outfile.write(content.decode(encoding="utf-8"))
    except Exception:
        logger.error(traceback.format_exc())
        logger.error("Fail to write to path(%s)" % path)
    else:
        logger.info("Succeed to writing to %s" % path)
def excute_task(self,taskid,instanceid,envs,lxcname,pkgpath,command,outpath,ip):
    """Run a batch command inside container ``lxcname`` and save its output.

    The command is written to ``/root/batch_job.sh`` inside the container's
    root filesystem (``/var/lib/lxc/<lxcname>/rootfs/``), executed there with
    ``lxc-attach ... -- /bin/bash /root/batch_job.sh``, and the captured
    stdout / stderr are written below the same root filesystem through
    ``self.write_output``.

    Args:
        taskid: Task identifier; only used in the error log message.
        instanceid: Instance identifier; only used in the error log message.
        envs: Mapping of environment variable names to values; each pair is
            passed to ``lxc-attach`` as ``-v NAME=VALUE``.
        lxcname: Name of the container to attach to.
        pkgpath: Directory the script changes into before running the command.
        command: Shell command text appended to the script.
        outpath: Two-element sequence ``[stdout_path, stderr_path]``, taken
            relative to the container root filesystem.  A path that is empty
            or ends with "/" gets ``stdout.txt`` / ``stderr.txt`` appended.
            The sequence is not modified.
        ip: Not used by this body.
    """
    lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs/"
    scriptname = "batch_job.sh"
    try:
        # Context manager: the handle is closed even when a write fails.
        with open(lxcfspath+"root/"+scriptname,"w") as scriptfile:
            scriptfile.write("#!/bin/bash\n")
            scriptfile.write("cd "+str(pkgpath)+"\n")
            scriptfile.write(command)
    except Exception:
        logger.error(traceback.format_exc())
        logger.error("Fail to write script file with taskid(%s) instanceid(%s)" % (str(taskid),str(instanceid)))
    else:
        # Argument vector instead of a shell string: env values are handed to
        # lxc-attach verbatim and never re-parsed by a shell on the host.
        argv = ["lxc-attach", "-n", lxcname]
        for envkey, envval in envs.items():
            argv += ["-v", "%s=%s" % (envkey, envval)]
        argv += ["--", "/bin/bash", "/root/" + scriptname]
        logger.info('run task with command - %s' % " ".join(argv))
        try:
            ret = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except OSError:
            # Without a shell, a missing/unrunnable lxc-attach raises instead
            # of yielding a non-zero exit status.
            logger.error(traceback.format_exc())
        else:
            logger.info(ret)
            # Work on local copies so the caller's list is left untouched.
            # An empty path is treated like a directory: indexing [-1] on it
            # would raise IndexError.
            stdout_path, stderr_path = outpath[0], outpath[1]
            if not stdout_path or stdout_path.endswith("/"):
                stdout_path += "stdout.txt"
            if not stderr_path or stderr_path.endswith("/"):
                stderr_path += "stderr.txt"
            self.write_output(ret.stdout,lxcfspath+stdout_path)
            self.write_output(ret.stderr,lxcfspath+stderr_path)
            if ret.returncode == 0:
                #call master rpc function to tell the taskmgr
                pass
            else:
                #call master rpc function to tell the wrong
                pass
    #umount oss here