This commit is contained in:
Firmlyzhu 2019-03-29 15:11:08 +08:00
commit a0db49dee9
6 changed files with 279 additions and 76 deletions

View File

@ -792,6 +792,7 @@ def resetall_system(user, beans, form):
@app.route("/batch/job/add/", methods=['POST']) @app.route("/batch/job/add/", methods=['POST'])
@login_required @login_required
@beans_check
def add_job(user,beans,form): def add_job(user,beans,form):
global G_jobmgr global G_jobmgr
job_data = form.to_dict() job_data = form.to_dict()

View File

@ -1,33 +1,46 @@
import time, threading, random, string, os, traceback import time, threading, random, string, os, traceback, requests
import master.monitor import master.monitor
import subprocess,json import subprocess,json
from functools import wraps from functools import wraps
from datetime import datetime
from utils.log import initlogging, logger from utils.log import initlogging, logger
from utils.model import db, Batchjob, Batchtask
from utils import env from utils import env
def db_commit():
    """Commit the pending changes on ``db.session``.

    If the commit raises, the session is rolled back, the traceback is
    written to the error log, and the original exception is re-raised so
    the caller still sees the failure.
    """
    session = db.session
    try:
        session.commit()
    except Exception:
        # Roll back first so the session is usable again, then log and
        # let the original exception propagate unchanged.
        session.rollback()
        logger.error(traceback.format_exc())
        raise
