diff --git a/src/protos/rpc.proto b/src/protos/rpc.proto index c5b2816..67f5d08 100644 --- a/src/protos/rpc.proto +++ b/src/protos/rpc.proto @@ -82,11 +82,12 @@ message Image { } message Mount { - string localPath = 1; // 本地路径 - string remotePath = 2; // 远程路径 - string endpoint = 3; + string provider = 1; + string localPath = 2; // 本地路径 + string remotePath = 3; // 远程路径 string accessKey = 4; string secretKey = 5; + string other = 6; } message Instance { diff --git a/src/worker/ossmounter.py b/src/worker/ossmounter.py new file mode 100644 index 0000000..0b99482 --- /dev/null +++ b/src/worker/ossmounter.py @@ -0,0 +1,60 @@ +import abc +import subprocess, os +from utils.log import logger + +class OssMounter(object): + __metaclass__ = abc.ABCMeta + + @staticmethod + def execute_cmd(cmd): + ret = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + if ret.returncode != 0: + msg = ret.stdout.decode(encoding="utf-8") + logger.error(msg) + return [False,msg] + else: + return [True,""] + + @staticmethod + @abc.abstractmethod + def mount_oss(datapath, mount_info): + # mount oss + pass + + @staticmethod + @abc.abstractmethod + def umount_oss(datapath, mount_info): + # umount oss + pass + +class aliyunOssMounter(OssMounter): + + @staticmethod + def mount_oss(datapath, mount_info): + # mount oss + try: + pwdfile = open("/etc/passwd-ossfs","w") + pwdfile.write(mount_info.remotePath+":"+mount_info.accessKey+":"+mount_info.secretKey+"\n") + pwdfile.close() + except Exception as err: + logger.error(traceback.format_exc()) + return [False,msg] + + cmd = "chmod 640 /etc/passwd-ossfs" + [success1, msg] = OssMounter.execute_cmd(cmd) + mountpath = datapath+"/"+mount_info.remotePath + logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath)) + if not os.path.isdir(mountpath): + os.makedirs(mountpath) + cmd = "ossfs %s %s -ourl=%s" % (mount_info.remotePath, mountpath, mount_info.endpoint) + [success, msg] = OssMounter.execute_cmd(cmd) + return [True,""] + + @staticmethod + def umount_oss(datapath, mount_info): + mountpath = datapath + "/" + mount_info.remotePath + logger.info("UMount oss %s %s" % (mount_info.remotePath, mountpath)) + cmd = "fusermount -u %s" % (mountpath) + [success, msg] = self.execute_cmd(cmd) + [success, msg] = self.execute_cmd("rm -rf %s" % mountpath) + return [True,""] diff --git a/src/worker/taskcontroller.py b/src/worker/taskcontroller.py index 6a7cdd9..4350581 100755 --- a/src/worker/taskcontroller.py +++ b/src/worker/taskcontroller.py @@ -16,6 +16,7 @@ import grpc #from utils import env import json,lxc,subprocess,threading,os,time,traceback from utils import imagemgr,etcdlib +from worker import ossmounter from protos import rpc_pb2, rpc_pb2_grpc _ONE_DAY_IN_SECONDS = 60 * 60 * 24 @@ -101,50 +102,42 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): logger.info(str(self.free_ips)) self.lock.release() - @staticmethod - def execute_cmd(cmd): - ret = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) - if ret.returncode != 0: - msg = ret.stdout.decode(encoding="utf-8") - logger.error(msg) - return [False,msg] - else: - return [True,""] - #mount_oss def mount_oss(self, datapath, mount_info): self.mount_lock.acquire() try: - pwdfile = open("/etc/passwd-ossfs","w") for mount in mount_info: - pwdfile.write(mount.remotePath+":"+mount.accessKey+":"+mount.secretKey+"\n") - pwdfile.close() + provider = mount.provider + mounter = getattr(ossmounter,provider,None) + if mounter is None: + self.mount_lock.release() + return [False, provider + " doesn't exist!"] + [success, msg] = mounter.mount_oss(datapath,mount) + if not success: + self.mount_lock.release() + return [False, msg] except Exception as err: - logger.error(traceback.format_exc()) - msg = "Fail to write passwd-ossfs %s" % (lxcname) - logger.error(msg) self.mount_lock.release() + logger.error(traceback.format_exc()) return [False,msg] - cmd = "chmod 640 /etc/passwd-ossfs" - [success1, msg] = self.execute_cmd(cmd) - for mount in mount_info: - mountpath = datapath+"/"+mount.remotePath - logger.info("Mount oss %s %s" % (mount.remotePath, mountpath)) - if not os.path.isdir(mountpath): - os.makedirs(mountpath) - cmd = "ossfs %s %s -ourl=%s" % (mount.remotePath, mountpath, mount.endpoint) - [success, msg] = self.execute_cmd(cmd) self.mount_lock.release() + return [True,""] #umount oss def umount_oss(self, datapath, mount_info): - for mount in mount_info: - mountpath = datapath + "/" + mount.remotePath - logger.info("UMount oss %s %s" % (mount.remotePath, mountpath)) - cmd = "fusermount -u %s" % (mountpath) - [success, msg] = self.execute_cmd(cmd) - [success, msg] = self.execute_cmd("rm -rf %s" % mountpath) + try: + for mount in mount_info: + provider = mount.provider + mounter = getattr(ossmounter,provider,None) + if mounter is None: + return [False, provider + " doesn't exist!"] + [success, msg] = mounter.umount_oss(datapath,mount) + if not success: + return [False, msg] + except Exception as err: + logger.error(traceback.format_exc()) + return [False,msg] def process_task(self, request, context): logger.info('excute task with parameter: ' + str(request))