diff --git a/src/httprest.py b/src/httprest.py
index 82d6d5e..a2bef00 100755
--- a/src/httprest.py
+++ b/src/httprest.py
@@ -21,6 +21,7 @@ from log import logger
 
 import os
 import http.server, cgi, json, sys, shutil
+import xmlrpc.client
 from socketserver import ThreadingMixIn
 import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr
 import userManager,beansapplicationmgr
@@ -69,13 +70,11 @@ def worker_ip_required(func):
     @wraps(func)
     def wrapper(*args, **kwargs):
         global G_nodemgr
-        workers = G_nodemgr.get_rpcs()
+        workers = G_nodemgr.get_nodeips()
         ip = request.remote_addr
         flag = False
         for worker in workers:
-            workerip = G_nodemgr.rpc_to_ip(worker)
-            #logger.info(str(ip) + " " + str(workerip))
-            if ip == '127.0.0.1' or ip == '0.0.0.0' or ip == workerip:
+            if ip == '127.0.0.1' or ip == '0.0.0.0' or ip == worker:
                 flag = True
                 break
         if not flag:
diff --git a/src/imagemgr.py b/src/imagemgr.py
index c0e1a29..6b20ec5 100755
--- a/src/imagemgr.py
+++ b/src/imagemgr.py
@@ -19,6 +19,7 @@ design:
 from configparser import ConfigParser
 from io import StringIO
 import os,sys,subprocess,time,re,datetime,threading,random
+import xmlrpc.client
 from log import logger
 import env
 
@@ -311,10 +312,11 @@ class ImageMgr():
             logger.info("only root can update base image")
         #vclustermgr.stop_allclusters()
         #vclustermgr.detach_allclusters()
-        workers = vclustermgr.nodemgr.get_rpcs()
+        workers = vclustermgr.nodemgr.get_nodeips()
         logger.info("update base image in all workers")
         for worker in workers:
-            worker.update_basefs(image)
+            workerrpc = xmlrpc.client.ServerProxy("http://%s:%s" % (worker, env.getenv("WORKER_PORT")))
+            workerrpc.update_basefs(image)
         logger.info("update base image success")
         #vclustermgr.mount_allclusters()
         #logger.info("mount all cluster success")
diff --git a/src/monitor.py b/src/monitor.py
index 7560c91..bbf5dac 100755
--- a/src/monitor.py
+++ b/src/monitor.py
@@ -20,6 +20,7 @@ Design:Monitor mainly consists of three parts: Collectors, Master_Collector and
 import subprocess,re,os,etcdlib,psutil,math,sys
 import time,threading,json,traceback,platform
 import env
+import xmlrpc.client
 from datetime import datetime
 
 from model import db,VNode,History
@@ -544,12 +545,13 @@ class Master_Collector(threading.Thread):
         while not self.thread_stop:
             for worker in monitor_hosts.keys():
                 monitor_hosts[worker]['running'] = False
-            workers = self.nodemgr.get_rpcs()
+            workers = self.nodemgr.get_nodeips()
             for worker in workers:
                 try:
-                    ip = self.nodemgr.rpc_to_ip(worker)
+                    ip = worker
+                    workerrpc = xmlrpc.client.ServerProxy("http://%s:%s" % (worker, env.getenv("WORKER_PORT")))
                     # fetch data
-                    info = list(eval(worker.workerFetchInfo(self.master_ip)))
+                    info = list(eval(workerrpc.workerFetchInfo(self.master_ip)))
                     #logger.info(info[0])
                     # store data in monitor_hosts and monitor_vnodes
                     monitor_hosts[ip] = info[0]
diff --git a/src/nodemgr.py b/src/nodemgr.py
index 487fe4e..13240b1 100755
--- a/src/nodemgr.py
+++ b/src/nodemgr.py
@@ -46,10 +46,6 @@ class NodeMgr(object):
             logger.error("docklet-br not found")
             sys.exit(1)
 
-        # init rpc list
-        self.rpcs = []
-        self.maprpcs = {}
-
         # get allnodes
         self.allnodes = self._nodelist_etcd("allnodes")
         self.runnodes = []
