Source code for globus_sdk.services.timers.data

from __future__ import annotations

# the name "datetime" is used in this module, so use an alternative name
# in order to avoid name shadowing
import datetime as dt
import logging
import typing as t
import uuid

from globus_sdk._missing import MISSING, MissingType
from globus_sdk._payload import GlobusPayload
from globus_sdk.services.transfer import TransferData

log = logging.getLogger(__name__)


class TransferTimer(GlobusPayload):
    """
    Payload helper describing a Transfer Timer to be created.

    Pass an instance of this class to
    :meth:`create_timer <globus_sdk.TimersClient.create_timer>` to create a
    timer.

    .. note::

        ``TimersClient`` has two methods for creating timers,
        ``create_timer`` and ``create_job``. ``create_job`` uses a different
        API -- only ``create_timer`` will work with this helper class.

        Users are strongly recommended to use ``create_timer`` and this
        helper for timer creation.

    :param name: A name to identify this timer
    :param schedule: The schedule on which the timer runs
    :param body: A transfer payload for the timer to use. If it includes
        ``submission_id`` or ``skip_activation_check``, these parameters will
        be removed, as they are not supported in timers.

    The ``schedule`` field determines when the timer will run.
    Timers may be "run once" or "recurring", and "recurring" timers may
    specify an end date or a number of executions after which the timer will
    stop. A ``schedule`` is specified as a dict, but the SDK provides two
    useful helpers for constructing these data.

    **Example Schedules**

    .. tab-set::

        .. tab-item:: Run Once, Right Now

            .. code-block:: python

                schedule = OnceTimerSchedule()

        .. tab-item:: Run Once, At a Specific Time

            .. code-block:: python

                schedule = OnceTimerSchedule(datetime="2023-09-22T00:00:00Z")

        .. tab-item:: Run Every 5 Minutes, Until a Specific Time

            .. code-block:: python

                schedule = RecurringTimerSchedule(
                    interval_seconds=300,
                    end={"condition": "time", "datetime": "2023-10-01T00:00:00Z"},
                )

        .. tab-item:: Run Every 30 Minutes, 10 Times

            .. code-block:: python

                schedule = RecurringTimerSchedule(
                    interval_seconds=1800,
                    end={"condition": "iterations", "iterations": 10},
                )

        .. tab-item:: Run Every 10 Minutes, Indefinitely

            .. code-block:: python

                schedule = RecurringTimerSchedule(interval_seconds=600)

    Using these schedules, you can create a timer from a ``TransferData``
    object:

    .. code-block:: pycon

        >>> from globus_sdk import TransferData, TransferTimer
        >>> schedule = ...
        >>> transfer_data = TransferData(...)
        >>> timer = TransferTimer(
        ...     name="my timer",
        ...     schedule=schedule,
        ...     body=transfer_data,
        ... )

    Submit the timer to the Timers service with
    :meth:`create_timer <globus_sdk.TimersClient.create_timer>`.
    """

    def __init__(
        self,
        *,
        name: str | MissingType = MISSING,
        schedule: dict[str, t.Any] | RecurringTimerSchedule | OnceTimerSchedule,
        body: dict[str, t.Any] | TransferData,
    ) -> None:
        super().__init__()
        # keys are assigned in a fixed order so the payload layout is stable
        self["timer_type"] = "transfer"
        self["name"] = name
        self["schedule"] = schedule
        sanitized_body = self._preprocess_body(body)
        self["body"] = sanitized_body

    def _preprocess_body(
        self, body: dict[str, t.Any] | TransferData
    ) -> dict[str, t.Any]:
        """
        Return a plain-dict copy of ``body`` without timer-unsupported keys.

        The input is never mutated: a dict is shallow-copied and a
        ``TransferData`` is converted into a new dict.
        """
        sanitized: dict[str, t.Any] = dict(body)
        # these parameters are commonly present in TransferData but are not
        # supported in timers, so they are always dropped
        for unsupported_key in ("submission_id", "skip_activation_check"):
            sanitized.pop(unsupported_key, None)
        return sanitized
