"""Spawn experiment for self-training networks.

Self-trains a population of networks, clones every network that passes
``is_identity_function``, perturbs the clones with random noise, continues
training parents and clones, and logs/plots how far the clones end up from
their parent.
"""
import copy
import os
import pickle
import random
from pathlib import Path

import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn.functional as F
from matplotlib import pyplot as plt
from sklearn.metrics import mean_absolute_error as MAE
from sklearn.metrics import mean_squared_error as MSE
from tabulate import tabulate
from tqdm import tqdm

from functionalities_test import is_identity_function, test_status
from network import Net
from visualization import plot_3d_self_train, plot_loss


def prng():
    """Return a pseudo-random float from ``random.random()``."""
    return random.random()


def l1(tup):
    """Return the absolute difference of the two elements of ``tup``."""
    a, b = tup
    return abs(a - b)


def _sorted_flat_values(values):
    """Return ``values`` as a flattened, ascending-sorted NumPy array.

    Accepts objects exposing ``detach()``/``numpy()`` (e.g. torch tensors) as
    well as anything ``np.asarray`` understands.
    """
    if hasattr(values, "detach"):
        values = values.detach()
    if hasattr(values, "numpy"):
        values = values.numpy()
    # axis=None flattens before sorting.
    return np.sort(np.asarray(values), axis=None)


def mean_invariate_manhattan_distance(x, y):
    """Return the mean absolute difference between the sorted values of x and y.

    The values of both inputs are sorted ascending and compared pairwise
    (smallest with smallest, and so on), so two weight sets holding the same
    values in different positions have distance 0.

    :param x: first collection of values (tensor or array-like).
    :param y: second collection of values (tensor or array-like).
    :return: mean of the pairwise absolute differences.
    """
    xs = _sorted_flat_values(x)
    ys = _sorted_flat_values(y)
    # Compare only the common prefix, mirroring the truncating behaviour of zip().
    common = min(xs.size, ys.size)
    return np.mean(np.abs(xs[:common] - ys[:common]))


# Supported metric names for distance_matrix() / distance_from_parent().
_DISTANCE_FUNCTIONS = {
    "MSE": MSE,
    "MAE": MAE,
    "MIM": mean_invariate_manhattan_distance,
}


def _resolve_distance(distance):
    """Return the metric callable registered under ``distance``.

    :raises ValueError: if ``distance`` is not one of the supported names.
    """
    try:
        return _DISTANCE_FUNCTIONS[distance]
    except KeyError:
        raise ValueError(
            f"Unknown distance {distance!r}; expected one of {sorted(_DISTANCE_FUNCTIONS)}."
        ) from None


def distance_matrix(nets, distance="MIM", print_it=True):
    """Compute the all-to-all distance matrix between the given nets.

    For every net the first column of ``input_weight_matrix()`` is compared
    with that of every other net using the chosen metric.

    :param nets: sequence of networks exposing ``input_weight_matrix()`` and ``name``.
    :param distance: metric name, one of ``"MSE"``, ``"MAE"`` or ``"MIM"``.
    :param print_it: if true, print the matrix as a table.
    :return: nested list ``matrix[i][j]`` with the distance between net i and net j.
    :raises ValueError: if ``distance`` is not a supported metric name.
    """
    metric = _resolve_distance(distance)
    # Read each net's weights once instead of once per matrix cell.
    weight_columns = [net.input_weight_matrix()[:, 0] for net in nets]
    matrix = [[metric(weights, other_weights) for other_weights in weight_columns]
              for weights in weight_columns]

    if print_it:
        print(f"\nDistance matrix (all to all) [{distance}]:")
        headers = [net.name for net in nets]
        print(tabulate(matrix, showindex=headers, headers=headers, tablefmt='orgtbl'))
    return matrix


