Add ossmounter
This commit is contained in:
parent
aaafa7be1a
commit
e06464758d
|
@ -82,11 +82,12 @@ message Image {
|
||||||
}
|
}
|
||||||
|
|
||||||
message Mount {
|
message Mount {
|
||||||
string localPath = 1; // 本地路径
|
string provider = 1;
|
||||||
string remotePath = 2; // 远程路径
|
string localPath = 2; // 本地路径
|
||||||
string endpoint = 3;
|
string remotePath = 3; // 远程路径
|
||||||
string accessKey = 4;
|
string accessKey = 4;
|
||||||
string secretKey = 5;
|
string secretKey = 5;
|
||||||
|
string other = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
message Instance {
|
message Instance {
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
import abc
|
||||||
|
import subprocess, os
|
||||||
|
from utils.log import logger
|
||||||
|
|
||||||
|
class OssMounter(object):
|
||||||
|
__metaclass__ = abc.ABCMeta
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def execute_cmd(cmd):
|
||||||
|
ret = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
|
||||||
|
if ret.returncode != 0:
|
||||||
|
msg = ret.stdout.decode(encoding="utf-8")
|
||||||
|
logger.error(msg)
|
||||||
|
return [False,msg]
|
||||||
|
else:
|
||||||
|
return [True,""]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@abc.abstractmethod
|
||||||
|
def mount_oss(datapath, mount_info):
|
||||||
|
# mount oss
|
||||||
|
pass
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
@abc.abstractmethod
|
||||||
|
def umount_oss(datapath, mount_info):
|
||||||
|
# umount oss
|
||||||
|
pass
|
||||||
|
|
||||||
|
class aliyunOssMounter(OssMounter):
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def mount_oss(datapath, mount_info):
|
||||||
|
# mount oss
|
||||||
|
try:
|
||||||
|
pwdfile = open("/etc/passwd-ossfs","w")
|
||||||
|
pwdfile.write(mount_info.remotePath+":"+mount_info.accessKey+":"+mount_info.secretKey+"\n")
|
||||||
|
pwdfile.close()
|
||||||
|
except Exception as err:
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
return [False,msg]
|
||||||
|
|
||||||
|
cmd = "chmod 640 /etc/passwd-ossfs"
|
||||||
|
[success1, msg] = OssMounter.execute_cmd(cmd)
|
||||||
|
mountpath = datapath+"/"+mount_info.remotePath
|
||||||
|
logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath))
|
||||||
|
if not os.path.isdir(mountpath):
|
||||||
|
os.makedirs(mountpath)
|
||||||
|
cmd = "ossfs %s %s -ourl=%s" % (mount_info.remotePath, mountpath, mount_info.endpoint)
|
||||||
|
[success, msg] = OssMounter.execute_cmd(cmd)
|
||||||
|
return [True,""]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def umount_oss(datapath, mount_info):
|
||||||
|
mountpath = datapath + "/" + mount_info.remotePath
|
||||||
|
logger.info("UMount oss %s %s" % (mount_info.remotePath, mountpath))
|
||||||
|
cmd = "fusermount -u %s" % (mountpath)
|
||||||
|
[success, msg] = self.execute_cmd(cmd)
|
||||||
|
[success, msg] = self.execute_cmd("rm -rf %s" % mountpath)
|
||||||
|
return [True,""]
|
|
@ -16,6 +16,7 @@ import grpc
|
||||||
#from utils import env
|
#from utils import env
|
||||||
import json,lxc,subprocess,threading,os,time,traceback
|
import json,lxc,subprocess,threading,os,time,traceback
|
||||||
from utils import imagemgr,etcdlib
|
from utils import imagemgr,etcdlib
|
||||||
|
from worker import ossmounter
|
||||||
from protos import rpc_pb2, rpc_pb2_grpc
|
from protos import rpc_pb2, rpc_pb2_grpc
|
||||||
|
|
||||||
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
|
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
|
||||||
|
@ -101,50 +102,42 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
|
||||||
logger.info(str(self.free_ips))
|
logger.info(str(self.free_ips))
|
||||||
self.lock.release()
|
self.lock.release()
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def execute_cmd(cmd):
|
|
||||||
ret = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
|
|
||||||
if ret.returncode != 0:
|
|
||||||
msg = ret.stdout.decode(encoding="utf-8")
|
|
||||||
logger.error(msg)
|
|
||||||
return [False,msg]
|
|
||||||
else:
|
|
||||||
return [True,""]
|
|
||||||
|
|
||||||
#mount_oss
|
#mount_oss
|
||||||
def mount_oss(self, datapath, mount_info):
|
def mount_oss(self, datapath, mount_info):
|
||||||
self.mount_lock.acquire()
|
self.mount_lock.acquire()
|
||||||
try:
|
try:
|
||||||
pwdfile = open("/etc/passwd-ossfs","w")
|
|
||||||
for mount in mount_info:
|
for mount in mount_info:
|
||||||
pwdfile.write(mount.remotePath+":"+mount.accessKey+":"+mount.secretKey+"\n")
|
provider = mount.provider
|
||||||
pwdfile.close()
|
mounter = getattr(ossmounter,provider,None)
|
||||||
except Exception as err:
|
if mounter is None:
|
||||||
logger.error(traceback.format_exc())
|
self.mount_lock.release()
|
||||||
msg = "Fail to write passwd-ossfs %s" % (lxcname)
|
return [False, provider + " doesn't exist!"]
|
||||||
logger.error(msg)
|
[success, msg] = mounter.mount_oss(datapath,mount)
|
||||||
|
if not success:
|
||||||
self.mount_lock.release()
|
self.mount_lock.release()
|
||||||
return [False, msg]
|
return [False, msg]
|
||||||
|
except Exception as err:
|
||||||
cmd = "chmod 640 /etc/passwd-ossfs"
|
|
||||||
[success1, msg] = self.execute_cmd(cmd)
|
|
||||||
for mount in mount_info:
|
|
||||||
mountpath = datapath+"/"+mount.remotePath
|
|
||||||
logger.info("Mount oss %s %s" % (mount.remotePath, mountpath))
|
|
||||||
if not os.path.isdir(mountpath):
|
|
||||||
os.makedirs(mountpath)
|
|
||||||
cmd = "ossfs %s %s -ourl=%s" % (mount.remotePath, mountpath, mount.endpoint)
|
|
||||||
[success, msg] = self.execute_cmd(cmd)
|
|
||||||
self.mount_lock.release()
|
self.mount_lock.release()
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
return [False,msg]
|
||||||
|
|
||||||
|
self.mount_lock.release()
|
||||||
|
return [True,""]
|
||||||
|
|
||||||
#umount oss
|
#umount oss
|
||||||
def umount_oss(self, datapath, mount_info):
|
def umount_oss(self, datapath, mount_info):
|
||||||
|
try:
|
||||||
for mount in mount_info:
|
for mount in mount_info:
|
||||||
mountpath = datapath + "/" + mount.remotePath
|
provider = mount.provider
|
||||||
logger.info("UMount oss %s %s" % (mount.remotePath, mountpath))
|
mounter = getattr(ossmounter,provider,None)
|
||||||
cmd = "fusermount -u %s" % (mountpath)
|
if mounter is None:
|
||||||
[success, msg] = self.execute_cmd(cmd)
|
return [False, provider + " doesn't exist!"]
|
||||||
[success, msg] = self.execute_cmd("rm -rf %s" % mountpath)
|
[success, msg] = mounter.umount_oss(datapath,mount)
|
||||||
|
if not success:
|
||||||
|
return [False, msg]
|
||||||
|
except Exception as err:
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
return [False,msg]
|
||||||
|
|
||||||
def process_task(self, request, context):
|
def process_task(self, request, context):
|
||||||
logger.info('excute task with parameter: ' + str(request))
|
logger.info('excute task with parameter: ' + str(request))
|
||||||
|
|
Loading…
Reference in New Issue