Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(taps): Add api costs hook #704

Merged
merged 14 commits into from
Jun 21, 2022
15 changes: 15 additions & 0 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class Stream(metaclass=abc.ABCMeta):
parent_stream_type: Optional[Type["Stream"]] = None
ignore_parent_replication_key: bool = False

# Internal API cost aggregator
_api_costs: Dict[str, int] = {}

def __init__(
self,
tap: TapBaseClass,
Expand Down Expand Up @@ -854,6 +857,18 @@ def _write_request_duration_log(
extra_tags["context"] = context
self._write_metric_log(metric=request_duration_metric, extra_tags=extra_tags)

def log_api_costs(self) -> None:
    """Emit an info-level summary of the API costs recorded for this stream.

    The totals are read from `self._api_costs`; nothing is logged when no
    costs have been recorded. Override this method to report the costs in
    a custom format. It is intended to be called once, at the end of the
    life of the stream.
    """
    # Guard clause: streams that never recorded a cost stay silent.
    if not self._api_costs:
        return

    self.logger.info(f"Total API costs for stream {self.name}: {self._api_costs}")

def _check_max_record_limit(self, record_count: int) -> None:
"""TODO.

Expand Down
57 changes: 57 additions & 0 deletions singer_sdk/streams/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ def request_records(self, context: Optional[dict]) -> Iterable[dict]:
context, next_page_token=next_page_token
)
resp = decorated_request(prepared_request, context)
self.update_api_costs(prepared_request, resp, context)
yield from self.parse_response(resp)
previous_token = copy.deepcopy(next_page_token)
next_page_token = self.get_next_page_token(
Expand All @@ -333,8 +334,64 @@ def request_records(self, context: Optional[dict]) -> Iterable[dict]:
# Cycle until get_next_page_token() no longer returns a value
finished = not next_page_token

def update_api_costs(
    self,
    request: requests.PreparedRequest,
    response: requests.Response,
    context: Optional[Dict],
) -> Dict[str, int]:
    """Add the cost of one API call to the stream's running totals.

    The per-call costs are obtained from `calculate_api_request_cost` and
    summed, per cost domain, into `self._api_costs`. Cost domains that were
    accumulated earlier but are absent from this call's costs keep their
    current total.

    Args:
        request: the API Request object that was just called.
        response: the `requests.Response` object
        context: the context passed to the call

    Returns:
        A dict of the costs accumulated so far (including this request)
        whose keys are the "cost domains". See `calculate_api_request_cost`
        for details.
    """
    call_costs = self.calculate_api_request_cost(request, response, context)

    # Start from a copy of the existing totals so that domains missing from
    # this call are preserved instead of being dropped. A new dict is bound
    # to the instance rather than mutated in place, so a dict that is shared
    # (e.g. a class-level default) is never modified.
    totals = dict(self._api_costs)
    for domain, cost in call_costs.items():
        totals[domain] = totals.get(domain, 0) + cost

    self._api_costs = totals
    return self._api_costs

# Overridable:

def calculate_api_request_cost(
    self,
    request: requests.PreparedRequest,
    response: requests.Response,
    context: Optional[Dict],
) -> Dict[str, int]:
    """Return the cost of the API call that was just made.

    Streams may override this hook to report what a single API call cost,
    in arbitrary units chosen by the tap developer. The request, the
    response and the context of the call are all passed in.

    The result maps arbitrary cost dimensions to the cost incurred along
    each dimension by this one call, for instance
    { "rest": 0, "graphql": 42 } for a call to GitHub's GraphQL API.
    Every dimension should be present in every returned dict.

    The default implementation reports no costs.

    Args:
        request: the API Request object that was just called.
        response: the `requests.Response` object
        context: the context passed to the call

    Returns:
        A dict of costs for this single call whose keys are the
        "cost domains". Empty by default.
    """
    no_costs: Dict[str, int] = {}
    return no_costs

def prepare_request_payload(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Optional[dict]:
Expand Down
3 changes: 3 additions & 0 deletions singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,9 @@ def sync_all(self) -> None:
stream.sync()
stream.finalize_state_progress_markers()

for stream in self.streams.values():
stream.log_api_costs()

# Command Line Execution

@classproperty
Expand Down