def distance_from_parent(nets, distance="MIM", print_it=True):
    """Check, per parent, whether each clone lies within decreasing distance thresholds.

    Parents are the nets whose name does not contain ``"clone"`` and that pass
    ``is_identity_function``. For each parent a boolean matrix is built with one
    row per clone and one column per threshold ``10 ** -d`` for ``d`` in 0..9;
    an entry is true when the clone's distance to the parent is below the threshold.

    :param nets: sequence of networks (parents and clones).
    :param distance: metric name, one of ``"MSE"``, ``"MAE"`` or ``"MIM"``.
    :param print_it: if true, print one table per parent.
    :return: list with one boolean matrix per parent.
    :raises ValueError: if ``distance`` is not a supported metric name.
    """
    metric = _resolve_distance(distance)
    list_of_matrices = []
    parents = [net for net in nets if "clone" not in net.name and is_identity_function(net)]
    distance_range = range(10)

    for parent in parents:
        parent_weights = parent.create_target_weights(parent.input_weight_matrix())
        # Prefix match on "<parent>_" so that e.g. "ST_net_1" does not also
        # claim the clones of "ST_net_10" (a plain substring test would).
        clone_prefix = f"{parent.name}_"
        clones = [net for net in nets if net.name.startswith(clone_prefix)]

        matrix = []
        for clone in clones:
            # The distance does not depend on the threshold: compute it once per clone.
            clone_weights = clone.create_target_weights(clone.input_weight_matrix())
            clone_distance = metric(parent_weights, clone_weights)
            matrix.append([clone_distance < pow(10, -dist) for dist in distance_range])

        if print_it:
            print(f"\nDistances from parent {parent.name} [{distance}]:")
            # NOTE(review): the header reads "10e-d" while the threshold is 10 ** -d.
            col_headers = [str(f"10e-{d}") for d in distance_range]
            row_headers = [str(f"clone_{i}") for i in range(len(clones))]
            print(tabulate(matrix, showindex=row_headers, headers=col_headers, tablefmt='orgtbl'))

        list_of_matrices.append(matrix)

    return list_of_matrices


