Merge pull request #371 from FirmlyReality/batch

Update jobmgr to support a new policy of scheduling
This commit is contained in:
Yujian Zhu 2019-01-18 17:43:52 +08:00 committed by GitHub
commit 583bd42e90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 220 additions and 98 deletions

View File

@ -808,12 +808,12 @@ def add_job(user,beans,form):
key_arr = key.split('_')
value = job_data[key]
if key_arr[0] == 'srcAddr' and value == '':
task_idx = 'task_' + key_arr[1]
#task_idx = 'task_' + key_arr[1]
if task_idx in job_info['tasks']:
job_info['tasks'][task_idx]['srcAddr'] = '/root/nfs'
job_info['tasks'][task_idx]['srcAddr'] = '/root'
else:
job_info['tasks'][task_idx] = {
'srcAddr': '/root/nfs/'
'srcAddr': '/root'
}
elif key_arr[0] != 'dependency'and value == '':
message['success'] = 'false'
@ -822,7 +822,7 @@ def add_job(user,beans,form):
job_info[key_arr[0]] = value
elif len(key_arr) == 2:
key_prefix, task_idx = key_arr[0], key_arr[1]
task_idx = 'task_' + task_idx
#task_idx = 'task_' + task_idx
if task_idx in job_info["tasks"]:
job_info["tasks"][task_idx][key_prefix] = value
else:
@ -832,7 +832,7 @@ def add_job(user,beans,form):
job_info["tasks"][task_idx] = tmp_dict
elif len(key_arr) == 3:
key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2]
task_idx = 'task_' + task_idx
#task_idx = 'task_' + task_idx
mapping_idx = 'mapping_' + mapping_idx
if task_idx in job_info["tasks"]:
if "mapping" in job_info["tasks"][task_idx]:
@ -1103,6 +1103,5 @@ if __name__ == '__main__':
G_jobmgr = jobmgr.JobMgr(G_taskmgr)
G_taskmgr.set_jobmgr(G_jobmgr)
G_taskmgr.start()
G_jobmgr.start()
app.run(host = masterip, port = masterport, threaded=True)

View File

