class master_connector:
    """Master-side keep-alive server for minions.

    Listens on ``tcp_port``, answers every ``b'ack'`` message with ``b'ack'``
    and keeps one Open vSwitch GRE port per connected minion on the
    ``ovs-master`` bridge.  All state is class-level; the methods are called
    on the class itself.
    """

    tcp_port = 1727
    max_minions = 256

    # fd -> (socket object, peer IP address) for every connected minion.
    conn = {}
    epoll_fd = select.epoll()

    @staticmethod
    def establish_vswitch(ovsname):
        """(Re)create the bridge ``ovs-<ovsname>`` and bring it up."""
        os.system('ovs-vsctl del-br ovs-%s >/dev/null 2>&1' % ovsname)
        os.system('ovs-vsctl add-br ovs-%s' % ovsname)
        os.system('brctl addif ovs-bridge ovs-%s >/dev/null 2>&1' % ovsname)
        os.system('ip link set ovs-system up')
        os.system('ip link set ovs-%s up' % ovsname)

    @staticmethod
    def build_gre_conn(ovsname, ipaddr):
        """Add a GRE port towards ``ipaddr`` to the bridge ``ovs-<ovsname>``."""
        name = ipaddr.replace('.', '_')
        os.system('ovs-vsctl add-port ovs-%s gre-%s -- set interface gre-%s type=gre options:remote_ip=%s 2>/dev/null' % (ovsname, name, name, ipaddr))

    @staticmethod
    def break_gre_conn(ovsname, ipaddr):
        """Remove the GRE port towards ``ipaddr`` from ``ovs-<ovsname>``."""
        name = ipaddr.replace('.', '_')
        os.system('ovs-vsctl del-port ovs-%s gre-%s 2>/dev/null' % (ovsname, name))

    @staticmethod
    def close_connection(fd):
        """Unregister ``fd``, close its socket and tear down its GRE port."""
        master_connector.epoll_fd.unregister(fd)
        sock, addr = master_connector.conn[fd]
        sock.close()
        master_connector.conn.pop(fd)
        master_connector.break_gre_conn('master', addr)

    @staticmethod
    def do_message_response(input_buffer):
        """Return the reply for one complete request.

        The only message understood is ``b'ack'``, which is echoed back.

        Raises:
            ValueError: for any other payload.  (The original used
                ``assert``, which is stripped under ``python -O``.)
        """
        if input_buffer != b'ack':
            raise ValueError('unexpected message: %r' % (input_buffer,))
        return b'ack'

    @staticmethod
    def start():
        """Run :meth:`run_forever` in a daemon thread and return the thread."""
        thread = threading.Thread(target=master_connector.run_forever, daemon=True)
        thread.start()
        return thread

    @staticmethod
    def _on_readable(fd, replies, drop):
        """Drain ``fd``; queue the reply in ``replies`` or drop the peer."""
        sock = master_connector.conn[fd][0]
        datas = b''
        while True:
            try:
                data = sock.recv(10)
            except BlockingIOError:
                # Socket drained (edge-triggered): answer what was received.
                try:
                    replies[fd] = master_connector.do_message_response(datas)
                    master_connector.epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT)
                except Exception:
                    drop(fd)
                return
            except OSError:
                drop(fd)
                return
            if not data:
                # Peer closed.  The original only handled this when nothing
                # had been read yet and otherwise looped forever on b''.
                drop(fd)
                return
            datas += data

    @staticmethod
    def _on_writable(fd, replies, drop):
        """Send the queued reply for ``fd`` and switch back to reading."""
        sock = master_connector.conn[fd][0]
        payload = replies.get(fd, b'')
        try:
            sent = 0
            while sent < len(payload):
                sent += sock.send(payload[sent:])
        except OSError:
            # A broken peer must not kill the whole event loop.
            drop(fd)
            return
        replies.pop(fd, None)
        master_connector.epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET)

    @staticmethod
    def run_forever():
        """Serve minions until the thread dies; always removes ``ovs-master``."""
        listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_fd.bind(('', master_connector.tcp_port))
        listen_fd.listen(master_connector.max_minions)

        epoll = master_connector.epoll_fd
        epoll.register(listen_fd.fileno(), select.EPOLLIN)

        # fd -> reply bytes waiting to be written.
        replies = {}

        def drop(fd):
            # Forget any queued reply together with the connection itself.
            replies.pop(fd, None)
            master_connector.close_connection(fd)

        master_connector.establish_vswitch('master')
        try:
            while True:
                for fd, events in epoll.poll():
                    if fd == listen_fd.fileno():
                        peer, addr = listen_fd.accept()
                        peer.setblocking(False)
                        epoll.register(peer.fileno(), select.EPOLLIN | select.EPOLLET)
                        master_connector.conn[peer.fileno()] = (peer, addr[0])
                        master_connector.build_gre_conn('master', addr[0])
                    elif events & select.EPOLLIN:
                        master_connector._on_readable(fd, replies, drop)
                    elif events & select.EPOLLOUT:
                        master_connector._on_writable(fd, replies, drop)
                    elif events & select.EPOLLHUP:
                        drop(fd)
        finally:
            listen_fd.close()
            os.system('ovs-vsctl del-br ovs-master >/dev/null 2>&1')
