let taskcontroller be started by docklet-worker

This commit is contained in:
zhuyj17 2018-07-28 17:32:01 +08:00
parent 4c0891938d
commit f613591ffe
4 changed files with 99 additions and 13 deletions

View File

@ -45,6 +45,13 @@ DAEMON_OPTS=
# The process ID of the script when it runs is stored here:
PIDFILE=$RUN_DIR/$DAEMON_NAME.pid
# settings for docklet batch worker, which is required for batch job processing system
BATCH_ON=True
DAEMON_BATCH=$DOCKLET_LIB/worker/taskcontroller.py
DAEMON_NAME_BATCH=docklet-taskcontroller
PIDFILE_BATCH=$RUN_DIR/batch.pid
DAEMON_OPTS_BATCH=
# settings for docklet proxy, which is required for web access
DAEMON_PROXY=`which configurable-http-proxy`
DAEMON_NAME_PROXY=docklet-proxy
@ -104,6 +111,19 @@ do_start() {
log_end_msg $?
}
# Start the batch-job taskcontroller daemon, unless batch processing is
# disabled via BATCH_ON in the docklet configuration.
# Returns 1 (without logging) when BATCH_ON=False; otherwise returns the
# exit status reported by log_end_msg.
do_start_batch () {
    # Honor the config switch: anything other than the literal "False" enables batch.
    if [ "$BATCH_ON" = "False" ]; then
        return 1
    fi
    log_daemon_msg "Starting $DAEMON_NAME_BATCH in $FS_PREFIX"
    # taskcontroller.py currently takes no options; reset defensively.
    DAEMON_OPTS_BATCH=""
    # Quote path/user variables so the command survives whitespace in
    # RUN_DIR/DOCKLET_LIB-derived paths. $DAEMON_OPTS_BATCH is deliberately
    # left unquoted so multiple options would word-split into arguments.
    start-stop-daemon --start --background \
        --pidfile "$PIDFILE_BATCH" --make-pidfile \
        --user "$DAEMON_USER" --chuid "$DAEMON_USER" \
        --startas "$DAEMON_BATCH" -- $DAEMON_OPTS_BATCH
    log_end_msg $?
}
do_start_proxy () {
if [ "$DISTRIBUTED_GATEWAY" = "False" ]
then
@ -121,6 +141,16 @@ do_stop () {
log_end_msg $?
}
# Stop the batch-job taskcontroller daemon, unless batch processing is
# disabled via BATCH_ON (in which case there is nothing to stop).
# --oknodo makes a stop of an already-stopped daemon succeed; --retry 10
# escalates signals for up to 10 seconds before giving up.
do_stop_batch () {
    if [ "$BATCH_ON" = "False" ]; then
        return 1
    fi
    log_daemon_msg "Stopping $DAEMON_NAME_BATCH daemon"
    # Quote the pidfile path so the command survives whitespace in RUN_DIR.
    start-stop-daemon --stop --quiet --oknodo \
        --remove-pidfile --pidfile "$PIDFILE_BATCH" --retry 10
    log_end_msg $?
}
do_stop_proxy () {
if [ "$DISTRIBUTED_GATEWAY" = "False" ]
then
@ -149,11 +179,13 @@ do_stop_meter() {
case "$1" in
start)
do_start
do_start_batch
do_start_proxy
;;
stop)
do_stop
do_stop_batch
do_stop_proxy
;;
start-meter)
@ -164,6 +196,15 @@ case "$1" in
do_stop_meter
;;
start_batch)
pre_start
do_start_batch
;;
stop_batch)
do_stop_batch
;;
start_proxy)
do_start_proxy
;;
@ -179,13 +220,16 @@ case "$1" in
restart)
do_stop
do_stop_batch
do_stop_proxy
do_start
do_start_batch
do_start_proxy
;;
status)
status_of_proc -p $PIDFILE "$DAEMON" "$DAEMON_NAME" && exit 0 || exit $?
status_of_proc -p $PIDFILE_BATCH "$DAEMON_BATCH" "$DAEMON_NAME_BATCH" || status=$?
status_of_proc -p $PIDFILE_PROXY "$DAEMON_PROXY" "$DAEMON_NAME_PROXY" || status=$?
;;
*)

View File

