I am trying to train a Physics-Informed Neural Network (PINN) that has two outputs governed by two different PDEs. The first PDE is the 2D heat equation and the second is 2D oxygen diffusion in the same 2D domain.
I split the loss into two separate functions, one for the heat equation (first PDE) and one for the second PDE. I did not combine the two losses, in order to reduce the error and keep training stable. In addition, the boundary and initial conditions are different for each PDE, so I created two training loops.
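In equation form, the two residuals enforced below are (with (x, y, t) as the network inputs; this is just a restatement of the residual lines inside the two loss functions):

    T_t  = alpha * (T_xx + T_yy)                                       # 2D heat equation for T
    Pb_t = fcf * D_0 * (Pb_xx + Pb_yy) - (c_0 * T * R / h) * dCA_dt    # 2D oxygen diffusion for Pb, coupled to T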
The part of the code that contains the PINN is the following:
class FCN(nn.Module):
    def __init__(self, layers):
        super(FCN, self).__init__()
        self.activation = nn.Tanh()
        self.loss_function = nn.MSELoss(reduction='mean')
        self.linears = nn.ModuleList([nn.Linear(layers[i], layers[i + 1]) for i in range(len(layers) - 1)])
        for i in range(len(layers) - 1):
            nn.init.xavier_normal_(self.linears[i].weight.data, gain=1)
            nn.init.zeros_(self.linears[i].bias.data)

    def forward(self, x):
        if not torch.is_tensor(x):
            x = torch.from_numpy(x)
        x = x.float()
        # ub and lb (domain bounds) are defined elsewhere in the code
        u_b = ub.float().to(device)
        l_b = lb.float().to(device)
        x = (x - l_b) / (u_b - l_b)  # feature scaling
        a = x
        for i in range(len(self.linears) - 1):
            z = self.linears[i](a)
            a = self.activation(z)
        a = self.linears[-1](a)
        return a

    def loss_T(self, x_BC_T, y_BC_T, x_PDE, alpha):
        # Boundary-condition loss for T
        output_BC_T = self.forward(x_BC_T)
        loss_BC_T = self.loss_function(output_BC_T, y_BC_T)
        g = x_PDE.clone()
        g.requires_grad = True
        output = self.forward(g)  # outputs both T and Pb, shape [n_samples, 2]
        self.Temp = output[:, 0:1]  # split off T
        # Gradient of T with respect to the inputs (x, y, t)
        T_grad = autograd.grad(self.Temp, g, torch.ones([g.shape[0], 1]).to(device), retain_graph=True, create_graph=True)[0]
        # Second derivatives of T (used to form the Laplacian)
        T_grad2 = autograd.grad(T_grad, g, torch.ones(g.shape).to(device), create_graph=True)[0]
        T_t = T_grad[:, [2]]    # 3rd column of g is time
        T_xx = T_grad2[:, [0]]  # 1st column of g is x
        T_yy = T_grad2[:, [1]]  # 2nd column of g is y
        # Heat-equation residual for T
        T_res = T_t - alpha * (T_xx + T_yy)
        loss_pde_T = self.loss_function(T_res, torch.zeros(T_t.shape).to(device))
        # L2 regularization (computed but not added to the total loss)
        l2_reg = torch.tensor(0.).to(device)
        for param in self.parameters():
            l2_reg += torch.norm(param) ** 2
        # Total loss
        total_loss_T = loss_BC_T + loss_pde_T
        return total_loss_T

    def loss_Pb(self, x_BC_Pb, y_BC_Pb, x_PDE, fcf, D_0, c_0, R, h, dCA_dt):
        # Boundary-condition loss for Pb
        output_BC_Pb = self.forward(x_BC_Pb)
        loss_BC_Pb = self.loss_function(output_BC_Pb, y_BC_Pb)
        g = x_PDE.clone()
        g.requires_grad = True
        output = self.forward(g)  # outputs both T and Pb, shape [n_samples, 2]
        Pb = output[:, 1:2]  # split off Pb
        # Gradient of Pb with respect to the inputs (x, y, t)
        Pb_grad = autograd.grad(Pb, g, torch.ones([g.shape[0], 1]).to(device), retain_graph=True, create_graph=True)[0]
        # Second derivatives of Pb (used to form the Laplacian)
        Pb_grad2 = autograd.grad(Pb_grad, g, torch.ones(g.shape).to(device), create_graph=True)[0]
        Pb_t = Pb_grad[:, [2]]
        Pb_xx = Pb_grad2[:, [0]]
        Pb_yy = Pb_grad2[:, [1]]
        # PDE residual for Pb; the coupling to T uses self.Temp stored by loss_T
        Pb_res = Pb_t - fcf * D_0 * (Pb_xx + Pb_yy) + (c_0 * self.Temp * R / h) * dCA_dt
        loss_pde_Pb = self.loss_function(Pb_res, torch.zeros(Pb_t.shape).to(device))
        # L2 regularization (computed but not added to the total loss)
        l2_reg = torch.tensor(0.).to(device)
        for param in self.parameters():
            l2_reg += torch.norm(param) ** 2
        # Total loss
        total_loss_Pb = loss_BC_Pb + loss_pde_Pb
        return total_loss_Pb

# Initialize the model, optimizer, etc.
layers = np.array([3, 256, 2]) # Two Outputs T and Pb
PINN = FCN(layers)
PINN.to(device)
# Using LBFGS as the optimizer
optimizer = torch.optim.LBFGS(PINN.parameters(), lr=1, max_iter=20, tolerance_grad=1e-7, tolerance_change=1e-9, history_size=100)
fcf = 1
D_0 = 1E-11
c_0 = 0.00367
R = 8.67
h = 0.0076
dCA_dt = 1
# Training loop without curriculum learning
epochs = 1
alpha = 0.1 # Fixed alpha value
loss_history = []
x_range = np.linspace(0, 1, 50) # Adjust as needed
y_range = np.linspace(0, 1, 50) # Adjust as needed
t_value = 0.5 # Fixed time for visualization, adjust as needed
X_mesh, Y_mesh = np.meshgrid(x_range, y_range)
T_mesh = np.full_like(X_mesh, t_value)
for epoch in range(epochs):
    # Resample the boundary/initial-condition training points for T each epoch
    idx = np.random.choice(T_X_train.shape[0], Nu, replace=False)
    X_train_Nu = T_X_train[idx, :]
    Y_train_Nu = T_Y_train[idx, :]

    # Resample collocation points for the T PDE with a random-walk (MCMC-style) sampler
    X_train_Nf = torch.zeros((Nf, 3)).to(device)
    X_train_Nf[0, :] = torch.tensor([0.5, 0.5, 0.5]).to(device)
    for i in range(1, Nf):
        X_train_Nf[i, :] = X_train_Nf[i - 1, :] + 0.1 * torch.randn(3).to(device)
        if torch.any(X_train_Nf[i, :] < lb) or torch.any(X_train_Nf[i, :] > ub):
            X_train_Nf[i, :] = X_train_Nf[i - 1, :]

    # Normalize the newly sampled points
    X_train_Nu_normalized = (X_train_Nu - lb) / (ub - lb)
    X_train_Nf_normalized = (X_train_Nf - lb) / (ub - lb)
    X_train_Nu_normalized = X_train_Nu_normalized.float().to(device)
    Y_train_Nu = Y_train_Nu.float().to(device)
    X_train_Nf_normalized = X_train_Nf_normalized.float().to(device)

    def closure_T():
        optimizer.zero_grad()
        loss_value_T = PINN.loss_T(X_train_Nu_normalized, Y_train_Nu, X_train_Nf_normalized, alpha)
        loss_value_T.backward(retain_graph=True)
        return loss_value_T

    # Optimize for T
    optimizer.step(closure_T)

    # Optional: log or print the loss for T
    loss_value_T = closure_T()  # re-computes the loss; remove if efficiency is a concern
    print(f"Epoch: {epoch + 1}, Loss T: {loss_value_T.item()}")

    # Plotting after each epoch
    plot_heat_distribution(PINN, X_mesh, Y_mesh, t_value, device)

for epoch in range(epochs):
    # Resample the boundary/initial-condition training points for Pb each epoch
    idx2 = np.random.choice(Pb_X_train.shape[0], Nu2, replace=False)
    X_train_Nu2 = Pb_X_train[idx2, :]
    Y_train_Nu2 = Pb_Y_train[idx2, :]

    # Resample collocation points for the Pb PDE with a random-walk (MCMC-style) sampler
    X_train_Nf2 = torch.zeros((Nf2, 3)).to(device)
    X_train_Nf2[0, :] = torch.tensor([0.5, 0.5, 0.5]).to(device)
    for i in range(1, Nf2):
        X_train_Nf2[i, :] = X_train_Nf2[i - 1, :] + 0.1 * torch.randn(3).to(device)
        if torch.any(X_train_Nf2[i, :] < lb) or torch.any(X_train_Nf2[i, :] > ub):
            X_train_Nf2[i, :] = X_train_Nf2[i - 1, :]

    # Normalize the newly sampled points
    X_train_Nu2_normalized = (X_train_Nu2 - lb) / (ub - lb)
    X_train_Nf2_normalized = (X_train_Nf2 - lb) / (ub - lb)
    X_train_Nu2_normalized = X_train_Nu2_normalized.float().to(device)
    Y_train_Nu2 = Y_train_Nu2.float().to(device)
    X_train_Nf2_normalized = X_train_Nf2_normalized.float().to(device)

    # Closure for the Pb loss
    def closure_Pb():
        optimizer.zero_grad()  # clear gradients
        loss_value_Pb = PINN.loss_Pb(X_train_Nu2_normalized, Y_train_Nu2, X_train_Nf2_normalized, fcf, D_0, c_0, R, h, dCA_dt)
        loss_value_Pb.backward()  # compute gradients for the Pb loss
        print(f"Epoch: {epoch + 1}, Loss Pb: {loss_value_Pb.item()}")
        return loss_value_Pb

    # Optimize for Pb
    optimizer.step(closure_Pb)

    # Plotting after each epoch
    plot_dependent_variable(PINN, X_mesh, Y_mesh, t_value, device)
However, the model works perfectly well in the first loop, and I get the following error before the second loop starts:
RuntimeError: Trying to backward through the graph a second time (or directly access saved tensors after they have already been freed). Saved intermediate values of the graph are freed when you call .backward() or autograd.grad(). Specify retain_graph=True if you need to backward through the graph a second time or if you need to access saved tensors after calling backward.
The relevant part of the second loop is:

def closure_Pb():
    optimizer.zero_grad()  # clear gradients
    loss_value_Pb = PINN.loss_Pb(X_train_Nu2_normalized, Y_train_Nu2, X_train_Nf2_normalized, fcf, D_0, c_0, R, h, dCA_dt)
    loss_value_Pb.backward()  # compute gradients for the Pb loss
    print(f"Epoch: {epoch + 1}, Loss Pb: {loss_value_Pb.item()}")
    return loss_value_Pb

optimizer.step(closure_Pb)
It seems that calling .backward() a second time is not allowed. I added retain_graph=True, as the error message suggests, but I still get the same problem (a sketch of that attempt is below).
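A sketch of that attempt, with retain_graph=True added to the Pb backward call (the same flag closure_T already uses); the rest of the loop is unchanged:

    # Attempted fix: retain_graph=True on the Pb backward call.
    # The RuntimeError above still appears with this change.
    def closure_Pb():
        optimizer.zero_grad()
        loss_value_Pb = PINN.loss_Pb(X_train_Nu2_normalized, Y_train_Nu2,
                                     X_train_Nf2_normalized, fcf, D_0, c_0, R, h, dCA_dt)
        loss_value_Pb.backward(retain_graph=True)  # retain_graph added here; same error
        return loss_value_Pb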
Any help would be appreciated!
Note: I designed my code to resample the collocation points at every training epoch.
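For reference, the per-epoch resampling mentioned in the note boils down to the following random-walk sampler (a minimal, self-contained sketch of the loop code above; resample_collocation is just a name used here, and lb/ub are the domain bounds):

import torch

def resample_collocation(n_points, lb, ub, step=0.1, device="cpu"):
    # Random walk starting at (0.5, 0.5, 0.5); steps that leave the box
    # [lb, ub] are rejected by repeating the previous point.
    pts = torch.zeros((n_points, 3), device=device)
    pts[0, :] = torch.tensor([0.5, 0.5, 0.5], device=device)
    for i in range(1, n_points):
        proposal = pts[i - 1, :] + step * torch.randn(3, device=device)
        if torch.any(proposal < lb) or torch.any(proposal > ub):
            pts[i, :] = pts[i - 1, :]  # out of bounds: keep the previous point
        else:
            pts[i, :] = proposal
    return pts

# e.g. X_train_Nf = resample_collocation(Nf, lb, ub, device=device) at the start of each epoch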