diff --git a/conf/docklet.conf.template b/conf/docklet.conf.template index d9f688b..64757d2 100644 --- a/conf/docklet.conf.template +++ b/conf/docklet.conf.template @@ -141,9 +141,15 @@ # DATA_QUOTA_CMD="gluster volume quota docklet-volume limit-usage %s %s" # DISTRIBUTED_GATEWAY : whether the users' gateways are distributed or not +# Must be set by same value on master and workers. # True or False, default: False # DISTRIBUTED_GATEWAY=False +# PUBLIC_IP : public ip of this machine. If DISTRIBUTED_GATEWAY is True, +# users' gateways can be setup on this machine. Users can visit this machine +# by the public ip. default: IP of NETWORK_DEVICE. +# PUBLIC_IP="" + # NGINX_CONF: the config path of nginx, default: /etc/nginx # NGINX_CONF="/etc/nginx" diff --git a/src/env.py b/src/env.py index a9dbaea..f2e8293 100755 --- a/src/env.py +++ b/src/env.py @@ -1,4 +1,4 @@ -import os +import os,netifaces def getenv(key): if key == "CLUSTER_NAME": @@ -56,6 +56,13 @@ def getenv(key): return os.environ.get("DATA_QUOTA_CMD", "gluster volume quota docklet-volume limit-usage %s %s") elif key == 'DISTRIBUTED_GATEWAY': return os.environ.get("DISTRIBUTED_GATEWAY", "False") + elif key == "PUBLIC_IP": + device = os.environ.get("NETWORK_DEVICE","eth0") + addr = netifaces.ifaddresses(device) + if 2 in addr: + return os.environ.get("PUBLIC_IP",addr[2][0]['addr']) + else: + return os.environ.get("PUBLIC_IP","0.0.0.0") elif key == "NGINX_CONF": return os.environ.get("NGINX_CONF","/etc/nginx") elif key =="USER_IP": diff --git a/src/httprest.py b/src/httprest.py index 78555c1..40925ee 100755 --- a/src/httprest.py +++ b/src/httprest.py @@ -696,6 +696,10 @@ if __name__ == '__main__': if len(sys.argv) > 1 and sys.argv[1] == "new": mode = 'new' + # get public IP and set public IP in etcd + public_IP = env.getenv("PUBLIC_IP") + etcdclient.setkey("machines/publicIP/"+ipaddr, public_IP) + # do some initialization for mode: new/recovery if mode == 'new': # clean and initialize the etcd table diff --git 
a/src/vclustermgr.py b/src/vclustermgr.py index 7c633c7..7b26c93 100755 --- a/src/vclustermgr.py +++ b/src/vclustermgr.py @@ -124,6 +124,7 @@ class VclusterMgr(object): hostpath = self.fspath+"/global/users/"+username+"/hosts/"+str(clusterid)+".hosts" hosts = "127.0.0.1\tlocalhost\n" proxy_server_ip = "" + proxy_public_ip = "" containers = [] for i in range(0, clustersize): workerip = workers[random.randint(0, len(workers)-1)] @@ -135,10 +136,14 @@ class VclusterMgr(object): if i == 0: self.networkmgr.load_usrgw(username) proxy_server_ip = self.networkmgr.usrgws[username] + [status, proxy_public_ip] = self.etcd.getkey("machines/publicIP/"+proxy_server_ip) + if not status: + logger.error("Fail to get proxy_public_ip %s."%(proxy_server_ip)) + return [False, "Fail to get proxy server public IP."] lxc_name = username + "-" + str(clusterid) + "-" + str(i) hostname = "host-"+str(i) logger.info ("create container with : name-%s, username-%s, clustername-%s, clusterid-%s, hostname-%s, ip-%s, gateway-%s, image-%s" % (lxc_name, username, clustername, str(clusterid), hostname, ips[i], gateway, image_json)) - [success,message] = oneworker.create_container(lxc_name, proxy_server_ip, username, uid, json.dumps(setting) , clustername, str(clusterid), str(i), hostname, ips[i], gateway, image_json) + [success,message] = oneworker.create_container(lxc_name, proxy_public_ip, username, uid, json.dumps(setting) , clustername, str(clusterid), str(i), hostname, ips[i], gateway, image_json) if success is False: logger.info("container create failed, so vcluster create failed") return [False, message] @@ -149,8 +154,11 @@ class VclusterMgr(object): hostfile.write(hosts) hostfile.close() clusterfile = open(clusterpath, 'w') - proxy_url = env.getenv("PORTAL_URL") +"/"+ proxy_server_ip +"/_web/" + username + "/" + clustername - info = {'clusterid':clusterid, 'status':'stopped', 'size':clustersize, 'containers':containers, 'nextcid': clustersize, 
'create_time':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'start_time':"------" , 'proxy_url':proxy_url, 'proxy_server_ip':proxy_server_ip} + proxy_url = env.getenv("PORTAL_URL") +"/"+ proxy_public_ip +"/_web/" + username + "/" + clustername + info = {'clusterid':clusterid, 'status':'stopped', 'size':clustersize, 'containers':containers, 'nextcid': clustersize, 'create_time':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'start_time':"------"} + info['proxy_url'] = proxy_url + info['proxy_server_ip'] = proxy_server_ip + info['proxy_public_ip'] = proxy_public_ip clusterfile.write(json.dumps(info)) clusterfile.close() return [True, info] @@ -180,8 +188,9 @@ class VclusterMgr(object): lxc_name = username + "-" + str(clusterid) + "-" + str(cid) hostname = "host-" + str(cid) proxy_server_ip = clusterinfo['proxy_server_ip'] + proxy_public_ip = clusterinfo['proxy_public_ip'] uid = json.loads(user_info)["data"]["id"] - [success, message] = oneworker.create_container(lxc_name, proxy_server_ip, username, uid, json.dumps(setting), clustername, clusterid, str(cid), hostname, ip, gateway, image_json) + [success, message] = oneworker.create_container(lxc_name, proxy_public_ip, username, uid, json.dumps(setting), clustername, clusterid, str(cid), hostname, ip, gateway, image_json) if success is False: logger.info("create container failed, so scale out failed") return [False, message] @@ -212,9 +221,9 @@ class VclusterMgr(object): clusterinfo['proxy_ip'] = ip + ":" + port if self.distributedgw == 'True': worker = self.nodemgr.ip_to_rpc(clusterinfo['proxy_server_ip']) - worker.set_route("/"+ clusterinfo['proxy_server_ip'] + "/_web/" + username + "/" + clustername, target) + worker.set_route("/"+ clusterinfo['proxy_public_ip'] + "/_web/" + username + "/" + clustername, target) else: - proxytool.set_route("/" + clusterinfo['proxy_server_ip'] + "/_web/" + username + "/" + clustername, target) + proxytool.set_route("/" + clusterinfo['proxy_public_ip'] + "/_web/" + 
username + "/" + clustername, target) clusterfile = open(self.fspath + "/global/users/" + username + "/clusters/" + clustername, 'w') clusterfile.write(json.dumps(clusterinfo)) clusterfile.close() @@ -227,9 +236,9 @@ class VclusterMgr(object): clusterinfo.pop('proxy_ip') if self.distributedgw == 'True': worker = self.nodemgr.ip_to_rpc(clusterinfo['proxy_server_ip']) - worker.delete_route("/" + clusterinfo['proxy_server_ip'] + "/_web/" + username + "/" + clustername) + worker.delete_route("/" + clusterinfo['proxy_public_ip'] + "/_web/" + username + "/" + clustername) else: - proxytool.delete_route("/" + clusterinfo['proxy_server_ip'] + "/_web/" + username + "/" + clustername) + proxytool.delete_route("/" + clusterinfo['proxy_public_ip'] + "/_web/" + username + "/" + clustername) clusterfile = open(self.fspath + "/global/users/" + username + "/clusters/" + clustername, 'w') clusterfile.write(json.dumps(clusterinfo)) clusterfile.close() @@ -399,6 +408,17 @@ class VclusterMgr(object): disk += int(container['setting']['disk']) return [True, {'cpu':cpu, 'memory':memory, 'disk':disk}] + def update_cluster_baseurl(self, clustername, username, oldip, newip): + [status, info] = self.get_clusterinfo(clustername, username) + if not status: + return [False, "cluster not found"] + logger.info("%s %s:base_url need to be modified(%s %s)."%(username,clustername,oldip,newip)) + for container in info['containers']: + worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT"))) + if worker is None: + return [False, "The worker can't be found or has been stopped."] + worker.update_baseurl(container['containername'],oldip,newip) + worker.stop_container(container['containername']) def start_cluster(self, clustername, username, uid): [status, info] = self.get_clusterinfo(clustername, username) @@ -413,27 +433,30 @@ class VclusterMgr(object): # set proxy if not "proxy_server_ip" in info.keys(): info['proxy_server_ip'] = self.addr - 
self.write_clusterinfo(info,clustername,username) try: target = 'http://'+info['containers'][0]['ip'].split('/')[0]+":10000" if self.distributedgw == 'True': worker = self.nodemgr.ip_to_rpc(info['proxy_server_ip']) - worker.set_route("/" + info['proxy_server_ip'] + '/go/'+username+'/'+clustername, target) + worker.set_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername, target) else: if not info['proxy_server_ip'] == self.addr: logger.info("%s %s proxy_server_ip has been changed, base_url need to be modified."%(username,clustername)) - for container in info['containers']: - worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT"))) - if worker is None: - return [False, "The worker can't be found or has been stopped."] - worker.update_baseurl(container['containername'],info['proxy_server_ip'],self.addr) - info['proxy_server_ip'] = self.addr - proxy_url = env.getenv("PORTAL_URL") +"/"+ self.addr +"/_web/" + username + "/" + clustername - info['proxy_url'] = proxy_url - self.write_clusterinfo(info,clustername,username) - proxytool.set_route("/" + info['proxy_server_ip'] + '/go/'+username+'/'+clustername, target) + oldpublicIP= info['proxy_public_ip'] + self.update_proxy_ipAndurl(clustername,username,self.addr) + [status, info] = self.get_clusterinfo(clustername, username) + self.update_cluster_baseurl(clustername,username,oldpublicIP,info['proxy_public_ip']) + proxytool.set_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername, target) except: return [False, "start cluster failed with setting proxy failed"] + # check public ip + [status, proxy_public_ip] = self.etcd.getkey("machines/publicIP/"+info['proxy_server_ip']) + if not info['proxy_public_ip'] == proxy_public_ip: + logger.info("%s %s proxy_public_ip has been changed, base_url need to be modified."%(username,clustername)) + oldpublicIP= info['proxy_public_ip'] + self.update_proxy_ipAndurl(clustername,username,info['proxy_server_ip']) + 
[status, info] = self.get_clusterinfo(clustername, username) + self.update_cluster_baseurl(clustername,username,oldpublicIP,info['proxy_public_ip']) + # start containers for container in info['containers']: # set up gre from user's gateway host to container's host. self.networkmgr.check_usergre(username, uid, container['host'], self.nodemgr, self.distributedgw=='True') @@ -465,35 +488,42 @@ class VclusterMgr(object): [status, info] = self.get_clusterinfo(clustername, username) if not status: return [False, "cluster not found"] - # need to check and recover gateway of this user - self.networkmgr.check_usergw(username, uid, self.nodemgr,self.distributedgw=='True') - # recover proxy of cluster if not "proxy_server_ip" in info.keys(): info['proxy_server_ip'] = self.addr self.write_clusterinfo(info,clustername,username) + [status, info] = self.get_clusterinfo(clustername, username) + if not "proxy_public_ip" in info.keys(): + self.update_proxy_ipAndurl(clustername,username,info['proxy_server_ip']) + [status, info] = self.get_clusterinfo(clustername, username) + self.update_cluster_baseurl(clustername,username,info['proxy_server_ip'],info['proxy_public_ip']) if info['status'] == 'stopped': return [True, "cluster no need to start"] + # need to check and recover gateway of this user + self.networkmgr.check_usergw(username, uid, self.nodemgr,self.distributedgw=='True') + # recover proxy of cluster try: target = 'http://'+info['containers'][0]['ip'].split('/')[0]+":10000" if self.distributedgw == 'True': worker = self.nodemgr.ip_to_rpc(info['proxy_server_ip']) - worker.set_route("/" + info['proxy_server_ip'] + '/go/'+username+'/'+clustername, target) + worker.set_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername, target) else: if not info['proxy_server_ip'] == self.addr: logger.info("%s %s proxy_server_ip has been changed, base_url need to be modified."%(username,clustername)) - for container in info['containers']: - worker = 
xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT"))) - if worker is None: - return [False, "The worker can't be found or has been stopped."] - worker.update_baseurl(container['containername'],info['proxy_server_ip'],self.addr) - worker.stop_container(container['containername']) - info['proxy_server_ip'] = self.addr - proxy_url = env.getenv("PORTAL_URL") +"/"+ self.addr +"/_web/" + username + "/" + clustername - info['proxy_url'] = proxy_url - self.write_clusterinfo(info,clustername,username) - proxytool.set_route("/" + info['proxy_server_ip'] + '/go/'+username+'/'+clustername, target) + oldpublicIP= info['proxy_public_ip'] + self.update_proxy_ipAndurl(clustername,username,self.addr) + [status, info] = self.get_clusterinfo(clustername, username) + self.update_cluster_baseurl(clustername,username,oldpublicIP,info['proxy_public_ip']) + proxytool.set_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername, target) except: return [False, "start cluster failed with setting proxy failed"] + # check public ip + [status, proxy_public_ip] = self.etcd.getkey("machines/publicIP/"+info['proxy_server_ip']) + if not info['proxy_public_ip'] == proxy_public_ip: + logger.info("%s %s proxy_public_ip has been changed, base_url need to be modified."%(username,clustername)) + oldpublicIP= info['proxy_public_ip'] + self.update_proxy_ipAndurl(clustername,username,info['proxy_server_ip']) + [status, info] = self.get_clusterinfo(clustername, username) + self.update_cluster_baseurl(clustername,username,oldpublicIP,info['proxy_public_ip']) # recover containers of this cluster for container in info['containers']: # set up gre from user's gateway host to container's host. 
@@ -516,9 +546,9 @@ class VclusterMgr(object): return [False, 'cluster is already stopped'] if self.distributedgw == 'True': worker = self.nodemgr.ip_to_rpc(info['proxy_server_ip']) - worker.delete_route("/" + info['proxy_server_ip'] + '/go/'+username+'/'+clustername) + worker.delete_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername) else: - proxytool.delete_route("/" + info['proxy_server_ip'] + '/go/'+username+'/'+clustername) + proxytool.delete_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername) for container in info['containers']: worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT"))) if worker is None: @@ -577,6 +607,21 @@ class VclusterMgr(object): logger.error ("internal error: cluster:%s info file has no clusterid " % clustername) return -1 + def update_proxy_ipAndurl(self, clustername, username, proxy_server_ip): + [status, info] = self.get_clusterinfo(clustername, username) + if not status: + return [False, "cluster not found"] + info['proxy_server_ip'] = proxy_server_ip + [status, proxy_public_ip] = self.etcd.getkey("machines/publicIP/"+proxy_server_ip) + if not status: + logger.error("Fail to get proxy_public_ip %s."%(proxy_server_ip)) + proxy_public_ip = proxy_server_ip + info['proxy_public_ip'] = proxy_public_ip + proxy_url = env.getenv("PORTAL_URL") +"/"+ proxy_public_ip +"/_web/" + username + "/" + clustername + info['proxy_url'] = proxy_url + self.write_clusterinfo(info,clustername,username) + return proxy_public_ip + def get_clusterinfo(self, clustername, username): clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername if not os.path.isfile(clusterpath): diff --git a/src/worker.py b/src/worker.py index 29cef83..f0776eb 100755 --- a/src/worker.py +++ b/src/worker.py @@ -180,6 +180,8 @@ class Worker(object): logger.info("Monitor Collector has been started.") # worker change it state itself. Independedntly from master. 
self.etcd.setkey("machines/runnodes/"+self.addr, "work") + publicIP = env.getenv("PUBLIC_IP") + self.etcd.setkey("machines/publicIP/"+self.addr,publicIP) self.thread_sendheartbeat = threading.Thread(target=self.sendheartbeat) self.thread_sendheartbeat.start() # start serving for rpc