From 05917e1fb530b454545cc29fa2d0b09c9b30dd8b Mon Sep 17 00:00:00 2001
From: Firmlyzhu
Date: Sun, 13 Jan 2019 20:48:44 +0800
Subject: [PATCH 1/5] use only number to identify task

---
 src/master/httprest.py                | 10 +++++-----
 src/master/testTaskCtrler.py          | 10 ++++++----
 web/templates/batch/batch_create.html |  2 +-
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/src/master/httprest.py b/src/master/httprest.py
index abbdb10..f328c31 100755
--- a/src/master/httprest.py
+++ b/src/master/httprest.py
@@ -808,12 +808,12 @@ def add_job(user,beans,form):
         key_arr = key.split('_')
         value = job_data[key]
         if key_arr[0] == 'srcAddr' and value == '':
-            task_idx = 'task_' + key_arr[1]
+            #task_idx = 'task_' + key_arr[1]
             if task_idx in job_info['tasks']:
-                job_info['tasks'][task_idx]['srcAddr'] = '/root/nfs'
+                job_info['tasks'][task_idx]['srcAddr'] = '/root'
             else:
                 job_info['tasks'][task_idx] = {
-                    'srcAddr': '/root/nfs/'
+                    'srcAddr': '/root'
                 }
         elif key_arr[0] != 'dependency'and value == '':
             message['success'] = 'false'
@@ -822,7 +822,7 @@ def add_job(user,beans,form):
             job_info[key_arr[0]] = value
         elif len(key_arr) == 2:
             key_prefix, task_idx = key_arr[0], key_arr[1]
-            task_idx = 'task_' + task_idx
+            #task_idx = 'task_' + task_idx
             if task_idx in job_info["tasks"]:
                 job_info["tasks"][task_idx][key_prefix] = value
             else:
@@ -832,7 +832,7 @@ def add_job(user,beans,form):
                 job_info["tasks"][task_idx] = tmp_dict
         elif len(key_arr) == 3:
             key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2]
-            task_idx = 'task_' + task_idx
+            #task_idx = 'task_' + task_idx
             mapping_idx = 'mapping_' + mapping_idx
             if task_idx in job_info["tasks"]:
                 if "mapping" in job_info["tasks"][task_idx]:
diff --git a/src/master/testTaskCtrler.py b/src/master/testTaskCtrler.py
index ba6b947..5427183 100644
--- a/src/master/testTaskCtrler.py
+++ b/src/master/testTaskCtrler.py
@@ -5,20 +5,21 @@ if sys.path[0].endswith("master"):
 import grpc,time
 from protos import rpc_pb2, rpc_pb2_grpc
+import random, string
 
 def run():
     channel = grpc.insecure_channel('localhost:50051')
     stub = rpc_pb2_grpc.WorkerStub(channel)
 
-    comm = rpc_pb2.Command(commandLine="ls /root/oss/test-for-docklet", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
-    paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/oss/test-for-docklet/", stdoutRedirectPath="/root/oss/test-for-docklet/")
+    comm = rpc_pb2.Command(commandLine="ls /root;sleep 5;ls /root", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
+    paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/batch_{jobid}/", stdoutRedirectPath="/root/nfs/batch_{jobid}/")
 
     img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
     inst = rpc_pb2.Instance(cpu=1, memory=1000, disk=1000, gpu=0)
     mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="")
-    clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt])
+    clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[])
 
-    task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token="test")
+    task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token=''.join(random.sample(string.ascii_letters + string.digits, 8)))
 
     response = stub.process_task(task)
     print("Batch client received: " + str(response.status)+" "+response.message)
@@ -34,6 +35,7 @@ def stop_task():
     print("Batch client received: " + str(response.status)+" "+response.message)
 
 if __name__ == '__main__':
+    #for i in range(10):
     run()
     #time.sleep(4)
     #stop_task()
diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html
index 532a1d1..bd9fa71 100644
--- a/web/templates/batch/batch_create.html
+++ b/web/templates/batch/batch_create.html
@@ -177,7 +177,7 @@
 [hunk content lost in extraction: six context lines and one changed -/+ pair of JavaScript string-building lines of the form +'<...>'; the HTML markup inside the quotes did not survive]
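The hunks above stop prefixing the form index with 'task_', so the numeric suffix of a form key such as command_0 now serves directly as the task key in job_info['tasks']. A minimal sketch of the grouping this implies — the field names and the job_info shape are inferred from the hunks, not copied from the full upstream add_job():

    # Sketch: group flat web-form fields by their numeric task index.
    # 'command_0' -> job_info['tasks']['0']['command'] under the new scheme
    # (previously the key would have been 'task_0').
    def group_form_fields(job_data):
        job_info = {'tasks': {}}
        for key, value in job_data.items():
            key_arr = key.split('_')
            if len(key_arr) == 1:
                job_info[key_arr[0]] = value            # job-level field
            elif len(key_arr) == 2:
                key_prefix, task_idx = key_arr
                job_info['tasks'].setdefault(task_idx, {})[key_prefix] = value
        return job_info

    print(group_form_fields({'jobName': 'j1', 'command_0': 'ls', 'srcAddr_0': '/root'}))
    # {'jobName': 'j1', 'tasks': {'0': {'command': 'ls', 'srcAddr': '/root'}}}

Keeping the bare number makes the form field, the key in job_info['tasks'], and the task_idx segment of the task name built later in the series (user_jobid_taskidx) all agree.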
From f39bb42dc776ce2ab867dfee07a2b7a375567472 Mon Sep 17 00:00:00 2001
From: Firmlyzhu
Date: Sun, 13 Jan 2019 22:51:01 +0800
Subject: [PATCH 2/5] Update jobmgr to support a new policy of scheduling

---
 src/master/jobmgr.py | 152 ++++++++++++++++++++++++++++++-------------
 1 file changed, 105 insertions(+), 47 deletions(-)

diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py
index 5eb573d..b19be81 100644
--- a/src/master/jobmgr.py
+++ b/src/master/jobmgr.py
@@ -1,6 +1,7 @@
 import time, threading, random, string, os, traceback
 import master.monitor
 import subprocess
+from functools import wraps
 
 from utils.log import initlogging, logger
 from utils import env
@@ -9,65 +10,122 @@ class BatchJob(object):
     def __init__(self, user, job_info):
         self.user = user
         self.raw_job_info = job_info
-        self.task_queue = []
-        self.task_finished = []
         self.job_id = None
         self.job_name = job_info['jobName']
         self.job_priority = int(job_info['jobPriority'])
         self.status = 'pending'
         self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
-        self.top_sort()
+        self.lock = threading.Lock()
+        self.tasks = {}
+        self.dependency_out = {}
+        self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'error':0, 'failed':0, 'finished':0}
+        #self.top_sort()
 
-    # transfer the dependency graph into a job queue
-    def top_sort(self):
-        logger.debug('top sorting')
-        tasks = self.raw_job_info["tasks"]
-        dependency_graph = {}
-        for task_idx in tasks:
-            dependency_graph[task_idx] = set()
-            task_info = tasks[task_idx]
+        #init self.tasks & self.dependency_out & self.tasks_cnt
+        raw_tasks = self.raw_job_info["tasks"]
+        self.tasks_cnt['pending'] = len(raw_tasks.keys())
+        for task_idx in raw_tasks.keys():
+            task_info = raw_tasks[task_idx]
+            self.tasks[task_idx] = {}
+            self.tasks[task_idx]['config'] = task_info
+            self.tasks[task_idx]['status'] = 'pending'
+            self.tasks[task_idx]['dependency'] = []
             dependency = task_info['dependency'].strip().replace(' ', '').split(',')
             if len(dependency) == 1 and dependency[0] == '':
                 continue
-            for t in dependency:
-                if not t in tasks:
+            for d in dependency:
+                if not d in raw_tasks.keys():
                     raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx))
-                dependency_graph[task_idx].add(t)
-        while len(dependency_graph) > 0:
-            s = set()
-            flag = False
-            for task_idx in dependency_graph:
-                if len(dependency_graph[task_idx]) == 0:
-                    flag = True
-                    s.add(task_idx)
-            for task_idx in s:
-                dependency_graph.pop(task_idx)
-            #there is a circle in the graph
-            if not flag:
-                raise ValueError('there is a circle in the dependency graph')
-                break
-            for task_idx in dependency_graph:
-                for t in s:
-                    if t in dependency_graph[task_idx]:
-                        dependency_graph[task_idx].remove(t)
-            self.task_queue.append({
-                'task_idx': s,
-                'status': 'pending'
-            })
+                self.tasks[task_idx]['dependency'].append(d)
+                if not d in self.dependency_out.keys():
+                    self.dependency_out[d] = []
+                self.dependency_out[d].append(task_idx)
 
-    # get a task and pass it to taskmgr
-    def get_task(self):
-        for task in self.task_queue:
-            if task['status'] == 'pending':
-                task_idx = task['task_idx'].pop()
-                task['status'] = 'running'
-                task_name = self.user + '_' + self.job_id + '_' + task_idx
-                return task_name, self.raw_job_info["tasks"][task_idx]
-        return '', None
+    def data_lock(f):
+        @wraps(f)
+        def new_f(self, *args, **kwargs):
+            self.lock.acquire()
+            try:
+                result = f(self, *args, **kwargs)
+            except Exception as err:
+                self.lock.release()
+                raise err
+            self.lock.release()
+            return result
+        return new_f
+
+    # return the tasks without dependencies
+    @data_lock
+    def get_tasks_no_dependency(self,update_status=False):
+        ret_tasks = []
+        for task_idx in self.tasks.keys():
+            if (self.tasks[task_idx]['status'] = 'pending' and
+                len(self.tasks[task_idx]['dependency']) == 0):
+                if update_status:
+                    self.tasks_cnt['pending'] -= 1
+                    self.tasks_cnt['scheduling'] += 1
+                    self.tasks[task_idx]['status'] = 'scheduling'
+                task_name = self.user + '_' + self.job_id + '_' + task_idx
+                ret_tasks.append([task_name, self.tasks[task_idx]['config']])
+        return ret_tasks
 
-    # a task has finished
+    # update status of this job based
+    def _update_job_status(self):
+        allcnt = len(self.tasks.keys())
+        if self.tasks_cnt['failed'] != 0:
+            self.status = 'failed'
+        elif self.tasks_cnt['running'] != 0:
+            self.status = 'running'
+        elif self.tasks_cnt['finished'] == allcnt:
+            self.status = 'done'
+        else:
+            self.status = 'pending'
+
+    # start run a task, update status
+    @data_lock
+    def update_task_running(self, task_idx):
+        old_status = self.tasks[task_idx]['status'].split('(')[0]
+        self.tasks_cnt[old_status] -= 1
+        self.tasks[task_idx]['status'] = 'running'
+        self.tasks_cnt['running'] += 1
+        self._update_job_status()
+
+    # a task has finished, update dependency and return tasks without dependencies
+    @data_lock
     def finish_task(self, task_idx):
-        pass
+        if task_idx not in self.tasks.keys():
+            logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id))
+            return []
+        old_status = self.tasks[task_idx]['status'].split('(')[0]
+        self.tasks_cnt[old_status] -= 1
+        self.tasks[task_idx]['status'] = 'finished'
+        self.tasks_cnt['finished'] += 1
+        self._update_job_status()
+        if task_idx not in self.dependency_out.keys():
+            return []
+        ret_tasks = []
+        for out_idx in self.dependency_out[task_idx]:
+            self.tasks[out_idx]['dependency'].remove(task_idx)
+            if (self.tasks[out_idx]['status'] == 'pending' and
+                len(self.tasks[out_idx]['dependency']) == 0):
+                self.tasks_cnt['pending'] -= 1
+                self.tasks_cnt['scheduling'] += 1
+                self.tasks[out_idx]['status'] = 'scheduling'
+                task_name = self.user + '_' + self.job_id + '_' + out_idx
+                ret_tasks.append([task_name, self.tasks[out_idx]['config']])
+        return ret_tasks
+
+    # update error status of task
+    @data_lock
+    def update_task_error(self, task_idx, tried_times, try_out=False):
+        old_status = self.tasks[task_idx]['status'].split('(')[0]
+        self.tasks_cnt[old_status] -= 1
+        self.tasks[task_idx]['status'] = 'error(tried %d times)' % int(tried_times)
+        if try_out:
+            self.tasks_cnt['failed'] += 1
+        else:
+            self.tasks_cnt['error'] += 1
+        self._update_job_status()
 
 class JobMgr(threading.Thread):
     # load job information from etcd
@@ -159,8 +217,8 @@ class JobMgr(threading.Thread):
             if self.job_processor(job):
                 job.status = 'running'
                 break
-            else:
-                job.status = 'done'
+            #else:
+                #job.status = 'done'
 
     # a task has finished
     def report(self, task):
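Where the old top_sort() flattened the dependency graph into level-sized batches up front, the rewrite releases tasks incrementally: each task keeps the list of tasks it still waits on, dependency_out maps a task to its successors, and finish_task() returns whichever successors just became runnable. A standalone sketch of that policy, with illustrative task names rather than code from the patch:

    # deps[t]  -> tasks t still waits on (the patch's tasks[t]['dependency'])
    # rdeps[d] -> tasks waiting on d     (the patch's dependency_out)
    def make_graph(tasks, edges):
        deps = {t: set() for t in tasks}
        rdeps = {}
        for pre, post in edges:
            deps[post].add(pre)
            rdeps.setdefault(pre, []).append(post)
        return deps, rdeps

    def runnable_now(deps):
        # analogous to get_tasks_no_dependency()
        return [t for t, waiting in deps.items() if not waiting]

    def finish(task, deps, rdeps):
        # analogous to finish_task(): release successors whose last dependency cleared
        released = []
        for post in rdeps.get(task, []):
            deps[post].discard(task)
            if not deps[post]:
                released.append(post)
        return released

    deps, rdeps = make_graph(['0', '1', '2'], [('0', '1'), ('0', '2'), ('1', '2')])
    print(runnable_now(deps))        # ['0']
    print(finish('0', deps, rdeps))  # ['1'] -- '2' still waits on '1'
    print(finish('1', deps, rdeps))  # ['2']

One wrinkle: unlike top_sort(), nothing here detects a dependency cycle up front; tasks on a cycle simply never reach an empty dependency list. Note also that the data_lock decorator serializing these mutations could be written more idiomatically with try/finally or a "with self.lock:" block, which releases the lock on every exit path without spelling out both branches.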
From 6f72efda63939ce81aef1e613a98eb1fea4253d6 Mon Sep 17 00:00:00 2001
From: Firmlyzhu
Date: Wed, 16 Jan 2019 17:30:06 +0800
Subject: [PATCH 3/5] update add_job & report of jobmgr

---
 src/master/jobmgr.py | 92 ++++++++++++++++++++++++--------------------
 1 file changed, 51 insertions(+), 41 deletions(-)

diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py
index b19be81..7f80034 100644
--- a/src/master/jobmgr.py
+++ b/src/master/jobmgr.py
@@ -18,10 +18,10 @@ class BatchJob(object):
         self.lock = threading.Lock()
         self.tasks = {}
         self.dependency_out = {}
-        self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'error':0, 'failed':0, 'finished':0}
-        #self.top_sort()
+        self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0}
 
         #init self.tasks & self.dependency_out & self.tasks_cnt
+        logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.user, self.job_name, self.create_time))
         raw_tasks = self.raw_job_info["tasks"]
         self.tasks_cnt['pending'] = len(raw_tasks.keys())
         for task_idx in raw_tasks.keys():
@@ -57,6 +57,7 @@ class BatchJob(object):
     # return the tasks without dependencies
     @data_lock
     def get_tasks_no_dependency(self,update_status=False):
+        logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id)
         ret_tasks = []
         for task_idx in self.tasks.keys():
             if (self.tasks[task_idx]['status'] = 'pending' and
@@ -84,6 +85,7 @@ class BatchJob(object):
     # start run a task, update status
     @data_lock
     def update_task_running(self, task_idx):
+        logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id))
         old_status = self.tasks[task_idx]['status'].split('(')[0]
         self.tasks_cnt[old_status] -= 1
         self.tasks[task_idx]['status'] = 'running'
@@ -96,6 +98,7 @@ class BatchJob(object):
         if task_idx not in self.tasks.keys():
             logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id))
             return []
+        logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id))
         old_status = self.tasks[task_idx]['status'].split('(')[0]
         self.tasks_cnt[old_status] -= 1
         self.tasks[task_idx]['status'] = 'finished'
         self.tasks_cnt['finished'] += 1
@@ -115,55 +118,51 @@ class BatchJob(object):
                 ret_tasks.append([task_name, self.tasks[out_idx]['config']])
         return ret_tasks
 
-    # update error status of task
+    # update retrying status of task
     @data_lock
-    def update_task_error(self, task_idx, tried_times, try_out=False):
+    def update_task_retrying(self, task_idx, tried_times, try_out=False):
+        logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying." % (task_idx, self.job_id))
         old_status = self.tasks[task_idx]['status'].split('(')[0]
         self.tasks_cnt[old_status] -= 1
-        self.tasks[task_idx]['status'] = 'error(tried %d times)' % int(tried_times)
         if try_out:
             self.tasks_cnt['failed'] += 1
+            self.tasks[task_idx]['status'] = 'failed(tried %d times)' % int(tried_times)
         else:
-            self.tasks_cnt['error'] += 1
+            self.tasks_cnt['retrying'] += 1
+            self.tasks[task_idx]['status'] = 'retrying(%d times)' % int(tried_times)
         self._update_job_status()
 
-class JobMgr(threading.Thread):
+class JobMgr():
     # load job information from etcd
     # initial a job queue and job schedueler
     def __init__(self, taskmgr):
-        threading.Thread.__init__(self)
         self.job_queue = []
         self.job_map = {}
         self.taskmgr = taskmgr
         self.fspath = env.getenv('FS_PREFIX')
 
-    def run(self):
-        while True:
-            self.job_scheduler()
-            time.sleep(2)
-
     # user: username
-    # job_data: a json string
+    # job_info: a json string
     # user submit a new job, add this job to queue and database
     def add_job(self, user, job_info):
         try:
             job = BatchJob(user, job_info)
             job.job_id = self.gen_jobid()
-            self.job_queue.append(job.job_id)
             self.job_map[job.job_id] = job
+            self.process_job(job)
         except ValueError as err:
             logger.error(err)
             return [False, err.args[0]]
         except Exception as err:
+            logger.error(err)
             return [False, err.args[0]]
-        finally:
-            return [True, "add batch job success"]
+        return [True, "add batch job success"]
 
     # user: username
     # list a user's all job
     def list_jobs(self,user):
         res = []
-        for job_id in self.job_queue:
+        for job_id in self.job_map.keys():
             job = self.job_map[job_id]
             logger.debug('job_id: %s, user: %s' % (job_id, job.user))
             if job.user == user:
@@ -190,7 +189,7 @@ class JobMgr():
 
     # check if a job exists
     def is_job_exist(self, job_id):
-        return job_id in self.job_queue
+        return job_id in self.job_map.keys()
 
     # generate a random job id
     def gen_jobid(self):
@@ -199,31 +198,42 @@ class JobMgr():
             job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
         return job_id
 
-    # this is a thread to process a job
-    def job_processor(self, job):
-        task_name, task_info = job.get_task()
-        if not task_info:
-            return False
-        else:
-            task_priority = job.job_priority
-            self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
-            return True
+    # add tasks into taskmgr's queue
+    def add_task_taskmgr(self,tasks):
+        for task_name, task_info in tasks:
+            if not task_info:
+                logger.error("task_info does not exist! task_name(%s)" % task_name)
+                return False
+            else:
+                task_priority = job.job_priority
+                logger.debug("Add task(name:%s) with priority(%s) to taskmgr's queue." % (task_name, task_priority) )
+                self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
+        return True
 
-    # this is a thread to schedule the jobs
-    def job_scheduler(self):
-        # choose a job from queue, create a job processor for it
-        for job_id in self.job_queue:
-            job = self.job_map[job_id]
-            if self.job_processor(job):
-                job.status = 'running'
-                break
-            #else:
-                #job.status = 'done'
+    # to process a job, add tasks without dependencies of the job into taskmgr
+    def process_job(self, job):
+        tasks = job.get_tasks_no_dependency(True)
+        return add_task_taskmgr(tasks)
 
-    # a task has finished
-    def report(self, task):
-        pass
+    # report task status from taskmgr when running, failed and finished
+    def report(self, task_name, status):
+        split_task_name = task_name.split('_')
+        if len(split_task_name) != 3:
+            logger.error("Illegal task_name(%s) report from taskmgr" % task_name)
+            return
+        user, job_id, task_idx = split_task_name
+        job = self.job_map[job_id]
+        if status == "running":
+            job.update_task_running(task_idx)
+        elif status == "failed":
+            pass # TODO
+        elif status == "finished":
+            next_tasks = job.finish_task(task_idx)
+            if len(next_tasks) == 0:
+                return
+            ret = add_task_taskmgr(next_tasks)
 
     # Get Batch job stdout or stderr from its file
     def get_output(self, username, jobid, taskid, instid, issue):
         filename = username + "_" + jobid + "_" + taskid + "_" + instid + "_" + issue + ".txt"
         fpath = "%s/global/users/%s/data/batch_%s/%s" % (self.fspath,username,jobid,filename)
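This patch fixes the contract between JobMgr and TaskMgr on a flat task name, user + '_' + job_id + '_' + task_idx, which report() splits back into its three parts. A sketch of that round trip — the sample values are invented:

    # Sketch of the naming contract between JobMgr and TaskMgr.
    # gen_jobid() draws job_id from letters and digits only, so it never
    # contains '_'; a username containing '_' would break the 3-way split
    # below, which report() logs as an illegal task_name.
    def make_task_name(user, job_id, task_idx):
        return user + '_' + job_id + '_' + task_idx

    def parse_task_name(task_name):
        parts = task_name.split('_')
        if len(parts) != 3:
            raise ValueError('Illegal task_name %s' % task_name)
        user, job_id, task_idx = parts
        return user, job_id, task_idx

    name = make_task_name('root', 'Ab3dE9xZ', '0')   # invented job_id
    print(parse_task_name(name))                     # ('root', 'Ab3dE9xZ', '0')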
From 9792f1c2e2d3b325f7920bb93e72a6f760128c22 Mon Sep 17 00:00:00 2001
From: Firmlyzhu
Date: Thu, 17 Jan 2019 17:08:12 +0800
Subject: [PATCH 4/5] Fix some bugs & make jobmgr run on new scheduler policy

---
 src/master/httprest.py |  1 -
 src/master/jobmgr.py   | 52 ++++++++++++++++++++++++++++++------------
 src/master/taskmgr.py  |  5 +++--
 3 files changed, 41 insertions(+), 17 deletions(-)

diff --git a/src/master/httprest.py b/src/master/httprest.py
index f328c31..d71c974 100755
--- a/src/master/httprest.py
+++ b/src/master/httprest.py
@@ -1103,6 +1103,5 @@ if __name__ == '__main__':
     G_jobmgr = jobmgr.JobMgr(G_taskmgr)
     G_taskmgr.set_jobmgr(G_jobmgr)
     G_taskmgr.start()
-    G_jobmgr.start()
 
     app.run(host = masterip, port = masterport, threaded=True)
diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py
index 7f80034..484fcce 100644
--- a/src/master/jobmgr.py
+++ b/src/master/jobmgr.py
@@ -1,6 +1,6 @@
 import time, threading, random, string, os, traceback
 import master.monitor
-import subprocess
+import subprocess,json
 from functools import wraps
 
 from utils.log import initlogging, logger
@@ -35,12 +35,14 @@ class BatchJob(object):
                 continue
             for d in dependency:
                 if not d in raw_tasks.keys():
-                    raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx))
+                    raise ValueError('task %s is not defined in the dependency of task %s' % (d, task_idx))
                 self.tasks[task_idx]['dependency'].append(d)
                 if not d in self.dependency_out.keys():
                     self.dependency_out[d] = []
                 self.dependency_out[d].append(task_idx)
 
+        self.log_status()
+
     def data_lock(f):
         @wraps(f)
         def new_f(self, *args, **kwargs):
@@ -60,14 +62,15 @@ class BatchJob(object):
         logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id)
         ret_tasks = []
         for task_idx in self.tasks.keys():
-            if (self.tasks[task_idx]['status'] = 'pending' and
+            if (self.tasks[task_idx]['status'] == 'pending' and
                 len(self.tasks[task_idx]['dependency']) == 0):
                 if update_status:
                     self.tasks_cnt['pending'] -= 1
                     self.tasks_cnt['scheduling'] += 1
                     self.tasks[task_idx]['status'] = 'scheduling'
                 task_name = self.user + '_' + self.job_id + '_' + task_idx
-                ret_tasks.append([task_name, self.tasks[task_idx]['config']])
+                ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority])
+        self.log_status()
         return ret_tasks
 
     # update status of this job based
@@ -91,6 +94,7 @@ class BatchJob(object):
         self.tasks[task_idx]['status'] = 'running'
         self.tasks_cnt['running'] += 1
         self._update_job_status()
+        self.log_status()
 
     # a task has finished, update dependency and return tasks without dependencies
     @data_lock
@@ -105,17 +109,23 @@ class BatchJob(object):
         self.tasks_cnt['finished'] += 1
         self._update_job_status()
         if task_idx not in self.dependency_out.keys():
+            self.log_status()
             return []
         ret_tasks = []
         for out_idx in self.dependency_out[task_idx]:
-            self.tasks[out_idx]['dependency'].remove(task_idx)
+            try:
+                self.tasks[out_idx]['dependency'].remove(task_idx)
+            except Exception as err:
+                logger.warning(traceback.format_exc())
+                continue
             if (self.tasks[out_idx]['status'] == 'pending' and
                 len(self.tasks[out_idx]['dependency']) == 0):
                 self.tasks_cnt['pending'] -= 1
                 self.tasks_cnt['scheduling'] += 1
                 self.tasks[out_idx]['status'] = 'scheduling'
                 task_name = self.user + '_' + self.job_id + '_' + out_idx
-                ret_tasks.append([task_name, self.tasks[out_idx]['config']])
+                ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority])
+        self.log_status()
         return ret_tasks
 
     # update retrying status of task
@@ -131,6 +141,18 @@ class BatchJob(object):
             self.tasks_cnt['retrying'] += 1
             self.tasks[task_idx]['status'] = 'retrying(%d times)' % int(tried_times)
         self._update_job_status()
+        self.log_status()
+
+    # print status for debuging
+    def log_status(self):
+        task_copy = {}
+        for task_idx in self.tasks.keys():
+            task_copy[task_idx] = {}
+            task_copy[task_idx]['status'] = self.tasks[task_idx]['status']
+            task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency']
+        logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3)))
+        logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3)))
+        logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status))
 
 class JobMgr():
     # load job information from etcd
@@ -154,7 +176,8 @@ class JobMgr():
             logger.error(err)
             return [False, err.args[0]]
         except Exception as err:
-            logger.error(err)
+            logger.error(traceback.format_exc())
+            #logger.error(err)
             return [False, err.args[0]]
         return [True, "add batch job success"]
 
@@ -199,23 +222,24 @@ class JobMgr():
         return job_id
 
     # add tasks into taskmgr's queue
-    def add_task_taskmgr(self,tasks):
-        for task_name, task_info in tasks:
+    def add_task_taskmgr(self, user, tasks):
+        for task_name, task_info, task_priority in tasks:
             if not task_info:
                 logger.error("task_info does not exist! task_name(%s)" % task_name)
                 return False
             else:
-                task_priority = job.job_priority
                 logger.debug("Add task(name:%s) with priority(%s) to taskmgr's queue." % (task_name, task_priority) )
-                self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
+                self.taskmgr.add_task(user, task_name, task_info, task_priority)
         return True
 
     # to process a job, add tasks without dependencies of the job into taskmgr
     def process_job(self, job):
         tasks = job.get_tasks_no_dependency(True)
-        return add_task_taskmgr(tasks)
+        return self.add_task_taskmgr(job.user, tasks)
 
     # report task status from taskmgr when running, failed and finished
+    # task_name: user + '_' + job_id + '_' + task_idx
+    # status: 'running', 'retrying', 'failed', 'finished'
     def report(self, task_name, status):
         split_task_name = task_name.split('_')
         if len(split_task_name) != 3:
@@ -231,7 +255,7 @@ class JobMgr():
             next_tasks = job.finish_task(task_idx)
             if len(next_tasks) == 0:
                 return
-            ret = add_task_taskmgr(next_tasks)
+            ret = self.add_task_taskmgr(user, next_tasks)
 
     # Get Batch job stdout or stderr from its file
     def get_output(self, username, jobid, taskid, instid, issue):
diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py
index 07936d8..e08f536 100644
--- a/src/master/taskmgr.py
+++ b/src/master/taskmgr.py
@@ -171,7 +171,7 @@ class TaskMgr(threading.Thread):
         if self.jobmgr is None:
             self.logger.error('[task_completed] jobmgr is None!')
         else:
-            self.jobmgr.report(task)
+            self.jobmgr.report(task.info.id,'finished')
         self.logger.info('task %s completed' % task.info.id)
         self.lazy_delete_list.append(task)
 
@@ -182,7 +182,7 @@ class TaskMgr(threading.Thread):
         if self.jobmgr is None:
             self.logger.error('[task_failed] jobmgr is None!')
         else:
-            self.jobmgr.report(task)
+            self.jobmgr.report(task.info.id,'failed')
         self.logger.info('task %s failed' % task.info.id)
         self.lazy_delete_list.append(task)
 
@@ -201,6 +201,7 @@ class TaskMgr(threading.Thread):
 
     def task_processor(self, task, instance_id, worker_ip):
         task.status = RUNNING
+        self.jobmgr.report(task.info.id,'running')
 
         # properties for transaction
         task.info.instanceid = instance_id
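With G_jobmgr.start() removed, JobMgr stops being a polling thread; the scheduler is now event-driven, with TaskMgr calling report() at every state change and JobMgr enqueueing released successors in response. A trimmed sketch of that callback flow — both classes are invented stand-ins reduced to the calls visible in the hunks:

    # Sketch: event-driven status flow after this patch.
    class MiniJobMgr:
        def __init__(self):
            self.events = []
        def report(self, task_name, status):
            self.events.append((task_name, status))
            # on 'finished' the real JobMgr calls finish_task() and feeds the
            # released successors back into the task queue

    class MiniTaskMgr:
        def __init__(self, jobmgr):
            self.jobmgr = jobmgr
        def task_processor(self, task_name):
            self.jobmgr.report(task_name, 'running')
        def task_completed(self, task_name):
            self.jobmgr.report(task_name, 'finished')

    jm = MiniJobMgr()
    tm = MiniTaskMgr(jm)
    tm.task_processor('root_Ab3dE9xZ_0')
    tm.task_completed('root_Ab3dE9xZ_0')
    print(jm.events)  # [('root_Ab3dE9xZ_0', 'running'), ('root_Ab3dE9xZ_0', 'finished')]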
From 8313794c7f598bf39b1cc1b5014763d56ba89f12 Mon Sep 17 00:00:00 2001
From: Firmlyzhu
Date: Fri, 18 Jan 2019 17:40:25 +0800
Subject: [PATCH 5/5] Update jobmgr & taskmgr to support failure status updates

---
 src/master/jobmgr.py  | 40 ++++++++++++++++++++++++++++------------
 src/master/taskmgr.py | 20 ++++++++++++++++----
 2 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py
index 484fcce..7962223 100644
--- a/src/master/jobmgr.py
+++ b/src/master/jobmgr.py
@@ -42,6 +42,7 @@ class BatchJob(object):
                 self.dependency_out[d].append(task_idx)
 
         self.log_status()
+        logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3)))
 
     def data_lock(f):
         @wraps(f)
@@ -130,16 +131,26 @@ class BatchJob(object):
 
     # update retrying status of task
     @data_lock
-    def update_task_retrying(self, task_idx, tried_times, try_out=False):
-        logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying." % (task_idx, self.job_id))
+    def update_task_retrying(self, task_idx, reason, tried_times):
+        logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
         old_status = self.tasks[task_idx]['status'].split('(')[0]
         self.tasks_cnt[old_status] -= 1
-        if try_out:
-            self.tasks_cnt['failed'] += 1
-            self.tasks[task_idx]['status'] = 'failed(tried %d times)' % int(tried_times)
+        self.tasks_cnt['retrying'] += 1
+        self.tasks[task_idx]['status'] = 'retrying(%s)(%d times)' % (reason, int(tried_times))
+        self._update_job_status()
+        self.log_status()
+
+    # update failed status of task
+    @data_lock
+    def update_task_failed(self, task_idx, reason, tried_times):
+        logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
+        old_status = self.tasks[task_idx]['status'].split('(')[0]
+        self.tasks_cnt[old_status] -= 1
+        self.tasks_cnt['failed'] += 1
+        if reason == "OUTPUTERROR":
+            self.tasks[task_idx]['status'] = 'failed(OUTPUTERROR)'
         else:
-            self.tasks_cnt['retrying'] += 1
-            self.tasks[task_idx]['status'] = 'retrying(%d times)' % int(tried_times)
+            self.tasks[task_idx]['status'] = 'failed(%s)(%d times)' % (reason, int(tried_times))
         self._update_job_status()
         self.log_status()
 
@@ -151,9 +162,10 @@ class BatchJob(object):
             task_copy[task_idx]['status'] = self.tasks[task_idx]['status']
             task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency']
         logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3)))
-        logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3)))
+        logger.debug("BatchJob(id:%s) tasks_cnt: %s" % (self.job_id, self.tasks_cnt))
        logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status))
 
+
 class JobMgr():
     # load job information from etcd
     # initial a job queue and job schedueler
@@ -239,8 +251,10 @@ class JobMgr():
 
     # report task status from taskmgr when running, failed and finished
     # task_name: user + '_' + job_id + '_' + task_idx
-    # status: 'running', 'retrying', 'failed', 'finished'
-    def report(self, task_name, status):
+    # status: 'running', 'finished', 'retrying', 'failed'
+    # reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR"
+    # tried_times: how many times the task has been tried.
+    def report(self, task_name, status, reason="", tried_times=1):
         split_task_name = task_name.split('_')
         if len(split_task_name) != 3:
             logger.error("Illegal task_name(%s) report from taskmgr" % task_name)
@@ -249,13 +263,15 @@ class JobMgr():
         job = self.job_map[job_id]
         if status == "running":
             job.update_task_running(task_idx)
-        elif status == "failed":
-            pass # TODO
         elif status == "finished":
             next_tasks = job.finish_task(task_idx)
             if len(next_tasks) == 0:
                 return
             ret = self.add_task_taskmgr(user, next_tasks)
+        elif status == "retrying":
+            job.update_task_retrying(task_idx, reason, tried_times)
+        elif status == "failed":
+            job.update_task_failed(task_idx, reason, tried_times)
 
     # Get Batch job stdout or stderr from its file
     def get_output(self, username, jobid, taskid, instid, issue):
diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py
index e08f536..91aa3e0 100644
--- a/src/master/taskmgr.py
+++ b/src/master/taskmgr.py
@@ -139,20 +139,26 @@ class TaskMgr(threading.Thread):
         elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT:
             if instance['try_count'] > task.info.maxRetryCount:
                 self.check_task_completed(task)
+            else:
+                reason = 'FAILED' if report.instanceStatus == FAILED else 'TIMEOUT'
+                self.task_retrying(task, reason, instance['try_count'])
         elif report.instanceStatus == OUTPUTERROR:
-            self.task_failed(task)
+            self.task_failed(task,"OUTPUTERROR")
 
 
     def check_task_completed(self, task):
         if len(task.instance_list) < task.info.instanceCount:
             return
         failed = False
+        reason = "FAILED"
         for instance in task.instance_list:
             if instance['status'] == RUNNING or instance['status'] == WAITING:
                 return
             if instance['status'] == FAILED or instance['status'] == TIMEOUT:
                 if instance['try_count'] > task.info.maxRetryCount:
                     failed = True
+                    if instance['status'] == TIMEOUT:
+                        reason = "TIMEOUT"
                 else:
                     return
             if instance['status'] == OUTPUTERROR:
@@ -160,7 +166,7 @@ class TaskMgr(threading.Thread):
                 failed = True
                 break
 
         if failed:
-            self.task_failed(task)
+            self.task_failed(task,reason)
         else:
             self.task_completed(task)
 
@@ -176,16 +182,22 @@ class TaskMgr(threading.Thread):
         self.lazy_delete_list.append(task)
 
 
-    def task_failed(self, task):
+    def task_failed(self, task, reason):
         task.status = FAILED
 
         if self.jobmgr is None:
             self.logger.error('[task_failed] jobmgr is None!')
         else:
-            self.jobmgr.report(task.info.id,'failed')
+            self.jobmgr.report(task.info.id,'failed', reason, task.info.maxRetryCount+1)
 
         self.logger.info('task %s failed' % task.info.id)
         self.lazy_delete_list.append(task)
 
+    def task_retrying(self, task, reason, tried_times):
+        if self.jobmgr is None:
+            self.logger.error('[task_retrying] jobmgr is None!')
+        else:
+            self.jobmgr.report(task.info.id,'retrying',reason,tried_times)
+
     @queue_lock
     def sort_out_task_queue(self):
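The last patch threads a failure reason ('FAILED', 'TIMEOUT', 'OUTPUTERROR') and a try count through the report path, splitting the old try_out flag into distinct 'retrying' (will run again) and 'failed' (retries exhausted, or output could not be written) reports. A condensed sketch of the per-instance decision the taskmgr hunks imply — the status constants are simplified stand-ins for the gRPC enum values:

    # Stand-ins for the rpc status constants used by TaskMgr.
    FAILED, TIMEOUT, OUTPUTERROR, RUNNING = 'FAILED', 'TIMEOUT', 'OUTPUTERROR', 'RUNNING'

    def classify(instance_status, try_count, max_retry_count):
        # Return (status, reason) as reported to JobMgr for one instance.
        if instance_status == OUTPUTERROR:
            return 'failed', 'OUTPUTERROR'        # output errors are never retried
        if instance_status in (FAILED, TIMEOUT):
            if try_count > max_retry_count:
                return 'failed', instance_status  # retries exhausted
            return 'retrying', instance_status    # will be re-run
        return 'running', ''

    print(classify(TIMEOUT, 1, 1))      # ('retrying', 'TIMEOUT')
    print(classify(TIMEOUT, 2, 1))      # ('failed', 'TIMEOUT')
    print(classify(OUTPUTERROR, 1, 1))  # ('failed', 'OUTPUTERROR')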