# Source code for globus_sdk.experimental.gcs_downloader

"""
The GCSDownloader provides HTTPS file download capabilities for Globus Connect Server.
"""

from __future__ import annotations

import logging
import sys
import types
import typing as t
import urllib.parse

import globus_sdk
import globus_sdk.transport
from globus_sdk._internal.type_definitions import Closable
from globus_sdk.experimental.gcs_collection_client import GCSCollectionClient
from globus_sdk.transport.default_retry_checks import DEFAULT_RETRY_CHECKS

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self

log = logging.getLogger(__name__)


class GCSDownloader:
    """
    An object which manages connection and authentication state to enable
    HTTPS downloads from a specific Globus Connect Server collection.

    The initial request to read a file features support for determining
    authentication requirements dynamically, and subsequent requests will reuse
    that authentication data.

    Using a single :class:`GCSDownloader` to access distinct collections is not
    supported. A separate downloader should be used for each collection.

    Downloaders may be used as context managers, in which case they automatically
    call their ``close()`` method on exit:

    >>> with GCSDownloader(app) as downloader:
    >>>     print(downloader.read_file(url))

    :param app: The :class:`GlobusApp` used to authenticate calls to this server.
    :param gcs_client: The underlying client used for the file read request.
        Typically omitted. When not provided, one will be constructed on demand
        by the downloader.
    :param transfer_client: A client used when detecting collection information.
        Typically omitted. When not provided, one will be constructed on demand
        by the downloader.
    :param transport: A transport for the downloader, used for authentication
        sniffing operations. When a client is built by the downloader it will
        inherit this transport.
    """

    def __init__(
        self,
        app: globus_sdk.GlobusApp,
        *,
        gcs_client: GCSCollectionClient | None = None,
        transfer_client: globus_sdk.TransferClient | None = None,
        transport: globus_sdk.transport.RequestsTransport | None = None,
    ) -> None:
        self.app = app
        # only objects created by the downloader itself are tracked here;
        # caller-supplied clients and transports are never closed by ``close()``
        self._resources_to_close: list[Closable] = []

        if transport is not None:
            self.transport = transport
        else:
            self.transport = globus_sdk.transport.RequestsTransport()
            self._resources_to_close.append(self.transport)

        # the downloader will need a RetryConfig when it uses its own transport
        # (it is read by ``_sniff_collection_id`` regardless of where the
        # transport came from, so it is always populated)
        self._retry_config = globus_sdk.transport.RetryConfig()
        self._retry_config.checks.register_many_checks(DEFAULT_RETRY_CHECKS)

        self.gcs_client = gcs_client
        # set the transfer_client if provided
        self.transfer_client = transfer_client

    def __enter__(self) -> Self:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: types.TracebackType | None,
    ) -> None:
        self.close()

    def close(self) -> None:
        """
        Close all resources which are owned by this downloader.

        Owned resources are the ones the downloader built for itself: the
        transport when none was passed in, and any client constructed on demand.
        """
        for resource in self._resources_to_close:
            log.debug(
                f"closing resource of type {type(resource).__name__} "
                f"for {type(self).__name__}"
            )
            resource.close()

    @t.overload
    def read_file(self, file_uri: str, *, as_text: t.Literal[True]) -> str: ...

    @t.overload
    def read_file(self, file_uri: str, *, as_text: t.Literal[False]) -> bytes: ...

    @t.overload
    def read_file(self, file_uri: str) -> str: ...

    def read_file(self, file_uri: str, *, as_text: bool = True) -> str | bytes:
        """
        Given a file URI on a GCS Collection, read the data.

        :param file_uri: The full URI of the file on the collection which is being
            downloaded.
        :param as_text: When ``True``, the file contents are decoded into a string.
            Set to ``False`` to retrieve data as bytes.

        .. caution::

            The file read is done naively as a GET request. This may be unsuitable
            for very large files.
        """
        # dynamically build a client if needed
        if self.gcs_client is None:
            self.gcs_client = self._get_client_from_uri(file_uri)
            # the downloader built this client, so the downloader must close it
            self._resources_to_close.append(self.gcs_client)

        response = self.gcs_client.get(file_uri)
        if as_text:
            return response.text
        return response.binary_content

    def _get_client_from_uri(self, file_uri: str) -> GCSCollectionClient:
        """
        Build a collection client for the server hosting ``file_uri`` and register
        the scopes it needs with the app.

        :param file_uri: The full URI of the file which is going to be read.
        """
        collection_id = self._sniff_collection_id(file_uri)
        collection_address = _get_collection_address(file_uri)
        client = self._gcs_client_constructor(
            collection_client_id=collection_id,
            collection_address=collection_address,
        )
        self.app.add_scope_requirements(
            {client.scopes.resource_server: self._determine_required_scopes(client)}
        )
        return client

    def _gcs_client_constructor(
        self, *, collection_client_id: str, collection_address: str
    ) -> GCSCollectionClient:
        """
        Construct the collection client, sharing this downloader's app and
        transport.
        """
        return GCSCollectionClient(
            collection_client_id,
            collection_address,
            app=self.app,
            transport=self.transport,
        )

    def _determine_required_scopes(
        self, client: GCSCollectionClient
    ) -> list[globus_sdk.Scope]:
        """
        Compute the scopes needed to read from the collection behind ``client``.

        The ``https`` scope is always required; ``data_access`` is added when
        ``_uses_data_access`` reports that the collection uses it. A
        :class:`TransferClient` is built on demand (and owned by the downloader)
        when none was supplied.
        """
        if self.transfer_client is None:
            self.transfer_client = globus_sdk.TransferClient(
                app=self.app, transport=self.transport
            )
            self._resources_to_close.append(self.transfer_client)

        required_scopes: list[globus_sdk.Scope] = [client.scopes.https]
        if _uses_data_access(self.transfer_client, client.collection_id):
            required_scopes.append(client.scopes.data_access)
        return required_scopes

    def _sniff_collection_id(self, file_uri: str) -> str:
        """
        Detect the collection ID for ``file_uri`` from the redirect which the
        server issues in response to a direct GET through the transport.

        :param file_uri: The full URI of the file on the collection.
        :raises RuntimeError: if the response has no ``Location`` header, or the
            header does not carry exactly one ``client_id`` query parameter.
        """
        response = self.transport.request(
            "GET",
            file_uri,
            caller_info=globus_sdk.transport.RequestCallerInfo(
                retry_config=self._retry_config
            ),
            # the redirect itself is the data of interest, so do not follow it
            allow_redirects=False,
        )

        if "Location" not in response.headers:
            msg = (
                f"Attempting to detect the collection ID for the file at '{file_uri}' "
                "failed. Did not receive a redirect with Location header on "
                "unauthenticated call."
            )
            raise RuntimeError(msg)

        return self._client_id_from_location(file_uri, response.headers["Location"])

    @staticmethod
    def _client_id_from_location(file_uri: str, location_header: str) -> str:
        """
        Extract the single ``client_id`` query parameter from a redirect target.

        :param file_uri: The URI which was requested; used only in error messages.
        :param location_header: The value of the ``Location`` response header.
        :raises RuntimeError: if ``client_id`` is absent or appears more than once.
        """
        parsed_location = urllib.parse.urlparse(location_header)
        parsed_location_qs = urllib.parse.parse_qs(parsed_location.query)

        if "client_id" not in parsed_location_qs:
            msg = (
                f"Attempting to detect the collection ID for the file at '{file_uri}' "
                "failed. Location header did not encode a 'client_id'."
            )
            raise RuntimeError(msg)

        client_ids = parsed_location_qs["client_id"]
        if len(client_ids) != 1:
            msg = (
                f"Attempting to detect the collection ID for the file at '{file_uri}' "
                "failed. Multiple 'client_id' params were present."
            )
            raise RuntimeError(msg)

        return client_ids[0]
def _get_collection_address(file_uri: str) -> str:
    """
    Reduce a file URI to the base address of the server which hosts it.

    :param file_uri: The full URI of a file on a collection.
    :returns: The ``scheme://netloc`` portion of ``file_uri``; any path, query
        string, or fragment is dropped.
    """
    parsed = urllib.parse.urlparse(file_uri)
    return f"{parsed.scheme}://{parsed.netloc}"


def _uses_data_access(
    transfer_client: globus_sdk.TransferClient, collection_id: str
) -> bool:
    """
    Report whether the ``data_access`` scope should be requested for a collection.

    The collection's document is fetched with ``get_endpoint``. Only documents
    whose ``entity_type`` is ``GCSv5_mapped_collection`` and whose
    ``high_assurance`` value is falsy yield ``True``.

    :param transfer_client: The client used to look up the collection.
    :param collection_id: The ID of the collection to inspect.
    """
    doc = transfer_client.get_endpoint(collection_id)
    if doc["entity_type"] != "GCSv5_mapped_collection":
        return False
    if doc["high_assurance"]:
        return False
    return True