Source code for matador.fingerprints.fingerprint

# coding: utf-8
# Distributed under the terms of the MIT License.

""" This file implements the base class for all "fingerprints", which
here refers to any object derived from purely structural features of a
crystal, e.g. pair distribution functions (PDF) or simulated powder
X-ray diffraction (PXRD) spectra.

"""

# TODO: wrap these broadening methods with heuristics to decide which to use

import abc
import multiprocessing as mp
import os
import time
import sys
import functools

import psutil

try:
    from numba import njit
except (ImportError, ModuleNotFoundError):
    # Fake empty decorator to avoid needing numba installed
    njit = lambda f: f  # noqa
import numpy as np

from matador.utils.print_utils import print_notify
from matador.crystal import Crystal


[docs]class Fingerprint(abc.ABC): fingerprint = None default_key = None @abc.abstractmethod def __init__(self, doc, lazy=True, *args, **kwargs): pass
[docs] @abc.abstractmethod def calculate(self): pass
@staticmethod # @numba.njit def _broadening_space_dominated( distances, r_space, width, broadening_type="gaussian" ): """Add broadening to the PDF by convolving distances with the radial space and summing. More memory-efficient if len(r_space) is less than len(distances). Parameters: distances (numpy.ndarray): array of pair-wise distances. r_space (numpy.ndarray): radial grid width (float): amount of broadening. broadening_type (str): 'gaussian' or 'lorentzian'. Returns: gr (numpy.ndarray): the unnormalised PDF. """ new_space = ( np.reshape(r_space, (1, len(r_space))) - np.reshape(distances, (1, len(distances))).T ) if broadening_type == "lorentzian": width /= 2 return np.sum(1 / (1 + (new_space / width) ** 2), axis=0) return np.sum(np.exp(-((new_space / width) ** 2)), axis=0) @staticmethod @njit def _broadening_distance_dominated( hist, r_space, width, broadening_type="gaussian" ): """Add broadening to the PDF by convolving the distance histogram with the radial space and summing. Potentially more memory-efficient than the alternative implementation if len(distances) > len(r_space). Parameters: hist (numpy.ndarray): histogram of pairwise frequencies. r_space (numpy.ndarray): radial grid width (float): amount of gaussian broadening. broadening_type (str): 'gaussian' or 'lorentzian'. Returns: gr (numpy.ndarray): the unnormalised PDF. """ new_space = ( np.reshape(r_space, (1, len(r_space))) - np.reshape(r_space, (1, len(r_space))).T ) if broadening_type == "lorentzian": width /= 2 return np.sum(hist / (1 + (new_space / width) ** 2), axis=1) return np.sum(hist * np.exp(-((new_space / width) ** 2)), axis=1) @staticmethod @njit def _broadening_unrolled(hist, r_space, width, broadening_type="gaussian"): """Add broadening to the PDF by convolving the distance histogram with the radial space and summing. Unrolled loop to save memory. Parameters: hist (numpy.ndarray): histogram of pairwise frequencies. r_space (numpy.ndarray): radial grid width (float): amount of gaussian broadening. broadening_type (str): 'gaussian' or 'lorentzian'. Returns: gr (numpy.ndarray): the unnormalised PDF. """ gr = np.zeros_like(r_space) if broadening_type == "lorentzian": width /= 2 for ind, _ in enumerate(hist): if hist[ind] != 0: gr += hist[ind] / (1 + ((r_space - r_space[ind]) / width) ** 2) else: for ind, _ in enumerate(hist): if hist[ind] != 0: gr += hist[ind] * np.exp(-(((r_space - r_space[ind]) / width) ** 2)) return gr
[docs]class FingerprintFactory(abc.ABC): """This class computes Fingerprint objects from a list of structures, using multiprocessing to perform calculations concurrently. The computed fingerprints are stored in each structure's dictionary under the default key defined by the Fingerprint objects. Note: The number of processes used to concurrency is set by the following hierarchy: ``$SLURM_NTASKS -> $OMP_NUM_THREADS -> psutil.cpu_count(logical=False)``. Attributes: nprocs (int): number of concurrent processes to be used. """ fingerprint = None default_key = None def __init__(self, cursor, required_inds=None, debug=False, **fprint_args): """Compute PDFs over n processes, where n is set by either ``$SLURM_NTASKS``, ``$OMP_NUM_THREADS`` or physical core count. Parameters: cursor (list of dict): list of matador structures fingerprint (Fingerprint): class to compute for each structure Keyword arguments: pdf_args (dict): arguments to pass to the fingerprint calculator required_inds (list(int)): indices in cursor to skip. """ if required_inds is None: required_inds = list(range(len(cursor))) elif len(required_inds) == 0: return else: print( "Skipping {} structures out of {} as no comparisons are required".format( len(cursor) - len(required_inds), len(cursor) ) ) if self.fingerprint is None or self.default_key is None: raise NotImplementedError( "Do not create FingerprintFactory directly, " "use the appropriate sub-class!" ) # create list of empty (lazy) PDF objects if "lazy" in fprint_args: del fprint_args["lazy"] for ind, doc in enumerate(cursor): if isinstance(doc, Crystal): doc._data.pop(self.default_key, None) if ind in required_inds: doc[self.default_key] = self.fingerprint(doc, lazy=True, **fprint_args) else: doc[self.default_key] = None compute_list = [doc for ind, doc in enumerate(cursor) if ind in required_inds] # how many processes to use? either SLURM_NTASKS, OMP_NUM_THREADS or total num CPUs if os.environ.get("SLURM_NTASKS") is not None: self.nprocs = int(os.environ.get("SLURM_NTASKS")) env = "$SLURM_NTASKS" elif os.environ.get("OMP_NUM_THREADS") is not None: self.nprocs = int(os.environ.get("OMP_NUM_THREADS")) env = "$OMP_NUM_THREADS" else: self.nprocs = psutil.cpu_count(logical=False) env = "core count" print_notify( "Running {} jobs on at most {} processes, set by {}.".format( len(required_inds), self.nprocs, env ) ) self.nprocs = min(len(compute_list), self.nprocs) start = time.time() if self.nprocs == 1: import tqdm for ind, doc in tqdm.tqdm(enumerate(cursor)): if cursor[ind][self.default_key] is not None: cursor[ind][self.default_key].calculate() else: pool = mp.Pool(processes=self.nprocs) fprint_cursor = [] # for large cursors, set chunk to at most 16 # for smaller cursors, tend to use chunksize 1 for improved load balancing chunksize = min(max(1, int(0.25 * len(compute_list) / self.nprocs)), 16) results = pool.map_async( functools.partial(_calc_fprint_pool_wrapper, key=self.default_key), compute_list, callback=fprint_cursor.extend, error_callback=print, chunksize=chunksize, ) pool.close() width = len(str(len(required_inds))) total = len(required_inds) while not results.ready(): sys.stdout.write( "{done:{width}d} / {total:{width}d} {percentage:3d}%\r".format( width=width, done=total - results._number_left, total=total, percentage=int(100 * (total - results._number_left) / total), ) ) sys.stdout.flush() time.sleep(1) if len(fprint_cursor) != len(required_inds): raise RuntimeError( "There was an error calculating the desired Fingerprint" ) fprint_ind = 0 for ind, doc in enumerate(cursor): if ind in required_inds: if isinstance(cursor[ind], Crystal): cursor[ind]._data.pop(self.default_key, None) cursor[ind][self.default_key] = fprint_cursor[fprint_ind][ self.default_key ] fprint_ind += 1 elapsed = time.time() - start if debug: pool.close() print("Compute time: {:.4f} s".format(elapsed)) print("Work complete!")
def _calc_fprint_pool_wrapper(doc, key=None): """Evaluate Fingerprint of a structure where a lazy init of the doc's Fingerprint object has already been made. Parameters: doc (dict): matador structures with empty PDF """ doc[key].calculate() return {key: doc[key]}