"""
API fetching and client management for apiout.
This module provides functionality for:
- Fetching data from API endpoints defined in TOML configurations
- Managing shared client instances across multiple API calls
- Processing post-processors that combine multiple API results
- Serializing API responses according to configuration
"""
import importlib
import inspect
import os
import re
import sys
import time
from pathlib import Path
from typing import Any, Optional, Union
if sys.version_info >= (3, 11):
import tomllib
else:
import tomli as tomllib # type: ignore[import-not-found]
from .serializer import serialize_response
def _substitute_vars(
    value: Any,
    method_params: Optional[dict[str, Any]] = None,
    user_params: Optional[dict[str, str]] = None,
    param_defaults: Optional[dict[str, Any]] = None,
) -> Any:
    """
    Expand ``${VAR_NAME}`` placeholders throughout a configuration value.

    Strings are scanned for placeholders. Dicts and lists are walked
    recursively and rebuilt as new containers (dict keys are not expanded).
    Values of any other type are returned unchanged.

    Each placeholder name is resolved against the following sources, and the
    first source that contains the name wins:

        1. user_params (runtime parameters from stdin/CLI)
        2. method_params (default method parameters from config)
        3. param_defaults (default parameter values from config)
        4. Environment variables

    Values taken from the three dicts are converted with ``str()``. A
    placeholder that no source can resolve is left in the text verbatim.

    Args:
        value: Configuration value (str, dict, list, or any other type)
        method_params: Optional dict of method parameter defaults
        user_params: Optional dict of runtime parameters
        param_defaults: Optional dict of parameter default values

    Returns:
        The value with every resolvable placeholder substituted

    Examples:
        >>> _substitute_vars("Bearer ${TOKEN}", user_params={"TOKEN": "abc123"})
        'Bearer abc123'
        >>> _substitute_vars("Bearer ${TOKEN}", method_params={"TOKEN": "default"})
        'Bearer default'
        >>> _substitute_vars("Bearer ${TOKEN}", param_defaults={"TOKEN": "fallback"})
        'Bearer fallback'
    """
    if isinstance(value, str):
        # Listed in precedence order; the environment is only consulted when
        # none of these dicts knows the name.
        lookup_chain = (user_params, method_params, param_defaults)

        def expand(found):
            name = found.group(1)
            for source in lookup_chain:
                if source and name in source:
                    return str(source[name])
            # Fall back to the environment; keep "${NAME}" when unset there too.
            return os.environ.get(name, found.group(0))

        return re.sub(r"\$\{([^}]+)\}", expand, value)

    if isinstance(value, dict):
        return {
            key: _substitute_vars(entry, method_params, user_params, param_defaults)
            for key, entry in value.items()
        }

    if isinstance(value, list):
        return [
            _substitute_vars(entry, method_params, user_params, param_defaults)
            for entry in value
        ]

    return value
def _substitute_env_vars(value: Any) -> Any:
    """
    Expand ``${VAR_NAME}`` placeholders from environment variables only.

    Legacy entry point retained for backward compatibility. It delegates to
    ``_substitute_vars`` without any parameter dicts, so the environment is
    the only source consulted. Placeholders whose variable is not set are
    left unchanged.

    Args:
        value: Configuration value (str, dict, list, or any other type)

    Returns:
        The value with environment variables substituted

    Examples:
        >>> os.environ["API_KEY"] = "secret123"
        >>> _substitute_env_vars("Bearer ${API_KEY}")
        'Bearer secret123'
        >>> _substitute_env_vars({"auth": "${API_KEY}", "timeout": 30})
        {'auth': 'secret123', 'timeout': 30}
    """
    return _substitute_vars(
        value,
        method_params=None,
        user_params=None,
        param_defaults=None,
    )