class base_http_handler(BaseHTTPRequestHandler):
    """POST dispatcher shared by the master and minion REST daemons.

    A request ``POST /<version>/<a>/<b>/...`` is routed to
    ``daemon.<load_module()>_<version>.case_handler.<a>_<b>_...`` and called
    as ``method(form, self.handler_class.args)``.  The reply is always HTTP
    200 with a JSON body ``{"success": bool, "data": ...}``.
    """

    # Largest accepted request body, in bytes.
    max_body_bytes = 1 << 12

    _unsupported = 'unsupported request.'

    def load_module(self):
        """Return the handler module prefix; subclasses override this."""
        return None

    def _read_form(self):
        """Parse the request body into a ``{field: str}`` dict."""
        # Imported here so the module still loads where ``cgi`` is missing
        # (it was removed in Python 3.13); the request then fails cleanly.
        import cgi

        length = self.headers['content-length']
        if length is None:
            length = self.headers['content-length'] = '0'
        if int(length) < 0:
            raise Exception("invalid content-length")
        if int(length) > self.max_body_bytes:
            raise Exception("data too large")
        http_form = cgi.FieldStorage(fp=self.rfile, headers=self.headers, environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': "text/html"})

        form = {}
        for item in http_form:
            field = http_form[item]
            try:
                value = field.file.read().strip()
            except Exception:
                # Fields without a backing file only expose ``.value``.
                value = field.value
            if isinstance(value, bytes):
                try:
                    value = value.decode()
                except UnicodeDecodeError:
                    pass
            form[item] = value
        return form

    def _dispatch(self, form):
        """Resolve ``self.path`` to a handler function and call it."""
        parts = self.path.split('/', 2)
        if len(parts) != 3:
            raise Exception(self._unsupported)
        _, version, path = parts
        # The version becomes part of an import name: keep it to [A-Za-z0-9_].
        if not version.replace('_', '').isalnum():
            raise Exception(self._unsupported)

        pymodule = self.load_module() + '_' + version
        package = __import__('daemon.' + pymodule)
        handler = getattr(getattr(package, pymodule), 'case_handler')

        method = path.replace('/', '_')
        # Only public functions defined on case_handler itself are routable;
        # the original accepted any attribute name and then failed with a
        # KeyError or TypeError for inherited or non-callable ones.
        if method.startswith('_') or method not in vars(handler):
            raise Exception(self._unsupported)
        target = getattr(handler, method)
        if not callable(target):
            raise Exception(self._unsupported)
        return target(form, self.handler_class.args)

    def do_POST(self):
        """Handle one POST request; never raises for handler errors."""
        try:
            data = self._dispatch(self._read_form())
            # Serialise inside the try: a non-JSON-able result becomes a
            # normal failure reply instead of an empty response body.
            body = json.dumps({"success": True, "data": data})
        except Exception as e:
            body = json.dumps({"success": False, "data": {"reason": str(e)}})
        self.send_response(200)
        self.send_header("Content-type", "application/json")
        self.end_headers()
        self.wfile.write((body + "\n").encode())
class case_handler:
    # REST endpoints of the minion daemon (API version v1).
    #
    # Every function takes (form, args): `form` is the dict of POSTed fields,
    # `args` is whatever the HTTP listener was constructed with.  The HTTP
    # layer looks these up through the class __dict__ and calls them directly,
    # so they are deliberately left as plain functions (a staticmethod object
    # taken from __dict__ is not callable before Python 3.10).
    #
    # [Order-by] lexicographic order

    # curl -L -X POST -F uuid=docklet-1-0 http://0.0.0.0:1729/v1/billing/increment
    def billing_increment(form, args):
        return billing_manager.fetch_increment_and_clean(form['uuid'])

    # curl -L -X POST http://0.0.0.0:1729/v1/cgroup/container/list
    def cgroup_container_list(form, args):
        return cgroup_manager.get_cgroup_containers()

    # curl -L -X POST -F policy=etime_rev_policy http://0.0.0.0:1729/v1/smart/quota/policy
    def smart_quota_policy(form, args):
        # The policy name comes straight from the request, so it is resolved
        # by lookup and type check instead of eval().  Only classes derived
        # from identify_policy that are visible in this module are accepted.
        msg = 'success'
        try:
            candidate = globals().get(form['policy'])
            if not (isinstance(candidate, type) and issubclass(candidate, identify_policy)):
                raise ValueError('unknown policy: %r' % (form['policy'],))
            smart_controller.set_policy(candidate)
        except Exception as e:
            # str(): an exception object cannot be JSON-encoded by the caller.
            msg = str(e)
        return {'message': msg}

    # curl -L -X POST -F uuid=n1 http://0.0.0.0:1729/v1/cgroup/container/limit
    def cgroup_container_limit(form, args):
        return cgroup_manager.get_container_limit(form['uuid'])

    # curl -L -X POST -F uuid=n1 http://0.0.0.0:1729/v1/cgroup/container/sample
    def cgroup_container_sample(form, args):
        return cgroup_manager.get_container_sample(form['uuid'])

    # curl -L -X POST http://0.0.0.0:1729/v1/system/loads
    def system_loads(form, args):
        return system_manager.get_system_loads()

    # curl -L -X POST http://0.0.0.0:1729/v1/system/memsw/available
    def system_memsw_available(form, args):
        return system_manager.get_available_memsw()

    # curl -L -X POST -F size=16 http://0.0.0.0:1729/v1/system/swap/extend
    def system_swap_extend(form, args):
        return system_manager.extend_swap(int(form['size']))

    # curl -L -X POST http://0.0.0.0:1729/v1/system/swap/clear
    def system_swap_clear(form, args):
        return system_manager.clear_all_swaps()

    # curl -L -X POST http://0.0.0.0:1729/v1/system/total/physical/memory
    def system_total_physical_memory(form, args):
        return system_manager.get_total_physical_memory_for_containers()

    # The blacklist endpoints (add / remove / show) were already disabled in
    # the original source and are omitted here.
class billing_manager:
    """Accumulates per-container CPU/memory usage increments on disk.

    Increments are appended to ``<system_manager.db_prefix>/<uuid>/usage``,
    one ``"<cpu> <mem>"`` line per sampling interval, and are summed and
    removed by :meth:`fetch_increment_and_clean`.
    """

    # uuid -> previous sample dict, used to compute deltas.
    history_book = {}

    @staticmethod
    def on_lxc_acct_usage(uuid, prev, curr, interval):
        """Append the usage generated between two samples to the usage file."""
        cpu_gen = max(0, curr['cpu_sample'] - prev['cpu_sample']) >> 20  # in ms
        # int(): keeps the shift valid when ``interval`` is a float.
        mem_gen = int((curr['mem_phys_sample'] + prev['mem_phys_sample']) * interval) >> 11  # in kbytes
        directory = '%s/%s' % (system_manager.db_prefix, uuid)
        os.makedirs(directory, exist_ok=True)
        with open('%s/usage' % directory, 'a') as fp:
            fp.write('%d %d\n' % (cpu_gen, mem_gen))

    @staticmethod
    def add_usage_sample(uuid, sample, interval):
        """Record ``sample``; bill the delta if a previous sample exists."""
        if uuid in billing_manager.history_book:
            billing_manager.on_lxc_acct_usage(uuid, billing_manager.history_book[uuid], sample, interval)
        billing_manager.history_book[uuid] = sample

    @staticmethod
    def clean_dead_node(uuid):
        """Forget the last sample of a container that no longer exists."""
        billing_manager.history_book.pop(uuid, None)

    @staticmethod
    def fetch_increment_and_clean(uuid):
        """Sum and delete all pending increments of ``uuid``.

        Returns:
            ``{"cpu_acct": float, "mem_acct": float, "cnt_acct": int}``;
            all zero when there is nothing to fetch.
        """
        result = {"cpu_acct": 0.0, "mem_acct": 0.0, "cnt_acct": 0}
        # ``uuid`` arrives from an HTTP form: refuse anything that is not a
        # single path component so it cannot reach outside db_prefix.
        if not uuid or uuid in ('.', '..') or os.path.basename(uuid) != uuid:
            return result

        usage_path = '%s/%s/usage' % (system_manager.db_prefix, uuid)
        fetch_path = '%s/%s/%f' % (system_manager.db_prefix, uuid, time.time())
        try:
            # Rename first so samples written meanwhile go to a fresh file.
            os.rename(usage_path, fetch_path)
        except OSError:
            return result

        try:
            with open(fetch_path, 'r') as fp:
                for line in fp:
                    fields = line.split()
                    if len(fields) != 2:
                        continue  # skip truncated lines instead of aborting
                    try:
                        cpu, mem = float(fields[0]), float(fields[1])
                    except ValueError:
                        continue
                    result["cnt_acct"] += 1
                    result["cpu_acct"] += cpu
                    result["mem_acct"] += mem
        except OSError:
            pass
        finally:
            # Always drop the fetched file, even if parsing stopped early.
            try:
                os.remove(fetch_path)
            except OSError:
                pass
        return result
class cgroup_controller:
    """Raw access to single cgroup control files of a container."""

    @staticmethod
    def read_value(group, uuid, item):
        """Return the stripped content of ``<group>/<uuid>/<item>``.

        Raises:
            Exception: if the control file does not exist.
        """
        path = cgroup_manager.__default_prefix__ % (group, uuid, item)
        if not os.path.exists(path):
            raise Exception('read: container "%s" not found!' % uuid)
        with open(path, 'r') as file:
            return file.read().strip()

    @staticmethod
    def write_value(group, uuid, item, value):
        """Write ``value`` to ``<group>/<uuid>/<item>``.

        A write rejected by the OS is ignored, as in the original.

        Raises:
            Exception: if the control file does not exist.
        """
        path = cgroup_manager.__default_prefix__ % (group, uuid, item)
        if not os.path.exists(path):
            raise Exception('write: container "%s" not found!' % uuid)
        try:
            with open(path, 'w') as file:
                file.write(str(value))
        except OSError:
            pass


class cgroup_manager:
    """Container-level view on cgroups: listing, sampling and limiting."""

    # Path templates, filled with (cgroup subsystem, container, file name).
    __prefix_docker__ = '/sys/fs/cgroup/%s/system.slice/docker-%s.scope/%s'
    __prefix_lxc__ = '/sys/fs/cgroup/%s/lxc/%s/%s'
    __prefix_lxcinit__ = '/sys/fs/cgroup/%s/init.scope/lxc/%s/%s'

    # Same default as set_cgroup_prefix(); without it every call made before
    # set_cgroup_prefix()/auto_detect_prefix() failed with AttributeError.
    __default_prefix__ = __prefix_lxc__

    # memory.memsw.limit_in_bytes at or above this value is treated as "no limit".
    __unlimited_bytes__ = 9223372036854771712

    @staticmethod
    def set_default_memory_limit(limit):
        """Set the limit (shifted by 30 bits, i.e. GiB) used by protect_container_oom."""
        cgroup_manager.__default_memory_limit__ = limit

    @staticmethod
    def set_cgroup_prefix(prefix=__prefix_lxc__):
        """Select the path template used to locate containers."""
        cgroup_manager.__default_prefix__ = prefix

    @staticmethod
    def auto_detect_prefix():
        """Use the first template (docker, lxc-init, lxc) that has containers.

        Falls back to the plain lxc template when none has any.
        """
        for prefix in (cgroup_manager.__prefix_docker__,
                       cgroup_manager.__prefix_lxcinit__,
                       cgroup_manager.__prefix_lxc__):
            cgroup_manager.__default_prefix__ = prefix
            if cgroup_manager.get_cgroup_containers():
                return

    @staticmethod
    def get_cgroup_containers():
        """Return the names of all containers under the current template.

        Docker scope names (``docker-<id>.scope``) are reduced to ``<id>``.
        The original ``find | awk`` pipeline also emitted a ``.`` entry for
        every nested cgroup directory; only the containers are listed here.
        """
        import glob

        pattern = cgroup_manager.__default_prefix__ % ('cpu', '*', '.')
        uuids = []
        for path in sorted(glob.glob(pattern)):
            if not os.path.isdir(path):
                continue
            # path is ".../<container>/."
            item = os.path.basename(os.path.dirname(path))
            if item.startswith('docker-') and item.endswith('.scope') and len(item) > 64:
                uuids.append(item[7:-6])
            else:
                uuids.append(item)
        return uuids

    @staticmethod
    def get_container_pid(uuid):
        """Return the first PID listed in the container's cpu ``tasks`` file."""
        return int(cgroup_controller.read_value('cpu', uuid, 'tasks').split()[0])

    @staticmethod
    def get_container_sample(uuid):
        """Return the current usage counters of a container as a dict."""
        mem_page_sample = int(cgroup_controller.read_value('memory', uuid, 'memory.memsw.usage_in_bytes'))
        mem_phys_sample = int(cgroup_controller.read_value('memory', uuid, 'memory.usage_in_bytes'))
        cpu_sample = int(cgroup_controller.read_value('cpu', uuid, 'cpuacct.usage'))
        pids_sample = int(cgroup_controller.read_value('pids', uuid, 'pids.current'))
        container_pid = cgroup_manager.get_container_pid(uuid)

        # Imported lazily: intra.system imports this module at load time.
        from intra.system import system_manager
        real_time = system_manager.get_proc_etime(container_pid)
        return {"cpu_sample": cpu_sample, "pids_sample": pids_sample, "mem_page_sample": mem_page_sample, "mem_phys_sample": mem_phys_sample, "pid": container_pid, "real_time": real_time}

    @staticmethod
    def get_container_limit(uuid):
        """Return the configured quotas of a container (-1 means unlimited)."""
        mem_phys_quota = int(cgroup_controller.read_value('memory', uuid, 'memory.limit_in_bytes'))
        mem_page_quota = int(cgroup_controller.read_value('memory', uuid, 'memory.memsw.limit_in_bytes'))
        cpu_shares = int(cgroup_controller.read_value('cpu', uuid, 'cpu.shares'))
        cpu_quota = int(cgroup_controller.read_value('cpu', uuid, 'cpu.cfs_quota_us'))
        cpu_quota = cpu_quota if cpu_quota >= 0 else -1

        pids_quota = cgroup_controller.read_value('pids', uuid, 'pids.max')
        pids_quota = int(pids_quota) if pids_quota != 'max' else -1
        return {"cpu_quota": cpu_quota, "cpu_shares": cpu_shares, "mem_phy_quota": mem_phys_quota, "mem_page_quota": mem_page_quota, "pids_quota": pids_quota}

    @staticmethod
    def get_container_oom_status(uuid):
        """Return ``(oom_kill_disable, under_oom)`` as booleans.

        Parsed by key, so extra lines in ``memory.oom_control`` (for example
        an ``oom_kill`` counter) no longer break the four-token unpacking.
        """
        tokens = cgroup_controller.read_value('memory', uuid, 'memory.oom_control').split()
        fields = dict(zip(tokens[0::2], tokens[1::2]))
        return (fields.get('oom_kill_disable') == '1', fields.get('under_oom') == '1')

    @staticmethod
    def set_container_oom_idle(uuid, idle):
        """Write 1 (``idle`` true) or 0 to the container's ``memory.oom_control``."""
        cgroup_controller.write_value('memory', uuid, 'memory.oom_control', 1 if idle else 0)

    @staticmethod
    def protect_container_oom(uuid):
        """Write 1 to ``memory.oom_control`` and cap a container without memsw limit."""
        cgroup_controller.write_value('memory', uuid, 'memory.oom_control', 1)
        data = cgroup_manager.get_container_limit(uuid)
        if data["mem_page_quota"] >= cgroup_manager.__unlimited_bytes__:
            memory_limit_in_bytes = cgroup_manager.__default_memory_limit__ << 30
            mem_phy_quota = min(data["mem_phy_quota"], memory_limit_in_bytes)
            mem_page_quota = memory_limit_in_bytes
            cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'FROZEN')
            try:
                # NOTE(review): the original writes this limit twice in a
                # row; kept as-is, presumably a retry - confirm.
                cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', mem_phy_quota)
                cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', mem_phy_quota)
                cgroup_controller.write_value('memory', uuid, 'memory.memsw.limit_in_bytes', mem_page_quota)
            finally:
                # Never leave the container frozen if a write above raises.
                cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'THAWED')

    @staticmethod
    def set_container_physical_memory_limit(uuid, Mbytes, freeze=False):
        """Set ``memory.limit_in_bytes`` to ``Mbytes`` MiB (clamped at 0)."""
        memory_limit = int(max(0, Mbytes)) << 20
        if not freeze:
            cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', memory_limit)
            return
        cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'FROZEN')
        try:
            cgroup_controller.write_value('memory', uuid, 'memory.limit_in_bytes', memory_limit)
        finally:
            cgroup_controller.write_value('freezer', uuid, 'freezer.state', 'THAWED')

    @staticmethod
    def set_container_cpu_priority_limit(uuid, ceof):
        """Set ``cpu.shares`` to ``10 + 1024 * ceof``, capped at 1024."""
        cpu_scaling = min(1024, 10 + int(1024 * ceof))
        cgroup_controller.write_value('cpu', uuid, 'cpu.shares', cpu_scaling)
