# coding: utf-8
# Distributed under the terms of the MIT License.
""" Some simple utilities for making DB connections. """
import pymongo as pm
from matador.config import load_custom_settings
[docs]def make_connection_to_collection(coll_names, check_collection=False, allow_changelog=False,
mongo_settings=None, override=False, import_mode=False, quiet=True, debug=False):
""" Connect to database of choice.
Parameters:
coll_names (str): name of collection.
Keyword Arguments:
check_collection (bool): check whether collections exist (forces connection)
allow_changelog (bool): allow queries to collections with names prefixed by __
mongo_settings (dict): dict containing mongo and related config
override (bool): don't ask for user input from stdin and assume all is well
quiet (bool): don't print very much.
Returns:
client (MongoClient): the connection to the database
db (Database): the database to query
collections (dict): Collection objects indexed by name
"""
if mongo_settings is None:
settings = load_custom_settings(no_quickstart=override)
else:
settings = mongo_settings
if not quiet:
print('Trying to connect to {host}:{port}/{db}'.format(**settings['mongo']))
client = pm.MongoClient(
host=settings['mongo']['host'],
port=settings['mongo']['port'],
connect=False,
maxIdleTimeMS=600000, # disconnect after 10 minutes idle
socketTimeoutMS=3600000, # give up on database after 1 hr without results
serverSelectionTimeoutMS=10000, # give up on server after 2 seconds without results
connectTimeoutMS=10000) # give up trying to connect to new database after 2 seconds
try:
database_names = client.list_database_names()
if not quiet:
print('Success!')
except pm.errors.ServerSelectionTimeoutError as exc:
print('{}: {}'.format(type(exc).__name__, exc))
raise SystemExit('Unable to connect to {host}:{port}/{db}, exiting...'.format(**settings['mongo']))
if settings['mongo']['db'] not in database_names:
if override:
response = 'y'
else:
response = input('Database {db} does not exist at {host}:{port}/{db}, '
'would you like to create it? (y/n) '
.format(**settings['mongo']))
if response.lower() != 'y':
raise SystemExit('Exiting...')
else:
print('Creating database {}'.format(settings['mongo']['db']))
db = client[settings['mongo']['db']]
possible_collections = [name for name in db.list_collection_names() if not name.startswith('__')]
collections = dict()
# allow lists of collections for backwards-compat, though normally
# we only want to connect to one at a time
if coll_names is not None:
if not isinstance(coll_names, list):
coll_names = [coll_names]
if len(coll_names) > 1:
raise NotImplementedError("Querying multiple collections is no longer supported.")
for collection in coll_names:
if not allow_changelog:
if collection.startswith('__'):
raise SystemExit('Queries to collections prefixed with __ are VERBOTEN!')
if collection not in possible_collections:
options = fuzzy_collname_match(collection, possible_collections)
if not options and check_collection:
client.close()
raise SystemExit('Collection {} not found!'.format(collection))
else:
print('Collection {} not found, did you mean one of these?'.format(collection))
for ind, value in enumerate(options[:10]):
print('({}):\t{}'.format(ind, value))
if check_collection:
try:
choice = int(input('Please enter your choice: '))
collection = options[choice]
except Exception:
raise SystemExit('Invalid choice. Exiting...')
elif import_mode:
if override:
choice = 'y'
else:
choice = input('Are you sure you want to make a new collection called {}? (y/n) '
.format(collection))
if choice.lower() != 'y' and choice.lower != 'yes':
try:
choice = int(input('Then please enter your choice from above: '))
collection = options[choice]
except Exception:
raise SystemExit('Invalid choice. Exiting...')
collections[collection] = db[collection]
else:
default_collection = settings['mongo']['default_collection']
if default_collection not in possible_collections:
if check_collection:
client.close()
raise SystemExit('Default collection {} not found!'.format(default_collection))
else:
print('Creating new collection {}...'.format(default_collection))
collections['repo'] = db[default_collection]
return client, db, collections
[docs]def fuzzy_collname_match(trial, targets):
""" Do a noddy fuzzy match for bits between punctuation, e.g.
matthews_cool_database will search for matthews, cool and database
in the known collection names.
Parameters:
trial (str): database search name.
targets (list): list of existing database names.
Returns:
list: list of roughly matching collection names ordered
by occurence of tokens.
"""
split_chars = set([char for char in trial if (not char.isalpha() and not char.isdigit())])
tokens = trial
for char in split_chars:
tokens = tokens.replace(char, ' ')
tokens = tokens.split()
options = {}
for token in tokens:
for option in targets:
if token in option:
if option not in options:
options[option] = 0
else:
options[option] += 1
options_list = [item[0] for item in list(sorted(options.items(), reverse=True, key=lambda item: item[1]))]
return options_list