def gen_all_config(num_apps, num_resources):
    """Enumerate every way to split ``num_resources`` units among ``num_apps`` apps.

    Args:
        num_apps (int): number of apps; must be at least 1.
        num_resources (int): total units of resources to hand out.

    Returns:
        list: all allocations, each a list of ``num_apps`` ints that sums to
        ``num_resources``, ordered lexicographically.

    Raises:
        ValueError: if ``num_apps`` is smaller than 1 (the recursion would
            otherwise never reach its base case).
    """
    if num_apps < 1:
        raise ValueError("num_apps must be at least 1")
    if num_apps == 1:
        # Only one app left: it receives all remaining resources.
        return [[num_resources]]
    all_config = []
    for first in range(num_resources + 1):
        # Give `first` units to this app and split the rest recursively.
        for rest in gen_all_config(num_apps - 1, num_resources - first):
            all_config.append([first] + rest)
    return all_config


def find_vectors(x, d_max):
    """Return the non-negative vectors near ``x`` that keep the same total.

    A vector ``y`` qualifies when every ``y[i] >= 0``, ``sum(y) == sum(x)`` and
    the Manhattan distance ``sum(|y[i] - x[i]|)`` is at most ``d_max``.

    Args:
        x (list): reference vector of ints; it is not modified.
        d_max (int): maximum Manhattan distance from ``x``.

    Returns:
        list: qualifying vectors (``x`` itself included), ordered by ascending
        per-position delta. Empty when ``d_max`` is negative or ``x`` holds a
        negative entry.
    """
    base = list(x)
    if d_max < 0 or any(val < 0 for val in base):
        return []

    n = len(base)
    current = list(base)
    solutions = []

    def backtrack(index, budget, drift):
        # `budget` is the distance still available, `drift` the running sum of
        # deltas. The remaining positions can cancel at most `budget` of it.
        if abs(drift) > budget:
            return
        if index == n:
            if drift == 0:
                solutions.append(list(current))
            return
        # Never spend more than the remaining budget, never go below zero.
        lowest = max(-budget, -base[index])
        for delta in range(lowest, budget + 1):
            current[index] = base[index] + delta
            backtrack(index + 1, budget - abs(delta), drift + delta)
        current[index] = base[index]

    backtrack(0, d_max, 0)
    return solutions
def select_arm(self):
    """Choose the next resource configuration with a two-level epsilon-greedy rule.

    With probability ``self.epsilon`` the call explores and ``epsilon`` is
    decayed by ``self.factor_alpha``. An exploring call either picks a
    uniformly random arm (probability ``self.sub_epsilon``, which is then
    decayed by ``self.sub_alpha``) or a random entry of
    ``self.neighbour[best]``, the precomputed neighbours of the currently best
    arm. Otherwise the arm with the highest estimated value is exploited.

    The chosen index is stored in ``self.last_arm_index``.

    Returns:
        list: the chosen configuration, an element of ``self.all_cache_config``.
    """
    # Plain int so the index can be used as a dict key and a list index alike.
    best_arm = int(np.argmax(self.values))
    chosen_arm = best_arm  # exploitation is the default

    if np.random.rand() < self.epsilon:
        # Explore.
        self.epsilon *= self.factor_alpha
        if np.random.rand() < self.sub_epsilon:
            # Random exploration over every arm.
            self.sub_epsilon *= self.sub_alpha
            chosen_arm = int(np.random.randint(self.n_arms))
        else:
            # Exploration close to the current optimum. An empty neighbour
            # list keeps the best arm instead of crashing.
            neighbours = self.neighbour[best_arm]
            if neighbours:
                chosen_arm = self.all_cache_config.index(random.choice(neighbours))

    self.last_arm_index = chosen_arm
    return self.all_cache_config[chosen_arm]
def get_now_reward(self, performance, context_info=None):
    """Turn the latest performance readings into a scalar reward.

    Args:
        performance: iterable of values convertible to ``float``.
        context_info: unused by this method; accepted so the signature matches
            the other schedulers' ``get_now_reward``.

    Returns:
        float: arithmetic mean of ``performance``.

    Raises:
        ValueError: if ``performance`` is empty.
    """
    samples = [float(x) for x in performance]
    if not samples:
        raise ValueError("performance must contain at least one measurement")
    return sum(samples) / len(samples)