@@ -59,8 +55,6 @@ class NodeMgr(object):
             if node['value'] == 'ok':
                 logger.info ("running node %s" % nodeip)
                 self.runnodes.append(nodeip)
-                self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s" % (nodeip, self.workerport)))
-                logger.info ("add %s:%s in rpc client list" % (nodeip, self.workerport))
         logger.info ("all nodes are: %s" % self.allnodes)
         logger.info ("run nodes are: %s" % self.runnodes)
 
@@ -131,11 +125,6 @@ class NodeMgr(object):
                         self.etcd.setkey("machines/allnodes/"+nodeip, "ok")
                     logger.debug ("all nodes are: %s" % self.allnodes)
                     logger.debug ("run nodes are: %s" % self.runnodes)
-                    rpccl = xmlrpc.client.ServerProxy("http://%s:%s" % (nodeip, self.workerport))
-                    self.rpcs.append(rpccl)
-                    self.maprpcs[nodeip] = rpccl
-                    logger.info ("add %s:%s in rpc client list" %
-                        (nodeip, self.workerport))
                 elif node['value'] == 'ok':
                     etcd_runip.append(nodeip)
             for nodeip in self.runnodes:
@@ -145,30 +134,12 @@ class NodeMgr(object):
                     #print(self.runnodes)
                     #print(etcd_runip)
                     #print(self.rpcs)
-                    self.rpcs.remove(self.maprpcs[nodeip])
             self.runnodes = etcd_runip
 
     # get all run nodes' IP addr
     def get_nodeips(self):
-        return self.allnodes
+        return self.runnodes
 
-    def get_rpcs(self):
-        return self.rpcs
-
-    def get_onerpc(self):
-        return self.rpcs[random.randint(0, len(self.rpcs)-1)]
-
-    def rpc_to_ip(self, rpcclient):
-        try:
-            return self.runnodes[self.rpcs.index(rpcclient)]
-        except:
-            return None
-
-    def ip_to_rpc(self, nodeip):
-        try:
-            return self.rpcs[self.runnodes.index(nodeip)]
-        except:
-            return None
 
     def get_allnodes(self):
         return self.allnodes
diff --git a/src/vclustermgr.py b/src/vclustermgr.py
index 6e97fa8..4502d6e 100755
--- a/src/vclustermgr.py
+++ b/src/vclustermgr.py
@@ -2,6 +2,7 @@
 
 import os, random, json, sys, imagemgr
 import datetime
+import xmlrpc.client
 
 from log import logger
 import env
@@ -84,7 +85,7 @@ class VclusterMgr(object):
             return [False, "cluster:%s already exists" % clustername]
         clustersize = int(self.defaultsize)
         logger.info ("starting cluster %s with %d containers for %s" % (clustername, int(clustersize), username))
-        workers = self.nodemgr.get_rpcs()
+        workers = self.nodemgr.get_nodeips()
         image_json = json.dumps(image)
         groupname = json.loads(user_info)["data"]["group"]
         if (len(workers) == 0):
@@ -108,17 +109,18 @@ class VclusterMgr(object):
         hosts = "127.0.0.1\tlocalhost\n"
         containers = []
         for i in range(0, clustersize):
-            onework = workers[random.randint(0, len(workers)-1)]
+            workerip = workers[random.randint(0, len(workers)-1)]
+            oneworker = xmlrpc.client.ServerProxy("http://%s:%s" % (workerip, env.getenv("WORKER_PORT")))
             lxc_name = username + "-" + str(clusterid) + "-" + str(i)
             hostname = "host-"+str(i)
             logger.info ("create container with : name-%s, username-%s, clustername-%s, clusterid-%s, hostname-%s, ip-%s, gateway-%s, image-%s" % (lxc_name, username, clustername, str(clusterid), hostname, ips[i], gateway, image_json))
-            [success,message] = onework.create_container(lxc_name, username, json.dumps(setting) , clustername, str(clusterid), str(i), hostname, ips[i], gateway, str(vlanid), image_json)
+            [success,message] = oneworker.create_container(lxc_name, username, json.dumps(setting) , clustername, str(clusterid), str(i), hostname, ips[i], gateway, str(vlanid), image_json)
             if success is False:
                 logger.info("container create failed, so vcluster create failed")
                 return [False, message]
             logger.info("container create success")
             hosts = hosts + ips[i].split("/")[0] + "\t" + hostname + "\t" + hostname + "."+clustername + "\n"
