diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index b19be81..7f80034 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -18,10 +18,10 @@ class BatchJob(object): self.lock = threading.Lock() self.tasks = {} self.dependency_out = {} - self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'error':0, 'failed':0, 'finished':0} - #self.top_sort() + self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0} #init self.tasks & self.dependency_out & self.tasks_cnt + logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.user, self.job_name, self.create_time)) raw_tasks = self.raw_job_info["tasks"] self.tasks_cnt['pending'] = len(raw_tasks.keys()) for task_idx in raw_tasks.keys(): @@ -57,6 +57,7 @@ class BatchJob(object): # return the tasks without dependencies @data_lock def get_tasks_no_dependency(self,update_status=False): + logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id) ret_tasks = [] for task_idx in self.tasks.keys(): if (self.tasks[task_idx]['status'] = 'pending' and @@ -84,6 +85,7 @@ class BatchJob(object): # start run a task, update status @data_lock def update_task_running(self, task_idx): + logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id)) old_status = self.tasks[task_idx]['status'].split('(')[0] self.tasks_cnt[old_status] -= 1 self.tasks[task_idx]['status'] = 'running' @@ -96,6 +98,7 @@ class BatchJob(object): if task_idx not in self.tasks.keys(): logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id)) return [] + logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id)) old_status = self.tasks[task_idx]['status'].split('(')[0] self.tasks_cnt[old_status] -= 1 self.tasks[task_idx]['status'] = 'finished' @@ -115,55 +118,51 @@ class BatchJob(object): ret_tasks.append([task_name, self.tasks[out_idx]['config']]) return ret_tasks - # update error status of task + # update retrying status of task @data_lock - def update_task_error(self, task_idx, tried_times, try_out=False): + def update_task_retrying(self, task_idx, tried_times, try_out=False): + logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying." % (task_idx, self.job_id)) old_status = self.tasks[task_idx]['status'].split('(')[0] self.tasks_cnt[old_status] -= 1 - self.tasks[task_idx]['status'] = 'error(tried %d times)' % int(tried_times) if try_out: self.tasks_cnt['failed'] += 1 + self.tasks[task_idx]['status'] = 'failed(tried %d times)' % int(tried_times) else: - self.tasks_cnt['error'] += 1 + self.tasks_cnt['retrying'] += 1 + self.tasks[task_idx]['status'] = 'retrying(%d times)' % int(tried_times) self._update_job_status() -class JobMgr(threading.Thread): +class JobMgr(): # load job information from etcd # initial a job queue and job schedueler def __init__(self, taskmgr): - threading.Thread.__init__(self) self.job_queue = [] self.job_map = {} self.taskmgr = taskmgr self.fspath = env.getenv('FS_PREFIX') - def run(self): - while True: - self.job_scheduler() - time.sleep(2) - # user: username - # job_data: a json string + # job_info: a json string # user submit a new job, add this job to queue and database def add_job(self, user, job_info): try: job = BatchJob(user, job_info) job.job_id = self.gen_jobid() - self.job_queue.append(job.job_id) self.job_map[job.job_id] = job + self.process_job(job) except ValueError as err: logger.error(err) return [False, err.args[0]] except Exception as err: + logger.error(err) return [False, err.args[0]] - finally: - return [True, "add batch job success"] + return [True, "add batch job success"] # user: username # list a user's all job def list_jobs(self,user): res = [] - for job_id in self.job_queue: + for job_id in self.job_map.keys(): job = self.job_map[job_id] logger.debug('job_id: %s, user: %s' % (job_id, job.user)) if job.user == user: @@ -190,7 +189,7 @@ class JobMgr(threading.Thread): # check if a job exists def is_job_exist(self, job_id): - return job_id in self.job_queue + return job_id in self.job_map.keys() # generate a random job id def gen_jobid(self): @@ -199,31 +198,42 @@ class JobMgr(threading.Thread): job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) return job_id - # this is a thread to process a job - def job_processor(self, job): - task_name, task_info = job.get_task() - if not task_info: - return False - else: - task_priority = job.job_priority - self.taskmgr.add_task(job.user, task_name, task_info, task_priority) - return True + # add tasks into taskmgr's queue + def add_task_taskmgr(self,tasks): + for task_name, task_info in tasks: + if not task_info: + logger.error("task_info does not exist! task_name(%s)" % task_name) + return False + else: + task_priority = job.job_priority + logger.debug("Add task(name:%s) with priority(%s) to taskmgr's queue." % (task_name, task_priority) ) + self.taskmgr.add_task(job.user, task_name, task_info, task_priority) + return True - # this is a thread to schedule the jobs - def job_scheduler(self): - # choose a job from queue, create a job processor for it - for job_id in self.job_queue: - job = self.job_map[job_id] - if self.job_processor(job): - job.status = 'running' - break - #else: - #job.status = 'done' + # to process a job, add tasks without dependencies of the job into taskmgr + def process_job(self, job): + tasks = job.get_tasks_no_dependency(True) + return add_task_taskmgr(tasks) - # a task has finished - def report(self, task): - pass + # report task status from taskmgr when running, failed and finished + def report(self, task_name, status): + split_task_name = task_name.split('_') + if len(split_task_name) != 3: + logger.error("Illegal task_name(%s) report from taskmgr" % task_name) + return + user, job_id, task_idx = split_task_name + job = self.job_map[job_id] + if status == "running": + job.update_task_running(task_idx) + elif status == "failed": + pass # TODO + elif status == "finished": + next_tasks = job.finish_task(task_idx) + if len(next_tasks) == 0: + return + ret = add_task_taskmgr(next_tasks) + # Get Batch job stdout or stderr from its file def get_output(self, username, jobid, taskid, instid, issue): filename = username + "_" + jobid + "_" + taskid + "_" + instid + "_" + issue + ".txt" fpath = "%s/global/users/%s/data/batch_%s/%s" % (self.fspath,username,jobid,filename)