def get_top_k(arr, k, times):
    """Pick ``k`` candidate arm indices from the score array ``arr``.

    During the first rounds (``times < 5``), and afterwards with probability
    2/10, the candidates are drawn at random. Otherwise the indices of the
    ``k`` largest scores are returned in ascending score order.

    Args:
        arr: sequence of scores, one per arm.
        k (int): number of candidates wanted; capped at ``len(arr)``.
        times (int): number of selection rounds performed so far.

    Returns:
        list: ``min(k, len(arr))`` distinct int indices into ``arr``; empty
        when ``k <= 0``.
    """
    n_arms = len(arr)
    k = min(k, n_arms)
    if k <= 0:
        # Guard: `argsort(arr)[-0:]` would return every index.
        return []
    if times < 5 or random.randint(1, 10) > 8:
        # Sample without replacement so no candidate slot is a duplicate.
        return random.sample(range(n_arms), k)
    return [int(i) for i in np.argsort(arr)[-k:]]
def _draw_allocation(d, m, M):
    """Draw one real-valued vector of length ``d`` in ``[m, M]`` summing to ``M``."""
    if d == 1 or m * d == M:
        # Only one feasible point exists; rejection sampling would never hit it.
        return np.full(d, M / d)
    while True:
        x = np.random.uniform(m, M, d)
        total = np.sum(x)
        if total < M:
            continue
        # Scale down so the components sum to M.
        x = x / total * M
        # Scaling can push a component below m; redraw in that case.
        if np.all(x >= m) and np.all(x <= M):
            return x


def latin_hypercube_sampling(n, d, m, M, ratio):
    """Generate ``n`` random allocations of ``M`` units across ``d`` dimensions.

    Each sample is drawn uniformly from ``[m, M]`` per dimension, kept only if
    its sum reaches ``M``, rescaled to sum to ``M``, truncated to whole units
    (the remainder goes to the last dimension) and multiplied by ``ratio``.
    Note that, despite the name, the draws are not stratified.

    Args:
        n (int): number of samples.
        d (int): number of dimensions (apps); at least 1.
        m: lower bound of each component before truncation; non-negative.
        M: total number of units to distribute; ``m * d`` must not exceed it.
        ratio: factor applied to every unit count in the output.

    Returns:
        list: ``n`` lists of length ``d``; each sums to ``M * ratio``.

    Raises:
        ValueError: if the constraints above cannot be satisfied.
    """
    if d < 1:
        raise ValueError("d must be at least 1")
    if m < 0 or m * d > M:
        raise ValueError("need 0 <= m and m * d <= M")

    samples = np.zeros((n, d))
    for row in range(n):
        samples[row] = _draw_allocation(d, m, M)

    sample_config = []
    for row in samples:
        units = [int(ele) for ele in row]
        # Truncation loses up to d - 1 units; hand them to the last dimension.
        units[-1] += M - sum(units)
        sample_config.append([ele * ratio for ele in units])
    return sample_config
def update(self, reward, chosen_arm):
    """Feed the reward observed for ``chosen_arm`` back into the model.

    Steps, in order:
      1. While sampling, record the reward in ``self.sample_result``;
         otherwise track the best configuration seen and how many rounds have
         passed without improving on it (``self.duration_period``).
      2. Update the per-app, per-arm matrices ``A_c`` and ``b_c`` with the
         current context vector.
      3. Once ``self.times`` exceeds the warm-up, keep a window of recent
         rewards and call ``self.reset()`` when the mean of its newer half
         departs from the mean of its older half by more than 20 %.

    Args:
        reward: scalar reward of the last round.
        chosen_arm (dict): maps each app in ``self.all_apps`` to the arm index
            that was applied for it.
    """
    warmup_rounds = 200   # rounds to wait before watching for workload change
    window = 60           # rewards compared per check (older half vs newer half)
    change_ratio = 0.20   # relative change treated as a workload change

    if self.sampling_model:
        # Initial phase: only collect the sample results.
        self.sample_result.append(float(reward))
    elif self.curr_best_config is None or reward > self.curr_best_reward:
        self.curr_best_config = chosen_arm
        self.curr_best_reward = reward
        self.duration_period = 1
    else:
        self.duration_period += 1

    for app in self.all_apps:
        arm = chosen_arm[app]
        ctx = np.hstack((self.context[app], self.other_context[app]))
        self.A_c[app][arm] += np.outer(ctx, ctx)
        self.b_c[app][arm] += (ctx * reward).reshape(-1, 1)

    if self.times <= warmup_rounds:
        return

    # Record first, then check, so the reward that completes the window is
    # part of the comparison instead of being discarded.
    self.history_reward.append(round(float(reward), 3))
    if len(self.history_reward) < window:
        return

    half = window // 2
    first_half = self.history_reward[:half]
    second_half = self.history_reward[half:]
    first_aver = sum(first_half) / len(first_half)
    second_aver = sum(second_half) / len(second_half)
    # Multiplicative form of |delta| / |baseline| > ratio: safe for a zero
    # baseline and correct for a negative one.
    if abs(second_aver - first_aver) > change_ratio * abs(first_aver):
        # workload change
        print('----- test workload change -----')
        self.reset()
    else:
        self.history_reward = second_half
