Fix conflict

This commit is contained in:
zhuyj17 2018-05-27 14:30:05 +08:00
commit 1a5632e574
4 changed files with 108 additions and 0 deletions

View File

@ -23,7 +23,11 @@ import os
import http.server, cgi, json, sys, shutil
import xmlrpc.client
from socketserver import ThreadingMixIn
<<<<<<< HEAD
import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr, lockmgr, cloudmgr
=======
import nodemgr, vclustermgr, etcdlib, network, imagemgr, notificationmgr, lockmgr, jobmgr, taskmgr
>>>>>>> 2c2f318b64cbd0af06780df49c1320432d437eae
from logs import logs
import userManager,beansapplicationmgr
import monitor,traceback
@ -719,6 +723,26 @@ def resetall_system(user, beans, form):
return json.dumps({'success':'false', 'message': message})
return json.dumps(result)
@app.route("/batch/job/add/", methods=['POST'])
@login_required
def add_job(user,beans,form):
pass
@app.route("/batch/job/list/", methods=['POST'])
@login_required
def list_job(user,beans,form):
pass
@app.route("/batch/job/info/", methods=['POST'])
@login_required
def info_job(user,beans,form):
pass
@app.route("/batch/task/info/", methods=['POST'])
@login_required
def info_task(user,beans,form):
pass
# @app.route("/inside/cluster/scaleout/", methods=['POST'])
# @inside_ip_required
# def inside_cluster_scalout(cur_user, cluster_info, form):
@ -786,6 +810,8 @@ if __name__ == '__main__':
global G_applicationmgr
global G_ulockmgr
global G_cloudmgr
global G_jobmgr
global G_taskmgr
# move 'tools.loadenv' to the beginning of this file
fs_path = env.getenv("FS_PREFIX")
@ -878,6 +904,8 @@ if __name__ == '__main__':
G_networkmgr.printpools()
G_cloudmgr = cloudmgr.CloudMgr()
G_taskmgr = taskmgr.TaskMgr()
G_jobmgr = jobmgr.JobMgr(taskmgr)
# start NodeMgr and NodeMgr will wait for all nodes to start ...
G_nodemgr = nodemgr.NodeMgr(G_networkmgr, etcdclient, addr = ipaddr, mode=mode)

37
src/jobmgr.py Normal file
View File

@ -0,0 +1,37 @@
class JobMgr(object):
# user: username
# job: a json string
# user submit a new job, add this job to queue and database
# call add_task to add task information
def add_job(self, user, job):
pass
# user: username
# list a user's all job
def list_jobs(self,user):
pass
# user: username
# jobid: the id of job
# get the information of a job, including the status, json description and other informationa
# call get_task to get the task information
def get_job(self, user, jobid):
pass
# job: a json string
# this is a thread to process a job
def job_processor(self, job):
# according the DAG of job, add task to taskmanager
# wait for all task completed and exit
pass
# this is a thread to schedule the jobs
def job_scheduler(self):
# choose a job from queue, create a job processor for it
pass
# load job information from etcd
# initial a job queue and job schedueler
def __init__(self, taskmgr):
pass

41
src/taskmgr.py Normal file
View File

@ -0,0 +1,41 @@
class TaskMgr(object):
# task: a json string
# this is a thread to process task(or a instance)
def task_processor(self,task):
# call the rpc to call a function in worker
# create container -> execute task
# (one instance or multiple instances)
# retry when failed
pass
# this is a thread to schdule the tasks
def task_scheduler(self):
# choose a task from queue, create a task processor for it
pass
# user: username
# task: a json string
# save the task information into database
def add_task(self,user,task):
pass
# user: username
# jobid: the id of job
# taskid: the id of task
# get the information of a task, including the status, task description and other information
def get_task(self, user, jobid, taskid):
pass
# task: a json string
# this is a rpc function for worker, task processor call this function to execute a task in a worker
@staticmethod
def execute_task(self,task):
return
# load task information from etcd
# initial a task queue and task schedueler
# taskmgr: a taskmgr instance
def __init__(self):
pass

View File

@ -17,6 +17,7 @@ import etcdlib, network, container
from nettools import netcontrol,ovscontrol,portcontrol
import monitor, proxytool
from lvmtool import new_group, recover_group
from taskmgr import TaskMgr
##################################################################
# Worker
@ -139,6 +140,7 @@ class Worker(object):
self.rpcserver.register_function(proxytool.delete_route)
self.rpcserver.register_function(portcontrol.acquire_port_mapping)
self.rpcserver.register_function(portcontrol.release_port_mapping)
self.rpcserver.register_function(TaskMgr.execute_task)
# register functions or instances to server for rpc
#self.rpcserver.register_function(function_name)