update process of stop job action

This commit is contained in:
Firmlyzhu 2019-03-30 17:49:46 +08:00
parent ce295cb041
commit cee574fd5c
3 changed files with 53 additions and 22 deletions

View File

@ -27,7 +27,7 @@ class BatchJob(object):
self.lock = threading.Lock()
self.tasks = {}
self.dependency_out = {}
self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0}
self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0, 'stopped':0}
#init self.tasks & self.dependency_out & self.tasks_cnt
logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.job_db.username, self.job_db.name, str(self.job_db.create_time)))
@ -95,12 +95,8 @@ class BatchJob(object):
@data_lock
def stop_job(self):
for task_idx in self.tasks.keys():
self.tasks[task_idx]['status'] = 'stopped'
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'stopped'
self.job_db = Batchjob.query.get(self.job_id)
self.job_db.status = 'stopped'
self.job_db.status = 'stopping'
db_commit()
# update status of this job based
@ -108,10 +104,13 @@ class BatchJob(object):
allcnt = len(self.tasks.keys())
if self.tasks_cnt['failed'] != 0:
self.job_db.status = 'failed'
elif self.tasks_cnt['running'] != 0:
self.job_db.status = 'running'
elif self.tasks_cnt['finished'] == allcnt:
self.job_db.status = 'done'
elif self.job_db.status == 'stopping':
if self.tasks_cnt['running'] == 0 and self.tasks_cnt['scheduling'] == 0 and self.tasks_cnt['retrying'] == 0:
self.job_db.status = 'stopped'
elif self.tasks_cnt['running'] != 0 or self.tasks_cnt['retrying'] != 0:
self.job_db.status = 'running'
else:
self.job_db.status = 'pending'
db_commit()
@ -121,7 +120,7 @@ class BatchJob(object):
def update_task_running(self, task_idx):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id))
old_status = self.tasks[task_idx]['status']
if old_status == 'stopped':
if old_status == 'stopping':
logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id))
return
self.tasks_cnt[old_status] -= 1
@ -141,7 +140,7 @@ class BatchJob(object):
return []
logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished(running_time=%d,billing=%d). Update dependency..." % (task_idx, self.job_id, running_time, billing))
old_status = self.tasks[task_idx]['status']
if old_status == 'stopped':
if old_status == 'stopping':
logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id))
return
self.tasks_cnt[old_status] -= 1
@ -184,7 +183,7 @@ class BatchJob(object):
def update_task_retrying(self, task_idx, reason, tried_times):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
old_status = self.tasks[task_idx]['status']
if old_status == 'stopped':
if old_status == 'stopping':
logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id))
return
self.tasks_cnt[old_status] -= 1
@ -203,9 +202,6 @@ class BatchJob(object):
def update_task_failed(self, task_idx, reason, tried_times, running_time, billing):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
old_status = self.tasks[task_idx]['status']
if old_status == 'stopped':
logger.info("Task(idx:%s) of BatchJob(id:%s) has been stopped."% (task_idx, self.job_id))
return
self.tasks_cnt[old_status] -= 1
self.tasks_cnt['failed'] += 1
self.tasks[task_idx]['status'] = 'failed'
@ -220,6 +216,26 @@ class BatchJob(object):
self._update_job_status()
self.log_status()
@data_lock
def update_task_stopped(self, task_idx, running_time, billing):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) stopped.running_time:%d billing:%d" % (task_idx, self.job_id, int(running_time), billing))
old_status = self.tasks[task_idx]['status']
if old_status == 'failed' or old_status == 'finished' or old_status == 'stopped':
logger.info("task(idx:%s) of BatchJob(id:%s) has been done."%(task_idx, self.job_id))
return False
self.tasks_cnt[old_status] -= 1
self.tasks_cnt['stopped'] += 1
self.tasks[task_idx]['status'] = 'stopped'
self.tasks[task_idx]['db'] = Batchtask.query.get(self.tasks[task_idx]['id'])
self.tasks[task_idx]['db'].status = 'stopped'
self.tasks[task_idx]['db'].running_time = running_time
self.tasks[task_idx]['db'].billing = billing
self.job_db = Batchjob.query.get(self.job_id)
self.job_db.billing += billing
self._update_job_status()
self.log_status()
return True
# print status for debuging
def log_status(self):
task_copy = {}
@ -248,6 +264,7 @@ class JobMgr():
self.auth_key = env.getenv('AUTH_KEY')
def charge_beans(self,username,billing):
logger.debug("Charge user(%s) for %d beans"%(username, billing))
data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key}
url = "/billing/beans/"
return requests.post(self.userpoint+url,data=data).json()
@ -304,7 +321,6 @@ class JobMgr():
taskid = job_id + '_' + task_idx
self.taskmgr.lazy_stop_task(taskid)
job.stop_job()
del self.job_map[job_id]
except Exception as err:
logger.error(traceback.format_exc())
#logger.error(err)
@ -314,11 +330,11 @@ class JobMgr():
# user: username
# list a user's all job
def list_jobs(self,user):
alljobs = Batchjob.query.filter_by(username=user)
alljobs = Batchjob.query.filter_by(username=user).all()
res = []
for job in alljobs:
jobdata = json.loads(str(job))
tasks = job.tasks
tasks = job.tasks.all()
jobdata['tasks'] = [t.idx for t in tasks]
tasks_vnodeCount = {}
for t in tasks:
@ -372,9 +388,12 @@ class JobMgr():
if len(split_task_name) != 2:
logger.error("[jobmgr report]Illegal task_name(%s) report from taskmgr" % task_name)
return
if billing > 0 and (status == 'failed' or status == 'finished'):
self.charge_beans(user, billing)
job_id, task_idx = split_task_name
if job_id not in self.job_map.keys():
logger.error("[jobmgr report]jobid(%s) does not exist. task_name(%s)" % (job_id,task_name))
#TODO: update data in db
return
job = self.job_map[job_id]
if status == "running":
@ -383,16 +402,17 @@ class JobMgr():
#logger.debug(str(job.job_db))
elif status == "finished":
#logger.debug(str(job.job_db))
self.charge_beans(user, billing)
next_tasks = job.finish_task(task_idx, running_time, billing)
ret = self.add_task_taskmgr(user, next_tasks)
#logger.debug(str(job.job_db))
elif status == "retrying":
job.update_task_retrying(task_idx, reason, tried_times)
elif status == "failed":
self.charge_beans(user, billing)
job.update_task_failed(task_idx, reason, tried_times, running_time, billing)
if job.job_db.status == 'done' or job.job_db.status == 'failed':
elif status == "stopped":
if job.update_task_stopped(task_idx, running_time, billing) and billing > 0:
self.charge_beans(user, billing)
if job.job_db.status == 'done' or job.job_db.status == 'failed' or job.job_db.status == 'stopped':
del self.job_map[job_id]
# Get Batch job stdout or stderr from its file