def beam_search(num_of_cache, all_apps, p_c_t, times, end_condition=30):
    """Combine each app's best arms into one global cache allocation.

    Every app contributes ``top_k`` candidate arms (via ``get_top_k``), the
    candidates are combined into feasible allocations (via
    ``gen_feasible_configs``) and the allocation with the highest summed score
    wins. ``top_k`` is chosen so that ``top_k ** len(all_apps)`` stays around
    ``end_condition``.

    Args:
        num_of_cache (int): total cache units available.
        all_apps (list): app identifiers, in allocation order.
        p_c_t (dict): maps each app to its per-arm score array.
        times (int): selection rounds so far, forwarded to ``get_top_k``.
        end_condition (int): approximate cap on the number of combinations.

    Returns:
        dict: maps each app to its share of the winning allocation.

    Raises:
        ValueError: if no candidate allocation could be scored.
    """
    num_app = len(all_apps)
    top_k = int(10 ** (np.log10(end_condition) / num_app))

    cache_top_k = [get_top_k(p_c_t[app], top_k, times) for app in all_apps]
    feasible_configs = gen_feasible_configs(num_of_cache=num_of_cache, cache_top_k=cache_top_k)

    best_candidate = None
    best_score = None
    # `candidate`, not `config`: the latter is also the name of an imported module.
    for candidate in feasible_configs:
        try:
            score = sum(p_c_t[all_apps[i]][candidate[i]] for i in range(num_app))
        except IndexError as e:
            print(f"Caught an IndexError: {e}")
            print(candidate)
            print(cache_top_k)
            # Skip it. Tracking the best candidate directly keeps scores and
            # candidates aligned even when some candidates are unusable.
            continue
        if best_score is None or score > best_score:
            best_candidate = candidate
            best_score = score

    if best_candidate is None:
        raise ValueError("no feasible cache allocation could be scored")
    return {all_apps[i]: best_candidate[i] for i in range(num_app)}
def select_arm(self):
    """Return the cache allocation to try in the next round.

    Sampling phase (``self.sampling_model``): hand out the entries of
    ``self.sample_config`` one by one; when they are exhausted, leave the
    phase and return the sampled allocation with the highest recorded reward.

    Afterwards: if no better configuration has been found for more than
    ``self.threshold`` rounds, replay the best known one whenever
    ``random.randint(1, 100)`` falls below ``self.probability_threshold``.
    Otherwise compute an upper-confidence score per app and arm, let
    ``beam_search`` combine them into a global allocation, and decay
    ``self.alpha`` by ``self.factor_alpha``.

    Returns:
        list: one allocation value per app, in ``self.all_apps`` order.
    """
    if self.sampling_model:
        if self.times == len(self.sample_config):
            # Sampling finished: continue from the best sampled allocation.
            self.times += 1
            self.sampling_model = False
            return list(max(self.sample_result, key=self.sample_result.get))
        # Initial phase: next pre-generated sample.
        cache_allocation = self.sample_config[self.times]
        self.times += 1
        return cache_allocation

    self.times += 1
    if self.duration_period > self.threshold and random.randint(1, 100) < self.probability_threshold:
        return [self.curr_best_config[app] for app in self.all_apps]

    for app in self.all_apps:
        cntx = np.hstack((self.context[app], self.other_context[app]))
        for i in range(self.n_arms):
            # Invert once per arm and reuse for both terms.
            A_inv = np.linalg.inv(self.A_c[app][i])
            theta = A_inv.dot(self.b_c[app][i])
            expected = theta.ravel().dot(cntx)          # scalar estimate
            bonus = self.alpha * np.sqrt(cntx.dot(A_inv.dot(cntx)))
            self.p_c_t[app][i] = expected + bonus

    # beam_search returns a dict keyed by app.
    cache_action = beam_search(self.n_arms - 1, self.all_apps, self.p_c_t, self.times, end_condition=30)
    cache_allocation = [cache_action[app] for app in self.all_apps]

    self.alpha *= self.factor_alpha
    return cache_allocation
