from __future__ import absolute_import
from datafs.services.service import DataService
from datafs.core.data_archive import DataArchive
from datafs._compat import open_filelike
import hashlib
import fnmatch
import re
import fs.path
from fs.osfs import OSFS
# Python 2 compatibility shim: PermissionError is a builtin only on
# Python 3. On Python 2 the bare name lookup raises NameError, so we
# define a NameError subclass as a stand-in (callers catch either).
try:
    PermissionError
except NameError:
    class PermissionError(NameError):
        pass


# Regex fragment matching valid authority names (word chars and hyphens)
_VALID_AUTHORITY_PATTERNS = r'[\w\-]+'
class DataAPI(object):
    '''
    Top-level DataFS API object

    Coordinates a metadata manager, one or more data authorities, and an
    optional cache, and constructs :py:class:`DataArchive` objects.
    '''

    # Subclasses may pin the default authority by name; when None, the
    # default is inferred from the attached authorities (see
    # default_authority_name).
    DefaultAuthorityName = None

    # Class used to construct archive objects; override to customize
    _ArchiveConstructor = DataArchive
def __init__(self, default_versions=None, **kwargs):
if default_versions is None:
default_versions = {}
self.user_config = kwargs
self._manager = None
self._cache = None
self._authorities = {}
self.default_versions = default_versions
self._authorities_locked = False
self._manager_locked = False
def attach_authority(self, service_name, service):
    '''
    Register *service* as the data authority named *service_name*

    Raises
    ------
    PermissionError
        If :py:meth:`lock_authorities` has been called
    ValueError
        If *service_name* is not a valid authority name
    '''
    if self._authorities_locked:
        raise PermissionError('Authorities locked')

    self._validate_authority_name(service_name)

    self._authorities[service_name] = DataService(service)
[docs] def lock_authorities(self):
self._authorities_locked = True
[docs] def lock_manager(self):
self._manager_locked = True
[docs] def attach_cache(self, service):
if service in self._authorities.values():
raise ValueError('Cannot attach an authority as a cache')
else:
self._cache = DataService(service)
@property
def manager(self):
return self._manager
# set cache attr
@property
def cache(self):
return self._cache
# get the default athority setting
@property
def default_authority_name(self):
if self.DefaultAuthorityName is not None:
return self.DefaultAuthorityName
if len(self._authorities) == 0:
raise ValueError(
'No authorities found. See attach_authority.')
if len(self._authorities) > 1:
raise ValueError(
'Authority ambiguous. Set authority or DefaultAuthorityName.')
# get the zeroth key
return list(self._authorities.keys())[0]
# Do we want to provide a method for setting authorities
@property
def default_authority(self):
return self._authorities[self.default_authority_name]
@property
def default_versions(self):
return self._default_versions
@default_versions.setter
def default_versions(self, default_versions):
'''
Set archive default read versions
Parameters
----------
default_versions: dict
Dictionary of archive_name, version pairs. On read/download,
archives in this dictionary will download the specified version
by default. Before assignment, archive_names are checked and
normalized.
'''
default_versions = {
self._normalize_archive_name(arch)[1]: v
for arch, v in default_versions.items()}
self._default_versions = default_versions
[docs] def attach_manager(self, manager):
if self._manager_locked:
raise PermissionError('Manager locked')
self._manager = manager
def create(
        self,
        archive_name,
        authority_name=None,
        versioned=True,
        raise_on_err=True,
        metadata=None,
        tags=None,
        helper=False):
    '''
    Create a DataFS archive

    Parameters
    ----------
    archive_name: str
        Name of the archive
    authority_name: str
        Name of the data service to use as the archive's data authority
    versioned: bool
        If true, store all versions with explicit version numbers (default)
    raise_on_err: bool
        Raise an error if the archive already exists (default True)
    metadata: dict
        Dictionary of additional archive metadata
    tags: list
        Tags to attach to the archive
    helper: bool
        If true, interactively prompt for required metadata (default False)
    '''
    authority_name, archive_name = self._normalize_archive_name(
        archive_name, authority_name=authority_name)

    # fall back to the single/pinned default authority
    if authority_name is None:
        authority_name = self.default_authority_name

    self._validate_archive_name(archive_name)

    res = self.manager.create_archive(
        archive_name,
        authority_name,
        archive_path=archive_name,
        versioned=versioned,
        raise_on_err=raise_on_err,
        metadata=metadata if metadata is not None else {},
        user_config=self.user_config,
        tags=tags,
        helper=helper)

    return self._ArchiveConstructor(api=self, **res)
