Source code for matador.swaps.swaps

# coding: utf-8
# Distributed under the terms of the MIT License.

""" This file implements atomic swaps through the `AtomicSwapper` class. """


import re
from copy import deepcopy
from matador.utils.print_utils import print_success, print_warning
from matador.utils.chem_utils import get_periodic_table, get_stoich


class AtomicSwapper:
    """This class handles the creation of input files from database
    queries that have swapped atoms.

    Attributes set by the constructor (all derived from its arguments):
        periodic_table (dict): group macros from `get_periodic_table()`,
            with the "X" entry removed.
        maintain_num_species (bool): whether swaps must preserve the
            number of distinct species.
        swap_args (str/list): the raw swap specification.
        swap_pairs (list): parsed `[from_options, to_options]` pairs.
        swap_dict_list (list): one `{from: to}` dict per combination of
            swap targets, or None if no swaps were parsed.
        swap_counter (int): number of swapped structures produced.
        cursor (list): the swapped (and optionally filtered) documents.

    """

    def __init__(
        self,
        cursor,
        swap=None,
        uniq=False,
        top=None,
        maintain_num_species=True,
        debug=False,
        **kwargs
    ):
        """Initialise class with query cursor and arguments.

        Parameters:
            cursor (list): cursor of documents to swap.

        Keyword arguments:
            swap (str): specification of swaps to perform, e.g. "LiP:KSn"
                will swap all Li->P and all K->Sn in the cursor.
            uniq (bool/float): filter documents by similarity with the
                default sim_tol (True) or the value provided here.
            top (int): only swap from the first `top` structures in
                the cursor.
            maintain_num_species (bool): only perform swaps that maintain
                the number of species in the structure
            debug (bool): enable debug output
            kwargs (dict): dictionary of extra arguments that should
                be ignored.

        Raises:
            RuntimeError: if the cursor is non-empty and the swap
                specification is missing or cannot be parsed.

        """
        # define some swap macros
        self.periodic_table = get_periodic_table()
        del self.periodic_table["X"]

        self.maintain_num_species = maintain_num_species
        self.swap_args = swap
        self.swap_dict_list = None
        self.template_structure = None

        # Initialise these before the early return below so that an
        # instance built from an empty cursor still has every attribute.
        self.swap_pairs = []
        self.swap_counter = 0

        self.cursor = list(cursor)
        if top is not None:
            self.cursor = self.cursor[:top]

        # Nothing to swap: the swap specification is deliberately left
        # unparsed, so no error is raised for a missing `swap` here.
        if not self.cursor:
            return

        self.parse_swaps(self.swap_args)

        swap_cursor = []
        for doc in self.cursor:
            docs, counter = self.atomic_swaps(doc)
            self.swap_counter += counter
            if counter > 0:
                swap_cursor.extend(docs)
        self.cursor = swap_cursor

        if self.swap_counter > 0:
            print_success("Performed {} swaps.".format(self.swap_counter))
        else:
            print_warning("No swaps performed.")

        if uniq:
            from matador.utils.cursor_utils import filter_unique_structures

            print("Filtering for unique structures...")
            filtered_cursor = filter_unique_structures(
                self.cursor, debug=debug, sim_tol=uniq
            )
            print(
                "Filtered {} down to {}".format(len(self.cursor), len(filtered_cursor))
            )
            self.cursor = filtered_cursor

    def parse_swaps(self, swap_args=None):
        """Parse command line options into valid atomic species swaps.

        e.g. --swap LiP:NaAs ==> [[['Li'], ['P']], [['Na'], ['As']]].

        Handles multiple many-to-many swaps, macros for groups of the
        periodic table, and wildcards. The parsed pairs are stored in
        `self.swap_pairs` and expanded into `self.swap_dict_list`.

        Keyword arguments:
            swap_args (str): overrides command-line swap args.

        Raises:
            RuntimeError: if no swap arguments are available, if more than
                one argument is given, or if a swap cannot be split into
                exactly two entries.

        """
        self.swap_pairs = []

        if swap_args is None:
            swap_args = self.swap_args
        if swap_args is None:
            raise RuntimeError("No swap arguments passed.")

        if isinstance(swap_args, str):
            swap_args = [swap_args.strip()]

        # An empty sequence would otherwise surface as a bare IndexError.
        if len(swap_args) == 0:
            raise RuntimeError("No swap arguments passed.")
        if len(swap_args) > 1:
            raise RuntimeError(
                "Detected whitespace in your input clear it and try again."
            )

        for swap in swap_args[0].split(":"):
            if len(swap) <= 1:
                raise RuntimeError("Not enough arguments for swap!")

            if "][" in swap:
                # both options are groups, e.g. "[I][II]"
                tokens = swap.split("][")
            elif swap[0] == "[":
                # only the first option is a group, e.g. "[I]Na"
                tokens = swap.split("]")
            elif swap[-1] == "]":
                # only the last option is a group, e.g. "Na[I]"
                tokens = swap.split("[")
            else:
                # no groups: the capturing group makes re.split keep the
                # element symbols themselves, e.g. "LiP" -> "Li", "P"
                tokens = re.split(r"([A-Z][a-z]*)", swap)

            pair = [self._atoms_to_list(token) for token in tokens if token != ""]

            if len(pair) != 2:
                raise RuntimeError(
                    "Unable to parse swap! {} should contain only two entries".format(
                        pair
                    )
                )

            self.swap_pairs.append(pair)

        self.construct_swap_options()

    def _atoms_to_list(self, atom_string):
        """For a given set of atoms in a string, parse any macros
        and return a list of options.

        e.g. '[V' -> [<all group V atoms>], and 'V' -> ['V'].

        Parameters:
            atom_string (str): formula string with macros.

        Returns:
            list: the atom options described by the string.

        """
        # No bracket means a plain symbol, returned untouched.
        if "[" not in atom_string and "]" not in atom_string:
            return [atom_string]

        group = atom_string.replace("[", "").replace("]", "")
        if group in self.periodic_table:
            atom_list = self.periodic_table[group]
        else:
            # not a known macro: treat as a comma-separated list
            atom_list = group.split(",")

        return [x.strip() for x in atom_list]

    def construct_swap_options(self):
        """Iterate over possible combinations of multiple many-to-many
        swaps and create a dict for each swap.

        Each dict in `self.swap_dict_list` maps every "from" atom to the
        "to" atom chosen for its pair in that combination; identity
        mappings (atom -> itself) are omitted.

        """
        from itertools import product

        self.swap_dict_list = []
        target_options = [pair[1] for pair in self.swap_pairs]
        for branch in product(*target_options):
            swap_dict = {}
            for ind, pair in enumerate(self.swap_pairs):
                for swap_from in pair[0]:
                    if swap_from != branch[ind]:
                        swap_dict[swap_from] = branch[ind]
            self.swap_dict_list.append(swap_dict)

    def atomic_swaps(self, source_doc):
        """Swap atomic species according to parsed options.

        Parameters:
            source_doc (dict): matador doc to swap from.

        Returns:
            (list, int): the swapped copies of `source_doc` (which is
                itself left unmodified) and how many were created.

        """
        source_types = source_doc["atom_types"]
        source_elems = set(source_types)
        unswapped_num_species = len(source_elems)

        swapped_docs = []
        for swap in self.swap_dict_list:
            # skip swaps that do not touch any atom in this structure
            if not any(key in source_elems for key in swap):
                continue

            swapped_types = [swap.get(atom, atom) for atom in source_types]
            elems = set(swapped_types)

            # e.g. swapping Li->P in LiP would collapse two species into one
            if self.maintain_num_species and len(elems) != unswapped_num_species:
                continue

            swapped_doc = deepcopy(source_doc)
            swapped_doc["atom_types"] = swapped_types
            swapped_doc["_swapped_stoichiometry"] = get_stoich(source_types)
            swapped_doc["stoichiometry"] = get_stoich(swapped_types)
            swapped_doc["elems"] = elems
            swapped_doc["num_species"] = len(elems)
            swapped_docs.append(swapped_doc)

        return swapped_docs, len(swapped_docs)