class SpawnExperiment:
    """Run the full spawn experiment on construction.

    The constructor populates the environment, spawns and trains noisy clones,
    plots the weight trajectories and stores the distance matrices.
    """

    def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size,
                 net_learning_rate, epochs, st_steps, nr_clones, noise, directory) -> None:
        """Store the configuration and immediately run the experiment.

        :param population_size: number of parent nets to create.
        :param log_step_size: passed through to ``Net.self_train`` and the 3d plot.
        :param net_input_size: input size passed to ``Net``.
        :param net_hidden_size: hidden size passed to ``Net``.
        :param net_out_size: output size passed to ``Net``.
        :param net_learning_rate: learning rate passed to ``Net.self_train``.
        :param epochs: total number of epochs; clones/parents train ``epochs - 1`` more after populating.
        :param st_steps: number of self-train steps per epoch.
        :param nr_clones: default number of clones per fixpoint parent.
        :param noise: upper bound of the noise applied to clones; falsy values fall back to ``10e-5``.
        :param directory: output directory; created if missing.
        """
        self.population_size = population_size
        self.log_step_size = log_step_size
        self.net_input_size = net_input_size
        self.net_hidden_size = net_hidden_size
        self.net_out_size = net_out_size
        self.net_learning_rate = net_learning_rate
        self.epochs = epochs
        self.ST_steps = st_steps
        self.loss_history = []
        self.nets = []
        self.nr_clones = nr_clones
        # NOTE: the literal 10e-5 equals 1e-4.
        self.noise = noise or 10e-5
        print("\nNOISE:", self.noise)
        self.parents = []

        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)

        self.populate_environment()
        self.spawn_and_continue()
        self.weights_evolution_3d_experiment()
        # self.visualize_loss()
        self.distance_matrix = distance_matrix(self.nets, print_it=False)
        self.parent_clone_distances = distance_from_parent(self.nets, print_it=False)

    def _self_train_steps(self, net, steps):
        """Call ``net.self_train`` ``steps`` times with the experiment's settings."""
        for _ in range(steps):
            net.self_train(1, self.log_step_size, self.net_learning_rate)

    def _finish_training(self, net):
        """Train ``net`` for the remaining ``epochs - 1`` epochs of ``ST_steps`` steps each."""
        for _ in range(self.epochs - 1):
            self._self_train_steps(net, self.ST_steps)

    @staticmethod
    def _distances(reference, weights):
        """Return ``(MAE, MSE, MIM)`` between ``reference`` and ``weights``."""
        return (MAE(reference, weights),
                MSE(reference, weights),
                mean_invariate_manhattan_distance(reference, weights))

    def populate_environment(self):
        """Create ``population_size`` nets, self-train each for ``ST_steps`` steps and register them."""
        loop_population_size = tqdm(range(self.population_size))
        for i in loop_population_size:
            loop_population_size.set_description("Populating experiment %s" % i)

            net_name = f"ST_net_{str(i)}"
            net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)

            self._self_train_steps(net, self.ST_steps)

            self.nets.append(net)
            self.parents.append(net)

    def spawn_and_continue(self, number_clones: int = None):
        """Clone every fixpoint parent, add noise, finish training and log distances.

        For each of the first ``population_size`` nets that passes
        ``is_identity_function``, ``number_clones`` noisy clones are created and
        trained for the remaining epochs. Distances (MAE, MSE, MIM) between the
        parent's target weights at cloning time and the clone's target weights
        are recorded before and after training in ``self.df``. Clones that are
        identity functions after training are appended to ``self.nets``.
        Finally the parent itself is trained for the remaining epochs.

        :param number_clones: clones per parent; defaults to ``self.nr_clones``.
        """
        number_clones = number_clones or self.nr_clones

        df = pd.DataFrame(
            columns=['name', 'MAE_pre', 'MAE_post', 'MSE_pre', 'MSE_post', 'MIM_pre', 'MIM_post', 'noise',
                     'status_post'])

        # For every initial net {i} after populating (that is fixpoint after first epoch);
        # index-based on purpose: self.nets grows while clones are appended below.
        for i in range(self.population_size):
            net = self.nets[i]
            # We set parent start_time to just before this epoch ended, so plotting is zoomed in.
            # Comment out to see full trajectory (but the clones will be very hard to see).
            net.start_time = self.ST_steps - 350
            # Make one target to compare distances to clones later when they have trained.
            net_input_data = net.input_weight_matrix()
            net_target_data = net.create_target_weights(net_input_data)

            if is_identity_function(net):
                print(f"\nNet {i} is fixpoint")

                # Clone the fixpoint x times and add (+-)self.noise to weight-sets randomly;
                # To plot clones starting after first epoch (z=ST_steps), set that as start_time!
                # To make sure PCA will plot the same trajectory up until this point, we clone the
                # parent-net's weight history as well.
                for j in range(number_clones):
                    clone = Net(net.input_size, net.hidden_size, net.out_size,
                                f"ST_net_{str(i)}_clone_{str(j)}", start_time=self.ST_steps)
                    clone.load_state_dict(copy.deepcopy(net.state_dict()))
                    rand_noise = prng() * self.noise
                    clone = clone.apply_noise(rand_noise)
                    clone.s_train_weights_history = copy.deepcopy(net.s_train_weights_history)
                    clone.number_trained = copy.deepcopy(net.number_trained)

                    # Pre Training distances (after noise application of course)
                    clone_pre_weights = clone.create_target_weights(clone.input_weight_matrix())
                    MAE_pre, MSE_pre, MIM_pre = self._distances(net_target_data, clone_pre_weights)

                    # Then finish training each clone {j} (for remaining epoch-1 * ST_steps) ..
                    self._finish_training(clone)

                    # Post Training distances for comparison
                    clone_post_weights = clone.create_target_weights(clone.input_weight_matrix())
                    MAE_post, MSE_post, MIM_post = self._distances(net_target_data, clone_post_weights)

                    # .. log to data-frame and add to nets for 3d plotting if they are fixpoints themselves.
                    test_status(clone)
                    if is_identity_function(clone):
                        print(f"Clone {j} (of net_{i}) is fixpoint."
                              f"\nMSE({i},{j}): {MSE_post}"
                              f"\nMAE({i},{j}): {MAE_post}"
                              f"\nMIM({i},{j}): {MIM_post}\n")
                        self.nets.append(clone)

                    # NOTE(review): every clone is logged (fixpoint or not), since
                    # 'status_post' records clone.is_fixpoint — confirm intended nesting.
                    df.loc[clone.name] = [clone.name, MAE_pre, MAE_post, MSE_pre, MSE_post, MIM_pre, MIM_post,
                                          self.noise, clone.is_fixpoint]

            # Finally take parent net {i} and finish its training for comparison to clone development.
            # NOTE(review): assumed to run for every parent, not only for fixpoints — confirm.
            self._finish_training(net)
            net_weights_after = net.create_target_weights(net.input_weight_matrix())
            parent_mae, parent_mse, parent_mim = self._distances(net_target_data, net_weights_after)
            # Each label is printed with its matching metric.
            print(f"Parent net's distance to original position."
                  f"\nMSE(OG,new): {parent_mse}"
                  f"\nMAE(OG,new): {parent_mae}"
                  f"\nMIM(OG,new): {parent_mim}\n")

        self.df = df

    def weights_evolution_3d_experiment(self):
        """Plot the weight trajectories of all nets via ``plot_3d_self_train`` and return its result."""
        exp_name = f"ST_{str(len(self.nets))}_nets_3d_weights_PCA"
        return plot_3d_self_train(self.nets, exp_name, self.directory, self.log_step_size, plot_pca_together=True)

    def visualize_loss(self):
        """Collect every net's ``loss_history`` into ``self.loss_history`` and plot it."""
        for net in self.nets:
            self.loss_history.append(net.loss_history)
        plot_loss(self.loss_history, self.directory)


