This commit is contained in:
Fan Shixiong 2016-05-19 06:17:05 -07:00
commit e9175e08fb
21 changed files with 1042 additions and 41 deletions

33
CHANGES
View File

@ -1,6 +1,37 @@
v0.2.7, May 17, 2016
--------------------
**Bug Fix**
* [#9] updating user profile taking effects immediately
* [#12] logging user's activity
* [#14] Can't stop vcluster by dashboard page
* [#18] subprocess call should check return status
* [#19] lxc config string in config file is limited in 16 bytes
* [#25] bug of external login
* [#30] support lxc.custom.conf in appending
* [#35] nfs mountpoint bug in imagemgr.py
* [#49] Fail to create container
* [#57] status page of normal user failed
* [#68] Not Found error when just click "Sign in" Button
* [#76] unable to show and edit user table in smartphone
**Improvement**
* [#7] enhance quota management
* [#8] independent starting of master and workers
* [#20] check typing and input on web pages and web server
* [#23] add LXCFS for container
* [#41] move system data to global/sys
* [#42] check IP and network pool when releasing IPs
* [#48] token expires after some time
* [#54] display container owner
* [#61] rewrite httprest.py using flask routing
**Notes**
* If you upgrade from former version, please run tools/upgrade.py first.
v0.2.6, Mar 31, 2016
--------------------
An initial release on github.com
- Using the open source AdminLTE theme
* Using the open source AdminLTE theme

View File

@ -1 +1 @@
0.2.6
0.2.7

View File

@ -38,6 +38,9 @@ DAEMON_OPTS=
# The process ID of the script when it runs is stored here:
PIDFILE=$RUN_DIR/$DAEMON_NAME.pid
DOCKMETER_NAME=$DAEMON_NAME-metering
DOCKMETER_PIDFILE=$RUN_DIR/$DOCKMETER_NAME.pid
. /lib/lsb/init-functions
###########
@ -90,6 +93,18 @@ do_stop () {
log_end_msg $?
}
do_start_meter() {
log_daemon_msg "Starting $DOCKMETER_NAME in $FS_PREFIX"
start-stop-daemon --start --background --pidfile $DOCKMETER_PIDFILE --make-pidfile --exec $DOCKLET_HOME/meter/main.py
log_end_msg $?
}
do_stop_meter() {
log_daemon_msg "Stopping $DOCKMETER_NAME daemon"
start-stop-daemon --stop --pidfile $DOCKMETER_PIDFILE --remove-pidfile
log_end_msg $?
}
@ -101,6 +116,13 @@ case "$1" in
stop)
do_stop
;;
start-meter)
do_start_meter
;;
stop-meter)
do_stop_meter
;;
console)
pre_start

View File

@ -41,6 +41,8 @@ lxc.cgroup.memory.limit_in_bytes = %CONTAINER_MEMORY%M
# lxc.cgroup.cpu.cfs_quota_us : quota time of this process
lxc.cgroup.cpu.cfs_quota_us = %CONTAINER_CPU%
lxc.cap.drop = sys_admin net_admin mac_admin mac_override sys_time sys_module
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/data %ROOTFS%/root/nfs none bind,rw,create=dir 0 0
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/hosts/%CLUSTERID%.hosts %ROOTFS%/etc/hosts none bind,ro,create=file 0 0
lxc.mount.entry = %FS_PREFIX%/global/users/%USERNAME%/ssh %ROOTFS%/root/.ssh none bind,ro,create=dir 0 0

99
meter/connector/master.py Executable file
View File

@ -0,0 +1,99 @@
#!/usr/bin/python3
import socket, select, errno, threading, os
class master_connector:
tcp_port = 1727
max_minions = 256
conn = {}
epoll_fd = select.epoll()
def establish_vswitch(ovsname):
os.system('ovs-vsctl del-br ovs-%s >/dev/null 2>&1' % ovsname)
os.system('ovs-vsctl add-br ovs-%s' % ovsname)
os.system('brctl addif ovs-bridge ovs-%s >/dev/null 2>&1' % ovsname)
os.system('ip link set ovs-system up')
os.system('ip link set ovs-%s up' % ovsname)
def build_gre_conn(ovsname, ipaddr):
name = ipaddr.replace('.','_')
os.system('ovs-vsctl add-port ovs-%s gre-%s -- set interface gre-%s type=gre options:remote_ip=%s 2>/dev/null' % (ovsname, name, name, ipaddr))
def break_gre_conn(ovsname, ipaddr):
name = ipaddr.replace('.','_')
os.system('ovs-vsctl del-port ovs-%s gre-%s 2>/dev/null' % (ovsname, name))
def close_connection(fd):
master_connector.epoll_fd.unregister(fd)
master_connector.conn[fd][0].close()
addr = master_connector.conn[fd][1]
master_connector.conn.pop(fd)
master_connector.break_gre_conn('master', addr)
def do_message_response(input_buffer):
assert(input_buffer == b'ack')
return b'ack'
def start():
thread = threading.Thread(target = master_connector.run_forever, args = [])
thread.setDaemon(True)
thread.start()
return thread
def run_forever():
listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_fd.bind(('', master_connector.tcp_port))
listen_fd.listen(master_connector.max_minions)
master_connector.epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
datalist = {}
master_connector.establish_vswitch('master')
try:
while True:
epoll_list = master_connector.epoll_fd.poll()
for fd, events in epoll_list:
if fd == listen_fd.fileno():
fileno, addr = listen_fd.accept()
fileno.setblocking(0)
master_connector.epoll_fd.register(fileno.fileno(), select.EPOLLIN | select.EPOLLET)
master_connector.conn[fileno.fileno()] = (fileno, addr[0])
master_connector.build_gre_conn('master', addr[0])
elif select.EPOLLIN & events:
datas = b''
while True:
try:
data = master_connector.conn[fd][0].recv(10)
if not data and not datas:
master_connector.close_connection(fd)
break
else:
datas += data
except socket.error as msg:
if msg.errno == errno.EAGAIN:
try:
datalist[fd] = master_connector.do_message_response(datas)
master_connector.epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
except:
master_connector.close_connection(fd)
else:
master_connector.close_connection(fd)
break
elif select.EPOLLOUT & events:
sendLen = 0
while True:
sendLen += master_connector.conn[fd][0].send(datalist[fd][sendLen:])
if sendLen == len(datalist[fd]):
break
master_connector.epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
elif select.EPOLLHUP & events:
master_connector.close_connection(fd)
else:
continue
finally:
os.system('ovs-vsctl del-br ovs-master >/dev/null 2>&1')

43
meter/connector/minion.py Executable file
View File

@ -0,0 +1,43 @@
#!/usr/bin/python3
import socket, time, threading, os
class minion_connector:
def connect(server_ip):
from connector.master import master_connector
connected = True
while True:
try:
fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
fd.connect((server_ip, master_connector.tcp_port))
connected = True
print("[info]", "connected to master.")
master_connector.establish_vswitch('minion')
master_connector.build_gre_conn('minion', server_ip)
while True:
data = b'ack'
if fd.send(data) != len(data):
break
readData = fd.recv(1024)
time.sleep(0.5)
fd.close()
except socket.error as e:
master_connector.break_gre_conn('minion', server_ip)
if connected:
print("[info]", "non-connected with master.")
except Exception as e:
pass
finally:
if connected:
os.system('ovs-vsctl del-br ovs-minion >/dev/null 2>&1')
connected = False
time.sleep(1)
def start(server_ip):
thread = threading.Thread(target = minion_connector.connect, args = [server_ip])
thread.setDaemon(True)
thread.start()
return thread

82
meter/daemon/http.py Executable file
View File

@ -0,0 +1,82 @@
import json, cgi, threading
from http.server import BaseHTTPRequestHandler, HTTPServer
class base_http_handler(BaseHTTPRequestHandler):
def load_module(self):
return None
def do_POST(self):
try:
default_exception = 'unsupported request.'
success = True
data = None
length = self.headers['content-length']
if length == None:
length = self.headers['content-length'] = 0
if int(length) > (1<<12):
raise Exception("data too large")
http_form = cgi.FieldStorage(fp=self.rfile, headers=self.headers,environ={'REQUEST_METHOD':'POST','CONTENT_TYPE': "text/html"})
form = {}
for item in http_form:
try:
value = http_form[item].file.read().strip()
except:
value = http_form[item].value
try:
value = value.decode()
except:
pass
form[item] = value
parts = self.path.split('/', 2)
if len(parts) != 3:
raise Exception(default_exception)
[null, version, path] = parts
pymodule = self.load_module() + '_' + version
module = __import__('daemon.' + pymodule)
handler = module.__dict__[pymodule].__dict__['case_handler']
method = path.replace('/', '_')
if not hasattr(handler, method):
raise Exception(default_exception)
data = handler.__dict__[method](form, self.handler_class.args)
except Exception as e:
success = False
data = {"reason": str(e)}
finally:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"success": success, "data": data}).encode())
self.wfile.write("\n".encode())
return
class master_http_handler(base_http_handler):
http_port = 1728
def load_module(self):
self.handler_class = master_http_handler
return 'master'
class minion_http_handler(base_http_handler):
http_port = 1729
def load_module(self):
self.handler_class = minion_http_handler
return 'minion'
class http_daemon_listener:
def __init__(self, handler_class, args = None):
handler_class.args = args
self.handler_class = handler_class
def listen(self):
server = HTTPServer(('', self.handler_class.http_port), self.handler_class)
server.serve_forever()

58
meter/daemon/master_v1.py Executable file
View File

@ -0,0 +1,58 @@
import subprocess, os
def http_client_post(ip, port, url, entries = {}):
import urllib.request, urllib.parse, json
url = url if not url.startswith('/') else url[1:]
response = urllib.request.urlopen('http://%s:%d/%s' % (ip, port, url), urllib.parse.urlencode(entries).encode())
obj = json.loads(response.read().decode().strip())
response.close()
return obj
class case_handler:
# [Order-by] lexicographic order
# curl -L -X POST http://0.0.0.0:1728/v1/minions/list
def minions_list(form, args):
minions = []
for item in args.conn:
minions.append(args.conn[item][1])
return {"minions": minions}
# curl -L -X POST -F mem=4096 -F cpu=2 http://0.0.0.0:1728/v1/resource/allocation
def resource_allocation(form, args):
mem = int(form['mem'])
cpu = int(form['cpu'])
candidates = {}
from daemon.http import minion_http_handler
for item in args.conn:
addr = args.conn[item][1]
obj = http_client_post(addr, minion_http_handler.http_port, '/v1/system/memsw/available')
if obj['success'] and obj['data']['Mbytes'] >= mem:
candidates[addr] = obj['data']
if len(candidates) <= 0:
raise Exception("no minions")
else:
from policy.allocate import candidates_selector
one = candidates_selector.select(candidates)
return {"recommend": one}
# curl -L -X POST -F user=docklet http://0.0.0.0:1728/v1/user/live/add
def user_live_add(form, args):
if not os.path.exists('/var/lib/docklet/global/users/%s' % form['user']):
return False
subprocess.getoutput('echo live > /var/lib/docklet/global/users/%s/status' % form['user'])
return True
# curl -L -X POST -F user=docklet http://0.0.0.0:1728/v1/user/live/remove
def user_live_remove(form, args):
subprocess.getoutput('rm -f /var/lib/docklet/global/users/%s/status' % form['user'])
return True
# curl -L -X POST http://0.0.0.0:1728/v1/user/live/list
def user_live_list(form, args):
return subprocess.getoutput('ls -1 /var/lib/docklet/global/users/*/status 2>/dev/null | awk -F\/ \'{print $(NF-1)\'}').split()

76
meter/daemon/minion_v1.py Executable file
View File

@ -0,0 +1,76 @@
from intra.system import system_manager
from intra.billing import billing_manager
from intra.cgroup import cgroup_manager
from policy.quota import *
from intra.smart import smart_controller
class case_handler:
# [Order-by] lexicographic order
# curl -L -X POST -F uuid=docklet-1-0 http://0.0.0.0:1729/v1/billing/increment
def billing_increment(form, args):
return billing_manager.fetch_increment_and_clean(form['uuid'])
# curl -L -X POST http://0.0.0.0:1729/v1/cgroup/container/list
def cgroup_container_list(form, args):
return cgroup_manager.get_cgroup_containers()
# curl -L -X POST -F policy=etime_rev_policy http://0.0.0.0:1729/v1/smart/quota/policy
def smart_quota_policy(form, args):
msg = 'success'
try:
smart_controller.set_policy(eval(form['policy']))
except Exception as e:
msg = e
return {'message': msg}
# curl -L -X POST -F uuid=n1 http://0.0.0.0:1729/v1/cgroup/container/limit
def cgroup_container_limit(form, args):
return cgroup_manager.get_container_limit(form['uuid'])
# curl -L -X POST -F uuid=n1 http://0.0.0.0:1729/v1/cgroup/container/sample
def cgroup_container_sample(form, args):
return cgroup_manager.get_container_sample(form['uuid'])
# curl -L -X POST http://0.0.0.0:1729/v1/system/loads
def system_loads(form, args):
return system_manager.get_system_loads()
# curl -L -X POST http://0.0.0.0:1729/v1/system/memsw/available
def system_memsw_available(form, args):
return system_manager.get_available_memsw()
# curl -L -X POST -F size=16 http://0.0.0.0:1729/v1/system/swap/extend
def system_swap_extend(form, args):
return system_manager.extend_swap(int(form['size']))
# curl -L -X POST http://0.0.0.0:1729/v1/system/swap/clear
def system_swap_clear(form, args):
return system_manager.clear_all_swaps()
# curl -L -X POST http://0.0.0.0:1729/v1/system/total/physical/memory
def system_total_physical_memory(form, args):
return system_manager.get_total_physical_memory_for_containers()
'''
# curl -X POST -F uuid=n1 http://0.0.0.0:1729/v1/blacklist/add
def blacklist_add(form):
exists = form['uuid'] in smart_controller.blacklist
if not exists:
smart_controller.blacklist.add(form['uuid'])
return {"changed": not exists}
# curl -X POST -F uuid=n1 http://0.0.0.0:1729/v1/blacklist/remove
def blacklist_remove(form):
exists = form['uuid'] in smart_controller.blacklist
if exists:
smart_controller.blacklist.remove(form['uuid'])
return {"changed": exists}
# curl -X POST http://0.0.0.0:1729/v1/blacklist/show
def blacklist_show(form):
blacklist = []
for item in smart_controller.blacklist:
blacklist.append(item)
return blacklist
'''

46
meter/intra/billing.py Executable file
View File

@ -0,0 +1,46 @@
import subprocess, time, os
from intra.system import system_manager
class billing_manager:
history_book = {}
def on_lxc_acct_usage(uuid, prev, curr, interval):
cpu_gen = max(0, curr['cpu_sample'] - prev['cpu_sample']) >> 20 # in ms
mem_gen = ((curr['mem_phys_sample'] + prev['mem_phys_sample']) * interval) >> 11 # in kbytes
try:
os.makedirs('%s/%s' % (system_manager.db_prefix, uuid))
except:
pass
with open('%s/%s/usage' % (system_manager.db_prefix, uuid), 'a') as fp:
fp.write('%d %d\n' % (cpu_gen, mem_gen))
def add_usage_sample(uuid, sample, interval):
if uuid in billing_manager.history_book:
billing_manager.on_lxc_acct_usage(uuid, billing_manager.history_book[uuid], sample, interval)
billing_manager.history_book[uuid] = sample
def clean_dead_node(uuid):
if uuid in billing_manager.history_book:
billing_manager.history_book.pop(uuid)
def fetch_increment_and_clean(uuid):
cpu_acct = 0.0
mem_acct = 0.0
cnt_acct = 0
try:
fetch_path = '%s/%s/%f' % (system_manager.db_prefix, uuid, time.time())
os.rename('%s/%s/usage' % (system_manager.db_prefix, uuid), fetch_path)
with open(fetch_path, 'r') as fp:
line = fp.readline()
while line != '':
[cpu, mem] = line.split()
line = fp.readline()
cnt_acct += 1
cpu_acct += float(cpu)
mem_acct += float(mem)
os.remove(fetch_path)
except:
pass
return {"cpu_acct": cpu_acct, "mem_acct": mem_acct, "cnt_acct": cnt_acct}

112
meter/intra/cgroup.py Executable file
View File

@ -0,0 +1,112 @@
import subprocess, os
class cgroup_controller:
def read_value(group, uuid, item):
path = cgroup_manager.__default_prefix__ % (group, uuid, item)
if not os.path.exists(path):
raise Exception('read: container "%s" not found!' % uuid)
with open(path, 'r') as file:
value = file.read()
return value.strip()
def write_value(group, uuid, item, value):
path = cgroup_manager.__default_prefix__ % (group, uuid, item)
if not os.path.exists(path):
raise Exception('write: container "%s" not found!' % uuid)
try:
with open(path, 'w') as file:
file.write(str(value))
except:
pass
class cgroup_manager:
__prefix_docker__ = '/sys/fs/cgroup/%s/system.slice/docker-%s.scope/%s'
__prefix_lxc__ = '/sys/fs/cgroup/%s/lxc/%s/%s'
__prefix_lxcinit__ = '/sys/fs/cgroup/%s/init.scope/lxc/%s/%s'
def set_default_memory_limit(limit):
cgroup_manager.__default_memory_limit__ = limit
def set_cgroup_prefix(prefix = __prefix_lxc__):
cgroup_manager.__default_prefix__ = prefix
def auto_detect_prefix():
cgroup_manager.__default_prefix__ = cgroup_manager.__prefix_docker__
if len(cgroup_manager.get_cgroup_containers()) > 0:
return
cgroup_manager.__default_prefix__ = cgroup_manager.__prefix_lxcinit__
if len(cgroup_manager.get_cgroup_containers()) > 0:
return
cgroup_manager.__default_prefix__ = cgroup_manager.__prefix_lxc__
if len(cgroup_manager.get_cgroup_containers()) > 0:
return
# print("[info]", "set cgroup prefix to %s" % cgroup_manager.__default_prefix__)
def get_cgroup_containers():
containers = subprocess.getoutput("find %s -type d 2>/dev/null | awk -F\/ '{print $(NF-1)}'" % (cgroup_manager.__default_prefix__ % ('cpu', '*', '.'))).split()
uuids = []
for item in containers:
if item.startswith('docker-') and item.endswith('.scope') and len(item) > 64:
uuids.append(item[7:-6])
else:
uuids.append(item)
return uuids
def get_container_pid(uuid):
return int(cgroup_controller.read_value('cpu', uuid, 'tasks').split()[0])
def get_container_sample(uuid):
mem_page_sample = int(cgroup_controller.read_value('memory', uuid, 'memory.memsw.usage_in_bytes'))
mem_phys_sample = int(cgroup_controller.read_value('memory', uuid, 'memory.usage_in_bytes'))
cpu_sample = int(cgroup_controller.read_value('cpu', uuid, 'cpuacct.usage'))
pids_sample = int(cgroup_controller.read_value('pids', uuid, 'pids.current'))
container_pid = cgroup_manager.get_container_pid(uuid)
from intra.system import system_manager
real_time = system_manager.get_proc_etime(container_pid)
return {"cpu_sample": cpu_sample, "pids_sample": pids_sample, "mem_page_sample": mem_page_sample, "mem_phys_sample": mem_phys_sample, "pid": container_pid, "real_time": real_time}
def get_container_limit(uuid):
mem_phys_quota = int(cgroup_controller.read_value('memory', uuid, 'memory.limit_in_bytes'))
mem_page_quota = int(cgroup_controller.read_value('memory', uuid, 'memory.memsw.limit_in_bytes'))
cpu_shares = int(cgroup_controller.read_value('cpu', uuid, 'cpu.shares'))
cpu_quota = int(cgroup_controller.read_value('cpu', uuid, 'cpu.cfs_quota_us'))
cpu_quota = cpu_quota if cpu_quota >= 0 else -1
pids_quota = cgroup_controller.read_value('pids', uuid, 'pids.max')
pids_quota = int(pids_quota) if pids_quota != 'max' else -1
return {"cpu_quota": cpu_quota, "cpu_shares": cpu_shares, "mem_phy_quota": mem_phys_quota, "mem_page_quota": mem_page_quota, "pids_quota": pids_quota}
def get_container_oom_status(uuid):
[_x, idle, _y, oom] = cgroup_controller.read_value('memory', uuid, 'memory.oom_control').split()
return (idle == '1', oom == '1')
def set_container_oom_idle(uuid, idle):
cgroup_controller.write_value('memory', uuid, 'memory.oom_control', 1 if idle else 0)
def protect_container_oom(uuid):
cgroup_controller.write_value('memory', uuid, 'memory.oom_control', 1)
data = cgroup_manager.get_container_limit(uuid)
if data["mem_page_quota"] >= 9223372036854771712:
memory_limit_in_bytes = cgroup_manager.__default_memory_limit__ << 30
mem_phy_quota = min(data["mem_phy_quota"], memory_limit_in_bytes)
mem_page_quota = memory_limit_in_bytes
cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'FROZEN')
cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', mem_phy_quota)
cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', mem_phy_quota)
cgroup_controller.write_value('memory', uuid, 'memory.memsw.limit_in_bytes', mem_page_quota)
cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'THAWED')
def set_container_physical_memory_limit(uuid, Mbytes, freeze = False):
if freeze:
cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'FROZEN')
memory_limit = int(max(0, Mbytes)) << 20
cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', memory_limit)
if freeze:
cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'THAWED')
def set_container_cpu_priority_limit(uuid, ceof):
cpu_scaling = min(1024, 10 + int(1024 * ceof))
cgroup_controller.write_value('cpu', uuid, 'cpu.shares', cpu_scaling)

96
meter/intra/smart.py Executable file
View File

@ -0,0 +1,96 @@
import subprocess, time, os, threading, math
from intra.system import system_manager
from intra.cgroup import cgroup_manager
from intra.billing import billing_manager
class smart_controller:
def set_policy(policy):
smart_controller.policy = policy
def start(interval = 4):
thread = threading.Thread(target = smart_controller.smart_control_forever, args = [interval])
thread.setDaemon(True)
thread.start()
return thread
def smart_control_forever(interval):
last_live = []
while True:
time.sleep(interval)
try:
mem_usage_mapping = {}
live = cgroup_manager.get_cgroup_containers()
for item in live:
try:
last_live.remove(item)
except:
pass
try:
cgroup_manager.protect_container_oom(item)
sample = cgroup_manager.get_container_sample(item)
mem_usage_mapping[item] = math.ceil(sample['mem_page_sample'] * 1e-6)
billing_manager.add_usage_sample(item, sample, interval)
except:
pass
for item in last_live:
billing_manager.clean_dead_node(item)
last_live = live
is_ready = True
memory_available = system_manager.get_available_memsw()
if memory_available['Mbytes'] <= 0:
size_in_gb = int(math.ceil(-memory_available['Mbytes'] / 1024 / 16) * 16)
print("[warning]", 'overloaded containers, auto-extending %d G memsw.' % size_in_gb)
system_manager.extend_swap(size_in_gb)
total_score = 0.0
score_mapping = {}
for item in live:
score = max(1e-8, smart_controller.policy.get_score_by_uuid(item))
score_mapping[item] = score
print(item, "(score/cpu)", score)
total_score += score
# CPU Scoring
for item in live:
ceof = score_mapping[item] / total_score
cgroup_manager.set_container_cpu_priority_limit(item, ceof)
# Iterative Memory Scoring
free_mem = system_manager.get_total_physical_memory_for_containers()['Mbytes']
local_nodes = live
mem_alloc = {}
for item in live:
mem_alloc[item] = 0
while free_mem > 0 and len(local_nodes) > 0:
excess_mem = 0
next_local_nodes = []
for item in local_nodes:
mem_alloc[item] += int(math.floor(free_mem * score_mapping[item] / total_score))
if mem_alloc[item] >= mem_usage_mapping[item]:
excess_mem += mem_alloc[item] - mem_usage_mapping[item]
mem_alloc[item] = mem_usage_mapping[item]
else:
next_local_nodes.append(item)
free_mem = excess_mem
local_nodes = next_local_nodes
for item in live:
mem_alloc[item] += int(math.floor(free_mem * score_mapping[item] / total_score))
cgroup_manager.set_container_physical_memory_limit(item, mem_alloc[item])
print(item, "(malloc:usage)", mem_alloc[item], mem_usage_mapping[item])
if len(live) > 0:
print("-------------------------------")
except:
pass
# echo "8:0 1000" > /sys/fs/cgroup/blkio/lxc/docklet-1-0/blkio.throttle.write_bps_device
# https://www.kernel.org/doc/Documentation/devices.txt
# while true; do clear; cat /sys/fs/cgroup/blkio/lxc/docklet-1-0/blkio.throttle.io_service_bytes; sleep 0.5; done
# hugetlb, net_cls, net_prio, /sbin/tc

121
meter/intra/system.py Executable file
View File

@ -0,0 +1,121 @@
import subprocess, time, os
from intra.cgroup import cgroup_manager
class system_manager:
db_prefix = '.'
def set_db_prefix(prefix):
system_manager.db_prefix = prefix
try:
os.makedirs(prefix)
except:
pass
def clear_all_swaps():
subprocess.getoutput('swapoff -a')
subprocess.getoutput('losetup -D')
def extend_swap(size):
if size < 0:
(mem_free, mem_total) = system_manager.get_memory_sample()
size = (mem_total + mem_total // 8) // 1024
nid = 128
while subprocess.getoutput("cat /proc/swaps | grep cg-loop | awk '{print $1}' | awk -F\- '{print $NF}' | grep %d$" % nid) != "":
nid = nid + 1
start_time = time.time()
# setup
os.system('dd if=/dev/zero of=/tmp/cg-swap-%d bs=1G count=0 seek=%d >/dev/null 2>&1' % (nid, size))
os.system('mknod -m 0660 /dev/cg-loop-%d b 7 %d >/dev/null 2>&1' % (nid, nid))
os.system('losetup /dev/cg-loop-%d /tmp/cg-swap-%d >/dev/null 2>&1' % (nid, nid))
os.system('mkswap /dev/cg-loop-%d >/dev/null 2>&1' % nid)
success = os.system('swapon /dev/cg-loop-%d >/dev/null 2>&1' % nid) == 0
# detach
# os.system('swapoff /dev/cg-loop-%d >/dev/null 2>&1' % nid)
# os.system('losetup -d /dev/cg-loop-%d >/dev/null 2>&1' % nid)
# os.system('rm -f /dev/cg-loop-%d /tmp/cg-swap-%d >/dev/null 2>&1' % (nid, nid))
end_time = time.time()
return {"setup": success, "time": end_time - start_time }
def get_cpu_sample():
[a, b, c, d] = subprocess.getoutput("cat /proc/stat | grep ^cpu\ | awk '{print $2, $3, $4, $6}'").split()
cpu_time = int(a) + int(b) + int(c) + int(d)
return (cpu_time, time.time())
def get_memory_sample():
mem_free = int(subprocess.getoutput("awk '{if ($1==\"MemAvailable:\") print $2}' /proc/meminfo 2>/dev/null")) // 1024
mem_total = int(subprocess.getoutput("awk '{if ($1==\"MemTotal:\") print $2}' /proc/meminfo 2>/dev/null")) // 1024
return (mem_free, mem_total)
def get_swap_sample():
swap_free = int(subprocess.getoutput("awk '{if ($1==\"SwapFree:\") print $2}' /proc/meminfo 2>/dev/null")) // 1024
swap_total = int(subprocess.getoutput("awk '{if ($1==\"SwapTotal:\") print $2}' /proc/meminfo 2>/dev/null")) // 1024
return (swap_free, swap_total)
def get_system_loads():
if 'last_cpu_sample' not in system_manager.__dict__:
system_manager.last_cpu_sample = system_manager.get_cpu_sample()
time.sleep(1)
cpu_sample = system_manager.get_cpu_sample()
(mem_free, mem_total) = system_manager.get_memory_sample()
(swap_free, swap_total) = system_manager.get_swap_sample()
ncpus = int(subprocess.getoutput("grep processor /proc/cpuinfo | wc -l"))
cpu_free = ncpus - (cpu_sample[0] - system_manager.last_cpu_sample[0]) * 0.01 / (cpu_sample[1] - system_manager.last_cpu_sample[1])
cpu_free = 0.0 if cpu_free <= 0.0 else cpu_free
system_manager.last_cpu_sample = cpu_sample
return {"mem_free": mem_free, "mem_total": mem_total, "swap_free": swap_free, "swap_total": swap_total, "cpu_free": cpu_free, "cpu_total": ncpus }
def get_proc_etime(pid):
fmt = subprocess.getoutput("ps -A -opid,etime | grep '^ *%d' | awk '{print $NF}'" % pid).strip()
if fmt == '':
return -1
parts = fmt.split('-')
days = int(parts[0]) if len(parts) == 2 else 0
fmt = parts[-1]
parts = fmt.split(':')
hours = int(parts[0]) if len(parts) == 3 else 0
parts = parts[len(parts)-2:]
minutes = int(parts[0])
seconds = int(parts[1])
return ((days * 24 + hours) * 60 + minutes) * 60 + seconds
def get_available_memsw():
total_mem_limit = 0
total_mem_used = 0
sysloads = system_manager.get_system_loads()
live = cgroup_manager.get_cgroup_containers()
for item in live:
try:
sample = cgroup_manager.get_container_sample(item)
limit = cgroup_manager.get_container_limit(item)
total_mem_limit += limit["mem_page_quota"]
total_mem_used += sample["mem_page_sample"]
except:
pass
total_mem_limit >>= 20
total_mem_used = (total_mem_used + (1<<20) - 1) >> 20
available_mem_resource = sysloads['mem_free'] + \
sysloads['swap_free'] - total_mem_limit + total_mem_used
return {"Mbytes": available_mem_resource, "physical": sysloads['mem_free'], "cpu_free": sysloads['cpu_free']}
def get_total_physical_memory_for_containers():
total_mem_used = 0
sysloads = system_manager.get_system_loads()
live = cgroup_manager.get_cgroup_containers()
for item in live:
try:
sample = cgroup_manager.get_container_sample(item)
total_mem_used += sample["mem_page_sample"]
except:
pass
total_mem_used = (total_mem_used + (1<<20) - 1) >> 20
total_physical_memory_for_containers = sysloads['mem_free'] + total_mem_used
return {"Mbytes": total_physical_memory_for_containers}

69
meter/main.py Executable file
View File

@ -0,0 +1,69 @@
#!/usr/bin/python3
########################################
# Boot for Local:
# sudo ./main (or: sudo ./main [master-ipaddr])
#
########################################
# Usage for Local:
# curl -F uuid="lxc-name1" http://0.0.0.0:1729/v1/cgroup/container/sample
#
import time, sys, signal, json, subprocess, os
if __name__ == '__main__':
if not subprocess.getoutput('lsb_release -r -s 2>/dev/null').startswith('16.04'):
raise Exception('Ubuntu 16.04 LTS is required.')
if not os.path.exists('/sys/fs/cgroup/memory/memory.memsw.usage_in_bytes'):
raise Exception('Please append "swapaccount=1" to kernel.')
if subprocess.getoutput('whoami') != 'root':
raise Exception('Root privilege is required.')
from daemon.http import *
if len(sys.argv) == 1:
sys.argv.append('disable-network')
def signal_handler(signal, frame):
if sys.argv[1] == 'master':
subprocess.getoutput('ovs-vsctl del-br ovs-master >/dev/null 2>&1')
else:
subprocess.getoutput('ovs-vsctl del-br ovs-minion >/dev/null 2>&1')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
if sys.argv[1] != 'master': # for minions
from intra.cgroup import cgroup_manager
cgroup_manager.auto_detect_prefix()
cgroup_manager.set_default_memory_limit(4)
from intra.system import system_manager
system_manager.set_db_prefix('/var/lib/docklet/meter')
# system_manager.extend_swap(32)
if sys.argv[1] != 'disable-network':
from connector.minion import minion_connector
minion_connector.start(sys.argv[1])
else:
print("(No network mode)")
from policy.quota import identify_policy
from intra.smart import smart_controller
smart_controller.set_policy(identify_policy)
smart_controller.start()
print("Minion REST Daemon Starts Listening ..")
http = http_daemon_listener(minion_http_handler)
http.listen()
else: # for master: sudo ./main master
from connector.master import master_connector
master_connector.start()
print("Master REST Daemon Starts Listening ..")
http = http_daemon_listener(master_http_handler, master_connector)
http.listen()

5
meter/policy/allocate.py Executable file
View File

@ -0,0 +1,5 @@
class candidates_selector:
def select(candidates):
return max(candidates, key=lambda addr: candidates[addr]['cpu_free'])

56
meter/policy/quota.py Executable file
View File

@ -0,0 +1,56 @@
from intra.system import system_manager
from intra.cgroup import cgroup_manager
import subprocess
class identify_policy:
def get_score_by_uuid(uuid):
return 1.0
class etime_rev_policy(identify_policy):
def get_score_by_uuid(uuid):
pid = cgroup_manager.get_container_pid(uuid)
etime = system_manager.get_proc_etime(pid)
return 1.0 / (1.0 + etime)
class mem_usage_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_sample(uuid)
return sample["mem_page_sample"]
class mem_quota_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_limit(uuid)
return sample["mem_page_quota"]
class cpu_usage_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_sample(uuid)
return sample["cpu_sample"]
class cpu_usage_rev_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_sample(uuid)
return 1024 * 1024 / (1.0 + sample["cpu_sample"])
class cpu_speed_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_sample(uuid)
pid = cgroup_manager.get_container_pid(uuid)
etime = system_manager.get_proc_etime(pid)
return sample["cpu_sample"] / etime
class user_state_policy(identify_policy):
def get_score_by_uuid(uuid):
user = uuid.split('-')[0]
online = subprocess.getoutput('cat /var/lib/docklet/global/users/%s/status 2>/dev/null' % user) == 'live'
return 10.0 if online else 1.0

View File

@ -23,7 +23,7 @@ import http.server, cgi, json, sys, shutil
from socketserver import ThreadingMixIn
import nodemgr, vclustermgr, etcdlib, network, imagemgr
import userManager
import monitor
import monitor,traceback
import threading
import sysmgr
@ -405,15 +405,15 @@ def vnodes_monitor(cur_user, user, form, con_id, issue):
global G_clustername
logger.info("handle request: monitor/vnodes")
res = {}
fetcher = monitor.Container_Fetcher()
fetcher = monitor.Container_Fetcher(con_id)
if issue == 'cpu_use':
res['cpu_use'] = fetcher.get_cpu_use(con_id)
res['cpu_use'] = fetcher.get_cpu_use()
elif issue == 'mem_use':
res['mem_use'] = fetcher.get_mem_use(con_id)
res['mem_use'] = fetcher.get_mem_use()
elif issue == 'disk_use':
res['disk_use'] = fetcher.get_disk_use(con_id)
res['disk_use'] = fetcher.get_disk_use()
elif issue == 'basic_info':
res['basic_info'] = fetcher.get_basic_info(con_id)
res['basic_info'] = fetcher.get_basic_info()
elif issue == 'owner':
names = con_id.split('-')
result = G_usermgr.query(username = names[0], cur_user = cur_user)
@ -657,6 +657,7 @@ def resetall_system(cur_user, user, form):
@app.errorhandler(500)
def internal_server_error(error):
logger.debug("An internel server error occured")
logger.error(traceback.format_exc())
return json.dumps({'success':'false', 'message':'500 Internal Server Error', 'Unauthorized': 'True'})

View File

@ -30,6 +30,20 @@ class Container_Collector(threading.Thread):
containers = re.split('\s+',output)
return containers
def get_proc_etime(self,pid):
fmt = subprocess.getoutput("ps -A -opid,etime | grep '^ *%d' | awk '{print $NF}'" % pid).strip()
if fmt == '':
return -1
parts = fmt.split('-')
days = int(parts[0]) if len(parts) == 2 else 0
fmt = parts[-1]
parts = fmt.split(':')
hours = int(parts[0]) if len(parts) == 3 else 0
parts = parts[len(parts)-2:]
minutes = int(parts[0])
seconds = int(parts[1])
return ((days * 24 + hours) * 60 + minutes) * 60 + seconds
def collect_containerinfo(self,container_name):
global workercinfo
output = subprocess.check_output("sudo lxc-info -n %s" % (container_name),shell=True)
@ -37,6 +51,9 @@ class Container_Collector(threading.Thread):
parts = re.split('\n',output)
info = {}
basic_info = {}
basic_exist = 'basic_info' in workercinfo[container_name].keys()
if basic_exist:
basic_info = workercinfo[container_name]['basic_info']
for part in parts:
if not part == '':
key_val = re.split(':',part)
@ -45,11 +62,24 @@ class Container_Collector(threading.Thread):
info[key] = val.lstrip()
basic_info['Name'] = info['Name']
basic_info['State'] = info['State']
#if basic_exist:
# logger.info(workercinfo[container_name]['basic_info'])
if(info['State'] == 'STOPPED'):
workercinfo[container_name]['basic_info'] = basic_info
logger.info(basic_info)
return False
running_time = self.get_proc_etime(int(info['PID']))
if basic_exist and 'PID' in workercinfo[container_name]['basic_info'].keys():
last_time = workercinfo[container_name]['basic_info']['LastTime']
if not info['PID'] == workercinfo[container_name]['basic_info']['PID']:
last_time = workercinfo[container_name]['basic_info']['RunningTime']
else:
last_time = 0
basic_info['LastTime'] = last_time
running_time += last_time
basic_info['PID'] = info['PID']
basic_info['IP'] = info['IP']
basic_info['RunningTime'] = running_time
workercinfo[container_name]['basic_info'] = basic_info
cpu_parts = re.split(' +',info['CPU use'])
@ -256,6 +286,10 @@ def workerFetchInfo():
global workercinfo
return str([workerinfo, workercinfo])
def get_owner(container_name):
names = container_name.split('-')
return names[0]
class Master_Collector(threading.Thread):
def __init__(self,nodemgr):
@ -274,12 +308,14 @@ class Master_Collector(threading.Thread):
for worker in workers:
try:
ip = self.nodemgr.rpc_to_ip(worker)
#[info,cinfo] = worker.workerFetchInfo()
info = list(eval(worker.workerFetchInfo()))
logger.info(info[1])
#logger.info(info[1])
monitor_hosts[ip] = info[0]
for container in info[1].keys():
monitor_vnodes[container] = info[1][container]
owner = get_owner(container)
if not owner in monitor_vnodes.keys():
monitor_vnodes[owner] = {}
monitor_vnodes[owner][container] = info[1][container]
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
@ -291,45 +327,47 @@ class Master_Collector(threading.Thread):
return
class Container_Fetcher:
def __init__(self):
def __init__(self,container_name):
self.owner = get_owner(container_name)
self.con_id = container_name
return
def get_cpu_use(self,container_name):
def get_cpu_use(self):
global monitor_vnodes
try:
res = monitor_vnodes[container_name]['cpu_use']
res['quota'] = monitor_vnodes[container_name]['quota']
res = monitor_vnodes[self.owner][self.con_id]['cpu_use']
res['quota'] = monitor_vnodes[self.owner][self.con_id]['quota']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_mem_use(self,container_name):
def get_mem_use(self):
global monitor_vnodes
try:
res = monitor_vnodes[container_name]['mem_use']
res['quota'] = monitor_vnodes[container_name]['quota']
res = monitor_vnodes[self.owner][self.con_id]['mem_use']
res['quota'] = monitor_vnodes[self.owner][self.con_id]['quota']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_disk_use(self,container_name):
def get_disk_use(self):
global monitor_vnodes
try:
res = monitor_vnodes[container_name]['disk_use']
res = monitor_vnodes[self.owner][self.con_id]['disk_use']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)
res = {}
return res
def get_basic_info(self,container_name):
def get_basic_info(self):
global monitor_vnodes
try:
res = monitor_vnodes[container_name]['basic_info']
res = monitor_vnodes[self.owner][self.con_id]['basic_info']
except Exception as err:
logger.warning(traceback.format_exc())
logger.warning(err)

View File

@ -1,15 +1,24 @@
var mem_usedp = 0;
var cpu_usedp = 0;
var is_running = true;
function processMemData(data)
{
mem_usedp = data.monitor.mem_use.usedp;
var usedp = data.monitor.mem_use.usedp;
var unit = data.monitor.mem_use.unit;
var quota = data.monitor.mem_use.quota.memory/1024.0;
var val = data.monitor.mem_use.val;
var out = "("+val+unit+"/"+quota.toFixed(2)+"MiB)";
$("#con_mem").html((usedp/0.01).toFixed(2)+"%<br/>"+out);
if(is_running)
{
mem_usedp = data.monitor.mem_use.usedp;
var usedp = data.monitor.mem_use.usedp;
var unit = data.monitor.mem_use.unit;
var quota = data.monitor.mem_use.quota.memory/1024.0;
var val = data.monitor.mem_use.val;
var out = "("+val+unit+"/"+quota.toFixed(2)+"MiB)";
$("#con_mem").html((usedp/0.01).toFixed(2)+"%<br/>"+out);
}
else
{
mem_usedp = 0;
$("#con_mem").html("--");
}
}
function getMemY()
{
@ -17,16 +26,24 @@ function getMemY()
}
function processCpuData(data)
{
cpu_usedp = data.monitor.cpu_use.usedp;
var val = data.monitor.cpu_use.val;
var unit = data.monitor.cpu_use.unit;
var quota = data.monitor.cpu_use.quota.cpu;
var quotaout = "("+quota;
if(quota == 1)
quotaout += " Core)";
if(is_running)
{
cpu_usedp = data.monitor.cpu_use.usedp;
var val = data.monitor.cpu_use.val;
var unit = data.monitor.cpu_use.unit;
var quota = data.monitor.cpu_use.quota.cpu;
var quotaout = "("+quota;
if(quota == 1)
quotaout += " Core)";
else
quotaout += " Cores)";
$("#con_cpu").html(val +" "+ unit+"<br/>"+quotaout);
}
else
quotaout += " Cores)";
$("#con_cpu").html(val +" "+ unit+"<br/>"+quotaout);
{
cpu_usedp = 0;
$("#con_cpu").html("--");
}
}
function getCpuY()
{
@ -173,3 +190,25 @@ function processDiskData()
},"json");
}
setInterval(processDiskData,1000);
function processBasicInfo()
{
$.post(url+"/basic_info/",{},function(data){
basic_info = data.monitor.basic_info;
state = basic_info.State;
if(state == 'STOPPED')
{
is_running = false;
$("#con_state").html("<div class='label label-danger'>Stopped</div>");
$("#con_ip").html("--");
}
else
{
is_running = true;
$("#con_state").html("<div class='label label-primary'>Running</div>");
$("#con_ip").html(basic_info.IP);
}
$("#con_time").html(basic_info.RunningTime+"s");
},"json");
}
setInterval(processBasicInfo,1000);

View File

@ -82,6 +82,7 @@
<th>Node Name</th>
<th>IP Address</th>
<th>Status</th>
<th>Running Time</th>
<th>Cpu Usage</th>
<th>Mem Usage</th>
<th>Disk Usage</th>
@ -100,6 +101,7 @@
{% else %}
<td><div id='{{cluster}}_{{ loop.index }}_state' class="label label-primary">Running</div></td>
{% endif %}
<td id='{{cluster}}_{{ loop.index }}_time'>--</td>
<td id='{{cluster}}_{{ loop.index }}_cpu'>--</td>
<td id='{{cluster}}_{{ loop.index }}_mem'>--</td>
<td id='{{cluster}}_{{ loop.index }}_disk'>--</td>
@ -124,7 +126,7 @@
$.post(url+"/basic_info/",{},function(data){
var state = data.monitor.basic_info.State;
if(state == 'RUNNING')
if(state == 'RUNNING')
{
var tmp = $("#"+index+"_state");
tmp.removeClass();
@ -145,6 +147,7 @@
$("#"+index+"_mem").html('--');
return;
}
$("#"+index+"_time").html(data.monitor.basic_info.RunningTime+"s")
$.post(url+"/cpu_use/",{},function(data){
var usedp = data.monitor.cpu_use.usedp;

View File

@ -41,6 +41,7 @@
<tr>
<th>State</th>
<th>IP Address</th>
<th>Running Time</th>
<th>CPU Usage</th>
<th>Mem Usage</th>
<th>Disk Usage</th>
@ -49,12 +50,13 @@
<tbody>
<tr>
{% if container['State'] == 'STOPPED' %}
<td><div id='con_state' class="label label-danger">Stopped</div></td>
<td id='con_state'><div class="label label-danger">Stopped</div></td>
<td id='con_ip'>--</td>
{% else %}
<td><div id='con_state' class="label label-primary">Running</div></td>
<td id='con_state'><div class="label label-primary">Running</div></td>
<td id='con_ip'>{{ container['IP'] }}</td>
{% endif %}
<td id='con_time'>{{ container['RunningTime'] }}s</td>
<td id='con_cpu'>--</td>
<td id='con_mem'>--</td>
<td id='con_disk'>--</td>