Merge branch 'batch' of https://github.com/unias/docklet into batch
This commit is contained in:
commit
8645c6260c
|
@ -1,6 +1,8 @@
|
|||
import threading
|
||||
import time
|
||||
|
||||
import master.monitor
|
||||
|
||||
# must import logger after initlogging, ugly
|
||||
from utils.log import initlogging
|
||||
initlogging("docklet-taskmgr")
|
||||
|
@ -26,19 +28,28 @@ class TaskMgr(threading.Thread):
|
|||
# load task information from etcd
|
||||
# initial a task queue and task schedueler
|
||||
# taskmgr: a taskmgr instance
|
||||
def __init__(self):
|
||||
def __init__(self, nodemgr):
|
||||
threading.Thread.__init__(self)
|
||||
self.thread_stop = False
|
||||
self.taskQueue = []
|
||||
|
||||
# tasks
|
||||
self.task_queue = []
|
||||
|
||||
# nodes
|
||||
self.nodemgr = nodemgr
|
||||
self.all_nodes = None
|
||||
self.last_nodes_info_update_time = 0
|
||||
self.nodes_info_update_interval = 30 # (s)
|
||||
|
||||
|
||||
def run(self):
|
||||
self.serve()
|
||||
while not self.thread_stop:
|
||||
task = self.task_scheduler()
|
||||
if task is not None:
|
||||
self.task_processor(task)
|
||||
time.sleep(2)
|
||||
task, instance_id, worker = self.task_scheduler()
|
||||
if task is not None and worker is not None:
|
||||
self.task_processor(task, instance_id, worker)
|
||||
else:
|
||||
time.sleep(2)
|
||||
|
||||
|
||||
def serve(self):
|
||||
|
@ -46,11 +57,13 @@ class TaskMgr(threading.Thread):
|
|||
add_MasterServicer_to_server(TaskReporter(self), self.server)
|
||||
self.server.add_insecure_port('[::]:50051')
|
||||
self.server.start()
|
||||
logger.info('[taskmgr_rpc] start rpc server')
|
||||
|
||||
|
||||
def stop(self):
|
||||
self.thread_stop = True
|
||||
self.server.stop(0)
|
||||
logger.info('[taskmgr_rpc] stop rpc server')
|
||||
|
||||
|
||||
# this method is called when worker send heart-beat rpc request
|
||||
|
@ -61,41 +74,93 @@ class TaskMgr(threading.Thread):
|
|||
logger.error('[on_task_report] task not found')
|
||||
return
|
||||
|
||||
task.status = report.status
|
||||
if task.status == Task.RUNNING:
|
||||
instance_id = report.parameters.command.envVars['INSTANCE_ID']
|
||||
instance = task.instance_list[instance_id]
|
||||
|
||||
if report.status == Task.RUNNING:
|
||||
pass
|
||||
elif task.status == Task.COMPLETED:
|
||||
# tell jobmgr
|
||||
pass
|
||||
elif task.status == Task.FAILED || task.status == Task.TIMEOUT:
|
||||
# retry
|
||||
if task.maxRetryCount <= 0:
|
||||
# tell jobmgr
|
||||
pass
|
||||
else:
|
||||
# decrease max retry count & waiting for retry
|
||||
task.maxRetryCount -= 1
|
||||
task.status = Task.WAITING
|
||||
elif report.status == Task.COMPLETED:
|
||||
instance['status'] = 'completed'
|
||||
check_task_completed(task)
|
||||
elif report.status == Task.FAILED || report.status == Task.TIMEOUT:
|
||||
instance['status'] = 'failed'
|
||||
if instance['try_count'] > task.maxRetryCount:
|
||||
check_task_completed(task)
|
||||
else:
|
||||
logger.error('[on_task_report] receive report from waiting task')
|
||||
|
||||
|
||||
# this is a thread to process task(or a instance)
|
||||
def task_processor(self,task):
|
||||
# call the rpc to call a function in worker
|
||||
# create container -> execute task
|
||||
# (one instance or multiple instances)
|
||||
# retry when failed
|
||||
print('processing %s' % task)
|
||||
def check_task_completed(self, task):
|
||||
if len(task.instance_list) < task.instanceCount:
|
||||
return
|
||||
failed = False
|
||||
for instance in task.instance_list:
|
||||
if instance['status'] == 'running':
|
||||
return
|
||||
if instance['status'] == 'failed':
|
||||
if instance['try_count'] > task.maxRetryCount:
|
||||
failed = True
|
||||
else:
|
||||
return
|
||||
if failed:
|
||||
# tell jobmgr task failed
|
||||
task.status = Task.FAILED
|
||||
else:
|
||||
# tell jobmgr task completed
|
||||
task.status = Task.COMPLETED
|
||||
self.task_queue.remove(task)
|
||||
|
||||
|
||||
# this is a thread to schdule the tasks
|
||||
def task_processor(self, task, instance_id, worker):
|
||||
task.status = Task.RUNNING
|
||||
task.parameters.command.envVars['INSTANCE_ID'] = instance_id
|
||||
# TODO call the rpc to call a function in worker
|
||||
print('processing %s' % task.id)
|
||||
|
||||
|
||||
# return task, worker
|
||||
def task_scheduler(self):
|
||||
try:
|
||||
task = self.taskQueue.pop(0)
|
||||
except:
|
||||
task = None
|
||||
return task
|
||||
# simple FIFO
|
||||
for task in self.task_queue:
|
||||
worker = self.find_proper_worker(task)
|
||||
if worker is not None:
|
||||
# find instance to retry
|
||||
for instance, index in enumerate(task.instance_list):
|
||||
if instance['status'] == 'failed' and instance['try_count'] <= task.maxRetryCount:
|
||||
instance['try_count'] += 1
|
||||
return task, index, worker
|
||||
|
||||
# start new instance
|
||||
if len(task.instance_list) < task.instanceCount:
|
||||
instance = {}
|
||||
instance['status'] = 'running'
|
||||
instance['try_count'] = 0
|
||||
task.instance_list.append(instance)
|
||||
return task, len(task.instance_list) - 1, worker
|
||||
return None
|
||||
|
||||
|
||||
def find_proper_worker(self, task):
|
||||
nodes = get_all_nodes()
|
||||
if nodes is None or len(nodes) == 0:
|
||||
logger.warning('[task_scheduler] running nodes not found')
|
||||
return None
|
||||
|
||||
# TODO
|
||||
return nodes[0]
|
||||
|
||||
|
||||
def get_all_nodes(self):
|
||||
# cache running nodes
|
||||
if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
|
||||
return self.all_nodes
|
||||
# get running nodes
|
||||
node_ips = self.nodemgr.get_nodeips()
|
||||
self.all_nodes = []
|
||||
for node_ip in node_ips:
|
||||
fetcher = master.monitor.Fetcher(node_ip)
|
||||
self.all_nodes.append(fetcher.info)
|
||||
return self.all_nodes
|
||||
|
||||
|
||||
# user: username
|
||||
|
@ -103,13 +168,15 @@ class TaskMgr(threading.Thread):
|
|||
# save the task information into database
|
||||
# called when jobmgr assign task to taskmgr
|
||||
def add_task(self, task):
|
||||
pass
|
||||
# decode json string to object defined in grpc
|
||||
task.instance_list = []
|
||||
self.task_queue.append(task)
|
||||
|
||||
|
||||
# user: username
|
||||
# get the information of a task, including the status, task description and other information
|
||||
def get_task(self, taskid):
|
||||
for task in self.taskQueue:
|
||||
for task in self.task_queue:
|
||||
if task.id == taskid:
|
||||
return task
|
||||
return None
|
||||
|
|
Loading…
Reference in New Issue