# -*- coding: utf-8 -*-
""" Copyright 2020 Marco Arrigoni
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import copy
import numpy as np
import pandas as pd
import os
from mpi4py import MPI
from ase import Atoms
from ase.io.vasp import write_vasp
from ase.calculators.calculator import CalculationFailed
from clinamen.evpd import ROOT_LOGGER as logger
from clinamen.evpd import LOGFILE_NAME
from clinamen.utils import counter
proc_no = MPI.COMM_WORLD.Get_rank()
[docs]class Individual(Atoms):
""" Class for representing an individual in a population.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._total_energy_calculations = 0
# used to keep track of changes in the structure
self._atoms_check = None
# keep track of the original structure
self._original_structure = Atoms(*args, **kwargs)
# For cloning instances
self.calculator_factory = None
self.calculator_parameters = None
self.skip_calc = False # when fitness if produced with metamodel
# it must be set to True to avoid another
# calculation
[docs] def set_calculator_factory(self, calc_factory, calc_parameters):
"""
Parameters
----------
calc_factory : a class derived from
``ase.calculators.interface.Calculator`` or a function
generating a calculator
calc_parameters : dict
keyword-value pairs used to initialize a ``calc_factory``
instance
"""
if callable(calc_factory) or issubclass(calc_factory, type):
params = copy.deepcopy(calc_parameters)
calculator = calc_factory(**calc_parameters)
calculator.calculator_parameters = copy.deepcopy(params)
calculator.calculator_results = dict()
self.calculator_factory = calc_factory
self.calculator_parameters = copy.deepcopy(params)
super().set_calculator(calculator)
else:
raise TypeError('calc_factory must be an object able to '
'instantiate a new calculator, i.e. '
'either a function or a class')
@counter
def get_potential_energy(self, *args, **kwargs):
attempt_count = 0
while True:
try:
attempt_count += 1
if proc_no == 0:
logger.debug(f'Process {proc_no}: '
'Attempting energy calculation for '
f'individual: {self.my_name}\n'
f'Attempt number: {attempt_count}')
ene = super().get_potential_energy(*args, **kwargs)
if proc_no == 0:
logger.debug(f'Process {proc_no}: '
f'Energy counter +1 for "{self.my_name}"')
except CalculationFailed:
if proc_no == 0:
logger.debug(f'Process {proc_no}: '
f'Calculating fitness for '
f'individual {self.my_name} '
'failed. Trying again...')
continue
else:
self._total_energy_calculations += 1
break
self.calc.calculator_results['energy'] = ene
# update
self._atoms_check = self.copy()
return ene
[docs] def get_forces(self, *args, **kwargs):
forces = super().get_forces(*args, **kwargs)
self.calc.calculator_results['forces'] = forces.copy()
return forces
[docs] def calculation_required(self):
""" Returns True if a new energy calculation is required """
if not self.calc:
raise RuntimeError('The Individual instance has no '
'set calculator')
try:
self.calc.calculator_results['energy']
except KeyError: # energy was never calculated
return True
changes = ['positions', 'cell', 'numbers', 'pbc']
if self.skip_calc:
return False
else:
for prop in changes:
value1 = getattr(self, prop)
value2 = getattr(self._atoms_check, prop)
if not (value1 == value2).all():
return True
[docs] def update_fitness(self):
""" Calculate the total energy of the individual """
if proc_no == 0:
logger.debug(f'Process {proc_no}: '
'Calculating fitness of '
f'individual "{self.my_name}"')
if self.calc is not None:
if self.calculation_required():
self._fitness = -self.get_potential_energy()
else:
if proc_no == 0:
logger.debug(f'Process {proc_no}: '
f'{self.my_name} did not change. Using '
'cached fitness')
self._fitness = -self.calc.calculator_results['energy']
else:
if proc_no == 0:
logger.critical(f'Process {proc_no}: '
'Trying to calculate the cost, '
f'but the Individual "{self.my_name}" '
'instance has no calculator')
raise RuntimeError('The Individual instance has no '
'set calculator')
@property
def cost(self):
""" Value of the cost function of the individual (its energy) """
return -self.fitness
@property
def fitness(self):
""" Fitness value of the individual """
self.update_fitness() # this will initialize self._fitness
if hasattr(self, '_fitness_transformed'):
return self._fitness_transformed
return self._fitness
@property
def defect_position(self):
""" Location of the eventual defect in the structure """
return self._defect_position
@defect_position.setter
def defect_position(self, value):
""" Scaled positions of the defective site """
if isinstance(value, (list, tuple, np.ndarray)):
self._defect_position = np.array(value)
else:
if proc_no == 0:
logger.exception(f'Process {proc_no}: '
'The defect position is required, but '
f'the Individual "{self.my_name}" '
'instance has no set defect position.')
raise TypeError('Defect position must be given as a 1D array')
@property
def metric_tensor(self):
""" The metric tensor of the cell """
cell = self.get_cell()[:]
metric = np.dot(cell, cell.T)
return metric
@property
def distances_from_defect(self):
""" Distance of each atom in the system from the defect """
self._distances_from_defect = self._get_distances_from_defect()
return self._distances_from_defect
@property
def my_name(self):
""" Identifier """
if not hasattr(self, '_my_name'):
self._my_name = 'individual'
return self._my_name
@my_name.setter
def my_name(self, value):
self._my_name = value
@property
def etol(self):
""" Tolerance for comparing costs between two individuals """
return self._etol
@etol.setter
def etol(self, value):
self._etol = value
@property
def drel(self):
r""" Tolerance for the relative distance between two individuals.
The relative distance is defined as:
.. math::
d_{rel}(i, j) = \frac{\sum_k |d_i(k) - d_j(k)|}{\sum_k d_i(k)}
"""
return self._drel
@drel.setter
def drel(self, value):
self._drel = value
@property
def dmax(self):
""" Tolerance for the maximum distance discrepancy
between two individuals.
This distance is defined as:
.. math::
d_max(i, j) = max_k(|d_i(k) - d_j(k)|)
"""
return self._dmax
@dmax.setter
def dmax(self, value):
self._dmax = value
@property
def dmin(self):
""" Tolerance for the minimum distance at which two atoms
can be located. Used to reject a structure where two atoms
are too close.
Default value = 0.25 minimum bond length in the system
"""
if hasattr(self, '_dmin') and self._dmin is not None:
return self._dmin
distances = self._original_structure.get_all_distances(mic=True)
mindist = distances[0][1:]
mindist = np.min(mindist)
self._dmin = 0.25*mindist
return self._dmin
@dmin.setter
def dmin(self, value):
self._dmin = value
@property
def chromosome(self):
""" The chromosome of an individual is the set of displacements
from an initial configuration. Usually one very similar to the
pristine system
"""
chromosome = self.positions - self._original_structure.positions
return chromosome
@property
def total_energy_calculations(self):
""" Number of times the energy of the individual has been calculated
"""
return self._total_energy_calculations
@property
def data(self):
positions = [['x' + str(i), 'y' + str(i), 'z' + str(i)]
for i in range(len(self))]
positions = [tag for sublist in positions for tag in sublist]
data_pos = self.positions.ravel()
displacements = [['dx' + str(i), 'dy' + str(i), 'dz' + str(i)]
for i in range(len(self))]
displacements = [tag for sublist in displacements for tag in sublist]
try:
data_disp = self.chromosome.ravel()
except AttributeError:
data_disp = np.zeros_like(data_pos)
lpos = ['Positions']*len(positions)
lchrom = ['Chromosome']*len(displacements)
level1 = lpos + lchrom
level1.extend(['Fitness']*2)
level2 = positions + displacements
level2.extend(['From Energy', 'Actual'])
index = pd.MultiIndex.from_arrays([level1, level2])
data = list(data_pos)
data.extend(list(data_disp))
if not self.calculation_required():
transformed_fitness = self.fitness #update fitness if needed
from_energy = self._fitness
data.extend([from_energy, transformed_fitness])
else:
data.extend([np.nan, np.nan])
series = pd.Series(index=index, data=data, dtype=np.float,
name=self.my_name)
return series
@property
def founder(self):
if hasattr(self, '_founder'):
return self._founder
founder_atoms = self._original_structure
founder = Individual.make_individual_from_ase_atoms(founder_atoms)
if hasattr(self, '_defect_position'):
founder.defect_position = self.defect_position.copy()
founder.my_name = self.my_name + '_founder'
if hasattr(self, '_etol'):
founder.etol = self.etol
if hasattr(self, '_drel'):
founder.drel = self.drel
if hasattr(self, '_dmax'):
founder.dmax = self.dmax
if hasattr(self, '_dmin'):
founder.dmin = self.dmin
founder._original_structure = self._original_structure.copy()
if proc_no == 0:
logger.debug(f'Process {proc_no}: '
f'Founder "{founder.my_name}" created')
self._founder = founder
return self._founder
[docs] def has_proper_structure(self):
""" Returns False if any interatomic distance in the system is
smaller than ``self.dmin``.
"""
distances = self.get_all_distances(mic=True)
#remove diagonal elements
vdistances = distances[~np.eye(distances.shape[0],dtype=bool)]
return (vdistances > self.dmin).all()
def _get_distances_from_defect(self):
""" Calculate the distance of each atom in the cell from the defect
site using PBCs """
def_pos = self.defect_position
positions = self.get_scaled_positions()
cell = self.get_cell()[:]
diff_pos = positions - def_pos
diff_pos -= np.rint(diff_pos)
metric = self.metric_tensor
distances = np.diag(np.dot(diff_pos, np.dot(metric.T, diff_pos.T)))
return np.sqrt(distances)
def _get_distances_from_origin(self):
scaled_pos = self.get_scaled_positions()
scaled_pos -= np.rint(scaled_pos)
metric = self.metric_tensor
distances = np.diag(np.dot(scaled_pos, np.dot(metric.T, scaled_pos.T)))
return np.sqrt(distances)
[docs] def write_poscar(self, path=None):
""" If path is not None, it is the folder where the POSCAR file
will be saved
"""
filename = 'POSCAR_' + self.my_name
if path:
filename = os.path.join(path, filename)
write_vasp(filename, self, direct=True, sort=None, vasp5=True)
if proc_no == 0:
logger.info(f'Process {proc_no}: '
f'Individual "{self.my_name}" configuration '
'saved as POSCAR file '
f'with name: "{filename}"')
[docs] def clone(self):
""" Copy the instance, return a new instance """
if proc_no == 0:
logger.debug(f'Process {proc_no}: '
f'Creating clone of Individual "{self.my_name}"')
copied_item = Individual.make_individual_from_ase_atoms(self)
copied_atoms = self.copy()
copied_original = self._original_structure.copy()
copied_item._original_structure = copied_original
copied_item._atoms_check = copied_atoms
copied_item.set_calculator_factory(self.calculator_factory,
self.calculator_parameters)
if hasattr(self, '_fitness'):
copied_item.calc.calculator_results['energy'] = \
self.calc.calculator_results['energy']
try:
copied_item.calc.calculator_results['forces'] = \
self.calc.calculator_results['forces'].copy()
except KeyError:
pass
if hasattr(self, '_defect_position'):
copied_item.defect_position = self.defect_position.copy()
copied_item.my_name = self.my_name + '_clone'
if hasattr(self, '_etol'):
copied_item.etol = self.etol
if hasattr(self, '_drel'):
copied_item.drel = self.drel
if hasattr(self, '_dmax'):
copied_item.dmax = self.dmax
if hasattr(self, '_dmin'):
copied_item.dmin = self.dmin
if hasattr(self, '_fitness'):
copied_item._fitness = self._fitness
if hasattr(self, '_fitness_transformed'):
copied_item._fitness_transformed = self._fitness_transformed
if proc_no == 0:
logger.debug(f'Process {proc_no}: '
f'Clone "{copied_item.my_name}" created')
return copied_item
[docs] def calculate_comparison_distances(self, other):
""" Calculates the relative distance and the maximum distance
discrepancy between this individual and another one.
Parameters
----------
other : Individual instance
Return
------
rel_dist, max_dist : the relative and maximum discrepancy
distance between the two instances
"""
d_1 = self._get_distances_from_origin()
d_2 = other._get_distances_from_origin()
diff = np.abs(d_1 - d_2)
num = diff.sum()
den = np.sum(d_1)
return num/den, np.max(diff)
[docs] def optimize_structure(self, **kwargs):
""" Optimize the structure
Parameters
----------
kwargs : dict
parameters for running the geometry optimization.
A mandatory key is `optimizer`, which is an ase optimizer.
The other key:value pairs are the parameters to supply to
`optimizer`.
"""
if proc_no == 0:
logger.info(f'Process {proc_no}: '
'Optimizing structure of '
f'individual "{self.my_name}"')
optimizer = kwargs.pop('optimizer', None)
if optimizer is None:
raise ValueError('An optimizer should be given for '
'optimizing the structure')
with open(LOGFILE_NAME, 'a') as fileobj:
optimizer = optimizer(self, logfile=fileobj)
optimizer.run(**kwargs)
if proc_no == 0:
logger.info(f'Process {proc_no}: Optimization completed')
[docs] @staticmethod
def make_individual_from_ase_atoms(atoms):
""" Takes an ase.Atoms instance and returns
an evpd.core.Individual instance.
The result is analogous to using ``atoms.copy()``,
but also the calculator will be copied.
"""
if not isinstance(atoms, Atoms):
raise TypeError('The function argument is not '
'an ase.Atoms instance')
if proc_no == 0:
logger.debug(f'Process {proc_no}: '
'Generating an Individual from '
'an Atoms object')
cell = atoms.cell[:].copy()
pbc = atoms.pbc.copy()
celldisp = atoms._celldisp.copy()
info = atoms.info.copy()
individual = Individual(cell=cell, pbc=pbc,
celldisp=celldisp, info=info)
individual.arrays = dict()
for key, value in atoms.arrays.items():
individual.arrays[key] = value.copy()
individual.constraints = copy.deepcopy(atoms.constraints)
individual._original_structure = atoms.copy()
return individual