[taskmgr] fix a avaliable memory calc bug

This commit is contained in:
Gallen 2018-08-10 13:40:58 +08:00
parent a77e6a8d38
commit 6bc2a06705
1 changed files with 26 additions and 7 deletions

View File

@ -193,6 +193,19 @@ class TaskMgr(threading.Thread):
def task_scheduler(self): def task_scheduler(self):
# simple FIFO # simple FIFO
self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue)) self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))
# nodes = self.get_all_nodes()
# if nodes is None or len(nodes) == 0:
# self.logger.info('[task_scheduler] no nodes found')
# else:
# for worker_ip, worker_info in nodes:
# self.logger.info('[task_scheduler] nodes %s' % worker_ip)
# for key in worker_info:
# if key == 'cpu':
# self.logger.info('[task_scheduler] %s: %d/%d' % (key, self.get_cpu_usage(worker_ip), worker_info[key]))
# else:
# self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key]))
for task in self.task_queue: for task in self.task_queue:
worker = self.find_proper_worker(task) worker = self.find_proper_worker(task)
@ -264,7 +277,7 @@ class TaskMgr(threading.Thread):
worker_info = fetcher.info worker_info = fetcher.info
info = {} info = {}
info['cpu'] = len(worker_info['cpuconfig']) info['cpu'] = len(worker_info['cpuconfig'])
info['memory'] = worker_info['meminfo']['free'] / 1024 # (Mb) info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (Mb)
info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb) info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb)
info['gpu'] = 0 # not support yet info['gpu'] = 0 # not support yet
return info return info
@ -289,6 +302,11 @@ class TaskMgr(threading.Thread):
def add_task(self, username, taskid, json_task): def add_task(self, username, taskid, json_task):
# decode json string to object defined in grpc # decode json string to object defined in grpc
self.logger.info('[taskmgr add_task] receive task %s' % taskid) self.logger.info('[taskmgr add_task] receive task %s' % taskid)
image_dict = {
"private": Image.PRIVATE,
"base": Image.BASE,
"public": Image.PUBLIC
}
# json_task = json.loads(json_task) # json_task = json.loads(json_task)
task = Task(TaskInfo( task = Task(TaskInfo(
id = taskid, id = taskid,
@ -305,14 +323,15 @@ class TaskMgr(threading.Thread):
stdoutRedirectPath = json_task['stdOutRedPth']), stdoutRedirectPath = json_task['stdOutRedPth']),
cluster = Cluster( cluster = Cluster(
image = Image( image = Image(
name = 'base', #json_task['cluster']['image']['name'], name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'],
type = Image.BASE, #json_task['cluster']['image']['type'], type = image_dict[json_task['image'].split('_')[2]], #json_task['cluster']['image']['type'],
owner = 'base'), #json_task['cluster']['image']['owner']), owner = username if not json_task['image'].split('_')[1] else json_task['image'].split('_')[1]), #json_task['cluster']['image']['owner']),
instance = Instance( instance = Instance(
cpu = int(json_task['cpuSetting']), cpu = int(json_task['cpuSetting']),
memory = int(json_task['memorySetting']), memory = int(json_task['memorySetting']),
disk = int(json_task['diskSetting']), disk = int(json_task['diskSetting']),
gpu = int(json_task['gpuSetting']))))) gpu = int(json_task['gpuSetting'])))))
if 'mapping' in json_task:
task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'], task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir']) remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
for mapping_key in json_task['mapping']]) for mapping_key in json_task['mapping']])