refactor taskmgr

Gallen 2019-03-05 17:46:12 +08:00
parent 8f14b4e9c8
commit c85342bec8
2 changed files with 255 additions and 165 deletions

View File

@ -24,16 +24,12 @@ def int_to_ip(num):
return str((num>>24)&255)+"."+str((num>>16)&255)+"."+str((num>>8)&255)+"."+str(num&255)
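
int_to_ip unpacks a 32-bit integer into dotted-quad notation, most significant octet first. A quick sanity check of the conversion (illustrative values):

    assert int_to_ip(0x0A000001) == '10.0.0.1'     # 167772161
    assert int_to_ip(3232235777) == '192.168.1.1'  # 192*2**24 + 168*2**16 + 1*2**8 + 1
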
class Task():
def __init__(self, configinfo, vnodeinfo, taskinfo, priority, max_size):
self.vnodeinfo = vnodeinfo
self.taskinfo = taskinfo
def __init__(self, task_id, username, at_same_time, priority, max_size, task_infos):
self.id = task_id
self.username = username
self.status = WAITING
self.subtask_list = []
self.token = ''
self.maxRetryCount = self.configinfo['maxRetryCount']
self.atSameTime = self.configinfo['atSameTime']
self.multicommand = self.configinfo['multicommand']
self.vnode_nums = self.configinfo['vnode_nums']
# whether all the vnodes must be started at the same time
self.at_same_time = at_same_time
# user-supplied priority: the bigger the better
# computed self.priority: the smaller, the sooner it is scheduled
self.priority = int(time.time()) / 60 / 60 - priority
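
The computed priority folds queue age into the user-supplied value: the timestamp term grows by one per hour, so one unit of user priority offsets exactly one hour of waiting. A minimal sketch of that trade-off (hypothetical helper and fixed timestamp, not part of the commit):

    def effective_priority(submit_ts, user_priority):
        # smaller result is scheduled sooner; one priority unit offsets one hour of queue age
        return int(submit_ts) / 60 / 60 - user_priority

    now = 1700002800                               # fixed timestamp divisible by 3600, for exact math
    early_low = effective_priority(now, 1)         # submitted now with priority 1
    late_high = effective_priority(now + 7200, 3)  # two hours later with priority 3
    assert early_low == late_high                  # two extra priority units buy back two hours
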
@ -41,8 +37,13 @@ class Task():
self.ips = None
self.max_size = max_size
for i in range(self.vnode_nums):
self.subtask_list.append({'status':'WAITING','try_count':0})
self.subtask_list = [SubTask(
idx = index,
root_task = self,
vnode_info = task_info['vnode_info'],
command_info = task_info['command_info'],
max_retry_count = task_info['max_retry_count']
) for (index, task_info) in enumerate(task_infos)]
def __lt__(self, other):
return self.priority < other.priority
@ -55,8 +56,8 @@ class Task():
self.ips.append(int_to_ip(base_ip + self.task_base_ip + i + 2))
def gen_hosts(self):
username = self.taskinfo.username
taskid = self.taskinfo.taskid
username = self.username
taskid = self.id
logger.info("Generate hosts for user(%s) task(%s) base_ip(%s)"%(username,taskid,str(self.task_base_ip)))
fspath = env.getenv('FS_PREFIX')
if not os.path.isdir("%s/global/users/%s" % (fspath,username)):
@ -72,11 +73,26 @@ class Task():
i += 1
hosts_file.close()
def get_one_resources_need(self):
return self.vnodeinfo.vnode.instance
class SubTask():
def __init__(self, idx, root_task, vnode_info, command_info, max_retry_count):
self.root_task = root_task
self.vnode_info = vnode_info
self.vnode_info.vnodeid = idx
self.command_info = command_info
self.command_info.vnodeid = idx
self.max_retry_count = max_retry_count
self.vnode_started = False
self.task_started = False
self.status = WAITING
self.status_reason = ''
self.token = ''  # set when the command is dispatched; checked against worker reports
self.try_count = 0
self.worker = None
def get_all_resources_need(self):
return [self.vnodeinfo.vnode.instance for i in range(self.vnode_nums)]
def waiting_for_retry(self):
self.try_count += 1
self.status = WAITING if self.try_count <= self.max_retry_count else FAILED
if self.status == FAILED and self.root_task.at_same_time:
self.root_task.status = FAILED
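
waiting_for_retry gives each subtask a retry budget, and under at_same_time a single exhausted subtask fails the whole gang. The transition rule in isolation (restated as a toy function, with stand-in status constants):

    WAITING, FAILED = 'WAITING', 'FAILED'  # stand-ins for the module's constants

    def after_failure(try_count, max_retry_count):
        # one attempt was consumed; requeue while the budget lasts, else fail
        try_count += 1
        status = WAITING if try_count <= max_retry_count else FAILED
        return status, try_count

    status, tries = WAITING, 0
    while status == WAITING and tries < 5:
        status, tries = after_failure(tries, 2)  # max_retry_count = 2
    assert (status, tries) == (FAILED, 3)        # attempts 1 and 2 requeue, the 3rd fails the subtask
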
class TaskReporter(MasterServicer):
@ -89,6 +105,7 @@ class TaskReporter(MasterServicer):
self.taskmgr.on_task_report(task_report)
return Reply(status=Reply.ACCEPTED, message='')
class TaskMgr(threading.Thread):
# load task information from etcd
@ -156,9 +173,9 @@ class TaskMgr(threading.Thread):
self.serve()
while not self.thread_stop:
self.sort_out_task_queue()
task, vnodes_workers = self.task_scheduler()
if task is not None and workers is not None:
self.task_processor(task, vnodes_workers)
task, sub_task_list = self.task_scheduler()
if task is not None and sub_task_list is not None:
self.task_processor(task, sub_task_list)
else:
time.sleep(self.scheduler_interval)
@ -185,39 +202,86 @@ class TaskMgr(threading.Thread):
self.task_queue.append(task)
self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
def stop_vnode(self, worker, task, vnodeid):
vnodeinfo = copy.copy(task.vnodeinfo)
vnodeinfo.vnodeid = vnodeid
def start_vnode(self, subtask):
try:
self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (task.vnodeinfo.id, vnodeid))
channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
self.logger.info('[task_processor] Starting vnode for task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
stub = WorkerStub(channel)
response = stub.stop_vnode(vnodeinfo)
response = stub.start_vnode(subtask.vnode_info)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
subtask.status_reason = str(e)
return [False, e]
return [True, ""]
subtask.vnode_started = True
self.cpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.cpu
self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu
return [True, '']
def stop_vnode(self, subtask):
try:
self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
stub = WorkerStub(channel)
response = stub.stop_vnode(subtask.vnode_info)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
subtask.status_reason = str(e)
return [False, e]
subtask.vnode_started = False
self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu
self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu
return [True, '']
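
Moving the cpu_usage/gpu_usage updates into start_vnode and stop_vnode keeps the bookkeeping symmetric: a successful start charges the worker, a successful stop refunds it, and a failed RPC changes nothing. In miniature (hypothetical counters):

    cpu_usage = {'10.0.0.1': 0}

    def on_start(worker, cpu, ok):
        if ok:                    # only a successful start_vnode charges the worker
            cpu_usage[worker] += cpu

    def on_stop(worker, cpu, ok):
        if ok:                    # only a successful stop_vnode refunds it
            cpu_usage[worker] -= cpu

    on_start('10.0.0.1', 2, ok=True)
    on_stop('10.0.0.1', 2, ok=True)
    assert cpu_usage['10.0.0.1'] == 0  # a full start/stop cycle is usage-neutral
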
def start_task(self, subtask):
try:
self.logger.info('[task_processor] Starting task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
stub = WorkerStub(channel)
response = stub.start_task(subtask.command_info)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
subtask.status_reason = str(e)
return [False, e]
subtask.task_started = True
return [True, '']
def stop_task(self, subtask):
try:
self.logger.info('[task_processor] Stopping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
stub = WorkerStub(channel)
response = stub.stop_task(subtask.command_info)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
subtask.status = FAILED
subtask.status_reason = str(e)
subtask.task_started = False
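
The four wrappers above repeat the same channel/stub/ACCEPTED scaffolding and differ only in the stub method and message. A sketch of a shared helper they could delegate to (hypothetical _worker_rpc; it assumes only the grpc, WorkerStub, and Reply names already used above):

    def _worker_rpc(self, subtask, method_name, message):
        # common path: open a channel to the subtask's worker, call one stub method,
        # and translate any non-ACCEPTED reply or transport error into [False, e]
        try:
            channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
            stub = WorkerStub(channel)
            response = getattr(stub, method_name)(message)
            if response.status != Reply.ACCEPTED:
                raise Exception(response.message)
        except Exception as e:
            self.logger.error('[task_processor] rpc error message: %s' % e)
            subtask.status_reason = str(e)
            return [False, e]
        return [True, '']

    # e.g. start_vnode would reduce to bookkeeping around
    # self._worker_rpc(subtask, 'start_vnode', subtask.vnode_info)
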
@net_lock
def acquire_task_ips(self, task):
self.logger.info("[acquire_task_ips] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.task_base_ip)))
self.logger.info("[acquire_task_ips] user(%s) task(%s) net(%s)" % (task.username, task.id, str(task.task_base_ip)))
if task.task_base_ip == None:
task.task_base_ip = self.free_nets.pop(0)
return task.task_base_ip
@net_lock
def release_task_ips(self,task):
self.logger.info("[release_task_ips] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.task_base_ip)))
def release_task_ips(self, task):
self.logger.info("[release_task_ips] user(%s) task(%s) net(%s)" % (task.username, task.id, str(task.task_base_ip)))
if task.task_base_ip == None:
return
self.free_nets.append(task.task_base_ip)
self.logger.error('[release task_net] %s'%str(e))
task.task_base_ip = None
# self.logger.error('[release task_net] %s' % str(e))
def setup_tasknet(self, task, workers=None):
taskid = task.taskinfo.taskid
username = task.taskinfo.username
taskid = task.id
username = task.username
brname = "docklet-batch-%s-%s"%(username, taskid)
gwname = "Batch-%s-%s"%(username, taskid)
if task.task_base_ip == None:
@ -232,85 +296,113 @@ class TaskMgr(threading.Thread):
return [True, gatewayip]
def remove_tasknet(self, task):
taskid = task.taskinfo.taskid
username = task.taskinfo.username
taskid = task.id
username = task.username
brname = "docklet-batch-%s-%s"%(username, taskid)
netcontrol.del_bridge(brname)
def task_processor(self, task, vnodes_workers):
def task_processor(self, task, sub_task_list):
task.status = RUNNING
self.jobmgr.report(task.taskinfo.taskid,'running')
# self.jobmgr.report(task.id,'running')
# properties for transaction
self.acquire_task_net(task)
self.acquire_task_ips(task)
task.gen_ips_from_base(self.base_ip)
task.gen_hosts()
# need to create hosts
[success, gwip] = self.setup_tasknet(task,[w[1] for w in vnodes_workers])
[success, gwip] = self.setup_tasknet(task, [sub_task.worker for sub_task in sub_task_list])
if not success:
self.release_task_ips(task)
return [False, gwip]
token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
placed_workers = []
start_all_vnode_success = True
# start vc
for vid, worker in vnodes_workers:
vnodeinfo = copy.copy(task.vnodeinfo)
vnodeinfo.vnodeid = vid
vnodeinfo.vnode.hostname = "batch-"+str(vid%task.max_size)
vnode = task.subtask_list[vid]
vnode['status'] = RUNNING
vnode['try_count'] += 1
vnode['token'] = token
vnode['worker'] = worker
for sub_task in sub_task_list:
vnode_info = sub_task.vnode_info
vnode_info.vnode.hostname = "batch-" + str(vnode_info.vnodeid % task.max_size)
if sub_task.vnode_started:
continue
self.cpu_usage[worker] += task.vnodeinfo.vnode.instance.cpu
self.gpu_usage[worker] += task.vnodeinfo.vnode.instance.gpu
username = task.vnodeinfo.username
username = task.username
#container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token
#if not username in self.user_containers.keys():
#self.user_containers[username] = []
#self.user_containers[username].append(container_name)
ipaddr = task.ips[vid%task.max_size]
brname = "docklet-batch-%s-%s"%(username, taskid)
ipaddr = task.ips[vnode_info.vnodeid % task.max_size]
brname = "docklet-batch-%s-%s" % (username, task.id)
networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.masterip, brname=brname)
vnodeinfo.vnode.network = networkinfo
vnode_info.vnode.network = networkinfo
try:
self.logger.info('[task_processor] starting vnode for task [%s] instance [%d]' % (task.vnodeinfo.id, vid))
channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
stub = WorkerStub(channel)
response = stub.start_vnode(vnodeinfo)
placed_workers.append(worker)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
task.status = FAILED
vnode['status'] = FAILED
vnode['try_count'] -= 1
for pl_worker in placed_workers:
pass
return
#self.user_containers[username].remove(container_name)
placed_workers.append(worker)
[success, msg] = self.start_vnode(sub_task)
if not success:
sub_task.waiting_for_retry()
sub_task.worker = None
start_all_vnode_success = False
if not start_all_vnode_success:
return
# start tasks
for vid, worker in vnodes_workers:
taskinfo = copy.copy(task.taskinfo)
taskinfo.vnodeid = vid
taskinfo.token = token
vnode = task.subtask_list[vid]
try:
self.logger.info('[task_processor] starting task [%s] instance [%d]' % (task.vnodeinfo.id, vid))
channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
stub = WorkerStub(channel)
response = stub.start_task(taskinfo)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
self.logger.error('[task_processor] rpc error message: %s' % e)
task.status = FAILED
for sub_task in sub_task_list:
task_info = sub_task.command_info
if task_info is None:
# no command to run in this vnode (single-command mode)
continue
task_info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
sub_task.token = task_info.token
[success, msg] = self.start_task(sub_task)
if success:
sub_task.status = RUNNING
else:
sub_task.waiting_for_retry()
def clear_sub_tasks(self, sub_task_list):
for sub_task in sub_task_list:
self.clear_sub_task(sub_task)
def clear_sub_task(self, sub_task):
if sub_task.task_started:
self.stop_task(sub_task)
if sub_task.vnode_started:
self.stop_vnode(sub_task)
def check_task_completed(self, task):
if task.status == RUNNING or task.status == WAITING:
for sub_task in task.subtask_list:
if sub_task.status == RUNNING or sub_task.status == WAITING:
return False
self.logger.info('task %s completed' % task.id)
if task.at_same_time and task.status == FAILED:
self.clear_sub_tasks(task.subtask_list)
# TODO report to jobmgr
self.lazy_delete_list.append(task)
return True
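
check_task_completed reduces to one predicate: a task is finished once no subtask is WAITING or RUNNING, and a failed at_same_time gang is torn down before the task is lazily deleted. The predicate on its own (stand-in constants):

    WAITING, RUNNING, FAILED, COMPLETED = range(4)  # stand-ins for the module's constants

    def is_completed(subtask_statuses):
        # nothing left to schedule and nothing still executing
        return not any(s in (WAITING, RUNNING) for s in subtask_statuses)

    assert is_completed([COMPLETED, FAILED])
    assert not is_completed([COMPLETED, RUNNING])
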
# this method is called when a worker sends a heart-beat rpc request
def on_task_report(self, report):
self.logger.info('[on_task_report] receive task report: id %s-%d, status %d' % (report.taskid, report.vnodeid, report.subTaskStatus))
task = self.get_task(report.taskid)
if task == None:
self.logger.error('[on_task_report] task not found')
return
sub_task = task.subtask_list[report.vnodeid]
if sub_task.token != report.token:
self.logger.warning('[on_task_report] wrong token')
return
username = task.username
# container_name = username + '-batch-' + task.info.id + '-' + str(report.instanceid) + '-' + report.token
# self.user_containers[username].remove(container_name)
if sub_task.status != RUNNING:
self.logger.error('[on_task_report] receive task report when instance is not running')
sub_task.status = report.subTaskStatus
sub_task.status_reason = report.errmsg
self.clear_sub_task(sub_task)
if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT:
sub_task.waiting_for_retry()
# return task, workers
def task_scheduler(self):
@ -320,60 +412,45 @@ class TaskMgr(threading.Thread):
for task in self.task_queue:
if task in self.lazy_delete_list:
continue
if self.check_task_completed(task):
continue
if task.atSameTime:
if task.at_same_time:
# parallel tasks
if task.status == RUNNING:
continue
workers = self.find_proper_workers(task.get_all_resources_need())
if len(workers) < task.vnode_nums:
workers = self.find_proper_workers(task.subtask_list)
if len(workers) == 0:
return None, None
else:
idxs = [i for i in range(task.vnode_nums)]
return task, zip(idxs,workers)
for i in range(len(workers)):
task.subtask_list[i].worker = workers[i]
return task, task.subtask_list
else:
# traditional tasks
workers = self.find_proper_workers([task.get_one_resources_need()])
if len(workers) < task.vnode_nums:
return None, None, None
'''for index, instance in enumerate(task.instance_list):
# find instance to retry
if (instance['status'] == FAILED or instance['status'] == TIMEOUT) and instance['try_count'] <= task.info.maxRetryCount:
if worker is not None:
self.logger.info('[task_scheduler] retry')
return task, index, worker
# find timeout instance
elif instance['status'] == RUNNING:
if not self.is_alive(instance['worker']):
instance['status'] = FAILED
instance['token'] = ''
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
for sub_task in task.subtask_list:
if sub_task.status == WAITING:
workers = self.find_proper_workers([sub_task])
if len(workers) > 0:
sub_task.worker = workers[0]
return task, [sub_task]
self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
if worker is not None:
return task, index, worker
return None, None
if worker is not None:
# start new instance
if len(task.instance_list) < task.info.instanceCount:
instance = {}
instance['try_count'] = 0
task.instance_list.append(instance)
return task, len(task.instance_list) - 1, worker'''
self.check_task_completed(task)
return None, None, None
def find_proper_workers(self, vnodes_configs):
def find_proper_workers(self, sub_task_list):
nodes = self.get_all_nodes()
if nodes is None or len(nodes) == 0:
self.logger.warning('[task_scheduler] running nodes not found')
return []  # callers take len() of the result
proper_workers = []
for needs in vnodes_configs:
has_waiting = False
for sub_task in sub_task_list:
if sub_task.status == WAITING:
has_waiting = True
if sub_task.worker is not None and sub_task.vnode_started:
proper_workers.append(sub_task.worker)
continue
needs = sub_task.vnode_info.vnode.instance
proper_worker = None
for worker_ip, worker_info in nodes:
if needs.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']:
continue
@ -391,11 +468,16 @@ class TaskMgr(threading.Thread):
worker_info['memory'] -= needs.memory
worker_info['gpu'] -= needs.gpu
worker_info['disk'] -= needs.disk
proper_workers.append(worker_ip)
proper_worker = worker_ip
break
if proper_worker is not None:
proper_workers.append(proper_worker)
else:
return []
return proper_workers
if has_waiting:
return proper_workers
else:
return []
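
find_proper_workers is a first-fit pass: subtasks whose vnode already runs keep their worker, and each WAITING subtask takes the first node whose remaining cpu/memory/gpu/disk covers its needs, with the reservation deducted from the local snapshot so two subtasks cannot double-book one node. The core rule on toy data (made-up capacities, cpu/memory only):

    nodes = {'10.0.0.1': {'cpu': 4, 'memory': 8}, '10.0.0.2': {'cpu': 8, 'memory': 16}}
    needs = [{'cpu': 4, 'memory': 8}, {'cpu': 4, 'memory': 8}]
    placed = []
    for need in needs:
        for ip, free in nodes.items():
            if need['cpu'] <= free['cpu'] and need['memory'] <= free['memory']:
                free['cpu'] -= need['cpu']       # deduct so the next subtask sees less room
                free['memory'] -= need['memory']
                placed.append(ip)
                break
    assert placed == ['10.0.0.1', '10.0.0.2']    # the second subtask spills to the next node
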
def get_all_nodes(self):
# cache running nodes
@ -445,41 +527,49 @@ class TaskMgr(threading.Thread):
"base": Image.BASE,
"public": Image.PUBLIC
}
configinfo = {'vnode_nums':7,'atSameTime':True,'MultiStart':True,
'maxRetryCount':int(json_task['retryCount'])}
# json_task = json.loads(json_task)
task = Task(configinfo,
VNodeInfo(
taskid = taskid,
task = Task(
task_id = taskid,
username = username,
vnode = Vnode(
image = Image(
name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'],
type = image_dict[json_task['image'].split('_')[2]], #json_task['cluster']['image']['type'],
owner = username if not json_task['image'].split('_')[1] else json_task['image'].split('_')[1]), #json_task['cluster']['image']['owner']),
instance = Instance(
cpu = int(json_task['cpuSetting']),
memory = int(json_task['memorySetting']),
disk = int(json_task['diskSetting']),
gpu = int(json_task['gpuSetting'])))
),
TaskInfo(
taskid = taskid,
username = username,
parameters = Parameters(
command = Command(
commandLine = json_task['command'],
packagePath = json_task['srcAddr'],
envVars = {}),
stderrRedirectPath = json_task.get('stdErrRedPth',""),
stdoutRedirectPath = json_task.get('stdOutRedPth',"")),
timeout = int(json_task['expTime']),
),
priority=task_priority,max_size=(1<<self.task_cidr)-2)
if 'mapping' in json_task:
task.vnodeinfo.vnode.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
for mapping_key in json_task['mapping']])
# all vnodes must be started at the same time
at_same_time = json_task['at_same_time'],
priority = task_priority,
max_size = (1 << self.task_cidr) - 2,
task_infos = [{
'max_retry_count': int(json_task['retryCount']),
'vnode_info': VNodeInfo(
taskid = taskid,
username = username,
vnode = Vnode(
image = Image(
name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'],
type = image_dict[json_task['image'].split('_')[2]], #json_task['cluster']['image']['type'],
owner = username if not json_task['image'].split('_')[1] else json_task['image'].split('_')[1]), #json_task['cluster']['image']['owner']),
instance = Instance(
cpu = int(json_task['cpuSetting']),
memory = int(json_task['memorySetting']),
disk = int(json_task['diskSetting']),
gpu = int(json_task['gpuSetting'])),
mount = [Mount(
localPath = json_task['mapping'][mapping_key]['mappingLocalDir'],
remotePath = json_task['mapping'][mapping_key]['mappingRemoteDir'])
for mapping_key in json_task['mapping']] if 'mapping' in json_task else []
),
),
'command_info': TaskInfo(
taskid = taskid,
username = username,
parameters = Parameters(
command = Command(
commandLine = json_task['command'],
packagePath = json_task['srcAddr'],
envVars = {}),
stderrRedirectPath = json_task.get('stdErrRedPth',""),
stdoutRedirectPath = json_task.get('stdOutRedPth',"")),
timeout = int(json_task['expTime'])
# commands are executed in all vnodes, or only in the first vnode:
# in traditional (not at_same_time) mode, commands are executed in all vnodes
) if (not json_task['at_same_time'] or json_task['multicommand'] or instance_index == 0) else None
} for instance_index in range(json_task['instCount'])])
self.lazy_append_list.append(task)
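
For reference, a json_task carrying every field this constructor reads might look like the following (illustrative values only; the key names are exactly those accessed above, and the image string is split on '_' into name/owner/type):

    json_task = {
        'image': 'ubuntu__base',  # empty owner segment falls back to the submitting user
        'retryCount': '2',
        'at_same_time': True,
        'multicommand': False,    # with at_same_time, only vnode 0 gets a command_info
        'instCount': 4,
        'cpuSetting': '2', 'memorySetting': '2048',
        'diskSetting': '1024', 'gpuSetting': '0',
        'command': './run.sh', 'srcAddr': '/nfs/pkg.tar', 'expTime': '3600',
        'stdErrRedPth': '', 'stdOutRedPth': '',
        'mapping': {}             # optional; entries carry mappingLocalDir/mappingRemoteDir
    }
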
@ -488,7 +578,7 @@ class TaskMgr(threading.Thread):
@queue_lock
def get_task(self, taskid):
for task in self.task_queue:
if task.info.id == taskid:
if task.id == taskid:
return task
return None

View File

@ -5,7 +5,7 @@ service Master {
}
service Worker {
rpc start_vnode (VNodeInfo) returns (Reply) {}
rpc start_task (TaskInfo) returns (Reply) {}
rpc stop_task (TaskInfo) returns (Reply) {}
rpc stop_vnode (VNodeInfo) returns (Reply) {}