class FlowTimer(GlobusPayload):
    """
    Payload helper describing a Flow Timer to be created.

    Pass an instance of this class to
    :meth:`create_timer <globus_sdk.TimersClient.create_timer>` to create a
    timer.

    .. note::

        ``TimersClient`` has two methods for creating timers:
        ``create_timer`` and ``create_job``.
        This helper class only works with the ``create_timer`` method.

    :param flow_id: The flow ID to run when the timer runs.
    :param name: A name to identify this timer.
    :param schedule: The schedule on which the timer runs
    :param body: The payload for the timer to use when it runs the flow
        (see the example below). A shallow copy is stored, so the caller's
        dict is not modified.

    The ``schedule`` field determines when the timer will run.
    Timers may be "run once" or "recurring", and "recurring" timers may
    specify an end date or the number of executions after which the timer
    will stop. A ``schedule`` is specified as a dict, but the SDK provides
    two helpers for constructing these data.

    **Example Schedules**

    .. tab-set::

        .. tab-item:: Run Once, Right Now

            .. code-block:: python

                schedule = OnceTimerSchedule()

        .. tab-item:: Run Once, At a Specific Time

            .. code-block:: python

                schedule = OnceTimerSchedule(datetime="2023-09-22T00:00:00Z")

        .. tab-item:: Run Every 5 Minutes, Until a Specific Time

            .. code-block:: python

                schedule = RecurringTimerSchedule(
                    interval_seconds=300,
                    end={"condition": "time", "datetime": "2023-10-01T00:00:00Z"},
                )

        .. tab-item:: Run Every 30 Minutes, 10 Times

            .. code-block:: python

                schedule = RecurringTimerSchedule(
                    interval_seconds=1800,
                    end={"condition": "iterations", "iterations": 10},
                )

        .. tab-item:: Run Every 10 Minutes, Indefinitely

            .. code-block:: python

                schedule = RecurringTimerSchedule(interval_seconds=600)

    Using these schedules, you can create a timer:

    .. code-block:: pycon

        >>> from globus_sdk import FlowTimer
        >>> schedule = ...
        >>> timer = FlowTimer(
        ...     name="my timer",
        ...     flow_id="00000000-19a9-44e6-9c1a-867da59d84ab",
        ...     schedule=schedule,
        ...     body={
        ...         "body": {
        ...             "input_key": "input_value",
        ...         },
        ...         "run_managers": [
        ...             "urn:globus:auth:identity:11111111-be6a-473a-a027-4cfe4ceeafe3"
        ...         ],
        ...     },
        ... )

    Submit the timer to the Timers service with
    :meth:`create_timer <globus_sdk.TimersClient.create_timer>`.
    """

    def __init__(
        self,
        *,
        flow_id: uuid.UUID | str,
        name: str | MissingType = MISSING,
        schedule: dict[str, t.Any] | RecurringTimerSchedule | OnceTimerSchedule,
        body: dict[str, t.Any],
    ) -> None:
        super().__init__()
        # keys are assigned in a fixed order so the payload layout is stable
        self["timer_type"] = "flow"
        self["flow_id"] = flow_id
        self["name"] = name
        self["schedule"] = schedule
        prepared_body = self._preprocess_body(body)
        self["body"] = prepared_body

    def _preprocess_body(self, body: dict[str, t.Any]) -> dict[str, t.Any]:
        """
        Return a shallow copy of ``body``.

        This is a hook: additional processing may be added in the future.
        """
        duplicate = body.copy()
        return duplicate
class RecurringTimerSchedule(GlobusPayload):
    """
    A schedule helper, used as part of a *timer*, for *timers* which run
    repeatedly until some end condition is reached.

    :param interval_seconds: The number of seconds between each run of the
        timer.
    :param start: The time at which to start the timer, either as an ISO 8601
        string with timezone information, or as a ``datetime.datetime``
        object.
    :param end: The end condition for the timer, as a dict. This either
        expresses a number of iterations for the timer or an end date.

    Example ``end`` conditions:

    .. code-block:: python

        # run 10 times
        end = {"condition": "iterations", "iterations": 10}

        # run until a specific date
        end = {"condition": "time", "datetime": "2023-10-01T00:00:00Z"}

    If the end condition is ``time``, then the ``datetime`` value can be
    expressed as a python ``datetime`` type as well, e.g.

    .. code-block:: python

        # end in 10 days
        end = {
            "condition": "time",
            "datetime": datetime.datetime.now() + datetime.timedelta(days=10),
        }
    """

    def __init__(
        self,
        interval_seconds: int,
        start: str | dt.datetime | MissingType = MISSING,
        end: dict[str, t.Any] | MissingType = MISSING,
    ) -> None:
        super().__init__()
        self["type"] = "recurring"
        self["interval_seconds"] = interval_seconds
        self["start"] = _format_date(start)

        end_condition: dict[str, t.Any] | MissingType
        if isinstance(end, dict):
            # Build a shallow copy in which any datetime value has been
            # rendered as a string. Primarily, this handles
            #   end={"condition": "time", "datetime": <some-datetime>}
            end_condition = {}
            for key, value in end.items():
                if isinstance(value, dt.datetime):
                    end_condition[key] = _format_date(value)
                else:
                    end_condition[key] = value
        else:
            # anything that is not a dict is stored untouched
            end_condition = end
        self["end"] = end_condition
