XOR with NEAT Algorithm

30 Views Asked by At

I'm currently trying to implement the NEAT algorithm according to this paper https://nn.cs.utexas.edu/downloads/papers/stanley.ec02.pdf and I'm trying to solve the XOR problem with my current implementation. In the paper they state that their best solution uses one or two hidden nodes. The problem I have now is that my best solution is getting larger and larger. This is my current code, I know its a lot but maybe someone can help me with it.

import networkx as nx
import matplotlib.pyplot as plt
import random
import numpy as np

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

class NodeGene:
    def __init__(self, node_id, node_type):
        self.node_id = node_id
        self.node_type = node_type
        
class ConnectionGene:
    def __init__(self, in_node, out_node, weight, enabled, innovation_number):
        self.in_node = in_node
        self.out_node = out_node
        self.weight = weight
        self.enabled = enabled
        self.innovation_number = innovation_number
    
    def clone(self):
        return ConnectionGene(self.in_node, self.out_node, self.weight, self.enabled, self.innovation_number)
        

class Genome:
    def __init__(self, initial_innovation=0):
        self.nodes = {}
        self.connections = []
        self.fitness = 0.0
        self.adjusted_fitness = 0.0
        self.global_innovation_counter = initial_innovation
    
    
    def add_node(self):
        connection_index = random.randint(0, len(self.connections)-1)
        self.connections[connection_index].enabled = False
        connection = self.connections[connection_index]
        
        new_node_id = max(self.nodes) + 1
        new_node = NodeGene(new_node_id, 'Hidden')
        
        new_connection_1 = ConnectionGene(connection.in_node, new_node_id, 1, True, self.global_innovation_counter+1)
        new_connection_2 = ConnectionGene(new_node_id, connection.out_node, connection.weight, True, self.global_innovation_counter+2)
        
        self.connections.extend([new_connection_1, new_connection_2])
        self.nodes[new_node.node_id] = new_node
        self.global_innovation_counter += 2
    
    
    def add_connection(self, max_attempts=100):
        cur_connections = {(connection.in_node, connection.out_node) for connection in self.connections}
        allowed_connections = {
            ('Input', 'Hidden'),
            ('Input', 'Output'),
            ('Hidden', 'Hidden'),
            ('Hidden', 'Output'),
            ('Bias', 'Hidden'),
            ('Bias', 'Output')
        }
        
        for _ in range(max_attempts):
            node_id_1, node_id_2 = random.sample(range(1, len(self.nodes)), 2)
            
            if node_id_1 or node_id_2 not in self.nodes:
                continue
            node_type_1 = self.nodes[node_id_1].node_type
            node_type_2 = self.nodes[node_id_2].node_type
            
            if (node_type_1, node_type_2) in allowed_connections and (node_id_1, node_id_2) not in cur_connections:
                new_connection = ConnectionGene(node_id_1, node_id_2, random.uniform(-1, 1), True, self.global_innovation_counter+1)
                self.connections.append(new_connection)
                self.global_innovation_counter += 1
                break               
            
    
    def mutate_weights(self, pertube_chance=0.9, pertube_factor=0.1):
        for connection in self.connections:
            if random.random() < pertube_chance:
                connection.weight += random.uniform(-pertube_factor, pertube_factor)
            else:
                connection.weight = random.random()
    
    
    def draw_genome(self, title):
        G = nx.DiGraph()
    
        node_labels = {}
        edge_labels = {}
        
        for node_id, node in self.nodes.items():
            G.add_node(node_id)
            node_labels[node_id] = f'{node.node_type}_{node_id}'

        for connection in self.connections:
            if connection.enabled:
                G.add_edge(connection.in_node, connection.out_node, weight=connection.weight)
                edge_labels[(connection.in_node, connection.out_node)] = f'{connection.weight:.2f}'

        pos = nx.circular_layout(G)
        nx.draw_networkx(G, pos, with_labels=True, labels=node_labels, node_size=500)
        nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
        plt.title(title)
        plt.show()
        
    
    def calculate_fitness(self):
        inputs = [
            [0, 0],
            [0, 1],
            [1, 0],
            [1, 1]
        ]
        
        expected_outputs = [0, 1, 1, 0]
        
        total_error = 0.0
        
        for input_pattern, expected_output in zip([(0, 0), (0, 1), (1, 0), (1, 1)], expected_outputs):
            network_output = self.forward_pass(input_pattern)
            error = (expected_output - network_output[0])
            total_error += error
        
        fitness = 4 - total_error
        fitness = fitness ** 2
        self.fitness = fitness
    
    def forward_pass(self, inputs):
        node_values = {node_id: 0.0 for node_id in self.nodes}
        
        for node_id, input_value in zip([node.node_id for node in self.nodes.values() if node.node_type == 'Input'], inputs):
            node_values[node_id] = input_value
            
        bias_node_id = [node_id for node_id, node in self.nodes.items() if node.node_type == 'Bias'][0]
        node_values[bias_node_id] = 1.0
        
        for connection in self.connections:
            if connection.enabled:
                in_node_value = node_values[connection.in_node]
                weight = connection.weight
                out_node_id = connection.out_node

                node_values[out_node_id] += in_node_value * weight
        for node_id, node in self.nodes.items():
            node_values[node_id] = sigmoid(node_values[node_id])
        
        outputs = [node_values[out_node] for out_node in self.nodes if self.nodes[out_node].node_type == 'Output']
        return outputs   
        
