From 4d81254f04a36444b8a3156ef670a3c828722b7b Mon Sep 17 00:00:00 2001 From: Firmlyzhu Date: Mon, 24 Sep 2018 16:39:08 +0800 Subject: [PATCH] Enable that Mount aliyun oss when using batch job computing. --- src/master/testTaskCtrler.py | 6 ++-- src/protos/rpc.proto | 3 ++ src/protos/rpc_pb2.py | 41 +++++++++++++++++------ src/worker/taskcontroller.py | 64 ++++++++++++++++++++++++++++++++---- 4 files changed, 95 insertions(+), 19 deletions(-) diff --git a/src/master/testTaskCtrler.py b/src/master/testTaskCtrler.py index 797e94a..75c5ccf 100644 --- a/src/master/testTaskCtrler.py +++ b/src/master/testTaskCtrler.py @@ -10,12 +10,12 @@ def run(): channel = grpc.insecure_channel('localhost:50051') stub = rpc_pb2_grpc.WorkerStub(channel) - comm = rpc_pb2.Command(commandLine="echo \"s\" | awk '{print \"test4\\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' - paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="/root/nfs/") + comm = rpc_pb2.Command(commandLine="echo \"stestsfdsf\\ntewtgsdgfdsgret\newarsafsda\" > /root/nfs/test-for-docklet/test.txt;ls /root/nfs/test-for-docklet", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' + paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="/root/nfs/test-for-docklet") img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet") inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0) - mnt = rpc_pb2.Mount(localPath="",remotePath="") + mnt = rpc_pb2.Mount(localPath="",remotePath="test-for-docklet",endpoint="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="") clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt]) task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=5,token="test") diff --git a/src/protos/rpc.proto b/src/protos/rpc.proto index a3b05a0..c5b2816 100644 --- a/src/protos/rpc.proto +++ b/src/protos/rpc.proto @@ -84,6 +84,9 @@ message Image { message Mount { string localPath = 1; // 本地路径 string remotePath = 2; // 远程路径 + string endpoint = 3; + string accessKey = 4; + string secretKey = 5; } message Instance { diff --git a/src/protos/rpc_pb2.py b/src/protos/rpc_pb2.py index e9b6a51..aaef67e 100644 --- a/src/protos/rpc_pb2.py +++ b/src/protos/rpc_pb2.py @@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='rpc.proto', package='', syntax='proto3', - serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"m\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x12\n\ninstanceid\x18\x02 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x03 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x04 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x05 \x01(\t\"\xc6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\".\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32-\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x62\x06proto3') + serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"m\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x12\n\ninstanceid\x18\x02 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x03 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x04 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x05 \x01(\t\"\xc6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"f\n\x05Mount\x12\x11\n\tlocalPath\x18\x01 \x01(\t\x12\x12\n\nremotePath\x18\x02 \x01(\t\x12\x10\n\x08\x65ndpoint\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32-\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x62\x06proto3') ) _STATUS = _descriptor.EnumDescriptor( @@ -56,8 +56,8 @@ _STATUS = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1029, - serialized_end=1120, + serialized_start=1085, + serialized_end=1176, ) _sym_db.RegisterEnumDescriptor(_STATUS) @@ -574,6 +574,27 @@ _MOUNT = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='endpoint', full_name='Mount.endpoint', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='accessKey', full_name='Mount.accessKey', index=3, + number=4, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='secretKey', full_name='Mount.secretKey', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -587,7 +608,7 @@ _MOUNT = _descriptor.Descriptor( oneofs=[ ], serialized_start=913, - serialized_end=959, + serialized_end=1015, ) @@ -638,8 +659,8 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=961, - serialized_end=1027, + serialized_start=1017, + serialized_end=1083, ) _REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS @@ -757,8 +778,8 @@ _MASTER = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=0, options=None, - serialized_start=1122, - serialized_end=1162, + serialized_start=1178, + serialized_end=1218, methods=[ _descriptor.MethodDescriptor( name='report', @@ -781,8 +802,8 @@ _WORKER = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=1, options=None, - serialized_start=1164, - serialized_end=1209, + serialized_start=1220, + serialized_end=1265, methods=[ _descriptor.MethodDescriptor( name='process_task', diff --git a/src/worker/taskcontroller.py b/src/worker/taskcontroller.py index 61ca8a3..6a7cdd9 100755 --- a/src/worker/taskcontroller.py +++ b/src/worker/taskcontroller.py @@ -66,6 +66,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): self.report_interval = 2 self.lock = threading.Lock() + self.mount_lock = threading.Lock() self.cons_gateway = env.getenv('BATCH_GATEWAY') self.cons_ips = env.getenv('BATCH_NET') logger.info("Batch gateway ip address %s" % self.cons_gateway) @@ -100,6 +101,51 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): logger.info(str(self.free_ips)) self.lock.release() + @staticmethod + def execute_cmd(cmd): + ret = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + if ret.returncode != 0: + msg = ret.stdout.decode(encoding="utf-8") + logger.error(msg) + return [False,msg] + else: + return [True,""] + + #mount_oss + def mount_oss(self, datapath, mount_info): + self.mount_lock.acquire() + try: + pwdfile = open("/etc/passwd-ossfs","w") + for mount in mount_info: + pwdfile.write(mount.remotePath+":"+mount.accessKey+":"+mount.secretKey+"\n") + pwdfile.close() + except Exception as err: + logger.error(traceback.format_exc()) + msg = "Fail to write passwd-ossfs %s" % (lxcname) + logger.error(msg) + self.mount_lock.release() + return [False,msg] + + cmd = "chmod 640 /etc/passwd-ossfs" + [success1, msg] = self.execute_cmd(cmd) + for mount in mount_info: + mountpath = datapath+"/"+mount.remotePath + logger.info("Mount oss %s %s" % (mount.remotePath, mountpath)) + if not os.path.isdir(mountpath): + os.makedirs(mountpath) + cmd = "ossfs %s %s -ourl=%s" % (mount.remotePath, mountpath, mount.endpoint) + [success, msg] = self.execute_cmd(cmd) + self.mount_lock.release() + + #umount oss + def umount_oss(self, datapath, mount_info): + for mount in mount_info: + mountpath = datapath + "/" + mount.remotePath + logger.info("UMount oss %s %s" % (mount.remotePath, mountpath)) + cmd = "fusermount -u %s" % (mountpath) + [success, msg] = self.execute_cmd(cmd) + [success, msg] = self.execute_cmd("rm -rf %s" % mountpath) + def process_task(self, request, context): logger.info('excute task with parameter: ' + str(request)) taskid = request.id @@ -125,6 +171,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): token = request.token lxcname = '%s-batch-%s-%s-%s' % (username,taskid,str(instanceid),token) instance_type = request.cluster.instance + mount_list = request.cluster.mount outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath] for i in range(len(outpath)): if outpath[i] == "": @@ -173,6 +220,12 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): conffile = open("/var/lib/lxc/%s/config" % lxcname, 'w') conffile.write(conftext) + + #mount oss + self.mount_oss("%s/global/users/%s/data" % (self.fspath,username), mount_list) + mount_str = "lxc.mount.entry = %s/global/users/%s/data/%s %s/root/nfs/%s none bind,rw,create=dir 0 0" + for mount in mount_list: + conffile.write("\n"+ mount_str % (self.fspath, username, mount.remotePath, rootfs, mount.remotePath)) conffile.close() container = lxc.Container(lxcname) @@ -183,9 +236,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): else: logger.info('start container %s success' % lxcname) - #mount oss here - - thread = threading.Thread(target = self.excute_task, args=(taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token)) + thread = threading.Thread(target = self.execute_task, args=(username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_list)) thread.setDaemon(True) thread.start() @@ -221,7 +272,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): logger.info("Succeed to moving nfs/%s to %s" % (tmpfilename,filepath)) return [True,""] - def excute_task(self,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token): + def execute_task(self,username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_info): lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs/" scriptname = "batch_job.sh" try: @@ -279,8 +330,6 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): logger.info("Task(%s-%s-%s) failed." % (str(taskid),str(instanceid),token)) self.add_msg(taskid,instanceid,rpc_pb2.FAILED,token,"") - #umount oss here - container = lxc.Container(lxcname) if container.stop(): logger.info("stop container %s success" % lxcname) @@ -296,6 +345,9 @@ class TaskController(rpc_pb2_grpc.WorkerServicer): logger.info("release ip address %s" % ip) self.release_ip(ip) + #umount oss + self.umount_oss("%s/global/users/%s/data" % (self.fspath,username), mount_info) + def add_msg(self,taskid,instanceid,status,token,errmsg): self.msgslock.acquire() try: