diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py
index d0f6399..0a973d6 100644
--- a/src/master/taskmgr.py
+++ b/src/master/taskmgr.py
@@ -24,16 +24,12 @@ def int_to_ip(num):
     return str((num>>24)&255)+"."+str((num>>16)&255)+"."+str((num>>8)&255)+"."+str(num&255)
 
 class Task():
-    def __init__(self, configinfo, vnodeinfo, taskinfo, priority, max_size):
-        self.vnodeinfo = vnodeinfo
-        self.taskinfo = taskinfo
+    def __init__(self, task_id, username, at_same_time, priority, max_size, task_infos):
+        self.id = task_id
+        self.username = username
         self.status = WAITING
-        self.subtask_list = []
-        self.token = ''
-        self.maxRetryCount = self.configinfo['maxRetryCount']
-        self.atSameTime = self.configinfo['atSameTime']
-        self.multicommand = self.configinfo['multicommand']
-        self.vnode_nums = self.configinfo['vnode_nums']
+        # if all the vnodes must be started at the same time
+        self.at_same_time = at_same_time
         # priority the bigger the better
         # self.priority the smaller the better
         self.priority = int(time.time()) / 60 / 60 - priority
@@ -41,8 +37,13 @@ class Task():
         self.ips = None
         self.max_size = max_size
 
-        for i in range(self.vnode_nums):
-            self.subtask_list.append({'status':'WAITING','try_count':0})
+        self.subtask_list = [SubTask(
+            idx = index,
+            root_task = self,
+            vnode_info = task_info['vnode_info'],
+            command_info = task_info['command_info'],
+            max_retry_count = task_info['max_retry_count']
+        ) for (index, task_info) in enumerate(task_infos)]
 
     def __lt__(self, other):
         return self.priority < other.priority
@@ -55,8 +56,8 @@ class Task():
             self.ips.append(int_to_ip(base_ip + self.task_base_ip + i + 2))
 
     def gen_hosts(self):
-        username = self.taskinfo.username
-        taskid = self.taskinfo.taskid
+        username = self.username
+        taskid = self.id
         logger.info("Generate hosts for user(%s) task(%s) base_ip(%s)"%(username,taskid,str(self.task_base_ip)))
         fspath = env.getenv('FS_PREFIX')
         if not os.path.isdir("%s/global/users/%s" % (fspath,username)):
@@ -72,11 +73,26 @@ class Task():
             i += 1
         hosts_file.close()
 
-    def get_one_resources_need(self):
-        return self.vnodeinfo.vnode.instance
+class SubTask():
+    def __init__(self, idx, root_task, vnode_info, command_info, max_retry_count):
+        self.root_task = root_task
+        self.vnode_info = vnode_info
+        self.vnode_info.vnodeid = idx
+        self.command_info = command_info
+        self.command_info.vnodeid = idx
+        self.max_retry_count = max_retry_count
+        self.vnode_started = False
+        self.task_started = False
+        self.status = WAITING
+        self.status_reason = ''
+        self.try_count = 0
+        self.worker = None
 
-    def get_all_resources_need(self):
-        return [self.vnodeinfo.vnode.instance for i in range(self.vnode_nums)]
+    def waiting_for_retry(self):
+        self.try_count += 1
+        self.status = WAITING if self.try_count <= self.max_retry_count else FAILED
+        if self.status == FAILED and self.root_task.at_same_time:
+            self.root_task.status = FAILED
 
 
 class TaskReporter(MasterServicer):
@@ -89,6 +105,7 @@ class TaskReporter(MasterServicer):
         self.taskmgr.on_task_report(task_report)
         return Reply(status=Reply.ACCEPTED, message='')
 
+
 class TaskMgr(threading.Thread):
 
     # load task information from etcd
@@ -156,9 +173,9 @@ class TaskMgr(threading.Thread):
         self.serve()
         while not self.thread_stop:
             self.sort_out_task_queue()
-            task, vnodes_workers = self.task_scheduler()
-            if task is not None and workers is not None:
-                self.task_processor(task, vnodes_workers)
+            task, sub_task_list = self.task_scheduler()
+            if task is not None and sub_task_list is not None:
+                self.task_processor(task, sub_task_list)
             else:
                time.sleep(self.scheduler_interval)
 
