Compare commits

...

49 Commits

Author SHA1 Message Date
Steffen Illium
1b7581e656 MetaNetworks Debugged II 2022-02-01 18:17:11 +01:00
Steffen Illium
246d825bb4 MetaNetworks Debugged 2022-01-31 10:35:11 +01:00
Steffen Illium
49c0d8a621 MetaNetworks 2022-01-26 16:56:05 +01:00
Steffen Illium
5f1f5833d8 Journal TEx Text 2022-01-21 17:28:45 +01:00
Steffen Illium
21dd572969 Merge remote-tracking branch 'origin/journal' into journal
# Conflicts:
#	journal_basins.py
2021-09-13 16:06:03 +02:00
Steffen Illium
5f6c658068 Journal TEx Text 2021-09-13 16:04:48 +02:00
Maximilian Zorn
e51d7ad0b9 Added parent-vs-children plot, changed x-tick labels to base10 notation. 2021-09-07 18:15:06 +02:00
Steffen Illium
6c1a964f31 Second order function 2021-08-24 10:35:29 +02:00
steffen-illium
b22a7ac427 in between plots 2021-06-28 10:51:21 +02:00
steffen-illium
6c2d544f7c logspace.... 2021-06-25 17:46:45 +02:00
steffen-illium
5a7dad2363 Merge remote-tracking branch 'origin/journal' into journal
# Conflicts:
#	journal_soup_basins.py
2021-06-25 10:29:39 +02:00
steffen-illium
14d9a533cb journal linspace basins 2021-06-25 10:25:25 +02:00
ru43zex
f7a0d360b3 updated journal_soup_basin: all working, only orange lines not showing 2021-06-24 16:48:52 +03:00
Maximilian Zorn
cf6eec639f Fixed exp pickle save. Added exp results. Fixed soup plot issue. 2021-06-20 17:09:17 +02:00
Maximilian Zorn
1da5bd95d6 Added before-after-plot. 2021-06-17 15:53:48 +02:00
Maximilian Zorn
b40b534d5b Small fix for PCA plotting in linspace. 2021-06-15 17:18:40 +02:00
steffen-illium
27d763f1fb journal linspace basins 2021-06-15 14:11:40 +02:00
steffen-illium
0ba109c083 journal linspace basins 2021-06-14 11:55:11 +02:00
steffen-illium
e156540e2c readme update 2021-06-11 14:41:35 +02:00
steffen-illium
0e2289344a robustness 2021-06-11 14:38:22 +02:00
ru43zex
987d7b95f3 debugged journal_soup_basin experiment 2021-06-07 11:12:59 +03:00
ru43zex
7e231b5b50 Merge branch 'journal' of gitlab.lrz.de:mobile-ifi/bannana-networks into journal 2021-06-05 17:50:45 +03:00
ru43zex
2077d800ae fixed soup_basin experiment 2021-06-05 17:44:37 +03:00
steffen-illium
b57d3d32fd readme updated 2021-06-04 15:01:16 +02:00
steffen-illium
61ae8c2ee5 robustness fixed 2021-06-04 14:13:38 +02:00
steffen-illium
800a2c8f6b Merge remote-tracking branch 'origin/journal' into journal
# Conflicts:
#	journal_basins.py
2021-06-04 14:03:50 +02:00
steffen-illium
9abde030af robustness cleaned 2021-06-04 14:03:03 +02:00
ru43zex
0320957b85 implements basin experiments for soup 2021-06-02 02:49:18 +03:00
Maximilian Zorn
32ebb729e8 Added plot variations for basin exp. 2021-05-27 16:02:41 +02:00
steffen-illium
c9efe0a31b journal_robustness.py redone, now is sensitive to seeds and plots 2021-05-25 17:07:24 +02:00
steffen-illium
5e5511caf8 journal_robustness.py redone, now is sensitive to seeds and plots 2021-05-23 15:49:48 +02:00
steffen-illium
55bdd706b6 journal_robustness.py redone, now is sensitive to seeds and plots 2021-05-23 13:46:21 +02:00
steffen-illium
74d618774a application losses II 2021-05-23 10:36:22 +02:00
steffen-illium
54590eb147 application losses 2021-05-23 10:33:54 +02:00
Maximilian Zorn
b1dc574f5b Added distinction time-as-fixpoint and time-to-vergence to be tracked. 2021-05-22 14:43:20 +02:00
Maximilian Zorn
e9f6620b60 Robustness test with synthetic and natural fixpoints. Should now work as
intended. Noise gets added to weights instead of input.
2021-05-22 14:27:45 +02:00
Maximilian Zorn
bcfe5807a7 Added pickle save() function for SpawnExperiment, updated README, set
plot-pca-all false on default, just True for SpawnExp for now.
2021-05-21 16:37:27 +02:00
Maximilian Zorn
1e8ccd2b8b PCA now fit and transform over all trajectories. Networks now have
start-time properties for that to control where plotting starts. Added distance from
parent table/matrix.
2021-05-21 15:28:09 +02:00
Maximilian Zorn
f5ca3d1115 save weights instead of outputs 2021-05-17 09:21:01 +02:00
steffen-illium
c1f58f2675 All Experiments debugged
ToDo:
 - convert strings in pathlib.Path objects
 - check usage of fixpoint tests
2021-05-16 15:34:43 +02:00
steffen-illium
36377ee27d functionalities_test.py updated 2021-05-16 14:51:21 +02:00
steffen-illium
b1472479cb journal_basins.py debugged II
Questions for functionalities_test.py
corrected some fixes
Redo and implementation of everything path related now using pathlib.Path
2021-05-16 13:35:38 +02:00
steffen-illium
042188f15a journal_basins.py debugged 2021-05-16 11:30:34 +02:00
steffen-illium
5074100b71 Merge remote-tracking branch 'origin/journal' into journal 2021-05-14 17:58:01 +02:00
steffen-illium
4b5c36f6c0 - rearanged experiments.py into single runable files
- Reformated net.self_x functions (sa, st)
- corrected robustness_exp.py
- NO DEBUGGING DONE!!!!!
2021-05-14 17:57:44 +02:00
Cristian Lenta
56ea007f2b numbering of the points for easy addressing 2021-05-14 14:00:55 +00:00
Maximilian Zorn
22d34d4e75 some work on the new journal experiments with cristions code 2021-05-14 09:13:42 +02:00
Cristian Lenta
9bf37486a0 Update README.md 2021-05-03 06:50:33 +00:00
Cristian Lenta
e176d05cf5 uploaded my code (not yet 100% finished) 2021-05-03 06:43:53 +00:00
22 changed files with 3646 additions and 1 deletion

.gitignore vendored Normal file (+1)

@@ -0,0 +1 @@
/output/

README.md

@@ -1,2 +1,52 @@
# cristian_lenta - BA code
# self-rep NN paper - ALIFE journal edition
- [x] Plateau / Pillar size: What happens to the fixpoints after noise introduction and retraining? Options being: same fixpoint, similar fixpoint (basin), or a different fixpoint?
  - We did not find the same fixpoint (at 10**-5 tolerance).
  - Do they do the clustering thingy? Kind of: small movement towards the parent fixpoint (MIM distance getting smaller).
    Small movement for everyone? -> Distribution
- see `journal_basins.py` for the "train -> spawn with noise -> train again and see where they end up" functionality. Applying noise follows the `vary` function that was used in the paper's robustness test, with `+- prng() * eps` (see the sketch after these notes). Change if desired.
- there is also a distance matrix for all-to-all particle comparisons (with the distance parameter being one of `MSE`, `MAE` (mean absolute error = mean Manhattan) or `MIM` (mean position-invariant Manhattan)).
- [ ] Same thing with soup interaction. We would expect the same behaviour... Influence of interaction with near and far-away particles.
- [x] Robustness test with a trained network: training for high-quality fixpoints, then comparing against the "perfect" fixpoint. Average loss per application step.
- see `journal_robustness.py` for the robustness test, modeled after Cristian's robustness experiment (with the exception that we put noise on the weights). Has a `synthetic` bool to switch to hand-modeled perfect fixpoints instead of naturally trained ones.
- Also added the distinction between "time-as-fixpoint" and "time-to-verge" (i.e. time to divergence / zero).
- We might need to consult about the "average loss per application step", as I think the application loss gets gradually higher the worse the weights get, so the average might not tell us much here.
- [x] Adjust self-training so that it favors second-order fixpoints -> second-order test implementation (?)
- [x] Barplot over clones -> how many become a fixpoint vs how many diverge per noise level
- [x] Box plot of avg. distance of clones from parent
- [x] Search the subspace between two fixpoints by lineage (10**-5), check where they end up
- [x] How are basins / "attractor areas" shaped?
# Future Todos:
- [ ] Find a statistic over weight space that provides a better init function
- [ ] Test this init function on an MNIST classifier - just for the lolz
---
## Notes:
- In the spawn experiment we now fit and transform the PCA over *ALL* trajectories, instead of each net history on its own. This can be toggled by the `plot_pca_together` parameter in `visualization.py/plot_3d_self_train() & plot_3d()` (default: `False`, but set `True` in the spawn-experiment class).
- I have also added a `start_time` property for the nets (default: `1`). This is intended to be set flexibly, e.g. for clones (which are spawned midway through the experiment), such that the PCA can start the plotting trace from this timestep. When we spawn clones, we deepcopy their parent's saved weight_history too, so that the PCA transforms same-length trajectories. With `plot_pca_together` that means clones and their parents will literally be plotted perfectly overlaid on top of each other, up until the spawn time, where you can see the offset / noise we apply. By setting start_time you can avoid this overlap and avoid hiding the parent's trace color, which gets plotted first (because the parent is always added to self.nets first). **But more importantly, you can effectively zoom into the plot by setting the parents' start_time to just shy of the end of the first epoch (where they get checked on the fixpoint property and spawn clones) and the clones' start_times to the second epoch. This makes the plot begin at spawn time, cutting off the parents' initial trajectory and zooming in on the action (see `journal_basins.py/spawn_and_continue()`).**
- Now saving the whole experiment class as a pickle dump (`experiment_pickle.p`, just like Cristian's), hope that's fine.
- Added a `requirement.txt` for quick venv / pip -r installs. Append as necessary.
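Since the `vary`-style noise and the `MIM` distance come up repeatedly in these notes, here is a minimal sketch of both, assuming plausible names and signatures; the repo's actual implementations in `journal_basins.py` may differ:

```python
import numpy as np

def vary(weights: np.ndarray, eps: float = 1e-5,
         prng=np.random.default_rng()) -> np.ndarray:
    # Assumed form of the paper's noise application: +- prng() * eps per weight.
    signs = prng.choice([-1.0, 1.0], size=weights.shape)
    return weights + signs * prng.random(size=weights.shape) * eps

def mean_invariate_manhattan_distance(x: np.ndarray, y: np.ndarray) -> float:
    # Assumed MIM: Manhattan distance between the sorted weight vectors, which
    # makes the measure invariant to weight positions.
    return float(np.mean(np.abs(np.sort(x.ravel()) - np.sort(y.ravel()))))
```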

experiments/__init__.py Normal file (+6)

@@ -0,0 +1,6 @@
from .mixed_setting_exp import run_mixed_experiment
from .robustness_exp import run_robustness_experiment
from .self_application_exp import run_SA_experiment
from .self_train_exp import run_ST_experiment
from .soup_exp import run_soup_experiment
import functionalities_test

experiments/helpers.py Normal file (+59)

@@ -0,0 +1,59 @@
""" -------------------------------- Methods for summarizing the experiments --------------------------------- """
from pathlib import Path
from visualization import line_chart_fixpoints, bar_chart_fixpoints
def summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, directory,
summary_pre_title):
avg_fixpoint_counters = {
"avg_identity_func": 0,
"avg_divergent": 0,
"avg_fix_zero": 0,
"avg_fix_weak": 0,
"avg_fix_sec": 0,
"avg_other_func": 0
}
for i in range(len(experiments)):
fixpoint_counters = experiments[i].fixpoint_counters
avg_fixpoint_counters["avg_identity_func"] += fixpoint_counters["identity_func"]
avg_fixpoint_counters["avg_divergent"] += fixpoint_counters["divergent"]
avg_fixpoint_counters["avg_fix_zero"] += fixpoint_counters["fix_zero"]
avg_fixpoint_counters["avg_fix_weak"] += fixpoint_counters["fix_weak"]
avg_fixpoint_counters["avg_fix_sec"] += fixpoint_counters["fix_sec"]
avg_fixpoint_counters["avg_other_func"] += fixpoint_counters["other_func"]
# Calculating the average for each fixpoint
avg_fixpoint_counters.update((x, y / len(experiments)) for x, y in avg_fixpoint_counters.items())
# Checking where the data is coming from to have a relevant title in the plot.
if summary_pre_title not in ["ST", "SA", "soup", "mixed", "robustness"]:
summary_pre_title = ""
# Plotting the summary
source_checker = "summary"
exp_details = f"{summary_pre_title}: {runs} runs & {epochs} epochs each."
bar_chart_fixpoints(avg_fixpoint_counters, population_size, directory, net_learning_rate, exp_details,
source_checker)
def summary_fixpoint_percentage(runs, epochs, fixpoints_percentages, ST_steps, SA_steps, directory_name,
population_size):
fixpoints_percentages = [round(fixpoints_percentages[i] / runs, 1) for i in range(len(fixpoints_percentages))]
# Plotting summary
if "soup" in directory_name:
line_chart_fixpoints(fixpoints_percentages, epochs / ST_steps, ST_steps, SA_steps, directory_name,
population_size)
else:
line_chart_fixpoints(fixpoints_percentages, epochs, ST_steps, SA_steps, directory_name, population_size)
""" -------------------------------------------- Miscellaneous --------------------------------------------------- """
def check_folder(experiment_folder: str):
exp_path = Path('experiments') / experiment_folder
exp_path.mkdir(parents=True, exist_ok=True)
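These helpers are exercised by every experiment below; as a sketch, a direct call with hand-filled stand-in objects (only the `fixpoint_counters` attribute is needed here, and all values are made up):

```python
from types import SimpleNamespace
from experiments.helpers import check_folder, summary_fixpoint_experiment

# Two fake "experiments"; the real objects come from the experiment classes below.
fake_runs = [
    SimpleNamespace(fixpoint_counters={"identity_func": 3, "divergent": 1, "fix_zero": 0,
                                       "fix_weak": 0, "fix_sec": 1, "other_func": 5}),
    SimpleNamespace(fixpoint_counters={"identity_func": 5, "divergent": 0, "fix_zero": 0,
                                       "fix_weak": 0, "fix_sec": 0, "other_func": 5}),
]
check_folder("mixed")  # ensures experiments/mixed/ exists
summary_fixpoint_experiment(runs=2, population_size=10, epochs=100, experiments=fake_runs,
                            net_learning_rate=0.04, directory="experiments/mixed",
                            summary_pre_title="mixed")  # averages the counters and plots a bar chart
```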

@@ -0,0 +1,271 @@
import pickle
from collections import defaultdict
from pathlib import Path
import sys
import platform
import pandas as pd
import torchmetrics
import numpy as np
import torch
from matplotlib import pyplot as plt
import seaborn as sns
from torch import nn
from torch.nn import Flatten
from torch.utils.data import Dataset, DataLoader
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor, Compose, Resize
from tqdm import tqdm
if platform.node() == 'CarbonX':
debug = True
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
print("@ Warning, Debugging Config@!!!!!! @")
print("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@")
else:
debug = False
try:
# noinspection PyUnboundLocalVariable
if __package__ is None:
DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(DIR.parent))
__package__ = DIR.name
else:
DIR = None
except NameError:
DIR = None
pass
from network import MetaNet
from functionalities_test import test_for_fixpoints
WORKER = 10 if not debug else 2
BATCHSIZE = 500 if not debug else 50
EPOCH = 100 if not debug else 3
VALIDATION_FRQ = 5 if not debug else 1
SELF_TRAIN_FRQ = 1 if not debug else 1
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if debug:
torch.autograd.set_detect_anomaly(True)
class ToFloat:
def __call__(self, x):
return x.to(torch.float32)
class AddTaskDataset(Dataset):
def __init__(self, length=int(5e5)):
super().__init__()
self.length = length
self.prng = np.random.default_rng()
def __len__(self):
return self.length
def __getitem__(self, _):
ab = self.prng.normal(size=(2,)).astype(np.float32)
return ab, ab.sum(axis=-1, keepdims=True)
def set_checkpoint(model, out_path, epoch_n, final_model=False):
epoch_n = str(epoch_n)
if not final_model:
ckpt_path = Path(out_path) / 'ckpt' / f'{epoch_n.zfill(4)}_model_ckpt.tp'
else:
ckpt_path = Path(out_path) / f'trained_model_ckpt_e{epoch_n}.tp'
ckpt_path.parent.mkdir(exist_ok=True, parents=True)
torch.save(model, ckpt_path, pickle_protocol=pickle.HIGHEST_PROTOCOL)
return ckpt_path
def validate(checkpoint_path, ratio=0.1):
checkpoint_path = Path(checkpoint_path)
# initialize metric
validmetric = torchmetrics.Accuracy()
ut = Compose([ToTensor(), ToFloat(), Resize((15, 15)), Flatten(start_dim=0)])
try:
datas = MNIST(str(data_path), transform=ut, train=False)
except RuntimeError:
datas = MNIST(str(data_path), transform=ut, train=False, download=True)
valid_d = DataLoader(datas, batch_size=BATCHSIZE, shuffle=True, drop_last=True, num_workers=WORKER)
model = torch.load(checkpoint_path, map_location=DEVICE).eval()
n_samples = int(len(valid_d) * ratio)
with tqdm(total=n_samples, desc='Validation Run: ') as pbar:
for idx, (valid_batch_x, valid_batch_y) in enumerate(valid_d):
valid_batch_x, valid_batch_y = valid_batch_x.to(DEVICE), valid_batch_y.to(DEVICE)
y_valid = model(valid_batch_x)
# metric on current batch
acc = validmetric(y_valid.cpu(), valid_batch_y.cpu())
pbar.set_postfix_str(f'Acc: {acc}')
pbar.update()
if idx == n_samples:
break
# metric on all batches using custom accumulation
acc = validmetric.compute()
tqdm.write(f"Avg. accuracy on all data: {acc}")
return acc
def new_train_storage_df():
return pd.DataFrame(columns=['Epoch', 'Batch', 'Metric', 'Score'])
def checkpoint_and_validate(model, out_path, epoch_n, final_model=False):
out_path = Path(out_path)
ckpt_path = set_checkpoint(model, out_path, epoch_n, final_model=final_model)
result = validate(ckpt_path)
return result
def plot_training_result(path_to_dataframe):
# load from Drive
df = pd.read_csv(path_to_dataframe, index_col=0)
# Set up figure
fig, ax1 = plt.subplots() # initializes figure and plots
ax2 = ax1.twinx() # applies twinx to ax2, which is the second y-axis.
# plots the first set of data
data = df[(df['Metric'] == 'Task Loss') | (df['Metric'] == 'Self Train Loss')].groupby(['Epoch', 'Metric']).mean()
palette = sns.color_palette()[0:data.reset_index()['Metric'].unique().shape[0]]
sns.lineplot(data=data.groupby(['Epoch', 'Metric']).mean(), x='Epoch', y='Score', hue='Metric',
palette=palette, ax=ax1)
# plots the second set of data
data = df[(df['Metric'] == 'Test Accuracy') | (df['Metric'] == 'Train Accuracy')]
palette = sns.color_palette()[len(palette):data.reset_index()['Metric'].unique().shape[0] + len(palette)]
sns.lineplot(data=data, x='Epoch', y='Score', marker='o', hue='Metric', palette=palette)
ax1.set(yscale='log', ylabel='Losses')
ax1.set_title('Training Lineplot')
ax2.set(ylabel='Accuracy')
fig.legend(loc="center right", title='Metric', bbox_to_anchor=(0.85, 0.5))
ax1.get_legend().remove()
ax2.get_legend().remove()
plt.tight_layout()
if debug:
plt.show()
else:
plt.savefig(Path(path_to_dataframe.parent / 'training_lineplot.png'), dpi=300)
if __name__ == '__main__':
self_train = False
training = False
plotting = False
particle_analysis = True
as_sparse_network_test = True
data_path = Path('data')
data_path.mkdir(exist_ok=True, parents=True)
run_path = Path('output') / 'mnist_self_train_100_NEW_STYLE'
model_path = run_path / '0000_trained_model.zip'
df_store_path = run_path / 'train_store.csv'
if training:
utility_transforms = Compose([ToTensor(), ToFloat(), Resize((15, 15)), Flatten(start_dim=0)])
try:
dataset = MNIST(str(data_path), transform=utility_transforms)
except RuntimeError:
dataset = MNIST(str(data_path), transform=utility_transforms, download=True)
d = DataLoader(dataset, batch_size=BATCHSIZE, shuffle=True, drop_last=True, num_workers=WORKER)
interface = np.prod(dataset[0][0].shape)
metanet = MetaNet(interface, depth=4, width=6, out=10).to(DEVICE).train()
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(metanet.parameters(), lr=0.004, momentum=0.9)
train_store = new_train_storage_df()
for epoch in tqdm(range(EPOCH), desc='MetaNet Train - Epochs'):
is_validation_epoch = epoch % VALIDATION_FRQ == 0 if not debug else True
is_self_train_epoch = epoch % SELF_TRAIN_FRQ == 0 if not debug else True
if is_validation_epoch:
metric = torchmetrics.Accuracy()
else:
metric = None
for batch, (batch_x, batch_y) in tqdm(enumerate(d), total=len(d), desc='MetaNet Train - Batch'):
if self_train and is_self_train_epoch:
self_train_loss = metanet.combined_self_train(optimizer)
step_log = dict(Epoch=epoch, Batch=batch, Metric='Self Train Loss', Score=self_train_loss.item())
train_store.loc[train_store.shape[0]] = step_log
# Zero your gradients for every batch!
optimizer.zero_grad()
batch_x, batch_y = batch_x.to(DEVICE), batch_y.to(DEVICE)
y = metanet(batch_x)
# loss = loss_fn(y, batch_y.unsqueeze(-1).to(torch.float32))
loss = loss_fn(y, batch_y.to(torch.long))
loss.backward()
# Adjust learning weights
optimizer.step()
step_log = dict(Epoch=epoch, Batch=batch,
Metric='Task Loss', Score=loss.item())
train_store.loc[train_store.shape[0]] = step_log
if is_validation_epoch:
metric(y.cpu(), batch_y.cpu())
if batch >= 3 and debug:
break
if is_validation_epoch:
validation_log = dict(Epoch=int(epoch), Batch=BATCHSIZE,
Metric='Train Accuracy', Score=metric.compute().item())
train_store.loc[train_store.shape[0]] = validation_log
accuracy = checkpoint_and_validate(metanet, run_path, epoch)
validation_log = dict(Epoch=int(epoch), Batch=BATCHSIZE,
Metric='Test Accuracy', Score=accuracy.item())
train_store.loc[train_store.shape[0]] = validation_log
if particle_analysis:
counter_dict = defaultdict(lambda: 0)
# This returns ID-functions
_ = test_for_fixpoints(counter_dict, list(metanet.particles))
for key, value in dict(counter_dict).items():
step_log = dict(Epoch=int(epoch), Batch=BATCHSIZE, Metric=key, Score=value)
train_store.loc[train_store.shape[0]] = step_log
train_store.to_csv(df_store_path, mode='a', header=not df_store_path.exists())
train_store = new_train_storage_df()
accuracy = checkpoint_and_validate(metanet, run_path, EPOCH, final_model=True)
validation_log = dict(Epoch=EPOCH, Batch=BATCHSIZE,
Metric='Test Accuracy', Score=accuracy.item())
train_store.loc[train_store.shape[0]] = validation_log
train_store.to_csv(df_store_path)
if plotting:
plot_training_result(df_store_path)
if particle_analysis:
model_path = next(run_path.glob('*ckpt.tp'))
latest_model = torch.load(model_path, map_location=DEVICE).eval()
counter_dict = defaultdict(lambda: 0)
_ = test_for_fixpoints(counter_dict, list(latest_model.particles))
tqdm.write(str(dict(counter_dict)))
zero_ident = torch.load(model_path, map_location=DEVICE).eval().replace_with_zero('identity_func')
zero_other = torch.load(model_path, map_location=DEVICE).eval().replace_with_zero('other_func')
if as_sparse_network_test:
acc_pre = validate(model_path, ratio=1)
ident_ckpt = set_checkpoint(zero_ident, model_path.parent, -1, final_model=True)
ident_acc_post = validate(ident_ckpt, ratio=1)
tqdm.write(f'Zero_ident diff = {abs(ident_acc_post-acc_pre)}')
other_ckpt = set_checkpoint(zero_other, model_path.parent, -2, final_model=True)
other_acc_post = validate(other_ckpt, ratio=1)
tqdm.write(f'Zero_other diff = {abs(other_acc_post - acc_pre)}')
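As an aside, `AddTaskDataset` is easy to sanity-check in isolation. A standalone sketch (the class body is re-stated from the listing above so this runs without the rest of the script):

```python
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

class AddTaskDataset(Dataset):
    # Re-stated from above: inputs are two standard-normal floats, the target is their sum.
    def __init__(self, length=1000):
        super().__init__()
        self.length = length
        self.prng = np.random.default_rng()

    def __len__(self):
        return self.length

    def __getitem__(self, _):
        ab = self.prng.normal(size=(2,)).astype(np.float32)
        return ab, ab.sum(axis=-1, keepdims=True)

loader = DataLoader(AddTaskDataset(), batch_size=4)
x, y = next(iter(loader))
assert torch.allclose(x.sum(dim=-1, keepdim=True), y)  # targets really are a + b
```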

experiments/mixed_setting_exp.py Normal file

@@ -0,0 +1,177 @@
import os.path
import pickle
from tqdm import tqdm
from experiments.helpers import check_folder, summary_fixpoint_experiment, summary_fixpoint_percentage
from functionalities_test import test_for_fixpoints
from network import Net
from visualization import plot_loss, bar_chart_fixpoints, line_chart_fixpoints
from visualization import plot_3d_self_train
class MixedSettingExperiment:
def __init__(self, population_size, net_i_size, net_h_size, net_o_size, learning_rate, train_nets,
epochs, SA_steps, ST_steps_between_SA, log_step_size, directory_name):
super().__init__()
self.population_size = population_size
self.net_input_size = net_i_size
self.net_hidden_size = net_h_size
self.net_out_size = net_o_size
self.net_learning_rate = learning_rate
self.train_nets = train_nets
self.epochs = epochs
self.SA_steps = SA_steps
self.ST_steps_between_SA = ST_steps_between_SA
self.log_step_size = log_step_size
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.loss_history = []
self.fixpoint_counters_history = []
self.directory_name = directory_name
os.mkdir(self.directory_name)
self.nets = []
self.populate_environment()
self.fixpoint_percentage()
self.weights_evolution_3d_experiment()
self.count_fixpoints()
self.visualize_loss()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating mixed experiment %s" % i)
net_name = f"mixed_net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
self.nets.append(net)
loop_epochs = tqdm(range(self.epochs))
for j in loop_epochs:
loop_epochs.set_description("Running mixed experiment %s" % j)
for i in range(self.population_size):  # note: the tqdm iterator above is single-use
net = self.nets[i]
if self.train_nets == "before_SA":
for _ in range(self.ST_steps_between_SA):
net.self_train(1, self.log_step_size, self.net_learning_rate)
net.self_application(self.SA_steps, self.log_step_size)
elif self.train_nets == "after_SA":
net.self_application(self.SA_steps, self.log_step_size)
for _ in range(self.ST_steps_between_SA):
net.self_train(1, self.log_step_size, self.net_learning_rate)
print(
f"\nLast weight matrix (epoch: {j}):\n{net.input_weight_matrix()}\nLossHistory: {net.loss_history[-10:]}")
test_for_fixpoints(self.fixpoint_counters, self.nets)
# Rounding the result so as not to run into problems later regarding the exact representation of floating-point numbers
fixpoints_percentage = round((self.fixpoint_counters["fix_zero"] + self.fixpoint_counters[
"fix_sec"]) / self.population_size, 1)
self.fixpoint_counters_history.append(fixpoints_percentage)
# Resetting the fixpoint counter. Last iteration not to be reset - it is important for the bar_chart_fixpoints().
if j < self.epochs - 1:
self.reset_fixpoint_counters()
def weights_evolution_3d_experiment(self):
exp_name = f"Mixed {str(len(self.nets))}"
# The batch size is not relevant for the mixed setting, because during an epoch there are multiple SA & ST
# steps happening and only those need the batch size. To not affect the number of epochs shown in the 3D
# plot, we forward the number "1" as the batch size via the variable <irrelevant_batch_size>.
irrelevant_batch_size = 1
plot_3d_self_train(self.nets, exp_name, self.directory_name, irrelevant_batch_size, True)
def count_fixpoints(self):
exp_details = f"SA steps: {self.SA_steps}; ST steps: {self.ST_steps_between_SA}"
test_for_fixpoints(self.fixpoint_counters, self.nets)
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
def fixpoint_percentage(self):
line_chart_fixpoints(self.fixpoint_counters_history, self.epochs, self.ST_steps_between_SA,
self.SA_steps, self.directory_name, self.population_size)
def visualize_loss(self):
for i in range(len(self.nets)):
net_loss_history = self.nets[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory_name)
def reset_fixpoint_counters(self):
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
def run_mixed_experiment(population_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate, train_nets,
epochs, SA_steps, ST_steps_between_SA, batch_size, name_hash, runs, run_name):
experiments = {}
fixpoints_percentages = []
check_folder("mixed")
# Running the experiments
for i in range(runs):
directory_name = f"experiments/mixed/{run_name}_run_{i}_{str(population_size)}_nets_{SA_steps}_SA_{ST_steps_between_SA}_ST_{str(name_hash)}"
mixed_experiment = MixedSettingExperiment(
population_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
train_nets,
epochs,
SA_steps,
ST_steps_between_SA,
batch_size,
directory_name
)
pickle.dump(mixed_experiment, open(f"{directory_name}/full_experiment_pickle.p", "wb"))
experiments[i] = mixed_experiment
# Building history of fixpoint percentages for summary
fixpoint_counters_history = mixed_experiment.fixpoint_counters_history
if not fixpoints_percentages:
fixpoints_percentages = mixed_experiment.fixpoint_counters_history
else:
# Using list comprehension to make the sum of all the percentages
fixpoints_percentages = [fixpoints_percentages[i] + fixpoint_counters_history[i] for i in
range(len(fixpoints_percentages))]
# Building a summary of all the runs
directory_name = f"experiments/mixed/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{str(name_hash)}"
os.mkdir(directory_name)
summary_pre_title = "mixed"
summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, directory_name,
summary_pre_title)
summary_fixpoint_percentage(runs, epochs, fixpoints_percentages, ST_steps_between_SA, SA_steps, directory_name,
population_size)
if __name__ == '__main__':
raise NotImplementedError('Test this here!!!')
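The `__main__` guard above is a stub, so for reference here is one plausible invocation of `run_mixed_experiment`; the module path comes from `experiments/__init__.py`, and all parameter values are illustrative assumptions:

```python
import random
from experiments.mixed_setting_exp import run_mixed_experiment

run_mixed_experiment(population_size=10, net_input_size=4, net_hidden_size=2, net_out_size=1,
                     net_learning_rate=0.04, train_nets="before_SA", epochs=10, SA_steps=100,
                     ST_steps_between_SA=50, batch_size=10, name_hash=random.getrandbits(32),
                     runs=3, run_name="demo")
```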

experiments/robustness_exp.py Normal file

@@ -0,0 +1,151 @@
import copy
import os.path
import pickle
import random
from tqdm import tqdm
from experiments.helpers import check_folder, summary_fixpoint_experiment
from functionalities_test import test_for_fixpoints, is_identity_function
from network import Net
from visualization import bar_chart_fixpoints, box_plot, write_file
def add_noise(input_data, epsilon=pow(10, -5)):
output = copy.deepcopy(input_data)
for k in range(len(input_data)):
output[k][0] += random.random() * epsilon
return output
class RobustnessExperiment:
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
ST_steps, directory_name) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.ST_steps = ST_steps
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.id_functions = []
self.directory_name = directory_name
os.mkdir(self.directory_name)
self.nets = []
# Create population:
self.populate_environment()
print("Nets:\n", self.nets)
self.count_fixpoints()
for net in self.nets:
print(net.is_fixpoint)
self.test_robustness()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating robustness experiment %s" % i)
net_name = f"net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
for _ in range(self.ST_steps):
net.self_train(1, self.log_step_size, self.net_learning_rate)
self.nets.append(net)
def test_robustness(self):
# test_for_fixpoints(self.fixpoint_counters, self.nets, self.id_functions)
zero_epsilon = pow(10, -5)
data = [[0 for _ in range(10)] for _ in range(len(self.id_functions))]
for i in range(len(self.id_functions)):
for j in range(10):
original_net = self.id_functions[i]
# Creating a clone of the network. Not by copying it, but by creating a completely new network
# and changing its weights to the original ones.
original_net_clone = Net(original_net.input_size, original_net.hidden_size, original_net.out_size,
original_net.name)
# Extra safety for the value of the weights
original_net_clone.load_state_dict(copy.deepcopy(original_net.state_dict()))
noisy_weights = add_noise(original_net_clone.input_weight_matrix(), epsilon=pow(10, -j))
original_net_clone.apply_weights(noisy_weights)
# Testing if the new net is still an identity function after applying noise
still_id_func = is_identity_function(original_net_clone, zero_epsilon)
# If the net is still an id. func. after applying the first run of noise, continue to apply it until otherwise
while still_id_func and data[i][j] <= 1000:
data[i][j] += 1
original_net_clone = original_net_clone.self_application(1, self.log_step_size)
still_id_func = is_identity_function(original_net_clone, zero_epsilon)
print(f"Data {data}")
# <data> is a list of per-network rows, so check every entry for zero.
if all(all(v == 0 for v in row) for row in data):
print("There is no network resisting the robustness test.")
text = f"For this population of \n {self.population_size} networks \n there is no" \
f" network resisting the robustness test."
write_file(text, self.directory_name)
else:
box_plot(data, self.directory_name, self.population_size)
def count_fixpoints(self):
exp_details = f"ST steps: {self.ST_steps}"
self.id_functions = test_for_fixpoints(self.fixpoint_counters, self.nets)
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
def run_robustness_experiment(population_size, batch_size, net_input_size, net_hidden_size, net_out_size,
net_learning_rate, epochs, runs, run_name, name_hash):
experiments = {}
check_folder("robustness")
# Running the experiments
for i in range(runs):
ST_directory_name = f"experiments/robustness/{run_name}_run_{i}_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
robustness_experiment = RobustnessExperiment(
population_size,
batch_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
epochs,
ST_directory_name
)
pickle.dump(robustness_experiment, open(f"{ST_directory_name}/full_experiment_pickle.p", "wb"))
experiments[i] = robustness_experiment
# Building a summary of all the runs
directory_name = f"experiments/robustness/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{str(name_hash)}"
os.mkdir(directory_name)
summary_pre_title = "robustness"
summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, directory_name,
summary_pre_title)
if __name__ == '__main__':
raise NotImplementedError('Test this here!!!')
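To make the noise schedule concrete: `test_robustness` sweeps `epsilon = 10**-j` for `j = 0..9`, and `add_noise` perturbs only the first entry of each weight row. A tiny worked example of that function's effect:

```python
import copy
import random

def add_noise(input_data, epsilon=pow(10, -5)):
    # Same logic as above: add random.random() * epsilon to each row's first entry.
    output = copy.deepcopy(input_data)
    for k in range(len(input_data)):
        output[k][0] += random.random() * epsilon
    return output

weights = [[0.5, 1.0], [-0.25, 2.0]]
for j in range(3):  # the experiment runs j = 0..9
    noisy = add_noise(weights, epsilon=10 ** -j)
    print(j, [round(row[0] - orig[0], 6) for row, orig in zip(noisy, weights)])
```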

experiments/self_application_exp.py Normal file

@@ -0,0 +1,120 @@
import os.path
import pickle
from tqdm import tqdm
from experiments.helpers import check_folder, summary_fixpoint_experiment
from functionalities_test import test_for_fixpoints
from network import Net
from visualization import bar_chart_fixpoints
from visualization import plot_3d_self_application
class SelfApplicationExperiment:
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size,
net_learning_rate, application_steps, train_nets, directory_name, training_steps
) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.SA_steps = application_steps
self.train_nets = train_nets
self.ST_steps = training_steps
self.directory_name = directory_name
os.mkdir(self.directory_name)
""" Creating the nets & making the SA steps & (maybe) also training the networks. """
self.nets = []
# Create population:
self.populate_environment()
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.weights_evolution_3d_experiment()
self.count_fixpoints()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating SA experiment %s" % i)
net_name = f"SA_net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
for _ in range(self.SA_steps):
input_data = net.input_weight_matrix()
target_data = net.create_target_weights(input_data)
if self.train_nets == "before_SA":
net.self_train(1, self.log_step_size, self.net_learning_rate)
net.self_application(self.SA_steps, self.log_step_size)
elif self.train_nets == "after_SA":
net.self_application(self.SA_steps, self.log_step_size)
net.self_train(1, self.log_step_size, self.net_learning_rate)
else:
net.self_application(self.SA_steps, self.log_step_size)
self.nets.append(net)
def weights_evolution_3d_experiment(self):
exp_name = f"SA_{str(len(self.nets))}_nets_3d_weights_PCA"
plot_3d_self_application(self.nets, exp_name, self.directory_name, self.log_step_size)
def count_fixpoints(self):
test_for_fixpoints(self.fixpoint_counters, self.nets)
exp_details = f"{self.SA_steps} SA steps"
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
def run_SA_experiment(population_size, batch_size, net_input_size, net_hidden_size, net_out_size,
net_learning_rate, runs, run_name, name_hash, application_steps, train_nets, training_steps):
experiments = {}
check_folder("self_application")
# Running the experiments
for i in range(runs):
directory_name = f"experiments/self_application/{run_name}_run_{i}_{str(population_size)}_nets_{application_steps}_SA_{str(name_hash)}"
SA_experiment = SelfApplicationExperiment(
population_size,
batch_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
application_steps,
train_nets,
directory_name,
training_steps
)
pickle.dump(SA_experiment, open(f"{directory_name}/full_experiment_pickle.p", "wb"))
experiments[i] = SA_experiment
# Building a summary of all the runs
directory_name = f"experiments/self_application/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{application_steps}_SA_{str(name_hash)}"
os.mkdir(directory_name)
summary_pre_title = "SA"
summary_fixpoint_experiment(runs, population_size, application_steps, experiments, net_learning_rate,
directory_name,
summary_pre_title)
if __name__ == '__main__':
raise NotImplementedError('Test this here!!!')

experiments/self_train_exp.py Normal file

@@ -0,0 +1,116 @@
import os.path
import pickle
from pathlib import Path
from tqdm import tqdm
from experiments.helpers import check_folder, summary_fixpoint_experiment
from functionalities_test import test_for_fixpoints
from network import Net
from visualization import plot_loss, bar_chart_fixpoints
from visualization import plot_3d_self_train
class SelfTrainExperiment:
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
epochs, directory_name) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.epochs = epochs
self.loss_history = []
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.directory_name = directory_name
os.mkdir(self.directory_name)
self.nets = []
# Create population:
self.populate_environment()
self.weights_evolution_3d_experiment()
self.count_fixpoints()
self.visualize_loss()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating ST experiment %s" % i)
net_name = f"ST_net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
for _ in range(self.epochs):
net.self_train(1, self.log_step_size, self.net_learning_rate)
print(f"\nLast weight matrix (epoch: {self.epochs}):\n{net.input_weight_matrix()}\nLossHistory: {net.loss_history[-10:]}")
self.nets.append(net)
def weights_evolution_3d_experiment(self):
exp_name = f"ST_{str(len(self.nets))}_nets_3d_weights_PCA"
return plot_3d_self_train(self.nets, exp_name, self.directory_name, self.log_step_size)
def count_fixpoints(self):
test_for_fixpoints(self.fixpoint_counters, self.nets)
exp_details = f"Self-train for {self.epochs} epochs"
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
def visualize_loss(self):
for i in range(len(self.nets)):
net_loss_history = self.nets[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory_name)
def run_ST_experiment(population_size, batch_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
epochs, runs, run_name, name_hash):
experiments = {}
logging_directory = Path('output') / 'self_training'
logging_directory.mkdir(parents=True, exist_ok=True)
# Running the experiments
for i in range(runs):
experiment_name = f"{run_name}_run_{i}_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
this_exp_directory = logging_directory / experiment_name
ST_experiment = SelfTrainExperiment(
population_size,
batch_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
epochs,
this_exp_directory
)
with (this_exp_directory / 'full_experiment_pickle.p').open('wb') as f:
pickle.dump(ST_experiment, f)
experiments[i] = ST_experiment
# Building a summary of all the runs
summary_name = f"/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
summary_directory_name = logging_directory / summary_name
summary_directory_name.mkdir(parents=True, exist_ok=True)
summary_pre_title = "ST"
summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, summary_directory_name,
summary_pre_title)
if __name__ == '__main__':
raise NotImplementedError('Test this here!!!')
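Again the `__main__` guard is a stub; a plausible call matching the `run_ST_experiment` signature above (parameter values are illustrative assumptions):

```python
import random
from experiments.self_train_exp import run_ST_experiment

run_ST_experiment(population_size=10, batch_size=10, net_input_size=4, net_hidden_size=2,
                  net_out_size=1, net_learning_rate=0.04, epochs=1000, runs=3,
                  run_name="demo", name_hash=random.getrandbits(32))
```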

@@ -0,0 +1,114 @@
import pickle
from pathlib import Path
from tqdm import tqdm
from experiments.helpers import check_folder, summary_fixpoint_experiment
from functionalities_test import test_for_fixpoints
from network import SecondaryNet
from visualization import plot_loss, bar_chart_fixpoints
from visualization import plot_3d_self_train
class SelfTrainExperimentSecondary:
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
epochs, directory: Path) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.epochs = epochs
self.loss_history = []
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.directory_name = Path(directory)
self.directory_name.mkdir(parents=True, exist_ok=True)
self.nets = []
# Create population:
self.populate_environment()
self.weights_evolution_3d_experiment()
self.count_fixpoints()
self.visualize_loss()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating ST experiment %s" % i)
net_name = f"ST_net_{str(i)}"
net = SecondaryNet(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
for _ in range(self.epochs):
net.self_train(1, self.log_step_size, self.net_learning_rate)
print(f"\nLast weight matrix (epoch: {self.epochs}):\n{net.input_weight_matrix()}\nLossHistory: {net.loss_history[-10:]}")
self.nets.append(net)
def weights_evolution_3d_experiment(self):
exp_name = f"ST_{str(len(self.nets))}_nets_3d_weights_PCA"
return plot_3d_self_train(self.nets, exp_name, self.directory_name, self.log_step_size)
def count_fixpoints(self):
test_for_fixpoints(self.fixpoint_counters, self.nets)
exp_details = f"Self-train for {self.epochs} epochs"
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory_name, self.net_learning_rate,
exp_details)
def visualize_loss(self):
for i in range(len(self.nets)):
net_loss_history = self.nets[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory_name)
def run_ST_experiment(population_size, batch_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
epochs, runs, run_name, name_hash):
experiments = {}
logging_directory = Path('output') / 'self_training'
logging_directory.mkdir(parents=True, exist_ok=True)
# Running the experiments
for i in range(runs):
experiment_name = f"{run_name}_run_{i}_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
this_exp_directory = logging_directory / experiment_name
ST_experiment = SelfTrainExperimentSecondary(
population_size,
batch_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
epochs,
this_exp_directory
)
with (this_exp_directory / 'full_experiment_pickle.p').open('wb') as f:
pickle.dump(ST_experiment, f)
experiments[i] = ST_experiment
# Building a summary of all the runs
summary_name = f"/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
summary_directory_name = logging_directory / summary_name
summary_directory_name.mkdir(parents=True, exist_ok=True)
summary_pre_title = "ST"
summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, summary_directory_name,
summary_pre_title)
if __name__ == '__main__':
raise NotImplementedError('Test this here!!!')

experiments/soup_exp.py Normal file (+190)

@@ -0,0 +1,190 @@
import random
import os.path
import pickle
from pathlib import Path
from typing import Union
from tqdm import tqdm
from experiments.helpers import check_folder, summary_fixpoint_percentage, summary_fixpoint_experiment
from functionalities_test import test_for_fixpoints
from network import Net
from visualization import plot_loss, bar_chart_fixpoints, plot_3d_soup, line_chart_fixpoints
class SoupExperiment:
def __init__(self, population_size, net_i_size, net_h_size, net_o_size, learning_rate, attack_chance,
train_nets, ST_steps, epochs, log_step_size, directory: Union[str, Path]):
super().__init__()
self.population_size = population_size
self.net_input_size = net_i_size
self.net_hidden_size = net_h_size
self.net_out_size = net_o_size
self.net_learning_rate = learning_rate
self.attack_chance = attack_chance
self.train_nets = train_nets
# self.SA_steps = SA_steps
self.ST_steps = ST_steps
self.epochs = epochs
self.log_step_size = log_step_size
self.loss_history = []
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
# <self.fixpoint_counters_history> keeps track of the percentage of fixpoints over time
self.fixpoint_counters_history = []
self.directory = Path(directory)
self.directory.mkdir(parents=True, exist_ok=True)
self.population = []
self.populate_environment()
self.evolve()
self.fixpoint_percentage()
self.weights_evolution_3d_experiment()
self.count_fixpoints()
self.visualize_loss()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating soup experiment %s" % i)
net_name = f"soup_network_{i}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
self.population.append(net)
def population_self_train(self):
# Self-training each network in the population
for j in range(self.population_size):
net = self.population[j]
for _ in range(self.ST_steps):
net.self_train(1, self.log_step_size, self.net_learning_rate)
def population_attack(self):
# A network attacking another network with a given percentage
if random.randint(1, 100) <= self.attack_chance:
random_net1, random_net2 = random.sample(range(self.population_size), 2)
random_net1 = self.population[random_net1]
random_net2 = self.population[random_net2]
print(f"\n Attack: {random_net1.name} -> {random_net2.name}")
random_net1.attack(random_net2)
def evolve(self):
""" Evolving consists of attacking & self-training. """
loop_epochs = tqdm(range(self.epochs))
for i in loop_epochs:
loop_epochs.set_description("Evolving soup %s" % i)
# A network attacking another network with a given percentage
self.population_attack()
# Self-training each network in the population
self.population_self_train()
# Testing for fixpoints after each batch of ST steps to see relevant data
if i % self.ST_steps == 0:
test_for_fixpoints(self.fixpoint_counters, self.population)
fixpoints_percentage = round(self.fixpoint_counters["identity_func"] / self.population_size, 1)
self.fixpoint_counters_history.append(fixpoints_percentage)
# Resetting the fixpoint counter. Last iteration not to be reset -
# it is important for the bar_chart_fixpoints().
if i < self.epochs - 1:
self.reset_fixpoint_counters()
def weights_evolution_3d_experiment(self):
exp_name = f"soup_{self.population_size}_nets_{self.ST_steps}_training_{self.epochs}_epochs"
return plot_3d_soup(self.population, exp_name, self.directory)
def count_fixpoints(self):
test_for_fixpoints(self.fixpoint_counters, self.population)
exp_details = f"Evolution steps: {self.epochs} epochs"
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory, self.net_learning_rate,
exp_details)
def fixpoint_percentage(self):
runs = self.epochs / self.ST_steps
SA_steps = None
line_chart_fixpoints(self.fixpoint_counters_history, runs, self.ST_steps, SA_steps, self.directory,
self.population_size)
def visualize_loss(self):
for i in range(len(self.population)):
net_loss_history = self.population[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory)
def reset_fixpoint_counters(self):
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
def run_soup_experiment(population_size, attack_chance, net_input_size, net_hidden_size, net_out_size,
net_learning_rate, epochs, batch_size, runs, run_name, name_hash, ST_steps, train_nets):
experiments = {}
fixpoints_percentages = []
check_folder("soup")
# Running the experiments
for i in range(runs):
# FIXME: Make this a pathlib.Path() Operation
directory_name = f"experiments/soup/{run_name}_run_{i}_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
soup_experiment = SoupExperiment(
population_size,
net_input_size,
net_hidden_size,
net_out_size,
net_learning_rate,
attack_chance,
train_nets,
ST_steps,
epochs,
batch_size,
directory_name
)
pickle.dump(soup_experiment, open(f"{directory_name}/full_experiment_pickle.p", "wb"))
experiments[i] = soup_experiment
# Building history of fixpoint percentages for summary
fixpoint_counters_history = soup_experiment.fixpoint_counters_history
if not fixpoints_percentages:
fixpoints_percentages = soup_experiment.fixpoint_counters_history
else:
# Using list comprehension to make the sum of all the percentages
fixpoints_percentages = [fixpoints_percentages[i] + fixpoint_counters_history[i] for i in
range(len(fixpoints_percentages))]
# Creating a folder for the summary of the current runs
# FIXME: Make this a pathlib.Path() Operation
directory_name = f"experiments/soup/summary_{run_name}_{runs}_runs_{str(population_size)}_nets_{epochs}_epochs_{str(name_hash)}"
os.mkdir(directory_name)
# Building a summary of all the runs
summary_pre_title = "soup"
summary_fixpoint_experiment(runs, population_size, epochs, experiments, net_learning_rate, directory_name,
summary_pre_title)
SA_steps = None
summary_fixpoint_percentage(runs, epochs, fixpoints_percentages, ST_steps, SA_steps, directory_name,
population_size)
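Note that `attack_chance` is a percentage: per epoch, an attack fires iff `random.randint(1, 100) <= self.attack_chance`. A quick standalone check of that rate (a sketch, not repo code):

```python
import random

attack_chance = 10  # i.e. an attack in ~10% of epochs
trials = 100_000
hits = sum(random.randint(1, 100) <= attack_chance for _ in range(trials))
print(hits / trials)  # ~0.10
```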

@@ -0,0 +1,50 @@
import random
from tqdm import tqdm
from experiments.soup_exp import SoupExperiment
from functionalities_test import test_for_fixpoints
class MeltingSoupExperiment(SoupExperiment):
def __init__(self, melt_chance, *args, keep_population_size=True, **kwargs):
super(MeltingSoupExperiment, self).__init__(*args, **kwargs)
self.keep_population_size = keep_population_size
self.melt_chance = melt_chance
def population_melt(self):
# A network melting with another network by a given percentage
if random.randint(1, 100) <= self.melt_chance:
random_net1_idx, random_net2_idx, destroy_idx = random.sample(range(self.population_size), 3)
random_net1 = self.population[random_net1_idx]
random_net2 = self.population[random_net2_idx]
print(f"\n Melt: {random_net1.name} -> {random_net2.name}")
melted_network = random_net1.melt(random_net2)
if self.keep_population_size:
del self.population[destroy_idx]
self.population.append(melted_network)
def evolve(self):
""" Evolving consists of attacking, melting & self-training. """
loop_epochs = tqdm(range(self.epochs))
for i in loop_epochs:
loop_epochs.set_description("Evolving soup %s" % i)
self.population_attack()
self.population_melt()
self.population_self_train()
# Testing for fixpoints after each batch of ST steps to see relevant data
if i % self.ST_steps == 0:
test_for_fixpoints(self.fixpoint_counters, self.population)
fixpoints_percentage = round(self.fixpoint_counters["identity_func"] / self.population_size, 1)
self.fixpoint_counters_history.append(fixpoints_percentage)
# Resetting the fixpoint counter. Last iteration not to be reset -
# it is important for the bar_chart_fixpoints().
if i < self.epochs - 1:
self.reset_fixpoint_counters()

functionalities_test.py Normal file (+95)

@@ -0,0 +1,95 @@
import copy
from typing import Dict, List
import torch
from tqdm import tqdm
from network import Net
def is_divergent(network: Net) -> bool:
return network.input_weight_matrix().isinf().any().item() or network.input_weight_matrix().isnan().any().item()
def is_identity_function(network: Net, epsilon=pow(10, -5)) -> bool:
input_data = network.input_weight_matrix()
target_data = network.create_target_weights(input_data)
predicted_values = network(input_data)
return torch.allclose(target_data.detach(), predicted_values.detach(),
rtol=0, atol=epsilon)
def is_zero_fixpoint(network: Net, epsilon=pow(10, -5)) -> bool:
target_data = network.create_target_weights(network.input_weight_matrix().detach())
result = torch.allclose(target_data, torch.zeros_like(target_data), rtol=0, atol=epsilon)
# result = bool(len(np.nonzero(network.create_target_weights(network.input_weight_matrix()))))
return result
def is_secondary_fixpoint(network: Net, epsilon: float = pow(10, -5)) -> bool:
""" Secondary fixpoint check is done like this: compare first INPUT with second OUTPUT.
If they are within the boundaries, then is secondary fixpoint. """
input_data = network.input_weight_matrix()
target_data = network.create_target_weights(input_data)
# Calculating first output
first_output = network(input_data)
# Getting the second output by initializing a new net with the weights of the original net.
net_copy = copy.deepcopy(network)
net_copy.apply_weights(first_output)
input_data_2 = net_copy.input_weight_matrix()
# Calculating second output
second_output = network(input_data_2)
# Perform the check: all(epsilon > abs(target_data - second_output))
check_abs_within_epsilon = torch.allclose(target_data.detach(), second_output.detach(),
rtol=0, atol=epsilon)
return check_abs_within_epsilon
def test_for_fixpoints(fixpoint_counter: Dict, nets: List, id_functions=None):
id_functions = id_functions or list()
for net in tqdm(nets, desc='Fixpoint Tester', total=len(nets)):
if is_divergent(net):
fixpoint_counter["divergent"] += 1
net.is_fixpoint = "divergent"
elif is_identity_function(net): # is default value
fixpoint_counter["identity_func"] += 1
net.is_fixpoint = "identity_func"
id_functions.append(net)
elif is_zero_fixpoint(net):
fixpoint_counter["fix_zero"] += 1
net.is_fixpoint = "fix_zero"
elif is_secondary_fixpoint(net):
fixpoint_counter["fix_sec"] += 1
net.is_fixpoint = "fix_sec"
else:
fixpoint_counter["other_func"] += 1
net.is_fixpoint = "other_func"
return id_functions
def changing_rate(x_new, x_old):
return x_new - x_old
def test_status(net: Net) -> Net:
if is_divergent(net):
net.is_fixpoint = "divergent"
elif is_identity_function(net): # is default value
net.is_fixpoint = "identity_func"
elif is_zero_fixpoint(net):
net.is_fixpoint = "fix_zero"
elif is_secondary_fixpoint(net):
net.is_fixpoint = "fix_sec"
else:
net.is_fixpoint = "other_func"
return net
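All checks in this file share one tolerance convention: `torch.allclose(a, b, rtol=0, atol=epsilon)` with `epsilon = 10**-5`, i.e. a purely absolute, element-wise bound. A minimal illustration:

```python
import torch

target = torch.tensor([0.5, -0.25, 0.0])
print(torch.allclose(target, target + 5e-6, rtol=0, atol=1e-5))  # True: within tolerance
print(torch.allclose(target, target + 5e-4, rtol=0, atol=1e-5))  # False: outside tolerance
```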

@@ -0,0 +1,203 @@
import copy
import itertools
from pathlib import Path
import random
import pickle
import pandas as pd
import numpy as np
import torch
from functionalities_test import is_identity_function, test_status
from journal_basins import SpawnExperiment, mean_invariate_manhattan_distance
from network import Net
from sklearn.metrics import mean_absolute_error as MAE
from sklearn.metrics import mean_squared_error as MSE
class SpawnLinspaceExperiment(SpawnExperiment):
def spawn_and_continue(self, number_clones: int = None):
number_clones = number_clones or self.nr_clones
df = pd.DataFrame(
columns=['clone', 'parent', 'parent2',
'MAE_pre', 'MAE_post',
'MSE_pre', 'MSE_post',
'MIM_pre', 'MIM_post',
'noise', 'status_post'])
# For every initial net {i} after populating (that is fixpoint after first epoch);
# parent = self.parents[0]
# parent_clone = clone = Net(parent.input_size, parent.hidden_size, parent.out_size,
# name=f"{parent.name}_clone_{0}", start_time=self.ST_steps)
# parent_clone.apply_weights(torch.as_tensor(parent.create_target_weights(parent.input_weight_matrix())))
# parent_clone = parent_clone.apply_noise(self.noise)
# self.parents.append(parent_clone)
pairwise_net_list = list(itertools.combinations(self.parents, 2))
for net1, net2 in pairwise_net_list:
# We set parent start_time to just before this epoch ended, so plotting is zoomed in. Comment out
# to see the full trajectory (but the clones will be very hard to see).
# Make one target to compare distances to clones later when they have trained.
net1.start_time = self.ST_steps - 150
net1_input_data = net1.input_weight_matrix().detach()
net1_target_data = net1.create_target_weights(net1_input_data).detach()
net2.start_time = self.ST_steps - 150
net2_input_data = net2.input_weight_matrix().detach()
net2_target_data = net2.create_target_weights(net2_input_data).detach()
if is_identity_function(net1) and is_identity_function(net2):
# if True:
# Clone the fixpoint x times and add (+-)self.noise to weight-sets randomly;
# To plot clones starting after first epoch (z=ST_steps), set that as start_time!
# To make sure PCA will plot the same trajectory up until this point, we clone the
# parent-net's weight history as well.
in_between_weights = np.linspace(net1_target_data, net2_target_data, number_clones, endpoint=False)
# in_between_weights = np.logspace(net1_target_data, net2_target_data, number_clones, endpoint=False)
for j, in_between_weight in enumerate(in_between_weights):
clone = Net(net1.input_size, net1.hidden_size, net1.out_size,
name=f"{net1.name}_{net2.name}_clone_{str(j)}", start_time=self.ST_steps + 100)
clone.apply_weights(torch.as_tensor(in_between_weight))
clone.s_train_weights_history = copy.deepcopy(net1.s_train_weights_history)
clone.number_trained = copy.deepcopy(net1.number_trained)
# Pre Training distances (after noise application of course)
clone_pre_weights = clone.create_target_weights(clone.input_weight_matrix()).detach()
MAE_pre = MAE(net1_target_data, clone_pre_weights)
MSE_pre = MSE(net1_target_data, clone_pre_weights)
MIM_pre = mean_invariate_manhattan_distance(net1_target_data, clone_pre_weights)
try:
# Then finish training each clone {j} (for remaining epoch-1 * ST_steps) ..
for _ in range(self.epochs - 1):
for _ in range(self.ST_steps):
clone.self_train(1, self.log_step_size, self.net_learning_rate)
if any([torch.isnan(x).any() for x in clone.parameters()]):
raise ValueError
except ValueError:
print("Ran into nan in 'in beetween weights' array.")
df.loc[len(df)] = [j, net1.name, net2.name,
MAE_pre, 0,
MSE_pre, 0,
MIM_pre, 0,
self.noise, clone.is_fixpoint]
continue
# Post Training distances for comparison
clone_post_weights = clone.create_target_weights(clone.input_weight_matrix()).detach()
MAE_post = MAE(net1_target_data, clone_post_weights)
MSE_post = MSE(net1_target_data, clone_post_weights)
MIM_post = mean_invariate_manhattan_distance(net1_target_data, clone_post_weights)
# .. log to data-frame and add to nets for 3d plotting if they are fixpoints themselves.
test_status(clone)
if is_identity_function(clone):
print(f"Clone {j} (between {net1.name} and {net2.name}) is fixpoint."
f"\nMSE({net1.name},{j}): {MSE_post}"
f"\nMAE({net1.name},{j}): {MAE_post}"
f"\nMIM({net1.name},{j}): {MIM_post}\n")
self.nets.append(clone)
df.loc[len(df)] = [j, net1.name, net2.name,
MAE_pre, MAE_post,
MSE_pre, MSE_post,
MIM_pre, MIM_post,
self.noise, clone.is_fixpoint]
for net1, net2 in pairwise_net_list:
try:
value = 'MAE'
c_selector = [f'{value}_pre', f'{value}_post']
values = df.loc[(df['parent'] == net1.name) & (df['parent2'] == net2.name)][c_selector]
this_min, this_max = values.values.min(), values.values.max()
df.loc[(df['parent'] == net1.name) &
(df['parent2'] == net2.name), c_selector] = (values - this_min) / (this_max - this_min)
except ValueError:
pass
for parent in self.parents:
for _ in range(self.epochs - 1):
for _ in range(self.ST_steps):
parent.self_train(1, self.log_step_size, self.net_learning_rate)
self.df = df
if __name__ == '__main__':
NET_INPUT_SIZE = 4
NET_OUT_SIZE = 1
# Define number of runs & name:
ST_runs = 1
ST_runs_name = "test-27"
ST_steps = 2000
ST_epochs = 2
ST_log_step_size = 10
# Define number of networks & their architecture
nr_clones = 25
ST_population_size = 10
ST_net_hidden_size = 2
ST_net_learning_rate = 0.04
ST_name_hash = random.getrandbits(32)
print(f"Running the Spawn experiment:")
exp = SpawnLinspaceExperiment(
population_size=ST_population_size,
log_step_size=ST_log_step_size,
net_input_size=NET_INPUT_SIZE,
net_hidden_size=ST_net_hidden_size,
net_out_size=NET_OUT_SIZE,
net_learning_rate=ST_net_learning_rate,
epochs=ST_epochs,
st_steps=ST_steps,
nr_clones=nr_clones,
noise=1e-8,
directory=Path('output') / 'spawn_basin' / f'{ST_name_hash}' / 'lineage'
)
df = exp.df
directory = Path('output') / 'spawn_basin' / f'{ST_name_hash}' / 'lineage'
with (directory / f"experiment_pickle_{ST_name_hash}.p").open('wb') as f:
pickle.dump(exp, f)
print(f"\nSaved experiment to {directory}.")
# Countplot with counts of nr_fixpoints, nr_other, etc. on the y-axis
# sns.countplot(data=df, x="noise", hue="status_post")
# plt.savefig(f"output/spawn_basin/{ST_name_hash}/fixpoint_status_countplot.png")
# Catplot (either kind="point" or "box") that shows before-after training distances to parent
# mlt = df[["MIM_pre", "MIM_post", "noise"]].melt("noise", var_name="time", value_name='Average Distance')
# sns.catplot(data=mlt, x="time", y="Average Distance", col="noise", kind="point", col_wrap=5, sharey=False)
# plt.savefig(f"output/spawn_basin/{ST_name_hash}/clone_distance_catplot.png")
# Pointplot with pre and after parent Distances
import seaborn as sns
from matplotlib import pyplot as plt, ticker
# ptplt = sns.pointplot(data=exp.df, x='MAE_pre', y='MAE_post', join=False)
ptplt = sns.scatterplot(x=exp.df['MAE_pre'], y=exp.df['MAE_post'])
# ptplt.set(xscale='log', yscale='log')
x0, x1 = ptplt.axes.get_xlim()
y0, y1 = ptplt.axes.get_ylim()
lims = [max(x0, y0), min(x1, y1)]
# This is the x=y line using transforms
ptplt.plot(lims, lims, 'w', linestyle='dashdot', transform=ptplt.axes.transData)
ptplt.plot([0, 1], [0, 1], ':k', transform=ptplt.axes.transAxes)
ptplt.set(xlabel='Mean Absolute Distance before Self-Training',
ylabel='Mean Absolute Distance after Self-Training')
# ptplt.axes.xaxis.set_major_formatter(ticker.FuncFormatter(lambda x, pos: round(float(x), 2)))
# ptplt.xticks(rotation=45)
#for ind, label in enumerate(ptplt.get_xticklabels()):
# if ind % 10 == 0: # every 10th label is kept
# label.set_visible(True)
# else:
# label.set_visible(False)
filepath = exp.directory / 'mim_dist_plot.pdf'
plt.tight_layout()
plt.savefig(filepath, dpi=600, format='pdf', bbox_inches='tight')
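Aside: a minimal illustrative sketch (not code from this diff) of the interpolation trick used in spawn_and_continue above. np.linspace accepts array-valued endpoints, so a single call yields one weight vector per clone along the straight line between two parents; the weight values here are hypothetical.

import numpy as np

w1 = np.array([0.10, -0.20, 0.30])  # hypothetical target weights of parent 1
w2 = np.array([0.40, 0.50, -0.60])  # hypothetical target weights of parent 2
# Five evenly spaced weight vectors from w1 towards w2, excluding w2 itself
clones = np.linspace(w1, w2, num=5, endpoint=False)
print(clones.shape)  # (5, 3): one row per clone, one column per weight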

journal_basins.py Normal file

@@ -0,0 +1,315 @@
import os
from pathlib import Path
import pickle
from tqdm import tqdm
import random
import copy
from functionalities_test import is_identity_function, test_status
from network import Net
from visualization import plot_3d_self_train, plot_loss
import numpy as np
from tabulate import tabulate
from sklearn.metrics import mean_absolute_error as MAE
from sklearn.metrics import mean_squared_error as MSE
import pandas as pd
import seaborn as sns
from matplotlib import pyplot as plt
import torch
import torch.nn.functional as F
def prng():
return random.random()
def l1(tup):
a, b = tup
return abs(a - b)
def mean_invariate_manhattan_distance(x, y):
# One of these one-liners that might be smart or really dumb. Goal is to find pairwise
# distances of ascending values, i.e. mean(abs(min1_X - min1_Y), abs(min2_X - min2_Y), ...).
# Idea was to find weight sets that have the same values but just in different positions, which
# would make this distance 0.
try:
return np.mean(list(map(l1, zip(sorted(x.detach().numpy()), sorted(y.detach().numpy())))))
except AttributeError:
return np.mean(list(map(l1, zip(sorted(x.numpy()), sorted(y.numpy())))))
def distance_matrix(nets, distance="MIM", print_it=True):
matrix = [[0 for _ in range(len(nets))] for _ in range(len(nets))]
for net in range(len(nets)):
weights = nets[net].input_weight_matrix()[:, 0]
for other_net in range(len(nets)):
other_weights = nets[other_net].input_weight_matrix()[:, 0]
if distance in ["MSE"]:
matrix[net][other_net] = MSE(weights, other_weights)
elif distance in ["MAE"]:
matrix[net][other_net] = MAE(weights, other_weights)
elif distance in ["MIM"]:
matrix[net][other_net] = mean_invariate_manhattan_distance(weights, other_weights)
if print_it:
print(f"\nDistance matrix (all to all) [{distance}]:")
headers = [i.name for i in nets]
print(tabulate(matrix, showindex=headers, headers=headers, tablefmt='orgtbl'))
return matrix
def distance_from_parent(nets, distance="MIM", print_it=True):
list_of_matrices = []
parents = list(filter(lambda x: "clone" not in x.name and is_identity_function(x), nets))
distance_range = range(10)
for parent in parents:
parent_weights = parent.create_target_weights(parent.input_weight_matrix())
clones = list(filter(lambda y: parent.name in y.name and parent.name != y.name, nets))
matrix = [[0 for _ in distance_range] for _ in range(len(clones))]
for dist in distance_range:
for idx, clone in enumerate(clones):
clone_weights = clone.create_target_weights(clone.input_weight_matrix())
if distance in ["MSE"]:
matrix[idx][dist] = MSE(parent_weights, clone_weights) < pow(10, -dist)
elif distance in ["MAE"]:
matrix[idx][dist] = MAE(parent_weights, clone_weights) < pow(10, -dist)
elif distance in ["MIM"]:
matrix[idx][dist] = mean_invariate_manhattan_distance(parent_weights, clone_weights) < pow(10,
-dist)
if print_it:
print(f"\nDistances from parent {parent.name} [{distance}]:")
col_headers = [str(f"10e-{d}") for d in distance_range]
row_headers = [str(f"clone_{i}") for i in range(len(clones))]
print(tabulate(matrix, showindex=row_headers, headers=col_headers, tablefmt='orgtbl'))
list_of_matrices.append(matrix)
return list_of_matrices
class SpawnExperiment:
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
epochs, st_steps, nr_clones, noise, directory) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.epochs = epochs
self.ST_steps = st_steps
self.loss_history = []
self.nets = []
self.nr_clones = nr_clones
self.noise = noise or 10e-5
print("\nNOISE:", self.noise)
self.parents = []
self.directory = Path(directory)
self.directory.mkdir(parents=True, exist_ok=True)
self.populate_environment()
self.spawn_and_continue()
self.weights_evolution_3d_experiment()
# self.visualize_loss()
self.distance_matrix = distance_matrix(self.nets, print_it=False)
self.parent_clone_distances = distance_from_parent(self.nets, print_it=False)
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating experiment %s" % i)
net_name = f"ST_net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
for _ in range(self.ST_steps):
net.self_train(1, self.log_step_size, self.net_learning_rate)
self.nets.append(net)
self.parents.append(net)
def spawn_and_continue(self, number_clones: int = None):
number_clones = number_clones or self.nr_clones
df = pd.DataFrame(
columns=['name', 'MAE_pre', 'MAE_post', 'MSE_pre', 'MSE_post', 'MIM_pre', 'MIM_post', 'noise',
'status_post'])
# For every initial net {i} after populating (that is fixpoint after first epoch);
for i in range(self.population_size):
net = self.nets[i]
# We set parent start_time to just before this epoch ended, so plotting is zoomed in. Comment out
# to see the full trajectory (but the clones will be very hard to see).
# Make one target to compare distances to clones later when they have trained.
net.start_time = self.ST_steps - 350
net_input_data = net.input_weight_matrix()
net_target_data = net.create_target_weights(net_input_data)
if is_identity_function(net):
print(f"\nNet {i} is fixpoint")
# Clone the fixpoint x times and add (+-)self.noise to weight-sets randomly;
# To plot clones starting after first epoch (z=ST_steps), set that as start_time!
# To make sure PCA will plot the same trajectory up until this point, we clone the
# parent-net's weight history as well.
for j in range(number_clones):
clone = Net(net.input_size, net.hidden_size, net.out_size,
f"ST_net_{str(i)}_clone_{str(j)}", start_time=self.ST_steps)
clone.load_state_dict(copy.deepcopy(net.state_dict()))
rand_noise = prng() * self.noise
clone = clone.apply_noise(rand_noise)
clone.s_train_weights_history = copy.deepcopy(net.s_train_weights_history)
clone.number_trained = copy.deepcopy(net.number_trained)
# Pre Training distances (after noise application of course)
clone_pre_weights = clone.create_target_weights(clone.input_weight_matrix())
MAE_pre = MAE(net_target_data, clone_pre_weights)
MSE_pre = MSE(net_target_data, clone_pre_weights)
MIM_pre = mean_invariate_manhattan_distance(net_target_data, clone_pre_weights)
# Then finish training each clone {j} (for the remaining (epochs - 1) * ST_steps) ..
for _ in range(self.epochs - 1):
for _ in range(self.ST_steps):
clone.self_train(1, self.log_step_size, self.net_learning_rate)
# Post Training distances for comparison
clone_post_weights = clone.create_target_weights(clone.input_weight_matrix())
MAE_post = MAE(net_target_data, clone_post_weights)
MSE_post = MSE(net_target_data, clone_post_weights)
MIM_post = mean_invariate_manhattan_distance(net_target_data, clone_post_weights)
# .. log to data-frame and add to nets for 3d plotting if they are fixpoints themselves.
test_status(clone)
if is_identity_function(clone):
print(f"Clone {j} (of net_{i}) is fixpoint."
f"\nMSE({i},{j}): {MSE_post}"
f"\nMAE({i},{j}): {MAE_post}"
f"\nMIM({i},{j}): {MIM_post}\n")
self.nets.append(clone)
df.loc[clone.name] = [clone.name, MAE_pre, MAE_post, MSE_pre, MSE_post, MIM_pre, MIM_post, self.noise, clone.is_fixpoint]
# Finally take parent net {i} and finish its training for comparison to clone development.
for _ in range(self.epochs - 1):
for _ in range(self.ST_steps):
net.self_train(1, self.log_step_size, self.net_learning_rate)
net_weights_after = net.create_target_weights(net.input_weight_matrix())
print(f"Parent net's distance to original position."
f"\nMSE(OG,new): {MAE(net_target_data, net_weights_after)}"
f"\nMAE(OG,new): {MSE(net_target_data, net_weights_after)}"
f"\nMIM(OG,new): {mean_invariate_manhattan_distance(net_target_data, net_weights_after)}\n")
self.df = df
def weights_evolution_3d_experiment(self):
exp_name = f"ST_{str(len(self.nets))}_nets_3d_weights_PCA"
return plot_3d_self_train(self.nets, exp_name, self.directory, self.log_step_size, plot_pca_together=True)
def visualize_loss(self):
for i in range(len(self.nets)):
net_loss_history = self.nets[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory)
if __name__ == "__main__":
NET_INPUT_SIZE = 4
NET_OUT_SIZE = 1
# Define number of runs & name:
ST_runs = 1
ST_runs_name = "test-27"
ST_steps = 2500
ST_epochs = 2
ST_log_step_size = 10
# Define number of networks & their architecture
nr_clones = 10
ST_population_size = 1
ST_net_hidden_size = 2
ST_net_learning_rate = 0.04
ST_name_hash = random.getrandbits(32)
print(f"Running the Spawn experiment:")
exp_list = []
for noise_factor in range(2, 3):
exp = SpawnExperiment(
population_size=ST_population_size,
log_step_size=ST_log_step_size,
net_input_size=NET_INPUT_SIZE,
net_hidden_size=ST_net_hidden_size,
net_out_size=NET_OUT_SIZE,
net_learning_rate=ST_net_learning_rate,
epochs=ST_epochs,
st_steps=ST_steps,
nr_clones=nr_clones,
noise=pow(10, -noise_factor),
directory=Path('output') / 'spawn_basin' / f'{ST_name_hash}' / f'10e-{noise_factor}'
)
exp_list.append(exp)
directory = Path('output') / 'spawn_basin' / f'{ST_name_hash}'
with (directory / f"experiment_pickle_{ST_name_hash}.p").open('wb') as f:
pickle.dump(exp_list, f)
print(f"\nSaved experiment to {directory}.")
# Concat all dataframes, and add columns depending on where clone weights end up after training (rel. to parent)
df = pd.concat([exp.df for exp in exp_list])
df = df.dropna().reset_index()
df["relative_distance"] = [ (df.loc[i]["MAE_pre"] - df.loc[i]["MAE_post"])/df.loc[i]["noise"] for i in range(len(df))]
df["class"] = [ "approaching" if df.loc[i]["relative_distance"] > 0 else "distancing" if df.loc[i]["relative_distance"] < 0 else "stationary" for i in range(len(df))]
# Countplot of all fixpoint clone after training per class.
ax = sns.catplot(kind="count", data=df, x="noise", hue="class", height=5.27, aspect=11.7/5.27, legend=False)
ax.set_axis_labels("Noise Levels", "Clone Fixpoints After Training Count ", fontsize=15)
ax.set_xticklabels(labels=('$\mathregular{10^{-10}}$', '$\mathregular{10^{-9}}$', '$\mathregular{10^{-8}}$', '$\mathregular{10^{-7}}$', '$\mathregular{10^{-6}}$', '$\mathregular{10^{-5}}$', '$\mathregular{10^{-4}}$', '$\mathregular{10^{-3}}$', '$\mathregular{10^{-2}}$', '$\mathregular{10^{-1}}$'), fontsize=15)
plt.legend(bbox_to_anchor=(0.01, 0.85), loc=2, borderaxespad=0.)
plt.legend(fontsize='large')
plt.savefig(f"{directory}/clone_status_after_countplot_{ST_name_hash}.png")
plt.clf()
# Catplot of before-after comparison of the clones' weights. Colors lines depending on class (approaching, distancing, stationary (i.e., MAE=0)). Blue, orange and green are based on the countplot above and should be safe for colorblindness (see https://gist.github.com/mwaskom/b35f6ebc2d4b340b4f64a4e28e778486).
mlt = df.melt(id_vars=["name", "noise", "class"], value_vars=["MAE_pre", "MAE_post"], var_name="State", value_name="Distance")
P = ["blue" if mlt.loc[i]["class"] == "approaching" else "orange" if mlt.loc[i]["class"] == "distancing" else "green" for i in range(len(mlt))]
P = sns.color_palette(P, as_cmap=False)
ax = sns.catplot(data=mlt, x="State", y="Distance", col="noise", hue="name", kind="point", palette=P, col_wrap=min(5, len(exp_list)), sharey=False, legend=False)
ax.map(sns.boxplot, "State", "Distance", "noise", linewidth=0.8, order=["MAE_pre", "MAE_post"], whis=[0, 100])
ax.set_axis_labels("", "Manhattan Distance To Parent Weights", fontsize=15)
ax.set_xticklabels(labels=('after noise application', 'after training'), fontsize=15)
# plt.ticklabel_format(style='sci', axis='x')
plt.savefig(f"{directory}/before_after_distance_catplot_{ST_name_hash}.png")
plt.clf()
# Catplot of child_nets L1 Prediction "progress" compared to parents. Computes one round of accuracy first. If net is a parent net (not a clone), then we reset weights to timestep of cloning first (from the weight history). So 5k (end) -> 2.5k training (in this experiment, so careful with len(history)/2, this might only work here!)
df_acc = pd.DataFrame(columns=["name", "noise", "l1_acc", "Network Type"])
for i in range(len(exp_list)):
noise = exp_list[i].noise
print(f"\nNoise: {noise}")
for network in exp_list[i].nets:
is_parent = "clone" not in network.name
if is_parent:
network.apply_weights(torch.tensor(network.s_train_weights_history[int(len(network.s_train_weights_history)/2)][0]))
input_data = network.input_weight_matrix()
target_data = network.create_target_weights(input_data)
predicted_values = network(input_data)
mse_loss = F.mse_loss(target_data, predicted_values).item()
l1_loss = F.l1_loss(target_data, predicted_values).item()
df_acc.loc[len(df_acc)+1] = [network.name, noise, l1_loss, "parents" if is_parent else "child_nets"]
print("MSE:", mse_loss, "\t", "L1: ", l1_loss, "\t", network.name)
# Note: If there are outliers then showfliers=False is necessary or it will zoom way too far out. If parent and child_nets accuracy is too far apart, this plot might not work (it only shows either the parents or part of the child_nets).
ax = sns.catplot(data=df_acc, y="l1_acc", x="noise", hue="Network Type", kind="box", legend=False, showfliers=False, height=5.27, aspect=11.7/5.27, sharey=False)
ax.map(plt.axhline, y=10**-6, ls='--')
ax.map(plt.axhline, y=10**-7, ls='--')
ax.set_axis_labels("Noise levels", "L1 Prediction Loss After Training", fontsize=15)
ax.set_xticklabels(labels=('$\mathregular{10^{-10}}$', '$\mathregular{10^{-9}}$', '$\mathregular{10^{-8}}$', '$\mathregular{10^{-7}}$', '$\mathregular{10^{-6}}$', '$\mathregular{10^{-5}}$', '$\mathregular{10^{-4}}$', '$\mathregular{10^{-3}}$', '$\mathregular{10^{-2}}$', '$\mathregular{10^{-1}}$'), fontsize=15)
plt.legend(bbox_to_anchor=(0.01, 0.85), loc=2, borderaxespad=0.)
plt.legend(fontsize='large')
plt.savefig(f"{directory}/parent_vs_children_accuracy_{ST_name_hash}.png")
plt.clf()
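Aside: a quick, hypothetical check (not code from this diff) of what mean_invariate_manhattan_distance computes: it sorts both weight sets first, so two nets whose weights contain the same values in different positions come out at distance 0.

import torch
from journal_basins import mean_invariate_manhattan_distance

a = torch.tensor([0.1, 0.5, -0.3])
b = torch.tensor([-0.3, 0.1, 0.5])  # same values as a, permuted
print(mean_invariate_manhattan_distance(a, b))  # 0.0, order is ignored
c = torch.tensor([0.2, 0.5, -0.3])
print(mean_invariate_manhattan_distance(a, c))  # mean of sorted absolute differences, here ~0.033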

journal_robustness.py Normal file

@@ -0,0 +1,246 @@
import pickle
import pandas as pd
import torch
import random
import copy
from pathlib import Path
from matplotlib.ticker import ScalarFormatter
from tqdm import tqdm
from tabulate import tabulate
from functionalities_test import is_identity_function, is_zero_fixpoint, test_for_fixpoints, is_divergent
from network import Net
from torch.nn import functional as F
from visualization import plot_loss, bar_chart_fixpoints
import seaborn as sns
from matplotlib import pyplot as plt
def prng():
return random.random()
def generate_perfekt_synthetic_fixpoint_weights():
return torch.tensor([[1.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0], [0.0],
[1.0], [0.0], [0.0], [0.0],
[1.0], [0.0]
], dtype=torch.float32)
PALETTE = 10 * (
"#377eb8",
"#4daf4a",
"#984ea3",
"#e41a1c",
"#ff7f00",
"#a65628",
"#f781bf",
"#888888",
"#a6cee3",
"#b2df8a",
"#cab2d6",
"#fb9a99",
"#fdbf6f",
)
class RobustnessComparisonExperiment:
@staticmethod
def apply_noise(network, noise: float):
# Changing the weights of a network to value +/- noise (sign chosen at random per weight)
for layer_id, layer_name in enumerate(network.state_dict()):
for line_id, line_values in enumerate(network.state_dict()[layer_name]):
for weight_id, weight_value in enumerate(network.state_dict()[layer_name][line_id]):
# network.state_dict()[layer_name][line_id][weight_id] = weight_value + noise
if prng() < 0.5:
network.state_dict()[layer_name][line_id][weight_id] = weight_value + noise
else:
network.state_dict()[layer_name][line_id][weight_id] = weight_value - noise
return network
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
epochs, st_steps, synthetic, directory) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.epochs = epochs
self.ST_steps = st_steps
self.loss_history = []
self.is_synthetic = synthetic
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
self.directory = Path(directory)
self.directory.mkdir(parents=True, exist_ok=True)
self.id_functions = []
self.nets = self.populate_environment()
self.count_fixpoints()
self.time_to_vergence, self.time_as_fixpoint = self.test_robustness(
seeds=population_size if self.is_synthetic else 1)
def populate_environment(self):
nets = []
if self.is_synthetic:
''' Either use perfect / hand-constructed fixpoint ... '''
net_name = f"net_{str(0)}_synthetic"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
net.apply_weights(generate_perfekt_synthetic_fixpoint_weights())
nets.append(net)
else:
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating experiment %s" % i)
''' .. or use natural approach to train fixpoints from random initialisation. '''
net_name = f"net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
for _ in range(self.epochs):
net.self_train(self.ST_steps, self.log_step_size, self.net_learning_rate)
nets.append(net)
return nets
def test_robustness(self, print_it=True, noise_levels=10, seeds=10):
assert (len(self.id_functions) == 1 and seeds > 1) or (len(self.id_functions) > 1 and seeds == 1)
time_to_vergence = [[0 for _ in range(noise_levels)] for _ in
range(seeds if self.is_synthetic else len(self.id_functions))]
time_as_fixpoint = [[0 for _ in range(noise_levels)] for _ in
range(seeds if self.is_synthetic else len(self.id_functions))]
row_headers = []
# This checks whether to use the synthetic setting with multiple seeds
# or the multi-network setting with a single seed
df = pd.DataFrame(columns=['setting', 'Noise Level', 'Self Train Steps', 'absolute_loss',
'Time to convergence', 'Time as fixpoint'])
with tqdm(total=max(len(self.id_functions), seeds)) as pbar:
for i, fixpoint in enumerate(self.id_functions): # 1 / n
row_headers.append(fixpoint.name)
for seed in range(seeds): # n / 1
setting = seed if self.is_synthetic else i
for noise_level in range(noise_levels):
steps = 0
clone = Net(fixpoint.input_size, fixpoint.hidden_size, fixpoint.out_size,
f"{fixpoint.name}_clone_noise_1e-{noise_level}")
clone.load_state_dict(copy.deepcopy(fixpoint.state_dict()))
clone = clone.apply_noise(pow(10, -noise_level))
while not is_zero_fixpoint(clone) and not is_divergent(clone):
# -> before
clone_weight_pre_application = clone.input_weight_matrix()
target_data_pre_application = clone.create_target_weights(clone_weight_pre_application)
clone.self_application(1, self.log_step_size)
time_to_vergence[setting][noise_level] += 1
# -> after
clone_weight_post_application = clone.input_weight_matrix()
target_data_post_application = clone.create_target_weights(clone_weight_post_application)
absolute_loss = F.l1_loss(target_data_pre_application, target_data_post_application).item()
if is_identity_function(clone):
time_as_fixpoint[setting][noise_level] += 1
# When this raises a TypeError, we have found a second-order fixpoint!
steps += 1
df.loc[df.shape[0]] = [setting, f'$\mathregular{{10^{{-{noise_level}}}}}$',
steps, absolute_loss,
time_to_vergence[setting][noise_level],
time_as_fixpoint[setting][noise_level]]
pbar.update(1)
# Get the measurements at the highest time_to_vergence
df_sorted = df.sort_values('Self Train Steps', ascending=False).drop_duplicates(['setting', 'Noise Level'])
df_melted = df_sorted.reset_index().melt(id_vars=['setting', 'Noise Level', 'Self Train Steps'],
value_vars=['Time to convergence', 'Time as fixpoint'],
var_name="Measurement",
value_name="Steps").sort_values('Noise Level')
# Plotting
# plt.rcParams.update({
# "text.usetex": True,
# "font.family": "sans-serif",
# "font.size": 12,
# "font.weight": 'bold',
# "font.sans-serif": ["Helvetica"]})
sns.set(style='whitegrid', font_scale=2)
bf = sns.boxplot(data=df_melted, y='Steps', x='Noise Level', hue='Measurement', palette=PALETTE)
synthetic = 'synthetic' if self.is_synthetic else 'natural'
plt.tight_layout()
# sns.set(rc={'figure.figsize': (10, 50)})
# bx = sns.catplot(data=df[df['absolute_loss'] < 1], y='absolute_loss', x='application_step', kind='box',
# col='noise_level', col_wrap=3, showfliers=False)
filename = f"absolute_loss_perapplication_boxplot_grid_{'synthetic' if self.is_synthetic else 'wild'}.png"
filepath = self.directory / filename
plt.savefig(str(filepath))
if print_it:
col_headers = [str(f"1e-{d}") for d in range(noise_levels)]
print(f"\nAppplications steps until divergence / zero: ")
# print(tabulate(time_to_vergence, showindex=row_headers, headers=col_headers, tablefmt='orgtbl'))
print(f"\nTime as fixpoint: ")
# print(tabulate(time_as_fixpoint, showindex=row_headers, headers=col_headers, tablefmt='orgtbl'))
return time_to_vergence, time_as_fixpoint
def count_fixpoints(self):
exp_details = f"ST steps: {self.ST_steps}"
self.id_functions = test_for_fixpoints(self.fixpoint_counters, self.nets)
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory, self.net_learning_rate,
exp_details)
def visualize_loss(self):
for i in range(len(self.nets)):
net_loss_history = self.nets[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory)
if __name__ == "__main__":
NET_INPUT_SIZE = 4
NET_OUT_SIZE = 1
ST_steps = 1000
ST_epochs = 5
ST_log_step_size = 10
ST_population_size = 1000
ST_net_hidden_size = 2
ST_net_learning_rate = 0.004
ST_name_hash = random.getrandbits(32)
ST_synthetic = False
print(f"Running the robustness comparison experiment:")
exp = RobustnessComparisonExperiment(
population_size=ST_population_size,
log_step_size=ST_log_step_size,
net_input_size=NET_INPUT_SIZE,
net_hidden_size=ST_net_hidden_size,
net_out_size=NET_OUT_SIZE,
net_learning_rate=ST_net_learning_rate,
epochs=ST_epochs,
st_steps=ST_steps,
synthetic=ST_synthetic,
directory=Path('output') / 'journal_robustness' / f'{ST_name_hash}'
)
directory = Path('output') / 'journal_robustness' / f'{ST_name_hash}'
with (directory / f"experiment_pickle_{ST_name_hash}.p").open('wb') as f:
pickle.dump(exp, f)
print(f"\nSaved experiment to {directory}.")

journal_soup_basins.py Normal file

@@ -0,0 +1,341 @@
import pickle
import random
import copy
from pathlib import Path
import numpy as np
import pandas as pd
import seaborn as sns
import torch
from matplotlib import pyplot as plt
from sklearn.metrics import mean_absolute_error as MAE
from sklearn.metrics import mean_squared_error as MSE
from tabulate import tabulate
from tqdm import tqdm
from functionalities_test import is_identity_function, test_status, is_zero_fixpoint, is_divergent, \
is_secondary_fixpoint
from journal_basins import mean_invariate_manhattan_distance
from network import Net
from visualization import plot_loss, plot_3d_soup
def l1(tup):
a, b = tup
return abs(a - b)
def distance_matrix(nets, distance="MIM", print_it=True):
matrix = [[0 for _ in range(len(nets))] for _ in range(len(nets))]
for net in range(len(nets)):
weights = nets[net].input_weight_matrix()[:, 0]
for other_net in range(len(nets)):
other_weights = nets[other_net].input_weight_matrix()[:, 0]
if distance in ["MSE"]:
matrix[net][other_net] = MSE(weights, other_weights)
elif distance in ["MAE"]:
matrix[net][other_net] = MAE(weights, other_weights)
elif distance in ["MIM"]:
matrix[net][other_net] = mean_invariate_manhattan_distance(weights, other_weights)
if print_it:
print(f"\nDistance matrix (all to all) [{distance}]:")
headers = [i.name for i in nets]
print(tabulate(matrix, showindex=headers, headers=headers, tablefmt='orgtbl'))
return matrix
def distance_from_parent(nets, distance="MIM", print_it=True):
list_of_matrices = []
parents = list(filter(lambda x: "clone" not in x.name and is_identity_function(x), nets))
distance_range = range(10)
for parent in parents:
parent_weights = parent.create_target_weights(parent.input_weight_matrix())
clones = list(filter(lambda y: parent.name in y.name and parent.name != y.name, nets))
matrix = [[0 for _ in distance_range] for _ in range(len(clones))]
for dist in distance_range:
for idx, clone in enumerate(clones):
clone_weights = clone.create_target_weights(clone.input_weight_matrix())
if distance in ["MSE"]:
matrix[idx][dist] = MSE(parent_weights, clone_weights) < pow(10, -dist)
elif distance in ["MAE"]:
matrix[idx][dist] = MAE(parent_weights, clone_weights) < pow(10, -dist)
elif distance in ["MIM"]:
matrix[idx][dist] = mean_invariate_manhattan_distance(parent_weights, clone_weights) < pow(10,
-dist)
if print_it:
print(f"\nDistances from parent {parent.name} [{distance}]:")
col_headers = [str(f"10e-{d}") for d in distance_range]
row_headers = [str(f"clone_{i}") for i in range(len(clones))]
print(tabulate(matrix, showindex=row_headers, headers=col_headers, tablefmt='orgtbl'))
list_of_matrices.append(matrix)
return list_of_matrices
class SoupSpawnExperiment:
def __init__(self, population_size, log_step_size, net_input_size, net_hidden_size, net_out_size, net_learning_rate,
epochs, st_steps, attack_chance, nr_clones, noise, directory) -> None:
self.population_size = population_size
self.log_step_size = log_step_size
self.net_input_size = net_input_size
self.net_hidden_size = net_hidden_size
self.net_out_size = net_out_size
self.net_learning_rate = net_learning_rate
self.epochs = epochs
self.ST_steps = st_steps
self.attack_chance = attack_chance
self.loss_history = []
self.nr_clones = nr_clones
self.noise = noise or 10e-5
print("\nNOISE:", self.noise)
self.directory = Path(directory)
self.directory.mkdir(parents=True, exist_ok=True)
# Populating environment & evolving entities
self.parents = []
self.clones = []
self.parents_with_clones = []
self.parents_clones_id_functions = []
self.populate_environment()
self.spawn_and_continue()
# self.weights_evolution_3d_experiment(self.parents, "only_parents")
self.weights_evolution_3d_experiment(self.clones, "only_clones")
self.weights_evolution_3d_experiment(self.parents_with_clones, "parents_with_clones")
# self.weights_evolution_3d_experiment(self.parents_clones_id_functions, "id_f_with_parents")
# self.visualize_loss()
self.distance_matrix = distance_matrix(self.parents_clones_id_functions, print_it=False)
self.parent_clone_distances = distance_from_parent(self.parents_clones_id_functions, print_it=False)
# self.save()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating experiment %s" % i)
net_name = f"parent_net_{str(i)}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
for _ in range(self.ST_steps):
net.self_train(1, self.log_step_size, self.net_learning_rate)
self.parents.append(net)
self.parents_with_clones.append(net)
if is_identity_function(net):
self.parents_clones_id_functions.append(net)
print(f"\nNet {net.name} is identity function")
if is_divergent(net):
print(f"\nNet {net.name} is divergent")
if is_zero_fixpoint(net):
print(f"\nNet {net.name} is zero fixpoint")
if is_secondary_fixpoint(net):
print(f"\nNet {net.name} is secondary fixpoint")
def evolve(self, population):
print(f"Clone soup has a population of {len(population)} networks")
loop_epochs = tqdm(range(self.epochs - 1))
for i in loop_epochs:
loop_epochs.set_description("\nEvolving clone soup %s" % i)
# A network attacks another network with a given probability (attack_chance, in %)
if random.randint(1, 100) <= self.attack_chance:
random_net1, random_net2 = random.sample(range(len(population)), 2)
random_net1 = population[random_net1]
random_net2 = population[random_net2]
print(f"\n Attack: {random_net1.name} -> {random_net2.name}")
random_net1.attack(random_net2)
# Self-training each network in the population
for j in range(len(population)):
net = population[j]
for _ in range(self.ST_steps):
net.self_train(1, self.log_step_size, self.net_learning_rate)
def spawn_and_continue(self, number_clones: int = None):
number_clones = number_clones or self.nr_clones
df = pd.DataFrame(
columns=['name', 'parent', 'MAE_pre', 'MAE_post', 'MSE_pre', 'MSE_post', 'MIM_pre', 'MIM_post', 'noise',
'status_post'])
# MAE_pre, MSE_pre, MIM_pre = 0, 0, 0
# For every initial net {i} after populating (that is fixpoint after first epoch);
for i in range(len(self.parents)):
net = self.parents[i]
# We set parent start_time to just before this epoch ended, so plotting is zoomed in. Comment out
# to see the full trajectory (but the clones will be very hard to see).
# Make one target to compare distances to clones later when they have trained.
net.start_time = self.ST_steps - 150
net_input_data = net.input_weight_matrix()
net_target_data = net.create_target_weights(net_input_data)
# print(f"\nNet {i} is fixpoint")
# Clone the fixpoint x times and add (+-)self.noise to weight-sets randomly;
# To plot clones starting after first epoch (z=ST_steps), set that as start_time!
# To make sure PCA will plot the same trajectory up until this point, we clone the
# parent-net's weight history as well.
for j in range(number_clones):
clone = Net(net.input_size, net.hidden_size, net.out_size,
f"net_{str(i)}_clone_{str(j)}", start_time=self.ST_steps)
clone.load_state_dict(copy.deepcopy(net.state_dict()))
clone = clone.apply_noise(self.noise)
clone.s_train_weights_history = copy.deepcopy(net.s_train_weights_history)
clone.number_trained = copy.deepcopy(net.number_trained)
# Pre Training distances (after noise application of course)
clone_pre_weights = clone.create_target_weights(clone.input_weight_matrix())
MAE_pre = MAE(net_target_data, clone_pre_weights)
MSE_pre = MSE(net_target_data, clone_pre_weights)
MIM_pre = mean_invariate_manhattan_distance(net_target_data, clone_pre_weights)
df.loc[len(df)] = [clone.name, net.name, MAE_pre, 0, MSE_pre, 0, MIM_pre, 0, self.noise, ""]
net.child_nets.append(clone)
self.clones.append(clone)
self.parents_with_clones.append(clone)
self.evolve(self.clones)
# evolve also with the parents together
# self.evolve(self.parents_with_clones)
for i in range(len(self.parents)):
net = self.parents[i]
net_input_data = net.input_weight_matrix()
net_target_data = net.create_target_weights(net_input_data)
for j in range(len(net.child_nets)):
clone = net.child_nets[j]
# Post Training distances for comparison
clone_post_weights = clone.create_target_weights(clone.input_weight_matrix())
MAE_post = MAE(net_target_data, clone_post_weights)
MSE_post = MSE(net_target_data, clone_post_weights)
MIM_post = mean_invariate_manhattan_distance(net_target_data, clone_post_weights)
# .. log to data-frame and add to nets for 3d plotting if they are fixpoints themselves.
test_status(clone)
if is_identity_function(clone):
print(f"Clone {j} (of net_{i}) is fixpoint."
f"\nMSE({i},{j}): {MSE_post}"
f"\nMAE({i},{j}): {MAE_post}"
f"\nMIM({i},{j}): {MIM_post}\n")
self.parents_clones_id_functions.append(clone)
# df.loc[df.name == clone.name, ["MAE_post", "MSE_post", "MIM_post"]] = [MAE_pre, MSE_pre, MIM_pre]
df.loc[df.name == clone.name, ["MAE_post", "MSE_post", "MIM_post", "status_post"]] = [MAE_post,
MSE_post,
MIM_post,
clone.is_fixpoint]
# Finally take parent net {i} and finish its training for comparison to clone development.
for _ in range(self.epochs - 1):
for _ in range(self.ST_steps):
net.self_train(1, self.log_step_size, self.net_learning_rate)
net_weights_after = net.create_target_weights(net.input_weight_matrix())
print(f"Parent net's distance to original position."
f"\nMSE(OG,new): {MAE(net_target_data, net_weights_after)}"
f"\nMAE(OG,new): {MSE(net_target_data, net_weights_after)}"
f"\nMIM(OG,new): {mean_invariate_manhattan_distance(net_target_data, net_weights_after)}\n")
self.df = df
def weights_evolution_3d_experiment(self, nets_population, suffix):
exp_name = f"soup_basins_{str(len(nets_population))}_nets_3d_weights_PCA_{suffix}"
return plot_3d_soup(nets_population, exp_name, self.directory)
def visualize_loss(self):
for i in range(len(self.parents)):
net_loss_history = self.parents[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory)
if __name__ == "__main__":
NET_INPUT_SIZE = 4
NET_OUT_SIZE = 1
# Define number of runs & name:
ST_runs = 3
ST_runs_name = "test-27"
soup_ST_steps = 1500
soup_epochs = 2
soup_log_step_size = 10
# Define number of networks & their architecture
nr_clones = 5
soup_population_size = 3
soup_net_hidden_size = 2
soup_net_learning_rate = 0.04
soup_attack_chance = 10
soup_name_hash = random.getrandbits(32)
print(f"Running the Soup-Spawn experiment:")
exp_list = []
for noise_factor in range(2, 5):
exp = SoupSpawnExperiment(
population_size=soup_population_size,
log_step_size=soup_log_step_size,
net_input_size=NET_INPUT_SIZE,
net_hidden_size=soup_net_hidden_size,
net_out_size=NET_OUT_SIZE,
net_learning_rate=soup_net_learning_rate,
epochs=soup_epochs,
st_steps=soup_ST_steps,
attack_chance=soup_attack_chance,
nr_clones=nr_clones,
noise=pow(10, -noise_factor),
directory=Path('output') / 'soup_spawn_basin' / f'{soup_name_hash}' / f'10e-{noise_factor}'
)
exp_list.append(exp)
directory = Path('output') / 'soup_spawn_basin' / f'{soup_name_hash}'
with (directory / f"experiment_pickle_{soup_name_hash}.p").open('wb') as f:
pickle.dump(exp_list, f)
print(f"\nSaved experiment to {directory}.")
# Concat all dataframes, and add columns depending on where clone weights end up after training (rel. to parent)
df = pd.concat([exp.df for exp in exp_list])
df = df.dropna().reset_index()
df["relative_distance"] = [ (df.loc[i]["MAE_pre"] - df.loc[i]["MAE_post"]) for i in range(len(df))]
df["class"] = ["approaching" if df.loc[i]["relative_distance"] > 0 else "distancing" if df.loc[i]["relative_distance"] < 0 else "stationary" for i in range(len(df))]
# Countplot of all fixpoint clone after training per class. Uncomment and manually adjust xticklabels if x-ax size gets too small.
ax = sns.catplot(kind="count", data=df, x="noise", hue="class", height=5.27, aspect=12.7 / 5.27)
ax.set_axis_labels("Noise Levels", "Clone Fixpoints After Training Count ", fontsize=15)
# ax.set_xticklabels(labels=('10e-10', '10e-9', '10e-8', '10e-7', '10e-6', '10e-5', '10e-4', '10e-3', '10e-2', '10e-1'), fontsize=15)
plt.savefig(f"{directory}/clone_status_after_countplot_{soup_name_hash}.png")
plt.clf()
# Catplot (either kind="point" or "box") that shows before-after training distances to parent
mlt = df.melt(id_vars=["name", "noise", "class"], value_vars=["MAE_pre", "MAE_post"], var_name="State",
value_name="Distance")
P = ["blue" if mlt.loc[i]["class"] == "approaching" else "orange" if mlt.loc[i]["class"] == "distancing" else "green" for i in range(len(mlt))]
# P = sns.color_palette(P, as_cmap=False)
ax = sns.catplot(data=mlt, x="State", y="Distance", col="noise", hue="name", kind="point", palette=P,
col_wrap=min(5, len(exp_list)), sharey=False, legend=False)
ax.map(sns.boxplot, "State", "Distance", "noise", linewidth=0.8, order=["MAE_pre", "MAE_post"], whis=[0, 100])
ax.set_axis_labels("", "Manhattan Distance To Parent Weights", fontsize=15)
ax.set_xticklabels(labels=('after noise application', 'after training'), fontsize=15)
plt.savefig(f"{directory}/before_after_distance_catplot_{soup_name_hash}.png")
plt.clf()
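Aside: the approaching/distancing/stationary labels computed above via list comprehensions can also be expressed with np.select; a small hypothetical frame makes the classification rule explicit (a positive MAE_pre - MAE_post means the clone moved towards its parent):

import numpy as np
import pandas as pd

df = pd.DataFrame({'MAE_pre': [0.30, 0.10, 0.20], 'MAE_post': [0.10, 0.25, 0.20]})  # hypothetical distances
df['relative_distance'] = df['MAE_pre'] - df['MAE_post']
df['class'] = np.select(
    [df['relative_distance'] > 0, df['relative_distance'] < 0],
    ['approaching', 'distancing'],
    default='stationary')
print(df['class'].tolist())  # ['approaching', 'distancing', 'stationary']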

journal_soup_robustness.py Normal file

@@ -0,0 +1,252 @@
import copy
import random
from pathlib import Path
from typing import Union
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib.ticker import ScalarFormatter
from tqdm import tqdm
from matplotlib import pyplot as plt
from torch.nn import functional as F
from tabulate import tabulate
from functionalities_test import test_for_fixpoints, is_zero_fixpoint, is_divergent, is_identity_function
from network import Net
from visualization import plot_loss, bar_chart_fixpoints, plot_3d_soup, line_chart_fixpoints
def prng():
return random.random()
class SoupRobustnessExperiment:
def __init__(self, population_size, net_i_size, net_h_size, net_o_size, learning_rate, attack_chance,
train_nets, ST_steps, epochs, log_step_size, directory: Union[str, Path]):
super().__init__()
self.population_size = population_size
self.net_input_size = net_i_size
self.net_hidden_size = net_h_size
self.net_out_size = net_o_size
self.net_learning_rate = learning_rate
self.attack_chance = attack_chance
self.train_nets = train_nets
# self.SA_steps = SA_steps
self.ST_steps = ST_steps
self.epochs = epochs
self.log_step_size = log_step_size
self.loss_history = []
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
# <self.fixpoint_counters_history> is used for keeping track of the amount of fixpoints in %
self.fixpoint_counters_history = []
self.id_functions = []
self.directory = Path(directory)
self.directory.mkdir(parents=True, exist_ok=True)
self.population = []
self.populate_environment()
self.evolve()
self.fixpoint_percentage()
self.weights_evolution_3d_experiment()
self.count_fixpoints()
self.visualize_loss()
self.time_to_vergence, self.time_as_fixpoint = self.test_robustness()
def populate_environment(self):
loop_population_size = tqdm(range(self.population_size))
for i in loop_population_size:
loop_population_size.set_description("Populating soup experiment %s" % i)
net_name = f"soup_network_{i}"
net = Net(self.net_input_size, self.net_hidden_size, self.net_out_size, net_name)
self.population.append(net)
def evolve(self):
""" Evolving consists of attacking & self-training. """
loop_epochs = tqdm(range(self.epochs))
for i in loop_epochs:
loop_epochs.set_description("Evolving soup %s" % i)
# A network attacks another network with a given probability (attack_chance, in %)
if random.randint(1, 100) <= self.attack_chance:
random_net1, random_net2 = random.sample(range(self.population_size), 2)
random_net1 = self.population[random_net1]
random_net2 = self.population[random_net2]
print(f"\n Attack: {random_net1.name} -> {random_net2.name}")
random_net1.attack(random_net2)
# Self-training each network in the population
for j in range(self.population_size):
net = self.population[j]
for _ in range(self.ST_steps):
net.self_train(1, self.log_step_size, self.net_learning_rate)
# Testing for fixpoints after each batch of ST steps to see relevant data
if i % self.ST_steps == 0:
test_for_fixpoints(self.fixpoint_counters, self.population)
fixpoints_percentage = round(self.fixpoint_counters["identity_func"] / self.population_size, 1)
self.fixpoint_counters_history.append(fixpoints_percentage)
# Resetting the fixpoint counter. Last iteration not to be reset -
# it is important for the bar_chart_fixpoints().
if i < self.epochs - 1:
self.reset_fixpoint_counters()
def test_robustness(self, print_it=True, noise_levels=10, seeds=10):
# assert (len(self.id_functions) == 1 and seeds > 1) or (len(self.id_functions) > 1 and seeds == 1)
is_synthetic = len(self.id_functions) > 1 and seeds == 1
avg_time_to_vergence = [[0 for _ in range(noise_levels)] for _ in
range(seeds if is_synthetic else len(self.id_functions))]
avg_time_as_fixpoint = [[0 for _ in range(noise_levels)] for _ in
range(seeds if is_synthetic else len(self.id_functions))]
row_headers = []
data_pos = 0
# This checks whether to use the synthetic setting with multiple seeds
# or the multi-network setting with a single seed
df = pd.DataFrame(columns=['seed', 'noise_level', 'application_step', 'absolute_loss'])
for i, fixpoint in enumerate(self.id_functions): # 1 / n
row_headers.append(fixpoint.name)
for seed in range(seeds): # n / 1
for noise_level in range(noise_levels):
self_application_steps = 1
clone = Net(fixpoint.input_size, fixpoint.hidden_size, fixpoint.out_size,
f"{fixpoint.name}_clone_noise10e-{noise_level}")
clone.load_state_dict(copy.deepcopy(fixpoint.state_dict()))
clone = clone.apply_noise(pow(10, -noise_level))
while not is_zero_fixpoint(clone) and not is_divergent(clone):
if is_identity_function(clone):
avg_time_as_fixpoint[i][noise_level] += 1
# -> before
clone_weight_pre_application = clone.input_weight_matrix()
target_data_pre_application = clone.create_target_weights(clone_weight_pre_application)
clone.self_application(1, self.log_step_size)
avg_time_to_vergence[i][noise_level] += 1
# -> after
clone_weight_post_application = clone.input_weight_matrix()
target_data_post_application = clone.create_target_weights(clone_weight_post_application)
absolute_loss = F.l1_loss(target_data_pre_application, target_data_post_application).item()
setting = i if is_synthetic else seed
df.loc[data_pos] = [setting, noise_level, self_application_steps, absolute_loss]
data_pos += 1
self_application_steps += 1
# calculate the average:
df = df.replace([np.inf, -np.inf], np.nan)
df = df.dropna()
# sns.set(rc={'figure.figsize': (10, 50)})
sns.set_theme(style="ticks")
bx = sns.catplot(data=df[df['absolute_loss'] < 1], y='absolute_loss', x='application_step', kind='box',
col='noise_level', col_wrap=3, showfliers=False)
directory = Path('output') / 'robustness'
filename = f"absolute_loss_perapplication_boxplot_grid.png"
filepath = directory / filename
plt.savefig(str(filepath))
if print_it:
col_headers = [str(f"10-{d}") for d in range(noise_levels)]
print(f"\nAppplications steps until divergence / zero: ")
print(tabulate(avg_time_to_vergence, showindex=row_headers, headers=col_headers, tablefmt='orgtbl'))
print(f"\nTime as fixpoint: ")
print(tabulate(avg_time_as_fixpoint, showindex=row_headers, headers=col_headers, tablefmt='orgtbl'))
return avg_time_to_vergence, avg_time_as_fixpoint
def weights_evolution_3d_experiment(self):
exp_name = f"soup_{self.population_size}_nets_{self.ST_steps}_training_{self.epochs}_epochs"
return plot_3d_soup(self.population, exp_name, self.directory)
def count_fixpoints(self):
self.id_functions = test_for_fixpoints(self.fixpoint_counters, self.population)
exp_details = f"Evolution steps: {self.epochs} epochs"
bar_chart_fixpoints(self.fixpoint_counters, self.population_size, self.directory, self.net_learning_rate,
exp_details)
def fixpoint_percentage(self):
runs = self.epochs / self.ST_steps
SA_steps = None
line_chart_fixpoints(self.fixpoint_counters_history, runs, self.ST_steps, SA_steps, self.directory,
self.population_size)
def visualize_loss(self):
for i in range(len(self.population)):
net_loss_history = self.population[i].loss_history
self.loss_history.append(net_loss_history)
plot_loss(self.loss_history, self.directory)
def reset_fixpoint_counters(self):
self.fixpoint_counters = {
"identity_func": 0,
"divergent": 0,
"fix_zero": 0,
"fix_weak": 0,
"fix_sec": 0,
"other_func": 0
}
if __name__ == "__main__":
NET_INPUT_SIZE = 4
NET_OUT_SIZE = 1
soup_epochs = 100
soup_log_step_size = 5
soup_ST_steps = 20
# soup_SA_steps = 10
# Define number of networks & their architecture
soup_population_size = 4
soup_net_hidden_size = 2
soup_net_learning_rate = 0.04
# soup_attack_chance in %
soup_attack_chance = 10
# not used yet: soup_train_nets has 3 possible values "no", "before_SA", "after_SA".
soup_train_nets = "no"
soup_name_hash = random.getrandbits(32)
soup_synthetic = True
print(f"Running the robustness comparison experiment:")
SoupRobustnessExperiment(
population_size=soup_population_size,
net_i_size=NET_INPUT_SIZE,
net_h_size=soup_net_hidden_size,
net_o_size=NET_OUT_SIZE,
learning_rate=soup_net_learning_rate,
attack_chance=soup_attack_chance,
train_nets=soup_train_nets,
ST_steps=soup_ST_steps,
epochs=soup_epochs,
log_step_size=soup_log_step_size,
directory=Path('output') / 'robustness' / f'{soup_name_hash}'
)
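Aside: condensed to its core, the robustness probe in test_robustness perturbs a trained fixpoint and counts self-application steps until it collapses to zero or diverges. A hypothetical stand-alone helper (assuming the Net and functionalities_test APIs used throughout this diff; the max_steps cap is an addition of this sketch, the experiment loops unbounded):

import copy
from network import Net
from functionalities_test import is_identity_function, is_zero_fixpoint, is_divergent

def probe(fixpoint: Net, noise: float, max_steps: int = 10_000):
    clone = Net(fixpoint.input_size, fixpoint.hidden_size, fixpoint.out_size, f"{fixpoint.name}_probe")
    clone.load_state_dict(copy.deepcopy(fixpoint.state_dict()))
    clone = clone.apply_noise(noise)
    steps_total, steps_as_fixpoint = 0, 0
    while not is_zero_fixpoint(clone) and not is_divergent(clone) and steps_total < max_steps:
        clone.self_application(1, log_step_size=10)
        steps_total += 1
        if is_identity_function(clone):
            steps_as_fixpoint += 1
    return steps_total, steps_as_fixpoint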

main.py Normal file

@@ -0,0 +1,150 @@
from experiments import *
import random
# TODO maybe add also SA to the soup
def run_experiments(run_ST, run_SA, run_soup, run_mixed, run_robustness):
if run_ST:
print(f"Running the ST experiment:")
run_ST_experiment(ST_population_size, ST_log_step_size, NET_INPUT_SIZE, ST_net_hidden_size, NET_OUT_SIZE,
ST_net_learning_rate,
ST_epochs, ST_runs, ST_runs_name, ST_name_hash)
if run_SA:
print(f"\n Running the SA experiment:")
run_SA_experiment(SA_population_size, SA_log_step_size, NET_INPUT_SIZE, SA_net_hidden_size, NET_OUT_SIZE,
SA_net_learning_rate, SA_runs, SA_runs_name, SA_name_hash,
SA_steps, SA_train_nets, SA_ST_steps)
if run_soup:
print(f"\n Running the soup experiment:")
run_soup_experiment(soup_population_size, soup_attack_chance, NET_INPUT_SIZE, soup_net_hidden_size,
NET_OUT_SIZE, soup_net_learning_rate, soup_epochs, soup_log_step_size, soup_runs,
soup_runs_name, soup_name_hash, soup_ST_steps, soup_train_nets)
if run_mixed:
print(f"\n Running the mixed experiment:")
run_mixed_experiment(mixed_population_size, NET_INPUT_SIZE, mixed_net_hidden_size, NET_OUT_SIZE,
mixed_net_learning_rate, mixed_train_nets, mixed_epochs, mixed_SA_steps,
mixed_ST_steps_between_SA, mixed_log_step_size, mixed_name_hash, mixed_total_runs,
mixed_runs_name)
if run_robustness:
print(f"Running the robustness experiment:")
run_robustness_experiment(rob_population_size, rob_log_step_size, NET_INPUT_SIZE, rob_net_hidden_size,
NET_OUT_SIZE, rob_net_learning_rate, rob_ST_steps, rob_runs, rob_runs_name,
rob_name_hash)
if not run_ST and not run_SA and not run_soup and not run_mixed and not run_robustness:
print(f"No experiments to be run.")
if __name__ == '__main__':
# Constants:
NET_INPUT_SIZE = 4
NET_OUT_SIZE = 1
run_ST_experiment_bool = False
run_SA_experiment_bool = False
run_soup_experiment_bool = False
run_mixed_experiment_bool = False
run_robustness_bool = True
""" ------------------------------------- Self-training (ST) experiment ------------------------------------- """
# Define number of runs & name:
ST_runs = 1
ST_runs_name = "test-27"
ST_epochs = 1000
ST_log_step_size = 10
# Define number of networks & their architecture
ST_population_size = 1
ST_net_hidden_size = 2
ST_net_learning_rate = 0.04
ST_name_hash = random.getrandbits(32)
""" ----------------------------------- Self-application (SA) experiment ----------------------------------- """
# Define number of runs, name, etc.:
SA_runs_name = "test-17"
SA_runs = 2
SA_steps = 100
SA_app_batch_size = 5
SA_train_batch_size = 5
SA_log_step_size = 5
# Define number of networks & their architecture
SA_population_size = 10
SA_net_hidden_size = 2
SA_net_learning_rate = 0.04
# SA_train_nets has 3 possible values "no", "before_SA", "after_SA".
SA_train_nets = "no"
SA_ST_steps = 300
SA_name_hash = random.getrandbits(32)
""" -------------------------------------------- Soup experiment -------------------------------------------- """
# Define number of runs, name, etc.:
soup_runs = 1
soup_runs_name = "test-16"
soup_epochs = 100
soup_log_step_size = 5
soup_ST_steps = 20
# soup_SA_steps = 10
# Define number of networks & their architecture
soup_population_size = 5
soup_net_hidden_size = 2
soup_net_learning_rate = 0.04
# soup_attack_chance in %
soup_attack_chance = 10
# not used yet: soup_train_nets has 3 possible values "no", "before_SA", "after_SA".
soup_train_nets = "no"
soup_name_hash = random.getrandbits(32)
""" ------------------------------------------- Mixed experiment -------------------------------------------- """
# Define number of runs, name, etc.:
mixed_runs_name = "test-17"
mixed_total_runs = 2
# Define number of networks & their architecture
mixed_population_size = 5
mixed_net_hidden_size = 2
mixed_epochs = 10
# Set the <batch_size> to the same value as <ST_steps_between_SA> to see the weights plotted
# ONLY after each epoch, and not after a certain amount of steps.
mixed_log_step_size = 5
mixed_ST_steps_between_SA = 50
mixed_SA_steps = 4
mixed_net_learning_rate = 0.04
# mixed_train_nets has 2 possible values "before_SA", "after_SA".
mixed_train_nets = "after_SA"
mixed_name_hash = random.getrandbits(32)
""" ----------------------------------------- Robustness experiment ----------------------------------------- """
# Define number of runs & name:
rob_runs = 1
rob_runs_name = "test-07"
rob_ST_steps = 1500
rob_log_step_size = 10
# Define number of networks & their architecture
rob_population_size = 1
rob_net_hidden_size = 2
rob_net_learning_rate = 0.04
rob_name_hash = random.getrandbits(32)
""" ---------------------------------------- Running the experiment ----------------------------------------- """
run_experiments(run_ST_experiment_bool, run_SA_experiment_bool, run_soup_experiment_bool, run_mixed_experiment_bool,
run_robustness_bool)

network.py Normal file

@@ -0,0 +1,443 @@
# from __future__ import annotations
import copy
import random
from math import sqrt
from typing import Union
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import optim, Tensor
from tqdm import tqdm
def prng():
return random.random()
class Net(nn.Module):
@staticmethod
def create_target_weights(input_weight_matrix: Tensor) -> Tensor:
""" Outputting a tensor with the target weights. """
# What kind of slow shit is this?
# target_weight_matrix = np.arange(len(input_weight_matrix)).reshape(len(input_weight_matrix), 1).astype("f")
# for i in range(len(input_weight_matrix)):
# target_weight_matrix[i] = input_weight_matrix[i][0]
# Fast and simple
return input_weight_matrix[:, 0].unsqueeze(-1)
@staticmethod
def are_weights_diverged(network_weights):
""" Testing if the weights are eiter converging to infinity or -infinity. """
# Slow and shitty:
# for layer_id, layer in enumerate(network_weights):
# for cell_id, cell in enumerate(layer):
# for weight_id, weight in enumerate(cell):
# if torch.isnan(weight):
# return True
# if torch.isinf(weight):
# return True
# return False
# Fast and modern:
return any(x.isnan().any() or x.isinf().any() for x in network_weights.parameters())
def apply_weights(self, new_weights: Tensor):
""" Changing the weights of a network to new given values. """
# TODO: Change this to 'parameters' version
i = 0
for layer_id, layer_name in enumerate(self.state_dict()):
for line_id, line_values in enumerate(self.state_dict()[layer_name]):
for weight_id, weight_value in enumerate(self.state_dict()[layer_name][line_id]):
self.state_dict()[layer_name][line_id][weight_id] = new_weights[i]
i += 1
return self
def __init__(self, i_size: int, h_size: int, o_size: int, name=None, start_time=1) -> None:
super().__init__()
self.start_time = start_time
self.name = name
self.child_nets = []
self.input_size = i_size
self.hidden_size = h_size
self.out_size = o_size
self.no_weights = h_size * (i_size + h_size + o_size)
""" Data saved in self.s_train_weights_history & self.s_application_weights_history is used for experiments. """
self.s_train_weights_history = []
self.s_application_weights_history = []
self.loss_history = []
self.trained = False
self.number_trained = 0
self.is_fixpoint = ""
self.layers = nn.ModuleList(
[nn.Linear(i_size, h_size, False),
nn.Linear(h_size, h_size, False),
nn.Linear(h_size, o_size, False)]
)
self._weight_pos_enc_and_mask = None
@property
def _weight_pos_enc(self):
if self._weight_pos_enc_and_mask is None:
d = next(self.parameters()).device
weight_matrix = []
for layer_id, layer in enumerate(self.layers):
x = next(layer.parameters())
weight_matrix.append(
torch.cat(
(
# Those are the weights
torch.full((x.numel(), 1), 0, device=d),
# Layer enumeration
torch.full((x.numel(), 1), layer_id, device=d),
# Cell Enumeration
torch.arange(layer.out_features, device=d).repeat_interleave(layer.in_features).view(-1, 1),
# Weight Enumeration within the Cells
torch.arange(layer.in_features, device=d).view(-1, 1).repeat(layer.out_features, 1),
*(torch.full((x.numel(), 1), 0, device=d) for _ in range(self.input_size-4))
), dim=1)
)
# Finalize
weight_matrix = torch.cat(weight_matrix).float()
# Normalize 1,2,3 column of dim 1
last_pos_idx = self.input_size - 4
norm2 = weight_matrix[:, 1:-last_pos_idx].pow(2).sum(keepdim=True, dim=0).sqrt()
weight_matrix[:, 1:-last_pos_idx] = (weight_matrix[:, 1:-last_pos_idx] / norm2) + 1e-8
# computations
# create a mask where pos is 0 if it is to be replaced
mask = torch.ones_like(weight_matrix)
mask[:, 0] = 0
self._weight_pos_enc_and_mask = weight_matrix, mask
return tuple(x.clone() for x in self._weight_pos_enc_and_mask)
def forward(self, x):
for layer in self.layers:
x = layer(x)
return x
def normalize(self, value, norm):
raise NotImplementedError
# FIXME, This is bullshit, the code does not do what the docstring explains
# Obsolete now
""" Normalizing the values >= 1 and adding pow(10, -8) to the values equal to 0 """
if norm > 1:
return float(value) / float(norm)
else:
return float(value)
def input_weight_matrix(self) -> Tensor:
""" Calculating the input tensor formed from the weights of the net """
weight_matrix = torch.cat([x.view(-1, 1) for x in self.parameters()])
pos_enc, mask = self._weight_pos_enc
weight_matrix = pos_enc * mask + weight_matrix.expand(-1, pos_enc.shape[-1]) * (1 - mask)
return weight_matrix
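    # Shape sketch (illustrative): for a Net with N scalar weights and input size I,
    # the flattened parameters form an (N, 1) column; the returned matrix is (N, I),
    # with column 0 holding the weight values and columns 1..I-1 the fixed positional
    # encoding from _weight_pos_enc.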
def self_train(self,
training_steps: int,
log_step_size: int = 0,
learning_rate: float = 0.0004,
save_history: bool = True
) -> (Tensor, list):
""" Training a network to predict its own weights in order to self-replicate. """
optimizer = optim.SGD(self.parameters(), lr=learning_rate, momentum=0.9)
for training_step in range(training_steps):
self.number_trained += 1
optimizer.zero_grad()
input_data = self.input_weight_matrix()
target_data = self.create_target_weights(input_data)
output = self(input_data)
loss = F.mse_loss(output, target_data)
loss.backward()
optimizer.step()
            if save_history:
                # Save the weight history every log_step_size steps for later analysis.
                # In a soup/mixed env., weights are saved only once, at the end of all
                # training steps (i.e. once per soup/mixed epoch, see below).
                if "soup" not in self.name and "mixed" not in self.name:
                    weights = self.create_target_weights(self.input_weight_matrix())
                    # For fewer than 10 self-training steps, log the weights after every ST step.
                    if self.number_trained < 10:
                        self.s_train_weights_history.append(weights.T.detach().numpy())
                        self.loss_history.append(loss.item())
                    else:
                        if log_step_size != 0:
                            if self.number_trained % log_step_size == 0:
                                self.s_train_weights_history.append(weights.T.detach().numpy())
                                self.loss_history.append(loss.item())
        weights = self.create_target_weights(self.input_weight_matrix())
        # Save the weights only once, at the end of a soup/mixed experiment epoch.
        if save_history:
            if "soup" in self.name or "mixed" in self.name:
                self.s_train_weights_history.append(weights.T.detach().numpy())
                self.loss_history.append(loss.item())
self.trained = True
return loss, self.loss_history
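    # Minimal usage sketch (hypothetical sizes, not prescribed here):
    #   net = Net(i_size=4, h_size=2, o_size=1, name='net_0')
    #   loss, history = net.self_train(training_steps=1000, log_step_size=10)
    # A net that converges under this objective approximately reproduces its own
    # weights when they are fed back through forward(), i.e. it becomes a fixpoint.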
    def self_application(self, SA_steps: int, log_step_size: Union[int, None] = None):
        """ Inputting the weights of a network to itself for a number of steps, without backpropagation. """
        for i in range(SA_steps):
            output = self(self.input_weight_matrix())
            # Save the weight history every log_step_size steps for research purposes.
            # For fewer than 10 self-application steps, log the weights after every SA step.
            if SA_steps < 10:
                weights = self.create_target_weights(self.input_weight_matrix())
                self.s_application_weights_history.append(weights.T.detach().numpy())
            else:
                weights = self.create_target_weights(self.input_weight_matrix())
                if log_step_size and i % log_step_size == 0:
                    self.s_application_weights_history.append(weights.T.detach().numpy())
            # To see after how many SA steps the output stops changing:
            # print(f"Self-app. step {i+1}: {Experiment.changing_rate(output2, output)}")
            _ = self.apply_weights(output)
        return self
def attack(self, other_net):
other_net_weights = other_net.input_weight_matrix()
my_evaluation = self(other_net_weights)
return other_net.apply_weights(my_evaluation)
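    # Note on melt() below: it acts as a crossover operator. The child net takes
    # roughly half of its weights (at random positions) from self and the remaining
    # half from other_net.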
def melt(self, other_net):
try:
melted_name = self.name + other_net.name
except AttributeError:
melted_name = None
melted_weights = self.create_target_weights(other_net.input_weight_matrix())
self_weights = self.create_target_weights(self.input_weight_matrix())
weight_indxs = list(range(len(self_weights)))
random.shuffle(weight_indxs)
for weight_idx in weight_indxs[:len(melted_weights) // 2]:
melted_weights[weight_idx] = self_weights[weight_idx]
melted_net = Net(i_size=self.input_size, h_size=self.hidden_size, o_size=self.out_size, name=melted_name)
melted_net.apply_weights(melted_weights)
return melted_net
    def apply_noise(self, noise_size: float):
        """ Add noise of the given magnitude to every weight, with a random sign per weight. """
        state = self.state_dict()
        for layer_name in state:
            for line_id, line_values in enumerate(state[layer_name]):
                for weight_id, weight_value in enumerate(line_values):
                    if prng() < 0.5:
                        state[layer_name][line_id][weight_id] = weight_value + noise_size * prng()
                    else:
                        state[layer_name][line_id][weight_id] = weight_value - noise_size * prng()
        return self
class SecondaryNet(Net):
    def self_train(self, training_steps: int, log_step_size: int, learning_rate: float) -> (pd.DataFrame, bool):
""" Training a network to predict its own weights in order to self-replicate. """
optimizer = optim.SGD(self.parameters(), lr=learning_rate, momentum=0.9)
df = pd.DataFrame(columns=['step', 'loss', 'first_to_target_loss', 'second_to_target_loss', 'second_to_first_loss'])
is_diverged = False
for training_step in range(training_steps):
self.number_trained += 1
optimizer.zero_grad()
input_data = self.input_weight_matrix()
target_data = self.create_target_weights(input_data)
intermediate_output = self(input_data)
second_input = copy.deepcopy(input_data)
second_input[:, 0] = intermediate_output.squeeze()
output = self(second_input)
second_to_target_loss = F.mse_loss(output, target_data)
first_to_target_loss = F.mse_loss(intermediate_output, target_data * -1)
second_to_first_loss = F.mse_loss(intermediate_output, output)
if any([torch.isnan(x) or torch.isinf(x) for x in [second_to_first_loss, first_to_target_loss, second_to_target_loss]]):
                print('Loss diverged to NaN/Inf, stopping self-training.')
is_diverged = True
break
loss = second_to_target_loss + first_to_target_loss
df.loc[df.shape[0]] = [df.shape[0], loss.detach().numpy().item(),
first_to_target_loss.detach().numpy().item(),
second_to_target_loss.detach().numpy().item(),
second_to_first_loss.detach().numpy().item()]
loss.backward()
optimizer.step()
self.trained = True
return df, is_diverged
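# Note: SecondaryNet trains a second-order objective. A single forward pass is pushed
# towards the negated weights (first_to_target_loss uses target_data * -1), while two
# consecutive passes should reproduce the original weights (second_to_target_loss).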
class MetaCell(nn.Module):
def __init__(self, name, interface):
super().__init__()
self.name = name
self.interface = interface
        self.weight_interface = 5
        self.net_hidden_size = 4
        self.net_output_size = 1
        self.meta_weight_list = nn.ModuleList()
        self.meta_weight_list.extend(
            [Net(self.weight_interface, self.net_hidden_size,
                 self.net_output_size, name=f'{self.name}_W{weight_idx}'
                 ) for weight_idx in range(self.interface)]
        )
self.__bed_mask = None
@property
def _bed_mask(self):
if self.__bed_mask is None:
d = next(self.parameters()).device
embedding = torch.zeros(1, self.weight_interface, device=d)
# computations
# create a mask where pos is 0 if it is to be replaced
mask = torch.ones_like(embedding)
mask[:, -1] = 0
self.__bed_mask = embedding, mask
return tuple(x.clone() for x in self.__bed_mask)
def forward(self, x):
embedding, mask = self._bed_mask
expanded_mask = mask.expand(*x.shape, embedding.shape[-1])
embedding = embedding.repeat(*x.shape, 1)
# Row-wise
# xs = x.unsqueeze(-1).expand(-1, -1, embedding.shape[-1]).swapdims(0, 1)
# Column-wise
xs = x.unsqueeze(-1).expand(-1, -1, embedding.shape[-1])
xs = embedding * expanded_mask + xs * (1 - expanded_mask)
# ToDo Speed this up!
tensor = torch.hstack([meta_weight(xs[:, idx, :]) for idx, meta_weight in enumerate(self.meta_weight_list)])
tensor = torch.sum(tensor, dim=-1, keepdim=True)
return tensor
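    # Shape walk-through for forward() (illustrative, based on the code above):
    #   x:  (batch, interface)                      one scalar input per meta-weight
    #   xs: (batch, interface, weight_interface)    each scalar placed in the last
    #       embedding slot (mask column -1), the other slots taken from `embedding`
    #   each particle Net maps its (batch, weight_interface) slice to (batch, 1);
    #   hstack + sum collapse these to the cell's (batch, 1) output.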
@property
def particles(self):
return (net for net in self.meta_weight_list)
class MetaLayer(nn.Module):
def __init__(self, name, interface=4, width=4, residual_skip=True):
super().__init__()
self.residual_skip = residual_skip
self.name = name
self.interface = interface
self.width = width
self.meta_cell_list = nn.ModuleList()
self.meta_cell_list.extend([MetaCell(name=f'{self.name}_C{cell_idx}',
interface=interface
) for cell_idx in range(self.width)]
)
def forward(self, x):
cell_results = []
for metacell in self.meta_cell_list:
cell_results.append(metacell(x))
tensor = torch.hstack(cell_results)
if self.residual_skip and x.shape == tensor.shape:
tensor += x
return tensor
@property
def particles(self):
return (weight for metacell in self.meta_cell_list for weight in metacell.particles)
class MetaNet(nn.Module):
def __init__(self, interface=4, depth=3, width=4, out=1, activation=None):
super().__init__()
self.activation = activation
self.out = out
self.interface = interface
self.width = width
self.depth = depth
self._meta_layer_list = nn.ModuleList()
self._meta_layer_list.append(MetaLayer(name=f'L{0}',
interface=self.interface,
width=self.width)
)
self._meta_layer_list.extend([MetaLayer(name=f'L{layer_idx + 1}',
interface=self.width, width=self.width
) for layer_idx in range(self.depth - 2)]
)
self._meta_layer_list.append(MetaLayer(name=f'L{len(self._meta_layer_list)}',
interface=self.width, width=self.out)
)
def replace_with_zero(self, ident_key):
replaced_particles = 0
for particle in self.particles:
if particle.is_fixpoint == ident_key:
particle.load_state_dict(
{key: torch.zeros_like(state) for key, state in particle.state_dict().items()}
)
replaced_particles += 1
tqdm.write(f'Particle Parameters replaced: {str(replaced_particles)}')
return self
def forward(self, x):
tensor = x
for meta_layer in self._meta_layer_list:
tensor = meta_layer(tensor)
return tensor
@property
def particles(self):
return (cell for metalayer in self._meta_layer_list for cell in metalayer.particles)
    def combined_self_train(self, external_optimizer):
        losses = []
        for particle in self.particles:
            # Zero the gradients for every particle update
            external_optimizer.zero_grad()
            # Integrate the optimizer with each particle's self-replication objective
            input_data = particle.input_weight_matrix()
            target_data = particle.create_target_weights(input_data)
            output = particle(input_data)
            loss = F.mse_loss(output, target_data)
            losses.append(loss.detach())
            loss.backward()
            # Adjust learning weights
            external_optimizer.step()
        # return torch.hstack(losses).sum(dim=-1, keepdim=True)
        return sum(losses)
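    # Minimal usage sketch (hypothetical hyperparameters):
    #   metanet = MetaNet(interface=4, depth=3, width=4, out=1)
    #   optimizer = optim.SGD(metanet.parameters(), lr=0.004, momentum=0.9)
    #   combined_loss = metanet.combined_self_train(optimizer)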
if __name__ == '__main__':
metanet = MetaNet(interface=3, depth=5, width=3, out=1)
next(metanet.particles).input_weight_matrix()
metanet(torch.hstack([torch.full((2, 1), x) for x in range(metanet.interface)]))
a = metanet.particles
    print('Test')

requirements.txt Normal file
@ -0,0 +1,14 @@
torch~=1.8.1+cpu
tqdm~=4.60.0
numpy~=1.20.3
matplotlib~=3.4.2
sklearn~=0.0
scipy
tabulate~=0.8.9
scikit-learn~=0.24.2
pandas~=1.2.4
seaborn~=0.11.1
future~=0.18.2
torchmetrics~=0.7.0
torchvision~=0.9.1+cpu

visualization.py Normal file
@ -0,0 +1,281 @@
from pathlib import Path
from typing import List, Dict, Union
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import numpy as np
from sklearn.decomposition import PCA
import random
import string
from matplotlib import rcParams
rcParams['axes.labelpad'] = 20
def plot_output(output):
""" Plotting the values of the final output """
plt.figure()
plt.imshow(output)
plt.colorbar()
plt.show()
def plot_loss(loss_array, directory: Union[str, Path], batch_size=1):
""" Plotting the evolution of the loss function."""
fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(12)
for i in range(len(loss_array)):
        plt.plot(loss_array[i], label=f"Last loss value: {loss_array[i][-1]}")
plt.legend()
plt.xlabel("Epochs")
plt.ylabel("Loss")
directory = Path(directory)
filename = "nets_loss_function.png"
file_path = directory / filename
plt.savefig(str(file_path))
plt.clf()
def bar_chart_fixpoints(fixpoint_counter: Dict, population_size: int, directory: Union[str, Path], learning_rate: float,
exp_details: str, source_check=None):
""" Plotting the number of fixpoints in a barchart. """
fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(12)
legend_population_size = mpatches.Patch(color="white", label=f"No. of nets: {str(population_size)}")
    legend_learning_rate = mpatches.Patch(color="white", label=f"Learning rate: {str(learning_rate)}")
    legend_epochs = mpatches.Patch(color="white", label=f"{str(exp_details)}")
    if source_check == "summary":
        plt.legend(handles=[legend_population_size, legend_learning_rate, legend_epochs])
        plt.ylabel("No. of nets/run")
        plt.title("Summary: avg. amount of fixpoints/run")
    else:
        plt.legend(handles=[legend_population_size, legend_learning_rate, legend_epochs])
        plt.ylabel("Number of networks")
        plt.title("Fixpoint count")
plt.bar(range(len(fixpoint_counter)), list(fixpoint_counter.values()), align='center')
plt.xticks(range(len(fixpoint_counter)), list(fixpoint_counter.keys()))
directory = Path(directory)
directory.mkdir(parents=True, exist_ok=True)
filename = f"{str(population_size)}_nets_fixpoints_barchart.png"
filepath = directory / filename
plt.savefig(str(filepath))
plt.clf()
def plot_3d(matrices_weights_history, directory: Union[str, Path], population_size, z_axis_legend,
exp_name="experiment", is_trained="", batch_size=1, plot_pca_together=False, nets_array=None):
""" Plotting the the weights of the nets in a 3d form using principal component analysis (PCA) """
fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(12)
pca = PCA(n_components=2, whiten=True)
ax = plt.axes(projection='3d')
if plot_pca_together:
weight_histories = []
start_times = []
for wh, st in matrices_weights_history:
start_times.append(st)
wm = np.array(wh)
n, x, y = wm.shape
wm = wm.reshape(n, x * y)
weight_histories.append(wm)
weight_data = np.array(weight_histories)
n, x, y = weight_data.shape
weight_data = weight_data.reshape(n*x, y)
pca.fit(weight_data)
weight_data_pca = pca.transform(weight_data)
for transformed_trajectory, start_time in zip(np.split(weight_data_pca, n), start_times):
start_log_time = int(start_time / batch_size)
xdata = transformed_trajectory[start_log_time:, 0]
ydata = transformed_trajectory[start_log_time:, 1]
zdata = np.arange(start_time, len(ydata)*batch_size+start_time, batch_size).tolist()
            ax.plot3D(xdata, ydata, zdata, label="net")
ax.scatter(xdata, ydata, zdata, s=7)
else:
loop_matrices_weights_history = tqdm(range(len(matrices_weights_history)))
for i in loop_matrices_weights_history:
loop_matrices_weights_history.set_description("Plotting weights 3D PCA %s" % i)
weight_matrix, start_time = matrices_weights_history[i]
weight_matrix = np.array(weight_matrix)
n, x, y = weight_matrix.shape
weight_matrix = weight_matrix.reshape(n, x * y)
pca.fit(weight_matrix)
weight_matrix_pca = pca.transform(weight_matrix)
xdata, ydata = [], []
start_log_time = int(start_time / 10)
for j in range(start_log_time, len(weight_matrix_pca)):
xdata.append(weight_matrix_pca[j][0])
ydata.append(weight_matrix_pca[j][1])
zdata = np.arange(start_time, len(ydata)*batch_size+start_time, batch_size)
            ax.plot3D(xdata, ydata, zdata, label=f"net {i}", c="b")
            if nets_array is not None and "parent" in nets_array[i].name:
                ax.scatter(np.asarray(xdata), np.asarray(ydata), zdata, s=3, c="b")
            else:
                ax.scatter(np.asarray(xdata), np.asarray(ydata), zdata, s=3)
#steps = mpatches.Patch(color="white", label=f"{z_axis_legend}: {len(matrices_weights_history)} steps")
population_size = mpatches.Patch(color="white", label=f"Population: {population_size} networks")
    show_legend = False  # Legend rendering is intentionally disabled for now
    if show_legend:
if z_axis_legend == "Self-application":
if is_trained == '_trained':
trained = mpatches.Patch(color="white", label=f"Trained: true")
else:
trained = mpatches.Patch(color="white", label=f"Trained: false")
ax.legend(handles=[population_size, trained])
else:
ax.legend(handles=[population_size])
ax.set_title(f"PCA Transformed Weight Trajectories")
# ax.set_xlabel("PCA Transformed X-Axis")
# ax.set_ylabel("PCA Transformed Y-Axis")
ax.set_zlabel(f"Self Training Steps")
# FIXME: Replace this kind of operation with pathlib.Path() object interactions
directory = Path(directory)
directory.mkdir(parents=True, exist_ok=True)
filename = f"{exp_name}{is_trained}.png"
filepath = directory / filename
    if filepath.exists():
        letters = string.ascii_lowercase
        random_letters = ''.join(random.choice(letters) for _ in range(5))
        plt.savefig(str(filepath.parent / f"{filepath.stem}_{random_letters}.png"))
else:
plt.savefig(str(filepath))
plt.show()
def plot_3d_self_train(nets_array: List, exp_name: str, directory: Union[str, Path], batch_size: int, plot_pca_together: bool):
""" Plotting the evolution of the weights in a 3D space when doing self training. """
matrices_weights_history = []
loop_nets_array = tqdm(range(len(nets_array)))
for i in loop_nets_array:
loop_nets_array.set_description("Creating ST weights history %s" % i)
matrices_weights_history.append((nets_array[i].s_train_weights_history, nets_array[i].start_time))
z_axis_legend = "epochs"
return plot_3d(matrices_weights_history, directory, len(nets_array), z_axis_legend, exp_name, "", batch_size,
plot_pca_together=plot_pca_together, nets_array=nets_array)
def plot_3d_self_application(nets_array: List, exp_name: str, directory_name: Union[str, Path], batch_size: int) -> None:
""" Plotting the evolution of the weights in a 3D space when doing self application. """
matrices_weights_history = []
loop_nets_array = tqdm(range(len(nets_array)))
for i in loop_nets_array:
loop_nets_array.set_description("Creating SA weights history %s" % i)
        matrices_weights_history.append((nets_array[i].s_application_weights_history, nets_array[i].start_time))
        if nets_array[i].trained:
            is_trained = "_trained"
        else:
            is_trained = "_not_trained"
    # FIXME: Are the two following lines indented correctly? The value of "is_trained"
    #  is overwritten on every loop iteration, so only the last net's flag is used.
    z_axis_legend = "epochs"
plot_3d(matrices_weights_history, directory_name, len(nets_array), z_axis_legend, exp_name, is_trained, batch_size)
def plot_3d_soup(nets_list, exp_name, directory: Union[str, Path]):
""" Plotting the evolution of the weights in a 3D space for the soup environment. """
    # The batch size is not relevant for soups. To not affect the number of epochs shown
    # in the 3D plot, the commented-out call below forwards the number "1" via
    # <irrelevant_batch_size>; the active call currently uses 10 with PCA plotted together.
    irrelevant_batch_size = 1
    # plot_3d_self_train(nets_list, exp_name, directory, irrelevant_batch_size, False)
    plot_3d_self_train(nets_list, exp_name, directory, 10, True)
def line_chart_fixpoints(fixpoint_counters_history: list, epochs: int, ST_steps_between_SA: int,
SA_steps, directory: Union[str, Path], population_size: int):
""" Plotting the percentage of fixpoints after each iteration of SA & ST steps. """
fig = plt.figure()
fig.set_figheight(10)
fig.set_figwidth(12)
ST_steps_per_SA = np.arange(0, ST_steps_between_SA * epochs, ST_steps_between_SA).tolist()
legend_population_size = mpatches.Patch(color="white", label=f"No. of nets: {str(population_size)}")
legend_SA_steps = mpatches.Patch(color="white", label=f"SA_steps: {str(SA_steps)}")
legend_SA_and_ST_runs = mpatches.Patch(color="white", label=f"SA_and_ST_runs: {str(epochs)}")
legend_ST_steps_between_SA = mpatches.Patch(color="white", label=f"ST_steps_between_SA: {str(ST_steps_between_SA)}")
plt.legend(handles=[legend_population_size, legend_SA_and_ST_runs, legend_SA_steps, legend_ST_steps_between_SA])
plt.xlabel("Epochs")
plt.ylabel("Percentage")
plt.title("Percentage of fixpoints")
plt.plot(ST_steps_per_SA, fixpoint_counters_history, color="green", marker="o")
directory = Path(directory)
filename = f"{str(population_size)}_nets_fixpoints_linechart.png"
filepath = directory / filename
plt.savefig(str(filepath))
plt.clf()
def box_plot(data, directory: Union[str, Path], population_size):
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 7))
# ax = fig.add_axes([0, 0, 1, 1])
plt.title("Fixpoint variation")
plt.xlabel("Amount of noise")
plt.ylabel("Steps")
# data = numpy.array(data)
# ax.boxplot(data)
axs[1].boxplot(data)
axs[1].set_title('Box plot')
directory = Path(directory)
    filename = f"{str(population_size)}_nets_fixpoints_boxplot.png"
filepath = directory / filename
plt.savefig(str(filepath))
plt.clf()
def write_file(text, directory: Union[str, Path]):
    directory = Path(directory)
    filepath = directory / 'experiment.txt'
    with filepath.open('w+') as f:
        f.write(text)
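# Minimal usage sketch (hypothetical objects; `nets` is a list of trained Net instances):
#   plot_loss([net.loss_history for net in nets], directory='output/exp')
#   plot_3d_self_train(nets, exp_name='exp', directory='output/exp', batch_size=1,
#                      plot_pca_together=True)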