import numpy as np
import time
import random
from multiprocessing import Process, Manager
from ScheduleFrame import *
def gen_all_config(num_apps, num_resources):
    """Generate every resource config for the given apps and total resources.

    Args:
        num_apps (int): number of apps; must be at least 1.
        num_resources (int): total units of resources.

    Returns:
        list[list[int]]: every allocation of non-negative integers, one entry
        per app, that sums to ``num_resources``. Allocations are ordered with
        the first app's share increasing slowest.

    Raises:
        ValueError: if ``num_apps`` is smaller than 1 (the recursion has no
            base case for that input).
    """
    if num_apps < 1:
        raise ValueError("num_apps must be at least 1")
    if num_apps == 1:
        # Only one app left: it receives all remaining resources.
        return [[num_resources]]
    all_config = []
    for first_share in range(num_resources + 1):
        # Recursively split what is left among the remaining apps.
        for sub_allocation in gen_all_config(num_apps - 1, num_resources - first_share):
            all_config.append([first_share] + sub_allocation)
    return all_config
def find_vectors(x, d_max):
    """Find every non-negative vector near ``x`` that keeps the same total.

    A vector qualifies when it has the same length and the same sum as ``x``,
    has no negative entry, and its Manhattan distance to ``x`` is at most
    ``d_max``. ``x`` itself always qualifies when it is non-negative.

    Args:
        x (list[int]): the reference vector; it is not modified.
        d_max (int): maximum allowed Manhattan distance.

    Returns:
        list[list[int]]: the qualifying vectors, in the order the per-element
        deltas are enumerated (from ``-d_max`` to ``d_max``, first element
        outermost).
    """
    n = len(x)
    s = sum(x)
    solutions = []

    def backtrack(current_vector, index, current_distance):
        # Prune branches that already left the distance budget or went negative.
        if current_distance > d_max or any(val < 0 for val in current_vector):
            return
        if index == n:
            # Every element has been decided; keep the vector if the total matches.
            if sum(current_vector) == s:
                solutions.append(current_vector.copy())
            return
        for delta in range(-d_max, d_max + 1):
            current_vector[index] += delta
            backtrack(current_vector, index + 1, current_distance + abs(delta))
            current_vector[index] -= delta  # undo before trying the next delta

    backtrack(x.copy(), 0, 0)
    return solutions
