From 8aecce25578d160f29815c3125d1083e53e13781 Mon Sep 17 00:00:00 2001 From: Brendan McAndrew Date: Thu, 25 Jul 2024 15:21:06 -0400 Subject: [PATCH 1/3] Add example of custom StacIO for Azure Blob storage --- docs/concepts.rst | 149 +++++++++++++++++++++++++++++++++------------- 1 file changed, 107 insertions(+), 42 deletions(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index 8a87eb168..dc324e5ba 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -318,49 +318,114 @@ argument of most object-specific I/O methods. You can also use :meth:`pystac.StacIO.set_default` in your client's ``__init__.py`` file to make this sub-class the default :class:`pystac.StacIO` implementation throughout the library. -For example, this code will allow +For example, the following code examples will allow for reading from AWS's S3 cloud object storage using `boto3 -`__: - -.. code-block:: python - - from urllib.parse import urlparse - import boto3 - from pystac import Link - from pystac.stac_io import DefaultStacIO, StacIO - from typing import Union, Any - - class CustomStacIO(DefaultStacIO): - def __init__(self): - self.s3 = boto3.resource("s3") - super().__init__() - - def read_text( - self, source: Union[str, Link], *args: Any, **kwargs: Any - ) -> str: - parsed = urlparse(source) - if parsed.scheme == "s3": - bucket = parsed.netloc - key = parsed.path[1:] - - obj = self.s3.Object(bucket, key) - return obj.get()["Body"].read().decode("utf-8") - else: - return super().read_text(source, *args, **kwargs) - - def write_text( - self, dest: Union[str, Link], txt: str, *args: Any, **kwargs: Any - ) -> None: - parsed = urlparse(dest) - if parsed.scheme == "s3": - bucket = parsed.netloc - key = parsed.path[1:] - self.s3.Object(bucket, key).put(Body=txt, ContentEncoding="utf-8") - else: - super().write_text(dest, txt, *args, **kwargs) - - StacIO.set_default(CustomStacIO) - +`__ +or Azure Blob Storage using the `Azure SDK for Python +`__: + +.. tab-set:: + .. tab-item:: AWS S3 + + .. code-block:: python + + from urllib.parse import urlparse + import boto3 + from pystac import Link + from pystac.stac_io import DefaultStacIO, StacIO + from typing import Union, Any + + class CustomStacIO(DefaultStacIO): + def __init__(self): + self.s3 = boto3.resource("s3") + super().__init__() + + def read_text( + self, source: Union[str, Link], *args: Any, **kwargs: Any + ) -> str: + parsed = urlparse(source) + if parsed.scheme == "s3": + bucket = parsed.netloc + key = parsed.path[1:] + + obj = self.s3.Object(bucket, key) + return obj.get()["Body"].read().decode("utf-8") + else: + return super().read_text(source, *args, **kwargs) + + def write_text( + self, dest: Union[str, Link], txt: str, *args: Any, **kwargs: Any + ) -> None: + parsed = urlparse(dest) + if parsed.scheme == "s3": + bucket = parsed.netloc + key = parsed.path[1:] + self.s3.Object(bucket, key).put(Body=txt, ContentEncoding="utf-8") + else: + super().write_text(dest, txt, *args, **kwargs) + + StacIO.set_default(CustomStacIO) + + .. tab-item:: Azure Blob Storage + + .. code-block:: python + + import os + from pathlib import PurePosixPath + from typing import Any, Tuple, Union + from urllib.parse import urlparse + + from azure.storage.blob import BlobClient, ContentSettings + from pystac import Link + from pystac.stac_io import DefaultStacIO, StacIO + + class BlobStacIO(DefaultStacIO): + """A custom StacIO class for reading and writing STAC objects + from/to Azure Blob storage. + """ + + def _parse_blob_url(self, url: str) -> Tuple[str, str]: + path = PurePosixPath(urlparse(url).path) + container = path.parts[1] + blob = "/".join(path.parts[2:]) + return container, blob + + def _get_blob_client(self, container: str, blob: str) -> BlobClient: + return BlobClient.from_connection_string( + os.environ["AZURE_STORAGE_CONNECTION_STRING"], + container_name=container, + blob_name=blob, + ) + + def read_text(self, source: Union[str, Link], *args: Any, **kwargs: Any) -> str: + if isinstance(source, Link): + source = source.href + if source.startswith("https"): + container, blob = self._parse_blob_url(source) + blob_client = self._get_blob_client(container, blob) + obj = blob_client.download_blob().readall().decode() + return obj + else: + return super().read_text(source, *args, **kwargs) + + def write_text( + self, dest: Union[str, Link], txt: str, *args: Any, **kwargs: Any + ) -> None: + """Write STAC Objects to Blob storage. Note: overwrites by default.""" + if isinstance(dest, Link): + dest = dest.href + if dest.startswith("https"): + container, blob = self._parse_blob_url(dest) + blob_client = self._get_blob_client(container, blob) + blob_client.upload_blob( + txt, + overwrite=True, + content_settings=ContentSettings(content_type="application/json"), + ) + else: + super().write_text(dest, txt, *args, **kwargs) + + StacIO.set_default(BlobStacIO) If you only need to customize read operations you can inherit from :class:`~pystac.stac_io.DefaultStacIO` and only overwrite the read method. For example, From 845c93626e9922f37274784de0c73426b7d41b01 Mon Sep 17 00:00:00 2001 From: Brendan McAndrew Date: Thu, 25 Jul 2024 15:29:54 -0400 Subject: [PATCH 2/3] Add #1372 to changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f10412165..9079fd897 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Allow object ID as input for getting APILayoutStrategy hrefs and add `items`, `collections`, `search`, `conformance`, `service_desc` and `service_doc` href methods ([#1335](https://github.com/stac-utils/pystac/pull/1335)) - Updated classification extension to v2.0.0 ([#1359](https://github.com/stac-utils/pystac/pull/1359)) - Update docstring of `name` argument to `Classification.apply` and `Classification.create` to agree with extension specification ([#1356](https://github.com/stac-utils/pystac/pull/1356)) +- Add example of custom `StacIO` for Azure Blob Storage to docs ([#1372](https://github.com/stac-utils/pystac/pull/1372)) ### Fixed From b1452ccf67564a3da8280a700d71e2bc250b07b2 Mon Sep 17 00:00:00 2001 From: Brendan McAndrew Date: Mon, 29 Jul 2024 19:41:19 -0400 Subject: [PATCH 3/3] Move implicit vars to class attributes, improve Blob URI handling --- docs/concepts.rst | 121 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 101 insertions(+), 20 deletions(-) diff --git a/docs/concepts.rst b/docs/concepts.rst index dc324e5ba..6c26c6dec 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -371,38 +371,108 @@ or Azure Blob Storage using the `Azure SDK for Python .. code-block:: python import os - from pathlib import PurePosixPath - from typing import Any, Tuple, Union + import re + from typing import Any, Dict, Optional, Tuple, Union from urllib.parse import urlparse + from azure.core.credentials import ( + AzureNamedKeyCredential, + AzureSasCredential, + TokenCredential, + ) from azure.storage.blob import BlobClient, ContentSettings from pystac import Link - from pystac.stac_io import DefaultStacIO, StacIO + from pystac.stac_io import DefaultStacIO + + BLOB_HTTPS_URI_PATTERN = r"https:\/\/(.+?)\.blob\.core\.windows\.net" + + AzureCredentialType = Union[ + str, + Dict[str, str], + AzureNamedKeyCredential, + AzureSasCredential, + TokenCredential, + ] + class BlobStacIO(DefaultStacIO): """A custom StacIO class for reading and writing STAC objects from/to Azure Blob storage. """ - def _parse_blob_url(self, url: str) -> Tuple[str, str]: - path = PurePosixPath(urlparse(url).path) - container = path.parts[1] - blob = "/".join(path.parts[2:]) + conn_str: Optional[str] = os.getenv("AZURE_STORAGE_CONNECTION_STRING") + account_url: Optional[str] = None + credential: Optional[AzureCredentialType] = None + overwrite: bool = True + + def _is_blob_uri(self, href: str) -> bool: + """Check if href matches Blob URI pattern.""" + if re.search( + re.compile(BLOB_HTTPS_URI_PATTERN), href + ) is not None or href.startswith("abfs://"): + return True + else: + return False + + def _parse_blob_uri(self, uri: str) -> Tuple[str, str]: + """Parse the container and blob name from a Blob URI. + + Parameters + ---------- + uri + An Azure Blob URI. + + Returns + ------- + The container and blob names. + """ + if uri.startswith("abfs://"): + path = uri.replace("abfs://", "/") + else: + path = urlparse(uri).path + + parts = path.split("/") + container = parts[1] + blob = "/".join(parts[2:]) return container, blob - def _get_blob_client(self, container: str, blob: str) -> BlobClient: - return BlobClient.from_connection_string( - os.environ["AZURE_STORAGE_CONNECTION_STRING"], - container_name=container, - blob_name=blob, - ) + def _get_blob_client(self, uri: str) -> BlobClient: + """Instantiate a `BlobClient` given a container and blob. + + Parameters + ---------- + uri + An Azure Blob URI. + + Returns + ------- + A `BlobClient` for interacting with `blob` in `container`. + """ + container, blob = self._parse_blob_uri(uri) + + if self.conn_str: + return BlobClient.from_connection_string( + self.conn_str, + container_name=container, + blob_name=blob, + ) + elif self.account_url: + return BlobClient( + account_url=self.account_url, + container_name=container, + blob_name=blob, + credential=self.credential, + ) + else: + raise ValueError( + "Must set conn_str or account_url (and credential if required)" + ) def read_text(self, source: Union[str, Link], *args: Any, **kwargs: Any) -> str: if isinstance(source, Link): source = source.href - if source.startswith("https"): - container, blob = self._parse_blob_url(source) - blob_client = self._get_blob_client(container, blob) + if self._is_blob_uri(source): + blob_client = self._get_blob_client(source) obj = blob_client.download_blob().readall().decode() return obj else: @@ -414,17 +484,28 @@ or Azure Blob Storage using the `Azure SDK for Python """Write STAC Objects to Blob storage. Note: overwrites by default.""" if isinstance(dest, Link): dest = dest.href - if dest.startswith("https"): - container, blob = self._parse_blob_url(dest) - blob_client = self._get_blob_client(container, blob) + if self._is_blob_uri(dest): + blob_client = self._get_blob_client(dest) blob_client.upload_blob( txt, - overwrite=True, + overwrite=self.overwrite, content_settings=ContentSettings(content_type="application/json"), ) else: super().write_text(dest, txt, *args, **kwargs) + + # set Blob storage connection string + BlobStacIO.conn_str = "my-storage-connection-string" + + # OR set Blob account URL, credential + BlobStacIO.account_url = "https://myblobstorageaccount.blob.core.windows.net" + BlobStacIO.credential = AzureSasCredential("my-sas-token") + + # modify overwrite behavior + BlobStacIO.overwrite = False + + # set BlobStacIO as default StacIO StacIO.set_default(BlobStacIO) If you only need to customize read operations you can inherit from