Source code for matador.fingerprints.fingerprint

# coding: utf-8
# Distributed under the terms of the MIT License.

""" This file implements the base class for all "fingerprints", which
here refers to any object derived from purely structural features of a
crystal, e.g. pair distribution functions (PDF) or simulated powder
X-ray diffraction (PXRD) spectra.

"""

# TODO: wrap these broadening methods with heuristics to decide which to use

import abc
import multiprocessing as mp
import os
import time
import sys
import functools

import psutil
import numba
import numpy as np

from matador.utils.print_utils import print_notify
from matador.crystal import Crystal


[docs]class Fingerprint(abc.ABC): fingerprint = None default_key = None @abc.abstractmethod def __init__(self, doc, lazy=True, *args, **kwargs): pass
[docs] @abc.abstractmethod def calculate(self): pass
@staticmethod # @numba.njit def _broadening_space_dominated( distances, r_space, width, broadening_type="gaussian" ): """Add broadening to the PDF by convolving distances with the radial space and summing. More memory-efficient if len(r_space) is less than len(distances). Parameters: distances (numpy.ndarray): array of pair-wise distances. r_space (numpy.ndarray): radial grid width (float): amount of broadening. broadening_type (str): 'gaussian' or 'lorentzian'. Returns: gr (numpy.ndarray): the unnormalised PDF. """ new_space = ( np.reshape(r_space, (1, len(r_space))) - np.reshape(distances, (1, len(distances))).T ) if broadening_type == "lorentzian": width /= 2 return np.sum(1 / (1 + (new_space / width) ** 2), axis=0) return np.sum(np.exp(-((new_space / width) ** 2)), axis=0) @staticmethod @numba.njit def _broadening_distance_dominated( hist, r_space, width, broadening_type="gaussian" ): """Add broadening to the PDF by convolving the distance histogram with the radial space and summing. Potentially more memory-efficient than the alternative implementation if len(distances) > len(r_space). Parameters: hist (numpy.ndarray): histogram of pairwise frequencies. r_space (numpy.ndarray): radial grid width (float): amount of gaussian broadening. broadening_type (str): 'gaussian' or 'lorentzian'. Returns: gr (numpy.ndarray): the unnormalised PDF. """ new_space = ( np.reshape(r_space, (1, len(r_space))) - np.reshape(r_space, (1, len(r_space))).T ) if broadening_type == "lorentzian": width /= 2 return np.sum(hist / (1 + (new_space / width) ** 2), axis=1) return np.sum(hist * np.exp(-((new_space / width) ** 2)), axis=1) @staticmethod @numba.njit def _broadening_unrolled(hist, r_space, width, broadening_type="gaussian"): """Add broadening to the PDF by convolving the distance histogram with the radial space and summing. Unrolled loop to save memory. Parameters: hist (numpy.ndarray): histogram of pairwise frequencies. r_space (numpy.ndarray): radial grid width (float): amount of gaussian broadening. broadening_type (str): 'gaussian' or 'lorentzian'. Returns: gr (numpy.ndarray): the unnormalised PDF. """ gr = np.zeros_like(r_space) if broadening_type == "lorentzian": width /= 2 for ind, _ in enumerate(hist): if hist[ind] != 0: gr += hist[ind] / (1 + ((r_space - r_space[ind]) / width) ** 2) else: for ind, _ in enumerate(hist): if hist[ind] != 0: gr += hist[ind] * np.exp(-(((r_space - r_space[ind]) / width) ** 2)) return gr
[docs]class FingerprintFactory(abc.ABC): """This class computes Fingerprint objects from a list of structures, using multiprocessing to perform calculations concurrently. The computed fingerprints are stored in each structure's dictionary under the default key defined by the Fingerprint objects. Note: The number of processes used to concurrency is set by the following hierarchy: ``$SLURM_NTASKS -> $OMP_NUM_THREADS -> psutil.cpu_count(logical=False)``. Attributes: nprocs (int): number of concurrent processes to be used. """ fingerprint = None default_key = None def __init__(self, cursor, required_inds=None, debug=False, **fprint_args): """Compute PDFs over n processes, where n is set by either ``$SLURM_NTASKS``, ``$OMP_NUM_THREADS`` or physical core count. Parameters: cursor (list of dict): list of matador structures fingerprint (Fingerprint): class to compute for each structure Keyword arguments: pdf_args (dict): arguments to pass to the fingerprint calculator required_inds (list(int)): indices in cursor to skip. """ if required_inds is None: required_inds = list(range(len(cursor))) elif len(required_inds) == 0: return else: print( "Skipping {} structures out of {} as no comparisons are required".format( len(cursor) - len(required_inds), len(cursor) ) ) if self.fingerprint is None or self.default_key is None: raise NotImplementedError( "Do not create FingerprintFactory directly, " "use the appropriate sub-class!" ) # create list of empty (lazy) PDF objects if "lazy" in fprint_args: del fprint_args["lazy"] for ind, doc in enumerate(cursor): if isinstance(doc, Crystal): doc._data.pop(self.default_key, None) if ind in required_inds: doc[self.default_key] = self.fingerprint(doc, lazy=True, **fprint_args) else: doc[self.default_key] = None compute_list = [doc for ind, doc in enumerate(cursor) if ind in required_inds] # how many processes to use? either SLURM_NTASKS, OMP_NUM_THREADS or total num CPUs if os.environ.get("SLURM_NTASKS") is not None: self.nprocs = int(os.environ.get("SLURM_NTASKS")) env = "$SLURM_NTASKS" elif os.environ.get("OMP_NUM_THREADS") is not None: self.nprocs = int(os.environ.get("OMP_NUM_THREADS")) env = "$OMP_NUM_THREADS" else: self.nprocs = psutil.cpu_count(logical=False) env = "core count" print_notify( "Running {} jobs on at most {} processes, set by {}.".format( len(required_inds), self.nprocs, env ) ) self.nprocs = min(len(compute_list), self.nprocs) start = time.time() if self.nprocs == 1: import tqdm for ind, doc in tqdm.tqdm(enumerate(cursor)): if cursor[ind][self.default_key] is not None: cursor[ind][self.default_key].calculate() else: pool = mp.Pool(processes=self.nprocs) fprint_cursor = [] # for large cursors, set chunk to at most 16 # for smaller cursors, tend to use chunksize 1 for improved load balancing chunksize = min(max(1, int(0.25 * len(compute_list) / self.nprocs)), 16) results = pool.map_async( functools.partial(_calc_fprint_pool_wrapper, key=self.default_key), compute_list, callback=fprint_cursor.extend, error_callback=print, chunksize=chunksize, ) pool.close() width = len(str(len(required_inds))) total = len(required_inds) while not results.ready(): sys.stdout.write( "{done:{width}d} / {total:{width}d} {percentage:3d}%\r".format( width=width, done=total - results._number_left, total=total, percentage=int(100 * (total - results._number_left) / total), ) ) sys.stdout.flush() time.sleep(1) if len(fprint_cursor) != len(required_inds): raise RuntimeError( "There was an error calculating the desired Fingerprint" ) fprint_ind = 0 for ind, doc in enumerate(cursor): if ind in required_inds: if isinstance(cursor[ind], Crystal): cursor[ind]._data.pop(self.default_key, None) cursor[ind][self.default_key] = fprint_cursor[fprint_ind][ self.default_key ] fprint_ind += 1 elapsed = time.time() - start if debug: pool.close() print("Compute time: {:.4f} s".format(elapsed)) print("Work complete!")
def _calc_fprint_pool_wrapper(doc, key=None): """Evaluate Fingerprint of a structure where a lazy init of the doc's Fingerprint object has already been made. Parameters: doc (dict): matador structures with empty PDF """ doc[key].calculate() return {key: doc[key]}