[docs]
def resolve_serializer(
    api_config: dict[str, Any],
    global_serializers: Optional[dict[str, Any]] = None,
    client_ref: Optional[str] = None,
) -> dict[str, Any]:
    """
    Work out which serializer configuration applies to an API config.

    The ``serializer`` entry of ``api_config`` is interpreted as follows:

        1. A dict (or a missing entry, which defaults to an empty dict) is
           returned directly as an inline serializer.
        2. A name containing a dot (e.g. "client.serializer_name") is looked
           up in ``global_serializers`` exactly as written.
        3. A plain name is first tried as "{client_ref}.{name}" when
           ``client_ref`` is given.
        4. The plain name is then tried on its own.
        5. If nothing matches, an empty dict is returned.

    Non-dict, non-str values and a missing/empty ``global_serializers`` also
    resolve to an empty dict.

    Args:
        api_config: API configuration dict containing optional 'serializer' key
        global_serializers: Optional dict of named serializer configurations
        client_ref: Optional client reference name for scoped serializer lookup

    Returns:
        Resolved serializer configuration dict, or empty dict if none found

    Examples:
        >>> resolve_serializer(
        ...     {"serializer": "my_serializer"},
        ...     {"my_serializer": {"fields": {"name": "name"}}},
        ... )
        {'fields': {'name': 'name'}}
        >>> resolve_serializer(
        ...     {"serializer": "data", "client": "btc_price"},
        ...     {"btc_price.data": {"fields": {"value": "usd"}}},
        ...     client_ref="btc_price",
        ... )
        {'fields': {'value': 'usd'}}
        >>> resolve_serializer(
        ...     {"serializer": "btc_price.data"},
        ...     {"btc_price.data": {"fields": {"value": "usd"}}},
        ... )
        {'fields': {'value': 'usd'}}
    """
    requested: Any = api_config.get("serializer", {})

    # Inline definitions short-circuit every named lookup.
    if isinstance(requested, dict):
        return requested

    if not (isinstance(requested, str) and global_serializers):
        return {}

    # Build the ordered list of keys to try in the registry.
    if "." in requested:
        # Explicit dotted reference: used verbatim, no fallback.
        lookup_keys = [requested]
    elif client_ref:
        # Client-scoped name first, then the bare global name.
        lookup_keys = [f"{client_ref}.{requested}", requested]
    else:
        lookup_keys = [requested]

    for key in lookup_keys:
        if key in global_serializers:
            return global_serializers[key]
    return {}
def _resolve_client_config(
    api_config: dict[str, Any], client_configs: dict[str, Any]
) -> tuple[Optional[str], str, Optional[str], dict[str, Any], Optional[str]]:
    """
    Determine how the client for an API config should be constructed.

    When ``api_config["client"]`` names an entry in ``client_configs``, the
    class name, init params and init method come from that shared entry, and
    the entry's module is used only if the API config has no module of its
    own. Otherwise the client is described inline by the API config, with no
    client id and no init method.

    ``init_params`` go through environment-variable substitution in both
    cases, and the class name defaults to "Client".

    Args:
        api_config: API configuration dict
        client_configs: Dict of named client configurations

    Returns:
        Tuple of (module_name, client_class_name, client_id, init_params,
        init_method_name)
    """
    client_ref = api_config.get("client")
    module_name = api_config.get("module")

    if not (client_ref and client_ref in client_configs):
        # Inline client: everything is read from the API config itself.
        return (
            module_name,
            api_config.get("client_class", "Client"),
            None,
            _substitute_env_vars(api_config.get("init_params", {})),
            None,
        )

    # Shared client: the reference name doubles as the client id.
    shared = client_configs[client_ref]
    return (
        module_name or shared.get("module"),
        shared.get("client_class", "Client"),
        client_ref,
        _substitute_env_vars(shared.get("init_params", {})),
        shared.get("init_method"),
    )