class BatchJob(object): class BatchJob(object):
def __init__(self, user, job_info): def __init__(self, jobid, user, job_info):
self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority']))
self.user = user self.user = user
self.raw_job_info = job_info #self.raw_job_info = job_info
self.job_id = None self.job_id = jobid
self.job_name = job_info['jobName'] self.job_name = job_info['jobName']
self.job_priority = int(job_info['jobPriority']) self.job_priority = int(job_info['jobPriority'])
self.status = 'pending'
self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
self.lock = threading.Lock() self.lock = threading.Lock()
self.tasks = {} self.tasks = {}
self.dependency_out = {} self.dependency_out = {}
self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0} self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0}
#init self.tasks & self.dependency_out & self.tasks_cnt #init self.tasks & self.dependency_out & self.tasks_cnt
logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.user, self.job_name, self.create_time)) logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.username, self.job_db.name, str(self.job_db.create_time)))
raw_tasks = self.raw_job_info["tasks"] raw_tasks = job_info["tasks"]
self.tasks_cnt['pending'] = len(raw_tasks.keys()) self.tasks_cnt['pending'] = len(raw_tasks.keys())
for task_idx in raw_tasks.keys(): for task_idx in raw_tasks.keys():
task_info = raw_tasks[task_idx] task_info = raw_tasks[task_idx]
task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info)
self.job_db.tasks.append(task_db)
self.tasks[task_idx] = {} self.tasks[task_idx] = {}
self.tasks[task_idx]['id'] = jobid+"_"+task_idx
self.tasks[task_idx]['config'] = task_info self.tasks[task_idx]['config'] = task_info
self.tasks[task_idx]['db'] = task_db
self.tasks[task_idx]['status'] = 'pending' self.tasks[task_idx]['status'] = 'pending'
self.tasks[task_idx]['dependency'] = [] self.tasks[task_idx]['dependency'] = []
dependency = task_info['dependency'].strip().replace(' ', '').split(',') dependency = task_info['dependency'].strip().replace(' ', '').split(',')
@ -41,8 +54,11 @@ class BatchJob(object):
self.dependency_out[d] = [] self.dependency_out[d] = []
self.dependency_out[d].append(task_idx) self.dependency_out[d].append(task_idx)
db.session.add(self.job_db)
db_commit()
self.log_status() self.log_status()
logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3))) logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_db.id, json.dumps(self.dependency_out, indent=3)))
def data_lock(f): def data_lock(f):
@wraps(f) @wraps(f)
@ -60,7 +76,7 @@ class BatchJob(object):
# return the tasks without dependencies # return the tasks without dependencies
@data_lock @data_lock
def get_tasks_no_dependency(self,update_status=False): def get_tasks_no_dependency(self,update_status=False):
logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id) logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_db.id)
ret_tasks = [] ret_tasks = []
for task_idx in self.tasks.keys(): for task_idx in self.tasks.keys():
if (self.tasks[task_idx]['status'] == 'pending' and if (self.tasks[task_idx]['status'] == 'pending' and
@ -68,53 +84,73 @@ class BatchJob(object):
if update_status: if update_status:
self.tasks_cnt['pending'] -= 1 self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1 self.tasks_cnt['scheduling'] += 1
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'scheduling'
self.tasks[task_idx]['status'] = 'scheduling' self.tasks[task_idx]['status'] = 'scheduling'
task_name = self.job_id + '_' + task_idx task_name = self.tasks[task_idx]['db'].id
ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority]) ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority])
self.log_status() self.log_status()
db_commit()
return ret_tasks return ret_tasks
@data_lock @data_lock
def stop_tasks(self): def stop_job(self):
for task_idx in self.tasks.keys(): for task_idx in self.tasks.keys():
self.tasks[task_idx]['status'] = 'stopped' self.tasks[task_idx]['status'] = 'stopped'
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'stopped'
self.job_db = Batchjob.query.get(self.job_id)
self.job_db.status = 'stopped'
db_commit()
# update status of this job based # update status of this job based
def _update_job_status(self): def _update_job_status(self):
allcnt = len(self.tasks.keys()) allcnt = len(self.tasks.keys())
if self.tasks_cnt['failed'] != 0: if self.tasks_cnt['failed'] != 0:
self.status = 'failed' self.job_db.status = 'failed'
elif self.tasks_cnt['running'] != 0: elif self.tasks_cnt['running'] != 0:
self.status = 'running' self.job_db.status = 'running'
elif self.tasks_cnt['finished'] == allcnt: elif self.tasks_cnt['finished'] == allcnt:
self.status = 'done' self.job_db.status = 'done'
else: else:
self.status = 'pending' self.job_db.status = 'pending'
db_commit()
# start run a task, update status # start run a task, update status
@data_lock @data_lock
def update_task_running(self, task_idx): def update_task_running(self, task_idx):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id)) logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id))
old_status = self.tasks[task_idx]['status'].split('(')[0] old_status = self.tasks[task_idx]['status']
self.tasks_cnt[old_status] -= 1 self.tasks_cnt[old_status] -= 1
self.tasks[task_idx]['status'] = 'running' self.tasks[task_idx]['status'] = 'running'
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'running'
self.tasks_cnt['running'] += 1 self.tasks_cnt['running'] += 1
self.job_db = Batchjob.query.get(self.job_id)
self._update_job_status() self._update_job_status()
self.log_status() self.log_status()
# a task has finished, update dependency and return tasks without dependencies # a task has finished, update dependency and return tasks without dependencies
@data_lock @data_lock
def finish_task(self, task_idx): def finish_task(self, task_idx, running_time, billing):
if task_idx not in self.tasks.keys(): if task_idx not in self.tasks.keys():
logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id)) logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id))
return [] return []
logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id)) logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished(running_time=%d,billing=%d). Update dependency..." % (task_idx, self.job_id, running_time, billing))
old_status = self.tasks[task_idx]['status'].split('(')[0] old_status = self.tasks[task_idx]['status']
self.tasks_cnt[old_status] -= 1 self.tasks_cnt[old_status] -= 1
self.tasks[task_idx]['status'] = 'finished' self.tasks[task_idx]['status'] = 'finished'
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'finished'
self.tasks[task_idx]['db'].tried_times += 1
self.tasks[task_idx]['db'].running_time = running_time
self.tasks[task_idx]['db'].billing = billing
self.job_db = Batchjob.query.get(self.job_id)
self.job_db.billing += billing
self.tasks_cnt['finished'] += 1 self.tasks_cnt['finished'] += 1
self._update_job_status()
if task_idx not in self.dependency_out.keys(): if task_idx not in self.dependency_out.keys():
self._update_job_status()
self.log_status() self.log_status()
return [] return []
ret_tasks = [] ret_tasks = []
@ -129,8 +165,11 @@ class BatchJob(object):
self.tasks_cnt['pending'] -= 1 self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1 self.tasks_cnt['scheduling'] += 1
self.tasks[out_idx]['status'] = 'scheduling' self.tasks[out_idx]['status'] = 'scheduling'
self.tasks[out_idx]['db'] = Batchtask.query.get(self.tasks[out_idx]['id'])
self.tasks[out_idx]['db'].status = 'scheduling'
task_name = self.job_id + '_' + out_idx task_name = self.job_id + '_' + out_idx
ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority]) ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority])
self._update_job_status()
self.log_status() self.log_status()
return ret_tasks return ret_tasks
@ -138,24 +177,34 @@ class BatchJob(object):
@data_lock @data_lock
def update_task_retrying(self, task_idx, reason, tried_times): def update_task_retrying(self, task_idx, reason, tried_times):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
old_status = self.tasks[task_idx]['status'].split('(')[0] old_status = self.tasks[task_idx]['status']
self.tasks_cnt[old_status] -= 1 self.tasks_cnt[old_status] -= 1
self.tasks_cnt['retrying'] += 1 self.tasks_cnt['retrying'] += 1
self.tasks[task_idx]['status'] = 'retrying(%s)(%d times)' % (reason, int(tried_times)) self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'retrying'
self.tasks[task_idx]['db'].failed_reason = reason
self.tasks[task_idx]['db'].tried_times += 1
self.tasks[task_idx]['status'] = 'retrying'
self.job_db = Batchjob.query.get(self.job_id)
self._update_job_status() self._update_job_status()
self.log_status() self.log_status()
# update failed status of task # update failed status of task
@data_lock @data_lock
def update_task_failed(self, task_idx, reason, tried_times): def update_task_failed(self, task_idx, reason, tried_times, running_time, billing):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
old_status = self.tasks[task_idx]['status'].split('(')[0] old_status = self.tasks[task_idx]['status']
self.tasks_cnt[old_status] -= 1 self.tasks_cnt[old_status] -= 1
self.tasks_cnt['failed'] += 1 self.tasks_cnt['failed'] += 1
if reason == "OUTPUTERROR": self.tasks[task_idx]['status'] = 'failed'
self.tasks[task_idx]['status'] = 'failed(OUTPUTERROR)' self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
else: self.tasks[task_idx]['db'].status = 'failed'
self.tasks[task_idx]['status'] = 'failed(%s)(%d times)' % (reason, int(tried_times)) self.tasks[task_idx]['db'].failed_reason = reason
self.tasks[task_idx]['db'].tried_times += 1
self.tasks[task_idx]['db'].running_time = running_time
self.tasks[task_idx]['db'].billing = billing
self.job_db = Batchjob.query.get(self.job_id)
self.job_db.billing += billing
self._update_job_status() self._update_job_status()
self.log_status() self.log_status()
@ -168,24 +217,54 @@ class BatchJob(object):
task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency'] task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency']
logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3))) logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3)))
logger.debug("BatchJob(id:%s) tasks_cnt: %s" % (self.job_id, self.tasks_cnt)) logger.debug("BatchJob(id:%s) tasks_cnt: %s" % (self.job_id, self.tasks_cnt))
logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status)) logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.job_db.status))
class JobMgr(): class JobMgr():
# load job information from etcd # load job information from etcd
# initial a job queue and job schedueler # initial a job queue and job schedueler
def __init__(self, taskmgr): def __init__(self, taskmgr):
try:
Batchjob.query.all()
except:
db.create_all(bind='__all__')
self.job_map = {} self.job_map = {}
self.taskmgr = taskmgr self.taskmgr = taskmgr
self.fspath = env.getenv('FS_PREFIX') self.fspath = env.getenv('FS_PREFIX')
self.lock = threading.Lock()
self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT'))
self.auth_key = env.getenv('AUTH_KEY')
def charge_beans(self,username,billing):
    """POST a billing record and return the decoded JSON reply.

    The request goes to ``self.userpoint + "/billing/beans/"`` as form
    data carrying the owner name, the billing amount and ``self.auth_key``.

    :param username: value sent as ``owner_name``
    :param billing: value sent as ``billing``
    :return: whatever ``.json()`` decodes from the HTTP response
    """
    payload = {
        "owner_name": username,
        "billing": billing,
        "auth_key": self.auth_key,
    }
    endpoint = self.userpoint + "/billing/beans/"
    response = requests.post(endpoint, data=payload)
    return response.json()
