I'm currently trying to implement the NEAT algorithm according to this paper https://nn.cs.utexas.edu/downloads/papers/stanley.ec02.pdf and I'm trying to solve the XOR problem with my current implementation. In the paper they state that their best solution uses one or two hidden nodes. The problem I have now is that my best solution is getting larger and larger. This is my current code, I know its a lot but maybe someone can help me with it.
import networkx as nx
import matplotlib.pyplot as plt
import random
import numpy as np
def sigmoid(x):
return 1 / (1 + np.exp(-x))
class NodeGene:
def __init__(self, node_id, node_type):
self.node_id = node_id
self.node_type = node_type
class ConnectionGene:
def __init__(self, in_node, out_node, weight, enabled, innovation_number):
self.in_node = in_node
self.out_node = out_node
self.weight = weight
self.enabled = enabled
self.innovation_number = innovation_number
def clone(self):
return ConnectionGene(self.in_node, self.out_node, self.weight, self.enabled, self.innovation_number)
class Genome:
def __init__(self, initial_innovation=0):
self.nodes = {}
self.connections = []
self.fitness = 0.0
self.adjusted_fitness = 0.0
self.global_innovation_counter = initial_innovation
def add_node(self):
connection_index = random.randint(0, len(self.connections)-1)
self.connections[connection_index].enabled = False
connection = self.connections[connection_index]
new_node_id = max(self.nodes) + 1
new_node = NodeGene(new_node_id, 'Hidden')
new_connection_1 = ConnectionGene(connection.in_node, new_node_id, 1, True, self.global_innovation_counter+1)
new_connection_2 = ConnectionGene(new_node_id, connection.out_node, connection.weight, True, self.global_innovation_counter+2)
self.connections.extend([new_connection_1, new_connection_2])
self.nodes[new_node.node_id] = new_node
self.global_innovation_counter += 2
def add_connection(self, max_attempts=100):
cur_connections = {(connection.in_node, connection.out_node) for connection in self.connections}
allowed_connections = {
('Input', 'Hidden'),
('Input', 'Output'),
('Hidden', 'Hidden'),
('Hidden', 'Output'),
('Bias', 'Hidden'),
('Bias', 'Output')
}
for _ in range(max_attempts):
node_id_1, node_id_2 = random.sample(range(1, len(self.nodes)), 2)
if node_id_1 or node_id_2 not in self.nodes:
continue
node_type_1 = self.nodes[node_id_1].node_type
node_type_2 = self.nodes[node_id_2].node_type
if (node_type_1, node_type_2) in allowed_connections and (node_id_1, node_id_2) not in cur_connections:
new_connection = ConnectionGene(node_id_1, node_id_2, random.uniform(-1, 1), True, self.global_innovation_counter+1)
self.connections.append(new_connection)
self.global_innovation_counter += 1
break
def mutate_weights(self, pertube_chance=0.9, pertube_factor=0.1):
for connection in self.connections:
if random.random() < pertube_chance:
connection.weight += random.uniform(-pertube_factor, pertube_factor)
else:
connection.weight = random.random()
def draw_genome(self, title):
G = nx.DiGraph()
node_labels = {}
edge_labels = {}
for node_id, node in self.nodes.items():
G.add_node(node_id)
node_labels[node_id] = f'{node.node_type}_{node_id}'
for connection in self.connections:
if connection.enabled:
G.add_edge(connection.in_node, connection.out_node, weight=connection.weight)
edge_labels[(connection.in_node, connection.out_node)] = f'{connection.weight:.2f}'
pos = nx.circular_layout(G)
nx.draw_networkx(G, pos, with_labels=True, labels=node_labels, node_size=500)
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
plt.title(title)
plt.show()
def calculate_fitness(self):
inputs = [
[0, 0],
[0, 1],
[1, 0],
[1, 1]
]
expected_outputs = [0, 1, 1, 0]
total_error = 0.0
for input_pattern, expected_output in zip([(0, 0), (0, 1), (1, 0), (1, 1)], expected_outputs):
network_output = self.forward_pass(input_pattern)
error = (expected_output - network_output[0])
total_error += error
fitness = 4 - total_error
fitness = fitness ** 2
self.fitness = fitness
def forward_pass(self, inputs):
node_values = {node_id: 0.0 for node_id in self.nodes}
for node_id, input_value in zip([node.node_id for node in self.nodes.values() if node.node_type == 'Input'], inputs):
node_values[node_id] = input_value
bias_node_id = [node_id for node_id, node in self.nodes.items() if node.node_type == 'Bias'][0]
node_values[bias_node_id] = 1.0
for connection in self.connections:
if connection.enabled:
in_node_value = node_values[connection.in_node]
weight = connection.weight
out_node_id = connection.out_node
node_values[out_node_id] += in_node_value * weight
for node_id, node in self.nodes.items():
node_values[node_id] = sigmoid(node_values[node_id])
outputs = [node_values[out_node] for out_node in self.nodes if self.nodes[out_node].node_type == 'Output']
return outputs
class Species:
def __init__(self, representative):
self.representative = representative
self.members = [representative]
self.fitness = 0.0
self.adjusted_fitness = 0.0
def calculate_excess_genes(genome_1, genome_2):
genes_1 = sorted(genome_1.connections, key=lambda x: x.innovation_number)
genes_2 = sorted(genome_2.connections, key=lambda x: x.innovation_number)
excess_count = 0
max_innovation_1 = genes_1[-1].innovation_number
max_innovation_2 = genes_2[-1].innovation_number
for gene in genes_1:
if gene.innovation_number > max_innovation_2:
excess_count += 1
for gene in genes_2:
if gene.innovation_number > max_innovation_1:
excess_count += 1
return excess_count
def calculate_disjoint_genes(genome_1, genome_2):
innovation_1 = [g.innovation_number for g in genome_1.connections]
innovation_2 = [g.innovation_number for g in genome_2.connections]
disjoint_count = 0
for innov in innovation_1:
if innov not in innovation_2:
disjoint_count += 1
for innov in innovation_2:
if innov not in innovation_1:
disjoint_count += 1
return disjoint_count
def calculate_average_weight(genome_1, genome_2):
matching_genes = 0
total_weight_difference = 0
for gene_1 in genome_1.connections:
for gene_2 in genome_2.connections:
if gene_1.innovation_number == gene_2.innovation_number:
matching_genes += 1
total_weight_difference += abs(gene_1.weight - gene_2.weight)
if matching_genes:
return total_weight_difference / matching_genes
else:
return 0
def calc_compatibility_distance(genome_1, genome_2, c1, c2, c3):
N = len(genome_1.nodes) if len(genome_1.nodes) > len(genome_2.nodes) else len(genome_2.nodes)
excess_genes = calculate_excess_genes(genome_1, genome_2)
disjoint_genes = calculate_disjoint_genes(genome_1, genome_2)
average_weight_difference = calculate_average_weight(genome_1, genome_2)
delta = (c1 * excess_genes / N) + (c2 * disjoint_genes / N) + c3 * average_weight_difference
return delta
def sharing_function(distance, threshold):
if distance <= threshold:
return 1
return 0
def calculate_adjusted_fitness(organism, species_members, threshold):
sharing_sum = 0
for other_organism in species_members:
sharing_sum += sharing_function(calc_compatibility_distance(organism, other_organism, c1=1.0, c2=1.0, c3=0.4), threshold)
if sharing_sum == 0:
return 0.0
return organism.fitness / sharing_sum
def calculate_species_fitness(species):
species.fitness = sum(member.fitness for member in species.members)
def calculate_species_adjusted_fitness(species, threshold):
for organism in species.members:
organism.adjusted_fitness = calculate_adjusted_fitness(organism, species.members, threshold)
species.fitness = calculate_species_fitness(species)
total_adjusted_fitness = sum(organism.adjusted_fitness for organism in species.members)
species.adjusted_fitness = total_adjusted_fitness
def add_node_to_offspring(offspring, node_id, parent_node):
if node_id not in offspring.nodes:
offspring.nodes[node_id] = NodeGene(node_id, parent_node.nodes[node_id].node_type)
class NEAT:
def __init__(self, population_size, population):
self.population_size = population_size
self.population = population
self.species = []
self.threshold = 3.0
self.c1 = 1.0
self.c2 = 1.0
self.c3 = 0.4
self.best_genome = None
self.best_fitness = float('-inf')
def get_best_genome(self):
return self.best_genome
def evaluate_fitness(self):
for genome in self.population:
genome.calculate_fitness()
def calc_adjusted_fitness_population(self):
for specie in self.species:
calculate_species_adjusted_fitness(specie, self.threshold)
def speciate(self):
self.species = []
for genome in self.population:
placed = False
for specie in self.species:
compatibility_distance = calc_compatibility_distance(genome, specie.representative, self.c1, self.c2, self.c3)
if compatibility_distance < self.threshold:
specie.members.append(genome)
placed = True
break
if not placed:
new_species = Species(genome)
self.species.append(new_species)
# roulette wheel
def selection(self, specie, num_parents):
total_adjusted_fitness = sum(g.adjusted_fitness for g in specie.members)
if total_adjusted_fitness == 0:
return []
probs = [g.adjusted_fitness / total_adjusted_fitness for g in specie.members]
selected_parents = []
for _ in range(num_parents):
selected_genome = random.choices(specie.members, weights=probs)[0]
selected_parents.append(selected_genome)
return selected_parents
def crossover(self, genome_1, genome_2):
if genome_1.fitness >= genome_2.fitness:
fitter_parent, other_parent = genome_1, genome_2
else:
fitter_parent, other_parent = genome_2, genome_1
starting_innovation = len(fitter_parent.connections)
offspring = Genome(initial_innovation=starting_innovation)
connections_other = {connection.innovation_number: connection for connection in other_parent.connections if connection.enabled}
for gene_fitter in fitter_parent.connections:
if gene_fitter.innovation_number in connections_other:
if random.random() <= 0.5:
offspring.connections.append(gene_fitter.clone())
add_node_to_offspring(offspring, gene_fitter.in_node, fitter_parent)
add_node_to_offspring(offspring, gene_fitter.out_node, fitter_parent)
else:
gene_other = connections_other[gene_fitter.innovation_number]
offspring.connections.append(gene_other.clone())
add_node_to_offspring(offspring, gene_other.in_node, other_parent)
add_node_to_offspring(offspring, gene_other.out_node, other_parent)
else:
offspring.connections.append(gene_fitter.clone())
add_node_to_offspring(offspring, gene_fitter.in_node, fitter_parent)
add_node_to_offspring(offspring, gene_fitter.out_node, fitter_parent)
return offspring
def mutation(self, offspring, mutation_rate):
if random.random() < 0.03:
offspring.add_node()
if random.random() < 0.02:
offspring.add_connection()
if random.random() < 0.05:
offspring.mutate_weights()
def run_neat(self, num_generations):
for generation in range(num_generations):
self.evaluate_fitness()
self.speciate()
self.calc_adjusted_fitness_population()
new_population = []
for specie in self.species:
total_adjusted_fitness = sum([g.adjusted_fitness for g in specie.members])
if total_adjusted_fitness == 0:
continue
normalized_values = [g.adjusted_fitness / total_adjusted_fitness for g in specie.members]
num_offspring = int(sum(normalized_values) * self.population_size)
# keep top half
specie.members = [g for _, g in sorted(zip(normalized_values, specie.members), key=lambda x: x[0], reverse=True)]
specie.members = specie.members[:max(2, len(specie.members) // 2)]
for _ in range(num_offspring):
selected_parents = self.selection(specie, num_parents=2)
genome_1, genome_2 = selected_parents
offspring = self.crossover(genome_1, genome_2)
self.mutation(offspring, mutation_rate=0.2)
new_population.append(offspring)
offspring.calculate_fitness()
if offspring.fitness > self.best_fitness:
self.best_genome = offspring
self.best_fitness = offspring.fitness
self.population = new_population
print(f"Generation {generation + 1} completed", end='\r')
population = []
population_size = 150
num_generations = 100
for _ in range(population_size):
genome = Genome(initial_innovation=3)
nodes = {
1: NodeGene(1, 'Input'),
2: NodeGene(2, 'Input'),
3: NodeGene(3, 'Bias'),
4: NodeGene(4, 'Output')
}
connections = [
ConnectionGene(1, 4, random.uniform(-1, 1), True, 1),
ConnectionGene(2, 4, random.uniform(-1, 1), True, 2),
ConnectionGene(3, 4, random.uniform(-1, 1), True, 3)
]
genome.nodes = nodes
genome.connections = connections
population.append(genome)
neat = NEAT(population_size, population)
neat.run_neat(num_generations)
best_genome = neat.get_best_genome()
best_genome.draw_genome("Best Genome")