def _get_or_create_client(
    module: Any,
    client_class_name: str,
    client_id: Optional[str],
    init_params: Optional[dict[str, Any]],
    init_method_name: Optional[str],
    shared_clients: dict[str, Any],
) -> Any:
    """
    Get or create a client instance, using a shared cache.

    Clients are cached only when ``client_id`` is set. The cache key is the
    client id alone when there are no init params, and
    ``"{client_id}:{fingerprint}"`` otherwise, so the same client reference
    used with different init params yields separate instances.

    Args:
        module: Imported module object that exposes the client class
        client_class_name: Name of the class attribute to look up on ``module``
        client_id: Cache identity for the client, or None to skip caching
        init_params: Keyword arguments for the client constructor (optional)
        init_method_name: Name of a no-argument method to call once on a newly
            created client (optional)
        shared_clients: Cache dict, read and updated in place

    Returns:
        Client instance

    Raises:
        AttributeError: If the class or the init method does not exist
    """
    cache_key = client_id
    if client_id and init_params:
        try:
            fingerprint: Any = hash(frozenset(init_params.items()))
        except TypeError:
            # Values such as lists or nested tables (common in TOML configs)
            # are unhashable. Fall back to a deterministic textual
            # fingerprint instead of failing before the client is built.
            fingerprint = repr(
                sorted(init_params.items(), key=lambda item: str(item[0]))
            )
        cache_key = f"{client_id}:{fingerprint}"

    if cache_key and cache_key in shared_clients:
        return shared_clients[cache_key]

    client_class = getattr(module, client_class_name)
    client = client_class(**init_params) if init_params else client_class()

    if init_method_name:
        # One-off setup hook, only run when the instance is first created.
        getattr(client, init_method_name)()

    if cache_key:
        shared_clients[cache_key] = client
    return client
def _prepare_method_arguments(
    method: Any,
    url: str,
    params: dict[str, Any],
    headers: dict[str, Any],
    method_params: dict[str, Any],
    user_params: dict[str, str],
) -> tuple[list, dict]:
    """
    Prepare arguments and kwargs for the API method call.

    With a non-empty ``url`` the URL is the single positional argument (the
    shape expected by HTTP-style methods such as ``requests.Session.get``).

    Without a URL, values from ``method_params`` are matched to the method's
    signature by name. They are passed positionally, in signature order, for
    as long as no positional parameter has been skipped. Once a positional
    parameter has no supplied value, later positional-or-keyword values are
    passed by keyword so they cannot shift into the wrong slot. Keyword-only
    parameters are always passed by keyword. Entries in ``method_params``
    that match no signature parameter are ignored.

    ``params`` and ``headers`` are forwarded as keyword arguments only when
    they are non-empty and the method accepts ``**kwargs``.

    Args:
        method: Callable that will be invoked with the prepared arguments
        url: URL to pass as first positional argument, or "" for none
        params: Request parameters to forward as ``params=``
        headers: Request headers to forward as ``headers=``
        method_params: Values for the method's own parameters, keyed by name
        user_params: Currently unused; kept so existing call sites still work

    Returns:
        Tuple of (method_args, method_kwargs)

    Raises:
        ValueError, TypeError: If ``inspect.signature`` cannot introspect
            ``method``
    """
    sig = inspect.signature(method)
    has_kwargs = any(
        p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
    )
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )

    method_args: list = []
    method_kwargs: dict = {}

    # For HTTP methods like requests.Session.get(), URL should be first argument
    if url:
        method_args.append(url)
    else:
        gap_seen = False
        for name, parameter in sig.parameters.items():
            kind = parameter.kind
            if name not in method_params:
                # An unsupplied positional slot means later values can no
                # longer be packed positionally without misaligning.
                if kind in positional_kinds:
                    gap_seen = True
                continue

            supplied = method_params[name]
            by_keyword = kind == inspect.Parameter.KEYWORD_ONLY or (
                gap_seen and kind == inspect.Parameter.POSITIONAL_OR_KEYWORD
            )
            if by_keyword:
                method_kwargs[name] = supplied
            else:
                method_args.append(supplied)

    # Add params and headers as kwargs if method accepts **kwargs
    if has_kwargs:
        if params:
            method_kwargs["params"] = params
        if headers:
            method_kwargs["headers"] = headers

    return method_args, method_kwargs