@ -1,6 +1,7 @@
import time, threading, random, string, os, traceback
import master.monitor
import subprocess
import subprocess,json
from functools import wraps
from utils.log import initlogging, logger
from utils import env
@ -9,103 +10,194 @@ class BatchJob(object):
def __init__(self, user, job_info):
self.user = user
self.raw_job_info = job_info
self.task_queue = []
self.task_finished = []
self.job_id = None
self.job_name = job_info['jobName']
self.job_priority = int(job_info['jobPriority'])
self.status = 'pending'
self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
self.top_sort()
self.lock = threading.Lock()
self.tasks = {}
self.dependency_out = {}
self.tasks_cnt = {'pending':0, 'scheduling':0, 'running':0, 'retrying':0, 'failed':0, 'finished':0}
# transfer the dependency graph into a job queue
def top_sort(self):
logger.debug('top sorting')
tasks = self.raw_job_info["tasks"]
dependency_graph = {}
for task_idx in tasks:
dependency_graph[task_idx] = set()
task_info = tasks[task_idx]
#init self.tasks & self.dependency_out & self.tasks_cnt
logger.debug("Init BatchJob user:%s job_name:%s create_time:%s" % (self.user, self.job_name, self.create_time))
raw_tasks = self.raw_job_info["tasks"]
self.tasks_cnt['pending'] = len(raw_tasks.keys())
for task_idx in raw_tasks.keys():
task_info = raw_tasks[task_idx]
self.tasks[task_idx] = {}
self.tasks[task_idx]['config'] = task_info
self.tasks[task_idx]['status'] = 'pending'
self.tasks[task_idx]['dependency'] = []
dependency = task_info['dependency'].strip().replace(' ', '').split(',')
if len(dependency) == 1 and dependency[0] == '':
continue
for t in dependency:
if not t in tasks:
raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx))
dependency_graph[task_idx].add(t)
while len(dependency_graph) > 0:
s = set()
flag = False
for task_idx in dependency_graph:
if len(dependency_graph[task_idx]) == 0:
flag = True
s.add(task_idx)
for task_idx in s:
dependency_graph.pop(task_idx)
#there is a circle in the graph
if not flag:
raise ValueError('there is a circle in the dependency graph')
break
for task_idx in dependency_graph:
for t in s:
if t in dependency_graph[task_idx]:
dependency_graph[task_idx].remove(t)
self.task_queue.append({
'task_idx': s,
'status': 'pending'
})
for d in dependency:
if not d in raw_tasks.keys():
raise ValueError('task %s is not defined in the dependency of task %s' % (d, task_idx))
self.tasks[task_idx]['dependency'].append(d)
if not d in self.dependency_out.keys():
self.dependency_out[d] = []
self.dependency_out[d].append(task_idx)
# get a task and pass it to taskmgr
def get_task(self):
for task in self.task_queue:
if task['status'] == 'pending':
task_idx = task['task_idx'].pop()
task['status'] = 'running'
self.log_status()
logger.debug("BatchJob(id:%s) dependency_out: %s" % (self.job_id, json.dumps(self.dependency_out, indent=3)))
def data_lock(f):
@wraps(f)
def new_f(self, *args, **kwargs):
self.lock.acquire()
try:
result = f(self, *args, **kwargs)
except Exception as err:
self.lock.release()
raise err
self.lock.release()
return result
return new_f
# return the tasks without dependencies
@data_lock
def get_tasks_no_dependency(self,update_status=False):
logger.debug("Get tasks without dependencies of BatchJob(id:%s)" % self.job_id)
ret_tasks = []
for task_idx in self.tasks.keys():
if (self.tasks[task_idx]['status'] == 'pending' and
len(self.tasks[task_idx]['dependency']) == 0):
if update_status:
self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1
self.tasks[task_idx]['status'] = 'scheduling'
task_name = self.user + '_' + self.job_id + '_' + task_idx
return task_name, self.raw_job_info["tasks"][task_idx]
return '', None
ret_tasks.append([task_name, self.tasks[task_idx]['config'], self.job_priority])
self.log_status()
return ret_tasks
# a task has finished
# update status of this job based
def _update_job_status(self):
allcnt = len(self.tasks.keys())
if self.tasks_cnt['failed'] != 0:
self.status = 'failed'
elif self.tasks_cnt['running'] != 0:
self.status = 'running'
elif self.tasks_cnt['finished'] == allcnt:
self.status = 'done'
else:
self.status = 'pending'
# start run a task, update status
@data_lock
def update_task_running(self, task_idx):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) running." % (task_idx, self.job_id))
old_status = self.tasks[task_idx]['status'].split('(')[0]
self.tasks_cnt[old_status] -= 1
self.tasks[task_idx]['status'] = 'running'
self.tasks_cnt['running'] += 1
self._update_job_status()
self.log_status()
# a task has finished, update dependency and return tasks without dependencies
@data_lock
def finish_task(self, task_idx):
pass
if task_idx not in self.tasks.keys():
logger.error('Task_idx %s not in job. user:%s job_name:%s job_id:%s'%(task_idx, self.user, self.job_name, self.job_id))
return []
logger.debug("Task(idx:%s) of BatchJob(id:%s) has finished. Update dependency..." % (task_idx, self.job_id))
old_status = self.tasks[task_idx]['status'].split('(')[0]
self.tasks_cnt[old_status] -= 1
self.tasks[task_idx]['status'] = 'finished'
self.tasks_cnt['finished'] += 1
self._update_job_status()
if task_idx not in self.dependency_out.keys():
self.log_status()
return []
ret_tasks = []
for out_idx in self.dependency_out[task_idx]:
try:
self.tasks[out_idx]['dependency'].remove(task_idx)
except Exception as err:
logger.warning(traceback.format_exc())
continue
if (self.tasks[out_idx]['status'] == 'pending' and
len(self.tasks[out_idx]['dependency']) == 0):
self.tasks_cnt['pending'] -= 1
self.tasks_cnt['scheduling'] += 1
self.tasks[out_idx]['status'] = 'scheduling'
task_name = self.user + '_' + self.job_id + '_' + out_idx
ret_tasks.append([task_name, self.tasks[out_idx]['config'], self.job_priority])
self.log_status()
return ret_tasks
class JobMgr(threading.Thread):
# update retrying status of task
@data_lock
def update_task_retrying(self, task_idx, reason, tried_times):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) retrying. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
old_status = self.tasks[task_idx]['status'].split('(')[0]
self.tasks_cnt[old_status] -= 1
self.tasks_cnt['retrying'] += 1
self.tasks[task_idx]['status'] = 'retrying(%s)(%d times)' % (reason, int(tried_times))
self._update_job_status()
self.log_status()
# update failed status of task
@data_lock
def update_task_failed(self, task_idx, reason, tried_times):
logger.debug("Update status of task(idx:%s) of BatchJob(id:%s) failed. reason:%s tried_times:%d" % (task_idx, self.job_id, reason, int(tried_times)))
old_status = self.tasks[task_idx]['status'].split('(')[0]
self.tasks_cnt[old_status] -= 1
self.tasks_cnt['failed'] += 1
if reason == "OUTPUTERROR":
self.tasks[task_idx]['status'] = 'failed(OUTPUTERROR)'
else:
self.tasks[task_idx]['status'] = 'failed(%s)(%d times)' % (reason, int(tried_times))
self._update_job_status()
self.log_status()
# print status for debuging
def log_status(self):
task_copy = {}
for task_idx in self.tasks.keys():
task_copy[task_idx] = {}
task_copy[task_idx]['status'] = self.tasks[task_idx]['status']
task_copy[task_idx]['dependency'] = self.tasks[task_idx]['dependency']
logger.debug("BatchJob(id:%s) tasks status: %s" % (self.job_id, json.dumps(task_copy, indent=3)))
logger.debug("BatchJob(id:%s) tasks_cnt: %s" % (self.job_id, self.tasks_cnt))
logger.debug("BatchJob(id:%s) job_status: %s" %(self.job_id, self.status))
class JobMgr():
# load job information from etcd
# initial a job queue and job schedueler
def __init__(self, taskmgr):
threading.Thread.__init__(self)
self.job_queue = []
self.job_map = {}
self.taskmgr = taskmgr
self.fspath = env.getenv('FS_PREFIX')
def run(self):
while True:
self.job_scheduler()
time.sleep(2)
# user: username
# job_data: a json string
# job_info: a json string
# user submit a new job, add this job to queue and database
def add_job(self, user, job_info):
try:
job = BatchJob(user, job_info)
job.job_id = self.gen_jobid()
self.job_queue.append(job.job_id)
self.job_map[job.job_id] = job
self.process_job(job)
except ValueError as err:
logger.error(err)
return [False, err.args[0]]
except Exception as err:
logger.error(traceback.format_exc())
#logger.error(err)
return [False, err.args[0]]
finally:
return [True, "add batch job success"]
return [True, "add batch job success"]
# user: username
# list a user's all job
def list_jobs(self,user):
res = []
for job_id in self.job_queue:
for job_id in self.job_map.keys():
job = self.job_map[job_id]
logger.debug('job_id: %s, user: %s' % (job_id, job.user))
if job.user == user:
@ -132,7 +224,7 @@ class JobMgr(threading.Thread):
# check if a job exists
def is_job_exist(self, job_id):
return job_id in self.job_queue
return job_id in self.job_map.keys()
# generate a random job id
def gen_jobid(self):
@ -141,31 +233,47 @@ class JobMgr(threading.Thread):
job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
return job_id
# this is a thread to process a job
def job_processor(self, job):
task_name, task_info = job.get_task()
if not task_info:
return False
else:
task_priority = job.job_priority
self.taskmgr.add_task(job.user, task_name, task_info, task_priority)
return True
# this is a thread to schedule the jobs
def job_scheduler(self):
# choose a job from queue, create a job processor for it
for job_id in self.job_queue:
job = self.job_map[job_id]
if self.job_processor(job):
job.status = 'running'
break
# add tasks into taskmgr's queue
def add_task_taskmgr(self, user, tasks):
for task_name, task_info, task_priority in tasks:
if not task_info:
logger.error("task_info does not exist! task_name(%s)" % task_name)
return False
else:
job.status = 'done'
logger.debug("Add task(name:%s) with priority(%s) to taskmgr's queue." % (task_name, task_priority) )
self.taskmgr.add_task(user, task_name, task_info, task_priority)
return True
# a task has finished
def report(self, task):
pass
# to process a job, add tasks without dependencies of the job into taskmgr
def process_job(self, job):
tasks = job.get_tasks_no_dependency(True)
return self.add_task_taskmgr(job.user, tasks)
# report task status from taskmgr when running, failed and finished
# task_name: user + '_' + job_id + '_' + task_idx
# status: 'running', 'finished', 'retrying', 'failed'
# reason: reason for failure or retrying, such as "FAILED", "TIMEOUT", "OUTPUTERROR"
# tried_times: how many times the task has been tried.
def report(self, task_name, status, reason="", tried_times=1):
split_task_name = task_name.split('_')
if len(split_task_name) != 3:
logger.error("Illegal task_name(%s) report from taskmgr" % task_name)
return
user, job_id, task_idx = split_task_name
job = self.job_map[job_id]
if status == "running":
job.update_task_running(task_idx)
elif status == "finished":
next_tasks = job.finish_task(task_idx)
if len(next_tasks) == 0:
return
ret = self.add_task_taskmgr(user, next_tasks)
elif status == "retrying":
job.update_task_retrying(task_idx, reason, tried_times)
elif status == "failed":
job.update_task_failed(task_idx, reason, tried_times)
# Get Batch job stdout or stderr from its file
def get_output(self, username, jobid, taskid, instid, issue):
filename = username + "_" + jobid + "_" + taskid + "_" + instid + "_" + issue + ".txt"
fpath = "%s/global/users/%s/data/batch_%s/%s" % (self.fspath,username,jobid,filename)

