mount oss on lxcpath & clear oss mount when restart taskworker

This commit is contained in:
Firmlyzhu 2019-04-12 17:54:24 +08:00
parent c97e6a3a67
commit 876fb83326
2 changed files with 38 additions and 9 deletions

View File

@ -42,19 +42,31 @@ class AliyunOssMounter(OssMounter):
cmd = "chmod 640 /etc/passwd-ossfs" cmd = "chmod 640 /etc/passwd-ossfs"
[success1, msg] = OssMounter.execute_cmd(cmd) [success1, msg] = OssMounter.execute_cmd(cmd)
if not success1:
logger.error("Aliyun OSS mount chmod err:%s" % msg)
return [False, msg]
mountpath = datapath+"/Aliyun/"+mount_info.remotePath mountpath = datapath+"/Aliyun/"+mount_info.remotePath
logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath)) logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath))
if not os.path.isdir(mountpath): if not os.path.isdir(mountpath):
os.makedirs(mountpath) os.makedirs(mountpath)
cmd = "ossfs %s %s -ourl=%s" % (mount_info.remotePath, mountpath, mount_info.other) cmd = "ossfs %s %s -ourl=%s" % (mount_info.remotePath, mountpath, mount_info.other)
[success, msg] = OssMounter.execute_cmd(cmd) [success, msg] = OssMounter.execute_cmd(cmd)
if not success:
logger.error("Aliyun OSS mount err:%s" % msg)
return [False, msg]
return [True,""] return [True,""]
@staticmethod @staticmethod
def umount_oss(datapath, mount_info): def umount_oss(datapath, mount_info):
mountpath = datapath + "/" + mount_info.remotePath mountpath = datapath+"/Aliyun/"+mount_info.remotePath
logger.info("UMount oss %s %s" % (mount_info.remotePath, mountpath)) logger.info("UMount oss %s %s" % (mount_info.remotePath, mountpath))
cmd = "fusermount -u %s" % (mountpath) cmd = "fusermount -u %s" % (mountpath)
[success, msg] = OssMounter.execute_cmd(cmd) [success, msg] = OssMounter.execute_cmd(cmd)
if not success:
logger.error("Aliyun OSS umount err:%s"%msg)
return [False,msg]
[success, msg] = OssMounter.execute_cmd("rm -rf %s" % mountpath) [success, msg] = OssMounter.execute_cmd("rm -rf %s" % mountpath)
if not success:
logger.error("Aliyun OSS umount err:%s"%msg)
return [False,msg]
return [True,""] return [True,""]

View File

@ -82,6 +82,16 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
def stop_and_rm_containers(self,lxcname): def stop_and_rm_containers(self,lxcname):
logger.info("Stop the container with name:"+lxcname) logger.info("Stop the container with name:"+lxcname)
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
lxcpath = "/var/lib/lxc/%s" % lxcname
try:
mount_info = []
for provider in os.listdir(lxcpath+"/oss"):
for bkname in os.listdir(lxcpath+"/oss/"+provider):
mount_info.append(rpc_pb2.Mount(provider=provider,remotePath=bkname))
self.umount_oss(lxcpath+"/oss", mount_info)
except Exception as err:
logger.info(err)
pass
return self.imgmgr.deleteFS(lxcname) return self.imgmgr.deleteFS(lxcname)
def rm_all_batch_containers(self): def rm_all_batch_containers(self):
@ -202,12 +212,13 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=msg) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=msg)
#mount oss #mount oss
rootfs = "/var/lib/lxc/%s/rootfs" % lxcname lxcpath = "/var/lib/lxc/%s" % lxcname
self.mount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list) rootfs = lxcpath + "/rootfs"
conffile = open("/var/lib/lxc/%s/config" % lxcname, 'a+') self.mount_oss(lxcpath + "/oss", mount_list)
mount_str = "lxc.mount.entry = %s/global/users/%s/oss/%s/%s %s/root/oss/%s none bind,rw,create=dir 0 0" conffile = open(lxcpath + "/config", 'a+')
mount_str = "lxc.mount.entry = "+ lxcpath +"/oss/%s/%s %s/root/oss/%s none bind,rw,create=dir 0 0"
for mount in mount_list: for mount in mount_list:
conffile.write("\n"+ mount_str % (self.fspath, username, mount.provider, mount.remotePath, rootfs, mount.remotePath)) conffile.write("\n"+ mount_str % (mount.provider, mount.remotePath, rootfs, mount.remotePath))
conffile.close() conffile.close()
logger.info("Start container %s..." % lxcname) logger.info("Start container %s..." % lxcname)
@ -215,6 +226,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
ret = subprocess.run('lxc-start -n %s'%lxcname,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) ret = subprocess.run('lxc-start -n %s'%lxcname,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
if ret.returncode != 0: if ret.returncode != 0:
logger.error('start container %s failed' % lxcname) logger.error('start container %s failed' % lxcname)
self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list)
self.imgmgr.deleteFS(lxcname) self.imgmgr.deleteFS(lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container(%s)"%lxcname) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container(%s)"%lxcname)
@ -228,6 +240,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
if not success: if not success:
logger.error("Fail to add gpu device. " + msg) logger.error("Fail to add gpu device. " + msg)
container.stop() container.stop()
self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list)
self.imgmgr.deleteFS(lxcname) self.imgmgr.deleteFS(lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg)
@ -235,6 +248,10 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
cmd = "lxc-attach -n %s -- service ssh start" % lxcname cmd = "lxc-attach -n %s -- service ssh start" % lxcname
ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) ret = subprocess.run(cmd,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
if ret.returncode != 0: if ret.returncode != 0:
logger.error('Fail to start ssh service of container %s' % lxcname)
container.stop()
self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list)
self.imgmgr.deleteFS(lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to start ssh service. lxc(%s)"%lxcname) return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to start ssh service. lxc(%s)"%lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
@ -289,6 +306,9 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
else: else:
logger.error("stop container %s failed" % lxcname) logger.error("stop container %s failed" % lxcname)
#umount oss
self.umount_oss("/var/lib/lxc/%s/oss" % (lxcname), mount_list)
logger.info("deleting container:%s" % lxcname) logger.info("deleting container:%s" % lxcname)
if self.imgmgr.deleteFS(lxcname): if self.imgmgr.deleteFS(lxcname):
logger.info("delete container %s success" % lxcname) logger.info("delete container %s success" % lxcname)
@ -302,9 +322,6 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
#release gpu #release gpu
self.release_gpu_device(lxcname) self.release_gpu_device(lxcname)
#umount oss
self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list)
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")