View File

@ -228,6 +228,10 @@ class TaskMgr(threading.Thread):
if task.id in self.lazy_stop_list:
self.stop_remove_task(task)
self.lazy_delete_list.append(task)
running_time, billing = task.get_billing()
self.logger.info('task %s stopped, running_time:%s billing:%d'%(task.id, str(running_time), billing))
running_time = math.ceil(running_time)
self.jobmgr.report(task.username, task.id,'stopped',running_time=running_time,billing=billing)
while self.lazy_delete_list:
task = self.lazy_delete_list.pop(0)
@ -236,7 +240,14 @@ class TaskMgr(threading.Thread):
except Exception as err:
self.logger.warning(str(err))
self.lazy_append_list = [t for t in self.lazy_append_list if t.id not in self.lazy_stop_list]
new_append_list = []
for task in self.lazy_append_list:
if task.id in self.lazy_stop_list:
self.jobmgr.report(task.username, task.id, 'stopped')
else:
new_append_list.append(task)
self.lazy_append_list = new_append_list
self.lazy_stop_list.clear()
if self.lazy_append_list:
self.task_queue.extend(self.lazy_append_list)

View File

@ -44,7 +44,7 @@ app.config['SQLALCHEMY_BINDS'] = {
'history': 'sqlite:///'+fsdir+'/global/sys/HistoryTable.db',
'beansapplication': 'sqlite:///'+fsdir+'/global/sys/BeansApplication.db',
'system': 'sqlite:///'+fsdir+'/global/sys/System.db',
'batch':'sqlite:///'+fsdir+'/global/sys/Batch.db',
'batch':'sqlite:///'+fsdir+'/global/sys/Batch.db?check_same_thread=False',
'login': 'sqlite:///'+fsdir+'/global/sys/Login.db'
}
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True