[docs]
def fetch_api_data(
    api_config: dict[str, Any],
    global_serializers: Optional[dict[str, Any]] = None,
    shared_clients: Optional[dict[str, Any]] = None,
    client_configs: Optional[dict[str, Any]] = None,
    user_params: Optional[dict[str, str]] = None,
) -> Any:
    """
    Fetch data from an API endpoint based on configuration.

    Resolves the client definition (inline or via a client reference),
    imports the module, obtains a client instance (reusing a shared one where
    possible), invokes the configured method and serializes the response.
    If the configured attribute is not callable, its value is serialized
    directly. Failures are reported as an ``{"error": ...}`` dict rather than
    raised.

    Args:
        api_config: API configuration dict with keys:
            - module: Python module to import (required unless supplied by
              the referenced client config)
            - method: Method name to call on client (required)
            - client: Reference to a client config name (optional)
            - client_class: Class name to instantiate (default: "Client")
            - init_params: Params for client initialization (optional)
            - url: URL parameter to pass to method (optional)
            - params: Additional parameters for method (optional)
            - headers: Headers to pass to method (optional)
            - method_params: Default values for method parameters (optional)
            - param_defaults: Fallback values for ${VAR} substitution (optional)
            - serializer: Serializer config or reference (optional)
        global_serializers: Named serializer configurations
        shared_clients: Dict to store/retrieve shared client instances
        client_configs: Dict of named client configurations
        user_params: Dict of user-provided runtime parameters

    Returns:
        Serialized API response data, or error dict if fetch failed

    Example:
        >>> api_config = {
        ...     "module": "requests",
        ...     "client_class": "Session",
        ...     "method": "get",
        ...     "url": "https://api.example.com/data"
        ... }
        >>> result = fetch_api_data(api_config)
    """
    shared_clients = {} if shared_clients is None else shared_clients
    client_configs = {} if client_configs is None else client_configs
    user_params = {} if user_params is None else user_params

    try:
        method_name = api_config.get("method")
        (
            module_name,
            client_class_name,
            client_id,
            init_params,
            init_method_name,
        ) = _resolve_client_config(api_config, client_configs)

        if not module_name:
            return {"error": "No module specified"}
        if not method_name:
            return {"error": "No method specified"}

        configured_method_params = api_config.get("method_params", {})
        param_defaults = api_config.get("param_defaults", {})

        # Runtime values take priority over the configured method parameters.
        if user_params:
            method_params = {**configured_method_params, **user_params}
        else:
            method_params = configured_method_params

        # A runtime value may replace an init param only when that name is not
        # also a configured method parameter, so the two stay independent.
        if user_params and init_params:
            for key, value in user_params.items():
                if key in init_params and key not in configured_method_params:
                    init_params[key] = value

        module = importlib.import_module(module_name)
        client = _get_or_create_client(
            module,
            client_class_name,
            client_id,
            init_params,
            init_method_name,
            shared_clients,
        )
        method = getattr(client, method_name)

        # Apply variable substitution to all string fields
        sources = (method_params, user_params, param_defaults)
        url = _substitute_vars(api_config.get("url", ""), *sources)
        params = _substitute_vars(api_config.get("params", {}), *sources)
        headers = _substitute_vars(api_config.get("headers", {}), *sources)

        if user_params and isinstance(params, dict):
            # NOTE(review): method_params already contains every user_params
            # key at this point, so the second clause never adds a new key;
            # confirm whether the configured method params were intended.
            params.update(
                {
                    key: value
                    for key, value in user_params.items()
                    if key in params or key not in method_params
                }
            )

        if callable(method):
            call_args, call_kwargs = _prepare_method_arguments(
                method, url, params, headers, method_params, user_params
            )
            responses = method(*call_args, **call_kwargs)
        else:
            # Plain attribute: its value is the response.
            responses = method

        serializer_config = resolve_serializer(
            api_config, global_serializers, client_ref=api_config.get("client")
        )
        return serialize_response(responses, serializer_config)
    except ImportError as e:
        return {"error": f"Failed to import module: {e}"}
    except AttributeError as e:
        return {"error": f"Failed to access class or method: {e}"}
    except Exception as e:
        return {"error": f"Failed to fetch data: {e}"}