def get_archive(self, archive_name, default_version=None):
    '''
    Retrieve a data archive

    Parameters
    ----------
    archive_name: str
        Name of the archive to retrieve
    default_version: version
        str or :py:class:`~distutils.StrictVersion` giving the default
        version number to be used on read operations

    Returns
    -------
    archive: object
        New :py:class:`~datafs.core.data_archive.DataArchive` object

    Raises
    ------
    KeyError:
        A KeyError is raised when the ``archive_name`` is not found
    '''
    auth, archive_name = self._normalize_archive_name(archive_name)

    res = self.manager.get_archive(archive_name)

    # fall back to the API-wide default version for this archive, if any
    if default_version is None:
        default_version = self._default_versions.get(archive_name, None)

    # a prefix like "other://name" must agree with the stored authority
    if (auth is not None) and (auth != res['authority_name']):
        raise ValueError(
            'Archive "{}" not found on {}.'.format(archive_name, auth) +
            ' Did you mean "{}://{}"?'.format(
                res['authority_name'], archive_name))

    return self._ArchiveConstructor(
        api=self,
        default_version=default_version,
        **res)
def batch_get_archive(self, archive_names, default_versions=None):
    '''
    Batch version of :py:meth:`~DataAPI.get_archive`

    Parameters
    ----------
    archive_names: list
        Iterable of archive names to retrieve

    default_versions: str, object, or dict
        Default versions to assign to each returned archive. May be a dict
        with archive names as keys and versions as values, or may be a
        version, in which case the same version is used for all archives.
        Versions must be a strict version number string, a
        :py:class:`~distutils.version.StrictVersion`, or a
        :py:class:`~datafs.core.versions.BumpableVersion` object.

    Returns
    -------
    archives: dict
        Dictionary of :py:class:`~datafs.core.data_archive.DataArchive`
        objects, keyed by normalized archive name. If an archive is not
        found, it is omitted (``batch_get_archive`` does not raise a
        ``KeyError`` on invalid archive names).
    '''
    # Toss authority prefixes and normalize names. Materialize as a list
    # (a py3 map() iterator could only be consumed once by the manager).
    archive_names = [
        self._normalize_archive_name(arch)[1] for arch in archive_names]

    responses = self.manager.batch_get_archive(archive_names)

    if default_versions is None:
        default_versions = {}

    archives = {}

    for res in responses:
        # Keep only the normalized name (drop the authority component) so
        # keys are plain strings, consistent with get_archive and with
        # the keys of self._default_versions.
        res['archive_name'] = self._normalize_archive_name(
            res['archive_name'])[1]

        archive_name = res['archive_name']

        if hasattr(default_versions, 'get'):
            # Get version number from default_versions or
            # self._default_versions if key not present.
            default_version = default_versions.get(
                archive_name,
                self._default_versions.get(archive_name, None))
        else:
            # a single scalar version applies to every archive
            default_version = default_versions

        archives[archive_name] = self._ArchiveConstructor(
            api=self,
            default_version=default_version,
            **res)

    return archives
def listdir(self, location, authority_name=None):
    '''
    List archive path components at a given location

    .. Note ::

        When using listdir on versioned archives, listdir will provide the
        version numbers when a full archive path is supplied as the
        location argument. This is because DataFS stores the archive path
        as a directory and the versions as the actual files when versioning
        is on.

    Parameters
    ----------
    location: str
        Path of the "directory" to search

        `location` can be a path relative to the authority root (e.g
        `/MyFiles/Data`) or can include authority as a protocol (e.g.
        `my_auth://MyFiles/Data`). If the authority is specified as a
        protocol, the `authority_name` argument is ignored.

    authority_name: str
        Name of the authority to search (optional)

        If no authority is specified, the default authority is used (if
        only one authority is attached or if
        :py:attr:`DefaultAuthorityName` is assigned).

    Returns
    -------
    list
        Archive path components that exist at the given "directory"
        location on the specified authority

    Raises
    ------
    ValueError
        A ValueError is raised if the authority is ambiguous or invalid
    '''
    authority_name, location = self._normalize_archive_name(
        location,
        authority_name=authority_name)

    if authority_name is None:
        authority_name = self.default_authority_name

    authority = self._authorities[authority_name]

    return authority.fs.listdir(location)
