uploaded my code (not yet 100% finished)

This commit is contained in:
Cristian Lenta
2021-05-03 06:43:53 +00:00
parent 9bd65713fe
commit e176d05cf5
5 changed files with 1402 additions and 0 deletions

737
experiments.py Normal file
View File

@ -0,0 +1,737 @@
import copy
import random
import os.path
import pickle
from tokenize import String
from tqdm import tqdm
from functionalities_test import test_for_fixpoints, is_zero_fixpoint, is_identity_function
from network import Net
from visualization import plot_loss, bar_chart_fixpoints, plot_3d_soup, line_chart_fixpoints, box_plot, write_file
from visualization import plot_3d_self_application, plot_3d_self_train
class SelfTrainExperiment:
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
epochs, directory_name) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.epochs = epochs
self.loss_history = []
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.directory_name = directory_name
os.mkdir(self.directory_name)
self.nets = []
# Create population:
self.populate_environment()
self.weights_evolution_3d_experiment()
self.count_fixpoints()
self.visualize_loss()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating ST experiment %s" % i)
net_name = f"ST_net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
input_data = net.input_weight_matrix()
target_data = net.create_target_weights(input_data)
net.self_train(self.epochs, self.log_step_size, self.net_learning_rate, input_data, target_data)
self.nets.append(net)
def weights_evolution_3d_experiment(self):
exp_name = f"ST_{str(len(self.nets))}_nets_3d_weights_PCA"
return plot_3d_self_train(self.nets, exp_name, self.directory_name, self.log_step_size)
def count_fixpoints(self):
test_for_fixpoints(self.fixpoint_counters, self.nets)
exp_details = f"Self-train for {self.epochs} epochs"
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
def visualize_loss(self):
for i in range(len(self.nets)):
net_loss_history = self.nets[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory_name)
class SelfApplicationExperiment:
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size,
net_learning_rate, application_steps, train_nets, directory_name, training_steps
) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.SA_steps = application_steps #
self.train_nets = train_nets
self.ST_steps = training_steps
self.directory_name = directory_name
os.mkdir(self.directory_name)
""" Creating the nets & making the SA steps & (maybe) also training the networks. """
self.nets = []
# Create population:
self.populate_environment()
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.weights_evolution_3d_experiment()
self.count_fixpoints()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating SA experiment %s" % i)
net_name = f"SA_net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name
)
input_data = net.input_weight_matrix()
target_data = net.create_target_weights(input_data)
if self.train_nets == "before_SA":
net.self_train(self.ST_steps, self.log_step_size, self.net_learning_rate, input_data, target_data)
net.self_application(input_data, self.SA_steps, self.log_step_size)
elif self.train_nets == "after_SA":
net.self_application(input_data, self.SA_steps, self.log_step_size)
net.self_train(self.ST_steps, self.log_step_size, self.net_learning_rate, input_data, target_data)
else:
net.self_application(input_data, self.SA_steps, self.log_step_size)
self.nets.append(net)
def weights_evolution_3d_experiment(self):
exp_name = f"SA_{str(len(self.nets))}_nets_3d_weights_PCA"
plot_3d_self_application(self.nets, exp_name, self.directory_name, self.log_step_size)
def count_fixpoints(self):
test_for_fixpoints(self.fixpoint_counters, self.nets)
exp_details = f"{self.SA_steps} SA steps"
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
class SoupExperiment:
def __init__(self, population_size, net_i_size, net_h_size, net_o_size, learning_rate, attack_chance,
train_nets, ST_steps, epochs, log_step_size, directory_name):
super().__init__()
self.population_size = population_size
self.net_input_size = net_i_size
self.net_hidden_size = net_h_size
self.net_out_size = net_o_size
self.net_learning_rate = learning_rate
self.attack_chance = attack_chance
self.train_nets = train_nets
# self.SA_steps = SA_steps
self.ST_steps = ST_steps
self.epochs = epochs
self.log_step_size = log_step_size
self.loss_history = []
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
# <self.fixpoint_counters_history> is used for keeping track of the amount of fixpoints in %
self.fixpoint_counters_history = []
self.directory_name = directory_name
os.mkdir(self.directory_name)
self.population = []
self.populate_environment()
self.evolve()
self.fixpoint_percentage()
self.weights_evolution_3d_experiment()
self.count_fixpoints()
self.visualize_loss()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in tqdm(range(self.population_size)):
loop_population_size.set_description("Populating soup experiment %s" % i)
net_name = f"soup_network_{i}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
self.population.append(net)
def evolve(self):
""" Evolving consists of attacking & self-training. """
loop_epochs = tqdm(range(self.epochs))
for i in loop_epochs:
loop_epochs.set_description("Evolving soup %s" % i)
# A network attacking another network with a given percentage
chance = random.randint(1, 100)
if chance <= self.attack_chance:
random_net1, random_net2 = random.sample(range(self.population_size), 2)
random_net1 = self.population[random_net1]
random_net2 = self.population[random_net2]
print(f"\n Attack: {random_net1.name} -> {random_net2.name}")
random_net1.attack(random_net2)
# Self-training each network in the population
for j in range(self.population_size):
net = self.population[j]
input_data = net.input_weight_matrix()
target_data = net.create_target_weights(input_data)
net.self_train(self.ST_steps, self.log_step_size, self.net_learning_rate, input_data, target_data)
# Testing for fixpoints after each batch of ST steps to see relevant data
if i % self.ST_steps == 0:
test_for_fixpoints(self.fixpoint_counters, self.population)
fixpoints_percentage = round((self.fixpoint_counters["fix_zero"] + self.fixpoint_counters["fix_weak"] +
self.fixpoint_counters["fix_sec"]) / self.population_size, 1)
self.fixpoint_counters_history.append(fixpoints_percentage)
# Resetting the fixpoint counter. Last iteration not to be reset - it is important for the bar_chart_fixpoints().
if i < self.epochs:
self.reset_fixpoint_counters()
def weights_evolution_3d_experiment(self):
exp_name = f"soup_{self.population_size}_nets_{self.ST_steps}_training_{self.epochs}_epochs"
return plot_3d_soup(self.population, exp_name, self.directory_name)
def count_fixpoints(self):
test_for_fixpoints(self.fixpoint_counters, self.population)
exp_details = f"Evolution steps: {self.epochs} epochs"
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
def fixpoint_percentage(self):
runs = self.epochs / self.ST_steps
SA_steps = None
line_chart_fixpoints(self.fixpoint_counters_history, runs, self.ST_steps, SA_steps, self.directory_name,
self.population_size)
def visualize_loss(self):
for i in range(len(self.population)):
net_loss_history = self.population[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory_name)
def reset_fixpoint_counters(self):
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
class MixedSettingExperiment:
def __init__(self, population_size, net_i_size, net_h_size, net_o_size, learning_rate, train_nets,
epochs, SA_steps, ST_steps_between_SA, log_step_size, directory_name):
super().__init__()
self.population_size = population_size
self.net_input_size = net_i_size
self.net_hidden_size = net_h_size
self.net_out_size = net_o_size
self.net_learning_rate = learning_rate
self.train_nets = train_nets
self.epochs = epochs
self.SA_steps = SA_steps
self.ST_steps_between_SA = ST_steps_between_SA
self.log_step_size = log_step_size
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.loss_history = []
self.fixpoint_counters_history = []
self.directory_name = directory_name
os.mkdir(self.directory_name)
self.nets = []
self.populate_environment()
self.fixpoint_percentage()
self.weights_evolution_3d_experiment()
self.count_fixpoints()
self.visualize_loss()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating mixed experiment %s" % i)
net_name = f"mixed_net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
self.nets.append(net)
loop_epochs = tqdm(range(self.epochs))
for j in loop_epochs:
loop_epochs.set_description("Running mixed experiment %s" % j)
for i in loop_population_size:
net = self.nets[i]
input_data = net.input_weight_matrix()
target_data = net.create_target_weights(input_data)
if self.train_nets == "before_SA":
net.self_train(self.ST_steps_between_SA, self.log_step_size, self.net_learning_rate, input_data,
target_data)
net.self_application(input_data, self.SA_steps, self.log_step_size)
elif self.train_nets == "after_SA":
net.self_application(input_data, self.SA_steps, self.log_step_size)
net.self_train(self.ST_steps_between_SA, self.log_step_size, self.net_learning_rate, input_data,
target_data)
test_for_fixpoints(self.fixpoint_counters, self.nets)
# Rounding the result not to run into other problems later regarding the exact representation of floating number
fixpoints_percentage = round((self.fixpoint_counters["fix_zero"] + self.fixpoint_counters[
"fix_sec"]) / self.population_size, 1)
self.fixpoint_counters_history.append(fixpoints_percentage)
# Resetting the fixpoint counter. Last iteration not to be reset - it is important for the bar_chart_fixpoints().
if j < self.epochs:
self.reset_fixpoint_counters()
def weights_evolution_3d_experiment(self):
exp_name = f"Mixed {str(len(self.nets))}"
# This batch size is not relevant for mixed settings because during an epoch there are more steps of SA & ST happening
# and only they need the batch size. To not affect the number of epochs shown in the 3D plot, will send
# forward the number "1" for batch size with the variable <irrelevant_batch_size>
irrelevant_batch_size = 1
plot_3d_self_train(self.nets, exp_name, self.directory_name, irrelevant_batch_size)
def count_fixpoints(self):
exp_details = f"SA steps: {self.SA_steps}; ST steps: {self.ST_steps_between_SA}"
test_for_fixpoints(self.fixpoint_counters, self.nets)
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
def fixpoint_percentage(self):
line_chart_fixpoints(self.fixpoint_counters_history, self.epochs, self.ST_steps_between_SA,
self.SA_steps, self.directory_name, self.population_size)
def visualize_loss(self):
for i in range(len(self.nets)):
net_loss_history = self.nets[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory_name)
def reset_fixpoint_counters(self):
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
class RobustnessExperiment:
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
ST_steps, directory_name) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.ST_steps = ST_steps
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.id_functions = []
self.directory_name = directory_name
os.mkdir(self.directory_name)
self.nets = []
# Create population:
self.populate_environment()
self.count_fixpoints()
self.test_robustness()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating robustness experiment %s" % i)
net_name = f"net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
input_data = net.input_weight_matrix()
target_data = net.create_target_weights(input_data)
net.self_train(self.ST_steps, self.log_step_size, self.net_learning_rate, input_data, target_data)
self.nets.append(net)
def test_robustness(self):
test_for_fixpoints(self.fixpoint_counters, self.nets, self.id_functions)
zero_epsilon = pow(10, -5)
data = [[0 for _ in range(10)] for _ in range(len(self.id_functions))]
for i in range(len(self.id_functions)):
for j in range(10):
original_net = self.id_functions[i]
# Creating a clone of the network. Not by copying it, but by creating a completely new network
# and changing its weights to the original ones.
original_net_clone = Net(original_net.input_size, original_net.hidden_size, original_net.out_size,
original_net.name)
# Extra safety for the value of the weights
original_net_clone.load_state_dict(copy.deepcopy(original_net.state_dict()))
input_data = original_net_clone.input_weight_matrix()
target_data = original_net_clone.create_target_weights(input_data)
changed_weights = copy.deepcopy(input_data)
for k in range(len(input_data)):
changed_weights[k][0] = changed_weights[k][0] + pow(10, -j)
# Testing if the new net is still an identity function after applying noise
still_id_func = is_identity_function(original_net_clone, changed_weights, target_data, zero_epsilon)
# If the net is still an id. func. after applying the first run of noise, continue to apply it until otherwise
while still_id_func and data[i][j] <= 1000:
data[i][j] += 1
new_weights = original_net_clone.create_target_weights(changed_weights)
original_net_clone = original_net_clone.apply_weights(original_net_clone, new_weights)
still_id_func = is_identity_function(original_net_clone, input_data, target_data, zero_epsilon)
if data.count(0) == 10:
print(f"There is no network resisting the robustness test.")
text = f"For this population of \n {self.population_size} networks \n there is no" \
f" network resisting the robustness test."
write_file(text, self.directory_name)
else:
box_plot(data, self.directory_name, self.population_size)
def count_fixpoints(self):
exp_details = f"ST steps: {self.ST_steps}"
test_for_fixpoints(self.fixpoint_counters, self.nets)
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
""" ----------------------------------------------- Running the experiments ----------------------------------------------- """
def run_ST_experiment(population_size, batch_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
epochs, runs, run_name, name_hash):
experiments = {}
check_folder("self_training")
# Running the experiments
for i in range(runs):
ST_directory_name = f"experiments/self_training/{run_name}_run_{i}_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
ST_experiment = SelfTrainExperiment(
population_size,
batch_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
epochs,
ST_directory_name
)
pickle.dump(ST_experiment, open(f"{ST_directory_name}/full_experiment_pickle.p", "wb"))
experiments[i] = ST_experiment
# Building a summary of all the runs
directory_name = f"experiments/self_training/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
os.mkdir(directory_name)
summary_pre_title = "ST"
summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, directory_name,
summary_pre_title)
def run_SA_experiment(population_size, batch_size, net_input_size, net_hidden_size, net_out_size,
net_learning_rate, runs, run_name, name_hash, application_steps, train_nets, training_steps):
experiments = {}
check_folder("self_application")
# Running the experiments
for i in range(runs):
directory_name = f"experiments/self_application/{run_name}_run_{i}_{str(population_size)}_nets_{application_steps}_SA_{str(name_hash)}"
SA_experiment = SelfApplicationExperiment(
population_size,
batch_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
application_steps,
train_nets,
directory_name,
training_steps
)
pickle.dump(SA_experiment, open(f"{directory_name}/full_experiment_pickle.p", "wb"))
experiments[i] = SA_experiment
# Building a summary of all the runs
directory_name = f"experiments/self_application/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{application_steps}_SA_{str(name_hash)}"
os.mkdir(directory_name)
summary_pre_title = "SA"
summary_fixpoint_experiment(runs, population_size, application_steps, experiments, net_learning_rate,
directory_name,
summary_pre_title)
def run_soup_experiment(population_size, attack_chance, net_input_size, net_hidden_size, net_out_size,
net_learning_rate, epochs, batch_size, runs, run_name, name_hash, ST_steps, train_nets):
experiments = {}
fixpoints_percentages = []
check_folder("soup")
# Running the experiments
for i in range(runs):
directory_name = f"experiments/soup/{run_name}_run_{i}_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
soup_experiment = SoupExperiment(
population_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
attack_chance,
train_nets,
ST_steps,
epochs,
batch_size,
directory_name
)
pickle.dump(soup_experiment, open(f"{directory_name}/full_experiment_pickle.p", "wb"))
experiments[i] = soup_experiment
# Building history of fixpoint percentages for summary
fixpoint_counters_history = soup_experiment.fixpoint_counters_history
if not fixpoints_percentages:
fixpoints_percentages = soup_experiment.fixpoint_counters_history
else:
# Using list comprehension to make the sum of all the percentages
fixpoints_percentages = [fixpoints_percentages[i] + fixpoint_counters_history[i] for i in
range(len(fixpoints_percentages))]
# Creating a folder for the summary of the current runs
directory_name = f"experiments/soup/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
os.mkdir(directory_name)
# Building a summary of all the runs
summary_pre_title = "soup"
summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, directory_name,
summary_pre_title)
SA_steps = None
summary_fixpoint_percentage(runs, epochs, fixpoints_percentages, ST_steps, SA_steps, directory_name,
population_size)
def run_mixed_experiment(population_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate, train_nets,
epochs, SA_steps, ST_steps_between_SA, batch_size, name_hash, runs, run_name):
experiments = {}
fixpoints_percentages = []
check_folder("mixed")
# Running the experiments
for i in range(runs):
directory_name = f"experiments/mixed/{run_name}_run_{i}_{str(population_size)}_nets_{SA_steps}_SA_{ST_steps_between_SA}_ST_{str(name_hash)}"
mixed_experiment = MixedSettingExperiment(
population_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
train_nets,
epochs,
SA_steps,
ST_steps_between_SA,
batch_size,
directory_name
)
pickle.dump(mixed_experiment, open(f"{directory_name}/full_experiment_pickle.p", "wb"))
experiments[i] = mixed_experiment
# Building history of fixpoint percentages for summary
fixpoint_counters_history = mixed_experiment.fixpoint_counters_history
if not fixpoints_percentages:
fixpoints_percentages = mixed_experiment.fixpoint_counters_history
else:
# Using list comprehension to make the sum of all the percentages
fixpoints_percentages = [fixpoints_percentages[i] + fixpoint_counters_history[i] for i in
range(len(fixpoints_percentages))]
# Building a summary of all the runs
directory_name = f"experiments/mixed/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{str(name_hash)}"
os.mkdir(directory_name)
summary_pre_title = "mixed"
summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, directory_name,
summary_pre_title)
summary_fixpoint_percentage(runs, epochs, fixpoints_percentages, ST_steps_between_SA, SA_steps, directory_name,
population_size)
def run_robustness_experiment(population_size, batch_size, net_input_size, net_hidden_size, net_out_size,
net_learning_rate, epochs, runs, run_name, name_hash):
experiments = {}
check_folder("robustness")
# Running the experiments
for i in range(runs):
ST_directory_name = f"experiments/robustness/{run_name}_run_{i}_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
robustness_experiment = RobustnessExperiment(
population_size,
batch_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
epochs,
ST_directory_name
)
pickle.dump(robustness_experiment, open(f"{ST_directory_name}/full_experiment_pickle.p", "wb"))
experiments[i] = robustness_experiment
# Building a summary of all the runs
directory_name = f"experiments/robustness/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{str(name_hash)}"
os.mkdir(directory_name)
summary_pre_title = "robustness"
summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, directory_name,
summary_pre_title)
""" ----------------------------------------- Methods for summarizing the experiments ------------------------------------------ """
def summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, directory_name,
summary_pre_title):
avg_fixpoint_counters = {
"avg_identity_func": 0,
"avg_divergent": 0,
"avg_fix_zero": 0,
"avg_fix_weak": 0,
"avg_fix_sec": 0,
"avg_other_func": 0
}
for i in range(len(experiments)):
fixpoint_counters = experiments[i].fixpoint_counters
avg_fixpoint_counters["avg_identity_func"] += fixpoint_counters["identity_func"]
avg_fixpoint_counters["avg_divergent"] += fixpoint_counters["divergent"]
avg_fixpoint_counters["avg_fix_zero"] += fixpoint_counters["fix_zero"]
avg_fixpoint_counters["avg_fix_weak"] += fixpoint_counters["fix_weak"]
avg_fixpoint_counters["avg_fix_sec"] += fixpoint_counters["fix_sec"]
avg_fixpoint_counters["avg_other_func"] += fixpoint_counters["other_func"]
# Calculating the average for each fixpoint
avg_fixpoint_counters.update((x, y / len(experiments)) for x, y in avg_fixpoint_counters.items())
# Checking where the data is coming from to have a relevant title in the plot.
if summary_pre_title not in ["ST", "SA", "soup", "mixed", "robustness"]:
summary_pre_title = ""
# Plotting the summary
source_checker = "summary"
exp_details = f"{summary_pre_title}: {runs} runs & {epochs} epochs each."
bar_chart_fixpoints(avg_fixpoint_counters, population_size, directory_name, net_learning_rate, exp_details,
source_checker)
def summary_fixpoint_percentage(runs, epochs, fixpoints_percentages, ST_steps, SA_steps, directory_name,
population_size):
fixpoints_percentages = [round(fixpoints_percentages[i] / runs, 1) for i in range(len(fixpoints_percentages))]
# Plotting summary
if "soup" in directory_name:
line_chart_fixpoints(fixpoints_percentages, epochs / ST_steps, ST_steps, SA_steps, directory_name,
population_size)
else:
line_chart_fixpoints(fixpoints_percentages, epochs, ST_steps, SA_steps, directory_name, population_size)
""" --------------------------------------------------- Miscellaneous ---------------------------------------------------------- """
def check_folder(experiment_folder: String):
if not os.path.isdir("experiments"): os.mkdir(f"experiments/")
if not os.path.isdir(f"experiments/{experiment_folder}/"): os.mkdir(f"experiments/{experiment_folder}/")

