Merge pull request #81 from SourceZh/separate

Independent starting of master and worker. However, this does not yet work for the **init mode** of startup.
This commit is contained in:
leebaok 2016-05-04 06:37:05 -05:00
commit a17aff2b20
6 changed files with 77 additions and 69 deletions

View File

@ -3,7 +3,6 @@
[ $(id -u) != '0' ] && echo "root is needed" && exit 1
# get some path of docklet
bindir=${0%/*}
# $bindir maybe like /opt/docklet/src/../sbin
# use command below to make $bindir in normal absolute path

View File

@ -43,8 +43,6 @@ PIDFILE=$RUN_DIR/$DAEMON_NAME.pid
###########
pre_start () {
log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX"
[ ! -d $FS_PREFIX/global ] && mkdir -p $FS_PREFIX/global
[ ! -d $FS_PREFIX/local ] && mkdir -p $FS_PREFIX/local
[ ! -d $FS_PREFIX/global/users ] && mkdir -p $FS_PREFIX/global/users
@ -81,6 +79,7 @@ pre_start () {
do_start() {
pre_start
log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX"
start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON -- $DAEMON_OPTS
log_end_msg $?
}
@ -93,6 +92,7 @@ do_stop () {
case "$1" in
start)
do_start
@ -115,7 +115,6 @@ case "$1" in
status)
status_of_proc -p $PIDFILE "$DAEMON" "$DAEMON_NAME" && exit 0 || exit $?
;;
*)
echo "Usage: $DAEMON_NAME {start|stop|restart|status}"
exit 1

View File

@ -198,5 +198,4 @@ class Client(object):
else:
return [False, 'you are not lock holder']
else:
return [False, 'no one holds this lock']
return [False, 'no one holds this lock']

View File

@ -8,6 +8,7 @@ from flask import Flask, request
# must first init loadenv
import tools, env
# default CONFIG=/opt/docklet/local/docklet-running.conf
config = env.getenv("CONFIG")
tools.loadenv(config)
@ -25,6 +26,7 @@ import userManager
import monitor
import guest_control, threading
#default EXTERNAL_LOGIN=False
external_login = env.getenv('EXTERNAL_LOGIN')
if (external_login == 'TRUE'):
from userDependence import external_auth
@ -645,6 +647,7 @@ if __name__ == '__main__':
etcdclient.clean()
else:
etcdclient.createdir("")
# token is saved at fs_path/global/token
token = tools.gen_token()
tokenfile = open(fs_path+"/global/token", 'w')
tokenfile.write(token)
@ -679,9 +682,6 @@ if __name__ == '__main__':
etcdclient.setkey("service/mode", mode)
if etcdclient.isdir("_lock")[0]:
etcdclient.deldir("_lock")
if etcdclient.isdir("machines/runnodes")[0]:
etcdclient.deldir("machines/runnodes")
etcdclient.createdir("machines/runnodes")
G_usermgr = userManager.userManager('root')
clusternet = env.getenv("CLUSTER_NET")
@ -708,7 +708,7 @@ if __name__ == '__main__':
masterport = env.getenv('MASTER_PORT')
logger.info("using MASTER_PORT %d", int(masterport))
# server = http.server.HTTPServer((masterip, masterport), DockletHttpHandler)
# server = http.server.HTTPServer((masterip, masterport), DockletHttpHandler)
logger.info("starting master server")
app.run(host = masterip, port = masterport, threaded=True)

View File

@ -22,6 +22,7 @@ class NodeMgr(object):
self.networkmgr = networkmgr
self.etcd = etcdclient
self.mode = mode
self.workerport = env.getenv('WORKER_PORT')
# initialize the network
logger.info ("initialize network")
@ -45,16 +46,24 @@ class NodeMgr(object):
logger.error("docklet-br not found")
sys.exit(1)
# init rpc list
self.rpcs = []
# get allnodes
self.allnodes = self._nodelist_etcd("allnodes")
self.runnodes = self._nodelist_etcd("runnodes")
self.runnodes = []
[status, runlist] = self.etcd.listdir("machines/runnodes")
for node in runlist:
nodeip = node['key'].rsplit('/',1)[1]
if node['value'] == 'ok':
logger.info ("running node %s" % nodeip)
self.runnodes.append(nodeip)
self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s" % (nodeip, self.workerport)))
logger.info ("add %s:%s in rpc client list" % (nodeip, self.workerport))
logger.info ("all nodes are: %s" % self.allnodes)
logger.info ("run nodes are: %s" % self.runnodes)
if len(self.runnodes)>0:
logger.error ("init runnodes is not null, need to be clean")
sys.exit(1)
# init rpc list
self.rpcs = []
# start new thread to watch whether a new node joins
logger.info ("start thread to watch new nodes ...")
self.thread_watchnewnode = threading.Thread(target=self._watchnewnode)
@ -86,7 +95,6 @@ class NodeMgr(object):
# thread target : watch whether a new node joins
def _watchnewnode(self):
workerport = env.getenv('WORKER_PORT')
while(True):
time.sleep(0.1)
[status, runlist] = self.etcd.listdir("machines/runnodes")
@ -97,25 +105,6 @@ class NodeMgr(object):
nodeip = node['key'].rsplit('/',1)[1]
if node['value']=='waiting':
logger.info ("%s want to joins, call it to init first" % nodeip)
# the worker's 'docklet-br' bridge does not need an IP address, so there is no need to allocate one for it
#if nodeip != self.addr:
# [status, result] = self.networkmgr.acquire_sysips_cidr()
# self.networkmgr.printpools()
# if not status:
# logger.error("no IP for worker bridge, please check network system pool")
# continue
# bridgeip = result[0]
# self.etcd.setkey("network/workbridge", bridgeip)
if nodeip in self.allnodes:
######## HERE MAYBE NEED TO FIX ###############
# here we must use "machines/runnodes/nodeip"
# we cannot use node['key'], node['key'] is absolute
# path, etcd client will append the path to prefix,
# which is wrong
###############################################
self.etcd.setkey("machines/runnodes/"+nodeip, "init-"+self.mode)
else:
self.etcd.setkey('machines/runnodes/'+nodeip, "init-new")
elif node['value']=='work':
logger.info ("new node %s joins" % nodeip)
# setup GRE tunnels for new nodes
@ -127,18 +116,18 @@ class NodeMgr(object):
logger.debug("GRE for %s already exists, reuse it" % nodeip)
else:
netcontrol.setup_gre('docklet-br', nodeip)
self.runnodes.append(nodeip)
self.etcd.setkey("machines/runnodes/"+nodeip, "ok")
if nodeip not in self.allnodes:
self.allnodes.append(nodeip)
self.etcd.setkey("machines/allnodes/"+nodeip, "ok")
logger.debug ("all nodes are: %s" % self.allnodes)
logger.debug ("run nodes are: %s" % self.runnodes)
self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s"
% (nodeip, workerport)))
logger.info ("add %s:%s in rpc client list" %
(nodeip, workerport))
if nodeip not in self.runnodes:
self.runnodes.append(nodeip)
if nodeip not in self.allnodes:
self.allnodes.append(nodeip)
self.etcd.setkey("machines/allnodes/"+nodeip, "ok")
logger.debug ("all nodes are: %s" % self.allnodes)
logger.debug ("run nodes are: %s" % self.runnodes)
self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s"
% (nodeip, self.workerport)))
logger.info ("add %s:%s in rpc client list" %
(nodeip, self.workerport))
# get all run nodes' IP addr
def get_nodeips(self):
return self.allnodes

View File

@ -12,6 +12,7 @@ from log import logger
import xmlrpc.server, sys, time
from socketserver import ThreadingMixIn
import threading
import etcdlib, network, container
from nettools import netcontrol
import monitor
@ -32,6 +33,11 @@ from lvmtool import *
# start rpc service
##################################################################
# imitate etcdlib to generate the etcdlib key manually
def generatekey(path):
    """Build the absolute etcd key for *path*, mimicking etcdlib's layout.

    etcdlib prefixes every key with '/<CLUSTER_NAME>'; this helper
    reproduces that prefixing manually for raw-key comparisons.
    """
    prefix = env.getenv("CLUSTER_NAME")
    return "/{}/{}".format(prefix, path)
# XML-RPC server that serves each incoming request on its own thread
# (via ThreadingMixIn), so one slow RPC call cannot block the others.
class ThreadXMLRPCServer(ThreadingMixIn,xmlrpc.server.SimpleXMLRPCServer):
    pass
@ -48,30 +54,31 @@ class Worker(object):
self.master = self.etcd.getkey("service/master")[1]
self.mode=None
# register self to master
self.etcd.setkey("machines/runnodes/"+self.addr, "waiting")
for f in range (0, 3):
[status, value] = self.etcd.getkey("machines/runnodes/"+self.addr)
if not value.startswith("init"):
# master wakesup every 0.1s to check register
logger.debug("worker % register to master failed %d \
time, sleep %fs" % (self.addr, f+1, 0.1))
time.sleep(0.1)
else:
break
if value.startswith("init"):
# check token to check global directory
[status, token_1] = self.etcd.getkey("token")
tokenfile = open(self.fspath+"/global/token", 'r')
token_2 = tokenfile.readline().strip()
if token_1 != token_2:
logger.error("check token failed, global directory is not a shared filesystem")
sys.exit(1)
[status, key] = self.etcd.getkey("machines/runnodes/"+self.addr)
if status:
self.key = generatekey("machines/runnodes/"+self.addr)
else:
logger.error ("worker register in machines/runnodes failed, maybe master not start")
logger.error("get key failed. %s" % node)
sys.exit(1)
logger.info ("worker registered in master and checked the token")
# check token to check global directory
[status, token_1] = self.etcd.getkey("token")
tokenfile = open(self.fspath+"/global/token", 'r')
token_2 = tokenfile.readline().strip()
if token_1 != token_2:
logger.error("check token failed, global directory is not a shared filesystem")
sys.exit(1)
logger.info ("worker registered and checked the token")
# worker search all run nodes to judge how to init
value = 'init-new'
[status, runlist] = self.etcd.listdir("machines/runnodes")
for node in runlist:
if node['key'] == self.key:
value = 'init-recovery'
break
logger.info("worker start in "+value+" mode")
Containers = container.Container(self.addr, etcdclient)
if value == 'init-new':
@ -146,10 +153,25 @@ class Worker(object):
# start service of worker
def start(self):
    """Bring the worker online and serve RPC requests forever.

    Marks this node as 'work' in etcd (the master's watcher thread
    reacts to that value), starts the heartbeat thread, then blocks
    in the XML-RPC serve loop. Does not return.
    """
    # announce readiness: master's _watchnewnode sees 'work' and adds us
    self.etcd.setkey("machines/runnodes/"+self.addr, "work")
    # keep our runnodes key refreshed in the background (see sendheartbeat)
    self.thread_sendheartbeat = threading.Thread(target=self.sendheartbeat)
    self.thread_sendheartbeat.start()
    # start serving for rpc
    logger.info ("begins to work")
    self.rpcserver.serve_forever()
# send heartbeat packets to keep this node's etcd key alive, ttl=2s
def sendheartbeat(self):
    """Keep this worker's liveness key alive in etcd.

    Loops forever: every second, re-reads machines/runnodes/<addr> and,
    once the master has set it to 'ok', refreshes it with a 2-second
    TTL so the key expires if this worker dies. Runs in the background
    thread started by start(); exits the thread if the key read fails.
    """
    while(True):
        # check send heartbeat package every 1s
        time.sleep(1)
        [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr)
        if status:
            # master has acknowledged the worker, so we start sending
            # heartbeat packets: refresh before the 2s TTL expires
            if value=='ok':
                self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 2)
        else:
            # Fix: the original logged undefined name 'node' here, which
            # would raise NameError instead of reporting the failure.
            logger.error("get key failed. %s" % self.addr)
            sys.exit(1)
if __name__ == '__main__':