Using Remote Files Locally

This tutorial demonstrates how to read and write files stored in remote archives using on-disk (local-path) I/O operations.

To demonstrate this, we use the xarray module, which cannot read data from a streaming object and instead requires a path to a file on disk.
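
In outline, the two access patterns contrasted below look like this (a preview sketch with a hypothetical archive handle archive; the real calls appear step by step in the rest of the tutorial):

>>> with archive.open() as f:                  # doctest: +SKIP
...     stream = f            # file-like object, read over the network
...
>>> with archive.get_local_path() as path:     # doctest: +SKIP
...     local_file = path     # a real path on the local disk
...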

Set up the workspace

>>> from datafs import DataAPI
>>> from datafs.managers.manager_mongo import MongoDBManager
>>> from fs.s3fs import S3FS

Initialize the API

>>> api = DataAPI(
...     username='My Name',
...     contact='my.email@example.com')
>>>
>>> manager = MongoDBManager(
...     database_name='MyDatabase',
...     table_name='DataFiles')
>>>
>>> manager.create_archive_table('DataFiles', raise_on_err=False)
>>>
>>> api.attach_manager(manager)

Attach a remote service

In this example we attach a remote filesystem, in this case AWS S3. Reading from this filesystem returns streaming objects produced by the underlying boto or requests calls.

>>> s3 = S3FS(
...     'test-bucket',
...     aws_access_key='MY_KEY',
...     aws_secret_key='MY_SECRET_KEY')
>>>
>>> api.attach_authority('aws', s3)
>>>
>>> var = api.create(
...     'streaming_archive',
...     metadata=dict(description='My test data archive'))
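
If you want to try this tutorial without AWS credentials, you can attach a temporary local filesystem as the authority in place of S3 (a minimal sketch; 'local' is an arbitrary authority name, and TempFS comes from the same pyfilesystem package as S3FS):

>>> from fs.tempfs import TempFS
>>>
>>> api.attach_authority('local', TempFS())    # doctest: +SKIP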

Create sample data

Create a sample dataset (from the xarray docs):

>>> import xarray as xr
>>> import numpy as np
>>> import pandas as pd
>>>
>>> np.random.seed(123)
>>>
>>> times = pd.date_range('2000-01-01', '2001-12-31', name='time')
>>> annual_cycle = np.sin(2 * np.pi * (times.dayofyear / 365.25 - 0.28))
>>>
>>> base = 10 + 15 * np.array(annual_cycle).reshape(-1, 1)
>>> tmin_values = base + 3 * np.random.randn(annual_cycle.size, 3)
>>> tmax_values = base + 10 + 3 * np.random.randn(annual_cycle.size, 3)
>>>
>>> ds = xr.Dataset({'tmin': (('time', 'location'), tmin_values),
...                  'tmax': (('time', 'location'), tmax_values)},
...                 {'time': times, 'location': ['IA', 'IN', 'IL']})
>>>
>>> ds.attrs['version'] = 'version 1'
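
A quick check of the dimensions confirms two years of daily values (731 days, since 2000 is a leap year) at three locations:

>>> ds['tmin'].shape
(731, 3)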

Upload the dataset to the archive

>>> with var.get_local_path() as f:
...     ds.to_netcdf(f)
...

Read and write to disk

NetCDF files cannot be read from a streaming object:

>>> with var.open() as f:
...     print(type(f))
...
<type '_io.TextIOWrapper'>
>>> with var.open() as f:      
...     with xr.open_dataset(f) as ds:
...         print(ds)
Traceback (most recent call last):
...
UnicodeDecodeError: 'utf8' codec can't decode byte 0x89 in position 0: \
invalid start byte
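
This failure is expected: the archive holds binary data (the 0x89 byte in the error is the first byte of the HDF5 signature used by netCDF4 files), while open() returns a text-mode stream by default. You can stream the raw bytes by requesting binary mode, but a stream is still not something xarray can open (a sketch, assuming open() forwards a mode argument like the built-in open):

>>> with var.open('rb') as f:
...     first_bytes = f.read(4)    # b'\x89HDF' for netCDF4/HDF5 output
...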

Instead, we can get a local path and open the file from disk:

>>> with var.get_local_path() as f:
...     with xr.open_dataset(f) as ds:
...         print(ds) 
...
<xarray.Dataset>
Dimensions:   (location: 3, time: 731)
Coordinates:
  * location  (location) |S2 'IA' 'IN' 'IL'
  * time      (time) datetime64[ns] 2000-01-01 2000-01-02 2000-01-03 ...
Data variables:
    tmax      (time, location) float64 12.98 3.31 6.779 0.4479 ...
    tmin      (time, location) float64 -8.037 -1.788 -3.932 ...
Attributes:
    version:  version 1
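
Once synced to a local path, the data behaves like any other xarray dataset. For example, we can select one location and compute a summary statistic (an illustrative computation on the sample data):

>>> with var.get_local_path() as f:
...     with xr.open_dataset(f) as ds:
...         mean_tmax_ia = ds['tmax'].sel(location='IA').mean()
...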

We can update the file in the same way:

>>> with var.get_local_path() as f:
...     with xr.open_dataset(f) as ds:
...
...         # Load the dataset fully into memory and then close the file
...
...         dsmem = ds.load()
...         ds.close()
...
...     # Update the version and save the file
...
...     dsmem.attrs['version'] = 'version 2'
...     dsmem.to_netcdf(f)
...

Now let’s open the file and see if our change was saved:

>>> # Acquire the file from the archive and print the version
... with var.get_local_path() as f:
...     with xr.open_dataset(f) as ds:
...         print(ds) 
...
<xarray.Dataset>
Dimensions:   (location: 3, time: 731)
Coordinates:
  * location  (location) |S2 'IA' 'IN' 'IL'
  * time      (time) datetime64[ns] 2000-01-01 2000-01-02 2000-01-03 ...
Data variables:
    tmax      (time, location) float64 12.98 3.31 6.779 0.4479 ...
    tmin      (time, location) float64 -8.037 -1.788 -3.932 ...
Attributes:
    version:  version 2

Cleaning up

>>> var.delete()
>>> api.manager.delete_table('DataFiles')