quinta-feira, 1 de maio de 2014

PSO e TSP

Os optimizadores baseados em enxames de partículas (Particle Swarm Optimizers - PSO) foram apresentados no contexto de problemas envolvendo a procura do óptimo de funções. A representação das soluções candidatas foi um vector de reais. Existem problemas, como é o caso do problema do caixeiro viajante (Traveling Salesman Problem - TSP) em que a representação natural é uma permutação de inteiros, representando cada inteiro o índice de uma cidade. Para usar o PSO para resolver o TSP temos a dificuldade da representação. Uma forma de iludir essas questão é usar uma representação de baixo nível na forma de vector de reais e depois avaliar a qualidade das soluções passando a representação para alto nível, na forma de permutação de inteiros. Pelo meio, precisamos de usar um descodificador do vector de reais para a permutação de inteiros o que podemos fazer baseados no conceito de chaves aleatórias (random keys). É essa solução que vamos apresentar, completando o material das aulas. O código do PSO segue a formulação clássica de Kennedy e Eberhart no seu livro Swarm Intelligence. Além disso, o nosso código pressupõe que a vizinhança de cada partícula são todas as outras partículas.
def pso_tsp(numb_generations,numb_particles,weight_inertia, phi_1,phi_2,vel_max, domain, function,problem,type_problem):
    """
    num_generations = number of generations
    numb_particles = number of particles (10 + 2 * sqrt(dimensions)) or [10,50]
    weight_inertia (w) = to control oscilations (0.721 or [0,1[)
    phi_1, phi_2 = cognitive and social weights (1.193 or less than (12* w* (w-1) / (5*w -7)) or sum equal to 4)
    vel_max = maximum variation for move
    domain = [...,(inf_i,sup_i)...] domain values for each dimension
    function = to compute the fitness of a solution candidate
    problem = o dicionário com as coordenadas das cidades
    type_problem = max or min, for maximization or minimization problems.
    k = size of neighborhood
    
    Structures:
    particles = [...,[[...,p_i_j,...],fit_i],...], current position and fitness of particle i
    velocities = [..., [...,v_i_j,...],...]
    best_past = [...,[[...,p_i_j,...],fit_i],...], previous best position and fitness of particle i
    global_best = [[...,p_k_j,...],fit_k], position and fitness of the global best particle
    statistics_by_generation = [...,[best_fitness_gen_i, average_fitness_gen_i], ...]
    """
    # initialization
    numb_dimensions = len(domain)
    particles = [[generate_particle(domain),0] for count in range(numb_particles)]
    velocities = [generate_velocity(vel_max, numb_dimensions) for count in range(numb_particles)]
    
    # first evaluations
    particles = [[part, function(part,problem)] for [part,fit] in particles]
    best_past = deepcopy(particles)
    global_best = find_global_best(particles,type_problem)
    # statistics
    statistics_by_generation = []
    # Run!
    for gen in range(numb_generations):
        # for each particle
        for part in range(numb_particles): 
            # for each dimension
            for dim in range(numb_dimensions):
                # update velocity
                velocities[part][dim] = weight_inertia * velocities[part][dim] 
                + phi_1 * (best_past[part][0][dim] - particles[part][0][dim])
                + phi_2 * (global_best[0][dim] - particles[part][0][dim])
                # update position
                particles[part][0][dim] = particles[part][0][dim] + velocities[part][dim]
                # clampling
                if particles[part][0][dim] < domain[dim][0]:
                    particles[part][0][dim] = domain[dim][0]
                    #velocities[part][dim] = 0
                elif particles[part][0][dim] > domain[dim][1]:
                    particles[part][0][dim] = domain[dim][1]
                    #velocities[part][dim] = 0
            # update fitness particle
            particles[part][1] = function(particles[part][0],problem)

            # update best past
            if type_problem == max:
                # maximization situation
                if particles[part][1] > best_past[part][1]:
                    best_past[part] = deepcopy(particles[part])
        # update global best
                    if particles[part][1] > global_best[1]:
                        global_best = particles[part] 
            else: # minimization problem
                if particles[part][1] < best_past[part][1]:
                    best_past[part] = deepcopy(particles[part])
                    if particles[part][1] < global_best[1]:
                        global_best = particles[part] 
        # update statistics
        generation_average_fitness = sum([particle[1] for particle in best_past])/numb_particles
        generation_best_fitness = global_best[1]
        statistics_by_generation.append([generation_best_fitness,generation_average_fitness])
    # give me the best!
    print('\nBest Solution: %s\nFitness: %0.2f' % (decode_rk(global_best[0]),global_best[1]))
    return statistics_by_generation