103
functionalities_test.py Normal file
View File

@ -0,0 +1,103 @@
import copy
from typing import Dict, List
import numpy as np
from torch import Tensor
from network import Net
def overall_fixpoint_test(network: Net, epsilon: float, input_data) -> bool:
predicted_values = network(input_data)
check_smaller_epsilon = all(epsilon > predicted_values)
check_greater_epsilon = all(-epsilon < predicted_values)
if check_smaller_epsilon and check_greater_epsilon:
return True
else:
return False
def is_divergent(network: Net) -> bool:
for i in network.input_weight_matrix():
weight_value = i[0].item()
if np.isnan(weight_value) or np.isinf(weight_value):
return True
return False
def is_identity_function(network: Net, input_data: Tensor, target_data: Tensor, epsilon=pow(10, -5)) -> bool:
predicted_values = network(input_data)
return np.allclose(target_data.detach().numpy(), predicted_values.detach().numpy(), 0, epsilon)
def is_zero_fixpoint(network: Net, input_data: Tensor, epsilon=pow(10, -5)) -> bool:
result = overall_fixpoint_test(network, epsilon, input_data)
return result
def is_secondary_fixpoint(network: Net, input_data: Tensor, epsilon: float) -> bool:
""" Secondary fixpoint check is done like this: compare first INPUT with second OUTPUT.
If they are within the boundaries, then is secondary fixpoint. """
# Calculating first output
first_output = network(input_data)
# Getting the second output by initializing a new net with the weights of the original net.
net_copy = copy.deepcopy(network)
net_copy.apply_weights(net_copy, first_output)
input_data_2 = net_copy.input_weight_matrix()
# Calculating second output
second_output = network(input_data_2)
check_smaller_epsilon = all(epsilon > second_output)
check_greater_epsilon = all(-epsilon < second_output)
if check_smaller_epsilon and check_greater_epsilon:
return True
else:
return False
def is_weak_fixpoint(network: Net, input_data: Tensor, epsilon: float) -> bool:
result = overall_fixpoint_test(network, epsilon, input_data)
return result
def test_for_fixpoints(fixpoint_counter: Dict, nets: List, id_functions=[]):
zero_epsilon = pow(10, -5)
epsilon = pow(10, -3)
for i in range(len(nets)):
net = nets[i]
input_data = net.input_weight_matrix()
target_data = net.create_target_weights(input_data)
if is_divergent(nets[i]):
fixpoint_counter["divergent"] += 1
nets[i].is_fixpoint = "divergent"
elif is_identity_function(nets[i], input_data, target_data, zero_epsilon):
fixpoint_counter["identity_func"] += 1
nets[i].is_fixpoint = "identity_func"
id_functions.append(nets[i])
elif is_zero_fixpoint(nets[i], input_data, zero_epsilon):
fixpoint_counter["fix_zero"] += 1
nets[i].is_fixpoint = "fix_zero"
elif is_weak_fixpoint(nets[i], input_data, epsilon):
fixpoint_counter["fix_weak"] += 1
nets[i].is_fixpoint = "fix_weak"
elif is_secondary_fixpoint(nets[i], input_data, zero_epsilon):
fixpoint_counter["fix_sec"] += 1
nets[i].is_fixpoint = "fix_sec"
else:
fixpoint_counter["other_func"] += 1
nets[i].is_fixpoint = "other_func"
def changing_rate(x_new, x_old):
return x_new - x_old