@ -47,9 +47,6 @@
# CLUSTER_NET: cluster network ip address range, default is 172.16.0.1/16
# CLUSTER_NET=172.16.0.1/16
# BATCH_NET: ip addresses range of containers for batch job, default is 10.0.3.0/24
# BATCH_NET=10.0.3.0/24
# Deprecated since v0.2.7. read from quota group set in web admin page
# CONTAINER_CPU: CPU quota of container, default is 100000
# A single CPU core has total=100000 (100ms), so the default 100000
@ -185,3 +182,32 @@
# ALLOW_SCALE_OUT: allow docklet to rent server on the cloud to scale out
# Only when you deploy docklet on the cloud can you set it to True
# ALLOW_SCALE_OUT=False
# ==================================================
#
# Batch Config
#
# ==================================================
# BATCH_ON: whether to start the batch job processing system when
# docklet starts. Default: True
# BATCH_ON=True
# BATCH_MASTER_PORT: the rpc server port on master.
# default: 50050
# BATCH_MASTER_PORT=50050
# BATCH_WORKER_PORT: the rpc server port on worker.
# default: 50051
# BATCH_WORKER_PORT=50051
# BATCH_GATEWAY: the ip address of gateway for the containers processing
# batch jobs. default: 10.0.3.1
# BATCH_GATEWAY=10.0.3.1
# BATCH_NET: ip addresses range of containers for batch job, default is 10.0.3.0/24
# BATCH_NET=10.0.3.0/24
# BATCH_MAX_THREAD_WORKER: the maximum number of threads of the rpc server on
# the batch job worker. default: 5
# BATCH_MAX_THREAD_WORKER=5

View File

@ -9,8 +9,6 @@ def getenv(key):
return int(os.environ.get("CLUSTER_SIZE", 1))
elif key == "CLUSTER_NET":
return os.environ.get("CLUSTER_NET", "172.16.0.1/16")
elif key == "BATCH_NET":
return os.environ.get("BATCH_NET","10.0.3.0/24")
elif key == "CONTAINER_CPU":
return int(os.environ.get("CONTAINER_CPU", 100000))
elif key == "CONTAINER_DISK":
@ -81,5 +79,17 @@ def getenv(key):
return os.environ.get("ALLOCATED_PORTS","10000-65535")
elif key =="ALLOW_SCALE_OUT":
return os.environ.get("ALLOW_SCALE_OUT", "False")
elif key == "BATCH_ON":
return os.environ.get("BATCH_ON","True")
elif key == "BATCH_MASTER_PORT":
return os.environ.get("BATCH_MASTER_PORT","50050")
elif key == "BATCH_WORKER_PORT":
return os.environ.get("BATCH_WORKER_PORT","50051")
elif key == "BATCH_GATEWAY":
return os.environ.get("BATCH_GATEWAY","10.0.3.1")
elif key == "BATCH_NET":
return os.environ.get("BATCH_NET","10.0.3.0/24")
elif key == "BATCH_MAX_THREAD_WORKER":
return os.environ.get("BATCH_MAX_THREAD_WORKER","5")
else:
return os.environ.get(key,"")

22
src/worker/taskcontroller.py Normal file → Executable file
View File

@ -3,11 +3,11 @@ import sys
if sys.path[0].endswith("worker"):
sys.path[0] = sys.path[0][:-6]
from utils import env, tools
#config = env.getenv("CONFIG")
config = "/opt/docklet/local/docklet-running.conf"
config = env.getenv("CONFIG")
#config = "/opt/docklet/local/docklet-running.conf"
tools.loadenv(config)
from utils.log import initlogging
initlogging("docklet-worker")
initlogging("docklet-taskcontroller")
from utils.log import logger
from concurrent import futures
@ -33,14 +33,17 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
self.fspath = env.getenv('FS_PREFIX')
self.confpath = env.getenv('DOCKLET_CONF')
self.lock = threading.Lock()
self.cons_gateway = '10.0.3.1'
self.cons_ips = '10.0.3.0/24'
self.cons_gateway = env.getenv('BATCH_GATEWAY')
self.cons_ips = env.getenv('BATCH_NET')
logger.info("Batch gateway ip address %s" % self.cons_gateway)
logger.info("Batch ip pools %s" % self.cons_ips)
self.cidr = 32 - int(self.cons_ips.split('/')[1])
self.ipbase = ip_to_int(self.cons_ips.split('/')[0])
self.free_ips = []
for i in range(2, (1 << self.cidr) - 1):
self.free_ips.append(i)
logger.info("Free ip addresses pool %s" % str(self.free_ips))
logger.info('TaskController init success')
@ -184,11 +187,14 @@ class TaskController(rpc_pb2_grpc.WorkerServicer):
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
def TaskControllerServe():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=5))
max_threads = int(env.getenv('BATCH_MAX_THREAD_WORKER'))
worker_port = int(env.getenv('BATCH_WORKER_PORT'))
logger.info("Max Threads on a worker is %d" % max_threads)
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_threads))
rpc_pb2_grpc.add_WorkerServicer_to_server(TaskController(), server)
server.add_insecure_port('[::]:50051')
server.add_insecure_port('[::]:'+str(worker_port))
server.start()
logger.info("Start TaskController Servicer")
logger.info("Start TaskController Servicer on port:%d" % worker_port)
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)