-            containers.append({ 'containername':lxc_name, 'hostname':hostname, 'ip':ips[i], 'host':self.nodemgr.rpc_to_ip(onework), 'image':image['name'], 'lastsave':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'setting': setting })
+            containers.append({ 'containername':lxc_name, 'hostname':hostname, 'ip':ips[i], 'host':workerip, 'image':image['name'], 'lastsave':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'setting': setting })
         hostfile = open(hostpath, 'w')
         hostfile.write(hosts)
         hostfile.close()
@@ -132,7 +134,7 @@ class VclusterMgr(object):
     def scale_out_cluster(self,clustername,username,image,user_info, setting):
         if not self.is_cluster(clustername,username):
             return [False, "cluster:%s not found" % clustername]
-        workers = self.nodemgr.get_rpcs()
+        workers = self.nodemgr.get_nodeips()
         if (len(workers) == 0):
             logger.warning("no workers to start containers, scale out failed")
             return [False, "no workers are running"]
@@ -149,23 +151,24 @@ class VclusterMgr(object):
         clusterpath = self.fspath + "/global/users/" + username + "/clusters/" + clustername
         hostpath = self.fspath + "/global/users/" + username + "/hosts/" + str(clusterid) + ".hosts"
         cid = clusterinfo['nextcid']
-        onework = workers[random.randint(0, len(workers)-1)]
+        workerip = workers[random.randint(0, len(workers)-1)]
+        oneworker = xmlrpc.client.ServerProxy("http://%s:%s" % (workerip, env.getenv("WORKER_PORT")))
         lxc_name = username + "-" + str(clusterid) + "-" + str(cid)
         hostname = "host-" + str(cid)
-        [success, message] = onework.create_container(lxc_name, username, json.dumps(setting), clustername, clusterid, str(cid), hostname, ip, gateway, str(vlanid), image_json)
+        [success, message] = oneworker.create_container(lxc_name, username, json.dumps(setting), clustername, clusterid, str(cid), hostname, ip, gateway, str(vlanid), image_json)
         if success is False:
             logger.info("create container failed, so scale out failed")
             return [False, message]
         if clusterinfo['status'] == "running":
-            onework.start_container(lxc_name)
-            onework.start_services(lxc_name, ["ssh"]) # TODO: need fix
+            oneworker.start_container(lxc_name)
+            oneworker.start_services(lxc_name, ["ssh"]) # TODO: need fix
         logger.info("scale out success")
         hostfile = open(hostpath, 'a')
         hostfile.write(ip.split("/")[0] + "\t" + hostname + "\t" + hostname + "." + clustername + "\n")
         hostfile.close()
         clusterinfo['nextcid'] = int(clusterinfo['nextcid']) + 1
         clusterinfo['size'] = int(clusterinfo['size']) + 1
-        clusterinfo['containers'].append({'containername':lxc_name, 'hostname':hostname, 'ip':ip, 'host':self.nodemgr.rpc_to_ip(onework), 'image':image['name'], 'lastsave':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'setting': setting})
+        clusterinfo['containers'].append({'containername':lxc_name, 'hostname':hostname, 'ip':ip, 'host':workerip, 'image':image['name'], 'lastsave':datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), 'setting': setting})
         clusterfile = open(clusterpath, 'w')
         clusterfile.write(json.dumps(clusterinfo))
         clusterfile.close()
@@ -204,8 +207,8 @@ class VclusterMgr(object):
         for container in containers:
             if container['containername'] == containername:
                 logger.info("container: %s found" % containername)
-                onework = self.nodemgr.ip_to_rpc(container['host'])
-                onework.create_image(username,imagetmp,containername)
+                worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+                worker.create_image(username,imagetmp,containername)
                 fimage = container['image']
                 logger.info("image: %s created" % imagetmp)
                 break
@@ -214,10 +217,10 @@ class VclusterMgr(object):
         for container in containers:
             if container['containername'] != containername:
                 logger.info("container: %s now flush" % container['containername'])