def add_lock(f):
    """Decorator: run the wrapped method while holding ``self.lock``.

    The lock is taken before the call and released on every exit path,
    whether the method returns normally or raises.  Any exception from the
    wrapped method propagates to the caller unchanged.

    :param f: method whose first argument is an object exposing a ``lock``
              attribute usable as a context manager (e.g. ``threading.Lock``)
    :return: the wrapped method, carrying ``f``'s name and docstring
    """
    @wraps(f)
    def new_f(self, *args, **kwargs):
        # `with` releases the lock even for BaseException subclasses
        # (KeyboardInterrupt, SystemExit, ...).  A manual
        # `except Exception: release()` would leave the lock held forever
        # in those cases and deadlock every later caller.
        with self.lock:
            return f(self, *args, **kwargs)
    return new_f
@add_lock
def create_job(self, user, job_info):
    """Generate a new job id and construct the ``BatchJob`` for it.

    Decorated with ``add_lock``, so the id generation and the ``BatchJob``
    construction run as one step with respect to other ``add_lock``-decorated
    methods on this manager.

    :param user: username that owns the job
    :param job_info: job description passed through to ``BatchJob``
    :return: the newly constructed ``BatchJob``
    """
    return BatchJob(self.gen_jobid(), user, job_info)
# user: username # user: username
# job_info: a json string # job_info: a json string
# user submit a new job, add this job to queue and database # user submit a new job, add this job to queue and database
def add_job(self, user, job_info): def add_job(self, user, job_info):
try: try:
job = BatchJob(user, job_info) job = self.create_job(user, job_info)
job.job_id = self.gen_jobid()
self.job_map[job.job_id] = job self.job_map[job.job_id] = job
self.process_job(job) self.process_job(job)
except ValueError as err: except ValueError as err:
@ -201,9 +280,11 @@ class JobMgr():
# jobid: the id of job # jobid: the id of job
def stop_job(self, user, job_id): def stop_job(self, user, job_id):
logger.info("[jobmgr] stop job(id:%s) user(%s)"%(job_id, user)) logger.info("[jobmgr] stop job(id:%s) user(%s)"%(job_id, user))
if job_id not in self.job_map.keys():
return [False,"Job id %s does not exists! Maybe it has been finished."]
try: try:
job = self.job_map[job_id] job = self.job_map[job_id]
if job.status == 'done' or job.status == 'failed': if job.job_db.status == 'done' or job.job_db.status == 'failed':
return [True,""] return [True,""]
if job.user != user: if job.user != user:
raise Exception("Wrong User.") raise Exception("Wrong User.")
@ -211,7 +292,7 @@ class JobMgr():
taskid = job_id + '_' + task_idx taskid = job_id + '_' + task_idx
task = self.taskmgr.get_task(taskid) task = self.taskmgr.get_task(taskid)
self.taskmgr.stop_remove_task(task) self.taskmgr.stop_remove_task(task)
job.status = 'stopped' job.stop_job()
except Exception as err: except Exception as err:
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
#logger.error(err) #logger.error(err)
@ -221,24 +302,17 @@ class JobMgr():
# user: username # user: username
# list a user's all job # list a user's all job
def list_jobs(self,user): def list_jobs(self,user):
alljobs = Batchjob.query.filter_by(username=user)
res = [] res = []
for job_id in self.job_map.keys(): for job in alljobs:
job = self.job_map[job_id] jobdata = json.loads(str(job))
logger.debug('job_id: %s, user: %s' % (job_id, job.user)) tasks = job.tasks
if job.user == user: jobdata['tasks'] = [t.idx for t in tasks]
all_tasks = job.raw_job_info['tasks'] tasks_vnodeCount = {}
tasks_vnodeCount = {} for t in tasks:
for task in all_tasks.keys(): tasks_vnodeCount[t.idx] = int(json.loads(t.config)['vnodeCount'])
tasks_vnodeCount[task] = int(all_tasks[task]['vnodeCount']) jobdata['tasks_vnodeCount'] = tasks_vnodeCount
res.append({ res.append(jobdata)
'job_name': job.job_name,
'job_id': job.job_id,
'status': job.status,
'create_time': job.create_time,
'tasks': list(all_tasks.keys()),
'tasks_vnodeCount': tasks_vnodeCount
})
res.sort(key=lambda x:x['create_time'],reverse=True)
return res return res
# user: username # user: username
@ -250,13 +324,14 @@ class JobMgr():
# check if a job exists # check if a job exists
def is_job_exist(self, job_id): def is_job_exist(self, job_id):
return job_id in self.job_map.keys() return Batchjob.query.get(job_id) != None
# generate a random job id # generate a random job id
def gen_jobid(self): def gen_jobid(self):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) datestr = datetime.now().strftime("%y%m%d")
job_id = datestr+''.join(random.sample(string.ascii_letters + string.digits, 3))
while self.is_job_exist(job_id): while self.is_job_exist(job_id):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) job_id = datestr+''.join(random.sample(string.ascii_letters + string.digits, 3))
return job_id return job_id
# add tasks into taskmgr's queue # add tasks into taskmgr's queue
@ -280,24 +355,33 @@ class JobMgr():
# status: 'running', 'finished', 'retrying', 'failed' # status: 'running', 'finished', 'retrying', 'failed'
# reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR" # reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR"
# tried_times: how many times the task has been tried. # tried_times: how many times the task has been tried.
def report(self, user, task_name, status, reason="", tried_times=1): def report(self, user, task_name, status, reason="", tried_times=1, running_time=0, billing=0):
split_task_name = task_name.split('_') split_task_name = task_name.split('_')
if len(split_task_name) != 2: if len(split_task_name) != 2:
logger.error("Illegal task_name(%s) report from taskmgr" % task_name) logger.error("[jobmgr report]Illegal task_name(%s) report from taskmgr" % task_name)
return return
job_id, task_idx = split_task_name job_id, task_idx = split_task_name
if job_id not in self.job_map.keys():
logger.error("[jobmgr report]jobid(%s) does not exist. task_name(%s)" % (job_id,task_name))
return
job = self.job_map[job_id] job = self.job_map[job_id]
if status == "running": if status == "running":
#logger.debug(str(job.job_db))
job.update_task_running(task_idx) job.update_task_running(task_idx)
#logger.debug(str(job.job_db))
elif status == "finished": elif status == "finished":
next_tasks = job.finish_task(task_idx) #logger.debug(str(job.job_db))
if len(next_tasks) == 0: self.charge_beans(user, billing)
return next_tasks = job.finish_task(task_idx, running_time, billing)
ret = self.add_task_taskmgr(user, next_tasks) ret = self.add_task_taskmgr(user, next_tasks)
#logger.debug(str(job.job_db))
elif status == "retrying": elif status == "retrying":
job.update_task_retrying(task_idx, reason, tried_times) job.update_task_retrying(task_idx, reason, tried_times)
elif status == "failed": elif status == "failed":
job.update_task_failed(task_idx, reason, tried_times) self.charge_beans(user, billing)
job.update_task_failed(task_idx, reason, tried_times, running_time, billing)
if job.job_db.status == 'done' or job.job_db.status == 'failed':
del self.job_map[job_id]
# Get Batch job stdout or stderr from its file # Get Batch job stdout or stderr from its file
def get_output(self, username, jobid, taskid, vnodeid, issue): def get_output(self, username, jobid, taskid, vnodeid, issue):