def get_now_reward(self, performance, context_info=None):
    """Refresh the per-app contexts and return the current reward.

    ``context_info`` is transposed so that row ``i`` of the result becomes the
    feature vector of ``self.all_apps[i]``. Each app's ``other_context`` is
    the mean feature vector of all the other apps.

    Args:
        performance: iterable of values convertible to ``float``; larger is
            treated as better (for example a hit rate).
        context_info: sequence of per-feature sequences, each holding one
            value per app. When ``None`` or empty the stored contexts are
            left untouched.

    Returns:
        float: arithmetic mean of ``performance``.

    Raises:
        ValueError: if ``performance`` is empty.
    """
    if context_info is not None and len(context_info) > 0:
        # Missing readings are padded with 0.0 so the vectors stay numeric.
        per_app = [list(row) for row in zip_longest(*context_info, fillvalue=0.0)]
        sum_context = np.zeros(self.n_features)
        for i, app in enumerate(self.all_apps):
            self.context[app] = per_app[i]
            sum_context += np.array(self.context[app])

        n_others = len(self.all_apps) - 1
        for app in self.all_apps:
            if n_others > 0:
                others = (sum_context - np.array(self.context[app])) / n_others
            else:
                # A single app has no neighbours; avoid dividing by zero.
                others = np.zeros(self.n_features)
            self.other_context[app] = list(others)

    # Reward for "bigger is better" metrics. For a "smaller is better" metric
    # such as latency the original author sketched 100 / mean instead.
    samples = [float(x) for x in performance]
    if not samples:
        raise ValueError("performance must contain at least one measurement")
    return sum(samples) / len(samples)
class ScheduleFrame(ABC):
    """Common interface of the bandit-based resource schedulers.

    Deriving from ``ABC`` makes the ``@abstractmethod`` markers effective: a
    subclass can only be instantiated once it implements ``select_arm``,
    ``update`` and ``get_now_reward``.
    """

    def __init__(self):
        pass

    @abstractmethod
    def select_arm(self):
        """Return the resource configuration to apply next."""

    @abstractmethod
    def update(self, reward, chosen_arm):
        """Record ``reward`` as the outcome of applying ``chosen_arm``."""

    @abstractmethod
    def get_now_reward(self, performance, context_info=None):
        """Convert raw ``performance`` readings into a scalar reward."""
# Power-law normalization
def exponential_normalize(x, alpha=0.5, x_max=6000):
    """Map ``x`` from ``[0, x_max]`` to ``[0, 1]`` as ``(x / x_max) ** alpha``.

    Args:
        x: scalar or NumPy array of non-negative values.
        alpha: exponent; values below 1 lift small inputs, values above 1
            flatten them.
        x_max: input value that maps to 1. Defaults to 6000.

    Returns:
        The normalized value(s), same shape as ``x``.

    Raises:
        ValueError: if ``x_max`` is not positive.
    """
    if x_max <= 0:
        raise ValueError("x_max must be positive")
    return (x / x_max) ** alpha
def calculate_greater_than_ratio(pool_sizes, average_size=80):
    """Return the fraction of ``pool_sizes`` strictly greater than ``average_size``.

    Args:
        pool_sizes: iterable of numeric pool sizes.
        average_size: threshold to compare against.

    Returns:
        float: share of entries above the threshold, in ``[0, 1]``; ``0.0``
        when ``pool_sizes`` is empty.
    """
    sizes = list(pool_sizes)
    if not sizes:
        # Nothing recorded (e.g. the log had fewer entries than the training
        # offset): report 0 instead of dividing by zero.
        return 0.0
    count_greater = sum(1 for size in sizes if size > average_size)
    return count_greater / len(sizes)
DEFAULT_LOG_NAME = '/home/renfeng/tqy/JYCache/JYCache_Env/LinUCBSchedule/Logs/linucb.log'
DEFAULT_FIGURE_NAME = '../figures/OLUCB_JYCache.png'


def parse_rewards(lines):
    """Extract the reward values from scheduler log lines.

    A line counts when it contains ``'reward'``; the value is the text between
    the first ``':'`` and the following ``'['``. Lines that match but cannot
    be parsed are skipped.

    Args:
        lines: iterable of log lines (an open text file works).

    Returns:
        list: rewards as floats, in log order.
    """
    rewards = []
    for line in lines:
        if 'reward' not in line:
            continue
        try:
            reward_str = line.split(':')[1].split('[')[0].strip()
            rewards.append(float(reward_str))
        except (IndexError, ValueError):
            continue
    return rewards


