diff --git a/bin/docklet-worker b/bin/docklet-worker index 02f1a01..d8affaf 100755 --- a/bin/docklet-worker +++ b/bin/docklet-worker @@ -20,6 +20,8 @@ FS_PREFIX=/opt/docklet # cluster net ip range, default is 172.16.0.1/16 CLUSTER_NET="172.16.0.1/16" +# ip addresses range of containers for batch job, default is 10.0.3.0/24 +BATCH_NET="10.0.3.0/24" #configurable-http-proxy public port, default is 8000 PROXY_PORT=8000 #configurable-http-proxy api port, default is 8001 @@ -43,6 +45,13 @@ DAEMON_OPTS= # The process ID of the script when it runs is stored here: PIDFILE=$RUN_DIR/$DAEMON_NAME.pid +# settings for docklet batch worker, which is required for batch job processing system +BATCH_ON=True +DAEMON_BATCH=$DOCKLET_LIB/worker/taskcontroller.py +DAEMON_NAME_BATCH=docklet-taskcontroller +PIDFILE_BATCH=$RUN_DIR/batch.pid +DAEMON_OPTS_BATCH= + # settings for docklet proxy, which is required for web access DAEMON_PROXY=`which configurable-http-proxy` DAEMON_NAME_PROXY=docklet-proxy @@ -83,6 +92,7 @@ pre_start () { # iptables for NAT network for containers to access web iptables -t nat -F iptables -t nat -A POSTROUTING -s $CLUSTER_NET -j MASQUERADE + iptables -t nat -A POSTROUTING -s $BATCH_NET -j MASQUERADE if [ ! -d $FS_PREFIX/local/basefs ]; then log_daemon_msg "basefs does not exist, run prepare.sh first" && exit 1 @@ -95,12 +105,27 @@ pre_start () { do_start() { pre_start + + DAEMON_OPTS=$1 log_daemon_msg "Starting $DAEMON_NAME in $FS_PREFIX" #python3 $DAEMON start-stop-daemon --start --oknodo --background --pidfile $PIDFILE --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON -- $DAEMON_OPTS log_end_msg $? } +do_start_batch () { + if [ "$BATCH_ON" = "False" ] + then + return 1 + fi + log_daemon_msg "Starting $DAEMON_NAME_BATCH in $FS_PREFIX" + + DAEMON_OPTS_BATCH="" + + start-stop-daemon --start --background --pidfile $PIDFILE_BATCH --make-pidfile --user $DAEMON_USER --chuid $DAEMON_USER --startas $DAEMON_BATCH -- $DAEMON_OPTS_BATCH + log_end_msg $? +} + do_start_proxy () { if [ "$DISTRIBUTED_GATEWAY" = "False" ] then @@ -118,6 +143,16 @@ do_stop () { log_end_msg $? } +do_stop_batch () { + if [ "$BATCH_ON" = "False" ] + then + return 1 + fi + log_daemon_msg "Stopping $DAEMON_NAME_BATCH daemon" + start-stop-daemon --stop --quiet --oknodo --remove-pidfile --pidfile $PIDFILE_BATCH --retry 10 + log_end_msg $? +} + do_stop_proxy () { if [ "$DISTRIBUTED_GATEWAY" = "False" ] then @@ -145,12 +180,14 @@ do_stop_meter() { case "$1" in start) - do_start + do_start "normal-worker" + do_start_batch do_start_proxy ;; stop) do_stop + do_stop_batch do_stop_proxy ;; start-meter) @@ -161,6 +198,16 @@ case "$1" in do_stop_meter ;; + start_batch) + do_start "batch-worker" + do_start_batch + ;; + + stop_batch) + do_stop + do_stop_batch + ;; + start_proxy) do_start_proxy ;; @@ -176,13 +223,16 @@ case "$1" in restart) do_stop + do_stop_batch do_stop_proxy - do_start + do_start "normal-worker" + do_start_batch do_start_proxy ;; status) status_of_proc -p $PIDFILE "$DAEMON" "$DAEMON_NAME" && exit 0 || exit $? + status_of_proc -p $PIDFILE_BATCH "$DAEMON_BATCH" "$DAEMON_NAME_BATCH" || status=$? status_of_proc -p $PIDFILE_PROXY "$DAEMON_PROXY" "$DAEMON_NAME_PROXY" || status=$? ;; *) diff --git a/conf/container.batch.conf b/conf/container.batch.conf new file mode 100644 index 0000000..f91af20 --- /dev/null +++ b/conf/container.batch.conf @@ -0,0 +1,52 @@ +# This is the common container.conf for all containers. +# If want set custom settings, you have two choices: +# 1. Directly modify this file, which is not recommend, because the +# setting will be overriden when new version container.conf released. +# 2. Use a custom config file in this conf directory: lxc.custom.conf, +# it uses the same grammer as container.conf, and will be merged +# with the default container.conf by docklet at runtime. +# +# The following is an example mounting user html directory +# lxc.mount.entry = /public/home/%USERNAME%/public_html %ROOTFS%/root/public_html none bind,rw,create=dir 0 0 +# + +#### include /usr/share/lxc/config/ubuntu.common.conf +lxc.include = /usr/share/lxc/config/ubuntu.common.conf + +############## DOCKLET CONFIG ############## + +# Setup 0 tty devices +lxc.tty = 0 + +lxc.rootfs = %ROOTFS% +lxc.utsname = %HOSTNAME% + +lxc.network.type = veth +lxc.network.name = eth0 +lxc.network.link = lxcbr0 +lxc.network.flags = up +lxc.network.ipv4 = %IP% +lxc.network.ipv4.gateway = %GATEWAY% + +lxc.cgroup.pids.max = 2048 +lxc.cgroup.memory.limit_in_bytes = %CONTAINER_MEMORY%M +#lxc.cgroup.memory.kmem.limit_in_bytes = 512M +#lxc.cgroup.memory.soft_limit_in_bytes = 4294967296 +#lxc.cgroup.memory.memsw.limit_in_bytes = 8589934592 + +# lxc.cgroup.cpu.cfs_period_us : period time of cpu, default 100000, means 100ms +# lxc.cgroup.cpu.cfs_quota_us : quota time of this process +lxc.cgroup.cpu.cfs_quota_us = %CONTAINER_CPU% + +lxc.cap.drop = sys_admin net_admin mac_admin mac_override sys_time sys_module + +lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/data %ROOTFS%/root/nfs none bind,rw,create=dir 0 0 +#lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/%CLUSTERID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0 +lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/ssh %ROOTFS%/root/.ssh none bind,ro,create=dir 0 0 +lxc.mount.entry = %FS_PREFIX%/local/temp/%LXCNAME%/ %ROOTFS%/tmp none bind,rw,create=dir 0 0 + +# setting hostname +lxc.hook.pre-start = HNAME=%HOSTNAME% %LXCSCRIPT%/lxc-prestart + +# setting nfs softlink +#lxc.hook.mount = %LXCSCRIPT%/lxc-mount diff --git a/conf/docklet.conf.template b/conf/docklet.conf.template index e9c838b..912b6e5 100644 --- a/conf/docklet.conf.template +++ b/conf/docklet.conf.template @@ -182,3 +182,32 @@ # ALLOW_SCALE_OUT: allow docklet to rent server on the cloud to scale out # Only when you deploy docklet on the cloud can you set it to True # ALLOW_SCALE_OUT=False + +# ================================================== +# +# Batch Config +# +# ================================================== + +# BATCH_ON: whether to start batch job processing system when start +# the docklet. Default: True +# BATCH_ON=True + +# BATCH_MASTER_PORT: the rpc server port on master. +# default: 50050 +# BATCH_MASTER_PORT=50050 + +# BATCH_WORKER_PORT: the rpc server port on worker. +# default: 50051 +# BATCH_WORKER_PORT=50051 + +# BATCH_GATEWAY: the ip address of gateway for the containers processing +# batch jobs. default: 10.0.3.1 +# BATCH_GATEWAY=10.0.3.1 + +# BATCH_NET: ip addresses range of containers for batch job, default is 10.0.3.0/24 +# BATCH_NET=10.0.3.0/24 + +# BATCH_MAX_THREAD_WORKER: the maximun number of threads of the rpc server on +# the batch job worker. default:5 +# BATCH_MAX_THREAD_WORKER=5 diff --git a/prepare.sh b/prepare.sh index f7b4208..f58f006 100755 --- a/prepare.sh +++ b/prepare.sh @@ -16,7 +16,7 @@ fi # some packages' name maybe different in debian apt-get install -y cgmanager lxc lxcfs lxc-templates lvm2 bridge-utils curl exim4 openssh-server openvswitch-switch apt-get install -y python3 python3-netifaces python3-flask python3-flask-sqlalchemy python3-pampy python3-httplib2 python3-pip -apt-get install -y python3-psutil python3-flask-migrate +apt-get install -y python3-psutil python3-flask-migrate python3-paramiko apt-get install -y python3-lxc apt-get install -y python3-requests python3-suds apt-get install -y nodejs nodejs-legacy npm @@ -24,6 +24,9 @@ apt-get install -y etcd apt-get install -y glusterfs-client attr apt-get install -y nginx pip3 install Flask-WTF +apt-get install -y gdebi-core +gdebi ossfs_1.80.5_ubuntu16.04_amd64.deb +pip3 install grpcio grpcio-tools googleapis-common-protos #add ip forward echo "net.ipv4.ip_forward=1" >>/etc/sysctl.conf diff --git a/src/master/httprest.py b/src/master/httprest.py index b867e81..cb2d00e 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -27,7 +27,7 @@ import http.server, cgi, json, sys, shutil, traceback import xmlrpc.client from socketserver import ThreadingMixIn from utils import etcdlib, imagemgr -from master import nodemgr, vclustermgr, notificationmgr, lockmgr, cloudmgr +from master import nodemgr, vclustermgr, notificationmgr, lockmgr, cloudmgr, jobmgr, taskmgr from utils.logs import logs from master import userManager, beansapplicationmgr, monitor, sysmgr, network from worker.monitor import History_Manager @@ -790,6 +790,128 @@ def resetall_system(user, beans, form): return json.dumps({'success':'false', 'message': message}) return json.dumps(result) +@app.route("/batch/job/add/", methods=['POST']) +@login_required +def add_job(user,beans,form): + global G_jobmgr + job_data = form.to_dict() + job_info = { + 'tasks': {} + } + message = { + 'success': 'true', + 'message': 'add batch job success' + } + for key in job_data: + key_arr = key.split('_') + value = job_data[key] + if key_arr[0] == 'srcAddr' and value == '': + task_idx = 'task_' + key_arr[1] + if task_idx in job_info['tasks']: + job_info['tasks'][task_idx]['srcAddr'] = '/root/nfs' + else: + job_info['tasks'][task_idx] = { + 'srcAddr': '/root/nfs/' + } + elif key_arr[0] != 'dependency'and value == '': + message['success'] = 'false' + message['message'] = 'value of %s is null' % key + elif len(key_arr) == 1: + job_info[key_arr[0]] = value + elif len(key_arr) == 2: + key_prefix, task_idx = key_arr[0], key_arr[1] + task_idx = 'task_' + task_idx + if task_idx in job_info["tasks"]: + job_info["tasks"][task_idx][key_prefix] = value + else: + tmp_dict = { + key_prefix: value + } + job_info["tasks"][task_idx] = tmp_dict + elif len(key_arr) == 3: + key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2] + task_idx = 'task_' + task_idx + mapping_idx = 'mapping_' + mapping_idx + if task_idx in job_info["tasks"]: + if "mapping" in job_info["tasks"][task_idx]: + if mapping_idx in job_info["tasks"][task_idx]["mapping"]: + job_info["tasks"][task_idx]["mapping"][mapping_idx][key_prefix] = value + else: + tmp_dict = { + key_prefix: value + } + job_info["tasks"][task_idx]["mapping"][mapping_idx] = tmp_dict + else: + job_info["tasks"][task_idx]["mapping"] = { + mapping_idx: { + key_prefix: value + } + } + else: + tmp_dict = { + "mapping":{ + mapping_idx: { + key_prefix: value + } + } + } + job_info["tasks"][task_idx] = tmp_dict + logger.debug('batch job adding info %s' % json.dumps(job_info, indent=4)) + [status, msg] = G_jobmgr.add_job(user, job_info) + if status: + return json.dumps(message) + else: + logger.debug('fail to add batch job: %s' % msg) + message["success"] = "false" + message["message"] = msg + return json.dumps(message) + return json.dumps(message) + +@app.route("/batch/job/list/", methods=['POST']) +@login_required +def list_job(user,beans,form): + global G_jobmgr + result = { + 'success': 'true', + 'data': G_jobmgr.list_jobs(user) + } + return json.dumps(result) + +@app.route("/batch/job/output/", methods=['POST']) +@login_required +def get_output(user,beans,form): + global G_jobmgr + jobid = form.get("jobid","") + taskid = form.get("taskid","") + instid = form.get("instid","") + issue = form.get("issue","") + result = { + 'success': 'true', + 'data': G_jobmgr.get_output(user,jobid,taskid,instid,issue) + } + return json.dumps(result) + + +@app.route("/batch/job/info/", methods=['POST']) +@login_required +def info_job(user,beans,form): + pass + +@app.route("/batch/task/info/", methods=['POST']) +@login_required +def info_task(user,beans,form): + pass + +@app.route("/batch/vnodes/list/", methods=['POST']) +@login_required +def batch_vnodes_list(user,beans,form): + global G_taskmgr + result = { + 'success': 'true', + 'data': G_taskmgr.get_user_batch_containers(user) + } + return json.dumps(result) + # @app.route("/inside/cluster/scaleout/", methods=['POST']) # @inside_ip_required # def inside_cluster_scalout(cur_user, cluster_info, form): @@ -857,6 +979,8 @@ if __name__ == '__main__': global G_applicationmgr global G_ulockmgr global G_cloudmgr + global G_jobmgr + global G_taskmgr # move 'tools.loadenv' to the beginning of this file fs_path = env.getenv("FS_PREFIX") @@ -973,4 +1097,10 @@ if __name__ == '__main__': # server = http.server.HTTPServer((masterip, masterport), DockletHttpHandler) logger.info("starting master server") + G_taskmgr = taskmgr.TaskMgr(G_nodemgr, monitor.Fetcher) + G_jobmgr = jobmgr.JobMgr(G_taskmgr) + G_taskmgr.set_jobmgr(G_jobmgr) + G_taskmgr.start() + G_jobmgr.start() + app.run(host = masterip, port = masterport, threaded=True) diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py new file mode 100644 index 0000000..e758d79 --- /dev/null +++ b/src/master/jobmgr.py @@ -0,0 +1,180 @@ +import time, threading, random, string, os, traceback +import master.monitor +import subprocess + +from utils.log import initlogging, logger +from utils import env + +class BatchJob(object): + def __init__(self, user, job_info): + self.user = user + self.raw_job_info = job_info + self.task_queue = [] + self.task_finished = [] + self.job_id = None + self.job_name = job_info['jobName'] + self.job_priority = int(job_info['jobPriority']) + self.status = 'pending' + self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()) + self.top_sort() + + # transfer the dependency graph into a job queue + def top_sort(self): + logger.debug('top sorting') + tasks = self.raw_job_info["tasks"] + dependency_graph = {} + for task_idx in tasks: + dependency_graph[task_idx] = set() + task_info = tasks[task_idx] + dependency = task_info['dependency'].strip().replace(' ', '').split(',') + if len(dependency) == 1 and dependency[0] == '': + continue + for t in dependency: + if not t in tasks: + raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx)) + dependency_graph[task_idx].add(t) + while len(dependency_graph) > 0: + s = set() + flag = False + for task_idx in dependency_graph: + if len(dependency_graph[task_idx]) == 0: + flag = True + s.add(task_idx) + for task_idx in s: + dependency_graph.pop(task_idx) + #there is a circle in the graph + if not flag: + raise ValueError('there is a circle in the dependency graph') + break + for task_idx in dependency_graph: + for t in s: + if t in dependency_graph[task_idx]: + dependency_graph[task_idx].remove(t) + self.task_queue.append({ + 'task_idx': s, + 'status': 'pending' + }) + + # get a task and pass it to taskmgr + def get_task(self): + for task in self.task_queue: + if task['status'] == 'pending': + task_idx = task['task_idx'].pop() + task['status'] = 'running' + task_name = self.user + '_' + self.job_id + '_' + task_idx + return task_name, self.raw_job_info["tasks"][task_idx] + return '', None + + # a task has finished + def finish_task(self, task_idx): + pass + +class JobMgr(threading.Thread): + # load job information from etcd + # initial a job queue and job schedueler + def __init__(self, taskmgr): + threading.Thread.__init__(self) + self.job_queue = [] + self.job_map = {} + self.taskmgr = taskmgr + self.fspath = env.getenv('FS_PREFIX') + + def run(self): + while True: + self.job_scheduler() + time.sleep(2) + + # user: username + # job_data: a json string + # user submit a new job, add this job to queue and database + def add_job(self, user, job_info): + try: + job = BatchJob(user, job_info) + job.job_id = self.gen_jobid() + self.job_queue.append(job.job_id) + self.job_map[job.job_id] = job + except ValueError as err: + return [False, err.args[0]] + except Exception as err: + return [False, err.args[0]] + finally: + return [True, "add batch job success"] + + # user: username + # list a user's all job + def list_jobs(self,user): + res = [] + for job_id in self.job_queue: + job = self.job_map[job_id] + logger.debug('job_id: %s, user: %s' % (job_id, job.user)) + if job.user == user: + all_tasks = job.raw_job_info['tasks'] + tasks_instCount = {} + for task in all_tasks.keys(): + tasks_instCount[task] = int(all_tasks[task]['instCount']) + res.append({ + 'job_name': job.job_name, + 'job_id': job.job_id, + 'status': job.status, + 'create_time': job.create_time, + 'tasks': list(all_tasks.keys()), + 'tasks_instCount': tasks_instCount + }) + return res + + # user: username + # jobid: the id of job + # get the information of a job, including the status, json description and other information + # call get_task to get the task information + def get_job(self, user, job_id): + pass + + # check if a job exists + def is_job_exist(self, job_id): + return job_id in self.job_queue + + # generate a random job id + def gen_jobid(self): + job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) + while self.is_job_exist(job_id): + job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) + return job_id + + # this is a thread to process a job + def job_processor(self, job): + task_name, task_info = job.get_task() + if not task_info: + return False + else: + task_priority = job.job_priority + self.taskmgr.add_task(job.user, task_name, task_info, task_priority) + return True + + # this is a thread to schedule the jobs + def job_scheduler(self): + # choose a job from queue, create a job processor for it + for job_id in self.job_queue: + job = self.job_map[job_id] + if self.job_processor(job): + job.status = 'running' + break + else: + job.status = 'done' + + # a task has finished + def report(self, task): + pass + + def get_output(self, username, jobid, taskid, instid, issue): + filename = username + "_" + jobid + "_" + taskid + "_" + instid + "_" + issue + ".txt" + fpath = "%s/global/users/%s/data/batch_%s/%s" % (self.fspath,username,jobid,filename) + logger.info("Get output from:%s" % fpath) + try: + ret = subprocess.run('tail -n 100 ' + fpath,stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) + if ret.returncode != 0: + raise IOError(ret.stdout.decode(encoding="utf-8")) + except Exception as err: + logger.error(traceback.format_exc()) + return "" + else: + return ret.stdout.decode(encoding="utf-8") diff --git a/src/master/nodemgr.py b/src/master/nodemgr.py index d3396a6..a623e2f 100755 --- a/src/master/nodemgr.py +++ b/src/master/nodemgr.py @@ -47,6 +47,8 @@ class NodeMgr(object): # get allnodes self.allnodes = self._nodelist_etcd("allnodes") self.runnodes = [] + self.batchnodes = [] + self.allrunnodes = [] [status, runlist] = self.etcd.listdir("machines/runnodes") for node in runlist: nodeip = node['key'].rsplit('/',1)[1] @@ -140,6 +142,14 @@ class NodeMgr(object): #print(etcd_runip) #print(self.rpcs) self.runnodes = etcd_runip + self.batchnodes = self.runnodes.copy() + self.allrunnodes = self.runnodes.copy() + [status, batchlist] = self.etcd.listdir("machines/batchnodes") + if status: + for node in batchlist: + nodeip = node['key'].rsplit('/', 1)[1] + self.batchnodes.append(nodeip) + self.allrunnodes.append(nodeip) def recover_node(self,ip,tasks): logger.info("now recover for worker:%s" % ip) @@ -152,14 +162,19 @@ class NodeMgr(object): # get all run nodes' IP addr def get_nodeips(self): - return self.runnodes + return self.allrunnodes + + def get_batch_nodeips(self): + return self.batchnodes + def get_base_nodeips(self): + return self.runnodes def get_allnodes(self): return self.allnodes def ip_to_rpc(self,ip): - if ip in self.runnodes: + if ip in self.allrunnodes: return xmlrpc.client.ServerProxy("http://%s:%s" % (ip, env.getenv("WORKER_PORT"))) else: logger.info('Worker %s is not connected, create rpc client failed, push task into queue') diff --git a/src/master/parser.py b/src/master/parser.py new file mode 100644 index 0000000..9449383 --- /dev/null +++ b/src/master/parser.py @@ -0,0 +1,55 @@ +#!/user/bin/python3 +import json + +job_data = {'image_1': 'base_base_base', 'mappingRemoteDir_2_2': 'sss', 'dependency_1': 'aaa', 'mappingLocalDir_2_1': 'xxx', 'mappingLocalDir_1_2': 'aaa', 'mappingLocalDir_1_1': 'aaa', 'mappingLocalDir_2_3': 'fdsffdf', 'mappingRemoteDir_1_1': 'ddd', 'mappingRemoteDir_2_3': 'sss', 'srcAddr_1': 'aaa', 'mappingSource_2_1': 'Aliyun', 'cpuSetting_1': '1', 'mappingSource_2_2': 'Aliyun', 'retryCount_2': '1', 'mappingSource_1_1': 'Aliyun', 'expTime_1': '60', 'diskSetting_2': '1024', 'diskSetting_1': '1024', 'dependency_2': 'ddd', 'memorySetting_1': '1024', 'command_2': 'ccc', 'mappingRemoteDir_1_2': 'ddd', 'gpuSetting_2': '0', 'memorySetting_2': '1024', 'gpuSetting_1': '0', 'mappingLocalDir_2_2': 'bbb', 'mappingSource_1_2': 'Aliyun', 'expTime_2': '60', 'mappingRemoteDir_2_1': 'vvv', 'srcAddr_2': 'fff', 'cpuSetting_2': '1', 'instCount_1': '1', 'mappingSource_2_3': 'Aliyun', 'token': 'ZXlKaGJHY2lPaUpJVXpJMU5pSXNJbWxoZENJNk1UVXpNelE0TVRNMU5Td2laWGh3SWpveE5UTXpORGcwT1RVMWZRLmV5SnBaQ0k2TVgwLkF5UnRnaGJHZXhJY2lBSURZTUd5eXZIUVJnUGd1ZTA3OEtGWkVoejJVMkE=', 'instCount_2': '1', 'retryCount_1': '1', 'command_1': 'aaa', 'jobPriority': '0', 'image_2': 'base_base_base', 'jobName': 'aaa'} + +def parse(job_data): + job_info = {} + message = {} + for key in job_data: + key_arr = key.split('_') + value = job_data[key] + if len(key_arr) == 1: + job_info[key_arr[0]] = value + elif len(key_arr) == 2: + key_prefix, task_idx = key_arr[0], key_arr[1] + task_idx = 'task_' + task_idx + if task_idx in job_info: + job_info[task_idx][key_prefix] = value + else: + tmp_dict = { + key_prefix: value + } + job_info[task_idx] = tmp_dict + elif len(key_arr) == 3: + key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2] + task_idx = 'task_' + task_idx + mapping_idx = 'mapping_' + mapping_idx + if task_idx in job_info: + if "mapping" in job_info[task_idx]: + if mapping_idx in job_info[task_idx]["mapping"]: + job_info[task_idx]["mapping"][mapping_idx][key_prefix] = value + else: + tmp_dict = { + key_prefix: value + } + job_info[task_idx]["mapping"][mapping_idx] = tmp_dict + else: + job_info[task_idx]["mapping"] = { + mapping_idx: { + key_prefix: value + } + } + else: + tmp_dict = { + "mapping":{ + mapping_idx: { + key_prefix: value + } + } + } + job_info[task_idx] = tmp_dict + print(json.dumps(job_info, indent=4)) + +if __name__ == '__main__': + parse(job_data) diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py new file mode 100644 index 0000000..07936d8 --- /dev/null +++ b/src/master/taskmgr.py @@ -0,0 +1,411 @@ +import threading +import time +import string +import random +import json +from functools import wraps + +# must import logger after initlogging, ugly +from utils.log import logger + +# grpc +from concurrent import futures +import grpc +from protos.rpc_pb2 import * +from protos.rpc_pb2_grpc import MasterServicer, add_MasterServicer_to_server, WorkerStub + +from utils import env + + +class Task(): + def __init__(self, info, priority): + self.info = info + self.status = WAITING + self.instance_list = [] + self.token = '' + # priority the bigger the better + # self.priority the smaller the better + self.priority = int(time.time()) / 60 / 60 - priority + + def __lt__(self, other): + return self.priority < other.priority + + +class TaskReporter(MasterServicer): + + def __init__(self, taskmgr): + self.taskmgr = taskmgr + + def report(self, request, context): + for task_report in request.taskmsgs: + self.taskmgr.on_task_report(task_report) + return Reply(status=Reply.ACCEPTED, message='') + + +class TaskMgr(threading.Thread): + + # load task information from etcd + # initial a task queue and task schedueler + # taskmgr: a taskmgr instance + def __init__(self, nodemgr, monitor_fetcher, scheduler_interval=2, external_logger=None): + threading.Thread.__init__(self) + self.thread_stop = False + self.jobmgr = None + self.task_queue = [] + self.lazy_append_list = [] + self.lazy_delete_list = [] + self.task_queue_lock = threading.Lock() + self.user_containers = {} + + self.scheduler_interval = scheduler_interval + self.logger = logger + + self.master_port = env.getenv('BATCH_MASTER_PORT') + self.worker_port = env.getenv('BATCH_WORKER_PORT') + + # nodes + self.nodemgr = nodemgr + self.monitor_fetcher = monitor_fetcher + self.cpu_usage = {} + self.gpu_usage = {} + # self.all_nodes = None + # self.last_nodes_info_update_time = 0 + # self.nodes_info_update_interval = 30 # (s) + + + def queue_lock(f): + @wraps(f) + def new_f(self, *args, **kwargs): + self.task_queue_lock.acquire() + result = f(self, *args, **kwargs) + self.task_queue_lock.release() + return result + return new_f + + + def run(self): + self.serve() + while not self.thread_stop: + self.sort_out_task_queue() + task, instance_id, worker = self.task_scheduler() + if task is not None and worker is not None: + self.task_processor(task, instance_id, worker) + else: + time.sleep(self.scheduler_interval) + + + def serve(self): + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + add_MasterServicer_to_server(TaskReporter(self), self.server) + self.server.add_insecure_port('[::]:' + self.master_port) + self.server.start() + self.logger.info('[taskmgr_rpc] start rpc server') + + + def stop(self): + self.thread_stop = True + self.server.stop(0) + self.logger.info('[taskmgr_rpc] stop rpc server') + + + # this method is called when worker send heart-beat rpc request + def on_task_report(self, report): + self.logger.info('[on_task_report] receive task report: id %s-%d, status %d' % (report.taskid, report.instanceid, report.instanceStatus)) + task = self.get_task(report.taskid) + if task == None: + self.logger.error('[on_task_report] task not found') + return + + instance = task.instance_list[report.instanceid] + if instance['token'] != report.token: + self.logger.warning('[on_task_report] wrong token') + return + username = task.info.username + container_name = username + '-batch-' + task.info.id + '-' + str(report.instanceid) + '-' + report.token + self.user_containers[username].remove(container_name) + + if instance['status'] != RUNNING: + self.logger.error('[on_task_report] receive task report when instance is not running') + + if instance['status'] == RUNNING and report.instanceStatus != RUNNING: + self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu + self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu + + instance['status'] = report.instanceStatus + instance['error_msg'] = report.errmsg + + if report.instanceStatus == COMPLETED: + self.check_task_completed(task) + elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT: + if instance['try_count'] > task.info.maxRetryCount: + self.check_task_completed(task) + elif report.instanceStatus == OUTPUTERROR: + self.task_failed(task) + + + def check_task_completed(self, task): + if len(task.instance_list) < task.info.instanceCount: + return + failed = False + for instance in task.instance_list: + if instance['status'] == RUNNING or instance['status'] == WAITING: + return + if instance['status'] == FAILED or instance['status'] == TIMEOUT: + if instance['try_count'] > task.info.maxRetryCount: + failed = True + else: + return + if instance['status'] == OUTPUTERROR: + failed = True + break + + if failed: + self.task_failed(task) + else: + self.task_completed(task) + + + def task_completed(self, task): + task.status = COMPLETED + + if self.jobmgr is None: + self.logger.error('[task_completed] jobmgr is None!') + else: + self.jobmgr.report(task) + self.logger.info('task %s completed' % task.info.id) + self.lazy_delete_list.append(task) + + + def task_failed(self, task): + task.status = FAILED + + if self.jobmgr is None: + self.logger.error('[task_failed] jobmgr is None!') + else: + self.jobmgr.report(task) + self.logger.info('task %s failed' % task.info.id) + self.lazy_delete_list.append(task) + + + @queue_lock + def sort_out_task_queue(self): + while self.lazy_delete_list: + task = self.lazy_delete_list.pop(0) + self.task_queue.remove(task) + if self.lazy_append_list: + while self.lazy_append_list: + task = self.lazy_append_list.pop(0) + self.task_queue.append(task) + self.task_queue = sorted(self.task_queue, key=lambda x: x.priority) + + + def task_processor(self, task, instance_id, worker_ip): + task.status = RUNNING + + # properties for transaction + task.info.instanceid = instance_id + task.info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8)) + + instance = task.instance_list[instance_id] + instance['status'] = RUNNING + instance['try_count'] += 1 + instance['token'] = task.info.token + instance['worker'] = worker_ip + + self.cpu_usage[worker_ip] += task.info.cluster.instance.cpu + self.gpu_usage[worker_ip] += task.info.cluster.instance.gpu + username = task.info.username + container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token + if not username in self.user_containers.keys(): + self.user_containers[username] = [] + self.user_containers[username].append(container_name) + + try: + self.logger.info('[task_processor] processing task [%s] instance [%d]' % (task.info.id, task.info.instanceid)) + channel = grpc.insecure_channel('%s:%s' % (worker_ip, self.worker_port)) + stub = WorkerStub(channel) + response = stub.process_task(task.info) + if response.status != Reply.ACCEPTED: + raise Exception(response.message) + except Exception as e: + self.logger.error('[task_processor] rpc error message: %s' % e) + instance['status'] = FAILED + instance['try_count'] -= 1 + self.user_containers[username].remove(container_name) + + + # return task, worker + def task_scheduler(self): + # simple FIFO with priority + self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue)) + + # nodes = self.get_all_nodes() + # if nodes is None or len(nodes) == 0: + # self.logger.info('[task_scheduler] no nodes found') + # else: + # for worker_ip, worker_info in nodes: + # self.logger.info('[task_scheduler] nodes %s' % worker_ip) + # for key in worker_info: + # if key == 'cpu': + # self.logger.info('[task_scheduler] %s: %d/%d' % (key, self.get_cpu_usage(worker_ip), worker_info[key])) + # else: + # self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key])) + + for task in self.task_queue: + if task in self.lazy_delete_list: + continue + worker = self.find_proper_worker(task) + + for index, instance in enumerate(task.instance_list): + # find instance to retry + if (instance['status'] == FAILED or instance['status'] == TIMEOUT) and instance['try_count'] <= task.info.maxRetryCount: + if worker is not None: + self.logger.info('[task_scheduler] retry') + return task, index, worker + # find timeout instance + elif instance['status'] == RUNNING: + if not self.is_alive(instance['worker']): + instance['status'] = FAILED + instance['token'] = '' + self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu + self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu + + self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index)) + if worker is not None: + return task, index, worker + + if worker is not None: + # start new instance + if len(task.instance_list) < task.info.instanceCount: + instance = {} + instance['try_count'] = 0 + task.instance_list.append(instance) + return task, len(task.instance_list) - 1, worker + + self.check_task_completed(task) + + return None, None, None + + def find_proper_worker(self, task): + nodes = self.get_all_nodes() + if nodes is None or len(nodes) == 0: + self.logger.warning('[task_scheduler] running nodes not found') + return None + + for worker_ip, worker_info in nodes: + if task.info.cluster.instance.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']: + continue + if task.info.cluster.instance.memory > worker_info['memory']: + continue + # try not to assign non-gpu task to a worker with gpu + if task.info.cluster.instance.gpu == 0 and worker_info['gpu'] > 0: + continue + if task.info.cluster.instance.gpu + self.get_gpu_usage(worker_ip) > worker_info['gpu']: + continue + return worker_ip + return None + + + def get_all_nodes(self): + # cache running nodes + # if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval: + # return self.all_nodes + # get running nodes + node_ips = self.nodemgr.get_batch_nodeips() + all_nodes = [(node_ip, self.get_worker_resource_info(node_ip)) for node_ip in node_ips] + return all_nodes + + + def is_alive(self, worker): + nodes = self.nodemgr.get_batch_nodeips() + return worker in nodes + + + def get_worker_resource_info(self, worker_ip): + fetcher = self.monitor_fetcher(worker_ip) + worker_info = fetcher.info + info = {} + info['cpu'] = len(worker_info['cpuconfig']) + info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (Mb) + info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb) + info['gpu'] = len(worker_info['gpuinfo']) + return info + + + def get_cpu_usage(self, worker_ip): + try: + return self.cpu_usage[worker_ip] + except: + self.cpu_usage[worker_ip] = 0 + return 0 + + + def get_gpu_usage(self, worker_ip): + try: + return self.gpu_usage[worker_ip] + except: + self.gpu_usage[worker_ip] = 0 + return 0 + + + def set_jobmgr(self, jobmgr): + self.jobmgr = jobmgr + + + # save the task information into database + # called when jobmgr assign task to taskmgr + def add_task(self, username, taskid, json_task, task_priority=1): + # decode json string to object defined in grpc + self.logger.info('[taskmgr add_task] receive task %s' % taskid) + image_dict = { + "private": Image.PRIVATE, + "base": Image.BASE, + "public": Image.PUBLIC + } + # json_task = json.loads(json_task) + task = Task(TaskInfo( + id = taskid, + username = username, + instanceCount = int(json_task['instCount']), + maxRetryCount = int(json_task['retryCount']), + timeout = int(json_task['expTime']), + parameters = Parameters( + command = Command( + commandLine = json_task['command'], + packagePath = json_task['srcAddr'], + envVars = {}), + stderrRedirectPath = json_task.get('stdErrRedPth',""), + stdoutRedirectPath = json_task.get('stdOutRedPth',"")), + cluster = Cluster( + image = Image( + name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'], + type = image_dict[json_task['image'].split('_')[2]], #json_task['cluster']['image']['type'], + owner = username if not json_task['image'].split('_')[1] else json_task['image'].split('_')[1]), #json_task['cluster']['image']['owner']), + instance = Instance( + cpu = int(json_task['cpuSetting']), + memory = int(json_task['memorySetting']), + disk = int(json_task['diskSetting']), + gpu = int(json_task['gpuSetting'])))), + priority=task_priority) + if 'mapping' in json_task: + task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'], + remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir']) + for mapping_key in json_task['mapping']]) + self.lazy_append_list.append(task) + + + # user: username + # get the information of a task, including the status, task description and other information + @queue_lock + def get_task(self, taskid): + for task in self.task_queue: + if task.info.id == taskid: + return task + return None + + # get names of all the batch containers of the user + def get_user_batch_containers(self,username): + if not username in self.user_containers.keys(): + return [] + else: + return self.user_containers[username] diff --git a/src/master/testTaskCtrler.py b/src/master/testTaskCtrler.py new file mode 100644 index 0000000..ba6b947 --- /dev/null +++ b/src/master/testTaskCtrler.py @@ -0,0 +1,39 @@ +import sys +if sys.path[0].endswith("master"): + sys.path[0] = sys.path[0][:-6] + +import grpc,time + +from protos import rpc_pb2, rpc_pb2_grpc + +def run(): + channel = grpc.insecure_channel('localhost:50051') + stub = rpc_pb2_grpc.WorkerStub(channel) + + comm = rpc_pb2.Command(commandLine="ls /root/oss/test-for-docklet", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' + paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/oss/test-for-docklet/", stdoutRedirectPath="/root/oss/test-for-docklet/") + + img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet") + inst = rpc_pb2.Instance(cpu=1, memory=1000, disk=1000, gpu=0) + mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="") + clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt]) + + task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token="test") + + response = stub.process_task(task) + print("Batch client received: " + str(response.status)+" "+response.message) + +def stop_task(): + channel = grpc.insecure_channel('localhost:50051') + stub = rpc_pb2_grpc.WorkerStub(channel) + + taskmsg = rpc_pb2.TaskMsg(taskid="test",username="root",instanceid=1,instanceStatus=rpc_pb2.COMPLETED,token="test",errmsg="") + reportmsg = rpc_pb2.ReportMsg(taskmsgs = [taskmsg]) + + response = stub.stop_tasks(reportmsg) + print("Batch client received: " + str(response.status)+" "+response.message) + +if __name__ == '__main__': + run() + #time.sleep(4) + #stop_task() diff --git a/src/master/testTaskMgr.py b/src/master/testTaskMgr.py new file mode 100644 index 0000000..85fbeb5 --- /dev/null +++ b/src/master/testTaskMgr.py @@ -0,0 +1,181 @@ +import master.taskmgr +from concurrent import futures +import grpc +from protos.rpc_pb2 import * +from protos.rpc_pb2_grpc import * +import threading, json, time, random +from utils import env + + +class SimulatedNodeMgr(): + def get_batch_nodeips(self): + return ['0.0.0.0'] + + +class SimulatedMonitorFetcher(): + def __init__(self, ip): + self.info = {} + self.info['cpuconfig'] = [1,1,1,1] + self.info['meminfo'] = {} + self.info['meminfo']['free'] = 4 * 1024 * 1024 # (kb) simulate 4 GB memory + self.info['diskinfo'] = [] + self.info['diskinfo'].append({}) + self.info['diskinfo'][0]['free'] = 8 * 1024 * 1024 * 1024 # (b) simulate 8 GB disk + + +class SimulatedTaskController(WorkerServicer): + + def __init__(self, worker): + self.worker = worker + + def process_task(self, task, context): + print('[SimulatedTaskController] receive task [%s] instanceid [%d] token [%s]' % (task.id, task.instanceid, task.token)) + worker.process(task) + return Reply(status=Reply.ACCEPTED,message="") + + +class SimulatedWorker(threading.Thread): + + def __init__(self): + threading.Thread.__init__(self) + self.thread_stop = False + self.tasks = [] + + def run(self): + worker_port = env.getenv('BATCH_WORKER_PORT') + server = grpc.server(futures.ThreadPoolExecutor(max_workers=5)) + add_WorkerServicer_to_server(SimulatedTaskController(self), server) + server.add_insecure_port('[::]:' + worker_port) + server.start() + while not self.thread_stop: + for task in self.tasks: + seed = random.random() + if seed < 0.25: + report(task.id, task.instanceid, RUNNING, task.token) + elif seed < 0.5: + report(task.id, task.instanceid, COMPLETED, task.token) + self.tasks.remove(task) + elif seed < 0.75: + report(task.id, task.instanceid, FAILED, task.token) + self.tasks.remove(task) + else: + pass + time.sleep(5) + server.stop(0) + + def stop(self): + self.thread_stop = True + + def process(self, task): + self.tasks.append(task) + + +class SimulatedJobMgr(threading.Thread): + + def __init__(self): + threading.Thread.__init__(self) + self.thread_stop = False + + def run(self): + while not self.thread_stop: + time.sleep(5) + server.stop(0) + + def stop(self): + self.thread_stop = True + + def report(self, task): + print('[SimulatedJobMgr] task[%s] status %d' % (task.info.id, task.status)) + + def assignTask(self, taskmgr, taskid, instance_count, retry_count, timeout, cpu, memory, disk): + task = {} + task['instanceCount'] = instance_count + task['maxRetryCount'] = retry_count + task['timeout'] = timeout + task['parameters'] = {} + task['parameters']['command'] = {} + task['parameters']['command']['commandLine'] = 'ls' + task['parameters']['command']['packagePath'] = '' + task['parameters']['command']['envVars'] = {'a':'1'} + task['parameters']['stderrRedirectPath'] = '' + task['parameters']['stdoutRedirectPath'] = '' + task['cluster'] = {} + task['cluster']['image'] = {} + task['cluster']['image']['name'] = '' + task['cluster']['image']['type'] = 1 + task['cluster']['image']['owner'] = '' + task['cluster']['instance'] = {} + task['cluster']['instance']['cpu'] = cpu + task['cluster']['instance']['memory'] = memory + task['cluster']['instance']['disk'] = disk + task['cluster']['instance']['gpu'] = 0 + task['cluster']['mount'] = [{'remotePath':'', 'localPath':''}] + + taskmgr.add_task('root', taskid, json.dumps(task)) + + +class SimulatedLogger(): + def info(self, msg): + print('[INFO] ' + msg) + + def warning(self, msg): + print('[WARNING] ' + msg) + + def error(self, msg): + print('[ERROR] ' + msg) + + +def test(): + global worker + global jobmgr + global taskmgr + + worker = SimulatedWorker() + worker.start() + jobmgr = SimulatedJobMgr() + jobmgr.start() + + taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, external_logger=SimulatedLogger()) + taskmgr.set_jobmgr(jobmgr) + taskmgr.start() + + add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048) + + +def test2(): + global jobmgr + global taskmgr + jobmgr = SimulatedJobMgr() + jobmgr.start() + + taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, external_logger=SimulatedLogger()) + taskmgr.set_jobmgr(jobmgr) + taskmgr.start() + + add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048) + + + +def add(taskid, instance_count, retry_count, timeout, cpu, memory, disk): + global jobmgr + global taskmgr + jobmgr.assignTask(taskmgr, taskid, instance_count, retry_count, timeout, cpu, memory, disk) + + +def report(taskid, instanceid, status, token): + global taskmgr + + master_port = env.getenv('BATCH_MASTER_PORT') + channel = grpc.insecure_channel('%s:%s' % ('0.0.0.0', master_port)) + stub = MasterStub(channel) + response = stub.report(ReportMsg(taskmsgs=TaskMsg(taskid=taskid, instanceid=instanceid, instanceStatus=status, token=token))) + + +def stop(): + global worker + global jobmgr + global taskmgr + + worker.stop() + jobmgr.stop() + taskmgr.stop() diff --git a/src/master/vclustermgr.py b/src/master/vclustermgr.py index a09a3a9..caef4ad 100755 --- a/src/master/vclustermgr.py +++ b/src/master/vclustermgr.py @@ -120,7 +120,7 @@ class VclusterMgr(object): return [False, "the size of disk is not big enough for the image"] clustersize = int(self.defaultsize) logger.info ("starting cluster %s with %d containers for %s" % (clustername, int(clustersize), username)) - workers = self.nodemgr.get_nodeips() + workers = self.nodemgr.get_base_nodeips() image_json = json.dumps(image) groupname = json.loads(user_info)["data"]["group"] groupquota = json.loads(user_info)["data"]["groupinfo"] @@ -206,7 +206,7 @@ class VclusterMgr(object): return [False, "cluster:%s not found" % clustername] if self.imgmgr.get_image_size(image) + 100 > int(setting["disk"]): return [False, "the size of disk is not big enough for the image"] - workers = self.nodemgr.get_nodeips() + workers = self.nodemgr.get_base_nodeips() if (len(workers) == 0): logger.warning("no workers to start containers, scale out failed") return [False, "no workers are running"] diff --git a/src/protos/rpc.proto b/src/protos/rpc.proto new file mode 100644 index 0000000..6a5d274 --- /dev/null +++ b/src/protos/rpc.proto @@ -0,0 +1,101 @@ +syntax = "proto3"; + +service Master { + rpc report (ReportMsg) returns (Reply) {} +} + +service Worker { + rpc process_task (TaskInfo) returns (Reply) {} + rpc stop_tasks (ReportMsg) returns (Reply) {} +} + +message Reply { + ReplyStatus status = 1; // 返回值 + string message = 2; + + enum ReplyStatus { + ACCEPTED = 0; + REFUSED = 1; + } +} + +message ReportMsg { + repeated TaskMsg taskmsgs = 1; +} + +message TaskMsg { + string taskid = 1; + string username = 2; + int32 instanceid = 3; + Status instanceStatus = 4; // 任务状态 + string token = 5; + string errmsg = 6; +} + +enum Status { + WAITING = 0; + RUNNING = 1; + COMPLETED = 2; + FAILED = 3; + TIMEOUT = 4; + OUTPUTERROR = 5; +} + +message TaskInfo { + string id = 1; + string username = 2; + int32 instanceid = 3; + int32 instanceCount = 4; // 实例个数 + int32 maxRetryCount = 5; // 最大重试次数 + Parameters parameters = 6; // 参数 + Cluster cluster = 7; // 集群配置 + int32 timeout = 8; // 超时阈值 + string token = 9; + bool reused = 10; //是否重用 +} + +message Parameters { + Command command = 1; // 命令配置 + string stderrRedirectPath = 2; // 错误输出重定向 + string stdoutRedirectPath = 3; // 标准输出重定向 +} + +message Command { + string commandLine = 1; // 命令 + string packagePath = 2; // 工作路径 + map envVars = 3; // 自定义环境变量 +} + +message Cluster { + Image image = 1; // 镜像配置 + Instance instance = 2; // 实例配置 + repeated Mount mount = 3; // 挂载配置 +} + +message Image { + string name = 1; // 镜像名 + ImageType type = 2; // 镜像类型(public/private) + string owner = 3; // 所有者 + + enum ImageType { + BASE = 0; + PUBLIC = 1; + PRIVATE = 2; + } +} + +message Mount { + string provider = 1; + string localPath = 2; // 本地路径 + string remotePath = 3; // 远程路径 + string accessKey = 4; + string secretKey = 5; + string other = 6; +} + +message Instance { + int32 cpu = 1; // CPU,单位 个? + int32 memory = 2; // 内存,单位 mb + int32 disk = 3; // 磁盘,单位 mb + int32 gpu = 4; // 显卡,单位 个 +} diff --git a/src/protos/rpc_pb2.py b/src/protos/rpc_pb2.py new file mode 100644 index 0000000..4287adf --- /dev/null +++ b/src/protos/rpc_pb2.py @@ -0,0 +1,852 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: rpc.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf.internal import enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='rpc.proto', + package='', + syntax='proto3', + serialized_pb=_b('\n\trpc.proto\"f\n\x05Reply\x12\"\n\x06status\x18\x01 \x01(\x0e\x32\x12.Reply.ReplyStatus\x12\x0f\n\x07message\x18\x02 \x01(\t\"(\n\x0bReplyStatus\x12\x0c\n\x08\x41\x43\x43\x45PTED\x10\x00\x12\x0b\n\x07REFUSED\x10\x01\"\'\n\tReportMsg\x12\x1a\n\x08taskmsgs\x18\x01 \x03(\x0b\x32\x08.TaskMsg\"\x7f\n\x07TaskMsg\x12\x0e\n\x06taskid\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x1f\n\x0einstanceStatus\x18\x04 \x01(\x0e\x32\x07.Status\x12\r\n\x05token\x18\x05 \x01(\t\x12\x0e\n\x06\x65rrmsg\x18\x06 \x01(\t\"\xd6\x01\n\x08TaskInfo\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x12\n\ninstanceid\x18\x03 \x01(\x05\x12\x15\n\rinstanceCount\x18\x04 \x01(\x05\x12\x15\n\rmaxRetryCount\x18\x05 \x01(\x05\x12\x1f\n\nparameters\x18\x06 \x01(\x0b\x32\x0b.Parameters\x12\x19\n\x07\x63luster\x18\x07 \x01(\x0b\x32\x08.Cluster\x12\x0f\n\x07timeout\x18\x08 \x01(\x05\x12\r\n\x05token\x18\t \x01(\t\x12\x0e\n\x06reused\x18\n \x01(\x08\"_\n\nParameters\x12\x19\n\x07\x63ommand\x18\x01 \x01(\x0b\x32\x08.Command\x12\x1a\n\x12stderrRedirectPath\x18\x02 \x01(\t\x12\x1a\n\x12stdoutRedirectPath\x18\x03 \x01(\t\"\x8b\x01\n\x07\x43ommand\x12\x13\n\x0b\x63ommandLine\x18\x01 \x01(\t\x12\x13\n\x0bpackagePath\x18\x02 \x01(\t\x12&\n\x07\x65nvVars\x18\x03 \x03(\x0b\x32\x15.Command.EnvVarsEntry\x1a.\n\x0c\x45nvVarsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"T\n\x07\x43luster\x12\x15\n\x05image\x18\x01 \x01(\x0b\x32\x06.Image\x12\x1b\n\x08instance\x18\x02 \x01(\x0b\x32\t.Instance\x12\x15\n\x05mount\x18\x03 \x03(\x0b\x32\x06.Mount\"t\n\x05Image\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x1e\n\x04type\x18\x02 \x01(\x0e\x32\x10.Image.ImageType\x12\r\n\x05owner\x18\x03 \x01(\t\".\n\tImageType\x12\x08\n\x04\x42\x41SE\x10\x00\x12\n\n\x06PUBLIC\x10\x01\x12\x0b\n\x07PRIVATE\x10\x02\"u\n\x05Mount\x12\x10\n\x08provider\x18\x01 \x01(\t\x12\x11\n\tlocalPath\x18\x02 \x01(\t\x12\x12\n\nremotePath\x18\x03 \x01(\t\x12\x11\n\taccessKey\x18\x04 \x01(\t\x12\x11\n\tsecretKey\x18\x05 \x01(\t\x12\r\n\x05other\x18\x06 \x01(\t\"B\n\x08Instance\x12\x0b\n\x03\x63pu\x18\x01 \x01(\x05\x12\x0e\n\x06memory\x18\x02 \x01(\x05\x12\x0c\n\x04\x64isk\x18\x03 \x01(\x05\x12\x0b\n\x03gpu\x18\x04 \x01(\x05*[\n\x06Status\x12\x0b\n\x07WAITING\x10\x00\x12\x0b\n\x07RUNNING\x10\x01\x12\r\n\tCOMPLETED\x10\x02\x12\n\n\x06\x46\x41ILED\x10\x03\x12\x0b\n\x07TIMEOUT\x10\x04\x12\x0f\n\x0bOUTPUTERROR\x10\x05\x32(\n\x06Master\x12\x1e\n\x06report\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x32Q\n\x06Worker\x12#\n\x0cprocess_task\x12\t.TaskInfo\x1a\x06.Reply\"\x00\x12\"\n\nstop_tasks\x12\n.ReportMsg\x1a\x06.Reply\"\x00\x62\x06proto3') +) + +_STATUS = _descriptor.EnumDescriptor( + name='Status', + full_name='Status', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='WAITING', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='RUNNING', index=1, number=1, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='COMPLETED', index=2, number=2, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='FAILED', index=3, number=3, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='TIMEOUT', index=4, number=4, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='OUTPUTERROR', index=5, number=5, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=1134, + serialized_end=1225, +) +_sym_db.RegisterEnumDescriptor(_STATUS) + +Status = enum_type_wrapper.EnumTypeWrapper(_STATUS) +WAITING = 0 +RUNNING = 1 +COMPLETED = 2 +FAILED = 3 +TIMEOUT = 4 +OUTPUTERROR = 5 + + +_REPLY_REPLYSTATUS = _descriptor.EnumDescriptor( + name='ReplyStatus', + full_name='Reply.ReplyStatus', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='ACCEPTED', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='REFUSED', index=1, number=1, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=75, + serialized_end=115, +) +_sym_db.RegisterEnumDescriptor(_REPLY_REPLYSTATUS) + +_IMAGE_IMAGETYPE = _descriptor.EnumDescriptor( + name='ImageType', + full_name='Image.ImageType', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='BASE', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='PUBLIC', index=1, number=1, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='PRIVATE', index=2, number=2, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=899, + serialized_end=945, +) +_sym_db.RegisterEnumDescriptor(_IMAGE_IMAGETYPE) + + +_REPLY = _descriptor.Descriptor( + name='Reply', + full_name='Reply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='status', full_name='Reply.status', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='message', full_name='Reply.message', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _REPLY_REPLYSTATUS, + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=13, + serialized_end=115, +) + + +_REPORTMSG = _descriptor.Descriptor( + name='ReportMsg', + full_name='ReportMsg', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='taskmsgs', full_name='ReportMsg.taskmsgs', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=117, + serialized_end=156, +) + + +_TASKMSG = _descriptor.Descriptor( + name='TaskMsg', + full_name='TaskMsg', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='taskid', full_name='TaskMsg.taskid', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='username', full_name='TaskMsg.username', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='instanceid', full_name='TaskMsg.instanceid', index=2, + number=3, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='instanceStatus', full_name='TaskMsg.instanceStatus', index=3, + number=4, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='token', full_name='TaskMsg.token', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='errmsg', full_name='TaskMsg.errmsg', index=5, + number=6, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=158, + serialized_end=285, +) + + +_TASKINFO = _descriptor.Descriptor( + name='TaskInfo', + full_name='TaskInfo', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='id', full_name='TaskInfo.id', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='username', full_name='TaskInfo.username', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='instanceid', full_name='TaskInfo.instanceid', index=2, + number=3, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='instanceCount', full_name='TaskInfo.instanceCount', index=3, + number=4, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='maxRetryCount', full_name='TaskInfo.maxRetryCount', index=4, + number=5, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='parameters', full_name='TaskInfo.parameters', index=5, + number=6, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='cluster', full_name='TaskInfo.cluster', index=6, + number=7, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='timeout', full_name='TaskInfo.timeout', index=7, + number=8, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='token', full_name='TaskInfo.token', index=8, + number=9, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='reused', full_name='TaskInfo.reused', index=9, + number=10, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=288, + serialized_end=502, +) + + +_PARAMETERS = _descriptor.Descriptor( + name='Parameters', + full_name='Parameters', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='command', full_name='Parameters.command', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='stderrRedirectPath', full_name='Parameters.stderrRedirectPath', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='stdoutRedirectPath', full_name='Parameters.stdoutRedirectPath', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=504, + serialized_end=599, +) + + +_COMMAND_ENVVARSENTRY = _descriptor.Descriptor( + name='EnvVarsEntry', + full_name='Command.EnvVarsEntry', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='key', full_name='Command.EnvVarsEntry.key', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='value', full_name='Command.EnvVarsEntry.value', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=_descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')), + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=695, + serialized_end=741, +) + +_COMMAND = _descriptor.Descriptor( + name='Command', + full_name='Command', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='commandLine', full_name='Command.commandLine', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='packagePath', full_name='Command.packagePath', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='envVars', full_name='Command.envVars', index=2, + number=3, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[_COMMAND_ENVVARSENTRY, ], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=602, + serialized_end=741, +) + + +_CLUSTER = _descriptor.Descriptor( + name='Cluster', + full_name='Cluster', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='image', full_name='Cluster.image', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='instance', full_name='Cluster.instance', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='mount', full_name='Cluster.mount', index=2, + number=3, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=743, + serialized_end=827, +) + + +_IMAGE = _descriptor.Descriptor( + name='Image', + full_name='Image', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='Image.name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='type', full_name='Image.type', index=1, + number=2, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='owner', full_name='Image.owner', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _IMAGE_IMAGETYPE, + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=829, + serialized_end=945, +) + + +_MOUNT = _descriptor.Descriptor( + name='Mount', + full_name='Mount', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='provider', full_name='Mount.provider', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='localPath', full_name='Mount.localPath', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='remotePath', full_name='Mount.remotePath', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='accessKey', full_name='Mount.accessKey', index=3, + number=4, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='secretKey', full_name='Mount.secretKey', index=4, + number=5, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='other', full_name='Mount.other', index=5, + number=6, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=947, + serialized_end=1064, +) + + +_INSTANCE = _descriptor.Descriptor( + name='Instance', + full_name='Instance', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='cpu', full_name='Instance.cpu', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='memory', full_name='Instance.memory', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='disk', full_name='Instance.disk', index=2, + number=3, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='gpu', full_name='Instance.gpu', index=3, + number=4, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=1066, + serialized_end=1132, +) + +_REPLY.fields_by_name['status'].enum_type = _REPLY_REPLYSTATUS +_REPLY_REPLYSTATUS.containing_type = _REPLY +_REPORTMSG.fields_by_name['taskmsgs'].message_type = _TASKMSG +_TASKMSG.fields_by_name['instanceStatus'].enum_type = _STATUS +_TASKINFO.fields_by_name['parameters'].message_type = _PARAMETERS +_TASKINFO.fields_by_name['cluster'].message_type = _CLUSTER +_PARAMETERS.fields_by_name['command'].message_type = _COMMAND +_COMMAND_ENVVARSENTRY.containing_type = _COMMAND +_COMMAND.fields_by_name['envVars'].message_type = _COMMAND_ENVVARSENTRY +_CLUSTER.fields_by_name['image'].message_type = _IMAGE +_CLUSTER.fields_by_name['instance'].message_type = _INSTANCE +_CLUSTER.fields_by_name['mount'].message_type = _MOUNT +_IMAGE.fields_by_name['type'].enum_type = _IMAGE_IMAGETYPE +_IMAGE_IMAGETYPE.containing_type = _IMAGE +DESCRIPTOR.message_types_by_name['Reply'] = _REPLY +DESCRIPTOR.message_types_by_name['ReportMsg'] = _REPORTMSG +DESCRIPTOR.message_types_by_name['TaskMsg'] = _TASKMSG +DESCRIPTOR.message_types_by_name['TaskInfo'] = _TASKINFO +DESCRIPTOR.message_types_by_name['Parameters'] = _PARAMETERS +DESCRIPTOR.message_types_by_name['Command'] = _COMMAND +DESCRIPTOR.message_types_by_name['Cluster'] = _CLUSTER +DESCRIPTOR.message_types_by_name['Image'] = _IMAGE +DESCRIPTOR.message_types_by_name['Mount'] = _MOUNT +DESCRIPTOR.message_types_by_name['Instance'] = _INSTANCE +DESCRIPTOR.enum_types_by_name['Status'] = _STATUS +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Reply = _reflection.GeneratedProtocolMessageType('Reply', (_message.Message,), dict( + DESCRIPTOR = _REPLY, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:Reply) + )) +_sym_db.RegisterMessage(Reply) + +ReportMsg = _reflection.GeneratedProtocolMessageType('ReportMsg', (_message.Message,), dict( + DESCRIPTOR = _REPORTMSG, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:ReportMsg) + )) +_sym_db.RegisterMessage(ReportMsg) + +TaskMsg = _reflection.GeneratedProtocolMessageType('TaskMsg', (_message.Message,), dict( + DESCRIPTOR = _TASKMSG, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:TaskMsg) + )) +_sym_db.RegisterMessage(TaskMsg) + +TaskInfo = _reflection.GeneratedProtocolMessageType('TaskInfo', (_message.Message,), dict( + DESCRIPTOR = _TASKINFO, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:TaskInfo) + )) +_sym_db.RegisterMessage(TaskInfo) + +Parameters = _reflection.GeneratedProtocolMessageType('Parameters', (_message.Message,), dict( + DESCRIPTOR = _PARAMETERS, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:Parameters) + )) +_sym_db.RegisterMessage(Parameters) + +Command = _reflection.GeneratedProtocolMessageType('Command', (_message.Message,), dict( + + EnvVarsEntry = _reflection.GeneratedProtocolMessageType('EnvVarsEntry', (_message.Message,), dict( + DESCRIPTOR = _COMMAND_ENVVARSENTRY, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:Command.EnvVarsEntry) + )) + , + DESCRIPTOR = _COMMAND, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:Command) + )) +_sym_db.RegisterMessage(Command) +_sym_db.RegisterMessage(Command.EnvVarsEntry) + +Cluster = _reflection.GeneratedProtocolMessageType('Cluster', (_message.Message,), dict( + DESCRIPTOR = _CLUSTER, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:Cluster) + )) +_sym_db.RegisterMessage(Cluster) + +Image = _reflection.GeneratedProtocolMessageType('Image', (_message.Message,), dict( + DESCRIPTOR = _IMAGE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:Image) + )) +_sym_db.RegisterMessage(Image) + +Mount = _reflection.GeneratedProtocolMessageType('Mount', (_message.Message,), dict( + DESCRIPTOR = _MOUNT, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:Mount) + )) +_sym_db.RegisterMessage(Mount) + +Instance = _reflection.GeneratedProtocolMessageType('Instance', (_message.Message,), dict( + DESCRIPTOR = _INSTANCE, + __module__ = 'rpc_pb2' + # @@protoc_insertion_point(class_scope:Instance) + )) +_sym_db.RegisterMessage(Instance) + + +_COMMAND_ENVVARSENTRY.has_options = True +_COMMAND_ENVVARSENTRY._options = _descriptor._ParseOptions(descriptor_pb2.MessageOptions(), _b('8\001')) + +_MASTER = _descriptor.ServiceDescriptor( + name='Master', + full_name='Master', + file=DESCRIPTOR, + index=0, + options=None, + serialized_start=1227, + serialized_end=1267, + methods=[ + _descriptor.MethodDescriptor( + name='report', + full_name='Master.report', + index=0, + containing_service=None, + input_type=_REPORTMSG, + output_type=_REPLY, + options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_MASTER) + +DESCRIPTOR.services_by_name['Master'] = _MASTER + + +_WORKER = _descriptor.ServiceDescriptor( + name='Worker', + full_name='Worker', + file=DESCRIPTOR, + index=1, + options=None, + serialized_start=1269, + serialized_end=1350, + methods=[ + _descriptor.MethodDescriptor( + name='process_task', + full_name='Worker.process_task', + index=0, + containing_service=None, + input_type=_TASKINFO, + output_type=_REPLY, + options=None, + ), + _descriptor.MethodDescriptor( + name='stop_tasks', + full_name='Worker.stop_tasks', + index=1, + containing_service=None, + input_type=_REPORTMSG, + output_type=_REPLY, + options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_WORKER) + +DESCRIPTOR.services_by_name['Worker'] = _WORKER + +# @@protoc_insertion_point(module_scope) diff --git a/src/protos/rpc_pb2_grpc.py b/src/protos/rpc_pb2_grpc.py new file mode 100644 index 0000000..f962d31 --- /dev/null +++ b/src/protos/rpc_pb2_grpc.py @@ -0,0 +1,105 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +from protos import rpc_pb2 as rpc__pb2 + + +class MasterStub(object): + # missing associated documentation comment in .proto file + pass + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.report = channel.unary_unary( + '/Master/report', + request_serializer=rpc__pb2.ReportMsg.SerializeToString, + response_deserializer=rpc__pb2.Reply.FromString, + ) + + +class MasterServicer(object): + # missing associated documentation comment in .proto file + pass + + def report(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_MasterServicer_to_server(servicer, server): + rpc_method_handlers = { + 'report': grpc.unary_unary_rpc_method_handler( + servicer.report, + request_deserializer=rpc__pb2.ReportMsg.FromString, + response_serializer=rpc__pb2.Reply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'Master', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + +class WorkerStub(object): + # missing associated documentation comment in .proto file + pass + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.process_task = channel.unary_unary( + '/Worker/process_task', + request_serializer=rpc__pb2.TaskInfo.SerializeToString, + response_deserializer=rpc__pb2.Reply.FromString, + ) + self.stop_tasks = channel.unary_unary( + '/Worker/stop_tasks', + request_serializer=rpc__pb2.ReportMsg.SerializeToString, + response_deserializer=rpc__pb2.Reply.FromString, + ) + + +class WorkerServicer(object): + # missing associated documentation comment in .proto file + pass + + def process_task(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def stop_tasks(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_WorkerServicer_to_server(servicer, server): + rpc_method_handlers = { + 'process_task': grpc.unary_unary_rpc_method_handler( + servicer.process_task, + request_deserializer=rpc__pb2.TaskInfo.FromString, + response_serializer=rpc__pb2.Reply.SerializeToString, + ), + 'stop_tasks': grpc.unary_unary_rpc_method_handler( + servicer.stop_tasks, + request_deserializer=rpc__pb2.ReportMsg.FromString, + response_serializer=rpc__pb2.Reply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'Worker', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/src/utils/env.py b/src/utils/env.py index 3fcc8d1..d999516 100755 --- a/src/utils/env.py +++ b/src/utils/env.py @@ -79,5 +79,17 @@ def getenv(key): return os.environ.get("ALLOCATED_PORTS","10000-65535") elif key =="ALLOW_SCALE_OUT": return os.environ.get("ALLOW_SCALE_OUT", "False") + elif key == "BATCH_ON": + return os.environ.get("BATCH_ON","True") + elif key == "BATCH_MASTER_PORT": + return os.environ.get("BATCH_MASTER_PORT","50050") + elif key == "BATCH_WORKER_PORT": + return os.environ.get("BATCH_WORKER_PORT","50051") + elif key == "BATCH_GATEWAY": + return os.environ.get("BATCH_GATEWAY","10.0.3.1") + elif key == "BATCH_NET": + return os.environ.get("BATCH_NET","10.0.3.0/24") + elif key == "BATCH_MAX_THREAD_WORKER": + return os.environ.get("BATCH_MAX_THREAD_WORKER","5") else: return os.environ.get(key,"") diff --git a/src/utils/gputools.py b/src/utils/gputools.py new file mode 100644 index 0000000..03f5bef --- /dev/null +++ b/src/utils/gputools.py @@ -0,0 +1,120 @@ +import lxc +import subprocess +import os +import signal +from utils.log import logger + + +# Note: keep physical device id always the same as the virtual device id +# device_path e.g. /dev/nvidia0 +def add_device(container_name, device_path): + c = lxc.Container(container_name) + return c.add_device_node(device_path, device_path) + + +def remove_device(container_name, device_path): + c = lxc.Container(container_name) + return c.remove_device_node('', device_path) + + +# Mon May 21 10:51:45 2018 +# +-----------------------------------------------------------------------------+ +# | NVIDIA-SMI 381.22 Driver Version: 381.22 | +# |-------------------------------+----------------------+----------------------+ +# | GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC | +# | Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. | +# |===============================+======================+======================| +# | 0 GeForce GTX 108... Off | 0000:02:00.0 Off | N/A | +# | 33% 53C P2 59W / 250W | 295MiB / 11172MiB | 2% Default | +# +-------------------------------+----------------------+----------------------+ +# | 1 GeForce GTX 108... Off | 0000:84:00.0 Off | N/A | +# | 21% 35C P8 10W / 250W | 161MiB / 11172MiB | 0% Default | +# +-------------------------------+----------------------+----------------------+ +# +# +-----------------------------------------------------------------------------+ +# | Processes: GPU Memory | +# | GPU PID Type Process name Usage | +# |=============================================================================| +# | 0 111893 C python3 285MiB | +# | 1 111893 C python3 151MiB | +# +-----------------------------------------------------------------------------+ +# +def nvidia_smi(): + try: + ret = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) + return ret.stdout.decode('utf-8').split('\n') + except subprocess.CalledProcessError: + return None + except Exception as e: + return None + + +def get_gpu_driver_version(): + output = nvidia_smi() + if not output: + return None + else: + return output[2].split()[-2] + + +def get_gpu_status(): + output = nvidia_smi() + if not output: + return [] + interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0] + status_list = [] + for index in range(7, interval_index, 3): + status = {} + status['id'] = output[index].split()[1] + sp = output[index+1].split() + status['fan'] = sp[1] + status['memory'] = sp[8] + status['memory_max'] = sp[10] + status['util'] = sp[12] + status_list.append(status) + return status_list + + +def get_gpu_processes(): + output = nvidia_smi() + if not output: + return [] + interval_index = [index for index in range(len(output)) if len(output[index].strip()) == 0][0] + process_list = [] + for index in range(interval_index + 5, len(output)): + sp = output[index].split() + if len(sp) != 7: + break + process = {} + process['gpu'] = sp[1] + process['pid'] = sp[2] + process['name'] = sp[4] + process['memory'] = sp[5] + process['container'] = get_container_name_by_pid(sp[2]) + process_list.append(process) + return process_list + + +def get_container_name_by_pid(pid): + with open('/proc/%s/cgroup' % pid) as f: + content = f.readlines()[0].strip().split('/') + if content[1] != 'lxc': + return 'host' + else: + return content[2] + return None + + +def clean_up_processes_in_gpu(gpu_id): + logger.info('[gputools] start clean up processes in gpu %d' % gpu_id) + processes = get_gpu_processes() + for process in [p for p in processes if p['gpu'] == gpu_id]: + logger.info('[gputools] find process %d running in gpu %d' % (process['pid'], process['gpu'])) + if process['container'] == 'host': + logger.warning('[gputools] find process of host, ignored') + else: + logger.warning('[gputools] find process of container [%s], killed' % process['container']) + try: + os.kill(process['pid'], signal.SIGKILL) + except OSError: + continue diff --git a/src/worker/monitor.py b/src/worker/monitor.py index 23f2892..99170ed 100755 --- a/src/worker/monitor.py +++ b/src/worker/monitor.py @@ -19,7 +19,7 @@ Design:Monitor mainly consists of three parts: Collectors, Master_Collector and import subprocess,re,os,psutil,math,sys import time,threading,json,traceback,platform -from utils import env, etcdlib +from utils import env, etcdlib, gputools import lxc import xmlrpc.client from datetime import datetime @@ -262,6 +262,7 @@ class Container_Collector(threading.Thread): global pid2name global laststopcpuval global laststopruntime + is_batch = container_name.split('-')[1] == 'batch' # collect basic information, such as running time,state,pid,ip,name container = lxc.Container(container_name) basic_info = {} @@ -286,7 +287,8 @@ class Container_Collector(threading.Thread): containerpids.append(container_pid_str) pid2name[container_pid_str] = container_name running_time = self.get_proc_etime(container.init_pid) - running_time += laststopruntime[container_name] + if not is_batch: + running_time += laststopruntime[container_name] basic_info['PID'] = container_pid_str basic_info['IP'] = container.get_ips()[0] basic_info['RunningTime'] = running_time @@ -326,7 +328,8 @@ class Container_Collector(threading.Thread): cpu_use = {} lastval = 0 try: - lastval = laststopcpuval[container_name] + if not is_batch: + lastval = laststopcpuval[container_name] except: logger.warning(traceback.format_exc()) cpu_val += lastval @@ -369,7 +372,7 @@ class Container_Collector(threading.Thread): # deal with network used data containerids = re.split("-",container_name) - if len(containerids) >= 3: + if not is_batch and len(containerids) >= 3: workercinfo[container_name]['net_stats'] = self.net_stats[containerids[1] + '-' + containerids[2]] #logger.info(workercinfo[container_name]['net_stats']) @@ -378,7 +381,7 @@ class Container_Collector(threading.Thread): lasttime = lastbillingtime[container_name] #logger.info(lasttime) # process real billing if running time reach an hour - if not int(running_time/self.billingtime) == lasttime: + if not is_batch and not int(running_time/self.billingtime) == lasttime: #logger.info("billing:"+str(float(cpu_val))) lastbillingtime[container_name] = int(running_time/self.billingtime) self.billing_increment(container_name) @@ -478,6 +481,10 @@ class Collector(threading.Thread): info[idx][key] = val return [cpuset, info] + # collect gpu used information + def collect_gpuinfo(self): + return gputools.get_gpu_status() + # collect disk used information def collect_diskinfo(self): global workercinfo @@ -534,9 +541,10 @@ class Collector(threading.Thread): [cpuinfo,cpuconfig] = self.collect_cpuinfo() workerinfo['cpuinfo'] = cpuinfo workerinfo['cpuconfig'] = cpuconfig + workerinfo['gpuinfo'] = self.collect_gpuinfo() workerinfo['diskinfo'] = self.collect_diskinfo() workerinfo['running'] = True - #time.sleep(self.interval) + time.sleep(self.interval) if self.test: break # print(self.etcdser.getkey('/meminfo/total')) diff --git a/src/worker/ossmounter.py b/src/worker/ossmounter.py new file mode 100644 index 0000000..607ca45 --- /dev/null +++ b/src/worker/ossmounter.py @@ -0,0 +1,60 @@ +import abc +import subprocess, os +from utils.log import logger + +class OssMounter(object): + __metaclass__ = abc.ABCMeta + + @staticmethod + def execute_cmd(cmd): + ret = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + if ret.returncode != 0: + msg = ret.stdout.decode(encoding="utf-8") + logger.error(msg) + return [False,msg] + else: + return [True,""] + + @staticmethod + @abc.abstractmethod + def mount_oss(datapath, mount_info): + # mount oss + pass + + @staticmethod + @abc.abstractmethod + def umount_oss(datapath, mount_info): + # umount oss + pass + +class aliyunOssMounter(OssMounter): + + @staticmethod + def mount_oss(datapath, mount_info): + # mount oss + try: + pwdfile = open("/etc/passwd-ossfs","w") + pwdfile.write(mount_info.remotePath+":"+mount_info.accessKey+":"+mount_info.secretKey+"\n") + pwdfile.close() + except Exception as err: + logger.error(traceback.format_exc()) + return [False,msg] + + cmd = "chmod 640 /etc/passwd-ossfs" + [success1, msg] = OssMounter.execute_cmd(cmd) + mountpath = datapath+"/"+mount_info.remotePath + logger.info("Mount oss %s %s" % (mount_info.remotePath, mountpath)) + if not os.path.isdir(mountpath): + os.makedirs(mountpath) + cmd = "ossfs %s %s -ourl=%s" % (mount_info.remotePath, mountpath, mount_info.other) + [success, msg] = OssMounter.execute_cmd(cmd) + return [True,""] + + @staticmethod + def umount_oss(datapath, mount_info): + mountpath = datapath + "/" + mount_info.remotePath + logger.info("UMount oss %s %s" % (mount_info.remotePath, mountpath)) + cmd = "fusermount -u %s" % (mountpath) + [success, msg] = OssMounter.execute_cmd(cmd) + [success, msg] = OssMounter.execute_cmd("rm -rf %s" % mountpath) + return [True,""] diff --git a/src/worker/taskcontroller.py b/src/worker/taskcontroller.py new file mode 100755 index 0000000..3fd6daa --- /dev/null +++ b/src/worker/taskcontroller.py @@ -0,0 +1,454 @@ +#!/usr/bin/python3 +import sys +if sys.path[0].endswith("worker"): + sys.path[0] = sys.path[0][:-6] +from utils import env, tools +config = env.getenv("CONFIG") +#config = "/opt/docklet/local/docklet-running.conf" +tools.loadenv(config) +from utils.log import initlogging +initlogging("docklet-taskcontroller") +from utils.log import logger + +from concurrent import futures +import grpc +#from utils.log import logger +#from utils import env +import json,lxc,subprocess,threading,os,time,traceback +from utils import imagemgr,etcdlib,gputools +from worker import ossmounter +from protos import rpc_pb2, rpc_pb2_grpc + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 +MAX_RUNNING_TIME = _ONE_DAY_IN_SECONDS + +def ip_to_int(addr): + [a, b, c, d] = addr.split('.') + return (int(a)<<24) + (int(b)<<16) + (int(c)<<8) + int(d) + +def int_to_ip(num): + return str((num>>24)&255)+"."+str((num>>16)&255)+"."+str((num>>8)&255)+"."+str(num&255) + +class TaskController(rpc_pb2_grpc.WorkerServicer): + + def __init__(self): + rpc_pb2_grpc.WorkerServicer.__init__(self) + etcdaddr = env.getenv("ETCD") + logger.info ("using ETCD %s" % etcdaddr ) + + clustername = env.getenv("CLUSTER_NAME") + logger.info ("using CLUSTER_NAME %s" % clustername ) + + # init etcdlib client + try: + self.etcdclient = etcdlib.Client(etcdaddr, prefix = clustername) + except Exception: + logger.error ("connect etcd failed, maybe etcd address not correct...") + sys.exit(1) + else: + logger.info("etcd connected") + + # get master ip and report port + [success,masterip] = self.etcdclient.getkey("service/master") + if not success: + logger.error("Fail to get master ip address.") + sys.exit(1) + else: + self.master_ip = masterip + logger.info("Get master ip address: %s" % (self.master_ip)) + self.master_port = env.getenv('BATCH_MASTER_PORT') + + self.imgmgr = imagemgr.ImageMgr() + self.fspath = env.getenv('FS_PREFIX') + self.confpath = env.getenv('DOCKLET_CONF') + + self.taskmsgs = [] + self.msgslock = threading.Lock() + self.report_interval = 2 + + self.lock = threading.Lock() + self.mount_lock = threading.Lock() + self.cons_gateway = env.getenv('BATCH_GATEWAY') + self.cons_ips = env.getenv('BATCH_NET') + logger.info("Batch gateway ip address %s" % self.cons_gateway) + logger.info("Batch ip pools %s" % self.cons_ips) + + self.cidr = 32 - int(self.cons_ips.split('/')[1]) + self.ipbase = ip_to_int(self.cons_ips.split('/')[0]) + self.free_ips = [] + for i in range(2, (1 << self.cidr) - 1): + self.free_ips.append(i) + logger.info("Free ip addresses pool %s" % str(self.free_ips)) + + self.gpu_lock = threading.Lock() + self.gpu_status = {} + gpus = gputools.get_gpu_status() + for gpu in gpus: + self.gpu_status[gpu['id']] = "" + + self.start_report() + logger.info('TaskController init success') + + # Need Locks + def acquire_ip(self): + self.lock.acquire() + if len(self.free_ips) == 0: + return [False, "No free ips"] + ip = int_to_ip(self.ipbase + self.free_ips[0]) + self.free_ips.remove(self.free_ips[0]) + logger.info(str(self.free_ips)) + self.lock.release() + return [True, ip + "/" + str(32 - self.cidr)] + + # Need Locks + def release_ip(self,ipstr): + self.lock.acquire() + ipnum = ip_to_int(ipstr.split('/')[0]) - self.ipbase + self.free_ips.append(ipnum) + logger.info(str(self.free_ips)) + self.lock.release() + + def add_gpu_device(self, lxcname, gpu_need): + if gpu_need < 1: + return [True, ""] + self.gpu_lock.acquire() + use_gpus = [] + for gpuid in self.gpu_status.keys(): + if self.gpu_status[gpuid] == "" and gpu_need > 0: + use_gpus.append(gpuid) + gpu_need -= 1 + if gpu_need > 0: + self.gpu_lock.release() + return [False, "No free GPUs"] + for gpuid in use_gpus: + self.gpu_status[gpuid] = lxcname + try: + gputools.add_device(lxcname, "/dev/nvidiactl") + gputools.add_device(lxcname, "/dev/nvidia-uvm") + for gpuid in use_gpus: + gputools.add_device(lxcname,"/dev/nvidia"+str(gpuid)) + logger.info("Add gpu:"+str(gpuid) +" to lxc:"+str(lxcname)) + except Exception as e: + logger.error(traceback.format_exc()) + for gpuid in use_gpus: + self.gpu_status[gpuid] = "" + self.gpu_lock.release() + return [False, "Error occurs when adding gpu device."] + + self.gpu_lock.release() + return [True, ""] + + def release_gpu_device(self, lxcname): + self.gpu_lock.acquire() + for gpuid in self.gpu_status.keys(): + if self.gpu_status[gpuid] == lxcname: + self.gpu_status[gpuid] = "" + self.gpu_lock.release() + + #mount_oss + def mount_oss(self, datapath, mount_info): + self.mount_lock.acquire() + try: + for mount in mount_info: + provider = mount.provider + mounter = getattr(ossmounter,provider+"OssMounter",None) + if mounter is None: + self.mount_lock.release() + return [False, provider + " doesn't exist!"] + [success, msg] = mounter.mount_oss(datapath,mount) + if not success: + self.mount_lock.release() + return [False, msg] + except Exception as err: + self.mount_lock.release() + logger.error(traceback.format_exc()) + return [False,""] + + self.mount_lock.release() + return [True,""] + + #umount oss + def umount_oss(self, datapath, mount_info): + try: + for mount in mount_info: + provider = mount.provider + mounter = getattr(ossmounter,provider+"OssMounter",None) + if mounter is None: + return [False, provider + " doesn't exist!"] + [success, msg] = mounter.umount_oss(datapath,mount) + if not success: + return [False, msg] + except Exception as err: + logger.error(traceback.format_exc()) + return [False,""] + #accquire ip and create a container + def create_container(self,instanceid,username,image,lxcname,quota): + # acquire ip + [status, ip] = self.acquire_ip() + if not status: + return [False, ip] + + # prepare image and filesystem + status = self.imgmgr.prepareFS(username,image,lxcname,str(quota.disk)) + if not status: + self.release_ip(ip) + return [False, "Create container for batch failed when preparing filesystem"] + + rootfs = "/var/lib/lxc/%s/rootfs" % lxcname + + if not os.path.isdir("%s/global/users/%s" % (self.fspath,username)): + path = env.getenv('DOCKLET_LIB') + subprocess.call([path+"/master/userinit.sh", username]) + logger.info("user %s directory not found, create it" % username) + sys_run("mkdir -p /var/lib/lxc/%s" % lxcname) + logger.info("generate config file for %s" % lxcname) + + def config_prepare(content): + content = content.replace("%ROOTFS%",rootfs) + content = content.replace("%HOSTNAME%","batch-%s" % str(instanceid)) + content = content.replace("%CONTAINER_MEMORY%",str(quota.memory)) + content = content.replace("%CONTAINER_CPU%",str(quota.cpu*100000)) + content = content.replace("%FS_PREFIX%",self.fspath) + content = content.replace("%LXCSCRIPT%",env.getenv("LXC_SCRIPT")) + content = content.replace("%USERNAME%",username) + content = content.replace("%LXCNAME%",lxcname) + content = content.replace("%IP%",ip) + content = content.replace("%GATEWAY%",self.cons_gateway) + return content + + logger.info(self.confpath) + conffile = open(self.confpath+"/container.batch.conf", 'r') + conftext = conffile.read() + conffile.close() + + conftext = config_prepare(conftext) + + conffile = open("/var/lib/lxc/%s/config" % lxcname, 'w') + conffile.write(conftext) + conffile.close() + return [True, ip] + + def process_task(self, request, context): + logger.info('excute task with parameter: ' + str(request)) + taskid = request.id + instanceid = request.instanceid + + # get config from request + command = request.parameters.command.commandLine #'/root/getenv.sh' #parameter['Parameters']['Command']['CommandLine'] + #envs = {'MYENV1':'MYVAL1', 'MYENV2':'MYVAL2'} #parameters['Parameters']['Command']['EnvVars'] + pkgpath = request.parameters.command.packagePath + envs = request.parameters.command.envVars + envs['taskid'] = str(taskid) + envs['instanceid'] = str(instanceid) + image = {} + image['name'] = request.cluster.image.name + if request.cluster.image.type == rpc_pb2.Image.PRIVATE: + image['type'] = 'private' + elif request.cluster.image.type == rpc_pb2.Image.PUBLIC: + image['type'] = 'public' + else: + image['type'] = 'base' + image['owner'] = request.cluster.image.owner + username = request.username + token = request.token + lxcname = '%s-batch-%s-%s-%s' % (username,taskid,str(instanceid),token) + instance_type = request.cluster.instance + mount_list = request.cluster.mount + outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath] + timeout = request.timeout + gpu_need = int(request.cluster.instance.gpu) + reused = request.reused + + #create container + [success, ip] = self.create_container(instanceid, username, image, lxcname, instance_type) + if not success: + return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED, message=ip) + + #mount oss + self.mount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_list) + conffile = open("/var/lib/lxc/%s/config" % lxcname, 'a+') + mount_str = "lxc.mount.entry = %s/global/users/%s/oss/%s %s/root/oss/%s none bind,rw,create=dir 0 0" + for mount in mount_list: + conffile.write("\n"+ mount_str % (self.fspath, username, mount.remotePath, rootfs, mount.remotePath)) + conffile.close() + + container = lxc.Container(lxcname) + if not container.start(): + logger.error('start container %s failed' % lxcname) + self.release_ip(ip) + self.imgmgr.deleteFS(lxcname) + return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Can't start the container") + + logger.info('start container %s success' % lxcname) + + #add GPU + [success, msg] = self.add_gpu_device(lxcname,gpu_need) + if not success: + logger.error("Fail to add gpu device. " + msg) + container.stop() + self.release_ip(ip) + self.imgmgr.deleteFS(lxcname) + return rpc_pb2.Reply(status=rpc_pb2.Reply.REFUSED,message="Fail to add gpu device. " + msg) + + thread = threading.Thread(target = self.execute_task, args=(username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_list)) + thread.setDaemon(True) + thread.start() + + return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") + + def write_output(self,lxcname,tmplogpath,filepath): + cmd = "lxc-attach -n " + lxcname + " -- mv %s %s" + if filepath == "" or filepath == "/root/nfs/batch_{jobid}/" or os.path.abspath("/root/nfs/"+tmplogpath) == os.path.abspath(filepath): + return [True,""] + ret = subprocess.run(cmd % ("/root/nfs/"+tmplogpath,filepath),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True) + if ret.returncode != 0: + msg = ret.stdout.decode(encoding="utf-8") + logger.error(msg) + return [False,msg] + logger.info("Succeed to moving nfs/%s to %s" % (tmplogpath,filepath)) + return [True,""] + + def execute_task(self,username,taskid,instanceid,envs,lxcname,pkgpath,command,timeout,outpath,ip,token,mount_info): + lxcfspath = "/var/lib/lxc/"+lxcname+"/rootfs/" + scriptname = "batch_job.sh" + try: + scriptfile = open(lxcfspath+"root/"+scriptname,"w") + scriptfile.write("#!/bin/bash\n") + scriptfile.write("cd "+str(pkgpath)+"\n") + scriptfile.write(command) + scriptfile.close() + except Exception as err: + logger.error(traceback.format_exc()) + logger.error("Fail to write script file with taskid(%s) instanceid(%s)" % (str(taskid),str(instanceid))) + else: + try: + job_id = taskid.split('_')[1] + except Exception as e: + logger.error(traceback.format_exc()) + job_id = "_none" + jobdir = "batch_" + job_id + logdir = "%s/global/users/%s/data/" % (self.fspath,username) + jobdir + if not os.path.exists(logdir): + logger.info("Directory:%s not exists, create it." % logdir) + os.mkdir(logdir) + stdoutname = str(taskid)+"_"+str(instanceid)+"_stdout.txt" + stderrname = str(taskid)+"_"+str(instanceid)+"_stderr.txt" + try: + stdoutfile = open(logdir+"/"+stdoutname,"w") + stderrfile = open(logdir+"/"+stderrname,"w") + logger.info("Create stdout(%s) and stderr(%s) file to log" % (stdoutname, stderrname)) + except Exception as e: + logger.error(traceback.format_exc()) + stdoutfile = None + stderrfile = None + + cmd = "lxc-attach -n " + lxcname + for envkey,envval in envs.items(): + cmd = cmd + " -v %s=%s" % (envkey,envval) + cmd = cmd + " -- /bin/bash \"" + "/root/" + scriptname + "\"" + logger.info('run task with command - %s' % cmd) + p = subprocess.Popen(cmd,stdout=stdoutfile,stderr=stderrfile, shell=True) + #logger.info(p) + if timeout == 0: + to = MAX_RUNNING_TIME + else: + to = timeout + while p.poll() is None and to > 0: + time.sleep(min(2,to)) + to -= 2 + if p.poll() is None: + p.kill() + logger.info("Running time(%d) is out. Task(%s-%s-%s) will be killed." % (timeout,str(taskid),str(instanceid),token)) + self.add_msg(taskid,username,instanceid,rpc_pb2.TIMEOUT,token,"Running time is out.") + else: + [success1,msg1] = self.write_output(lxcname,jobdir+"/"+stdoutname,outpath[0]) + [success2,msg2] = self.write_output(lxcname,jobdir+"/"+stderrname,outpath[1]) + if not success1 or not success2: + if not success1: + msg = msg1 + else: + msg = msg2 + logger.info("Output error on Task(%s-%s-%s)." % (str(taskid),str(instanceid),token)) + self.add_msg(taskid,username,instanceid,rpc_pb2.OUTPUTERROR,token,msg) + else: + if p.poll() == 0: + logger.info("Task(%s-%s-%s) completed." % (str(taskid),str(instanceid),token)) + self.add_msg(taskid,username,instanceid,rpc_pb2.COMPLETED,token,"") + else: + logger.info("Task(%s-%s-%s) failed." % (str(taskid),str(instanceid),token)) + self.add_msg(taskid,username,instanceid,rpc_pb2.FAILED,token,"") + + container = lxc.Container(lxcname) + if container.stop(): + logger.info("stop container %s success" % lxcname) + else: + logger.error("stop container %s failed" % lxcname) + + logger.info("deleting container:%s" % lxcname) + if self.imgmgr.deleteFS(lxcname): + logger.info("delete container %s success" % lxcname) + else: + logger.error("delete container %s failed" % lxcname) + + logger.info("release ip address %s" % ip) + self.release_ip(ip) + self.release_gpu_device(lxcname) + + #umount oss + self.umount_oss("%s/global/users/%s/oss" % (self.fspath,username), mount_info) + + def stop_tasks(self, request, context): + for msg in request.taskmsgs: + lxcname = '%s-batch-%s-%s-%s' % (msg.username,msg.taskid,str(msg.instanceid),msg.token) + logger.info("Stop the task with lxc:"+lxcname) + subprocess.run("lxc-stop -k -n %s" % lxcname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True) + return rpc_pb2.Reply(status=rpc_pb2.Reply.ACCEPTED,message="") + + def add_msg(self,taskid,username,instanceid,status,token,errmsg): + self.msgslock.acquire() + try: + self.taskmsgs.append(rpc_pb2.TaskMsg(taskid=str(taskid),username=username,instanceid=int(instanceid),instanceStatus=status,token=token,errmsg=errmsg)) + except Exception as err: + logger.error(traceback.format_exc()) + self.msgslock.release() + #logger.info(str(self.taskmsgs)) + + def report_msg(self): + channel = grpc.insecure_channel(self.master_ip+":"+self.master_port) + stub = rpc_pb2_grpc.MasterStub(channel) + while True: + self.msgslock.acquire() + reportmsg = rpc_pb2.ReportMsg(taskmsgs = self.taskmsgs) + try: + response = stub.report(reportmsg) + logger.info("Response from master by reporting: "+str(response.status)+" "+response.message) + except Exception as err: + logger.error(traceback.format_exc()) + self.taskmsgs = [] + self.msgslock.release() + time.sleep(self.report_interval) + + def start_report(self): + thread = threading.Thread(target = self.report_msg, args=()) + thread.setDaemon(True) + thread.start() + logger.info("Start to report task messages to master every %d seconds." % self.report_interval) + + +def TaskControllerServe(): + max_threads = int(env.getenv('BATCH_MAX_THREAD_WORKER')) + worker_port = int(env.getenv('BATCH_WORKER_PORT')) + logger.info("Max Threads on a worker is %d" % max_threads) + server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_threads)) + rpc_pb2_grpc.add_WorkerServicer_to_server(TaskController(), server) + server.add_insecure_port('[::]:'+str(worker_port)) + server.start() + logger.info("Start TaskController Servicer on port:%d" % worker_port) + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except KeyboardInterrupt: + server.stop(0) + +if __name__ == "__main__": + TaskControllerServe() diff --git a/src/worker/worker.py b/src/worker/worker.py index 88839c7..fb4e324 100755 --- a/src/worker/worker.py +++ b/src/worker/worker.py @@ -57,17 +57,23 @@ class Worker(object): self.etcd = etcdclient self.master = self.etcd.getkey("service/master")[1] - self.mode=None + self.mode = None + self.workertype = "normal" + self.key="" - # waiting state is preserved for compatible. - self.etcd.setkey("machines/runnodes/"+self.addr, "waiting") - # get this node's key to judge how to init. - [status, key] = self.etcd.getkey("machines/runnodes/"+self.addr) - if status: - self.key = generatekey("machines/allnodes/"+self.addr) - else: - logger.error("get key failed. %s" % 'machines/runnodes/'+self.addr) - sys.exit(1) + if len(sys.argv) > 1 and sys.argv[1] == "batch-worker": + self.workertype = "batch" + + if self.workertype == "normal": + # waiting state is preserved for compatible. + self.etcd.setkey("machines/runnodes/"+self.addr, "waiting") + # get this node's key to judge how to init. + [status, key] = self.etcd.getkey("machines/runnodes/"+self.addr) + if status: + self.key = generatekey("machines/allnodes/"+self.addr) + else: + logger.error("get key failed. %s" % 'machines/runnodes/'+self.addr) + sys.exit(1) # check token to check global directory [status, token_1] = self.etcd.getkey("token") @@ -87,7 +93,8 @@ class Worker(object): if node['key'] == self.key: value = 'init-recovery' break - logger.info("worker start in "+value+" mode") + + logger.info("worker start in "+value+" mode, worker type is"+self.workertype) Containers = container.Container(self.addr, etcdclient) if value == 'init-new': @@ -193,7 +200,8 @@ class Worker(object): self.hosts_collector.start() logger.info("Monitor Collector has been started.") # worker change it state itself. Independedntly from master. - self.etcd.setkey("machines/runnodes/"+self.addr, "work") + if self.workertype == "normal": + self.etcd.setkey("machines/runnodes/"+self.addr, "work") publicIP = env.getenv("PUBLIC_IP") self.etcd.setkey("machines/publicIP/"+self.addr,publicIP) self.thread_sendheartbeat = threading.Thread(target=self.sendheartbeat) @@ -204,17 +212,22 @@ class Worker(object): # send heardbeat package to keep alive in etcd, ttl=2s def sendheartbeat(self): - while(True): - # check send heartbeat package every 1s - time.sleep(2) - [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr) - if status: - # master has know the worker so we start send heartbeat package - if value=='ok': - self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 3) - else: - logger.error("get key %s failed, master may be crashed" % self.addr) - self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60) + if self.workertype == "normal": + while(True): + # check send heartbeat package every 1s + time.sleep(2) + [status, value] = self.etcd.getkey("machines/runnodes/"+self.addr) + if status: + # master has know the worker so we start send heartbeat package + if value=='ok': + self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60) + else: + logger.error("get key %s failed, master may be crashed" % self.addr) + self.etcd.setkey("machines/runnodes/"+self.addr, "ok", ttl = 60) + elif self.workertype == "batch": + while(True): + time.sleep(2) + self.etcd.setkey("machines/batchnodes/"+self.addr, "ok", ttl = 60) if __name__ == '__main__': diff --git a/web/static/js/plot_monitor.js b/web/static/js/plot_monitor.js index f669723..4470fc0 100755 --- a/web/static/js/plot_monitor.js +++ b/web/static/js/plot_monitor.js @@ -240,21 +240,38 @@ function processInfo() $("#con_disk").html(usedp+"%
"+detail); //processNetStats - var net_stats = data.monitor.net_stats; - var in_rate = parseInt(net_stats.bytes_recv_per_sec); - var out_rate = parseInt(net_stats.bytes_sent_per_sec); - ingress_rate = in_rate; - egress_rate = out_rate; - $("#net_in_rate").html(num2human(in_rate)+"Bps"); - $("#net_out_rate").html(num2human(out_rate)+"Bps"); - $("#net_in_bytes").html(num2human(net_stats.bytes_recv)+"B"); - $("#net_out_bytes").html(num2human(net_stats.bytes_sent)+"B"); - $("#net_in_packs").html(net_stats.packets_recv); - $("#net_out_packs").html(net_stats.packets_sent); - $("#net_in_err").html(net_stats.errout); - $("#net_out_err").html(net_stats.errin); - $("#net_in_drop").html(net_stats.dropout); - $("#net_out_drop").html(net_stats.dropin); + var net_stats = data.monitor.net_stats; + if(!$.isEmptyObject(net_stats)) + { + var in_rate = parseInt(net_stats.bytes_recv_per_sec); + var out_rate = parseInt(net_stats.bytes_sent_per_sec); + ingress_rate = in_rate; + egress_rate = out_rate; + $("#net_in_rate").html(num2human(in_rate)+"Bps"); + $("#net_out_rate").html(num2human(out_rate)+"Bps"); + $("#net_in_bytes").html(num2human(net_stats.bytes_recv)+"B"); + $("#net_out_bytes").html(num2human(net_stats.bytes_sent)+"B"); + $("#net_in_packs").html(net_stats.packets_recv); + $("#net_out_packs").html(net_stats.packets_sent); + $("#net_in_err").html(net_stats.errout); + $("#net_out_err").html(net_stats.errin); + $("#net_in_drop").html(net_stats.dropout); + $("#net_out_drop").html(net_stats.dropin); + } + else { + ingress_rate = 0; + egress_rate = 0; + $("#net_in_rate").html("--"); + $("#net_out_rate").html("--"); + $("#net_in_bytes").html("--"); + $("#net_out_bytes").html("--"); + $("#net_in_packs").html("--"); + $("#net_out_packs").html("--"); + $("#net_in_err").html("--"); + $("#net_out_err").html("--"); + $("#net_in_drop").html("--"); + $("#net_out_drop").html("--"); + } },"json"); } diff --git a/web/templates/base_AdminLTE.html b/web/templates/base_AdminLTE.html index b1ac314..d015311 100644 --- a/web/templates/base_AdminLTE.html +++ b/web/templates/base_AdminLTE.html @@ -174,6 +174,9 @@ + {% if mysession['usergroup'] == 'root' or mysession['usergroup'] == 'admin'%} diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html new file mode 100644 index 0000000..31945f6 --- /dev/null +++ b/web/templates/batch/batch_create.html @@ -0,0 +1,248 @@ +{% extends 'base_AdminLTE.html' %} + +{% block title %}Docklet | Create Batch Job{% endblock %} + +{% block css_src %} + + + + + +{% endblock %} + +{% block panel_title %}Batch Job Info{% endblock %} + +{% block panel_list %} + +{% endblock %} + +
+{% block content %} +
+
+
+
+

