ooooo 2016-04-20 14:17:12 +08:00
parent cd0df9847f
commit 56242920c4
7 changed files with 79 additions and 66 deletions

View File

@@ -3,9 +3,7 @@
[ $(id -u) != '0' ] && echo "root is needed" && exit 1
# work out docklet's installation paths
echo $0
bindir=${0%/*}
echo "bindir=$bindir"
# $bindir may look like /opt/docklet/src/../sbin
# the command below normalizes it to an absolute path
DOCKLET_BIN=$(cd $bindir; pwd)

View File

@@ -33,6 +33,7 @@ DAEMON_USER=root
# settings for docklet worker
DAEMON=$DOCKLET_LIB/worker.py
STOP_DAEMON=$DOCKLET_LIB/stopworker.py
DAEMON_NAME=docklet-worker
DAEMON_OPTS=
# The process ID of the script when it runs is stored here:
@@ -43,7 +44,6 @@ PIDFILE=$RUN_DIR/$DAEMON_NAME.pid
###########
pre_start () {
log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX"
[ ! -d $FS_PREFIX/global ] && mkdir -p $FS_PREFIX/global
[ ! -d $FS_PREFIX/local ] && mkdir -p $FS_PREFIX/local
@@ -81,6 +81,7 @@ pre_start () {
do_start() {
pre_start
log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX"
start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON -- $DAEMON_OPTS
log_end_msg $?
}
@@ -89,6 +90,10 @@ do_stop () {
log_daemon_msg "Stopping $DAEMON_NAME daemon"
start-stop-daemon --stop --quiet --oknodo --remove-pidfile --pidfile $PIDFILE --retry 10
log_end_msg $?
log_daemon_msg "Change $DAEMON_NAME daemon state"
pre_start
start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $STOP_DAEMON -- $DAEMON_OPTS
log_end_msg $?
}
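
Note that the stop path now runs start-stop-daemon twice: the first call stops the worker daemon, the second launches $STOP_DAEMON, i.e. the new stopworker.py shown later in this commit, which records a "stop" marker for this node in etcd. A minimal sketch of what that marker write amounts to, assuming the same etcdlib client used elsewhere in this commit (the endpoint, prefix and IP below are made-up values):

import etcdlib

etcdclient = etcdlib.Client("localhost:2379", prefix="docklet")   # hypothetical endpoint/prefix
etcdclient.setkey("machines/runnodes/192.168.0.2", "stop")        # node is no longer counted as running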

View File

@@ -199,4 +199,13 @@ class Client(object):
return [False, 'you are not lock holder']
else:
return [False, 'no one holds this lock']
def getnode(self, key):
key = key.strip("/")
out = dorequest(self.keysurl+key)
if 'action' not in out:
return [False, "key not found"]
elif 'dir' in out:
return [False, key+" is a directory"]
else:
return [True, {"key":out['node']['key'], 'value':out['node']['value']}]
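
The new getnode helper keeps the [status, payload] convention used by the rest of this Client: on success the payload is a small dict with the full etcd key and its current value, on failure it is an error string. A minimal usage sketch (the endpoint, prefix and worker IP are made-up values; a reachable etcd server is assumed):

import etcdlib

client = etcdlib.Client("localhost:2379", prefix="docklet")        # hypothetical endpoint/prefix
[ok, node] = client.getnode("machines/runnodes/192.168.0.2")       # hypothetical worker IP
if ok:
    print(node['key'], node['value'])    # e.g. '/docklet/machines/runnodes/192.168.0.2' 'waiting'
else:
    print("getnode failed:", node)       # e.g. 'key not found' or '... is a directory'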

View File

@@ -565,9 +565,6 @@ if __name__ == '__main__':
etcdclient.setkey("service/mode", mode)
if etcdclient.isdir("_lock")[0]:
etcdclient.deldir("_lock")
if etcdclient.isdir("machines/runnodes")[0]:
etcdclient.deldir("machines/runnodes")
etcdclient.createdir("machines/runnodes")
G_usermgr = userManager.userManager('root')
clusternet = env.getenv("CLUSTER_NET")

View File

@@ -22,6 +22,7 @@ class NodeMgr(object):
self.networkmgr = networkmgr
self.etcd = etcdclient
self.mode = mode
self.workerport = env.getenv('WORKER_PORT')
# initialize the network
logger.info ("initialize network")
@@ -45,16 +46,24 @@ class NodeMgr(object):
logger.error("docklet-br not found")
sys.exit(1)
# get allnodes
self.allnodes = self._nodelist_etcd("allnodes")
self.runnodes = self._nodelist_etcd("runnodes")
logger.info ("all nodes are: %s" % self.allnodes)
logger.info ("run nodes are: %s" % self.runnodes)
if len(self.runnodes)>0:
logger.error ("init runnodes is not null, need to be clean")
sys.exit(1)
# init rpc list
self.rpcs = []
# get allnodes
self.allnodes = self._nodelist_etcd("allnodes")
self.runnodes = []
[status, runlist] = self.etcd.listdir("machines/runnodes")
for node in runlist:
nodeip = node['key'].rsplit('/',1)[1]
if node['value'] == 'ok':
logger.info ("running node %s" % nodeip)
self.runnodes.append(nodeip)
self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s" % (nodeip, self.workerport)))
logger.info ("add %s:%s in rpc client list" % (nodeip, self.workerport))
logger.info ("all nodes are: %s" % self.allnodes)
logger.info ("run nodes are: %s" % self.runnodes)
# start new thread to watch whether a new node joins
logger.info ("start thread to watch new nodes ...")
self.thread_watchnewnode = threading.Thread(target=self._watchnewnode)
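
For reference, the shape this loop assumes (inferred from the diff itself, not from separate etcdlib documentation): listdir returns [status, nodes], and each node's 'key' is the absolute etcd path including the cluster prefix. That is why the IP is recovered with rsplit, and why later writes use the relative "machines/runnodes/<ip>" path rather than node['key']:

# a made-up entry of the shape the loop above expects
node = {'key': '/docklet/machines/runnodes/192.168.0.2', 'value': 'ok'}
nodeip = node['key'].rsplit('/', 1)[1]        # -> '192.168.0.2'
relkey = "machines/runnodes/" + nodeip        # relative key, safe to pass back to setkey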
@@ -86,7 +95,6 @@ class NodeMgr(object):
# thread target : watch whether a new node joins
def _watchnewnode(self):
workerport = env.getenv('WORKER_PORT')
while(True):
time.sleep(0.1)
[status, runlist] = self.etcd.listdir("machines/runnodes")
@@ -97,25 +105,6 @@ class NodeMgr(object):
nodeip = node['key'].rsplit('/',1)[1]
if node['value']=='waiting':
logger.info ("%s want to joins, call it to init first" % nodeip)
# the worker's 'docklet-br' does not need an IP address, so none is allocated for it
#if nodeip != self.addr:
# [status, result] = self.networkmgr.acquire_sysips_cidr()
# self.networkmgr.printpools()
# if not status:
# logger.error("no IP for worker bridge, please check network system pool")
# continue
# bridgeip = result[0]
# self.etcd.setkey("network/workbridge", bridgeip)
if nodeip in self.allnodes:
######## HERE MAYBE NEED TO FIX ###############
# we must use the relative path "machines/runnodes/<nodeip>";
# node['key'] is an absolute path, and the etcd client would
# append it to the prefix again, which is wrong
###############################################
self.etcd.setkey("machines/runnodes/"+nodeip, "init-"+self.mode)
else:
self.etcd.setkey('machines/runnodes/'+nodeip, "init-new")
elif node['value']=='work':
logger.info ("new node %s joins" % nodeip)
# setup GRE tunnels for new nodes
@@ -127,17 +116,18 @@ class NodeMgr(object):
logger.debug("GRE for %s already exists, reuse it" % nodeip)
else:
netcontrol.setup_gre('docklet-br', nodeip)
self.runnodes.append(nodeip)
self.etcd.setkey("machines/runnodes/"+nodeip, "ok")
if nodeip not in self.allnodes:
self.allnodes.append(nodeip)
self.etcd.setkey("machines/allnodes/"+nodeip, "ok")
logger.debug ("all nodes are: %s" % self.allnodes)
logger.debug ("run nodes are: %s" % self.runnodes)
self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s"
% (nodeip, workerport)))
logger.info ("add %s:%s in rpc client list" %
(nodeip, workerport))
if nodeip not in self.runnodes:
self.runnodes.append(nodeip)
if nodeip not in self.allnodes:
self.allnodes.append(nodeip)
self.etcd.setkey("machines/allnodes/"+nodeip, "ok")
logger.debug ("all nodes are: %s" % self.allnodes)
logger.debug ("run nodes are: %s" % self.runnodes)
self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s"
% (nodeip, self.workerport)))
logger.info ("add %s:%s in rpc client list" %
(nodeip, self.workerport))
# get all run nodes' IP addr
def get_nodeips(self):
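
Taken together, the nodemgr, stopworker and worker hunks in this commit give each machines/runnodes/<ip> key a small lifecycle. A reading aid, with descriptions inferred from this diff only:

# states observed for machines/runnodes/<ip> across this commit
RUNNODE_STATES = {
    "waiting":        "worker registered itself and waits for the master's reply",
    "init-<mode>":    "master's reply to an already known worker: init in the cluster's mode",
    "init-new":       "master's reply to an unknown worker: init from scratch",
    "work":           "worker finished init; master then adds the GRE tunnel and rpc proxy",
    "ok":             "node accepted by the master and kept in self.runnodes",
    "stop":           "written by stopworker.py when the worker service is stopped",
}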

src/stopworker.py Executable file
View File

@@ -0,0 +1,13 @@
#!/usr/bin/python3
import env,tools
config = env.getenv("CONFIG")
tools.loadenv(config)
import etcdlib, network
if __name__ == '__main__':
etcdaddr = env.getenv("ETCD")
clustername = env.getenv("CLUSTER_NAME")
etcdclient = etcdlib.Client(etcdaddr, prefix = clustername)
net_dev = env.getenv("NETWORK_DEVICE")
ipaddr = network.getip(net_dev)
etcdclient.setkey("machines/runnodes/"+ipaddr, "stop")

View File

@@ -48,30 +48,31 @@ class Worker(object):
self.master = self.etcd.getkey("service/master")[1]
self.mode=None
# register self to master
self.etcd.setkey("machines/runnodes/"+self.addr, "waiting")
for f in range (0, 3):
[status, value] = self.etcd.getkey("machines/runnodes/"+self.addr)
if not value.startswith("init"):
# master wakes up every 0.1s to check registrations
logger.debug("worker %s registration to master failed %d time(s), sleeping %fs" % (self.addr, f+1, 0.1))
time.sleep(0.1)
else:
break
if value.startswith("init"):
# check token to check global directory
[status, token_1] = self.etcd.getkey("token")
tokenfile = open(self.fspath+"/global/token", 'r')
token_2 = tokenfile.readline().strip()
if token_1 != token_2:
logger.error("check token failed, global directory is not a shared filesystem")
sys.exit(1)
[status, node] = self.etcd.getnode("machines/runnodes/"+self.addr)
if status:
self.key = node['key']
else:
logger.error ("worker register in machines/runnodes failed, maybe master not start")
logger.error("get key failed. %s" % node)
sys.exit(1)
logger.info ("worker registered in master and checked the token")
# check token to check global directory
[status, token_1] = self.etcd.getkey("token")
tokenfile = open(self.fspath+"/global/token", 'r')
token_2 = tokenfile.readline().strip()
if token_1 != token_2:
logger.error("check token failed, global directory is not a shared filesystem")
sys.exit(1)
logger.info ("worker registered and checked the token")
# the worker decides for itself how to init
value = 'init-new'
[status, runlist] = self.etcd.listdir("machines/runnodes")
for node in runlist:
if node['key'] == self.key:
value = 'init-recovery'
break
logger.info("worker start in "+value+" mode")
Containers = container.Container(self.addr, etcdclient)
if value == 'init-new':
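
The registration handshake above boils down to: write "waiting", poll until the master rewrites the key to an init-* value, verify the shared token, then decide between init-new and init-recovery. A distilled sketch of the polling step, assuming an etcd client with the [status, value] getkey convention from etcdlib (the retry count and interval mirror the values used above):

import time

def wait_for_master(etcd, addr, retries=3, interval=0.1):
    # poll machines/runnodes/<addr> until the master rewrites it to an "init-*" value
    value = ""
    for attempt in range(retries):
        [status, value] = etcd.getkey("machines/runnodes/" + addr)
        if value.startswith("init"):
            break
        # the master wakes every 0.1s to look for registrations, so a short sleep is enough
        time.sleep(interval)
    return value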