154
main.py Normal file
View File

@ -0,0 +1,154 @@
from experiments import run_ST_experiment, run_SA_experiment, run_soup_experiment, run_mixed_experiment, \
run_robustness_experiment
import random
# TODO maybe add also SA to the soup
def run_experiments(run_ST, run_SA, run_soup, run_mixed, run_robustness):
if run_ST:
print(f"Running the ST experiment:")
run_ST_experiment(ST_population_size, ST_log_step_size, NET_INPUT_SIZE, ST_net_hidden_size, NET_OUT_SIZE,
ST_net_learning_rate,
ST_epochs, ST_runs, ST_runs_name, ST_name_hash)
if run_SA:
print(f"\n Running the SA experiment:")
run_SA_experiment(SA_population_size, SA_log_step_size, NET_INPUT_SIZE, SA_net_hidden_size, NET_OUT_SIZE,
SA_net_learning_rate, SA_runs, SA_runs_name, SA_name_hash,
SA_steps, SA_train_nets, SA_ST_steps)
if run_soup:
print(f"\n Running the soup experiment:")
run_soup_experiment(soup_population_size, soup_attack_chance, NET_INPUT_SIZE, soup_net_hidden_size,
NET_OUT_SIZE, soup_net_learning_rate, soup_epochs, soup_log_step_size, soup_runs, soup_runs_name,
soup_name_hash, soup_ST_steps, soup_train_nets)
if run_mixed:
print(f"\n Running the mixed experiment:")
run_mixed_experiment(mixed_population_size, NET_INPUT_SIZE, mixed_net_hidden_size, NET_OUT_SIZE,
mixed_net_learning_rate, mixed_train_nets, mixed_epochs, mixed_SA_steps,
mixed_ST_steps_between_SA, mixed_log_step_size, mixed_name_hash, mixed_total_runs, mixed_runs_name)
if run_robustness:
print(f"Running the robustness experiment:")
run_robustness_experiment(rob_population_size, rob_log_step_size, NET_INPUT_SIZE, rob_net_hidden_size,
NET_OUT_SIZE, rob_net_learning_rate, rob_ST_steps, rob_runs, rob_runs_name, rob_name_hash)
if not run_ST and not run_SA and not run_soup and not run_mixed and not run_robustness:
print(f"No experiments to be run.")
if __name__ == '__main__':
# Constants:
NET_INPUT_SIZE = 4
NET_OUT_SIZE = 1
""" ------------------------------------- Self-training (ST) experiment ------------------------------------- """
run_ST_experiment_bool = True
# Define number of runs & name:
ST_runs = 3
ST_runs_name = "test-27"
ST_epochs = 500
ST_log_step_size = 5
# Define number of networks & their architecture
ST_population_size = 10
ST_net_hidden_size = 2
ST_net_learning_rate = 0.04
ST_name_hash = random.getrandbits(32)
""" ----------------------------------- Self-application (SA) experiment ----------------------------------- """
run_SA_experiment_bool = True
# Define number of runs, name, etc.:
SA_runs_name = "test-17"
SA_runs = 2
SA_steps = 100
SA_app_batch_size = 5
SA_train_batch_size = 5
SA_log_step_size = 5
# Define number of networks & their architecture
SA_population_size = 10
SA_net_hidden_size = 2
SA_net_learning_rate = 0.04
# SA_train_nets has 3 possible values "no", "before_SA", "after_SA".
SA_train_nets = "no"
SA_ST_steps = 300
SA_name_hash = random.getrandbits(32)
""" -------------------------------------------- Soup experiment -------------------------------------------- """
run_soup_experiment_bool = True
# Define number of runs, name, etc.:
soup_runs = 1
soup_runs_name = "test-16"
soup_epochs = 100
soup_log_step_size = 5
soup_ST_steps = 20
# soup_SA_steps = 10
# Define number of networks & their architecture
soup_population_size = 5
soup_net_hidden_size = 2
soup_net_learning_rate = 0.04
# soup_attack_chance in %
soup_attack_chance = 10
# not used yet: soup_train_nets has 3 possible values "no", "before_SA", "after_SA".
soup_train_nets = "no"
soup_name_hash = random.getrandbits(32)
""" ------------------------------------------- Mixed experiment -------------------------------------------- """
run_mixed_experiment_bool = True
# Define number of runs, name, etc.:
mixed_runs_name = "test-17"
mixed_total_runs = 2
# Define number of networks & their architecture
mixed_population_size = 5
mixed_net_hidden_size = 2
mixed_epochs = 10
# Set the <batch_size> to the same value as <ST_steps_between_SA> to see the weights plotted
# ONLY after each epoch, and not after a certain amount of steps.
mixed_log_step_size = 5
mixed_ST_steps_between_SA = 50
mixed_SA_steps = 4
mixed_net_learning_rate = 0.04
# mixed_train_nets has 2 possible values "before_SA", "after_SA".
mixed_train_nets = "after_SA"
mixed_name_hash = random.getrandbits(32)
""" ----------------------------------------- Robustness experiment ----------------------------------------- """
run_robustness_bool = True
# Define number of runs & name:
rob_runs = 3
rob_runs_name = "test-07"
rob_ST_steps = 500
rob_log_step_size = 10
# Define number of networks & their architecture
rob_population_size = 6
rob_net_hidden_size = 2
rob_net_learning_rate = 0.04
rob_name_hash = random.getrandbits(32)
""" ---------------------------------------- Running the experiment ----------------------------------------- """
run_experiments(run_ST_experiment_bool, run_SA_experiment_bool, run_soup_experiment_bool, run_mixed_experiment_bool,
run_robustness_bool)

