# coding: utf-8
# Distributed under the terms of the MIT License.
""" This file implements the base class for all "fingerprints", which
here refers to any object derived from purely structural features of a
crystal, e.g. pair distribution functions (PDF) or simulated powder
X-ray diffraction (PXRD) spectra.
"""
# TODO: wrap these broadening methods with heuristics to decide which to use
import abc
import multiprocessing as mp
import os
import time
import sys
import functools
import psutil
try:
from numba import njit
except (ImportError, ModuleNotFoundError):
# Fake empty decorator to avoid needing numba installed
njit = lambda f: f # noqa
import numpy as np
from matador.utils.print_utils import print_notify
from matador.crystal import Crystal
[docs]class Fingerprint(abc.ABC):
fingerprint = None
default_key = None
@abc.abstractmethod
def __init__(self, doc, lazy=True, *args, **kwargs):
pass
[docs] @abc.abstractmethod
def calculate(self):
pass
@staticmethod
# @numba.njit
def _broadening_space_dominated(
distances, r_space, width, broadening_type="gaussian"
):
"""Add broadening to the PDF by convolving distances with
the radial space and summing. More memory-efficient if len(r_space)
is less than len(distances).
Parameters:
distances (numpy.ndarray): array of pair-wise distances.
r_space (numpy.ndarray): radial grid
width (float): amount of broadening.
broadening_type (str): 'gaussian' or 'lorentzian'.
Returns:
gr (numpy.ndarray): the unnormalised PDF.
"""
new_space = (
np.reshape(r_space, (1, len(r_space)))
- np.reshape(distances, (1, len(distances))).T
)
if broadening_type == "lorentzian":
width /= 2
return np.sum(1 / (1 + (new_space / width) ** 2), axis=0)
return np.sum(np.exp(-((new_space / width) ** 2)), axis=0)
@staticmethod
@njit
def _broadening_distance_dominated(
hist, r_space, width, broadening_type="gaussian"
):
"""Add broadening to the PDF by convolving the distance histogram with
the radial space and summing. Potentially more memory-efficient than the alternative
implementation if len(distances) > len(r_space).
Parameters:
hist (numpy.ndarray): histogram of pairwise frequencies.
r_space (numpy.ndarray): radial grid
width (float): amount of gaussian broadening.
broadening_type (str): 'gaussian' or 'lorentzian'.
Returns:
gr (numpy.ndarray): the unnormalised PDF.
"""
new_space = (
np.reshape(r_space, (1, len(r_space)))
- np.reshape(r_space, (1, len(r_space))).T
)
if broadening_type == "lorentzian":
width /= 2
return np.sum(hist / (1 + (new_space / width) ** 2), axis=1)
return np.sum(hist * np.exp(-((new_space / width) ** 2)), axis=1)
@staticmethod
@njit
def _broadening_unrolled(hist, r_space, width, broadening_type="gaussian"):
"""Add broadening to the PDF by convolving the distance histogram with
the radial space and summing. Unrolled loop to save memory.
Parameters:
hist (numpy.ndarray): histogram of pairwise frequencies.
r_space (numpy.ndarray): radial grid
width (float): amount of gaussian broadening.
broadening_type (str): 'gaussian' or 'lorentzian'.
Returns:
gr (numpy.ndarray): the unnormalised PDF.
"""
gr = np.zeros_like(r_space)
if broadening_type == "lorentzian":
width /= 2
for ind, _ in enumerate(hist):
if hist[ind] != 0:
gr += hist[ind] / (1 + ((r_space - r_space[ind]) / width) ** 2)
else:
for ind, _ in enumerate(hist):
if hist[ind] != 0:
gr += hist[ind] * np.exp(-(((r_space - r_space[ind]) / width) ** 2))
return gr
[docs]class FingerprintFactory(abc.ABC):
"""This class computes Fingerprint objects from a list of structures,
using multiprocessing to perform calculations concurrently. The computed
fingerprints are stored in each structure's dictionary under the
default key defined by the Fingerprint objects.
Note:
The number of processes used to concurrency is set by the following
hierarchy:
``$SLURM_NTASKS -> $OMP_NUM_THREADS -> psutil.cpu_count(logical=False)``.
Attributes:
nprocs (int): number of concurrent processes to be used.
"""
fingerprint = None
default_key = None
def __init__(self, cursor, required_inds=None, debug=False, **fprint_args):
"""Compute PDFs over n processes, where n is set by either
``$SLURM_NTASKS``, ``$OMP_NUM_THREADS`` or physical core count.
Parameters:
cursor (list of dict): list of matador structures
fingerprint (Fingerprint): class to compute for each structure
Keyword arguments:
pdf_args (dict): arguments to pass to the fingerprint calculator
required_inds (list(int)): indices in cursor to skip.
"""
if required_inds is None:
required_inds = list(range(len(cursor)))
elif len(required_inds) == 0:
return
else:
print(
"Skipping {} structures out of {} as no comparisons are required".format(
len(cursor) - len(required_inds), len(cursor)
)
)
if self.fingerprint is None or self.default_key is None:
raise NotImplementedError(
"Do not create FingerprintFactory directly, "
"use the appropriate sub-class!"
)
# create list of empty (lazy) PDF objects
if "lazy" in fprint_args:
del fprint_args["lazy"]
for ind, doc in enumerate(cursor):
if isinstance(doc, Crystal):
doc._data.pop(self.default_key, None)
if ind in required_inds:
doc[self.default_key] = self.fingerprint(doc, lazy=True, **fprint_args)
else:
doc[self.default_key] = None
compute_list = [doc for ind, doc in enumerate(cursor) if ind in required_inds]
# how many processes to use? either SLURM_NTASKS, OMP_NUM_THREADS or total num CPUs
if os.environ.get("SLURM_NTASKS") is not None:
self.nprocs = int(os.environ.get("SLURM_NTASKS"))
env = "$SLURM_NTASKS"
elif os.environ.get("OMP_NUM_THREADS") is not None:
self.nprocs = int(os.environ.get("OMP_NUM_THREADS"))
env = "$OMP_NUM_THREADS"
else:
self.nprocs = psutil.cpu_count(logical=False)
env = "core count"
print_notify(
"Running {} jobs on at most {} processes, set by {}.".format(
len(required_inds), self.nprocs, env
)
)
self.nprocs = min(len(compute_list), self.nprocs)
start = time.time()
if self.nprocs == 1:
import tqdm
for ind, doc in tqdm.tqdm(enumerate(cursor)):
if cursor[ind][self.default_key] is not None:
cursor[ind][self.default_key].calculate()
else:
pool = mp.Pool(processes=self.nprocs)
fprint_cursor = []
# for large cursors, set chunk to at most 16
# for smaller cursors, tend to use chunksize 1 for improved load balancing
chunksize = min(max(1, int(0.25 * len(compute_list) / self.nprocs)), 16)
results = pool.map_async(
functools.partial(_calc_fprint_pool_wrapper, key=self.default_key),
compute_list,
callback=fprint_cursor.extend,
error_callback=print,
chunksize=chunksize,
)
pool.close()
width = len(str(len(required_inds)))
total = len(required_inds)
while not results.ready():
sys.stdout.write(
"{done:{width}d} / {total:{width}d} {percentage:3d}%\r".format(
width=width,
done=total - results._number_left,
total=total,
percentage=int(100 * (total - results._number_left) / total),
)
)
sys.stdout.flush()
time.sleep(1)
if len(fprint_cursor) != len(required_inds):
raise RuntimeError(
"There was an error calculating the desired Fingerprint"
)
fprint_ind = 0
for ind, doc in enumerate(cursor):
if ind in required_inds:
if isinstance(cursor[ind], Crystal):
cursor[ind]._data.pop(self.default_key, None)
cursor[ind][self.default_key] = fprint_cursor[fprint_ind][
self.default_key
]
fprint_ind += 1
elapsed = time.time() - start
if debug:
pool.close()
print("Compute time: {:.4f} s".format(elapsed))
print("Work complete!")
def _calc_fprint_pool_wrapper(doc, key=None):
"""Evaluate Fingerprint of a structure where a lazy init of the
doc's Fingerprint object has already been made.
Parameters:
doc (dict): matador structures with empty PDF
"""
doc[key].calculate()
return {key: doc[key]}