integrade dockmeter local features:

1) smart cgroup controlling;
2) account billing;
3) libs to get host resources / process runtime elapsed;
This commit is contained in:
cuiwei13 2016-05-19 02:13:57 +08:00
parent 71fe755aec
commit 94aa5841f3
12 changed files with 863 additions and 0 deletions

99
meter/connector/master.py Executable file
View File

@ -0,0 +1,99 @@
#!/usr/bin/python3
import socket, select, errno, threading, os
class master_connector:
tcp_port = 1727
max_minions = 256
conn = {}
epoll_fd = select.epoll()
def establish_vswitch(ovsname):
os.system('ovs-vsctl del-br ovs-%s >/dev/null 2>&1' % ovsname)
os.system('ovs-vsctl add-br ovs-%s' % ovsname)
os.system('brctl addif ovs-bridge ovs-%s >/dev/null 2>&1' % ovsname)
os.system('ip link set ovs-system up')
os.system('ip link set ovs-%s up' % ovsname)
def build_gre_conn(ovsname, ipaddr):
name = ipaddr.replace('.','_')
os.system('ovs-vsctl add-port ovs-%s gre-%s -- set interface gre-%s type=gre options:remote_ip=%s 2>/dev/null' % (ovsname, name, name, ipaddr))
def break_gre_conn(ovsname, ipaddr):
name = ipaddr.replace('.','_')
os.system('ovs-vsctl del-port ovs-%s gre-%s 2>/dev/null' % (ovsname, name))
def close_connection(fd):
master_connector.epoll_fd.unregister(fd)
master_connector.conn[fd][0].close()
addr = master_connector.conn[fd][1]
master_connector.conn.pop(fd)
master_connector.break_gre_conn('master', addr)
def do_message_response(input_buffer):
assert(input_buffer == b'ack')
return b'ack'
def start():
thread = threading.Thread(target = master_connector.run_forever, args = [])
thread.setDaemon(True)
thread.start()
return thread
def run_forever():
listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listen_fd.bind(('', master_connector.tcp_port))
listen_fd.listen(master_connector.max_minions)
master_connector.epoll_fd.register(listen_fd.fileno(), select.EPOLLIN)
datalist = {}
master_connector.establish_vswitch('master')
try:
while True:
epoll_list = master_connector.epoll_fd.poll()
for fd, events in epoll_list:
if fd == listen_fd.fileno():
fileno, addr = listen_fd.accept()
fileno.setblocking(0)
master_connector.epoll_fd.register(fileno.fileno(), select.EPOLLIN | select.EPOLLET)
master_connector.conn[fileno.fileno()] = (fileno, addr[0])
master_connector.build_gre_conn('master', addr[0])
elif select.EPOLLIN & events:
datas = b''
while True:
try:
data = master_connector.conn[fd][0].recv(10)
if not data and not datas:
master_connector.close_connection(fd)
break
else:
datas += data
except socket.error as msg:
if msg.errno == errno.EAGAIN:
try:
datalist[fd] = master_connector.do_message_response(datas)
master_connector.epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
except:
master_connector.close_connection(fd)
else:
master_connector.close_connection(fd)
break
elif select.EPOLLOUT & events:
sendLen = 0
while True:
sendLen += master_connector.conn[fd][0].send(datalist[fd][sendLen:])
if sendLen == len(datalist[fd]):
break
master_connector.epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)
elif select.EPOLLHUP & events:
master_connector.close_connection(fd)
else:
continue
finally:
os.system('ovs-vsctl del-br ovs-master >/dev/null 2>&1')

43
meter/connector/minion.py Executable file
View File

