2021-11-11 10:59:13 +01:00

221 lines
8.1 KiB
Python

from typing import NamedTuple, Union
from collections import deque, OrderedDict, defaultdict
import numpy as np
import random
import pandas as pd
import torch
import torch.nn as nn
from tqdm import trange
class Experience(NamedTuple):
# can be use for a single (s_t, a, r s_{t+1}) tuple
# or for a batch of tuples
observation: np.ndarray
next_observation: np.ndarray
action: np.ndarray
reward: Union[float, np.ndarray]
done : Union[bool, np.ndarray]
episode: int = -1
class BaseLearner:
def __init__(self, env, n_agents=1, train_every=('step', 4), n_grad_steps=1, stack_n_frames=1):
assert train_every[0] in ['step', 'episode'], 'train_every[0] must be one of ["step", "episode"]'
self.env = env
self.n_agents = n_agents
self.n_grad_steps = n_grad_steps
self.train_every = train_every
self.stack_n_frames = deque(maxlen=stack_n_frames)
self.device = 'cpu'
self.n_updates = 0
self.step = 0
self.episode_step = 0
self.episode = 0
self.running_reward = deque(maxlen=5)
def to(self, device):
self.device = device
for attr, value in self.__dict__.items():
if isinstance(value, nn.Module):
value = value.to(self.device)
return self
def get_action(self, obs) -> Union[int, np.ndarray]:
pass
def on_new_experience(self, experience):
pass
def on_step_end(self, n_steps):
pass
def on_episode_end(self, n_steps):
pass
def on_all_done(self):
pass
def train(self):
pass
def reward(self, r):
return r
def learn(self, n_steps):
train_type, train_freq = self.train_every
while self.step < n_steps:
obs, done = self.env.reset(), False
total_reward = 0
self.episode_step = 0
while not done:
action = self.get_action(obs)
next_obs, reward, done, info = self.env.step(action if not len(action) == 1 else action[0])
experience = Experience(observation=obs, next_observation=next_obs,
action=action, reward=self.reward(reward),
done=done, episode=self.episode) # do we really need to copy?
self.on_new_experience(experience)
# end of step routine
obs = next_obs
total_reward += reward
self.step += 1
self.episode_step += 1
self.on_step_end(n_steps)
if train_type == 'step' and (self.step % train_freq == 0):
self.train()
self.n_updates += 1
self.on_episode_end(n_steps)
if train_type == 'episode' and (self.episode % train_freq == 0):
self.train()
self.n_updates += 1
self.running_reward.append(total_reward)
self.episode += 1
try:
if self.step % 100 == 0:
print(
f'Step: {self.step} ({(self.step / n_steps) * 100:.2f}%)\tRunning reward: {sum(list(self.running_reward)) / len(self.running_reward):.2f}\t'
f' eps: {self.eps:.4f}\tRunning loss: {sum(list(self.running_loss)) / len(self.running_loss):.4f}\tUpdates:{self.n_updates}')
except Exception as e:
pass
self.on_all_done()
def evaluate(self, n_episodes=100, render=False):
with torch.no_grad():
data = []
for eval_i in trange(n_episodes):
obs, done = self.env.reset(), False
while not done:
action = self.get_action(obs)
next_obs, reward, done, info = self.env.step(action if not len(action) == 1 else action[0])
if render: self.env.render()
obs = next_obs # srsly i'm so stupid
info.update({'reward': reward, 'eval_episode': eval_i})
data.append(info)
return pd.DataFrame(data).fillna(0)
class BaseBuffer:
def __init__(self, size: int):
self.size = size
self.experience = deque(maxlen=size)
def __len__(self):
return len(self.experience)
def add(self, exp: Experience):
self.experience.append(exp)
def sample(self, k, cer=4):
sample = random.choices(self.experience, k=k-cer)
for i in range(cer): sample += [self.experience[-i]]
observations = torch.stack([torch.from_numpy(e.observation) for e in sample], 0).float()
next_observations = torch.stack([torch.from_numpy(e.next_observation) for e in sample], 0).float()
actions = torch.tensor([e.action for e in sample]).long()
rewards = torch.tensor([e.reward for e in sample]).float().view(-1, 1)
dones = torch.tensor([e.done for e in sample]).float().view(-1, 1)
#print(observations.shape, next_observations.shape, actions.shape, rewards.shape, dones.shape)
return Experience(observations, next_observations, actions, rewards, dones)
class TrajectoryBuffer(BaseBuffer):
def __init__(self, size):
super(TrajectoryBuffer, self).__init__(size)
self.experience = defaultdict(list)
def add(self, exp: Experience):
self.experience[exp.episode].append(exp)
if len(self.experience) > self.size:
oldest_traj_key = list(sorted(self.experience.keys()))[0]
del self.experience[oldest_traj_key]
def soft_update(local_model, target_model, tau):
# taken from https://github.com/BY571/Munchausen-RL/blob/master/M-DQN.ipynb
for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
target_param.data.copy_(tau*local_param.data + (1.-tau)*target_param.data)
def mlp_maker(dims, flatten=False, activation='elu', activation_last='identity'):
activations = {'elu': nn.ELU, 'relu': nn.ReLU, 'sigmoid': nn.Sigmoid,
'leaky_relu': nn.LeakyReLU, 'tanh': nn.Tanh,
'gelu': nn.GELU, 'identity': nn.Identity}
layers = [('Flatten', nn.Flatten())] if flatten else []
for i in range(1, len(dims)):
layers.append((f'Layer #{i - 1}: Linear', nn.Linear(dims[i - 1], dims[i])))
activation_str = activation if i != len(dims)-1 else activation_last
layers.append((f'Layer #{i - 1}: {activation_str.capitalize()}', activations[activation_str]()))
return nn.Sequential(OrderedDict(layers))
class BaseDQN(nn.Module):
def __init__(self, dims=[3*5*5, 64, 64, 9]):
super(BaseDQN, self).__init__()
self.net = mlp_maker(dims, flatten=True)
@torch.no_grad()
def act(self, x) -> np.ndarray:
action = self.forward(x).max(-1)[1].numpy()
return action
def forward(self, x):
return self.net(x)
class BaseDDQN(BaseDQN):
def __init__(self,
backbone_dims=[3*5*5, 64, 64],
value_dims=[64, 1],
advantage_dims=[64, 9],
activation='elu'):
super(BaseDDQN, self).__init__(backbone_dims)
self.net = mlp_maker(backbone_dims, activation=activation, flatten=True)
self.value_head = mlp_maker(value_dims)
self.advantage_head = mlp_maker(advantage_dims)
def forward(self, x):
features = self.net(x)
advantages = self.advantage_head(features)
values = self.value_head(features)
return values + (advantages - advantages.mean())
class BaseICM(nn.Module):
def __init__(self, backbone_dims=[2*3*5*5, 64, 64], head_dims=[2*64, 64, 9]):
super(BaseICM, self).__init__()
self.backbone = mlp_maker(backbone_dims, flatten=True, activation_last='relu', activation='relu')
self.icm = mlp_maker(head_dims)
self.ce = nn.CrossEntropyLoss()
def forward(self, s0, s1, a):
phi_s0 = self.backbone(s0)
phi_s1 = self.backbone(s1)
cat = torch.cat((phi_s0, phi_s1), dim=1)
a_prime = torch.softmax(self.icm(cat), dim=-1)
ce = self.ce(a_prime, a)
return dict(prediction=a_prime, loss=ce)