from __future__ import annotations
# the name "datetime" is used in this module, so use an alternative name
# in order to avoid name shadowing
import datetime as dt
import logging
import typing as t
import uuid
from globus_sdk._missing import MISSING, MissingType
from globus_sdk._payload import GlobusPayload
from globus_sdk.services.transfer import TransferData
log = logging.getLogger(__name__)
[docs]
class TransferTimer(GlobusPayload):
"""
A helper for defining a payload for Transfer Timer creation.
Use this along with :meth:`create_timer <globus_sdk.TimersClient.create_timer>` to
create a timer.
.. note::
``TimersClient`` has two methods for creating timers, ``create_timer`` and
``create_job``.
``create_job`` uses a different API -- only ``create_timer`` will work with
this helper class.
Users are strongly recommended to use ``create_timer`` and this helper for
timer creation.
:param name: A name to identify this timer
:param schedule: The schedule on which the timer runs
:param body: A transfer payload for the timer to use. If it includes
``submission_id`` or ``skip_activation_check``, these parameters will be
removed, as they are not supported in timers.
The ``schedule`` field determines when the timer will run.
Timers may be "run once" or "recurring", and "recurring" timers may specify an end
date or a number of executions after which the timer will stop. A ``schedule`` is
specified as a dict, but the SDK provides two useful helpers for constructing these
data.
**Example Schedules**
.. tab-set::
.. tab-item:: Run Once, Right Now
.. code-block:: python
schedule = OnceTimerSchedule()
.. tab-item:: Run Once, At a Specific Time
.. code-block:: python
schedule = OnceTimerSchedule(datetime="2023-09-22T00:00:00Z")
.. tab-item:: Run Every 5 Minutes, Until a Specific Time
.. code-block:: python
schedule = RecurringTimerSchedule(
interval_seconds=300,
end={"condition": "time", "datetime": "2023-10-01T00:00:00Z"},
)
.. tab-item:: Run Every 30 Minutes, 10 Times
.. code-block:: python
schedule = RecurringTimerSchedule(
interval_seconds=1800,
end={"condition": "iterations", "iterations": 10},
)
.. tab-item:: Run Every 10 Minutes, Indefinitely
.. code-block:: python
schedule = RecurringTimerSchedule(interval_seconds=600)
Using these schedules, you can create a timer from a ``TransferData`` object:
.. code-block:: pycon
>>> from globus_sdk import TransferData, TransferTimer
>>> schedule = ...
>>> transfer_data = TransferData(...)
>>> timer = TransferTimer(
... name="my timer",
... schedule=schedule,
... body=transfer_data,
... )
Submit the timer to the Timers service with
:meth:`create_timer <globus_sdk.TimersClient.create_timer>`.
"""
def __init__(
self,
*,
name: str | MissingType = MISSING,
schedule: dict[str, t.Any] | RecurringTimerSchedule | OnceTimerSchedule,
body: dict[str, t.Any] | TransferData,
) -> None:
super().__init__()
self["timer_type"] = "transfer"
self["name"] = name
self["schedule"] = schedule
self["body"] = self._preprocess_body(body)
def _preprocess_body(
self, body: dict[str, t.Any] | TransferData
) -> dict[str, t.Any]:
# shallow-copy for dicts, convert any TransferData to a dict
new_body = dict(body)
# remove the skip_activation_check and submission_id parameters unconditionally
# (not supported in timers, but often present in TransferData)
new_body.pop("submission_id", None)
new_body.pop("skip_activation_check", None)
return new_body
[docs]
class FlowTimer(GlobusPayload):
"""
A helper for defining a payload for Flow Timer creation.
Use this along with :meth:`create_timer <globus_sdk.TimersClient.create_timer>` to
create a timer.
.. note::
``TimersClient`` has two methods for creating timers:
``create_timer`` and ``create_job``.
This helper class only works with the ``create_timer`` method.
:param flow_id: The flow ID to run when the timer runs.
:param name: A name to identify this timer.
:param schedule: The schedule on which the timer runs
:param body: A transfer payload for the timer to use.
The ``schedule`` field determines when the timer will run.
Timers may be "run once" or "recurring", and "recurring" timers may specify an end
date or the number of executions after which the timer will stop.
A ``schedule`` is specified as a dict, but the SDK provides two helpers
for constructing these data.
**Example Schedules**
.. tab-set::
.. tab-item:: Run Once, Right Now
.. code-block:: python
schedule = OnceTimerSchedule()
.. tab-item:: Run Once, At a Specific Time
.. code-block:: python
schedule = OnceTimerSchedule(datetime="2023-09-22T00:00:00Z")
.. tab-item:: Run Every 5 Minutes, Until a Specific Time
.. code-block:: python
schedule = RecurringTimerSchedule(
interval_seconds=300,
end={"condition": "time", "datetime": "2023-10-01T00:00:00Z"},
)
.. tab-item:: Run Every 30 Minutes, 10 Times
.. code-block:: python
schedule = RecurringTimerSchedule(
interval_seconds=1800,
end={"condition": "iterations", "iterations": 10},
)
.. tab-item:: Run Every 10 Minutes, Indefinitely
.. code-block:: python
schedule = RecurringTimerSchedule(interval_seconds=600)
Using these schedules, you can create a timer:
.. code-block:: pycon
>>> from globus_sdk import FlowTimer
>>> schedule = ...
>>> timer = FlowTimer(
... name="my timer",
... flow_id="00000000-19a9-44e6-9c1a-867da59d84ab",
... schedule=schedule,
... body={
... "body": {
... "input_key": "input_value",
... },
... "run_managers": [
... "urn:globus:auth:identity:11111111-be6a-473a-a027-4cfe4ceeafe3"
... ],
... },
... )
Submit the timer to the Timers service with
:meth:`create_timer <globus_sdk.TimersClient.create_timer>`.
"""
def __init__(
self,
*,
flow_id: uuid.UUID | str,
name: str | MissingType = MISSING,
schedule: dict[str, t.Any] | RecurringTimerSchedule | OnceTimerSchedule,
body: dict[str, t.Any],
) -> None:
super().__init__()
self["timer_type"] = "flow"
self["flow_id"] = flow_id
self["name"] = name
self["schedule"] = schedule
self["body"] = self._preprocess_body(body)
def _preprocess_body(self, body: dict[str, t.Any]) -> dict[str, t.Any]:
# Additional processing may be added in the future.
return body.copy()
[docs]
class RecurringTimerSchedule(GlobusPayload):
"""
A helper used as part of a *timer* to define when the *timer* will run.
A ``RecurringTimerSchedule`` is used to describe a *timer* which runs repeatedly
until some end condition is reached.
:param interval_seconds: The number of seconds between each run of the timer.
:param start: The time at which to start the timer, either as an ISO 8601 string
with timezone information, or as a ``datetime.datetime`` object.
:param end: The end condition for the timer, as a dict. This either expresses a
number of iterations for the timer or an end date.
Example ``end`` conditions:
.. code-block:: python
# run 10 times
end = {"condition": "iterations", "iterations": 10}
# run until a specific date
end = {"condition": "time", "datetime": "2023-10-01T00:00:00Z"}
If the end condition is ``time``, then the ``datetime`` value can be expressed as a
python ``datetime`` type as well, e.g.
.. code-block:: python
# end in 10 days
end = {
"condition": "time",
"datetime": datetime.datetime.now() + datetime.timedelta(days=10),
}
"""
def __init__(
self,
interval_seconds: int,
start: str | dt.datetime | MissingType = MISSING,
end: dict[str, t.Any] | MissingType = MISSING,
) -> None:
super().__init__()
self["type"] = "recurring"
self["interval_seconds"] = interval_seconds
self["start"] = _format_date(start)
self["end"] = end
# if a datetime is given for part of the end condition, format it (and
# shallow-copy the end condition)
# primarily, this handles
# end={"condition": "time", "datetime": <some-datetime>}
if isinstance(end, dict):
self["end"] = {
k: (_format_date(v) if isinstance(v, dt.datetime) else v)
for k, v in end.items()
}
[docs]
class OnceTimerSchedule(GlobusPayload):
"""
A helper used as part of a *timer* to define when the *timer* will run.
A ``OnceTimerSchedule`` is used to describe a *timer* which runs exactly once.
It may be scheduled for a time in the future.
:param datetime: The time at which to run the timer, either as an ISO 8601
string with timezone information, or as a ``datetime.datetime`` object.
"""
def __init__(
self,
datetime: str | dt.datetime | MissingType = MISSING,
) -> None:
super().__init__()
self["type"] = "once"
self["datetime"] = _format_date(datetime)
[docs]
class TimerJob(GlobusPayload):
r"""
.. warning::
This method of specifying and creating Timers for data transfer is now
deprecated. Users should use ``TimerData`` instead.
``TimerJob`` is still supported for non-transfer use-cases.
Helper for creating a timer in the Timers service. Used as the ``data``
argument in :meth:`create_job <globus_sdk.TimersClient.create_job>`.
The ``callback_url`` parameter should always be the URL used to run an
action provider.
:param callback_url: URL for the action which the Timers job will use.
:param callback_body: JSON data which Timers will send to the Action Provider on
each invocation
:param start: The datetime at which to start the Timers job.
:param interval: The interval at which the Timers job should recur. Interpreted as
seconds if specified as an integer. If ``stop_after_n == 1``, i.e. the job is
set to run only a single time, then interval *must* be None.
:param name: A (not necessarily unique) name to identify this job in Timers
:param stop_after: A date after which the Timers job will stop running
:param stop_after_n: A number of executions after which the Timers job will stop
:param scope: Timers defaults to the Transfer 'all' scope. Use this parameter to
change the scope used by Timers when calling the Transfer Action Provider.
.. automethodlist:: globus_sdk.TimerJob
"""
def __init__(
self,
callback_url: str,
callback_body: dict[str, t.Any],
start: dt.datetime | str,
interval: dt.timedelta | int | None,
*,
name: str | None = None,
stop_after: dt.datetime | None = None,
stop_after_n: int | None = None,
scope: str | None = None,
) -> None:
super().__init__()
self["callback_url"] = callback_url
self["callback_body"] = callback_body
if isinstance(start, dt.datetime):
self["start"] = start.isoformat()
else:
self["start"] = start
if isinstance(interval, dt.timedelta):
self["interval"] = int(interval.total_seconds())
else:
self["interval"] = interval
if name is not None:
self["name"] = name
if stop_after is not None:
self["stop_after"] = stop_after.isoformat()
if stop_after_n is not None:
self["stop_after_n"] = stop_after_n
if scope is not None:
self["scope"] = scope
def _format_date(date: str | dt.datetime | MissingType) -> str | MissingType:
if isinstance(date, dt.datetime):
if date.tzinfo is None:
date = date.astimezone(dt.timezone.utc)
return date.isoformat(timespec="seconds")
else:
return date