I am trying to implement a bidirectional RNN using PyTorch. I have short texts of variable length, which I tokenize and whose lengths I record. Each text consists of words, and I use a Word2vec model to turn each word into a vector, so a text becomes a list of such vectors. Since the texts differ in length, I use pad_sequence, pack_padded_sequence and pad_packed_sequence so the model reads them correctly.
tokenized = [text_word_vectors(text, model, 100) for text in texts]
valid_tokenized = [text_word_vectors(text, model, 100) for text in valid_texts]  # validation split (separate list of texts)
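For context, here is a minimal sketch of the padding/packing round-trip I rely on (text_word_vectors is my own helper that returns one 100-dim Word2vec vector per token; the toy tensors below are only for illustration):

import torch
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

# Two toy "texts" of 3 and 5 tokens, each token a 100-dim vector
seqs = [torch.randn(3, 100), torch.randn(5, 100)]
lengths = torch.tensor([len(s) for s in seqs])

padded = pad_sequence(seqs, batch_first=True, padding_value=0.0)    # shape (2, 5, 100)
packed = pack_padded_sequence(padded, lengths, batch_first=True, enforce_sorted=False)
unpacked, out_lens = pad_packed_sequence(packed, batch_first=True)  # back to (2, 5, 100)
print(padded.shape, unpacked.shape, out_lens)                       # out_lens == tensor([3, 5])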
and this is the Optuna objective I use for training:
class MultiClassDataset(Dataset):
    def __init__(self, X, y, lengths):
        self.X = X
        self.y = torch.from_numpy(y).type(torch.LongTensor)
        self.len = len(X)
        self.lengths = lengths

    def __getitem__(self, index):
        if self.lengths is not None:
            return self.X[index], self.y[index], self.lengths[index]
        else:
            return self.X[index], self.y[index]

    def __len__(self):
        return self.len
def objective(trial):
    lengths_train = [len(text) for text in tokenized]
    lengths_valid = [len(text) for text in valid_tokenized]
    batch_size = 240

    def collate_fn(batch):
        tweets, labels, lengths = zip(*batch)
        padded = pad_sequence([torch.FloatTensor(text) for text in tweets], batch_first=True, padding_value=0.0)
        return padded, torch.LongTensor(labels), torch.LongTensor(lengths)

    transformed_dataset = MultiClassDataset(tokenized, y_train, lengths_train)
    train_loader = DataLoader(dataset=transformed_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
    transformed_valid_dataset = MultiClassDataset(valid_tokenized, y_valid, lengths_valid)
    valid_loader = DataLoader(dataset=transformed_valid_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    trial_number = trial.number

    # Lists to store losses for plotting the learning curve
    train_losses = []
    val_losses = []

    D_in = 100
    D_out = 3

    # Suggest hyperparameters to optimize
    hidden_size = trial.suggest_int('hidden_size', 64, 64)
    learning_rate = trial.suggest_float('LR', 0.0001, 0.01)
    num_layers = trial.suggest_int('num_layers', 4, 4)
    dropout = trial.suggest_float('dropout', 0.4, 0.5, step=0.025)
    clip = trial.suggest_float('clip', 0.1, 0.7, step=0.025)
    cell_type = trial.suggest_categorical('cell_type', ['GRU', 'LSTM'])
    skip = trial.suggest_categorical('skip', [False, False])

    neurnet = RNN(cell_type, D_in, hidden_size, D_out, batch_size, device, num_layers, skip, dropout, True)
    neurnet.train()

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(neurnet.parameters(), lr=learning_rate)
    scheduler = StepLR(optimizer, step_size=6, gamma=0.1)
    # Training loop
    num_epochs = 10
    for epoch in range(num_epochs):
        total_train_loss = 0.0
        total_train_batches = 0
        neurnet.train()

        # Training
        for batch_idx, (inputs, labels, lengths) in enumerate(train_loader):
            inputs, labels, lengths = inputs.to(device), labels.to(device), lengths.to(device)
            optimizer.zero_grad()
            outputs = neurnet(inputs, lengths)
            loss = criterion(outputs, labels)
            total_train_loss += loss.item()
            loss.backward()
            # Clip gradients after backward(), once they exist, and before the optimizer step
            nn.utils.clip_grad_norm_(neurnet.parameters(), clip)
            optimizer.step()
            total_train_batches += 1
        # Validation
        neurnet.eval()
        total_val_loss = 0.0
        total_val_batches = 0
        with torch.no_grad():
            for batch in valid_loader:
                if len(batch) == 3:
                    val_inputs, val_labels, lengths = batch
                    lengths = lengths.to(device)
                elif len(batch) == 2:
                    # Not reached with this collate_fn, which always returns three elements
                    val_inputs, val_labels = batch
                    lengths = None
                else:
                    raise ValueError("Unexpected number of elements in the batch")
                val_inputs = val_inputs.to(device)
                val_outputs = neurnet(val_inputs, lengths)
                val_loss = criterion(val_outputs, val_labels.to(device))  # Compute loss
                total_val_loss += val_loss.item()  # Accumulate total_val_loss
                # Convert logits to predictions (collected for precision, recall and F1 later)
                preds = torch.argmax(val_outputs, dim=1).cpu().numpy()
                total_val_batches += 1  # Increment total_val_batches
        average_val_loss = total_val_loss / total_val_batches
        average_train_loss = total_train_loss / total_train_batches
        train_losses.append(average_train_loss)
        val_losses.append(average_val_loss)

        # Update the learning rate
        # lr_scheduler.step(average_val_loss)
        scheduler.step()

        print(f"Epoch {epoch}, Loss: {average_train_loss}, Validation Loss: {average_val_loss}, Learning Rate: {optimizer.param_groups[0]['lr']}")

        # Report intermediate results to Optuna
        trial.report(average_val_loss, epoch)

        # Handle pruning based on the intermediate value
        if trial.should_prune():
            raise optuna.TrialPruned()

    # Optuna expects the objective to return the value being optimized
    return average_val_loss
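For completeness, this is roughly how I launch the study; a minimal sketch, assuming the objective returns average_val_loss as above (the pruner choice and n_trials=20 are just placeholders):

study = optuna.create_study(direction="minimize", pruner=optuna.pruners.MedianPruner())
study.optimize(objective, n_trials=20)
print("Best params:", study.best_params, "best validation loss:", study.best_value)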
My problem is with how to define the model:
class RNN(nn.Module):
    """
    Wrapper module for RNN, LSTM, GRU cells using the PyTorch API. The output is fed to a FFN for classification.
    """
    def __init__(self, cell_type, input_size, hidden_size, output_size, batch_size, device, num_layers=2, skip_connections=False, dropout=0.0, attention=False, num_heads=1):
        super(RNN, self).__init__()
        cells = {
            "RNN": nn.RNN,
            "LSTM": nn.LSTM,
            "GRU": nn.GRU
        }
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.input_size = input_size
        self.cell_type = cell_type
        self.batch_size = batch_size
        self.dropout = nn.Dropout(dropout)
        self.skip_connections = skip_connections
        self.attention = attention
        if not skip_connections:
            self.rnn = cells[cell_type](      # Pick the specific cell
                bidirectional=True,
                input_size=input_size,        # Number of features per time step
                hidden_size=hidden_size,      # RNN hidden units
                batch_first=True,             # input & output have batch size as the first dimension, e.g. (batch, time_step, input_size)
                num_layers=num_layers,
                dropout=dropout,
                device=device
            )
        else:
            self.rnn = nn.ModuleList()
            for i in range(num_layers):
                if i != 0:
                    self.rnn.append(cells[cell_type](
                        bidirectional=True,
                        input_size=hidden_size * 2,  # Deeper layers consume the previous layer's bidirectional output
                        hidden_size=hidden_size,
                        batch_first=True,
                        num_layers=1,
                        device=device
                    ))
                else:
                    self.rnn.append(cells[cell_type](
                        bidirectional=True,
                        input_size=input_size,       # First layer consumes the word vectors
                        hidden_size=hidden_size,
                        batch_first=True,
                        num_layers=1,
                        device=device
                    ))
        self.out = nn.Linear(hidden_size * 2, output_size, device=device)  # Feed-forward classification head
        if self.attention:
            self.Q = nn.Linear(hidden_size * 2, hidden_size * 2, bias=False, device=device)
            self.K = nn.Linear(hidden_size * 2, hidden_size * 2, bias=False, device=device)
            self.V = nn.Linear(hidden_size * 2, hidden_size * 2, bias=False, device=device)
            self.att = nn.MultiheadAttention(self.hidden_size * 2, num_heads=num_heads, batch_first=True, device=device)
    def forward(self, x, lengths):
        # x shape:   (batch, time_step, input_size)
        # r_out:     PackedSequence; after unpacking, (batch, time_step, 2 * hidden_size)
        # h_n shape: (num_layers * num_directions, batch, hidden_size)
        # h_c shape: (num_layers * num_directions, batch, hidden_size)
        inputs_packed = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)
        if not self.skip_connections:
            if self.cell_type == 'LSTM':
                r_out, (h_n, h_c) = self.rnn(inputs_packed)
            else:
                r_out, h_n = self.rnn(inputs_packed)
            r_in, _ = pad_packed_sequence(r_out, batch_first=True)
            print(r_in.shape)
            # Bidirectional
            # out = torch.cat((h_n[-2, :, :], h_n[-1, :, :]), dim=1)
            # Now, proceed with the linear layer
            # print("Before linear: out", out.shape)
            # out = self.out(out)
            if self.attention:
                Q = self.Q(r_in)
                K = self.K(r_in)
                V = self.V(r_in)
                att, _ = self.att(Q, K, V)
                att = att.squeeze(dim=1)
                out = att + r_in
                print(out.shape)
                t = torch.cat((out[-2, :, :], out[-1, :, :]), dim=1)
                print(t.shape)
                out = t
            else:
                out = torch.cat((h_n[-2, :, :], h_n[-1, :, :]), dim=1)
                print(out.shape)
            return self.out(out)
            # print("Out", out.shape)
        else:
            # Skip-connection path over self.rnn (a ModuleList) not shown here
            raise NotImplementedError
The problem I have is with the different dimensions. My reasoning is as follows: the input (x) and its lengths go into the model. My first job is to remove the padding, for which I use pack_padded_sequence; since the input is already padded, this strips the padding so I can feed it into the RNN. After that, I want to feed the output of the RNN into the attention heads, but to do that I first have to unpack it from a PackedSequence back into a tensor with pad_packed_sequence. This works fine so far: print(r_in.shape) after pad_packed_sequence gives [240, 50, 128], which is [batch_size, length, hidden_size*2] as I understand it, and print(out.shape) also yields [240, 50, 128], which still looks correct. I then want to use that to concatenate the last two layers as discussed here, but the tensor I get is torch.Size([50, 256]) (from print(t.shape)). This produces a "mat1 and mat2 shapes cannot be multiplied (50x256 and 128x3)" error from self.out(out). What am I doing wrong? Why doesn't the concatenation work?
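To make the shape issue concrete, here is a minimal sketch that reproduces exactly the sizes I printed (240 = batch size, 50 = padded length, 128 = hidden_size*2, and self.out is Linear(128, 3)); note that out[-2, :, :] indexes dim 0 of the unpacked output, which with batch_first=True is the batch dimension:

import torch
import torch.nn as nn

out = torch.randn(240, 50, 128)                       # (batch, time_step, 2 * hidden_size), as printed above
t = torch.cat((out[-2, :, :], out[-1, :, :]), dim=1)  # indexing dim 0 drops the batch axis
print(t.shape)                                        # torch.Size([50, 256])
fc = nn.Linear(128, 3)                                # same shape as self.out
fc(t)                                                 # RuntimeError: mat1 and mat2 shapes cannot be multiplied (50x256 and 128x3)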