Source code for datafs.managers.manager_mongo


from __future__ import absolute_import

from datafs.managers.manager import BaseDataManager

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError


class MongoDBManager(BaseDataManager):
    '''
    DataFS data manager backed by a MongoDB database.

    Archive records are stored as documents (keyed by ``_id``) in the
    collection named ``table_name`` inside the database ``database_name``.

    Parameters
    ----------
    database_name : str
        Name of the database containing the DataFS tables

    table_name: str
        Name of the data archive table

    client_kwargs : dict
        Keyword arguments used in initializing a
        :py:class:`pymongo.MongoClient` object
    '''

    def __init__(self, database_name, table_name, client_kwargs=None):
        super(MongoDBManager, self).__init__(table_name)

        if client_kwargs is None:
            client_kwargs = {}

        # setup MongoClient
        # Arguments can be passed to the client
        self._client_kwargs = client_kwargs
        self._client = MongoClient(**client_kwargs)

        self._database_name = database_name

        # The database handle is resolved lazily by the ``db`` property.
        self._db = None
        self._coll = None
        self._spec_coll = None

    @property
    def config(self):
        '''
        Dict holding this manager's constructor arguments
        (``database_name``, ``table_name``, ``client_kwargs``)
        '''
        # ``self._table_name`` is presumably set by BaseDataManager.__init__
        config = {
            'database_name': self._database_name,
            'table_name': self._table_name,
            'client_kwargs': self._client_kwargs
        }

        return config

    @property
    def database_name(self):
        '''Name of the MongoDB database holding the DataFS tables'''
        return self._database_name

    @property
    def table_name(self):
        '''Name of the archive collection managed by this instance'''
        return self._table_name

    def _get_table_names(self):
        '''
        Return the names of all non-system collections in the database
        '''
        # NOTE(review): ``collection_names`` is deprecated since PyMongo 3.7
        # and removed in 4.0. Switch to ``list_collection_names`` once the
        # minimum supported PyMongo version allows it.
        return self.db.collection_names(include_system_collections=False)

    def _create_archive_table(self, table_name):
        '''
        Create the collection ``table_name``

        Raises
        ------
        KeyError
            If a collection with this name already exists
        '''
        if table_name in self._get_table_names():
            raise KeyError('Table "{}" already exists'.format(table_name))

        self.db.create_collection(table_name)

    def _delete_table(self, table_name):
        '''
        Drop the collection ``table_name``

        Raises
        ------
        KeyError
            If no collection with this name exists
        '''
        if table_name not in self._get_table_names():
            raise KeyError('Table "{}" not found'.format(table_name))

        self.db.drop_collection(table_name)

    @property
    def collection(self):
        '''
        The archive collection for ``self.table_name``

        Each access queries the server for the collection list so that a
        missing table is reported instead of being silently auto-created.

        Raises
        ------
        KeyError
            If the archive collection does not exist
        '''
        table_name = self.table_name

        if table_name not in self._get_table_names():
            raise KeyError('Table "{}" not found'.format(table_name))

        return self.db[table_name]

    @property
    def spec_collection(self):
        '''
        The spec collection named by ``self._spec_table_name``

        Raises
        ------
        KeyError
            If the spec collection does not exist
        '''
        # ``_spec_table_name`` is presumably defined by BaseDataManager
        spec_table_name = self._spec_table_name

        if spec_table_name not in self._get_table_names():
            raise KeyError('Table "{}" not found'.format(spec_table_name))

        return self.db[spec_table_name]

    @property
    def db(self):
        '''Lazily-resolved :py:class:`pymongo.database.Database` handle'''
        if self._db is None:
            self._db = self._client[self.database_name]
        return self._db

    # Private backend methods (presumably the hooks BaseDataManager calls)

    def _update(self, archive_name, version_metadata):
        '''
        Append ``version_metadata`` to the archive's ``version_history``
        '''
        self.collection.update_one(
            {"_id": archive_name},
            {"$push": {"version_history": version_metadata}})

    def _update_metadata(self, archive_name, archive_metadata):
        '''
        Set or remove keys in the archive's ``archive_metadata`` field

        Keys whose value is ``None`` are removed from the stored metadata;
        every other key is set to its value. All changes are applied in a
        single atomic update, so a failure cannot leave the metadata
        partially updated.

        Parameters
        ----------
        archive_name : str
            ``_id`` of the archive document to update

        archive_metadata : dict
            Mapping of metadata keys to new values (``None`` deletes)
        '''
        to_set = {}
        to_unset = {}

        for key, val in archive_metadata.items():
            field = "archive_metadata.{}".format(key)
            if val is None:
                to_unset[field] = ""
            else:
                to_set[field] = val

        update = {}
        if to_set:
            update["$set"] = to_set
        if to_unset:
            update["$unset"] = to_unset

        # MongoDB rejects an update document without any operators, and
        # there is nothing to change anyway.
        if not update:
            return

        self.collection.update_one({"_id": archive_name}, update)

    def _update_spec_config(self, document_name, spec):
        '''
        Store ``spec`` as the ``config`` of spec document ``document_name``,
        creating the document if it does not exist
        '''
        # ``_id`` is unique, so ``update_one`` is sufficient here.
        self.spec_collection.update_one(
            {"_id": document_name},
            {"$set": {'config': spec}},
            upsert=True)

    def _create_archive(
            self,
            archive_name,
            metadata):
        '''
        Insert a new archive document

        Parameters
        ----------
        archive_name : str
            Archive name (used only in the error message)

        metadata : dict
            Full archive document to insert; assumed to carry the
            archive name as its ``_id`` -- TODO confirm against caller

        Raises
        ------
        KeyError
            If a document with the same ``_id`` already exists
        '''
        try:
            self.collection.insert_one(metadata)
        except DuplicateKeyError:
            raise KeyError('Archive "{}" already exists'.format(archive_name))

    def _create_spec_config(self, table_name, spec_documents):
        '''
        Insert the initial spec documents

        Parameters
        ----------
        table_name : str
            Name of the archive table the spec belongs to

        spec_documents : list
            Documents inserted into the spec collection
        '''
        if self._spec_coll is None:
            self._spec_coll = self.db[table_name + '.spec']

        # NOTE(review): documents go to ``self.spec_collection`` (this
        # manager's own spec table), not to ``table_name + '.spec'``, and
        # ``self._spec_coll`` is never read elsewhere in this class. Confirm
        # whether ``table_name`` can differ from ``self.table_name``.
        self.spec_collection.insert_many(spec_documents)

    def _get_archive_listing(self, archive_name):
        '''
        Return full document for ``{_id:'archive_name'}``

        .. note::

            MongoDB specific results - do not expose to user

        Raises
        ------
        KeyError
            If no archive with this name exists
        '''
        res = self.collection.find_one({'_id': archive_name})

        if res is None:
            raise KeyError(archive_name)

        return res

    def _batch_get_archive_listing(self, archive_names):
        '''
        Batched version of :py:meth:`~MongoDBManager._get_archive_listing`

        Returns a list of full archive listings from an iterable of archive
        names

        .. note ::

            Invalid archive names will simply not be returned, so the
            response may not be the same length as the supplied
            `archive_names`.

        Parameters
        ----------

        archive_names : list

            List of archive names

        Returns
        -------

        archive_listings : list

            List of archive listings

        '''
        # ``find`` always returns a (single-use) cursor, never None;
        # materialize it so callers get the documented list.
        return list(
            self.collection.find({'_id': {'$in': list(archive_names)}}))

    def _delete_archive_record(self, archive_name):
        '''
        Delete the archive document with ``_id == archive_name``

        Returns
        -------
        dict
            The raw server response (same shape as the legacy
            ``Collection.remove`` return value)
        '''
        result = self.collection.delete_one({'_id': archive_name})
        return result.raw_result

    def _search(self, search_terms, begins_with=None):
        '''
        Yield names of archives tagged with every term in ``search_terms``

        Parameters
        ----------
        search_terms : iterable of str
            Tags an archive must all carry; if empty, every archive matches

        begins_with : str, optional
            If given, only yield archive names starting with this prefix
        '''
        terms = list(search_terms)

        # ``$all`` matches documents whose ``tags`` contain every term; it is
        # equivalent to an ``$and`` of one membership clause per tag.
        query = {'tags': {'$all': terms}} if terms else {}

        for doc in self.collection.find(query, {"_id": 1}):
            if (not begins_with) or doc['_id'].startswith(begins_with):
                yield doc['_id']

    def _set_tags(self, archive_name, updated_tag_list):
        '''
        Replace the archive's ``tags`` field with ``updated_tag_list``
        '''
        self.collection.update_one(
            {"_id": archive_name},
            {"$set": {"tags": updated_tag_list}})

    def _get_spec_documents(self, table_name):
        '''
        Return every document in the spec collection as a list
        '''
        # NOTE(review): ``table_name`` is ignored; documents always come from
        # ``self.spec_collection``. Confirm this matches the caller's intent.
        return list(self.spec_collection.find({}))