Em relação à versão para as funções, acrescentámos um parâmetro contendo as coordenadas das cidades. Igualmente, temos que adaptar outras funções auxiliares, nomeadamente as que envolvem o cálculo da qualidade das soluções:
def phenotype_rk(genotype,problem):
 """ Obtaing the phenotype = list of coordinates."""
 genotype_permut = decode_rk(genotype)
 pheno = [problem[city] for city in genotype_permut]
 return pheno

def evaluate(genotype,problem):
 tour = phenotype_rk(genotype,problem)
 numb_cities = len(tour)
 dist = 0
 for i in range(numb_cities):
  j = (i+1) % numb_cities
  dist  += distance(tour[i],tour[j])
 return dist
Como se pode observar no código acima, tivemos que incluir o novo parâmetro com o dicionários das cidades. Dito isto, apresentamos o código completo. Em relação ao problema concreto o leitor terá que adaptar com o seu exemplo.
#! /usr/bin/env python

"""
pso_tsp.py
Implementation of the standard PSO (Kennedy & Eberhart: Swarm Intelligence. pp 313). 
Global neighborhood. 
For solving the TSP using random keys.
Ernesto Costa, May 2014.
"""

# imports
from pylab import *
from random import random,uniform
from copy import deepcopy
from math import sqrt,sin,cos,pi
from operator import itemgetter

# colect and display
def run(numb_runs,numb_generations,numb_particles,weight_inertia, phi_1,phi_2,vel_max, domain, function, problem,type_problem):
    # Colect Data
    print('Wait, please ')
    statistics_total = [pso(numb_generations,numb_particles,weight_inertia, phi_1,phi_2,vel_max, domain, function,problem,type_problem) for i in range(numb_runs)]
    print("That's it!")
    # Process data: best and average by generation
    results = list(zip(*statistics_total))   
    best = [type_problem([result[0] for result in generation]) for generation in results]
    best_average = [sum([result[0] for result in generation])/numb_runs for generation in results]
    average = [sum([indiv[1] for indiv in genera])/numb_runs for genera in results]
    # Mostra
    ylabel('Fitness')
    xlabel('Generation')
    tit = 'Runs: %d , Phi: %0.2f, Vel: %0.2f' % (numb_runs,phi_1, vel_max)
    title(tit)
    axis= [0,numb_generations,0,len(domain)]
    p1 = plot(best,'r-o',label="Best")
    p2 = plot(average,'g->',label="Average")
    p3 = plot(best_average, 'y-s',label="Average of Best")
    if type_problem == max:
        legend(loc='lower right')
    else:
        legend(loc='upper right')
    show()
    
# main program

def pso(numb_generations,numb_particles,weight_inertia, phi_1,phi_2,vel_max, domain, function,problem,type_problem):
    """
    num_generations = number of generations
    numb_particles = number of particles (10 + 2 * sqrt(dimensions)) or [10,50]
    weight_inertia (w) = to control oscilations (0.721 or [0,1[)
    phi_1, phi_2 = cognitive and social weights (1.193 or less than (12* w* (w-1) / (5*w -7)) or sum equal to 4)
    vel_max = maximum variation for move
    domain = [...,(inf_i,sup_i)...] domain values for each dimension
    function = to compute the fitness of a solution candidate
    type_problem = max or min, for maximization or minimization problems.
    k = size of neighborhood
    
    Structures:
    particles = [...,[[...,p_i_j,...],fit_i],...], current position and fitness of particle i
    velocities = [..., [...,v_i_j,...],...]
    best_past = [...,[[...,p_i_j,...],fit_i],...], previous best position and fitness of particle i
    global_best = [[...,p_k_j,...],fit_k], position and fitness of the global best particle
    statistics_by_generation = [...,[best_fitness_gen_i, average_fitness_gen_i], ...]
    """
    # initialization
    numb_dimensions = len(domain)
    particles = [[generate_particle(domain),0] for count in range(numb_particles)]
    velocities = [generate_velocity(vel_max, numb_dimensions) for count in range(numb_particles)]
    
    # first evaluations
    particles = [[part, function(part,problem)] for [part,fit] in particles]
    best_past = deepcopy(particles)
    global_best = find_global_best(particles,type_problem)
    # statistics
    statistics_by_generation = []
    # Run!
    for gen in range(numb_generations):
        # for each particle
        for part in range(numb_particles): 
            # for each dimension
            for dim in range(numb_dimensions):
                # update velocity
                velocities[part][dim] = weight_inertia * velocities[part][dim] 
                + phi_1 * (best_past[part][0][dim] - particles[part][0][dim])
                + phi_2 * (global_best[0][dim] - particles[part][0][dim])
                # update position
                particles[part][0][dim] = particles[part][0][dim] + velocities[part][dim]
                # clampling
                if particles[part][0][dim] < domain[dim][0]:
                    particles[part][0][dim] = domain[dim][0]
                    #velocities[part][dim] = 0
                elif particles[part][0][dim] > domain[dim][1]:
                    particles[part][0][dim] = domain[dim][1]
                    #velocities[part][dim] = 0
            # update fitness particle
            particles[part][1] = function(particles[part][0],problem)

            # update best past
            if type_problem == max:
                # maximization situation
                if particles[part][1] > best_past[part][1]:
                    best_past[part] = deepcopy(particles[part])
        # update global best
                    if particles[part][1] > global_best[1]:
                        global_best = particles[part] 
            else: # minimization problem
                if particles[part][1] < best_past[part][1]:
                    best_past[part] = deepcopy(particles[part])
                    if particles[part][1] < global_best[1]:
                        global_best = particles[part] 
        # update statistics
        generation_average_fitness = sum([particle[1] for particle in best_past])/numb_particles
        generation_best_fitness = global_best[1]
        statistics_by_generation.append([generation_best_fitness,generation_average_fitness])
    # give me the best!
    print('\nBest Solution: %s\nFitness: %0.2f' % (decode_rk(global_best[0]),global_best[1]))
    return statistics_by_generation
    

    