[docs]
def process_post_processor(
    post_processor_config: dict[str, Any],
    api_results: dict[str, Any],
    global_serializers: Optional[dict[str, Any]] = None,
) -> Any:
    """
    Combine several API results through a post-processor class.

    The named inputs are looked up in ``api_results`` and passed positionally,
    in the configured order. When a ``method`` is configured, the class is
    instantiated without arguments and that method is called with the inputs;
    otherwise the class itself is called with the inputs. The outcome is then
    serialized. Failures are reported as an ``{"error": ...}`` dict rather
    than raised.

    Args:
        post_processor_config: Post-processor configuration dict with keys:
            - module: Python module to import (required)
            - class: Class name to instantiate (required)
            - inputs: List of API result names to pass as args (required)
            - method: Method name to call on instance (optional)
            - serializer: Serializer config or reference (optional)
        api_results: Dict of API results by name
        global_serializers: Named serializer configurations

    Returns:
        Serialized post-processor result, or error dict if processing failed

    Example:
        >>> post_processor_config = {
        ...     "module": "mymodule",
        ...     "class": "DataCombiner",
        ...     "inputs": ["api1", "api2"]
        ... }
        >>> api_results = {"api1": {"value": 1}, "api2": {"value": 2}}
        >>> result = process_post_processor(post_processor_config, api_results)
    """
    required_settings = (
        ("module", "No module specified for post-processor"),
        ("class", "No class specified for post-processor"),
        ("inputs", "No inputs specified for post-processor"),
    )

    try:
        # Checked in order; the first missing or empty setting is reported.
        for setting, complaint in required_settings:
            if not post_processor_config.get(setting):
                return {"error": complaint}

        module_name = post_processor_config["module"]
        class_name = post_processor_config["class"]
        inputs = post_processor_config["inputs"]

        # Every input must already be available before anything is imported.
        for input_name in inputs:
            if input_name not in api_results:
                return {
                    "error": f"Required input '{input_name}' not found in API results"
                }

        processor_class = getattr(importlib.import_module(module_name), class_name)
        input_data = [api_results[input_name] for input_name in inputs]

        method_name = post_processor_config.get("method")
        if method_name:
            target = getattr(processor_class(), method_name)
        else:
            target = processor_class
        result = target(*input_data)

        return serialize_response(
            result, resolve_serializer(post_processor_config, global_serializers)
        )
    except ImportError as e:
        return {"error": f"Failed to import post-processor module: {e}"}
    except AttributeError as e:
        return {"error": f"Failed to access post-processor class or method: {e}"}
    except Exception as e:
        return {"error": f"Failed to process post-processor: {e}"}
