Modified recovery mode

zhongyehong 2018-04-22 14:03:41 +08:00
parent 9dc3f346ac
commit 39fcf8a26f
6 changed files with 65 additions and 37 deletions
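In summary: call sites across ImageMgr, Master_Collector, NetworkMgr, and VclusterMgr stop building xmlrpc.client.ServerProxy objects inline and instead ask NodeMgr.ip_to_rpc() for a worker proxy. For a disconnected worker, ip_to_rpc() now returns that worker's task queue instead of a proxy; the new call_rpc_function() appends the intended RPC to the queue, and when the worker re-registers, the node-watching thread spawns recover_node() to replay the queued calls. The startup wait loop drops from 60 to 10 iterations, and the Worker no longer deletes existing docklet-br bridges or QoS settings when it starts.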

View File

@@ -308,7 +308,7 @@ class ImageMgr():
         workers = vclustermgr.nodemgr.get_nodeips()
         logger.info("update base image in all workers")
         for worker in workers:
-            workerrpc = xmlrpc.client.ServerProxy("http://%s:%s" % (worker, env.getenv("WORKER_PORT")))
+            workerrpc = vclustermgr.nodemgr.ip_to_rpc(worker)
             workerrpc.update_basefs(image)
         logger.info("update base image success")
         #vclustermgr.mount_allclusters()

View File

@@ -693,7 +693,7 @@ class Master_Collector(threading.Thread):
         for worker in workers:
             try:
                 ip = worker
-                workerrpc = xmlrpc.client.ServerProxy("http://%s:%s" % (worker, env.getenv("WORKER_PORT")))
+                workerrpc = self.nodemgr.ip_to_rpc(worker)
                 # fetch data
                 info = list(eval(workerrpc.workerFetchInfo(self.master_ip)))
                 #logger.info(info[0])

View File

@@ -553,9 +553,10 @@ class NetworkMgr(object):
             self.usrgws[username] = self.masterip
             self.dump_usrgw(username)
             netcontrol.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr(), input_rate_limit, output_rate_limit)
+            logger.info("recover gw success")
         else:
             worker = nodemgr.ip_to_rpc(ip)
-            worker.check_gw('docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr(), input_rate_limit, output_rate_limit)
+            nodemgr.call_rpc_function(worker,'check_gw',['docklet-br-'+str(uid), username, uid, self.users[username].get_gateway_cidr(), input_rate_limit, output_rate_limit])
         del self.users[username]
         return [True, 'check gw ok']
@@ -571,7 +572,7 @@ class NetworkMgr(object):
         else:
             if not remote == ip:
                 worker = nodemgr.ip_to_rpc(ip)
-                worker.add_port_gre_withkey('docklet-br-'+str(uid), 'gre-'+str(uid)+'-'+remote, remote, uid)
+                nodemgr.call_rpc_function(worker,'add_port_gre_withkey',['docklet-br-'+str(uid), 'gre-'+str(uid)+'-'+remote, remote, uid])
         return [True, 'check gre ok']

     def has_user(self, username):

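With this change, recovering a user gateway or a GRE tunnel on a worker that is currently disconnected no longer fails outright: ip_to_rpc returns that worker's task queue instead of a proxy, and call_rpc_function (added in the NodeMgr changes below) records the check_gw or add_port_gre_withkey call for replay once the worker re-registers.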
View File

@@ -23,6 +23,7 @@ class NodeMgr(object):
         self.etcd = etcdclient
         self.mode = mode
         self.workerport = env.getenv('WORKER_PORT')
+        self.tasks = {}

         # delete the existing network
         logger.info ("delete the existing network")
@@ -62,7 +63,7 @@ class NodeMgr(object):
         self.thread_watchnewnode.start()
         # wait for all nodes joins
         # while(True):
-        for i in range(60):
+        for i in range(10):
             allin = True
             for node in self.allnodes:
                 if node not in self.runnodes:
@@ -121,6 +122,11 @@ class NodeMgr(object):
                 if nodeip not in self.allnodes:
                     self.allnodes.append(nodeip)
                     self.etcd.setkey("machines/allnodes/"+nodeip, "ok")
+                else:
+                    if nodeip in self.tasks:
+                        recover_task = threading.Thread(target = self.recover_node, args=(nodeip,self.tasks[nodeip]))
+                        recover_task.start()
+                        del self.tasks[nodeip]
                 logger.debug ("all nodes are: %s" % self.allnodes)
                 logger.debug ("run nodes are: %s" % self.runnodes)
             elif node['value'] == 'ok':
@@ -135,6 +141,15 @@ class NodeMgr(object):
             #print(self.rpcs)
         self.runnodes = etcd_runip

+    def recover_node(self,ip,tasks):
+        logger.info("now recover for worker:%s" % ip)
+        worker = self.ip_to_rpc(ip)
+        for task in tasks:
+            taskname = task['taskname']
+            taskargs = task['args']
+            logger.info("recover task:%s in worker:%s" % (taskname, ip))
+            eval('worker.'+taskname)(*taskargs)
+
     # get all run nodes' IP addr
     def get_nodeips(self):
         return self.runnodes
@@ -144,4 +159,17 @@ class NodeMgr(object):
         return self.allnodes

     def ip_to_rpc(self,ip):
-        return xmlrpc.client.ServerProxy("http://%s:%s" % (ip, env.getenv("WORKER_PORT")))
+        if ip in self.runnodes:
+            return xmlrpc.client.ServerProxy("http://%s:%s" % (ip, env.getenv("WORKER_PORT")))
+        else:
+            logger.info('Worker %s is not connected, create rpc client failed, push task into queue' % ip)
+            if not ip in self.tasks:
+                self.tasks[ip] = []
+            return self.tasks[ip]
+
+    def call_rpc_function(self, worker, function, args):
+        if type(worker) is list:
+            worker.append({'taskname':function,'args':args})
+            return [True, 'append task success']
+        else:
+            return eval('worker.'+function)(*args)

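Taken together, ip_to_rpc and call_rpc_function implement a queue-and-replay scheme: a connected worker gets a live xmlrpc proxy, a disconnected worker gets its task queue, and the node-watching thread replays the queue via recover_node when the worker re-registers. Below is a minimal, self-contained sketch of that scheme; MiniNodeMgr and FakeWorker are illustrative stand-ins, not Docklet code, and getattr(worker, name)(*args) is used as the idiomatic equivalent of the eval('worker.'+name)(*args) calls in the diff.

    # Minimal sketch of the queue-and-replay scheme (illustrative names only).
    class FakeWorker:
        """Stands in for the xmlrpc.client.ServerProxy a live worker would get."""
        def recover_container(self, name):
            print("recover_container(%s)" % name)

    class MiniNodeMgr:
        def __init__(self):
            self.runnodes = []   # IPs of currently connected workers
            self.tasks = {}      # IP -> list of RPC calls queued while offline

        def ip_to_rpc(self, ip):
            if ip in self.runnodes:
                return FakeWorker()          # real code returns a ServerProxy
            self.tasks.setdefault(ip, [])
            return self.tasks[ip]            # a list means "queue, don't call"

        def call_rpc_function(self, worker, function, args):
            if isinstance(worker, list):     # offline: record for later replay
                worker.append({'taskname': function, 'args': args})
                return [True, 'append task success']
            return getattr(worker, function)(*args)  # online: call through

        def recover_node(self, ip, tasks):
            worker = self.ip_to_rpc(ip)      # ip is back in runnodes by now
            for task in tasks:
                getattr(worker, task['taskname'])(*task['args'])

    mgr = MiniNodeMgr()
    w = mgr.ip_to_rpc('10.0.0.2')                    # offline -> task queue
    mgr.call_rpc_function(w, 'recover_container', ['user-1-0'])
    mgr.runnodes.append('10.0.0.2')                  # worker re-registers
    mgr.recover_node('10.0.0.2', mgr.tasks.pop('10.0.0.2'))  # replays the call

One consequence visible in the diff: since ip_to_rpc now returns either a proxy or a list and never None, the remaining "if worker is None" guards in VclusterMgr can no longer fire.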
View File

@@ -2,7 +2,6 @@
 import os, random, json, sys, imagemgr
 import datetime, math
-import xmlrpc.client

 from log import logger
 import env
@@ -146,7 +145,7 @@ class VclusterMgr(object):
         containers = []
         for i in range(0, clustersize):
             workerip = workers[random.randint(0, len(workers)-1)]
-            oneworker = xmlrpc.client.ServerProxy("http://%s:%s" % (workerip, env.getenv("WORKER_PORT")))
+            oneworker = self.nodemgr.ip_to_rpc(workerip)
             if self.distributedgw == "True" and i == 0 and not self.networkmgr.has_usrgw(username):
                 [success,message] = self.networkmgr.setup_usrgw(groupquota['input_rate_limit'], groupquota['output_rate_limit'], username, uid, self.nodemgr, workerip)
                 if not success:
@@ -204,7 +203,7 @@ class VclusterMgr(object):
         hostpath = self.fspath + "/global/users/" + username + "/hosts/" + str(clusterid) + ".hosts"
         cid = clusterinfo['nextcid']
         workerip = workers[random.randint(0, len(workers)-1)]
-        oneworker = xmlrpc.client.ServerProxy("http://%s:%s" % (workerip, env.getenv("WORKER_PORT")))
+        oneworker = self.nodemgr.ip_to_rpc(workerip)
         lxc_name = username + "-" + str(clusterid) + "-" + str(cid)
         hostname = "host-" + str(cid)
         proxy_server_ip = clusterinfo['proxy_server_ip']
@@ -367,7 +366,7 @@ class VclusterMgr(object):
         for container in containers:
             if container['containername'] == containername:
                 logger.info("container: %s found" % containername)
-                worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+                worker = self.nodemgr.ip_to_rpc(container['host'])
                 worker.create_image(username,imagetmp,containername)
                 fimage = container['image']
                 logger.info("image: %s created" % imagetmp)
@@ -377,7 +376,7 @@ class VclusterMgr(object):
         for container in containers:
             if container['containername'] != containername:
                 logger.info("container: %s now flush" % container['containername'])
-                worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+                worker = self.nodemgr.ip_to_rpc(container['host'])
                 #t = threading.Thread(target=onework.flush_container,args=(username,imagetmp,container['containername']))
                 #threads.append(t)
                 worker.flush_container(username,imagetmp,container['containername'])
@@ -410,7 +409,7 @@ class VclusterMgr(object):
         for container in containers:
             if container['containername'] == containername:
                 logger.info("container: %s found" % containername)
-                worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+                worker = self.nodemgr.ip_to_rpc(container['host'])
                 if worker is None:
                     return [False, "The worker can't be found or has been stopped."]
                 res = worker.create_image(username,imagename,containername,description,imagenum)
@@ -434,7 +433,7 @@ class VclusterMgr(object):
             return [False, "cluster is still running, you need to stop it and then delete"]
         ips = []
         for container in info['containers']:
-            worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+            worker = self.nodemgr.ip_to_rpc(container['host'])
             if worker is None:
                 return [False, "The worker can't be found or has been stopped."]
             worker.delete_container(container['containername'])
@@ -462,7 +461,7 @@ class VclusterMgr(object):
         new_containers = []
         for container in info['containers']:
             if container['containername'] == containername:
-                worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+                worker = self.nodemgr.ip_to_rpc(container['host'])
                 if worker is None:
                     return [False, "The worker can't be found or has been stopped."]
                 worker.delete_container(containername)
@@ -531,11 +530,11 @@ class VclusterMgr(object):
             return [False, "cluster not found"]
         logger.info("%s %s:base_url need to be modified(%s %s)."%(username,clustername,oldip,newip))
         for container in info['containers']:
-            worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
-            if worker is None:
-                return [False, "The worker can't be found or has been stopped."]
-            worker.update_baseurl(container['containername'],oldip,newip)
-            worker.stop_container(container['containername'])
+            worker = self.nodemgr.ip_to_rpc(container['host'])
+            #if worker is None:
+            #    return [False, "The worker can't be found or has been stopped."]
+            self.nodemgr.call_rpc_function(worker,'update_baseurl',[container['containername'],oldip,newip])
+            self.nodemgr.call_rpc_function(worker,'stop_container',[container['containername']])

     def check_public_ip(self, clustername, username):
         [status, info] = self.get_clusterinfo(clustername, username)
@@ -591,7 +590,7 @@ class VclusterMgr(object):
         for container in info['containers']:
             # set up gre from user's gateway host to container's host.
             self.networkmgr.check_usergre(username, uid, container['host'], self.nodemgr, self.distributedgw=='True')
-            worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+            worker = self.nodemgr.ip_to_rpc(container['host'])
             if worker is None:
                 return [False, "The worker can't be found or has been stopped."]
             worker.start_container(container['containername'])
@@ -609,7 +608,7 @@ class VclusterMgr(object):
         if not status:
             return [False, "cluster not found"]
         for container in info['containers']:
-            worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+            worker = self.nodemgr.ip_to_rpc(container['host'])
             if worker is None:
                 return [False, "The worker can't be found or has been stopped."]
             worker.mount_container(container['containername'])
@@ -640,7 +639,7 @@ class VclusterMgr(object):
         # check public ip
         if not self.check_public_ip(clustername,username):
             [status, info] = self.get_clusterinfo(clustername, username)
-            worker.set_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername, target)
+            self.nodemgr.call_rpc_function(worker,'set_route',["/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername, target])
         else:
             if not info['proxy_server_ip'] == self.addr:
                 logger.info("%s %s proxy_server_ip has been changed, base_url need to be modified."%(username,clustername))
@@ -660,13 +659,13 @@ class VclusterMgr(object):
         for container in info['containers']:
             # set up gre from user's gateway host to container's host.
             self.networkmgr.check_usergre(username, uid, container['host'], self.nodemgr, self.distributedgw=='True')
-            worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+            worker = self.nodemgr.ip_to_rpc(container['host'])
             if worker is None:
                 return [False, "The worker can't be found or has been stopped."]
-            worker.recover_container(container['containername'])
+            self.nodemgr.call_rpc_function(worker,'recover_container',[container['containername']])
             namesplit = container['containername'].split('-')
             portname = namesplit[1] + '-' + namesplit[2]
-            worker.recover_usernet(portname, uid, info['proxy_server_ip'], container['host']==info['proxy_server_ip'])
+            self.nodemgr.call_rpc_function(worker,'recover_usernet',[portname, uid, info['proxy_server_ip'], container['host']==info['proxy_server_ip']])
         # recover ports mapping
         [success, msg] = self.recover_port_mapping(username,clustername)
         if not success:
@@ -687,7 +686,7 @@ class VclusterMgr(object):
             proxytool.delete_route("/" + info['proxy_public_ip'] + '/go/'+username+'/'+clustername)
         for container in info['containers']:
             self.delete_all_port_mapping(username,clustername,container['containername'])
-            worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+            worker = self.nodemgr.ip_to_rpc(container['host'])
             if worker is None:
                 return [False, "The worker can't be found or has been stopped."]
             worker.stop_container(container['containername'])
@@ -706,7 +705,7 @@ class VclusterMgr(object):
         if info['status'] == 'running':
             return [False, 'cluster is running, please stop it first']
         for container in info['containers']:
-            worker = xmlrpc.client.ServerProxy("http://%s:%s" % (container['host'], env.getenv("WORKER_PORT")))
+            worker = self.nodemgr.ip_to_rpc(container['host'])
            if worker is None:
                 return [False, "The worker can't be found or has been stopped."]
             worker.detach_container(container['containername'])

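Note that only some call sites are converted to call_rpc_function (update_baseurl and stop_container in the base-URL path, set_route, recover_container, recover_usernet); the remaining lifecycle calls (start, stop, mount, delete, detach) still invoke the proxy directly, so if one of them ever receives a task queue instead of a proxy it fails with an AttributeError rather than tripping the None check it still carries. A tiny illustration (not Docklet code):

    worker = []                    # what ip_to_rpc returns for an offline worker
    print(worker is None)          # False: the "if worker is None" guard never fires
    # worker.start_container('x')  # would raise AttributeError on a list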
View File

@@ -147,16 +147,16 @@ class Worker(object):
         self.hosts_collector = monitor.Collector()

         # delete the existing network
-        [success, bridges] = ovscontrol.list_bridges()
-        if success:
-            for bridge in bridges:
-                if bridge.startswith("docklet-br"):
-                    ovscontrol.del_bridge(bridge)
-        else:
-            logger.error(bridges)
-        [success, message] = ovscontrol.destroy_all_qos()
-        if not success:
-            logger.error(message)
+        #[success, bridges] = ovscontrol.list_bridges()
+        #if success:
+        #    for bridge in bridges:
+        #        if bridge.startswith("docklet-br"):
+        #            ovscontrol.del_bridge(bridge)
+        #else:
+        #    logger.error(bridges)
+        #[success, message] = ovscontrol.destroy_all_qos()
+        #if not success:
+        #    logger.error(message)
         '''if (self.addr == self.master):
             logger.info ("master also on this node. reuse master's network")
         else:
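With the bridge and QoS teardown commented out, a restarting worker keeps its existing docklet-br bridges and QoS configuration, which is presumably what allows the queued recover_container and recover_usernet calls to find the previous network state intact when they are replayed.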