class OnceTimerSchedule(GlobusPayload):
    """
    A schedule helper, used as part of a *timer*, for *timers* which run
    exactly once. The single run may be scheduled for a time in the future.

    :param datetime: The time at which to run the timer, either as an ISO 8601
        string with timezone information, or as a ``datetime.datetime``
        object.
    """

    def __init__(
        self,
        datetime: str | dt.datetime | MissingType = MISSING,
    ) -> None:
        super().__init__()
        self["type"] = "once"
        # datetime objects are rendered as strings; other values are kept
        run_at = _format_date(datetime)
        self["datetime"] = run_at
class TimerJob(GlobusPayload):
    r"""
    .. warning::

        This method of specifying and creating Timers for data transfer is now
        deprecated. Users should use ``TransferTimer`` instead.

        ``TimerJob`` is still supported for non-transfer use-cases.

    Helper for creating a timer in the Timers service. Used as the ``data``
    argument in :meth:`create_job <globus_sdk.TimersClient.create_job>`.

    The ``callback_url`` parameter should always be the URL used to run an
    action provider.

    :param callback_url: URL for the action which the Timers job will use.
    :param callback_body: JSON data which Timers will send to the Action
        Provider on each invocation
    :param start: The datetime at which to start the Timers job. A
        ``datetime.datetime`` is converted with ``isoformat()``; a string is
        sent as given.
    :param interval: The interval at which the Timers job should recur.
        Interpreted as seconds if specified as an integer. A
        ``datetime.timedelta`` is converted to a whole number of seconds
        (truncated). If ``stop_after_n == 1``, i.e. the job is set to run
        only a single time, then interval *must* be None.
    :param name: A (not necessarily unique) name to identify this job in
        Timers
    :param stop_after: A date after which the Timers job will stop running.
        A string is sent as given; any other value is converted with its
        ``isoformat()`` method.
    :param stop_after_n: A number of executions after which the Timers job
        will stop
    :param scope: Timers defaults to the Transfer 'all' scope. Use this
        parameter to change the scope used by Timers when calling the
        Transfer Action Provider.

    .. automethodlist:: globus_sdk.TimerJob
    """

    def __init__(
        self,
        callback_url: str,
        callback_body: dict[str, t.Any],
        start: dt.datetime | str,
        interval: dt.timedelta | int | None,
        *,
        name: str | None = None,
        stop_after: dt.datetime | str | None = None,
        stop_after_n: int | None = None,
        scope: str | None = None,
    ) -> None:
        super().__init__()
        self["callback_url"] = callback_url
        self["callback_body"] = callback_body

        if isinstance(start, dt.datetime):
            self["start"] = start.isoformat()
        else:
            self["start"] = start

        if isinstance(interval, dt.timedelta):
            # the payload carries the interval as a whole number of seconds
            self["interval"] = int(interval.total_seconds())
        else:
            self["interval"] = interval

        # optional fields are only added to the payload when they were given
        if name is not None:
            self["name"] = name
        if stop_after is not None:
            # Accept a pre-formatted string, mirroring ``start``. Everything
            # else keeps going through ``isoformat()`` exactly as before.
            if isinstance(stop_after, str):
                self["stop_after"] = stop_after
            else:
                self["stop_after"] = stop_after.isoformat()
        if stop_after_n is not None:
            self["stop_after_n"] = stop_after_n
        if scope is not None:
            self["scope"] = scope
def _format_date(date: str | dt.datetime | MissingType) -> str | MissingType:
    """
    Render a ``datetime`` as an ISO 8601 string with a UTC offset.

    :param date: The value to format. Only ``datetime.datetime`` objects are
        converted; any other value (e.g. a string) is returned as-is.
    :returns: For a ``datetime``, its ``isoformat()`` rendering with
        second precision. Aware datetimes keep their own offset. Naive
        datetimes are converted to UTC first; ``astimezone`` interprets a
        naive value as system local time.
    """
    if not isinstance(date, dt.datetime):
        return date

    # A datetime is naive when ``tzinfo`` is None *or* when its tzinfo's
    # ``utcoffset()`` returns None; ``datetime.utcoffset()`` is None in both
    # cases. Checking only ``tzinfo is None`` would let the second kind
    # through and produce a string with no offset.
    if date.utcoffset() is None:
        date = date.astimezone(dt.timezone.utc)
    return date.isoformat(timespec="seconds")