[docs] def filter(self, pattern=None, engine='path', prefix=None):
'''
Performs a filtered search on entire universe of archives
according to pattern or prefix.
Parameters
----------
prefix: str
string matching beginning characters of the archive or set of
archives you are filtering. Note that authority prefixes, e.g.
``local://my/archive.txt`` are not supported in prefix searches.
pattern: str
string matching the characters within the archive or set of
archives you are filtering on. Note that authority prefixes, e.g.
``local://my/archive.txt`` are not supported in pattern searches.
engine: str
string of value 'str', 'path', or 'regex'. That indicates the
type of pattern you are filtering on
Returns
-------
generator
'''
if pattern is not None:
pattern = fs.path.relpath(pattern)
if prefix is not None:
prefix = fs.path.relpath(prefix)
archives = self.manager.search(tuple([]), begins_with=prefix)
if not pattern:
for archive in archives:
yield archive
if engine == 'str':
for arch in archives:
if pattern in arch:
yield arch
elif engine == 'path':
# Change to generator version of fnmatch.filter
for arch in archives:
if fnmatch.fnmatch(arch, pattern):
yield arch
elif engine == 'regex':
for arch in archives:
if re.search(pattern, arch):
yield arch
else:
raise ValueError(
'search engine "{}" not recognized. '.format(engine) +
'choose "str", "fn", or "regex"')
[docs] def search(self, *query, **kwargs):
'''
Searches based on tags specified by users
Parameters
---------
query: str
tags to search on. If multiple terms, provided in comma delimited
string format
prefix: str
start of archive name. Providing a start string improves search
speed.
'''
prefix = kwargs.get('prefix')
if prefix is not None:
prefix = fs.path.relpath(prefix)
return self.manager.search(query, begins_with=prefix)
def _validate_archive_name(self, archive_name):
    '''
    Check an archive name against the manager's required patterns

    Parameters
    ----------
    archive_name: str
        Name of the archive to validate

    Raises
    ------
    ValueError
        If the normalized archive name fails to match any of the
        manager's ``required_archive_patterns``
    '''
    normalized = fs.path.normpath(archive_name)

    for pattern in self.manager.required_archive_patterns:
        if not re.search(pattern, normalized):
            raise ValueError(
                "archive name does not match pattern '{}'".format(pattern))
[docs] def delete_archive(self, archive_name):
'''
Delete an archive
Parameters
----------
archive_name: str
Name of the archive to delete
'''
archive = self.get_archive(archive_name)
archive.delete()
@staticmethod
def hash_file(f):
    '''
    Utility function for hashing file contents

    Overload this function to change the file equality checking algorithm

    Parameters
    ----------
    f: file-like
        File-like object or file path from which to compute checksum value

    Returns
    -------
    checksum: dict
        dictionary with {'algorithm': 'md5', 'checksum': hexdigest}
    '''
    md5 = hashlib.md5()
    chunk_size = 128 * md5.block_size

    with open_filelike(f, 'rb') as f_obj:
        chunk = f_obj.read(chunk_size)
        while chunk != b'':
            md5.update(chunk)
            chunk = f_obj.read(chunk_size)

    return {'algorithm': 'md5', 'checksum': md5.hexdigest()}
[docs] def close(self):
for service in self._authorities:
self._authorities[service].fs.close()
if self.cache:
self.cache.fs.close()
@staticmethod
def _validate_authority_name(authority_name):
    '''
    Raise ValueError unless *authority_name* fully matches
    ``_VALID_AUTHORITY_PATTERNS`` (word characters and hyphens).
    '''
    if re.match(r'^{}$'.format(_VALID_AUTHORITY_PATTERNS), authority_name):
        return

    raise ValueError(
        '"{}" not a valid authority name'.format(authority_name))
@staticmethod
def _split_authority(archive_name):
    '''
    Split ``auth://path`` into ``(auth, path)``

    The authority component is None when *archive_name* carries no
    ``auth://`` prefix.
    '''
    match = re.match(
        r'^((?P<auth>{})\:\/\/)?(?P<archive>.*)$'.format(
            _VALID_AUTHORITY_PATTERNS),
        archive_name)

    return match.group('auth'), match.group('archive')
def _normalize_archive_name(self, archive_name, authority_name=None):
    '''
    Resolve an archive name into ``(authority_name, relative_path)``

    An ``auth://`` prefix in *archive_name* takes precedence; it must
    agree with *authority_name* when both are given. When neither is
    given, the default authority is used if it can be determined.

    Raises
    ------
    ValueError
        If the prefix conflicts with *authority_name*, or if the resolved
        authority is not attached
    '''
    full_archive_arg = archive_name

    str_authority_name, archive_name = self._split_authority(archive_name)

    # explicit prefix and explicit argument must agree
    if ((str_authority_name is not None)
            and (authority_name is not None)
            and (str_authority_name != authority_name)):

        raise ValueError(
            'authority name "{}" not found in archive: "{}"'.format(
                authority_name, full_archive_arg))

    relpath = fs.path.relpath(fs.path.normpath(archive_name))

    if str_authority_name is None:
        str_authority_name = authority_name

    if str_authority_name is None:
        try:
            str_authority_name = self.default_authority_name
        except ValueError:
            # no authority attached or ambiguous - leave as None
            pass

    if str_authority_name is not None:
        if str_authority_name not in self._authorities:
            raise ValueError(
                'Authority "{}" not found'.format(str_authority_name))

        self._authorities[str_authority_name].fs.validatepath(relpath)

    # additional check - not all fs.validatepath functions do anything:
    OSFS('').isvalidpath(relpath)

    return str_authority_name, relpath