@@ -185,39 +202,87 @@ class TaskMgr(threading.Thread):
         self.task_queue.append(task)
         self.task_queue = sorted(self.task_queue, key=lambda x: x.priority)
 
-    def stop_vnode(self, worker, task, vnodeid):
-        vnodeinfo = copy.copy(task.vnodeinfo)
-        vnodeinfo.vnodeid = vnodeid
+    def start_vnode(self, subtask):
         try:
-            self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (task.vnodeinfo.id, vnodeid))
-            channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
+            self.logger.info('[task_processor] Starting vnode for task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
+            channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
             stub = WorkerStub(channel)
-            response = stub.stop_vnode(vnodeinfo)
+            response = stub.start_vnode(subtask.vnode_info)
             if response.status != Reply.ACCEPTED:
                 raise Exception(response.message)
         except Exception as e:
             self.logger.error('[task_processor] rpc error message: %s' % e)
+            subtask.status_reason = str(e)
             return [False, e]
-        return [True, ""]
+        subtask.vnode_started = True
+        self.cpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.cpu
+        self.gpu_usage[subtask.worker] += subtask.vnode_info.vnode.instance.gpu
+        return [True, '']
+
+    def stop_vnode(self, subtask):
+        try:
+            self.logger.info('[task_processor] Stopping vnode for task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
+            channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
+            stub = WorkerStub(channel)
+            response = stub.stop_vnode(subtask.vnode_info)
+            if response.status != Reply.ACCEPTED:
+                raise Exception(response.message)
+        except Exception as e:
+            self.logger.error('[task_processor] rpc error message: %s' % e)
+            subtask.status_reason = str(e)
+            return [False, e]
+        subtask.vnode_started = False
+        self.cpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.cpu
+        self.gpu_usage[subtask.worker] -= subtask.vnode_info.vnode.instance.gpu
+        return [True, '']
+
+    def start_task(self, subtask):
+        try:
+            self.logger.info('[task_processor] Starting task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
+            channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
+            stub = WorkerStub(channel)
+            response = stub.start_task(subtask.command_info)
+            if response.status != Reply.ACCEPTED:
+                raise Exception(response.message)
+        except Exception as e:
+            self.logger.error('[task_processor] rpc error message: %s' % e)
+            subtask.status_reason = str(e)
+            return [False, e]
+        subtask.task_started = True
+        return [True, '']
+
+    def stop_task(self, subtask):
+        try:
+            self.logger.info('[task_processor] Stopping task [%s] vnode [%d]' % (subtask.vnode_info.taskid, subtask.vnode_info.vnodeid))
+            channel = grpc.insecure_channel('%s:%s' % (subtask.worker, self.worker_port))
+            stub = WorkerStub(channel)
+            response = stub.stop_task(subtask.command_info)
+            if response.status != Reply.ACCEPTED:
+                raise Exception(response.message)
+        except Exception as e:
+            self.logger.error('[task_processor] rpc error message: %s' % e)
+            subtask.status = FAILED
+            subtask.status_reason = str(e)
+        subtask.task_started = False
 
     @net_lock
     def acquire_task_ips(self, task):
-        self.logger.info("[acquire_task_ips] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.task_base_ip)))
+        self.logger.info("[acquire_task_ips] user(%s) task(%s) net(%s)" % (task.username, task.id, str(task.task_base_ip)))
         if task.task_base_ip == None:
            task.task_base_ip = self.free_nets.pop(0)
         return task.task_base_ip
 
     @net_lock
-    def release_task_ips(self,task):
-        self.logger.info("[release_task_ips] user(%s) task(%s) net(%s)"%(task.taskinfo.username, task.taskinfo.taskid, str(task.task_base_ip)))
+    def release_task_ips(self, task):
+        self.logger.info("[release_task_ips] user(%s) task(%s) net(%s)" % (task.username, task.id, str(task.task_base_ip)))
         if task.task_base_ip == None:
             return
         self.free_nets.append(task.task_base_ip)
-        self.logger.error('[release task_net] %s'%str(e))
+        task.task_base_ip = None
 
     def setup_tasknet(self, task, workers=None):
-        taskid = task.taskinfo.taskid
-        username = task.taskinfo.username
+        taskid = task.id
+        username = task.username
         brname = "docklet-batch-%s-%s"%(username, taskid)
         gwname = "Batch-%s-%s"%(username, taskid)
         if task.task_base_ip == None:
@@ -232,85 +296,113 @@ class TaskMgr(threading.Thread):
         return [True, gatewayip]
 
     def remove_tasknet(self, task):
-        taskid = task.taskinfo.taskid
-        username = task.taskinfo.username
+        taskid = task.id
+        username = task.username
         brname = "docklet-batch-%s-%s"%(username, taskid)
         netcontrol.del_bridge(brname)
 
-    def task_processor(self, task, vnodes_workers):
+    def task_processor(self, task, sub_task_list):
         task.status = RUNNING
-        self.jobmgr.report(task.taskinfo.taskid,'running')
+        # self.jobmgr.report(task.id,'running')
         # properties for transactio
-        self.acquire_task_net(task)
+        self.acquire_task_ips(task)
         task.gen_ips_from_base(self.base_ip)
         task.gen_hosts()
         #need to create hosts
-        [success, gwip] = self.setup_tasknet(task,[w[1] for w in vnodes_workers])
+        [success, gwip] = self.setup_tasknet(task, [sub_task.worker for sub_task in sub_task_list])
         if not success:
             self.release_task_ips(task)
             return [False, gwip]
 
-        token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
+        placed_workers = []
+        start_all_vnode_success = True
 
         # start vc
-        for vid, worker in vnodes_workers:
-            vnodeinfo = copy.copy(task.vnodeinfo)
-            vnodeinfo.vnodeid = vid
-            vnodeinfo.vnode.hostname = "batch-"+str(vid%task.max_size)
-            vnode = task.subtask_list[vid]
-            vnode['status'] = RUNNING
-            vnode['try_count'] += 1
-            vnode['token'] = token
-            vnode['worker'] = worker
+        for sub_task in sub_task_list:
+            vnode_info = sub_task.vnode_info
+            vnode_info.vnode.hostname = "batch-"+str(vnode_info.vnodeid%task.max_size)
+            if sub_task.vnode_started:
+                continue
 
-            self.cpu_usage[worker] += task.vnodeinfo.vnode.instance.cpu
-            self.gpu_usage[worker] += task.vnodeinfo.vnode.instance.gpu
-            username = task.vnodeinfo.username
+            username = task.username
             #container_name = task.info.username + '-batch-' + task.info.id + '-' + str(instance_id) + '-' + task.info.token
             #if not username in self.user_containers.keys():
                 #self.user_containers[username] = []
             #self.user_containers[username].append(container_name)
 
-            ipaddr = task.ips[vid%task.max_size]
-            brname = "docklet-batch-%s-%s"%(username, taskid)
+            ipaddr = task.ips[vnode_info.vnodeid % task.max_size]
+            brname = "docklet-batch-%s-%s" % (username, task.id)
             networkinfo = Network(ipaddr=ipaddr, gateway=gwip, masterip=self.masterip, brname=brname)
-            vnodeinfo.vnode.network = networkinfo
+            vnode_info.vnode.network = networkinfo
 
-            try:
-                self.logger.info('[task_processor] starting vnode for task [%s] instance [%d]' % (task.vnodeinfo.id, vid))
-                channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
-                stub = WorkerStub(channel)
-                response = stub.start_vnode(vnodeinfo)
-                placed_workers.append(worker)
-                if response.status != Reply.ACCEPTED:
-                    raise Exception(response.message)
-            except Exception as e:
-                self.logger.error('[task_processor] rpc error message: %s' % e)
-                task.status = FAILED
-                vnode['status'] = FAILED
-                vnode['try_count'] -= 1
-                for pl_worker in placed_workers:
-                    pass
-                return
-                #self.user_containers[username].remove(container_name)
+            placed_workers.append(sub_task.worker)
+            if not self.start_vnode(sub_task)[0]:
+                sub_task.waiting_for_retry()
+                sub_task.worker = None
+                start_all_vnode_success = False
+
+        if not start_all_vnode_success:
+            return
 
         # start tasks
-        for vid, worker in vnodes_workers:
-            taskinfo = copy.copy(task.taskinfo)
-            taskinfo.vnodeid = vid
-            taskinfo.token = token
-            vnode = task.subtask_list[vid]
-            try:
-                self.logger.info('[task_processor] starting task [%s] instance [%d]' % (task.vnodeinfo.id, vid))
-                channel = grpc.insecure_channel('%s:%s' % (worker, self.worker_port))
-                stub = WorkerStub(channel)
-                response = stub.start_task(taskinfo)
-                if response.status != Reply.ACCEPTED:
-                    raise Exception(response.message)
-            except Exception as e:
-                self.logger.error('[task_processor] rpc error message: %s' % e)
-                task.status = FAILED
+        for sub_task in sub_task_list:
+            task_info = sub_task.command_info
+            task_info.token = ''.join(random.sample(string.ascii_letters + string.digits, 8))
+
+            if self.start_task(sub_task)[0]:
+                sub_task.status = RUNNING
+            else:
+                sub_task.waiting_for_retry()
+
+    def clear_sub_tasks(self, sub_task_list):
+        for sub_task in sub_task_list:
+            self.clear_sub_task(sub_task)
+
+    def clear_sub_task(self, sub_task):
+        if sub_task.task_started:
+            self.stop_task(sub_task)
+        if sub_task.vnode_started:
+            self.stop_vnode(sub_task)
+
+    def check_task_completed(self, task):
+        if task.status == RUNNING or task.status == WAITING:
+            for sub_task in task.subtask_list:
+                if sub_task.status == RUNNING or sub_task.status == WAITING:
+                    return False
+        self.logger.info('task %s completed' % task.id)
+        if task.at_same_time and task.status == FAILED:
+            self.clear_sub_tasks(task.subtask_list)
+        # TODO report to jobmgr
+        self.lazy_delete_list.append(task)
+        return True
+
+    # this method is called when a worker sends a heart-beat rpc request
+    def on_task_report(self, report):
+        self.logger.info('[on_task_report] receive task report: id %s-%d, status %d' % (report.taskid, report.vnodeid, report.subTaskStatus))
+        task = self.get_task(report.taskid)
+        if task == None:
+            self.logger.error('[on_task_report] task not found')
+            return
+
+        sub_task = task.subtask_list[report.vnodeid]
+        if sub_task.command_info.token != report.token:
+            self.logger.warning('[on_task_report] wrong token')
+            return
+        username = task.username
+        # container_name = username + '-batch-' + task.info.id + '-' + str(report.instanceid) + '-' + report.token
+        # self.user_containers[username].remove(container_name)
+
+        if sub_task.status != RUNNING:
+            self.logger.error('[on_task_report] receive task report when instance is not running')
+
+        sub_task.status = report.subTaskStatus
+        sub_task.status_reason = report.errmsg
+
+        self.clear_sub_task(sub_task)
+
+        if report.subTaskStatus == FAILED or report.subTaskStatus == TIMEOUT:
+            sub_task.waiting_for_retry()
 
     # return task, workers
     def task_scheduler(self):
@@ -320,60 +412,45 @@
         for task in self.task_queue:
             if task in self.lazy_delete_list:
                 continue
+            if self.check_task_completed(task):
+                continue
 
-            if task.atSameTime:
+            if task.at_same_time:
                 # parallel tasks
-                if task.status == RUNNING:
-                    continue
-                workers = self.find_proper_workers(task.get_all_resources_need())
-                if len(workers) < task.vnode_nums:
+                workers = self.find_proper_workers(task.subtask_list)
+                if len(workers) == 0:
                     return None, None
                 else:
-                    idxs = [i for i in range(task.vnode_nums)]
-                    return task, zip(idxs,workers)
+                    for i in range(len(workers)):
+                        task.subtask_list[i].worker = workers[i]
+                    return task, task.subtask_list
             else:
                 # traditional tasks
-                workers = self.find_proper_workers([task.get_one_resources_need()])
-                if len(workers) < task.vnode_nums:
-                    return None, None, None
-                '''for index, instance in enumerate(task.instance_list):
-                    # find instance to retry
-                    if (instance['status'] == FAILED or instance['status'] == TIMEOUT) and instance['try_count'] <= task.info.maxRetryCount:
-                        if worker is not None:
-                            self.logger.info('[task_scheduler] retry')
-                            return task, index, worker
-                    # find timeout instance
-                    elif instance['status'] == RUNNING:
-                        if not self.is_alive(instance['worker']):
-                            instance['status'] = FAILED
-                            instance['token'] = ''
-                            self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
-                            self.gpu_usage[instance['worker']] -= task.info.cluster.instance.gpu
+                for sub_task in task.subtask_list:
+                    if sub_task.status == WAITING:
+                        workers = self.find_proper_workers([sub_task])
+                        if len(workers) > 0:
+                            sub_task.worker = workers[0]
+                            return task, [sub_task]
 
-                            self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
-                            if worker is not None:
-                                return task, index, worker
+        return None, None
 
-                    if worker is not None:
-                        # start new instance
-                        if len(task.instance_list) < task.info.instanceCount:
-                            instance = {}
-                            instance['try_count'] = 0
-                            task.instance_list.append(instance)
-                            return task, len(task.instance_list) - 1, worker'''
-
-        self.check_task_completed(task)
-
-        return None, None, None
-
-    def find_proper_workers(self, vnodes_configs):
+    def find_proper_workers(self, sub_task_list):
         nodes = self.get_all_nodes()
         if nodes is None or len(nodes) == 0:
             self.logger.warning('[task_scheduler] running nodes not found')
             return None
         proper_workers = []
-        for needs in vnodes_configs:
+        has_waiting = False
+        for sub_task in sub_task_list:
+            if sub_task.status == WAITING:
+                has_waiting = True
+            if sub_task.worker is not None and sub_task.vnode_started:
+                proper_workers.append(sub_task.worker)
+                continue
+            needs = sub_task.vnode_info.vnode.instance
+            proper_worker = None
             for worker_ip, worker_info in nodes:
                 if needs.cpu + self.get_cpu_usage(worker_ip) > worker_info['cpu']:
                     continue
@@ -391,11 +468,16 @@
                 worker_info['memory'] -= needs.memory
                 worker_info['gpu'] -= needs.gpu
                 worker_info['disk'] -= needs.disk
-                proper_workers.append(worker_ip)
+                proper_worker = worker_ip
                 break
+            if proper_worker is not None:
+                proper_workers.append(proper_worker)
             else:
                 return []
-        return proper_workers
+        if has_waiting:
+            return proper_workers
+        else:
+            return []
 
     def get_all_nodes(self):
         # cache running nodes
@@ -445,41 +527,49 @@
             "base": Image.BASE,
             "public": Image.PUBLIC
         }
-        configinfo = {'vnode_nums':7,'atSameTime':True,'MultiStart':True,
-            'maxRetryCount':int(json_task['retryCount'])}
-        # json_task = json.loads(json_task)
-        task = Task(configinfo,
-            VNodeInfo(
-                taskid = taskid,
+        task = Task(
+            task_id = taskid,
             username = username,
-                vnode = Vnode(
-                    image = Image(
-                        name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'],
-                        type = image_dict[json_task['image'].split('_')[2]], #json_task['cluster']['image']['type'],
-                        owner = username if not json_task['image'].split('_')[1] else json_task['image'].split('_')[1]), #json_task['cluster']['image']['owner']),
-                    instance = Instance(
-                        cpu = int(json_task['cpuSetting']),
-                        memory = int(json_task['memorySetting']),
-                        disk = int(json_task['diskSetting']),
-                        gpu = int(json_task['gpuSetting'])))
-            ),
-            TaskInfo(
-                taskid = taskid,
-                username = username,
-                parameters = Parameters(
-                    command = Command(
-                        commandLine = json_task['command'],
-                        packagePath = json_task['srcAddr'],
-                        envVars = {}),
-                    stderrRedirectPath = json_task.get('stdErrRedPth',""),
-                    stdoutRedirectPath = json_task.get('stdOutRedPth',"")),
-                timeout = int(json_task['expTime']),
-            ),
-            priority=task_priority,max_size=(1<