170
network.py Normal file
View File

@ -0,0 +1,170 @@
from __future__ import annotations
import copy
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from torch import optim, Tensor
class Net(nn.Module):
@staticmethod
def create_target_weights(input_weight_matrix: Tensor) -> Tensor:
""" Outputting a tensor with the target weights. """
target_weight_matrix = np.arange(len(input_weight_matrix)).reshape(len(input_weight_matrix), 1).astype("f")
for i in range(len(input_weight_matrix)):
target_weight_matrix[i] = input_weight_matrix[i][0]
return torch.from_numpy(target_weight_matrix)
@staticmethod
def are_weights_diverged(network_weights):
""" Testing if the weights are eiter converging to infinity or -infinity. """
for layer_id, layer in enumerate(network_weights):
for cell_id, cell in enumerate(layer):
for weight_id, weight in enumerate(cell):
if np.isnan(weight):
return True
if np.isinf(weight):
return True
return False
@staticmethod
def apply_weights(network: Net, new_weights: Tensor) -> Net:
""" Changing the weights of a network to new given values. """
i = 0
for layer_id, layer_name in enumerate(network.state_dict()):
for line_id, line_values in enumerate(network.state_dict()[layer_name]):
for weight_id, weight_value in enumerate(network.state_dict()[layer_name][line_id]):
network.state_dict()[layer_name][line_id][weight_id] = new_weights[i]
i += 1
return network
def __init__(self, i_size: int, h_size: int, o_size: int, name=None) -> None:
super().__init__()
self.name = name
self.input_size = i_size
self.hidden_size = h_size
self.out_size = o_size
self.no_weights = h_size * (i_size + h_size * (h_size - 1) + o_size)
""" Data saved in self.s_train_weights_history & self.s_application_weights_history is used for experiments. """
self.s_train_weights_history = []
self.s_application_weights_history = []
self.loss_history = []
self.trained = False
self.is_fixpoint = ""
self.fc1 = nn.Linear(i_size, h_size, False)
self.fc2 = nn.Linear(h_size, h_size, False)
self.fc3 = nn.Linear(h_size, o_size, False)
def forward(self, x):
x = self.fc1(x)
x = self.fc2(x)
x = self.fc3(x)
return x
def normalize(self, value):
""" Normalizing the values >= 1 and adding pow(10, -8) to the values equal to 0 """
if value >= 1:
return value/len(self.state_dict())
elif value == 0:
return pow(10, -8)
else:
return value
def input_weight_matrix(self) -> Tensor:
""" Calculating the input tensor formed from the weights of the net """
# The "4" represents the weightwise coordinates used for the matrix: <value><layer_id><cell_id><positional_id>
weight_matrix = np.arange(self.no_weights * 4).reshape(self.no_weights, 4).astype("f")
i = 0
for layer_id, layer_name in enumerate(self.state_dict()):
for line_id, line_values in enumerate(self.state_dict()[layer_name]):
for weight_id, weight_value in enumerate(self.state_dict()[layer_name][line_id]):
weight_matrix[i] = weight_value.item(), self.normalize(layer_id), self.normalize(weight_id), self.normalize(line_id)
i += 1
return torch.from_numpy(weight_matrix)
def self_train(self, training_steps: int, log_step_size: int, learning_rate: float, input_data: Tensor, target_data: Tensor) -> (np.ndarray, Tensor, list):
""" Training a network to predict its own weights in order to self-replicate. """
optimizer = optim.SGD(self.parameters(), lr=learning_rate, momentum=0.9)
self.trained = True
for training_step in range(training_steps):
output = self(input_data)
loss = F.mse_loss(output, target_data)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# Saving the history of the weights after a certain amount of steps (aka log_step_size) for research.
# If it is a soup/mixed env. save weights only at the end of all training steps (aka a soup/mixed epoch)
if "soup" not in self.name and "mixed" not in self.name:
# If self-training steps are lower than 10, then append weight history after each ST step.
if training_steps < 10:
self.s_train_weights_history.append(output.T.detach().numpy())
self.loss_history.append(round(loss.detach().numpy().item(), 5))
else:
if training_step % log_step_size == 0:
self.s_train_weights_history.append(output.T.detach().numpy())
self.loss_history.append(round(loss.detach().numpy().item(), 5))
# Saving weights only at the end of a soup/mixed exp. epoch.
if "soup" in self.name or "mixed" in self.name:
self.s_train_weights_history.append(output.T.detach().numpy())
self.loss_history.append(round(loss.detach().numpy().item(), 5))
return output.detach().numpy(), loss, self.loss_history
def self_application(self, weights_matrix: Tensor, SA_steps: int, log_step_size: int) -> Net:
""" Inputting the weights of a network to itself for a number of steps, without backpropagation. """
data = copy.deepcopy(weights_matrix)
new_net = copy.deepcopy(self)
# output = new_net(data)
for i in range(SA_steps):
output = new_net(data)
# Saving the weights history after a certain amount of steps (aka log_step_size) for research purposes.
# If self-application steps are lower than 10, then append weight history after each SA step.
if SA_steps < 10:
self.s_application_weights_history.append(output.T.detach().numpy())
else:
if i % log_step_size == 0:
self.s_application_weights_history.append(output.T.detach().numpy())
""" See after how many steps of SA is the output not changing anymore: """
# print(f"Self-app. step {i+1}: {Experiment.changing_rate(output2, output)}")
for j in range(len(data)):
""" Constructing the weight matrix to have it as the next input. """
data[j][0] = output[j]
new_net = self.apply_weights(new_net, output)
return new_net
def attack(self, other_net: Net) -> Net:
other_net_weights = other_net.input_weight_matrix()
SA_steps = 1
log_step_size = 1
return self.self_application(other_net_weights, SA_steps, log_step_size)

