update taskworker.py
This commit is contained in:
parent
bd732e679f
commit
4e222d46be
|
@ -44,9 +44,22 @@ def stop_vnode():
|
||||||
response = stub.stop_vnode(vnodeinfo)
|
response = stub.stop_vnode(vnodeinfo)
|
||||||
print("Batch client received: " + str(response.status)+" "+response.message)
|
print("Batch client received: " + str(response.status)+" "+response.message)
|
||||||
|
|
||||||
|
def start_task():
    """Send one sample TaskInfo to the worker on localhost:50051 and print the reply.

    Manual smoke-test helper: it builds a fixed command, redirect paths and
    task description, calls the worker's ``start_task`` RPC once and prints
    the status and message of the returned reply.
    """
    conn = grpc.insecure_channel('localhost:50051')
    worker = rpc_pb2_grpc.WorkerStub(conn)

    # Sample shell command, working directory and environment for the task.
    sample_command = rpc_pb2.Command(
        commandLine="ls /root;sleep 5;ls /root",
        packagePath="/root",
        envVars={'test1':'10','test2':'20'},
    )
    sample_params = rpc_pb2.Parameters(
        command=sample_command,
        stderrRedirectPath="/root/nfs/batch_{jobid}/",
        stdoutRedirectPath="/root/nfs/batch_{jobid}/",
    )
    sample_task = rpc_pb2.TaskInfo(
        taskid="test",
        username="root",
        vnodeid=1,
        parameters=sample_params,
        timeout=20,
        token="test",
    )

    reply = worker.start_task(sample_task)
    print("Batch client received: " + str(reply.status)+" "+reply.message)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
    # Manual test entry point: only start_task() is active. The other
    # helpers (run(), stop_vnode(), time.sleep(4), stop_task()) were
    # commented out in the original and can be called here by hand.
    start_task()
|
||||||
|
|
|
@ -70,7 +70,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
|
||||||
|
|
||||||
#self.start_report()
|
#self.start_report()
|
||||||
logger.info('TaskWorker init success')
|
logger.info('TaskWorker init success')
|
||||||
|
|
||||||
def add_gpu_device(self, lxcname, gpu_need):
|
def add_gpu_device(self, lxcname, gpu_need):
|
||||||
if gpu_need < 1:
|
if gpu_need < 1:
|
||||||
return [True, ""]
|
return [True, ""]
|
||||||
|
@ -205,11 +205,30 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
|
||||||
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
||||||
|
|
||||||
def start_task(self, request, context):
    """Accept a task request and run it in a background thread.

    Reads the task description from ``request`` (taskid, username, vnodeid,
    command line, package path, environment variables, timeout, token and
    the stdout/stderr redirect paths), then starts ``self.execute_task`` on
    a daemon thread so the RPC returns without waiting for the task.

    Args:
        request: task description message; the fields listed above are read.
        context: gRPC call context (unused).

    Returns:
        ``rpc_pb2.Reply`` with status ``ACCEPTED`` and an empty message.
    """
    taskid = request.taskid
    username = request.username
    vnodeid = request.vnodeid

    # get config from request
    params = request.parameters
    command = params.command.commandLine
    pkgpath = params.command.packagePath
    # Copy into a plain dict: the worker thread outlives this handler, so it
    # should not share (or mutate) the request's own map container.
    envs = dict(params.command.envVars)
    envs['taskid'] = str(taskid)
    envs['vnodeid'] = str(vnodeid)
    timeout = request.timeout
    token = request.token
    outpath = [params.stdoutRedirectPath, params.stderrRedirectPath]
    # NOTE(review): stop_task builds the container name with an extra
    # "-<token>" suffix; confirm which form the container is created with.
    lxcname = '%s-batch-%s-%s' % (username,taskid,str(vnodeid))

    # daemon=True replaces the deprecated Thread.setDaemon(True).
    thread = threading.Thread(
        target=self.execute_task,
        args=(username,taskid,vnodeid,envs,lxcname,pkgpath,command,timeout,outpath,token),
        daemon=True,
    )
    thread.start()

    return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
