Recover jobs when restart master
This commit is contained in:
parent
357f280bc8
commit
5478a8f431
|
@ -17,8 +17,19 @@ def db_commit():
|
||||||
raise
|
raise
|
||||||
|
|
||||||
class BatchJob(object):
|
class BatchJob(object):
|
||||||
def __init__(self, jobid, user, job_info):
|
def __init__(self, jobid, user, job_info, old_job_db=None):
|
||||||
self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority']))
|
if old_job_db is None:
|
||||||
|
self.job_db = Batchjob(jobid,user,job_info['jobName'],int(job_info['jobPriority']))
|
||||||
|
else:
|
||||||
|
self.job_db = old_job_db
|
||||||
|
self.job_db.clear()
|
||||||
|
job_info = {}
|
||||||
|
job_info['jobName'] = self.job_db.name
|
||||||
|
job_info['jobPriority'] = self.job_db.priority
|
||||||
|
all_tasks = self.job_db.tasks.all()
|
||||||
|
job_info['tasks'] = {}
|
||||||
|
for t in all_tasks:
|
||||||
|
job_info['tasks'][t.idx] = json.loads(t.config)
|
||||||
self.user = user
|
self.user = user
|
||||||
#self.raw_job_info = job_info
|
#self.raw_job_info = job_info
|
||||||
self.job_id = jobid
|
self.job_id = jobid
|
||||||
|
@ -35,8 +46,12 @@ class BatchJob(object):
|
||||||
self.tasks_cnt['pending'] = len(raw_tasks.keys())
|
self.tasks_cnt['pending'] = len(raw_tasks.keys())
|
||||||
for task_idx in raw_tasks.keys():
|
for task_idx in raw_tasks.keys():
|
||||||
task_info = raw_tasks[task_idx]
|
task_info = raw_tasks[task_idx]
|
||||||
task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info)
|
if old_job_db is None:
|
||||||
self.job_db.tasks.append(task_db)
|
task_db = Batchtask(jobid+"_"+task_idx, task_idx, task_info)
|
||||||
|
self.job_db.tasks.append(task_db)
|
||||||
|
else:
|
||||||
|
task_db = Batchtask.query.get(jobid+"_"+task_idx)
|
||||||
|
task_db.clear()
|
||||||
self.tasks[task_idx] = {}
|
self.tasks[task_idx] = {}
|
||||||
self.tasks[task_idx]['id'] = jobid+"_"+task_idx
|
self.tasks[task_idx]['id'] = jobid+"_"+task_idx
|
||||||
self.tasks[task_idx]['config'] = task_info
|
self.tasks[task_idx]['config'] = task_info
|
||||||
|
@ -54,7 +69,8 @@ class BatchJob(object):
|
||||||
self.dependency_out[d] = []
|
self.dependency_out[d] = []
|
||||||
self.dependency_out[d].append(task_idx)
|
self.dependency_out[d].append(task_idx)
|
||||||
|
|
||||||
db.session.add(self.job_db)
|
if old_job_db is None:
|
||||||
|
db.session.add(self.job_db)
|
||||||
db_commit()
|
db_commit()
|
||||||
|
|
||||||
self.log_status()
|
self.log_status()
|
||||||
|
@ -259,6 +275,7 @@ class JobMgr():
|
||||||
# load job information from etcd
|
# load job information from etcd
|
||||||
# initial a job queue and job schedueler
|
# initial a job queue and job schedueler
|
||||||
def __init__(self, taskmgr):
|
def __init__(self, taskmgr):
|
||||||
|
logger.info("Init jobmgr...")
|
||||||
try:
|
try:
|
||||||
Batchjob.query.all()
|
Batchjob.query.all()
|
||||||
except:
|
except:
|
||||||
|
@ -270,6 +287,22 @@ class JobMgr():
|
||||||
self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT'))
|
self.userpoint = "http://" + env.getenv('USER_IP') + ":" + str(env.getenv('USER_PORT'))
|
||||||
self.auth_key = env.getenv('AUTH_KEY')
|
self.auth_key = env.getenv('AUTH_KEY')
|
||||||
|
|
||||||
|
self.recover_jobs()
|
||||||
|
|
||||||
|
def recover_jobs(self):
|
||||||
|
logger.info("Rerun the unfailed and unfinished jobs...")
|
||||||
|
try:
|
||||||
|
rejobs = Batchjob.query.filter(~Batchjob.status.in_(['done','failed']))
|
||||||
|
rejobs = rejobs.order_by(Batchjob.create_time).all()
|
||||||
|
for rejob in rejobs:
|
||||||
|
logger.info("Rerun job: "+rejob.id)
|
||||||
|
logger.debug(str(rejob))
|
||||||
|
job = BatchJob(rejob.id, rejob.username, None, rejob)
|
||||||
|
self.job_map[job.job_id] = job
|
||||||
|
self.process_job(job)
|
||||||
|
except Exception as err:
|
||||||
|
logger.error(traceback.format_exc())
|
||||||
|
|
||||||
def charge_beans(self,username,billing):
|
def charge_beans(self,username,billing):
|
||||||
logger.debug("Charge user(%s) for %d beans"%(username, billing))
|
logger.debug("Charge user(%s) for %d beans"%(username, billing))
|
||||||
data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key}
|
data = {"owner_name":username,"billing":billing, "auth_key":self.auth_key}
|
||||||
|
|
|
@ -499,13 +499,14 @@ class TaskMgr(threading.Thread):
|
||||||
if sub_task.status != RUNNING:
|
if sub_task.status != RUNNING:
|
||||||
self.logger.error('[on_task_report] receive task report when vnode is not running')
|
self.logger.error('[on_task_report] receive task report when vnode is not running')
|
||||||
|
|
||||||
sub_task.status = report.subTaskStatus
|
#sub_task.status = report.subTaskStatus
|
||||||
sub_task.status_reason = report.errmsg
|
sub_task.status_reason = report.errmsg
|
||||||
sub_task.task_started = False
|
sub_task.task_started = False
|
||||||
|
|
||||||
if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT:
|
if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT:
|
||||||
self.clear_sub_task(sub_task)
|
self.clear_sub_task(sub_task)
|
||||||
sub_task.waiting_for_retry(report.errmsg)
|
sub_task.waiting_for_retry(report.errmsg)
|
||||||
|
self.logger.info('task %s report failed, status %d, subtasks: %s' % (task.id, task.status, str([sub_task.status for sub_task in task.subtask_list])))
|
||||||
if sub_task.status == WAITING:
|
if sub_task.status == WAITING:
|
||||||
self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg)
|
self.jobmgr.report(task.username, task.id, 'retrying', report.errmsg)
|
||||||
elif report.subTaskStatus == OUTPUTERROR:
|
elif report.subTaskStatus == OUTPUTERROR:
|
||||||
|
|
|
@ -461,6 +461,12 @@ class Batchjob(db.Model):
|
||||||
self.end_time = None
|
self.end_time = None
|
||||||
self.billing = 0
|
self.billing = 0
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
self.status = "pending"
|
||||||
|
self.failed_reason = ""
|
||||||
|
self.end_time = None
|
||||||
|
self.billing = 0
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
info = {}
|
info = {}
|
||||||
info['job_id'] = self.id
|
info['job_id'] = self.id
|
||||||
|
@ -503,6 +509,15 @@ class Batchtask(db.Model):
|
||||||
self.config = json.dumps(config)
|
self.config = json.dumps(config)
|
||||||
self.tried_times = 0
|
self.tried_times = 0
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
self.status = "pending"
|
||||||
|
self.failed_reason = ""
|
||||||
|
self.start_time = None
|
||||||
|
self.end_time = None
|
||||||
|
self.running_time = 0
|
||||||
|
self.billing = 0
|
||||||
|
self.tried_times = 0
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
info = {}
|
info = {}
|
||||||
info['id'] = self.id
|
info['id'] = self.id
|
||||||
|
|
|
@ -216,6 +216,7 @@
|
||||||
</tbody>
|
</tbody>
|
||||||
</table>
|
</table>
|
||||||
</div>
|
</div>
|
||||||
|
{% if 'mapping' in task['config'].keys() %}
|
||||||
<div class="table-responsive">
|
<div class="table-responsive">
|
||||||
<table class="table table-bordered table-hover">
|
<table class="table table-bordered table-hover">
|
||||||
<thead>
|
<thead>
|
||||||
|
@ -240,6 +241,7 @@
|
||||||
</tbody>
|
</tbody>
|
||||||
</table>
|
</table>
|
||||||
</div>
|
</div>
|
||||||
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
Loading…
Reference in New Issue