Skip to content

Commit

Permalink
add ionbeam datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshawkes committed Mar 5, 2024
1 parent b6d8d2f commit 9cc6a04
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 0 deletions.
1 change: 1 addition & 0 deletions polytope_server/common/datasource/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def dispatch(self, request, input_data) -> bool:
"webmars": "WebMARSDataSource",
"polytope": "PolytopeDataSource",
"federated": "FederatedDataSource",
"ionbeam": "IonBeamDataSource",
"echo": "EchoDataSource",
"dummy": "DummyDataSource",
"raise": "RaiseDataSource",
Expand Down
138 changes: 138 additions & 0 deletions polytope_server/common/datasource/ionbeam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#
# Copyright 2022 European Centre for Medium-Range Weather Forecasts (ECMWF)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation nor
# does it submit to any jurisdiction.
#
import logging
from dataclasses import dataclass
from typing import Dict, Optional
from urllib.parse import urljoin

import requests
import yaml
from requests import Request

from . import datasource

@dataclass
class IonBeamAPI:
    """Minimal HTTP client for the IonBeam REST API.

    All requests go through one ``requests.Session`` created in
    ``__post_init__`` and are addressed as ``<endpoint>/<path>``.
    """

    # Base URL of the REST API; stored without a trailing slash.
    endpoint: str

    def __post_init__(self):
        # URLs are built as f"{endpoint}/{path}", so a trailing slash would
        # produce "//". Normalise instead of asserting: asserts are stripped
        # under `python -O`, which would silently let a bad endpoint through.
        self.endpoint = self.endpoint.rstrip("/")
        self.session = requests.Session()

    def get(self, path: str, **kwargs) -> requests.Response:
        """Issue a GET to ``<endpoint>/<path>`` and return the response.

        The response is streamed by default (the body is not downloaded until
        it is read); extra keyword arguments are forwarded to
        ``Session.get``. The caller is responsible for closing the response.
        """
        # setdefault keeps streaming as the default while allowing an explicit
        # stream=... from the caller instead of failing on a duplicate keyword.
        kwargs.setdefault("stream", True)
        return self.session.get(f"{self.endpoint}/{path}", **kwargs)

    def get_bytes(self, path: str, **kwargs) -> requests.Response:
        """GET ``path`` with ``Accept: application/octet-stream``.

        Caller-supplied headers are preserved, except that ``Accept`` is
        always overridden. The caller's header dict is not modified.
        """
        kwargs["headers"] = {
            **kwargs.get("headers", {}),
            "Accept": "application/octet-stream",
        }
        return self.get(path, **kwargs)

    def get_json(self, path, **kwargs):
        """GET ``path`` and return the response body decoded as JSON."""
        return self.get(path, **kwargs).json()

    def list(self, request: Optional[Dict[str, str]] = None):
        """Call the ``list`` route with ``request`` as query parameters."""
        params = {} if request is None else request
        return self.get_json("list", params=params)

    def head(self, request: Optional[Dict[str, str]] = None):
        """Call the ``head`` route with ``request`` as query parameters."""
        params = {} if request is None else request
        return self.get_json("head", params=params)

    def retrieve(self, request: Dict[str, str]) -> requests.Response:
        """Call the ``retrieve`` route and return the streamed binary response."""
        return self.get_bytes("retrieve", params=request)

    def archive(self, request, file) -> requests.Response:
        """POST ``file`` as a multipart upload to the ``archive`` route.

        ``request`` is sent as query parameters; ``file`` is passed to
        ``requests`` under the form field name ``file``.
        """
        files = {"file": file}
        return self.session.post(f"{self.endpoint}/archive", files=files, params=request)


class IonBeamDataSource(datasource.DataSource):
    """
    Retrieve data from the IonBeam REST backend that lives here: https://github.com/ecmwf/IonBeam-Deployment/tree/main/docker/rest_api

    NOTE(review): the ``Request`` annotation used below is imported from
    ``requests``, but the methods read ``request.user_request``, which
    ``requests.Request`` does not define -- presumably the project's own
    request type is intended; confirm and fix the import.
    """

    # Chunk size, in bytes, used when streaming the retrieved payload (2 MiB).
    read_chunk_size = 2 * 1024 * 1024

    def __init__(self, config):
        """Instantiate a datasource for the IonBeam REST API.

        Reads ``type`` (must be ``"ionbeam"``), the optional ``match`` rules
        and the mandatory ``api_endpoint`` from ``config``.

        Raises:
            Exception: if ``api_endpoint`` is missing or empty.
        """
        self.type = config["type"]
        assert self.type == "ionbeam"

        self.match_rules = config.get("match", {})
        endpoint = config.get("api_endpoint", None)
        if not endpoint:
            raise Exception("IonBeamDataSource requires an api_endpoint in configuration")
        self.api = IonBeamAPI(endpoint)

        # Set by retrieve(); initialised here so destroy() is safe to call
        # even when no retrieval ever happened (e.g. match() raised first).
        self.response = None

    def mime_type(self) -> str:
        """Returns the mimetype of the result"""
        return "application/octet-stream"

    def get_type(self):
        """Returns the configured datasource type string."""
        return self.type

    def archive(self, request: Request):
        """Archive a local file through the API and return the API response.

        ``request.user_request`` is parsed as YAML and must provide ``keys``
        (sent as query parameters) and ``path`` (the file to upload).
        """
        r = yaml.safe_load(request.user_request)
        keys = r["keys"]

        # NOTE(review): the path comes straight from the request body; confirm
        # that callers are trusted or restrict which paths may be read.
        with open(r["path"], "rb") as f:
            return self.api.archive(keys, f)

    def list(self, request: Request) -> list:
        """Return the API's ``list`` result for the keys in the request."""
        request_keys = yaml.safe_load(request.user_request)
        return self.api.list(request_keys)

    def retrieve(self, request: Request) -> bool:
        """Start a retrieval and keep the streamed response on ``self.response``.

        Always returns True; the data itself is read later via ``result()``.
        """
        request_keys = yaml.safe_load(request.user_request)
        # NOTE(review): the HTTP status is not checked here, so an error
        # response body would be streamed by result() like regular data.
        self.response = self.api.retrieve(request_keys)
        return True

    def result(self, request: Request):
        """Returns a generator for the resultant data"""
        return self.response.iter_content(chunk_size=self.read_chunk_size, decode_unicode=False)

    def destroy(self, request) -> None:
        """A hook to do essential freeing of resources, called upon success or failure"""

        # requests response objects with stream=True can remain open indefinitely if not read to completion
        # or closed explicitly
        #
        # Compare against None rather than relying on truthiness: a requests
        # Response is falsy whenever its status is an error (bool(response) is
        # response.ok), so a plain `if self.response:` would leave exactly the
        # failed responses open. getattr also covers instances on which
        # `response` was never set.
        response = getattr(self, "response", None)
        if response is not None:
            response.close()

    def match(self, request: Request) -> None:
        """Checks if the request matches the datasource, raises on failure"""

        r = yaml.safe_load(request.user_request) or {}

        for k, v in self.match_rules.items():
            # Check that all required keys exist
            if k not in r:
                raise Exception("Request does not contain expected key {}".format(k))
            # Date rules only require the key to be present; the date value
            # itself is not validated by this datasource.
            if k == "date":
                continue
            # ... and check the value of other keys
            v = [v] if isinstance(v, str) else v
            if r[k] not in v:
                raise Exception("got {} : {}, but expected one of {}".format(k, r[k], v))

0 comments on commit 9cc6a04

Please sign in to comment.