@ -0,0 +1,43 @@
#!/usr/bin/python3
import socket, time, threading, os
class minion_connector:
def connect(server_ip):
from connector.master import master_connector
connected = True
while True:
try:
fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
fd.connect((server_ip, master_connector.tcp_port))
connected = True
print("[info]", "connected to master.")
master_connector.establish_vswitch('minion')
master_connector.build_gre_conn('minion', server_ip)
while True:
data = b'ack'
if fd.send(data) != len(data):
break
readData = fd.recv(1024)
time.sleep(0.5)
fd.close()
except socket.error as e:
master_connector.break_gre_conn('minion', server_ip)
if connected:
print("[info]", "non-connected with master.")
except Exception as e:
pass
finally:
if connected:
os.system('ovs-vsctl del-br ovs-minion >/dev/null 2>&1')
connected = False
time.sleep(1)
def start(server_ip):
thread = threading.Thread(target = minion_connector.connect, args = [server_ip])
thread.setDaemon(True)
thread.start()
return thread

82
meter/daemon/http.py Executable file
View File

@ -0,0 +1,82 @@
import json, cgi, threading
from http.server import BaseHTTPRequestHandler, HTTPServer
class base_http_handler(BaseHTTPRequestHandler):
def load_module(self):
return None
def do_POST(self):
try:
default_exception = 'unsupported request.'
success = True
data = None
length = self.headers['content-length']
if length == None:
length = self.headers['content-length'] = 0
if int(length) > (1<<12):
raise Exception("data too large")
http_form = cgi.FieldStorage(fp=self.rfile, headers=self.headers,environ={'REQUEST_METHOD':'POST','CONTENT_TYPE': "text/html"})
form = {}
for item in http_form:
try:
value = http_form[item].file.read().strip()
except:
value = http_form[item].value
try:
value = value.decode()
except:
pass
form[item] = value
parts = self.path.split('/', 2)
if len(parts) != 3:
raise Exception(default_exception)
[null, version, path] = parts
pymodule = self.load_module() + '_' + version
module = __import__('daemon.' + pymodule)
handler = module.__dict__[pymodule].__dict__['case_handler']
method = path.replace('/', '_')
if not hasattr(handler, method):
raise Exception(default_exception)
data = handler.__dict__[method](form, self.handler_class.args)
except Exception as e:
success = False
data = {"reason": str(e)}
finally:
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"success": success, "data": data}).encode())
self.wfile.write("\n".encode())
return
class master_http_handler(base_http_handler):
http_port = 1728
def load_module(self):
self.handler_class = master_http_handler
return 'master'
class minion_http_handler(base_http_handler):
http_port = 1729
def load_module(self):
self.handler_class = minion_http_handler
return 'minion'
class http_daemon_listener:
def __init__(self, handler_class, args = None):
handler_class.args = args
self.handler_class = handler_class
def listen(self):
server = HTTPServer(('', self.handler_class.http_port), self.handler_class)
server.serve_forever()

58
meter/daemon/master_v1.py Executable file
View File

@ -0,0 +1,58 @@
import subprocess, os
def http_client_post(ip, port, url, entries = {}):
import urllib.request, urllib.parse, json
url = url if not url.startswith('/') else url[1:]
response = urllib.request.urlopen('http://%s:%d/%s' % (ip, port, url), urllib.parse.urlencode(entries).encode())
obj = json.loads(response.read().decode().strip())
response.close()
return obj
class case_handler:
# [Order-by] lexicographic order
# curl -L -X POST http://0.0.0.0:1728/v1/minions/list
def minions_list(form, args):
minions = []
for item in args.conn:
minions.append(args.conn[item][1])
return {"minions": minions}
# curl -L -X POST -F mem=4096 -F cpu=2 http://0.0.0.0:1728/v1/resource/allocation
def resource_allocation(form, args):
mem = int(form['mem'])
cpu = int(form['cpu'])
candidates = {}
from daemon.http import minion_http_handler
for item in args.conn:
addr = args.conn[item][1]
obj = http_client_post(addr, minion_http_handler.http_port, '/v1/system/memsw/available')
if obj['success'] and obj['data']['Mbytes'] >= mem:
candidates[addr] = obj['data']
if len(candidates) <= 0:
raise Exception("no minions")
else:
from policy.allocate import candidates_selector
one = candidates_selector.select(candidates)
return {"recommend": one}
# curl -L -X POST -F user=docklet http://0.0.0.0:1728/v1/user/live/add
def user_live_add(form, args):
if not os.path.exists('/var/lib/docklet/global/users/%s' % form['user']):
return False
subprocess.getoutput('echo live > /var/lib/docklet/global/users/%s/status' % form['user'])
return True
# curl -L -X POST -F user=docklet http://0.0.0.0:1728/v1/user/live/remove
def user_live_remove(form, args):
subprocess.getoutput('rm -f /var/lib/docklet/global/users/%s/status' % form['user'])
return True
# curl -L -X POST http://0.0.0.0:1728/v1/user/live/list
def user_live_list(form, args):
return subprocess.getoutput('ls -1 /var/lib/docklet/global/users/*/status 2>/dev/null | awk -F\/ \'{print $(NF-1)\'}').split()

