diff --git a/conf/container.batch.conf b/conf/container.batch.conf index 439172d..7d8dd18 100644 --- a/conf/container.batch.conf +++ b/conf/container.batch.conf @@ -45,7 +45,7 @@ lxc.cgroup.cpu.cfs_quota_us = %CONTAINER_CPU% lxc.cap.drop = sys_admin net_admin mac_admin mac_override sys_time sys_module lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/data %ROOTFS%/root/nfs none bind,rw,create=dir 0 0 -#lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/%CLUSTERID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0 +#lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/batch-%TASKID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0 lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/ssh %ROOTFS%/root/.ssh none bind,ro,create=dir 0 0 lxc.mount.entry = %FS_PREFIX%/local/temp/%LXCNAME%/ %ROOTFS%/tmp none bind,rw,create=dir 0 0 diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 7c20361..d0f6399 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -1,7 +1,7 @@ import threading import time import string -import random, copy +import random, copy, subprocess import json from functools import wraps @@ -48,10 +48,30 @@ class Task(): return self.priority < other.priority def gen_ips_from_base(self,base_ip): + if self.task_base_ip == None: + return self.ips = [] for i in range(task.max_size): self.ips.append(int_to_ip(base_ip + self.task_base_ip + i + 2)) + def gen_hosts(self): + username = self.taskinfo.username + taskid = self.taskinfo.taskid + logger.info("Generate hosts for user(%s) task(%s) base_ip(%s)"%(username,taskid,str(self.task_base_ip))) + fspath = env.getenv('FS_PREFIX') + if not os.path.isdir("%s/global/users/%s" % (fspath,username)): + path = env.getenv('DOCKLET_LIB') + subprocess.call([path+"/master/userinit.sh", username]) + logger.info("user %s directory not found, create it" % username) + + hosts_file = open("%s/global/users/%s/%s.hosts" % (fspath,username,"batch-"+taskid),"w") + hosts_file.write("127.0.0.1 localhost\n") + i = 0 + for ip in self.ips: + hosts_file.write(ip+" batch-"+str(i)+"\n") + i += 1 + hosts_file.close() + def get_one_resources_need(self): return self.vnodeinfo.vnode.instance @@ -225,11 +245,12 @@ class TaskMgr(threading.Thread): self.acquire_task_net(task) task.gen_ips_from_base(self.base_ip) + task.gen_hosts() #need to create hosts [success, gwip] = self.setup_tasknet(task,[w[1] for w in vnodes_workers]) if not success: + self.release_task_ips(task) return [False, gwip] - token = ''.join(random.sample(string.ascii_letters + string.digits, 8)) placed_workers = [] @@ -237,6 +258,7 @@ class TaskMgr(threading.Thread): for vid, worker in vnodes_workers: vnodeinfo = copy.copy(task.vnodeinfo) vnodeinfo.vnodeid = vid + vnodeinfo.vnode.hostname = "batch-"+str(vid%task.max_size) vnode = task.subtask_list[vid] vnode['status'] = RUNNING vnode['try_count'] += 1 @@ -253,7 +275,7 @@ class TaskMgr(threading.Thread): ipaddr = task.ips[vid%task.max_size] brname = "docklet-batch-%s-%s"%(username, taskid) networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.masterip, brname=brname) - vnode.network = networkinfo + vnodeinfo.vnode.network = networkinfo try: self.logger.info('[task_processor] starting vnode for task [%s] instance [%d]' % (task.vnodeinfo.id, vid)) diff --git a/src/protos/rpc_pb2.py b/src/protos/rpc_pb2.py index 739d621..d96c035 100644 --- a/src/protos/rpc_pb2.py +++ b/src/protos/rpc_pb2.py @@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( name='rpc.proto', package='', syntax='proto3', - serialized_pb=_b('\n\trpc.proto\"U\n\tVNodeInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x15\n\x05vnode\x18\x04 \x01(\x0b\x32\x06.VNode\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"{\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1e\n\rsubTaskStatus\x18\x04 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x05 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x06 \x01(\t\"~\n\x08TaskInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1f\n\nparameters\x18\x04 \x01(\x0b\x32\x0b.Parameters\x12\x0f\n\x07timeout\x18\x05 \x01(\x05\x12\r\n\x05token\x18\x06 \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"m\n\x05VNode\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\x12\x19\n\x07network\x18\x04 \x01(\x0b\x32\x08.Network\"L\n\x07Network\x12\x0e\n\x06ipaddr\x18\x01 \x01(\t\x12\x0f\n\x07gateway\x18\x02 \x01(\t\x12\x10\n\x08masterip\x18\x03 \x01(\t\x12\x0e\n\x06\x62rname\x18\x04 \x01(\t\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 \x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32\x96\x01\n\x06Worker\x12#\n\x0bstart_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x12!\n\nstart_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12 \n\tstop_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12\"\n\nstop_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x62\x06proto3') + serialized_pb=_b('\n\trpc.proto\"U\n\tVNodeInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x15\n\x05vnode\x18\x04 \x01(\x0b\x32\x06.VNode\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"{\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1e\n\rsubTaskStatus\x18\x04 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x05 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x06 \x01(\t\"~\n\x08TaskInfo\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x0f\n\x07vnodeid\x18\x03 \x01(\x05\x12\x1f\n\nparameters\x18\x04 \x01(\x0b\x32\x0b.Parameters\x12\x0f\n\x07timeout\x18\x05 \x01(\x05\x12\r\n\x05token\x18\x06 \x01(\t\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x7f\n\x05VNode\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\x12\x19\n\x07network\x18\x04 \x01(\x0b\x32\x08.Network\x12\x10\n\x08hostname\x18\x05 \x01(\t\"L\n\x07Network\x12\x0e\n\x06ipaddr\x18\x01 \x01(\t\x12\x0f\n\x07gateway\x18\x02 \x01(\t\x12\x10\n\x08masterip\x18\x03 \x01(\t\x12\x0e\n\x06\x62rname\x18\x04 \x01(\t\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 \x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32\x96\x01\n\x06Worker\x12#\n\x0bstart_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x12!\n\nstart_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12 \n\tstop_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12\"\n\nstop_vnode\x12\n.VNodeInfo\x1a\x06.Reply\"\x00\x62\x06proto3') ) _STATUS = _descriptor.EnumDescriptor( @@ -56,8 +56,8 @@ _STATUS = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=1231, - serialized_end=1322, + serialized_start=1249, + serialized_end=1340, ) _sym_db.RegisterEnumDescriptor(_STATUS) @@ -113,8 +113,8 @@ _IMAGE_IMAGETYPE = _descriptor.EnumDescriptor( ], containing_type=None, options=None, - serialized_start=996, - serialized_end=1042, + serialized_start=1014, + serialized_end=1060, ) _sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE) @@ -535,6 +535,13 @@ _VNODE = _descriptor.Descriptor( message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='hostname', full_name='VNode.hostname', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), ], extensions=[ ], @@ -548,7 +555,7 @@ _VNODE = _descriptor.Descriptor( oneofs=[ ], serialized_start=737, - serialized_end=846, + serialized_end=864, ) @@ -599,8 +606,8 @@ _NETWORK = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=848, - serialized_end=924, + serialized_start=866, + serialized_end=942, ) @@ -645,8 +652,8 @@ _IMAGE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=926, - serialized_end=1042, + serialized_start=944, + serialized_end=1060, ) @@ -711,8 +718,8 @@ _MOUNT = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1044, - serialized_end=1161, + serialized_start=1062, + serialized_end=1179, ) @@ -763,8 +770,8 @@ _INSTANCE = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=1163, - serialized_end=1229, + serialized_start=1181, + serialized_end=1247, ) _VNODEINFO.fields_by_name['vnode'].message_type = _VNODE @@ -899,8 +906,8 @@ _MASTER = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=0, options=None, - serialized_start=1324, - serialized_end=1364, + serialized_start=1342, + serialized_end=1382, methods=[ _descriptor.MethodDescriptor( name='report', @@ -923,8 +930,8 @@ _WORKER = _descriptor.ServiceDescriptor( file=DESCRIPTOR, index=1, options=None, - serialized_start=1367, - serialized_end=1517, + serialized_start=1385, + serialized_end=1535, methods=[ _descriptor.MethodDescriptor( name='start_vnode', diff --git a/src/worker/taskworker.py b/src/worker/taskworker.py index 96abef6..3bbdbb4 100755 --- a/src/worker/taskworker.py +++ b/src/worker/taskworker.py @@ -178,9 +178,10 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): gateway = request.vnode.network.gateway brname = request.vnode.network.brname masterip = request.vnode.network.masterip + hostname = request.vnode.hostname #create container - [success, msg] = self.create_container(taskid, vnodeid, username, image, lxcname, instance_type, ipaddr, gateway, brname) + [success, msg] = self.create_container(taskid, vnodeid, username, image, lxcname, instance_type, ipaddr, gateway, brname, hostname) if not success: return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=msg) @@ -285,7 +286,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): #accquire ip and create a container - def create_container(self,taskid,vnodeid,username,image,lxcname,quota,ipaddr,gateway,brname): + def create_container(self,taskid,vnodeid,username,image,lxcname,quota,ipaddr,gateway,brname,hostname): # prepare image and filesystem status = self.imgmgr.prepareFS(username,image,lxcname,str(quota.disk)) if not status: @@ -302,7 +303,8 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): def config_prepare(content): content = content.replace("%ROOTFS%",rootfs) - content = content.replace("%HOSTNAME%","batch-%s" % str(vnodeid)) + content = content.replace("%HOSTNAME%",hostname) + content = content.replace("%TASKID%",taskid) content = content.replace("%CONTAINER_MEMORY%",str(quota.memory)) content = content.replace("%CONTAINER_CPU%",str(quota.cpu*100000)) content = content.replace("%FS_PREFIX%",self.fspath)