basic implementation for taskmgr

This commit is contained in:
Gallen 2018-07-19 14:55:27 +08:00
parent a670020e8d
commit 245fad1fad
1 changed file with 102 additions and 35 deletions


@@ -1,6 +1,8 @@
 import threading
 import time
+import master.monitor
 # must import logger after initlogging, ugly
 from utils.log import initlogging
 initlogging("docklet-taskmgr")
@@ -26,19 +28,28 @@ class TaskMgr(threading.Thread):
     # load task information from etcd
     # initialize a task queue and task scheduler
     # taskmgr: a taskmgr instance
-    def __init__(self):
+    def __init__(self, nodemgr):
         threading.Thread.__init__(self)
         self.thread_stop = False
-        self.taskQueue = []
+        # tasks
+        self.task_queue = []
+        # nodes
+        self.nodemgr = nodemgr
+        self.all_nodes = None
+        self.last_nodes_info_update_time = 0
+        self.nodes_info_update_interval = 30 # (s)

     def run(self):
         self.serve()
         while not self.thread_stop:
-            task = self.task_scheduler()
-            if task is not None:
-                self.task_processor(task)
-            time.sleep(2)
+            task, instance_id, worker = self.task_scheduler()
+            if task is not None and worker is not None:
+                self.task_processor(task, instance_id, worker)
+            else:
+                time.sleep(2)

     def serve(self):
@@ -46,11 +57,13 @@ class TaskMgr(threading.Thread):
         add_MasterServicer_to_server(TaskReporter(self), self.server)
         self.server.add_insecure_port('[::]:50051')
         self.server.start()
+        logger.info('[taskmgr_rpc] start rpc server')

     def stop(self):
         self.thread_stop = True
         self.server.stop(0)
+        logger.info('[taskmgr_rpc] stop rpc server')

     # this method is called when a worker sends a heart-beat rpc request
@@ -61,41 +74,93 @@ class TaskMgr(threading.Thread):
             logger.error('[on_task_report] task not found')
             return
-        task.status = report.status
-        if task.status == Task.RUNNING:
+        instance_id = int(report.parameters.command.envVars['INSTANCE_ID'])
+        instance = task.instance_list[instance_id]
+        if report.status == Task.RUNNING:
             pass
-        elif task.status == Task.COMPLETED:
-            # tell jobmgr
-            pass
-        elif task.status == Task.FAILED or task.status == Task.TIMEOUT:
-            # retry
-            if task.maxRetryCount <= 0:
-                # tell jobmgr
-                pass
-            else:
-                # decrease max retry count & waiting for retry
-                task.maxRetryCount -= 1
-                task.status = Task.WAITING
+        elif report.status == Task.COMPLETED:
+            instance['status'] = 'completed'
+            self.check_task_completed(task)
+        elif report.status == Task.FAILED or report.status == Task.TIMEOUT:
+            instance['status'] = 'failed'
+            if instance['try_count'] > task.maxRetryCount:
+                self.check_task_completed(task)
         else:
             logger.error('[on_task_report] receive report from waiting task')
-    # this is a thread to process task(or a instance)
-    def task_processor(self,task):
-        # call the rpc to call a function in worker
-        # create container -> execute task
-        # (one instance or multiple instances)
-        # retry when failed
-        print('processing %s' % task)
+    def check_task_completed(self, task):
+        if len(task.instance_list) < task.instanceCount:
+            return
+        failed = False
+        for instance in task.instance_list:
+            if instance['status'] == 'running':
+                return
+            if instance['status'] == 'failed':
+                if instance['try_count'] > task.maxRetryCount:
+                    failed = True
+                else:
+                    return
+        if failed:
+            # tell jobmgr task failed
+            task.status = Task.FAILED
+        else:
+            # tell jobmgr task completed
+            task.status = Task.COMPLETED
+        self.task_queue.remove(task)
+
-    # this is a thread to schdule the tasks
+    def task_processor(self, task, instance_id, worker):
+        task.status = Task.RUNNING
+        task.parameters.command.envVars['INSTANCE_ID'] = str(instance_id)
+        # TODO call the rpc to call a function in worker
+        print('processing %s' % task.id)
+        # return task, worker
+
     def task_scheduler(self):
-        try:
-            task = self.taskQueue.pop(0)
-        except:
-            task = None
-        return task
+        # simple FIFO
+        for task in self.task_queue:
+            worker = self.find_proper_worker(task)
+            if worker is not None:
+                # find an instance to retry
+                for index, instance in enumerate(task.instance_list):
+                    if instance['status'] == 'failed' and instance['try_count'] <= task.maxRetryCount:
+                        instance['try_count'] += 1
+                        return task, index, worker
+                # start a new instance
+                if len(task.instance_list) < task.instanceCount:
+                    instance = {}
+                    instance['status'] = 'running'
+                    instance['try_count'] = 0
+                    task.instance_list.append(instance)
+                    return task, len(task.instance_list) - 1, worker
+        return None, None, None
+
+    def find_proper_worker(self, task):
+        nodes = self.get_all_nodes()
+        if nodes is None or len(nodes) == 0:
+            logger.warning('[task_scheduler] running nodes not found')
+            return None
+        # TODO
+        return nodes[0]
+
+    def get_all_nodes(self):
+        # cache running nodes
+        if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
+            return self.all_nodes
+        # get running nodes
+        self.last_nodes_info_update_time = time.time()
+        node_ips = self.nodemgr.get_nodeips()
+        self.all_nodes = []
+        for node_ip in node_ips:
+            fetcher = master.monitor.Fetcher(node_ip)
+            self.all_nodes.append(fetcher.info)
+        return self.all_nodes

     # user: username
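Note: the worker-selection policy in find_proper_worker is still a TODO in this commit; it simply returns the first running node. A minimal sketch of a resource-aware version is shown below, assuming each entry of all_nodes (i.e. fetcher.info) behaves like a dict of monitor data; the keys 'cpu_free' and 'mem_free' and the task attributes task.cpu and task.memory are hypothetical and do not appear anywhere in this diff.

# hypothetical sketch, not part of this commit
def find_proper_worker(self, task):
    nodes = self.get_all_nodes()
    if not nodes:
        logger.warning('[find_proper_worker] running nodes not found')
        return None
    for node in nodes:
        # assumed keys on the monitor info dict; assumed resource fields on the task
        if node.get('cpu_free', 0) >= getattr(task, 'cpu', 0) and \
           node.get('mem_free', 0) >= getattr(task, 'memory', 0):
            return node
    return None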
@@ -103,13 +168,15 @@ class TaskMgr(threading.Thread):
     # save the task information into database
     # called when jobmgr assigns a task to taskmgr
     def add_task(self, task):
-        pass
+        # decode json string to object defined in grpc
+        task.instance_list = []
+        self.task_queue.append(task)

     # user: username
     # get the information of a task, including the status, task description and other information
     def get_task(self, taskid):
-        for task in self.taskQueue:
+        for task in self.task_queue:
             if task.id == taskid:
                 return task
         return None
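Taken together, the new interface is driven from the jobmgr/master side roughly as follows. This is a minimal wiring sketch only: it assumes a running nodemgr instance, a gRPC Task message carrying the fields referenced in this diff (id, instanceCount, maxRetryCount, parameters.command.envVars), and an import path for TaskMgr that is not shown in the commit.

# hypothetical usage sketch, not part of this commit
from master.taskmgr import TaskMgr    # import path assumed

taskmgr = TaskMgr(nodemgr)             # nodemgr supplied by the master
taskmgr.start()                        # starts serve() plus the scheduling loop

taskmgr.add_task(task)                 # jobmgr hands over a Task message;
                                       # add_task resets task.instance_list to []
t = taskmgr.get_task(task.id)          # poll task status later
if t is not None:
    print(t.status)

taskmgr.stop()                         # stops the loop and the rpc server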