View File

@ -139,20 +139,26 @@ class TaskMgr(threading.Thread):
elif report.instanceStatus == FAILED or report.instanceStatus == TIMEOUT:
if instance['try_count'] > task.info.maxRetryCount:
self.check_task_completed(task)
else:
reason = 'FAILED' if report.instanceStatus == FAILED else 'TIMEOUT'
self.task_retrying(task, reason, instance['try_count'])
elif report.instanceStatus == OUTPUTERROR:
self.task_failed(task)
self.task_failed(task,"OUTPUTERROR")
def check_task_completed(self, task):
if len(task.instance_list) < task.info.instanceCount:
return
failed = False
reason = "FAILED"
for instance in task.instance_list:
if instance['status'] == RUNNING or instance['status'] == WAITING:
return
if instance['status'] == FAILED or instance['status'] == TIMEOUT:
if instance['try_count'] > task.info.maxRetryCount:
failed = True
if instance['status'] == TIMEOUT:
reason = "TIMEOUT"
else:
return
if instance['status'] == OUTPUTERROR:
@ -160,7 +166,7 @@ class TaskMgr(threading.Thread):
break
if failed:
self.task_failed(task)
self.task_failed(task,reason)
else:
self.task_completed(task)
@ -171,21 +177,27 @@ class TaskMgr(threading.Thread):
if self.jobmgr is None:
self.logger.error('[task_completed] jobmgr is None!')
else:
self.jobmgr.report(task)
self.jobmgr.report(task.info.id,'finished')
self.logger.info('task %s completed' % task.info.id)
self.lazy_delete_list.append(task)
def task_failed(self, task):
def task_failed(self, task, reason):
task.status = FAILED
if self.jobmgr is None:
self.logger.error('[task_failed] jobmgr is None!')
else:
self.jobmgr.report(task)
self.jobmgr.report(task.info.id,'failed', reason, task.info.maxRetryCount+1)
self.logger.info('task %s failed' % task.info.id)
self.lazy_delete_list.append(task)
def task_retrying(self, task, reason, tried_times):
if self.jobmgr is None:
self.logger.error('[task_retrying] jobmgr is None!')
else:
self.jobmgr.report(task.info.id,'retrying',reason,tried_times)
@queue_lock
def sort_out_task_queue(self):
@ -201,6 +213,7 @@ class TaskMgr(threading.Thread):
def task_processor(self, task, instance_id, worker_ip):
task.status = RUNNING
self.jobmgr.report(task.info.id,'running')
# properties for transaction
task.info.instanceid = instance_id

