diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index c66f6cf..09d85bf 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -27,7 +27,7 @@ class BatchJob(object): self.lock = threading.Lock() self.tasks = {} self.dependency_out = {} - self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0} + self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0, 'stopped':0} #init self.tasks & self.dependency_out & self.tasks_cnt logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.username, self.job_db.name, str(self.job_db.create_time))) @@ -95,12 +95,8 @@ class BatchJob(object): @data_lock def stop_job(self): - for task_idx in self.tasks.keys(): - self.tasks[task_idx]['status'] = 'stopped' - self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) - self.tasks[task_idx]['db'].status = 'stopped' self.job_db = Batchjob.query.get(self.job_id) - self.job_db.status = 'stopped' + self.job_db.status = 'stopping' db_commit() # update status of this job based @@ -108,10 +104,13 @@ class BatchJob(object): allcnt = len(self.tasks.keys()) if self.tasks_cnt['failed'] != 0: self.job_db.status = 'failed' - elif self.tasks_cnt['running'] != 0: - self.job_db.status = 'running' elif self.tasks_cnt['finished'] == allcnt: self.job_db.status = 'done' + elif self.job_db.status == 'stopping': + if self.tasks_cnt['running'] == 0 and self.tasks_cnt['scheduling'] == 0 and self.tasks_cnt['retrying'] == 0: + self.job_db.status = 'stopped' + elif self.tasks_cnt['running'] != 0 or self.tasks_cnt['retrying'] != 0: + self.job_db.status = 'running' else: self.job_db.status = 'pending' db_commit() @@ -121,7 +120,7 @@ class BatchJob(object): def update_task_running(self, task_idx): logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id)) old_status = self.tasks[task_idx]['status'] - if old_status == 'stopped': + if old_status == 'stopping': logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id)) return self.tasks_cnt[old_status] -= 1 @@ -141,7 +140,7 @@ class BatchJob(object): return [] logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished(running_time=%d,billing=%d). Update dependency..." % (task_idx, self.job_id, running_time, billing)) old_status = self.tasks[task_idx]['status'] - if old_status == 'stopped': + if old_status == 'stopping': logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id)) return self.tasks_cnt[old_status] -= 1 @@ -184,7 +183,7 @@ class BatchJob(object): def update_task_retrying(self, task_idx, reason, tried_times): logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) old_status = self.tasks[task_idx]['status'] - if old_status == 'stopped': + if old_status == 'stopping': logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id)) return self.tasks_cnt[old_status] -= 1 @@ -203,9 +202,6 @@ class BatchJob(object): def update_task_failed(self, task_idx, reason, tried_times, running_time, billing): logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) old_status = self.tasks[task_idx]['status'] - if old_status == 'stopped': - logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id)) - return self.tasks_cnt[old_status] -= 1 self.tasks_cnt['failed'] += 1 self.tasks[task_idx]['status'] = 'failed' @@ -220,6 +216,26 @@ class BatchJob(object): self._update_job_status() self.log_status() + @data_lock + def update_task_stopped(self, task_idx, running_time, billing): + logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) stopped.running_time:%d billing:%d" % (task_idx, self.job_id, int(running_time), billing)) + old_status = self.tasks[task_idx]['status'] + if old_status == 'failed' or old_status == 'finished' or old_status == 'stopped': + logger.info("task(idx:%s) of BatchJob(id:%s) has been done."%(task_idx, self.job_id)) + return False + self.tasks_cnt[old_status] -= 1 + self.tasks_cnt['stopped'] += 1 + self.tasks[task_idx]['status'] = 'stopped' + self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id']) + self.tasks[task_idx]['db'].status = 'stopped' + self.tasks[task_idx]['db'].running_time = running_time + self.tasks[task_idx]['db'].billing = billing + self.job_db = Batchjob.query.get(self.job_id) + self.job_db.billing += billing + self._update_job_status() + self.log_status() + return True + # print status for debuging def log_status(self): task_copy = {} @@ -248,6 +264,7 @@ class JobMgr(): self.auth_key = env.getenv('AUTH_KEY') def charge_beans(self,username,billing): + logger.debug("Charge user(%s) for %d beans"%(username, billing)) data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key} url = "/billing/beans/" return requests.post(self.userpoint+url,data=data).json() @@ -304,7 +321,6 @@ class JobMgr(): taskid = job_id + '_' + task_idx self.taskmgr.lazy_stop_task(taskid) job.stop_job() - del self.job_map[job_id] except Exception as err: logger.error(traceback.format_exc()) #logger.error(err) @@ -314,11 +330,11 @@ class JobMgr(): # user: username # list a user's all job def list_jobs(self,user): - alljobs = Batchjob.query.filter_by(username=user) + alljobs = Batchjob.query.filter_by(username=user).all() res = [] for job in alljobs: jobdata = json.loads(str(job)) - tasks = job.tasks + tasks = job.tasks.all() jobdata['tasks'] = [t.idx for t in tasks] tasks_vnodeCount = {} for t in tasks: @@ -372,9 +388,12 @@ class JobMgr(): if len(split_task_name) != 2: logger.error("[jobmgr report]Illegal task_name(%s) report from taskmgr" % task_name) return + if billing > 0 and (status == 'failed' or status == 'finished'): + self.charge_beans(user, billing) job_id, task_idx = split_task_name if job_id not in self.job_map.keys(): logger.error("[jobmgr report]jobid(%s) does not exist. task_name(%s)" % (job_id,task_name)) + #TODO: update data in db return job = self.job_map[job_id] if status == "running": @@ -383,16 +402,17 @@ class JobMgr(): #logger.debug(str(job.job_db)) elif status == "finished": #logger.debug(str(job.job_db)) - self.charge_beans(user, billing) next_tasks = job.finish_task(task_idx, running_time, billing) ret = self.add_task_taskmgr(user, next_tasks) #logger.debug(str(job.job_db)) elif status == "retrying": job.update_task_retrying(task_idx, reason, tried_times) elif status == "failed": - self.charge_beans(user, billing) job.update_task_failed(task_idx, reason, tried_times, running_time, billing) - if job.job_db.status == 'done' or job.job_db.status == 'failed': + elif status == "stopped": + if job.update_task_stopped(task_idx, running_time, billing) and billing > 0: + self.charge_beans(user, billing) + if job.job_db.status == 'done' or job.job_db.status == 'failed' or job.job_db.status == 'stopped': del self.job_map[job_id] # Get Batch job stdout or stderr from its file diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 17355d5..6428355 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -228,6 +228,10 @@ class TaskMgr(threading.Thread): if task.id in self.lazy_stop_list: self.stop_remove_task(task) self.lazy_delete_list.append(task) + running_time, billing = task.get_billing() + self.logger.info('task %s stopped, running_time:%s billing:%d'%(task.id, str(running_time), billing)) + running_time = math.ceil(running_time) + self.jobmgr.report(task.username, task.id,'stopped',running_time=running_time,billing=billing) while self.lazy_delete_list: task = self.lazy_delete_list.pop(0) @@ -236,7 +240,14 @@ class TaskMgr(threading.Thread): except Exception as err: self.logger.warning(str(err)) - self.lazy_append_list = [t for t in self.lazy_append_list if t.id not in self.lazy_stop_list] + new_append_list = [] + for task in self.lazy_append_list: + if task.id in self.lazy_stop_list: + self.jobmgr.report(task.username, task.id, 'stopped') + else: + new_append_list.append(task) + + self.lazy_append_list = new_append_list self.lazy_stop_list.clear() if self.lazy_append_list: self.task_queue.extend(self.lazy_append_list) diff --git a/src/utils/model.py b/src/utils/model.py index 098d95c..8eca040 100755 --- a/src/utils/model.py +++ b/src/utils/model.py @@ -44,7 +44,7 @@ app.config['SQLALCHEMY_BINDS'] = { 'history': 'sqlite:///'+fsdir+'/global/sys/HistoryTable.db', 'beansapplication': 'sqlite:///'+fsdir+'/global/sys/BeansApplication.db', 'system': 'sqlite:///'+fsdir+'/global/sys/System.db', - 'batch':'sqlite:///'+fsdir+'/global/sys/Batch.db', + 'batch':'sqlite:///'+fsdir+'/global/sys/Batch.db?check_same_thread=False', 'login': 'sqlite:///'+fsdir+'/global/sys/Login.db' } app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True