View File

@ -3,7 +3,7 @@ import time
import string import string
import os import os
import random, copy, subprocess import random, copy, subprocess
import json import json, math
from functools import wraps from functools import wraps
# must import logger after initlogging, ugly # must import logger after initlogging, ugly
@ -29,6 +29,7 @@ class Task():
self.id = task_id self.id = task_id
self.username = username self.username = username
self.status = WAITING self.status = WAITING
self.failed_reason = ""
# if all the vnodes must be started at the same time # if all the vnodes must be started at the same time
self.at_same_time = at_same_time self.at_same_time = at_same_time
# priority the bigger the better # priority the bigger the better
@ -46,6 +47,26 @@ class Task():
max_retry_count = task_info['max_retry_count'] max_retry_count = task_info['max_retry_count']
) for (index, task_info) in enumerate(task_infos)] ) for (index, task_info) in enumerate(task_infos)]
def get_billing(self):
    """Total the running time and bean charge over all subtasks.

    For each subtask the charge is the sum of its cpu, memory, disk and gpu
    costs (resource amount * running time * unit price), rounded up to a
    whole bean *per subtask* before being added to the total.

    :return: tuple ``(running_time, billing_beans)`` - the summed subtask
             running times and the summed, per-subtask-rounded bean charge
    """
    # Unit prices per second, written as an hourly rate divided by 3600.
    cpu_price = 1 / 3600.0 # /core*s
    mem_price = 1 / 3600.0 # /GB*s
    disk_price = 1 / 3600.0 # /GB*s
    gpu_price = 100 / 3600.0 # /core*s

    total_time = 0
    total_beans = 0
    for sub in self.subtask_list:
        elapsed = sub.running_time
        instance = sub.vnode_info.vnode.instance
        cpu_cost = instance.cpu * elapsed * cpu_price
        # memory and disk are divided by 1024 before pricing
        # (presumably MB -> GB, matching the /GB*s price unit; confirm).
        mem_cost = instance.memory / 1024.0 * elapsed * mem_price
        disk_cost = instance.disk / 1024.0 * elapsed * disk_price
        gpu_cost = instance.gpu * elapsed * gpu_price
        logger.info("subtask:%s running_time=%f beans for: cpu=%f mem_beans=%f disk_beans=%f gpu_beans=%f"
                    %(self.id, elapsed, cpu_cost, mem_cost, disk_cost, gpu_cost ))
        total_time += elapsed
        total_beans += math.ceil(cpu_cost + mem_cost + disk_cost + gpu_cost)
    return total_time, total_beans
