Add example of custom StacIO for Azure Blob storage #1372

Merged
merged 4 commits into from
Aug 2, 2024
Changes from 3 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
 - Allow object ID as input for getting APILayoutStrategy hrefs and add `items`, `collections`, `search`, `conformance`, `service_desc` and `service_doc` href methods ([#1335](https://github.com/stac-utils/pystac/pull/1335))
 - Updated classification extension to v2.0.0 ([#1359](https://github.com/stac-utils/pystac/pull/1359))
 - Update docstring of `name` argument to `Classification.apply` and `Classification.create` to agree with extension specification ([#1356](https://github.com/stac-utils/pystac/pull/1356))
+- Add example of custom `StacIO` for Azure Blob Storage to docs ([#1372](https://github.com/stac-utils/pystac/pull/1372))

 ### Fixed

149 changes: 107 additions & 42 deletions docs/concepts.rst
@@ -318,49 +318,114 @@ argument of most object-specific I/O methods. You can also use
 :meth:`pystac.StacIO.set_default` in your client's ``__init__.py`` file to make this
 sub-class the default :class:`pystac.StacIO` implementation throughout the library.

-For example, this code will allow
+For example, the following code examples will allow
 for reading from AWS's S3 cloud object storage using `boto3
-<https://boto3.amazonaws.com/v1/documentation/api/latest/index.html>`__:
-
-.. code-block:: python
-
-    from urllib.parse import urlparse
-    import boto3
-    from pystac import Link
-    from pystac.stac_io import DefaultStacIO, StacIO
-    from typing import Union, Any
-
-    class CustomStacIO(DefaultStacIO):
-        def __init__(self):
-            self.s3 = boto3.resource("s3")
-            super().__init__()
-
-        def read_text(
-            self, source: Union[str, Link], *args: Any, **kwargs: Any
-        ) -> str:
-            parsed = urlparse(source)
-            if parsed.scheme == "s3":
-                bucket = parsed.netloc
-                key = parsed.path[1:]
-
-                obj = self.s3.Object(bucket, key)
-                return obj.get()["Body"].read().decode("utf-8")
-            else:
-                return super().read_text(source, *args, **kwargs)
-
-        def write_text(
-            self, dest: Union[str, Link], txt: str, *args: Any, **kwargs: Any
-        ) -> None:
-            parsed = urlparse(dest)
-            if parsed.scheme == "s3":
-                bucket = parsed.netloc
-                key = parsed.path[1:]
-                self.s3.Object(bucket, key).put(Body=txt, ContentEncoding="utf-8")
-            else:
-                super().write_text(dest, txt, *args, **kwargs)
-
-    StacIO.set_default(CustomStacIO)
-
+<https://boto3.amazonaws.com/v1/documentation/api/latest/index.html>`__
+or Azure Blob Storage using the `Azure SDK for Python
+<https://learn.microsoft.com/en-us/python/api/overview/azure/storage-blob-readme?view=azure-python>`__:
+
+.. tab-set::
+    .. tab-item:: AWS S3
+
+        .. code-block:: python
+
+            from urllib.parse import urlparse
+            import boto3
+            from pystac import Link
+            from pystac.stac_io import DefaultStacIO, StacIO
+            from typing import Union, Any
+
+            class CustomStacIO(DefaultStacIO):
+                def __init__(self):
+                    self.s3 = boto3.resource("s3")
+                    super().__init__()
+
+                def read_text(
+                    self, source: Union[str, Link], *args: Any, **kwargs: Any
+                ) -> str:
+                    parsed = urlparse(source)
+                    if parsed.scheme == "s3":
+                        bucket = parsed.netloc
+                        key = parsed.path[1:]
+
+                        obj = self.s3.Object(bucket, key)
+                        return obj.get()["Body"].read().decode("utf-8")
+                    else:
+                        return super().read_text(source, *args, **kwargs)
+
+                def write_text(
+                    self, dest: Union[str, Link], txt: str, *args: Any, **kwargs: Any
+                ) -> None:
+                    parsed = urlparse(dest)
+                    if parsed.scheme == "s3":
+                        bucket = parsed.netloc
+                        key = parsed.path[1:]
+                        self.s3.Object(bucket, key).put(Body=txt, ContentEncoding="utf-8")
+                    else:
+                        super().write_text(dest, txt, *args, **kwargs)
+
+            StacIO.set_default(CustomStacIO)
+
+    .. tab-item:: Azure Blob Storage
+
+        .. code-block:: python
+
+            import os
+            from pathlib import PurePosixPath
+            from typing import Any, Tuple, Union
+            from urllib.parse import urlparse
+
+            from azure.storage.blob import BlobClient, ContentSettings
+            from pystac import Link
+            from pystac.stac_io import DefaultStacIO, StacIO
+
+            class BlobStacIO(DefaultStacIO):
+                """A custom StacIO class for reading and writing STAC objects
+                from/to Azure Blob storage.
+                """
+
+                def _parse_blob_url(self, url: str) -> Tuple[str, str]:
+                    path = PurePosixPath(urlparse(url).path)
+                    container = path.parts[1]
+                    blob = "/".join(path.parts[2:])
+                    return container, blob
+
+                def _get_blob_client(self, container: str, blob: str) -> BlobClient:
+                    return BlobClient.from_connection_string(
+                        os.environ["AZURE_STORAGE_CONNECTION_STRING"],
+                        container_name=container,
+                        blob_name=blob,
+                    )
+
+                def read_text(self, source: Union[str, Link], *args: Any, **kwargs: Any) -> str:
+                    if isinstance(source, Link):
+                        source = source.href
+                    if source.startswith("https"):
+                        container, blob = self._parse_blob_url(source)
+                        blob_client = self._get_blob_client(container, blob)
+                        obj = blob_client.download_blob().readall().decode()
+                        return obj
+                    else:
+                        return super().read_text(source, *args, **kwargs)
+
+                def write_text(
+                    self, dest: Union[str, Link], txt: str, *args: Any, **kwargs: Any
+                ) -> None:
+                    """Write STAC Objects to Blob storage. Note: overwrites by default."""
+                    if isinstance(dest, Link):
+                        dest = dest.href
+                    if dest.startswith("https"):
+                        container, blob = self._parse_blob_url(dest)
+                        blob_client = self._get_blob_client(container, blob)
+                        blob_client.upload_blob(
+                            txt,
+                            overwrite=True,
+                            content_settings=ContentSettings(content_type="application/json"),
+                        )
+                    else:
+                        super().write_text(dest, txt, *args, **kwargs)
+
+            StacIO.set_default(BlobStacIO)

 If you only need to customize read operations you can inherit from
 :class:`~pystac.stac_io.DefaultStacIO` and only overwrite the read method. For example,
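Note on the URL handling in this diff: the parsing steps in both examples can be exercised without cloud credentials. The following is a minimal sketch and is not part of the PR. The helper names `split_s3_url`, `split_blob_url`, and `is_blob_url` and the sample URLs are hypothetical; the first two mirror the `urlparse` and `PurePosixPath` logic shown above. `is_blob_url` is one possible stricter alternative to `startswith("https")`, which would otherwise route every HTTPS href to Azure. It assumes the default public-cloud endpoint suffix `.blob.core.windows.net`, which may not hold for sovereign clouds or custom domains.

```python
from pathlib import PurePosixPath
from typing import Tuple
from urllib.parse import urlparse


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split s3://<bucket>/<key> into (bucket, key), as in the S3 example."""
    parsed = urlparse(url)
    # netloc is the bucket; drop the leading "/" from the path to get the key
    return parsed.netloc, parsed.path[1:]


def split_blob_url(url: str) -> Tuple[str, str]:
    """Split https://<account>.<suffix>/<container>/<blob> into (container, blob)."""
    path = PurePosixPath(urlparse(url).path)
    # parts[0] is the root "/", parts[1] the container, the rest the blob name
    return path.parts[1], "/".join(path.parts[2:])


def is_blob_url(url: str) -> bool:
    """Hypothetical check: HTTPS URL whose host uses the assumed Azure suffix."""
    parsed = urlparse(url)
    host = parsed.hostname or ""  # hostname is None for local paths
    return parsed.scheme == "https" and host.endswith(".blob.core.windows.net")


if __name__ == "__main__":
    print(split_s3_url("s3://my-bucket/catalog/catalog.json"))
    print(split_blob_url(
        "https://myaccount.blob.core.windows.net/stac/catalog/catalog.json"
    ))
    print(is_blob_url("https://example.com/catalog.json"))
```

Whether a stricter host check is worth it depends on the deployment: if a catalog mixes Azure hrefs with other HTTPS hrefs, falling through to `DefaultStacIO` for non-Azure hosts keeps ordinary HTTPS reads working.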