238
visualization.py Normal file
View File

@ -0,0 +1,238 @@
from tokenize import String
from typing import List, Dict
import numpy
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
from sklearn.decomposition import PCA
import os.path
import random
import string
def plot_output(output):
""" Plotting the values of the final output """
plt.figure()
plt.imshow(output)
plt.colorbar()
plt.show()
def plot_loss(loss_array, directory_name, batch_size=1):
""" Plotting the evolution of the loss function."""
fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(12)
for i in range(len(loss_array)):
plt.plot(loss_array[i], label=f"Last loss value: {str(loss_array[i][len(loss_array[i])-1])}")
plt.legend()
plt.xlabel("Epochs")
plt.ylabel("Loss")
filepath = f"A:/Bachelorarbeit_git/thesis_code/{directory_name}"
filename = f"{filepath}/_nets_loss_function.png"
plt.savefig(f"{filename}")
# plt.show()
plt.clf()
def bar_chart_fixpoints(fixpoint_counter: Dict, population_size: int, directory_name: String, learning_rate: float,
exp_details: String, source_check=None):
""" Plotting the number of fixpoints in a barchart. """
fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(12)
legend_population_size = mpatches.Patch(color="white", label=f"No. of nets: {str(population_size)}")
learning_rate = mpatches.Patch(color="white", label=f"Learning rate: {str(learning_rate)}")
epochs = mpatches.Patch(color="white", label=f"{str(exp_details)}")
if source_check == "summary":
plt.legend(handles=[legend_population_size, learning_rate, epochs])
plt.ylabel("No. of nets/run")
plt.title("Summary: avg. amount of fixpoints/run")
else:
plt.legend(handles=[legend_population_size, learning_rate, epochs])
plt.ylabel("Number of networks")
plt.title("Fixpoint count")
plt.bar(range(len(fixpoint_counter)), list(fixpoint_counter.values()), align='center')
plt.xticks(range(len(fixpoint_counter)), list(fixpoint_counter.keys()))
filepath = f"A:/Bachelorarbeit_git/thesis_code/{directory_name}"
filename = f"{filepath}/{str(population_size)}_nets_fixpoints_barchart.png"
plt.savefig(f"{filename}")
plt.clf()
# plt.show()
def plot_3d(matrices_weights_history, folder_name, population_size, z_axis_legend, exp_name="experiment", is_trained="",
batch_size=1):
""" Plotting the the weights of the nets in a 3d form using principal component analysis (PCA) """
fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(12)
pca = PCA(n_components=2, whiten=True)
ax = plt.axes(projection='3d')
loop_matrices_weights_history = tqdm(range(len(matrices_weights_history)))
for i in loop_matrices_weights_history:
loop_matrices_weights_history.set_description("Plotting weights 3D PCA %s" % i)
weight_matrix = matrices_weights_history[i]
weight_matrix = np.array(weight_matrix)
n, x, y = weight_matrix.shape
weight_matrix = weight_matrix.reshape(n, x * y)
pca.fit(weight_matrix)
weight_matrix_pca = pca.transform(weight_matrix)
xdata, ydata = [], []
for j in range(len(weight_matrix_pca)):
xdata.append(weight_matrix_pca[j][0])
ydata.append(weight_matrix_pca[j][1])
zdata = np.arange(1, len(ydata)*batch_size+1, batch_size).tolist()
ax.plot3D(xdata, ydata, zdata)
ax.scatter(np.array(xdata), np.array(ydata), np.array(zdata), s=7)
steps = mpatches.Patch(color="white", label=f"{z_axis_legend}: {len(matrices_weights_history)} steps")
population_size = mpatches.Patch(color="white", label=f"Population: {population_size} networks")
if z_axis_legend == "Self-application":
trained = mpatches.Patch(color="white", label=f"Trained: true") if is_trained == "_trained" else mpatches.Patch(color="white", label=f"Trained: false")
ax.legend(handles=[steps, population_size, trained])
else:
ax.legend(handles=[steps, population_size])
ax.set_title(f"PCA Weights history")
ax.set_xlabel("PCA X")
ax.set_ylabel("PCA Y")
ax.set_zlabel(f"Epochs")
filepath = f"A:/Bachelorarbeit_git/thesis_code/{folder_name}"
filename = f"{filepath}/{exp_name}{is_trained}.png"
if os.path.isfile(filename):
letters = string.ascii_lowercase
random_letters = ''.join(random.choice(letters) for _ in range(5))
plt.savefig(f"{filename}_{random_letters}")
else:
plt.savefig(f"{filename}")
# plt.show()
plt.clf()
def plot_3d_self_train(nets_array: List, exp_name: String, directory_name: String, batch_size: int):
""" Plotting the evolution of the weights in a 3D space when doing self training. """
matrices_weights_history = []
loop_nets_array = tqdm(range(len(nets_array)))
for i in loop_nets_array:
loop_nets_array.set_description("Creating ST weights history %s" % i)
matrices_weights_history.append(nets_array[i].s_train_weights_history)
z_axis_legend = "epochs"
return plot_3d(matrices_weights_history, directory_name, len(nets_array), z_axis_legend, exp_name, "", batch_size)
def plot_3d_self_application(nets_array: List, exp_name: String, directory_name: String, batch_size: int) -> None:
""" Plotting the evolution of the weights in a 3D space when doing self application. """
matrices_weights_history = []
loop_nets_array = tqdm(range(len(nets_array)))
for i in loop_nets_array:
loop_nets_array.set_description("Creating SA weights history %s" % i)
matrices_weights_history.append(nets_array[i].s_application_weights_history)
if nets_array[i].trained:
is_trained = "_trained"
else:
is_trained = "_not_trained"
z_axis_legend = "epochs"
plot_3d(matrices_weights_history, directory_name, len(nets_array), z_axis_legend, exp_name, is_trained, batch_size)
def plot_3d_soup(nets_list, exp_name, directory_name):
""" Plotting the evolution of the weights in a 3D space for the soup environment. """
# This batch size is not relevant for soups. To not affect the number of epochs shown in the 3D plot,
# will send forward the number "1" for batch size with the variable <irrelevant_batch_size>.
irrelevant_batch_size = 1
plot_3d_self_train(nets_list, exp_name, directory_name, irrelevant_batch_size)
def line_chart_fixpoints(fixpoint_counters_history: list, epochs: int, ST_steps_between_SA: int,
SA_steps, directory_name: String, population_size: int):
""" Plotting the percentage of fixpoints after each iteration of SA & ST steps. """
fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(12)
ST_steps_per_SA = np.arange(0, ST_steps_between_SA * epochs, ST_steps_between_SA).tolist()
legend_population_size = mpatches.Patch(color="white", label=f"No. of nets: {str(population_size)}")
legend_SA_steps = mpatches.Patch(color="white", label=f"SA_steps: {str(SA_steps)}")
legend_SA_and_ST_runs = mpatches.Patch(color="white", label=f"SA_and_ST_runs: {str(epochs)}")
legend_ST_steps_between_SA = mpatches.Patch(color="white", label=f"ST_steps_between_SA: {str(ST_steps_between_SA)}")
plt.legend(handles=[legend_population_size, legend_SA_and_ST_runs, legend_SA_steps, legend_ST_steps_between_SA])
plt.xlabel("Epochs")
plt.ylabel("Percentage")
plt.title("Percentage of fixpoints")
plt.plot(ST_steps_per_SA, fixpoint_counters_history, color="green", marker="o")
filepath = f"A:/Bachelorarbeit_git/thesis_code/{directory_name}"
filename = f"{filepath}/{str(population_size)}_nets_fixpoints_linechart.png"
plt.savefig(f"{filename}")
plt.clf()
# plt.show()
def box_plot(data, directory_name, population_size):
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 7))
# ax = fig.add_axes([0, 0, 1, 1])
plt.title("Fixpoint variation")
plt.xlabel("Amount of noise")
plt.ylabel("Steps")
# data = numpy.array(data)
# ax.boxplot(data)
axs[1].boxplot(data)
axs[1].set_title('Box plot')
filepath = f"A:/Bachelorarbeit_git/thesis_code/{directory_name}"
filename = f"{filepath}/{str(population_size)}_nets_fixpoints_barchart.png"
plt.savefig(f"{filename}")
# plt.show()
plt.clf()
def write_file(text, directory_name):
filepath = f"A:/Bachelorarbeit_git/thesis_code/{directory_name}"
f = open(f"{filepath}/experiment.txt", "w+")
f.write(text)
f.close()