class EpsilonGreedy(ScheduleFrame):
    """Epsilon-greedy bandit whose arms are all possible resource allocations.

    Exploration is split in two: a uniformly random arm, or a random arm taken
    from the precomputed neighbourhood (Manhattan distance <= ``dmax``) of the
    arm with the best estimated value.
    """

    def __init__(self, epsilon, factor_alpha, num_apps, num_resources):
        """Build the arm table and precompute each arm's neighbourhood.

        Args:
            epsilon (float): initial exploration probability.
            factor_alpha (float): factor applied to ``epsilon`` after each
                exploration step.
            num_apps (int): number of apps sharing the resources.
            num_resources (int): total units of resources.
        """
        self.init_factor = [epsilon, factor_alpha, 0.95, 0.98]
        self.epsilon = self.init_factor[0]
        self.factor_alpha = self.init_factor[1]
        self.all_cache_config = gen_all_config(num_apps, num_resources)
        self.n_arms = len(self.all_cache_config)
        self.last_arm_index = -1
        # Ratio of random exploration to neighbourhood exploration.
        self.sub_epsilon = self.init_factor[2]
        self.sub_alpha = self.init_factor[3]
        # Manhattan distance used for neighbourhood exploration.
        self.dmax = 10
        self.counts = np.zeros(self.n_arms)
        self.values = np.zeros(self.n_arms)
        self.neighbour = {}
        self.last_reward = [-1] * self.n_arms

        # Search the neighbourhoods in parallel worker processes.
        st_time = time.time()
        workers = []
        parts_neighbour = []
        # At least one arm per chunk: with fewer than 8 arms ``n_arms // 8``
        # is 0, which is not a valid ``range`` step.
        num_tasks = max(1, self.n_arms // 8)
        with Manager() as manager:
            for start in range(0, self.n_arms, num_tasks):
                parts_neighbour.append(manager.dict())
                worker = Process(
                    target=self.find_neighbours,
                    args=(start, min(start + num_tasks, self.n_arms), parts_neighbour[-1]),
                )
                worker.start()
                workers.append(worker)
            for worker in workers:
                worker.join()
            # Copy the results out before the manager shuts down.
            for part in parts_neighbour:
                self.neighbour.update(part.items())
        en_time = time.time()
        print('construct neighbour :{}'.format(en_time - st_time))

    def select_arm(self):
        """Choose the next allocation and remember its index.

        Returns:
            list[int]: the chosen allocation from ``all_cache_config``.
        """
        if np.random.rand() < self.epsilon:
            # Explore.
            self.epsilon *= self.factor_alpha
            if np.random.rand() < self.sub_epsilon:
                # Random exploration.
                self.sub_epsilon *= self.sub_alpha
                chosen_arm = np.random.randint(self.n_arms)
            else:
                # Explore around the current best arm.
                curr_max = int(np.argmax(self.values))
                candidates = self.neighbour[curr_max]
                chosen = random.randint(0, len(candidates) - 1)
                chosen_arm = self.all_cache_config.index(candidates[chosen])
        else:
            # Exploit.
            chosen_arm = np.argmax(self.values)
        self.last_arm_index = chosen_arm
        return self.all_cache_config[chosen_arm]

    def update(self, reward, chosen_arm):
        """Fold ``reward`` into the running mean of the arm that produced it.

        Args:
            reward (float): observed reward.
            chosen_arm (dict): app -> allocation mapping; only consulted before
                the first ``select_arm`` call, afterwards the arm remembered by
                ``select_arm`` is used.
        """
        if self.last_arm_index == -1:
            assert chosen_arm is not None, "Need Initial Arm Index"
            chosen_arm_index = self.all_cache_config.index(list(chosen_arm.values()))
        else:
            chosen_arm_index = self.last_arm_index

        last_reward = self.last_reward[chosen_arm_index]
        if last_reward > 0 and float(abs(reward - last_reward)) / last_reward > 0.05:
            # The same arm now yields a reward more than 5% away from last
            # time: treat it as a workload change and restart learning.
            print('-----judge workload change, {}'.format(float(abs(reward - last_reward)) / last_reward))
            self.last_reward = [-1] * self.n_arms
            self.counts = np.zeros(self.n_arms)
            self.values = np.zeros(self.n_arms)
            self.epsilon = self.init_factor[0]
            self.sub_epsilon = self.init_factor[2]

        self.counts[chosen_arm_index] += 1
        n = self.counts[chosen_arm_index]
        value = self.values[chosen_arm_index]
        # Incremental mean update.
        self.values[chosen_arm_index] = ((n - 1) / n) * value + (1 / n) * reward
        self.last_reward[chosen_arm_index] = reward

    def find_neighbours(self, start, end, part_neighbour):
        """Fill ``part_neighbour[i]`` with the neighbours of arm ``i`` for ``start <= i < end``.

        Used as the target of the worker processes started in ``__init__``.
        """
        for i in range(start, end):
            part_neighbour[i] = find_vectors(self.all_cache_config[i], self.dmax)

    def get_now_reward(self, performance, context_info=None):
        """Return the arithmetic mean of ``performance`` as the reward.

        ``context_info`` is accepted for interface compatibility and ignored.
        """
        th_reward = sum(float(x) for x in performance) / len(performance)
        return th_reward

import random
import numpy as np
from itertools import zip_longest
from ScheduleFrame import *
# Build the global allocations that can be assembled from each app's top-k
# candidate values without exceeding num_of_cache, de-duplicate them, then hand
# any unused cache back one unit at a time to the currently smallest entry.
# NOTE(review): as shown here the block has lost its indentation and appears to
# be missing statements (the docstring delimiters in gen_side, the bodies of the
# `if app_core > feasible_core:` branches, and whatever stores `res` into
# `all_feasible_config`) -- TODO confirm against the original file.
def gen_feasible_configs(num_of_cache, cache_top_k):
# TODO: collect every feasible result derived from each app's top_k
num_app = len(cache_top_k)
top_k = len(cache_top_k[0])
# Recursive helper working on `tmp`, the list of partial configs; `n` is the
# index of the app currently being appended.
def gen_side(tmp, k, n=1):
:param root: root node, first app node
:param n: n_th in top_k
if n == num_app:
return [[]]
for k in range(top_k):
# t1/t2/delta1/delta2 select the slice of rows in `tmp` that receive
# candidate k of app n.
t1 = k * top_k ** (num_app - n - 1)
t2 = top_k ** (num_app - 1) - (top_k - k - 1) * top_k ** (num_app - n - 1)
delta2 = top_k ** (num_app - 1 - n)
delta1 = top_k ** (num_app - n)
for i in range(t1, t2, delta1):
for j in range(i, i + delta2):
app_core = cache_top_k[n][k]
# Cache still available in row j after the entries already placed.
feasible_core = num_of_cache - sum(tmp[j])
if app_core > feasible_core:
gen_side(n=n + 1, tmp=tmp, k=k)
return tmp
all_feasible_config = []
for i in range(len(cache_top_k)):
for j in range(top_k):
# One row per combination of the remaining apps, seeded with app 0's candidate j.
tmp = [[cache_top_k[0][j]] for _ in range(top_k ** (num_app - 1))]
res = gen_side(tmp, k=0)
for k in range(i + 1):
# Rotate every row right by one position.
res = [[row[-1]] + row[:-1] for row in res]
# De-duplicate first
unique_tuples = set(tuple(x) for x in all_feasible_config)
all_feasible_config = [list(x) for x in unique_tuples]
# Redistribute the resources that are still unused
for config in all_feasible_config:
assert sum(config) <= num_of_cache, 'The allocated cache exceeds the limit'
if sum(config) < num_of_cache:
left_cache = num_of_cache - sum(config)
for i in range(left_cache):
# Give one unit to the first entry holding the current minimum.
min_value = min(config)
min_index = config.index(min_value)
config[min_index] += 1
return all_feasible_config
def get_top_k(arr, k, times):
    """Pick ``k`` indices into ``arr``, either at random or by highest score.

    While ``times < 5``, and afterwards whenever ``random.randint(1, 10) > 8``,
    the indices are drawn uniformly at random (repeats possible). Otherwise
    the indices of the ``k`` largest values are returned, ordered from the
    smallest of those values to the largest.

    Args:
        arr: one-dimensional sequence of scores.
        k (int): number of indices to return.
        times (int): number of rounds played so far.

    Returns:
        list: ``k`` indices into ``arr`` (fewer if ``arr`` is shorter than
        ``k`` on the score-based path).
    """
    if times < 5 or random.randint(1, 10) > 8:
        arr_top_k_id = [random.randint(0, len(arr) - 1) for _ in range(k)]
    else:
        order = np.argsort(arr)
        # Slice from an explicit start: ``order[-k:]`` would return the whole
        # array when k == 0.
        arr_top_k_id = order[max(len(order) - k, 0):]
    return list(arr_top_k_id)
def beam_search(num_of_cache, all_apps, p_c_t, times, end_condition=30):
    """Combine each app's top-k arms into one global allocation.

    Args:
        num_of_cache (int): total cache units available.
        all_apps (list): app names, in allocation order.
        p_c_t (dict): app name -> per-arm score array.
        times (int): number of rounds played so far (forwarded to
            ``get_top_k``).
        end_condition (int): upper bound on the number of combinations;
            ``top_k`` is chosen so that ``top_k ** len(all_apps)`` stays
            within it.

    Returns:
        dict: app name -> allocated cache, taken from the feasible
        configuration with the highest summed score.

    Raises:
        ValueError: if no feasible configuration was generated.
    """
    num_app = len(all_apps)
    top_k = int(10 ** (np.log10(end_condition) / num_app))
    cache_top_k = [get_top_k(p_c_t[app], top_k, times) for app in all_apps]
    feasible_configs = gen_feasible_configs(num_of_cache=num_of_cache, cache_top_k=cache_top_k)
    if not feasible_configs:
        raise ValueError("no feasible configuration to choose from")

    sum_p_list = []
    for candidate in feasible_configs:
        try:
            sum_p = sum(p_c_t[all_apps[i]][candidate[i]] for i in range(num_app))
        except IndexError as e:
            print(f"Caught an IndexError: {e}")
            # Keep the score list aligned with feasible_configs while making
            # sure this candidate can never win the argmax.
            sum_p = float('-inf')
        sum_p_list.append(sum_p)

    cache_act = feasible_configs[np.argmax(sum_p_list)]
    return {app: cache_act[i] for i, app in enumerate(all_apps)}
def latin_hypercube_sampling(n, d, m, M, ratio):
    """Draw ``n`` random integer allocations of ``M`` units over ``d`` slots.

    Each sample is drawn uniformly from ``[m, M)`` per slot and accepted only
    if its sum is at least ``M``; it is then rescaled to sum exactly ``M`` and
    re-checked against ``[m, M]``. Accepted samples are truncated to integers,
    the truncation remainder is added to the last slot, and every entry is
    multiplied by ``ratio``.

    NOTE(review): with ``d == 1`` a draw practically never reaches a sum of
    ``M``, so the rejection loop does not terminate; callers should pass at
    least two dimensions.

    Args:
        n (int): number of samples.
        d (int): number of slots per sample.
        m (float): lower bound of each slot before scaling.
        M (int): total that every sample must sum to before scaling.
        ratio (int): multiplier applied to every entry of the result.

    Returns:
        list[list[int]]: ``n`` lists of length ``d``, each summing to
        ``M * ratio``.
    """
    samples = np.zeros((n, d))
    for row in range(n):
        # Rejection sampling: redraw until the constraints hold.
        while True:
            x = np.random.uniform(m, M, d)
            total = np.sum(x)
            if total < M:
                continue
            x = x / total * M
            if np.all(x >= m) and np.all(x <= M):
                samples[row] = x
                break

    sample_config = []
    for sample in samples:
        tmp = [int(ele) for ele in sample]
        # Truncation loses up to d - 1 units; give them to the last slot.
        tmp[-1] += M - sum(tmp)
        sample_config.append([ele * ratio for ele in tmp])
    return sample_config
# Contextual bandit that keeps one set of per-arm matrices (A_c, b_c) and
# scores (p_c_t) per app, and combines per-app choices through beam_search.
# NOTE(review): as shown here the class has lost its indentation and appears to
# be missing statements. In particular self.n_arms, self.init_factor,
# self.factor_alpha, self.ratio and self.threshold are read below but never
# assigned in the visible code, and several loops have no body -- TODO confirm
# against the original file.
class LayerUCB(ScheduleFrame):
def __init__(self, all_apps, n_cache, alpha, factor_alpha, n_features):
self.all_apps = all_apps
self.num_apps = len(all_apps)
self.arm_dict = {}
for app in self.all_apps:
# Candidate cache sizes for this app: 0, 5, 10, ... below n_cache.
self.arm_dict[app] = list(range(0, n_cache, 5))
self.n_features = n_features
self.A_c = {}
self.b_c = {}
self.p_c_t = {}
for app in self.all_apps:
# The feature vector is the app's own context concatenated with the
# other apps' mean context, hence n_features * 2.
self.A_c[app] = np.zeros((len(self.arm_dict[app]), self.n_features * 2, self.n_features * 2))
self.b_c[app] = np.zeros((len(self.arm_dict[app]), self.n_features * 2, 1))
self.p_c_t[app] = np.zeros(len(self.arm_dict[app]))
for arm in range(len(self.arm_dict[app])):
self.A_c[app][arm] = np.eye(self.n_features * 2)
# contexts
self.context = {}
self.other_context = {}
# NOTE(review): `sum` shadows the builtin inside this method.
sum = np.zeros(n_features)
for app in self.all_apps:
self.context[app] = [1.0 for _ in range(self.n_features)]
sum += np.array(self.context[app])
for app in self.all_apps:
# Mean context of all other apps; divides by zero when there is one app.
self.other_context[app] = list((sum - np.array(self.context[app])) / (len(self.all_apps) - 1))
# Return the next allocation: replay the sample configs during the sampling
# phase, otherwise score every arm and let beam_search pick.
def select_arm(self):
if self.sampling_model:
if self.times == len(self.sample_config):
# sample end
self.times += 1
self.sampling_model = False
# print(max(self.sample_result))
return self.sample_config[self.sample_result.index(max(self.sample_result))]
# initial phase
cache_allocation = self.sample_config[self.times]
self.times += 1
return cache_allocation
self.times += 1
if self.duration_period > self.threshold and random.randint(1, 10) < 8:
cache_allocation = []
# NOTE(review): this loop has no visible body; cache_allocation stays empty.
for app in self.all_apps:
return cache_allocation
contexts = {}
for app in self.all_apps:
A = self.A_c[app]
b = self.b_c[app]
contexts[app] = np.hstack((self.context[app], self.other_context[app]))
for i in range(self.n_arms):
theta = np.linalg.inv(A[i]).dot(b[i])
cntx = np.array(contexts[app])
# NOTE(review): the expression below is truncated (unbalanced parentheses).
self.p_c_t[app][i] = + self.alpha * np.sqrt([i]).dot(cntx)))
cache_action = beam_search(self.n_arms - 1, self.all_apps, self.p_c_t, self.times, end_condition=30)
# cache_action is a dict
cache_allocation = []
# NOTE(review): this loop shows only the alpha decay; nothing visible fills
# cache_allocation from cache_action.
for app in self.all_apps:
self.alpha *= self.factor_alpha
return cache_allocation
# Record the reward for chosen_arm (dict app -> arm), track the best config seen,
# update the per-app matrices and watch the reward history for a workload change.
def update(self, reward, chosen_arm):
if self.sampling_model:
# initial phase
if self.curr_best_config is None:
self.curr_best_config = chosen_arm
self.curr_best_reward = reward
self.duration_period = 1
# self.history_reward = []
if reward > self.curr_best_reward:
self.curr_best_reward = reward
self.curr_best_config = chosen_arm
self.duration_period = 1
# self.history_reward = []
self.duration_period += 1
contexts = {}
for app in self.all_apps:
arm = chosen_arm[app]
contexts[app] = np.hstack((self.context[app], self.other_context[app]))
# A += x x^T and b += reward * x for the arm this app used.
self.A_c[app][arm] += np.outer(np.array(contexts[app]), np.array(contexts[app]))
self.b_c[app][arm] = np.add(self.b_c[app][arm].T, np.array(contexts[app]) * reward).reshape(self.n_features * 2, 1)
if self.times > 200:
if len(self.history_reward) < 60:
self.history_reward.append(round(float(reward), 3))
# Compare the mean of the first 30 recorded rewards with the mean of the rest.
first_half = self.history_reward[:30]
second_half = self.history_reward[30:]
first_aver = sum(first_half) / len(first_half)
second_aver = sum(second_half) / len(second_half)
# print('{} --> {}, {}'.format(first_aver, second_aver, abs(second_aver - first_aver) / first_aver))
if abs(second_aver - first_aver) / first_aver > 0.20:
# workload change
print('----- test workload change -----')
self.history_reward = second_half
# Refresh every app's context from context_info and return the mean of
# performance as the reward.
def get_now_reward(self, performance, context_info=None):
# update the context
# Transpose context_info so that row i holds the features of app i.
tmp = [list(row) for row in zip_longest(*context_info, fillvalue=None)]
sum_context = np.zeros(self.n_features)
for i, app in enumerate(self.all_apps):
self.context[app] = tmp[i]
sum_context += np.array(self.context[app])
for app in self.all_apps:
self.other_context[app] = list((sum_context - np.array(self.context[app])) / (len(self.all_apps) - 1))
# calculate the reward
th_reward = sum(float(x) for x in performance) / len(performance)
return th_reward
# Restart learning: re-enter the sampling phase with 20 fresh sample configs and
# reset every per-app matrix to its initial value.
def reset(self):
self.alpha = self.init_factor[0]
self.times = 0
self.sampling_model = True
self.sample_config = latin_hypercube_sampling(20, self.num_apps, 0, (self.n_arms - 1)//self.ratio, self.ratio)
self.sample_result = []
self.curr_best_config = None
self.curr_best_reward = None
self.duration_period = 0
self.history_reward = []
for app in self.all_apps:
self.A_c[app] = np.zeros((self.n_arms, self.n_features * 2, self.n_features * 2))
self.b_c[app] = np.zeros((self.n_arms, self.n_features * 2, 1))
self.p_c_t[app] = np.zeros(self.n_arms)
for arm in range(self.n_arms):
self.A_c[app][arm] = np.eye(self.n_features * 2)
# Script entry point.
# NOTE(review): no body is visible for this guard -- TODO confirm what it ran.
if __name__ == '__main__':

import pickle
import random
import config
from itertools import zip_longest
from ScheduleFrame import *
# Build the global allocations that can be assembled from each app's top-k
# candidate values without exceeding num_of_cache, de-duplicate them, then hand
# any unused cache back one unit at a time to the currently smallest entry.
# NOTE(review): as shown here the block has lost its indentation and appears to
# be missing statements (the docstring delimiters in gen_side, the bodies of the
# `if app_core > feasible_core:` branches, and whatever stores `res` into
# `all_feasible_config`) -- TODO confirm against the original file.
def gen_feasible_configs(num_of_cache, cache_top_k):
# TODO: collect every feasible result derived from each app's top_k
num_app = len(cache_top_k)
top_k = len(cache_top_k[0])
# Recursive helper working on `tmp`, the list of partial configs; `n` is the
# index of the app currently being appended.
def gen_side(tmp, k, n=1):
:param root: root node, first app node
:param n: n_th in top_k
if n == num_app:
return [[]]
for k in range(top_k):
# t1/t2/delta1/delta2 select the slice of rows in `tmp` that receive
# candidate k of app n.
t1 = k * top_k ** (num_app - n - 1)
t2 = top_k ** (num_app - 1) - (top_k - k - 1) * top_k ** (num_app - n - 1)
delta2 = top_k ** (num_app - 1 - n)
delta1 = top_k ** (num_app - n)
for i in range(t1, t2, delta1):
for j in range(i, i + delta2):
app_core = cache_top_k[n][k]
# Cache still available in row j after the entries already placed.
feasible_core = num_of_cache - sum(tmp[j])
if app_core > feasible_core:
gen_side(n=n + 1, tmp=tmp, k=k)
return tmp
all_feasible_config = []
for i in range(len(cache_top_k)):
for j in range(top_k):
# One row per combination of the remaining apps, seeded with app 0's candidate j.
tmp = [[cache_top_k[0][j]] for _ in range(top_k ** (num_app - 1))]
res = gen_side(tmp, k=0)
for k in range(i + 1):
# Rotate every row right by one position.
res = [[row[-1]] + row[:-1] for row in res]
# De-duplicate first
unique_tuples = set(tuple(x) for x in all_feasible_config)
all_feasible_config = [list(x) for x in unique_tuples]
# Redistribute the resources that are still unused
# NOTE(review): the loop variable `config` hides the imported `config` module
# inside this function.
for config in all_feasible_config:
assert sum(config) <= num_of_cache, 'The allocated cache exceeds the limit'
if sum(config) < num_of_cache:
left_cache = num_of_cache - sum(config)
for i in range(left_cache):
# Give one unit to the first entry holding the current minimum.
min_value = min(config)
min_index = config.index(min_value)
config[min_index] += 1
return all_feasible_config
def get_top_k(arr, k, times):
    """Pick ``k`` indices into ``arr``, either at random or by highest score.

    While ``times < 5``, and afterwards whenever ``random.randint(1, 10) > 8``,
    the indices are drawn uniformly at random (repeats possible). Otherwise
    the indices of the ``k`` largest values are returned, ordered from the
    smallest of those values to the largest.

    Args:
        arr: one-dimensional sequence of scores.
        k (int): number of indices to return.
        times (int): number of rounds played so far.

    Returns:
        list: ``k`` indices into ``arr`` (fewer if ``arr`` is shorter than
        ``k`` on the score-based path).
    """
    if times < 5 or random.randint(1, 10) > 8:
        arr_top_k_id = [random.randint(0, len(arr) - 1) for _ in range(k)]
    else:
        order = np.argsort(arr)
        # Slice from an explicit start: ``order[-k:]`` would return the whole
        # array when k == 0.
        arr_top_k_id = order[max(len(order) - k, 0):]
    return list(arr_top_k_id)
def beam_search(num_of_cache, all_apps, p_c_t, times, end_condition=30):
    """Combine each app's top-k arms into one global allocation.

    Args:
        num_of_cache (int): total cache units available.
        all_apps (list): app names, in allocation order.
        p_c_t (dict): app name -> per-arm score array.
        times (int): number of rounds played so far (forwarded to
            ``get_top_k``).
        end_condition (int): upper bound on the number of combinations;
            ``top_k`` is chosen so that ``top_k ** len(all_apps)`` stays
            within it.

    Returns:
        dict: app name -> allocated cache, taken from the feasible
        configuration with the highest summed score.

    Raises:
        ValueError: if no feasible configuration was generated.
    """
    num_app = len(all_apps)
    top_k = int(10 ** (np.log10(end_condition) / num_app))
    cache_top_k = [get_top_k(p_c_t[app], top_k, times) for app in all_apps]
    feasible_configs = gen_feasible_configs(num_of_cache=num_of_cache, cache_top_k=cache_top_k)
    if not feasible_configs:
        raise ValueError("no feasible configuration to choose from")

    sum_p_list = []
    # The loop variable is not called ``config`` so that it does not hide the
    # imported ``config`` module.
    for candidate in feasible_configs:
        try:
            sum_p = sum(p_c_t[all_apps[i]][candidate[i]] for i in range(num_app))
        except IndexError as e:
            print(f"Caught an IndexError: {e}")
            # Keep the score list aligned with feasible_configs while making
            # sure this candidate can never win the argmax.
            sum_p = float('-inf')
        sum_p_list.append(sum_p)

    cache_act = feasible_configs[np.argmax(sum_p_list)]
    return {app: cache_act[i] for i, app in enumerate(all_apps)}
def latin_hypercube_sampling(n, d, m, M, ratio):
    """Draw ``n`` random integer allocations of ``M`` units over ``d`` slots.

    Each sample is drawn uniformly from ``[m, M)`` per slot and accepted only
    if its sum is at least ``M``; it is then rescaled to sum exactly ``M`` and
    re-checked against ``[m, M]``. Accepted samples are truncated to integers,
    the truncation remainder is added to the last slot, and every entry is
    multiplied by ``ratio``.

    NOTE(review): with ``d == 1`` a draw practically never reaches a sum of
    ``M``, so the rejection loop does not terminate; callers should pass at
    least two dimensions.

    Args:
        n (int): number of samples.
        d (int): number of slots per sample.
        m (float): lower bound of each slot before scaling.
        M (int): total that every sample must sum to before scaling.
        ratio (int): multiplier applied to every entry of the result.

    Returns:
        list[list[int]]: ``n`` lists of length ``d``, each summing to
        ``M * ratio``.
    """
    samples = np.zeros((n, d))
    for row in range(n):
        # Rejection sampling: redraw until the constraints hold.
        while True:
            x = np.random.uniform(m, M, d)
            total = np.sum(x)
            if total < M:
                continue
            x = x / total * M
            if np.all(x >= m) and np.all(x <= M):
                samples[row] = x
                break

    sample_config = []
    for sample in samples:
        tmp = [int(ele) for ele in sample]
        # Truncation loses up to d - 1 units; give them to the last slot.
        tmp[-1] += M - sum(tmp)
        sample_config.append([ele * ratio for ele in tmp])
    return sample_config
# Contextual bandit with one arm per cache size (0..n_cache) for every app. It can
# start with a sampling phase, remembers the best config seen, and watches a
# sliding window of rewards for workload changes.
# NOTE(review): as shown here the class has lost its indentation and appears to
# be missing statements: self.threshold is read in select_arm but never
# assigned, the score expression in select_arm is truncated, and several loops
# and branches have no visible body -- TODO confirm against the original file.
class LinUCB(ScheduleFrame):
def __init__(self, all_apps, n_cache, alpha, factor_alpha, n_features, sample=False):
self.all_apps = all_apps
self.num_apps = len(all_apps)
# One arm per possible cache size, 0 through n_cache inclusive.
self.n_arms = n_cache + 1
self.n_features = n_features
self.init_factor = [alpha, factor_alpha]
self.alpha = self.init_factor[0]
self.factor_alpha = self.init_factor[1]
self.times = 0
self.sampling_model = sample
# Maps a sampled allocation (tuple) to the reward it obtained.
self.sample_result = {}
self.sample_config = None
self.sample_times = config.SAMPLE_TIMES
self.ratio = config.SAMPLE_RATIO
self.curr_best_config = None
self.curr_best_reward = None
# Number of updates since the best config last changed.
self.duration_period = 0
self.probability_threshold = config.PROBABILITY_THRESHOLD
self.load_change_threshold = config.LOAD_CHANGE_THRESHOLD
self.history_reward_window = config.HISTORY_REWARD_WINDOW
self.history_reward = []
self.A_c = {}
self.b_c = {}
self.p_c_t = {}
for app in self.all_apps:
# The feature vector is the app's own context concatenated with the
# other apps' mean context, hence n_features * 2.
self.A_c[app] = np.zeros((self.n_arms, self.n_features * 2, self.n_features * 2))
self.b_c[app] = np.zeros((self.n_arms, self.n_features * 2, 1))
self.p_c_t[app] = np.zeros(self.n_arms)
for arm in range(self.n_arms):
self.A_c[app][arm] = np.eye(self.n_features * 2)
# contexts
self.context = {}
self.other_context = {}
# NOTE(review): `sum` shadows the builtin inside this method.
sum = np.zeros(n_features)
for app in self.all_apps:
self.context[app] = [1.0 for _ in range(self.n_features)]
sum += np.array(self.context[app])
for app in self.all_apps:
# Mean context of all other apps; divides by zero when there is one app.
self.other_context[app] = list((sum - np.array(self.context[app])) / (len(self.all_apps) - 1))
# Hypercube Sampling
if self.sampling_model:
self.sample_config = latin_hypercube_sampling(self.sample_times, self.num_apps, 0, (self.n_arms - 1)//self.ratio, self.ratio)
# Return the next allocation: replay the sample configs during the sampling
# phase, otherwise score every arm and let beam_search pick.
def select_arm(self):
if self.sampling_model:
if self.times == len(self.sample_config):
# sample end
self.times += 1
self.sampling_model = False
# Leave the sampling phase with the sampled config that scored best.
max_sample_config = list(max(self.sample_result, key=self.sample_result.get))
return max_sample_config
# initial phase
cache_allocation = self.sample_config[self.times]
self.times += 1
return cache_allocation
self.times += 1
if self.duration_period > self.threshold and random.randint(1, 100) < self.probability_threshold:
cache_allocation = []
# NOTE(review): this loop has no visible body; cache_allocation stays empty.
for app in self.all_apps:
return cache_allocation
contexts = {}
for app in self.all_apps:
A = self.A_c[app]
b = self.b_c[app]
contexts[app] = np.hstack((self.context[app], self.other_context[app]))
for i in range(self.n_arms):
theta = np.linalg.inv(A[i]).dot(b[i])
cntx = np.array(contexts[app])
# NOTE(review): the expression below is truncated (unbalanced parentheses).
self.p_c_t[app][i] = + self.alpha * np.sqrt([i]).dot(cntx)))
cache_action = beam_search(self.n_arms - 1, self.all_apps, self.p_c_t, self.times, end_condition=30)
# cache_action is a dict
cache_allocation = []
# NOTE(review): this loop shows only the alpha decay; nothing visible fills
# cache_allocation from cache_action.
for app in self.all_apps:
self.alpha *= self.factor_alpha
return cache_allocation
# Record the reward for chosen_arm (dict app -> arm), track the best config seen,
# update the per-app matrices and watch the reward history for a workload change.
def update(self, reward, chosen_arm):
if self.sampling_model:
# initial phase
curr_config = tuple(chosen_arm.values())
self.sample_result[curr_config] = float(reward)
if self.curr_best_config is None:
self.curr_best_config = chosen_arm
self.curr_best_reward = reward
self.duration_period = 1
# self.history_reward = []
if reward > self.curr_best_reward:
self.curr_best_reward = reward
self.curr_best_config = chosen_arm
self.duration_period = 1
# self.history_reward = []
self.duration_period += 1
contexts = {}
for app in self.all_apps:
arm = chosen_arm[app]
contexts[app] = np.hstack((self.context[app], self.other_context[app]))
# A += x x^T and b += reward * x for the arm this app used.
self.A_c[app][arm] += np.outer(np.array(contexts[app]), np.array(contexts[app]))
self.b_c[app][arm] = np.add(self.b_c[app][arm].T, np.array(contexts[app]) * reward).reshape(self.n_features * 2, 1)
if self.times > self.load_change_threshold:
# NOTE(review): nothing visible appends to self.history_reward here.
if len(self.history_reward) < self.history_reward_window:
# Compare the mean of the first half of the window with the mean of the rest.
half_window = self.history_reward_window // 2
first_half = self.history_reward[:half_window]
second_half = self.history_reward[half_window:]
first_aver = sum(first_half) / len(first_half)
second_aver = sum(second_half) / len(second_half)
# print('{} --> {}, {}'.format(first_aver, second_aver, abs(second_aver - first_aver) / first_aver))
if abs(second_aver - first_aver) / first_aver > 0.20:
# workload change
print('----- test workload change -----')
# Refresh every app's context from context_info and return the mean of
# performance as the reward.
def get_now_reward(self, performance, context_info=None):
# update the context
# Transpose context_info so that row i holds the features of app i.
tmp = [list(row) for row in zip_longest(*context_info, fillvalue=None)]
sum_context = np.zeros(self.n_features)
for i, app in enumerate(self.all_apps):
self.context[app] = tmp[i]
sum_context += np.array(self.context[app])
for app in self.all_apps:
self.other_context[app] = list((sum_context - np.array(self.context[app])) / (len(self.all_apps) - 1))
# calculate the reward for hitrate etc bigger is greater
th_reward = sum(float(x) for x in performance) / len(performance)
return th_reward
# smaller is greater
# aver_latency = sum(float(x) for x in performance) / len(performance)
# th_reward = 100 / aver_latency
# return th_reward, aver_latency
# Restart learning: re-enter the sampling phase with fresh sample configs and
# reset every per-app matrix to its initial value.
def reset(self):
self.alpha = self.init_factor[0]
self.times = 0
self.sampling_model = True
self.sample_config = latin_hypercube_sampling(self.sample_times, self.num_apps, 0, (self.n_arms - 1)//self.ratio, self.ratio)
self.sample_result = {}
self.curr_best_config = None
self.curr_best_reward = None
self.duration_period = 0
self.history_reward = []
for app in self.all_apps:
self.A_c[app] = np.zeros((self.n_arms, self.n_features * 2, self.n_features * 2))
self.b_c[app] = np.zeros((self.n_arms, self.n_features * 2, 1))
self.p_c_t[app] = np.zeros(self.n_arms)
for arm in range(self.n_arms):
self.A_c[app][arm] = np.eye(self.n_features * 2)
# Serialize this whole object to `filename` with pickle.
def save_to_pickle(self, filename):
with open(filename, 'wb') as f:
pickle.dump(self, f)
def load_from_pickle(filename):
    """Load and return the object pickled in ``filename``.

    NOTE(review): unpickling can execute arbitrary code stored in the file;
    only load files that this program wrote itself.

    Args:
        filename: path of the pickle file to read.

    Returns:
        The unpickled object.
    """
    with open(filename, 'rb') as f:
        return pickle.load(f)
# Script entry point; as shown it only builds two throwaway lists.
# NOTE(review): whatever used these lists is not visible -- TODO confirm.
if __name__ == '__main__':
mylist = [1, 2, 3, 4, 5, 6]
mylist2 = [1, 2, 3, 4, 5]

from abc import ABC, abstractmethod
import numpy as np
# from scipy.special import gamma
class ScheduleFrame:
    """Common interface of the scheduling policies.

    Subclasses override ``select_arm``, ``update`` and ``get_now_reward``.
    The method bodies are not visible in the reviewed source, so they are
    written here as no-ops that return ``None``.
    """

    def __init__(self):
        """Create the policy; subclasses set up their own state."""

    def select_arm(self):
        """Return the next allocation to try."""

    def update(self, reward, chosen_arm):
        """Record ``reward`` for the allocation ``chosen_arm``."""

    def get_now_reward(self, performance, context_info=None):
        """Turn the measured ``performance`` (and optional context) into a reward."""
def latin_hypercube_sampling(n, d, m, M):
    """Draw ``n`` random integer-valued allocations of ``M`` units over ``d`` slots.

    Each sample is drawn uniformly from ``[m, M)`` per slot and accepted only
    if its sum is at least ``M``; it is then rescaled to sum exactly ``M`` and
    re-checked against ``[m, M]``. Accepted samples are truncated to whole
    numbers and the truncation remainder is added to the last slot.

    NOTE(review): with ``d == 1`` a draw practically never reaches a sum of
    ``M``, so the rejection loop does not terminate; callers should pass at
    least two dimensions.

    Args:
        n (int): number of samples.
        d (int): number of slots per sample.
        m (float): lower bound of each slot.
        M (int): total that every sample must sum to.

    Returns:
        numpy.ndarray: float array of shape ``(n, d)`` holding whole numbers,
        each row summing to ``M``.
    """
    samples = np.zeros((n, d))
    for row in range(n):
        # Rejection sampling: redraw until the constraints hold.
        while True:
            x = np.random.uniform(m, M, d)
            total = np.sum(x)
            if total < M:
                continue
            x = x / total * M
            if np.all(x >= m) and np.all(x <= M):
                samples[row] = x
                break

    # Truncate toward zero (what int() does), then give the lost units to the
    # last slot so every row sums to M again.
    samples = np.trunc(samples)
    samples[:, -1] += M - samples.sum(axis=1)
    return samples
# Demo: draw 20 three-way allocations that each sum to 240.
if __name__ == '__main__':
# Parameters
num_samples = 20 # Number of samples
num_dimensions = 3 # Number of dimensions
min_value = 0 # Minimum value of each element
total_sum = 240 # Total sum of elements in each sample
# Generate samples
samples = latin_hypercube_sampling(num_samples, num_dimensions, min_value, total_sum)

import numpy as np
import matplotlib.pyplot as plt
# Generate the data: 1000 evenly spaced points from 0 to 6000
x = np.linspace(0, 6000, 1000)
def sigmoid(x, k=0.002, x0=3000):
    """Map ``x`` into (0, 1) with a logistic curve.

    Args:
        x: scalar or numpy array of raw values.
        k (float): steepness of the curve.
        x0 (float): value of ``x`` that maps to 0.5.

    Returns:
        ``1 / (1 + exp(-k * (x - x0)))``, same shape as ``x``.
    """
    return 1 / (1 + np.exp(-k * (x - x0)))
def tanh(x, k=0.0005, x0=3000):
    """Map ``x`` into (0, 1) with a shifted and rescaled hyperbolic tangent.

    Args:
        x: scalar or numpy array of raw values.
        k (float): steepness of the curve.
        x0 (float): value of ``x`` that maps to 0.5.

    Returns:
        ``(tanh(k * (x - x0)) + 1) / 2``, same shape as ``x``.
    """
    return (np.tanh(k * (x - x0)) + 1) / 2
def exponential_normalize(x, alpha=0.5, x_max=6000):
    """Map ``x`` into [0, 1] with a power curve.

    Args:
        x: scalar or numpy array of raw values in ``[0, x_max]``.
        alpha (float): exponent applied to the normalized value.
        x_max (float): value of ``x`` that maps to 1; defaults to 6000, the
            upper end of the range this script plots.

    Returns:
        ``(x / x_max) ** alpha``, same shape as ``x``.
    """
    return (x / x_max) ** alpha
def arctan(x, k=0.001, x0=3000):
    """Map ``x`` into (0, 1) with a shifted and rescaled arctangent.

    Args:
        x: scalar or numpy array of raw values.
        k (float): steepness of the curve.
        x0 (float): value of ``x`` that maps to 0.5.

    Returns:
        ``0.5 + arctan(k * (x - x0)) / pi``, same shape as ``x``.
    """
    return 0.5 + (1 / np.pi) * np.arctan(k * (x - x0))
# Map the sample points into the 0-1 range with each candidate function
y1 = sigmoid(x)
y2 = tanh(x)
y3 = exponential_normalize(x)
y4 = arctan(x)
# Draw the curves
plt.plot(x, y1, label='sigmoid')
plt.plot(x, y2, label='tanh')
plt.plot(x, y3, label='exponential_normalize')
plt.plot(x, y4, label='arctan')
plt.title("Function Mapping")
plt.xlabel("Original Values")
plt.ylabel("Mapped Values (0-1)")
# NOTE(review): no legend/show/savefig call is visible after this point, so as
# shown the figure is never displayed or written -- TODO confirm.

import re
import matplotlib.pyplot as plt
# Read the LinUCB log and extract, for every "reward : R [W, R]" entry, the
# reward and the two pool sizes as floats.
with open('../Logs/linucb.log', 'r', encoding='utf-8') as file:
# NOTE(review): the right-hand side of this assignment is missing; presumably
# the file contents were read here -- TODO confirm.
text =
result = [[float(x) for x in match] for match in re.findall(r'reward : ([\d\.]+) \[(\d+), (\d+)\]', text)]
rewards = [item[0] for item in result]
write_pool_sizes = [item[1] for item in result]
read_pool_sizes = [item[2] for item in result]
# flush_pool_sizes = [item[3] for item in result]
def putAll():
    """Plot the write- and read-pool sizes on one figure and save it.

    Reads the module-level ``write_pool_sizes`` and ``read_pool_sizes`` and
    writes ``../figures/newPoolSize.png``.
    """
    plt.figure(figsize=(15, 6))
    plt.plot(write_pool_sizes, label='WritePool Size', marker='o', markersize=5, alpha=0.5)
    plt.plot(read_pool_sizes, label='ReadPool Size', marker='s', markersize=5, alpha=0.8)
    # plt.plot(flush_pool_sizes, label='FlushPool Size', marker='^', markersize=5, alpha=0.8)
    plt.title('Pool Sizes Over Time')
    plt.xlabel('Time (Epoch)')
    plt.ylabel('Pool Size')
    # The series carry labels, which are only shown once a legend is drawn.
    plt.legend()
    plt.savefig('../figures/newPoolSize.png', dpi=300)
def separate():
    """Plot write-pool size, read-pool size and reward in a 2x2 grid and save it.

    Reads the module-level ``write_pool_sizes``, ``read_pool_sizes`` and
    ``rewards`` and writes ``../figures/pool_sizes_separate.png``. The
    bottom-left panel is left empty (the flush-pool plot is disabled).
    """
    fig, axs = plt.subplots(2, 2, figsize=(18, 10))
    # (axis, series, marker, colour, label) for every panel that is drawn.
    panels = [
        (axs[0, 0], write_pool_sizes, 'o', 'blue', 'Write Pool Size'),
        (axs[0, 1], read_pool_sizes, '^', 'green', 'Read Pool Size'),
        # (axs[1, 0], flush_pool_sizes, '+', 'red', 'Flush Pool Size'),
        (axs[1, 1], rewards, '*', 'purple', 'Reward'),
    ]
    for ax, series, marker, color, label in panels:
        ax.plot(series, marker=marker, linestyle='-', color=color)
        ax.set_title(f'{label} Over Time')
        ax.set_xlabel('Iteration')
        ax.set_ylabel(label)
        ax.grid(True)
    plt.savefig('../figures/pool_sizes_separate.png', dpi=300)
def calculate_greater_than_ratio(pool_sizes, average_size=80):
    """Return the fraction of ``pool_sizes`` strictly greater than ``average_size``.

    Args:
        pool_sizes: sequence of pool sizes.
        average_size: threshold to compare against.

    Returns:
        float: share of entries above the threshold, or ``0.0`` when
        ``pool_sizes`` is empty.
    """
    total_count = len(pool_sizes)
    if total_count == 0:
        # Nothing to compare; avoid dividing by zero.
        return 0.0
    count_greater = sum(1 for size in pool_sizes if size > average_size)
    return count_greater / total_count
def calculate_all_ratio():
    """Print how often each pool exceeded the average size after training began.

    Reads the module-level ``write_pool_sizes`` and ``read_pool_sizes`` and
    skips the first ``train_point`` entries.
    """
    average_size = 120  # specific to the single-machine unet3d setup
    train_point = 20  # index at which training starts
    write_pool_ratio = calculate_greater_than_ratio(write_pool_sizes[train_point:], average_size)
    read_pool_ratio = calculate_greater_than_ratio(read_pool_sizes[train_point:], average_size)
    # flush_pool_ratio = calculate_greater_than_ratio(flush_pool_sizes[train_point:], average_size)
    # Report the threshold actually used instead of a hard-coded number.
    print(f'WritePool > {average_size} Ratio: {write_pool_ratio:.2%}')
    print(f'ReadPool > {average_size} Ratio: {read_pool_ratio:.2%}')
    # print(f'FlushPool > {average_size} Ratio: {flush_pool_ratio:.2%}')
# Script entry point: iterates over the parsed log entries.
# NOTE(review): the loop body is not visible -- TODO confirm what it did.
if __name__ == '__main__':
for i in result:

import matplotlib.pyplot as plt
# Plot the rewards found in the LinUCB log together with two horizontal
# reference lines: the first reward and 1.2 times the first reward.
if __name__ == '__main__':
log_name = '/home/renfeng/tqy/JYCache/JYCache_Env/LinUCBSchedule/Logs/linucb.log'
index = []
reward = []
count = 1
# with open(log_name, 'r') as log_file:
# for line in log_file.readlines():
# # idx, rew = line.split(' ')
# rew = line
# idx = count
# index.append(idx)
# reward.append(rew)
# count += 1
with open(log_name, 'r') as log_file:
for line in log_file.readlines():
if 'reward' in line:
idx = count
reward_str = line.split(':')[1].split('[')[0].strip() # extract the numeric reward part
# NOTE(review): nothing visible appends idx / reward_str to index / reward, so
# as shown both lists stay empty and reward[0] below fails -- TODO confirm.
count += 1
reward = [float(i) for i in reward]
# best_score = 0.32
# Target line: 20% above the first observed reward.
best_score = reward[0] * 1.2
target = [best_score] * len(index)
init_score = [reward[0]] * len(index)
plt.plot(index, reward)
plt.plot(index, target)
plt.plot(index, init_score)
# Configure the axis ticks
plt.xticks(list(range(0, int(index[-1]), int(int(index[-1])/20))), rotation = 45)
plt.yticks([i * 0.2 for i in range(0, int(best_score // 0.2) + 2, 1)])

# The number of hypercube samples in the initial stage
# Sampling granularity: with a ratio of 10 and num_cache of 100, the sampled allocations are limited to 0, 10, 20, 30, ..., 100
# If no better config is found within this threshold,
# the model is considered to have entered a state of approximate convergence
# In the approximate convergence state, there is a certain probability of directly choosing the best solution
# explored so far; otherwise exploration continues
# It takes time for rewards to level off, only after this threshold, detection begins for load changes
# The size of history reward sliding window

import csv
import os
import sys
import time
import numpy as np
from LinUCB import *
from EpsilonGreedy import *
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
from util import *
NUM_RESOURCES_CACHE = 60 #total units of cache size
def for_epsilon_greedy():
    """Run the epsilon-greedy scheduler against the simulated config manager.

    Each epoch turns the latest measurement into a reward, updates the bandit,
    selects a new allocation, sends it out and waits for the next measurement.
    One ``"<epoch> <reward>"`` line per epoch is written to ``linucb.log``.
    """
    cm = simulation_config_management()
    curr_config = cm.receive_config()
    all_app = curr_config[0]
    num_resources = int(np.sum(curr_config[1]))
    curr_allocate = curr_config[1]
    chosen_arm = {app: curr_allocate[j] for j, app in enumerate(all_app)}

    # initialize parameters
    epsilon = 0.5
    factor_alpha = 0.98
    epochs = 3000
    ucb_cache = EpsilonGreedy(epsilon, factor_alpha, len(all_app), num_resources)

    start_time = time.time()
    # The context manager closes the log even if an epoch raises.
    with open('linucb.log', 'w', newline='') as file_:
        for i in range(epochs):
            performance = curr_config[2]
            th_reward = ucb_cache.get_now_reward(performance)
            ucb_cache.update(th_reward, chosen_arm)
            # choose an arm
            chosen_arm = ucb_cache.select_arm()
            # prepare new config and hand it to the config manager
            new_config = [curr_config[0], chosen_arm]
            cm.send_config(new_config)
            # waiting for result
            curr_config = cm.receive_config()
            # write to log
            file_.write(str(i) + ' ' + str(th_reward) + '\n')
            if (i + 1) % 100 == 0:
                print('epoch [{} / {}]'.format(i + 1, epochs))
    end_time = time.time()
    print('used time :{}'.format(end_time - start_time))
def for_linucb():
    """Run LinUCB (without the sampling phase) against the simulated config manager.

    Each epoch turns the latest measurement into a reward, updates the bandit,
    selects a new allocation, sends it out and waits for the next measurement.
    One ``"<epoch> <reward>"`` line per epoch is written to ``linucb.log``.
    """
    cm = simulation_config_management()
    curr_config = cm.receive_config()
    all_app = curr_config[0]
    num_resources = int(np.sum(curr_config[1]))

    alpha = 0.90
    factor_alpha = 0.98
    n_features = 10
    epochs = 1000
    ucb_cache = LinUCB(all_app, num_resources, alpha, factor_alpha, n_features)

    start_time = time.time()
    # The context manager closes the log even if an epoch raises.
    with open('linucb.log', 'w', newline='') as file_:
        for i in range(epochs):
            curr_allocate = curr_config[1]
            performance = curr_config[2]
            context_info = curr_config[3:]
            chosen_arm = {app: curr_allocate[j] for j, app in enumerate(all_app)}
            th_reward = ucb_cache.get_now_reward(performance, context_info)
            ucb_cache.update(th_reward, chosen_arm)
            new_arm = ucb_cache.select_arm()
            # prepare new config and hand it to the config manager
            new_config = [curr_config[0], new_arm]
            cm.send_config(new_config)
            # waiting for result
            curr_config = cm.receive_config()
            # write to log
            file_.write(str(i) + ' ' + str(th_reward) + '\n')
            if (i + 1) % 100 == 0:
                print('epoch [{} / {}]'.format(i + 1, epochs))
    end_time = time.time()
    print('used time :{}'.format(end_time - start_time))
def for_linucb_sample():
    """Run LinUCB with its initial sampling phase against the simulated config manager.

    Unlike ``for_linucb``, every epoch first selects and sends an allocation
    and only then rewards the bandit with the measurement that comes back.
    One ``"<epoch> <reward>"`` line per epoch is written to ``linucb.log``.
    """
    cm = simulation_config_management()
    curr_config = cm.receive_config()
    all_app = curr_config[0]
    num_resources = int(np.sum(curr_config[1]))

    alpha = 0.95
    factor_alpha = 0.98
    n_features = 10
    epochs = 3000
    ucb_cache = LinUCB(all_app, num_resources, alpha, factor_alpha, n_features, True)

    start_time = time.time()
    # The context manager closes the log even if an epoch raises.
    with open('linucb.log', 'w', newline='') as file_:
        for i in range(epochs):
            new_arm = ucb_cache.select_arm()
            # prepare new config and hand it to the config manager
            new_config = [curr_config[0], new_arm]
            cm.send_config(new_config)
            # waiting for result
            curr_config = cm.receive_config()
            curr_allocate = curr_config[1]
            performance = curr_config[2]
            context_info = curr_config[3:]
            chosen_arm = {app: curr_allocate[j] for j, app in enumerate(all_app)}
            th_reward = ucb_cache.get_now_reward(performance, context_info)
            ucb_cache.update(th_reward, chosen_arm)
            # write to log
            file_.write(str(i) + ' ' + str(th_reward) + '\n')
            if (i + 1) % 100 == 0:
                print('epoch [{} / {}]'.format(i + 1, epochs))
    end_time = time.time()
    print('used time :{}'.format(end_time - start_time))
# Script entry points.
# NOTE(review): neither guard has a visible body; the second is followed only
# by commented-out legacy code -- TODO confirm which driver function was called.
if __name__ == '__main__':
if __name__ == '__main__':
# cm = simulation_config_management()
# curr_config = cm.receive_config()
# num_apps = len(curr_config[0])
# num_resources = [NUM_RESOURCES_CACHE]
# all_cache_config = gen_all_config(num_apps, num_resources[0])
# num_config = len(all_cache_config)
# epochs = 1000
# file_ = open('linucb.log', 'w', newline='')
# app_name = curr_config[0]
# cache_config = curr_config[1]
# cache_hit = curr_config[2]
# cache_reward = curr_config[3]
# init_cache_index = all_cache_config.index(cache_config)
# # context infomation and reward
# context, th_reward = get_now_reward(cache_hit)
# # initialize parameters
# alpha = 0.999
# factor_alpha = 0.997
# n_features_cache = len(context)
# context_cache = np.array(context)
# num_arm = num_config
# lin_ucb_cache = LinUCB(alpha, num_arm, n_features_cache)
# # lin_ucb_cache = EpsilonGreedy(alpha, num_arm, n_features_cache)
# lin_ucb_cache.update(init_cache_index, th_reward, context_cache)
# start_time = time.time()
# for i in range(epochs):
# context, th_reward = get_now_reward(cache_hit)
# # print('----- reward is: ' + str(th_reward))
# context_cache = np.array(context)
# # choose a arm
# chosen_arm_cache = lin_ucb_cache.select_arm(context_cache, factor_alpha)
# cache_index = chosen_arm_cache
# # prepare new config
# new_config = []
# new_config.append(curr_config[0])
# new_partition = all_cache_config[cache_index]
# new_config.append(new_partition)
# # print('new config:' + str(new_config))
# cm.send_config(new_config)
# # waiting for result
# curr_config = cm.receive_config()
# # print('recv_config is: ' + str(curr_config))
# app_name = curr_config[0]
# cache_config = curr_config[1]
# cache_hit = curr_config[2]
# cache_reward = curr_config[3]
# context, th_reward = get_now_reward(cache_hit)
# context_cache = np.array(context)
# lin_ucb_cache.update(chosen_arm_cache, th_reward, context_cache)
# # write to log
# log_info = str(i) + ' ' + str(th_reward) + '\n'
# file_.write(log_info)
# if (i + 1) % 100 == 0:
# print('epoch [{} / {}]'.format(i + 1, epochs))
# end_time = time.time()
# print('used time :{}'.format(end_time-start_time))
# file_.close()

import csv
import os
import sys
import threading
import socket
import time
import numpy as np
from LinUCB import *
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
from server_util import *
# Total number of cache units handed to LinUCB for partitioning (176 * 2).
NUM_RESOURCES_CACHE = 352 # total units of cache size 176*2
# initialize LinUCB parameters
# alpha and factor_alpha are passed unchanged to the LinUCB constructor in start_server().
alpha = 0.95
factor_alpha = 0.98
num_features = 1 # feature dimension of the buffer pool
# Pools managed by the scheduler; handle_client() pairs them by position with
# the pool sizes reported by the client.
# all_app = ['WritePool', 'ReadPool', 'FlushPool']
all_app = ['WritePool', 'ReadPool']
# After receiving the configuration report from a client, run LinUCB to
# produce the newest configuration and send it back to the client.
def handle_client(client_socket, ucb_cache, file_writer):
    """Serve one request from a curve client.

    Reads a single report from the socket, feeds the observed throughput to
    the bandit as a reward, asks it for the next allocation, logs the timing
    and replies with the new configuration.

    Args:
        client_socket: connected socket of the curve client.
        ucb_cache: bandit object providing ``get_now_reward``, ``update`` and
            ``select_arm``.
        file_writer: open text file the timing/reward log is written to.

    NOTE(review): the ``else`` branch, the log write, the reply and the socket
    close were absent from the source as received (``new_size`` was always
    overwritten by ``pool_size`` and nothing was sent). They are restored here
    from the surrounding comments — confirm against the deployed version.
    """
    try:
        # curve reports pool_name, pool_size, pool_throughput and the number
        # of calls to the write / read functions
        request = client_socket.recv(1024).decode("utf-8")
        print(f"Received: {request}")
        pool_and_size, pool_and_throughput, pool_and_invoke = process_request(request)
        pool_name = list(pool_and_size.keys())
        pool_size = list(pool_and_size.values())
        pool_throughput = list(pool_and_throughput.values())
        context_info = [list(pool_and_invoke.values())]

        if np.sum(pool_throughput) > 0:
            start_time = time.time()
            # The arm that produced this observation is the current allocation.
            chosen_arm = {app: pool_size[j] for j, app in enumerate(all_app)}
            # reward
            th_reward = ucb_cache.get_now_reward(pool_throughput, context_info)
            ucb_cache.update(th_reward, chosen_arm)
            new_size = ucb_cache.select_arm()
            end_time = time.time()

            # write to log
            start_time_log = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
            end_time_log = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
            file_writer.write('start time : ' + start_time_log + '\n')
            run_time = end_time - start_time
            file_writer.write(f'run time : {run_time:.2f} s\n')
            file_writer.write('end time : ' + end_time_log + '\n')
            log_info = 'reward : ' + str(th_reward) + ' ' + str(new_size) + '\n'
            file_writer.write(log_info)
        else:
            # No throughput observed: nothing to learn from, keep the
            # allocation the client already has.
            new_size = pool_size

        # prepare new config
        new_config = [pool_name, new_size]
        print('new config:' + str(new_config))
        # the server sends the latest configuration back to curve:
        # one line of pool names, one line of sizes, space separated
        serialized_data = '\n'.join([' '.join(map(str, row)) for row in new_config])
        client_socket.sendall(serialized_data.encode("utf-8"))
    finally:
        # One request per connection; always release the socket.
        client_socket.close()
# except Exception as e:
# print(f"An error occurred: {e}")
# Start the server that listens for configuration reports sent by curve.
def start_server():
    """Accept curve clients on port 2333 and handle each one on its own thread.

    A single LinUCB instance and a single log file ('Logs/linucb.log') are
    shared by every handler thread. The loop runs until the process is
    interrupted; the listening socket is closed on the way out.

    NOTE(review): ``listen`` and ``Thread.start`` were absent from the source
    as received and the ``Thread(...)`` call was unterminated; restored here.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.bind(("", 2333))
        # allow at most 5 clients to queue while waiting to be accepted
        server.listen(5)
        print("Server listening on port 2333")

        # initialise LinUCB
        # The log stays open for the life of the process because handler
        # threads keep writing to it.
        file_ = open('Logs/linucb.log', 'w', newline='')
        num_resources = NUM_RESOURCES_CACHE
        # enable sampling
        ucb_cache = LinUCB(all_app, num_resources, alpha, factor_alpha, num_features, True)

        while True:
            client_socket, addr = server.accept()
            print(f"Accepted connection from {addr}")
            client_handler = threading.Thread(
                target=handle_client, args=(client_socket, ucb_cache, file_)
            )
            client_handler.start()
    finally:
        server.close()
if __name__ == '__main__':
    # Script entry point: run the LinUCB configuration server.
    # NOTE(review): the guard had no body in the source as received;
    # start_server() is the only entry point defined for this script — confirm.
    start_server()

import numpy as np
# For unet3d set x0 to 3000; for resnet50 set x0 to 30000.
def sigmoid(x, k=0.002, x0=30000):
    """Logistic squashing of ``x`` into the open interval (0, 1).

    Args:
        x: value (or numpy array) to squash.
        k: steepness of the curve.
        x0: midpoint; ``sigmoid(x0)`` is 0.5.

    Returns:
        ``1 / (1 + exp(-k * (x - x0)))``.
    """
    exponent = -k * (x - x0)
    denominator = 1 + np.exp(exponent)
    return 1 / denominator
# Parse the request sent by curve into pool_and_size, pool_and_throughput
# and pool_and_invoke.
def process_request(request):
    """Split a ``key:value;key:value;...`` report into three dictionaries.

    Everything after the final ';' is discarded. Of the remaining entries the
    first two are pool sizes (int), the next two are pool throughputs (float)
    and the rest are invocation counts, each passed through ``sigmoid``.

    Args:
        request: decoded report text.

    Returns:
        Tuple ``(pool_and_size, pool_and_throughput, pool_and_invoke)``.

    Raises:
        ValueError: if an entry is not exactly ``key:value`` or the value
            cannot be converted.
    """
    entries = request.split(';')[:-1]

    def _collect(chunk, convert):
        # Build {key: convert(value)} from a run of "key:value" strings.
        parsed = {}
        for entry in chunk:
            key, value = entry.split(':')
            parsed[key] = convert(value)
        return parsed

    pool_and_size = _collect(entries[:2], int)
    pool_and_throughput = _collect(entries[2:4], float)
    pool_and_invoke = _collect(entries[4:], lambda raw: sigmoid(float(raw)))
    return pool_and_size, pool_and_throughput, pool_and_invoke

import socket
import pickle
import time
# Address of the cache server; get_pool_stats() and set_cache_size() connect
# to (host, port).
host = ''
port = 54000
def get_last_line(file_name):
    """Open a file and read its last line.

    The file is scanned backwards from the end for the newline that precedes
    the last line, so the whole file is never loaded. The very last byte is
    not inspected (it is normally the trailing newline). Unlike a bare
    ``seek(-2, ...)`` loop, this also works for empty files and for files
    holding a single line.

    Args:
        file_name (str): file name

    Returns:
        str: last line of the file with surrounding whitespace stripped,
        '' if the file is empty or not found
    """
    try:
        with open(file_name, 'rb') as file:
            file.seek(0, 2)
            size = file.tell()
            if size == 0:
                return ''
            pos = size - 1
            while pos > 0:
                pos -= 1
                file.seek(pos)
                if file.read(1) == b'\n':
                    # The read left the cursor just past the newline,
                    # i.e. at the start of the last line.
                    break
            else:
                # No earlier newline: the file is a single line.
                file.seek(0)
            return file.readline().decode().strip()
    except FileNotFoundError:
        return ''
def get_pool_stats():
    """Query the cache server for the current pools and their allocation.

    Connects to ``(host, port)``, sends the request ``"G:"`` and parses a
    reply of the form ``name:size;name:size;...`` (text after the final ';'
    is ignored).

    Returns:
        dict: map of pool name -> pool size (int) for every pool in the cache

    NOTE(review): the send was absent from the source as received, so the
    function would have blocked in ``recv``; it is restored here and the
    socket is now closed after use.
    """
    # connect to the target program
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        message = "G:"
        sock.sendall(message.encode())
        # wait for the response
        response = sock.recv(1024).decode()

    deserialized_map = {}
    for pair in response.split(';')[:-1]:
        key, value = pair.split(':')
        deserialized_map[key] = int(value)
    return deserialized_map
def set_cache_size(workloads, cache_size):
    """Tell the cache server to resize its pools.

    Connects to ``(host, port)`` and sends ``"S:"`` followed by two
    space-separated lines: the pool names, then the new sizes.

    Args:
        workloads (list<str>): pool name of all workloads
        cache_size (list<int>): new size of each pool

    NOTE(review): the send was absent from the source as received (the
    payload was built and dropped); it is restored here and the socket is
    now closed after use.
    """
    curr_config = [workloads, cache_size]
    serialized_data = '\n'.join([' '.join(map(str, row)) for row in curr_config])
    serialized_data = 'S:' + serialized_data

    # connect to the server program and send the new allocation
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.sendall(serialized_data.encode())
def receive_config():
    """Old version: wait to receive the current resource config.

    Listens on port 1412, accepts one connection, reads up to 1024 bytes and
    unpickles them.

    Returns:
        list: [
            [name of pool],
            [allocation of resource A of every pool],  # such as cache size, [16, 16]
            [context of resource A of every pool],     # for cache size the context can be hit rate, [0.8254, 0.7563]
            [latency of every workload]
        ]

    NOTE(review): ``listen`` was absent from the source as received, so
    ``accept`` could not succeed; it is restored here and both sockets are
    now closed after use.
    """
    # create the server socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(('', 1412))
        server_socket.listen(1)
        # wait for the old config from the executor
        client_socket, _ = server_socket.accept()
        with client_socket:
            received_data = client_socket.recv(1024)

    # SECURITY: pickle.loads executes arbitrary code from the payload. Only
    # acceptable while the peer on port 1412 is fully trusted.
    config = pickle.loads(received_data)
    return config
def send_config(new_config):
    """Old version: send the new config to the bench on port 1413.

    Args:
        new_config (list): [
            [name of pool],
            [new allocation of resource A of every pool]
        ]

    NOTE(review): the send was absent from the source as received (success
    was printed without transmitting anything); it is restored here and the
    socket is now closed after use.
    """
    serialized_config = pickle.dumps(new_config)
    # connect to the bench
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect(('', 1413))
        client_socket.sendall(serialized_config)
    print("send_config success!")
# Adapter between the scheduler and the real cache server: receive_config()
# gathers the current state, send_config() pushes a new allocation.
# NOTE(review): this class appears truncated in the file as received — the
# body of __init__, the bodies of both `for` loops and the statements that
# fill curr_config are missing. Restore them from version control before use.
class config_management:
def __init__(self) -> None:
# Collect the current configuration of every pool.
# Pool names and sizes come from get_pool_stats(); the per-pool hit-rate and
# tail-latency log paths are built under /home/md/SHMCachelib/bin/.
# NOTE(review): presumably the last line of each log is read (get_last_line)
# and appended to hitrates / latencies, and the four lists are appended to
# curr_config — TODO confirm; as written curr_config is returned empty.
def receive_config(self):
curr_config = []
# pool name and cache allocation
pool_and_size = get_pool_stats()
pool_name = list(pool_and_size.keys())
pool_size = list(pool_and_size.values())
#TODO: cache hit rate
hitrate_log = ['/home/md/SHMCachelib/bin/' + name + '_hitrate.log' for name in pool_name]
hitrates = []
for log in hitrate_log:
#TODO: tail latency
latency_log = ['/home/md/SHMCachelib/bin/' + name + '_tailLatency.log' for name in pool_name]
latencies = []
for log in latency_log:
return curr_config
# Apply a new allocation: new_config[0] is passed to set_cache_size() as the
# pool names and new_config[1] as the new sizes.
def send_config(self, new_config):
set_cache_size(new_config[0], new_config[1])
def userA_func(resources, threshold):
    """Piecewise-linear utility for user A.

    Each unit is worth 0.095 below ``threshold`` and 0.010 from the threshold
    upwards. A negative threshold yields 0.

    Args:
        resources: units allocated to the user.
        threshold: allocation at which the slope drops.

    Returns:
        The utility value.
    """
    steep_rate, flat_rate = 0.095, 0.010
    if threshold < 0:
        gain = 0
    elif resources < threshold:
        gain = resources * steep_rate
    else:
        gain = threshold * steep_rate + (resources - threshold) * flat_rate
    return gain
def userB_func(resources, threshold):
    """Piecewise-linear utility for user B.

    Each unit is worth 0.040 below ``threshold`` and 0.005 from the threshold
    upwards. A negative threshold yields 0.

    Args:
        resources: units allocated to the user.
        threshold: allocation at which the slope drops.

    Returns:
        The utility value.
    """
    steep_rate, flat_rate = 0.040, 0.005
    if threshold < 0:
        gain = 0
    elif resources < threshold:
        gain = resources * steep_rate
    else:
        gain = threshold * steep_rate + (resources - threshold) * flat_rate
    return gain
def uniform(resources):
    """Utility of a uniform-access workload.

    Grows by 0.003 per unit and stops growing once 240 units are allocated.
    """
    useful_units = min(resources, 240)
    return useful_units * 0.003
def sequential(resources):
    """Utility of a sequential-access workload: always 0, whatever the allocation."""
    return 0
def hotspot(resources):
    """Utility of a hotspot workload.

    The first 60 units are worth 0.007 each and the next 180 units are worth
    0.002 each; anything beyond that adds nothing.
    """
    hot_units = min(resources, 60)
    overflow = max(0, resources - 60)
    cold_units = min(overflow, 180)
    return hot_units * 0.007 + cold_units * 0.002
class userModel:
    """One simulated user: a name, its current allocation and its utility function."""

    def __init__(self, name, resources=0, user_func=None):
        """Store the user's attributes.

        Args:
            name: label identifying the user.
            resources: units currently allocated to the user.
            user_func: callable mapping an allocation to the user's utility.
        """
        # The assignment target was garbled in the source (": = name");
        # simulation_config_management reads it per user, so it is `name`.
        self.name = name
        self.resources = resources
        self.user_func = user_func
class simulation_config_management:
    """In-process simulation offering the same receive_config / send_config
    pair as config_management, backed by three synthetic users instead of a
    cache server."""

    def __init__(self):
        """Create users A, B and C and split the 240 units evenly between them."""
        self.total_resource = 240
        self.all_user = [
            userModel('A', 0, sequential),
            userModel('B', 0, uniform),
            userModel('C', 0, hotspot),
        ]
        for u in self.all_user:
            u.resources = self.total_resource // len(self.all_user)
        # number of constant context rows appended by receive_config()
        self.n_features = 10
        # rotate the users' utility functions every this many calls (<= 0 disables)
        self.workload_change = 1000
        self.counter = 0

    def receive_config(self):
        """Report the current simulated state.

        Returns:
            list: [names, allocations, utilities] followed by ``n_features``
            context rows, each a list of 1.0 per user.

        Every ``workload_change``-th call (when positive) additionally prints
        a banner and rotates the utility functions between the three users;
        the returned values were computed before that rotation.
        """
        curr_config = []
        # The expression before `for` was missing in the source; the first
        # row holds the user names.
        curr_config.append([u.name for u in self.all_user])
        curr_config.append([u.resources for u in self.all_user])
        curr_config.append([u.user_func(u.resources) for u in self.all_user])
        # context info
        for _ in range(self.n_features):
            curr_config.append([1.0] * len(self.all_user))
        self.counter = self.counter + 1
        if self.workload_change > 0 and self.counter % self.workload_change == 0:
            print('---------------------------workload change -----------------------------')
            # Two swaps rotate the functions: (f0, f1, f2) -> (f1, f2, f0).
            self.all_user[0].user_func, self.all_user[1].user_func = self.all_user[1].user_func, self.all_user[0].user_func
            self.all_user[1].user_func, self.all_user[2].user_func = self.all_user[2].user_func, self.all_user[1].user_func
        return curr_config

    def send_config(self, new_config):
        """Apply a new allocation; ``new_config[1][i]`` is assigned to user ``i``."""
        for i, user in enumerate(self.all_user):
            user.resources = new_config[1][i]
if __name__ == '__main__':
cs = config_management()
curr = cs.receive_config()
for item in curr: