diff --git a/src/master/taskmgr.py b/src/master/taskmgr.py index 1f4587c..3026c19 100644 --- a/src/master/taskmgr.py +++ b/src/master/taskmgr.py @@ -5,9 +5,9 @@ import random import json # must import logger after initlogging, ugly -from utils.log import initlogging -initlogging("docklet-taskmgr") -from utils.log import logger +# from utils.log import initlogging +# initlogging("docklet-taskmgr") +# from utils.log import logger # grpc from concurrent import futures @@ -49,10 +49,7 @@ class TaskMgr(threading.Thread): self.task_queue = [] self.scheduler_interval = scheduler_interval - if external_logger is None: - self.logger = logger - else: - self.logger = external_logger + self.logger = external_logger self.master_port = env.getenv('BATCH_MASTER_PORT') self.worker_port = env.getenv('BATCH_WORKER_PORT') @@ -209,7 +206,7 @@ class TaskMgr(threading.Thread): return task, index, worker # find timeout instance elif instance['status'] == RUNNING: - if not is_alive(instance['worker']): + if not self.is_alive(instance['worker']): instance['status'] = FAILED instance['token'] = '' self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu @@ -297,28 +294,29 @@ class TaskMgr(threading.Thread): task = Task(TaskInfo( id = taskid, username = username, - instanceCount = json_task['instanceCount'], - maxRetryCount = json_task['maxRetryCount'], - timeout = json_task['timeout'], + instanceCount = int(json_task['instCount']), + maxRetryCount = int(json_task['retryCount']), + timeout = int(json_task['expTime']), parameters = Parameters( command = Command( - commandLine = json_task['parameters']['command']['commandLine'], - packagePath = json_task['parameters']['command']['packagePath'], - envVars = json_task['parameters']['command']['envVars']), - stderrRedirectPath = json_task['parameters']['stderrRedirectPath'], - stdoutRedirectPath = json_task['parameters']['stdoutRedirectPath']), + commandLine = json_task['command'], + packagePath = json_task['srcAddr'], + envVars = {}), + stderrRedirectPath = json_task['stdErrRedPth'], + stdoutRedirectPath = json_task['stdOutRedPth']), cluster = Cluster( image = Image( - name = json_task['cluster']['image']['name'], - type = json_task['cluster']['image']['type'], - owner = json_task['cluster']['image']['owner']), + name = 'base', #json_task['cluster']['image']['name'], + type = Image.BASE, #json_task['cluster']['image']['type'], + owner = 'base'), #json_task['cluster']['image']['owner']), instance = Instance( - cpu = json_task['cluster']['instance']['cpu'], - memory = json_task['cluster']['instance']['memory'], - disk = json_task['cluster']['instance']['disk'], - gpu = json_task['cluster']['instance']['gpu'])))) - task.info.cluster.mount.extend([Mount(localPath=mount['localPath'], remotePath=mount['remotePath']) - for mount in json_task['cluster']['mount']]) + cpu = int(json_task['cpuSetting']), + memory = int(json_task['memorySetting']), + disk = int(json_task['diskSetting']), + gpu = int(json_task['gpuSetting']))))) + task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'], + remotePath=json_task['mapping'][mapping_key]['mappingRemotefDir']) + for mapping_key in json_task['mapping']]) self.task_queue.append(task) diff --git a/src/master/testTaskMgr.py b/src/master/testTaskMgr.py index 138b56a..9503fdc 100644 --- a/src/master/testTaskMgr.py +++ b/src/master/testTaskMgr.py @@ -94,7 +94,7 @@ class SimulatedJobMgr(threading.Thread): task['timeout'] = timeout task['parameters'] = {} task['parameters']['command'] = {} - task['parameters']['command']['commandLine'] = '' + task['parameters']['command']['commandLine'] = 'ls' task['parameters']['command']['packagePath'] = '' task['parameters']['command']['envVars'] = {'a':'1'} task['parameters']['stderrRedirectPath'] = '' @@ -111,7 +111,7 @@ class SimulatedJobMgr(threading.Thread): task['cluster']['instance']['gpu'] = 0 task['cluster']['mount'] = [{'remotePath':'', 'localPath':''}] - taskmgr.add_task('user', taskid, json.dumps(task)) + taskmgr.add_task('root', taskid, json.dumps(task)) class SimulatedLogger():