This commit is contained in:
Gallen 2018-08-08 21:51:27 +08:00
parent c7ae571bae
commit 0a4507b3a5
2 changed files with 25 additions and 27 deletions

View File

@ -5,9 +5,9 @@ import random
import json
# must import logger after initlogging, ugly
from utils.log import initlogging
initlogging("docklet-taskmgr")
from utils.log import logger
# from utils.log import initlogging
# initlogging("docklet-taskmgr")
# from utils.log import logger
# grpc
from concurrent import futures
@ -49,10 +49,7 @@ class TaskMgr(threading.Thread):
self.task_queue = []
self.scheduler_interval = scheduler_interval
if external_logger is None:
self.logger = logger
else:
self.logger = external_logger
self.logger = external_logger
self.master_port = env.getenv('BATCH_MASTER_PORT')
self.worker_port = env.getenv('BATCH_WORKER_PORT')
@ -209,7 +206,7 @@ class TaskMgr(threading.Thread):
return task, index, worker
# find timeout instance
elif instance['status'] == RUNNING:
if not is_alive(instance['worker']):
if not self.is_alive(instance['worker']):
instance['status'] = FAILED
instance['token'] = ''
self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu
@ -297,28 +294,29 @@ class TaskMgr(threading.Thread):
task = Task(TaskInfo(
id = taskid,
username = username,
instanceCount = json_task['instanceCount'],
maxRetryCount = json_task['maxRetryCount'],
timeout = json_task['timeout'],
instanceCount = int(json_task['instCount']),
maxRetryCount = int(json_task['retryCount']),
timeout = int(json_task['expTime']),
parameters = Parameters(
command = Command(
commandLine = json_task['parameters']['command']['commandLine'],
packagePath = json_task['parameters']['command']['packagePath'],
envVars = json_task['parameters']['command']['envVars']),
stderrRedirectPath = json_task['parameters']['stderrRedirectPath'],
stdoutRedirectPath = json_task['parameters']['stdoutRedirectPath']),
commandLine = json_task['command'],
packagePath = json_task['srcAddr'],
envVars = {}),
stderrRedirectPath = json_task['stdErrRedPth'],
stdoutRedirectPath = json_task['stdOutRedPth']),
cluster = Cluster(
image = Image(
name = json_task['cluster']['image']['name'],
type = json_task['cluster']['image']['type'],
owner = json_task['cluster']['image']['owner']),
name = 'base', #json_task['cluster']['image']['name'],
type = Image.BASE, #json_task['cluster']['image']['type'],
owner = 'base'), #json_task['cluster']['image']['owner']),
instance = Instance(
cpu = json_task['cluster']['instance']['cpu'],
memory = json_task['cluster']['instance']['memory'],
disk = json_task['cluster']['instance']['disk'],
gpu = json_task['cluster']['instance']['gpu']))))
task.info.cluster.mount.extend([Mount(localPath=mount['localPath'], remotePath=mount['remotePath'])
for mount in json_task['cluster']['mount']])
cpu = int(json_task['cpuSetting']),
memory = int(json_task['memorySetting']),
disk = int(json_task['diskSetting']),
gpu = int(json_task['gpuSetting'])))))
task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
remotePath=json_task['mapping'][mapping_key]['mappingRemotefDir'])
for mapping_key in json_task['mapping']])
self.task_queue.append(task)

View File

@ -94,7 +94,7 @@ class SimulatedJobMgr(threading.Thread):
task['timeout'] = timeout
task['parameters'] = {}
task['parameters']['command'] = {}
task['parameters']['command']['commandLine'] = ''
task['parameters']['command']['commandLine'] = 'ls'
task['parameters']['command']['packagePath'] = ''
task['parameters']['command']['envVars'] = {'a':'1'}
task['parameters']['stderrRedirectPath'] = ''
@ -111,7 +111,7 @@ class SimulatedJobMgr(threading.Thread):
task['cluster']['instance']['gpu'] = 0
task['cluster']['mount'] = [{'remotePath':'', 'localPath':''}]
taskmgr.add_task('user', taskid, json.dumps(task))
taskmgr.add_task('root', taskid, json.dumps(task))
class SimulatedLogger():