Merge pull request #378 from FirmlyReality/batch

Batch
This commit is contained in:
Yujian Zhu 2019-04-01 01:46:04 +08:00 committed by GitHub
commit 0c14f3684d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 867 additions and 201 deletions

View File

@ -792,6 +792,7 @@ def resetall_system(user, beans, form):
@app.route("/batch/job/add/", methods=['POST'])
@login_required
@beans_check
def add_job(user,beans,form):
global G_jobmgr
job_data = form.to_dict()
@ -879,6 +880,17 @@ def list_job(user,beans,form):
}
return json.dumps(result)
@app.route("/batch/job/info/", methods=['POST'])
@login_required
def info_job(user,beans,form):
global G_jobmgr
jobid = form.get("jobid","")
[success, data] = G_jobmgr.get_job(user, jobid)
if success:
return json.dumps({'success':'true', 'data':data})
else:
return json.dumps({'success':'false', 'message': data})
@app.route("/batch/job/stop/", methods=['POST'])
@login_required
def stop_job(user,beans,form):
@ -904,12 +916,6 @@ def get_output(user,beans,form):
}
return json.dumps(result)
@app.route("/batch/job/info/", methods=['POST'])
@login_required
def info_job(user,beans,form):
pass
@app.route("/batch/task/info/", methods=['POST'])
@login_required
def info_task(user,beans,form):

View File