||||||
|
|
||||||
def stop_task(self, request, context):
    """Kill the LXC container of every task listed in ``request.taskmsgs``.

    For each message the container name is built from its username, taskid,
    vnodeid and token, and ``lxc-stop -k -n <name>`` is run. Failures are
    logged and do not abort the loop.

    Args:
        request: message whose ``taskmsgs`` entries carry ``username``,
            ``taskid``, ``vnodeid`` and ``token``.
        context: gRPC call context (unused).

    Returns:
        ``rpc_pb2.Reply`` with status ``ACCEPTED`` and an empty message,
        regardless of whether the individual stops succeeded.
    """
    for msg in request.taskmsgs:
        # NOTE(review): start_task names the container without the
        # "-<token>" suffix; confirm the two formats are meant to differ.
        lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.vnodeid),msg.token)
        logger.info("Stop the task with lxc:"+lxcname)
        try:
            # Argument list instead of a shell string: the name is built
            # from request fields and must not be interpreted by a shell.
            ret = subprocess.run(
                ["lxc-stop", "-k", "-n", lxcname],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
            )
        except OSError as err:
            # e.g. lxc-stop is not installed; keep stopping the other tasks.
            logger.error("Fail to run lxc-stop for %s: %s" % (lxcname, str(err)))
            continue
        if ret.returncode != 0:
            logger.error("lxc-stop for %s exited with %d: %s" % (
                lxcname, ret.returncode, ret.stdout.decode(encoding="utf-8", errors="replace")))
    return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
||||||
|
@ -287,6 +306,88 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
|
||||||
conffile.close()
|
conffile.close()
|
||||||
return [True, ""]
|
return [True, ""]
|
||||||
|
|
||||||
|
def write_output(self,lxcname,tmplogpath,filepath):
    """Move a task log from its temporary place under /root/nfs/ to ``filepath``.

    The move is done with ``mv`` executed through ``lxc-attach`` on the
    container ``lxcname``. Nothing is moved when ``filepath`` is empty, is
    the literal default ``"/root/nfs/batch_{jobid}/"``, or already resolves
    to the temporary location.

    Args:
        lxcname: name of the LXC container to attach to.
        tmplogpath: path of the log file relative to ``/root/nfs/``.
        filepath: requested destination path.

    Returns:
        ``[True, ""]`` on success or when no move is needed, otherwise
        ``[False, message]`` with the captured command output (or the
        launch error) as ``message``.
    """
    srcpath = "/root/nfs/" + tmplogpath
    if filepath == "" or filepath == "/root/nfs/batch_{jobid}/" or os.path.abspath(srcpath) == os.path.abspath(filepath):
        return [True,""]
    try:
        # Argument list instead of a shell string: filepath is supplied by
        # the caller and must not be interpreted by a shell.
        ret = subprocess.run(
            ["lxc-attach", "-n", lxcname, "--", "mv", srcpath, filepath],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )
    except OSError as err:
        # lxc-attach could not be launched at all.
        msg = str(err)
        logger.error(msg)
        return [False,msg]
    if ret.returncode != 0:
        # errors="replace" so undecodable tool output cannot raise here.
        msg = ret.stdout.decode(encoding="utf-8", errors="replace")
        logger.error(msg)
        return [False,msg]
    logger.info("Succeed to moving nfs/%s to %s" % (tmplogpath,filepath))
    return [True,""]
