diff --git a/src/master/httprest.py b/src/master/httprest.py index abbdb10..d71c974 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -808,12 +808,12 @@ def add_job(user,beans,form): key_arr = key.split('_') value = job_data[key] if key_arr[0] == 'srcAddr' and value == '': - task_idx = 'task_' + key_arr[1] + #task_idx = 'task_' + key_arr[1] if task_idx in job_info['tasks']: - job_info['tasks'][task_idx]['srcAddr'] = '/root/nfs' + job_info['tasks'][task_idx]['srcAddr'] = '/root' else: job_info['tasks'][task_idx] = { - 'srcAddr': '/root/nfs/' + 'srcAddr': '/root' } elif key_arr[0] != 'dependency'and value == '': message['success'] = 'false' @@ -822,7 +822,7 @@ def add_job(user,beans,form): job_info[key_arr[0]] = value elif len(key_arr) == 2: key_prefix, task_idx = key_arr[0], key_arr[1] - task_idx = 'task_' + task_idx + #task_idx = 'task_' + task_idx if task_idx in job_info["tasks"]: job_info["tasks"][task_idx][key_prefix] = value else: @@ -832,7 +832,7 @@ def add_job(user,beans,form): job_info["tasks"][task_idx] = tmp_dict elif len(key_arr) == 3: key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2] - task_idx = 'task_' + task_idx + #task_idx = 'task_' + task_idx mapping_idx = 'mapping_' + mapping_idx if task_idx in job_info["tasks"]: if "mapping" in job_info["tasks"][task_idx]: @@ -1103,6 +1103,5 @@ if __name__ == '__main__': G_jobmgr = jobmgr.JobMgr(G_taskmgr) G_taskmgr.set_jobmgr(G_jobmgr) G_taskmgr.start() - G_jobmgr.start() app.run(host = masterip, port = masterport, threaded=True) diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index 5eb573d..7962223 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -1,6 +1,7 @@ import time, threading, random, string, os, traceback import master.monitor -import subprocess +import subprocess,json +from functools import wraps from utils.log import initlogging, logger from utils import env @@ -9,103 +10,194 @@ class BatchJob(object): def __init__(self, user, job_info): self.user = user self.raw_job_info = job_info - self.task_queue = [] - self.task_finished = [] self.job_id = None self.job_name = job_info['jobName'] self.job_priority = int(job_info['jobPriority']) self.status = 'pending' self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()) - self.top_sort() + self.lock = threading.Lock() + self.tasks = {} + self.dependency_out = {} + self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0} - # transfer the dependency graph into a job queue - def top_sort(self): - logger.debug('top sorting') - tasks = self.raw_job_info["tasks"] - dependency_graph = {} - for task_idx in tasks: - dependency_graph[task_idx] = set() - task_info = tasks[task_idx] + #init self.tasks & self.dependency_out & self.tasks_cnt + logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.user, self.job_name, self.create_time)) + raw_tasks = self.raw_job_info["tasks"] + self.tasks_cnt['pending'] = len(raw_tasks.keys()) + for task_idx in raw_tasks.keys(): + task_info = raw_tasks[task_idx] + self.tasks[task_idx] = {} + self.tasks[task_idx]['config'] = task_info + self.tasks[task_idx]['status'] = 'pending' + self.tasks[task_idx]['dependency'] = [] dependency = task_info['dependency'].strip().replace(' ', '').split(',') if len(dependency) == 1 and dependency[0] == '': continue - for t in dependency: - if not t in tasks: - raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx)) - dependency_graph[task_idx].add(t) - while len(dependency_graph) > 0: - s = set() - flag = False - for task_idx in dependency_graph: - if len(dependency_graph[task_idx]) == 0: - flag = True - s.add(task_idx) - for task_idx in s: - dependency_graph.pop(task_idx) - #there is a circle in the graph - if not flag: - raise ValueError('there is a circle in the dependency graph') - break - for task_idx in dependency_graph: - for t in s: - if t in dependency_graph[task_idx]: - dependency_graph[task_idx].remove(t) - self.task_queue.append({ - 'task_idx': s, - 'status': 'pending' - }) + for d in dependency: + if not d in raw_tasks.keys(): + raise ValueError('task %s is not defined in the dependency of task %s' % (d, task_idx)) + self.tasks[task_idx]['dependency'].append(d) + if not d in self.dependency_out.keys(): + self.dependency_out[d] = [] + self.dependency_out[d].append(task_idx) - # get a task and pass it to taskmgr - def get_task(self): - for task in self.task_queue: - if task['status'] == 'pending': - task_idx = task['task_idx'].pop() - task['status'] = 'running' + self.log_status() + logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3))) + + def data_lock(f): + @wraps(f) + def new_f(self, *args, **kwargs): + self.lock.acquire() + try: + result = f(self, *args, **kwargs) + except Exception as err: + self.lock.release() + raise err + self.lock.release() + return result + return new_f + + # return the tasks without dependencies + @data_lock + def get_tasks_no_dependency(self,update_status=False): + logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id) + ret_tasks = [] + for task_idx in self.tasks.keys(): + if (self.tasks[task_idx]['status'] == 'pending' and + len(self.tasks[task_idx]['dependency']) == 0): + if update_status: + self.tasks_cnt['pending'] -= 1 + self.tasks_cnt['scheduling'] += 1 + self.tasks[task_idx]['status'] = 'scheduling' task_name = self.user + '_' + self.job_id + '_' + task_idx - return task_name, self.raw_job_info["tasks"][task_idx] - return '', None + ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority]) + self.log_status() + return ret_tasks - # a task has finished + # update status of this job based + def _update_job_status(self): + allcnt = len(self.tasks.keys()) + if self.tasks_cnt['failed'] != 0: + self.status = 'failed' + elif self.tasks_cnt['running'] != 0: + self.status = 'running' + elif self.tasks_cnt['finished'] == allcnt: + self.status = 'done' + else: + self.status = 'pending' + + # start run a task, update status + @data_lock + def update_task_running(self, task_idx): + logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id)) + old_status = self.tasks[task_idx]['status'].split('(')[0] + self.tasks_cnt[old_status] -= 1 + self.tasks[task_idx]['status'] = 'running' + self.tasks_cnt['running'] += 1 + self._update_job_status() + self.log_status() + + # a task has finished, update dependency and return tasks without dependencies + @data_lock def finish_task(self, task_idx): - pass + if task_idx not in self.tasks.keys(): + logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id)) + return [] + logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id)) + old_status = self.tasks[task_idx]['status'].split('(')[0] + self.tasks_cnt[old_status] -= 1 + self.tasks[task_idx]['status'] = 'finished' + self.tasks_cnt['finished'] += 1 + self._update_job_status() + if task_idx not in self.dependency_out.keys(): + self.log_status() + return [] + ret_tasks = [] + for out_idx in self.dependency_out[task_idx]: + try: + self.tasks[out_idx]['dependency'].remove(task_idx) + except Exception as err: + logger.warning(traceback.format_exc()) + continue + if (self.tasks[out_idx]['status'] == 'pending' and + len(self.tasks[out_idx]['dependency']) == 0): + self.tasks_cnt['pending'] -= 1 + self.tasks_cnt['scheduling'] += 1 + self.tasks[out_idx]['status'] = 'scheduling' + task_name = self.user + '_' + self.job_id + '_' + out_idx + ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority]) + self.log_status() + return ret_tasks -class JobMgr(threading.Thread): + # update retrying status of task + @data_lock + def update_task_retrying(self, task_idx, reason, tried_times): + logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) + old_status = self.tasks[task_idx]['status'].split('(')[0] + self.tasks_cnt[old_status] -= 1 + self.tasks_cnt['retrying'] += 1 + self.tasks[task_idx]['status'] = 'retrying(%s)(%d times)' % (reason, int(tried_times)) + self._update_job_status() + self.log_status() + + # update failed status of task + @data_lock + def update_task_failed(self, task_idx, reason, tried_times): + logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times))) + old_status = self.tasks[task_idx]['status'].split('(')[0] + self.tasks_cnt[old_status] -= 1 + self.tasks_cnt['failed'] += 1 + if reason == "OUTPUTERROR": + self.tasks[task_idx]['status'] = 'failed(OUTPUTERROR)' + else: + self.tasks[task_idx]['status'] = 'failed(%s)(%d times)' % (reason, int(tried_times)) + self._update_job_status() + self.log_status() + + # print status for debuging + def log_status(self): + task_copy = {} + for task_idx in self.tasks.keys(): + task_copy[task_idx] = {} + task_copy[task_idx]['status'] = self.tasks[task_idx]['status'] + task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency'] + logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3))) + logger.debug("BatchJob(id:%s) tasks_cnt: %s" % (self.job_id, self.tasks_cnt)) + logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status)) + + +class JobMgr(): # load job information from etcd # initial a job queue and job schedueler def __init__(self, taskmgr): - threading.Thread.__init__(self) self.job_queue = [] self.job_map = {} self.taskmgr = taskmgr self.fspath = env.getenv('FS_PREFIX') - def run(self): - while True: - self.job_scheduler() - time.sleep(2) - # user: username - # job_data: a json string + # job_info: a json string # user submit a new job, add this job to queue and database def add_job(self, user, job_info): try: job = BatchJob(user, job_info) job.job_id = self.gen_jobid() - self.job_queue.append(job.job_id) self.job_map[job.job_id] = job + self.process_job(job) except ValueError as err: logger.error(err) return [False, err.args[0]] except Exception as err: + logger.error(traceback.format_exc()) + #logger.error(err) return [False, err.args[0]] - finally: - return [True, "add batch job success"] + return [True, "add batch job success"] # user: username # list a user's all job def list_jobs(self,user): res = [] - for job_id in self.job_queue: + for job_id in self.job_map.keys(): job = self.job_map[job_id] logger.debug('job_id: %s, user: %s' % (job_id, job.user)) if job.user == user: @@ -132,7 +224,7 @@ class JobMgr(threading.Thread): # check if a job exists def is_job_exist(self, job_id): - return job_id in self.job_queue + return job_id in self.job_map.keys() # generate a random job id def gen_jobid(self): @@ -141,31 +233,47 @@ class JobMgr(threading.Thread): job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8)) return job_id - # this is a thread to process a job - def job_processor(self, job): - task_name, task_info = job.get_task() - if not task_info: - return False - else: - task_priority = job.job_priority - self.taskmgr.add_task(job.user, task_name, task_info, task_priority) - return True - - # this is a thread to schedule the jobs - def job_scheduler(self): - # choose a job from queue, create a job processor for it - for job_id in self.job_queue: - job = self.job_map[job_id] - if self.job_processor(job): - job.status = 'running' - break + # add tasks into taskmgr's queue + def add_task_taskmgr(self, user, tasks): + for task_name, task_info, task_priority in tasks: + if not task_info: + logger.error("task_info does not exist! task_name(%s)" % task_name) + return False else: - job.status = 'done' + logger.debug("Add task(name:%s) with priority(%s) to taskmgr's queue." % (task_name, task_priority) ) + self.taskmgr.add_task(user, task_name, task_info, task_priority) + return True - # a task has finished - def report(self, task): - pass + # to process a job, add tasks without dependencies of the job into taskmgr + def process_job(self, job): + tasks = job.get_tasks_no_dependency(True) + return self.add_task_taskmgr(job.user, tasks) + # report task status from taskmgr when running, failed and finished + # task_name: user + '_' + job_id + '_' + task_idx + # status: 'running', 'finished', 'retrying', 'failed' + # reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR" + # tried_times: how many times the task has been tried. + def report(self, task_name, status, reason="", tried_times=1): + split_task_name = task_name.split('_') + if len(split_task_name) != 3: + logger.error("Illegal task_name(%s) report from taskmgr" % task_name) + return + user, job_id, task_idx = split_task_name + job = self.job_map[job_id] + if status == "running": + job.update_task_running(task_idx) + elif status == "finished": + next_tasks = job.finish_task(task_idx) + if len(next_tasks) == 0: + return + ret = self.add_task_taskmgr(user, next_tasks) + elif status == "retrying": + job.update_task_retrying(task_idx, reason, tried_times) + elif status == "failed": + job.update_task_failed(task_idx, reason, tried_times) + + # Get Batch job stdout or stderr from its file def get_output(self, username, jobid, taskid, instid, issue): filename = username + "_" + jobid + "_" + taskid + "_" + instid + "_" + issue + ".txt" fpath = "%s/global/users/%s/data/batch_%s/%s" % (self.fspath,username,jobid,filename) diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 07936d8..91aa3e0 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -139,20 +139,26 @@ class TaskMgr(threading.Thread): elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT: if instance['try_count'] > task.info.maxRetryCount: self.check_task_completed(task) + else: + reason = 'FAILED' if report.instanceStatus == FAILED else 'TIMEOUT' + self.task_retrying(task, reason, instance['try_count']) elif report.instanceStatus == OUTPUTERROR: - self.task_failed(task) + self.task_failed(task,"OUTPUTERROR") def check_task_completed(self, task): if len(task.instance_list) < task.info.instanceCount: return failed = False + reason = "FAILED" for instance in task.instance_list: if instance['status'] == RUNNING or instance['status'] == WAITING: return if instance['status'] == FAILED or instance['status'] == TIMEOUT: if instance['try_count'] > task.info.maxRetryCount: failed = True + if instance['status'] == TIMEOUT: + reason = "TIMEOUT" else: return if instance['status'] == OUTPUTERROR: @@ -160,7 +166,7 @@ class TaskMgr(threading.Thread): break if failed: - self.task_failed(task) + self.task_failed(task,reason) else: self.task_completed(task) @@ -171,21 +177,27 @@ class TaskMgr(threading.Thread): if self.jobmgr is None: self.logger.error('[task_completed] jobmgr is None!') else: - self.jobmgr.report(task) + self.jobmgr.report(task.info.id,'finished') self.logger.info('task %s completed' % task.info.id) self.lazy_delete_list.append(task) - def task_failed(self, task): + def task_failed(self, task, reason): task.status = FAILED if self.jobmgr is None: self.logger.error('[task_failed] jobmgr is None!') else: - self.jobmgr.report(task) + self.jobmgr.report(task.info.id,'failed', reason, task.info.maxRetryCount+1) self.logger.info('task %s failed' % task.info.id) self.lazy_delete_list.append(task) + def task_retrying(self, task, reason, tried_times): + if self.jobmgr is None: + self.logger.error('[task_retrying] jobmgr is None!') + else: + self.jobmgr.report(task.info.id,'retrying',reason,tried_times) + @queue_lock def sort_out_task_queue(self): @@ -201,6 +213,7 @@ class TaskMgr(threading.Thread): def task_processor(self, task, instance_id, worker_ip): task.status = RUNNING + self.jobmgr.report(task.info.id,'running') # properties for transaction task.info.instanceid = instance_id diff --git a/src/master/testTaskCtrler.py b/src/master/testTaskCtrler.py index ba6b947..5427183 100644 --- a/src/master/testTaskCtrler.py +++ b/src/master/testTaskCtrler.py @@ -5,20 +5,21 @@ if sys.path[0].endswith("master"): import grpc,time from protos import rpc_pb2, rpc_pb2_grpc +import random, string def run(): channel = grpc.insecure_channel('localhost:50051') stub = rpc_pb2_grpc.WorkerStub(channel) - comm = rpc_pb2.Command(commandLine="ls /root/oss/test-for-docklet", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' - paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/oss/test-for-docklet/", stdoutRedirectPath="/root/oss/test-for-docklet/") + comm = rpc_pb2.Command(commandLine="ls /root;sleep 5;ls /root", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}' + paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/batch_{jobid}/", stdoutRedirectPath="/root/nfs/batch_{jobid}/") img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet") inst = rpc_pb2.Instance(cpu=1, memory=1000, disk=1000, gpu=0) mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="") - clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt]) + clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[]) - task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token="test") + task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token=''.join(random.sample(string.ascii_letters + string.digits, 8))) response = stub.process_task(task) print("Batch client received: " + str(response.status)+" "+response.message) @@ -34,6 +35,7 @@ def stop_task(): print("Batch client received: " + str(response.status)+" "+response.message) if __name__ == '__main__': + #for i in range(10): run() #time.sleep(4) #stop_task() diff --git a/web/templates/batch/batch_create.html b/web/templates/batch/batch_create.html index 532a1d1..bd9fa71 100644 --- a/web/templates/batch/batch_create.html +++ b/web/templates/batch/batch_create.html @@ -177,7 +177,7 @@ +'