Merge pull request #325 from iteratorlee/dev_batch

add job queue
This commit is contained in:
Yan Li 2018-08-08 19:00:10 +09:00 committed by GitHub
commit 2c05a85649
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 195 additions and 29 deletions

View File

@ -725,7 +725,71 @@ def resetall_system(user, beans, form):
@app.route("/batch/job/add/", methods=['POST'])
@login_required
def add_job(user,beans,form):
pass
global G_jobmgr
job_data = form.to_dict()
job_info = {
'tasks': {}
}
message = {
'success': 'true',
'message': 'add batch job success'
}
for key in job_data:
key_arr = key.split('_')
value = job_data[key]
if key_arr[0] != 'dependency' and value == '':
message['success'] = 'false'
message['message'] = 'value of %s is null' % key
elif len(key_arr) == 1:
job_info[key_arr[0]] = value
elif len(key_arr) == 2:
key_prefix, task_idx = key_arr[0], key_arr[1]
task_idx = 'task_' + task_idx
if task_idx in job_info["tasks"]:
job_info["tasks"][task_idx][key_prefix] = value
else:
tmp_dict = {
key_prefix: value
}
job_info["tasks"][task_idx] = tmp_dict
elif len(key_arr) == 3:
key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2]
task_idx = 'task_' + task_idx
mapping_idx = 'mapping_' + mapping_idx
if task_idx in job_info["tasks"]:
if "mapping" in job_info["tasks"][task_idx]:
if mapping_idx in job_info["tasks"][task_idx]["mapping"]:
job_info["tasks"][task_idx]["mapping"][mapping_idx][key_prefix] = value
else:
tmp_dict = {
key_prefix: value
}
job_info["tasks"][task_idx]["mapping"][mapping_idx] = tmp_dict
else:
job_info["tasks"][task_idx]["mapping"] = {
mapping_idx: {
key_prefix: value
}
}
else:
tmp_dict = {
"mapping":{
mapping_idx: {
key_prefix: value
}
}
}
job_info["tasks"][task_idx] = tmp_dict
logger.debug('batch job adding form %s' % job_data)
logger.debug('batch job adding info %s' % json.dumps(job_info, indent=4))
[status, msg] = G_jobmgr.add_job(user, job_info)
if status:
return json.dumps(message)
else:
message["success"] = "false"
message["message"] = msg
return json.dumps(message)
return json.dumps(message)
@app.route("/batch/job/list/", methods=['POST'])
@login_required
@ -903,6 +967,8 @@ if __name__ == '__main__':
G_networkmgr.printpools()
G_cloudmgr = cloudmgr.CloudMgr()
#G_taskmgr = taskmgr.TaskMgr()
G_jobmgr = jobmgr.JobMgr()
'''G_taskmgr = taskmgr.TaskMgr()
G_jobmgr = jobmgr.JobMgr(taskmgr)
G_jobmgr.start()

View File

@ -1,43 +1,140 @@
import time, threading
import time, threading, random, string
import master.monitor
from utils.log import initlogging, logger
initlogging("docklet-jobmgr")
class BatchJob(object):
def __init__(self, user, job_info):
self.user = user
self.raw_job_info = job_info
self.task_queue = []
self.task_finished = []
self.job_id = None
self.job_name = job_info['jobName']
self.status = 'pending'
self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
self.top_sort()
def top_sort(self):
logger.debug('top sorting')
tasks = self.raw_job_info["tasks"]
dependency_graph = {}
for task_idx in tasks:
dependency_graph[task_idx] = set()
task_info = tasks[task_idx]
dependency = task_info['dependency'].strip().replace(' ', '').split(',')
for t in dependency:
if not t in tasks:
raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx))
dependency_graph[task_idx].add(t)
while len(dependency_graph) > 0:
s = set()
flag = False
for task_idx in dependency_graph:
if len(dependency_graph[task_idx]) == 0:
flag = True
s.add(task_idx)
#there is a circle in the graph
if not flag:
raise ValueError('there is a circle in the dependency graph')
for task_idx in dependency_graph:
for t in s:
if t in dependency_graph[task_idx]:
dependency_graph[task_idx].remove(t)
self.task_queue.append({
'task_idx': s,
'status': 'pending'
})
# get a task and pass it to taskmgr
def get_task(self):
for task in self.task_queue:
if task['status'] == 'pending':
task_idx = task['task_idx']
task['status'] = 'running'
task_name = self.user + '_' + self.job_id + '_' + self.task_idx
return task_name, self.raw_job_info["tasks"][task_idx]
return '', None
# a task has finished
def finish_task(self, task_idx):
pass
class JobMgr(object):
# load job information from etcd
# initial a job queue and job schedueler
#def __init__(self, taskmgr):
def __init__(self):
self.job_queue = []
self.job_map = {}
#self.taskmgr = taskmgr
# user: username
# job: a json string
# job_data: a json string
# user submit a new job, add this job to queue and database
# call add_task to add task information
def add_job(self, user, job):
pass
def add_job(self, user, job_info):
try:
job = BatchJob(user, job_info)
job.job_id = self.gen_jobid()
self.job_queue.append(job_id)
self.job_map[job_id] = job
except ValueError as err:
return [False, err.args[0]]
finally:
return [True, "add batch job success"]
# user: username
# list a user's all job
def list_jobs(self,user):
pass
res = []
for job_id in self.job_queue:
job = self.job_map[job_id]
if job.user == user:
res.append({
'job_name': job.job_name,
'job_id': job.job_id,
'status': job.status,
'create_time': job.create_time
})
return res
# user: username
# jobid: the id of job
# get the information of a job, including the status, json description and other informationa
# call get_task to get the task information
def get_job(self, user, jobid):
def get_job(self, user, job_id):
pass
# job: a json string
# check if a job exists
def is_job_exist(self, job_id):
return job_id in self.job_queue
# generate a random job id
def gen_jobid(self):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
while is_job_exist(job_id):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
return job_id
# this is a thread to process a job
def job_processor(self, job):
# according the DAG of job, add task to taskmanager
# wait for all task completed and exit
pass
task_name, task_info = job.get_task()
if not task_info:
return False
else:
#self.taskmgr.add_task(job.user, task_name, task_info)
return True
# this is a thread to schedule the jobs
def job_scheduler(self):
# choose a job from queue, create a job processor for it
pass
for job_id in self.job_queue:
job = self.job_map[job_id]
if self.job_processer(job):
job.status = 'running'
break
else:
job.status = 'done'
# load job information from etcd
# initial a job queue and job schedueler
def __init__(self, taskmgr):
pass

View File

@ -40,15 +40,15 @@
</div>
</div>
<div class="box-body">
<form id="form" class="form-horizontal" action="/batch_job/add/" method="POST">
<form id="form" class="form-horizontal" action="/batch_job/{{masterips[0].split("@")[0]}}/add/" method="POST">
<div class="form-group"><label class="col-sm-2 control-label">Job Name</label>
<div class="col-sm-10"><input type="text" class="form-control" name="job_name" id="job_name"></div>
<div class="col-sm-10"><input type="text" class="form-control" name="jobName" id="job_name"></div>
</div>
<div class="hr-line-dashed"></div>
<br/>
<div class="form-group"><label class="col-sm-2 control-label">Priority</label>
<div class="col-sm-10"><select id="priority_selector" class="form-control">
<div class="col-sm-10"><select id="priority_selector" class="form-control" name="taskPriority">
{% for priority in range(10) %}
<option value="{{priority}}">{{priority}}</option>
{% endfor %}
@ -117,11 +117,11 @@
var remote_dir = new_mapping.insertCell();
var source = new_mapping.insertCell();
var remove = new_mapping.insertCell();
local_dir.innerHTML = '<input type="text" class="form-control" name="mapping_local_dir_' + task_number + '_' + mapping_number + '" id="mapping_local_dir_'
local_dir.innerHTML = '<input type="text" class="form-control" name="mappingLocalDir_' + task_number + '_' + mapping_number + '" id="mapping_local_dir_'
+ task_number + '_' + mapping_number + '" />';
remote_dir.innerHTML = '<input type="text" class="form-control" name="mapping_remote_dir_' + task_number + '_' + mapping_number + '" id="mapping_remote_dir_'
remote_dir.innerHTML = '<input type="text" class="form-control" name="mappingRemoteDir_' + task_number + '_' + mapping_number + '" id="mapping_remote_dir_'
+ task_number + '_' + mapping_number + '" />';
source.innerHTML = '<select class="form-control" name="mapping_source_' + task_number + '_' + mapping_number + '" id="mapping_source_'
source.innerHTML = '<select class="form-control" name="mappingSource_' + task_number + '_' + mapping_number + '" id="mapping_source_'
+ task_number + '_' + mapping_number + '">'
+'<option>Aliyun</option><option>AWS</option></select>';
remove.innerHTML = '<div class="box-tool pull-left"><button type="button" id="' + task_number + '_' + mapping_number +'" onclick="removeMapping(this)" class="btn btn-xs btn-danger">'

View File

@ -41,9 +41,11 @@
<th>Status</th>
<th>Tasks</th>
<th>Operations</th>
<th>Running Time</th>
<th>Create Time</th>
<tr>
<thead>
<tbody>
</tbody>
</table>
</div>
</div>

View File

@ -130,12 +130,11 @@ def batch_job():
def create_batch_job():
return createBatchJobView().as_view()
@app.route("/batch_job/add/", methods=['POST'])
@app.route("/batch_job/<masterip>/add/", methods=['POST'])
@login_required
def add_batch_job():
#TODO get form parameters of a job description
job_data = {}
job_data["job_name"] = request.form["job_name"]
def add_batch_job(masterip):
addBatchJobView.masterip = masterip
addBatchJobView.job_data = request.form
return addBatchJobView().as_view()
@app.route("/batch_job/state/", methods=['GET'])

View File

@ -40,7 +40,9 @@ class addBatchJobView(normalView):
@classmethod
def post(self):
if True:
masterip = self.masterip
result = dockletRequest.post("/batch/job/add/", self.job_data, masterip)
if result.get('success', None) == "true":
return self.render(self.template_path)
else:
return self.error()