Client Interface

The client module provides the main interface for interacting with Athena services, including the primary client class and authentication helpers.

AthenaClient

The main client class for image classification. Provides an async context manager interface for proper resource management and cleanup.

Example:
>>> async with AthenaClient(channel, options) as client:
...     results = client.classify_images(image_iterator)
...     async for result in results:
...         print(result.outputs)
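The cleanup guarantee of the context-manager form can be illustrated with a stand-in class. This is a minimal sketch that does not use AthenaClient itself; `DemoClient` and its `closed` flag are hypothetical names, chosen only to show that `__aexit__` runs even when the body of the `async with` block raises.

```python
import asyncio


class DemoClient:
    """Hypothetical stand-in for a client that owns a connection."""

    def __init__(self) -> None:
        self.closed = False

    async def __aenter__(self) -> "DemoClient":
        # A real client would open streams or start background work here.
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # Runs on normal exit and on exceptions alike.
        self.closed = True
        return False  # do not swallow the exception


async def demo() -> bool:
    client = DemoClient()
    try:
        async with client:
            raise RuntimeError("classification failed")
    except RuntimeError:
        pass
    return client.closed


closed_after_error = asyncio.run(demo())
```

Here `closed_after_error` ends up `True`: the cleanup step ran although the block exited through an exception.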

Channel Management

async create_channel_with_credentials(host, credential_helper)

Create a gRPC channel with OAuth credential helper.

Args:

host: The host address to connect to

credential_helper: The credential helper for OAuth authentication

Returns:

A secure gRPC channel with OAuth authentication

Raises:

InvalidHostError: If host is empty

OAuthError: If OAuth authentication fails

Return type:

Channel

class CredentialHelper(client_id, client_secret, auth_url='https://crispthinking.auth0.com/oauth/token', audience='crisp-athena-live')

Bases: object

OAuth credential helper for automatic token management and refresh.

Example:
>>> credential_helper = CredentialHelper(
...     client_id="your-client-id",
...     client_secret="your-client-secret"
... )
>>> channel = await create_channel_with_credentials(
...     host="your-host",
...     credential_helper=credential_helper
... )

Parameters:
  • client_id (str)

  • client_secret (str)

  • auth_url (str)

  • audience (str)

__init__(client_id, client_secret, auth_url='https://crispthinking.auth0.com/oauth/token', audience='crisp-athena-live')

Initialize the credential helper.

Args:

client_id: OAuth client ID

client_secret: OAuth client secret

auth_url: OAuth token endpoint URL

audience: OAuth audience

Parameters:
  • client_id (str)

  • client_secret (str)

  • auth_url (str)

  • audience (str)

Return type:

None

async get_token()

Get a valid authentication token.

This method will return a cached token if it’s still valid, or fetch a new token if needed.

Returns:

A valid authentication token

async invalidate_token()

Invalidate the current token to force a refresh on next use.

Return type:

None
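The caching behaviour described for get_token() and invalidate_token() can be sketched independently of the library. The example below is an illustration under stated assumptions, not the CredentialHelper implementation: `CachedTokenSource`, the `fetch` callable, and the `leeway` parameter are hypothetical names, and the fetcher is assumed to return a token together with its lifetime in seconds.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

# Hypothetical fetcher type: returns (token, lifetime_in_seconds).
Fetcher = Callable[[], Awaitable[Tuple[str, float]]]


class CachedTokenSource:
    """Illustrative token cache; not the library's CredentialHelper."""

    def __init__(self, fetch: Fetcher, leeway: float = 30.0) -> None:
        self._fetch = fetch
        self._leeway = leeway  # refresh this many seconds before expiry
        self._token: Optional[str] = None
        self._expires_at = 0.0
        self._lock = asyncio.Lock()

    async def get_token(self) -> str:
        async with self._lock:  # avoid duplicate concurrent fetches
            expired = time.monotonic() >= self._expires_at - self._leeway
            if self._token is None or expired:
                self._token, lifetime = await self._fetch()
                self._expires_at = time.monotonic() + lifetime
            return self._token

    async def invalidate_token(self) -> None:
        async with self._lock:
            self._token = None  # the next get_token() call fetches again


async def demo() -> tuple:
    calls = 0

    async def fake_fetch() -> Tuple[str, float]:
        nonlocal calls
        calls += 1
        return f"token-{calls}", 3600.0

    source = CachedTokenSource(fake_fetch)
    first = await source.get_token()
    second = await source.get_token()  # served from the cache
    await source.invalidate_token()
    third = await source.get_token()   # forces a new fetch
    return first, second, third, calls


first, second, third, calls = asyncio.run(demo())
```

The first two calls share one fetch, and the call after invalidation triggers a second one, so `calls` is 2 at the end of the demo.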

Deployment Selection

See Deployment Selector for deployment discovery and selection functionality.

Correlation and Tracing

class CorrelationProvider

Bases: ABC

Abstract base class defining the contract for correlation ID generation.

This class serves as an interface for different strategies of generating correlation IDs. Implementations can use various methods such as hashing, UUIDs, or other approaches to generate unique identifiers.

abstractmethod get_correlation_id(input_data)

Generate a correlation ID for the given input data.

Args:

input_data: Data to use as the basis for correlation ID generation. The type and structure of this data depend on the specific implementation.

Returns:

A string containing the generated correlation ID.

Raises:

ValueError: If the input data is not in a format supported by the implementation.

Parameters:

input_data (bytes | str | bytearray)

Return type:

str
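As an illustration of an alternative strategy behind the same contract, the sketch below generates a random UUID per call. It is self-contained and does not import from the library: `CorrelationProviderSketch` is a stand-in that mirrors the documented interface, and `UuidCorrelationProvider` is a hypothetical implementation, not a class the library is known to ship.

```python
import uuid
from abc import ABC, abstractmethod
from typing import Union

InputData = Union[bytes, str, bytearray]


class CorrelationProviderSketch(ABC):
    """Stand-in mirroring the documented interface; not the library class."""

    @abstractmethod
    def get_correlation_id(self, input_data: InputData) -> str:
        ...


class UuidCorrelationProvider(CorrelationProviderSketch):
    """Hypothetical strategy: a fresh random UUID per call."""

    def get_correlation_id(self, input_data: InputData) -> str:
        # The input is validated but does not influence the generated ID.
        if not isinstance(input_data, (bytes, str, bytearray)):
            raise ValueError(
                f"unsupported input type: {type(input_data).__name__}"
            )
        return str(uuid.uuid4())


provider = UuidCorrelationProvider()
id_a = provider.get_correlation_id(b"same image bytes")
id_b = provider.get_correlation_id(b"same image bytes")
```

Unlike a hash-based strategy, this one returns a different ID for identical input, which suits per-request tracing but not deduplication.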

class HashCorrelationProvider

Bases: CorrelationProvider

Generates deterministic correlation IDs by hashing the input data.

This implementation uses SHA-256 to hash the input data’s bytes; the resulting digest serves as the correlation ID.

Example:
>>> provider = HashCorrelationProvider()
>>> correlation_id = provider.get_correlation_id(image_data)

get_correlation_id(input_data)

Generate a correlation ID by hashing the input data.

The input data is converted to bytes using the following rules:

  • If input is bytes, use directly

  • If input is str, encode as UTF-8

  • Otherwise, convert to string and encode as UTF-8

Args:

input_data: Data to hash for correlation ID generation.

Returns:

A hex string of the SHA-256 hash of the input data.

Raises:

ValueError: If the input data cannot be converted to bytes.

Parameters:

input_data (bytes | str | bytearray)

Return type:

str
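The conversion rules and hashing step can be reproduced with the standard library alone. The function below is a minimal sketch that follows the three documented rules literally; `sha256_correlation_id` is a hypothetical name, and the library's own code may differ in detail (for example, a literal reading sends bytearray input through the third rule, which is not confirmed here).

```python
import hashlib


def sha256_correlation_id(input_data: object) -> str:
    """Apply the three documented conversion rules, then hash with SHA-256."""
    if isinstance(input_data, bytes):
        data = input_data                        # rule 1: use bytes directly
    elif isinstance(input_data, str):
        data = input_data.encode("utf-8")        # rule 2: encode str as UTF-8
    else:
        data = str(input_data).encode("utf-8")   # rule 3: stringify, then encode
    return hashlib.sha256(data).hexdigest()


digest = sha256_correlation_id("abc")
same_digest = sha256_correlation_id(b"abc")
```

Because the hash is deterministic, the str `"abc"` and the bytes `b"abc"` yield the same 64-character hex digest.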

Utility Functions

process_classification_outputs(response, *, raise_on_error=False, log_errors=True)

Process classification outputs from a response, handling errors properly.

Args:

response: The ClassifyResponse containing outputs to process

raise_on_error: If True, raises ClassificationOutputError when an output contains an error. If False, logs the error and skips the output.

log_errors: If True, logs error information for failed outputs

Returns:

List of successful ClassificationOutput objects (excludes outputs with errors when raise_on_error=False)

Raises:

ClassificationOutputError: When raise_on_error=True and an output contains an error

Parameters:
  • response (ClassifyResponse)

  • raise_on_error (bool)

  • log_errors (bool)

Return type:

list[ClassificationOutput]
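The interaction of raise_on_error and log_errors can be sketched with stand-in types. Everything below is hypothetical and only mirrors the documented behaviour: `OutputSketch`, `OutputErrorSketch`, and `filter_outputs` are illustrative names, the `error` field is an assumed shape rather than the real ClassificationOutput layout, and `"EXAMPLE_ERROR"` is a made-up code.

```python
import logging
from dataclasses import dataclass
from typing import List, Optional

logger = logging.getLogger(__name__)


@dataclass
class OutputSketch:
    """Hypothetical stand-in for one classification output."""
    correlation_id: str
    error: Optional[str] = None  # None means the output succeeded


class OutputErrorSketch(Exception):
    """Hypothetical stand-in for ClassificationOutputError."""


def filter_outputs(
    outputs: List[OutputSketch],
    *,
    raise_on_error: bool = False,
    log_errors: bool = True,
) -> List[OutputSketch]:
    successful = []
    for output in outputs:
        if output.error is None:
            successful.append(output)
            continue
        if log_errors:
            logger.error("output %s failed: %s", output.correlation_id, output.error)
        if raise_on_error:
            raise OutputErrorSketch(output.error)
        # Otherwise the failed output is skipped.
    return successful


outputs = [
    OutputSketch("a"),
    OutputSketch("b", error="EXAMPLE_ERROR"),
    OutputSketch("c"),
]
kept = [o.correlation_id for o in filter_outputs(outputs, log_errors=False)]
```

With the defaults, the failed output is dropped and `kept` holds only the IDs of the successful outputs; passing `raise_on_error=True` stops at the first failure instead.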

has_output_errors(response)

Check if any outputs in the response contain errors.

Args:

response: The ClassifyResponse to check

Returns:

True if any output contains an error, False otherwise

Parameters:

response (ClassifyResponse)

Return type:

bool

get_output_error_summary(response)

Get a summary of error types in the response outputs.

Args:

response: The ClassifyResponse to analyze

Returns:

Dictionary mapping error code names to their counts

Parameters:

response (ClassifyResponse)

Return type:

dict[str, int]

Utility functions for processing classification results and handling errors.

Example:
>>> if has_output_errors(result):
...     error_summary = get_output_error_summary(result)
...     logger.warning(f"Errors: {error_summary}")
>>>
>>> successful_outputs = process_classification_outputs(
...     result, raise_on_error=False, log_errors=True
... )
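The shape of an error summary, a mapping from error code names to counts, can be sketched with `collections.Counter`. This is not the library's implementation: `summarize_error_codes` is a hypothetical helper that takes plain code names (with `None` for successful outputs) rather than a ClassifyResponse, and `CODE_A` and `CODE_B` are made-up codes.

```python
from collections import Counter
from typing import Dict, Iterable, Optional


def summarize_error_codes(error_codes: Iterable[Optional[str]]) -> Dict[str, int]:
    """Count error code names; None marks a successful output and is skipped."""
    return dict(Counter(code for code in error_codes if code is not None))


summary = summarize_error_codes([None, "CODE_A", "CODE_B", "CODE_A", None])
any_errors = bool(summary)  # analogous to a has-errors check
```

For this input the summary is `{"CODE_A": 2, "CODE_B": 1}`, and an empty dictionary would indicate that no output carried an error.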

Constants

Common constants used throughout the client library.

EXPECTED_WIDTH: Final[int] = 448

EXPECTED_HEIGHT: Final[int] = 448

Expected image dimensions for optimal classification performance.
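A pre-flight check against these dimensions might look like the sketch below. The constants are redefined locally so the example is self-contained, and `needs_resize` is a hypothetical helper, not part of the library.

```python
from typing import Final, Tuple

# Values copied from the constants documented above.
EXPECTED_WIDTH: Final[int] = 448
EXPECTED_HEIGHT: Final[int] = 448


def needs_resize(size: Tuple[int, int]) -> bool:
    """Return True when (width, height) differs from the expected dimensions."""
    return size != (EXPECTED_WIDTH, EXPECTED_HEIGHT)


checks = [needs_resize((448, 448)), needs_resize((1024, 768))]
```

Only the second image in `checks` is flagged, since the first already matches 448 by 448.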

Usage Examples

Basic Classification

import asyncio
from resolver_athena_client.client.athena_client import AthenaClient
from resolver_athena_client.client.athena_options import AthenaOptions
from resolver_athena_client.client.channel import create_channel

async def main():
    # Create channel with static token
    channel = create_channel(
        host="your-host",
        auth_token="your-token"
    )

    # Configure client options (see api/options for details)
    options = AthenaOptions(
        host="your-host",
        deployment_id="your-deployment-id",
        resize_images=True,
        compress_images=True
    )

    # Use client for classification
    async with AthenaClient(channel, options) as client:
        results = client.classify_images(image_iterator)

        async for result in results:
            for output in result.outputs:
                # Manually map classifications, as the generated grpc
                # implementation for __str__ will ignore weights of 0.0,
                # which are common, especially for binary classifications
                # such as hash checks
                classifications = {
                    c.label: c.weight
                    for c in output.classifications
                }
                print(f"Classifications: {classifications}")

asyncio.run(main())

OAuth Authentication

import asyncio
import os
from resolver_athena_client.client.athena_client import AthenaClient
from resolver_athena_client.client.athena_options import AthenaOptions
from resolver_athena_client.client.channel import (
    CredentialHelper,
    create_channel_with_credentials
)

async def main():
    # Create OAuth credential helper
    credential_helper = CredentialHelper(
        client_id=os.getenv("OAUTH_CLIENT_ID"),
        client_secret=os.getenv("OAUTH_CLIENT_SECRET")
    )

    # Create authenticated channel
    channel = await create_channel_with_credentials(
        host=os.getenv("ATHENA_HOST"),
        credential_helper=credential_helper
    )

    options = AthenaOptions(
        host=os.getenv("ATHENA_HOST"),
        deployment_id="your-deployment-id",
        resize_images=True,
        compress_images=True
    )

    async with AthenaClient(channel, options) as client:
        # Your classification logic here
        pass

asyncio.run(main())

Error Handling

import logging

from resolver_athena_client.client.athena_client import AthenaClient
from resolver_athena_client.client.exceptions import (
    AthenaClientError,
    AuthenticationError,
    ConnectionError,  # note: shadows the built-in ConnectionError
)
from resolver_athena_client.client.utils import (
    process_classification_outputs,
    has_output_errors,
)

logger = logging.getLogger(__name__)

async def classify_with_error_handling(channel, options, image_iterator):
    # channel, options, and image_iterator are created as in the examples above
    try:
        async with AthenaClient(channel, options) as client:
            results = client.classify_images(image_iterator)

            async for result in results:
                # Check for errors in the result
                if has_output_errors(result):
                    logger.warning("Some outputs had errors")

                # Process successful outputs
                successful_outputs = process_classification_outputs(
                    result,
                    raise_on_error=False,
                    log_errors=True
                )

                for output in successful_outputs:
                    # Handle successful classification
                    pass

    # Catch the specific errors first, then the general client error
    except AuthenticationError as e:
        logger.error(f"Authentication failed: {e}")
    except ConnectionError as e:
        logger.error(f"Connection failed: {e}")
    except AthenaClientError as e:
        logger.error(f"Client error: {e}")

Performance Optimization

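from resolver_athena_client.client.athena_client import AthenaClient
from resolver_athena_client.client.athena_options import AthenaOptions

# Configure for high throughput (see api/options for all options)
options = AthenaOptions(
    host="your-host",
    deployment_id="your-deployment-id",
    resize_images=True,
    compress_images=True,
    timeout=300.0,  # 5 minute timeout
    keepalive_interval=60.0,  # 1 minute keepalive
)

async def classify_large_batch(channel, large_image_iterator, process_batch):
    # process_batch is a coroutine function supplied by the caller
    async with AthenaClient(channel, options) as client:
        # Use streaming for large batches
        results = client.classify_images(large_image_iterator)

        async for batch_result in results:
            # Handle each response's outputs as it arrives
            await process_batch(batch_result.outputs)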

Notes

  • Always use the client as an async context manager to ensure proper cleanup

  • The client handles connection management and retries automatically

  • Use correlation IDs for request tracing in distributed systems

  • Enable compression for bandwidth-constrained environments

  • Configure appropriate timeouts for your use case

  • See Configuration Options for detailed configuration options

  • Take care when logging classification results: as described in the Basic Classification example above, the generated __str__ omits weights of 0.0, so map labels to weights explicitly to avoid losing information