Merge branch 'batch' into master

Yujian Zhu 2018-12-22 15:55:18 +08:00 committed by GitHub
commit b4989de64d
30 changed files with 3509 additions and 53 deletions


@@ -20,6 +20,8 @@ FS_PREFIX=/opt/docklet
# cluster net ip range, default is 172.16.0.1/16
CLUSTER_NET="172.16.0.1/16"
# ip addresses range of containers for batch job, default is 10.0.3.0/24
BATCH_NET="10.0.3.0/24"
#configurable-http-proxy public port, default is 8000
PROXY_PORT=8000
#configurable-http-proxy api port, default is 8001
@@ -43,6 +45,13 @@ DAEMON_OPTS=
# The process ID of the script when it runs is stored here:
PIDFILE=$RUN_DIR/$DAEMON_NAME.pid
# settings for docklet batch worker, which is required for batch job processing system
BATCH_ON=True
DAEMON_BATCH=$DOCKLET_LIB/worker/taskcontroller.py
DAEMON_NAME_BATCH=docklet-taskcontroller
PIDFILE_BATCH=$RUN_DIR/batch.pid
DAEMON_OPTS_BATCH=
# settings for docklet proxy, which is required for web access
DAEMON_PROXY=`which configurable-http-proxy`
DAEMON_NAME_PROXY=docklet-proxy
@@ -83,6 +92,7 @@ pre_start () {
# iptables for NAT network for containers to access web
iptables -t nat -F
iptables -t nat -A POSTROUTING -s $CLUSTER_NET -j MASQUERADE
iptables -t nat -A POSTROUTING -s $BATCH_NET -j MASQUERADE
if [ ! -d $FS_PREFIX/local/basefs ]; then
log_daemon_msg "basefs does not exist, run prepare.sh first" && exit 1
@@ -95,12 +105,27 @@ pre_start () {
do_start() {
pre_start
DAEMON_OPTS=$1
log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX" log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX"
#python3 $DAEMON #python3 $DAEMON
start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON -- $DAEMON_OPTS start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON -- $DAEMON_OPTS
log_end_msg $? log_end_msg $?
} }
do_start_batch () {
if [ "$BATCH_ON" = "False" ]
then
return 1
fi
log_daemon_msg "Starting $DAEMON_NAME_BATCH in $FS_PREFIX"
DAEMON_OPTS_BATCH=""
start-stop-daemon --start --background --pidfile $PIDFILE_BATCH --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON_BATCH -- $DAEMON_OPTS_BATCH
log_end_msg $?
}
do_start_proxy () {
if [ "$DISTRIBUTED_GATEWAY" = "False" ]
then
@@ -118,6 +143,16 @@ do_stop () {
log_end_msg $?
}
do_stop_batch () {
if [ "$BATCH_ON" = "False" ]
then
return 1
fi
log_daemon_msg "Stopping $DAEMON_NAME_BATCH daemon"
start-stop-daemon --stop --quiet --oknodo --remove-pidfile --pidfile $PIDFILE_BATCH --retry 10
log_end_msg $?
}
do_stop_proxy () {
if [ "$DISTRIBUTED_GATEWAY" = "False" ]
then
@@ -145,12 +180,14 @@ do_stop_meter() {
case "$1" in
start)
-do_start
+do_start "normal-worker"
do_start_batch
do_start_proxy
;;
stop)
do_stop
do_stop_batch
do_stop_proxy
;;
start-meter)
@@ -161,6 +198,16 @@ case "$1" in
do_stop_meter
;;
start_batch)
do_start "batch-worker"
do_start_batch
;;
stop_batch)
do_stop
do_stop_batch
;;
start_proxy)
do_start_proxy
;;
@@ -176,13 +223,16 @@ case "$1" in
restart)
do_stop
do_stop_batch
do_stop_proxy
-do_start
+do_start "normal-worker"
do_start_batch
do_start_proxy
;;
status)
status_of_proc -p $PIDFILE "$DAEMON" "$DAEMON_NAME" && exit 0 || exit $?
status_of_proc -p $PIDFILE_BATCH "$DAEMON_BATCH" "$DAEMON_NAME_BATCH" || status=$?
status_of_proc -p $PIDFILE_PROXY "$DAEMON_PROXY" "$DAEMON_NAME_PROXY" || status=$?
;;
*)

conf/container.batch.conf (new file, 52 lines)

@@ -0,0 +1,52 @@
# This is the common container.conf for all containers.
# If want set custom settings, you have two choices:
# 1. Directly modify this file, which is not recommend, because the
# setting will be overriden when new version container.conf released.
# 2. Use a custom config file in this conf directory: lxc.custom.conf,
# it uses the same grammer as container.conf, and will be merged
# with the default container.conf by docklet at runtime.
#
# The following is an example mounting user html directory
# lxc.mount.entry = /public/home/%USERNAME%/public_html %ROOTFS%/root/public_html none bind,rw,create=dir 0 0
#
#### include /usr/share/lxc/config/ubuntu.common.conf
lxc.include = /usr/share/lxc/config/ubuntu.common.conf
############## DOCKLET CONFIG ##############
# Setup 0 tty devices
lxc.tty = 0
lxc.rootfs = %ROOTFS%
lxc.utsname = %HOSTNAME%
lxc.network.type = veth
lxc.network.name = eth0
lxc.network.link = lxcbr0
lxc.network.flags = up
lxc.network.ipv4 = %IP%
lxc.network.ipv4.gateway = %GATEWAY%
lxc.cgroup.pids.max = 2048
lxc.cgroup.memory.limit_in_bytes = %CONTAINER_MEMORY%M
#lxc.cgroup.memory.kmem.limit_in_bytes = 512M
#lxc.cgroup.memory.soft_limit_in_bytes = 4294967296
#lxc.cgroup.memory.memsw.limit_in_bytes = 8589934592
# lxc.cgroup.cpu.cfs_period_us : period time of cpu, default 100000, means 100ms
# lxc.cgroup.cpu.cfs_quota_us : quota time of this process
lxc.cgroup.cpu.cfs_quota_us = %CONTAINER_CPU%
lxc.cap.drop = sys_admin net_admin mac_admin mac_override sys_time sys_module
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/data %ROOTFS%/root/nfs none bind,rw,create=dir 0 0
#lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/%CLUSTERID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/ssh %ROOTFS%/root/.ssh none bind,ro,create=dir 0 0
lxc.mount.entry = %FS_PREFIX%/local/temp/%LXCNAME%/ %ROOTFS%/tmp none bind,rw,create=dir 0 0
# setting hostname
lxc.hook.pre-start = HNAME=%HOSTNAME% %LXCSCRIPT%/lxc-prestart
# setting nfs softlink
#lxc.hook.mount = %LXCSCRIPT%/lxc-mount
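For orientation: the %UPPERCASE% tokens above are template placeholders that docklet fills in when it instantiates a batch container from this file. The substitution code itself lives in the batch worker (taskcontroller.py) and is not part of this excerpt; the sketch below only illustrates the idea, with made-up names and values.

# Illustration only: how a template like container.batch.conf can be rendered.
# The function name and the example values are hypothetical.
def render_lxc_conf(template_text, values):
    for key, val in values.items():
        template_text = template_text.replace('%' + key + '%', str(val))
    return template_text

example_values = {
    'ROOTFS': '/opt/docklet/local/basefs',   # hypothetical paths
    'HOSTNAME': 'batch-0',
    'IP': '10.0.3.2/24',
    'GATEWAY': '10.0.3.1',
    'CONTAINER_MEMORY': 1024,                # MB
    'CONTAINER_CPU': 100000,                 # cfs_quota_us
}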


@@ -182,3 +182,32 @@
# ALLOW_SCALE_OUT: allow docklet to rent server on the cloud to scale out
# Only when you deploy docklet on the cloud can you set it to True
# ALLOW_SCALE_OUT=False
# ==================================================
#
# Batch Config
#
# ==================================================
# BATCH_ON: whether to start the batch job processing system when
# docklet starts. Default: True
# BATCH_ON=True
# BATCH_MASTER_PORT: the rpc server port on master.
# default: 50050
# BATCH_MASTER_PORT=50050
# BATCH_WORKER_PORT: the rpc server port on worker.
# default: 50051
# BATCH_WORKER_PORT=50051
# BATCH_GATEWAY: the ip address of gateway for the containers processing
# batch jobs. default: 10.0.3.1
# BATCH_GATEWAY=10.0.3.1
# BATCH_NET: ip addresses range of containers for batch job, default is 10.0.3.0/24
# BATCH_NET=10.0.3.0/24
# BATCH_MAX_THREAD_WORKER: the maximum number of threads of the rpc server on
# the batch job worker. default: 5
# BATCH_MAX_THREAD_WORKER=5
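The settings above are read the same way as the rest of docklet.conf. A minimal sketch, assuming the variables end up in the process environment as the other options do (the repository itself goes through utils.env.getenv; the helper name and fallback handling below are illustrative, and the defaults simply repeat the documented ones):

import os

def batch_settings():
    # Illustrative helper; defaults mirror the documented values above.
    return {
        'BATCH_ON': os.environ.get('BATCH_ON', 'True'),
        'BATCH_MASTER_PORT': os.environ.get('BATCH_MASTER_PORT', '50050'),
        'BATCH_WORKER_PORT': os.environ.get('BATCH_WORKER_PORT', '50051'),
        'BATCH_GATEWAY': os.environ.get('BATCH_GATEWAY', '10.0.3.1'),
        'BATCH_NET': os.environ.get('BATCH_NET', '10.0.3.0/24'),
        'BATCH_MAX_THREAD_WORKER': int(os.environ.get('BATCH_MAX_THREAD_WORKER', '5')),
    }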


@@ -16,7 +16,7 @@ fi
# some packages' name maybe different in debian
apt-get install -y cgmanager lxc lxcfs lxc-templates lvm2 bridge-utils curl exim4 openssh-server openvswitch-switch
apt-get install -y python3 python3-netifaces python3-flask python3-flask-sqlalchemy python3-pampy python3-httplib2 python3-pip
-apt-get install -y python3-psutil python3-flask-migrate
+apt-get install -y python3-psutil python3-flask-migrate python3-paramiko
apt-get install -y python3-lxc
apt-get install -y python3-requests python3-suds
apt-get install -y nodejs nodejs-legacy npm
@@ -24,6 +24,9 @@ apt-get install -y etcd
apt-get install -y glusterfs-client attr
apt-get install -y nginx
pip3 install Flask-WTF
apt-get install -y gdebi-core
gdebi ossfs_1.80.5_ubuntu16.04_amd64.deb
pip3 install grpcio grpcio-tools googleapis-common-protos
#add ip forward
echo "net.ipv4.ip_forward=1" >>/etc/sysctl.conf


@@ -27,7 +27,7 @@ import http.server, cgi, json, sys, shutil, traceback
import xmlrpc.client
from socketserver import ThreadingMixIn
from utils import etcdlib, imagemgr
-from master import nodemgr, vclustermgr, notificationmgr, lockmgr, cloudmgr
+from master import nodemgr, vclustermgr, notificationmgr, lockmgr, cloudmgr, jobmgr, taskmgr
from utils.logs import logs
from master import userManager, beansapplicationmgr, monitor, sysmgr, network
from worker.monitor import History_Manager
@@ -790,6 +790,128 @@ def resetall_system(user, beans, form):
return json.dumps({'success':'false', 'message': message})
return json.dumps(result)
@app.route("/batch/job/add/", methods=['POST'])
@login_required
def add_job(user,beans,form):
global G_jobmgr
job_data = form.to_dict()
job_info = {
'tasks': {}
}
message = {
'success': 'true',
'message': 'add batch job success'
}
for key in job_data:
key_arr = key.split('_')
value = job_data[key]
if key_arr[0] == 'srcAddr' and value == '':
task_idx = 'task_' + key_arr[1]
if task_idx in job_info['tasks']:
job_info['tasks'][task_idx]['srcAddr'] = '/root/nfs'
else:
job_info['tasks'][task_idx] = {
'srcAddr': '/root/nfs/'
}
elif key_arr[0] != 'dependency' and value == '':
message['success'] = 'false'
message['message'] = 'value of %s is null' % key
elif len(key_arr) == 1:
job_info[key_arr[0]] = value
elif len(key_arr) == 2:
key_prefix, task_idx = key_arr[0], key_arr[1]
task_idx = 'task_' + task_idx
if task_idx in job_info["tasks"]:
job_info["tasks"][task_idx][key_prefix] = value
else:
tmp_dict = {
key_prefix: value
}
job_info["tasks"][task_idx] = tmp_dict
elif len(key_arr) == 3:
key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2]
task_idx = 'task_' + task_idx
mapping_idx = 'mapping_' + mapping_idx
if task_idx in job_info["tasks"]:
if "mapping" in job_info["tasks"][task_idx]:
if mapping_idx in job_info["tasks"][task_idx]["mapping"]:
job_info["tasks"][task_idx]["mapping"][mapping_idx][key_prefix] = value
else:
tmp_dict = {
key_prefix: value
}
job_info["tasks"][task_idx]["mapping"][mapping_idx] = tmp_dict
else:
job_info["tasks"][task_idx]["mapping"] = {
mapping_idx: {
key_prefix: value
}
}
else:
tmp_dict = {
"mapping":{
mapping_idx: {
key_prefix: value
}
}
}
job_info["tasks"][task_idx] = tmp_dict
logger.debug('batch job adding info %s' % json.dumps(job_info, indent=4))
[status, msg] = G_jobmgr.add_job(user, job_info)
if status:
return json.dumps(message)
else:
logger.debug('fail to add batch job: %s' % msg)
message["success"] = "false"
message["message"] = msg
return json.dumps(message)
return json.dumps(message)
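In other words, add_job flattens the HTML form back into a nested job description: a key with no suffix is a job-level field, key_N belongs to task N, and key_N_M belongs to mapping M of task N. A small illustration of that transformation (field names follow the handler above, values are made up):

# Input: flat form fields as posted by the web frontend (illustrative values).
job_data = {
    'jobName': 'demo', 'jobPriority': '0',
    'image_1': 'base_docklet_base', 'command_1': 'echo hi', 'srcAddr_1': '',
    'dependency_1': '', 'instCount_1': '1', 'retryCount_1': '1', 'expTime_1': '60',
    'cpuSetting_1': '1', 'memorySetting_1': '1024', 'diskSetting_1': '1024', 'gpuSetting_1': '0',
    'mappingLocalDir_1_1': '/root/data', 'mappingRemoteDir_1_1': 'bucket/dir', 'mappingSource_1_1': 'Aliyun',
}
# Resulting job_info handed to G_jobmgr.add_job (abridged):
# {
#   'jobName': 'demo', 'jobPriority': '0',
#   'tasks': {
#     'task_1': {
#       'image': 'base_docklet_base', 'command': 'echo hi', 'srcAddr': '/root/nfs', ...,
#       'mapping': {'mapping_1': {'mappingLocalDir': '/root/data', 'mappingRemoteDir': 'bucket/dir', ...}}
#     }
#   }
# }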
@app.route("/batch/job/list/", methods=['POST'])
@login_required
def list_job(user,beans,form):
global G_jobmgr
result = {
'success': 'true',
'data': G_jobmgr.list_jobs(user)
}
return json.dumps(result)
@app.route("/batch/job/output/", methods=['POST'])
@login_required
def get_output(user,beans,form):
global G_jobmgr
jobid = form.get("jobid","")
taskid = form.get("taskid","")
instid = form.get("instid","")
issue = form.get("issue","")
result = {
'success': 'true',
'data': G_jobmgr.get_output(user,jobid,taskid,instid,issue)
}
return json.dumps(result)
@app.route("/batch/job/info/", methods=['POST'])
@login_required
def info_job(user,beans,form):
pass
@app.route("/batch/task/info/", methods=['POST'])
@login_required
def info_task(user,beans,form):
pass
@app.route("/batch/vnodes/list/", methods=['POST'])
@login_required
def batch_vnodes_list(user,beans,form):
global G_taskmgr
result = {
'success': 'true',
'data': G_taskmgr.get_user_batch_containers(user)
}
return json.dumps(result)
# @app.route("/inside/cluster/scaleout/", methods=['POST']) # @app.route("/inside/cluster/scaleout/", methods=['POST'])
# @inside_ip_required # @inside_ip_required
# def inside_cluster_scalout(cur_user, cluster_info, form): # def inside_cluster_scalout(cur_user, cluster_info, form):
@@ -857,6 +979,8 @@ if __name__ == '__main__':
global G_applicationmgr
global G_ulockmgr
global G_cloudmgr
global G_jobmgr
global G_taskmgr
# move 'tools.loadenv' to the beginning of this file
fs_path = env.getenv("FS_PREFIX")
@@ -973,4 +1097,10 @@ if __name__ == '__main__':
# server = http.server.HTTPServer((masterip, masterport), DockletHttpHandler)
logger.info("starting master server")
G_taskmgr = taskmgr.TaskMgr(G_nodemgr, monitor.Fetcher)
G_jobmgr = jobmgr.JobMgr(G_taskmgr)
G_taskmgr.set_jobmgr(G_jobmgr)
G_taskmgr.start()
G_jobmgr.start()
app.run(host = masterip, port = masterport, threaded=True)
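Seen from a client, the new endpoints are ordinary form-encoded POSTs against the master. A hedged example (the master address is made up, and passing the login token as a form field is an assumption based on the sample form data in src/master/parser.py, not something this diff confirms):

import requests

MASTER = 'http://docklet-master:9000'   # hypothetical master REST address
token = '...'                           # obtained from the normal docklet login flow

# List the current user's batch jobs.
resp = requests.post(MASTER + '/batch/job/list/', data={'token': token})
print(resp.json())

# Fetch the last lines of one task instance's output (field values are illustrative).
resp = requests.post(MASTER + '/batch/job/output/',
                     data={'token': token, 'jobid': 'AbCd1234',
                           'taskid': 'task_1', 'instid': '0', 'issue': 'stdout'})
print(resp.json())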

src/master/jobmgr.py (new file, 180 lines)

@@ -0,0 +1,180 @@
import time, threading, random, string, os, traceback
import master.monitor
import subprocess
from utils.log import initlogging, logger
from utils import env
class BatchJob(object):
def __init__(self, user, job_info):
self.user = user
self.raw_job_info = job_info
self.task_queue = []
self.task_finished = []
self.job_id = None
self.job_name = job_info['jobName']
self.job_priority = int(job_info['jobPriority'])
self.status = 'pending'
self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
self.top_sort()
# transfer the dependency graph into a job queue
def top_sort(self):
logger.debug('top sorting')
tasks = self.raw_job_info["tasks"]
dependency_graph = {}
for task_idx in tasks:
dependency_graph[task_idx] = set()
task_info = tasks[task_idx]
dependency = task_info['dependency'].strip().replace(' ', '').split(',')
if len(dependency) == 1 and dependency[0] == '':
continue
for t in dependency:
if not t in tasks:
raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx))
dependency_graph[task_idx].add(t)
while len(dependency_graph) > 0:
s = set()
flag = False
for task_idx in dependency_graph:
if len(dependency_graph[task_idx]) == 0:
flag = True
s.add(task_idx)
for task_idx in s:
dependency_graph.pop(task_idx)
#there is a circle in the graph
if not flag:
raise ValueError('there is a circle in the dependency graph')
break
for task_idx in dependency_graph:
for t in s:
if t in dependency_graph[task_idx]:
dependency_graph[task_idx].remove(t)
self.task_queue.append({
'task_idx': s,
'status': 'pending'
})
# get a task and pass it to taskmgr
def get_task(self):
for task in self.task_queue:
if task['status'] == 'pending':
task_idx = task['task_idx'].pop()
task['status'] = 'running'
task_name = self.user + '_' + self.job_id + '_' + task_idx
return task_name, self.raw_job_info["tasks"][task_idx]
return '', None
# a task has finished
def finish_task(self, task_idx):
pass
class JobMgr(threading.Thread):
# load job information from etcd
# initialize a job queue and job scheduler
def __init__(self, taskmgr):
threading.Thread.__init__(self)
self.job_queue = []
self.job_map = {}
self.taskmgr = taskmgr
self.fspath = env.getenv('FS_PREFIX')
def run(self):
while True:
self.job_scheduler()
time.sleep(2)
# user: username
# job_data: a json string
# user submit a new job, add this job to queue and database
def add_job(self, user, job_info):
try:
job = BatchJob(user, job_info)
job.job_id = self.gen_jobid()
self.job_queue.append(job.job_id)
self.job_map[job.job_id] = job
except ValueError as err:
return [False, err.args[0]]
except Exception as err:
return [False, err.args[0]]
# note: a finally-return here would swallow the error returns above
return [True, "add batch job success"]
# user: username
# list a user's all job
def list_jobs(self,user):
res = []
for job_id in self.job_queue:
job = self.job_map[job_id]
logger.debug('job_id: %s, user: %s' % (job_id, job.user))
if job.user == user:
all_tasks = job.raw_job_info['tasks']
tasks_instCount = {}
for task in all_tasks.keys():
tasks_instCount[task] = int(all_tasks[task]['instCount'])
res.append({
'job_name': job.job_name,
'job_id': job.job_id,
'status': job.status,
'create_time': job.create_time,
'tasks': list(all_tasks.keys()),
'tasks_instCount': tasks_instCount
})
return res
# user: username
# jobid: the id of job
# get the information of a job, including the status, json description and other information
# call get_task to get the task information
def get_job(self, user, job_id):
pass
# check if a job exists
def is_job_exist(self, job_id):
return job_id in self.job_queue
# generate a random job id
def gen_jobid(self):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
while self.is_job_exist(job_id):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
return job_id
# this is a thread to process a job
def job_processor(self, job):
task_name, task_info = job.get_task()
if not task_info:
return False
else:
task_priority = job.job_priority
self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
return True
# this is a thread to schedule the jobs
def job_scheduler(self):
# choose a job from queue, create a job processor for it
for job_id in self.job_queue:
job = self.job_map[job_id]
if self.job_processor(job):
job.status = 'running'
break
else:
job.status = 'done'
# a task has finished
def report(self, task):
pass
def get_output(self, username, jobid, taskid, instid, issue):
filename = username + "_" + jobid + "_" + taskid + "_" + instid + "_" + issue + ".txt"
fpath = "%s/global/users/%s/data/batch_%s/%s" % (self.fspath,username,jobid,filename)
logger.info("Get output from:%s" % fpath)
try:
ret = subprocess.run('tail -n 100 ' + fpath,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
if ret.returncode != 0:
raise IOError(ret.stdout.decode(encoding="utf-8"))
except Exception as err:
logger.error(traceback.format_exc())
return ""
else:
return ret.stdout.decode(encoding="utf-8")
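BatchJob.top_sort is a Kahn-style topological sort that collapses the dependency graph into rounds: each entry of task_queue holds the set of tasks whose dependencies are satisfied by earlier entries. A stand-alone toy run of the same idea (task names are made up):

# Dependency graph: task_4 needs task_2 and task_3, which both need task_1.
tasks = {
    'task_1': set(),
    'task_2': {'task_1'},
    'task_3': {'task_1'},
    'task_4': {'task_2', 'task_3'},
}
task_queue = []
remaining = {t: set(deps) for t, deps in tasks.items()}
while remaining:
    ready = {t for t, deps in remaining.items() if not deps}
    if not ready:
        raise ValueError('there is a circle in the dependency graph')
    for t in ready:
        remaining.pop(t)
    for deps in remaining.values():
        deps -= ready
    task_queue.append({'task_idx': ready, 'status': 'pending'})

print(task_queue)
# [{'task_idx': {'task_1'}, 'status': 'pending'},
#  {'task_idx': {'task_2', 'task_3'}, 'status': 'pending'},
#  {'task_idx': {'task_4'}, 'status': 'pending'}]

get_task then pops one pending task per call from the earliest round that still has work, so a task is never handed to taskmgr before its dependencies.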


@@ -47,6 +47,8 @@ class NodeMgr(object):
# get allnodes
self.allnodes = self._nodelist_etcd("allnodes")
self.runnodes = []
self.batchnodes = []
self.allrunnodes = []
[status, runlist] = self.etcd.listdir("machines/runnodes")
for node in runlist:
nodeip = node['key'].rsplit('/',1)[1]
@@ -140,6 +142,14 @@ class NodeMgr(object):
#print(etcd_runip)
#print(self.rpcs)
self.runnodes = etcd_runip
self.batchnodes = self.runnodes.copy()
self.allrunnodes = self.runnodes.copy()
[status, batchlist] = self.etcd.listdir("machines/batchnodes")
if status:
for node in batchlist:
nodeip = node['key'].rsplit('/', 1)[1]
self.batchnodes.append(nodeip)
self.allrunnodes.append(nodeip)
def recover_node(self,ip,tasks):
logger.info("now recover for worker:%s" % ip)
@@ -152,14 +162,19 @@ class NodeMgr(object):
# get all run nodes' IP addr
def get_nodeips(self):
-return self.runnodes
+return self.allrunnodes
def get_batch_nodeips(self):
return self.batchnodes
def get_base_nodeips(self):
return self.runnodes
def get_allnodes(self):
return self.allnodes
def ip_to_rpc(self,ip):
-if ip in self.runnodes:
+if ip in self.allrunnodes:
return xmlrpc.client.ServerProxy("http://%s:%s" % (ip, env.getenv("WORKER_PORT")))
else:
logger.info('Worker %s is not connected, create rpc client failed, push task into queue')
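After this change NodeMgr keeps three views of the cluster: runnodes (ordinary workers registered under machines/runnodes), batchnodes (runnodes plus the dedicated batch workers under machines/batchnodes), and allrunnodes (every reachable worker, used for RPC). Roughly, with made-up addresses:

# Hypothetical etcd contents: machines/runnodes = ['10.0.0.2'],
#                             machines/batchnodes = ['10.0.0.9']
runnodes    = ['10.0.0.2']              # get_base_nodeips() -> vclustermgr workspaces
batchnodes  = ['10.0.0.2', '10.0.0.9']  # get_batch_nodeips() -> taskmgr scheduling
allrunnodes = ['10.0.0.2', '10.0.0.9']  # get_nodeips() / ip_to_rpc()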

src/master/parser.py (new file, 55 lines)

@@ -0,0 +1,55 @@
#!/usr/bin/python3
import json
job_data = {'image_1': 'base_base_base', 'mappingRemoteDir_2_2': 'sss', 'dependency_1': 'aaa', 'mappingLocalDir_2_1': 'xxx', 'mappingLocalDir_1_2': 'aaa', 'mappingLocalDir_1_1': 'aaa', 'mappingLocalDir_2_3': 'fdsffdf', 'mappingRemoteDir_1_1': 'ddd', 'mappingRemoteDir_2_3': 'sss', 'srcAddr_1': 'aaa', 'mappingSource_2_1': 'Aliyun', 'cpuSetting_1': '1', 'mappingSource_2_2': 'Aliyun', 'retryCount_2': '1', 'mappingSource_1_1': 'Aliyun', 'expTime_1': '60', 'diskSetting_2': '1024', 'diskSetting_1': '1024', 'dependency_2': 'ddd', 'memorySetting_1': '1024', 'command_2': 'ccc', 'mappingRemoteDir_1_2': 'ddd', 'gpuSetting_2': '0', 'memorySetting_2': '1024', 'gpuSetting_1': '0', 'mappingLocalDir_2_2': 'bbb', 'mappingSource_1_2': 'Aliyun', 'expTime_2': '60', 'mappingRemoteDir_2_1': 'vvv', 'srcAddr_2': 'fff', 'cpuSetting_2': '1', 'instCount_1': '1', 'mappingSource_2_3': 'Aliyun', 'token': 'ZXlKaGJHY2lPaUpJVXpJMU5pSXNJbWxoZENJNk1UVXpNelE0TVRNMU5Td2laWGh3SWpveE5UTXpORGcwT1RVMWZRLmV5SnBaQ0k2TVgwLkF5UnRnaGJHZXhJY2lBSURZTUd5eXZIUVJnUGd1ZTA3OEtGWkVoejJVMkE=', 'instCount_2': '1', 'retryCount_1': '1', 'command_1': 'aaa', 'jobPriority': '0', 'image_2': 'base_base_base', 'jobName': 'aaa'}
def parse(job_data):
job_info = {}
message = {}
for key in job_data:
key_arr = key.split('_')
value = job_data[key]
if len(key_arr) == 1:
job_info[key_arr[0]] = value
elif len(key_arr) == 2:
key_prefix, task_idx = key_arr[0], key_arr[1]
task_idx = 'task_' + task_idx
if task_idx in job_info:
job_info[task_idx][key_prefix] = value
else:
tmp_dict = {
key_prefix: value
}
job_info[task_idx] = tmp_dict
elif len(key_arr) == 3:
key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2]
task_idx = 'task_' + task_idx
mapping_idx = 'mapping_' + mapping_idx
if task_idx in job_info:
if "mapping" in job_info[task_idx]:
if mapping_idx in job_info[task_idx]["mapping"]:
job_info[task_idx]["mapping"][mapping_idx][key_prefix] = value
else:
tmp_dict = {
key_prefix: value
}
job_info[task_idx]["mapping"][mapping_idx] = tmp_dict
else:
job_info[task_idx]["mapping"] = {
mapping_idx: {
key_prefix: value
}
}
else:
tmp_dict = {
"mapping":{
mapping_idx: {
key_prefix: value
}
}
}
job_info[task_idx] = tmp_dict
print(json.dumps(job_info, indent=4))
if __name__ == '__main__':
parse(job_data)

src/master/taskmgr.py (new file, 411 lines)

@@ -0,0 +1,411 @@
import threading
import time
import string
import random
import json
from functools import wraps
# must import logger after initlogging, ugly
from utils.log import logger
# grpc
from concurrent import futures
import grpc
from protos.rpc_pb2 import *
from protos.rpc_pb2_grpc import MasterServicer, add_MasterServicer_to_server, WorkerStub
from utils import env
class Task():
def __init__(self, info, priority):
self.info = info
self.status = WAITING
self.instance_list = []
self.token = ''
# priority the bigger the better
# self.priority the smaller the better
self.priority = int(time.time()) / 60 / 60 - priority
def __lt__(self, other):
return self.priority < other.priority
class TaskReporter(MasterServicer):
def __init__(self, taskmgr):
self.taskmgr = taskmgr
def report(self, request, context):
for task_report in request.taskmsgs:
self.taskmgr.on_task_report(task_report)
return Reply(status=Reply.ACCEPTED, message='')
class TaskMgr(threading.Thread):
# load task information from etcd
# initialize a task queue and task scheduler
# taskmgr: a taskmgr instance
def __init__(self, nodemgr, monitor_fetcher, scheduler_interval=2, external_logger=None):
threading.Thread.__init__(self)
self.thread_stop = False
self.jobmgr = None
self.task_queue = []
self.lazy_append_list = []
self.lazy_delete_list = []
self.task_queue_lock = threading.Lock()
self.user_containers = {}
self.scheduler_interval = scheduler_interval
self.logger = logger
self.master_port = env.getenv('BATCH_MASTER_PORT')
self.worker_port = env.getenv('BATCH_WORKER_PORT')
# nodes
self.nodemgr = nodemgr
self.monitor_fetcher = monitor_fetcher
self.cpu_usage = {}
self.gpu_usage = {}
# self.all_nodes = None
# self.last_nodes_info_update_time = 0
# self.nodes_info_update_interval = 30 # (s)
def queue_lock(f):
@wraps(f)
def new_f(self, *args, **kwargs):
self.task_queue_lock.acquire()
result = f(self, *args, **kwargs)
self.task_queue_lock.release()
return result
return new_f
def run(self):
self.serve()
while not self.thread_stop:
self.sort_out_task_queue()
task, instance_id, worker = self.task_scheduler()
if task is not None and worker is not None:
self.task_processor(task, instance_id, worker)
else:
time.sleep(self.scheduler_interval)
def serve(self):
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
add_MasterServicer_to_server(TaskReporter(self), self.server)
self.server.add_insecure_port('[::]:' + self.master_port)
self.server.start()
self.logger.info('[taskmgr_rpc] start rpc server')
def stop(self):
self.thread_stop = True
self.server.stop(0)
self.logger.info('[taskmgr_rpc] stop rpc server')
# this method is called when worker send heart-beat rpc request
def on_task_report(self, report):
self.logger.info('[on_task_report] receive task report: id %s-%d, status %d' % (report.taskid, report.instanceid, report.instanceStatus))
task = self.get_task(report.taskid)
if task == None:
self.logger.error('[on_task_report] task not found')
return
instance = task.instance_list[report.instanceid]
if instance['token'] != report.token:
self.logger.warning('[on_task_report] wrong token')
return
username = task.info.username
container_name = username + '-batch-' + task.info.id + '-' + str(report.instanceid) + '-' + report.token
self.user_containers[username].remove(container_name)
if instance['status'] != RUNNING:
self.logger.error('[on_task_report] receive task report when instance is not running')
if instance['status'] == RUNNING and report.instanceStatus != RUNNING:
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
instance['status'] = report.instanceStatus
instance['error_msg'] = report.errmsg
if report.instanceStatus == COMPLETED:
self.check_task_completed(task)
elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT:
if instance['try_count'] > task.info.maxRetryCount:
self.check_task_completed(task)
elif report.instanceStatus == OUTPUTERROR:
self.task_failed(task)
def check_task_completed(self, task):
if len(task.instance_list) < task.info.instanceCount:
return
failed = False
for instance in task.instance_list:
if instance['status'] == RUNNING or instance['status'] == WAITING:
return
if instance['status'] == FAILED or instance['status'] == TIMEOUT:
if instance['try_count'] > task.info.maxRetryCount:
failed = True
else:
return
if instance['status'] == OUTPUTERROR:
failed = True
break
if failed:
self.task_failed(task)
else:
self.task_completed(task)
def task_completed(self, task):
task.status = COMPLETED
if self.jobmgr is None:
self.logger.error('[task_completed] jobmgr is None!')
else:
self.jobmgr.report(task)
self.logger.info('task %s completed' % task.info.id)
self.lazy_delete_list.append(task)
def task_failed(self, task):
task.status = FAILED
if self.jobmgr is None:
self.logger.error('[task_failed] jobmgr is None!')
else:
self.jobmgr.report(task)
self.logger.info('task %s failed' % task.info.id)
self.lazy_delete_list.append(task)
@queue_lock
def sort_out_task_queue(self):
while self.lazy_delete_list:
task = self.lazy_delete_list.pop(0)
self.task_queue.remove(task)
if self.lazy_append_list:
while self.lazy_append_list:
task = self.lazy_append_list.pop(0)
self.task_queue.append(task)
self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
def task_processor(self, task, instance_id, worker_ip):
task.status = RUNNING
# properties for transaction
task.info.instanceid = instance_id
task.info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
instance = task.instance_list[instance_id]
instance['status'] = RUNNING
instance['try_count'] += 1
instance['token'] = task.info.token
instance['worker'] = worker_ip
self.cpu_usage[worker_ip] += task.info.cluster.instance.cpu
self.gpu_usage[worker_ip] += task.info.cluster.instance.gpu
username = task.info.username
container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token
if not username in self.user_containers.keys():
self.user_containers[username] = []
self.user_containers[username].append(container_name)
try:
self.logger.info('[task_processor] processing task [%s] instance [%d]' % (task.info.id, task.info.instanceid))
channel = grpc.insecure_channel('%s:%s' % (worker_ip, self.worker_port))
stub = WorkerStub(channel)
response = stub.process_task(task.info)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
instance['status'] = FAILED
instance['try_count'] -= 1
self.user_containers[username].remove(container_name)
# return task, worker
def task_scheduler(self):
# simple FIFO with priority
self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))
# nodes = self.get_all_nodes()
# if nodes is None or len(nodes) == 0:
# self.logger.info('[task_scheduler] no nodes found')
# else:
# for worker_ip, worker_info in nodes:
# self.logger.info('[task_scheduler] nodes %s' % worker_ip)
# for key in worker_info:
# if key == 'cpu':
# self.logger.info('[task_scheduler] %s: %d/%d' % (key, self.get_cpu_usage(worker_ip), worker_info[key]))
# else:
# self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key]))
for task in self.task_queue:
if task in self.lazy_delete_list:
continue
worker = self.find_proper_worker(task)
for index, instance in enumerate(task.instance_list):
# find instance to retry
if (instance['status'] == FAILED or instance['status'] == TIMEOUT) and instance['try_count'] <= task.info.maxRetryCount:
if worker is not None:
self.logger.info('[task_scheduler] retry')
return task, index, worker
# find timeout instance
elif instance['status'] == RUNNING:
if not self.is_alive(instance['worker']):
instance['status'] = FAILED
instance['token'] = ''
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
if worker is not None:
return task, index, worker
if worker is not None:
# start new instance
if len(task.instance_list) < task.info.instanceCount:
instance = {}
instance['try_count'] = 0
task.instance_list.append(instance)
return task, len(task.instance_list) - 1, worker
self.check_task_completed(task)
return None, None, None
def find_proper_worker(self, task):
nodes = self.get_all_nodes()
if nodes is None or len(nodes) == 0:
self.logger.warning('[task_scheduler] running nodes not found')
return None
for worker_ip, worker_info in nodes:
if task.info.cluster.instance.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']:
continue
if task.info.cluster.instance.memory > worker_info['memory']:
continue
# try not to assign non-gpu task to a worker with gpu
if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0:
continue
if task.info.cluster.instance.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']:
continue
return worker_ip
return None
def get_all_nodes(self):
# cache running nodes
# if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
# return self.all_nodes
# get running nodes
node_ips = self.nodemgr.get_batch_nodeips()
all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips]
return all_nodes
def is_alive(self, worker):
nodes = self.nodemgr.get_batch_nodeips()
return worker in nodes
def get_worker_resource_info(self, worker_ip):
fetcher = self.monitor_fetcher(worker_ip)
worker_info = fetcher.info
info = {}
info['cpu'] = len(worker_info['cpuconfig'])
info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (Mb)
info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb)
info['gpu'] = len(worker_info['gpuinfo'])
return info
def get_cpu_usage(self, worker_ip):
try:
return self.cpu_usage[worker_ip]
except:
self.cpu_usage[worker_ip] = 0
return 0
def get_gpu_usage(self, worker_ip):
try:
return self.gpu_usage[worker_ip]
except:
self.gpu_usage[worker_ip] = 0
return 0
def set_jobmgr(self, jobmgr):
self.jobmgr = jobmgr
# save the task information into database
# called when jobmgr assign task to taskmgr
def add_task(self, username, taskid, json_task, task_priority=1):
# decode json string to object defined in grpc
self.logger.info('[taskmgr add_task] receive task %s' % taskid)
image_dict = {
"private": Image.PRIVATE,
"base": Image.BASE,
"public": Image.PUBLIC
}
# json_task = json.loads(json_task)
task = Task(TaskInfo(
id = taskid,
username = username,
instanceCount = int(json_task['instCount']),
maxRetryCount = int(json_task['retryCount']),
timeout = int(json_task['expTime']),
parameters = Parameters(
command = Command(
commandLine = json_task['command'],
packagePath = json_task['srcAddr'],
envVars = {}),
stderrRedirectPath = json_task.get('stdErrRedPth',""),
stdoutRedirectPath = json_task.get('stdOutRedPth',"")),
cluster = Cluster(
image = Image(
name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'],
type = image_dict[json_task['image'].split('_')[2]], #json_task['cluster']['image']['type'],
owner = username if not json_task['image'].split('_')[1] else json_task['image'].split('_')[1]), #json_task['cluster']['image']['owner']),
instance = Instance(
cpu = int(json_task['cpuSetting']),
memory = int(json_task['memorySetting']),
disk = int(json_task['diskSetting']),
gpu = int(json_task['gpuSetting'])))),
priority=task_priority)
if 'mapping' in json_task:
task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
for mapping_key in json_task['mapping']])
self.lazy_append_list.append(task)
# user: username
# get the information of a task, including the status, task description and other information
@queue_lock
def get_task(self, taskid):
for task in self.task_queue:
if task.info.id == taskid:
return task
return None
# get names of all the batch containers of the user
def get_user_batch_containers(self,username):
if not username in self.user_containers.keys():
return []
else:
return self.user_containers[username]
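For reference, add_task expects the flat per-task dictionary that jobmgr extracts from the submitted job. A hedged example of its shape, following the parsing code above (all values invented):

json_task = {
    'instCount': '2',              # -> TaskInfo.instanceCount
    'retryCount': '1',             # -> TaskInfo.maxRetryCount
    'expTime': '60',               # -> TaskInfo.timeout
    'command': 'python3 run.py',   # -> Parameters.command.commandLine
    'srcAddr': '/root/nfs',        # -> Parameters.command.packagePath
    'stdOutRedPth': '',            # optional redirect paths
    'stdErrRedPth': '',
    'image': 'base_docklet_base',  # '<name>_<owner>_<type>', type in {base, public, private}
    'cpuSetting': '1',
    'memorySetting': '1024',       # mb
    'diskSetting': '1024',         # mb
    'gpuSetting': '0',
    'mapping': {                   # optional external storage mounts
        'mapping_1': {'mappingLocalDir': '/root/data',
                      'mappingRemoteDir': 'bucket/dir'},
    },
}
# e.g. taskmgr.add_task('alice', 'alice_AbCd1234_task_1', json_task, task_priority=0)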


@@ -0,0 +1,39 @@
import sys
if sys.path[0].endswith("master"):
sys.path[0] = sys.path[0][:-6]
import grpc,time
from protos import rpc_pb2, rpc_pb2_grpc
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = rpc_pb2_grpc.WorkerStub(channel)
comm = rpc_pb2.Command(commandLine="ls /root/oss/test-for-docklet", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/oss/test-for-docklet/", stdoutRedirectPath="/root/oss/test-for-docklet/")
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
inst = rpc_pb2.Instance(cpu=1, memory=1000, disk=1000, gpu=0)
mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="")
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt])
task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token="test")
response = stub.process_task(task)
print("Batch client received: " + str(response.status)+" "+response.message)
def stop_task():
channel = grpc.insecure_channel('localhost:50051')
stub = rpc_pb2_grpc.WorkerStub(channel)
taskmsg = rpc_pb2.TaskMsg(taskid="test",username="root",instanceid=1,instanceStatus=rpc_pb2.COMPLETED,token="test",errmsg="")
reportmsg = rpc_pb2.ReportMsg(taskmsgs = [taskmsg])
response = stub.stop_tasks(reportmsg)
print("Batch client received: " + str(response.status)+" "+response.message)
if __name__ == '__main__':
run()
#time.sleep(4)
#stop_task()

src/master/testTaskMgr.py (new file, 181 lines)

@@ -0,0 +1,181 @@
import master.taskmgr
from concurrent import futures
import grpc
from protos.rpc_pb2 import *
from protos.rpc_pb2_grpc import *
import threading, json, time, random
from utils import env
class SimulatedNodeMgr():
def get_batch_nodeips(self):
return ['0.0.0.0']
class SimulatedMonitorFetcher():
def __init__(self, ip):
self.info = {}
self.info['cpuconfig'] = [1,1,1,1]
self.info['meminfo'] = {}
self.info['meminfo']['free'] = 4 * 1024 * 1024 # (kb) simulate 4 GB free memory
self.info['meminfo']['buffers'] = 0 # also read by TaskMgr.get_worker_resource_info
self.info['meminfo']['cached'] = 0
self.info['diskinfo'] = []
self.info['diskinfo'].append({})
self.info['diskinfo'][0]['free'] = 8 * 1024 * 1024 * 1024 # (b) simulate 8 GB disk
self.info['gpuinfo'] = [] # the simulated worker has no gpu
class SimulatedTaskController(WorkerServicer):
def __init__(self, worker):
self.worker = worker
def process_task(self, task, context):
print('[SimulatedTaskController] receive task [%s] instanceid [%d] token [%s]' % (task.id, task.instanceid, task.token))
worker.process(task)
return Reply(status=Reply.ACCEPTED,message="")
class SimulatedWorker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.thread_stop = False
self.tasks = []
def run(self):
worker_port = env.getenv('BATCH_WORKER_PORT')
server = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
add_WorkerServicer_to_server(SimulatedTaskController(self), server)
server.add_insecure_port('[::]:' + worker_port)
server.start()
while not self.thread_stop:
for task in self.tasks:
seed = random.random()
if seed < 0.25:
report(task.id, task.instanceid, RUNNING, task.token)
elif seed < 0.5:
report(task.id, task.instanceid, COMPLETED, task.token)
self.tasks.remove(task)
elif seed < 0.75:
report(task.id, task.instanceid, FAILED, task.token)
self.tasks.remove(task)
else:
pass
time.sleep(5)
server.stop(0)
def stop(self):
self.thread_stop = True
def process(self, task):
self.tasks.append(task)
class SimulatedJobMgr(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.thread_stop = False
def run(self):
while not self.thread_stop:
time.sleep(5)
server.stop(0)
def stop(self):
self.thread_stop = True
def report(self, task):
print('[SimulatedJobMgr] task[%s] status %d' % (task.info.id, task.status))
def assignTask(self, taskmgr, taskid, instance_count, retry_count, timeout, cpu, memory, disk):
task = {}
task['instanceCount'] = instance_count
task['maxRetryCount'] = retry_count
task['timeout'] = timeout
task['parameters'] = {}
task['parameters']['command'] = {}
task['parameters']['command']['commandLine'] = 'ls'
task['parameters']['command']['packagePath'] = ''
task['parameters']['command']['envVars'] = {'a':'1'}
task['parameters']['stderrRedirectPath'] = ''
task['parameters']['stdoutRedirectPath'] = ''
task['cluster'] = {}
task['cluster']['image'] = {}
task['cluster']['image']['name'] = ''
task['cluster']['image']['type'] = 1
task['cluster']['image']['owner'] = ''
task['cluster']['instance'] = {}
task['cluster']['instance']['cpu'] = cpu
task['cluster']['instance']['memory'] = memory
task['cluster']['instance']['disk'] = disk
task['cluster']['instance']['gpu'] = 0
task['cluster']['mount'] = [{'remotePath':'', 'localPath':''}]
taskmgr.add_task('root', taskid, json.dumps(task))
class SimulatedLogger():
def info(self, msg):
print('[INFO] ' + msg)
def warning(self, msg):
print('[WARNING] ' + msg)
def error(self, msg):
print('[ERROR] ' + msg)
def test():
global worker
global jobmgr
global taskmgr
worker = SimulatedWorker()
worker.start()
jobmgr = SimulatedJobMgr()
jobmgr.start()
taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, external_logger=SimulatedLogger())
taskmgr.set_jobmgr(jobmgr)
taskmgr.start()
add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048)
def test2():
global jobmgr
global taskmgr
jobmgr = SimulatedJobMgr()
jobmgr.start()
taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, external_logger=SimulatedLogger())
taskmgr.set_jobmgr(jobmgr)
taskmgr.start()
add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048)
def add(taskid, instance_count, retry_count, timeout, cpu, memory, disk):
global jobmgr
global taskmgr
jobmgr.assignTask(taskmgr, taskid, instance_count, retry_count, timeout, cpu, memory, disk)
def report(taskid, instanceid, status, token):
global taskmgr
master_port = env.getenv('BATCH_MASTER_PORT')
channel = grpc.insecure_channel('%s:%s' % ('0.0.0.0', master_port))
stub = MasterStub(channel)
response = stub.report(ReportMsg(taskmsgs=[TaskMsg(taskid=taskid, instanceid=instanceid, instanceStatus=status, token=token)])) # taskmsgs is a repeated field, so pass a list
def stop():
global worker
global jobmgr
global taskmgr
worker.stop()
jobmgr.stop()
taskmgr.stop()


@@ -120,7 +120,7 @@ class VclusterMgr(object):
return [False, "the size of disk is not big enough for the image"]
clustersize = int(self.defaultsize)
logger.info ("starting cluster %s with %d containers for %s" % (clustername, int(clustersize), username))
-workers = self.nodemgr.get_nodeips()
+workers = self.nodemgr.get_base_nodeips()
image_json = json.dumps(image)
groupname = json.loads(user_info)["data"]["group"]
groupquota = json.loads(user_info)["data"]["groupinfo"]
@@ -206,7 +206,7 @@ class VclusterMgr(object):
return [False, "cluster:%s not found" % clustername]
if self.imgmgr.get_image_size(image) + 100 > int(setting["disk"]):
return [False, "the size of disk is not big enough for the image"]
-workers = self.nodemgr.get_nodeips()
+workers = self.nodemgr.get_base_nodeips()
if (len(workers) == 0):
logger.warning("no workers to start containers, scale out failed")
return [False, "no workers are running"]

src/protos/rpc.proto (new file, 101 lines)

@@ -0,0 +1,101 @@
syntax = "proto3";
service Master {
rpc report (ReportMsg) returns (Reply) {}
}
service Worker {
rpc process_task (TaskInfo) returns (Reply) {}
rpc stop_tasks (ReportMsg) returns (Reply) {}
}
message Reply {
ReplyStatus status = 1; //
string message = 2;
enum ReplyStatus {
ACCEPTED = 0;
REFUSED = 1;
}
}
message ReportMsg {
repeated TaskMsg taskmsgs = 1;
}
message TaskMsg {
string taskid = 1;
string username = 2;
int32 instanceid = 3;
Status instanceStatus = 4; //
string token = 5;
string errmsg = 6;
}
enum Status {
WAITING = 0;
RUNNING = 1;
COMPLETED = 2;
FAILED = 3;
TIMEOUT = 4;
OUTPUTERROR = 5;
}
message TaskInfo {
string id = 1;
string username = 2;
int32 instanceid = 3;
int32 instanceCount = 4; //
int32 maxRetryCount = 5; //
Parameters parameters = 6; //
Cluster cluster = 7; //
int32 timeout = 8; //
string token = 9;
bool reused = 10; //
}
message Parameters {
Command command = 1; //
string stderrRedirectPath = 2; //
string stdoutRedirectPath = 3; //
}
message Command {
string commandLine = 1; //
string packagePath = 2; //
map<string, string> envVars = 3; //
}
message Cluster {
Image image = 1; //
Instance instance = 2; //
repeated Mount mount = 3; //
}
message Image {
string name = 1; //
ImageType type = 2; // base/public/private
string owner = 3; //
enum ImageType {
BASE = 0;
PUBLIC = 1;
PRIVATE = 2;
}
}
message Mount {
string provider = 1;
string localPath = 2; //
string remotePath = 3; //
string accessKey = 4;
string secretKey = 5;
string other = 6;
}
message Instance {
int32 cpu = 1; // CPU
int32 memory = 2; // mb
int32 disk = 3; // mb
int32 gpu = 4; //
}
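The contract is intentionally small: the master pushes a TaskInfo to a worker's process_task and gets an ACCEPTED/REFUSED Reply back, while workers report instance status changes to the master's report method. A minimal sketch of a Worker servicer built from the generated modules (illustration only; the real worker-side implementation is taskcontroller.py, which is not shown in this excerpt):

import time
from concurrent import futures
import grpc
from protos import rpc_pb2, rpc_pb2_grpc

class EchoWorker(rpc_pb2_grpc.WorkerServicer):
    def process_task(self, request, context):
        # request is a TaskInfo; accept it and just print the command it carries.
        print('task %s instance %d: %s' % (request.id, request.instanceid,
                                           request.parameters.command.commandLine))
        return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED, message='')

    def stop_tasks(self, request, context):
        for msg in request.taskmsgs:
            print('stop %s-%d' % (msg.taskid, msg.instanceid))
        return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED, message='')

if __name__ == '__main__':
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
    rpc_pb2_grpc.add_WorkerServicer_to_server(EchoWorker(), server)
    server.add_insecure_port('[::]:50051')   # documented BATCH_WORKER_PORT default
    server.start()
    while True:
        time.sleep(3600)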

src/protos/rpc_pb2.py (new file, 852 lines)

@@ -0,0 +1,852 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: rpc.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf.internal import enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='rpc.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"\x7f\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x04 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x05 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x06 \x01(\t\"\xd6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\x12\x0e\n\x06reused\x18\n \x01(\x08\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 \x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32Q\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12\"\n\nstop_tasks\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x62\x06proto3')
)
_STATUS = _descriptor.EnumDescriptor(
name='Status',
full_name='Status',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='WAITING', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='RUNNING', index=1, number=1,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='COMPLETED', index=2, number=2,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='FAILED', index=3, number=3,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='TIMEOUT', index=4, number=4,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='OUTPUTERROR', index=5, number=5,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=1134,
serialized_end=1225,
)
_sym_db.RegisterEnumDescriptor(_STATUS)
Status = enum_type_wrapper.EnumTypeWrapper(_STATUS)
WAITING = 0
RUNNING = 1
COMPLETED = 2
FAILED = 3
TIMEOUT = 4
OUTPUTERROR = 5
_REPLY_REPLYSTATUS = _descriptor.EnumDescriptor(
name='ReplyStatus',
full_name='Reply.ReplyStatus',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='ACCEPTED', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='REFUSED', index=1, number=1,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=75,
serialized_end=115,
)
_sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS)
_IMAGE_IMAGETYPE = _descriptor.EnumDescriptor(
name='ImageType',
full_name='Image.ImageType',
filename=None,
file=DESCRIPTOR,
values=[
_descriptor.EnumValueDescriptor(
name='BASE', index=0, number=0,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='PUBLIC', index=1, number=1,
options=None,
type=None),
_descriptor.EnumValueDescriptor(
name='PRIVATE', index=2, number=2,
options=None,
type=None),
],
containing_type=None,
options=None,
serialized_start=899,
serialized_end=945,
)
_sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE)
_REPLY = _descriptor.Descriptor(
name='Reply',
full_name='Reply',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='status', full_name='Reply.status', index=0,
number=1, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='message', full_name='Reply.message', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
_REPLY_REPLYSTATUS,
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=13,
serialized_end=115,
)
_REPORTMSG = _descriptor.Descriptor(
name='ReportMsg',
full_name='ReportMsg',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='taskmsgs', full_name='ReportMsg.taskmsgs', index=0,
number=1, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=117,
serialized_end=156,
)
_TASKMSG = _descriptor.Descriptor(
name='TaskMsg',
full_name='TaskMsg',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='taskid', full_name='TaskMsg.taskid', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='username', full_name='TaskMsg.username', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceid', full_name='TaskMsg.instanceid', index=2,
number=3, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceStatus', full_name='TaskMsg.instanceStatus', index=3,
number=4, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='token', full_name='TaskMsg.token', index=4,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='errmsg', full_name='TaskMsg.errmsg', index=5,
number=6, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=158,
serialized_end=285,
)
_TASKINFO = _descriptor.Descriptor(
name='TaskInfo',
full_name='TaskInfo',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='id', full_name='TaskInfo.id', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='username', full_name='TaskInfo.username', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceid', full_name='TaskInfo.instanceid', index=2,
number=3, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instanceCount', full_name='TaskInfo.instanceCount', index=3,
number=4, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='maxRetryCount', full_name='TaskInfo.maxRetryCount', index=4,
number=5, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='parameters', full_name='TaskInfo.parameters', index=5,
number=6, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='cluster', full_name='TaskInfo.cluster', index=6,
number=7, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='timeout', full_name='TaskInfo.timeout', index=7,
number=8, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='token', full_name='TaskInfo.token', index=8,
number=9, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='reused', full_name='TaskInfo.reused', index=9,
number=10, type=8, cpp_type=7, label=1,
has_default_value=False, default_value=False,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=288,
serialized_end=502,
)
_PARAMETERS = _descriptor.Descriptor(
name='Parameters',
full_name='Parameters',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='command', full_name='Parameters.command', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='stderrRedirectPath', full_name='Parameters.stderrRedirectPath', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='stdoutRedirectPath', full_name='Parameters.stdoutRedirectPath', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=504,
serialized_end=599,
)
_COMMAND_ENVVARSENTRY = _descriptor.Descriptor(
name='EnvVarsEntry',
full_name='Command.EnvVarsEntry',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='Command.EnvVarsEntry.key', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='value', full_name='Command.EnvVarsEntry.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')),
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=695,
serialized_end=741,
)
_COMMAND = _descriptor.Descriptor(
name='Command',
full_name='Command',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='commandLine', full_name='Command.commandLine', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='packagePath', full_name='Command.packagePath', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='envVars', full_name='Command.envVars', index=2,
number=3, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[_COMMAND_ENVVARSENTRY, ],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=602,
serialized_end=741,
)
_CLUSTER = _descriptor.Descriptor(
name='Cluster',
full_name='Cluster',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='image', full_name='Cluster.image', index=0,
number=1, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='instance', full_name='Cluster.instance', index=1,
number=2, type=11, cpp_type=10, label=1,
has_default_value=False, default_value=None,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='mount', full_name='Cluster.mount', index=2,
number=3, type=11, cpp_type=10, label=3,
has_default_value=False, default_value=[],
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=743,
serialized_end=827,
)
_IMAGE = _descriptor.Descriptor(
name='Image',
full_name='Image',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='name', full_name='Image.name', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='type', full_name='Image.type', index=1,
number=2, type=14, cpp_type=8, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='owner', full_name='Image.owner', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
_IMAGE_IMAGETYPE,
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=829,
serialized_end=945,
)
_MOUNT = _descriptor.Descriptor(
name='Mount',
full_name='Mount',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='provider', full_name='Mount.provider', index=0,
number=1, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='localPath', full_name='Mount.localPath', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='remotePath', full_name='Mount.remotePath', index=2,
number=3, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='accessKey', full_name='Mount.accessKey', index=3,
number=4, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='secretKey', full_name='Mount.secretKey', index=4,
number=5, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='other', full_name='Mount.other', index=5,
number=6, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=947,
serialized_end=1064,
)
_INSTANCE = _descriptor.Descriptor(
name='Instance',
full_name='Instance',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='cpu', full_name='Instance.cpu', index=0,
number=1, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='memory', full_name='Instance.memory', index=1,
number=2, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='disk', full_name='Instance.disk', index=2,
number=3, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
_descriptor.FieldDescriptor(
name='gpu', full_name='Instance.gpu', index=3,
number=4, type=5, cpp_type=1, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None, file=DESCRIPTOR),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=1066,
serialized_end=1132,
)
_REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS
_REPLY_REPLYSTATUS.containing_type = _REPLY
_REPORTMSG.fields_by_name['taskmsgs'].message_type = _TASKMSG
_TASKMSG.fields_by_name['instanceStatus'].enum_type = _STATUS
_TASKINFO.fields_by_name['parameters'].message_type = _PARAMETERS
_TASKINFO.fields_by_name['cluster'].message_type = _CLUSTER
_PARAMETERS.fields_by_name['command'].message_type = _COMMAND
_COMMAND_ENVVARSENTRY.containing_type = _COMMAND
_COMMAND.fields_by_name['envVars'].message_type = _COMMAND_ENVVARSENTRY
_CLUSTER.fields_by_name['image'].message_type = _IMAGE
_CLUSTER.fields_by_name['instance'].message_type = _INSTANCE
_CLUSTER.fields_by_name['mount'].message_type = _MOUNT
_IMAGE.fields_by_name['type'].enum_type = _IMAGE_IMAGETYPE
_IMAGE_IMAGETYPE.containing_type = _IMAGE
DESCRIPTOR.message_types_by_name['Reply'] = _REPLY
DESCRIPTOR.message_types_by_name['ReportMsg'] = _REPORTMSG
DESCRIPTOR.message_types_by_name['TaskMsg'] = _TASKMSG
DESCRIPTOR.message_types_by_name['TaskInfo'] = _TASKINFO
DESCRIPTOR.message_types_by_name['Parameters'] = _PARAMETERS
DESCRIPTOR.message_types_by_name['Command'] = _COMMAND
DESCRIPTOR.message_types_by_name['Cluster'] = _CLUSTER
DESCRIPTOR.message_types_by_name['Image'] = _IMAGE
DESCRIPTOR.message_types_by_name['Mount'] = _MOUNT
DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE
DESCRIPTOR.enum_types_by_name['Status'] = _STATUS
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict(
DESCRIPTOR = _REPLY,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Reply)
))
_sym_db.RegisterMessage(Reply)
ReportMsg = _reflection.GeneratedProtocolMessageType('ReportMsg', (_message.Message,), dict(
DESCRIPTOR = _REPORTMSG,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:ReportMsg)
))
_sym_db.RegisterMessage(ReportMsg)
TaskMsg = _reflection.GeneratedProtocolMessageType('TaskMsg', (_message.Message,), dict(
DESCRIPTOR = _TASKMSG,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:TaskMsg)
))
_sym_db.RegisterMessage(TaskMsg)
TaskInfo = _reflection.GeneratedProtocolMessageType('TaskInfo', (_message.Message,), dict(
DESCRIPTOR = _TASKINFO,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:TaskInfo)
))
_sym_db.RegisterMessage(TaskInfo)
Parameters = _reflection.GeneratedProtocolMessageType('Parameters', (_message.Message,), dict(
DESCRIPTOR = _PARAMETERS,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Parameters)
))
_sym_db.RegisterMessage(Parameters)
Command = _reflection.GeneratedProtocolMessageType('Command', (_message.Message,), dict(
EnvVarsEntry = _reflection.GeneratedProtocolMessageType('EnvVarsEntry', (_message.Message,), dict(
DESCRIPTOR = _COMMAND_ENVVARSENTRY,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Command.EnvVarsEntry)
))
,
DESCRIPTOR = _COMMAND,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Command)
))
_sym_db.RegisterMessage(Command)
_sym_db.RegisterMessage(Command.EnvVarsEntry)
Cluster = _reflection.GeneratedProtocolMessageType('Cluster', (_message.Message,), dict(
DESCRIPTOR = _CLUSTER,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Cluster)
))
_sym_db.RegisterMessage(Cluster)
Image = _reflection.GeneratedProtocolMessageType('Image', (_message.Message,), dict(
DESCRIPTOR = _IMAGE,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Image)
))
_sym_db.RegisterMessage(Image)
Mount = _reflection.GeneratedProtocolMessageType('Mount', (_message.Message,), dict(
DESCRIPTOR = _MOUNT,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Mount)
))
_sym_db.RegisterMessage(Mount)
Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict(
DESCRIPTOR = _INSTANCE,
__module__ = 'rpc_pb2'
# @@protoc_insertion_point(class_scope:Instance)
))
_sym_db.RegisterMessage(Instance)
_COMMAND_ENVVARSENTRY.has_options = True
_COMMAND_ENVVARSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001'))
_MASTER = _descriptor.ServiceDescriptor(
name='Master',
full_name='Master',
file=DESCRIPTOR,
index=0,
options=None,
serialized_start=1227,
serialized_end=1267,
methods=[
_descriptor.MethodDescriptor(
name='report',
full_name='Master.report',
index=0,
containing_service=None,
input_type=_REPORTMSG,
output_type=_REPLY,
options=None,
),
])
_sym_db.RegisterServiceDescriptor(_MASTER)
DESCRIPTOR.services_by_name['Master'] = _MASTER
_WORKER = _descriptor.ServiceDescriptor(
name='Worker',
full_name='Worker',
file=DESCRIPTOR,
index=1,
options=None,
serialized_start=1269,
serialized_end=1350,
methods=[
_descriptor.MethodDescriptor(
name='process_task',
full_name='Worker.process_task',
index=0,
containing_service=None,
input_type=_TASKINFO,
output_type=_REPLY,
options=None,
),
_descriptor.MethodDescriptor(
name='stop_tasks',
full_name='Worker.stop_tasks',
index=1,
containing_service=None,
input_type=_REPORTMSG,
output_type=_REPLY,
options=None,
),
])
_sym_db.RegisterServiceDescriptor(_WORKER)
DESCRIPTOR.services_by_name['Worker'] = _WORKER
# @@protoc_insertion_point(module_scope)
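The descriptors above only wire up the schema; the classes registered with _sym_db are what callers actually use. A minimal sketch (not part of the generated module, with every concrete value hypothetical) of how the batch message types compose a task request:

from protos import rpc_pb2

task = rpc_pb2.TaskInfo(id='1_0', username='alice', instanceid=0,
                        instanceCount=1, maxRetryCount=1, timeout=60,
                        token='abcd', reused=False)
task.parameters.command.commandLine = 'python3 run.py'
task.parameters.command.packagePath = '/root'
task.parameters.command.envVars['MYENV1'] = 'MYVAL1'       # map<string, string>
task.parameters.stdoutRedirectPath = '/root/nfs/batch_1/'
task.cluster.image.name = 'base'
task.cluster.image.type = rpc_pb2.Image.PRIVATE            # ImageType enum value
task.cluster.image.owner = 'alice'
task.cluster.instance.cpu = 1
task.cluster.instance.memory = 1024                        # MB
task.cluster.instance.disk = 1024                          # MB
task.cluster.instance.gpu = 0
data = task.SerializeToString()                            # wire-format bytes
restored = rpc_pb2.TaskInfo.FromString(data)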

105
src/protos/rpc_pb2_grpc.py Normal file
View File

@ -0,0 +1,105 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
import grpc
from protos import rpc_pb2 as rpc__pb2
class MasterStub(object):
# missing associated documentation comment in .proto file
pass
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.report = channel.unary_unary(
'/Master/report',
request_serializer=rpc__pb2.ReportMsg.SerializeToString,
response_deserializer=rpc__pb2.Reply.FromString,
)
class MasterServicer(object):
# missing associated documentation comment in .proto file
pass
def report(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_MasterServicer_to_server(servicer, server):
rpc_method_handlers = {
'report': grpc.unary_unary_rpc_method_handler(
servicer.report,
request_deserializer=rpc__pb2.ReportMsg.FromString,
response_serializer=rpc__pb2.Reply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'Master', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
class WorkerStub(object):
# missing associated documentation comment in .proto file
pass
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.process_task = channel.unary_unary(
'/Worker/process_task',
request_serializer=rpc__pb2.TaskInfo.SerializeToString,
response_deserializer=rpc__pb2.Reply.FromString,
)
self.stop_tasks = channel.unary_unary(
'/Worker/stop_tasks',
request_serializer=rpc__pb2.ReportMsg.SerializeToString,
response_deserializer=rpc__pb2.Reply.FromString,
)
class WorkerServicer(object):
# missing associated documentation comment in .proto file
pass
def process_task(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def stop_tasks(self, request, context):
# missing associated documentation comment in .proto file
pass
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_WorkerServicer_to_server(servicer, server):
rpc_method_handlers = {
'process_task': grpc.unary_unary_rpc_method_handler(
servicer.process_task,
request_deserializer=rpc__pb2.TaskInfo.FromString,
response_serializer=rpc__pb2.Reply.SerializeToString,
),
'stop_tasks': grpc.unary_unary_rpc_method_handler(
servicer.stop_tasks,
request_deserializer=rpc__pb2.ReportMsg.FromString,
response_serializer=rpc__pb2.Reply.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'Worker', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
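A minimal client-side sketch of driving the Worker service defined above (the worker address is an assumption; the port matches the BATCH_WORKER_PORT default of 50051 used elsewhere in this commit):

import grpc
from protos import rpc_pb2, rpc_pb2_grpc

channel = grpc.insecure_channel('192.168.6.2:50051')   # hypothetical worker address
stub = rpc_pb2_grpc.WorkerStub(channel)
reply = stub.process_task(rpc_pb2.TaskInfo(id='1_0', username='alice', token='abcd'))
if reply.status == rpc_pb2.Reply.ACCEPTED:
    print('worker accepted the task:', reply.message)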

View File

@ -79,5 +79,17 @@ def getenv(key):
return os.environ.get("ALLOCATED_PORTS","10000-65535") return os.environ.get("ALLOCATED_PORTS","10000-65535")
elif key =="ALLOW_SCALE_OUT": elif key =="ALLOW_SCALE_OUT":
return os.environ.get("ALLOW_SCALE_OUT", "False") return os.environ.get("ALLOW_SCALE_OUT", "False")
elif key == "BATCH_ON":
return os.environ.get("BATCH_ON","True")
elif key == "BATCH_MASTER_PORT":
return os.environ.get("BATCH_MASTER_PORT","50050")
elif key == "BATCH_WORKER_PORT":
return os.environ.get("BATCH_WORKER_PORT","50051")
elif key == "BATCH_GATEWAY":
return os.environ.get("BATCH_GATEWAY","10.0.3.1")
elif key == "BATCH_NET":
return os.environ.get("BATCH_NET","10.0.3.0/24")
elif key == "BATCH_MAX_THREAD_WORKER":
return os.environ.get("BATCH_MAX_THREAD_WORKER","5")
else: else:
return os.environ.get(key,"") return os.environ.get(key,"")

120
src/utils/gputools.py Normal file
View File

@ -0,0 +1,120 @@
import lxc
import subprocess
import os
import signal
from utils.log import logger
# Note: keep physical device id always the same as the virtual device id
# device_path e.g. /dev/nvidia0
def add_device(container_name, device_path):
c = lxc.Container(container_name)
return c.add_device_node(device_path, device_path)
def remove_device(container_name, device_path):
c = lxc.Container(container_name)
return c.remove_device_node('', device_path)
# Mon May 21 10:51:45 2018
# +-----------------------------------------------------------------------------+
# | NVIDIA-SMI 381.22 Driver Version: 381.22 |
# |-------------------------------+----------------------+----------------------+
# | GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
# | Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
# |===============================+======================+======================|
# | 0 GeForce GTX 108... Off | 0000:02:00.0 Off | N/A |
# | 33% 53C P2 59W / 250W | 295MiB / 11172MiB | 2% Default |
# +-------------------------------+----------------------+----------------------+
# | 1 GeForce GTX 108... Off | 0000:84:00.0 Off | N/A |
# | 21% 35C P8 10W / 250W | 161MiB / 11172MiB | 0% Default |
# +-------------------------------+----------------------+----------------------+
#
# +-----------------------------------------------------------------------------+
# | Processes: GPU Memory |
# | GPU PID Type Process name Usage |
# |=============================================================================|
# | 0 111893 C python3 285MiB |
# | 1 111893 C python3 151MiB |
# +-----------------------------------------------------------------------------+
#
def nvidia_smi():
try:
ret = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return ret.stdout.decode('utf-8').split('\n')
except subprocess.CalledProcessError:
return None
except Exception as e:
return None
def get_gpu_driver_version():
output = nvidia_smi()
if not output:
return None
else:
return output[2].split()[-2]
def get_gpu_status():
output = nvidia_smi()
if not output:
return []
interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
status_list = []
for index in range(7, interval_index, 3):
status = {}
status['id'] = output[index].split()[1]
sp = output[index+1].split()
status['fan'] = sp[1]
status['memory'] = sp[8]
status['memory_max'] = sp[10]
status['util'] = sp[12]
status_list.append(status)
return status_list
def get_gpu_processes():
output = nvidia_smi()
if not output:
return []
interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0]
process_list = []
for index in range(interval_index + 5, len(output)):
sp = output[index].split()
if len(sp) != 7:
break
process = {}
process['gpu'] = sp[1]
process['pid'] = sp[2]
process['name'] = sp[4]
process['memory'] = sp[5]
process['container'] = get_container_name_by_pid(sp[2])
process_list.append(process)
return process_list
def get_container_name_by_pid(pid):
with open('/proc/%s/cgroup' % pid) as f:
content = f.readlines()[0].strip().split('/')
if content[1] != 'lxc':
return 'host'
else:
return content[2]
return None
def clean_up_processes_in_gpu(gpu_id):
    logger.info('[gputools] start cleaning up processes in gpu %s' % str(gpu_id))
    processes = get_gpu_processes()
    for process in [p for p in processes if int(p['gpu']) == int(gpu_id)]:
        logger.info('[gputools] found process %s running in gpu %s' % (process['pid'], process['gpu']))
if process['container'] == 'host':
logger.warning('[gputools] find process of host, ignored')
else:
logger.warning('[gputools] find process of container [%s], killed' % process['container'])
try:
                os.kill(int(process['pid']), signal.SIGKILL)
except OSError:
continue
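A usage sketch of what the parsers above return for the sample nvidia-smi output quoted in the comment near the top of this file (run inside this module or after importing it from utils.gputools; the 'container' value depends on the /proc cgroup lookup at runtime, so it is left open here):

status = get_gpu_status()
# -> [{'id': '0', 'fan': '33%', 'memory': '295MiB', 'memory_max': '11172MiB', 'util': '2%'},
#     {'id': '1', 'fan': '21%', 'memory': '161MiB', 'memory_max': '11172MiB', 'util': '0%'}]
procs = get_gpu_processes()
# -> [{'gpu': '0', 'pid': '111893', 'name': 'python3', 'memory': '285MiB', 'container': ...},
#     {'gpu': '1', 'pid': '111893', 'name': 'python3', 'memory': '151MiB', 'container': ...}]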

View File

@ -19,7 +19,7 @@ Design:Monitor mainly consists of three parts: Collectors, Master_Collector and
import subprocess,re,os,psutil,math,sys import subprocess,re,os,psutil,math,sys
import time,threading,json,traceback,platform import time,threading,json,traceback,platform
from utils import env, etcdlib from utils import env, etcdlib, gputools
import lxc import lxc
import xmlrpc.client import xmlrpc.client
from datetime import datetime from datetime import datetime
@ -262,6 +262,7 @@ class Container_Collector(threading.Thread):
global pid2name global pid2name
global laststopcpuval global laststopcpuval
global laststopruntime global laststopruntime
is_batch = container_name.split('-')[1] == 'batch'
# collect basic information, such as running time,state,pid,ip,name # collect basic information, such as running time,state,pid,ip,name
container = lxc.Container(container_name) container = lxc.Container(container_name)
basic_info = {} basic_info = {}
@ -286,7 +287,8 @@ class Container_Collector(threading.Thread):
containerpids.append(container_pid_str) containerpids.append(container_pid_str)
pid2name[container_pid_str] = container_name pid2name[container_pid_str] = container_name
running_time = self.get_proc_etime(container.init_pid) running_time = self.get_proc_etime(container.init_pid)
running_time += laststopruntime[container_name] if not is_batch:
running_time += laststopruntime[container_name]
basic_info['PID'] = container_pid_str basic_info['PID'] = container_pid_str
basic_info['IP'] = container.get_ips()[0] basic_info['IP'] = container.get_ips()[0]
basic_info['RunningTime'] = running_time basic_info['RunningTime'] = running_time
@ -326,7 +328,8 @@ class Container_Collector(threading.Thread):
cpu_use = {} cpu_use = {}
lastval = 0 lastval = 0
try: try:
lastval = laststopcpuval[container_name] if not is_batch:
lastval = laststopcpuval[container_name]
except: except:
logger.warning(traceback.format_exc()) logger.warning(traceback.format_exc())
cpu_val += lastval cpu_val += lastval
@ -369,7 +372,7 @@ class Container_Collector(threading.Thread):
# deal with network used data # deal with network used data
containerids = re.split("-",container_name) containerids = re.split("-",container_name)
if len(containerids) >= 3: if not is_batch and len(containerids) >= 3:
workercinfo[container_name]['net_stats'] = self.net_stats[containerids[1] + '-' + containerids[2]] workercinfo[container_name]['net_stats'] = self.net_stats[containerids[1] + '-' + containerids[2]]
#logger.info(workercinfo[container_name]['net_stats']) #logger.info(workercinfo[container_name]['net_stats'])
@ -378,7 +381,7 @@ class Container_Collector(threading.Thread):
lasttime = lastbillingtime[container_name] lasttime = lastbillingtime[container_name]
#logger.info(lasttime) #logger.info(lasttime)
# process real billing if running time reach an hour # process real billing if running time reach an hour
if not int(running_time/self.billingtime) == lasttime: if not is_batch and not int(running_time/self.billingtime) == lasttime:
#logger.info("billing:"+str(float(cpu_val))) #logger.info("billing:"+str(float(cpu_val)))
lastbillingtime[container_name] = int(running_time/self.billingtime) lastbillingtime[container_name] = int(running_time/self.billingtime)
self.billing_increment(container_name) self.billing_increment(container_name)
@ -478,6 +481,10 @@ class Collector(threading.Thread):
info[idx][key] = val info[idx][key] = val
return [cpuset, info] return [cpuset, info]
# collect gpu used information
def collect_gpuinfo(self):
return gputools.get_gpu_status()
# collect disk used information # collect disk used information
def collect_diskinfo(self): def collect_diskinfo(self):
global workercinfo global workercinfo
@ -534,9 +541,10 @@ class Collector(threading.Thread):
[cpuinfo,cpuconfig] = self.collect_cpuinfo() [cpuinfo,cpuconfig] = self.collect_cpuinfo()
workerinfo['cpuinfo'] = cpuinfo workerinfo['cpuinfo'] = cpuinfo
workerinfo['cpuconfig'] = cpuconfig workerinfo['cpuconfig'] = cpuconfig
workerinfo['gpuinfo'] = self.collect_gpuinfo()
workerinfo['diskinfo'] = self.collect_diskinfo() workerinfo['diskinfo'] = self.collect_diskinfo()
workerinfo['running'] = True workerinfo['running'] = True
#time.sleep(self.interval) time.sleep(self.interval)
if self.test: if self.test:
break break
# print(self.etcdser.getkey('/meminfo/total')) # print(self.etcdser.getkey('/meminfo/total'))
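The is_batch checks added above hinge on the container naming scheme used by the batch task controller, which names containers '<username>-batch-<taskid>-<instanceid>-<token>' (see src/worker/taskcontroller.py), so the second dash-separated field is 'batch'. A tiny sketch with a hypothetical name:

container_name = 'alice-batch-1_0-0-abcd'
is_batch = container_name.split('-')[1] == 'batch'   # True: billing and net stats are skipped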

60
src/worker/ossmounter.py Normal file
View File

@ -0,0 +1,60 @@
import abc
import subprocess, os, traceback
from utils.log import logger
class OssMounter(object):
__metaclass__ = abc.ABCMeta
@staticmethod
def execute_cmd(cmd):
ret = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
if ret.returncode != 0:
msg = ret.stdout.decode(encoding="utf-8")
logger.error(msg)
return [False,msg]
else:
return [True,""]
@staticmethod
@abc.abstractmethod
def mount_oss(datapath, mount_info):
# mount oss
pass
@staticmethod
@abc.abstractmethod
def umount_oss(datapath, mount_info):
# umount oss
pass
class aliyunOssMounter(OssMounter):
@staticmethod
def mount_oss(datapath, mount_info):
# mount oss
try:
pwdfile = open("/etc/passwd-ossfs","w")
pwdfile.write(mount_info.remotePath+":"+mount_info.accessKey+":"+mount_info.secretKey+"\n")
pwdfile.close()
        except Exception as err:
            logger.error(traceback.format_exc())
            return [False, str(err)]
cmd = "chmod 640 /etc/passwd-ossfs"
[success1, msg] = OssMounter.execute_cmd(cmd)
mountpath = datapath+"/"+mount_info.remotePath
logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath))
if not os.path.isdir(mountpath):
os.makedirs(mountpath)
cmd = "ossfs %s %s -ourl=%s" % (mount_info.remotePath, mountpath, mount_info.other)
        [success, msg] = OssMounter.execute_cmd(cmd)
        if not success:
            return [False, msg]
        return [True, ""]
@staticmethod
def umount_oss(datapath, mount_info):
mountpath = datapath + "/" + mount_info.remotePath
logger.info("UMount oss %s %s" % (mount_info.remotePath, mountpath))
cmd = "fusermount -u %s" % (mountpath)
[success, msg] = OssMounter.execute_cmd(cmd)
[success, msg] = OssMounter.execute_cmd("rm -rf %s" % mountpath)
return [True,""]

454
src/worker/taskcontroller.py Executable file
View File

@ -0,0 +1,454 @@
#!/usr/bin/python3
import sys
if sys.path[0].endswith("worker"):
sys.path[0] = sys.path[0][:-6]
from utils import env, tools
config = env.getenv("CONFIG")
#config = "/opt/docklet/local/docklet-running.conf"
tools.loadenv(config)
from utils.log import initlogging
initlogging("docklet-taskcontroller")
from utils.log import logger
from concurrent import futures
import grpc
#from utils.log import logger
#from utils import env
import json,lxc,subprocess,threading,os,time,traceback
from utils import imagemgr,etcdlib,gputools
from worker import ossmounter
from protos import rpc_pb2, rpc_pb2_grpc
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
MAX_RUNNING_TIME = _ONE_DAY_IN_SECONDS
def ip_to_int(addr):
[a, b, c, d] = addr.split('.')
return (int(a)<<24) + (int(b)<<16) + (int(c)<<8) + int(d)
def int_to_ip(num):
return str((num>>24)&255)+"."+str((num>>16)&255)+"."+str((num>>8)&255)+"."+str(num&255)
class TaskController(rpc_pb2_grpc.WorkerServicer):
def __init__(self):
rpc_pb2_grpc.WorkerServicer.__init__(self)
etcdaddr = env.getenv("ETCD")
logger.info ("using ETCD %s" % etcdaddr )
clustername = env.getenv("CLUSTER_NAME")
logger.info ("using CLUSTER_NAME %s" % clustername )
# init etcdlib client
try:
self.etcdclient = etcdlib.Client(etcdaddr, prefix = clustername)
except Exception:
logger.error ("connect etcd failed, maybe etcd address not correct...")
sys.exit(1)
else:
logger.info("etcd connected")
# get master ip and report port
[success,masterip] = self.etcdclient.getkey("service/master")
if not success:
logger.error("Fail to get master ip address.")
sys.exit(1)
else:
self.master_ip = masterip
logger.info("Get master ip address: %s" % (self.master_ip))
self.master_port = env.getenv('BATCH_MASTER_PORT')
self.imgmgr = imagemgr.ImageMgr()
self.fspath = env.getenv('FS_PREFIX')
self.confpath = env.getenv('DOCKLET_CONF')
self.taskmsgs = []
self.msgslock = threading.Lock()
self.report_interval = 2
self.lock = threading.Lock()
self.mount_lock = threading.Lock()
self.cons_gateway = env.getenv('BATCH_GATEWAY')
self.cons_ips = env.getenv('BATCH_NET')
logger.info("Batch gateway ip address %s" % self.cons_gateway)
logger.info("Batch ip pools %s" % self.cons_ips)
self.cidr = 32 - int(self.cons_ips.split('/')[1])
self.ipbase = ip_to_int(self.cons_ips.split('/')[0])
self.free_ips = []
for i in range(2, (1 << self.cidr) - 1):
self.free_ips.append(i)
logger.info("Free ip addresses pool %s" % str(self.free_ips))
self.gpu_lock = threading.Lock()
self.gpu_status = {}
gpus = gputools.get_gpu_status()
for gpu in gpus:
self.gpu_status[gpu['id']] = ""
self.start_report()
logger.info('TaskController init success')
# Need Locks
def acquire_ip(self):
self.lock.acquire()
        if len(self.free_ips) == 0:
            self.lock.release()
            return [False, "No free ips"]
ip = int_to_ip(self.ipbase + self.free_ips[0])
self.free_ips.remove(self.free_ips[0])
logger.info(str(self.free_ips))
self.lock.release()
return [True, ip + "/" + str(32 - self.cidr)]
# Need Locks
def release_ip(self,ipstr):
self.lock.acquire()
ipnum = ip_to_int(ipstr.split('/')[0]) - self.ipbase
self.free_ips.append(ipnum)
logger.info(str(self.free_ips))
self.lock.release()
def add_gpu_device(self, lxcname, gpu_need):
if gpu_need < 1:
return [True, ""]
self.gpu_lock.acquire()
use_gpus = []
for gpuid in self.gpu_status.keys():
if self.gpu_status[gpuid] == "" and gpu_need > 0:
use_gpus.append(gpuid)
gpu_need -= 1
if gpu_need > 0:
self.gpu_lock.release()
return [False, "No free GPUs"]
for gpuid in use_gpus:
self.gpu_status[gpuid] = lxcname
try:
gputools.add_device(lxcname, "/dev/nvidiactl")
gputools.add_device(lxcname, "/dev/nvidia-uvm")
for gpuid in use_gpus:
gputools.add_device(lxcname,"/dev/nvidia"+str(gpuid))
logger.info("Add gpu:"+str(gpuid) +" to lxc:"+str(lxcname))
except Exception as e:
logger.error(traceback.format_exc())
for gpuid in use_gpus:
self.gpu_status[gpuid] = ""
self.gpu_lock.release()
return [False, "Error occurs when adding gpu device."]
self.gpu_lock.release()
return [True, ""]
def release_gpu_device(self, lxcname):
self.gpu_lock.acquire()
for gpuid in self.gpu_status.keys():
if self.gpu_status[gpuid] == lxcname:
self.gpu_status[gpuid] = ""
self.gpu_lock.release()
#mount_oss
def mount_oss(self, datapath, mount_info):
self.mount_lock.acquire()
try:
for mount in mount_info:
provider = mount.provider
mounter = getattr(ossmounter,provider+"OssMounter",None)
if mounter is None:
self.mount_lock.release()
return [False, provider + " doesn't exist!"]
[success, msg] = mounter.mount_oss(datapath,mount)
if not success:
self.mount_lock.release()
return [False, msg]
except Exception as err:
self.mount_lock.release()
logger.error(traceback.format_exc())
return [False,""]
self.mount_lock.release()
return [True,""]
#umount oss
def umount_oss(self, datapath, mount_info):
try:
for mount in mount_info:
provider = mount.provider
mounter = getattr(ossmounter,provider+"OssMounter",None)
if mounter is None:
return [False, provider + " doesn't exist!"]
[success, msg] = mounter.umount_oss(datapath,mount)
if not success:
return [False, msg]
except Exception as err:
logger.error(traceback.format_exc())
return [False,""]
    #acquire an ip address and create a container
def create_container(self,instanceid,username,image,lxcname,quota):
# acquire ip
[status, ip] = self.acquire_ip()
if not status:
return [False, ip]
# prepare image and filesystem
status = self.imgmgr.prepareFS(username,image,lxcname,str(quota.disk))
if not status:
self.release_ip(ip)
return [False, "Create container for batch failed when preparing filesystem"]
rootfs = "/var/lib/lxc/%s/rootfs" % lxcname
if not os.path.isdir("%s/global/users/%s" % (self.fspath,username)):
path = env.getenv('DOCKLET_LIB')
subprocess.call([path+"/master/userinit.sh", username])
logger.info("user %s directory not found, create it" % username)
sys_run("mkdir -p /var/lib/lxc/%s" % lxcname)
logger.info("generate config file for %s" % lxcname)
def config_prepare(content):
content = content.replace("%ROOTFS%",rootfs)
content = content.replace("%HOSTNAME%","batch-%s" % str(instanceid))
content = content.replace("%CONTAINER_MEMORY%",str(quota.memory))
content = content.replace("%CONTAINER_CPU%",str(quota.cpu*100000))
content = content.replace("%FS_PREFIX%",self.fspath)
content = content.replace("%LXCSCRIPT%",env.getenv("LXC_SCRIPT"))
content = content.replace("%USERNAME%",username)
content = content.replace("%LXCNAME%",lxcname)
content = content.replace("%IP%",ip)
content = content.replace("%GATEWAY%",self.cons_gateway)
return content
logger.info(self.confpath)
conffile = open(self.confpath+"/container.batch.conf", 'r')
conftext = conffile.read()
conffile.close()
conftext = config_prepare(conftext)
conffile = open("/var/lib/lxc/%s/config" % lxcname, 'w')
conffile.write(conftext)
conffile.close()
return [True, ip]
def process_task(self, request, context):
        logger.info('execute task with parameter: ' + str(request))
taskid = request.id
instanceid = request.instanceid
# get config from request
command = request.parameters.command.commandLine #'/root/getenv.sh' #parameter['Parameters']['Command']['CommandLine']
#envs = {'MYENV1':'MYVAL1', 'MYENV2':'MYVAL2'} #parameters['Parameters']['Command']['EnvVars']
pkgpath = request.parameters.command.packagePath
envs = request.parameters.command.envVars
envs['taskid'] = str(taskid)
envs['instanceid'] = str(instanceid)
image = {}
image['name'] = request.cluster.image.name
if request.cluster.image.type == rpc_pb2.Image.PRIVATE:
image['type'] = 'private'
elif request.cluster.image.type == rpc_pb2.Image.PUBLIC:
image['type'] = 'public'
else:
image['type'] = 'base'
image['owner'] = request.cluster.image.owner
username = request.username
token = request.token
lxcname = '%s-batch-%s-%s-%s' % (username,taskid,str(instanceid),token)
instance_type = request.cluster.instance
mount_list = request.cluster.mount
outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath]
timeout = request.timeout
gpu_need = int(request.cluster.instance.gpu)
reused = request.reused
#create container
[success, ip] = self.create_container(instanceid, username, image, lxcname, instance_type)
if not success:
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=ip)
#mount oss
self.mount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list)
conffile = open("/var/lib/lxc/%s/config" % lxcname, 'a+')
mount_str = "lxc.mount.entry = %s/global/users/%s/oss/%s %s/root/oss/%s none bind,rw,create=dir 0 0"
for mount in mount_list:
conffile.write("\n"+ mount_str % (self.fspath, username, mount.remotePath, rootfs, mount.remotePath))
conffile.close()
container = lxc.Container(lxcname)
if not container.start():
logger.error('start container %s failed' % lxcname)
self.release_ip(ip)
self.imgmgr.deleteFS(lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container")
logger.info('start container %s success' % lxcname)
#add GPU
[success, msg] = self.add_gpu_device(lxcname,gpu_need)
if not success:
logger.error("Fail to add gpu device. " + msg)
container.stop()
self.release_ip(ip)
self.imgmgr.deleteFS(lxcname)
return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg)
thread = threading.Thread(target = self.execute_task, args=(username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_list))
thread.setDaemon(True)
thread.start()
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
def write_output(self,lxcname,tmplogpath,filepath):
cmd = "lxc-attach -n " + lxcname + " -- mv %s %s"
if filepath == "" or filepath == "/root/nfs/batch_{jobid}/" or os.path.abspath("/root/nfs/"+tmplogpath) == os.path.abspath(filepath):
return [True,""]
ret = subprocess.run(cmd % ("/root/nfs/"+tmplogpath,filepath),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
if ret.returncode != 0:
msg = ret.stdout.decode(encoding="utf-8")
logger.error(msg)
return [False,msg]
logger.info("Succeed to moving nfs/%s to %s" % (tmplogpath,filepath))
return [True,""]
def execute_task(self,username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_info):
lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs/"
scriptname = "batch_job.sh"
try:
scriptfile = open(lxcfspath+"root/"+scriptname,"w")
scriptfile.write("#!/bin/bash\n")
scriptfile.write("cd "+str(pkgpath)+"\n")
scriptfile.write(command)
scriptfile.close()
except Exception as err:
logger.error(traceback.format_exc())
logger.error("Fail to write script file with taskid(%s) instanceid(%s)" % (str(taskid),str(instanceid)))
else:
try:
job_id = taskid.split('_')[1]
except Exception as e:
logger.error(traceback.format_exc())
job_id = "_none"
jobdir = "batch_" + job_id
logdir = "%s/global/users/%s/data/" % (self.fspath,username) + jobdir
if not os.path.exists(logdir):
logger.info("Directory:%s not exists, create it." % logdir)
os.mkdir(logdir)
stdoutname = str(taskid)+"_"+str(instanceid)+"_stdout.txt"
stderrname = str(taskid)+"_"+str(instanceid)+"_stderr.txt"
try:
stdoutfile = open(logdir+"/"+stdoutname,"w")
stderrfile = open(logdir+"/"+stderrname,"w")
logger.info("Create stdout(%s) and stderr(%s) file to log" % (stdoutname, stderrname))
except Exception as e:
logger.error(traceback.format_exc())
stdoutfile = None
stderrfile = None
cmd = "lxc-attach -n " + lxcname
for envkey,envval in envs.items():
cmd = cmd + " -v %s=%s" % (envkey,envval)
cmd = cmd + " -- /bin/bash \"" + "/root/" + scriptname + "\""
logger.info('run task with command - %s' % cmd)
p = subprocess.Popen(cmd,stdout=stdoutfile,stderr=stderrfile, shell=True)
#logger.info(p)
if timeout == 0:
to = MAX_RUNNING_TIME
else:
to = timeout
while p.poll() is None and to > 0:
time.sleep(min(2,to))
to -= 2
if p.poll() is None:
p.kill()
logger.info("Running time(%d) is out. Task(%s-%s-%s) will be killed." % (timeout,str(taskid),str(instanceid),token))
self.add_msg(taskid,username,instanceid,rpc_pb2.TIMEOUT,token,"Running time is out.")
else:
[success1,msg1] = self.write_output(lxcname,jobdir+"/"+stdoutname,outpath[0])
[success2,msg2] = self.write_output(lxcname,jobdir+"/"+stderrname,outpath[1])
if not success1 or not success2:
if not success1:
msg = msg1
else:
msg = msg2
logger.info("Output error on Task(%s-%s-%s)." % (str(taskid),str(instanceid),token))
self.add_msg(taskid,username,instanceid,rpc_pb2.OUTPUTERROR,token,msg)
else:
if p.poll() == 0:
logger.info("Task(%s-%s-%s) completed." % (str(taskid),str(instanceid),token))
self.add_msg(taskid,username,instanceid,rpc_pb2.COMPLETED,token,"")
else:
logger.info("Task(%s-%s-%s) failed." % (str(taskid),str(instanceid),token))
self.add_msg(taskid,username,instanceid,rpc_pb2.FAILED,token,"")
container = lxc.Container(lxcname)
if container.stop():
logger.info("stop container %s success" % lxcname)
else:
logger.error("stop container %s failed" % lxcname)
logger.info("deleting container:%s" % lxcname)
if self.imgmgr.deleteFS(lxcname):
logger.info("delete container %s success" % lxcname)
else:
logger.error("delete container %s failed" % lxcname)
logger.info("release ip address %s" % ip)
self.release_ip(ip)
self.release_gpu_device(lxcname)
#umount oss
self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_info)
def stop_tasks(self, request, context):
for msg in request.taskmsgs:
lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.instanceid),msg.token)
logger.info("Stop the task with lxc:"+lxcname)
subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="")
def add_msg(self,taskid,username,instanceid,status,token,errmsg):
self.msgslock.acquire()
try:
self.taskmsgs.append(rpc_pb2.TaskMsg(taskid=str(taskid),username=username,instanceid=int(instanceid),instanceStatus=status,token=token,errmsg=errmsg))
except Exception as err:
logger.error(traceback.format_exc())
self.msgslock.release()
#logger.info(str(self.taskmsgs))
def report_msg(self):
channel = grpc.insecure_channel(self.master_ip+":"+self.master_port)
stub = rpc_pb2_grpc.MasterStub(channel)
while True:
self.msgslock.acquire()
reportmsg = rpc_pb2.ReportMsg(taskmsgs = self.taskmsgs)
try:
response = stub.report(reportmsg)
logger.info("Response from master by reporting: "+str(response.status)+" "+response.message)
except Exception as err:
logger.error(traceback.format_exc())
self.taskmsgs = []
self.msgslock.release()
time.sleep(self.report_interval)
def start_report(self):
thread = threading.Thread(target = self.report_msg, args=())
thread.setDaemon(True)
thread.start()
logger.info("Start to report task messages to master every %d seconds." % self.report_interval)
def TaskControllerServe():
max_threads = int(env.getenv('BATCH_MAX_THREAD_WORKER'))
worker_port = int(env.getenv('BATCH_WORKER_PORT'))
logger.info("Max Threads on a worker is %d" % max_threads)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_threads))
rpc_pb2_grpc.add_WorkerServicer_to_server(TaskController(), server)
server.add_insecure_port('[::]:'+str(worker_port))
server.start()
logger.info("Start TaskController Servicer on port:%d" % worker_port)
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == "__main__":
TaskControllerServe()
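A worked example of the address-pool arithmetic in TaskController.__init__ under the default BATCH_NET=10.0.3.0/24, using the ip_to_int/int_to_ip helpers defined at the top of this file: offsets 2 through 254 are handed out, so batch containers get 10.0.3.2/24 up to 10.0.3.254/24 while 10.0.3.1 is left to BATCH_GATEWAY.

cons_ips = '10.0.3.0/24'
cidr = 32 - int(cons_ips.split('/')[1])        # 8 free host bits
ipbase = ip_to_int(cons_ips.split('/')[0])
free_ips = list(range(2, (1 << cidr) - 1))     # [2, 3, ..., 254]
first = int_to_ip(ipbase + free_ips[0])        # '10.0.3.2'
last = int_to_ip(ipbase + free_ips[-1])        # '10.0.3.254'
suffix = '/' + str(32 - cidr)                  # '/24', as appended by acquire_ip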

View File

@ -57,17 +57,23 @@ class Worker(object):
self.etcd = etcdclient self.etcd = etcdclient
self.master = self.etcd.getkey("service/master")[1] self.master = self.etcd.getkey("service/master")[1]
self.mode=None self.mode = None
self.workertype = "normal"
self.key=""
# waiting state is preserved for compatible. if len(sys.argv) > 1 and sys.argv[1] == "batch-worker":
self.etcd.setkey("machines/runnodes/"+self.addr, "waiting") self.workertype = "batch"
# get this node's key to judge how to init.
[status, key] = self.etcd.getkey("machines/runnodes/"+self.addr) if self.workertype == "normal":
if status: # waiting state is preserved for compatible.
self.key = generatekey("machines/allnodes/"+self.addr) self.etcd.setkey("machines/runnodes/"+self.addr, "waiting")
else: # get this node's key to judge how to init.
logger.error("get key failed. %s" % 'machines/runnodes/'+self.addr) [status, key] = self.etcd.getkey("machines/runnodes/"+self.addr)
sys.exit(1) if status:
self.key = generatekey("machines/allnodes/"+self.addr)
else:
logger.error("get key failed. %s" % 'machines/runnodes/'+self.addr)
sys.exit(1)
# check token to check global directory # check token to check global directory
[status, token_1] = self.etcd.getkey("token") [status, token_1] = self.etcd.getkey("token")
@ -87,7 +93,8 @@ class Worker(object):
if node['key'] == self.key: if node['key'] == self.key:
value = 'init-recovery' value = 'init-recovery'
break break
logger.info("worker start in "+value+" mode")
logger.info("worker start in "+value+" mode, worker type is"+self.workertype)
Containers = container.Container(self.addr, etcdclient) Containers = container.Container(self.addr, etcdclient)
if value == 'init-new': if value == 'init-new':
@ -193,7 +200,8 @@ class Worker(object):
self.hosts_collector.start() self.hosts_collector.start()
logger.info("Monitor Collector has been started.") logger.info("Monitor Collector has been started.")
# worker change it state itself. Independedntly from master. # worker change it state itself. Independedntly from master.
self.etcd.setkey("machines/runnodes/"+self.addr, "work") if self.workertype == "normal":
self.etcd.setkey("machines/runnodes/"+self.addr, "work")
publicIP = env.getenv("PUBLIC_IP") publicIP = env.getenv("PUBLIC_IP")
self.etcd.setkey("machines/publicIP/"+self.addr,publicIP) self.etcd.setkey("machines/publicIP/"+self.addr,publicIP)
self.thread_sendheartbeat = threading.Thread(target=self.sendheartbeat) self.thread_sendheartbeat = threading.Thread(target=self.sendheartbeat)
@ -204,17 +212,22 @@ class Worker(object):
# send heardbeat package to keep alive in etcd, ttl=2s # send heardbeat package to keep alive in etcd, ttl=2s
def sendheartbeat(self): def sendheartbeat(self):
while(True): if self.workertype == "normal":
# check send heartbeat package every 1s while(True):
time.sleep(2) # check send heartbeat package every 1s
[status, value] = self.etcd.getkey("machines/runnodes/"+self.addr) time.sleep(2)
if status: [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr)
# master has know the worker so we start send heartbeat package if status:
if value=='ok': # master has know the worker so we start send heartbeat package
self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 3) if value=='ok':
else: self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)
logger.error("get key %s failed, master may be crashed" % self.addr) else:
self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60) logger.error("get key %s failed, master may be crashed" % self.addr)
self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60)
elif self.workertype == "batch":
while(True):
time.sleep(2)
self.etcd.setkey("machines/batchnodes/"+self.addr, "ok", ttl = 60)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -240,21 +240,38 @@ function processInfo()
$("#con_disk").html(usedp+"%<br/>"+detail); $("#con_disk").html(usedp+"%<br/>"+detail);
//processNetStats //processNetStats
var net_stats = data.monitor.net_stats; var net_stats = data.monitor.net_stats;
var in_rate = parseInt(net_stats.bytes_recv_per_sec); if(!$.isEmptyObject(net_stats))
var out_rate = parseInt(net_stats.bytes_sent_per_sec); {
ingress_rate = in_rate; var in_rate = parseInt(net_stats.bytes_recv_per_sec);
egress_rate = out_rate; var out_rate = parseInt(net_stats.bytes_sent_per_sec);
$("#net_in_rate").html(num2human(in_rate)+"Bps"); ingress_rate = in_rate;
$("#net_out_rate").html(num2human(out_rate)+"Bps"); egress_rate = out_rate;
$("#net_in_bytes").html(num2human(net_stats.bytes_recv)+"B"); $("#net_in_rate").html(num2human(in_rate)+"Bps");
$("#net_out_bytes").html(num2human(net_stats.bytes_sent)+"B"); $("#net_out_rate").html(num2human(out_rate)+"Bps");
$("#net_in_packs").html(net_stats.packets_recv); $("#net_in_bytes").html(num2human(net_stats.bytes_recv)+"B");
$("#net_out_packs").html(net_stats.packets_sent); $("#net_out_bytes").html(num2human(net_stats.bytes_sent)+"B");
$("#net_in_err").html(net_stats.errout); $("#net_in_packs").html(net_stats.packets_recv);
$("#net_out_err").html(net_stats.errin); $("#net_out_packs").html(net_stats.packets_sent);
$("#net_in_drop").html(net_stats.dropout); $("#net_in_err").html(net_stats.errout);
$("#net_out_drop").html(net_stats.dropin); $("#net_out_err").html(net_stats.errin);
$("#net_in_drop").html(net_stats.dropout);
$("#net_out_drop").html(net_stats.dropin);
}
else {
ingress_rate = 0;
egress_rate = 0;
$("#net_in_rate").html("--");
$("#net_out_rate").html("--");
$("#net_in_bytes").html("--");
$("#net_out_bytes").html("--");
$("#net_in_packs").html("--");
$("#net_out_packs").html("--");
$("#net_in_err").html("--");
$("#net_out_err").html("--");
$("#net_in_drop").html("--");
$("#net_out_drop").html("--");
}
},"json"); },"json");
} }

View File

@ -174,6 +174,9 @@
<li id="nav_History"> <li id="nav_History">
<a href='/history/'><i class="fa fa-history"></i> <span class="nav-label">History</span></a> <a href='/history/'><i class="fa fa-history"></i> <span class="nav-label">History</span></a>
</li> </li>
<li id="nav_Batch">
<a href='/batch_jobs/'><i class="fa fa-tasks"></i> <span class="nav-label">Batch</span></a>
</li>
{% if mysession['usergroup'] == 'root' or mysession['usergroup'] == 'admin'%} {% if mysession['usergroup'] == 'root' or mysession['usergroup'] == 'admin'%}

View File

@ -0,0 +1,248 @@
{% extends 'base_AdminLTE.html' %}
{% block title %}Docklet | Create Batch Job{% endblock %}
{% block css_src %}
<!--<style>
.divcontent { overflow-y:scroll; height:200px;}
</style>-->
<link href="//cdn.bootcss.com/datatables/1.10.11/css/dataTables.bootstrap.min.css" rel="stylesheet">
<link href="//cdn.bootcss.com/datatables/1.10.11/css/jquery.dataTables_themeroller.css" rel="stylesheet">
<link href="/static/dist/css/modalconfig.css" rel="stylesheet">
{% endblock %}
{% block panel_title %}Batch Job Info{% endblock %}
{% block panel_list %}
<ol class="breadcrumb">
<li>
<a href="/dashboard/"><i class="fa fa-dashboard"></i>Home</a>
</li>
</ol>
{% endblock %}
<div>
{% block content %}
<div class="row">
<div class="col-lg-12">
<div class="box box-info">
<div class="box-header with-border">
<h3 class="box-title">Batch Job Create
<button type="button" id="add_task" class="btn btn-box-tool" title="add a task"><i class="fa fa-plus"></i>
</button>
</h3>
<div class="box-tools pull-right">
<button type="button" class="btn btn-box-tool" data-widget="collapse"><i class="fa fa-minus"></i>
</button>
<button type="button" class="btn btn-box-tool" data-widget="remove"><i class="fa fa-times"></i></button>
</div>
</div>
<div class="box-body">
<form id="form" class="form-horizontal" action="/batch_job/{{masterips[0].split("@")[0]}}/add/" method="POST">
<div class="form-group"><label class="col-sm-2 control-label">Job Name</label>
<div class="col-sm-10"><input type="text" class="form-control" name="jobName" id="job_name"></div>
</div>
<div class="hr-line-dashed"></div>
<br/>
<div class="form-group"><label class="col-sm-2 control-label">Priority</label>
<div class="col-sm-10"><select id="priority_selector" class="form-control" name="jobPriority">
{% for priority in range(10) %}
<option value="{{priority}}">{{priority}}</option>
{% endfor %}
</select></div>
</div>
<br/>
<div class="hr-line-dashed"></div>
<div class="panel-group" id="accordion">
<!-- Tasks -->
</div>
<br/>
<div class="hr-line-dashed"></div>
<div class="row">
<div class="form-group">
<div class="col-sm-4 col-sm-offset-2">
<button class="btn btn-primary" type="submit">Create</button>
</div>
</div>
</div>
</form>
</div>
</div>
</div>
</div>
</div>
{% endblock %}
{% block script_src %}
<!-- Custom and plugin javascript -->
<script src="/static/js/inspinia.js"></script>
<script src="http://cdn.bootcss.com/pace/1.0.2/pace.min.js"></script>
<!-- Steps -->
<script src="http://cdn.bootcss.com/jquery-steps/1.1.0/jquery.steps.min.js"></script>
<!-- Jquery Validate -->
<script src="http://cdn.bootcss.com/jquery-validate/1.15.0/jquery.validate.min.js"></script>
<script src="http://cdn.bootcss.com/datatables/1.10.11/js/jquery.dataTables.js"></script>
<script src="http://cdn.bootcss.com/datatables/1.10.11/js/dataTables.bootstrap.js"></script>
<script src="http://cdn.bootcss.com/datatables-tabletools/2.1.5/js/TableTools.min.js"></script>
<script type="text/javascript">
var task_number = 0;
var mapping_number = 0;
function removeTask(obj) {
$("#task_pannel_" + obj.id).remove();
}
function removeMapping(obj) {
$("#mapping_" + obj.id).remove();
}
function addMapping(obj) {
mapping_number += 1;
var table = $("#storage_mapping_" + obj.id)[0];
var new_mapping = table.insertRow();
new_mapping.id = "mapping_" + task_number + "_" + mapping_number;
var local_dir = new_mapping.insertCell();
var remote_dir = new_mapping.insertCell();
var source = new_mapping.insertCell();
var remove = new_mapping.insertCell();
local_dir.innerHTML = '<input type="text" class="form-control" name="mappingLocalDir_' + task_number + '_' + mapping_number + '" id="mapping_local_dir_'
+ task_number + '_' + mapping_number + '" />';
remote_dir.innerHTML = '<input type="text" class="form-control" name="mappingRemoteDir_' + task_number + '_' + mapping_number + '" id="mapping_remote_dir_'
+ task_number + '_' + mapping_number + '" />';
source.innerHTML = '<select class="form-control" name="mappingSource_' + task_number + '_' + mapping_number + '" id="mapping_source_'
+ task_number + '_' + mapping_number + '">'
+'<option>Aliyun</option><option>AWS</option></select>';
remove.innerHTML = '<div class="box-tool pull-left"><button type="button" id="' + task_number + '_' + mapping_number +'" onclick="removeMapping(this)" class="btn btn-xs btn-danger">'
+'Remove</button></div>';
}
$("#add_task").click(function() {
task_number += 1;
mapping_number = 0;
var task_html = '';
task_html +=
'<div class="panel panel-default" id="task_pannel_' + task_number + '">'
+'<div class="panel-heading">'
+'<h4 class="panel-title">'
+'<a data-toggle="collapse" data-panel="#accordion" href="#collapse' + task_number + '">'
+'Task #' + task_number
+'</a><div class="box-tools pull-right"><button type="button" id="' + task_number + '" onclick="removeTask(this)" class="btn btn-box-tool"><i class="fa fa-times"></i></button></div>'
+'</h4></div>'
+'<div id="collapse' + task_number + '" class="panel-collapse collapse">'
+'<div class="panel-body">'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">CPU</label>'
+'<div class="col-sm-3"><input type="number" class="form-control" name="cpuSetting_' + task_number + '" id="cpuSetting_' + task_number + '" value = 1 />'
+'</div>'
+'<label class="col-sm-2 control-label">Memory</label>'
+'<div class="col-sm-3"><input type="number" class="form-control" name="memorySetting_' + task_number + '" id="memorySetting_' + task_number + '" value = 1024 />'
+'</div>MB</div>'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">GPU</label>'
+'<div class="col-sm-3"><input type="number" class="form-control" name="gpuSetting_' + task_number + '" id="gpuSetting_' + task_number + '" value= 0 />'
+'</div>'
+'<label class="col-sm-2 control-label">Disk</label>'
+'<div class="col-sm-3"><input type="number" class="form-control" name="diskSetting_' + task_number + '" id="diskSetting_' + task_number + '" value= 1024 />'
+'</div>MB</div>'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">Instance Count</label>'
+'<div class="col-sm-3"><input type="number" class="form-control" name="instCount_' + task_number + '" id="instCount_' + task_number + '" value= 1 />'
+'</div>'
+'<label class="col-sm-2 control-label">Max Retry Count</label>'
+'<div class="col-sm-3"><input type="number" class="form-control" name="retryCount_' + task_number + '" id="retryCount_' + task_number + '" value= 1 />'
+'</div></div>'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">Source Code Address</label>'
+'<div class="col-sm-3"><input type="text" class="form-control" name="srcAddr_' + task_number + '" id="srcAddr_' + task_number + '" />'
+'</div>'
+'<label class="col-sm-2 control-label">Expire Time</label>'
+'<div class="col-sm-3"><input type="number" class="form-control" name="expTime_' + task_number + '" id="expTime_' + task_number + '" value= 60 />'
+'</div>Seconds</div>'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">Stderr Redirect Path</label>'
+'<div class="col-sm-3"><input type="text" class="form-control" placeholder="/path/to/file or /path/" name="stdErrRedPth_' + task_number + '" id="stdErrRedPth_' + task_number + '" value="/root/nfs/batch_{jobid}/" />'
+'</div>'
+'<label class="col-sm-2 control-label">Stdout Redirect Path</label>'
+'<div class="col-sm-3"><input type="text" class="form-control" placeholder="/path/to/file or /path/" name="stdOutRedPth_' + task_number + '" id="stdOutRedPth_' + task_number + '" value="/root/nfs/batch_{jobid}/" />'
+'</div></div>'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">Dependency&nbsp<i class="fa fa-question-circle" title="The tasks that this task depends on, seperate them with commas, eg: task_1, task_2"></i></label>'
+'<div class="col-sm-3"><input type="text" class="form-control" name="dependency_' + task_number + '" id="dependency_' + task_number + '" />'
+'</div>'
+'<label class="col-sm-2 control-label">Command</label>'
+'<div class="col-sm-3"><input type="text" class="form-control" name="command_' + task_number + '" id="command_' + task_number + '" />'
+'</div></div>'
+'<div class="form-group"><label class="col-sm-2 control-label">Image Choose</label>'
+'<div class="col-sm-10">'
+'<table id="imagetable" class="table table-striped table-bordered table-hover table-image" >'
+'<thead>'
+'<tr>'
+'<th>ImageName</th>'
+'<th>Type</th>'
+'<th>Owner</th>'
+'<th>Description</th>'
+'<th>Choose</th>'
+'</tr>'
+'</thead>'
+'<tbody>'
+'<tr>'
+'<td>base</td>'
+'<td>public</td>'
+'<td>docklet</td>'
+'<td>A base image for you</td>'
+'<td><div class="i-checks"><label><input type="radio" name="image_' + task_number + '" value="base_base_base" checked="checked"></label></div></td>'
+'</tr>'
+'{% for image in images['private'] %}'
+'<tr>'
+'<td>{{image['name']}}</td>'
+'<td>private</td>'
+'<td>{{user}}</td>'
+'<td><a href="/image/{{masterips[0].split("@")[1]}}/description/{{image['name']}}_{{user}}_private/" target="_blank">{{image['description']}}</a></td>'
+'<td><div class="i-checks"><label><input type="radio" name="image_' + task_number + '" value="{{image['name']}}_{{user}}_private"></label></div></td>'
+'</tr>'
+'{% endfor %}'
+'{% for p_user,p_images in images['public'].items() %}'
+'{% for image in p_images %}'
+'<tr>'
+'<td>{{image['name']}}</td>'
+'<td>public</td>'
+'<td>{{p_user}}</td>'
+'<td><a href="/image/{{masterips[0].split("@")[1]}}/description/{{image['name']}}_{{p_user}}_public/" target="_blank">{{image['description']}}</a></td>'
+'<td><div class="i-checks"><label><input type="radio" name="image_' + task_number + '" value="{{image['name']}}_{{p_user}}_public"></label></div></td>'
+'</tr>'
+'{% endfor %}'
+'{% endfor %}'
+'</tbody>'
+'</table>'
+'</div>'
+'</div>'
+'<div class="form-group">'
+'<span>'
+'<label class="col-sm-2 contril-label">Exteranl Storage Mapping</label>'
+'<table class="table table-bordered" id="storage_mapping_' + task_number + '" style="display:inline;">'
+'<thead>'
+'<tr><td><button type="button" id="' + task_number + '" class="btn btn-primary btn-xs" title="add an external storage mapping" onclick="addMapping(this)">'
+'<i class="fa fa-plus"></i></button></td></tr>'
+'<tr><th style="width:217px">Local Dir</th><th style="width:217px">Remote Dir</th><th style="width:217px">source</th><th style="width:217px">Operation</th></tr>'
+'</thead>'
+'<tbody>'
+'</tbody>'
+'</table>'
+'</span></div>'
+'</div></div></div>';
$(task_html).appendTo("#accordion");
});
</script>
{% endblock %}
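Note on the form above: every per-task setting is serialized into flat, index-suffixed field names (cpuSetting_<n>, srcAddr_<n>, mappingLocalDir_<n>_<m>, ...) and submitted as a single form. The sketch below is a hypothetical illustration of how such a flat form could be regrouped into one dict per task on the receiving side; the helper name group_task_fields and the output layout are assumptions, not the parser this commit actually ships.

import re

# Hypothetical helper (for illustration only; not part of this commit):
# regroup the flat fields posted by batch_create.html into one dict per task.
def group_task_fields(form):
    tasks = {}
    pattern = re.compile(r'^([A-Za-z]+)_(\d+)(?:_(\d+))?$')
    for key, value in form.items():
        match = pattern.match(key)
        if match is None:
            continue  # job-level fields such as jobName / jobPriority stay outside the tasks
        field, task_no, mapping_no = match.group(1), match.group(2), match.group(3)
        task = tasks.setdefault(task_no, {'mappings': {}})
        if mapping_no is None:
            task[field] = value  # e.g. cpuSetting, command, image
        else:
            task['mappings'].setdefault(mapping_no, {})[field] = value
    return tasks

# e.g. group_task_fields({'cpuSetting_1': '1', 'mappingLocalDir_1_1': '/data'})
# -> {'1': {'mappings': {'1': {'mappingLocalDir': '/data'}}, 'cpuSetting': '1'}}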


@ -0,0 +1,126 @@
{% extends "base_AdminLTE.html"%}
{% block title %}Docklet | Batch Job{% endblock %}
{% block panel_title %}Batch Job{% endblock %}
{% block panel_list %}
<ol class="breadcrumb">
<li>
<a href="/dashboard/"><i class="fa fa-dashboard"></i>Home</a>
</li>
<li class="active">
<strong>Batch Job</strong>
</li>
</ol>
{% endblock %}
{% block content %}
<div class="row">
<div class="col-lg-12">
<div class="box box-info">
<div class="box-header with-border">
<h3 class="box-title">Batch Job List</h3>
<div class="box-tools pull-right">
<button type="button" class="btn btn-box-tool" data-widget="collapse"><i class="fa fa-minus"></i>
</button>
<button type="button" class="btn btn-box-tool" data-widget="remove"><i class="fa fa-times"></i></button>
</div>
</div>
<div class="box-body">
<p>
<a href="/batch_job/create/"><button type="button" class="btn btn-primary btn-sm"><i class="fa fa-plus"></i> Create Batch Job</button></a>
</p>
{% for job_info in job_list %}
<div class="modal inmodal" id='OutputModal_{{ job_info['job_id'] }}' tabindex="-1" role="dialog" aria-hidden="true">
<div class="modal-dialog">
<div class="modal-content animated fadeIn">
<div class="modal-header">
<button type="button" class="close" data-dismiss="modal"><span aria-hidden="true">&times;</span><span class="sr-only">Close</span></button>
<h4 class="modal-title">Job:{{ job_info['job_name'] }}({{ job_info['job_id'] }}) Stdout and Stderr of tasks</h4>
</div>
<div class="modal-body">
<table width="100%" cellspacing="0" class="table table-bordered table-striped table-hover table-output">
<thead>
<tr>
<th>Task ID</th>
<th>Instance ID</th>
<th>Stdout</th>
<th>Stderr</th>
</tr>
</thead>
<tbody>
{% for taskid in job_info['tasks'] %}
{% for instid in range(job_info['tasks_instCount'][taskid]) %}
<tr>
<td>{{ taskid }}</td>
<td>{{ instid }}</td>
<td><a class="btn btn-info btn-xs" href='/batch_job/output/{{ job_info["job_id"] }}/{{ taskid }}/{{ instid }}/stdout/' target="_blank">Stdout</a></td>
<td><a class="btn btn-info btn-xs" href='/batch_job/output/{{ job_info["job_id"] }}/{{ taskid }}/{{ instid }}/stderr/' target="_blank">Stderr</a></td>
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-white" data-dismiss="modal">Close</button>
</div>
</div>
</div>
</div>
{% endfor %}
<div class="table">
<table width="100%" cellspacing="0" style="margin: 0 auto;" class="table table-striped table-bordered table-hover table-batch">
<thead>
<tr>
<th>ID</th>
<th>Name</th>
<th>Status</th>
<th>Tasks</th>
<th>Operations</th>
<th>Create Time</th>
<th>Stdout and Stderr</th>
</tr>
</thead>
<tbody>
{% for job_info in job_list %}
<tr>
<td>{{ job_info['job_id'] }}</td>
<td>{{ job_info['job_name'] }}</td>
<td>
{{ job_info['status'] }}
</td>
<td>Tasks</td>
<td><button type="button" class="btn btn-xs btn-default">&nbsp;&nbsp;&nbsp;Stop&nbsp;&nbsp;&nbsp;</button></td>
<td>{{ job_info['create_time'] }}</td>
<td><a role="button" class="btn btn-info btn-xs" id='{{ job_info['job_id'] }}_output' data-toggle="modal" data-target='#OutputModal_{{ job_info['job_id'] }}'>Get Output</a></td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
</div>
</div>
{% endblock %}
{% block script_src %}
<script src="//cdn.bootcss.com/datatables/1.10.11/js/jquery.dataTables.min.js"></script>
<script src="//cdn.bootcss.com/datatables/1.10.11/js/dataTables.bootstrap.min.js"></script>
<script type="text/javascript">
$(document).ready(function() {
$(".table-batch").DataTable({"scrollX":true});
$(".table-output").DataTable({
"lengthChange":false});
});
function sendAdd(){
document.getElementById("addForm").submit();
}
function sendDel(){
document.getElementById("delForm").submit();
}
</script>
{% endblock %}


@ -0,0 +1,59 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<title>Docklet | Batch {{ issue }}: {{ jobid }}/{{ taskid }}/{{ instid }}</title>
<!-- Tell the browser to be responsive to screen width -->
<meta content="width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no" name="viewport">
<link rel="shortcut icon" href="/static/img/favicon.ico">
<link href="//cdn.bootcss.com/bootstrap/3.3.5/css/bootstrap.min.css" rel="stylesheet">
<!-- Font Awesome -->
<link href="//cdn.bootcss.com/font-awesome/4.3.0/css/font-awesome.min.css" rel="stylesheet">
<!-- Ionicons -->
<link href="//cdn.bootcss.com/ionicons/2.0.1/css/ionicons.min.css" rel="stylesheet">
<link href="//cdn.bootcss.com/animate.css/3.5.1/animate.min.css" rel="stylesheet">
<link href="//cdn.bootcss.com/toastr.js/latest/css/toastr.min.css" rel="stylesheet">
<!-- Theme style -->
<link rel="stylesheet" href="/static/dist/css/AdminLTE.min.css">
<link rel="stylesheet" href="/static/dist/css/skins/skin-blue.min.css">
</head>
<body>
<h3>Jobid: {{ jobid }}</h3>
<h3>Taskid: {{ taskid }}</h3>
<h3>Instanceid: {{ instid }}</h3>
<h4><small>The output of {{ issue }} is refreshed every 2 seconds.</small></h4>
<hr>
<pre id="output">{{ output }}</pre>
<!-- jQuery 2.2.1 -->
<script src="//cdn.bootcss.com/jquery/2.2.1/jquery.min.js"></script>
<!-- Bootstrap 3.3.5 -->
<script src="//cdn.bootcss.com/bootstrap/3.3.5/js/bootstrap.min.js"></script>
<!-- AdminLTE App -->
<script src="/static/dist/js/app.min.js"></script>
<script src="//cdn.bootcss.com/fastclick/1.0.6/fastclick.min.js"></script>
<script src="//cdn.bootcss.com/jQuery-slimScroll/1.3.7/jquery.slimscroll.min.js"></script>
<script src="//cdn.bootcss.com/toastr.js/latest/js/toastr.min.js"></script>
<script type="text/javascript">
function updateOutput()
{
var host = window.location.host;
var url = "http://" + host + "/batch/job/output/" + "{{ masterip }}" + "/" + "{{ jobid }}" + "/" + "{{ taskid }}" + "/" + "{{ instid }}" + "/" + "{{ issue }}" + "/";
$.post(url,{},function(data){
$("#output").html(data.data);
},"json");
}
setInterval(updateOutput,2000);
</script>
</body>
</html>
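This page polls the frontend's /batch/job/output/<masterip>/<jobid>/<taskid>/<instid>/<issue>/ route every 2 seconds and reads data.data from the JSON reply. Below is a minimal sketch of the response shape the page assumes, inferred from updateOutput() above and from output_batch_job_request() in web.py further down (which returns json.dumps(result) from the master); any field beyond "success" and "data" would be an assumption.

# Assumed JSON contract of the polling endpoint (illustrative only):
expected_response = {
    "success": "true",  # the web views compare against the string "true"
    "data": "current stdout (or stderr) text of the task instance\n",
}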


@ -41,6 +41,7 @@ from webViews.reportbug import *
from webViews.authenticate.auth import login_required, administration_required, activated_required
from webViews.authenticate.register import registerView
from webViews.authenticate.login import loginView, logoutView
from webViews.batch import *
import webViews.dockletrequest
from webViews import cookie_tool
import traceback
@ -127,6 +128,49 @@ def reportBug():
reportBugView.bugmessage = request.form['bugmessage']
return reportBugView.as_view()
@app.route("/batch_jobs/", methods=['GET'])
@login_required
def batch_job():
return batchJobListView().as_view()
@app.route("/batch_job/create/", methods=['GET'])
@login_required
def create_batch_job():
return createBatchJobView().as_view()
@app.route("/batch_job/<masterip>/add/", methods=['POST'])
@login_required
def add_batch_job(masterip):
addBatchJobView.masterip = masterip
addBatchJobView.job_data = request.form
return addBatchJobView().as_view()
@app.route("/batch_job/state/", methods=['GET'])
@login_required
def state_batch_job():
return stateBatchJobView().as_view()
@app.route("/batch_job/output/<jobid>/<taskid>/<instid>/<issue>/", methods=['GET'])
@login_required
def output_batch_job(jobid, taskid, instid, issue):
outputBatchJobView.jobid = jobid
outputBatchJobView.taskid = taskid
outputBatchJobView.instid = instid
outputBatchJobView.issue = issue
return outputBatchJobView().as_view()
@app.route("/batch/job/output/<masterip>/<jobid>/<taskid>/<instid>/<issue>/", methods=['POST'])
@login_required
def output_batch_job_request(masterip, jobid, taskid, instid, issue):
data = {
'jobid':jobid,
'taskid':taskid,
'instid':instid,
'issue':issue
}
result = dockletRequest.post("/batch/job/output/",data,masterip)
return json.dumps(result)
@app.route("/workspace/create/", methods=['GET']) @app.route("/workspace/create/", methods=['GET'])
#@activated_required #@activated_required
def addCluster(): def addCluster():

web/webViews/batch.py Normal file

@ -0,0 +1,78 @@
from flask import session, redirect, request
from webViews.view import normalView
from webViews.log import logger
from webViews.checkname import checkname
from webViews.dockletrequest import dockletRequest
class batchJobListView(normalView):
template_path = "batch/batch_list.html"
@classmethod
def get(self):
masterips = dockletRequest.post_to_all()
result = dockletRequest.post("/batch/job/list/",{},masterips[0].split("@")[0])
job_list = result.get("data")
logger.debug("job_list: %s" % job_list)
if result.get('success', "") == "true":
    return self.render(self.template_path, job_list=job_list)
else:
    return self.error()
class createBatchJobView(normalView):
template_path = "batch/batch_create.html"
@classmethod
def get(self):
masterips = dockletRequest.post_to_all()
images = dockletRequest.post("/image/list/",{},masterips[0].split("@")[0]).get("images")
if images is not None:
    return self.render(self.template_path, masterips=masterips, images=images)
else:
    return self.error()
class stateBatchJobView(normalView):
template_path = "batch/batch_state.html"
@classmethod
def get(self):
return self.render(self.template_path)
class addBatchJobView(normalView):
template_path = "batch/batch_list.html"
@classmethod
def post(self):
masterip = self.masterip
result = dockletRequest.post("/batch/job/add/", self.job_data, masterip)
if result.get('success', None) == "true":
return redirect('/batch_jobs/')
else:
return self.error()
class outputBatchJobView(normalView):
template_path = "batch/batch_output.html"
jobid = ""
taskid = ""
instid = ""
issue = ""
@classmethod
def get(self):
masterips = dockletRequest.post_to_all()
data = {
'jobid':self.jobid,
'taskid':self.taskid,
'instid':self.instid,
'issue':self.issue
}
result = dockletRequest.post("/batch/job/output/",data,masterips[0].split("@")[0])
output = result.get("data")
#logger.debug("job_list: %s" % job_list)
if result.get('success',"") == "true":
return self.render(self.template_path, masterip=masterips[0].split("@")[0], jobid=self.jobid,
taskid=self.taskid, instid=self.instid, issue=self.issue, output=output)
else:
return self.error()
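All of the views above address the first master returned by dockletRequest.post_to_all() and split the entry on '@'. Below is a minimal sketch of that convention as the code appears to assume it (inferred from the .split("@") calls here and in the templates; the format is not documented in this commit).

def split_master_entry(entry):
    # The part before '@' is what dockletRequest.post() receives as the master address;
    # the part after '@' is what the templates use when building /image/<...>/description/ links.
    request_target, link_host = entry.split("@", 1)
    return request_target, link_host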


@ -21,7 +21,6 @@ class statusView(normalView):
print(quotainfo)'''
allcontainers = {}
if (result):
containers = {}
for master in allclusters:
allcontainers[master] = {}
for cluster in allclusters[master]:
@ -32,6 +31,18 @@ class statusView(normalView):
else:
self.error()
allcontainers[master][cluster] = message
message = dockletRequest.post('/batch/vnodes/list/', data, master.split("@")[0])
message = message.get('data')
containers = []
for m in message:
container = {}
container['containername'] = m
container['ip'] = '--'
containers.append(container)
tmp = {}
tmp['containers'] = containers
tmp['status'] = 'running'
allcontainers[master]['Batch_Job'] = tmp
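# Resulting shape for the batch pseudo-cluster (assuming /batch/vnodes/list/
# returns a list of vnode names under "data", as the loop above expects):
#   allcontainers[master]['Batch_Job'] == {
#       'status': 'running',
#       'containers': [{'containername': <name>, 'ip': '--'}, ...],
#   }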
return self.render(self.template_path, quotas = quotas, quotanames = quotanames, allcontainers = allcontainers, user = session['username'])
else:
self.error()