76
meter/daemon/minion_v1.py Executable file
View File

@ -0,0 +1,76 @@
from intra.system import system_manager
from intra.billing import billing_manager
from intra.cgroup import cgroup_manager
from policy.quota import *
from intra.smart import smart_controller
class case_handler:
# [Order-by] lexicographic order
# curl -L -X POST -F uuid=docklet-1-0 http://0.0.0.0:1729/v1/billing/increment
def billing_increment(form, args):
return billing_manager.fetch_increment_and_clean(form['uuid'])
# curl -L -X POST http://0.0.0.0:1729/v1/cgroup/container/list
def cgroup_container_list(form, args):
return cgroup_manager.get_cgroup_containers()
# curl -L -X POST -F policy=etime_rev_policy http://0.0.0.0:1729/v1/smart/quota/policy
def smart_quota_policy(form, args):
msg = 'success'
try:
smart_controller.set_policy(eval(form['policy']))
except Exception as e:
msg = e
return {'message': msg}
# curl -L -X POST -F uuid=n1 http://0.0.0.0:1729/v1/cgroup/container/limit
def cgroup_container_limit(form, args):
return cgroup_manager.get_container_limit(form['uuid'])
# curl -L -X POST -F uuid=n1 http://0.0.0.0:1729/v1/cgroup/container/sample
def cgroup_container_sample(form, args):
return cgroup_manager.get_container_sample(form['uuid'])
# curl -L -X POST http://0.0.0.0:1729/v1/system/loads
def system_loads(form, args):
return system_manager.get_system_loads()
# curl -L -X POST http://0.0.0.0:1729/v1/system/memsw/available
def system_memsw_available(form, args):
return system_manager.get_available_memsw()
# curl -L -X POST -F size=16 http://0.0.0.0:1729/v1/system/swap/extend
def system_swap_extend(form, args):
return system_manager.extend_swap(int(form['size']))
# curl -L -X POST http://0.0.0.0:1729/v1/system/swap/clear
def system_swap_clear(form, args):
return system_manager.clear_all_swaps()
# curl -L -X POST http://0.0.0.0:1729/v1/system/total/physical/memory
def system_total_physical_memory(form, args):
return system_manager.get_total_physical_memory_for_containers()
'''
# curl -X POST -F uuid=n1 http://0.0.0.0:1729/v1/blacklist/add
def blacklist_add(form):
exists = form['uuid'] in smart_controller.blacklist
if not exists:
smart_controller.blacklist.add(form['uuid'])
return {"changed": not exists}
# curl -X POST -F uuid=n1 http://0.0.0.0:1729/v1/blacklist/remove
def blacklist_remove(form):
exists = form['uuid'] in smart_controller.blacklist
if exists:
smart_controller.blacklist.remove(form['uuid'])
return {"changed": exists}
# curl -X POST http://0.0.0.0:1729/v1/blacklist/show
def blacklist_show(form):
blacklist = []
for item in smart_controller.blacklist:
blacklist.append(item)
return blacklist
'''

46
meter/intra/billing.py Executable file
View File

