diff --git a/src/master/httprest.py b/src/master/httprest.py index 6d6c603..fb3b059 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -792,6 +792,7 @@ def resetall_system(user, beans, form): @app.route("/batch/job/add/", methods=['POST']) @login_required +@beans_check def add_job(user,beans,form): global G_jobmgr job_data = form.to_dict() @@ -879,6 +880,17 @@ def list_job(user,beans,form): } return json.dumps(result) +@app.route("/batch/job/info/", methods=['POST']) +@login_required +def info_job(user,beans,form): + global G_jobmgr + jobid = form.get("jobid","") + [success, data] = G_jobmgr.get_job(user, jobid) + if success: + return json.dumps({'success':'true', 'data':data}) + else: + return json.dumps({'success':'false', 'message': data}) + @app.route("/batch/job/stop/", methods=['POST']) @login_required def stop_job(user,beans,form): @@ -904,12 +916,6 @@ def get_output(user,beans,form): } return json.dumps(result) - -@app.route("/batch/job/info/", methods=['POST']) -@login_required -def info_job(user,beans,form): - pass - @app.route("/batch/task/info/", methods=['POST']) @login_required def info_task(user,beans,form): diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index 7b7fcff..5097c03 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -1,33 +1,46 @@ -import time, threading, random, string, os, traceback +import time, threading, random, string, os, traceback, requests import master.monitor import subprocess,json from functools import wraps +from datetime import datetime from utils.log import initlogging, logger +from utils.model import db, Batchjob, Batchtask from utils import env +def db_commit(): + try: + db.session.commit() + except Exception as err: + db.session.rollback() + logger.error(traceback.format_exc()) + raise + class BatchJob(object): - def __init__(self, user, job_info): + def __init__(self, jobid, user, job_info): + self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority'])) self.user = user - self.raw_job_info = job_info - self.job_id = None + #self.raw_job_info = job_info + self.job_id = jobid self.job_name = job_info['jobName'] self.job_priority = int(job_info['jobPriority']) - self.status = 'pending' - self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()) self.lock = threading.Lock() self.tasks = {} self.dependency_out = {} - self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0} + self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0, 'stopped':0} #init self.tasks & self.dependency_out & self.tasks_cnt - logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.user, self.job_name, self.create_time)) - raw_tasks = self.raw_job_info["tasks"] + logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.username, self.job_db.name, str(self.job_db.create_time))) + raw_tasks = job_info["tasks"] self.tasks_cnt['pending'] = len(raw_tasks.keys()) for task_idx in raw_tasks.keys(): task_info = raw_tasks[task_idx] + task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info) + self.job_db.tasks.append(task_db) self.tasks[task_idx] = {} + self.tasks[task_idx]['id'] = jobid+"_"+task_idx self.tasks[task_idx]['config'] = task_info + self.tasks[task_idx]['db'] = task_db self.tasks[task_idx]['status'] = 'pending' self.tasks[task_idx]['dependency'] = [] dependency = task_info['dependency'].strip().replace(' ', '').split(',') @@ -41,8 +54,11 @@ class BatchJob(object): self.dependency_out[d] = [] self.dependency_out[d].append(task_idx) + db.session.add(self.job_db) + db_commit() + self.log_status() - logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3))) + logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_db.id, json.dumps(self.dependency_out, indent=3))) def data_lock(f): @wraps(f) @@ -60,7 +76,7 @@ class BatchJob(object): # return the tasks without dependencies @data_lock def get_tasks_no_dependency(self,update_status=False): - logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id) + logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_db.id) ret_tasks = [] for task_idx in self.tasks.keys(): if (self.tasks[task_idx]['status'] == 'pending' and @@ -68,53 +84,83 @@ class BatchJob(object): if update_status: self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'scheduling' self.tasks[task_idx]['status'] = 'scheduling' - task_name = self.job_id + '_' + task_idx + task_name = self.tasks[task_idx]['db'].id ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority]) self.log_status() + db_commit() return ret_tasks @data_lock - def stop_tasks(self): - for task_idx in self.tasks.keys(): - self.tasks[task_idx]['status'] = 'stopped' + def stop_job(self): + self.job_db = Batchjob.query.get(self.job_id) + self.job_db.status = 'stopping' + db_commit() # update status of this job based def _update_job_status(self): allcnt = len(self.tasks.keys()) if self.tasks_cnt['failed'] != 0: - self.status = 'failed' - elif self.tasks_cnt['running'] != 0: - self.status = 'running' + self.job_db.status = 'failed' + self.job_db.end_time = datetime.now() elif self.tasks_cnt['finished'] == allcnt: - self.status = 'done' + self.job_db.status = 'done' + self.job_db.end_time = datetime.now() + elif self.job_db.status == 'stopping': + if self.tasks_cnt['running'] == 0 and self.tasks_cnt['scheduling'] == 0 and self.tasks_cnt['retrying'] == 0: + self.job_db.status = 'stopped' + self.job_db.end_time = datetime.now() + elif self.tasks_cnt['running'] != 0 or self.tasks_cnt['retrying'] != 0: + self.job_db.status = 'running' else: - self.status = 'pending' + self.job_db.status = 'pending' + db_commit() # start run a task, update status @data_lock def update_task_running(self, task_idx): logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id)) - old_status = self.tasks[task_idx]['status'].split('(')[0] + old_status = self.tasks[task_idx]['status'] + if old_status == 'stopping': + logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id)) + return self.tasks_cnt[old_status] -= 1 self.tasks[task_idx]['status'] = 'running' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'running' + self.tasks[task_idx]['db'].start_time = datetime.now() self.tasks_cnt['running'] += 1 + self.job_db = Batchjob.query.get(self.job_id) self._update_job_status() self.log_status() # a task has finished, update dependency and return tasks without dependencies @data_lock - def finish_task(self, task_idx): + def finish_task(self, task_idx, running_time, billing): if task_idx not in self.tasks.keys(): logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id)) return [] - logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id)) - old_status = self.tasks[task_idx]['status'].split('(')[0] + logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished(running_time=%d,billing=%d). Update dependency..." % (task_idx, self.job_id, running_time, billing)) + old_status = self.tasks[task_idx]['status'] + if old_status == 'stopping': + logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id)) + return self.tasks_cnt[old_status] -= 1 self.tasks[task_idx]['status'] = 'finished' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'finished' + self.tasks[task_idx]['db'].tried_times += 1 + self.tasks[task_idx]['db'].running_time = running_time + self.tasks[task_idx]['db'].end_time = datetime.now() + self.tasks[task_idx]['db'].billing = billing + self.job_db = Batchjob.query.get(self.job_id) + self.job_db.billing += billing self.tasks_cnt['finished'] += 1 - self._update_job_status() + if task_idx not in self.dependency_out.keys(): + self._update_job_status() self.log_status() return [] ret_tasks = [] @@ -129,8 +175,11 @@ class BatchJob(object): self.tasks_cnt['pending'] -= 1 self.tasks_cnt['scheduling'] += 1 self.tasks[out_idx]['status'] = 'scheduling' + self.tasks[out_idx]['db'] = Batchtask.query.get(self.tasks[out_idx]['id']) + self.tasks[out_idx]['db'].status = 'scheduling' task_name = self.job_id + '_' + out_idx ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority]) + self._update_job_status() self.log_status() return ret_tasks @@ -138,27 +187,62 @@ class BatchJob(object): @data_lock def update_task_retrying(self, task_idx, reason, tried_times): logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) - old_status = self.tasks[task_idx]['status'].split('(')[0] + old_status = self.tasks[task_idx]['status'] + if old_status == 'stopping': + logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id)) + return self.tasks_cnt[old_status] -= 1 self.tasks_cnt['retrying'] += 1 - self.tasks[task_idx]['status'] = 'retrying(%s)(%d times)' % (reason, int(tried_times)) + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'retrying' + self.tasks[task_idx]['db'].failed_reason = reason + self.tasks[task_idx]['db'].tried_times += 1 + self.tasks[task_idx]['status'] = 'retrying' + self.job_db = Batchjob.query.get(self.job_id) self._update_job_status() self.log_status() # update failed status of task @data_lock - def update_task_failed(self, task_idx, reason, tried_times): + def update_task_failed(self, task_idx, reason, tried_times, running_time, billing): logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) - old_status = self.tasks[task_idx]['status'].split('(')[0] + old_status = self.tasks[task_idx]['status'] self.tasks_cnt[old_status] -= 1 self.tasks_cnt['failed'] += 1 - if reason == "OUTPUTERROR": - self.tasks[task_idx]['status'] = 'failed(OUTPUTERROR)' - else: - self.tasks[task_idx]['status'] = 'failed(%s)(%d times)' % (reason, int(tried_times)) + self.tasks[task_idx]['status'] = 'failed' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'failed' + self.tasks[task_idx]['db'].failed_reason = reason + self.tasks[task_idx]['db'].tried_times += 1 + self.tasks[task_idx]['db'].end_time = datetime.now() + self.tasks[task_idx]['db'].running_time = running_time + self.tasks[task_idx]['db'].billing = billing + self.job_db = Batchjob.query.get(self.job_id) + self.job_db.billing += billing self._update_job_status() self.log_status() + @data_lock + def update_task_stopped(self, task_idx, running_time, billing): + logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) stopped.running_time:%d billing:%d" % (task_idx, self.job_id, int(running_time), billing)) + old_status = self.tasks[task_idx]['status'] + if old_status == 'failed' or old_status == 'finished' or old_status == 'stopped': + logger.info("task(idx:%s) of BatchJob(id:%s) has been done."%(task_idx, self.job_id)) + return False + self.tasks_cnt[old_status] -= 1 + self.tasks_cnt['stopped'] += 1 + self.tasks[task_idx]['status'] = 'stopped' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'stopped' + self.tasks[task_idx]['db'].end_time = datetime.now() + self.tasks[task_idx]['db'].running_time = running_time + self.tasks[task_idx]['db'].billing = billing + self.job_db = Batchjob.query.get(self.job_id) + self.job_db.billing += billing + self._update_job_status() + self.log_status() + return True + # print status for debuging def log_status(self): task_copy = {} @@ -168,24 +252,55 @@ class BatchJob(object): task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency'] logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3))) logger.debug("BatchJob(id:%s) tasks_cnt: %s" % (self.job_id, self.tasks_cnt)) - logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status)) + logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.job_db.status)) class JobMgr(): # load job information from etcd # initial a job queue and job schedueler def __init__(self, taskmgr): + try: + Batchjob.query.all() + except: + db.create_all(bind='__all__') self.job_map = {} self.taskmgr = taskmgr self.fspath = env.getenv('FS_PREFIX') + self.lock = threading.Lock() + self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT')) + self.auth_key = env.getenv('AUTH_KEY') + + def charge_beans(self,username,billing): + logger.debug("Charge user(%s) for %d beans"%(username, billing)) + data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key} + url = "/billing/beans/" + return requests.post(self.userpoint+url,data=data).json() + + def add_lock(f): + @wraps(f) + def new_f(self, *args, **kwargs): + self.lock.acquire() + try: + result = f(self, *args, **kwargs) + except Exception as err: + self.lock.release() + raise err + self.lock.release() + return result + return new_f + + @add_lock + def create_job(self, user, job_info): + jobid = self.gen_jobid() + job = BatchJob(jobid, user, job_info) + return job # user: username # job_info: a json string # user submit a new job, add this job to queue and database def add_job(self, user, job_info): try: - job = BatchJob(user, job_info) - job.job_id = self.gen_jobid() + job = self.create_job(user, job_info) self.job_map[job.job_id] = job self.process_job(job) except ValueError as err: @@ -201,17 +316,18 @@ class JobMgr(): # jobid: the id of job def stop_job(self, user, job_id): logger.info("[jobmgr] stop job(id:%s) user(%s)"%(job_id, user)) + if job_id not in self.job_map.keys(): + return [False,"Job id %s does not exists! Maybe it has been finished."%job_id] try: job = self.job_map[job_id] - if job.status == 'done' or job.status == 'failed': + if job.job_db.status == 'done' or job.job_db.status == 'failed': return [True,""] if job.user != user: raise Exception("Wrong User.") for task_idx in job.tasks.keys(): taskid = job_id + '_' + task_idx - task = self.taskmgr.get_task(taskid) - self.taskmgr.stop_remove_task(task) - job.status = 'stopped' + self.taskmgr.lazy_stop_task(taskid) + job.stop_job() except Exception as err: logger.error(traceback.format_exc()) #logger.error(err) @@ -221,42 +337,44 @@ class JobMgr(): # user: username # list a user's all job def list_jobs(self,user): + alljobs = Batchjob.query.filter_by(username=user).all() res = [] - for job_id in self.job_map.keys(): - job = self.job_map[job_id] - logger.debug('job_id: %s, user: %s' % (job_id, job.user)) - if job.user == user: - all_tasks = job.raw_job_info['tasks'] - tasks_vnodeCount = {} - for task in all_tasks.keys(): - tasks_vnodeCount[task] = int(all_tasks[task]['vnodeCount']) - res.append({ - 'job_name': job.job_name, - 'job_id': job.job_id, - 'status': job.status, - 'create_time': job.create_time, - 'tasks': list(all_tasks.keys()), - 'tasks_vnodeCount': tasks_vnodeCount - }) - res.sort(key=lambda x:x['create_time'],reverse=True) + for job in alljobs: + jobdata = json.loads(str(job)) + tasks = job.tasks.all() + jobdata['tasks'] = [t.idx for t in tasks] + tasks_vnodeCount = {} + for t in tasks: + tasks_vnodeCount[t.idx] = int(json.loads(t.config)['vnodeCount']) + jobdata['tasks_vnodeCount'] = tasks_vnodeCount + res.append(jobdata) return res # user: username # jobid: the id of job # get the information of a job, including the status, json description and other information - # call get_task to get the task information def get_job(self, user, job_id): - pass + job = Batchjob.query.get(job_id) + if job is None: + return [False, "Jobid(%s) does not exist."%job_id] + if job.username != user: + return [False, "Wrong User!"] + jobdata = json.loads(str(job)) + tasks = job.tasks.order_by(Batchtask.idx).all() + tasksdata = [json.loads(str(t)) for t in tasks] + jobdata['tasks'] = tasksdata + return [True, jobdata] # check if a job exists def is_job_exist(self, job_id): - return job_id in self.job_map.keys() + return Batchjob.query.get(job_id) != None # generate a random job id def gen_jobid(self): - job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) + datestr = datetime.now().strftime("%y%m%d") + job_id = datestr+''.join(random.sample(string.ascii_letters + string.digits, 3)) while self.is_job_exist(job_id): - job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) + job_id = datestr+''.join(random.sample(string.ascii_letters + string.digits, 3)) return job_id # add tasks into taskmgr's queue @@ -277,27 +395,53 @@ class JobMgr(): # report task status from taskmgr when running, failed and finished # task_name: job_id + '_' + task_idx - # status: 'running', 'finished', 'retrying', 'failed' + # status: 'running', 'finished', 'retrying', 'failed', 'stopped' # reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR" # tried_times: how many times the task has been tried. - def report(self, user, task_name, status, reason="", tried_times=1): + def report(self, user, task_name, status, reason="", tried_times=1, running_time=0, billing=0): split_task_name = task_name.split('_') if len(split_task_name) != 2: - logger.error("Illegal task_name(%s) report from taskmgr" % task_name) + logger.error("[jobmgr report]Illegal task_name(%s) report from taskmgr" % task_name) return + if billing > 0 and (status == 'failed' or status == 'finished'): + self.charge_beans(user, billing) job_id, task_idx = split_task_name + if job_id not in self.job_map.keys(): + logger.error("[jobmgr report]jobid(%s) does not exist. task_name(%s)" % (job_id,task_name)) + #update data in db + taskdb = Batchtask.query.get(task_name) + if (taskdb is None or taskdb.status == 'finished' or + taskdb.status == 'failed' or taskdb.status == 'stopped'): + return + taskdb.status = status + if status == 'failed': + taskdb.failed_reason = reason + if status == 'failed' or status == 'stopped' or status == 'finished': + taskdb.end_time = datetime.now() + if billing > 0: + taskdb.running_time = running_time + taskdb.billing = billing + db_commit() + return job = self.job_map[job_id] if status == "running": + #logger.debug(str(job.job_db)) job.update_task_running(task_idx) + #logger.debug(str(job.job_db)) elif status == "finished": - next_tasks = job.finish_task(task_idx) - if len(next_tasks) == 0: - return + #logger.debug(str(job.job_db)) + next_tasks = job.finish_task(task_idx, running_time, billing) ret = self.add_task_taskmgr(user, next_tasks) + #logger.debug(str(job.job_db)) elif status == "retrying": job.update_task_retrying(task_idx, reason, tried_times) elif status == "failed": - job.update_task_failed(task_idx, reason, tried_times) + job.update_task_failed(task_idx, reason, tried_times, running_time, billing) + elif status == "stopped": + if job.update_task_stopped(task_idx, running_time, billing) and billing > 0: + self.charge_beans(user, billing) + if job.job_db.status == 'done' or job.job_db.status == 'failed' or job.job_db.status == 'stopped': + del self.job_map[job_id] # Get Batch job stdout or stderr from its file def get_output(self, username, jobid, taskid, vnodeid, issue): diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index d320a67..6428355 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -3,7 +3,7 @@ import time import string import os import random, copy, subprocess -import json +import json, math from functools import wraps # must import logger after initlogging, ugly @@ -29,6 +29,7 @@ class Task(): self.id = task_id self.username = username self.status = WAITING + self.failed_reason = "" # if all the vnodes must be started at the same time self.at_same_time = at_same_time # priority the bigger the better @@ -46,6 +47,26 @@ class Task(): max_retry_count = task_info['max_retry_count'] ) for (index, task_info) in enumerate(task_infos)] + def get_billing(self): + billing_beans = 0 + running_time = 0 + cpu_price = 1 / 3600.0 # /core*s + mem_price = 1 / 3600.0 # /GB*s + disk_price = 1 / 3600.0 # /GB*s + gpu_price = 100 / 3600.0 # /core*s + for subtask in self.subtask_list: + tmp_time = subtask.running_time + cpu_beans = subtask.vnode_info.vnode.instance.cpu * tmp_time * cpu_price + mem_beans = subtask.vnode_info.vnode.instance.memory / 1024.0 * tmp_time * mem_price + disk_beans = subtask.vnode_info.vnode.instance.disk / 1024.0 * tmp_time * disk_price + gpu_beans = subtask.vnode_info.vnode.instance.gpu * tmp_time * gpu_price + logger.info("subtask:%s running_time=%f beans for: cpu=%f mem_beans=%f disk_beans=%f gpu_beans=%f" + %(self.id, tmp_time, cpu_beans, mem_beans, disk_beans, gpu_beans )) + beans = math.ceil(cpu_beans + mem_beans + disk_beans + gpu_beans) + running_time += tmp_time + billing_beans += beans + return running_time, billing_beans + def __lt__(self, other): return self.priority < other.priority @@ -87,16 +108,18 @@ class SubTask(): self.task_started = False self.start_at = 0 self.end_at = 0 + self.running_time = 0 self.status = WAITING self.status_reason = '' self.try_count = 0 self.worker = None - def waiting_for_retry(self): + def waiting_for_retry(self,reason=""): self.try_count += 1 self.status = WAITING if self.try_count <= self.max_retry_count else FAILED - if self.status == FAILED and self.root_task.at_same_time: + if self.status == FAILED: self.root_task.status = FAILED + self.failed_reason = reason class TaskReporter(MasterServicer): @@ -123,7 +146,10 @@ class TaskMgr(threading.Thread): self.task_queue = [] self.lazy_append_list = [] self.lazy_delete_list = [] + self.lazy_stop_list = [] self.task_queue_lock = threading.Lock() + self.stop_lock = threading.Lock() + self.add_lock = threading.Lock() #self.user_containers = {} self.scheduler_interval = scheduler_interval @@ -155,23 +181,21 @@ class TaskMgr(threading.Thread): self.logger.info("Free nets addresses pool %s" % str(self.free_nets)) self.logger.info("Each Batch Net CIDR:%s"%(str(self.task_cidr))) - def queue_lock(f): - @wraps(f) - def new_f(self, *args, **kwargs): - self.task_queue_lock.acquire() - result = f(self, *args, **kwargs) - self.task_queue_lock.release() - return result - return new_f - - def net_lock(f): - @wraps(f) - def new_f(self, *args, **kwargs): - self.network_lock.acquire() - result = f(self, *args, **kwargs) - self.network_lock.release() - return result - return new_f + def data_lock(lockname): + def lock(f): + @wraps(f) + def new_f(self, *args, **kwargs): + lockobj = getattr(self,lockname) + lockobj.acquire() + try: + result = f(self, *args, **kwargs) + except Exception as err: + lockobj.release() + raise err + lockobj.release() + return result + return new_f + return lock def run(self): self.serve() @@ -195,14 +219,36 @@ class TaskMgr(threading.Thread): self.server.stop(0) self.logger.info('[taskmgr_rpc] stop rpc server') - @queue_lock + @data_lock('task_queue_lock') + @data_lock('add_lock') + @data_lock('stop_lock') def sort_out_task_queue(self): + + for task in self.task_queue: + if task.id in self.lazy_stop_list: + self.stop_remove_task(task) + self.lazy_delete_list.append(task) + running_time, billing = task.get_billing() + self.logger.info('task %s stopped, running_time:%s billing:%d'%(task.id, str(running_time), billing)) + running_time = math.ceil(running_time) + self.jobmgr.report(task.username, task.id,'stopped',running_time=running_time,billing=billing) + while self.lazy_delete_list: task = self.lazy_delete_list.pop(0) try: self.task_queue.remove(task) except Exception as err: self.logger.warning(str(err)) + + new_append_list = [] + for task in self.lazy_append_list: + if task.id in self.lazy_stop_list: + self.jobmgr.report(task.username, task.id, 'stopped') + else: + new_append_list.append(task) + + self.lazy_append_list = new_append_list + self.lazy_stop_list.clear() if self.lazy_append_list: self.task_queue.extend(self.lazy_append_list) self.lazy_append_list.clear() @@ -240,6 +286,7 @@ class TaskMgr(threading.Thread): return [False, e] subtask.vnode_started = False subtask.end_at = time.time() + subtask.running_time += subtask.end_at - subtask.start_at self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu return [True, ''] @@ -261,7 +308,7 @@ class TaskMgr(threading.Thread): def stop_subtask(self, subtask): try: - self.logger.info('[task_processor] Stoping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid)) + self.logger.info('[task_processor] Stopping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid)) channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port)) stub = WorkerStub(channel) response = stub.stop_task(subtask.command_info) @@ -275,14 +322,14 @@ class TaskMgr(threading.Thread): subtask.task_started = False return [True, ''] - @net_lock + @data_lock('network_lock') def acquire_task_ips(self, task): self.logger.info("[acquire_task_ips] user(%s) task(%s) net(%s)" % (task.username, task.id, str(task.task_base_ip))) if task.task_base_ip == None: task.task_base_ip = self.free_nets.pop(0) return task.task_base_ip - @net_lock + @data_lock('network_lock') def release_task_ips(self, task): self.logger.info("[release_task_ips] user(%s) task(%s) net(%s)" % (task.username, task.id, str(task.task_base_ip))) if task.task_base_ip == None: @@ -352,7 +399,8 @@ class TaskMgr(threading.Thread): placed_workers.append(sub_task.worker) [success, msg] = self.start_vnode(sub_task) if not success: - sub_task.waiting_for_retry() + sub_task.waiting_for_retry("Fail to start vnode.") + self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start vnode.") sub_task.worker = None start_all_vnode_success = False @@ -371,7 +419,8 @@ class TaskMgr(threading.Thread): if success: sub_task.status = RUNNING else: - sub_task.waiting_for_retry() + sub_task.waiting_for_retry("Failt to start task.") + self.jobmgr.report(task.username, task.id, 'retrying', "Fail to start task.") def clear_sub_tasks(self, sub_task_list): for sub_task in sub_task_list: @@ -385,6 +434,10 @@ class TaskMgr(threading.Thread): self.stop_vnode(sub_task) #pass + @data_lock('stop_lock') + def lazy_stop_task(self, taskid): + self.lazy_stop_list.append(taskid) + def stop_remove_task(self, task): if task is None: return @@ -392,7 +445,6 @@ class TaskMgr(threading.Thread): self.clear_sub_tasks(task.subtask_list) self.release_task_ips(task) self.remove_tasknet(task) - self.lazy_delete_list.append(task) def check_task_completed(self, task): if task.status == RUNNING or task.status == WAITING: @@ -400,11 +452,15 @@ class TaskMgr(threading.Thread): if sub_task.command_info != None and (sub_task.status == RUNNING or sub_task.status == WAITING): return False self.logger.info('task %s finished, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list]))) - if task.at_same_time and task.status == FAILED: - self.jobmgr.report(task.username,task.id,"failed","",task.subtask_list[0].max_retry_count+1) - else: - self.jobmgr.report(task.username,task.id,'finished') self.stop_remove_task(task) + self.lazy_delete_list.append(task) + running_time, billing = task.get_billing() + self.logger.info('task %s running_time:%s billing:%d'%(task.id, str(running_time), billing)) + running_time = math.ceil(running_time) + if task.status == FAILED: + self.jobmgr.report(task.username,task.id,"failed",task.failed_reason,task.subtask_list[0].max_retry_count+1, running_time, billing) + else: + self.jobmgr.report(task.username,task.id,'finished',running_time=running_time,billing=billing) return True # this method is called when worker send heart-beat rpc request @@ -430,12 +486,12 @@ class TaskMgr(threading.Thread): sub_task.status_reason = report.errmsg if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT: - sub_task.waiting_for_retry() + sub_task.waiting_for_retry(report.errmsg) + self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg) elif report.subTaskStatus == OUTPUTERROR: sub_task.status = FAILED - if task.at_same_time: - task.status = FAILED - self.clear_sub_task(sub_task) + task.status = FAILED + task.failed_reason = report.errmsg elif report.subTaskStatus == COMPLETED: self.clear_sub_task(sub_task) @@ -445,7 +501,7 @@ class TaskMgr(threading.Thread): self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue)) for task in self.task_queue: - if task in self.lazy_delete_list: + if task in self.lazy_delete_list or task.id in self.lazy_stop_list: continue self.logger.info('task %s sub_tasks %s' % (task.id, str([sub_task.status for sub_task in task.subtask_list]))) if self.check_task_completed(task): @@ -577,6 +633,7 @@ class TaskMgr(threading.Thread): # save the task information into database # called when jobmgr assign task to taskmgr + @data_lock('add_lock') def add_task(self, username, taskid, json_task, task_priority=1): # decode json string to object defined in grpc self.logger.info('[taskmgr add_task] receive task %s' % taskid) @@ -654,12 +711,12 @@ class TaskMgr(threading.Thread): return True - @queue_lock + @data_lock('task_queue_lock') def get_task_list(self): return self.task_queue.copy() - @queue_lock + @data_lock('task_queue_lock') def get_task(self, taskid): for task in self.task_queue: if task.id == taskid: diff --git a/src/utils/model.py b/src/utils/model.py index 37ef56c..8eca040 100755 --- a/src/utils/model.py +++ b/src/utils/model.py @@ -44,6 +44,7 @@ app.config['SQLALCHEMY_BINDS'] = { 'history': 'sqlite:///'+fsdir+'/global/sys/HistoryTable.db', 'beansapplication': 'sqlite:///'+fsdir+'/global/sys/BeansApplication.db', 'system': 'sqlite:///'+fsdir+'/global/sys/System.db', + 'batch':'sqlite:///'+fsdir+'/global/sys/Batch.db?check_same_thread=False', 'login': 'sqlite:///'+fsdir+'/global/sys/Login.db' } app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True @@ -435,3 +436,90 @@ class Image(db.Model): def __repr__(self): return "{\"id\":\"%d\",\"imagename\":\"%s\",\"hasPrivate\":\"%s\",\"hasPublic\":\"%s\",\"ownername\":\"%s\",\"updatetime\":\"%s\",\"description\":\"%s\"}" % (self.id,self.imagename,str(self.hasPrivate),str(self.hasPublic),self.create_time.strftime("%Y-%m-%d %H:%M:%S"),self.ownername,self.description) + +class Batchjob(db.Model): + __bind_key__ = 'batch' + id = db.Column(db.String(9), primary_key=True) + username = db.Column(db.String(10)) + name = db.Column(db.String(30)) + priority = db.Column(db.Integer) + status = db.Column(db.String(10)) + failed_reason = db.Column(db.Text) + create_time = db.Column(db.DateTime) + end_time = db.Column(db.DateTime) + billing = db.Column(db.Integer) + tasks = db.relationship('Batchtask', backref='batchjob', lazy='dynamic') + + def __init__(self,id,username,name,priority): + self.id = id + self.username = username + self.name = name + self.priority = priority + self.status = "pending" + self.failed_reason = "" + self.create_time = datetime.now() + self.end_time = None + self.billing = 0 + + def __repr__(self): + info = {} + info['job_id'] = self.id + info['username'] = self.username + info['job_name'] = self.name + info['priority'] = self.priority + info['status'] = self.status + info['failed_reason'] = self.failed_reason + info['create_time'] = self.create_time.strftime("%Y-%m-%d %H:%M:%S") + if self.end_time is None: + info['end_time'] = "------" + else: + info['end_time'] = self.end_time.strftime("%Y-%m-%d %H:%M:%S") + info['billing'] = self.billing + return json.dumps(info) + +class Batchtask(db.Model): + __bind_key__ = 'batch' + id = db.Column(db.String(15), primary_key=True) + idx = db.Column(db.String(10)) + jobid = db.Column(db.String(9), db.ForeignKey('batchjob.id')) + status = db.Column(db.String(15)) + failed_reason = db.Column(db.Text) + start_time = db.Column(db.DateTime) + end_time = db.Column(db.DateTime) + running_time = db.Column(db.Integer) + billing = db.Column(db.Integer) + config = db.Column(db.Text) + tried_times = db.Column(db.Integer) + + def __init__(self, id, idx, config): + self.id = id + self.idx = idx + self.status = "pending" + self.failed_reason = "" + self.start_time = None + self.end_time = None + self.running_time = 0 + self.billing = 0 + self.config = json.dumps(config) + self.tried_times = 0 + + def __repr__(self): + info = {} + info['id'] = self.id + info['idx'] = self.idx + info['jobid'] = self.jobid + info['status'] = self.status + info['failed_reason'] = self.failed_reason + if self.start_time is None: + info['start_time'] = "------" + else: + info['start_time'] = self.start_time.strftime("%Y-%m-%d %H:%M:%S") + if self.end_time is None: + info['end_time'] = "------" + else: + info['end_time'] = self.end_time.strftime("%Y-%m-%d %H:%M:%S") + info['running_time'] = self.running_time + info['billing'] = self.billing + info['config'] = json.loads(self.config) + info['tried_times'] = self.tried_times + return json.dumps(info) diff --git a/src/utils/nettools.py b/src/utils/nettools.py index 49ce7d1..dc28d0e 100755 --- a/src/utils/nettools.py +++ b/src/utils/nettools.py @@ -430,6 +430,10 @@ class portcontrol(object): ports_lock.release() try: subprocess.run(['iptables','-t','nat','-A','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) + except subprocess.CalledProcessError as suberror: + return [False, "set port mapping failed : %s" % suberror.stdout.decode('utf-8')] + try: + subprocess.run(['iptables','-t','nat','-A','PREROUTING','-p','udp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) return [True, str(free_port)] except subprocess.CalledProcessError as suberror: return [False, "set port mapping failed : %s" % suberror.stdout.decode('utf-8')] @@ -447,6 +451,10 @@ class portcontrol(object): subprocess.run(['iptables','-t','nat','-D','PREROUTING','-p','tcp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) except subprocess.CalledProcessError as suberror: return [False, "release port mapping failed : %s" % suberror.stdout.decode('utf-8')] + try: + subprocess.run(['iptables','-t','nat','-D','PREROUTING','-p','udp','--dport',str(free_port),"-j","DNAT",'--to-destination','%s:%s'%(container_ip,container_port)], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=False, check=True) + except subprocess.CalledProcessError as suberror: + return [False, "release port mapping failed : %s" % suberror.stdout.decode('utf-8')] ports_lock.acquire() free_ports[free_port] = True allocated_ports[container_name].pop(container_port) diff --git a/src/worker/taskworker.py b/src/worker/taskworker.py index b6f7c2e..83304ed 100755 --- a/src/worker/taskworker.py +++ b/src/worker/taskworker.py @@ -432,7 +432,7 @@ class TaskWorker(rpc_pb2_grpc.WorkerServicer): self.add_msg(taskid,username,vnodeid,rpc_pb2.COMPLETED,token,"") else: logger.info("Task(%s-%s-%s) failed." % (str(taskid),str(vnodeid),token)) - self.add_msg(taskid,username,vnodeid,rpc_pb2.FAILED,token,"") + self.add_msg(taskid,username,vnodeid,rpc_pb2.FAILED,token,"Runtime Error. More information in stderr log.") def add_msg(self,taskid,username,vnodeid,status,token,errmsg): self.msgslock.acquire() diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html index 465d015..469f560 100644 --- a/web/templates/batch/batch_create.html +++ b/web/templates/batch/batch_create.html @@ -43,6 +43,14 @@