def __lt__(self, other): def __lt__(self, other):
return self.priority < other.priority return self.priority < other.priority
@ -87,16 +108,18 @@ class SubTask():
self.task_started = False self.task_started = False
self.start_at = 0 self.start_at = 0
self.end_at = 0 self.end_at = 0
self.running_time = 0
self.status = WAITING self.status = WAITING
self.status_reason = '' self.status_reason = ''
self.try_count = 0 self.try_count = 0
self.worker = None self.worker = None
def waiting_for_retry(self): def waiting_for_retry(self,reason=""):
self.try_count += 1 self.try_count += 1
self.status = WAITING if self.try_count <= self.max_retry_count else FAILED self.status = WAITING if self.try_count <= self.max_retry_count else FAILED
if self.status == FAILED and self.root_task.at_same_time: if self.status == FAILED:
self.root_task.status = FAILED self.root_task.status = FAILED
self.failed_reason = reason
class TaskReporter(MasterServicer): class TaskReporter(MasterServicer):
@ -240,6 +263,7 @@ class TaskMgr(threading.Thread):
return [False, e] return [False, e]
subtask.vnode_started = False subtask.vnode_started = False
subtask.end_at = time.time() subtask.end_at = time.time()
subtask.running_time += subtask.end_at - subtask.start_at
self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu
self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu
return [True, ''] return [True, '']
@ -352,7 +376,8 @@ class TaskMgr(threading.Thread):
placed_workers.append(sub_task.worker) placed_workers.append(sub_task.worker)
[success, msg] = self.start_vnode(sub_task) [success, msg] = self.start_vnode(sub_task)
if not success: if not success:
sub_task.waiting_for_retry() sub_task.waiting_for_retry("Fail to start vnode.")
self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start vnode.")
sub_task.worker = None sub_task.worker = None
start_all_vnode_success = False start_all_vnode_success = False
@ -371,7 +396,8 @@ class TaskMgr(threading.Thread):
if success: if success:
sub_task.status = RUNNING sub_task.status = RUNNING
else: else:
sub_task.waiting_for_retry() sub_task.waiting_for_retry("Failt to start task.")
self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start task.")
def clear_sub_tasks(self, sub_task_list): def clear_sub_tasks(self, sub_task_list):
for sub_task in sub_task_list: for sub_task in sub_task_list:
@ -400,11 +426,14 @@ class TaskMgr(threading.Thread):
if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING): if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING):
return False return False
self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list]))) self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list])))
if task.at_same_time and task.status == FAILED:
self.jobmgr.report(task.username,task.id,"failed","",task.subtask_list[0].max_retry_count+1)
else:
self.jobmgr.report(task.username,task.id,'finished')
self.stop_remove_task(task) self.stop_remove_task(task)
running_time, billing = task.get_billing()
self.logger.info('task %s running_time:%s billing:%d'%(task.id, str(running_time), billing))
running_time = math.ceil(running_time)
if task.status == FAILED:
self.jobmgr.report(task.username,task.id,"failed",task.failed_reason,task.subtask_list[0].max_retry_count+1, running_time, billing)
else:
self.jobmgr.report(task.username,task.id,'finished',running_time=running_time,billing=billing)
return True return True
# this method is called when worker send heart-beat rpc request # this method is called when worker send heart-beat rpc request
@ -430,12 +459,12 @@ class TaskMgr(threading.Thread):
sub_task.status_reason = report.errmsg sub_task.status_reason = report.errmsg
if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT: if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT:
sub_task.waiting_for_retry() sub_task.waiting_for_retry(report.errmsg)
self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg)
elif report.subTaskStatus == OUTPUTERROR: elif report.subTaskStatus == OUTPUTERROR:
sub_task.status = FAILED sub_task.status = FAILED
if task.at_same_time: task.status = FAILED
task.status = FAILED task.failed_reason = report.errmsg
self.clear_sub_task(sub_task)
elif report.subTaskStatus == COMPLETED: elif report.subTaskStatus == COMPLETED:
self.clear_sub_task(sub_task) self.clear_sub_task(sub_task)