@ -0,0 +1,46 @@
import subprocess, time, os
from intra.system import system_manager
class billing_manager:
history_book = {}
def on_lxc_acct_usage(uuid, prev, curr, interval):
cpu_gen = max(0, curr['cpu_sample'] - prev['cpu_sample']) >> 20 # in ms
mem_gen = ((curr['mem_phys_sample'] + prev['mem_phys_sample']) * interval) >> 11 # in kbytes
try:
os.makedirs('%s/%s' % (system_manager.db_prefix, uuid))
except:
pass
with open('%s/%s/usage' % (system_manager.db_prefix, uuid), 'a') as fp:
fp.write('%d %d\n' % (cpu_gen, mem_gen))
def add_usage_sample(uuid, sample, interval):
if uuid in billing_manager.history_book:
billing_manager.on_lxc_acct_usage(uuid, billing_manager.history_book[uuid], sample, interval)
billing_manager.history_book[uuid] = sample
def clean_dead_node(uuid):
if uuid in billing_manager.history_book:
billing_manager.history_book.pop(uuid)
def fetch_increment_and_clean(uuid):
cpu_acct = 0.0
mem_acct = 0.0
cnt_acct = 0
try:
fetch_path = '%s/%s/%f' % (system_manager.db_prefix, uuid, time.time())
os.rename('%s/%s/usage' % (system_manager.db_prefix, uuid), fetch_path)
with open(fetch_path, 'r') as fp:
line = fp.readline()
while line != '':
[cpu, mem] = line.split()
line = fp.readline()
cnt_acct += 1
cpu_acct += float(cpu)
mem_acct += float(mem)
os.remove(fetch_path)
except:
pass
return {"cpu_acct": cpu_acct, "mem_acct": mem_acct, "cnt_acct": cnt_acct}

112
meter/intra/cgroup.py Executable file
View File