@ -1,33 +1,46 @@
import time, threading, random, string, os, traceback
import time, threading, random, string, os, traceback, requests
import master.monitor
import subprocess,json
from functools import wraps
from datetime import datetime
from utils.log import initlogging, logger
from utils.model import db, Batchjob, Batchtask
from utils import env
def db_commit():
try:
db.session.commit()
except Exception as err:
db.session.rollback()
logger.error(traceback.format_exc())
raise
class BatchJob(object):
def __init__(self, user, job_info):
def __init__(self, jobid, user, job_info):
self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority']))
self.user = user
self.raw_job_info = job_info
self.job_id = None
#self.raw_job_info = job_info
self.job_id = jobid
self.job_name = job_info['jobName']
self.job_priority = int(job_info['jobPriority'])
self.status = 'pending'
self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
self.lock = threading.Lock()
self.tasks = {}
self.dependency_out = {}
self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0}
self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0, 'stopped':0}
#init self.tasks & self.dependency_out & self.tasks_cnt
logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.user, self.job_name, self.create_time))
raw_tasks = self.raw_job_info["tasks"]
logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.username, self.job_db.name, str(self.job_db.create_time)))
raw_tasks = job_info["tasks"]
self.tasks_cnt['pending'] = len(raw_tasks.keys())
for task_idx in raw_tasks.keys():
task_info = raw_tasks[task_idx]
task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info)
self.job_db.tasks.append(task_db)
self.tasks[task_idx] = {}
self.tasks[task_idx]['id'] = jobid+"_"+task_idx
self.tasks[task_idx]['config'] = task_info
self.tasks[task_idx]['db'] = task_db
self.tasks[task_idx]['status'] = 'pending'
self.tasks[task_idx]['dependency'] = []
dependency = task_info['dependency'].strip().replace(' ', '').split(',')
@ -41,8 +54,11 @@ class BatchJob(object):
self.dependency_out[d] = []
self.dependency_out[d].append(task_idx)
db.session.add(self.job_db)
db_commit()
self.log_status()
logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3)))
logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_db.id, json.dumps(self.dependency_out, indent=3)))
def data_lock(f):
@wraps(f)
@ -60,7 +76,7 @@ class BatchJob(object):
# return the tasks without dependencies
@data_lock
def get_tasks_no_dependency(self,update_status=False):
logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id)
logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_db.id)
ret_tasks = []
for task_idx in self.tasks.keys():
if (self.tasks[task_idx]['status'] == 'pending' and
@ -68,53 +84,83 @@ class BatchJob(object):
if update_status:
self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'scheduling'
self.tasks[task_idx]['status'] = 'scheduling'
task_name = self.job_id + '_' + task_idx
task_name = self.tasks[task_idx]['db'].id
ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority])
self.log_status()
db_commit()
return ret_tasks
@data_lock
def stop_tasks(self):
for task_idx in self.tasks.keys():
self.tasks[task_idx]['status'] = 'stopped'
def stop_job(self):
self.job_db = Batchjob.query.get(self.job_id)
self.job_db.status = 'stopping'
db_commit()
# update status of this job based
def _update_job_status(self):
allcnt = len(self.tasks.keys())
if self.tasks_cnt['failed'] != 0:
self.status = 'failed'
elif self.tasks_cnt['running'] != 0:
self.status = 'running'
self.job_db.status = 'failed'
self.job_db.end_time = datetime.now()
elif self.tasks_cnt['finished'] == allcnt:
self.status = 'done'
self.job_db.status = 'done'
self.job_db.end_time = datetime.now()
elif self.job_db.status == 'stopping':
if self.tasks_cnt['running'] == 0 and self.tasks_cnt['scheduling'] == 0 and self.tasks_cnt['retrying'] == 0:
self.job_db.status = 'stopped'
self.job_db.end_time = datetime.now()
elif self.tasks_cnt['running'] != 0 or self.tasks_cnt['retrying'] != 0:
self.job_db.status = 'running'
else:
self.status = 'pending'
self.job_db.status = 'pending'
db_commit()
# start run a task, update status
@data_lock
def update_task_running(self, task_idx):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id))
old_status = self.tasks[task_idx]['status'].split('(')[0]
old_status = self.tasks[task_idx]['status']
if old_status == 'stopping':
logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id))
return
self.tasks_cnt[old_status] -= 1
self.tasks[task_idx]['status'] = 'running'
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'running'
self.tasks[task_idx]['db'].start_time = datetime.now()
self.tasks_cnt['running'] += 1
self.job_db = Batchjob.query.get(self.job_id)
self._update_job_status()
self.log_status()
# a task has finished, update dependency and return tasks without dependencies
@data_lock
def finish_task(self, task_idx):
def finish_task(self, task_idx, running_time, billing):
if task_idx not in self.tasks.keys():
logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id))
return []
logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id))
old_status = self.tasks[task_idx]['status'].split('(')[0]
logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished(running_time=%d,billing=%d). Update dependency..." % (task_idx, self.job_id, running_time, billing))
old_status = self.tasks[task_idx]['status']
if old_status == 'stopping':
logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id))
return
self.tasks_cnt[old_status] -= 1
self.tasks[task_idx]['status'] = 'finished'
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'finished'
self.tasks[task_idx]['db'].tried_times += 1
self.tasks[task_idx]['db'].running_time = running_time
self.tasks[task_idx]['db'].end_time = datetime.now()
self.tasks[task_idx]['db'].billing = billing
self.job_db = Batchjob.query.get(self.job_id)
self.job_db.billing += billing
self.tasks_cnt['finished'] += 1
self._update_job_status()
if task_idx not in self.dependency_out.keys():
self._update_job_status()
self.log_status()
return []
ret_tasks = []
@ -129,8 +175,11 @@ class BatchJob(object):
self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1
self.tasks[out_idx]['status'] = 'scheduling'
self.tasks[out_idx]['db'] = Batchtask.query.get(self.tasks[out_idx]['id'])
self.tasks[out_idx]['db'].status = 'scheduling'
task_name = self.job_id + '_' + out_idx
ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority])
self._update_job_status()
self.log_status()
return ret_tasks
@ -138,27 +187,62 @@ class BatchJob(object):
@data_lock
def update_task_retrying(self, task_idx, reason, tried_times):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
old_status = self.tasks[task_idx]['status'].split('(')[0]
old_status = self.tasks[task_idx]['status']
if old_status == 'stopping':
logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id))
return
self.tasks_cnt[old_status] -= 1
self.tasks_cnt['retrying'] += 1
self.tasks[task_idx]['status'] = 'retrying(%s)(%d times)' % (reason, int(tried_times))
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'retrying'
self.tasks[task_idx]['db'].failed_reason = reason
self.tasks[task_idx]['db'].tried_times += 1
self.tasks[task_idx]['status'] = 'retrying'
self.job_db = Batchjob.query.get(self.job_id)
self._update_job_status()
self.log_status()
# update failed status of task
@data_lock
def update_task_failed(self, task_idx, reason, tried_times):
def update_task_failed(self, task_idx, reason, tried_times, running_time, billing):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
old_status = self.tasks[task_idx]['status'].split('(')[0]
old_status = self.tasks[task_idx]['status']
self.tasks_cnt[old_status] -= 1
self.tasks_cnt['failed'] += 1
if reason == "OUTPUTERROR":
self.tasks[task_idx]['status'] = 'failed(OUTPUTERROR)'
else:
self.tasks[task_idx]['status'] = 'failed(%s)(%d times)' % (reason, int(tried_times))
self.tasks[task_idx]['status'] = 'failed'
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'failed'
self.tasks[task_idx]['db'].failed_reason = reason
self.tasks[task_idx]['db'].tried_times += 1
self.tasks[task_idx]['db'].end_time = datetime.now()
self.tasks[task_idx]['db'].running_time = running_time
self.tasks[task_idx]['db'].billing = billing
self.job_db = Batchjob.query.get(self.job_id)
self.job_db.billing += billing
self._update_job_status()
self.log_status()
@data_lock
def update_task_stopped(self, task_idx, running_time, billing):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) stopped.running_time:%d billing:%d" % (task_idx, self.job_id, int(running_time), billing))
old_status = self.tasks[task_idx]['status']
if old_status == 'failed' or old_status == 'finished' or old_status == 'stopped':
logger.info("task(idx:%s) of BatchJob(id:%s) has been done."%(task_idx, self.job_id))
return False
self.tasks_cnt[old_status] -= 1
self.tasks_cnt['stopped'] += 1
self.tasks[task_idx]['status'] = 'stopped'
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'stopped'
self.tasks[task_idx]['db'].end_time = datetime.now()
self.tasks[task_idx]['db'].running_time = running_time
self.tasks[task_idx]['db'].billing = billing
self.job_db = Batchjob.query.get(self.job_id)
self.job_db.billing += billing
self._update_job_status()
self.log_status()
return True
# print status for debuging
def log_status(self):
task_copy = {}
@ -168,24 +252,55 @@ class BatchJob(object):
task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency']
logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3)))
logger.debug("BatchJob(id:%s) tasks_cnt: %s" % (self.job_id, self.tasks_cnt))
logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status))
logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.job_db.status))
class JobMgr():
# load job information from etcd
# initial a job queue and job schedueler
def __init__(self, taskmgr):
try:
Batchjob.query.all()
except:
db.create_all(bind='__all__')
self.job_map = {}
self.taskmgr = taskmgr
self.fspath = env.getenv('FS_PREFIX')
self.lock = threading.Lock()
self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT'))
self.auth_key = env.getenv('AUTH_KEY')
def charge_beans(self,username,billing):
logger.debug("Charge user(%s) for %d beans"%(username, billing))
data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key}
url = "/billing/beans/"
return requests.post(self.userpoint+url,data=data).json()
def add_lock(f):
@wraps(f)
def new_f(self, *args, **kwargs):
self.lock.acquire()
try:
result = f(self, *args, **kwargs)
except Exception as err:
self.lock.release()
raise err
self.lock.release()
return result
return new_f
@add_lock
def create_job(self, user, job_info):
jobid = self.gen_jobid()
job = BatchJob(jobid, user, job_info)
return job
# user: username
# job_info: a json string
# user submit a new job, add this job to queue and database
def add_job(self, user, job_info):
try:
job = BatchJob(user, job_info)
job.job_id = self.gen_jobid()
job = self.create_job(user, job_info)
self.job_map[job.job_id] = job
self.process_job(job)
except ValueError as err:
@ -201,17 +316,18 @@ class JobMgr():
# jobid: the id of job
def stop_job(self, user, job_id):
logger.info("[jobmgr] stop job(id:%s) user(%s)"%(job_id, user))
if job_id not in self.job_map.keys():
return [False,"Job id %s does not exists! Maybe it has been finished."%job_id]
try:
job = self.job_map[job_id]
if job.status == 'done' or job.status == 'failed':
if job.job_db.status == 'done' or job.job_db.status == 'failed':
return [True,""]
if job.user != user:
raise Exception("Wrong User.")
for task_idx in job.tasks.keys():
taskid = job_id + '_' + task_idx
task = self.taskmgr.get_task(taskid)
self.taskmgr.stop_remove_task(task)
job.status = 'stopped'
self.taskmgr.lazy_stop_task(taskid)
job.stop_job()
except Exception as err:
logger.error(traceback.format_exc())
#logger.error(err)
@ -221,42 +337,44 @@ class JobMgr():
# user: username
# list a user's all job
def list_jobs(self,user):
alljobs = Batchjob.query.filter_by(username=user).all()
res = []
for job_id in self.job_map.keys():
job = self.job_map[job_id]
logger.debug('job_id: %s, user: %s' % (job_id, job.user))
if job.user == user:
all_tasks = job.raw_job_info['tasks']
for job in alljobs:
jobdata = json.loads(str(job))
tasks = job.tasks.all()
jobdata['tasks'] = [t.idx for t in tasks]
tasks_vnodeCount = {}
for task in all_tasks.keys():
tasks_vnodeCount[task] = int(all_tasks[task]['vnodeCount'])
res.append({
'job_name': job.job_name,
'job_id': job.job_id,
'status': job.status,
'create_time': job.create_time,
'tasks': list(all_tasks.keys()),
'tasks_vnodeCount': tasks_vnodeCount
})
res.sort(key=lambda x:x['create_time'],reverse=True)
for t in tasks:
tasks_vnodeCount[t.idx] = int(json.loads(t.config)['vnodeCount'])
jobdata['tasks_vnodeCount'] = tasks_vnodeCount
res.append(jobdata)
return res
# user: username
# jobid: the id of job
# get the information of a job, including the status, json description and other information
# call get_task to get the task information
def get_job(self, user, job_id):
pass
job = Batchjob.query.get(job_id)
if job is None:
return [False, "Jobid(%s) does not exist."%job_id]
if job.username != user:
return [False, "Wrong User!"]
jobdata = json.loads(str(job))
tasks = job.tasks.order_by(Batchtask.idx).all()
tasksdata = [json.loads(str(t)) for t in tasks]
jobdata['tasks'] = tasksdata
return [True, jobdata]
# check if a job exists
def is_job_exist(self, job_id):
return job_id in self.job_map.keys()
return Batchjob.query.get(job_id) != None
# generate a random job id
def gen_jobid(self):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
datestr = datetime.now().strftime("%y%m%d")
job_id = datestr+''.join(random.sample(string.ascii_letters + string.digits, 3))
while self.is_job_exist(job_id):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
job_id = datestr+''.join(random.sample(string.ascii_letters + string.digits, 3))
return job_id
# add tasks into taskmgr's queue
@ -277,27 +395,53 @@ class JobMgr():
# report task status from taskmgr when running, failed and finished
# task_name: job_id + '_' + task_idx
# status: 'running', 'finished', 'retrying', 'failed'
# status: 'running', 'finished', 'retrying', 'failed', 'stopped'
# reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR"
# tried_times: how many times the task has been tried.
def report(self, user, task_name, status, reason="", tried_times=1):
def report(self, user, task_name, status, reason="", tried_times=1, running_time=0, billing=0):
split_task_name = task_name.split('_')
if len(split_task_name) != 2:
logger.error("Illegal task_name(%s) report from taskmgr" % task_name)
logger.error("[jobmgr report]Illegal task_name(%s) report from taskmgr" % task_name)
return
if billing > 0 and (status == 'failed' or status == 'finished'):
self.charge_beans(user, billing)
job_id, task_idx = split_task_name
if job_id not in self.job_map.keys():
logger.error("[jobmgr report]jobid(%s) does not exist. task_name(%s)" % (job_id,task_name))
#update data in db
taskdb = Batchtask.query.get(task_name)
if (taskdb is None or taskdb.status == 'finished' or
taskdb.status == 'failed' or taskdb.status == 'stopped'):
return
taskdb.status = status
if status == 'failed':
taskdb.failed_reason = reason
if status == 'failed' or status == 'stopped' or status == 'finished':
taskdb.end_time = datetime.now()
if billing > 0:
taskdb.running_time = running_time
taskdb.billing = billing
db_commit()
return
job = self.job_map[job_id]
if status == "running":
#logger.debug(str(job.job_db))
job.update_task_running(task_idx)
#logger.debug(str(job.job_db))
elif status == "finished":
next_tasks = job.finish_task(task_idx)
if len(next_tasks) == 0:
return
#logger.debug(str(job.job_db))
next_tasks = job.finish_task(task_idx, running_time, billing)
ret = self.add_task_taskmgr(user, next_tasks)
#logger.debug(str(job.job_db))
elif status == "retrying":
job.update_task_retrying(task_idx, reason, tried_times)
elif status == "failed":
job.update_task_failed(task_idx, reason, tried_times)
job.update_task_failed(task_idx, reason, tried_times, running_time, billing)
elif status == "stopped":
if job.update_task_stopped(task_idx, running_time, billing) and billing > 0:
self.charge_beans(user, billing)
if job.job_db.status == 'done' or job.job_db.status == 'failed' or job.job_db.status == 'stopped':
del self.job_map[job_id]
# Get Batch job stdout or stderr from its file
def get_output(self, username, jobid, taskid, vnodeid, issue):

