diff --git a/src/master/httprest.py b/src/master/httprest.py index 6d6c603..b17d0ff 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -792,6 +792,7 @@ def resetall_system(user, beans, form): @app.route("/batch/job/add/", methods=['POST']) @login_required +@beans_check def add_job(user,beans,form): global G_jobmgr job_data = form.to_dict() diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index 7b7fcff..5049bba 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -1,33 +1,46 @@ -import time, threading, random, string, os, traceback +import time, threading, random, string, os, traceback, requests import master.monitor import subprocess,json from functools import wraps +from datetime import datetime from utils.log import initlogging, logger +from utils.model import db, Batchjob, Batchtask from utils import env +def db_commit(): + try: + db.session.commit() + except Exception as err: + db.session.rollback() + logger.error(traceback.format_exc()) + raise + class BatchJob(object): - def __init__(self, user, job_info): + def __init__(self, jobid, user, job_info): + self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority'])) self.user = user - self.raw_job_info = job_info - self.job_id = None + #self.raw_job_info = job_info + self.job_id = jobid self.job_name = job_info['jobName'] self.job_priority = int(job_info['jobPriority']) - self.status = 'pending' - self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()) self.lock = threading.Lock() self.tasks = {} self.dependency_out = {} self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0} #init self.tasks & self.dependency_out & self.tasks_cnt - logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.user, self.job_name, self.create_time)) - raw_tasks = self.raw_job_info["tasks"] + logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.username, self.job_db.name, str(self.job_db.create_time))) + raw_tasks = job_info["tasks"] self.tasks_cnt['pending'] = len(raw_tasks.keys()) for task_idx in raw_tasks.keys(): task_info = raw_tasks[task_idx] + task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info) + self.job_db.tasks.append(task_db) self.tasks[task_idx] = {} + self.tasks[task_idx]['id'] = jobid+"_"+task_idx self.tasks[task_idx]['config'] = task_info + self.tasks[task_idx]['db'] = task_db self.tasks[task_idx]['status'] = 'pending' self.tasks[task_idx]['dependency'] = [] dependency = task_info['dependency'].strip().replace(' ', '').split(',') @@ -41,8 +54,11 @@ class BatchJob(object): self.dependency_out[d] = [] self.dependency_out[d].append(task_idx) + db.session.add(self.job_db) + db_commit() + self.log_status() - logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3))) + logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_db.id, json.dumps(self.dependency_out, indent=3))) def data_lock(f): @wraps(f) @@ -60,7 +76,7 @@ class BatchJob(object): # return the tasks without dependencies @data_lock def get_tasks_no_dependency(self,update_status=False): - logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id) + logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_db.id) ret_tasks = [] for task_idx in self.tasks.keys(): if (self.tasks[task_idx]['status'] == 'pending' and @@ -68,53 +84,73 @@ class BatchJob(object): if update_status: self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'scheduling' self.tasks[task_idx]['status'] = 'scheduling' - task_name = self.job_id + '_' + task_idx + task_name = self.tasks[task_idx]['db'].id ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority]) self.log_status() + db_commit() return ret_tasks @data_lock - def stop_tasks(self): + def stop_job(self): for task_idx in self.tasks.keys(): self.tasks[task_idx]['status'] = 'stopped' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'stopped' + self.job_db = Batchjob.query.get(self.job_id) + self.job_db.status = 'stopped' + db_commit() # update status of this job based def _update_job_status(self): allcnt = len(self.tasks.keys()) if self.tasks_cnt['failed'] != 0: - self.status = 'failed' + self.job_db.status = 'failed' elif self.tasks_cnt['running'] != 0: - self.status = 'running' + self.job_db.status = 'running' elif self.tasks_cnt['finished'] == allcnt: - self.status = 'done' + self.job_db.status = 'done' else: - self.status = 'pending' + self.job_db.status = 'pending' + db_commit() # start run a task, update status @data_lock def update_task_running(self, task_idx): logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id)) - old_status = self.tasks[task_idx]['status'].split('(')[0] + old_status = self.tasks[task_idx]['status'] self.tasks_cnt[old_status] -= 1 self.tasks[task_idx]['status'] = 'running' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'running' self.tasks_cnt['running'] += 1 + self.job_db = Batchjob.query.get(self.job_id) self._update_job_status() self.log_status() # a task has finished, update dependency and return tasks without dependencies @data_lock - def finish_task(self, task_idx): + def finish_task(self, task_idx, running_time, billing): if task_idx not in self.tasks.keys(): logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id)) return [] - logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id)) - old_status = self.tasks[task_idx]['status'].split('(')[0] + logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished(running_time=%d,billing=%d). Update dependency..." % (task_idx, self.job_id, running_time, billing)) + old_status = self.tasks[task_idx]['status'] self.tasks_cnt[old_status] -= 1 self.tasks[task_idx]['status'] = 'finished' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'finished' + self.tasks[task_idx]['db'].tried_times += 1 + self.tasks[task_idx]['db'].running_time = running_time + self.tasks[task_idx]['db'].billing = billing + self.job_db = Batchjob.query.get(self.job_id) + self.job_db.billing += billing self.tasks_cnt['finished'] += 1 - self._update_job_status() + if task_idx not in self.dependency_out.keys(): + self._update_job_status() self.log_status() return [] ret_tasks = [] @@ -129,8 +165,11 @@ class BatchJob(object): self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 self.tasks[out_idx]['status'] = 'scheduling' + self.tasks[out_idx]['db'] = Batchtask.query.get(self.tasks[out_idx]['id']) + self.tasks[out_idx]['db'].status = 'scheduling' task_name = self.job_id + '_' + out_idx ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority]) + self._update_job_status() self.log_status() return ret_tasks @@ -138,24 +177,34 @@ class BatchJob(object): @data_lock def update_task_retrying(self, task_idx, reason, tried_times): logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) - old_status = self.tasks[task_idx]['status'].split('(')[0] + old_status = self.tasks[task_idx]['status'] self.tasks_cnt[old_status] -= 1 self.tasks_cnt['retrying'] += 1 - self.tasks[task_idx]['status'] = 'retrying(%s)(%d times)' % (reason, int(tried_times)) + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'retrying' + self.tasks[task_idx]['db'].failed_reason = reason + self.tasks[task_idx]['db'].tried_times += 1 + self.tasks[task_idx]['status'] = 'retrying' + self.job_db = Batchjob.query.get(self.job_id) self._update_job_status() self.log_status() # update failed status of task @data_lock - def update_task_failed(self, task_idx, reason, tried_times): + def update_task_failed(self, task_idx, reason, tried_times, running_time, billing): logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) - old_status = self.tasks[task_idx]['status'].split('(')[0] + old_status = self.tasks[task_idx]['status'] self.tasks_cnt[old_status] -= 1 self.tasks_cnt['failed'] += 1 - if reason == "OUTPUTERROR": - self.tasks[task_idx]['status'] = 'failed(OUTPUTERROR)' - else: - self.tasks[task_idx]['status'] = 'failed(%s)(%d times)' % (reason, int(tried_times)) + self.tasks[task_idx]['status'] = 'failed' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'failed' + self.tasks[task_idx]['db'].failed_reason = reason + self.tasks[task_idx]['db'].tried_times += 1 + self.tasks[task_idx]['db'].running_time = running_time + self.tasks[task_idx]['db'].billing = billing + self.job_db = Batchjob.query.get(self.job_id) + self.job_db.billing += billing self._update_job_status() self.log_status() @@ -168,24 +217,54 @@ class BatchJob(object): task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency'] logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3))) logger.debug("BatchJob(id:%s) tasks_cnt: %s" % (self.job_id, self.tasks_cnt)) - logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status)) + logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.job_db.status)) class JobMgr(): # load job information from etcd # initial a job queue and job schedueler def __init__(self, taskmgr): + try: + Batchjob.query.all() + except: + db.create_all(bind='__all__') self.job_map = {} self.taskmgr = taskmgr self.fspath = env.getenv('FS_PREFIX') + self.lock = threading.Lock() + self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT')) + self.auth_key = env.getenv('AUTH_KEY') + + def charge_beans(self,username,billing): + data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key} + url = "/billing/beans/" + return requests.post(self.userpoint+url,data=data).json() + + def add_lock(f): + @wraps(f) + def new_f(self, *args, **kwargs): + self.lock.acquire() + try: + result = f(self, *args, **kwargs) + except Exception as err: + self.lock.release() + raise err + self.lock.release() + return result + return new_f + + @add_lock + def create_job(self, user, job_info): + jobid = self.gen_jobid() + job = BatchJob(jobid, user, job_info) + return job # user: username # job_info: a json string # user submit a new job, add this job to queue and database def add_job(self, user, job_info): try: - job = BatchJob(user, job_info) - job.job_id = self.gen_jobid() + job = self.create_job(user, job_info) self.job_map[job.job_id] = job self.process_job(job) except ValueError as err: @@ -201,9 +280,11 @@ class JobMgr(): # jobid: the id of job def stop_job(self, user, job_id): logger.info("[jobmgr] stop job(id:%s) user(%s)"%(job_id, user)) + if job_id not in self.job_map.keys(): + return [False,"Job id %s does not exists! Maybe it has been finished."] try: job = self.job_map[job_id] - if job.status == 'done' or job.status == 'failed': + if job.job_db.status == 'done' or job.job_db.status == 'failed': return [True,""] if job.user != user: raise Exception("Wrong User.") @@ -211,7 +292,7 @@ class JobMgr(): taskid = job_id + '_' + task_idx task = self.taskmgr.get_task(taskid) self.taskmgr.stop_remove_task(task) - job.status = 'stopped' + job.stop_job() except Exception as err: logger.error(traceback.format_exc()) #logger.error(err) @@ -221,24 +302,17 @@ class JobMgr(): # user: username # list a user's all job def list_jobs(self,user): + alljobs = Batchjob.query.filter_by(username=user) res = [] - for job_id in self.job_map.keys(): - job = self.job_map[job_id] - logger.debug('job_id: %s, user: %s' % (job_id, job.user)) - if job.user == user: - all_tasks = job.raw_job_info['tasks'] - tasks_vnodeCount = {} - for task in all_tasks.keys(): - tasks_vnodeCount[task] = int(all_tasks[task]['vnodeCount']) - res.append({ - 'job_name': job.job_name, - 'job_id': job.job_id, - 'status': job.status, - 'create_time': job.create_time, - 'tasks': list(all_tasks.keys()), - 'tasks_vnodeCount': tasks_vnodeCount - }) - res.sort(key=lambda x:x['create_time'],reverse=True) + for job in alljobs: + jobdata = json.loads(str(job)) + tasks = job.tasks + jobdata['tasks'] = [t.idx for t in tasks] + tasks_vnodeCount = {} + for t in tasks: + tasks_vnodeCount[t.idx] = int(json.loads(t.config)['vnodeCount']) + jobdata['tasks_vnodeCount'] = tasks_vnodeCount + res.append(jobdata) return res # user: username @@ -250,13 +324,14 @@ class JobMgr(): # check if a job exists def is_job_exist(self, job_id): - return job_id in self.job_map.keys() + return Batchjob.query.get(job_id) != None # generate a random job id def gen_jobid(self): - job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) + datestr = datetime.now().strftime("%y%m%d") + job_id = datestr+''.join(random.sample(string.ascii_letters + string.digits, 3)) while self.is_job_exist(job_id): - job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) + job_id = datestr+''.join(random.sample(string.ascii_letters + string.digits, 3)) return job_id # add tasks into taskmgr's queue @@ -280,24 +355,33 @@ class JobMgr(): # status: 'running', 'finished', 'retrying', 'failed' # reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR" # tried_times: how many times the task has been tried. - def report(self, user, task_name, status, reason="", tried_times=1): + def report(self, user, task_name, status, reason="", tried_times=1, running_time=0, billing=0): split_task_name = task_name.split('_') if len(split_task_name) != 2: - logger.error("Illegal task_name(%s) report from taskmgr" % task_name) + logger.error("[jobmgr report]Illegal task_name(%s) report from taskmgr" % task_name) return job_id, task_idx = split_task_name + if job_id not in self.job_map.keys(): + logger.error("[jobmgr report]jobid(%s) does not exist. task_name(%s)" % (job_id,task_name)) + return job = self.job_map[job_id] if status == "running": + #logger.debug(str(job.job_db)) job.update_task_running(task_idx) + #logger.debug(str(job.job_db)) elif status == "finished": - next_tasks = job.finish_task(task_idx) - if len(next_tasks) == 0: - return + #logger.debug(str(job.job_db)) + self.charge_beans(user, billing) + next_tasks = job.finish_task(task_idx, running_time, billing) ret = self.add_task_taskmgr(user, next_tasks) + #logger.debug(str(job.job_db)) elif status == "retrying": job.update_task_retrying(task_idx, reason, tried_times) elif status == "failed": - job.update_task_failed(task_idx, reason, tried_times) + self.charge_beans(user, billing) + job.update_task_failed(task_idx, reason, tried_times, running_time, billing) + if job.job_db.status == 'done' or job.job_db.status == 'failed': + del self.job_map[job_id] # Get Batch job stdout or stderr from its file def get_output(self, username, jobid, taskid, vnodeid, issue): diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index d320a67..3ccbd7b 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -3,7 +3,7 @@ import time import string import os import random, copy, subprocess -import json +import json, math from functools import wraps # must import logger after initlogging, ugly @@ -29,6 +29,7 @@ class Task(): self.id = task_id self.username = username self.status = WAITING + self.failed_reason = "" # if all the vnodes must be started at the same time self.at_same_time = at_same_time # priority the bigger the better @@ -46,6 +47,26 @@ class Task(): max_retry_count = task_info['max_retry_count'] ) for (index, task_info) in enumerate(task_infos)] + def get_billing(self): + billing_beans = 0 + running_time = 0 + cpu_price = 1 / 3600.0 # /core*s + mem_price = 1 / 3600.0 # /GB*s + disk_price = 1 / 3600.0 # /GB*s + gpu_price = 100 / 3600.0 # /core*s + for subtask in self.subtask_list: + tmp_time = subtask.running_time + cpu_beans = subtask.vnode_info.vnode.instance.cpu * tmp_time * cpu_price + mem_beans = subtask.vnode_info.vnode.instance.memory / 1024.0 * tmp_time * mem_price + disk_beans = subtask.vnode_info.vnode.instance.disk / 1024.0 * tmp_time * disk_price + gpu_beans = subtask.vnode_info.vnode.instance.gpu * tmp_time * gpu_price + logger.info("subtask:%s running_time=%f beans for: cpu=%f mem_beans=%f disk_beans=%f gpu_beans=%f" + %(self.id, tmp_time, cpu_beans, mem_beans, disk_beans, gpu_beans )) + beans = math.ceil(cpu_beans + mem_beans + disk_beans + gpu_beans) + running_time += tmp_time + billing_beans += beans + return running_time, billing_beans + def __lt__(self, other): return self.priority < other.priority @@ -87,16 +108,18 @@ class SubTask(): self.task_started = False self.start_at = 0 self.end_at = 0 + self.running_time = 0 self.status = WAITING self.status_reason = '' self.try_count = 0 self.worker = None - def waiting_for_retry(self): + def waiting_for_retry(self,reason=""): self.try_count += 1 self.status = WAITING if self.try_count <= self.max_retry_count else FAILED - if self.status == FAILED and self.root_task.at_same_time: + if self.status == FAILED: self.root_task.status = FAILED + self.failed_reason = reason class TaskReporter(MasterServicer): @@ -240,6 +263,7 @@ class TaskMgr(threading.Thread): return [False, e] subtask.vnode_started = False subtask.end_at = time.time() + subtask.running_time += subtask.end_at - subtask.start_at self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu return [True, ''] @@ -352,7 +376,8 @@ class TaskMgr(threading.Thread): placed_workers.append(sub_task.worker) [success, msg] = self.start_vnode(sub_task) if not success: - sub_task.waiting_for_retry() + sub_task.waiting_for_retry("Fail to start vnode.") + self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start vnode.") sub_task.worker = None start_all_vnode_success = False @@ -371,7 +396,8 @@ class TaskMgr(threading.Thread): if success: sub_task.status = RUNNING else: - sub_task.waiting_for_retry() + sub_task.waiting_for_retry("Failt to start task.") + self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start task.") def clear_sub_tasks(self, sub_task_list): for sub_task in sub_task_list: @@ -400,11 +426,14 @@ class TaskMgr(threading.Thread): if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING): return False self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list]))) - if task.at_same_time and task.status == FAILED: - self.jobmgr.report(task.username,task.id,"failed","",task.subtask_list[0].max_retry_count+1) - else: - self.jobmgr.report(task.username,task.id,'finished') self.stop_remove_task(task) + running_time, billing = task.get_billing() + self.logger.info('task %s running_time:%s billing:%d'%(task.id, str(running_time), billing)) + running_time = math.ceil(running_time) + if task.status == FAILED: + self.jobmgr.report(task.username,task.id,"failed",task.failed_reason,task.subtask_list[0].max_retry_count+1, running_time, billing) + else: + self.jobmgr.report(task.username,task.id,'finished',running_time=running_time,billing=billing) return True # this method is called when worker send heart-beat rpc request @@ -430,12 +459,12 @@ class TaskMgr(threading.Thread): sub_task.status_reason = report.errmsg if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT: - sub_task.waiting_for_retry() + sub_task.waiting_for_retry(report.errmsg) + self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg) elif report.subTaskStatus == OUTPUTERROR: sub_task.status = FAILED - if task.at_same_time: - task.status = FAILED - self.clear_sub_task(sub_task) + task.status = FAILED + task.failed_reason = report.errmsg elif report.subTaskStatus == COMPLETED: self.clear_sub_task(sub_task) diff --git a/src/utils/model.py b/src/utils/model.py index 37ef56c..098d95c 100755 --- a/src/utils/model.py +++ b/src/utils/model.py @@ -44,6 +44,7 @@ app.config['SQLALCHEMY_BINDS'] = { 'history': 'sqlite:///'+fsdir+'/global/sys/HistoryTable.db', 'beansapplication': 'sqlite:///'+fsdir+'/global/sys/BeansApplication.db', 'system': 'sqlite:///'+fsdir+'/global/sys/System.db', + 'batch':'sqlite:///'+fsdir+'/global/sys/Batch.db', 'login': 'sqlite:///'+fsdir+'/global/sys/Login.db' } app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True @@ -435,3 +436,90 @@ class Image(db.Model): def __repr__(self): return "{\"id\":\"%d\",\"imagename\":\"%s\",\"hasPrivate\":\"%s\",\"hasPublic\":\"%s\",\"ownername\":\"%s\",\"updatetime\":\"%s\",\"description\":\"%s\"}" % (self.id,self.imagename,str(self.hasPrivate),str(self.hasPublic),self.create_time.strftime("%Y-%m-%d %H:%M:%S"),self.ownername,self.description) + +class Batchjob(db.Model): + __bind_key__ = 'batch' + id = db.Column(db.String(9), primary_key=True) + username = db.Column(db.String(10)) + name = db.Column(db.String(30)) + priority = db.Column(db.Integer) + status = db.Column(db.String(10)) + failed_reason = db.Column(db.Text) + create_time = db.Column(db.DateTime) + end_time = db.Column(db.DateTime) + billing = db.Column(db.Integer) + tasks = db.relationship('Batchtask', backref='batchjob', lazy='dynamic') + + def __init__(self,id,username,name,priority): + self.id = id + self.username = username + self.name = name + self.priority = priority + self.status = "pending" + self.failed_reason = "" + self.create_time = datetime.now() + self.end_time = None + self.billing = 0 + + def __repr__(self): + info = {} + info['job_id'] = self.id + info['username'] = self.username + info['job_name'] = self.name + info['priority'] = self.priority + info['status'] = self.status + info['failed_reason'] = self.failed_reason + info['create_time'] = self.create_time.strftime("%Y-%m-%d %H:%M:%S") + if self.end_time is None: + info['end_time'] = "------" + else: + info['end_time'] = self.end_time.strftime("%Y-%m-%d %H:%M:%S") + info['billing'] = self.billing + return json.dumps(info) + +class Batchtask(db.Model): + __bind_key__ = 'batch' + id = db.Column(db.String(15), primary_key=True) + idx = db.Column(db.String(10)) + jobid = db.Column(db.String(9), db.ForeignKey('batchjob.id')) + status = db.Column(db.String(15)) + failed_reason = db.Column(db.Text) + start_time = db.Column(db.DateTime) + end_time = db.Column(db.DateTime) + running_time = db.Column(db.Integer) + billing = db.Column(db.Integer) + config = db.Column(db.Text) + tried_times = db.Column(db.Integer) + + def __init__(self, id, idx, config): + self.id = id + self.idx = idx + self.status = "pending" + self.failed_reason = "" + self.start_time = None + self.end_time = None + self.running_time = 0 + self.billing = 0 + self.config = json.dumps(config) + self.tried_times = 0 + + def __repr__(self): + info = {} + info['id'] = self.id + info['idx'] = self.idx + info['jobid'] = self.jobid + info['status'] = self.status + info['failed_reason'] = self.failed_reason + if self.start_time is None: + info['start_time'] = "------" + else: + info['start_time'] = self.start_time.strftime("%Y-%m-%d %H:%M:%S") + if self.end_time is None: + info['end_time'] = "------" + else: + info['end_time'] = self.end_time.strftime("%Y-%m-%d %H:%M:%S") + info['running_time'] = self.running_time + info['billing'] = self.billing + info['config'] = json.loads(self.config) + info['tried_times'] = self.tried_times + return json.dumps(info) diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html index be50d3b..5f77c87 100644 --- a/web/templates/batch/batch_create.html +++ b/web/templates/batch/batch_create.html @@ -147,6 +147,7 @@ $("select#masterselector").change(function() { var masterip=$(this).children('option:selected').val(); + $("#form").attr("action","/batch_job/"+ masterip +"/add/"); var mastername=$(this).children('option:selected').html(); console.log(masterip); var host = window.location.host; diff --git a/web/templates/batch/batch_list.html b/web/templates/batch/batch_list.html index eba03be..56ff385 100644 --- a/web/templates/batch/batch_list.html +++ b/web/templates/batch/batch_list.html @@ -33,7 +33,7 @@
{% for master in masterips %} {% for job_info in job_list[master.split('@')[0]] %} -