@ -0,0 +1,112 @@
import subprocess, os
class cgroup_controller:
def read_value(group, uuid, item):
path = cgroup_manager.__default_prefix__ % (group, uuid, item)
if not os.path.exists(path):
raise Exception('read: container "%s" not found!' % uuid)
with open(path, 'r') as file:
value = file.read()
return value.strip()
def write_value(group, uuid, item, value):
path = cgroup_manager.__default_prefix__ % (group, uuid, item)
if not os.path.exists(path):
raise Exception('write: container "%s" not found!' % uuid)
try:
with open(path, 'w') as file:
file.write(str(value))
except:
pass
class cgroup_manager:
__prefix_docker__ = '/sys/fs/cgroup/%s/system.slice/docker-%s.scope/%s'
__prefix_lxc__ = '/sys/fs/cgroup/%s/lxc/%s/%s'
__prefix_lxcinit__ = '/sys/fs/cgroup/%s/init.scope/lxc/%s/%s'
def set_default_memory_limit(limit):
cgroup_manager.__default_memory_limit__ = limit
def set_cgroup_prefix(prefix = __prefix_lxc__):
cgroup_manager.__default_prefix__ = prefix
def auto_detect_prefix():
cgroup_manager.__default_prefix__ = cgroup_manager.__prefix_docker__
if len(cgroup_manager.get_cgroup_containers()) > 0:
return
cgroup_manager.__default_prefix__ = cgroup_manager.__prefix_lxcinit__
if len(cgroup_manager.get_cgroup_containers()) > 0:
return
cgroup_manager.__default_prefix__ = cgroup_manager.__prefix_lxc__
if len(cgroup_manager.get_cgroup_containers()) > 0:
return
# print("[info]", "set cgroup prefix to %s" % cgroup_manager.__default_prefix__)
def get_cgroup_containers():
containers = subprocess.getoutput("find %s -type d 2>/dev/null | awk -F\/ '{print $(NF-1)}'" % (cgroup_manager.__default_prefix__ % ('cpu', '*', '.'))).split()
uuids = []
for item in containers:
if item.startswith('docker-') and item.endswith('.scope') and len(item) > 64:
uuids.append(item[7:-6])
else:
uuids.append(item)
return uuids
def get_container_pid(uuid):
return int(cgroup_controller.read_value('cpu', uuid, 'tasks').split()[0])
def get_container_sample(uuid):
mem_page_sample = int(cgroup_controller.read_value('memory', uuid, 'memory.memsw.usage_in_bytes'))
mem_phys_sample = int(cgroup_controller.read_value('memory', uuid, 'memory.usage_in_bytes'))
cpu_sample = int(cgroup_controller.read_value('cpu', uuid, 'cpuacct.usage'))
pids_sample = int(cgroup_controller.read_value('pids', uuid, 'pids.current'))
container_pid = cgroup_manager.get_container_pid(uuid)
from intra.system import system_manager
real_time = system_manager.get_proc_etime(container_pid)
return {"cpu_sample": cpu_sample, "pids_sample": pids_sample, "mem_page_sample": mem_page_sample, "mem_phys_sample": mem_phys_sample, "pid": container_pid, "real_time": real_time}
def get_container_limit(uuid):
mem_phys_quota = int(cgroup_controller.read_value('memory', uuid, 'memory.limit_in_bytes'))
mem_page_quota = int(cgroup_controller.read_value('memory', uuid, 'memory.memsw.limit_in_bytes'))
cpu_shares = int(cgroup_controller.read_value('cpu', uuid, 'cpu.shares'))
cpu_quota = int(cgroup_controller.read_value('cpu', uuid, 'cpu.cfs_quota_us'))
cpu_quota = cpu_quota if cpu_quota >= 0 else -1
pids_quota = cgroup_controller.read_value('pids', uuid, 'pids.max')
pids_quota = int(pids_quota) if pids_quota != 'max' else -1
return {"cpu_quota": cpu_quota, "cpu_shares": cpu_shares, "mem_phy_quota": mem_phys_quota, "mem_page_quota": mem_page_quota, "pids_quota": pids_quota}
def get_container_oom_status(uuid):
[_x, idle, _y, oom] = cgroup_controller.read_value('memory', uuid, 'memory.oom_control').split()
return (idle == '1', oom == '1')
def set_container_oom_idle(uuid, idle):
cgroup_controller.write_value('memory', uuid, 'memory.oom_control', 1 if idle else 0)
def protect_container_oom(uuid):
cgroup_controller.write_value('memory', uuid, 'memory.oom_control', 1)
data = cgroup_manager.get_container_limit(uuid)
if data["mem_page_quota"] >= 9223372036854771712:
memory_limit_in_bytes = cgroup_manager.__default_memory_limit__ << 30
mem_phy_quota = min(data["mem_phy_quota"], memory_limit_in_bytes)
mem_page_quota = memory_limit_in_bytes
cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'FROZEN')
cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', mem_phy_quota)
cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', mem_phy_quota)
cgroup_controller.write_value('memory', uuid, 'memory.memsw.limit_in_bytes', mem_page_quota)
cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'THAWED')
def set_container_physical_memory_limit(uuid, Mbytes, freeze = False):
if freeze:
cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'FROZEN')
memory_limit = int(max(0, Mbytes)) << 20
cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', memory_limit)
if freeze:
cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'THAWED')
def set_container_cpu_priority_limit(uuid, ceof):
cpu_scaling = min(1024, 10 + int(1024 * ceof))
cgroup_controller.write_value('cpu', uuid, 'cpu.shares', cpu_scaling)

96
meter/intra/smart.py Executable file
View File

