Merge pull request #331 from iteratorlee/dev_batch

fix package path bug
Yan Li 2018-08-10 12:07:59 +09:00 committed by GitHub
commit ffe2a0487d
3 changed files with 13 additions and 11 deletions

View File

@@ -737,7 +737,15 @@ def add_job(user,beans,form):
     for key in job_data:
         key_arr = key.split('_')
         value = job_data[key]
-        if key_arr[0] != 'dependency' and value == '':
+        if key_arr[0] == 'srcAddr' and value == '':
+            task_idx = 'task_' + key_arr[1]
+            if task_idx in job_info['tasks']:
+                job_info['tasks'][task_idx]['srcAddr'] = '/root/nfs'
+            else:
+                job_info['tasks'][task_idx] = {
+                    'srcAddr': '/root/nfs/'
+                }
+        elif key_arr[0] != 'dependency' and value == '':
             message['success'] = 'false'
             message['message'] = 'value of %s is null' % key
         elif len(key_arr) == 1:
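
The new branch above no longer rejects an empty srcAddr field; it fills in a default NFS path for the task the field belongs to. A minimal sketch of that behaviour, assuming form keys of the form 'srcAddr_<n>' and a job_info['tasks'] dict keyed by 'task_<n>' (both inferred from the surrounding code):

# Sketch only: default an empty srcAddr_<n> form field to an NFS path.
# The key layout ('srcAddr_0' -> 'task_0') is inferred from the diff above.
def apply_src_addr_defaults(job_data, job_info, default_src='/root/nfs/'):
    for key, value in job_data.items():
        key_arr = key.split('_')
        if key_arr[0] == 'srcAddr' and value == '':
            task_idx = 'task_' + key_arr[1]
            # create the task entry if the srcAddr field is seen first
            job_info['tasks'].setdefault(task_idx, {})['srcAddr'] = default_src
    return job_info

job_info = {'tasks': {}}
apply_src_addr_defaults({'srcAddr_0': '', 'memory_0': '1024'}, job_info)
assert job_info['tasks']['task_0']['srcAddr'] == '/root/nfs/'

Note that the two branches in the patch write slightly different defaults ('/root/nfs' without and '/root/nfs/' with a trailing slash); collapsing them into a single setdefault call as above would avoid that inconsistency.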
@@ -973,11 +981,6 @@ if __name__ == '__main__':
     G_networkmgr.printpools()
     G_cloudmgr = cloudmgr.CloudMgr()
-    '''G_taskmgr = taskmgr.TaskMgr()
-    G_jobmgr = jobmgr.JobMgr(taskmgr)
-    G_jobmgr.start()
-    G_taskmgr.set_jobmgr(G_jobmgr)
-    G_taskmgr.start()'''
     # start NodeMgr and NodeMgr will wait for all nodes to start ...
     G_nodemgr = nodemgr.NodeMgr(G_networkmgr, etcdclient, addr = ipaddr, mode=mode)

View File

@@ -15,6 +15,7 @@ class BatchJob(object):
         self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
         self.top_sort()
 
+    # transfer the dependency graph into a job queue
     def top_sort(self):
         logger.debug('top sorting')
         tasks = self.raw_job_info["tasks"]
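
The body of top_sort is not part of this diff; the added comment only says it turns the dependency graph into a job queue. A hedged sketch of what such a sort can look like (Kahn's algorithm over a task dict; the 'dependency' field name mirrors the form keys handled in add_job, and the exact structure of raw_job_info['tasks'] is an assumption):

from collections import deque

def top_sort(tasks):
    # tasks: {'task_0': {'dependency': ''}, 'task_1': {'dependency': 'task_0'}, ...}
    indegree = {name: 0 for name in tasks}
    children = {name: [] for name in tasks}
    for name, info in tasks.items():
        for dep in filter(None, info.get('dependency', '').split(',')):
            indegree[name] += 1
            children[dep].append(name)   # assumes every dependency names an existing task
    ready = deque(sorted(name for name, deg in indegree.items() if deg == 0))
    order = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for child in children[name]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(tasks):
        raise ValueError('dependency graph contains a cycle')
    return order

print(top_sort({'task_0': {'dependency': ''}, 'task_1': {'dependency': 'task_0'}}))
# ['task_0', 'task_1']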
@@ -74,13 +75,11 @@ class JobMgr(threading.Thread):
         self.job_map = {}
         self.taskmgr = taskmgr
 
     def run(self):
         while True:
             self.job_scheduler()
             time.sleep(2)
 
     # user: username
     # job_data: a json string
     # user submit a new job, add this job to queue and database
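
The run loop shown above is a plain polling scheduler: a background thread that calls job_scheduler every two seconds. A self-contained sketch of the same pattern (the daemon flag and the empty job_scheduler body are placeholders, not part of the patch):

import threading
import time

class PollingJobMgr(threading.Thread):
    # minimal stand-in for JobMgr's scheduling loop
    def __init__(self, taskmgr=None):
        super().__init__(daemon=True)
        self.job_map = {}
        self.taskmgr = taskmgr

    def run(self):
        while True:
            self.job_scheduler()
            time.sleep(2)   # poll every two seconds, as in the diff

    def job_scheduler(self):
        # placeholder: pop runnable jobs and hand them to taskmgr
        pass

# PollingJobMgr().start()  # starts the loop on a background thread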
@@ -116,7 +115,7 @@ class JobMgr(threading.Thread):
     # user: username
     # jobid: the id of job
-    # get the information of a job, including the status, json description and other informationa
+    # get the information of a job, including the status, json description and other information
     # call get_task to get the task information
     def get_job(self, user, job_id):
         pass

View File

@@ -113,7 +113,7 @@ class TaskMgr(threading.Thread):
             if instance['try_count'] > task.info.maxRetryCount:
                 self.check_task_completed(task)
         elif report.instanceStatus == OUTPUTERROR:
-            task_failed(task)
+            self.task_failed(task)
 
     def check_task_completed(self, task):
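
The one-line fix above qualifies the call with self: inside a method, a bare name is looked up as a local or module-level name, so the unqualified task_failed(task) would raise NameError (or hit an unrelated global) instead of dispatching to the instance method. A tiny standalone illustration, unrelated to the docklet classes:

class Reporter:
    def task_failed(self, task):
        print('failed:', task)

    def on_report(self, task):
        # task_failed(task)     # NameError: no global named task_failed
        self.task_failed(task)  # dispatches to the instance method

Reporter().on_report('task_0')  # prints: failed: task_0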
@@ -326,4 +326,4 @@ class TaskMgr(threading.Thread):
             if task.info.id == taskid:
                 return task
         return None