add more efficient (lazy) experience queue implementation based on tensors, adjust MARL algorithms accordingly
@@ -1,89 +1,93 @@
-import torch
-from typing import Union, List
-from torch import Tensor
 import numpy as np
+from collections import deque
+import torch
+from typing import Union
+from torch import Tensor
+from torch.utils.data import Dataset, ConcatDataset
+import random


 class ActorCriticMemory(object):
-    def __init__(self):
+    def __init__(self, capacity=10):
+        self.capacity = capacity
         self.reset()

     def reset(self):
-        self.__states = []
-        self.__actions = []
-        self.__rewards = []
-        self.__dones = []
-        self.__hiddens_actor = []
-        self.__hiddens_critic = []
-        self.__logits = []
-        self.__values = []
+        self.__actions = LazyTensorFiFoQueue(maxlen=self.capacity+1)
+        self.__hidden_actor = LazyTensorFiFoQueue(maxlen=self.capacity+1)
+        self.__hidden_critic = LazyTensorFiFoQueue(maxlen=self.capacity+1)
+        self.__states = LazyTensorFiFoQueue(maxlen=self.capacity+1)
+        self.__rewards = LazyTensorFiFoQueue(maxlen=self.capacity+1)
+        self.__dones = LazyTensorFiFoQueue(maxlen=self.capacity+1)
+        self.__logits = LazyTensorFiFoQueue(maxlen=self.capacity+1)
+        self.__values = LazyTensorFiFoQueue(maxlen=self.capacity+1)

     def __len__(self):
-        return len(self.__states)
+        return len(self.__rewards) - 1

     @property
-    def observation(self):  # add time dimension through stacking
-        return torch.stack(self.__states, 0).unsqueeze(0)  # 1 x timesteps x hidden dim
+    def observation(self, sls=slice(0, None)):  # add time dimension through stacking
+        return self.__states[sls].unsqueeze(0)  # 1 x time x hidden dim

     @property
-    def hidden_actor(self):
-        if len(self.__hiddens_actor) == 1:
-            return self.__hiddens_actor[0]
-        return torch.stack(self.__hiddens_actor, 0)  # layers x timesteps x hidden dim
+    def hidden_actor(self, sls=slice(0, None)):  # 1 x n_layers x dim
+        return self.__hidden_actor[sls].unsqueeze(0)  # 1 x time x n_layers x dim

     @property
-    def hidden_critic(self):
-        if len(self.__hiddens_critic) == 1:
-            return self.__hiddens_critic[0]
-        return torch.stack(self.__hiddens_critic, 0)  # layers x timesteps x hidden dim
+    def hidden_critic(self, sls=slice(0, None)):  # 1 x n_layers x dim
+        return self.__hidden_critic[sls].unsqueeze(0)  # 1 x time x n_layers x dim

     @property
-    def reward(self):
-        return torch.tensor(self.__rewards).float().unsqueeze(0)  # 1 x timesteps
+    def reward(self, sls=slice(0, None)):
+        return self.__rewards[sls].squeeze().unsqueeze(0)  # 1 x time

     @property
-    def action(self):
-        return torch.tensor(self.__actions).long().unsqueeze(0)  # 1 x timesteps+1
+    def action(self, sls=slice(0, None)):
+        return self.__actions[sls].long().squeeze().unsqueeze(0)  # 1 x time

     @property
-    def done(self):
-        return torch.tensor(self.__dones).float().unsqueeze(0)  # 1 x timesteps
+    def done(self, sls=slice(0, None)):
+        return self.__dones[sls].float().squeeze().unsqueeze(0)  # 1 x time

     @property
-    def logits(self):  # assumes a trailing 1 for time dimension - common when using output from NN
-        return torch.cat(self.__logits, 0).unsqueeze(0)  # 1 x timesteps x actions
+    def logits(self, sls=slice(0, None)):  # assumes a trailing 1 for time dimension - common when using output from NN
+        return self.__logits[sls].squeeze().unsqueeze(0)  # 1 x time x actions

     @property
-    def values(self):
-        return torch.cat(self.__values, 0).unsqueeze(0)  # 1 x timesteps x actions
+    def values(self, sls=slice(0, None)):
+        return self.__values[sls].squeeze().unsqueeze(0)  # 1 x time x actions

     def add_observation(self, state: Union[Tensor, np.ndarray]):
         self.__states.append(state if isinstance(state, Tensor) else torch.from_numpy(state))

     def add_hidden_actor(self, hidden: Tensor):
-        # 1 x layers x hidden dim
-        if len(hidden.shape) < 3: hidden = hidden.unsqueeze(0)
-        self.__hiddens_actor.append(hidden)
+        # layers x hidden dim
+        self.__hidden_actor.append(hidden)

     def add_hidden_critic(self, hidden: Tensor):
-        # 1 x layers x hidden dim
-        if len(hidden.shape) < 3: hidden = hidden.unsqueeze(0)
-        self.__hiddens_critic.append(hidden)
+        # layers x hidden dim
+        self.__hidden_critic.append(hidden)

-    def add_action(self, action: int):
+    def add_action(self, action: Union[int, Tensor]):
         if not isinstance(action, Tensor):
             action = torch.tensor(action)
         self.__actions.append(action)

-    def add_reward(self, reward: float):
+    def add_reward(self, reward: Union[float, Tensor]):
         if not isinstance(reward, Tensor):
             reward = torch.tensor(reward)
         self.__rewards.append(reward)

     def add_done(self, done: bool):
         if not isinstance(done, Tensor):
             done = torch.tensor(done)
         self.__dones.append(done)

     def add_logits(self, logits: Tensor):
         self.__logits.append(logits)

-    def add_values(self, logits: Tensor):
-        self.__values.append(logits)
+    def add_values(self, values: Tensor):
+        self.__values.append(values)

     def add(self, **kwargs):
         for k, v in kwargs.items():
@@ -92,10 +96,10 @@ class ActorCriticMemory(object):


 class MARLActorCriticMemory(object):
-    def __init__(self, n_agents):
+    def __init__(self, n_agents, capacity):
         self.n_agents = n_agents
         self.memories = [
-            ActorCriticMemory() for _ in range(n_agents)
+            ActorCriticMemory(capacity) for _ in range(n_agents)
         ]

     def __call__(self, agent_i):
@@ -109,50 +113,109 @@ class MARLActorCriticMemory(object):
         mem.reset()

     def add(self, **kwargs):
         # todo try catch - print all possible functions
         for agent_i in range(self.n_agents):
             for k, v in kwargs.items():
                 func = getattr(ActorCriticMemory, f'add_{k}')
                 func(self.memories[agent_i], v[agent_i])

-    @property
-    def observation(self):
-        all_obs = [mem.observation for mem in self.memories]
-        return torch.cat(all_obs, 0)  # agents x timesteps+1 x ...
+    def __getattr__(self, attr):
+        all_attrs = [getattr(mem, attr) for mem in self.memories]
+        return torch.cat(all_attrs, 0)  # agents x time ...
+
+    def chunk_dataloader(self, chunk_len, k):
+        datasets = [ExperienceChunks(mem, chunk_len, k) for mem in self.memories]
+        dataset = ConcatDataset(datasets)
+        data = [dataset[i] for i in range(len(dataset))]
+        data = custom_collate_fn(data)
+        return data
+
+
+def custom_collate_fn(batch):
+    elem = batch[0]
+    return {key: torch.cat([d[key] for d in batch], dim=0) for key in elem}
+
+
+class ExperienceChunks(Dataset):
+    def __init__(self, memory, chunk_len, k):
+        assert chunk_len <= len(memory), 'chunk_len cannot be longer than the size of the memory'
+        self.memory = memory
+        self.chunk_len = chunk_len
+        self.k = k

     @property
-    def action(self):
-        all_actions = [mem.action for mem in self.memories]
-        return torch.cat(all_actions, 0)  # agents x timesteps+1 x ...
+    def whitelist(self):
+        whitelist = torch.ones(len(self.memory) - self.chunk_len)
+        for d in self.memory.done.squeeze().nonzero().flatten():
+            whitelist[max((0, d-self.chunk_len-1)):d+2] = 0
+        whitelist[0] = 0
+        return whitelist.tolist()

-    @property
-    def done(self):
-        all_dones = [mem.done for mem in self.memories]
-        return torch.cat(all_dones, 0).float()  # agents x timesteps x ...
+    def sample(self, start=1):
+        cl = self.chunk_len
+        sample = dict(observation=self.memory.observation[:, start:start+cl+1],
+                      action=self.memory.action[:, start-1:start+cl],
+                      hidden_actor=self.memory.hidden_actor[:, start-1],
+                      hidden_critic=self.memory.hidden_critic[:, start-1],
+                      reward=self.memory.reward[:, start:start + cl],
+                      done=self.memory.done[:, start:start + cl],
+                      logits=self.memory.logits[:, start:start + cl],
+                      values=self.memory.values[:, start:start + cl])
+        return sample
+
+    def __len__(self):
+        return self.k
+
+    def __getitem__(self, i):
+        idx = random.choices(range(0, len(self.memory) - self.chunk_len), weights=self.whitelist, k=1)
+        return self.sample(idx[0])
+
+
+class LazyTensorFiFoQueue:
+    def __init__(self, maxlen):
+        self.maxlen = maxlen
+        self.reset()
+
+    def reset(self):
+        self.__lazy_queue = deque(maxlen=self.maxlen)
+        self.shape = None
+        self.queue = None
+
+    def shape_init(self, tensor: Tensor):
+        self.shape = torch.Size([self.maxlen, *tensor.shape])
+
+    def build_tensor_queue(self):
+        if len(self.__lazy_queue) > 0:
+            block = torch.stack(list(self.__lazy_queue), dim=0)
+            l = block.shape[0]
+            if self.queue is None:
+                self.queue = block
+            elif self.true_len() <= self.maxlen:
+                self.queue = torch.cat((self.queue, block), dim=0)
+            else:
+                self.queue = torch.cat((self.queue[l:], block), dim=0)
+            self.__lazy_queue.clear()
+
+    def append(self, data):
+        if self.shape is None:
+            self.shape_init(data)
+        self.__lazy_queue.append(data)
+        if len(self.__lazy_queue) >= self.maxlen:
+            self.build_tensor_queue()
+
+    def true_len(self):
+        return len(self.__lazy_queue) + (0 if self.queue is None else self.queue.shape[0])
+
+    def __len__(self):
+        return min((self.true_len(), self.maxlen))
+
+    def __str__(self):
+        return f'LazyTensorFiFoQueue\tmaxlen: {self.maxlen}, shape: {self.shape}, ' \
+               f'len: {len(self)}, true_len: {self.true_len()}, elements in lazy queue: {len(self.__lazy_queue)}'
+
+    def __getitem__(self, item_or_slice):
+        self.build_tensor_queue()
+        return self.queue[item_or_slice]

-    @property
-    def reward(self):
-        all_rewards = [mem.reward for mem in self.memories]
-        return torch.cat(all_rewards, 0).float()  # agents x timesteps x ...
-
-    @property
-    def hidden_actor(self):
-        all_ha = [mem.hidden_actor for mem in self.memories]
-        return torch.cat(all_ha, 0)  # agents x layers x timesteps x hidden dim
-
-    @property
-    def hidden_critic(self):
-        all_hc = [mem.hidden_critic for mem in self.memories]
-        return torch.cat(all_hc, 0)  # agents x layers x timesteps x hidden dim
-
-    @property
-    def logits(self):
-        all_lgts = [mem.logits for mem in self.memories]
-        return torch.cat(all_lgts, 0)  # agents x layers x timesteps x hidden dim
-
-    @property
-    def values(self):
-        all_vals = [mem.values for mem in self.memories]
-        return torch.cat(all_vals, 0)  # agents x layers x timesteps x hidden dim
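Two behaviors of the LazyTensorFiFoQueue added above are worth seeing end to end: appends are only buffered in the internal deque, and build_tensor_queue() consolidates them into a single tensor block once the buffer fills or the queue is indexed. A minimal usage sketch, assuming the module from this diff is importable; maxlen=4 and the scalar payloads are made up for illustration:

    import torch

    queue = LazyTensorFiFoQueue(maxlen=4)
    for step in range(6):
        queue.append(torch.tensor([float(step)]))  # cheap per step: only a deque append

    print(len(queue))  # 4 -> logical length is capped at maxlen
    print(queue[:])    # tensor([[2.], [3.], [4.], [5.]]) -> oldest entries evicted FIFO-style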
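The rewritten MARLActorCriticMemory is driven through add() and __getattr__: add() fans each keyword out to the per-agent add_* methods, and any attribute not defined on the wrapper (reward, done, ...) falls through __getattr__ to the per-agent memories and is concatenated along dim 0. A sketch with invented agent count, capacity, and reward values:

    mem = MARLActorCriticMemory(n_agents=2, capacity=10)
    for t in range(5):
        mem.add(reward=[0.1 * t, 0.2 * t], done=[False, False])  # one value per agent

    print(mem.reward.shape)  # torch.Size([2, 5]) -> agents x time, via __getattr__
    print(mem.done.shape)    # torch.Size([2, 5])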
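The whitelist property of ExperienceChunks keeps sampled training chunks from straddling episode boundaries: every start index whose chunk would overlap a done flag gets weight 0 in random.choices. A worked sketch of that masking logic in isolation; the memory length of 12, the done flag at t=6, and chunk_len=3 are invented for the example (the real property reads them from self.memory):

    import torch

    chunk_len = 3
    done = torch.zeros(12)  # 12 recorded timesteps
    done[6] = 1.            # episode boundary at t=6

    whitelist = torch.ones(12 - chunk_len)  # one weight per candidate chunk start
    for d in done.nonzero().flatten():
        whitelist[max((0, d - chunk_len - 1)):d + 2] = 0  # ban starts crossing the boundary
    whitelist[0] = 0  # start 0 has no preceding action / hidden state to condition on

    print(whitelist.tolist())  # [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]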
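Finally, custom_collate_fn merges the list of per-chunk sample dicts produced by chunk_dataloader into one batch dict by concatenating each key along dim 0. A tiny sketch with fabricated tensors:

    import torch

    batch = [{'reward': torch.ones(1, 3)}, {'reward': torch.zeros(1, 3)}]
    merged = custom_collate_fn(batch)
    print(merged['reward'].shape)  # torch.Size([2, 3])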