diff --git a/src/master/httprest.py b/src/master/httprest.py index 615c021..90a12e4 100755 --- a/src/master/httprest.py +++ b/src/master/httprest.py @@ -737,7 +737,15 @@ def add_job(user,beans,form): for key in job_data: key_arr = key.split('_') value = job_data[key] - if key_arr[0] != 'dependency' and value == '': + if key_arr[0] == 'srcAddr' and value == '': + task_idx = 'task_' + key_arr[1] + if task_idx in job_info['tasks']: + job_info['tasks'][task_idx]['srcAddr'] = '/root/nfs' + else: + job_info['tasks'][task_idx] = { + 'srcAddr': '/root/nfs/' + } + elif key_arr[0] != 'dependency'and value == '': message['success'] = 'false' message['message'] = 'value of %s is null' % key elif len(key_arr) == 1: @@ -973,11 +981,6 @@ if __name__ == '__main__': G_networkmgr.printpools() G_cloudmgr = cloudmgr.CloudMgr() - '''G_taskmgr = taskmgr.TaskMgr() - G_jobmgr = jobmgr.JobMgr(taskmgr) - G_jobmgr.start() - G_taskmgr.set_jobmgr(G_jobmgr) - G_taskmgr.start()''' # start NodeMgr and NodeMgr will wait for all nodes to start ... G_nodemgr = nodemgr.NodeMgr(G_networkmgr, etcdclient, addr = ipaddr, mode=mode) diff --git a/src/master/jobmgr.py b/src/master/jobmgr.py index 6808110..31c0b4c 100644 --- a/src/master/jobmgr.py +++ b/src/master/jobmgr.py @@ -15,6 +15,7 @@ class BatchJob(object): self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime()) self.top_sort() + # transfer the dependency graph into a job queue def top_sort(self): logger.debug('top sorting') tasks = self.raw_job_info["tasks"] @@ -74,13 +75,11 @@ class JobMgr(threading.Thread): self.job_map = {} self.taskmgr = taskmgr - def run(self): while True: self.job_scheduler() time.sleep(2) - # user: username # job_data: a json string # user submit a new job, add this job to queue and database @@ -116,7 +115,7 @@ class JobMgr(threading.Thread): # user: username # jobid: the id of job - # get the information of a job, including the status, json description and other informationa + # get the information of a job, including the status, json description and other information # call get_task to get the task information def get_job(self, user, job_id): pass diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index d439f70..beaaf95 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -113,7 +113,7 @@ class TaskMgr(threading.Thread): if instance['try_count'] > task.info.maxRetryCount: self.check_task_completed(task) elif report.instanceStatus == OUTPUTERROR: - task_failed(task) + self.task_failed(task) def check_task_completed(self, task): @@ -326,4 +326,4 @@ class TaskMgr(threading.Thread): if task.info.id == taskid: return task return None - \ No newline at end of file +