View File

@ -44,6 +44,7 @@ app.config['SQLALCHEMY_BINDS'] = {
'history': 'sqlite:///'+fsdir+'/global/sys/HistoryTable.db', 'history': 'sqlite:///'+fsdir+'/global/sys/HistoryTable.db',
'beansapplication': 'sqlite:///'+fsdir+'/global/sys/BeansApplication.db', 'beansapplication': 'sqlite:///'+fsdir+'/global/sys/BeansApplication.db',
'system': 'sqlite:///'+fsdir+'/global/sys/System.db', 'system': 'sqlite:///'+fsdir+'/global/sys/System.db',
'batch':'sqlite:///'+fsdir+'/global/sys/Batch.db',
'login': 'sqlite:///'+fsdir+'/global/sys/Login.db' 'login': 'sqlite:///'+fsdir+'/global/sys/Login.db'
} }
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
@ -435,3 +436,90 @@ class Image(db.Model):
def __repr__(self): def __repr__(self):
return "{\"id\":\"%d\",\"imagename\":\"%s\",\"hasPrivate\":\"%s\",\"hasPublic\":\"%s\",\"ownername\":\"%s\",\"updatetime\":\"%s\",\"description\":\"%s\"}" % (self.id,self.imagename,str(self.hasPrivate),str(self.hasPublic),self.create_time.strftime("%Y-%m-%d %H:%M:%S"),self.ownername,self.description) return "{\"id\":\"%d\",\"imagename\":\"%s\",\"hasPrivate\":\"%s\",\"hasPublic\":\"%s\",\"ownername\":\"%s\",\"updatetime\":\"%s\",\"description\":\"%s\"}" % (self.id,self.imagename,str(self.hasPrivate),str(self.hasPublic),self.create_time.strftime("%Y-%m-%d %H:%M:%S"),self.ownername,self.description)
class Batchjob(db.Model):
    """Database row describing one batch job (stored under the 'batch' bind).

    ``repr()`` of an instance is a JSON object string, so callers can
    ``json.loads(str(job))`` to get a plain dict.
    """
    __bind_key__ = 'batch'

    # Columns (declaration order is kept as-is).
    id = db.Column(db.String(9), primary_key=True)
    username = db.Column(db.String(10))
    name = db.Column(db.String(30))
    priority = db.Column(db.Integer)
    status = db.Column(db.String(10))
    failed_reason = db.Column(db.Text)
    create_time = db.Column(db.DateTime)
    end_time = db.Column(db.DateTime)
    billing = db.Column(db.Integer)
    # Child Batchtask rows; each task gets a `batchjob` back-reference.
    tasks = db.relationship('Batchtask', backref='batchjob', lazy='dynamic')

    def __init__(self,id,username,name,priority):
        """Create a job row in its initial state.

        A new job starts as "pending", with an empty failure reason, zero
        billing, no end time, and ``create_time`` set to the current time.
        """
        # Caller-supplied identity.
        self.id = id
        self.username = username
        self.name = name
        self.priority = priority
        # Initial bookkeeping values.
        self.status = "pending"
        self.failed_reason = ""
        self.create_time = datetime.now()
        self.end_time = None
        self.billing = 0

    def __repr__(self):
        """Return the job as a JSON object string.

        Times are formatted as ``YYYY-mm-dd HH:MM:SS``; a missing end time
        is rendered as the placeholder ``"------"``.
        """
        time_format = "%Y-%m-%d %H:%M:%S"
        finished_at = self.end_time
        return json.dumps({
            'job_id': self.id,
            'username': self.username,
            'job_name': self.name,
            'priority': self.priority,
            'status': self.status,
            'failed_reason': self.failed_reason,
            'create_time': self.create_time.strftime(time_format),
            'end_time': "------" if finished_at is None else finished_at.strftime(time_format),
            'billing': self.billing,
        })
