# encoding: utf-8
""" This file implements lattice-site level measures of similarity, using
Voronoi decompositions.
Sketch of methodology:
* For a list of structures, take each in turn, compute the Voronoi substructure
and create normalised padded arrays so that they can be compared.
* For each structure, find the *unique* sites, to some definition of unique, initially
just a simple np.isclose() on the site arrays.
* Now do an all-to-all comparison of the unique sites in each structure, yielding
an overall list of unique substructures. This step should have a dial that can be turned
such that all sites fold onto each other, or all sites become distinct, i.e. sensitivity
vs specificity.
* [OPTIONAL] The remaining unique sites across the whole list can now be clustered, if desired,
using e.g. hierarchical clustering.
* Every site in every structure can now be assigned to one of the unique sites above, or
one of the unique clusters.
* Finally, the structures themselves can be clustered by the sites that are present, if desired.
"""
from collections import defaultdict
import numpy as np
from matador.utils.cell_utils import frac2cart
[docs]def are_sites_the_same(site_A, site_B, rtol=1e-2, atol=1e-2):
""" Simple check for uniqueness of sites based on their
Voronoi substructure.
Input:
| site_A : dict, with elements as keys, containing numpy arrays
of normalised solid angles.
| site_B : dict, as A.
Args:
| rtol : relative tolerance in solid angle for np.allclose,
| atol : absolute tolerance in solid angle for np.allclose.
Returns:
| True if sites have approximately the same values for each species,
else False.
"""
# if a key is missing from A or B, check that its not some extraneous distant atom
for key in site_B:
if key not in site_A:
site_A_missing_key = np.zeros_like(site_B[key])
if not (np.allclose(site_A_missing_key, site_B[key], atol=atol, rtol=rtol) or np.allclose(site_B[key], site_A_missing_key, atol=atol, rtol=rtol)):
return False
# now check remaining keys
for key in site_A:
if key not in site_B:
site_B_missing_key = np.zeros_like(site_A[key])
if not (np.allclose(site_B_missing_key, site_A[key], atol=atol, rtol=rtol) or np.allclose(site_A[key], site_B_missing_key, atol=atol, rtol=rtol)):
return False
else:
if len(site_A[key]) != len(site_B[key]):
if len(site_A[key]) > len(site_B[key]):
padded_B = np.append(site_B[key], np.zeros((len(site_A[key])-len(site_B[key]))))
if not (np.allclose(site_A[key], padded_B, atol=atol, rtol=rtol) or np.allclose(site_A[key], padded_B, atol=atol, rtol=rtol)):
return False
else:
padded_A = np.append(site_A[key], np.zeros((len(site_B[key])-len(site_A[key]))))
assert np.allclose(padded_A, site_B[key], atol=atol, rtol=rtol) is np.allclose(site_B[key], padded_A, atol=atol, rtol=rtol)
if not (np.allclose(padded_A, site_B[key], atol=atol, rtol=rtol) or np.allclose(site_B[key], padded_A, atol=atol, rtol=rtol)):
return False
else:
if not (np.allclose(site_A[key], site_B[key], atol=atol, rtol=rtol) or np.allclose(site_B[key], site_A[key], rtol=rtol, atol=atol)):
return False
return True
[docs]def collect_unique_sites(cursor):
""" Collect unique substrucs from each structure into a single dict.
Input:
| cursor: list(dict), list of structures with pre-computed unique sites.
Returns:
| unique_environments: dict(list), dict with element symbol keys containing
a list of each unique substructure.
"""
unique_environments = defaultdict(list)
for doc in cursor:
for elem in doc['unique_substrucs']:
for substruc in doc['unique_substrucs'][elem]:
unique_environments[elem].append(substruc)
return unique_environments
[docs]def get_max_coordination_of_elem(single_elem_environments):
""" For a given set of elemental environments, find the greatest
number of each element in a site.
Input:
| single_elem_environments: list(dict), list of voronoi substructures,
Returns:
| max_num_elem: dict(int), greatest number of each element in a substruc.
"""
max_num_elem = {}
for site_ind, site in enumerate(single_elem_environments):
for elem, value in site:
elem_len = len([val for val in site if val[0] == elem])
if elem not in max_num_elem:
max_num_elem[elem] = 0
if elem_len > max_num_elem[elem]:
max_num_elem[elem] = elem_len
return max_num_elem
[docs]def create_site_array(unique_environments, max_num_elems=None, elems=None, normalise=False):
""" Create padded numpy arrays based on unique environments provided.
Input:
| unique_environments: dict(list), dict with element keys full of
local substructures.
Args:
| max_num_elems : dict(dict(int)), dict with element keys, containing
sub-dict with element keys that contain the largest
number of each type of element contributing to a site.
If None, this will be calculated based on the input
environments. The site array is padded to this size.
| elems : list(str), custom list of elements to loop over.
| normalise : bool, whether to normalise site array to 1.
Returns:
| site_array : dict(np.ndarray), dict with element keys full of
normalised site arrays.
| max_num_elems : dict(dict(int)), dict with element keys containing the
highest number of atoms of an element contributing
to a particular site, e.g. {'P': {'K': 10}}.
"""
site_array = defaultdict(list)
if elems is None:
elems = [elem for elem in unique_environments]
if max_num_elems is None:
max_num_elems = dict()
for elem in elems:
max_num_elems[elem] = get_max_coordination_of_elem(unique_environments[elem])
for elem in elems:
for site_ind, site in enumerate(unique_environments[elem]):
site_array[elem].append(dict())
total_angle = 0
for _elem in elems:
if _elem not in max_num_elems[elem]:
max_num_elems[elem][_elem] = 0
site_array[elem][-1][_elem] = np.zeros((max_num_elems[elem][_elem]))
count = 0
for species, angle in site:
if species == _elem:
site_array[elem][-1][_elem][count] = angle
count += 1
total_angle += angle
if normalise:
for _elem in elems:
site_array[elem][-1][_elem] /= total_angle
return site_array, max_num_elems
[docs]def set_substruc_dict(doc):
""" Compute voronoi substructure and collect into dict. Sets the
'voronoi_substruc' and 'substruc_dict' entries in the doc.
Input:
| doc: dict, structure to compute substructure of.
"""
if 'voronoi_substruc' not in doc:
from matador.plugins.voronoi_interface.voronoi_interface import get_voronoi_substructure
doc['voronoi_substruc'] = get_voronoi_substructure(doc)
voronoi_substruc_dict = dict()
elems = set(doc['atom_types'])
for elem in elems:
voronoi_substruc_dict[elem] = [substruc[1] for substruc in doc['voronoi_substruc'] if substruc[0] == elem]
doc['substruc_dict'] = voronoi_substruc_dict
[docs]def set_site_array(doc, normalise=False):
""" Set the 'site_array' entry in the chosen document. Creates
a dict of numpy arrays of normalised solid angles with element keys
from the import calculated substructure.
Input:
| doc: dict, matador doc containing substruc_dict.
Args:
| normalise: bool, whether to normalise site_arrays to total angle.
"""
if 'substruc_dict' not in doc:
set_substruc_dict(doc)
doc['site_array'], doc['max_num_elems'] = create_site_array(doc['substruc_dict'], normalise=normalise)
[docs]def get_unique_sites(doc, atol=1e-2, rtol=1e-2):
if 'substruc_dict' not in doc:
set_substruc_dict(doc)
elems = set(doc['atom_types'])
substruc_dict = doc['substruc_dict']
if 'site_array' not in doc:
set_site_array(doc)
site_array = doc['site_array']
unique_sites = dict()
degeneracies = dict()
similar_sites = dict()
for elem in elems:
unique_sites[elem] = set()
similar_sites[elem] = []
for i in range(len(site_array[elem])):
for j in range(i+1, len(site_array[elem])):
same = are_sites_the_same(site_array[elem][i], site_array[elem][j], atol=atol, rtol=rtol)
if same:
added = False
for ind, site in enumerate(similar_sites[elem]):
if i in site:
similar_sites[elem][ind].add(j)
added = True
break
if j in site:
similar_sites[elem][ind].add(i)
added = True
break
if not added:
similar_sites[elem].append(set([i, j]))
if not any([i in _set for _set in similar_sites[elem]]):
similar_sites[elem].append(set([i]))
# for env in site_array[elem]:
# print(env)
from copy import deepcopy
temp_similar_sites = deepcopy(similar_sites[elem])
valid = False
_iter = 0
while not valid:
scrubbed = False
_iter += 1
if _iter > 100:
raise RuntimeError
for index in range(len(site_array[elem])):
index_sites = []
for ind, site in enumerate(temp_similar_sites):
if index in site:
index_sites.append(ind)
# if index is in multiple sites, combine all sites into the first
if len(index_sites) > 1:
# construct mean of each set of sites, and if they are too far apart, get worried
site_mean = []
for j in range(len(index_sites)):
site_mean.append(defaultdict(list))
for _elem in site_array[elem][i]:
site_mean[-1][_elem].append(np.mean(np.asarray([site_array[elem][i][_elem] for i in temp_similar_sites[index_sites[j]]]), axis=0))
assert np.shape(site_mean[-1][_elem][-1]) == np.shape(site_array[elem][0][_elem])
for j in range(len(index_sites)-1, 0, -1):
# let means deviate by twice the amount
if not are_sites_the_same(site_mean[j], site_mean[0], atol=5*atol, rtol=5*rtol):
raise RuntimeError('Numerical instability: site groups with distinct means were clustered.')
[temp_similar_sites[index_sites[0]].add(i) for i in list(temp_similar_sites[index_sites[j]])]
del temp_similar_sites[index_sites[j]]
scrubbed = True
break
elif len(index_sites) == 0:
raise RuntimeError
if not scrubbed:
valid = True
similar_sites[elem] = temp_similar_sites
unique_sites[elem] = [next(iter(site)) for site in similar_sites[elem]]
degeneracies[elem] = [len(_set) for _set in similar_sites[elem]]
doc['unique_site_inds'] = unique_sites
doc['unique_substrucs'] = dict()
doc['unique_site_array'] = dict()
doc['unique_site_stddev'] = dict()
doc['similar_sites'] = similar_sites
for elem in elems:
doc['unique_substrucs'][elem] = [substruc_dict[elem][ind] for ind in unique_sites[elem]]
doc['unique_site_array'][elem] = []
doc['unique_site_stddev'][elem] = []
for site in similar_sites[elem]:
doc['unique_site_array'][elem].append(dict())
doc['unique_site_stddev'][elem].append(dict())
for _elem in site_array[elem][0]:
doc['unique_site_array'][elem][-1][_elem] = np.mean(np.asarray([site_array[elem][i][_elem] for i in site]), axis=0)
doc['unique_site_stddev'][elem][-1][_elem] = np.std(np.asarray([site_array[elem][i][_elem] for i in site]), axis=0)
doc['site_degeneracies'] = degeneracies
[docs]def compare_docs(docA, docB, elems):
matching = 0
print('COMPARING', ' '.join(docA['text_id']), 'vs', ' '.join(docB['text_id']))
for elem in elems:
if elem not in docA['unique_substrucs'] or elem not in docB['unique_substrucs']:
print('Missing elements.')
return False
for i, site in enumerate(docA['unique_substrucs'][elem]):
for other_site in docB['unique_substrucs'][elem]:
if are_sites_the_same(site, other_site, ['K', 'P']):
matching += docA['site_degeneracies'][elem][i]
print(matching, '/', sum(docA['site_degeneracies'][elem]))
if matching / sum(docA['site_degeneracies'][elem]) > 0.5:
return True
[docs]def cluster(X, hull, max_of_elems, elems=None, method='KMeans', elem='P', quantile=None, n_clusters=None, cmap='tab10'):
if method == 'KMeans':
from sklearn.cluster import KMeans
inertia = []
elem = 'P'
est = KMeans(n_clusters=n_clusters, n_jobs=-2, precompute_distances=True)
est.fit(X)
inertia.append(est.inertia_)
labels = est.labels_
# means = est.cluster_centers_
print('Number of clusters: {}'.format(n_clusters))
elif method == 'MeanShift':
from sklearn.cluster import MeanShift, estimate_bandwidth
bandwidth = estimate_bandwidth(X, quantile=quantile if quantile is not None else 0.2)
est = MeanShift(bandwidth=bandwidth, cluster_all=False)
est.fit(X)
labels = est.labels_
# means = est.cluster_centers_
n_clusters = len(set(labels))
print('Number of estimate clusters:', n_clusters)
"""
means_of_point_class = np.asarray([means[label] for label in labels])
labels = labels[np.argsort(means_of_point_class[:, 0])]
X = X[np.argsort(means_of_point_class[:, 0])]
temp_labels = np.zeros_like(labels)
sorted_labels = np.zeros_like(labels)
for i in range(len(labels)-1):
if labels[i] != labels[i+1]:
temp_labels[i+1] += 1
current_group = 0
for i in range(len(temp_labels)):
if temp_labels[i] == 1:
current_group += 1
sorted_labels[i] = current_group
"""
# fig, axarr = plt.subplots(2, 2, figsize=(10,10))
# sns.set_palette(sns.color_palette(cmap, n_colors=n_clusters))
# colours = [sns.palettes.color_palette(cmap, n_colors=n_clusters)[label] for label in labels]
#
# sns.stripplot(x=labels, y=plot_X[:, 0], ax=axarr[0, 0], jitter=0.1)
# axarr[0, 0].set_ylabel('Fraction of solid angle seen as P')
#
# axarr[0, 1].scatter(plot_X[:, 1], plot_X[:, 3], c=colours, lw=0, alpha=0.3)
# axarr[0, 1].set_xlabel('Fraction of solid angle seen as P')
# axarr[0, 1].set_ylabel('Number of contrib P atoms')
#
# axarr[1, 0].scatter(plot_X[:, 0], plot_X[:, 2], c=colours, lw=0, alpha=0.3)
# axarr[1, 0].set_xlabel('Fraction of solid angle seen as K')
# axarr[1, 0].set_ylabel('Number of contrib K atoms')
#
# sns.stripplot(x=labels, y=plot_X[:, 3], ax=axarr[1, 1], jitter=0.1)
# axarr[1, 1].set_ylabel('Number of contrib Pf atoms')
# fig.suptitle('Number of clusters: {}'.format(n_clusters))
for doc in hull.hull_cursor:
site_classes = []
if elem not in doc['unique_substrucs']:
doc['class'] = -1
doc['class_err'] = 0
doc['class_sites'] = [-1]
continue
for ind, substruc in enumerate(doc['unique_substrucs'][elem]):
site_array = np.zeros((1, int(np.sum(max_of_elems[elem]))))
elem_counters = np.zeros((len(elems)), dtype=int)
for elem_ind, _elem in enumerate(elems):
if elem_ind != 0:
elem_counters[elem_ind] += max_of_elems[elem][elem_ind-1]
for elem_ind, _elem in enumerate(elems):
for atom in substruc:
if atom[0] == _elem:
site_array[0][elem_counters[elem_ind]] = atom[1]
elem_counters[elem_ind] += 1
site_classes.append(est.predict(site_array))
site_classes = [val[0] for val in site_classes]
doc['class'] = np.median(site_classes)
doc['class_sites'] = [val for val in site_classes]
doc['class_err'] = (len(set(doc['class_sites']))-1)/n_clusters
class_vector = np.zeros((n_clusters))
for site in site_classes:
class_vector[int(site)] += 1
doc['class_vector'] = class_vector
return est
[docs]def plot_class_hull(hull, n_clusters, plot_class=None):
from random import sample
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(12, 20))
ax = fig.add_subplot(211)
ax2 = fig.add_subplot(212)
ax2 = hull.plot_2d_hull(show=False, ax=ax2)
colours = plt.cm.Dark2(np.linspace(0, 1, n_clusters*10))
class_concs = defaultdict(list)
for doc in hull.hull_cursor:
class_concs[int(doc['class'])].append(doc['concentration'])
# ax.errorbar(doc['concentration'], doc['class'], yerr=doc['class_err'], fmt='o', c=colours[int(10*doc['class'])], alpha=0.01, zorder=100)
for _class in class_concs:
ax.errorbar(np.mean(class_concs[_class]), _class, xerr=np.std(class_concs[_class]), fmt='o', c=colours[int(10*_class)], lw=2, zorder=1000)
bounds = [0.32, 0.57, 0.74]
for bound in bounds:
ax.axvline(bound, ls='--', lw=1)
ax.set_ylim(-1.5, n_clusters)
ax.set_xlim(-0.1, 1.1)
for doc in hull.hull_cursor:
ax2.scatter(doc['concentration'], doc['formation_enthalpy_per_atom'], c='#eeeeee', s=60, zorder=9999, lw=0)
for doc in sample(hull.hull_cursor, len(hull.hull_cursor)):
if plot_class is not None:
if doc['class'] not in plot_class:
continue
else:
ax2.scatter(doc['concentration'], doc['formation_enthalpy_per_atom'], c=colours[int(10*(doc['class']))], s=50, alpha=max(1-2*doc['class_err'], 0.1), zorder=99999, lw=0)
else:
if doc['hull_distance'] <= 1e-12:
ax2.scatter(doc['concentration'], doc['formation_enthalpy_per_atom'], c=colours[int(10*(doc['class']))], s=75, zorder=99999999, lw=1)
else:
ax2.scatter(doc['concentration'], doc['formation_enthalpy_per_atom'], c=colours[int(10*(doc['class']))], s=50, alpha=max(1-2*doc['class_err'], 0.1), zorder=99999, lw=0)
[docs]def find_site_index(doc, elem, elem_ind):
elem_count = 0
for idx, atom in enumerate(doc['atom_types']):
if atom == elem:
if elem_count == elem_ind:
print('Found substruc at {}'.format(elem_count))
return elem_count
elem_count += 1
return False
[docs]def viz_site(doc, targ_substruc, elem, rmax=6):
if 'positions_abs' not in doc:
doc['positions_abs'] = frac2cart(doc['lattice_cart'], doc['positions_frac'])
for ind, substruc in enumerate(doc['substruc_dict'][elem]):
if substruc == targ_substruc:
elem_ind = ind
elem_count = find_site_index(doc, elem, elem_ind)
if elem_count is not False:
break
targ_site = doc['positions_frac'][elem_count]
targ_pos = doc['positions_abs'][elem_count]
print(elem_count)
from itertools import product
from collections import defaultdict
doc['lattice_cart'] = np.asarray(doc['lattice_cart'])
neighbour_doc = defaultdict(list)
neighbour_doc['positions_abs'].append(targ_pos)
neighbour_doc['positions_frac'].append(targ_site)
neighbour_doc['atom_types'].append('U')
neighbour_doc['lattice_cart'] = doc['lattice_cart']
for j, pos in enumerate(doc['positions_abs']):
for prod in product(range(-1, 2), repeat=3):
trans = np.zeros((3))
for ind, multi in enumerate(prod):
trans += doc['lattice_cart'][ind] * multi
dist2 = 0
for i in range(3):
dist2 += (targ_pos[i]-(pos+trans)[i])**2
if dist2 < rmax**2:
neighbour_doc['positions_abs'].append(pos + trans)
neighbour_doc['positions_frac'].append((np.asarray(doc['positions_frac'][j]) + prod).tolist())
neighbour_doc['atom_types'].append(doc['atom_types'][j])
print(targ_site)
return elem_count, neighbour_doc