Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add example of custom StacIO for Azure Blob storage #1372

Merged
merged 4 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- Allow object ID as input for getting APILayoutStrategy hrefs and add `items`, `collections`, `search`, `conformance`, `service_desc` and `service_doc` href methods ([#1335](https://github.com/stac-utils/pystac/pull/1335))
- Updated classification extension to v2.0.0 ([#1359](https://github.com/stac-utils/pystac/pull/1359))
- Update docstring of `name` argument to `Classification.apply` and `Classification.create` to agree with extension specification ([#1356](https://github.com/stac-utils/pystac/pull/1356))
- Add example of custom `StacIO` for Azure Blob Storage to docs ([#1372](https://github.com/stac-utils/pystac/pull/1372))

### Fixed

Expand Down
230 changes: 188 additions & 42 deletions docs/concepts.rst
Original file line number Diff line number Diff line change
Expand Up @@ -318,49 +318,195 @@ argument of most object-specific I/O methods. You can also use
:meth:`pystac.StacIO.set_default` in your client's ``__init__.py`` file to make this
sub-class the default :class:`pystac.StacIO` implementation throughout the library.

For example, this code will allow
For example, the following code examples will allow
for reading from AWS's S3 cloud object storage using `boto3
<https://boto3.amazonaws.com/v1/documentation/api/latest/index.html>`__:

.. code-block:: python

from urllib.parse import urlparse
import boto3
from pystac import Link
from pystac.stac_io import DefaultStacIO, StacIO
from typing import Union, Any

class CustomStacIO(DefaultStacIO):
    """StacIO that reads and writes ``s3://`` hrefs using boto3.

    Hrefs with any other scheme are delegated to :class:`DefaultStacIO`.
    """

    def __init__(self):
        self.s3 = boto3.resource("s3")
        super().__init__()

    def read_text(
        self, source: Union[str, Link], *args: Any, **kwargs: Any
    ) -> str:
        """Return the text stored at ``source``.

        ``source`` may be an href string or a :class:`Link`.
        """
        # urlparse() only accepts strings, so resolve a Link to its href first.
        href = source.href if isinstance(source, Link) else source
        parsed = urlparse(href)
        if parsed.scheme == "s3":
            bucket = parsed.netloc
            # Drop the leading "/" of the URL path to get the object key.
            key = parsed.path[1:]

            obj = self.s3.Object(bucket, key)
            return obj.get()["Body"].read().decode("utf-8")
        else:
            return super().read_text(source, *args, **kwargs)

    def write_text(
        self, dest: Union[str, Link], txt: str, *args: Any, **kwargs: Any
    ) -> None:
        """Write ``txt`` to ``dest``.

        ``dest`` may be an href string or a :class:`Link`.
        """
        # urlparse() only accepts strings, so resolve a Link to its href first.
        href = dest.href if isinstance(dest, Link) else dest
        parsed = urlparse(href)
        if parsed.scheme == "s3":
            bucket = parsed.netloc
            # Drop the leading "/" of the URL path to get the object key.
            key = parsed.path[1:]
            self.s3.Object(bucket, key).put(Body=txt, ContentEncoding="utf-8")
        else:
            super().write_text(dest, txt, *args, **kwargs)

# Make CustomStacIO the default StacIO implementation used throughout pystac.
StacIO.set_default(CustomStacIO)

<https://boto3.amazonaws.com/v1/documentation/api/latest/index.html>`__
or Azure Blob Storage using the `Azure SDK for Python
<https://learn.microsoft.com/en-us/python/api/overview/azure/storage-blob-readme?view=azure-python>`__:

.. tab-set::
.. tab-item:: AWS S3

.. code-block:: python

from urllib.parse import urlparse
import boto3
from pystac import Link
from pystac.stac_io import DefaultStacIO, StacIO
from typing import Union, Any

class CustomStacIO(DefaultStacIO):
    """StacIO that reads and writes ``s3://`` hrefs using boto3.

    Hrefs with any other scheme are delegated to :class:`DefaultStacIO`.
    """

    def __init__(self):
        self.s3 = boto3.resource("s3")
        super().__init__()

    def read_text(
        self, source: Union[str, Link], *args: Any, **kwargs: Any
    ) -> str:
        """Return the text stored at ``source``.

        ``source`` may be an href string or a :class:`Link`.
        """
        # urlparse() only accepts strings, so resolve a Link to its href first.
        href = source.href if isinstance(source, Link) else source
        parsed = urlparse(href)
        if parsed.scheme == "s3":
            bucket = parsed.netloc
            # Drop the leading "/" of the URL path to get the object key.
            key = parsed.path[1:]

            obj = self.s3.Object(bucket, key)
            return obj.get()["Body"].read().decode("utf-8")
        else:
            return super().read_text(source, *args, **kwargs)

    def write_text(
        self, dest: Union[str, Link], txt: str, *args: Any, **kwargs: Any
    ) -> None:
        """Write ``txt`` to ``dest``.

        ``dest`` may be an href string or a :class:`Link`.
        """
        # urlparse() only accepts strings, so resolve a Link to its href first.
        href = dest.href if isinstance(dest, Link) else dest
        parsed = urlparse(href)
        if parsed.scheme == "s3":
            bucket = parsed.netloc
            # Drop the leading "/" of the URL path to get the object key.
            key = parsed.path[1:]
            self.s3.Object(bucket, key).put(Body=txt, ContentEncoding="utf-8")
        else:
            super().write_text(dest, txt, *args, **kwargs)

# Make CustomStacIO the default StacIO implementation used throughout pystac.
StacIO.set_default(CustomStacIO)

.. tab-item:: Azure Blob Storage

.. code-block:: python

import os
import re
from typing import Any, Dict, Optional, Tuple, Union
from urllib.parse import urlparse

from azure.core.credentials import (
    AzureNamedKeyCredential,
    AzureSasCredential,
    TokenCredential,
)
from azure.storage.blob import BlobClient, ContentSettings
from pystac import Link
from pystac.stac_io import DefaultStacIO, StacIO

# Regex for HTTPS Blob Storage URLs. The capture group holds the text between
# "https://" and ".blob.core.windows.net" (presumably the storage account name).
BLOB_HTTPS_URI_PATTERN = r"https:\/\/(.+?)\.blob\.core\.windows\.net"

# Types accepted for ``BlobStacIO.credential``, which is forwarded unchanged as
# the ``credential`` argument of ``BlobClient``.
AzureCredentialType = Union[
str,
Dict[str, str],
AzureNamedKeyCredential,
AzureSasCredential,
TokenCredential,
]


class BlobStacIO(DefaultStacIO):
    """A custom StacIO class for reading and writing STAC objects
    from/to Azure Blob storage.

    Connection settings are class attributes, so they can be assigned on the
    class itself before it is registered with ``StacIO.set_default``. Hrefs
    that are not Blob URIs are delegated to :class:`DefaultStacIO`.
    """

    # Storage connection string; the environment variable is read once, when
    # the class body is executed. Takes precedence over ``account_url``.
    conn_str: Optional[str] = os.getenv("AZURE_STORAGE_CONNECTION_STRING")
    # Storage account URL; only used when ``conn_str`` is not set.
    account_url: Optional[str] = None
    # Credential passed to ``BlobClient`` together with ``account_url``.
    credential: Optional[AzureCredentialType] = None
    # Forwarded as the ``overwrite`` argument of ``upload_blob``.
    overwrite: bool = True

    def _is_blob_uri(self, href: str) -> bool:
        """Check if href matches Blob URI pattern.

        Parameters
        ----------
        href
            The href to test.

        Returns
        -------
        True if `href` starts with ``abfs://`` or with an HTTPS Blob storage
        URL, False otherwise.
        """
        # re.match anchors at the start of the string, so an href that merely
        # contains a Blob URL (e.g. inside a query string) is not treated as
        # a Blob URI.
        return href.startswith("abfs://") or (
            re.match(BLOB_HTTPS_URI_PATTERN, href) is not None
        )

    def _parse_blob_uri(self, uri: str) -> Tuple[str, str]:
        """Parse the container and blob name from a Blob URI.

        Parameters
        ----------
        uri
            An Azure Blob URI.

        Returns
        -------
        The container and blob names.
        """
        scheme = "abfs://"
        if uri.startswith(scheme):
            # Strip only the leading scheme; a later "abfs://" inside the blob
            # name must be left untouched.
            path = "/" + uri[len(scheme):]
        else:
            path = urlparse(uri).path

        # path looks like "/<container>/<blob...>", so parts[0] is empty.
        parts = path.split("/")
        container = parts[1]
        blob = "/".join(parts[2:])
        return container, blob

    def _get_blob_client(self, uri: str) -> BlobClient:
        """Instantiate a `BlobClient` for the blob that a Blob URI points to.

        Parameters
        ----------
        uri
            An Azure Blob URI.

        Returns
        -------
        A `BlobClient` for interacting with `blob` in `container`.

        Raises
        ------
        ValueError
            If neither `conn_str` nor `account_url` is set.
        """
        container, blob = self._parse_blob_uri(uri)

        if self.conn_str:
            return BlobClient.from_connection_string(
                self.conn_str,
                container_name=container,
                blob_name=blob,
            )
        elif self.account_url:
            return BlobClient(
                account_url=self.account_url,
                container_name=container,
                blob_name=blob,
                credential=self.credential,
            )
        else:
            raise ValueError(
                "Must set conn_str or account_url (and credential if required)"
            )

    def read_text(self, source: Union[str, Link], *args: Any, **kwargs: Any) -> str:
        """Read STAC Objects from Blob storage.

        A `Link` is resolved to its href first. Non-Blob hrefs are read by
        `DefaultStacIO`.
        """
        if isinstance(source, Link):
            source = source.href
        if self._is_blob_uri(source):
            blob_client = self._get_blob_client(source)
            obj = blob_client.download_blob().readall().decode()
            return obj
        else:
            return super().read_text(source, *args, **kwargs)

    def write_text(
        self, dest: Union[str, Link], txt: str, *args: Any, **kwargs: Any
    ) -> None:
        """Write STAC Objects to Blob storage. Note: overwrites by default.

        A `Link` is resolved to its href first. Non-Blob hrefs are written by
        `DefaultStacIO`.
        """
        if isinstance(dest, Link):
            dest = dest.href
        if self._is_blob_uri(dest):
            blob_client = self._get_blob_client(dest)
            blob_client.upload_blob(
                txt,
                overwrite=self.overwrite,
                content_settings=ContentSettings(content_type="application/json"),
            )
        else:
            super().write_text(dest, txt, *args, **kwargs)


# Option 1: set the Blob storage connection string. When conn_str is set it
# takes precedence over account_url and credential.
BlobStacIO.conn_str = "my-storage-connection-string"

# Option 2: set the Blob account URL and a credential instead. These are only
# used when conn_str is empty or None.
BlobStacIO.account_url = "https://myblobstorageaccount.blob.core.windows.net"
BlobStacIO.credential = AzureSasCredential("my-sas-token")

# modify overwrite behavior: the value is passed as overwrite= to upload_blob
# (the class default is True)
BlobStacIO.overwrite = False

# set BlobStacIO as default StacIO
StacIO.set_default(BlobStacIO)

If you only need to customize read operations, you can inherit from
:class:`~pystac.stac_io.DefaultStacIO` and override only the read method. For example,
Expand Down