View File

@ -5,20 +5,21 @@ if sys.path[0].endswith("master"):
import grpc,time
from protos import rpc_pb2, rpc_pb2_grpc
import random, string
def run():
channel = grpc.insecure_channel('localhost:50051')
stub = rpc_pb2_grpc.WorkerStub(channel)
comm = rpc_pb2.Command(commandLine="ls /root/oss/test-for-docklet", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/oss/test-for-docklet/", stdoutRedirectPath="/root/oss/test-for-docklet/")
comm = rpc_pb2.Command(commandLine="ls /root;sleep 5;ls /root", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/batch_{jobid}/", stdoutRedirectPath="/root/nfs/batch_{jobid}/")
img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
inst = rpc_pb2.Instance(cpu=1, memory=1000, disk=1000, gpu=0)
mnt = rpc_pb2.Mount(localPath="",provider='aliyun',remotePath="test-for-docklet",other="oss-cn-beijing.aliyuncs.com",accessKey="LTAIdl7gmmIhfqA9",secretKey="")
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[mnt])
clu = rpc_pb2.Cluster(image=img, instance=inst, mount=[])
task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token="test")
task = rpc_pb2.TaskInfo(id="test",username="root",instanceid=1,instanceCount=1,maxRetryCount=1,parameters=paras,cluster=clu,timeout=60000,token=''.join(random.sample(string.ascii_letters + string.digits, 8)))
response = stub.process_task(task)
print("Batch client received: " + str(response.status)+" "+response.message)
@ -34,6 +35,7 @@ def stop_task():
print("Batch client received: " + str(response.status)+" "+response.message)
if __name__ == '__main__':
#for i in range(10):
run()
#time.sleep(4)
#stop_task()

View File

@ -177,7 +177,7 @@
+'<div class="col-sm-3"><input type="text" class="form-control" placeholder="/path/to/file or /path/" name="stdOutRedPth_' + task_number + '" id="stdOutRedPth_' + task_number + '" value="/root/nfs/batch_{jobid}/" />'
+'</div></div>'
+'<div class="form-group">'
+'<label class="col-sm-2 control-label">Dependency&nbsp<i class="fa fa-question-circle" title="The tasks that this task depends on, seperate them with commas, eg: task_1, task_2"></i></label>'
+'<label class="col-sm-2 control-label">Dependency&nbsp<i class="fa fa-question-circle" title="The tasks ids that this task depends on, seperate them with commas, eg: 1, 2"></i></label>'
+'<div class="col-sm-3"><input type="text" class="form-control" name="dependency_' + task_number + '" id="dependency_' + task_number + '" />'
+'</div>'
+'<label class="col-sm-2 control-label">Command</label>'