class Species:
    def __init__(self, representative):
        self.representative = representative
        self.members = [representative]
        self.fitness = 0.0
        self.adjusted_fitness = 0.0

def calculate_excess_genes(genome_1, genome_2):
    genes_1 = sorted(genome_1.connections, key=lambda x: x.innovation_number)
    genes_2 = sorted(genome_2.connections, key=lambda x: x.innovation_number)
    
    excess_count = 0
    max_innovation_1 = genes_1[-1].innovation_number
    max_innovation_2 = genes_2[-1].innovation_number
    
    for gene in genes_1:
        if gene.innovation_number > max_innovation_2:
            excess_count += 1
    
    for gene in genes_2:
        if gene.innovation_number > max_innovation_1:
            excess_count += 1
    
    return excess_count


def calculate_disjoint_genes(genome_1, genome_2):
    innovation_1 = [g.innovation_number for g in genome_1.connections]
    innovation_2 = [g.innovation_number for g in genome_2.connections]
    
    disjoint_count = 0
    for innov in innovation_1:
        if innov not in innovation_2:
            disjoint_count += 1
    for innov in innovation_2:
        if innov not in innovation_1:
            disjoint_count += 1
    return disjoint_count


def calculate_average_weight(genome_1, genome_2):
    matching_genes = 0
    total_weight_difference = 0
    
    for gene_1 in genome_1.connections:
        for gene_2 in genome_2.connections:
            if gene_1.innovation_number == gene_2.innovation_number:
                matching_genes += 1
                total_weight_difference += abs(gene_1.weight - gene_2.weight)
    
    if matching_genes:
        return total_weight_difference / matching_genes
    else:
        return 0

    
def calc_compatibility_distance(genome_1, genome_2, c1, c2, c3):
        N = len(genome_1.nodes) if len(genome_1.nodes) > len(genome_2.nodes) else len(genome_2.nodes)
        excess_genes = calculate_excess_genes(genome_1, genome_2)
        disjoint_genes = calculate_disjoint_genes(genome_1, genome_2)
        average_weight_difference = calculate_average_weight(genome_1, genome_2)
        
        delta = (c1 * excess_genes / N) + (c2 * disjoint_genes / N) + c3 * average_weight_difference
        return delta
    
    
def sharing_function(distance, threshold):
    if distance <= threshold:
        return 1
    return 0

  
def calculate_adjusted_fitness(organism, species_members, threshold):
    sharing_sum = 0
    for other_organism in species_members:
        sharing_sum += sharing_function(calc_compatibility_distance(organism, other_organism, c1=1.0, c2=1.0, c3=0.4), threshold)
    if sharing_sum == 0:
        return 0.0
    return organism.fitness / sharing_sum


def calculate_species_fitness(species):
    species.fitness = sum(member.fitness for member in species.members)

    
def calculate_species_adjusted_fitness(species, threshold):
    for organism in species.members:
        organism.adjusted_fitness = calculate_adjusted_fitness(organism, species.members, threshold)
    species.fitness = calculate_species_fitness(species)
    total_adjusted_fitness = sum(organism.adjusted_fitness for organism in species.members)
    species.adjusted_fitness = total_adjusted_fitness
    
    
def add_node_to_offspring(offspring, node_id, parent_node):
    if node_id not in offspring.nodes:
        offspring.nodes[node_id] = NodeGene(node_id, parent_node.nodes[node_id].node_type)