[docs]
class ApiClient:
    """
    Stateful API client with configuration management and result caching.

    ApiClient loads one or more TOML configuration files, merges their
    contents, fetches every configured API (reusing shared client instances
    across calls), runs the configured post-processors, and keeps the results
    and per-name status so they can be read again without re-fetching.

    Supports:
        - Loading single or multiple TOML configuration files
        - Merging of APIs, serializers, post-processors and clients
          (lists are concatenated; for dicts, later files override earlier
          ones per top-level name)
        - Shared client instance management via client references
        - Result caching with success/failure tracking
        - Timestamp tracking for each API call

    Attributes:
        config_paths: List of loaded configuration file paths
        user_params: Dict of user-provided runtime parameters
        apis: List of API configurations from all loaded files
        serializers: Dict of named serializer configurations
        post_processors: List of post-processor configurations
        clients: Dict of named client configurations
        shared_clients: Dict of shared client instances
        results: Dict of API results by name (cached after fetch)
        status: Dict of status info by name (success, error, timestamp)
        last_fetch_time: Timestamp of the most recent fetch() call

    Example:
        >>> # Single config file
        >>> client = ApiClient("config.toml")
        >>> results = client.fetch()
        >>> cached = client.get_results()
        >>>
        >>> # Multiple config files
        >>> client = ApiClient(["api_config.toml", "serializers.toml"])
        >>> results = client.fetch()
        >>> status = client.get_status()
        >>> successful = client.get_successful_results()
    """

    def __init__(
        self,
        config_paths: Union[str, Path, list[Union[str, Path]]],
        user_params: Optional[dict[str, str]] = None,
    ):
        """
        Initialize ApiClient with one or more configuration files.

        Args:
            config_paths: Single path or list of paths to TOML configuration
                files. All configs are loaded and merged during initialization.
            user_params: Optional dict of user-provided runtime parameters
        """
        # Normalise the single-path form to a one-element list.
        if isinstance(config_paths, (str, Path)):
            config_paths = [config_paths]

        self.config_paths = [Path(entry) for entry in config_paths]
        self.user_params = user_params or {}

        self.apis = []
        self.serializers = {}
        self.post_processors = []
        self.clients = {}

        for path in self.config_paths:
            loaded = self._load_config(path)
            self.apis.extend(loaded.get("apis", []))
            self.serializers.update(loaded.get("serializers", {}))
            self.post_processors.extend(loaded.get("post_processors", []))
            self.clients.update(loaded.get("clients", {}))

        self.shared_clients: dict[str, Any] = {}
        self.results: dict[str, Any] = {}
        self.status: dict[str, dict[str, Any]] = {}
        self.last_fetch_time: Optional[float] = None

    def _load_config(self, config_path: Path) -> dict[str, Any]:
        """
        Load a TOML configuration file.

        Args:
            config_path: Path to TOML file

        Returns:
            Parsed configuration dict
        """
        # tomllib requires a binary file handle.
        with open(config_path, "rb") as handle:
            return tomllib.load(handle)

    def fetch(self) -> dict[str, Any]:
        """
        Fetch data from all configured APIs and post-processors.

        Each API is fetched in configuration order. An API whose
        ``method_params`` contain an empty-string or None value that is not
        supplied through ``user_params`` is not called and is recorded as
        failed. Post-processors then run against the accumulated results.
        Updates the results, status and last_fetch_time attributes.

        Returns:
            Dict mapping API/post-processor names to their results

        Example:
            >>> client = ApiClient("config.toml")
            >>> results = client.fetch()
            >>> print(results["my_api"])
            {'data': 'value'}
        """
        self.last_fetch_time = time.time()

        def status_entry(succeeded: bool, error: Any) -> dict[str, Any]:
            # Status record stamped with the time the outcome became known.
            return {"success": succeeded, "error": error, "timestamp": time.time()}

        def store(name: str, outcome: Any) -> None:
            # A dict carrying an "error" key is how the fetch helpers report
            # failure; the outcome is cached either way.
            failed = isinstance(outcome, dict) and "error" in outcome
            self.results[name] = outcome
            self.status[name] = status_entry(
                not failed, outcome.get("error") if failed else None
            )

        for api_config in self.apis:
            api_name = api_config.get("name", "unknown")
            method_params = api_config.get("method_params", {})

            missing = []
            if method_params:
                missing = [
                    param_name
                    for param_name, param_value in method_params.items()
                    if (param_value == "" or param_value is None)
                    and param_name not in self.user_params
                ]
            if missing:
                self.status[api_name] = status_entry(
                    False, f"Missing required parameter(s): {', '.join(missing)}"
                )
                continue

            try:
                store(
                    api_name,
                    fetch_api_data(
                        api_config,
                        global_serializers=self.serializers,
                        shared_clients=self.shared_clients,
                        client_configs=self.clients,
                        user_params=self.user_params,
                    ),
                )
            except Exception as e:
                self.status[api_name] = status_entry(False, str(e))

        for pp_config in self.post_processors:
            pp_name = pp_config.get("name", "unknown")
            try:
                store(
                    pp_name,
                    process_post_processor(
                        pp_config, self.results, global_serializers=self.serializers
                    ),
                )
            except Exception as e:
                self.status[pp_name] = status_entry(False, str(e))

        return self.results

    def get_results(self) -> dict[str, Any]:
        """
        Get cached results without re-fetching.

        Returns:
            The results dict itself (not a copy), as populated by fetch()

        Example:
            >>> client = ApiClient("config.toml")
            >>> client.fetch()
            >>> cached = client.get_results()  # No network call
        """
        return self.results

    def get_status(self) -> dict[str, dict]:
        """
        Get status information for all APIs and post-processors.

        Returns:
            Dict mapping names to status dicts with keys:
                - success: bool indicating if fetch/processing succeeded
                - error: error message if failed, None otherwise
                - timestamp: Unix timestamp of the operation

        Example:
            >>> client = ApiClient("config.toml")
            >>> client.fetch()
            >>> status = client.get_status()
            >>> print(status["my_api"])
            {'success': True, 'error': None, 'timestamp': 1234567890.123}
        """
        return self.status

    def get_successful_results(self) -> dict[str, Any]:
        """
        Get only results from successful API calls and post-processors.

        Returns:
            New dict containing only results where status['success'] is truthy;
            names without a status entry are excluded

        Example:
            >>> client = ApiClient("config.toml")
            >>> client.fetch()
            >>> successful = client.get_successful_results()
        """
        successful: dict[str, Any] = {}
        for name, result in self.results.items():
            if self.status.get(name, {}).get("success", False):
                successful[name] = result
        return successful