View File

@ -3,7 +3,7 @@ import time
import string
import os
import random, copy, subprocess
import json
import json, math
from functools import wraps
# must import logger after initlogging, ugly
@ -29,6 +29,7 @@ class Task():
self.id = task_id
self.username = username
self.status = WAITING
self.failed_reason = ""
# if all the vnodes must be started at the same time
self.at_same_time = at_same_time
# priority the bigger the better
@ -46,6 +47,26 @@ class Task():
max_retry_count = task_info['max_retry_count']
) for (index, task_info) in enumerate(task_infos)]
def get_billing(self):
billing_beans = 0
running_time = 0
cpu_price = 1 / 3600.0 # /core*s
mem_price = 1 / 3600.0 # /GB*s
disk_price = 1 / 3600.0 # /GB*s
gpu_price = 100 / 3600.0 # /core*s
for subtask in self.subtask_list:
tmp_time = subtask.running_time
cpu_beans = subtask.vnode_info.vnode.instance.cpu * tmp_time * cpu_price
mem_beans = subtask.vnode_info.vnode.instance.memory / 1024.0 * tmp_time * mem_price
disk_beans = subtask.vnode_info.vnode.instance.disk / 1024.0 * tmp_time * disk_price
gpu_beans = subtask.vnode_info.vnode.instance.gpu * tmp_time * gpu_price
logger.info("subtask:%s running_time=%f beans for: cpu=%f mem_beans=%f disk_beans=%f gpu_beans=%f"
%(self.id, tmp_time, cpu_beans, mem_beans, disk_beans, gpu_beans ))
beans = math.ceil(cpu_beans + mem_beans + disk_beans + gpu_beans)
running_time += tmp_time
billing_beans += beans
return running_time, billing_beans
def __lt__(self, other):
return self.priority < other.priority
@ -87,16 +108,18 @@ class SubTask():
self.task_started = False
self.start_at = 0
self.end_at = 0
self.running_time = 0
self.status = WAITING
self.status_reason = ''
self.try_count = 0
self.worker = None
def waiting_for_retry(self):
def waiting_for_retry(self,reason=""):
self.try_count += 1
self.status = WAITING if self.try_count <= self.max_retry_count else FAILED
if self.status == FAILED and self.root_task.at_same_time:
if self.status == FAILED:
self.root_task.status = FAILED
self.failed_reason = reason
class TaskReporter(MasterServicer):
@ -123,7 +146,10 @@ class TaskMgr(threading.Thread):
self.task_queue = []
self.lazy_append_list = []
self.lazy_delete_list = []
self.lazy_stop_list = []
self.task_queue_lock = threading.Lock()
self.stop_lock = threading.Lock()
self.add_lock = threading.Lock()
#self.user_containers = {}
self.scheduler_interval = scheduler_interval
@ -155,23 +181,21 @@ class TaskMgr(threading.Thread):
self.logger.info("Free nets addresses pool %s" % str(self.free_nets))
self.logger.info("Each Batch Net CIDR:%s"%(str(self.task_cidr)))
def queue_lock(f):
def data_lock(lockname):
def lock(f):
@wraps(f)
def new_f(self, *args, **kwargs):
self.task_queue_lock.acquire()
lockobj = getattr(self,lockname)
lockobj.acquire()
try:
result = f(self, *args, **kwargs)
self.task_queue_lock.release()
return result
return new_f
def net_lock(f):
@wraps(f)
def new_f(self, *args, **kwargs):
self.network_lock.acquire()
result = f(self, *args, **kwargs)
self.network_lock.release()
except Exception as err:
lockobj.release()
raise err
lockobj.release()
return result
return new_f
return lock
def run(self):
self.serve()
@ -195,14 +219,36 @@ class TaskMgr(threading.Thread):
self.server.stop(0)
self.logger.info('[taskmgr_rpc] stop rpc server')
@queue_lock
@data_lock('task_queue_lock')
@data_lock('add_lock')
@data_lock('stop_lock')
def sort_out_task_queue(self):
for task in self.task_queue:
if task.id in self.lazy_stop_list:
self.stop_remove_task(task)
self.lazy_delete_list.append(task)
running_time, billing = task.get_billing()
self.logger.info('task %s stopped, running_time:%s billing:%d'%(task.id, str(running_time), billing))
running_time = math.ceil(running_time)
self.jobmgr.report(task.username, task.id,'stopped',running_time=running_time,billing=billing)
while self.lazy_delete_list:
task = self.lazy_delete_list.pop(0)
try:
self.task_queue.remove(task)
except Exception as err:
self.logger.warning(str(err))
new_append_list = []
for task in self.lazy_append_list:
if task.id in self.lazy_stop_list:
self.jobmgr.report(task.username, task.id, 'stopped')
else:
new_append_list.append(task)
self.lazy_append_list = new_append_list
self.lazy_stop_list.clear()
if self.lazy_append_list:
self.task_queue.extend(self.lazy_append_list)
self.lazy_append_list.clear()
@ -240,6 +286,7 @@ class TaskMgr(threading.Thread):
return [False, e]
subtask.vnode_started = False
subtask.end_at = time.time()
subtask.running_time += subtask.end_at - subtask.start_at
self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu
self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu
return [True, '']
@ -261,7 +308,7 @@ class TaskMgr(threading.Thread):
def stop_subtask(self, subtask):
try:
self.logger.info('[task_processor] Stoping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
self.logger.info('[task_processor] Stopping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
stub = WorkerStub(channel)
response = stub.stop_task(subtask.command_info)
@ -275,14 +322,14 @@ class TaskMgr(threading.Thread):
subtask.task_started = False
return [True, '']
@net_lock
@data_lock('network_lock')
def acquire_task_ips(self, task):
self.logger.info("[acquire_task_ips] user(%s) task(%s) net(%s)" % (task.username, task.id, str(task.task_base_ip)))
if task.task_base_ip == None:
task.task_base_ip = self.free_nets.pop(0)
return task.task_base_ip
@net_lock
@data_lock('network_lock')
def release_task_ips(self, task):
self.logger.info("[release_task_ips] user(%s) task(%s) net(%s)" % (task.username, task.id, str(task.task_base_ip)))
if task.task_base_ip == None:
@ -352,7 +399,8 @@ class TaskMgr(threading.Thread):
placed_workers.append(sub_task.worker)
[success, msg] = self.start_vnode(sub_task)
if not success:
sub_task.waiting_for_retry()
sub_task.waiting_for_retry("Fail to start vnode.")
self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start vnode.")
sub_task.worker = None
start_all_vnode_success = False
@ -371,7 +419,8 @@ class TaskMgr(threading.Thread):
if success:
sub_task.status = RUNNING
else:
sub_task.waiting_for_retry()
sub_task.waiting_for_retry("Failt to start task.")
self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start task.")
def clear_sub_tasks(self, sub_task_list):
for sub_task in sub_task_list:
@ -385,6 +434,10 @@ class TaskMgr(threading.Thread):
self.stop_vnode(sub_task)
#pass
@data_lock('stop_lock')
def lazy_stop_task(self, taskid):
self.lazy_stop_list.append(taskid)
def stop_remove_task(self, task):
if task is None:
return
@ -392,7 +445,6 @@ class TaskMgr(threading.Thread):
self.clear_sub_tasks(task.subtask_list)
self.release_task_ips(task)
self.remove_tasknet(task)
self.lazy_delete_list.append(task)
def check_task_completed(self, task):
if task.status == RUNNING or task.status == WAITING:
@ -400,11 +452,15 @@ class TaskMgr(threading.Thread):
if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING):
return False
self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list])))
if task.at_same_time and task.status == FAILED:
self.jobmgr.report(task.username,task.id,"failed","",task.subtask_list[0].max_retry_count+1)
else:
self.jobmgr.report(task.username,task.id,'finished')
self.stop_remove_task(task)
self.lazy_delete_list.append(task)
running_time, billing = task.get_billing()
self.logger.info('task %s running_time:%s billing:%d'%(task.id, str(running_time), billing))
running_time = math.ceil(running_time)
if task.status == FAILED:
self.jobmgr.report(task.username,task.id,"failed",task.failed_reason,task.subtask_list[0].max_retry_count+1, running_time, billing)
else:
self.jobmgr.report(task.username,task.id,'finished',running_time=running_time,billing=billing)
return True
# this method is called when worker send heart-beat rpc request
@ -430,12 +486,12 @@ class TaskMgr(threading.Thread):
sub_task.status_reason = report.errmsg
if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT:
sub_task.waiting_for_retry()
sub_task.waiting_for_retry(report.errmsg)
self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg)
elif report.subTaskStatus == OUTPUTERROR:
sub_task.status = FAILED
if task.at_same_time:
task.status = FAILED
self.clear_sub_task(sub_task)
task.failed_reason = report.errmsg
elif report.subTaskStatus == COMPLETED:
self.clear_sub_task(sub_task)
@ -445,7 +501,7 @@ class TaskMgr(threading.Thread):
self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))
for task in self.task_queue:
if task in self.lazy_delete_list:
if task in self.lazy_delete_list or task.id in self.lazy_stop_list:
continue
self.logger.info('task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list])))
if self.check_task_completed(task):
@ -577,6 +633,7 @@ class TaskMgr(threading.Thread):
# save the task information into database
# called when jobmgr assign task to taskmgr
@data_lock('add_lock')
def add_task(self, username, taskid, json_task, task_priority=1):
# decode json string to object defined in grpc
self.logger.info('[taskmgr add_task] receive task %s' % taskid)
@ -654,12 +711,12 @@ class TaskMgr(threading.Thread):
return True
@queue_lock
@data_lock('task_queue_lock')
def get_task_list(self):
return self.task_queue.copy()
@queue_lock
@data_lock('task_queue_lock')
def get_task(self, taskid):
for task in self.task_queue:
if task.id == taskid:

View File

@ -44,6 +44,7 @@ app.config['SQLALCHEMY_BINDS'] = {
'history': 'sqlite:///'+fsdir+'/global/sys/HistoryTable.db',
'beansapplication': 'sqlite:///'+fsdir+'/global/sys/BeansApplication.db',
'system': 'sqlite:///'+fsdir+'/global/sys/System.db',
'batch':'sqlite:///'+fsdir+'/global/sys/Batch.db?check_same_thread=False',
'login': 'sqlite:///'+fsdir+'/global/sys/Login.db'
}
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
@ -435,3 +436,90 @@ class Image(db.Model):
def __repr__(self):
return "{\"id\":\"%d\",\"imagename\":\"%s\",\"hasPrivate\":\"%s\",\"hasPublic\":\"%s\",\"ownername\":\"%s\",\"updatetime\":\"%s\",\"description\":\"%s\"}" % (self.id,self.imagename,str(self.hasPrivate),str(self.hasPublic),self.create_time.strftime("%Y-%m-%d %H:%M:%S"),self.ownername,self.description)
class Batchjob(db.Model):
__bind_key__ = 'batch'
id = db.Column(db.String(9), primary_key=True)
username = db.Column(db.String(10))
name = db.Column(db.String(30))
priority = db.Column(db.Integer)
status = db.Column(db.String(10))
failed_reason = db.Column(db.Text)
create_time = db.Column(db.DateTime)
end_time = db.Column(db.DateTime)
billing = db.Column(db.Integer)
tasks = db.relationship('Batchtask', backref='batchjob', lazy='dynamic')
def __init__(self,id,username,name,priority):
self.id = id
self.username = username
self.name = name
self.priority = priority
self.status = "pending"
self.failed_reason = ""
self.create_time = datetime.now()
self.end_time = None
self.billing = 0
def __repr__(self):
info = {}
info['job_id'] = self.id
info['username'] = self.username
info['job_name'] = self.name
info['priority'] = self.priority
info['status'] = self.status
info['failed_reason'] = self.failed_reason
info['create_time'] = self.create_time.strftime("%Y-%m-%d %H:%M:%S")
if self.end_time is None:
info['end_time'] = "------"
else:
info['end_time'] = self.end_time.strftime("%Y-%m-%d %H:%M:%S")
info['billing'] = self.billing
return json.dumps(info)
class Batchtask(db.Model):
__bind_key__ = 'batch'
id = db.Column(db.String(15), primary_key=True)
idx = db.Column(db.String(10))
jobid = db.Column(db.String(9), db.ForeignKey('batchjob.id'))
status = db.Column(db.String(15))
failed_reason = db.Column(db.Text)
start_time = db.Column(db.DateTime)
end_time = db.Column(db.DateTime)
running_time = db.Column(db.Integer)
billing = db.Column(db.Integer)
config = db.Column(db.Text)
tried_times = db.Column(db.Integer)
def __init__(self, id, idx, config):
self.id = id
self.idx = idx
self.status = "pending"
self.failed_reason = ""
self.start_time = None
self.end_time = None
self.running_time = 0
self.billing = 0
self.config = json.dumps(config)
self.tried_times = 0
def __repr__(self):
info = {}
info['id'] = self.id
info['idx'] = self.idx
info['jobid'] = self.jobid
info['status'] = self.status
info['failed_reason'] = self.failed_reason
if self.start_time is None:
info['start_time'] = "------"
else:
info['start_time'] = self.start_time.strftime("%Y-%m-%d %H:%M:%S")
if self.end_time is None:
info['end_time'] = "------"
else:
info['end_time'] = self.end_time.strftime("%Y-%m-%d %H:%M:%S")
info['running_time'] = self.running_time
info['billing'] = self.billing
info['config'] = json.loads(self.config)
info['tried_times'] = self.tried_times
return json.dumps(info)

View File

@ -430,6 +430,10 @@ class portcontrol(object):
ports_lock.release()
try:
subprocess.run(['iptables','-t','nat','-A','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
except subprocess.CalledProcessError as suberror:
return [False, "set port mapping failed : %s" % suberror.stdout.decode('utf-8')]
try:
subprocess.run(['iptables','-t','nat','-A','PREROUTING','-p','udp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
return [True, str(free_port)]
except subprocess.CalledProcessError as suberror:
return [False, "set port mapping failed : %s" % suberror.stdout.decode('utf-8')]
@ -447,6 +451,10 @@ class portcontrol(object):
subprocess.run(['iptables','-t','nat','-D','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
except subprocess.CalledProcessError as suberror:
return [False, "release port mapping failed : %s" % suberror.stdout.decode('utf-8')]
try:
subprocess.run(['iptables','-t','nat','-D','PREROUTING','-p','udp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True)
except subprocess.CalledProcessError as suberror:
return [False, "release port mapping failed : %s" % suberror.stdout.decode('utf-8')]
ports_lock.acquire()
free_ports[free_port] = True
allocated_ports[container_name].pop(container_port)

View File

@ -432,7 +432,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer):
self.add_msg(taskid,username,vnodeid,rpc_pb2.COMPLETED,token,"")
else:
logger.info("Task(%s-%s-%s) failed." % (str(taskid),str(vnodeid),token))
self.add_msg(taskid,username,vnodeid,rpc_pb2.FAILED,token,"")
self.add_msg(taskid,username,vnodeid,rpc_pb2.FAILED,token,"Runtime Error. More information in stderr log.")
def add_msg(self,taskid,username,vnodeid,status,token,errmsg):
self.msgslock.acquire()

View File

@ -43,6 +43,14 @@
<div class="form-group"><label class="col-sm-2 control-label">Job Name</label>
<div class="col-sm-10"><input type="text" class="form-control" name="jobName" id="job_name" required></div>
</div>
<br/>
<div class="form-group"><label class="col-sm-2 control-label">Location</label>
<div class="col-sm-10"><select id="masterselector" class="form-control">
{% for master in masterips %}
<option value="{{master.split("@")[0]}}">{{master.split("@")[1]}}</option>
{% endfor %}
</select></div>
</div>
<div class="hr-line-dashed"></div>
<br/>
<div class="form-group"><label class="col-sm-2 control-label">Priority</label>
@ -64,7 +72,7 @@
<div class="form-group">
<div class="col-sm-4 col-sm-offset-2">
<button class="btn btn-primary" type="button" id="add_task" class="btn btn-box-tool" title="add a task">Add Task <i class="fa fa-plus"></i></button>
<button class="btn btn-primary" type="submit">Create</button>
<button class="btn btn-primary" type="submit">Create Job</button>
</div>
</div>
</div>
@ -96,6 +104,11 @@
<script type="text/javascript">
var task_number = 0;
var mapping_number = 0;
var images_text = "{{ images }}";
images_text = images_text.replace(/&#39;/g,"\"");
console.log(images_text);
var images_info = JSON.parse(images_text);
console.log(images_info);
$().ready(function() {
$("#form").validate();
});
@ -132,8 +145,69 @@
+'Remove</button></div>';
}
$("select#masterselector").change(function() {
var masterip=$(this).children('option:selected').val();
$("#form").attr("action","/batch_job/"+ masterip +"/add/");
var mastername=$(this).children('option:selected').html();
console.log(masterip);
var host = window.location.host;
var images = images_info;
for(var tnum = 1; tnum<=task_number; ++tnum)
{
var imagehtml =
"<thead>"
+"<tr>"
+"<th>ImageName</th>"
+"<th>Type</th>"
+"<th>Owner</th>"
+"<th>Size</th>"
+"<th>Description</th>"
+"<th>Choose</th>"
+"</tr>"
+"</thead>"
+"<tbody>"
+"<tr>"
+"<td>base</td>"
+"<td>public</td>"
+"<td>docklet</td>"
+"<td>--</td>"
+"<td>A base image for you</td>"
+'<td><div class="i-checks"><label><input type="radio" name="image_' + tnum + '" value="base_base_base" checked="checked"></label></div></td>'
+"</tr>";
for(var index in images[masterip].private) {
var image = images[masterip].private[index];
imagehtml +=
"<tr>"
+"<td>"+image.name+"</td>"
+"<td>private</td>"
+"<td>{{user}}</td>"
+"<td>"+image.size_format+"</td>"
+'<td><a href="/image/' + masterip + '/description/' + image.name + '_' + '{{user}}' + '_private/" target="_blank">' + image.description + '</a></td>'
+'<td><div class="i-checks"><label><input type="radio" name="image_' + tnum + '" value="'+image.name+'_{{user}}_private"><label></div></td>'
+"</tr>";
}
for(var p_user in images[masterip].public) {
for(var index in images[masterip].public[p_user]) {
image=images[masterip].public[p_user][index];
imagehtml +=
"<tr>"
+"<td>"+image.name+"</td>"
+"<td>public</td>"
+"<td>" + p_user + "</td>"
+"<td>"+image.size_format+"</td>"
+'<td><a href="/image/' + masterip + '/description/' + image.name + "_" + p_user + '_public/" target="_blank">' + image.description + '</a></td>'
+'<td><div class="i-checks"><label><input type="radio" name="image_' + tnum + '" value="'+image.name+'_{{p_user}}_public"><label></div></td>'
+"</tr>";
}
}
imagehtml += "</tbody>";
$("#imagetable"+tnum).html(imagehtml);
}
});
function addTask() {
task_number += 1;
var masterip=$("select#masterselector").children('option:selected').val();
mapping_number = 0;
var task_html = '';
task_html +=
@ -161,14 +235,14 @@
+'<div class="col-sm-3"><input type="number" class="form-control" name="diskSetting_' + task_number + '" id="diskSetting_' + task_number + '" value= 1024 min="128" max="10000" required/>'
+'</div>MB</div>'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">VNode Count</label>'
+'<label class="col-sm-2 control-label">VNode Number</label>'
+'<div class="col-sm-3"><input type="number" class="form-control" name="vnodeCount_' + task_number + '" id="vnodeCount_' + task_number + '" value= 1 min="1" max="14" required/>'
+'</div>'
+'<label class="col-sm-2 control-label">Max Retry Count</label>'
+'<label class="col-sm-2 control-label">Max Retry Times</label>'
+'<div class="col-sm-3"><input type="number" class="form-control" name="retryCount_' + task_number + '" id="retryCount_' + task_number + '" value= 1 min="0" max="5" required/>'
+'</div></div>'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">Source Code Address</label>'
+'<label class="col-sm-2 control-label">Running Path</label>'
+'<div class="col-sm-3"><input type="text" class="form-control" name="srcAddr_' + task_number + '" id="srcAddr_' + task_number + '" value="/root" required/>'
+'</div>'
+'<label class="col-sm-2 control-label">Expire Time</label>'
@ -195,48 +269,58 @@
+'<label class="col-sm-2 control-label">Start at the Same Time</label>'
+'<div class="col-sm-3"><input type="checkbox" name="atSameTime_' + task_number + '" checked="checked"/>'
+'</div></div>'
+'<div class="form-group"><label class="col-sm-2 control-label">Image Choose</label>'
var images = images_info
task_html +=
'<div class="form-group"><label class="col-sm-2 control-label">Image Choose</label>'
+'<div class="col-sm-10">'
+'<table id="imagetable" class="table table-striped table-bordered table-hover table-image" >'
+'<thead>'
+'<tr>'
+'<th>ImageName</th>'
+'<th>Type</th>'
+'<th>Owner</th>'
+'<th>Description</th>'
+'<th>Choose</th>'
+'</tr>'
+'</thead>'
+'<tbody>'
+'<tr>'
+'<td>base</td>'
+'<td>public</td>'
+'<td>docklet</td>'
+'<td>A base image for you</td>'
+'<table id="imagetable' + task_number +'" class="table table-striped table-bordered table-hover table-image" >'
+"<thead>"
+"<tr>"
+"<th>ImageName</th>"
+"<th>Type</th>"
+"<th>Owner</th>"
+"<th>Size</th>"
+"<th>Description</th>"
+"<th>Choose</th>"
+"</tr>"
+"</thead>"
+"<tbody>"
+"<tr>"
+"<td>base</td>"
+"<td>public</td>"
+"<td>docklet</td>"
+"<td>--</td>"
+"<td>A base image for you</td>"
+'<td><div class="i-checks"><label><input type="radio" name="image_' + task_number + '" value="base_base_base" checked="checked"></label></div></td>'
+'</tr>'
+'{% for image in images['private'] %}'
+'<tr>'
+'<td>{{image['name']}}</td>'
+'<td>private</td>'
+'<td>{{user}}</td>'
+'<td><a href="/image/{{masterips[0].split("@")[1]}}/description/{{image['name']}}_{{user}}_private/" target="_blank">{{image['description']}}</a></td>'
+'<td><div class="i-checks"><label><input type="radio" name="image_' + task_number + '" value="{{image['name']}}_{{user}}_private"></label></div></td>'
+'</tr>'
+'{% endfor %}'
+'{% for p_user,p_images in images['public'].items() %}'
+'{% for image in p_images %}'
+'<tr>'
+'<td>{{image['name']}}</td>'
+'<td>public</td>'
+'<td>{{p_user}}</td>'
+'<td><a href="/image/{{masterips[0].split("@")[1]}}/description/{{image['name']}}_{{p_user}}_public/" target="_blank">{{image['description']}}</a></td>'
+'<td><div class="i-checks"><label><input type="radio" name="image_' + task_number + '" value="{{image['name']}}_{{p_user}}_public"></label></div></td>'
+'</tr>'
+'{% endfor %}'
+'{% endfor %}'
+'</tbody>'
+'</table>'
+"</tr>";
for(var index in images[masterip].private) {
var image = images[masterip].private[index];
task_html +=
"<tr>"
+"<td>"+image.name+"</td>"
+"<td>private</td>"
+"<td>{{user}}</td>"
+"<td>"+image.size_format+"</td>"
+'<td><a href="/image/' + masterip + '/description/' + image.name + '_' + '{{user}}' + '_private/" target="_blank">' + image.description + '</a></td>'
+'<td><div class="i-checks"><label><input type="radio" name="image_' + task_number + '" value="'+image.name+'_{{user}}_private"><label></div></td>'
+"</tr>";
}
for(var p_user in images[masterip].public) {
for(var index in images[masterip].public[p_user]) {
image=images[masterip].public[p_user][index];
task_html +=
"<tr>"
+"<td>"+image.name+"</td>"
+"<td>public</td>"
+"<td>" + p_user + "</td>"
+"<td>"+image.size_format+"</td>"
+'<td><a href="/image/' + masterip + '/description/' + image.name + "_" + p_user + '_public/" target="_blank">' + image.description + '</a></td>'
+'<td><div class="i-checks"><label><input type="radio" name="image_' + task_number + '" value="'+image.name+'_{{p_user}}_public"><label></div></td>'
+"</tr>";
}
}
task_html +=
'</tbody></table>'
+'</div>'
+'</div>'
+'<div class="form-group">'

View File

@ -0,0 +1,238 @@
{% extends 'base_AdminLTE.html' %}
{% block title %}Docklet | Batch Job Info{% endblock %}
{% block panel_title %}Info for {{ jobinfo['job_id'] }}{% endblock %}
{% block css_src %}
<link href="//cdn.bootcss.com/datatables/1.10.11/css/dataTables.bootstrap.min.css" rel="stylesheet">
<link href="//cdn.bootcss.com/datatables/1.10.11/css/jquery.dataTables_themeroller.css" rel="stylesheet">
<link href="/static/dist/css/modalconfig.css" rel="stylesheet">
{% endblock %}
{% block panel_list %}
<ol class="breadcrumb">
<li>
<a href="/dashboard/"><i class="fa fa-dashboard"></i>Home</a>
</li>
<li>
<a href='/batch_jobs/'>Batch Job</a>
</li>
<li class='active'>
<strong>Info</strong>
</li>
</ol>
{% endblock %}
{% block content %}
<div class="row">
<div class="col-md-12">
<div class="box box-info">
<div class="box-header with-border">
<h3 class="box-title">Overview</h3>
<div class="box-tools pull-right">
<button type="button" class="btn btn-box-tool" data-widget="collapse"><i class="fa fa-minus"></i>
</button>
<button type="button" class="btn btn-box-tool" data-widget="remove"><i class="fa fa-times"></i></button>
</div>
</div>
<div class="box-body table-responsive">
<table class="table table-bordered">
<thead>
<tr>
<th>Job ID</th>
<th>Name</th>
<th>Priority</th>
<th>Status</th>
<th>Create Time</th>
<th>End Time</th>
<th>Billing</th>
</tr>
</thead>
<tbody>
<tr>
<td>{{ jobinfo['job_id'] }}</td>
<td>{{ jobinfo['job_name'] }}</td>
<td>{{ jobinfo['priority'] }}</td>
<td>{{ jobinfo['status'] }}</td>
<td>{{ jobinfo['create_time'] }}</td>
<td>{{ jobinfo['end_time'] }}</td>
<td>{{ jobinfo['billing'] }} <img src='/static/img/bean.png' /></td>
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col-md-12">
<div class="box box-info">
<div class="box-header with-border">
<h3 class="box-title">Tasks Overview</h3>
<div class="box-tools pull-right">
<button type="button" class="btn btn-box-tool" data-widget="collapse"><i class="fa fa-minus"></i>
</button>
<button type="button" class="btn btn-box-tool" data-widget="remove"><i class="fa fa-times"></i></button>
</div>
</div>
<div class="box-body table-responsive">
<table width="100%" cellspacing="0" style="margin: 0 auto;" id="table-tasks" class="table table-striped table-bordered table-hover">
<thead>
<tr>
<th>Task Index</th>
<th>Status</th>
<th>Failed Reason(if fails)</th>
<th>Tried Times</th>
<th>Start Time</th>
<th>End Time</th>
<th>Total Running Time</th>
<th>Billing</th>
</tr>
</thead>
<tbody>
{% for task in jobinfo['tasks'] %}
<tr>
<td>{{ task['idx'] }}</td>
<td>{{ task['status'] }}</td>
<td>{{ task['failed_reason'] }}</td>
<td>{{ task['tried_times'] }}</td>
<td>{{ task['start_time'] }}</td>
<td>{{ task['end_time'] }}</td>
<td>{{ task['running_time'] }} s</td>
<td>{{ task['billing'] }} <img src='/static/img/bean.png' /></td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col-md-12">
<div class="box box-info">
<div class="box-header with-border">
<h3 class="box-title">Tasks Configs</h3>
<div class="box-tools pull-right">
<button type="button" class="btn btn-box-tool" data-widget="collapse"><i class="fa fa-minus"></i>
</button>
<button type="button" class="btn btn-box-tool" data-widget="remove"><i class="fa fa-times"></i></button>
</div>
</div>
<div class="box-body">
{% for task in jobinfo['tasks'] %}
<div class="panel panel-default" id="task_pannel_{{ task['idx'] }}">
<div class="panel-heading">
<h4 class="panel-title">
<a data-toggle="collapse" data-panel="#accordion" href="#collapse{{ task['idx'] }}">
Task #{{ task['idx'] }}
</a>
</h4>
</div>
<div id="collapse{{ task['idx'] }}" class="panel-collapse collapse in">
<div class="panel-body">
<div class="table-responsive">
<table class="table table-bordered table-hover">
<thead>
<tr>
<th>CPU Cores</th>
<th>Memory</th>
<th>GPU</th>
<th>Disk</th>
<th>VNode Number</th>
<th>Max Retry Times</th>
</tr>
</thead>
<tbody>
<tr>
<td>{{ task['config']['cpuSetting'] }}</td>
<td>{{ task['config']['memorySetting'] }} MB</td>
<td>{{ task['config']['gpuSetting'] }}</td>
<td>{{ task['config']['diskSetting'] }} MB</td>
<td>{{ task['config']['vnodeCount'] }}</td>
<td>{{ task['config']['retryCount'] }}</td>
</tr>
</tbody>
<thead>
<tr>
<th>Running Path</th>
<th>Expire Time</th>
<th>Stdout Redirect Path</th>
<th>Stderr Redirect Path</th>
<th>Dependency</th>
<th>Command</th>
</tr>
</thead>
<tbody>
<tr>
<td>{{ task['config']['srcAddr'] }}</td>
<td>{{ task['config']['expTime'] }} seconds</td>
<td>{{ task['config']['stdOutRedPth'] }}</td>
<td>{{ task['config']['stdErrRedPth'] }}</td>
<td>{{ task['config']['dependency'] }}</td>
<td>{{ task['config']['command'] }}</td>
</tr>
</tbody>
<thead>
<tr>
<th>Run on</th>
<th>Start at the Same Time</th>
<th>Image Name</th>
<th>Image Owner</th>
<th>Image Type</th>
</tr>
</thead>
<tbody>
<tr>
{% if task['config']['runon'] == 'all' %}
<td>all vnodes</td>
{% else %}
<td>master vnode</td>
{% endif %}
{% if 'atSameTime' in task['config'].keys() %}
<td>True</td>
{% else %}
<td>False</td>
{% endif %}
{% if task['config']['image'] == 'base_base_base' %}
<td>base</td>
<td>docklet</td>
<td>public</td>
{% else %}
<td>{{ task['config']['image'].split('_')[0] }}</td>
<td>{{ task['config']['image'].split('_')[1] }}</td>
<td>{{ task['config']['image'].split('_')[2] }}</td>
{% endif %}
</tr>
</tbody>
</table>
</div>
</div>
</div>
</div>
{% endfor %}
</div>
</div>
</div>
</div>
{% endblock %}
{% block script_src %}
<script src="//cdn.bootcss.com/datatables/1.10.11/js/jquery.dataTables.min.js"></script>
<script src="//cdn.bootcss.com/datatables/1.10.11/js/dataTables.bootstrap.min.js"></script>
<script type="text/javascript">
$(document).ready(function() {
$("#table-tasks").DataTable({"scrollX":true,"order":[[ 0, "asc" ]]});
});
</script>
{% endblock %}

View File

@ -3,6 +3,13 @@
{% block panel_title %}Batch Job{% endblock %}
{% block css_src %}
<link href="//cdn.bootcss.com/datatables/1.10.11/css/dataTables.bootstrap.min.css" rel="stylesheet">
<link href="//cdn.bootcss.com/datatables/1.10.11/css/jquery.dataTables_themeroller.css" rel="stylesheet">
<link href="/static/dist/css/modalconfig.css" rel="stylesheet">
{% endblock %}
{% block panel_list %}
<ol class="breadcrumb">
<li>
@ -31,8 +38,9 @@
<p>
<a href="/batch_job/create/"><button type="button" class="btn btn-primary btn-sm"><i class="fa fa-plus"></i> Create Batch Job</button></a>
</p>
{% for job_info in job_list %}
<div class="modal inmodal" id='OutputModal_{{ job_info['job_id'] }}' tabindex="-1" role="dialog" aria-hidden="true">
{% for master in masterips %}
{% for job_info in job_list[master.split('@')[0]] %}
<div class="modal inmodal" id='OutputModal_{{ master.split('@')[1] }}_{{ job_info['job_id'] }}' tabindex="-1" role="dialog" aria-hidden="true">
<div class="modal-dialog">
<div class="modal-content animated fadeIn">
<div class="modal-header">
@ -55,8 +63,8 @@
<tr>
<td>{{ taskid }}</td>
<td>{{ vnodeid }}</td>
<td><a class="btn btn-info btn-xs" href='/batch_job/output/{{ job_info["job_id"] }}/{{ taskid }}/{{ vnodeid }}/stdout/' target="_blank">Stdout</a></td>
<td><a class="btn btn-info btn-xs" href='/batch_job/output/{{ job_info["job_id"] }}/{{ taskid }}/{{ vnodeid }}/stderr/' target="_blank">Stderr</a></td>
<td><a class="btn btn-info btn-xs" href='/batch_job/output/{{ master.split('@')[0] }}/{{ job_info["job_id"] }}/{{ taskid }}/{{ vnodeid }}/stdout/' target="_blank">Stdout</a></td>
<td><a class="btn btn-info btn-xs" href='/batch_job/output/{{ master.split('@')[0] }}/{{ job_info["job_id"] }}/{{ taskid }}/{{ vnodeid }}/stderr/' target="_blank">Stderr</a></td>
</tr>
{% endfor %}
{% endfor %}
@ -70,33 +78,46 @@
</div>
</div>
{% endfor %}
{% endfor %}
<div class="table">
<table width="100%" cellspacing="0" style="margin: 0 auto;" class="table table-striped table-bordered table-hover table-batch">
<thead>
<tr>
<th>Location</th>
<th>ID</th>
<th>Name</th>
<th>Status</th>
<th>Tasks</th>
<th>Operations</th>
<th>Create Time</th>
<th>End Time</th>
<th>billing</th>
<th>Stdout and Stderr</th>
<th>Detailed Info</th>
</tr>
<thead>
<tbody>
{% for job_info in job_list %}
{% for master in masterips %}
{% for job_info in job_list[master.split('@')[0]] %}
<tr>
<td>{{ master.split('@')[1] }}</td>
<td>{{ job_info['job_id'] }}</td>
<td>{{ job_info['job_name'] }}</td>
<td>
{{ job_info['status'] }}
</td>
<td>Tasks</td>
<td><a href="/batch_job/{{masterips[0].split("@")[0]}}/stop/{{ job_info['job_id'] }}/"><button type="button" class="btn btn-xs btn-warning"> &nbsp;Stop&nbsp;&nbsp; </button></a></td>
{% if job_info['status'] == 'done' or job_info['status'] == 'failed' or job_info['status'] == 'stopping' or job_info['status'] == 'stopped'%}
<td><button type="button" class="btn btn-xs btn-default"> &nbsp;Stop&nbsp;&nbsp; </button></td>
{% else %}
<td><a href="/batch_job/{{master.split("@")[0]}}/stop/{{ job_info['job_id'] }}/"><button type="button" class="btn btn-xs btn-danger"> &nbsp;Stop&nbsp; </button></a></td>
{% endif %}
<td>{{ job_info['create_time'] }}</td>
<td><a role="button" class="btn btn-info btn-xs" id='{{ job_info['job_id'] }}_output' data-toggle="modal" data-target='#OutputModal_{{ job_info['job_id'] }}'>Get Output</a></td>
<td>{{ job_info['end_time'] }}</td>
<td>{{ job_info['billing'] }} <img src='/static/img/bean.png' /></td>
<td><a role="button" class="btn btn-info btn-xs" id='{{ master }}_{{ job_info['job_id'] }}_output' data-toggle="modal" data-target='#OutputModal_{{ master.split('@')[1] }}_{{ job_info['job_id'] }}'>Get Output</a></td>
<td><a href="/batch_job/{{master.split("@")[0]}}/info/{{ job_info['job_id'] }}/"><button type="button" class="btn btn-xs btn-info"> &nbsp;Info&nbsp; </button></a></td>
</tr>
{% endfor %}
{% endfor %}
</tbody>
</table>
</div>

View File

@ -152,14 +152,17 @@ def stop_batch_job(masterip,jobid):
stopBatchJobView.jobid = jobid
return stopBatchJobView().as_view()
@app.route("/batch_job/state/", methods=['GET'])
@app.route("/batch_job/<masterip>/info/<jobid>/", methods=['GET'])
@login_required
def state_batch_job():
return stateBatchJobView().as_view()
def info_batch_job(masterip,jobid):
infoBatchJobView.masterip = masterip
infoBatchJobView.jobid = jobid
return infoBatchJobView().as_view()
@app.route("/batch_job/output/<jobid>/<taskid>/<vnodeid>/<issue>/", methods=['GET'])
@app.route("/batch_job/output/<masterip>/<jobid>/<taskid>/<vnodeid>/<issue>/", methods=['GET'])
@login_required
def output_batch_job(jobid, taskid, vnodeid, issue):
def output_batch_job(masterip, jobid, taskid, vnodeid, issue):
outputBatchJobView.masterip = masterip
outputBatchJobView.jobid = jobid
outputBatchJobView.taskid = taskid
outputBatchJobView.vnodeid = vnodeid

View File

@ -3,6 +3,7 @@ from webViews.view import normalView
from webViews.log import logger
from webViews.checkname import checkname
from webViews.dockletrequest import dockletRequest
import json
class batchJobListView(normalView):
template_path = "batch/batch_list.html"
@ -10,9 +11,12 @@ class batchJobListView(normalView):
@classmethod
def get(self):
masterips = dockletRequest.post_to_all()
result = dockletRequest.post("/batch/job/list/",{},masterips[0].split("@")[0])
job_list = result.get("data")
logger.debug("job_list: %s" % job_list)
job_list = {}
for ipname in masterips:
ip = ipname.split("@")[0]
result = dockletRequest.post("/batch/job/list/",{},ip)
job_list[ip] = result.get("data")
logger.debug("job_list[%s]: %s" % (ip,job_list[ip]))
if True:
return self.render(self.template_path, masterips=masterips, job_list=job_list)
else:
@ -24,49 +28,63 @@ class createBatchJobView(normalView):
@classmethod
def get(self):
masterips = dockletRequest.post_to_all()
images = dockletRequest.post("/image/list/",{},masterips[0].split("@")[0]).get("images")
if True:
images = {}
for master in masterips:
images[master.split("@")[0]] = dockletRequest.post("/image/list/",{},master.split("@")[0]).get("images")
logger.info(images)
return self.render(self.template_path, masterips=masterips, images=images)
else:
return self.error()
class stateBatchJobView(normalView):
template_path = "batch/batch_state.html"
class infoBatchJobView(normalView):
template_path = "batch/batch_info.html"
error_path = "error.html"
masterip = ""
jobid = ""
@classmethod
def get(self):
if True:
return self.render(self.template_path)
data = {
'jobid':self.jobid
}
result = dockletRequest.post("/batch/job/info/",data,self.masterip)
data = result.get("data")
logger.info(str(data))
#logger.debug("job_list: %s" % job_list)
if result.get('success',"") == "true":
return self.render(self.template_path, masterip=self.masterip, jobinfo=data)
else:
return self.error()
return self.render(self.error_path, message = result.get('message'))
class addBatchJobView(normalView):
template_path = "batch/batch_list.html"
error_path = "error.html"
@classmethod
def post(self):
masterip = self.masterip
result = dockletRequest.post("/batch/job/add/", self.job_data, masterip)
#if result.get('success', None) == "true":
if result.get('success', None) == "true":
return redirect('/batch_jobs/')
#else:
#return self.error()
else:
return self.render(self.error_path, message = result.get('message'))
class stopBatchJobView(normalView):
template_path = "batch/batch_list.html"
error_path = "error.html"
@classmethod
def get(self):
masterip = self.masterip
data = {'jobid':self.jobid}
result = dockletRequest.post("/batch/job/stop/", data, masterip)
#if result.get('success', None) == "true":
if result.get('success', None) == "true":
return redirect('/batch_jobs/')
#else:
#return self.error()
else:
return self.render(self.error_path, message = result.get('message'))
class outputBatchJobView(normalView):
template_path = "batch/batch_output.html"
masterip = ""
jobid = ""
taskid = ""
vnodeid = ""
@ -74,18 +92,17 @@ class outputBatchJobView(normalView):
@classmethod
def get(self):
masterips = dockletRequest.post_to_all()
data = {
'jobid':self.jobid,
'taskid':self.taskid,
'vnodeid':self.vnodeid,
'issue':self.issue
}
result = dockletRequest.post("/batch/job/output/",data,masterips[0].split("@")[0])
result = dockletRequest.post("/batch/job/output/",data,self.masterip)
output = result.get("data")
#logger.debug("job_list: %s" % job_list)
if result.get('success',"") == "true":
return self.render(self.template_path, masterip=masterips[0].split("@")[0], jobid=self.jobid,
return self.render(self.template_path, masterip=self.masterip, jobid=self.jobid,
taskid=self.taskid, vnodeid=self.vnodeid, issue=self.issue, output=output)
else:
return self.error()