Merge branch 'batch' of https://github.com/FirmlyReality/docklet into batch
commit 4912526dac
@@ -725,12 +725,82 @@ def resetall_system(user, beans, form):

@app.route("/batch/job/add/", methods=['POST'])
@login_required
def add_job(user,beans,form):
    pass
    global G_jobmgr
    job_data = form.to_dict()
    job_info = {
        'tasks': {}
    }
    message = {
        'success': 'true',
        'message': 'add batch job success'
    }
    for key in job_data:
        key_arr = key.split('_')
        value = job_data[key]
        if key_arr[0] != 'dependency' and value == '':
            message['success'] = 'false'
            message['message'] = 'value of %s is null' % key
        elif len(key_arr) == 1:
            job_info[key_arr[0]] = value
        elif len(key_arr) == 2:
            key_prefix, task_idx = key_arr[0], key_arr[1]
            task_idx = 'task_' + task_idx
            if task_idx in job_info["tasks"]:
                job_info["tasks"][task_idx][key_prefix] = value
            else:
                tmp_dict = {
                    key_prefix: value
                }
                job_info["tasks"][task_idx] = tmp_dict
        elif len(key_arr) == 3:
            key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2]
            task_idx = 'task_' + task_idx
            mapping_idx = 'mapping_' + mapping_idx
            if task_idx in job_info["tasks"]:
                if "mapping" in job_info["tasks"][task_idx]:
                    if mapping_idx in job_info["tasks"][task_idx]["mapping"]:
                        job_info["tasks"][task_idx]["mapping"][mapping_idx][key_prefix] = value
                    else:
                        tmp_dict = {
                            key_prefix: value
                        }
                        job_info["tasks"][task_idx]["mapping"][mapping_idx] = tmp_dict
                else:
                    job_info["tasks"][task_idx]["mapping"] = {
                        mapping_idx: {
                            key_prefix: value
                        }
                    }
            else:
                tmp_dict = {
                    "mapping": {
                        mapping_idx: {
                            key_prefix: value
                        }
                    }
                }
                job_info["tasks"][task_idx] = tmp_dict
    logger.debug('batch job adding info %s' % json.dumps(job_info, indent=4))
    [status, msg] = G_jobmgr.add_job(user, job_info)
    if status:
        return json.dumps(message)
    else:
        logger.debug('fail to add batch job: %s' % msg)
        message["success"] = "false"
        message["message"] = msg
        return json.dumps(message)
    return json.dumps(message)

@app.route("/batch/job/list/", methods=['POST'])
@login_required
def list_job(user,beans,form):
    pass
    global G_jobmgr
    result = {
        'status': 'true',
        'data': G_jobmgr.list_jobs(user)
    }
    return json.dumps(result)


@app.route("/batch/job/info/", methods=['POST'])
@login_required
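For reference, the sketch below (not part of this commit) shows how a client would post the flattened form that add_job() unpacks: fields without a suffix describe the job, a single `_<n>` suffix addresses task n, and a `_<n>_<m>` suffix addresses mapping m of task n. The host, port and token values are placeholders, and only a subset of task fields is shown.

```python
# Minimal sketch of the flattened form layout consumed by /batch/job/add/.
# Host, port and the token value are placeholders.
import requests

form = {
    'jobName': 'demo',                    # no suffix  -> job-level field
    'token': '<auth-token>',              # placeholder
    'image_1': 'base_base_base',          # one index  -> field of task_1
    'command_1': 'echo hello',
    'dependency_1': '',                   # only dependency_* may be left empty
    'mappingLocalDir_1_1': '/local/dir',  # two indices -> mapping_1 of task_1
    'mappingRemoteDir_1_1': '/remote/dir',
    'mappingSource_1_1': 'Aliyun',
}
resp = requests.post('http://<master-ip>:<master-port>/batch/job/add/', data=form)
print(resp.text)  # '{"success": "true", "message": "add batch job success"}' on success
```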
@@ -932,4 +1002,10 @@ if __name__ == '__main__':
    # server = http.server.HTTPServer((masterip, masterport), DockletHttpHandler)
    logger.info("starting master server")

    G_taskmgr = taskmgr.TaskMgr(G_nodemgr, monitor.Fetcher)
    G_jobmgr = jobmgr.JobMgr(G_taskmgr)
    G_taskmgr.set_jobmgr(G_jobmgr)
    G_taskmgr.start()
    G_jobmgr.start()

    app.run(host = masterip, port = masterport, threaded=True)

@@ -1,43 +1,158 @@
import time, threading
import time, threading, random, string
import master.monitor

from utils.log import initlogging, logger
initlogging("docklet-jobmgr")

class JobMgr(object):
class BatchJob(object):
    def __init__(self, user, job_info):
        self.user = user
        self.raw_job_info = job_info
        self.task_queue = []
        self.task_finished = []
        self.job_id = None
        self.job_name = job_info['jobName']
        self.status = 'pending'
        self.create_time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime())
        self.top_sort()

    def top_sort(self):
        logger.debug('top sorting')
        tasks = self.raw_job_info["tasks"]
        dependency_graph = {}
        for task_idx in tasks:
            dependency_graph[task_idx] = set()
            task_info = tasks[task_idx]
            dependency = task_info['dependency'].strip().replace(' ', '').split(',')
            if len(dependency) == 1 and dependency[0] == '':
                continue
            for t in dependency:
                if not t in tasks:
                    raise ValueError('task %s is not defined in the dependency of task %s' % (t, task_idx))
                dependency_graph[task_idx].add(t)
        while len(dependency_graph) > 0:
            s = set()
            flag = False
            for task_idx in dependency_graph:
                if len(dependency_graph[task_idx]) == 0:
                    flag = True
                    s.add(task_idx)
            for task_idx in s:
                dependency_graph.pop(task_idx)
            #there is a circle in the graph
            if not flag:
                raise ValueError('there is a circle in the dependency graph')
                break
            for task_idx in dependency_graph:
                for t in s:
                    if t in dependency_graph[task_idx]:
                        dependency_graph[task_idx].remove(t)
            self.task_queue.append({
                'task_idx': s,
                'status': 'pending'
            })

    # get a task and pass it to taskmgr
    def get_task(self):
        for task in self.task_queue:
            if task['status'] == 'pending':
                task_idx = task['task_idx'].pop()
                task['status'] = 'running'
                task_name = self.user + '_' + self.job_id + '_' + task_idx
                return task_name, self.raw_job_info["tasks"][task_idx]
        return '', None

    # a task has finished
    def finish_task(self, task_idx):
        pass

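The top_sort() above batches tasks level by level with Kahn's algorithm: tasks whose dependencies are all satisfied go into one set, that set is removed from the graph, and the process repeats until the graph is empty or a cycle is detected. A standalone sketch of the same idea, with made-up task names:

```python
# Standalone sketch of the level-by-level topological sort performed by
# BatchJob.top_sort(); the task names and dependencies are invented for the example.
def top_sort_levels(dependency_graph):
    graph = {task: set(deps) for task, deps in dependency_graph.items()}
    levels = []
    while graph:
        ready = {task for task, deps in graph.items() if not deps}  # no unmet dependencies
        if not ready:
            raise ValueError('there is a circle in the dependency graph')
        for task in ready:
            graph.pop(task)
        for deps in graph.values():
            deps -= ready
        levels.append(ready)
    return levels

print(top_sort_levels({
    'task_1': set(),                # no dependencies
    'task_2': {'task_1'},           # runs after task_1
    'task_3': {'task_1', 'task_2'}  # runs after both
}))
# [{'task_1'}, {'task_2'}, {'task_3'}]
```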
class JobMgr(threading.Thread):
    # load job information from etcd
    # initial a job queue and job schedueler
    def __init__(self, taskmgr):
        threading.Thread.__init__(self)
        self.job_queue = []
        self.job_map = {}
        self.taskmgr = taskmgr


    def run(self):
        while True:
            self.job_scheduler()
            time.sleep(2)


    # user: username
    # job: a json string
    # job_data: a json string
    # user submit a new job, add this job to queue and database
    # call add_task to add task information
    def add_job(self, user, job):
        pass
    def add_job(self, user, job_info):
        try:
            job = BatchJob(user, job_info)
            job.job_id = self.gen_jobid()
            self.job_queue.append(job.job_id)
            self.job_map[job.job_id] = job
        except ValueError as err:
            return [False, err.args[0]]
        except Exception as err:
            return [False, err.args[0]]
        finally:
            return [True, "add batch job success"]

    # user: username
    # list a user's all job
    def list_jobs(self,user):
        pass
        res = []
        for job_id in self.job_queue:
            job = self.job_map[job_id]
            logger.debug('job_id: %s, user: %s' % (job_id, job.user))
            if job.user == user:
                res.append({
                    'job_name': job.job_name,
                    'job_id': job.job_id,
                    'status': job.status,
                    'create_time': job.create_time
                })
        return res

    # user: username
    # jobid: the id of job
    # get the information of a job, including the status, json description and other informationa
    # call get_task to get the task information
    def get_job(self, user, jobid):
    def get_job(self, user, job_id):
        pass

    # job: a json string
    # check if a job exists
    def is_job_exist(self, job_id):
        return job_id in self.job_queue

    # generate a random job id
    def gen_jobid(self):
        job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
        while self.is_job_exist(job_id):
            job_id = ''.join(random.sample(string.ascii_letters + string.digits, 8))
        return job_id

    # this is a thread to process a job
    def job_processor(self, job):
        # according the DAG of job, add task to taskmanager
        # wait for all task completed and exit
        pass
        task_name, task_info = job.get_task()
        if not task_info:
            return False
        else:
            self.taskmgr.add_task(job.user, task_name, task_info)
            return True

    # this is a thread to schedule the jobs
    def job_scheduler(self):
        # choose a job from queue, create a job processor for it
        for job_id in self.job_queue:
            job = self.job_map[job_id]
            if self.job_processor(job):
                job.status = 'running'
                break
            else:
                job.status = 'done'

    # a task has finished
    def report(self, task):
        pass

    # load job information from etcd
    # initial a job queue and job schedueler
    def __init__(self, taskmgr):
        pass

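A rough usage sketch for the new JobMgr (not part of this commit): the job_info dict mirrors the structure built by add_job() in the REST handler, and DummyTaskMgr stands in for the real taskmgr.TaskMgr. Note that because of the finally clause, add_job() reports success even when BatchJob construction raises.

```python
# Rough sketch only; DummyTaskMgr is a stand-in for taskmgr.TaskMgr.
class DummyTaskMgr:
    def add_task(self, username, task_name, task_info):
        print('would dispatch', task_name)

mgr = JobMgr(DummyTaskMgr())
status, msg = mgr.add_job('alice', {
    'jobName': 'demo',
    'tasks': {
        'task_1': {'dependency': '', 'command': 'echo hello'},
        'task_2': {'dependency': 'task_1', 'command': 'echo world'},
    }
})
print(status, msg)             # True add batch job success
print(mgr.list_jobs('alice'))  # [{'job_name': 'demo', 'job_id': '...', 'status': 'pending', ...}]
```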
@@ -0,0 +1,55 @@
#!/usr/bin/python3
import json

job_data = {'image_1': 'base_base_base', 'mappingRemoteDir_2_2': 'sss', 'dependency_1': 'aaa', 'mappingLocalDir_2_1': 'xxx', 'mappingLocalDir_1_2': 'aaa', 'mappingLocalDir_1_1': 'aaa', 'mappingLocalDir_2_3': 'fdsffdf', 'mappingRemoteDir_1_1': 'ddd', 'mappingRemoteDir_2_3': 'sss', 'srcAddr_1': 'aaa', 'mappingSource_2_1': 'Aliyun', 'cpuSetting_1': '1', 'mappingSource_2_2': 'Aliyun', 'retryCount_2': '1', 'mappingSource_1_1': 'Aliyun', 'expTime_1': '60', 'diskSetting_2': '1024', 'diskSetting_1': '1024', 'dependency_2': 'ddd', 'memorySetting_1': '1024', 'command_2': 'ccc', 'mappingRemoteDir_1_2': 'ddd', 'gpuSetting_2': '0', 'memorySetting_2': '1024', 'gpuSetting_1': '0', 'mappingLocalDir_2_2': 'bbb', 'mappingSource_1_2': 'Aliyun', 'expTime_2': '60', 'mappingRemoteDir_2_1': 'vvv', 'srcAddr_2': 'fff', 'cpuSetting_2': '1', 'instCount_1': '1', 'mappingSource_2_3': 'Aliyun', 'token': 'ZXlKaGJHY2lPaUpJVXpJMU5pSXNJbWxoZENJNk1UVXpNelE0TVRNMU5Td2laWGh3SWpveE5UTXpORGcwT1RVMWZRLmV5SnBaQ0k2TVgwLkF5UnRnaGJHZXhJY2lBSURZTUd5eXZIUVJnUGd1ZTA3OEtGWkVoejJVMkE=', 'instCount_2': '1', 'retryCount_1': '1', 'command_1': 'aaa', 'taskPriority': '0', 'image_2': 'base_base_base', 'jobName': 'aaa'}

def parse(job_data):
    job_info = {}
    message = {}
    for key in job_data:
        key_arr = key.split('_')
        value = job_data[key]
        if len(key_arr) == 1:
            job_info[key_arr[0]] = value
        elif len(key_arr) == 2:
            key_prefix, task_idx = key_arr[0], key_arr[1]
            task_idx = 'task_' + task_idx
            if task_idx in job_info:
                job_info[task_idx][key_prefix] = value
            else:
                tmp_dict = {
                    key_prefix: value
                }
                job_info[task_idx] = tmp_dict
        elif len(key_arr) == 3:
            key_prefix, task_idx, mapping_idx = key_arr[0], key_arr[1], key_arr[2]
            task_idx = 'task_' + task_idx
            mapping_idx = 'mapping_' + mapping_idx
            if task_idx in job_info:
                if "mapping" in job_info[task_idx]:
                    if mapping_idx in job_info[task_idx]["mapping"]:
                        job_info[task_idx]["mapping"][mapping_idx][key_prefix] = value
                    else:
                        tmp_dict = {
                            key_prefix: value
                        }
                        job_info[task_idx]["mapping"][mapping_idx] = tmp_dict
                else:
                    job_info[task_idx]["mapping"] = {
                        mapping_idx: {
                            key_prefix: value
                        }
                    }
            else:
                tmp_dict = {
                    "mapping": {
                        mapping_idx: {
                            key_prefix: value
                        }
                    }
                }
                job_info[task_idx] = tmp_dict
    print(json.dumps(job_info, indent=4))

if __name__ == '__main__':
    parse(job_data)
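For the sample job_data above, parse() prints a nested structure along these lines (abbreviated; all values are taken from that dict as-is):

```python
# Abbreviated expected output of parse(job_data) for the sample data above.
# {
#     "jobName": "aaa",
#     "taskPriority": "0",
#     "token": "...",
#     "task_1": {
#         "image": "base_base_base",
#         "command": "aaa",
#         "dependency": "aaa",
#         "instCount": "1",
#         "mapping": {
#             "mapping_1": {"mappingLocalDir": "aaa", "mappingRemoteDir": "ddd", "mappingSource": "Aliyun"},
#             "mapping_2": {"mappingLocalDir": "aaa", "mappingRemoteDir": "ddd", "mappingSource": "Aliyun"}
#         },
#         ...
#     },
#     "task_2": {...}
# }
```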
@@ -5,9 +5,7 @@ import random
import json

# must import logger after initlogging, ugly
# from utils.log import initlogging
# initlogging("docklet-taskmgr")
# from utils.log import logger
from utils.log import logger

# grpc
from concurrent import futures
@@ -42,13 +40,12 @@ class TaskMgr(threading.Thread):
    # load task information from etcd
    # initial a task queue and task schedueler
    # taskmgr: a taskmgr instance
    def __init__(self, nodemgr, monitor_fetcher, logger, worker_timeout=60, scheduler_interval=2):
    def __init__(self, nodemgr, monitor_fetcher, scheduler_interval=2, external_logger=None):
        threading.Thread.__init__(self)
        self.thread_stop = False
        self.jobmgr = None
        self.task_queue = []

        self.heart_beat_timeout = worker_timeout # (s)
        self.scheduler_interval = scheduler_interval
        self.logger = logger

@@ -109,7 +106,6 @@ class TaskMgr(threading.Thread):

        instance['status'] = report.instanceStatus
        instance['error_msg'] = report.errmsg
        instance['last_update_time'] = time.time()

        if report.instanceStatus == COMPLETED:
            self.check_task_completed(task)
@@ -174,7 +170,6 @@ class TaskMgr(threading.Thread):

        instance = task.instance_list[instance_id]
        instance['status'] = RUNNING
        instance['last_update_time'] = time.time()
        instance['try_count'] += 1
        instance['token'] = task.info.token
        instance['worker'] = worker_ip
@@ -209,12 +204,12 @@ class TaskMgr(threading.Thread):
                    return task, index, worker
            # find timeout instance
            elif instance['status'] == RUNNING:
                if time.time() - instance['last_update_time'] > self.heart_beat_timeout:
                if not self.is_alive(instance['worker']):
                    instance['status'] = FAILED
                    instance['token'] = ''
                    self.cpu_usage[instance['worker']] -= task.info.cluster.instance.cpu

                    self.logger.warning('[task_scheduler] worker timeout task [%s] instance [%d]' % (task.info.id, index))
                    self.logger.warning('[task_scheduler] worker dead, retry task [%s] instance [%d]' % (task.info.id, index))
                    if worker is not None:
                        return task, index, worker

@@ -241,8 +236,8 @@ class TaskMgr(threading.Thread):
                continue
            if task.info.cluster.instance.memory > worker_info['memory']:
                continue
            if task.info.cluster.instance.disk > worker_info['disk']:
                continue
            # if task.info.cluster.instance.disk > worker_info['disk']:
            #     continue
            if task.info.cluster.instance.gpu > worker_info['gpu']:
                continue
            return worker_ip
@@ -259,6 +254,11 @@ class TaskMgr(threading.Thread):
        return all_nodes


    def is_alive(self, worker):
        nodes = self.nodemgr.get_nodeips()
        return worker in nodes


    def get_worker_resource_info(self, worker_ip):
        fetcher = self.monitor_fetcher(worker_ip)
        worker_info = fetcher.info
@@ -288,32 +288,34 @@ class TaskMgr(threading.Thread):
    # called when jobmgr assign task to taskmgr
    def add_task(self, username, taskid, json_task):
        # decode json string to object defined in grpc
        json_task = json.loads(json_task)
        self.logger.info('[taskmgr add_task] receive task %s' % taskid)
        # json_task = json.loads(json_task)
        task = Task(TaskInfo(
            id = taskid,
            username = username,
            instanceCount = json_task['instanceCount'],
            maxRetryCount = json_task['maxRetryCount'],
            timeout = json_task['timeout'],
            instanceCount = int(json_task['instCount']),
            maxRetryCount = int(json_task['retryCount']),
            timeout = int(json_task['expTime']),
            parameters = Parameters(
                command = Command(
                    commandLine = json_task['parameters']['command']['commandLine'],
                    packagePath = json_task['parameters']['command']['packagePath'],
                    envVars = json_task['parameters']['command']['envVars']),
                stderrRedirectPath = json_task['parameters']['stderrRedirectPath'],
                stdoutRedirectPath = json_task['parameters']['stdoutRedirectPath']),
                    commandLine = json_task['command'],
                    packagePath = json_task['srcAddr'],
                    envVars = {}),
                stderrRedirectPath = json_task['stdErrRedPth'],
                stdoutRedirectPath = json_task['stdOutRedPth']),
            cluster = Cluster(
                image = Image(
                    name = json_task['cluster']['image']['name'],
                    type = json_task['cluster']['image']['type'],
                    owner = json_task['cluster']['image']['owner']),
                    name = 'base', #json_task['cluster']['image']['name'],
                    type = Image.BASE, #json_task['cluster']['image']['type'],
                    owner = 'base'), #json_task['cluster']['image']['owner']),
                instance = Instance(
                    cpu = json_task['cluster']['instance']['cpu'],
                    memory = json_task['cluster']['instance']['memory'],
                    disk = json_task['cluster']['instance']['disk'],
                    gpu = json_task['cluster']['instance']['gpu']))))
        task.info.cluster.mount.extend([Mount(localPath=mount['localPath'], remotePath=mount['remotePath'])
            for mount in json_task['cluster']['mount']])
                    cpu = int(json_task['cpuSetting']),
                    memory = int(json_task['memorySetting']),
                    disk = int(json_task['diskSetting']),
                    gpu = int(json_task['gpuSetting'])))))
        task.info.cluster.mount.extend([Mount(localPath=json_task['mapping'][mapping_key]['mappingLocalDir'],
            remotePath=json_task['mapping'][mapping_key]['mappingRemoteDir'])
            for mapping_key in json_task['mapping']])
        self.task_queue.append(task)


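The reworked add_task() above now reads the web form's field names directly rather than a pre-structured JSON document. A sketch of the per-task dict it expects (field names taken from the diff; values are placeholders, and numeric fields arrive as strings from the form, hence the int() conversions):

```python
# Illustrative per-task dict for the new add_task(); all values are placeholders.
example_task = {
    'instCount': '1',
    'retryCount': '2',
    'expTime': '60',              # timeout, in seconds (the form's expTime field)
    'command': 'python run.py',
    'srcAddr': '/path/to/package',
    'stdErrRedPth': '/root/nfs/',
    'stdOutRedPth': '/root/nfs/',
    'cpuSetting': '1',
    'memorySetting': '1024',
    'diskSetting': '1024',
    'gpuSetting': '0',
    'mapping': {
        'mapping_1': {
            'mappingLocalDir': '/local/dir',
            'mappingRemoteDir': '/remote/dir',
            'mappingSource': 'Aliyun',
        },
    },
}
# e.g. taskmgr.add_task('alice', 'alice_<job_id>_task_1', example_task)
```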
@@ -324,3 +326,4 @@ class TaskMgr(threading.Thread):
            if task.info.id == taskid:
                return task
        return None

@@ -11,7 +11,7 @@ def run():
    stub = rpc_pb2_grpc.WorkerStub(channel)

    comm = rpc_pb2.Command(commandLine="echo \"s\" | awk '{print \"test4\\n\\\"\"}' > test.txt;cat test.txt", packagePath="/root", envVars={'test1':'10','test2':'20'}) # | awk '{print \"test\\\"\\n\"}'
    paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="/test1/test2")
    paras = rpc_pb2.Parameters(command=comm, stderrRedirectPath="/root/nfs/", stdoutRedirectPath="/root/nfs/")

    img = rpc_pb2.Image(name="base", type=rpc_pb2.Image.BASE, owner="docklet")
    inst = rpc_pb2.Instance(cpu=2, memory=2000, disk=500, gpu=0)

@@ -94,7 +94,7 @@ class SimulatedJobMgr(threading.Thread):
        task['timeout'] = timeout
        task['parameters'] = {}
        task['parameters']['command'] = {}
        task['parameters']['command']['commandLine'] = ''
        task['parameters']['command']['commandLine'] = 'ls'
        task['parameters']['command']['packagePath'] = ''
        task['parameters']['command']['envVars'] = {'a':'1'}
        task['parameters']['stderrRedirectPath'] = ''
@@ -111,7 +111,7 @@ class SimulatedJobMgr(threading.Thread):
        task['cluster']['instance']['gpu'] = 0
        task['cluster']['mount'] = [{'remotePath':'', 'localPath':''}]

        taskmgr.add_task('user', taskid, json.dumps(task))
        taskmgr.add_task('root', taskid, json.dumps(task))


class SimulatedLogger():
@@ -135,13 +135,27 @@ def test():
    jobmgr = SimulatedJobMgr()
    jobmgr.start()

    taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, SimulatedLogger(), worker_timeout=10, scheduler_interval=2)
    taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, external_logger=SimulatedLogger())
    taskmgr.set_jobmgr(jobmgr)
    taskmgr.start()

    add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048)


def test2():
    global jobmgr
    global taskmgr
    jobmgr = SimulatedJobMgr()
    jobmgr.start()

    taskmgr = master.taskmgr.TaskMgr(SimulatedNodeMgr(), SimulatedMonitorFetcher, scheduler_interval=2, external_logger=SimulatedLogger())
    taskmgr.set_jobmgr(jobmgr)
    taskmgr.start()

    add('task_0', instance_count=2, retry_count=2, timeout=60, cpu=2, memory=2048, disk=2048)



def add(taskid, instance_count, retry_count, timeout, cpu, memory, disk):
    global jobmgr
    global taskmgr

@@ -122,14 +122,14 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
        image['type'] = 'base'
        image['owner'] = request.cluster.image.owner
        username = request.username
        lxcname = '%s-batch-%s-%s' % (username,taskid,str(instanceid))
        token = request.token
        lxcname = '%s-batch-%s-%s-%s' % (username,taskid,str(instanceid),token)
        instance_type = request.cluster.instance
        outpath = [request.parameters.stdoutRedirectPath,request.parameters.stderrRedirectPath]
        for i in range(len(outpath)):
            if outpath[i] == "":
                outpath[i] = "/root/nfs/"
        timeout = request.timeout
        token = request.token

        # acquire ip
        [status, ip] = self.acquire_ip()
@@ -146,7 +146,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):

        if not os.path.isdir("%s/global/users/%s" % (self.fspath,username)):
            path = env.getenv('DOCKLET_LIB')
            subprocess.call([path+"/userinit.sh", username])
            subprocess.call([path+"/master/userinit.sh", username])
            logger.info("user %s directory not found, create it" % username)
        sys_run("mkdir -p /var/lib/lxc/%s" % lxcname)
        logger.info("generate config file for %s" % lxcname)
@@ -211,7 +211,7 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
            return [False,msg]
        logger.info("Succeed to moving output_tmp to nfs/%s" % tmpfilename)

        if "/root/nfs/"+tmpfilename == filepath:
        if os.path.abspath("/root/nfs/"+tmpfilename) == os.path.abspath(filepath):
            return [True,""]
        ret = subprocess.run(cmd % ("/root/nfs/"+tmpfilename,filepath),stdout=subprocess.PIPE,stderr=subprocess.STDOUT, shell=True)
        if ret.returncode != 0:

@@ -40,15 +40,15 @@
            </div>
        </div>
        <div class="box-body">
            <form id="form" class="form-horizontal" action="/batch_job/add/" method="POST">
            <form id="form" class="form-horizontal" action="/batch_job/{{masterips[0].split("@")[0]}}/add/" method="POST">

                <div class="form-group"><label class="col-sm-2 control-label">Job Name</label>
                    <div class="col-sm-10"><input type="text" class="form-control" name="job_name" id="job_name"></div>
                    <div class="col-sm-10"><input type="text" class="form-control" name="jobName" id="job_name"></div>
                </div>
                <div class="hr-line-dashed"></div>
                <br/>
                <div class="form-group"><label class="col-sm-2 control-label">Priority</label>
                    <div class="col-sm-10"><select id="priority_selector" class="form-control">
                    <div class="col-sm-10"><select id="priority_selector" class="form-control" name="taskPriority">
                        {% for priority in range(10) %}
                        <option value="{{priority}}">{{priority}}</option>
                        {% endfor %}
@@ -117,11 +117,11 @@
        var remote_dir = new_mapping.insertCell();
        var source = new_mapping.insertCell();
        var remove = new_mapping.insertCell();
        local_dir.innerHTML = '<input type="text" class="form-control" name="mapping_local_dir_' + task_number + '_' + mapping_number + '" id="mapping_local_dir_'
        local_dir.innerHTML = '<input type="text" class="form-control" name="mappingLocalDir_' + task_number + '_' + mapping_number + '" id="mapping_local_dir_'
            + task_number + '_' + mapping_number + '" />';
        remote_dir.innerHTML = '<input type="text" class="form-control" name="mapping_remote_dir_' + task_number + '_' + mapping_number + '" id="mapping_remote_dir_'
        remote_dir.innerHTML = '<input type="text" class="form-control" name="mappingRemoteDir_' + task_number + '_' + mapping_number + '" id="mapping_remote_dir_'
            + task_number + '_' + mapping_number + '" />';
        source.innerHTML = '<select class="form-control" name="mapping_source_' + task_number + '_' + mapping_number + '" id="mapping_source_'
        source.innerHTML = '<select class="form-control" name="mappingSource_' + task_number + '_' + mapping_number + '" id="mapping_source_'
            + task_number + '_' + mapping_number + '">'
            +'<option>Aliyun</option><option>AWS</option></select>';
        remove.innerHTML = '<div class="box-tool pull-left"><button type="button" id="' + task_number + '_' + mapping_number +'" onclick="removeMapping(this)" class="btn btn-xs btn-danger">'
@@ -171,6 +171,13 @@
            +'<div class="col-sm-3"><input type="number" class="form-control" name="expTime_' + task_number + '" id="expTime_' + task_number + '" value= 60 />'
            +'</div>Seconds</div>'
            +'<div class="form-group">'
            +'<label class="col-sm-2 control-label">Stderr Redirect Path</label>'
            +'<div class="col-sm-3"><input type="text" class="form-control" name="stdErrRedPth_' + task_number + '" id="stdErrRedPth_' + task_number + '" />'
            +'</div>'
            +'<label class="col-sm-2 control-label">Stdout Redirect Path</label>'
            +'<div class="col-sm-3"><input type="text" class="form-control" name="stdOutRedPth_' + task_number + '" id="stdOutRedPth_' + task_number + '" />'
            +'</div></div>'
            +'<div class="form-group">'
            +'<label class="col-sm-2 control-label">Dependency <i class="fa fa-question-circle" title="The tasks that this task depends on, seperate them with commas, eg: Task_1, Task_2"></i></label>'
            +'<div class="col-sm-3"><input type="text" class="form-control" name="dependency_' + task_number + '" id="dependency_' + task_number + '" />'
            +'</div>'

@@ -41,9 +41,23 @@
                <th>Status</th>
                <th>Tasks</th>
                <th>Operations</th>
                <th>Running Time</th>
                <th>Create Time</th>
            <tr>
        <thead>
        <tbody>
            {% for job_info in job_list %}
            <tr>
                <th>{{ job_info['job_id'] }}</th>
                <th>{{ job_info['job_name'] }}</th>
                <th>
                    {{ job_info['status'] }}
                </th>
                <th>Tasks</th>
                <th><button type="button" class="btn btn-xs btn-default"> Stop </button></th>
                <th>{{ job_info['create_time'] }}</th>
            </tr>
            {% endfor %}
        </tbody>
    </table>
</div>
</div>

@@ -130,12 +130,11 @@ def batch_job():
def create_batch_job():
    return createBatchJobView().as_view()

@app.route("/batch_job/add/", methods=['POST'])
@app.route("/batch_job/<masterip>/add/", methods=['POST'])
@login_required
def add_batch_job():
    #TODO get form parameters of a job description
    job_data = {}
    job_data["job_name"] = request.form["job_name"]
def add_batch_job(masterip):
    addBatchJobView.masterip = masterip
    addBatchJobView.job_data = request.form
    return addBatchJobView().as_view()

@app.route("/batch_job/state/", methods=['GET'])

@@ -1,5 +1,6 @@
from flask import session, redirect, request
from webViews.view import normalView
from webViews.log import logger
from webViews.checkname import checkname
from webViews.dockletrequest import dockletRequest

@@ -8,8 +9,12 @@ class batchJobListView(normalView):

    @classmethod
    def get(self):
        masterips = dockletRequest.post_to_all()
        result = dockletRequest.post("/batch/job/list/",{},masterips[0].split("@")[0])
        job_list = result.get("data")
        logger.debug("job_list: %s" % job_list)
        if True:
            return self.render(self.template_path)
            return self.render(self.template_path, job_list=job_list)
        else:
            return self.error()

@@ -40,7 +45,9 @@ class addBatchJobView(normalView):

    @classmethod
    def post(self):
        if True:
            return self.render(self.template_path)
        masterip = self.masterip
        result = dockletRequest.post("/batch/job/add/", self.job_data, masterip)
        if result.get('success', None) == "true":
            return redirect('/batch_jobs/')
        else:
            return self.error()