Client Interface
The client module provides the main interface for interacting with Athena services, including the primary client class and authentication helpers.
AthenaClient
The main client class for image classification. Provides an async context manager interface for proper resource management and cleanup.
- Example:
>>> async with AthenaClient(channel, options) as client:
...     results = client.classify_images(image_iterator)
...     async for result in results:
...         print(result.outputs)
Channel Management
- async create_channel_with_credentials(host, credential_helper)
Create a gRPC channel that authenticates through an OAuth credential helper.
Args:
host: The host address to connect to
credential_helper: The credential helper for OAuth authentication
Returns:
A secure gRPC channel with OAuth authentication
Raises:
InvalidHostError: If host is empty
OAuthError: If OAuth authentication fails
- Parameters:
host (str)
credential_helper (CredentialHelper)
- Return type:
- class CredentialHelper(client_id, client_secret, auth_url='https://crispthinking.auth0.com/oauth/token', audience='crisp-athena-live')
Bases:
object
OAuth credential helper that manages authentication tokens, including automatic token refresh.
- Example:
>>> credential_helper = CredentialHelper(
...     client_id="your-client-id",
...     client_secret="your-client-secret"
... )
>>> channel = await create_channel_with_credentials(
...     host="your-host",
...     credential_helper=credential_helper
... )
- __init__(client_id, client_secret, auth_url='https://crispthinking.auth0.com/oauth/token', audience='crisp-athena-live')
Initialize the credential helper.
Args:
client_id: OAuth client ID
client_secret: OAuth client secret
auth_url: OAuth token endpoint URL
audience: OAuth audience
- async get_token()
Get a valid authentication token.
This method returns the cached token if it is still valid; otherwise it fetches a new one.
- Returns:
A valid authentication token
- Raises:
OAuthError – If token acquisition fails
TokenExpiredError – If the token has expired and refresh fails
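The cache-or-fetch behaviour described for get_token() can be sketched as follows. This is an illustration of the general pattern only, not the library's implementation: the CachedToken class, its fetch callback, the leeway parameter, and the injected clock are all hypothetical names, and the sketch is synchronous for brevity whereas get_token() is async.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Hypothetical token cache; not part of the Athena client."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        leeway: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch      # returns (token, lifetime in seconds)
        self._leeway = leeway    # refresh this many seconds before expiry
        self._clock = clock      # injectable so the cache can be tested
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        # Fetch when nothing is cached or the token is about to expire.
        if self._token is None or now >= self._expires_at - self._leeway:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = now + lifetime
        return self._token
```

Refreshing slightly before the expiry time (the leeway) reduces the chance that a token expires while a request is in flight.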
Deployment Selection
See Deployment Selector for deployment discovery and selection functionality.
Correlation and Tracing
- class CorrelationProvider
Bases:
ABC
Abstract base class defining the contract for correlation ID generation.
This class serves as an interface for different strategies of generating correlation IDs. Implementations can use various methods such as hashing, UUIDs, or other approaches to generate unique identifiers.
- abstractmethod get_correlation_id(input_data)
Generate a correlation ID for the given input data.
Args:
input_data: Data to use as the basis for correlation ID generation. The type and structure of this data depend on the specific implementation.
Returns:
A string containing the generated correlation ID.
Raises:
ValueError: If the input data is not in a format supported by the implementation.
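As an illustration of the contract above, a UUID-based strategy might look like the sketch below. The import path for CorrelationProvider is not shown in this section, so the sketch declares a local stand-in base class with the documented method; UuidCorrelationProvider is a hypothetical name, not something the library ships.

```python
import uuid
from abc import ABC, abstractmethod


class CorrelationProvider(ABC):
    """Local stand-in mirroring the documented abstract interface."""

    @abstractmethod
    def get_correlation_id(self, input_data: object) -> str:
        """Return a correlation ID for input_data."""


class UuidCorrelationProvider(CorrelationProvider):
    """Hypothetical provider: ignores the input and returns a random UUID."""

    def get_correlation_id(self, input_data: object) -> str:
        # uuid4() is random, so repeated calls give different IDs
        # even for identical input.
        return str(uuid.uuid4())
```

Unlike a hash-based provider, this strategy is not deterministic, which suits cases where every request needs a distinct ID regardless of its content.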
- class HashCorrelationProvider
Bases:
CorrelationProvider
Generates correlation IDs by hashing the input data.
This implementation uses SHA-256 to generate a deterministic hash of the input data's bytes, which serves as the correlation ID. The same input therefore always yields the same correlation ID.
- Example:
>>> provider = HashCorrelationProvider()
>>> correlation_id = provider.get_correlation_id(image_data)
- get_correlation_id(input_data)
Generate a correlation ID by hashing the input data.
The input data is converted to bytes using the following rules:
- If input is bytes, use directly
- If input is str, encode as UTF-8
- Otherwise, convert to string and encode as UTF-8
Args:
input_data: Data to hash for correlation ID generation.
Returns:
A hex string of the SHA-256 hash of the input data.
Raises:
ValueError: If the input data cannot be converted to bytes.
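The conversion rules above can be sketched with the standard library's hashlib. The function name sha256_correlation_id is an assumption made for illustration; the sketch follows the documented rules but is not the library's source.

```python
import hashlib


def sha256_correlation_id(input_data: object) -> str:
    """Hash input_data following the documented conversion rules."""
    if isinstance(input_data, bytes):
        data = input_data                        # bytes: use directly
    elif isinstance(input_data, str):
        data = input_data.encode("utf-8")        # str: encode as UTF-8
    else:
        data = str(input_data).encode("utf-8")   # anything else: str() first
    return hashlib.sha256(data).hexdigest()
```

One consequence of these rules is that a str and the bytes of its UTF-8 encoding produce the same ID, as do an object and its str() form.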
Utility Functions
- process_classification_outputs(response, *, raise_on_error=False, log_errors=True)
Process the classification outputs in a response, raising on or skipping outputs that contain errors.
Args:
response: The ClassifyResponse containing outputs to process
raise_on_error: If True, raises ClassificationOutputError when an output contains an error. If False, logs the error and skips the output.
log_errors: If True, logs error information for failed outputs
Returns:
List of successful ClassificationOutput objects (excludes outputs with errors when raise_on_error=False)
Raises:
ClassificationOutputError: When raise_on_error=True and an output contains an error
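The difference between the two raise_on_error modes can be sketched with stand-in types. The structure of ClassifyResponse and ClassificationOutput is not shown in this section, so FakeOutput, its error field, FakeOutputError, and keep_successful are hypothetical names used only to illustrate the documented behaviour.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeOutput:
    """Hypothetical stand-in for a classification output."""
    correlation_id: str
    error: Optional[str] = None


class FakeOutputError(Exception):
    """Hypothetical stand-in for ClassificationOutputError."""


def keep_successful(
    outputs: List[FakeOutput], *, raise_on_error: bool = False
) -> List[FakeOutput]:
    successful = []
    for output in outputs:
        if output.error is not None:
            if raise_on_error:
                # Strict mode: stop at the first failed output.
                raise FakeOutputError(f"{output.correlation_id}: {output.error}")
            continue  # Lenient mode: skip the failed output.
        successful.append(output)
    return successful
```

Lenient mode suits streaming pipelines where one bad image should not discard the rest of the response; strict mode suits callers that treat any failure as fatal.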
- has_output_errors(response)
Check if any outputs in the response contain errors.
Args:
response: The ClassifyResponse to check
Returns:
True if any output contains an error, False otherwise
- Parameters:
response (ClassifyResponse)
- Return type:
- get_output_error_summary(response)
Get a summary of error types in the response outputs.
Args:
response: The ClassifyResponse to analyze
Returns:
Dictionary mapping error code names to their counts
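A summary of this shape, error code name mapped to count, is the kind of result collections.Counter produces. The sketch below assumes the error code names have already been extracted from the outputs, with None for outputs that succeeded; summarize_error_codes and the example code names are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, Optional


def summarize_error_codes(error_codes: Iterable[Optional[str]]) -> Dict[str, int]:
    """Count occurrences of each error code name, ignoring None (no error)."""
    return dict(Counter(code for code in error_codes if code is not None))


# Hypothetical code names, for illustration only.
summary = summarize_error_codes(["TIMEOUT", None, "TIMEOUT", "BAD_IMAGE"])
```

An empty dictionary means no output carried an error.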
Utility functions for processing classification results and handling errors.
- Example:
>>> if has_output_errors(result):
...     error_summary = get_output_error_summary(result)
...     logger.warning(f"Errors: {error_summary}")
>>>
>>> successful_outputs = process_classification_outputs(
...     result, raise_on_error=False, log_errors=True
... )
Constants
Common constants used throughout the client library.
- EXPECTED_WIDTH: Final[int] = 448
- EXPECTED_HEIGHT: Final[int] = 448
Expected image dimensions for optimal classification performance.
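A caller could use these dimensions to decide whether an image already has the expected size. In the sketch below the two values are copied from the constants documented above, because their import path is not shown in this section; matches_expected_size is a hypothetical helper.

```python
# Values copied from the documented constants; in real code, import them
# from the client library instead.
EXPECTED_WIDTH = 448
EXPECTED_HEIGHT = 448


def matches_expected_size(width: int, height: int) -> bool:
    """Return True if the image already has the expected dimensions."""
    return (width, height) == (EXPECTED_WIDTH, EXPECTED_HEIGHT)
```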
Usage Examples
Basic Classification
import asyncio

from resolver_athena_client.client.athena_client import AthenaClient
from resolver_athena_client.client.athena_options import AthenaOptions
from resolver_athena_client.client.channel import create_channel


async def main():
    # Create channel with static token
    channel = create_channel(
        host="your-host",
        auth_token="your-token"
    )

    # Configure client options (see api/options for details)
    options = AthenaOptions(
        host="your-host",
        deployment_id="your-deployment-id",
        resize_images=True,
        compress_images=True
    )

    # Use client for classification.
    # image_iterator is a placeholder for your own source of images.
    async with AthenaClient(channel, options) as client:
        results = client.classify_images(image_iterator)
        async for result in results:
            for output in result.outputs:
                # Manually map classifications, as the generated grpc
                # implementation for __str__ will ignore weights of 0.0,
                # which are common, especially for binary classifications
                # such as hash checks
                classifications = {
                    c.label: c.weight
                    for c in output.classifications
                }
                print(f"Classifications: {classifications}")


asyncio.run(main())
OAuth Authentication
import asyncio
import os

from resolver_athena_client.client.athena_client import AthenaClient
from resolver_athena_client.client.athena_options import AthenaOptions
from resolver_athena_client.client.channel import (
    CredentialHelper,
    create_channel_with_credentials
)


async def main():
    # Create OAuth credential helper
    credential_helper = CredentialHelper(
        client_id=os.getenv("OAUTH_CLIENT_ID"),
        client_secret=os.getenv("OAUTH_CLIENT_SECRET")
    )

    # Create authenticated channel
    channel = await create_channel_with_credentials(
        host=os.getenv("ATHENA_HOST"),
        credential_helper=credential_helper
    )

    options = AthenaOptions(
        host=os.getenv("ATHENA_HOST"),
        deployment_id="your-deployment-id",
        resize_images=True,
        compress_images=True
    )

    async with AthenaClient(channel, options) as client:
        # Your classification logic here
        pass


asyncio.run(main())
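Because os.getenv returns None when a variable is unset, a missing OAUTH_CLIENT_ID or ATHENA_HOST in the example above would only surface later, inside the client. One way to fail earlier is a small lookup helper such as the sketch below; require_env is a hypothetical name and not part of the client library.

```python
import os
from typing import Mapping


def require_env(name: str, environ: Mapping[str, str] = os.environ) -> str:
    """Return the value of an environment variable or raise if unset/empty."""
    value = environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value
```

For example, client_id=require_env("OAUTH_CLIENT_ID") could replace the os.getenv call so that a misconfigured environment is reported before any network request is made.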
Error Handling
import logging

from resolver_athena_client.client.athena_client import AthenaClient
from resolver_athena_client.client.exceptions import (
    AthenaClientError,
    AuthenticationError,
    ConnectionError  # note: shadows Python's built-in ConnectionError
)
from resolver_athena_client.client.utils import (
    process_classification_outputs,
    has_output_errors
)

logger = logging.getLogger(__name__)


# Illustrative wrapper: channel, options and image_iterator are created
# as in the examples above.
async def classify_with_error_handling(channel, options, image_iterator):
    try:
        async with AthenaClient(channel, options) as client:
            results = client.classify_images(image_iterator)
            async for result in results:
                # Check for errors in the result
                if has_output_errors(result):
                    logger.warning("Some outputs had errors")
                # Process successful outputs
                successful_outputs = process_classification_outputs(
                    result,
                    raise_on_error=False,
                    log_errors=True
                )
                for output in successful_outputs:
                    # Handle successful classification
                    pass
    except AuthenticationError as e:
        logger.error(f"Authentication failed: {e}")
    except ConnectionError as e:
        logger.error(f"Connection failed: {e}")
    except AthenaClientError as e:
        logger.error(f"Client error: {e}")
Performance Optimization
# Configure for high throughput (see api/options for all options)
options = AthenaOptions(
    host="your-host",
    deployment_id="your-deployment-id",
    resize_images=True,
    compress_images=True,
    timeout=300.0,  # 5 minute timeout
    keepalive_interval=60.0,  # 1 minute keepalive
)

# Run this inside a coroutine. large_image_iterator and process_batch are
# placeholders for your own image source and result handler.
async with AthenaClient(channel, options) as client:
    # Use streaming for large batches
    results = client.classify_images(large_image_iterator)
    async for batch_result in results:
        # Process in batches for better performance
        await process_batch(batch_result.outputs)
Notes
- Always use the client as an async context manager to ensure proper cleanup
- The client handles connection management and retries automatically
- Use correlation IDs for request tracing in distributed systems
- Enable compression for bandwidth-constrained environments
- Configure appropriate timeouts for your use case
- See Configuration Options for detailed configuration options
- Take care when logging classification results, as described in the basic classification example above: the generated __str__ omits weights of 0.0, so map labels to weights manually to avoid losing information