Batch Job Create + +

+ +
+ + +
+
+
+
+ +
+
+
+
+
+
+
+
+
+ +
+
+ +
+
+
+
+
+
+ +
+
+
+
+
+
+
+ +
+
+{% endblock %} + +{% block script_src %} + + + + + + + + + + + + + + + + + + +{% endblock %} diff --git a/web/templates/batch/batch_list.html b/web/templates/batch/batch_list.html new file mode 100644 index 0000000..a8914d6 --- /dev/null +++ b/web/templates/batch/batch_list.html @@ -0,0 +1,126 @@ +{% extends "base_AdminLTE.html"%} +{% block title %}Docklet | Batch Job{% endblock %} + +{% block panel_title %}Batch Job{% endblock %} + +{% block panel_list %} + +{% endblock %} +{% block content %} +
+
+
+
+

Batch Job List

+ +
+ + +
+
+
+ +

+ +

+ {% for job_info in job_list %} + + {% endfor %} +
+ + + + + + + + + + + + + + {% for job_info in job_list %} + + + + + + + + + + {% endfor %} + +
IDNameStatusTasksOperationsCreate TimeStdout and Stderr
{{ job_info['job_id'] }}{{ job_info['job_name'] }} + {{ job_info['status'] }} + Tasks{{ job_info['create_time'] }}Get Output
+
+
+
+
+
+ +{% endblock %} +{% block script_src %} + + + + +{% endblock %} diff --git a/web/templates/batch/batch_output.html b/web/templates/batch/batch_output.html new file mode 100644 index 0000000..58bc011 --- /dev/null +++ b/web/templates/batch/batch_output.html @@ -0,0 +1,59 @@ + + + + + + Docklet | Batch {{ issue }}: {{ jobid }}/{{ taskid }}/{{ instid }} + + + + + + + + + + + + + + + + + + + + + + + +