def tick_step(n_points, max_ticks=20):
    """Return the x-axis tick spacing for ``n_points`` samples (never below 1)."""
    return max(1, int(n_points) // max_ticks)


def plot_rewards(rewards, figure_name=DEFAULT_FIGURE_NAME):
    """Plot rewards with the initial score and a +20 % target line, then save.

    Args:
        rewards (list): non-empty list of reward values.
        figure_name (str): path of the image file to write.
    """
    # Imported here so the parsing helpers work without a plotting backend.
    import matplotlib.pyplot as plt

    index = list(range(1, len(rewards) + 1))
    # best_score = 0.32
    best_score = rewards[0] * 1.2
    target = [best_score] * len(index)
    init_score = [rewards[0]] * len(index)

    plt.plot(index, rewards)
    plt.plot(index, target)
    plt.plot(index, init_score)
    # Axis ticks
    plt.xticks(list(range(0, index[-1], tick_step(index[-1]))), rotation=45)
    plt.yticks([i * 0.2 for i in range(0, int(best_score // 0.2) + 2, 1)])
    plt.title('OLUCB')
    # plt.show()
    plt.savefig(figure_name)


if __name__ == '__main__':
    with open(DEFAULT_LOG_NAME, 'r') as log_file:
        reward = parse_rewards(log_file)
    if not reward:
        raise SystemExit('no reward entries found in {}'.format(DEFAULT_LOG_NAME))
    plot_rewards(reward)
def _allocation_by_app(all_app, allocation):
    """Pair every app name with its entry of ``allocation`` (same order)."""
    return {all_app[j]: allocation[j] for j in range(len(all_app))}


def for_epsilon_greedy():
    """Drive the epsilon-greedy scheduler against the config-management channel.

    Each epoch turns the last reported performance (``curr_config[2]``) into a
    reward, updates the bandit, sends the newly selected allocation and waits
    for the next report. Rewards are written to ``linucb.log``.
    """
    cm = simulation_config_management()
    curr_config = cm.receive_config()

    all_app = curr_config[0]
    num_resources = int(np.sum(curr_config[1]))
    chosen_arm = _allocation_by_app(all_app, curr_config[1])

    # initialize parameters
    epsilon = 0.5
    factor_alpha = 0.98
    epochs = 3000
    ucb_cache = EpsilonGreedy(epsilon, factor_alpha, len(all_app), num_resources)

    start_time = time.time()
    # `with` closes the log even if the channel or the bandit raises mid-run.
    with open('linucb.log', 'w', newline='') as file_:
        for i in range(epochs):
            performance = curr_config[2]
            th_reward = ucb_cache.get_now_reward(performance)
            ucb_cache.update(th_reward, chosen_arm)
            # choose an arm
            chosen_arm = ucb_cache.select_arm()
            # prepare new config
            cm.send_config([curr_config[0], chosen_arm])

            # waiting for result
            curr_config = cm.receive_config()

            # write to log
            file_.write(str(i) + ' ' + str(th_reward) + '\n')
            if (i + 1) % 100 == 0:
                print('epoch [{} / {}]'.format(i + 1, epochs))
    end_time = time.time()
    print('used time :{}'.format(end_time - start_time))


def for_linucb():
    """Drive LinUCB without a sampling phase: update first, then select.

    Reads the allocation from ``curr_config[1]``, the performance from
    ``curr_config[2]`` and the context rows from ``curr_config[3:]``. Rewards
    are written to ``linucb.log``.
    """
    cm = simulation_config_management()
    curr_config = cm.receive_config()

    all_app = curr_config[0]
    num_resources = int(np.sum(curr_config[1]))

    alpha = 0.90
    factor_alpha = 0.98
    n_features = 10
    epochs = 1000
    ucb_cache = LinUCB(all_app, num_resources, alpha, factor_alpha, n_features)

    start_time = time.time()
    with open('linucb.log', 'w', newline='') as file_:
        for i in range(epochs):
            performance = curr_config[2]
            context_info = curr_config[3:]
            chosen_arm = _allocation_by_app(all_app, curr_config[1])

            th_reward = ucb_cache.get_now_reward(performance, context_info)
            ucb_cache.update(th_reward, chosen_arm)

            new_arm = ucb_cache.select_arm()
            # prepare new config
            cm.send_config([curr_config[0], new_arm])

            # waiting for result
            curr_config = cm.receive_config()
            # write to log
            file_.write(str(i) + ' ' + str(th_reward) + '\n')
            if (i + 1) % 100 == 0:
                print('epoch [{} / {}]'.format(i + 1, epochs))
    end_time = time.time()
    print('used time :{}'.format(end_time - start_time))


def for_linucb_sample():
    """Drive LinUCB with its initial sampling phase: select first, then update.

    Same data layout as ``for_linucb``; the scheduler is created with
    sampling enabled, so the first selections come from its pre-generated
    samples. Rewards are written to ``linucb.log``.
    """
    cm = simulation_config_management()
    curr_config = cm.receive_config()

    all_app = curr_config[0]
    num_resources = int(np.sum(curr_config[1]))

    alpha = 0.95
    factor_alpha = 0.98
    n_features = 10
    epochs = 3000
    ucb_cache = LinUCB(all_app, num_resources, alpha, factor_alpha, n_features, True)

    start_time = time.time()
    with open('linucb.log', 'w', newline='') as file_:
        for i in range(epochs):
            new_arm = ucb_cache.select_arm()
            # prepare new config
            cm.send_config([curr_config[0], new_arm])

            # waiting for result
            curr_config = cm.receive_config()

            performance = curr_config[2]
            context_info = curr_config[3:]
            chosen_arm = _allocation_by_app(all_app, curr_config[1])

            th_reward = ucb_cache.get_now_reward(performance, context_info)
            ucb_cache.update(th_reward, chosen_arm)

            # write to log
            file_.write(str(i) + ' ' + str(th_reward) + '\n')
            if (i + 1) % 100 == 0:
                print('epoch [{} / {}]'.format(i + 1, epochs))
    end_time = time.time()
    print('used time :{}'.format(end_time - start_time))
lin_ucb_cache.select_arm(context_cache, factor_alpha) + # cache_index = chosen_arm_cache + # # prepare new config + # new_config = [] + # new_config.append(curr_config[0]) + # new_partition = all_cache_config[cache_index] + # new_config.append(new_partition) + # + # # print('new config:' + str(new_config)) + # cm.send_config(new_config) + # + # # waiting for result + # curr_config = cm.receive_config() + # # print('recv_config is: ' + str(curr_config)) + # app_name = curr_config[0] + # cache_config = curr_config[1] + # cache_hit = curr_config[2] + # cache_reward = curr_config[3] + # + # context, th_reward = get_now_reward(cache_hit) + # context_cache = np.array(context) + # lin_ucb_cache.update(chosen_arm_cache, th_reward, context_cache) + # # write to log + # log_info = str(i) + ' ' + str(th_reward) + '\n' + # file_.write(log_info) + # if (i + 1) % 100 == 0: + # print('epoch [{} / {}]'.format(i + 1, epochs)) + # end_time = time.time() + # print('used time :{}'.format(end_time-start_time)) + # file_.close() + diff --git a/LinUCBSchedule/server.py b/LinUCBSchedule/server.py new file mode 100644 index 0000000..5b45bd1 --- /dev/null +++ b/LinUCBSchedule/server.py @@ -0,0 +1,101 @@ +import csv +import os +import sys +import threading +import socket +import time +import numpy as np +from LinUCB import * + +curPath = os.path.abspath(os.path.dirname(__file__)) +rootPath = os.path.split(curPath)[0] +sys.path.append(rootPath) +from server_util import * + +NUM_RESOURCES_CACHE = 352 #total units of cache size 176*2 + +# initialize LinUCB parameters +alpha = 0.95 +factor_alpha = 0.98 +num_features = 1 # 缓冲池特征维度 +# all_app = ['WritePool', 'ReadPool', 'FlushPool'] +all_app = ['WritePool', 'ReadPool'] + +# 收到客户端发送的配置信息后,使用LinUCB,生成最新的配置信息,再发送给客户端 +def handle_client(client_socket, ucb_cache, file_writer): + try: + # 从curve接收 pool_name, pool_size, pool_throughput, write read 函数的调用次数 + request = client_socket.recv(1024).decode("utf-8") + print(f"Received: {request}") + 
pool_and_size, pool_and_throughput, pool_and_invoke = process_request(request) + pool_name = list(pool_and_size.keys()) + pool_size = list(pool_and_size.values()) + pool_throughput = list(pool_and_throughput.values()) + context_info = [list(pool_and_invoke.values())] + + if(np.sum(pool_throughput) > 0): + start_time = time.time() + chosen_arm = {} + for j in range(len(all_app)): + chosen_arm[all_app[j]] = pool_size[j] + # reward + th_reward = ucb_cache.get_now_reward(pool_throughput, context_info) + + ucb_cache.update(th_reward, chosen_arm) + + new_size = ucb_cache.select_arm() + end_time = time.time() + + # write to log + start_time_log = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)) + end_time_log = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)) + file_writer.write('start time : ' + start_time_log + '\n') + run_time = end_time - start_time + file_writer.write(f'run time : {run_time:.2f} s\n') + file_writer.write('end time : ' + end_time_log + '\n') + + log_info = 'reward : ' + str(th_reward) + ' ' + str(new_size) + '\n' + file_writer.write(log_info) + else: + new_size = pool_size + + # prepare new config + new_config = [pool_name, new_size] + print('new config:' + str(new_config)) + + # server发送最新的配置信息给curve + serialized_data = '\n'.join([' '.join(map(str, row)) for row in new_config]) + client_socket.send(serialized_data.encode()) + print("server发送最新配置给client:") + print(serialized_data.encode()) + # except Exception as e: + # print(f"An error occurred: {e}") + finally: + client_socket.close() + +# 启动server以监听curve发送的配置信息 +def start_server(): + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.bind(("0.0.0.0", 2333)) + # 允许同时最多5个客户端排队等待连接 + server.listen(5) + print("Server listening on port 2333") + + # 初始化LinUCB + file_ = open('Logs/linucb.log', 'w', newline='') + num_resources = NUM_RESOURCES_CACHE + # 开启采样 + ucb_cache = LinUCB(all_app, num_resources, alpha, factor_alpha, num_features, True) + + while True: + 
client_socket, addr = server.accept() + print(f"Accepted connection from {addr}") + client_handler = threading.Thread( + target=handle_client, args=(client_socket, ucb_cache, file_) + ) + client_handler.start() + + file_.close() + +if __name__ == '__main__': + start_server() \ No newline at end of file diff --git a/LinUCBSchedule/server_util.py b/LinUCBSchedule/server_util.py new file mode 100644 index 0000000..4b1eed1 --- /dev/null +++ b/LinUCBSchedule/server_util.py @@ -0,0 +1,24 @@ +import numpy as np + +# unet3d x0 设置为 3000,对于 resnet50 x0 设置为 30000 +def sigmoid(x, k=0.002, x0=30000): + return 1 / (1 + np.exp(-k * (x - x0))) +# 接收curve发送的request,解析为pool_and_size, pool_and_throughput +def process_request(request): + pairs = request.split(';')[:-1] + pool_and_size = {} + pool_and_throughput = {} + pool_and_invoke = {} + pair1 = pairs[:2] + pair2 = pairs[2:4] + pair3 = pairs[4:] + for pair in pair1: + key,value = pair.split(':') + pool_and_size[key] = int(value) + for pair in pair2: + key,value = pair.split(':') + pool_and_throughput[key] = float(value) + for pair in pair3: + key,value = pair.split(':') + pool_and_invoke[key] = sigmoid(float(value)) + return pool_and_size, pool_and_throughput, pool_and_invoke \ No newline at end of file diff --git a/LinUCBSchedule/util.py b/LinUCBSchedule/util.py new file mode 100644 index 0000000..2c2d8e2 --- /dev/null +++ b/LinUCBSchedule/util.py @@ -0,0 +1,248 @@ +import socket +import pickle +import time + + +host = '127.0.0.1' +port = 54000 + + +def get_last_line(file_name): + """ + Open file and read the last line + + Args: + file_name (str): file name + + Returns: + str: last line of the file, '' if the file not found + """ + try: + with open(file_name, 'rb') as file: + file.seek(-2,2) + while file.read(1)!=b'\n': + file.seek(-2, 1) + return file.readline().decode().strip() + except FileNotFoundError: + return '' + + +def get_pool_stats(): + """ + Communicating with cache server through socket, get current cache pool name 
and cache allocation + + Args: + + Returns: + mappoolsize>: all pool in the cache and their pool size + """ + # link the target program + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + + message = "G:" + # print(message) + sock.sendall(message.encode()) + + # wait the response + response = sock.recv(1024).decode() + # print(response) + sock.close() + deserialized_map = {} + pairs = response.split(';')[:-1] + for pair in pairs: + key,value = pair.split(':') + deserialized_map[key] = int(value) + + return deserialized_map + + +def set_cache_size(workloads, cache_size): + """ + Communicating with cache server through socket, adjust pool size to the new size + + Args: + workloads (list): pool name of all workloads + cache_size (list): new size of each pool + + Returns: + """ + # link the server program + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + + curr_config = [workloads, cache_size] + serialized_data = '\n'.join([' '.join(map(str, row)) for row in curr_config]) + serialized_data = 'S:' + serialized_data + # print(serialized_data) + + # send to server + sock.sendall(serialized_data.encode()) + sock.close() + + +def receive_config(): + """ + Old version, Wait to receive the current resource config + + Args: + + Returns: + list: [ + [name of pool], + [allocation of resource A of every pool], # such as cache size,[16,16] + [context of resource A of every pool ], # for cache size ,context canbe hit_rate,[0.8254, 0.7563] + [latency of every warkload] + ] + """ + # create the server socket + # print("receive config") + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server_socket.bind(('127.0.0.1', 1412)) + server_socket.listen(1) + + # listening the old_config from the executor + client_socket, _ = server_socket.accept() + received_data = client_socket.recv(1024) + config = 
pickle.loads(received_data) + + client_socket.close() + server_socket.close() + return config + + +def send_config(new_config): + """ + Old version, Send the new config + + Args: + list: [ + [name of pool], + [new allocation of resource A of every pool] + ] + + Returns: + """ + serialized_config = pickle.dumps(new_config) + + # connect to the bench + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_socket.connect(('127.0.0.1', 1413)) + client_socket.send(serialized_config) + client_socket.close() + print("send_config success!") + return + + +class config_management: + def __init__(self) -> None: + pass + + def receive_config(self): + curr_config = [] + # pool name and cache allocation + pool_and_size = get_pool_stats() + pool_name = list(pool_and_size.keys()) + pool_size = list(pool_and_size.values()) + curr_config.append(pool_name) + curr_config.append(pool_size) + #TODO: cache hit rate + hitrate_log = ['/home/md/SHMCachelib/bin/' + name + '_hitrate.log' for name in pool_name] + hitrates = [] + for log in hitrate_log: + hitrates.append(get_last_line(log)) + curr_config.append(hitrates) + #TODO: tail latency + latency_log = ['/home/md/SHMCachelib/bin/' + name + '_tailLatency.log' for name in pool_name] + latencies = [] + for log in latency_log: + latencies.append(get_last_line(log)) + curr_config.append(latencies) + + return curr_config + + def send_config(self, new_config): + set_cache_size(new_config[0], new_config[1]) + + +def userA_func(resources, threshold): + if threshold < 0: + return 0 + if resources < threshold: + return resources * 0.095 + else: + return threshold * 0.095 + (resources - threshold) * 0.010 + + +def userB_func(resources, threshold): + if threshold < 0: + return 0 + if resources < threshold: + return resources * 0.040 + else: + return threshold * 0.040 + (resources - threshold) * 0.005 + + +def uniform(resources): + return min(resources, 240) * 0.003 + + +def sequential(resources): + return 0 + + +def 
hotspot(resources): + return min(resources, 60) * 0.007 + min(max(0, resources - 60), 180) * 0.002 + + +class userModel: + def __init__(self, name, resources=0, user_func=None): + self.name = name + self.resources = resources + self.user_func = user_func + + +class simulation_config_management: + def __init__(self): + self.total_resource = 240 + self.all_user = [ + userModel('A', 0, sequential), + userModel('B', 0, uniform), + userModel('C', 0, hotspot), + ] + for u in self.all_user: + u.resources = self.total_resource // len(self.all_user) + + self.n_features = 10 + self.workload_change = 1000 + self.counter = 0 + + def receive_config(self): + curr_config = [] + + curr_config.append([u.name for u in self.all_user]) + curr_config.append([u.resources for u in self.all_user]) + curr_config.append([u.user_func(u.resources) for u in self.all_user]) + # context info + for i in range(self.n_features): + curr_config.append([1.0] * len(self.all_user)) + + self.counter = self.counter + 1 + if self.workload_change > 0 and self.counter % self.workload_change == 0: + print('---------------------------workload change -----------------------------') + self.all_user[0].user_func, self.all_user[1].user_func = self.all_user[1].user_func, self.all_user[0].user_func + self.all_user[1].user_func, self.all_user[2].user_func = self.all_user[2].user_func, self.all_user[1].user_func + return curr_config + + def send_config(self, new_config): + for i in range(len(self.all_user)): + self.all_user[i].resources = new_config[1][i] + + +if __name__ == '__main__': + time.sleep(10) + cs = config_management() + curr = cs.receive_config() + for item in curr: + print(item)