How to replace a LSTM model with a TCN

36 Views Asked by At

I am trying to replace an LSTM model with a TCN for the same task. I think I have succeeded in the testing phase but I am having issues in the testing phase as I want to change the input data shape.

During training, the Input data shape of TCN = (16000, 50, 96) where 96 are the time steps.

In the testing phase, I want to use data stored in the first symbol of each sample as my input e.g. X[i, 0, :]. The problem then becomes that the TCN is expecting 50 input channels. How can I fix this without tiling this input data?

The following is the LSTM code that I am trying to replace with a TCN code listed below as well:

else:
    # We are running the testing phase
    mobility = sys.argv[1]
    channel_model = sys.argv[2]
    modulation_order = sys.argv[3]
    scheme = sys.argv[4]
    testing_snr = sys.argv[5]
    if modulation_order == 'QPSK':
        modu_way = 1
    elif modulation_order == '16QAM':
        modu_way = 2

    for n_snr in SNR_index:
        mat = loadmat('./{}_{}_{}_{}_LSTM_testing_dataset_{}.mat'.format(mobility, channel_model, modulation_order, scheme, n_snr))
        Dataset = mat['LSTM_Datasets']
        Dataset = Dataset[0, 0]
        X = Dataset['Test_X']
        Y = Dataset['Test_Y']
        yf_d = Dataset['Y_DataSubCarriers']
        print('Loaded Dataset Inputs: ', X.shape)
        print('Loaded Dataset Outputs: ', Y.shape)
        print('Loaded Testing OFDM Frames: ', yf_d.shape)
        hf_DL = np.zeros((yf_d.shape[0], yf_d.shape[1], yf_d.shape[2]), dtype="complex64")
        device = torch.device("cpu")
        NET = torch.load('./{}_{}_{}_{}_LSTM_{}.pkl'.format(mobility, channel_model, modulation_order, scheme, testing_snr)).to(device)
        scaler = StandardScaler()

        # For over all Frames
        for i in range(yf_d.shape[0]):
            hf = X[i, 0, :]
            hn, cn = None, None
            print('Testing Frame | ', i)
            # For over OFDM Symbols
            for j in range(yf_d.shape[1]):
                hf_input = hf
                input1 = scaler.fit_transform(hf_input.reshape(-1, 2)).reshape(hf_input.shape)
                input2 = torch.from_numpy(input1).type(torch.FloatTensor).unsqueeze(0)
                output, hn, cn = NET(input2.to(device), hn, cn)  # ([1,96])
                out = scaler.inverse_transform(output.detach().cpu().numpy().reshape(-1, 2)).reshape(output.shape)
                hf_out = out[:, :48] + 1j * out[:, 48:]  # (1,48)
                hf_DL[i, j, :] = hf_out
                sf = yf_d[i, j, :] / hf_out  # (1,48)
                x = fn.demap(sf, modu_way)
                xf = fn.map(x, modu_way)
                hf_out = yf_d[i, j, :] / xf
                hf_out = hf_out.ravel()
                if j < yf_d.shape[1] - 1:
                    hf_out_Expanded = np.concatenate((hf_out.real, hf_out.imag), axis=0)
                    X[i, j + 1, DSC_IDX] = hf_out_Expanded
                    hf = 0.5 * hf + 0.5 * X[i, j + 1, :]
        # Save Results
        result_path = './{}_{}_{}_{}_LSTM_Results_{}.pickle'.format(mobility, channel_model, modulation_order, scheme, n_snr)
        dest_name = './{}_{}_{}_{}_LSTM_Results_{}.mat'.format(mobility, channel_model, modulation_order, scheme, n_snr)
        with open(result_path, 'wb') as f:
            pickle.dump([X, Y, hf_DL], f)

        a = pickle.load(open(result_path, "rb"))
        scipy.io.savemat(dest_name, {
            '{}_test_x_{}'.format(scheme, n_snr): a[0],
            '{}_test_y_{}'.format(scheme, n_snr): a[1],
            '{}_corrected_y_{}'.format(scheme, n_snr): a[2]
        })
        print("Data successfully converted to .mat file ")
        os.remove(result_path)