# Utilities
def generate_particle(domain):
    """ randomly construct a particle."""
    particle = [random() for inf,sup in domain]
    return particle

def generate_velocity(vel_max, numb_dimensions):
    """ randomly define the velocity of a particle."""
    velocity = [uniform(-vel_max,vel_max) for count in range(numb_dimensions)]
    return velocity


def find_global_best(particles,type_problem):
    """ Find the overall global best."""
    global_best = deepcopy(particles)
    global_best.sort(key=itemgetter(1))
    if type_problem == max:
        return global_best[-1]
    else:
        return global_best[0]
 
def clamping(indiv, domain):
    """keep individual's values inside the domain."""
    new_indiv = [verify(indiv[i],domain[i]) for i in range(len(indiv))]
    return new_indiv

def verify(value,domain_value):
    if value < domain_value[0]:
        return domain_value[0]
    elif value > domain_value[1]:
        return domain_value[1]
    else:
        return value
   

#  --------------------------  For TSP ------------------------------------

# Getting gthe coordinates of the cities from a file and store them in a dictionary
def get_coordinates_tsp(filename):
    """ 
    General parser for tsp files.
    Obtain the cities' coordinates from a file in tsp format.
    """ 
    with open(filename) as f_in:
        coordinates = []
        line = f_in.readline()
        while not line.startswith('NODE_COORD_SECTION'):
            line = f_in.readline()
        line = f_in.readline()
        while not line.startswith('EOF'):
            n,x,y = line[:-1].split()
            coordinates.append((float(x),float(y)))
            line = f_in.readline()
        f_in.close()
    return coordinates

def dict_cities(coordinates):
    """ Create a dictionary. Key = city identifier, value = coordinates."""
    my_dict = {}
    for i, (x,y) in enumerate(coordinates):
        my_dict[i] = (x,y)
    return my_dict

#  Fitness calculation
def phenotype_rk(genotype,problem):
 """ Obtaing the phenotype = list of coordinates."""
 genotype_permut = decode_rk(genotype)
 pheno = [problem[city] for city in genotype_permut]
 return pheno

def evaluate(genotype,problem):
 tour = phenotype_rk(genotype,problem)
 numb_cities = len(tour)
 dist = 0
 for i in range(numb_cities):
  j = (i+1) % numb_cities
  dist  += distance(tour[i],tour[j])
 return dist

def distance(cid_i, cid_j):
    """ Euclidian distance."""
    x_i, y_i = cid_i
    x_j, y_j = cid_j
    dx = x_i - x_j
    dy = y_i - y_j
    dist = sqrt(dx**2 + dy**2)
    return dist

# From a vector of reals to a permutation of integers
def decode_rk(vector):
    """Work even with repeated elements."""
    copia = deepcopy(vector)
    aux = deepcopy(vector)
    aux.sort()
    permutation = []
    for i,elem in enumerate(aux):
        permutation.append(copia.index(elem))
        copia[copia.index(elem)] = None
    return permutation


if __name__== '__main__':
 filename = '/Users/ernestojfcosta/tmp/wi29.tsp'
 coordinates = get_coordinates_tsp(filename)
 problem = dict_cities(coordinates)
 size = len(problem)
 run(1,100,100,0.8,1.3,2.7,0.8,[(0,1) for i in range(size)],evaluate,problem,min)

Extrair dados de um ficheiro em formato TSP

