Merge pull request #334 from GallenShao/batch

[taskmgr] fix an available memory calc bug
This commit is contained in:
GallenShao 2018-08-10 14:42:31 +08:00 committed by GitHub
commit 62fd6463c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 26 additions and 7 deletions

View File

@ -193,6 +193,19 @@ class TaskMgr(threading.Thread):
def task_scheduler(self):
# simple FIFO
self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue))
# nodes = self.get_all_nodes()
# if nodes is None or len(nodes) == 0:
# self.logger.info('[task_scheduler] no nodes found')
# else:
# for worker_ip, worker_info in nodes:
# self.logger.info('[task_scheduler] nodes %s' % worker_ip)
# for key in worker_info:
# if key == 'cpu':
# self.logger.info('[task_scheduler] %s: %d/%d' % (key, self.get_cpu_usage(worker_ip), worker_info[key]))
# else:
# self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key]))
for task in self.task_queue:
worker = self.find_proper_worker(task)
@ -264,7 +277,7 @@ class TaskMgr(threading.Thread):
worker_info = fetcher.info
info = {}
info['cpu'] = len(worker_info['cpuconfig'])
info['memory'] = worker_info['meminfo']['free'] / 1024 # (Mb)
info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (Mb)
info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb)
info['gpu'] = 0 # not support yet
return info
@ -289,6 +302,11 @@ class TaskMgr(threading.Thread):
def add_task(self, username, taskid, json_task):
# decode json string to object defined in grpc
self.logger.info('[taskmgr add_task] receive task %s' % taskid)
image_dict = {
"private": Image.PRIVATE,
"base": Image.BASE,
"public": Image.PUBLIC
}
# json_task = json.loads(json_task)
task = Task(TaskInfo(
id = taskid,
@ -305,17 +323,18 @@ class TaskMgr(threading.Thread):
stdoutRedirectPath = json_task['stdOutRedPth']),
cluster = Cluster(
image = Image(
name = 'base', #json_task['cluster']['image']['name'],
type = Image.BASE, #json_task['cluster']['image']['type'],
owner = 'base'), #json_task['cluster']['image']['owner']),
name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'],
type = image_dict[json_task['image'].split('_')[2]], #json_task['cluster']['image']['type'],
owner = username if not json_task['image'].split('_')[1] else json_task['image'].split('_')[1]), #json_task['cluster']['image']['owner']),
instance = Instance(
cpu = int(json_task['cpuSetting']),
memory = int(json_task['memorySetting']),
disk = int(json_task['diskSetting']),
gpu = int(json_task['gpuSetting'])))))
task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
for mapping_key in json_task['mapping']])
if 'mapping' in json_task:
task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
for mapping_key in json_task['mapping']])
self.task_queue.append(task)