|
||||||
|
|
||||||
|
def execute_task(self,username,taskid,vnodeid,envs,lxcname,pkgpath,command,timeout,outpath,token):
    """Run a task's command inside its LXC container and report the outcome.

    Steps:
      1. Write ``/root/batch_job.sh`` into the container rootfs; the script
         changes to ``pkgpath`` and then runs ``command``.
      2. Open stdout/stderr log files under
         ``<self.fspath>/global/users/<username>/data/batch_<jobid>/``,
         where ``<jobid>`` is the second ``_``-separated field of
         ``taskid`` (``"_none"`` if that field is missing).
      3. Run the script with ``lxc-attach``, passing ``envs`` via ``-v``.
      4. Wait up to ``timeout`` seconds (``MAX_RUNNING_TIME`` when
         ``timeout`` is 0); on expiry kill the process and report TIMEOUT.
      5. Otherwise move the logs to ``outpath`` with ``write_output`` and
         report OUTPUTERROR, COMPLETED or FAILED through ``add_msg``.

    Args:
        username, taskid, vnodeid, token: identify the task in log lines
            and in the reported message.
        envs: mapping of environment variables for the command.
        lxcname: name of the LXC container to run in.
        pkgpath: directory the script changes into before running.
        command: shell command line appended to the script.
        timeout: running-time limit in seconds; 0 means MAX_RUNNING_TIME.
        outpath: ``[stdout_path, stderr_path]`` redirect destinations.

    Returns:
        None. The outcome is delivered through ``self.add_msg``.
    """
    lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs/"
    scriptname = "batch_job.sh"
    try:
        # 'with' closes the script even if one of the writes fails.
        with open(lxcfspath+"root/"+scriptname,"w") as scriptfile:
            scriptfile.write("#!/bin/bash\n")
            scriptfile.write("cd "+str(pkgpath)+"\n")
            scriptfile.write(command)
    except Exception:
        logger.error(traceback.format_exc())
        logger.error("Fail to write script file with taskid(%s) vnodeid(%s)" % (str(taskid),str(vnodeid)))
        # Report the failure so the task does not stay without any status.
        self.add_msg(taskid,username,vnodeid,rpc_pb2.FAILED,token,"Fail to write script file.")
        return

    try:
        job_id = taskid.split('_')[1]
    except (IndexError, AttributeError):
        logger.error(traceback.format_exc())
        job_id = "_none"
    jobdir = "batch_" + job_id
    logdir = "%s/global/users/%s/data/" % (self.fspath,username) + jobdir
    stdoutname = str(taskid)+"_"+str(vnodeid)+"_stdout.txt"
    stderrname = str(taskid)+"_"+str(vnodeid)+"_stderr.txt"

    stdoutfile = None
    stderrfile = None
    try:
        if not os.path.exists(logdir):
            logger.info("Directory:%s not exists, create it." % logdir)
            # makedirs tolerates missing parents and a concurrent creation
            # of the same directory by another task of the same job.
            os.makedirs(logdir, exist_ok=True)
        stdoutfile = open(logdir+"/"+stdoutname,"w")
        stderrfile = open(logdir+"/"+stderrname,"w")
        logger.info("Create stdout(%s) and stderr(%s) file to log" % (stdoutname, stderrname))
    except Exception:
        logger.error(traceback.format_exc())
        # Fall back to inherited stdout/stderr; do not leak a half-open pair.
        if stdoutfile is not None:
            stdoutfile.close()
        stdoutfile = None
        stderrfile = None

    # Argument list instead of a shell string: env keys/values come from
    # the request and must not be interpreted by a shell.
    cmd = ["lxc-attach", "-n", lxcname]
    for envkey,envval in envs.items():
        cmd += ["-v", "%s=%s" % (envkey,envval)]
    cmd += ["--", "/bin/bash", "/root/" + scriptname]
    logger.info('run task with command - %s' % " ".join(cmd))

    try:
        try:
            p = subprocess.Popen(cmd,stdout=stdoutfile,stderr=stderrfile)
        except OSError:
            # lxc-attach could not be launched at all.
            logger.error(traceback.format_exc())
            self.add_msg(taskid,username,vnodeid,rpc_pb2.FAILED,token,"Fail to launch the task.")
            return

        to = MAX_RUNNING_TIME if timeout == 0 else timeout
        try:
            p.wait(timeout=to)
        except subprocess.TimeoutExpired:
            p.kill()
            p.wait()  # reap the killed child so it does not stay a zombie
            logger.info("Running time(%d) is out. Task(%s-%s-%s) will be killed." % (timeout,str(taskid),str(vnodeid),token))
            self.add_msg(taskid,username,vnodeid,rpc_pb2.TIMEOUT,token,"Running time is out.")
            return
    finally:
        # Close (and thereby flush) the logs before they are moved below.
        for logfile in (stdoutfile, stderrfile):
            if logfile is not None:
                logfile.close()

    [success1,msg1] = self.write_output(lxcname,jobdir+"/"+stdoutname,outpath[0])
    [success2,msg2] = self.write_output(lxcname,jobdir+"/"+stderrname,outpath[1])
    if not success1 or not success2:
        # Report the first failing move's message.
        msg = msg1 if not success1 else msg2
        logger.info("Output error on Task(%s-%s-%s)." % (str(taskid),str(vnodeid),token))
        self.add_msg(taskid,username,vnodeid,rpc_pb2.OUTPUTERROR,token,msg)
    elif p.returncode == 0:
        logger.info("Task(%s-%s-%s) completed." % (str(taskid),str(vnodeid),token))
        self.add_msg(taskid,username,vnodeid,rpc_pb2.COMPLETED,token,"")
    else:
        logger.info("Task(%s-%s-%s) failed." % (str(taskid),str(vnodeid),token))
        self.add_msg(taskid,username,vnodeid,rpc_pb2.FAILED,token,"")
|
||||||
|
|
||||||
def add_msg(self,taskid,username,vnodeid,status,token,errmsg):
|
def add_msg(self,taskid,username,vnodeid,status,token,errmsg):
|
||||||
self.msgslock.acquire()
|
self.msgslock.acquire()
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in New Issue