def _noise_tick_labels(noise_levels):
    """Return one mathtext tick label per distinct noise level, in ascending order.

    Exact powers of ten are rendered as ``10^{k}``; any other value falls back
    to its general-format number. Ascending order is meant to match the order
    of the categorical x-axis the labels are applied to.
    """
    labels = []
    for level in sorted({float(value) for value in noise_levels}):
        if level > 0:
            exponent = np.log10(level)
            rounded = int(np.round(exponent))
            if np.isclose(exponent, rounded):
                labels.append(rf"$\mathregular{{10^{{{rounded}}}}}$")
                continue
        labels.append(f"{level:g}")
    return labels


if __name__ == "__main__":
    NET_INPUT_SIZE = 4
    NET_OUT_SIZE = 1

    # Define number of runs & name:
    ST_runs = 1
    ST_runs_name = "test-27"
    ST_steps = 2500
    ST_epochs = 2
    ST_log_step_size = 10

    # Define number of networks & their architecture
    nr_clones = 10
    ST_population_size = 1
    ST_net_hidden_size = 2
    ST_net_learning_rate = 0.04
    ST_name_hash = random.getrandbits(32)

    print("Running the Spawn experiment:")
    exp_list = []
    for noise_factor in range(2, 3):
        exp = SpawnExperiment(
            population_size=ST_population_size,
            log_step_size=ST_log_step_size,
            net_input_size=NET_INPUT_SIZE,
            net_hidden_size=ST_net_hidden_size,
            net_out_size=NET_OUT_SIZE,
            net_learning_rate=ST_net_learning_rate,
            epochs=ST_epochs,
            st_steps=ST_steps,
            nr_clones=nr_clones,
            noise=pow(10, -noise_factor),
            directory=Path('output') / 'spawn_basin' / f'{ST_name_hash}' / f'10e-{noise_factor}'
        )
        exp_list.append(exp)

    directory = Path('output') / 'spawn_basin' / f'{ST_name_hash}'
    # Context manager so the pickle file is flushed and closed deterministically.
    with open(directory / f"experiment_pickle_{ST_name_hash}.p", "wb") as pickle_file:
        pickle.dump(exp_list, pickle_file)
    print(f"\nSaved experiment to {directory}.")

    # Concat all dataframes, and add columns depending on where clone weights end up after training
    # (rel. to parent)
    df = pd.concat([exp.df for exp in exp_list])
    df = df.dropna().reset_index()
    df["relative_distance"] = ((df["MAE_pre"] - df["MAE_post"]) / df["noise"]).astype(float)
    df["class"] = np.select(
        [df["relative_distance"] > 0, df["relative_distance"] < 0],
        ["approaching", "distancing"],
        default="stationary")

    # Countplot of all fixpoint clone after training per class.
    ax = sns.catplot(kind="count", data=df, x="noise", hue="class", height=5.27, aspect=11.7 / 5.27, legend=False)
    ax.set_axis_labels("Noise Levels", "Clone Fixpoints After Training Count ", fontsize=15)
    # Labels follow the noise levels actually present instead of a fixed 10-entry list.
    ax.set_xticklabels(labels=_noise_tick_labels(df["noise"]), fontsize=15)
    # Single call: a second plt.legend() would replace the first and drop its placement.
    plt.legend(bbox_to_anchor=(0.01, 0.85), loc=2, borderaxespad=0., fontsize='large')
    plt.savefig(directory / f"clone_status_after_countplot_{ST_name_hash}.png")
    plt.clf()

    # Catplot of before-after comparison of the clone's weights. Colors links depending on class
    # (approaching, distancing, stationary (i.e., MAE=0)). Blue, orange and green are based on the
    # countplot above, should be safe for colorblindness
    # (see https://gist.github.com/mwaskom/b35f6ebc2d4b340b4f64a4e28e778486).
    mlt = df.melt(id_vars=["name", "noise", "class"], value_vars=["MAE_pre", "MAE_post"], var_name="State",
                  value_name="Distance")
    class_colors = {"approaching": "blue", "distancing": "orange"}
    P = [class_colors.get(clone_class, "green") for clone_class in mlt["class"]]
    P = sns.color_palette(P, as_cmap=False)
    ax = sns.catplot(data=mlt, x="State", y="Distance", col="noise", hue="name", kind="point", palette=P,
                     col_wrap=min(5, len(exp_list)), sharey=False, legend=False)
    ax.map(sns.boxplot, "State", "Distance", "noise", linewidth=0.8, order=["MAE_pre", "MAE_post"],
           whis=[0, 100])
    ax.set_axis_labels("", "Manhattan Distance To Parent Weights", fontsize=15)
    ax.set_xticklabels(labels=('after noise application', 'after training'), fontsize=15)
    # plt.ticklabel_format(style='sci', axis='x')
    plt.savefig(directory / f"before_after_distance_catplot_{ST_name_hash}.png")
    plt.clf()

    # Catplot of child_nets L1 Prediction "progress" compared to parents. Computes one round of accuracy
    # first. If net is a parent net (not a clone), then we reset weights to timestep of cloning first
    # (from the weight history). So 5k (end) -> 2.5k training (in this experiment, so careful with
    # len(history)/2, this might only work here!)
    df_acc = pd.DataFrame(columns=["name", "noise", "l1_acc", "Network Type"])
    for experiment in exp_list:
        noise = experiment.noise
        print(f"\nNoise: {noise}")
        for network in experiment.nets:
            is_parent = "clone" not in network.name
            if is_parent:
                history = network.s_train_weights_history
                network.apply_weights(torch.tensor(history[len(history) // 2][0]))
            input_data = network.input_weight_matrix()
            target_data = network.create_target_weights(input_data)
            predicted_values = network(input_data)
            mse_loss = F.mse_loss(target_data, predicted_values).item()
            l1_loss = F.l1_loss(target_data, predicted_values).item()
            df_acc.loc[len(df_acc) + 1] = [network.name, noise, l1_loss, "parents" if is_parent else "child_nets"]
            print("MSE:", mse_loss, "\t", "L1: ", l1_loss, "\t", network.name)

    # Note: If there are outliers then showfliers=False is necessary or it will zoom way too far out.
    # If parent and child_nets accuracy is too far apart this plot might not work (only shows either
    # parents or part of the child_nets).
    ax = sns.catplot(data=df_acc, y="l1_acc", x="noise", hue="Network Type", kind="box", legend=False,
                     showfliers=False, height=5.27, aspect=11.7 / 5.27, sharey=False)
    ax.map(plt.axhline, y=10 ** -6, ls='--')
    ax.map(plt.axhline, y=10 ** -7, ls='--')
    ax.set_axis_labels("Noise levels", "L1 Prediction Loss After Training", fontsize=15)
    ax.set_xticklabels(labels=_noise_tick_labels(df_acc["noise"]), fontsize=15)
    plt.legend(bbox_to_anchor=(0.01, 0.85), loc=2, borderaxespad=0., fontsize='large')
    plt.savefig(directory / f"parent_vs_children_accuracy_{ST_name_hash}.png")
    plt.clf()