-                onework = self.nodemgr.ip_to_rpc(container['host'])
+                worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
                 #t = threading.Thread(target=onework.flush_container,args=(username,imagetmp,container['containername']))
                 #threads.append(t)
-                onework.flush_container(username,imagetmp,container['containername'])
+                worker.flush_container(username,imagetmp,container['containername'])
                 container['lastsave'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                 container['image'] = fimage
                 logger.info("thread for container: %s has been prepared" % container['containername'])
@@ -247,10 +250,10 @@ class VclusterMgr(object):
         for container in containers:
             if container['containername'] == containername:
                 logger.info("container: %s found" % containername)
-                onework = self.nodemgr.ip_to_rpc(container['host'])
-                if onework is None:
+                worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+                if worker is None:
                     return [False, "The worker can't be found or has been stopped."]
-                res = onework.create_image(username,imagename,containername,description,imagenum)
+                res = worker.create_image(username,imagename,containername,description,imagenum)
                 container['lastsave'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                 container['image'] = imagename
                 break
@@ -271,7 +274,7 @@ class VclusterMgr(object):
             return [False, "cluster is still running, you need to stop it and then delete"]
         ips = []
         for container in info['containers']:
-            worker = self.nodemgr.ip_to_rpc(container['host'])
+            worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
             if worker is None:
                 return [False, "The worker can't be found or has been stopped."]
             worker.delete_container(container['containername'])
@@ -297,7 +300,7 @@ class VclusterMgr(object):
         new_containers = []
         for container in info['containers']:
             if container['containername'] == containername:
-                worker = self.nodemgr.ip_to_rpc(container['host'])
+                worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
                 if worker is None:
return [False, "The worker can't be found or has been stopped."] worker.delete_container(containername) @@ -348,7 +351,7 @@ class VclusterMgr(object): except: return [False, "start cluster failed with setting proxy failed"] for container in info['containers']: - worker = self.nodemgr.ip_to_rpc(container['host']) + worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT"))) if worker is None: return [False, "The worker can't be found or has been stopped."] worker.start_container(container['containername']) @@ -365,7 +368,7 @@ class VclusterMgr(object): if not status: return [False, "cluster not found"] for container in info['containers']: - worker = self.nodemgr.ip_to_rpc(container['host']) + worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT"))) if worker is None: return [False, "The worker can't be found or has been stopped."] worker.mount_container(container['containername']) @@ -387,7 +390,7 @@ class VclusterMgr(object): return [False, "start cluster failed with setting proxy failed"] # recover containers of this cluster for container in info['containers']: - worker = self.nodemgr.ip_to_rpc(container['host']) + worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT"))) if worker is None: return [False, "The worker can't be found or has been stopped."] worker.recover_container(container['containername']) @@ -402,7 +405,7 @@ class VclusterMgr(object): if info['status'] == 'stopped': return [False, 'cluster is already stopped'] for container in info['containers']: - worker = self.nodemgr.ip_to_rpc(container['host']) + worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT"))) if worker is None: return [False, "The worker can't be found or has been stopped."] worker.stop_container(container['containername']) @@ -420,7 +423,7 @@ class VclusterMgr(object): if info['status'] == 'running': return [False, 'cluster is running, please stop it first'] for container in info['containers']: - worker = self.nodemgr.ip_to_rpc(container['host']) + worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT"))) if worker is None: return [False, "The worker can't be found or has been stopped."] worker.detach_container(container['containername']) diff --git a/src/worker.py b/src/worker.py index ff69b9a..e7f7b63 100755 --- a/src/worker.py +++ b/src/worker.py @@ -177,12 +177,12 @@ class Worker(object): def sendheartbeat(self): while(True): # check send heartbeat package every 1s - time.sleep(1) + time.sleep(20) [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr) if status: # master has know the worker so we start send heartbeat package if value=='ok': - self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 2) + self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60) else: logger.error("get key %s failed, master crashed or initialized. restart worker please." % self.addr) sys.exit(1) diff --git a/web/templates/addCluster.html b/web/templates/addCluster.html index 743a377..65ce6aa 100644 --- a/web/templates/addCluster.html +++ b/web/templates/addCluster.html @@ -24,22 +24,6 @@