@ -0,0 +1,96 @@
import subprocess, time, os, threading, math
from intra.system import system_manager
from intra.cgroup import cgroup_manager
from intra.billing import billing_manager
class smart_controller:
def set_policy(policy):
smart_controller.policy = policy
def start(interval = 4):
thread = threading.Thread(target = smart_controller.smart_control_forever, args = [interval])
thread.setDaemon(True)
thread.start()
return thread
def smart_control_forever(interval):
last_live = []
while True:
time.sleep(interval)
try:
mem_usage_mapping = {}
live = cgroup_manager.get_cgroup_containers()
for item in live:
try:
last_live.remove(item)
except:
pass
try:
cgroup_manager.protect_container_oom(item)
sample = cgroup_manager.get_container_sample(item)
mem_usage_mapping[item] = math.ceil(sample['mem_page_sample'] * 1e-6)
billing_manager.add_usage_sample(item, sample, interval)
except:
pass
for item in last_live:
billing_manager.clean_dead_node(item)
last_live = live
is_ready = True
memory_available = system_manager.get_available_memsw()
if memory_available['Mbytes'] <= 0:
size_in_gb = int(math.ceil(-memory_available['Mbytes'] / 1024 / 16) * 16)
print("[warning]", 'overloaded containers, auto-extending %d G memsw.' % size_in_gb)
system_manager.extend_swap(size_in_gb)
total_score = 0.0
score_mapping = {}
for item in live:
score = max(1e-8, smart_controller.policy.get_score_by_uuid(item))
score_mapping[item] = score
print(item, "(score/cpu)", score)
total_score += score
# CPU Scoring
for item in live:
ceof = score_mapping[item] / total_score
cgroup_manager.set_container_cpu_priority_limit(item, ceof)
# Iterative Memory Scoring
free_mem = system_manager.get_total_physical_memory_for_containers()['Mbytes']
local_nodes = live
mem_alloc = {}
for item in live:
mem_alloc[item] = 0
while free_mem > 0 and len(local_nodes) > 0:
excess_mem = 0
next_local_nodes = []
for item in local_nodes:
mem_alloc[item] += int(math.floor(free_mem * score_mapping[item] / total_score))
if mem_alloc[item] >= mem_usage_mapping[item]:
excess_mem += mem_alloc[item] - mem_usage_mapping[item]
mem_alloc[item] = mem_usage_mapping[item]
else:
next_local_nodes.append(item)
free_mem = excess_mem
local_nodes = next_local_nodes
for item in live:
mem_alloc[item] += int(math.floor(free_mem * score_mapping[item] / total_score))
cgroup_manager.set_container_physical_memory_limit(item, mem_alloc[item])
print(item, "(malloc:usage)", mem_alloc[item], mem_usage_mapping[item])
if len(live) > 0:
print("-------------------------------")
except:
pass
# echo "8:0 1000" > /sys/fs/cgroup/blkio/lxc/docklet-1-0/blkio.throttle.write_bps_device
# https://www.kernel.org/doc/Documentation/devices.txt
# while true; do clear; cat /sys/fs/cgroup/blkio/lxc/docklet-1-0/blkio.throttle.io_service_bytes; sleep 0.5; done
# hugetlb, net_cls, net_prio, /sbin/tc

121
meter/intra/system.py Executable file
View File

