add simple resource manage to taskmgr

This commit is contained in:
Gallen 2018-07-19 20:47:49 +08:00
parent 55ed430439
commit 2c8216e143
1 changed files with 56 additions and 26 deletions

View File

@ -25,7 +25,7 @@ class TaskReporter(MasterServicer):
def report(self, request, context):
self.taskmgr.on_task_report(request)
return Reply(message=Reply.ACCEPTED)
return Reply(status=Reply.ACCEPTED, message='')
class TaskMgr(threading.Thread):
@ -42,9 +42,10 @@ class TaskMgr(threading.Thread):
# nodes
self.nodemgr = nodemgr
self.all_nodes = None
self.last_nodes_info_update_time = 0
self.nodes_info_update_interval = 30 # (s)
self.cpu_usage = {}
# self.all_nodes = None
# self.last_nodes_info_update_time = 0
# self.nodes_info_update_interval = 30 # (s)
def run(self):
@ -85,11 +86,15 @@ class TaskMgr(threading.Thread):
return
instance['status'] = report.instanceStatus
if report.instanceStatus == Status.RUNNING:
instance['last_update_time'] = time.time()
elif report.instanceStatus == Status.COMPLETED:
else:
self.cpu_usage[instance['worker']] -= task.cluster.instance.cpu
if report.instanceStatus == Status.COMPLETED:
check_task_completed(task)
elif report.instanceStatus == Status.FAILED || report.instanceStatus == Status.TIMEOUT:
elif report.instanceStatus == Status.FAILED or report.instanceStatus == Status.TIMEOUT:
if instance['try_count'] > task.maxRetryCount:
check_task_completed(task)
else:
@ -101,9 +106,9 @@ class TaskMgr(threading.Thread):
return
failed = False
for instance in task.instance_list:
if instance['status'] == Status.RUNNING || instance['status'] == Status.WAITING:
if instance['status'] == Status.RUNNING or instance['status'] == Status.WAITING:
return
if instance['status'] == Status.FAILED || instance['status'] == Status.TIMEOUT:
if instance['status'] == Status.FAILED or instance['status'] == Status.TIMEOUT:
if instance['try_count'] > task.maxRetryCount:
failed = True
else:
@ -133,15 +138,19 @@ class TaskMgr(threading.Thread):
instance['last_update_time'] = time.time()
instance['try_count'] += 1
instance['token'] = task.token
instance['worker'] = worker
self.cpu_usage[worker] += task.cluster.instance.cpu
try:
logger.info('[task_processor] processing %s' % task.id)
channel = grpc.insecure_channel('%s:50052' % worker)
stub = WorkerStub(channel)
response = stub.process_task(task)
logger.info('[task_processor] worker response: %d' response.message)
if response.status != Reply.ACCEPTED:
raise Exception(response.message)
except Exception as e:
logger.error('[task_processor] rpc error message: %s' e)
logger.error('[task_processor] rpc error message: %s' % e)
instance['status'] = Status.FAILED
instance['try_count'] -= 1
@ -154,7 +163,7 @@ class TaskMgr(threading.Thread):
if worker is not None:
# find instance to retry
for instance, index in enumerate(task.instance_list):
if (instance['status'] == Status.FAILED || instance['status'] == Status.TIMEOUT) and instance['try_count'] <= task.maxRetryCount:
if (instance['status'] == Status.FAILED or instance['status'] == Status.TIMEOUT) and instance['try_count'] <= task.maxRetryCount:
return task, index, worker
elif instance['status'] == Status.RUNNING:
if time.time() - instance['last_update_time'] > self.heart_beat_timeout:
@ -177,24 +186,46 @@ class TaskMgr(threading.Thread):
logger.warning('[task_scheduler] running nodes not found')
return None
for node in nodes:
# TODO
if True:
return node[0]
for worker_ip, worker_info in nodes:
if task.cluster.instance.cpu + get_cpu_usage(worker_ip) > worker_info['cpu']:
continue
if task.cluster.instance.memory > worker_info['memory']:
continue
if task.cluster.instance.disk > worker_info['disk']:
continue
if task.cluster.instance.gpu > worker_info['gpu']:
continue
return worker_ip
return None
def get_all_nodes(self):
# cache running nodes
if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
return self.all_nodes
# if self.all_nodes is not None and time.time() - self.last_nodes_info_update_time < self.nodes_info_update_interval:
# return self.all_nodes
# get running nodes
node_ips = self.nodemgr.get_nodeips()
self.all_nodes = []
for node_ip in node_ips:
fetcher = master.monitor.Fetcher(node_ip)
self.all_nodes.append((node_ip, fetcher.info))
return self.all_nodes
all_nodes = [(node_ip, get_worker_resource_info(node_ip)) for node_ip in node_ips]
return all_nodes
def get_worker_resource_info(self, worker_ip):
fetcher = master.monitor.Fetcher(worker_ip)
worker_info = fetcher.info
info = {}
info['cpu'] = len(worker_info['cpuconfig'])
info['memory'] = worker_info['meminfo']['free'] / 1024 # (Mb)
info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb)
info['gpu'] = 0 # not support yet
return info
def get_cpu_usage(self, worker_ip):
try:
return self.cpu_usage[worker_ip]
except:
self.cpu_usage[worker_ip] = 0
return 0
def set_jobmgr(self, jobmgr):
@ -222,7 +253,7 @@ class TaskMgr(threading.Thread):
stderrRedirectPath = json_task['parameters']['stderrRedirectPath'],
stdoutRedirectPath = json_task['parameters']['stdoutRedirectPath']),
cluster = Cluster(
,image = Image(
image = Image(
name = json_task['cluster']['image']['name'],
type = json_task['cluster']['image']['type'],
owner = json_task['cluster']['image']['owner']),
@ -231,9 +262,8 @@ class TaskMgr(threading.Thread):
memory = json_task['cluster']['instance']['memory'],
disk = json_task['cluster']['instance']['disk'],
gpu = json_task['cluster']['instance']['gpu'])))
task.cluster.mount = []
for mount in json_task['cluster']['mount']:
task.cluster.mount.append(Mount(localPath=mount['localPath'], remotePath=mount['remotePath']))
task.cluster.mount = [Mount(localPath=mount['localPath'], remotePath=mount['remotePath'])
for mount in json_task['cluster']['mount']]
# local properties
task.status = Status.WAITING