Jobid: {{ jobid }}

+

Taskid: {{ taskid }}

+

Instanceid: {{ instid }}

+

The output of {{ issue }} will be updated in every 2 seconds.

+
+
{{ output }}
+ + + + + + + + + + + + + + diff --git a/web/templates/batch/batch_state.html b/web/templates/batch/batch_state.html new file mode 100644 index 0000000..e69de29 diff --git a/web/web.py b/web/web.py index 4f9bda0..a4ce826 100755 --- a/web/web.py +++ b/web/web.py @@ -41,6 +41,7 @@ from webViews.reportbug import * from webViews.authenticate.auth import login_required, administration_required,activated_required from webViews.authenticate.register import registerView from webViews.authenticate.login import loginView, logoutView +from webViews.batch import * import webViews.dockletrequest from webViews import cookie_tool import traceback @@ -127,6 +128,49 @@ def reportBug(): reportBugView.bugmessage = request.form['bugmessage'] return reportBugView.as_view() +@app.route("/batch_jobs/", methods=['GET']) +@login_required +def batch_job(): + return batchJobListView().as_view() + +@app.route("/batch_job/create/", methods=['GET']) +@login_required +def create_batch_job(): + return createBatchJobView().as_view() + +@app.route("/batch_job//add/", methods=['POST']) +@login_required +def add_batch_job(masterip): + addBatchJobView.masterip = masterip + addBatchJobView.job_data = request.form + return addBatchJobView().as_view() + +@app.route("/batch_job/state/", methods=['GET']) +@login_required +def state_batch_job(): + return stateBatchJobView().as_view() + +@app.route("/batch_job/output/////", methods=['GET']) +@login_required +def output_batch_job(jobid, taskid, instid, issue): + outputBatchJobView.jobid = jobid + outputBatchJobView.taskid = taskid + outputBatchJobView.instid = instid + outputBatchJobView.issue = issue + return outputBatchJobView().as_view() + +@app.route("/batch/job/output//////", methods=['POST']) +@login_required +def output_batch_job_request(masterip, jobid, taskid, instid, issue): + data = { + 'jobid':jobid, + 'taskid':taskid, + 'instid':instid, + 'issue':issue + } + result = dockletRequest.post("/batch/job/output/",data,masterip) + return json.dumps(result) + @app.route("/workspace/create/", methods=['GET']) #@activated_required def addCluster(): diff --git a/web/webViews/batch.py b/web/webViews/batch.py new file mode 100644 index 0000000..20afd44 --- /dev/null +++ b/web/webViews/batch.py @@ -0,0 +1,78 @@ +from flask import session, redirect, request +from webViews.view import normalView +from webViews.log import logger +from webViews.checkname import checkname +from webViews.dockletrequest import dockletRequest + +class batchJobListView(normalView): + template_path = "batch/batch_list.html" + + @classmethod + def get(self): + masterips = dockletRequest.post_to_all() + result = dockletRequest.post("/batch/job/list/",{},masterips[0].split("@")[0]) + job_list = result.get("data") + logger.debug("job_list: %s" % job_list) + if True: + return self.render(self.template_path, job_list=job_list) + else: + return self.error() + +class createBatchJobView(normalView): + template_path = "batch/batch_create.html" + + @classmethod + def get(self): + masterips = dockletRequest.post_to_all() + images = dockletRequest.post("/image/list/",{},masterips[0].split("@")[0]).get("images") + if True: + return self.render(self.template_path, masterips=masterips, images=images) + else: + return self.error() + +class stateBatchJobView(normalView): + template_path = "batch/batch_state.html" + + @classmethod + def get(self): + if True: + return self.render(self.template_path) + else: + return self.error() + +class addBatchJobView(normalView): + template_path = "batch/batch_list.html" + + @classmethod + def post(self): + masterip = self.masterip + result = dockletRequest.post("/batch/job/add/", self.job_data, masterip) + if result.get('success', None) == "true": + return redirect('/batch_jobs/') + else: + return self.error() + +class outputBatchJobView(normalView): + template_path = "batch/batch_output.html" + jobid = "" + taskid = "" + instid = "" + issue = "" + + @classmethod + def get(self): + masterips = dockletRequest.post_to_all() + data = { + 'jobid':self.jobid, + 'taskid':self.taskid, + 'instid':self.instid, + 'issue':self.issue + } + result = dockletRequest.post("/batch/job/output/",data,masterips[0].split("@")[0]) + output = result.get("data") + #logger.debug("job_list: %s" % job_list) + if result.get('success',"") == "true": + return self.render(self.template_path, masterip=masterips[0].split("@")[0], jobid=self.jobid, + taskid=self.taskid, instid=self.instid, issue=self.issue, output=output) + else: + return self.error() diff --git a/web/webViews/monitor.py b/web/webViews/monitor.py index dcb9d3d..cc87736 100755 --- a/web/webViews/monitor.py +++ b/web/webViews/monitor.py @@ -21,7 +21,6 @@ class statusView(normalView): print(quotainfo)''' allcontainers = {} if (result): - containers = {} for master in allclusters: allcontainers[master] = {} for cluster in allclusters[master]: @@ -32,6 +31,18 @@ class statusView(normalView): else: self.error() allcontainers[master][cluster] = message + message = dockletRequest.post('/batch/vnodes/list/', data, master.split("@")[0]) + message = message.get('data') + containers = [] + for m in message: + container = {} + container['containername'] = m + container['ip'] = '--' + containers.append(container) + tmp = {} + tmp['containers'] = containers + tmp['status'] = 'running' + allcontainers[master]['Batch_Job'] = tmp return self.render(self.template_path, quotas = quotas, quotanames = quotanames, allcontainers = allcontainers, user = session['username']) else: self.error()