diff --git a/bin/docklet-worker b/bin/docklet-worker
index 55d6f92..d8affaf 100755
--- a/bin/docklet-worker
+++ b/bin/docklet-worker
@@ -105,6 +105,8 @@ pre_start () {
 
 do_start() {
     pre_start
+
+    DAEMON_OPTS=$1
     log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX"
     #python3 $DAEMON
     start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON -- $DAEMON_OPTS
@@ -178,7 +180,7 @@ do_stop_meter() {
 
 case "$1" in
     start)
-        do_start
+        do_start "normal-worker"
         do_start_batch
        do_start_proxy
        ;;
@@ -197,11 +199,12 @@ case "$1" in
        ;;
 
    start_batch)
-        pre_start
+        do_start "batch-worker"
        do_start_batch
        ;;
 
    stop_batch)
+        do_stop
        do_stop_batch
        ;;
 
@@ -222,7 +225,7 @@ case "$1" in
        do_stop
        do_stop_batch
        do_stop_proxy
-        do_start
+        do_start "normal-worker"
        do_start_batch
        do_start_proxy
        ;;
diff --git a/src/master/nodemgr.py b/src/master/nodemgr.py
index d3396a6..a623e2f 100755
--- a/src/master/nodemgr.py
+++ b/src/master/nodemgr.py
@@ -47,6 +47,8 @@ class NodeMgr(object):
         # get allnodes
         self.allnodes = self._nodelist_etcd("allnodes")
         self.runnodes = []
+        self.batchnodes = []
+        self.allrunnodes = []
         [status, runlist] = self.etcd.listdir("machines/runnodes")
         for node in runlist:
             nodeip = node['key'].rsplit('/',1)[1]
@@ -140,6 +142,14 @@ class NodeMgr(object):
         #print(etcd_runip)
         #print(self.rpcs)
         self.runnodes = etcd_runip
+        self.batchnodes = self.runnodes.copy()
+        self.allrunnodes = self.runnodes.copy()
+        [status, batchlist] = self.etcd.listdir("machines/batchnodes")
+        if status:
+            for node in batchlist:
+                nodeip = node['key'].rsplit('/', 1)[1]
+                self.batchnodes.append(nodeip)
+                self.allrunnodes.append(nodeip)
 
     def recover_node(self,ip,tasks):
         logger.info("now recover for worker:%s" % ip)
@@ -152,14 +162,19 @@ class NodeMgr(object):
 
     # get all run nodes' IP addr
     def get_nodeips(self):
-        return self.runnodes
+        return self.allrunnodes
+
+    def get_batch_nodeips(self):
+        return self.batchnodes
+
+    def get_base_nodeips(self):
+        return self.runnodes
 
     def get_allnodes(self):
         return self.allnodes
 
     def ip_to_rpc(self,ip):
-        if ip in self.runnodes:
+        if ip in self.allrunnodes:
             return xmlrpc.client.ServerProxy("http://%s:%s" % (ip, env.getenv("WORKER_PORT")))
         else:
             logger.info('Worker %s is not connected, create rpc client failed, push task into queue')
 
diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py
index cd9043b..df16343 100644
--- a/src/master/taskmgr.py
+++ b/src/master/taskmgr.py
@@ -262,13 +262,13 @@ class TaskMgr(threading.Thread):
         # if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
         #     return self.all_nodes
         # get running nodes
-        node_ips = self.nodemgr.get_nodeips()
+        node_ips = self.nodemgr.get_batch_nodeips()
         all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips]
         return all_nodes
 
 
     def is_alive(self, worker):
-        nodes = self.nodemgr.get_nodeips()
+        nodes = self.nodemgr.get_batch_nodeips()
         return worker in nodes
 
 
diff --git a/src/master/testTaskMgr.py b/src/master/testTaskMgr.py
index 9503fdc..85fbeb5 100644
--- a/src/master/testTaskMgr.py
+++ b/src/master/testTaskMgr.py
@@ -8,7 +8,7 @@ from utils import env
 
 
 class SimulatedNodeMgr():
-    def get_nodeips(self):
+    def get_batch_nodeips(self):
         return ['0.0.0.0']
 
 
@@ -178,4 +178,4 @@ def stop():
     worker.stop()
     jobmgr.stop()
 
-    taskmgr.stop()
\ No newline at end of file
+    taskmgr.stop()
diff --git a/src/master/vclustermgr.py b/src/master/vclustermgr.py
index b7ef839..54ceea1 100755
--- a/src/master/vclustermgr.py
+++ b/src/master/vclustermgr.py
@@ -118,7 +118,7 @@ class VclusterMgr(object):
             return [False, "cluster:%s already exists" % clustername]
         clustersize = int(self.defaultsize)
         logger.info ("starting cluster %s with %d containers for %s" % (clustername, int(clustersize), username))
-        workers = self.nodemgr.get_nodeips()
+        workers = self.nodemgr.get_base_nodeips()
         image_json = json.dumps(image)
         groupname = json.loads(user_info)["data"]["group"]
         groupquota = json.loads(user_info)["data"]["groupinfo"]
@@ -202,7 +202,7 @@ class VclusterMgr(object):
     def scale_out_cluster(self,clustername,username, image,user_info, setting):
         if not self.is_cluster(clustername,username):
             return [False, "cluster:%s not found" % clustername]
-        workers = self.nodemgr.get_nodeips()
+        workers = self.nodemgr.get_base_nodeips()
         if (len(workers) == 0):
             logger.warning("no workers to start containers, scale out failed")
             return [False, "no workers are running"]
diff --git a/src/worker/worker.py b/src/worker/worker.py
index 88839c7..5977a5a 100755
--- a/src/worker/worker.py
+++ b/src/worker/worker.py
@@ -57,17 +57,23 @@ class Worker(object):
 
         self.etcd = etcdclient
         self.master = self.etcd.getkey("service/master")[1]
-        self.mode=None
+        self.mode = None
+        self.workertype = "normal"
+        self.key=""
 
-        # waiting state is preserved for compatible.
-        self.etcd.setkey("machines/runnodes/"+self.addr, "waiting")
-        # get this node's key to judge how to init.
-        [status, key] = self.etcd.getkey("machines/runnodes/"+self.addr)
-        if status:
-            self.key = generatekey("machines/allnodes/"+self.addr)
-        else:
-            logger.error("get key failed. %s" % 'machines/runnodes/'+self.addr)
-            sys.exit(1)
+        if len(sys.argv) > 1 and sys.argv[1] == "batch-worker":
+            self.workertype = "batch"
+
+        if self.workertype == "normal":
+            # waiting state is preserved for compatible.
+            self.etcd.setkey("machines/runnodes/"+self.addr, "waiting")
+            # get this node's key to judge how to init.
+            [status, key] = self.etcd.getkey("machines/runnodes/"+self.addr)
+            if status:
+                self.key = generatekey("machines/allnodes/"+self.addr)
+            else:
+                logger.error("get key failed. %s" % 'machines/runnodes/'+self.addr)
+                sys.exit(1)
 
         # check token to check global directory
         [status, token_1] = self.etcd.getkey("token")
@@ -87,7 +93,8 @@ class Worker(object):
                 if node['key'] == self.key:
                     value = 'init-recovery'
                     break
-        logger.info("worker start in "+value+" mode")
+
+        logger.info("worker start in "+value+" mode, worker type is "+self.workertype)
 
         Containers = container.Container(self.addr, etcdclient)
         if value == 'init-new':
@@ -193,7 +200,8 @@ class Worker(object):
             self.hosts_collector.start()
             logger.info("Monitor Collector has been started.")
         # worker change it state itself. Independedntly from master.
-        self.etcd.setkey("machines/runnodes/"+self.addr, "work")
+        if self.workertype == "normal":
+            self.etcd.setkey("machines/runnodes/"+self.addr, "work")
         publicIP = env.getenv("PUBLIC_IP")
         self.etcd.setkey("machines/publicIP/"+self.addr,publicIP)
         self.thread_sendheartbeat = threading.Thread(target=self.sendheartbeat)
@@ -204,17 +212,22 @@ class Worker(object):
 
     # send heardbeat package to keep alive in etcd, ttl=2s
     def sendheartbeat(self):
-        while(True):
-            # check send heartbeat package every 1s
-            time.sleep(2)
-            [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr)
-            if status:
-                # master has know the worker so we start send heartbeat package
-                if value=='ok':
-                    self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 3)
-            else:
-                logger.error("get key %s failed, master may be crashed" % self.addr)
-                self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)
+        if self.workertype == "normal":
+            while(True):
+                # check send heartbeat package every 1s
+                time.sleep(2)
+                [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr)
+                if status:
+                    # master has know the worker so we start send heartbeat package
+                    if value=='ok':
+                        self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 3)
+                else:
+                    logger.error("get key %s failed, master may be crashed" % self.addr)
+                    self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)
+        elif self.workertype == "batch":
+            while(True):
+                time.sleep(2)
+                self.etcd.setkey("machines/batchnodes/"+self.addr, "ok", ttl = 60)
 
 
 if __name__ == '__main__':