class Batchtask(db.Model):
    """Database row describing one task of a batch job ('batch' bind).

    The task configuration is stored as JSON text in ``config``; ``repr()``
    of an instance is a JSON object string with ``config`` decoded again.
    """
    __bind_key__ = 'batch'

    # Columns (declaration order is kept as-is).
    id = db.Column(db.String(15), primary_key=True)
    idx = db.Column(db.String(10))
    jobid = db.Column(db.String(9), db.ForeignKey('batchjob.id'))
    status = db.Column(db.String(15))
    failed_reason = db.Column(db.Text)
    start_time = db.Column(db.DateTime)
    end_time = db.Column(db.DateTime)
    running_time = db.Column(db.Integer)
    billing = db.Column(db.Integer)
    config = db.Column(db.Text)
    tried_times = db.Column(db.Integer)

    def __init__(self, id, idx, config):
        """Create a task row in its initial state.

        :param id: primary key of the task
        :param idx: index/name of the task inside its job
        :param config: JSON-serializable task configuration; stored as text
        """
        self.id = id
        self.idx = idx
        self.config = json.dumps(config)
        # A new task is pending, has never run and has cost nothing yet.
        self.status = "pending"
        self.failed_reason = ""
        self.start_time = None
        self.end_time = None
        self.running_time = 0
        self.billing = 0
        self.tried_times = 0

    def __repr__(self):
        """Return the task as a JSON object string.

        Times are formatted as ``YYYY-mm-dd HH:MM:SS``; a missing start or
        end time is rendered as the placeholder ``"------"``.
        """
        time_format = "%Y-%m-%d %H:%M:%S"
        placeholder = "------"
        started_at = self.start_time
        finished_at = self.end_time
        return json.dumps({
            'id': self.id,
            'idx': self.idx,
            'jobid': self.jobid,
            'status': self.status,
            'failed_reason': self.failed_reason,
            'start_time': placeholder if started_at is None else started_at.strftime(time_format),
            'end_time': placeholder if finished_at is None else finished_at.strftime(time_format),
            'running_time': self.running_time,
            'billing': self.billing,
            'config': json.loads(self.config),
            'tried_times': self.tried_times,
        })