O problema do caixeiro viajante é um exemplo clássico para as diferentes meta-heurísticas, de inspirarão biológica ou não, mostrarem o seu desempenho. Existem sítio específicos na Internet com exemplos concretos de conjunto de cidades (veja-se, por exemplo, http://www.iwr.uni-heidelberg.de/groups/comopt/software/TSPLIB95/tsp/). Esses ficheiros estão num formato especial, pelo que é preciso escrever um programa para extrair as coordenadas das diferentes cidades. Um exemplo concreto de ficheiro é wi29.tsp (omitimos algumas entradas):
NAME : wi29
COMMENT : 29 locations in Western Sahara
COMMENT : Derived from National Imagery and Mapping Agency data
TYPE : TSP
DIMENSION : 29
EDGE_WEIGHT_TYPE : EUC_2D
NODE_COORD_SECTION
1 20833.3333 17100.0000
2 20900.0000 17066.6667
3 21300.0000 13016.6667
4 21600.0000 14150.0000
…
28 27433.3333 12400.0000
29 27462.5000 12992.2222
EOF
Como se pode ver as 7 primeiras linhas e a última têm informação irrelevante para o problema, pelo que uma solução possível será:
def le_coordenadas_tsp(ficheiro):
    """ Devolve a matriz das coordenadas a partir de ficheiro em formato tsp."""
    fich_in = open(ficheiro)
    tudo = fich_in.readlines()
    coordenadas = []
    for linha in tudo[7:-1]:
 n,x,y = linha[:-1].split()
 coordenadas.append((float(x),float(y)))
    fich_in.close()
    return coordenadas
A solução acima o que faz é obter de uma só vez a lista das linhas, ignorar as 7 primeiras e a última e construir uma lista com as coordenadas.

Com a lista das coordenadas podemos, por exemplo, construir um dicionário, em que as chaves são um identificador único da cidade (um inteiro entre 0 e o número de cidades menos um), e os valores um duplo com as coordenadas.
def dicio_cidades(coordenadas):
    """ Cria um dicionário com as cidades e suas coordenadas."""
    dicio = {}
    for i, (x,y) in enumerate(coordenadas):
 dicio[i] = (x,y)
    return dicio
Esta solução tem o inconveniente de ser um pouco rígida, em particular se o cabeçalho for diferente do habitual. Podemos no entanto criar uma solução mais conveniente de modo bastante simples:
def get_coordinates_tsp(filename):
    """ 
    General parser for tsp files.
    Obtain the cities' coordinates from a file in tsp format.
    """ 
    with open(filename) as f_in:
        coordinates = []
        line = f_in.readline()
        while not line.startswith('NODE_COORD_SECTION'):
            line = f_in.readline()
        line = f_in.readline()
        while not line.startswith('EOF'):
            n,x,y = line[:-1].split()
            coordinates.append((float(x),float(y)))
            line = f_in.readline()
        f_in.close()
    return coordinates
Este programa aplica-se a todos os ficheiros semelhantes ao anterior, mas identifica a zona com a informação relevante não pelo número de linha do ficheiro, mas antes por palavras chaves (no caso NODE_COORD_SECTION e EOF).

De novo as chaves aleatórias

Já aqui abordámos neste blogue o conceito de chaves aleatórias. Elas permitem representar através de um vector de reais entre zero e um uma permutação de inteiros. A vantagem desta representação está em podermos utilizar os operadores genéticos “normais” (e mais simples!) sobre números reais e não os operadores especializados (e mais complexos) sobre permutações de inteiros. Claro que vamos depois precisar de um descodificados quando se tratar de avaliar a qualidade de cada indivíduo. Uma implementação ingénua do descodificador é a seguinte:
def decode_rk(vector):
    aux = vector[:]
    aux.sort()
    permutation =[ vector.index(elem) + 1 for elem in aux]
    return permutation
Nesta implementação geramos permutações entre 1 e n. Se quisermos entre 0 e (n-1) basta retirar a soma de 1 que aparece na lista por compreensão. Esta solução funciona bem caso não existam elementos repetidos. Se tal acontecer temos um problema… Alterar a solução acima para que a existência de números repetidos não cause problemas não é difícil:
from copy import deepcopy

def decode_rk(vector):
    """Work even with repeated elements."""
    copia = deepcopy(vector)
    aux = deepcopy(vector)
    aux.sort()
    permutation = []
    for i,elem in enumerate(aux):
        permutation.append(copia.index(elem))
        copia[copia.index(elem)] = None
    return permutation
A solução passa por alterar um elemento cada vez que ele é considerado, substituindo-o por None. Experimente e verifique que agora já não tem que se preocupar com os elementos repetidos.

sábado, 5 de abril de 2014

Experiências

Em computação evolucionária existe uma actividade fundamental: determinar qual a melhor configuração de um algoritmo para resolver um determinado problema. E esse trabalho começa normalmente com uma questão, como, por exemplo, saber se a escolha do operador de cruzamento é relevante para o desempenho do algoritmo. Essa pergunta continua com o levantar de uma hipótese, por exemplo, os operadores de cruzamento de um ponto e de cruzamento uniforme promovem desempenhos diferentes. Para comprovar a nossa hipótese efectuamos uma série de experiências e coligimos dados, que mais tarde são analisados estatisticamente.Vamos concretizar.

Suponhamos que a nossa hipótese de trabalho é a enunciada acima: há diferença entre usar o operador de cruzamento de um ponto e o operador de cruzamento uniforme. A nossa experiência vai consistir em escolher um problema, no caso o problema de encontrar mínimo da função de Rastrigin:

 Tipicamente A=10 e cada variável x_i assume um valor no intervalo [-5.12, 5.12]. Vamos correr 30 vezes duas versões do nosso algoritmo, versões essas que apenas diferem no operador de cruzamento usado, recolher os valores das melhores soluções obtidas no final. Esses resultados são finalmente analisados e tiradas conclusões.

O código usado tem três partes principais: (1) experiência; (2) algoritmo evolucionário; (3) função de teste. A componente (1), experiência, permite controlar o número de execuções emparelhadas que são efectuadas, guardar os resultados num ficheiro e visualizar os resultados. Inclui ainda a parte de importação dos módulos necessários. O código é específico para a comparação de dois operadores de cruzamento.
import copy
import random
import numpy as np
from operator import itemgetter
import matplotlib.pyplot as plt

# ----------------------------- EXPERIMENT --------------------------------------------------------------------------
def run_parents_selection(numb_runs, filename,pop_size, cromo_size, fitness_func, prob_cross, prob_muta,select_parents, muta_method, cross_method, select_survivors, max_gener):
    with open(filename,'w') as f_data:
        for i in range(numb_runs):
            print('RUN...%s' % (i+1))
            initial_pop = init_pop(pop_size, cromo_size)
            best_1 = sea(initial_pop, fitness_func, prob_cross, prob_muta,select_parents, muta_method, cross_method[0], select_survivors, max_gener)
            best_2 = sea(initial_pop, fitness_func, prob_cross, prob_muta,select_parents, muta_method, cross_method[1], select_survivors, max_gener)
            f_data.write(str(best_1[1])  + '  ' + str(best_2[1]) + '\n')
        f_data.close()
        show(filename)

# show results
def show(filename):
    with open(filename,'r') as f_data:
        data_1 = []
        data_2 = []
        for line in f_data:
            data = line[:-1].split()
            data_1.append(str(data[0]))
            data_2.append(str(data[1]))
        plt.grid(True)
        plt.title('Rastrigin')
        plt.xlabel('Run')
        plt.ylabel('Best')
        plt.plot(data_1, label='One-point')
        plt.plot(data_2,label='Uniform')
        plt.legend(loc='upper left')
        plt.show()
            
A segunda parte é um algoritmo evolucionário genérico para uma representação com reais.
# ---------------------------- EVOLUTIONARY ALGORITHM --------------------------------------------------
def sea(initial_pop, fitness_func, prob_cross, prob_muta,select_parents, muta_method, cross_method, select_survivors, max_gener):
    pop_size = len(initial_pop)
    population = eval_pop(initial_pop, fitness_func)
    for gener in range(max_gener):
        mates = select_parents(population,pop_size,3)
        offspring = crossover(mates, prob_cross, cross_method)
        offspring = mutation(offspring, prob_muta,muta_method)
        offspring = eval_pop(offspring,fitness_func)
        population = select_survivors(population, offspring)
    best_individual = best_pop(population)
    return best_individual

# --------------------------- Auxiliary Functions
def init_pop(pop_size, cromo_size):
    """Return a list of individuals, where each indicidual has the forma [chromo, 0]"""
    population = [[cromo_reals(cromo_size),0] for count in range(pop_size)]
    return population
 
 
def cromo_reals(size):
    cromo =[random.uniform(-5.12,5.12) for i in range(size)]
    return cromo


def eval_pop(population,fitness_function):
    return [[indiv[0], fitness_function(indiv[0])] for indiv in population]
    
# -------------------------- Selection of Parents
def roulette_wheel(population, numb):
    """ Select numb individuals from the population
    according with their relative fitness. MIN """
    pop = copy.deepcopy(population)
    pop.sort(key=itemgetter(1))
    total_fitness = sum([indiv[1] for indiv in pop])
    mate_pool = []
    for i in range(numb):
        value = random.uniform(0,1)
        index = 0
        total = pop[index][1]/ total_fitness
        while total < value:
            index += 1
            total += pop[index][1]/total_fitness
        mate_pool.append(pop[index])
    return mate_pool
    
def tournament_sel(population, numb,t_size):
    mate_pool=[]
    for i in range(numb):
        indiv = tournament(population,t_size)
        mate_pool.append(indiv)
    return mate_pool

def tournament(population,t_size):
    """Deterministic. Minimization"""
    pool = random.sample(population, t_size)
    pool.sort(key=itemgetter(1))
    return pool[0]    

# ---------------------------------- Genetic Operators

# ------------- Crossover

def crossover(population,prob_cross, method):
    new_population = copy.deepcopy(population)
    offspring = []
    for i in range(0,len(population)-1,2):
        off_1,off_2 = method(new_population[i][0], new_population[i+1][0],prob_cross)
        offspring.extend([[off_1,0],[off_2,0]])
    if len(population)% 2 == 1:
        offspring.append(new_population[-1])                   
    return offspring
  
def one_point_cross(cromo_1, cromo_2,prob_cross):
    value = random.random()
    if value < prob_cross:
        pos = random.randint(0,len(cromo_1))
        f1 = cromo_1[0:pos] + cromo_2[pos:]
        f2 = cromo_2[0:pos] + cromo_1[pos:]
        return [f1,f2]
    else:
        return [cromo_1,cromo_2]
  

def uniform_cross(cromo_1, cromo_2,prob_cross):
    value = random.random()
    if value < prob_cross:
        f1=[]
        f2=[]
        for i in range(len(cromo_1)):
            if random.random() < 0.5:
                f1.append(cromo_1[i])
                f2.append(cromo_2[i])
            else:
                f1.append(cromo_2[i])
                f2.append(cromo_1[i])
        return [f1,f2]
    else:
        return [cromo_1,cromo_2]
  
#---------------- Mutation
def mutation(population,prob_muta, method):
    new_population = copy.deepcopy(population)
    offspring = []
    for i in range(len(population)):
        off = method(new_population[i][0],prob_muta)
        offspring.append([off,0])
    return offspring
    
def muta_reals_rastrigin(chromo, prob_muta):
    """For Rastrigin..."""
    new_chromo = copy.deepcopy(chromo)
    for i in range(len(new_chromo)):
        new_chromo[i] = muta_reals_gene(new_chromo[i],prob_muta, [-5.12,5.12], 1)
    return new_chromo
    
def muta_reals_gene(gene,prob_muta, domain_gene, sigma_gene):
    new_gene = gene
    value = random.random()
    if value < prob_muta:
        muta_value = random.gauss(0,sigma_gene)
        new_gene = gene + muta_value
        if new_gene < domain_gene[0]:
            new_gene = domain_gene[0]
        elif new_gene > domain_gene[1]:
            new_gene = domain_gene[1]
    return new_gene

# ----------------------------------- Selection of Survivors
def survivors_generational(parents,offspring):
    return offspring


def survivors_steady_state(parents,offspring):
    """Minimizing."""
    size = len(parents)
    parents.extend(offspring)
    parents.sort(key=itemgetter(1)) 
    return parents[:size]

def best_pop(population):
    """minimization"""
    population.sort(key=itemgetter(1))
    return population[0]
Finalmente a função a estudar que coincide com a função de mérito (fitness function).
# ------------------------------------- TEST FUNCTION
def rastrigin(x):
    """Rely on numpy arrays."""
    w = np.array(x)
    y= 10*len(w)+sum((w**2 - 10* np.cos(2*np.pi*w)))
    return y
Neste caso usamos uma representação em arrays do numpy porque torna os cálculos mais fáceis e o programa mais claro. Podemos deposi passar à fase de produção dos dados.
if __name__ == '__main__':
    file_name = '/Users/ernestojfcosta/my_python_env/rastrigin_cross.txt'
    run_parents_selection(30,file_name, 150, 10, rastrigin, 0.8, 0.01,tournament_sel, muta_reals_rastrigin, [one_point_cross,uniform_cross], survivors_steady_state, 500)
Quando executamos vamos ficar com os resultados num ficheiro e ao mesmo tempo vemos esses resultados.
Falta agora a análise estatística para o que vamos usar o programa SPSS 21. A primeira coisa a fazer é escolher o teste. Para isso sabemos que temos que responder a três questões: quantos grupos, emparelhados ou não, paramétrico ou não. Sabemos que temos dois grupos (um ponto e uniforme), emparelhados (cada execução é feita com a mesma população inicial). Precisamos apenas responder ao problema de saber se usamos testes paramétricos ou não paramétricos. Vamos testar a normalidade dos dados recorrendo ao teste de Kolgomorov-Smirnov.
Olhando para os resultados concluímos que podemos rejeitar a hipótese nula, pelo que não podemos aceitar a normalidade dos dados. Assim vamos recorrer ao teste de Wilcoxon. Correndo o teste os resultados são:
Perante estes resultados (sig=0.360) podemos concluir não poder rejeitar a hipótese nula (não há diferenças) pelo que o teste nos diz que não há diferenças estatisticamente significativas entre os dois operadores de cruzamento.

Podemos completar a análise calculando a dimensão do efeito. No caso do teste de Wilcoxon esse valor é dado pelo quociente da estatística (Z) pela raiz quadrada do número total de execuões (30+30). Neste caso será então r= -0.915/sqrt(60) = -0.1189, um valor pequeno.

sábado, 8 de março de 2014

Trepar em paralelo

O algoritmo trepa-colinas convencional melhora iterativamente uma solução olhando na sua vizinhança. Sabemos que um dos problemas maiores desta abordagem consiste na elevada probabilidade de o algoritmo convergir para um máximo local. Uma forma de tentar minorar esse problema é permitir que o algoritmo recomece de tempos a tempos a partir de um novo ponto escolhido aleatoriamente. Uma outra solução consiste em ter um conjunto de indivíduos a evoluir em paralelo,. Dito de outra forma, cada indivíduo da população evolui com base num trepa colinas convencional. A implementação desta versão não coloca muitos problemas.
import random
import operator
import copy

def paralell_hc(population_size, problem,max_iter):
    population = random_pop(problem, population_size)
    population = fitness_pop(population)
    for i in range(max_iter):
        for index,individual in enumerate(population):
            candidate,fit_candidate = individual
            # look for the first best
            for pos in range(len(candidate)):
                next_neighbor = neighbor(candidate,pos)
                cost_next_neighbor = fitness(next_neighbor)
                if cost_next_neighbor >= fit_candidate: # maximization
                    candidate = next_neighbor
                    fit_candidate = cost_next_neighbor
                    break
            population[index] = [candidate,fit_candidate]
    population.sort(key = operator.itemgetter(1))  
    return population
Notar que todos os indivíduos tentam evoluir e de forma autónoma. O trepa colinas individual é muito simples e escolhe a cada passo o primeiro vizinho melhor que o progenitor, caso exista.

Execute-se

Os algoritmos de inspiração biológica são algoritmos estocásticos. Significa isso, dentre outras coisas, que duas execuções consecutivas do mesmo algoritmo, mesmo com a mesma parametrização e população inicial dá, em geral, resultados diferentes. Daí que seja necessário executar os algoritmos várias vezes a analisar os resultados obtidos do ponto de vista estatístico, por exemplo, indicado os valores médios e o desvio padrão. Podemos alterar o nosso código acrescentando em particular uma nova função responsável pelas várias execuções e armazenamento dos resultados.
import matplotlib.pyplot as plt
from random import random,randint, shuffle, uniform,sample
from operator import itemgetter
from math import sqrt
from copy import deepcopy

def run(numb_runs,numb_generations,size_pop, size_cromo, prob_mut,prob_cross,selection,recombination,mutation,survivors, fit_func, phenotype,elite,t_size):
    best,average = sea(numb_generations,size_pop, size_cromo, prob_mut,prob_cross,selection,recombination,mutation,survivors, fit_func, phenotype,elite,t_size)
    best_of_best = [best]
    average_of_average = [average] 
    for i in range(numb_runs-1):
        best,average = sea(numb_generations,size_pop, size_cromo, prob_mut,prob_cross,selection,recombination,mutation,survivors, fit_func, phenotype,elite,t_size)
        best_of_best.append(best)
        average_of_average.append(average)
    best_by_gener = list(zip(*best_of_best))
    average_by_gener = list(zip(*average_of_average))
    data_best = [max(elem) for elem in best_by_gener]
    data_average = [sum(elem)/len(elem) for elem in average_by_gener]
    display(data_best, data_average)
Como se pode observar a função run chama o algoritmo evolucionário várias vezes, e espera que este devolva os valores do melhor indivíduo em cada geração e a média da qualidade dos indivíduos da população, também aqui em cada geração.

Por outro lado, a visualização é feita de modo simples, estando implementada para o caso concreto dos números de João Brandão.
def display(best,average):
    x = list(range(len(best)))
    plt.title('João Brandão using a Simple EA.')
    plt.xlabel('Generation')
    plt.ylabel('Fitness')
    plt.grid(True)
    plt.plot(x,best, label='Best')
    plt.plot(x,average, label='Average')
    plt.legend(loc='lower right')
    plt.show()
Estes gráficos são muito informativos e devemos aprender a interpretá-los. Dizem-nos, por exemplo, quantas gerações foram necessárias em média para alcançar a melhor solução; quando o valor do melhor e a média estão muito juntas significa uma perda de diversidade da população (e se isso acontece muito cedo indicia convergência prematura); se o melhor e a média estão muito afastados pode querer dizer que tivemos sorte a encontrar um elemento muito bom (o que pode acontecer por o problema ser muito difícil).

Ver para crer

Os algoritmos evolucionários são processos estocásticos que fazem uma procura paralela no espaço das soluções candidatas. Aprende-se muito sobre o funcionamento destes algoritmos vendo o modo como a qualidade dos indivíduos evolui ao longo das gerações. Esta breve nota destina-se a exemplificar de modo simples como se pode usar o módulo matplotlib do python para visualizar o resultado das execuções destes algoritmos.

O caso mais simples de gráfico é aquele em que temos os dados e mandamos fazer o gráfico correspondente. Não precisamos de muito:
import matplotlib.pyplot as plt

def visual_1(data):
    plt.plot(data)
    plt.show() 
Podemos começar com a cosmética, dando um título ao gráfico, nomes aos eixos, incluir uma grelha, acrescentar uma legenda, desenhar mais do que uma função, salvar o ra imagem num ficheiro:
import matplotlib.pyplot as plt

def visual_5(data_1,data_2):
    x = list(range(1,len(data_1)+1))
    plt.title('Two plots ')
    plt.grid(True)
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.plot(x,data_1, label= 'Log')
    plt.plot(x,data_2,label= 'Sqrt')
    plt.legend(loc='lower right')
    plt.savefig('/Users/ernestojfcosta/tmp/matplotlib_test.png')
    plt.show() 
Podemos controlar o aspecto das linhas:
def visual_6(data_1,data_2):
    x = list(range(1,len(data_1)+1))
    plt.title('Two plots ')
    plt.grid(True)
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.plot(x,data_1, 'r-o',label= 'Log')
    plt.plot(x,data_2,'g-.v',label= 'Sqrt')
    plt.legend(loc='lower right')
    plt.savefig('/Users/ernestojfcosta/tmp/matplotlib_test.png')
    plt.show() 
Nos exemplos acima os dados foram calculados antes de chamar a função de visualização. Mas podemos ter soluções diferentes. Por exemplo, podemos ter como argumento a função e o intervalo de valores de ‘X’ e o incremento:
def display_3(f, x_min, x_max, delta=0.001):
    """Compute values and display."""
    x = list(drange(x_min, x_max,delta))
    y = [f(i) for i in x]
    plt.title(f.__name__)
    plt.grid(True)
    plt.xlabel('X')
    plt.ylabel('Y= '+f.__name__ + '(X)')
    plt.plot(x,y, 'r')
    plt.show()
    
 def drange(start,stop,step=0.1):
    """ range for floats."""
    number = start
    while number < stop:
        yield number
        number += step
Veja-se o modo como se obtém o título do gráfico. Note-se ainda o uso de drange para gerar valores com um intervalo delta definido pelo utilizador. Para os leitores conhecedores de python podemos não usar drange se passarmos para arrays disponibilizados pelo módulo numpy:
import matplotlib.pyplot as plt
import numpy as np

def visual_22(data):
    plt.title('One more example...')
    plt.xlabel('X')
    plt.ylabel('Y')
    plt.plot(data)
    plt.show() 

if __name__ == '__main__':
    data_6 = [i**2 for i in np.arange(1,10,0.01)]
    visual_22(data_6)
Também podemos fazer gráficos 3D ... mas isso custa um pouco mais. Vejamos um casos ilustrativo.
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import numpy as np

def rastrigin():
    fig = plt.figure()
    fig.suptitle('Rastrigin', fontsize=18, fontweight='bold')
    ax = Axes3D(fig)
    
    
    X = np.arange(-5.12, 5.12, 0.1)
    Y = np.arange(-5.12, 5.12, 0.1)
    
    X, Y = np.meshgrid(X, Y)
    Z = 10*(X**2 - 10* np.cos(2*3.14159*X)) + 20*(Y**2 - 10* np.cos(2*3.14159*Y))
    
    ax.plot_surface(X, Y, Z, rstride=1,cstride=1,cmap=cm.jet)
    ax.legend()
    plt.show()

if __name__ == '__main__':
    rastrigin()
Agora o leitor interessado só tem que de consultar o manual do Matplotlib para ver a multiplicidade de possibilidades.