Feature Selection using Genetic Algorithms

For a more interactive experience, a Kaggle version can be found in this notebook.

Introduction

I am currently going through a course on metaheuristics, and one of the programs I implemented was feature selection through genetic algorithms.

In this case, we are trying to optimize two different objectives:

1. Number of features

2. Fitness: in this case, the 1-r^2 score, so lower is better.

This poses a problem. How can we optimize for more than one variable?

A solution to this is to use the Pareto front, which is the set of all currently Pareto-efficient solutions.
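
To make "dominates" precise, here is a minimal sketch (the helper name is mine): in a minimization problem, a solution dominates another when it is no worse in every objective and strictly better in at least one.

def dominates(a, b):
    '''a and b are tuples of objectives to minimize, e.g. (1-r2, n_features).'''
    return all(x <= y for x, y in zip(a, b)) and any(x < y for x, y in zip(a, b))

dominates((0.6, 3), (0.7, 3))  # True: same feature count, better score
dominates((0.6, 3), (0.5, 2))  # False: worse in both objectives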

Imports and loading the dataset

First, let's do some imports and load the dataframe with the feature we are trying to predict.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
				
df = pd.read_csv('./Datasets/wine.csv')
				
X = df.drop('quality', axis=1)
y = df['quality']
				

Pareto efficient solutions

Pareto efficiency is a situation where no action or allocation is available that makes one individual better off without making another worse off. - Pareto efficiency.

In this case, every new feature that I add might benefit the model, but I might not want to use all of the features in a dataset. So I want to see which features are the most helpful, considering the tradeoff between the number of features used and the amount of r^2 score lost.

The Pareto front

The Pareto front can normally be seen as the frontier of solutions closest to the origin. In this case we are minimizing both objectives, so the front is the one closest to the origin; if we were maximizing, we could instead take the frontier farthest from the origin, or simply multiply every objective by minus one.

[Figure: the Pareto front]
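
To make this concrete, here is a small sketch that extracts the Pareto front of a set of minimization points, reusing the dominates helper from above (the sample points are made up):

def pareto_front(points):
    # Keep the points that no other point dominates.
    return [p for p in points
            if not any(dominates(q, p) for q in points if q != p)]

points = [(0.9, 1), (0.7, 2), (0.8, 2), (0.6, 4), (0.75, 4)]
pareto_front(points)  # [(0.9, 1), (0.7, 2), (0.6, 4)]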

Non dominated sort

Now that we know what a Pareto front is, we are going to rank our solutions based on the level of the Pareto front they belong to.

We can do this with a simple algorithm (a minimal sketch in code follows the list).

  1. Generate the current Pareto front.
  2. Rank these as the ith-best solutions.
  3. Remove the current Pareto front from the set.
  4. Repeat from 1 until there are no points left.
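
The following is a minimal sketch of this peeling procedure, building on the pareto_front helper from earlier (the sample points are again made up):

def non_dominated_sort(points):
    remaining = list(points)
    rankings = {}
    level = 0
    while remaining:
        # Take the current front, rank it, and peel it off.
        front = pareto_front(remaining)
        for p in front:
            rankings[p] = level
        remaining = [p for p in remaining if p not in front]
        level += 1
    return rankings

non_dominated_sort([(0.9, 1), (0.7, 2), (0.8, 2), (0.6, 4), (0.75, 4)])
# {(0.9, 1): 0, (0.7, 2): 0, (0.6, 4): 0, (0.8, 2): 1, (0.75, 4): 1}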

[Figure: non-dominated sort step by step]

In this case, we are implementing the fast version (version 2) of this algorithm.

In this version, you generate a graph with edges from the non-dominated solutions (the Pareto-efficient ones) to the solutions they dominate.

After this, run a variation of Kahn's algorithm for topological sorting.

For each element that currently has no incoming edges, remove it from the graph and assign it to the ith ranking. Repeat until there are no more elements in the graph (a toy sketch follows).
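
As a toy sketch of that variation (the adjacency list below is invented for illustration; edges point from dominating to dominated solutions):

graph = {0: [2, 3], 1: [3, 4], 2: [], 3: [], 4: []}

def kahn_levels(graph):
    # Count incoming edges (how many solutions dominate each node).
    indegree = {node: 0 for node in graph}
    for targets in graph.values():
        for t in targets:
            indegree[t] += 1

    levels = {}
    level = 0
    current = [n for n in graph if indegree[n] == 0]
    while current:
        next_front = []
        for node in current:
            levels[node] = level
            for t in graph[node]:
                indegree[t] -= 1
                if indegree[t] == 0:
                    next_front.append(t)
        current = next_front
        level += 1
    return levels

kahn_levels(graph)  # {0: 0, 1: 0, 2: 1, 3: 1, 4: 1}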

[Figure: non-dominated sort step by step]

Note: in theory, for the first iteration there should be lines from the Pareto front marked as zero to the rest of the points, but drawing those lines became too messy. Just imagine that the Pareto front marked as zero dominates all of the remaining points, not just the first Pareto front.

The following is an implementation of this second method.

def fast_non_dominated_sort(population):
    '''
    Input:
        population (list(Individual)): The individuals to rank.
    Output:
        population (list(Individual)): The same individuals with their rank set.
    '''
    n = len(population)

    # Initialize
    indegrees = [0]*n                        # how many individuals dominate each one
    dominated_by = [set() for _ in range(n)] # indices of the individuals dominating each one
    rankings = [0]*n

    # Build the dominance graph
    for i, tested_individual in enumerate(population):
        for j, individual in enumerate(population):
            if i == j:
                continue
            # `individual` dominates `tested_individual` when it is no worse in
            # both objectives and strictly better in at least one. Identical
            # points fail the strict check, so no cyclic dependency is created.
            if (individual.r2_score <= tested_individual.r2_score
                    and individual.n_features <= tested_individual.n_features
                    and (individual.r2_score < tested_individual.r2_score
                         or individual.n_features < tested_individual.n_features)):
                indegrees[i] += 1
                dominated_by[i].add(j)

    # Variation of Kahn's algorithm: peel off one front (level) at a time.
    used = set()
    level = 0
    q = [i for i in range(n) if indegrees[i] == 0]

    while q:
        current_front = q
        for current_number in current_front:
            rankings[current_number] = level
            used.add(current_number)
            # Remove this individual from everyone's dominator sets.
            for i, sett in enumerate(dominated_by):
                if current_number in sett:
                    sett.discard(current_number)
                    indegrees[i] -= 1
        q = [i for i in range(n) if indegrees[i] == 0 and i not in used]
        level += 1

    for i, rank in enumerate(rankings):
        population[i].rank = rank

    return population
				

The individuals class

I created an Individual class to manage the solutions more easily and to make debugging painless.

class Individual:
    def __init__(self, genotype, n_features):
        self.genotype = genotype
        self.n_features = n_features
        self.r2_score = -1
        self.rank = -1

    def __repr__(self):
        return f"Individual(genotype={self.genotype}, fitness=({self.r2_score}, {self.n_features}), rank={self.rank})"
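				

For example, an unevaluated individual looks like this (a quick sketch; the genotype values are mine):

ind = Individual(genotype=np.array([1, 0, 1]), n_features=2/3)
ind
				
Individual(genotype=[1 0 1], fitness=(-1, 0.6666666666666666), rank=-1)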
				

Creating the Genetic Algorithm

Some pseudocode for a possible genetic algorithm could be the following:

  1. Create the initial population.
  2. Calculate the population fitness.
  3. Get the elite.
  4. While iteration < Generations:
    1. Crossover selecting the parents.
    2. Mutate population.
    3. Calculate the fitness of population.
    4. Get the current elite.

That is it. The following are implementations of each of these parts.

Creating an initial population

def create_population(N, n_genes, maximum_value):
    initial_population = []
    for _ in range(N):
        # Each gene is a 0/1 flag that switches one feature on or off.
        genotype = rng.integers(maximum_value, size=n_genes)
        n_features = sum(genotype)/len(genotype)
        initial_population.append(Individual(genotype, n_features))
    return initial_population
				
rng = np.random.default_rng(seed=42)
				
trial_population = create_population(10, len(X.columns), 2)
trial_population
				
[Individual(genotype=[0 0 1 0 0 0 0 1 1 0 1], fitness=(-1, 0.36363636363636365), rank=-1),
 Individual(genotype=[0 0 0 0 0 1 1 1 0 0 0], fitness=(-1, 0.2727272727272727), rank=-1),
 Individual(genotype=[0 0 1 1 1 1 1 1 0 1 1], fitness=(-1, 0.7272727272727273), rank=-1),
 Individual(genotype=[1 1 0 1 0 1 1 1 0 1 1], fitness=(-1, 0.7272727272727273), rank=-1),
 Individual(genotype=[0 0 1 1 1 1 0 1 0 1 0], fitness=(-1, 0.5454545454545454), rank=-1),
 Individual(genotype=[1 0 1 0 0 1 0 1 1 1 0], fitness=(-1, 0.5454545454545454), rank=-1),
 Individual(genotype=[0 0 1 1 1 1 0 0 0 0 0], fitness=(-1, 0.36363636363636365), rank=-1),
 Individual(genotype=[0 0 0 1 1 1 0 0 0 0 1], fitness=(-1, 0.36363636363636365), rank=-1),
 Individual(genotype=[1 0 0 0 0 1 1 1 1 1 1], fitness=(-1, 0.6363636363636364), rank=-1),
 Individual(genotype=[1 1 0 1 1 1 1 1 1 0 1], fitness=(-1, 0.8181818181818182), rank=-1)]
				

Calculating the fitness

Right now, I am hardcoding LinearRegression as the model, but later I will change it so that any arbitrary model can be used.

def individual_fitness(individual, X, y):
    '''
    Calculate the fitness for the current individual.

    Input:
        individual (Individual): The individual to evaluate.
        X: The features of the df.
        y: The feature to predict.
    Output:
        individual (Individual): The individual evaluated on 1-r^2 and the
        fraction of features used.
    '''
    X = X.copy()
    mask = []
    cont = 0

    # Turn the 0/1 genotype into a boolean column mask.
    for i in individual.genotype:
        if i == 1:
            mask.append(True)
            cont += 1
        else:
            mask.append(False)

    # An individual with no features cannot be trained, so switch one on.
    if cont == 0:
        rng = np.random.default_rng(seed=42)
        random_change = rng.integers(len(X.columns))
        mask[random_change] = True
        individual.genotype[random_change] = 1

    filtered_X = X.loc[:, mask]

    X_train, X_test, y_train, y_test = train_test_split(filtered_X, y, test_size=0.33, random_state=42)
    lr = LinearRegression(n_jobs=-1)
    lr.fit(X_train, y_train)
    y_preds = lr.predict(X_test)
    r2_current_score = 1 - r2_score(y_test, y_preds)

    n_features = sum(individual.genotype)/len(individual.genotype)

    individual.n_features = n_features
    individual.r2_score = r2_current_score
    return individual
				
individual_fitness(trial_population[0], X, y)
				
Individual(genotype=[0 1 1 0 0 1 0 1 0 0 1], fitness=(0.6985795098341726, 0.45454545454545453), rank=-1)
				
def population_fitness(population, X, y):
    '''
    Calculate the fitness for all of the population.
    Input:
        population (list): N individuals
        X: The features of the df.
        y: The feature to predict.
    Output:
        population (list(Individual)): The evaluated and ranked individuals.
    '''
    evaluated_fitness = []
    for individual in population:
        evaluated_fitness.append(individual_fitness(individual, X, y))

    population = fast_non_dominated_sort(evaluated_fitness)

    return population
				
evaluated_individuals = population_fitness(trial_population, X, y)
				

Graphing the results

After evaluating the individuals, it would be nice to have a way to visualize how each one performed.

def graph_rankings(population, elites):
    '''
    Graph the current state of the entire population and of the elites.
    Input:
        population (list(Individual)): The evaluated individuals.
        elites (list(Individual)): The best individuals so far.
    '''
    fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(12, 6))

    # Plural names avoid shadowing sklearn's r2_score function.
    r2_scores = [individual.r2_score for individual in population]
    n_features = [individual.n_features for individual in population]
    grade = [individual.rank for individual in population]
    finalized_scores = pd.DataFrame({'1-r2': r2_scores, 'n_features': n_features, 'grade': grade})

    r2_scores = [individual.r2_score for individual in elites]
    n_features = [individual.n_features for individual in elites]
    elites_df = pd.DataFrame({'1-r2': r2_scores, 'n_features': n_features})

    sns.scatterplot(finalized_scores, x='1-r2', y='n_features', hue='grade', ax=ax1)
    sns.scatterplot(elites_df, x='1-r2', y='n_features', ax=ax2)

    # Set limits for both graphs
    ax1.set_xlim(0.3, 1.0)
    ax1.set_ylim(0.0, 1.0)
    ax2.set_xlim(0.3, 1.0)
    ax2.set_ylim(0.0, 1.0)

    plt.tight_layout()
    plt.show()
				
graph_rankings(evaluated_individuals, [individual for individual in evaluated_individuals if individual.rank == 0])
				

Parent Selection

In this case, I decided to go for a tournament selection.

def parent_selection_tournament(population):
    '''
    Parent selection based on a binary tournament.
    Input:
        population (list(Individual)): The individuals to select from.

    Output:
        parent (Individual): The winner of the tournament.
    '''
    idx1 = np.random.randint(low=0, high=len(population))
    idx2 = np.random.randint(low=0, high=len(population))

    while idx1 == idx2:
        idx2 = np.random.randint(low=0, high=len(population))

    parent1 = population[idx1]
    parent2 = population[idx2]

    # The lower rank (better front) wins; ties are broken by the 1-r^2 score.
    if parent1.rank < parent2.rank:
        return parent1
    elif parent2.rank < parent1.rank:
        return parent2
    elif parent1.r2_score < parent2.r2_score:
        return parent1
    else:
        return parent2
				
print(evaluated_individuals[0].rank, evaluated_individuals[1].rank)
parent_selection_tournament(evaluated_individuals)
				

Crossover

I decided to combine the parents here using uniform crossover, but in the class version I implemented it another way.

def combine_parents(parent1, parent2, probability):
    '''
    Helper function for crossover.
    This is an implementation of uniform crossover.
    probability: the probability of choosing parent1's gene over parent2's.
    '''
    child = []

    for i in range(len(parent1.genotype)):
        if np.random.uniform(0, 1) < probability:
            child.append(parent1.genotype[i])
        else:
            child.append(parent2.genotype[i])
    return child

def crossover(population: list, pc: float):
    '''
    Crossover operation on a population of parents.

    Input:
        population (list): parents
        pc (float): probability of crossover
    Output:
        new_population (list): The offspring population.
    '''
    new_population = []
    for i in range(len(population)):
        prob = np.random.uniform(0, 1)
        if prob < pc:
            parent1 = parent_selection_tournament(population)
            parent2 = parent_selection_tournament(population)
            genotype = combine_parents(parent1, parent2, 0.5)
            n_features = sum(genotype)/len(genotype)
            new_population.append(Individual(genotype, n_features))
        else:
            # No crossover: carry the selected parent over unchanged.
            parent = parent_selection_tournament(population)
            new_population.append(parent)
    return new_population
				
cross = crossover(evaluated_individuals, 0.5)
cross
				

Mutation

def mutation(population: list, pm: float):
    '''
    Mutation on a population.
    Input:
        population (list): individuals
        pm (float): probability of mutation

    Output:
        new_population (list): The possibly mutated individuals.
    '''
    new_population = []

    for individual in population:
        if np.random.uniform(0, 1) < pm:
            mutated_individual = mutate_one_feature(individual)
            new_population.append(mutated_individual)
        else:
            new_population.append(individual)

    # Return only after every individual has been visited.
    return new_population

def mutate_one_feature(individual):
    # Flip a single random gene on or off.
    idx = np.random.randint(low=0, high=len(individual.genotype))
    individual.genotype[idx] = 0 if individual.genotype[idx] else 1
    return individual
				
mutated_population = mutation(evaluated_individuals, 0.5)
mutated_population
				

Manage elites

Merge the current population with the elites and do a fast non-dominated sort. After that, just keep the individuals that ranked best.

def manage_elites(population, elite):
    '''
    Input:
        population (list(Individual)): the current individuals
        elite (list(Individual)): the best individuals so far

    Output:
        elite (list(Individual)): The new elite individuals.
    '''

    # Merge both the normal population and the elite
    population = population + elite

    # Re-rank the merged population
    population = fast_non_dominated_sort(population)

    elite = [individual for individual in population if individual.rank == 0]

    return elite
				

Genetic algorithm

Finally, putting it all together, the implementation of the genetic algorithm is the following.

def genetic_algorithm(N, G, pr, pm, X, y):
    '''
    Input:
        N (int): Population size
        G (int): Maximum number of generations
        pr (float): crossover probability
        pm (float): mutation probability
    Output:
        fitness_over_time (list): the elites throughout the generations
    '''

    population = create_population(N, len(X.columns), 2)
    population = population_fitness(population, X, y)
    elite = [individual for individual in population if individual.rank == 0]

    fitness_over_time = [elite]

    for i in range(G):
        population = crossover(population, pr)
        population = mutation(population, pm)
        population = population_fitness(population, X, y)

        elite = manage_elites(population, elite)

        fitness_over_time.append(elite)
        if i % 10 == 0:
            graph_rankings(population, elite)

    return fitness_over_time
				
fitness_over_time = genetic_algorithm(20, 100, 0.8, 0.5, X, y)
				

The Genetic Algorithm class

I wanted to have a more portable version of the code, so I decided to make a class version. Everything except for the fast non-dominated sort is replicated below and can be used by calling the genetic_algorithm() method. It returns the elite individuals and prints the best Pareto front of the elites.

class GeneticAlgorithm:
    def __init__(self, N, G, model, pc, pm, X, y):
        """
        Input:
            N (int): The size of the population.
            G (int): The number of generations.
            X (DataFrame): The features of the df.
            y (DataFrame): The feature to predict.
            model: A model that uses the sklearn api to train and infer.
            pc: Probability of crossover.
            pm: Probability of mutation.
        """
        self.N = N
        self.G = G
        self.population = []
        self.model = model
        self.n_features = len(X.columns)
        self.X = X
        self.y = y
        self.pc = pc
        self.pm = pm

        for _ in range(N):
            genotype = np.random.randint(2, size=self.n_features)
            n_features = sum(genotype)/self.n_features
            self.population.append(Individual(genotype, n_features))

    def _individual_fitness(self, individual):
        """
        Calculate the fitness for the current individual.

        Input:
            individual (Individual): The individual to be evaluated.
        Output:
            evaluated_individual: The individual evaluated on 1-r2_score and n_features.
        """

        mask = []
        cont = 0

        for i in individual.genotype:
            if i == 1:
                mask.append(True)
                cont += 1
            else:
                mask.append(False)

        # An individual with no features cannot be trained, so switch one on.
        if cont == 0:
            rng = np.random.default_rng(seed=42)
            random_change = rng.integers(len(self.X.columns))
            mask[random_change] = True
            individual.genotype[random_change] = 1

        # Use the instance's own data instead of the global X and y.
        filtered_X = self.X.loc[:, mask]

        X_train, X_test, y_train, y_test = train_test_split(filtered_X, self.y, test_size=0.33, random_state=42)
        model = self.model
        model.fit(X_train, y_train)
        y_preds = model.predict(X_test)
        r2_current_score = 1 - r2_score(y_test, y_preds)

        n_features = sum(individual.genotype)/len(individual.genotype)

        individual.n_features = n_features
        individual.r2_score = r2_current_score
        return individual

    def _population_fitness(self):
        """
        Calculate the fitness for all of the population.
        Output:
            population (list(Individual)): The evaluated and ranked individuals.
        """
        evaluated_fitness = []

        for individual in self.population:
            evaluated_fitness.append(self._individual_fitness(individual))

        population = fast_non_dominated_sort(evaluated_fitness)

        return population

    def _graph_rankings(self, elites):
        """
        Graph the current state of the entire population and of the elites.
        Input:
            elites: The best individuals over all generations.

        Output:
            There is no output, it graphs the state of the population and elites.
        """

        fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(12, 6))

        r2_scores = [individual.r2_score for individual in self.population]
        n_features = [individual.n_features for individual in self.population]
        grade = [individual.rank for individual in self.population]
        finalized_scores = pd.DataFrame({'1-r2': r2_scores, 'n_features': n_features, 'grade': grade})

        r2_scores = [individual.r2_score for individual in elites]
        n_features = [individual.n_features for individual in elites]
        elites_df = pd.DataFrame({'1-r2': r2_scores, 'n_features': n_features})

        sns.scatterplot(finalized_scores, x='1-r2', y='n_features', hue='grade', ax=ax1)
        sns.scatterplot(elites_df, x='1-r2', y='n_features', ax=ax2)

        # Set limits for both graphs
        ax1.set_xlim(0.3, 1.0)
        ax1.set_ylim(0.0, 1.0)
        ax2.set_xlim(0.3, 1.0)
        ax2.set_ylim(0.0, 1.0)

        plt.tight_layout()
        plt.show()

    def _parent_selection_tournament(self, population):
        '''
        Parent selection based on a binary tournament.
        Input:
            population (list(Individual)): The individuals to select from.

        Output:
            parent (Individual): The winner of the tournament.
        '''
        idx1 = np.random.randint(low=0, high=len(population))
        idx2 = np.random.randint(low=0, high=len(population))

        while idx1 == idx2:
            idx2 = np.random.randint(low=0, high=len(population))

        parent1 = population[idx1]
        parent2 = population[idx2]

        # The lower rank (better front) wins; ties are broken by the 1-r^2 score.
        if parent1.rank < parent2.rank:
            return parent1
        elif parent2.rank < parent1.rank:
            return parent2
        elif parent1.r2_score < parent2.r2_score:
            return parent1
        else:
            return parent2

    def _crossover(self):
        '''
        Crossover operation on a population of parents.

        Output:
            new_population (list): A new population that has been crossed over.
        '''

        new_population = []

        for i in range(self.N):
            prob = np.random.uniform(0, 1)

            if prob < self.pc:
                parent1 = self._parent_selection_tournament(self.population)
                parent2 = self._parent_selection_tournament(self.population)
                genotype = self._combine_parents(parent1, parent2)
                n_features = sum(genotype)/len(genotype)
                new_population.append(Individual(genotype, n_features))
            else:
                # No crossover: carry the selected parent over unchanged.
                parent = self._parent_selection_tournament(self.population)
                new_population.append(parent)

        return new_population

    def _combine_parents(self, parent1, parent2):
        """
        Grab a random index and combine both parents beginning there.

        Helper function for crossover (single-point crossover).
        Input:
            parent1: The first parent.
            parent2: The second parent.
        Output:
            offspring: A crossover of both parents' genotypes.
        """
        break_point = np.random.randint(low=0, high=self.n_features)
        genotype1 = parent1.genotype
        genotype2 = parent2.genotype

        return np.append(genotype1[break_point:], genotype2[:break_point])

    def _mutate(self):
        """
        Mutate all of the population.

        Output:
            new_population (list): The new mutated population.
        """

        new_population = []

        for individual in self.population:
            if np.random.uniform(0, 1) < self.pm:
                mutated_individual = self._mutate_one_feature(individual)
                new_population.append(mutated_individual)
            else:
                new_population.append(individual)

        return new_population

    def _mutate_one_feature(self, individual):
        # Flip a single random gene on or off.
        idx = np.random.randint(low=0, high=len(individual.genotype))
        individual.genotype[idx] = 0 if individual.genotype[idx] else 1
        return individual

    def _manage_elites(self, population, elite):
        '''
        Input:
            population (list(Individual)): the current individuals
            elite (list(Individual)): the best individuals so far

        Output:
            elite (list(Individual)): The new elite individuals.
        '''

        population = population + elite
        population = fast_non_dominated_sort(population)

        elite = [individual for individual in population if individual.rank == 0]

        return elite

    def _print_solutions(self, elites):
        """
        Print the solutions of the elites.
        """
        # Keep one representative individual per feature count.
        unique_individuals = {}
        for individual in elites:
            n_features = individual.n_features
            if n_features not in unique_individuals:
                unique_individuals[n_features] = individual

        elites = list(unique_individuals.values())
        elites.sort(key=lambda individual: individual.n_features)

        for individual in elites:
            mask = []
            cont = 0
            for i in individual.genotype:
                if i == 1:
                    cont += 1
                    mask.append(True)
                else:
                    mask.append(False)

            cols = list(self.X.loc[:, mask].columns)
            print()
            print("The individual with n_features:", cont)
            print("\tHas the following fitness:", individual.r2_score)
            print("\tColumns used->", cols)
            print()

    def genetic_algorithm(self):
        """
        The implementation of the genetic algorithm.
        """
        self.population = self._population_fitness()
        elite = [individual for individual in self.population if individual.rank == 0]

        for i in range(self.G):
            self.population = self._crossover()
            self.population = self._mutate()
            self.population = self._population_fitness()

            elite = self._manage_elites(self.population, elite)

            if i % 10 == 0:
                self._graph_rankings(elite)
                self._print_solutions(elite)

        return elite
				
ga = GeneticAlgorithm(N=30, G=40, model=LinearRegression(n_jobs=-1), pc=0.6, pm=0.7, X=X, y=y)
				
elites = ga.genetic_algorithm()
				

Now running the algorithm gives us the following output.

[Figures: population and elite scatter plots at iterations 1-5]

The individual with n_features: 1
	Has the following fitness: 0.9199570400385672
	Columns used-> ['sulphates']


The individual with n_features: 2
	Has the following fitness: 0.7578008670371558
	Columns used-> ['sulphates', 'alcohol']


The individual with n_features: 3
	Has the following fitness: 0.6928505981395727
	Columns used-> ['volatile acidity', 'total sulfur dioxide', 'alcohol']


The individual with n_features: 5
	Has the following fitness: 0.659681296101031
	Columns used-> ['volatile acidity', 'free sulfur dioxide', 'total sulfur dioxide', 'sulphates', 'alcohol']


The individual with n_features: 6
	Has the following fitness: 0.6490835368389359
	Columns used-> ['fixed acidity', 'volatile acidity', 'chlorides', 'total sulfur dioxide', 'sulphates', 'alcohol']


The individual with n_features: 7
	Has the following fitness: 0.6446328316470582
	Columns used-> ['volatile acidity', 'chlorides', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol']


The individual with n_features: 8
	Has the following fitness: 0.6406986424594847
	Columns used-> ['volatile acidity', 'citric acid', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'pH', 'sulphates', 'alcohol']
				

Conclusion

Now we know which features give the best tradeoff between the number of features used and the r^2 score. And since the class version accepts any model that follows the sklearn API, it is not tied to LinearRegression.
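
As a minimal sketch (RandomForestRegressor and its settings here are my own illustration, not part of the original run), swapping in a different sklearn-style model is a one-line change:

from sklearn.ensemble import RandomForestRegressor

# Any model exposing sklearn's fit/predict API can be plugged in.
ga_rf = GeneticAlgorithm(N=30, G=40, model=RandomForestRegressor(n_jobs=-1, random_state=42), pc=0.6, pm=0.7, X=X, y=y)
elites_rf = ga_rf.genetic_algorithm()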

The best part of this code is that it can be used as a standalone class, which makes it portable.