item in live: + try: + last_live.remove(item) + except: + pass + try: + cgroup_manager.protect_container_oom(item) + sample = cgroup_manager.get_container_sample(item) + mem_usage_mapping[item] = math.ceil(sample['mem_page_sample'] * 1e-6) + billing_manager.add_usage_sample(item, sample, interval) + except: + pass + for item in last_live: + billing_manager.clean_dead_node(item) + last_live = live + is_ready = True + + memory_available = system_manager.get_available_memsw() + if memory_available['Mbytes'] <= 0: + size_in_gb = int(math.ceil(-memory_available['Mbytes'] / 1024 / 16) * 16) + print("[warning]", 'overloaded containers, auto-extending %d G memsw.' % size_in_gb) + system_manager.extend_swap(size_in_gb) + + total_score = 0.0 + score_mapping = {} + for item in live: + score = max(1e-8, smart_controller.policy.get_score_by_uuid(item)) + score_mapping[item] = score + print(item, "(score/cpu)", score) + total_score += score + + # CPU Scoring + for item in live: + ceof = score_mapping[item] / total_score + cgroup_manager.set_container_cpu_priority_limit(item, ceof) + + # Iterative Memory Scoring + free_mem = system_manager.get_total_physical_memory_for_containers()['Mbytes'] + local_nodes = live + mem_alloc = {} + for item in live: + mem_alloc[item] = 0 + + while free_mem > 0 and len(local_nodes) > 0: + excess_mem = 0 + next_local_nodes = [] + for item in local_nodes: + mem_alloc[item] += int(math.floor(free_mem * score_mapping[item] / total_score)) + if mem_alloc[item] >= mem_usage_mapping[item]: + excess_mem += mem_alloc[item] - mem_usage_mapping[item] + mem_alloc[item] = mem_usage_mapping[item] + else: + next_local_nodes.append(item) + free_mem = excess_mem + local_nodes = next_local_nodes + + for item in live: + mem_alloc[item] += int(math.floor(free_mem * score_mapping[item] / total_score)) + cgroup_manager.set_container_physical_memory_limit(item, mem_alloc[item]) + print(item, "(malloc:usage)", mem_alloc[item], mem_usage_mapping[item]) + + if len(live) > 
class system_manager:
    """Host-level resource probes (CPU, memory, swap) and swap management.

    Values are read directly from ``/proc`` instead of through shell
    pipelines; the returned shapes are unchanged.
    """

    db_prefix = '.'

    @staticmethod
    def set_db_prefix(prefix):
        """Set the data directory and create it (best effort)."""
        system_manager.db_prefix = prefix
        try:
            os.makedirs(prefix, exist_ok=True)
        except OSError:
            pass

    @staticmethod
    def clear_all_swaps():
        """Run ``swapoff -a`` followed by ``losetup -D``."""
        subprocess.getoutput('swapoff -a')
        subprocess.getoutput('losetup -D')

    @staticmethod
    def _used_swap_loop_ids():
        """Return the numeric suffixes of active ``cg-loop`` swap devices."""
        used = set()
        try:
            with open('/proc/swaps', 'r') as fp:
                for line in fp:
                    if 'cg-loop' in line:
                        used.add(line.split()[0].rsplit('-', 1)[-1])
        except OSError:
            pass
        return used

    @staticmethod
    def extend_swap(size):
        """Add a loop-backed swap area of ``size`` GiB.

        A negative ``size`` means total memory plus one eighth, in GiB.

        Returns:
            ``{"setup": bool, "time": seconds spent}``.
        """
        if size < 0:
            (mem_free, mem_total) = system_manager.get_memory_sample()
            size = (mem_total + mem_total // 8) // 1024
        # First free id from 128 upwards.  The original ``grep <id>$`` also
        # matched longer ids that merely end in the same digits.
        used = system_manager._used_swap_loop_ids()
        nid = 128
        while str(nid) in used:
            nid = nid + 1
        start_time = time.time()
        # setup
        os.system('dd if=/dev/zero of=/tmp/cg-swap-%d bs=1G count=0 seek=%d >/dev/null 2>&1' % (nid, size))
        os.system('mknod -m 0660 /dev/cg-loop-%d b 7 %d >/dev/null 2>&1' % (nid, nid))
        os.system('losetup /dev/cg-loop-%d /tmp/cg-swap-%d >/dev/null 2>&1' % (nid, nid))
        os.system('mkswap /dev/cg-loop-%d >/dev/null 2>&1' % nid)
        success = os.system('swapon /dev/cg-loop-%d >/dev/null 2>&1' % nid) == 0
        end_time = time.time()
        return {"setup": success, "time": end_time - start_time}

    @staticmethod
    def get_cpu_sample():
        """Return ``(user + nice + system + iowait ticks, timestamp)``.

        Raises:
            ValueError: if ``/proc/stat`` has no aggregate ``cpu`` line.
        """
        with open('/proc/stat', 'r') as fp:
            for line in fp:
                if line.startswith('cpu '):
                    fields = line.split()
                    cpu_time = int(fields[1]) + int(fields[2]) + int(fields[3]) + int(fields[5])
                    return (cpu_time, time.time())
        raise ValueError('no aggregate "cpu" line in /proc/stat')

    @staticmethod
    def _meminfo_mb(key):
        """Return the ``/proc/meminfo`` entry ``key`` (e.g. 'MemTotal:') in MiB."""
        try:
            with open('/proc/meminfo', 'r') as fp:
                for line in fp:
                    fields = line.split()
                    if fields and fields[0] == key:
                        return int(fields[1]) // 1024
        except OSError:
            pass
        # Same exception type the original int('') produced.
        raise ValueError('%s not found in /proc/meminfo' % key)

    @staticmethod
    def get_memory_sample():
        """Return ``(MemAvailable, MemTotal)`` in MiB."""
        return (system_manager._meminfo_mb('MemAvailable:'), system_manager._meminfo_mb('MemTotal:'))

    @staticmethod
    def get_swap_sample():
        """Return ``(SwapFree, SwapTotal)`` in MiB."""
        return (system_manager._meminfo_mb('SwapFree:'), system_manager._meminfo_mb('SwapTotal:'))

    @staticmethod
    def _count_cpus():
        """Count ``processor`` lines in ``/proc/cpuinfo`` (as the original did)."""
        count = 0
        try:
            with open('/proc/cpuinfo', 'r') as fp:
                count = sum(1 for line in fp if 'processor' in line)
        except OSError:
            pass
        return count or os.cpu_count() or 1

    @staticmethod
    def get_system_loads():
        """Return free/total memory, swap and CPU figures of the host.

        CPU usage is measured between this call and the previous one; the
        very first call sleeps one second to obtain a baseline.
        """
        if 'last_cpu_sample' not in system_manager.__dict__:
            system_manager.last_cpu_sample = system_manager.get_cpu_sample()
            time.sleep(1)
        cpu_sample = system_manager.get_cpu_sample()
        (mem_free, mem_total) = system_manager.get_memory_sample()
        (swap_free, swap_total) = system_manager.get_swap_sample()
        ncpus = system_manager._count_cpus()
        elapsed = cpu_sample[1] - system_manager.last_cpu_sample[1]
        # 0.01: one tick is taken as 10 ms.  Guard against a zero interval.
        busy = (cpu_sample[0] - system_manager.last_cpu_sample[0]) * 0.01 / elapsed if elapsed > 0 else 0.0
        cpu_free = ncpus - busy
        cpu_free = 0.0 if cpu_free <= 0.0 else cpu_free
        system_manager.last_cpu_sample = cpu_sample
        return {"mem_free": mem_free, "mem_total": mem_total, "swap_free": swap_free, "swap_total": swap_total, "cpu_free": cpu_free, "cpu_total": ncpus}

    @staticmethod
    def get_proc_etime(pid):
        """Return the seconds elapsed since process ``pid`` started, or -1.

        Computed as uptime minus the process start time from
        ``/proc/<pid>/stat``.  The original filtered ``ps -A`` output with
        ``grep '^ *<pid>'``, which also matched every PID that merely starts
        with the same digits and then failed to parse.
        """
        try:
            with open('/proc/%d/stat' % pid, 'r') as fp:
                stat = fp.read()
            with open('/proc/uptime', 'r') as fp:
                uptime = float(fp.read().split()[0])
            # The command name may contain spaces or ')': split after the
            # last ')'.  starttime is field 22, i.e. index 19 from there.
            start_ticks = int(stat.rpartition(')')[2].split()[19])
            ticks_per_second = os.sysconf('SC_CLK_TCK')
        except (OSError, ValueError, IndexError):
            return -1
        return max(0, int(uptime - start_ticks / ticks_per_second))

    @staticmethod
    def get_available_memsw():
        """Return memory+swap (MiB) not yet promised to containers.

        Computed as free memory + free swap - sum of container memsw limits
        + sum of container memsw usage.
        """
        total_mem_limit = 0
        total_mem_used = 0
        sysloads = system_manager.get_system_loads()
        live = cgroup_manager.get_cgroup_containers()

        for item in live:
            try:
                sample = cgroup_manager.get_container_sample(item)
                limit = cgroup_manager.get_container_limit(item)
            except Exception:
                # A container may vanish between listing and reading.
                continue
            total_mem_limit += limit["mem_page_quota"]
            total_mem_used += sample["mem_page_sample"]

        total_mem_limit >>= 20
        total_mem_used = (total_mem_used + (1 << 20) - 1) >> 20

        available_mem_resource = sysloads['mem_free'] + \
            sysloads['swap_free'] - total_mem_limit + total_mem_used
        return {"Mbytes": available_mem_resource, "physical": sysloads['mem_free'], "cpu_free": sysloads['cpu_free']}

    @staticmethod
    def get_total_physical_memory_for_containers():
        """Return free memory plus current container memsw usage, in MiB."""
        total_mem_used = 0
        sysloads = system_manager.get_system_loads()
        live = cgroup_manager.get_cgroup_containers()

        for item in live:
            try:
                sample = cgroup_manager.get_container_sample(item)
            except Exception:
                # A container may vanish between listing and reading.
                continue
            total_mem_used += sample["mem_page_sample"]

        total_mem_used = (total_mem_used + (1 << 20) - 1) >> 20
        total_physical_memory_for_containers = sysloads['mem_free'] + total_mem_used

        return {"Mbytes": total_physical_memory_for_containers}
class candidates_selector:
    """Chooses the minion that should host a new allocation."""

    @staticmethod
    def select(candidates):
        """Return the address whose entry has the highest ``cpu_free``.

        Args:
            candidates: mapping ``address -> {"cpu_free": number, ...}``.

        Raises:
            ValueError: if ``candidates`` is empty.
        """
        # staticmethod: the function takes no ``self``, so without the
        # decorator it could only be called on the class, never an instance.
        return max(candidates, key=lambda addr: candidates[addr]['cpu_free'])
class identify_policy:
    """Base scoring policy: every container gets the same score."""

    @staticmethod
    def get_score_by_uuid(uuid):
        """Return the score of container ``uuid`` (always 1.0 here)."""
        return 1.0


class etime_rev_policy(identify_policy):
    """Score decreases with the container's elapsed running time."""

    @staticmethod
    def get_score_by_uuid(uuid):
        pid = cgroup_manager.get_container_pid(uuid)
        # get_proc_etime() returns -1 when the time is unknown, which made
        # the denominator zero; treat that case as "just started".
        etime = max(0, system_manager.get_proc_etime(pid))
        return 1.0 / (1.0 + etime)


class mem_usage_policy(identify_policy):
    """Score equals the container's current memory+swap usage."""

    @staticmethod
    def get_score_by_uuid(uuid):
        sample = cgroup_manager.get_container_sample(uuid)
        return sample["mem_page_sample"]


class mem_quota_policy(identify_policy):
    """Score equals the container's memory+swap quota."""

    @staticmethod
    def get_score_by_uuid(uuid):
        sample = cgroup_manager.get_container_limit(uuid)
        return sample["mem_page_quota"]


class cpu_usage_policy(identify_policy):
    """Score equals the container's accumulated CPU usage."""

    @staticmethod
    def get_score_by_uuid(uuid):
        sample = cgroup_manager.get_container_sample(uuid)
        return sample["cpu_sample"]


class cpu_usage_rev_policy(identify_policy):
    """Score decreases with the container's accumulated CPU usage."""

    @staticmethod
    def get_score_by_uuid(uuid):
        sample = cgroup_manager.get_container_sample(uuid)
        return 1024 * 1024 / (1.0 + sample["cpu_sample"])


class cpu_speed_policy(identify_policy):
    """Score equals accumulated CPU usage per second of running time."""

    @staticmethod
    def get_score_by_uuid(uuid):
        sample = cgroup_manager.get_container_sample(uuid)
        pid = cgroup_manager.get_container_pid(uuid)
        etime = system_manager.get_proc_etime(pid)
        # A container younger than one second (0) or with unknown time (-1)
        # used to raise ZeroDivisionError or yield a negative score.
        return sample["cpu_sample"] / max(1, etime)


class user_state_policy(identify_policy):
    """Score 10.0 for containers of users marked ``live``, else 1.0."""

    @staticmethod
    def get_score_by_uuid(uuid):
        user = uuid.split('-')[0]
        # Read the status file directly instead of running ``cat`` through a
        # shell with the user name interpolated into the command line.
        try:
            with open('/var/lib/docklet/global/users/%s/status' % user, 'r') as fp:
                status = fp.read()
        except (OSError, UnicodeDecodeError):
            return 1.0
        # The shell call dropped one trailing newline; do the same.
        if status.endswith('\n'):
            status = status[:-1]
        return 10.0 if status == 'live' else 1.0