from __future__ import absolute_import
from datafs.core import data_file
from datafs.core.versions import BumpableVersion
from datafs._compat import string_types
from contextlib import contextmanager
from fs.osfs import OSFS
import fs.utils
import fs.path
import click
import os
import textwrap
import time
def _process_version(self, version):
if not self.versioned and version is None:
return None
elif not self.versioned and version is not None:
raise ValueError('Cannot specify version on an unversioned archive.')
elif version is None:
return self.get_default_version()
elif isinstance(version, BumpableVersion):
return version
elif isinstance(version, string_types) and version == 'latest':
return self.get_latest_version()
elif isinstance(version, string_types):
return BumpableVersion(version)
[docs]class DataArchive(object):
def __init__(
self,
api,
archive_name,
authority_name,
archive_path,
versioned=True,
default_version=None):
self.api = api
self.archive_name = archive_name
self._authority_name = authority_name
self._archive_path = archive_path
self._versioned = versioned
self._default_version = default_version
def __repr__(self):
return "<{} {}://{}>".format(self.__class__.__name__,
self.authority_name, self.archive_name)
def __eq__(self, other):
return (
isinstance(other, self.__class__) and
(self.archive_name == other.archive_name) and
(self.authority_name == other.authority_name) and
(self.archive_path == other.archive_path) and
(self.versioned == other.versioned))
@property
def versioned(self):
return self._versioned
[docs] def get_latest_version(self):
versions = self.get_versions()
if len(versions) == 0:
return None
else:
return max(versions)
[docs] def get_versions(self):
versions = self.get_history()
if len(versions) == 0:
return []
elif not self.versioned:
return [None]
else:
return sorted(map(BumpableVersion, set(
[v['version'] for v in versions])))
[docs] def get_default_version(self):
if not self.versioned:
return None
versions = self.get_versions()
if self._default_version is None or self._default_version == 'latest':
if len(versions) == 0:
return None
return max(versions)
matches = filter(
lambda v: v == self._default_version,
self.get_versions())
if len(matches) > 0:
return max(matches)
raise ValueError('Archive "{}" version {} not found'.format(
self.archive_name, self._default_version))
[docs] def get_version_path(self, version=None):
'''
Returns a storage path for the archive and version
If the archive is versioned, the version number is used as the file
path and the archive path is the directory. If not, the archive path is
used as the file path.
Parameters
----------
version : str or object
Version number to use as file name on versioned archives (default
latest unless ``default_version`` set)
Examples
--------
.. code-block:: python
>>> arch = DataArchive(None, 'arch', None, 'a1', versioned=False)
>>> print(arch.get_version_path())
a1
>>>
>>> ver = DataArchive(None, 'ver', None, 'a2', versioned=True)
>>> print(ver.get_version_path('0.0.0'))
a2/0.0
>>>
>>> print(ver.get_version_path('0.0.1a1'))
a2/0.0.1a1
>>>
>>> print(ver.get_version_path('latest')) # doctest: +ELLIPSIS
Traceback (most recent call last):
...
AttributeError: 'NoneType' object has no attribute 'manager'
'''
version = _process_version(self, version)
if self.versioned:
return fs.path.join(self.archive_path, str(version))
else:
return self.archive_path
@property
def authority_name(self):
return self._authority_name
@property
def authority(self):
return self.api._authorities[self.authority_name]
@property
def archive_path(self):
return self._archive_path
[docs] def get_history(self):
return self.api.manager.get_version_history(self.archive_name)
[docs] def get_latest_hash(self):
return self.api.manager.get_latest_hash(self.archive_name)
[docs] def get_version_hash(self, version=None):
version = _process_version(self, version)
if self.versioned:
if version is None:
return None
for ver in self.get_history():
if BumpableVersion(ver['version']) == version:
return ver['checksum']
raise ValueError(
'Version "{}" not found in archive history'.format(version))
else:
return self.get_latest_hash()
[docs] def update(
self,
filepath,
cache=False,
remove=False,
bumpversion=None,
prerelease=None,
dependencies=None,
metadata=None,
message=None):
'''
Enter a new version to a DataArchive
Parameters
----------
filepath : str
The path to the file on your local file system
cache : bool
Turn on caching for this archive if not already on before update
remove : bool
removes a file from your local directory
bumpversion : str
Version component to update on write if archive is versioned. Valid
bumpversion values are 'major', 'minor', and 'patch', representing
the three components of the strict version numbering system (e.g.
"1.2.3"). If bumpversion is None the version number is not updated
on write. Either bumpversion or prerelease (or both) must be a
non-None value. If the archive is not versioned, bumpversion is
ignored.
prerelease : str
Prerelease component of archive version to update on write if
archive is versioned. Valid prerelease values are 'alpha' and
'beta'. Either bumpversion or prerelease (or both) must be a
non-None value. If the archive is not versioned, prerelease is
ignored.
metadata : dict
Updates to archive metadata. Pass {key: None} to remove a key from
the archive's metadata.
'''
if metadata is None:
metadata = {}
latest_version = self.get_latest_version()
hashval = self.api.hash_file(filepath)
checksum = hashval['checksum']
algorithm = hashval['algorithm']
if checksum == self.get_latest_hash():
self.update_metadata(metadata)
if remove and os.path.isfile(filepath):
os.remove(filepath)
return
if self.versioned:
if latest_version is None:
latest_version = BumpableVersion()
next_version = latest_version.bump(
kind=bumpversion,
prerelease=prerelease,
inplace=False)
else:
next_version = None
next_path = self.get_version_path(next_version)
if cache:
self.cache(next_version)
if self.is_cached(next_version):
self.authority.upload(filepath, next_path)
self.api.cache.upload(filepath, next_path, remove=remove)
else:
self.authority.upload(filepath, next_path, remove=remove)
self._update_manager(
archive_metadata=metadata,
version_metadata=dict(
checksum=checksum,
algorithm=algorithm,
version=next_version,
dependencies=dependencies,
message=message))
def _get_default_dependencies(self):
'''
Get default dependencies for archive
Get default dependencies from requirements file or (if no requirements
file) from previous version
'''
# Get default dependencies from requirements file
default_dependencies = {
k: v for k,
v in self.api.default_versions.items() if k != self.archive_name}
# If no requirements file or is empty:
if len(default_dependencies) == 0:
# Retrieve dependencies from last archive record
history = self.get_history()
if len(history) > 0:
default_dependencies = history[-1].get('dependencies', {})
return default_dependencies
def _set_version_defaults(self, version_metadata):
version_metadata['user_config'] = self.api.user_config
if version_metadata.get('dependencies', None) is None:
version_metadata['dependencies'] = self._get_default_dependencies()
def _update_manager(self, archive_metadata=None, version_metadata=None):
if archive_metadata is None:
archive_metadata = {}
if version_metadata is None:
version_metadata = {}
self._set_version_defaults(version_metadata)
# update records in self.api.manager
self.api.manager.update(self.archive_name, version_metadata)
self.update_metadata(archive_metadata)
# File I/O methods
[docs] @contextmanager
def open(
self,
mode='r',
version=None,
bumpversion=None,
prerelease=None,
dependencies=None,
metadata=None,
message=None,
*args,
**kwargs):
'''
Opens a file for read/write
Parameters
----------
mode : str
Specifies the mode in which the file is opened (default 'r')
version : str
Version number of the file to open (default latest)
bumpversion : str
Version component to update on write if archive is versioned. Valid
bumpversion values are 'major', 'minor', and 'patch', representing
the three components of the strict version numbering system (e.g.
"1.2.3"). If bumpversion is None the version number is not updated
on write. Either bumpversion or prerelease (or both) must be a
non-None value. If the archive is not versioned, bumpversion is
ignored.
prerelease : str
Prerelease component of archive version to update on write if
archive is versioned. Valid prerelease values are 'alpha' and
'beta'. Either bumpversion or prerelease (or both) must be a
non-None value. If the archive is not versioned, prerelease is
ignored.
metadata : dict
Updates to archive metadata. Pass {key: None} to remove a key from
the archive's metadata.
args, kwargs sent to file system opener
'''
if metadata is None:
metadata = {}
latest_version = self.get_latest_version()
version = _process_version(self, version)
version_hash = self.get_version_hash(version)
if self.versioned:
if latest_version is None:
latest_version = BumpableVersion()
next_version = latest_version.bump(
kind=bumpversion,
prerelease=prerelease,
inplace=False)
msg = "Version must be bumped on write. " \
"Provide bumpversion and/or prerelease."
assert next_version > latest_version, msg
read_path = self.get_version_path(version)
write_path = self.get_version_path(next_version)
else:
read_path = self.archive_path
write_path = self.archive_path
next_version = None
# version_check returns true if fp's hash is current as of read
def version_check(chk):
return chk['checksum'] == version_hash
# Updater updates the manager with the latest version number
def updater(checksum, algorithm):
self._update_manager(
archive_metadata=metadata,
version_metadata=dict(
version=next_version,
dependencies=dependencies,
checksum=checksum,
algorithm=algorithm,
message=message))
opener = data_file.open_file(
self.authority,
self.api.cache,
updater,
version_check,
self.api.hash_file,
read_path,
write_path,
mode=mode,
*args,
**kwargs)
with opener as f:
yield f
[docs] @contextmanager
def get_local_path(
self,
version=None,
bumpversion=None,
prerelease=None,
dependencies=None,
metadata=None,
message=None):
'''
Returns a local path for read/write
Parameters
----------
version : str
Version number of the file to retrieve (default latest)
bumpversion : str
Version component to update on write if archive is versioned. Valid
bumpversion values are 'major', 'minor', and 'patch', representing
the three components of the strict version numbering system (e.g.
"1.2.3"). If bumpversion is None the version number is not updated
on write. Either bumpversion or prerelease (or both) must be a
non-None value. If the archive is not versioned, bumpversion is
ignored.
prerelease : str
Prerelease component of archive version to update on write if
archive is versioned. Valid prerelease values are 'alpha' and
'beta'. Either bumpversion or prerelease (or both) must be a
non-None value. If the archive is not versioned, prerelease is
ignored.
metadata : dict
Updates to archive metadata. Pass {key: None} to remove a key from
the archive's metadata.
'''
if metadata is None:
metadata = {}
latest_version = self.get_latest_version()
version = _process_version(self, version)
version_hash = self.get_version_hash(version)
if self.versioned:
if latest_version is None:
latest_version = BumpableVersion()
next_version = latest_version.bump(
kind=bumpversion,
prerelease=prerelease,
inplace=False)
msg = "Version must be bumped on write. " \
"Provide bumpversion and/or prerelease."
assert next_version > latest_version, msg
read_path = self.get_version_path(version)
write_path = self.get_version_path(next_version)
else:
read_path = self.archive_path
write_path = self.archive_path
next_version = None
# version_check returns true if fp's hash is current as of read
def version_check(chk):
return chk['checksum'] == version_hash
# Updater updates the manager with the latest version number
def updater(checksum, algorithm):
self._update_manager(
archive_metadata=metadata,
version_metadata=dict(
version=next_version,
dependencies=dependencies,
checksum=checksum,
algorithm=algorithm,
message=message))
path = data_file.get_local_path(
self.authority,
self.api.cache,
updater,
version_check,
self.api.hash_file,
read_path,
write_path)
with path as fp:
yield fp
[docs] def download(self, filepath, version=None):
'''
Downloads a file from authority to local path
1. First checks in cache to check if file is there and if it is, is it
up to date
2. If it is not up to date, it will download the file to cache
'''
version = _process_version(self, version)
dirname, filename = os.path.split(
os.path.abspath(os.path.expanduser(filepath)))
assert os.path.isdir(dirname), 'Directory not found: "{}"'.format(
dirname)
local = OSFS(dirname)
version_hash = self.get_version_hash(version)
# version_check returns true if fp's hash is current as of read
def version_check(chk):
return chk['checksum'] == version_hash
if os.path.exists(filepath):
if version_check(self.api.hash_file(filepath)):
return
read_path = self.get_version_path(version)
with data_file._choose_read_fs(
self.authority,
self.api.cache,
read_path,
version_check,
self.api.hash_file) as read_fs:
fs.utils.copyfile(
read_fs,
read_path,
local,
filename)
[docs] def log(self):
history = self.get_history()
outputs = []
for i, record in enumerate(history):
output = ''
record['timestamp'] = time.strftime(
'%a, %d %b %Y %H:%M:%S +0000',
time.strptime(record['updated'], '%Y%m%d-%H%M%S'))
checksum = (
' ({algorithm} {checksum})\nDate: {timestamp}'.format(
**record))
if self.versioned:
output = output + '\n' + (
click.style(
'version {}'.format(record['version']) + checksum,
fg='green'))
else:
output = output + '\n' + (
click.style(
'update {}'.format(len(history) - i) + checksum,
fg='green'))
for attr, val in sorted(record['user_config'].items()):
output = output + '\n' + (
'{:<10} {}'.format(attr+':', val))
if record.get('message', None) is not None:
wrapper = textwrap.TextWrapper(
initial_indent=' ',
subsequent_indent=' ',
width=66)
output = output + '\n\n'
output = output + '\n'.join(
wrapper.wrap(str(record['message'])))
outputs.append(output)
click.echo_via_pager('\n\n'.join(reversed(outputs)) + '\n')
[docs] def delete(self):
'''
Delete the archive
.. warning::
Deleting an archive will erase all data and metadata permanently.
For help setting user permissions, see
:ref:`Administrative Tools <admin>`
'''
versions = self.get_versions()
self.api.manager.delete_archive_record(self.archive_name)
for version in versions:
if self.authority.fs.exists(self.get_version_path(version)):
self.authority.fs.remove(self.get_version_path(version))
if self.api.cache:
if self.api.cache.fs.exists(self.get_version_path(version)):
self.api.cache.fs.remove(self.get_version_path(version))
if self.authority.fs.exists(self.archive_name):
self.authority.fs.removedir(self.archive_name)
if self.api.cache:
if self.api.cache.fs.exists(self.archive_name):
self.api.cache.fs.removedir(self.archive_name)
[docs] def isfile(self, version=None, *args, **kwargs):
'''
Check whether the path exists and is a file
'''
version = _process_version(self, version)
path = self.get_version_path(version)
self.authority.fs.isfile(path, *args, **kwargs)
[docs] def getinfo(self, version=None, *args, **kwargs):
'''
Return information about the path e.g. size, mtime
'''
version = _process_version(self, version)
path = self.get_version_path(version)
self.authority.fs.getinfo(path, *args, **kwargs)
[docs] def desc(self, version=None, *args, **kwargs):
'''
Return a short descriptive text regarding a path
'''
version = _process_version(self, version)
path = self.get_version_path(version)
self.authority.fs.desc(path, *args, **kwargs)
[docs] def exists(self, version=None, *args, **kwargs):
'''
Check whether a path exists as file or directory
'''
version = _process_version(self, version)
path = self.get_version_path(version)
self.authority.fs.exists(path, *args, **kwargs)
[docs] def is_cached(self, version=None):
'''
Set the cache property to start/stop file caching for this archive
'''
version = _process_version(self, version)
if self.api.cache and self.api.cache.fs.isfile(
self.get_version_path(version)):
return True
return False
[docs] def cache(self, version=None):
version = _process_version(self, version)
if not self.api.cache:
raise ValueError('No cache attached')
if not self.api.cache.fs.isfile(self.get_version_path(version)):
data_file._touch(self.api.cache.fs, self.get_version_path(version))
assert self.api.cache.fs.isfile(
self.get_version_path(version)), "Cache creation failed"
[docs] def remove_from_cache(self, version=None):
version = _process_version(self, version)
if self.api.cache.fs.isfile(self.get_version_path(version)):
self.api.cache.fs.remove(self.get_version_path(version))
[docs] def get_dependencies(self, version=None):
'''
Parameters
----------
version: str
string representing version number whose dependencies you are
looking up
'''
version = _process_version(self, version)
history = self.get_history()
for v in reversed(history):
if BumpableVersion(v['version']) == version:
return v['dependencies']
raise ValueError('Version {} not found'.format(version))
[docs] def set_dependencies(self, dependencies=None):
if dependencies is None:
dependencies = {}
history = self.get_history()
if len(history) == 0:
raise ValueError('Cannot set dependencies on an empty archive')
version_metadata = history[-1]
version_metadata['dependencies'] = dependencies
version_metadata['user_config'] = self.api.user_config
self.api.manager.update(self.archive_name, version_metadata)