diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index beaaf95..cd9043b 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -193,6 +193,19 @@ class TaskMgr(threading.Thread): def task_scheduler(self): # simple FIFO self.logger.info('[task_scheduler] scheduling... (%d tasks remains)' % len(self.task_queue)) + + # nodes = self.get_all_nodes() + # if nodes is None or len(nodes) == 0: + # self.logger.info('[task_scheduler] no nodes found') + # else: + # for worker_ip, worker_info in nodes: + # self.logger.info('[task_scheduler] nodes %s' % worker_ip) + # for key in worker_info: + # if key == 'cpu': + # self.logger.info('[task_scheduler] %s: %d/%d' % (key, self.get_cpu_usage(worker_ip), worker_info[key])) + # else: + # self.logger.info('[task_scheduler] %s: %d' % (key, worker_info[key])) + for task in self.task_queue: worker = self.find_proper_worker(task) @@ -264,7 +277,7 @@ class TaskMgr(threading.Thread): worker_info = fetcher.info info = {} info['cpu'] = len(worker_info['cpuconfig']) - info['memory'] = worker_info['meminfo']['free'] / 1024 # (Mb) + info['memory'] = (worker_info['meminfo']['buffers'] + worker_info['meminfo']['cached'] + worker_info['meminfo']['free']) / 1024 # (Mb) info['disk'] = sum([disk['free'] for disk in worker_info['diskinfo']]) / 1024 / 1024 # (Mb) info['gpu'] = 0 # not support yet return info @@ -289,6 +302,11 @@ class TaskMgr(threading.Thread): def add_task(self, username, taskid, json_task): # decode json string to object defined in grpc self.logger.info('[taskmgr add_task] receive task %s' % taskid) + image_dict = { + "private": Image.PRIVATE, + "base": Image.BASE, + "public": Image.PUBLIC + } # json_task = json.loads(json_task) task = Task(TaskInfo( id = taskid, @@ -305,17 +323,18 @@ class TaskMgr(threading.Thread): stdoutRedirectPath = json_task['stdOutRedPth']), cluster = Cluster( image = Image( - name = 'base', #json_task['cluster']['image']['name'], - type = Image.BASE, #json_task['cluster']['image']['type'], - owner = 'base'), #json_task['cluster']['image']['owner']), + name = json_task['image'].split('_')[0], #json_task['cluster']['image']['name'], + type = image_dict[json_task['image'].split('_')[2]], #json_task['cluster']['image']['type'], + owner = username if not json_task['image'].split('_')[1] else json_task['image'].split('_')[1]), #json_task['cluster']['image']['owner']), instance = Instance( cpu = int(json_task['cpuSetting']), memory = int(json_task['memorySetting']), disk = int(json_task['diskSetting']), gpu = int(json_task['gpuSetting']))))) - task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'], - remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir']) - for mapping_key in json_task['mapping']]) + if 'mapping' in json_task: + task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'], + remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir']) + for mapping_key in json_task['mapping']]) self.task_queue.append(task)