Merge pull request #296 from zhongyehong/batch

add frame for batch
This commit is contained in:
zhong yehong 2018-05-13 15:29:20 +08:00 committed by GitHub
commit 2c2f318b64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 106 additions and 1 deletions

View File

@ -23,7 +23,7 @@ import os
import http.server, cgi, json, sys, shutil
import xmlrpc.client
from socketserver import ThreadingMixIn
import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr, lockmgr
import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr, lockmgr, jobmgr, taskmgr
from logs import logs
import userManager,beansapplicationmgr
import monitor,traceback
@ -694,6 +694,26 @@ def resetall_system(user, beans, form):
return json.dumps({'success':'false', 'message': message})
return json.dumps(result)
@app.route("/batch/job/add/", methods=['POST'])
@login_required
def add_job(user,beans,form):
pass
@app.route("/batch/job/list/", methods=['POST'])
@login_required
def list_job(user,beans,form):
pass
@app.route("/batch/job/info/", methods=['POST'])
@login_required
def info_job(user,beans,form):
pass
@app.route("/batch/task/info/", methods=['POST'])
@login_required
def info_task(user,beans,form):
pass
# @app.route("/inside/cluster/scaleout/", methods=['POST'])
# @inside_ip_required
# def inside_cluster_scalout(cur_user, cluster_info, form):
@ -760,6 +780,8 @@ if __name__ == '__main__':
global G_historymgr
global G_applicationmgr
global G_ulockmgr
global G_jobmgr
global G_taskmgr
# move 'tools.loadenv' to the beginning of this file
fs_path = env.getenv("FS_PREFIX")
@ -851,6 +873,9 @@ if __name__ == '__main__':
G_networkmgr = network.NetworkMgr(clusternet, etcdclient, mode, ipaddr)
G_networkmgr.printpools()
G_taskmgr = taskmgr.TaskMgr()
G_jobmgr = jobmgr.JobMgr(taskmgr)
# start NodeMgr and NodeMgr will wait for all nodes to start ...
G_nodemgr = nodemgr.NodeMgr(G_networkmgr, etcdclient, addr = ipaddr, mode=mode)
logger.info("nodemgr started")

37
src/jobmgr.py Normal file
View File

@ -0,0 +1,37 @@
class JobMgr(object):
# user: username
# job: a json string
# user submit a new job, add this job to queue and database
# call add_task to add task information
def add_job(self, user, job):
pass
# user: username
# list a user's all job
def list_jobs(self,user):
pass
# user: username
# jobid: the id of job
# get the information of a job, including the status, json description and other informationa
# call get_task to get the task information
def get_job(self, user, jobid):
pass
# job: a json string
# this is a thread to process a job
def job_processor(self, job):
# according the DAG of job, add task to taskmanager
# wait for all task completed and exit
pass
# this is a thread to schedule the jobs
def job_scheduler(self):
# choose a job from queue, create a job processor for it
pass
# load job information from etcd
# initial a job queue and job schedueler
def __init__(self, taskmgr):
pass

41
src/taskmgr.py Normal file
View File

@ -0,0 +1,41 @@
class TaskMgr(object):
# task: a json string
# this is a thread to process task(or a instance)
def task_processor(self,task):
# call the rpc to call a function in worker
# create container -> execute task
# (one instance or multiple instances)
# retry when failed
pass
# this is a thread to schdule the tasks
def task_scheduler(self):
# choose a task from queue, create a task processor for it
pass
# user: username
# task: a json string
# save the task information into database
def add_task(self,user,task):
pass
# user: username
# jobid: the id of job
# taskid: the id of task
# get the information of a task, including the status, task description and other information
def get_task(self, user, jobid, taskid):
pass
# task: a json string
# this is a rpc function for worker, task processor call this function to execute a task in a worker
@staticmethod
def execute_task(self,task):
return
# load task information from etcd
# initial a task queue and task schedueler
# taskmgr: a taskmgr instance
def __init__(self):
pass

View File

@ -17,6 +17,7 @@ import etcdlib, network, container
from nettools import netcontrol,ovscontrol,portcontrol
import monitor, proxytool
from lvmtool import new_group, recover_group
from taskmgr import TaskMgr
##################################################################
# Worker
@ -139,6 +140,7 @@ class Worker(object):
self.rpcserver.register_function(proxytool.delete_route)
self.rpcserver.register_function(portcontrol.acquire_port_mapping)
self.rpcserver.register_function(portcontrol.release_port_mapping)
self.rpcserver.register_function(TaskMgr.execute_task)
# register functions or instances to server for rpc
#self.rpcserver.register_function(function_name)