View File

@ -147,6 +147,7 @@
$("select#masterselector").change(function() { $("select#masterselector").change(function() {
var masterip=$(this).children('option:selected').val(); var masterip=$(this).children('option:selected').val();
$("#form").attr("action","/batch_job/"+ masterip +"/add/");
var mastername=$(this).children('option:selected').html(); var mastername=$(this).children('option:selected').html();
console.log(masterip); console.log(masterip);
var host = window.location.host; var host = window.location.host;

View File

@ -33,7 +33,7 @@
</p> </p>
{% for master in masterips %} {% for master in masterips %}
{% for job_info in job_list[master.split('@')[0]] %} {% for job_info in job_list[master.split('@')[0]] %}
<div class="modal inmodal" id='OutputModal_{{ master }}_{{ job_info['job_id'] }}' tabindex="-1" role="dialog" aria-hidden="true"> <div class="modal inmodal" id='OutputModal_{{ master.split('@')[1] }}_{{ job_info['job_id'] }}' tabindex="-1" role="dialog" aria-hidden="true">
<div class="modal-dialog"> <div class="modal-dialog">
<div class="modal-content animated fadeIn"> <div class="modal-content animated fadeIn">
<div class="modal-header"> <div class="modal-header">
@ -99,7 +99,7 @@
<td>Tasks</td> <td>Tasks</td>
<td><a href="/batch_job/{{master.split("@")[0]}}/stop/{{ job_info['job_id'] }}/"><button type="button" class="btn btn-xs btn-warning"> &nbsp;Stop&nbsp;&nbsp; </button></a></td> <td><a href="/batch_job/{{master.split("@")[0]}}/stop/{{ job_info['job_id'] }}/"><button type="button" class="btn btn-xs btn-warning"> &nbsp;Stop&nbsp;&nbsp; </button></a></td>
<td>{{ job_info['create_time'] }}</td> <td>{{ job_info['create_time'] }}</td>
<td><a role="button" class="btn btn-info btn-xs" id='{{ master }}_{{ job_info['job_id'] }}_output' data-toggle="modal" data-target='#OutputModal_{{ master }}_{{ job_info['job_id'] }}'>Get Output</a></td> <td><a role="button" class="btn btn-info btn-xs" id='{{ master }}_{{ job_info['job_id'] }}_output' data-toggle="modal" data-target='#OutputModal_{{ master.split('@')[1] }}_{{ job_info['job_id'] }}'>Get Output</a></td>
</tr> </tr>
{% endfor %} {% endfor %}
{% endfor %} {% endfor %}
@ -118,7 +118,7 @@
<script type="text/javascript"> <script type="text/javascript">
$(document).ready(function() { $(document).ready(function() {
$(".table-batch").DataTable({"scrollX":true,"order":[[ 5, "desc" ]]}); $(".table-batch").DataTable({"scrollX":true,"order":[[ 6, "desc" ]]});
$(".table-output").DataTable({ $(".table-output").DataTable({
"lengthChange":false}); "lengthChange":false});
}); });