@ -0,0 +1,121 @@
import subprocess, time, os
from intra.cgroup import cgroup_manager
class system_manager:
db_prefix = '.'
def set_db_prefix(prefix):
system_manager.db_prefix = prefix
try:
os.makedirs(prefix)
except:
pass
def clear_all_swaps():
subprocess.getoutput('swapoff -a')
subprocess.getoutput('losetup -D')
def extend_swap(size):
if size < 0:
(mem_free, mem_total) = system_manager.get_memory_sample()
size = (mem_total + mem_total // 8) // 1024
nid = 128
while subprocess.getoutput("cat /proc/swaps | grep cg-loop | awk '{print $1}' | awk -F\- '{print $NF}' | grep %d$" % nid) != "":
nid = nid + 1
start_time = time.time()
# setup
os.system('dd if=/dev/zero of=/tmp/cg-swap-%d bs=1G count=0 seek=%d >/dev/null 2>&1' % (nid, size))
os.system('mknod -m 0660 /dev/cg-loop-%d b 7 %d >/dev/null 2>&1' % (nid, nid))
os.system('losetup /dev/cg-loop-%d /tmp/cg-swap-%d >/dev/null 2>&1' % (nid, nid))
os.system('mkswap /dev/cg-loop-%d >/dev/null 2>&1' % nid)
success = os.system('swapon /dev/cg-loop-%d >/dev/null 2>&1' % nid) == 0
# detach
# os.system('swapoff /dev/cg-loop-%d >/dev/null 2>&1' % nid)
# os.system('losetup -d /dev/cg-loop-%d >/dev/null 2>&1' % nid)
# os.system('rm -f /dev/cg-loop-%d /tmp/cg-swap-%d >/dev/null 2>&1' % (nid, nid))
end_time = time.time()
return {"setup": success, "time": end_time - start_time }
def get_cpu_sample():
[a, b, c, d] = subprocess.getoutput("cat /proc/stat | grep ^cpu\ | awk '{print $2, $3, $4, $6}'").split()
cpu_time = int(a) + int(b) + int(c) + int(d)
return (cpu_time, time.time())
def get_memory_sample():
mem_free = int(subprocess.getoutput("awk '{if ($1==\"MemAvailable:\") print $2}' /proc/meminfo 2>/dev/null")) // 1024
mem_total = int(subprocess.getoutput("awk '{if ($1==\"MemTotal:\") print $2}' /proc/meminfo 2>/dev/null")) // 1024
return (mem_free, mem_total)
def get_swap_sample():
swap_free = int(subprocess.getoutput("awk '{if ($1==\"SwapFree:\") print $2}' /proc/meminfo 2>/dev/null")) // 1024
swap_total = int(subprocess.getoutput("awk '{if ($1==\"SwapTotal:\") print $2}' /proc/meminfo 2>/dev/null")) // 1024
return (swap_free, swap_total)
def get_system_loads():
if 'last_cpu_sample' not in system_manager.__dict__:
system_manager.last_cpu_sample = system_manager.get_cpu_sample()
time.sleep(1)
cpu_sample = system_manager.get_cpu_sample()
(mem_free, mem_total) = system_manager.get_memory_sample()
(swap_free, swap_total) = system_manager.get_swap_sample()
ncpus = int(subprocess.getoutput("grep processor /proc/cpuinfo | wc -l"))
cpu_free = ncpus - (cpu_sample[0] - system_manager.last_cpu_sample[0]) * 0.01 / (cpu_sample[1] - system_manager.last_cpu_sample[1])
cpu_free = 0.0 if cpu_free <= 0.0 else cpu_free
system_manager.last_cpu_sample = cpu_sample
return {"mem_free": mem_free, "mem_total": mem_total, "swap_free": swap_free, "swap_total": swap_total, "cpu_free": cpu_free, "cpu_total": ncpus }
def get_proc_etime(pid):
fmt = subprocess.getoutput("ps -A -opid,etime | grep '^ *%d' | awk '{print $NF}'" % pid).strip()
if fmt == '':
return -1
parts = fmt.split('-')
days = int(parts[0]) if len(parts) == 2 else 0
fmt = parts[-1]
parts = fmt.split(':')
hours = int(parts[0]) if len(parts) == 3 else 0
parts = parts[len(parts)-2:]
minutes = int(parts[0])
seconds = int(parts[1])
return ((days * 24 + hours) * 60 + minutes) * 60 + seconds
def get_available_memsw():
total_mem_limit = 0
total_mem_used = 0
sysloads = system_manager.get_system_loads()
live = cgroup_manager.get_cgroup_containers()
for item in live:
try:
sample = cgroup_manager.get_container_sample(item)
limit = cgroup_manager.get_container_limit(item)
total_mem_limit += limit["mem_page_quota"]
total_mem_used += sample["mem_page_sample"]
except:
pass
total_mem_limit >>= 20
total_mem_used = (total_mem_used + (1<<20) - 1) >> 20
available_mem_resource = sysloads['mem_free'] + \
sysloads['swap_free'] - total_mem_limit + total_mem_used
return {"Mbytes": available_mem_resource, "physical": sysloads['mem_free'], "cpu_free": sysloads['cpu_free']}
def get_total_physical_memory_for_containers():
total_mem_used = 0
sysloads = system_manager.get_system_loads()
live = cgroup_manager.get_cgroup_containers()
for item in live:
try:
sample = cgroup_manager.get_container_sample(item)
total_mem_used += sample["mem_page_sample"]
except:
pass
total_mem_used = (total_mem_used + (1<<20) - 1) >> 20
total_physical_memory_for_containers = sysloads['mem_free'] + total_mem_used
return {"Mbytes": total_physical_memory_for_containers}

69
meter/main.py Executable file
View File

@ -0,0 +1,69 @@
#!/usr/bin/python3
########################################
# Boot for Local:
# sudo ./main (or: sudo ./main [master-ipaddr])
#
########################################
# Usage for Local:
# curl -F uuid="lxc-name1" http://0.0.0.0:1729/v1/cgroup/container/sample
#
import time, sys, signal, json, subprocess, os
if __name__ == '__main__':
if not subprocess.getoutput('lsb_release -r -s 2>/dev/null').startswith('16.04'):
raise Exception('Ubuntu 16.04 LTS is required.')
if not os.path.exists('/sys/fs/cgroup/memory/memory.memsw.usage_in_bytes'):
raise Exception('Please append "swapaccount=1" to kernel.')
if subprocess.getoutput('whoami') != 'root':
raise Exception('Root privilege is required.')
from daemon.http import *
if len(sys.argv) == 1:
sys.argv.append('disable-network')
def signal_handler(signal, frame):
if sys.argv[1] == 'master':
subprocess.getoutput('ovs-vsctl del-br ovs-master >/dev/null 2>&1')
else:
subprocess.getoutput('ovs-vsctl del-br ovs-minion >/dev/null 2>&1')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
if sys.argv[1] != 'master': # for minions
from intra.cgroup import cgroup_manager
cgroup_manager.auto_detect_prefix()
cgroup_manager.set_default_memory_limit(4)
from intra.system import system_manager
system_manager.set_db_prefix('/var/lib/docklet/meter')
# system_manager.extend_swap(32)
if sys.argv[1] != 'disable-network':
from connector.minion import minion_connector
minion_connector.start(sys.argv[1])
else:
print("(No network mode)")
from policy.quota import identify_policy
from intra.smart import smart_controller
smart_controller.set_policy(identify_policy)
smart_controller.start()
print("Minion REST Daemon Starts Listening ..")
http = http_daemon_listener(minion_http_handler)
http.listen()
else: # for master: sudo ./main master
from connector.master import master_connector
master_connector.start()
print("Master REST Daemon Starts Listening ..")
http = http_daemon_listener(master_http_handler, master_connector)
http.listen()

5
meter/policy/allocate.py Executable file
View File

@ -0,0 +1,5 @@
class candidates_selector:
def select(candidates):
return max(candidates, key=lambda addr: candidates[addr]['cpu_free'])

56
meter/policy/quota.py Executable file
View File

@ -0,0 +1,56 @@
from intra.system import system_manager
from intra.cgroup import cgroup_manager
import subprocess
class identify_policy:
def get_score_by_uuid(uuid):
return 1.0
class etime_rev_policy(identify_policy):
def get_score_by_uuid(uuid):
pid = cgroup_manager.get_container_pid(uuid)
etime = system_manager.get_proc_etime(pid)
return 1.0 / (1.0 + etime)
class mem_usage_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_sample(uuid)
return sample["mem_page_sample"]
class mem_quota_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_limit(uuid)
return sample["mem_page_quota"]
class cpu_usage_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_sample(uuid)
return sample["cpu_sample"]
class cpu_usage_rev_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_sample(uuid)
return 1024 * 1024 / (1.0 + sample["cpu_sample"])
class cpu_speed_policy(identify_policy):
def get_score_by_uuid(uuid):
sample = cgroup_manager.get_container_sample(uuid)
pid = cgroup_manager.get_container_pid(uuid)
etime = system_manager.get_proc_etime(pid)
return sample["cpu_sample"] / etime
class user_state_policy(identify_policy):
def get_score_by_uuid(uuid):
user = uuid.split('-')[0]
online = subprocess.getoutput('cat /var/lib/docklet/global/users/%s/status 2>/dev/null' % user) == 'live'
return 10.0 if online else 1.0