diff --git a/bin/docklet-master b/bin/docklet-master index 954dbda..861bfd3 100755 --- a/bin/docklet-master +++ b/bin/docklet-master @@ -3,9 +3,7 @@ [ $(id -u) != '0' ] && echo "root is needed" && exit 1 # get some path of docklet -echo $0 bindir=${0%/*} -echo "bindir=$bindir" # $bindir maybe like /opt/docklet/src/../sbin # use command below to make $bindir in normal absolute path DOCKLET_BIN=$(cd $bindir; pwd) diff --git a/bin/docklet-worker b/bin/docklet-worker index 6e95a84..5ca5fe4 100755 --- a/bin/docklet-worker +++ b/bin/docklet-worker @@ -33,6 +33,7 @@ DAEMON_USER=root # settings for docklet worker DAEMON=$DOCKLET_LIB/worker.py +STOP_DEAMON=$DOCKLET_LIB/stopworker.py DAEMON_NAME=docklet-worker DAEMON_OPTS= # The process ID of the script when it runs is stored here: @@ -43,7 +44,6 @@ PIDFILE=$RUN_DIR/$DAEMON_NAME.pid ########### pre_start () { - log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX" [ ! -d $FS_PREFIX/global ] && mkdir -p $FS_PREFIX/global [ ! -d $FS_PREFIX/local ] && mkdir -p $FS_PREFIX/local @@ -81,6 +81,7 @@ pre_start () { do_start() { pre_start + log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX" start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON -- $DAEMON_OPTS log_end_msg $? } @@ -89,6 +90,10 @@ do_stop () { log_daemon_msg "Stopping $DAEMON_NAME daemon" start-stop-daemon --stop --quiet --oknodo --remove-pidfile --pidfile $PIDFILE --retry 10 log_end_msg $? + log_daemon_msg "Change $DAEMON_NAME daemon state" + pre_start + start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $STOP_DAEMON -- $DAEMON_OPTS + log_end_msg $? } diff --git a/src/etcdlib.py b/src/etcdlib.py index 757c6a2..af15937 100755 --- a/src/etcdlib.py +++ b/src/etcdlib.py @@ -199,4 +199,13 @@ class Client(object): return [False, 'you are not lock holder'] else: return [False, 'no one holds this lock'] - + + def getnode(self, key): + key = key.strip("/") + out = dorequest(self.keysurl+key) + if 'action' not in out: + return [False, "key not found"] + elif 'dir' in out: + return [False, dirname+" is a directory"] + else: + return [True, {"key":out['node']['key'], 'value':out['node']['value']}] \ No newline at end of file diff --git a/src/httprest.py b/src/httprest.py index 6fbfc4f..60f8b42 100755 --- a/src/httprest.py +++ b/src/httprest.py @@ -565,9 +565,6 @@ if __name__ == '__main__': etcdclient.setkey("service/mode", mode) if etcdclient.isdir("_lock")[0]: etcdclient.deldir("_lock") - if etcdclient.isdir("machines/runnodes")[0]: - etcdclient.deldir("machines/runnodes") - etcdclient.createdir("machines/runnodes") G_usermgr = userManager.userManager('root') clusternet = env.getenv("CLUSTER_NET") diff --git a/src/nodemgr.py b/src/nodemgr.py index ffd1a09..3bcd22e 100755 --- a/src/nodemgr.py +++ b/src/nodemgr.py @@ -22,6 +22,7 @@ class NodeMgr(object): self.networkmgr = networkmgr self.etcd = etcdclient self.mode = mode + self.workerport = env.getenv('WORKER_PORT') # initialize the network logger.info ("initialize network") @@ -45,16 +46,24 @@ class NodeMgr(object): logger.error("docklet-br not found") sys.exit(1) - # get allnodes - self.allnodes = self._nodelist_etcd("allnodes") - self.runnodes = self._nodelist_etcd("runnodes") - logger.info ("all nodes are: %s" % self.allnodes) - logger.info ("run nodes are: %s" % self.runnodes) - if len(self.runnodes)>0: - logger.error ("init runnodes is not null, need to be clean") - sys.exit(1) # init rpc list self.rpcs = [] + + # get allnodes + self.allnodes = self._nodelist_etcd("allnodes") + self.runnodes = [] + [status, runlist] = self.etcd.listdir("machines/runnodes") + for node in runlist: + nodeip = node['key'].rsplit('/',1)[1] + if node['value'] == 'ok': + logger.info ("running node %s" % nodeip) + self.runnodes.append(nodeip) + self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s" % (nodeip, self.workerport))) + logger.info ("add %s:%s in rpc client list" % (nodeip, self.workerport)) + + logger.info ("all nodes are: %s" % self.allnodes) + logger.info ("run nodes are: %s" % self.runnodes) + # start new thread to watch whether a new node joins logger.info ("start thread to watch new nodes ...") self.thread_watchnewnode = threading.Thread(target=self._watchnewnode) @@ -86,7 +95,6 @@ class NodeMgr(object): # thread target : watch whether a new node joins def _watchnewnode(self): - workerport = env.getenv('WORKER_PORT') while(True): time.sleep(0.1) [status, runlist] = self.etcd.listdir("machines/runnodes") @@ -97,25 +105,6 @@ class NodeMgr(object): nodeip = node['key'].rsplit('/',1)[1] if node['value']=='waiting': logger.info ("%s want to joins, call it to init first" % nodeip) - # 'docklet-br' of worker do not need IP Addr. Not need to allocate an IP to it - #if nodeip != self.addr: - # [status, result] = self.networkmgr.acquire_sysips_cidr() - # self.networkmgr.printpools() - # if not status: - # logger.error("no IP for worker bridge, please check network system pool") - # continue - # bridgeip = result[0] - # self.etcd.setkey("network/workbridge", bridgeip) - if nodeip in self.allnodes: - ######## HERE MAYBE NEED TO FIX ############### - # here we must use "machines/runnodes/nodeip" - # we cannot use node['key'], node['key'] is absolute - # path, etcd client will append the path to prefix, - # which is wrong - ############################################### - self.etcd.setkey("machines/runnodes/"+nodeip, "init-"+self.mode) - else: - self.etcd.setkey('machines/runnodes/'+nodeip, "init-new") elif node['value']=='work': logger.info ("new node %s joins" % nodeip) # setup GRE tunnels for new nodes @@ -127,17 +116,18 @@ class NodeMgr(object): logger.debug("GRE for %s already exists, reuse it" % nodeip) else: netcontrol.setup_gre('docklet-br', nodeip) - self.runnodes.append(nodeip) self.etcd.setkey("machines/runnodes/"+nodeip, "ok") - if nodeip not in self.allnodes: - self.allnodes.append(nodeip) - self.etcd.setkey("machines/allnodes/"+nodeip, "ok") - logger.debug ("all nodes are: %s" % self.allnodes) - logger.debug ("run nodes are: %s" % self.runnodes) - self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s" - % (nodeip, workerport))) - logger.info ("add %s:%s in rpc client list" % - (nodeip, workerport)) + if nodeip not in self.runnodes: + self.runnodes.append(nodeip) + if nodeip not in self.allnodes: + self.allnodes.append(nodeip) + self.etcd.setkey("machines/allnodes/"+nodeip, "ok") + logger.debug ("all nodes are: %s" % self.allnodes) + logger.debug ("run nodes are: %s" % self.runnodes) + self.rpcs.append(xmlrpc.client.ServerProxy("http://%s:%s" + % (nodeip, self.workerport))) + logger.info ("add %s:%s in rpc client list" % + (nodeip, self.workerport)) # get all run nodes' IP addr def get_nodeips(self): diff --git a/src/stopworker.py b/src/stopworker.py new file mode 100755 index 0000000..b73e69e --- /dev/null +++ b/src/stopworker.py @@ -0,0 +1,13 @@ +#!/usr/bin/python3 +import env,tools +config = env.getenv("CONFIG") +tools.loadenv(config) +import etcdlib, network + +if __name__ == '__main__': + etcdaddr = env.getenv("ETCD") + clustername = env.getenv("CLUSTER_NAME") + etcdclient = etcdlib.Client(etcdaddr, prefix = clustername) + net_dev = env.getenv("NETWORK_DEVICE") + ipaddr = network.getip(net_dev) + etcdclient.setkey("machines/runnodes/"+ipaddr, "stop") \ No newline at end of file diff --git a/src/worker.py b/src/worker.py index 87ebb33..47ee1f3 100755 --- a/src/worker.py +++ b/src/worker.py @@ -48,30 +48,31 @@ class Worker(object): self.master = self.etcd.getkey("service/master")[1] self.mode=None - # register self to master self.etcd.setkey("machines/runnodes/"+self.addr, "waiting") - for f in range (0, 3): - [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr) - if not value.startswith("init"): - # master wakesup every 0.1s to check register - logger.debug("worker % register to master failed %d \ - time, sleep %fs" % (self.addr, f+1, 0.1)) - time.sleep(0.1) - else: - break - - if value.startswith("init"): - # check token to check global directory - [status, token_1] = self.etcd.getkey("token") - tokenfile = open(self.fspath+"/global/token", 'r') - token_2 = tokenfile.readline().strip() - if token_1 != token_2: - logger.error("check token failed, global directory is not a shared filesystem") - sys.exit(1) + [status, node] = self.etcd.getnode("machines/runnodes/"+self.addr) + if status: + self.key = node['key'] else: - logger.error ("worker register in machines/runnodes failed, maybe master not start") + logger.error("get key failed. %s" % node) sys.exit(1) - logger.info ("worker registered in master and checked the token") + + # check token to check global directory + [status, token_1] = self.etcd.getkey("token") + tokenfile = open(self.fspath+"/global/token", 'r') + token_2 = tokenfile.readline().strip() + if token_1 != token_2: + logger.error("check token failed, global directory is not a shared filesystem") + sys.exit(1) + logger.info ("worker registered and checked the token") + + # worker itself to judge how to init + value = 'init-new' + [status, runlist] = self.etcd.listdir("machines/runnodes") + for node in runlist: + if node['key'] == self.key: + value = 'init-recovery' + break + logger.info("worker start in "+value+" mode") Containers = container.Container(self.addr, etcdclient) if value == 'init-new':