TCN code to replace the LSTM code:

num_ofdm_symbols = 50
num_subcarriers = 96
num_channels = [50, 50]    

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Initialize and load the TCN model
model_path = f'./{mobility}_{channel_model}_{modulation_order}_{scheme}_TCN_{testing_snr}.pt'

model = TemporalConvNet(num_inputs=num_inputs, num_channels=num_channels, kernel_size=4, dropout=0.000001)
model.load_state_dict(torch.load(model_path, map_location=device))
scaler = StandardScaler()

model.eval()        

with torch.no_grad():         
    for n_snr in SNR_index:
        mat = loadmat(f'./{mobility}_{channel_model}_{modulation_order}_{scheme}_TCN_testing_dataset_{n_snr}.mat')
        Dataset = mat['TCN_Datasets']
        Dataset = Dataset[0, 0]
        X_original = Dataset['Test_X']
        Y = Dataset['Test_Y']
        yf_d = Dataset['Y_DataSubCarriers']
        print('Loaded Dataset Inputs: ', X_original.shape)
        print('Loaded Dataset Outputs: ', Y.shape)
        print('Loaded Testing OFDM Frames: ', yf_d.shape)

        X = X_original
        
        hf_DL_TCN = np.zeros((yf_d.shape[0], yf_d.shape[1], yf_d.shape[2]), dtype="complex64")
        print("Shape of hf_DL_TCN: ", hf_DL_TCN.shape)

        for i in range(yf_d.shape[0]):
            print(f'Processing Frame | {i}')

            for j in range(yf_d.shape[1]):
                initial_channel_est = X[i, 0, :]
                initial_channel_est = scaler.fit_transform(initial_channel_est.reshape(-1, 2)).reshape(initial_channel_est.shape)
                input_data = np.tile(initial_channel_est, (50, 1))
                input_tensor = torch.from_numpy(input_data).type(torch.FloatTensor).unsqueeze(0)
                #input_tensor = torch.from_numpy(X[i]).type(torch.FloatTensor).unsqueeze(0)
                output_tensor = model(input_tensor.to(device))  # (1, 50, 96)
                
                output_data = scaler.inverse_transform(output_tensor.detach().cpu().numpy().reshape(-1, 2)).reshape(output_tensor.shape)
                    
                hf_out = output_data[0, j, :48] + 1j * output_data[0, j, 48:]
          
                if j == 0:
                    hf_DL_TCN[i, j, :] = hf_out
                else:
                    y_eq = yf_d[i, j, :] / hf_DL_TCN[i, j-1, :]
                    q = fn.map(fn.demap(y_eq, modu_way), modu_way)
                    hf_DL_TCN[i, j, :] = yf_d[i, j, :] / q
                                       

        # Save Results
        result_path = f'./{mobility}_{channel_model}_{modulation_order}_{scheme}_TCN_Results_{n_snr}.pickle'
        dest_name = f'./{mobility}_{channel_model}_{modulation_order}_{scheme}_TCN_Results_{n_snr}.mat'
        with open(result_path, 'wb') as f:
            pickle.dump([X, Y, hf_DL_TCN], f)
        scipy.io.savemat(dest_name, {f'{scheme}_TCN_test_x_{n_snr}': X,
                                     f'{scheme}_TCN_test_y_{n_snr}': Y,
                                     f'{scheme}_TCN_predicted_y_{n_snr}': hf_DL_TCN})
        print("Data successfully converted to .mat file")
        os.remove(result_path)

I have tried tiling (hf = X[i, 0, :]) so that I can have 50 expected channels by the TCN model. This is not the best way to do it., I suspect.

0

There are 0 best solutions below