Fix bugs & add log
This commit is contained in:
parent
2dd0f42e4f
commit
acbabad214
|
@ -10,7 +10,7 @@ def run():
|
||||||
channel = grpc.insecure_channel('localhost:50051')
|
channel = grpc.insecure_channel('localhost:50051')
|
||||||
stub = rpc_pb2_grpc.WorkerStub(channel)
|
stub = rpc_pb2_grpc.WorkerStub(channel)
|
||||||
|
|
||||||
comm = rpc_pb2.Command(commandLine="echo \"stestsfdsf\\ntewtgsdgfdsgret\newarsafsda\" > /root/test.txt;ls /root;sleep 100", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
|
comm = rpc_pb2.Command(commandLine="echo \"stestsfdsf\\ntewtgsdgfdsgret\newarsafsda\" > /root/test.txt;ls /root;sleep 2", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
|
||||||
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="")
|
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="")
|
||||||
|
|
||||||
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
|
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
|
||||||
|
@ -35,5 +35,5 @@ def stop_task():
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
run()
|
run()
|
||||||
time.sleep(2)
|
#time.sleep(4)
|
||||||
stop_task()
|
#stop_task()
|
||||||
|
|
|
@ -45,6 +45,8 @@ def nvidia_smi():
|
||||||
return ret.stdout.decode('utf-8').split('\n')
|
return ret.stdout.decode('utf-8').split('\n')
|
||||||
except subprocess.CalledProcessError:
|
except subprocess.CalledProcessError:
|
||||||
return None
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def get_gpu_driver_version():
|
def get_gpu_driver_version():
|
||||||
|
|
|
@ -109,7 +109,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
|
||||||
def add_gpu_device(self, lxcname, gpu_need):
|
def add_gpu_device(self, lxcname, gpu_need):
|
||||||
if gpu_need < 0:
|
if gpu_need < 1:
|
||||||
return [True, ""]
|
return [True, ""]
|
||||||
self.gpu_lock.acquire()
|
self.gpu_lock.acquire()
|
||||||
use_gpus = []
|
use_gpus = []
|
||||||
|
@ -122,11 +122,11 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
for gpuid in use_gpus:
|
for gpuid in use_gpus:
|
||||||
self.gpu_status[gpuid] = lxcname
|
self.gpu_status[gpuid] = lxcname
|
||||||
try:
|
try:
|
||||||
gputools.add_device_node(lxcname, "/dev/nvidiactl")
|
gputools.add_device(lxcname, "/dev/nvidiactl")
|
||||||
gputools.add_device_node(lxcname, "/dev/nvidia-uvm")
|
gputools.add_device(lxcname, "/dev/nvidia-uvm")
|
||||||
for gpuid in use_gpus:
|
for gpuid in use_gpus:
|
||||||
gputools.add_device_node(lxcname,"/dev/nvidia"+str(gpuid))
|
gputools.add_device(lxcname,"/dev/nvidia"+str(gpuid))
|
||||||
logger.info("Add gpu:"+str(gpuid) +" to lxc:"str(lxcname))
|
logger.info("Add gpu:"+str(gpuid) +" to lxc:"+str(lxcname))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
for gpuid in use_gpus:
|
for gpuid in use_gpus:
|
||||||
|
@ -275,11 +275,11 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
#add GPU
|
#add GPU
|
||||||
[success, msg] = self.add_gpu_device(lxcname,gpu_need)
|
[success, msg] = self.add_gpu_device(lxcname,gpu_need)
|
||||||
if not success:
|
if not success:
|
||||||
logger.error("Fail to add gpu device.")
|
logger.error("Fail to add gpu device. " + msg)
|
||||||
container.stop()
|
container.stop()
|
||||||
self.release_ip(ip)
|
self.release_ip(ip)
|
||||||
self.imgmgr.deleteFS(lxcname)
|
self.imgmgr.deleteFS(lxcname)
|
||||||
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device.")
|
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg)
|
||||||
|
|
||||||
thread = threading.Thread(target = self.execute_task, args=(username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_list))
|
thread = threading.Thread(target = self.execute_task, args=(username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_list))
|
||||||
thread.setDaemon(True)
|
thread.setDaemon(True)
|
||||||
|
@ -304,6 +304,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
if ret.returncode != 0:
|
if ret.returncode != 0:
|
||||||
msg = "Fail to move output_tmp.txt to nfs/%s" % tmpfilename
|
msg = "Fail to move output_tmp.txt to nfs/%s" % tmpfilename
|
||||||
logger.error(msg)
|
logger.error(msg)
|
||||||
|
logger.error(ret.stdout)
|
||||||
return [False,msg]
|
return [False,msg]
|
||||||
logger.info("Succeed to moving output_tmp to nfs/%s" % tmpfilename)
|
logger.info("Succeed to moving output_tmp to nfs/%s" % tmpfilename)
|
||||||
|
|
||||||
|
@ -397,6 +398,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
def stop_tasks(self, request, context):
|
def stop_tasks(self, request, context):
|
||||||
for msg in request.taskmsgs:
|
for msg in request.taskmsgs:
|
||||||
lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.instanceid),msg.token)
|
lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.instanceid),msg.token)
|
||||||
|
logger.info("Stop the task with lxc:"+lxcname)
|
||||||
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
|
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
|
||||||
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue