Merge pull request #345 from zhongyehong/batch

divide worker into two kinds: normal workers (register under machines/runnodes and host vclusters) and batch workers (register under machines/batchnodes and only run batch tasks)
zhong yehong committed 2018-10-21 20:28:42 +08:00 · commit 8865e205d1
6 changed files with 65 additions and 34 deletions


@@ -105,6 +105,8 @@ pre_start () {
 do_start() {
     pre_start
+    DAEMON_OPTS=$1
     log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX"
     #python3 $DAEMON
     start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON -- $DAEMON_OPTS

@@ -178,7 +180,7 @@ do_stop_meter() {
 case "$1" in
     start)
-        do_start
+        do_start "normal-worker"
         do_start_batch
         do_start_proxy
         ;;

@@ -197,11 +199,12 @@ case "$1" in
         ;;
     start_batch)
-        pre_start
+        do_start "batch-worker"
         do_start_batch
         ;;
     stop_batch)
+        do_stop
         do_stop_batch
         ;;

@@ -222,7 +225,7 @@ case "$1" in
         do_stop
         do_stop_batch
         do_stop_proxy
-        do_start
+        do_start "normal-worker"
         do_start_batch
         do_start_proxy
         ;;
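
Note on the init-script change: do_start now receives the worker's role as its first argument and forwards it via DAEMON_OPTS, so the daemon launched by start-stop-daemon sees "normal-worker" or "batch-worker" as sys.argv[1]. A minimal standalone sketch of how that flag is interpreted on the Python side (the function name is invented for illustration; the real check appears in the Worker diff below):

import sys

def detect_workertype(argv):
    # Only "batch-worker" selects the batch role; any other invocation,
    # including no argument at all, keeps the default "normal" role.
    if len(argv) > 1 and argv[1] == "batch-worker":
        return "batch"
    return "normal"

if __name__ == "__main__":
    # e.g. launched via `do_start "batch-worker"` -> prints "batch"
    print(detect_workertype(sys.argv))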


@@ -47,6 +47,8 @@ class NodeMgr(object):
         # get allnodes
         self.allnodes = self._nodelist_etcd("allnodes")
         self.runnodes = []
+        self.batchnodes = []
+        self.allrunnodes = []
         [status, runlist] = self.etcd.listdir("machines/runnodes")
         for node in runlist:
             nodeip = node['key'].rsplit('/',1)[1]

@@ -140,6 +142,14 @@ class NodeMgr(object):
         #print(etcd_runip)
         #print(self.rpcs)
         self.runnodes = etcd_runip
+        self.batchnodes = self.runnodes.copy()
+        self.allrunnodes = self.runnodes.copy()
+        [status, batchlist] = self.etcd.listdir("machines/batchnodes")
+        if status:
+            for node in batchlist:
+                nodeip = node['key'].rsplit('/', 1)[1]
+                self.batchnodes.append(nodeip)
+                self.allrunnodes.append(nodeip)

     def recover_node(self,ip,tasks):
         logger.info("now recover for worker:%s" % ip)

@@ -152,14 +162,19 @@ class NodeMgr(object):
     # get all run nodes' IP addr
     def get_nodeips(self):
-        return self.runnodes
+        return self.allrunnodes
+
+    def get_batch_nodeips(self):
+        return self.batchnodes
+
+    def get_base_nodeips(self):
+        return self.runnodes

     def get_allnodes(self):
         return self.allnodes

     def ip_to_rpc(self,ip):
-        if ip in self.runnodes:
+        if ip in self.allrunnodes:
             return xmlrpc.client.ServerProxy("http://%s:%s" % (ip, env.getenv("WORKER_PORT")))
         else:
             logger.info('Worker %s is not connected, create rpc client failed, push task into queue')
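
Note on the NodeMgr change: runnodes still holds only the normal workers registered under machines/runnodes, batchnodes starts as a copy of runnodes and additionally picks up everything under machines/batchnodes, and allrunnodes is the same union and backs get_nodeips() and ip_to_rpc(). A worked example with hypothetical addresses (not from the patch): if machines/runnodes contains 192.168.1.10 and 192.168.1.11 and machines/batchnodes contains 192.168.1.20, then:

nodemgr.get_base_nodeips()   # ['192.168.1.10', '192.168.1.11'] -- used for vcluster placement
nodemgr.get_batch_nodeips()  # ['192.168.1.10', '192.168.1.11', '192.168.1.20'] -- used by TaskMgr
nodemgr.get_nodeips()        # same union as above, used e.g. by ip_to_rpc()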


@@ -262,13 +262,13 @@ class TaskMgr(threading.Thread):
         # if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
             # return self.all_nodes
         # get running nodes
-        node_ips = self.nodemgr.get_nodeips()
+        node_ips = self.nodemgr.get_batch_nodeips()
         all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips]
         return all_nodes

     def is_alive(self, worker):
-        nodes = self.nodemgr.get_nodeips()
+        nodes = self.nodemgr.get_batch_nodeips()
         return worker in nodes
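
Note on the TaskMgr change: both call sites now draw from get_batch_nodeips(), so batch tasks can be scheduled onto dedicated batch workers as well as normal workers. Continuing the hypothetical addresses from the NodeMgr note above:

taskmgr.is_alive('192.168.1.20')   # True: the batch-only worker is found via get_batch_nodeips()
# Before this change the same call returned False, because get_nodeips()
# only listed the machines/runnodes entries.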


@@ -8,7 +8,7 @@ from utils import env

 class SimulatedNodeMgr():
-    def get_nodeips(self):
+    def get_batch_nodeips(self):
         return ['0.0.0.0']

@@ -178,4 +178,4 @@ def stop():
     worker.stop()
     jobmgr.stop()
     taskmgr.stop()


@@ -118,7 +118,7 @@ class VclusterMgr(object):
             return [False, "cluster:%s already exists" % clustername]
         clustersize = int(self.defaultsize)
         logger.info ("starting cluster %s with %d containers for %s" % (clustername, int(clustersize), username))
-        workers = self.nodemgr.get_nodeips()
+        workers = self.nodemgr.get_base_nodeips()
         image_json = json.dumps(image)
         groupname = json.loads(user_info)["data"]["group"]
         groupquota = json.loads(user_info)["data"]["groupinfo"]

@@ -202,7 +202,7 @@ class VclusterMgr(object):
     def scale_out_cluster(self,clustername,username, image,user_info, setting):
         if not self.is_cluster(clustername,username):
             return [False, "cluster:%s not found" % clustername]
-        workers = self.nodemgr.get_nodeips()
+        workers = self.nodemgr.get_base_nodeips()
         if (len(workers) == 0):
             logger.warning("no workers to start containers, scale out failed")
             return [False, "no workers are running"]
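
Note on the VclusterMgr change: interactive vclusters are created and scaled out only on the base (normal) workers, so their containers never land on a batch-only node. With the hypothetical addresses above, the selection step behaves roughly like this (paraphrased for illustration, not quoted from the file):

workers = nodemgr.get_base_nodeips()   # ['192.168.1.10', '192.168.1.11'] -- 192.168.1.20 is excluded
if len(workers) == 0:
    result = [False, "no workers are running"]   # same error path as in scale_out_cluster above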


@@ -57,17 +57,23 @@ class Worker(object):
         self.etcd = etcdclient
         self.master = self.etcd.getkey("service/master")[1]
-        self.mode=None
+        self.mode = None
+        self.workertype = "normal"
+        self.key=""

+        if len(sys.argv) > 1 and sys.argv[1] == "batch-worker":
+            self.workertype = "batch"

-        # waiting state is preserved for compatible.
-        self.etcd.setkey("machines/runnodes/"+self.addr, "waiting")
-        # get this node's key to judge how to init.
-        [status, key] = self.etcd.getkey("machines/runnodes/"+self.addr)
-        if status:
-            self.key = generatekey("machines/allnodes/"+self.addr)
-        else:
-            logger.error("get key failed. %s" % 'machines/runnodes/'+self.addr)
-            sys.exit(1)
+        if self.workertype == "normal":
+            # waiting state is preserved for compatible.
+            self.etcd.setkey("machines/runnodes/"+self.addr, "waiting")
+            # get this node's key to judge how to init.
+            [status, key] = self.etcd.getkey("machines/runnodes/"+self.addr)
+            if status:
+                self.key = generatekey("machines/allnodes/"+self.addr)
+            else:
+                logger.error("get key failed. %s" % 'machines/runnodes/'+self.addr)
+                sys.exit(1)

         # check token to check global directory
         [status, token_1] = self.etcd.getkey("token")

@@ -87,7 +93,8 @@ class Worker(object):
             if node['key'] == self.key:
                 value = 'init-recovery'
                 break
-        logger.info("worker start in "+value+" mode")
+        logger.info("worker start in "+value+" mode, worker type is"+self.workertype)

         Containers = container.Container(self.addr, etcdclient)
         if value == 'init-new':

@@ -193,7 +200,8 @@ class Worker(object):
             self.hosts_collector.start()
             logger.info("Monitor Collector has been started.")
         # worker change it state itself. Independedntly from master.
-        self.etcd.setkey("machines/runnodes/"+self.addr, "work")
+        if self.workertype == "normal":
+            self.etcd.setkey("machines/runnodes/"+self.addr, "work")
         publicIP = env.getenv("PUBLIC_IP")
         self.etcd.setkey("machines/publicIP/"+self.addr,publicIP)
         self.thread_sendheartbeat = threading.Thread(target=self.sendheartbeat)

@@ -204,17 +212,22 @@ class Worker(object):
     # send heardbeat package to keep alive in etcd, ttl=2s
     def sendheartbeat(self):
-        while(True):
-            # check send heartbeat package every 1s
-            time.sleep(2)
-            [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr)
-            if status:
-                # master has know the worker so we start send heartbeat package
-                if value=='ok':
-                    self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 3)
-            else:
-                logger.error("get key %s failed, master may be crashed" % self.addr)
-                self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)
+        if self.workertype == "normal":
+            while(True):
+                # check send heartbeat package every 1s
+                time.sleep(2)
+                [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr)
+                if status:
+                    # master has know the worker so we start send heartbeat package
+                    if value=='ok':
+                        self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 3)
+                else:
+                    logger.error("get key %s failed, master may be crashed" % self.addr)
+                    self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)
+        elif self.workertype == "batch":
+            while(True):
+                time.sleep(2)
+                self.etcd.setkey("machines/batchnodes/"+self.addr, "ok", ttl = 60)

 if __name__ == '__main__':
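
Note on the Worker change: the heartbeat split is what makes a batch worker discoverable. A normal worker keeps refreshing machines/runnodes/<addr> (short TTL once the master has acknowledged it with "ok"), while a batch worker only keeps machines/batchnodes/<addr> alive with a 60-second TTL, which is exactly the directory the NodeMgr change lists. A condensed standalone sketch of the two loops (an etcd client with getkey/setkey as in the diff is assumed):

import time

def heartbeat_loop(etcd, addr, workertype):
    while True:
        time.sleep(2)
        if workertype == "normal":
            [status, value] = etcd.getkey("machines/runnodes/" + addr)
            if status:
                if value == 'ok':
                    # master has acknowledged this worker, keep the key fresh
                    etcd.setkey("machines/runnodes/" + addr, "ok", ttl=3)
            else:
                # key missing: master may have crashed, re-create it with a long TTL
                etcd.setkey("machines/runnodes/" + addr, "ok", ttl=60)
        else:
            # batch workers only advertise themselves under machines/batchnodes
            etcd.setkey("machines/batchnodes/" + addr, "ok", ttl=60)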