class NEAT:
    def __init__(self, population_size, population):
        self.population_size = population_size
        self.population = population
        self.species = []
        self.threshold = 3.0
        self.c1 = 1.0
        self.c2 = 1.0
        self.c3 = 0.4
        self.best_genome = None
        self.best_fitness = float('-inf')
        
    
    def get_best_genome(self):
        return self.best_genome
    
    
    def evaluate_fitness(self):
        for genome in self.population:
            genome.calculate_fitness()
    
    
    def calc_adjusted_fitness_population(self):
        for specie in self.species:
            calculate_species_adjusted_fitness(specie, self.threshold)
    
    
    def speciate(self):
        self.species = []
        
        for genome in self.population:
            placed = False
            for specie in self.species:
                compatibility_distance = calc_compatibility_distance(genome, specie.representative, self.c1, self.c2, self.c3)

                if compatibility_distance < self.threshold:
                    specie.members.append(genome)
                    placed = True
                    break
            
            if not placed:
                new_species = Species(genome)
                self.species.append(new_species)
        
    
    # roulette wheel
    def selection(self, specie, num_parents):
        total_adjusted_fitness = sum(g.adjusted_fitness for g in specie.members)
        if total_adjusted_fitness == 0:
            return []

        probs = [g.adjusted_fitness / total_adjusted_fitness for g in specie.members]

        selected_parents = []
        for _ in range(num_parents):
            selected_genome = random.choices(specie.members, weights=probs)[0]
            selected_parents.append(selected_genome)

        return selected_parents

    
    
    def crossover(self, genome_1, genome_2):
        if genome_1.fitness >= genome_2.fitness:
            fitter_parent, other_parent = genome_1, genome_2
        else:
            fitter_parent, other_parent = genome_2, genome_1
        
        starting_innovation = len(fitter_parent.connections)
        offspring = Genome(initial_innovation=starting_innovation)
        
        connections_other = {connection.innovation_number: connection for connection in other_parent.connections if connection.enabled}
        
        for gene_fitter in fitter_parent.connections:
            if gene_fitter.innovation_number in connections_other:
                if random.random() <= 0.5:
                    offspring.connections.append(gene_fitter.clone())
                    add_node_to_offspring(offspring, gene_fitter.in_node, fitter_parent)
                    add_node_to_offspring(offspring, gene_fitter.out_node, fitter_parent)
                else:
                    gene_other = connections_other[gene_fitter.innovation_number] 
                    offspring.connections.append(gene_other.clone())
                    add_node_to_offspring(offspring, gene_other.in_node, other_parent)
                    add_node_to_offspring(offspring, gene_other.out_node, other_parent)
            else:
                offspring.connections.append(gene_fitter.clone())
                add_node_to_offspring(offspring, gene_fitter.in_node, fitter_parent)
                add_node_to_offspring(offspring, gene_fitter.out_node, fitter_parent)      
        return offspring
        
        
    def mutation(self, offspring, mutation_rate):
        if random.random() < 0.03:
            offspring.add_node()
        if random.random() < 0.02:
            offspring.add_connection()
        if random.random() < 0.05:
            offspring.mutate_weights()
                
    
    def run_neat(self, num_generations):
        for generation in range(num_generations):
            self.evaluate_fitness()
            self.speciate()
            self.calc_adjusted_fitness_population()
            new_population = []
            for specie in self.species:
                total_adjusted_fitness = sum([g.adjusted_fitness for g in specie.members])
                if total_adjusted_fitness == 0:
                    continue
                normalized_values = [g.adjusted_fitness / total_adjusted_fitness for g in specie.members]
                num_offspring = int(sum(normalized_values) * self.population_size)
                
                # keep top half
                specie.members = [g for _, g in sorted(zip(normalized_values, specie.members), key=lambda x: x[0], reverse=True)]
                specie.members = specie.members[:max(2, len(specie.members) // 2)]
                
                for _ in range(num_offspring):
                    selected_parents = self.selection(specie, num_parents=2)
                    genome_1, genome_2 = selected_parents
                    offspring = self.crossover(genome_1, genome_2)
                    
                    self.mutation(offspring, mutation_rate=0.2)
                    new_population.append(offspring)
                    offspring.calculate_fitness()
                    if offspring.fitness > self.best_fitness:
                        self.best_genome = offspring
                        self.best_fitness = offspring.fitness
                    
            self.population = new_population
            print(f"Generation {generation + 1} completed", end='\r')

population = []
population_size = 150
num_generations = 100
for _ in range(population_size):
    genome = Genome(initial_innovation=3)
    nodes = {
        1: NodeGene(1, 'Input'),
        2: NodeGene(2, 'Input'),
        3: NodeGene(3, 'Bias'),
        4: NodeGene(4, 'Output')
    }
    
    connections = [
        ConnectionGene(1, 4, random.uniform(-1, 1), True, 1),
        ConnectionGene(2, 4, random.uniform(-1, 1), True, 2),
        ConnectionGene(3, 4, random.uniform(-1, 1), True, 3)
    ]
    genome.nodes = nodes
    genome.connections = connections
    population.append(genome)

neat = NEAT(population_size, population)
neat.run_neat(num_generations)
best_genome = neat.get_best_genome()
best_genome.draw_genome("Best Genome")
0

There are 0 best solutions below