ProofPudding Documentation

ProofPudding provides two ways to integrate document processing and AI-powered question answering into your applications: a REST API and a Python SDK.

API

The REST API allows you to upload documents and ask questions about their content. Our agent analyzes the document and provides accurate answers with citations and optional structured output.

API Version

0.1.0

Base URL

https://api.proofpudding.ai

Authentication

All endpoints except health checks require a Bearer token in the Authorization header.

  • Revoked keys are rejected.
  • On auth failure the API returns 401 Unauthorized.

Header Format

Authorization: Bearer <api_key>

API Key Format

  • API keys have a prefix format: pk_xxxxxxxx...
  • Each API key is associated with your team
  • You can generate and manage API keys from your dashboard
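
As a sketch, any HTTP client can attach the key like this (the helper name and key value below are ours, not part of the API):

```python
def auth_headers(api_key: str) -> dict[str, str]:
    # Build the Authorization header expected by all authenticated endpoints.
    return {"Authorization": f"Bearer {api_key}"}

headers = auth_headers("pk_xxxxxxxx")
```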

Authentication Errors

Status Code  Error                                Description
401          Invalid authorization header format  Missing `Bearer ` prefix
401          API key is required                  Empty API key after Bearer
401          Invalid API key                      Key not found or revoked

API Endpoints

Health check endpoints have no prefix. All document and job endpoints are versioned under /api/v1.

Health Endpoints

Health check endpoints do not require authentication and have no URL prefix.

GET /health

Basic health check.

Authentication: None

Response 200 OK

{
  "status": "healthy",
  "version": "0.1.0",
  "environment": "prod"
}

Response Fields

Field        Type    Description
status       string  "healthy"
version      string  Current API version
environment  string  "dev" or "prod"

GET /health/ready

Readiness check. Verifies database connection.

Authentication: None

Response 200 OK

{
  "status": "ready",
  "database": "connected"
}

Response Fields

Field     Type    Description
status    string  "ready" or "not_ready"
database  string  "connected" or "unavailable"

Documents Endpoints

All document endpoints require authentication and operate within the scope of the authenticated team.

POST /api/v1/documents

Upload a document. Send as multipart/form-data with a "file" field.

Authentication: Required

Content-Type: multipart/form-data

Request Body

Field  Type           Required  Description
file   file (binary)  Yes       Document file to upload

Accepted File Types

    Constraints

    • Maximum file size: 100 MB
    • File must not be empty

    Response 200 OK

    {
      "id": "550e8400-e29b-41d4-a716-446655440000",
      "team_id": "123e4567-e89b-12d3-a456-426614174000",
      "filename": "quarterly-report.pdf",
      "size_bytes": 2048576,
      "created_at": "2026-02-02T10:30:00.000Z"
    }

    Error Responses

    Status  Detail                                 Cause
    400     Filename is missing                    No filename provided
    400     File type not allowed                  Unsupported file type uploaded
    400     File is empty                          Zero-byte file
    400     File too large. Maximum size is 100MB  Exceeds size limit
    500     Failed to upload file to storage       Upload failed
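
The 400-level checks above can be reproduced client-side before uploading; a minimal sketch (the helper name is ours, not part of the API):

```python
MAX_UPLOAD_BYTES = 100 * 1024 * 1024  # 100 MB limit documented above

def validate_upload(data: bytes, filename: str) -> None:
    # Mirror the server-side 400 checks so bad uploads fail fast locally.
    if not filename:
        raise ValueError("Filename is missing")
    if not data:
        raise ValueError("File is empty")
    if len(data) > MAX_UPLOAD_BYTES:
        raise ValueError("File too large. Maximum size is 100MB")
```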

    GET /api/v1/documents

    List all documents for the authenticated team with pagination.

    Query Parameters

    Parameter  Type     Default  Description
    skip       integer  0        Number of records to skip (offset)
    limit      integer  20       Maximum number of records to return

    Response 200 OK

    {
      "items": [
        {
          "id": "550e8400-e29b-41d4-a716-446655440000",
          "team_id": "123e4567-e89b-12d3-a456-426614174000",
          "filename": "quarterly-report.pdf",
          "size_bytes": 2048576,
          "created_at": "2026-02-02T10:30:00.000Z"
        }
      ],
      "total": 42,
      "skip": 0,
      "limit": 20
    }

    DELETE /api/v1/documents/{document_id}

    Delete a document by its UUID. Associated jobs are cascade-deleted.

    Path Parameters

    Parameter    Type           Description
    document_id  string (UUID)  UUID of the document to delete

    Response 200 OK (DocumentResponse)

    {
      "id": "550e8400-e29b-41d4-a716-446655440000",
      "team_id": "123e4567-e89b-12d3-a456-426614174000",
      "filename": "quarterly-report.pdf",
      "size_bytes": 2048576,
      "created_at": "2026-02-02T10:30:00.000Z"
    }

    Error Responses

    Status  Detail                     Cause
    404     Document not found         Document not found or does not belong to the team
    500     Failed to delete document  Server error during deletion

    Jobs Endpoints

    Jobs represent document processing tasks. When a job is created, it synchronously processes the document and returns results.

    POST /api/v1/jobs

    Create a document processing job. Blocks until processing is complete, then returns the result. The connection stays open while the document is processed.

    Request Body application/json (JobCreate)

    {
      "document_id": "550e8400-e29b-41d4-a716-446655440000",
      "question": "What were the total revenues in Q4?",
      "config": {
        "verify_citations": true,
        "reasoning_effort": "auto",
        "output_schema": {
          "schema": {
            "type": "object",
            "properties": {
              "revenue": { "type": "number" },
              "quarter": { "type": "string" }
            },
            "required": ["revenue", "quarter"]
          },
          "strict": true,
          "include_citations": true,
          "include_raw_answer": false
        }
      }
    }

    Request Fields

    Field        Type           Required  Description
    document_id  string (UUID)  Yes       Document to process. Must belong to the authenticated team.
    question     string         Yes       Question to ask about the document
    config       JobConfig      No        Processing configuration (see Data Models)

    Response 200 OK (JobResponse)

    {
      "id": "770e8400-e29b-41d4-a716-446655440002",
      "document_id": "550e8400-e29b-41d4-a716-446655440000",
      "question": "What were the total revenues in Q4?",
      "success": true,
      "result": {
        "answer": "The total revenues in Q4 were $12.5 million...",
        "confidence": "high",
        "citations": [
          {
            "page": 3,
            "quote": "Q4 revenues reached $12.5M, up 15% QoQ",
            "type": "text"
          }
        ],
        "structured_output": {
          "revenue": 12500000,
          "quarter": "Q4"
        }
      },
      "error": null,
      "processing_time_ms": 4523,
      "usage": {
        "total_cost_cents": 5.2,
        "llm_cost_cents": 4.0,
        "fixed_fee_cents": 1.2
      },
      "created_at": "2026-02-02T10:35:00.000Z"
    }

    Error Responses

    Status  Detail                      Cause
    404     Document not found          Document not found or does not belong to the team
    500     Failed to save job result   Saving the result failed
    502     Failed to process document  Processing service fails or is unreachable
    504     Processing timed out        Processing timeout

    POST /api/v1/jobs/stream

    Create a document processing job with streaming progress updates. Request body is a JobCreate object. Returns a text/event-stream (Server-Sent Events) response. Each event is a JSON object on a data: line.

    Request Body application/json (JobCreate)

    Same as POST /api/v1/jobs: document_id, question, and optional config.

    Response: text/event-stream (SSE)

    SSE Event Types

    Event             Fields                                               Description
    downloading       progress (int)                                       Document download progress
    processing        progress (int), pages_done (int), total_pages (int)  Progress updates as pages are processed
    thinking          iteration (int)                                      Agent reasoning iterations
    tool_exec / step  message (string)                                     Tool execution messages
    verifying         (none)                                               Citation verification in progress
    complete          result (object)                                      Final result including answer, citations, and structured_output if output_schema was provided
    error             error_code (string), result (object)                 Error occurred during processing

    Example Stream

    data: {"event": "downloading", "progress": 50}
    
    data: {"event": "processing", "pages_done": 1, "total_pages": 5}
    
    data: {"event": "processing", "pages_done": 3, "total_pages": 5}
    
    data: {"event": "thinking", "iteration": 1}
    
    data: {"event": "step", "message": "Searching for revenue data..."}
    
    data: {"event": "verifying"}
    
    data: {"event": "complete", "result": {"answer": "...", "citations": [...], "structured_output": {...}}}

    Error Responses

    Status  Detail              Cause
    404     Document not found  Document not found or does not belong to the team

    GET /api/v1/jobs

    List past jobs for the authenticated team. Returns a JobList object.

    Query Parameters

    Parameter    Type           Default  Description
    document_id  string (UUID)  null     Optional. Filter jobs by document UUID
    skip         integer        0        Number of records to skip (offset)
    limit        integer        20       Maximum number of records to return

    Data Models

    DocumentResponse

    interface DocumentResponse {
      id: string;           // Document UUID
      team_id: string;      // Team UUID that owns this document
      filename: string;     // Original filename
      size_bytes: number;   // File size in bytes
      created_at: string;   // ISO 8601 timestamp
    }

    DocumentList

    interface DocumentList {
      items: DocumentResponse[];  // List of documents
      total: number;              // Total count
      skip: number;               // Offset
      limit: number;              // Page size
    }

    JobCreate

    interface JobCreate {
      document_id: string;         // UUID of the document to process (required)
      question: string;            // Question to ask about the document (required)
      config?: JobConfig | null;   // Optional processing configuration (default: null)
    }

    JobConfig

    interface JobConfig {
      verify_citations?: boolean;              // Default: true. Whether to verify citations.
      output_schema?: OutputSchemaConfig | null; // Structured output schema configuration
      reasoning_effort?: "auto" | "low" | "high"; // Default: "auto".
                                               // "auto": AI decides analysis depth
                                               // "low": fast, for simple questions
                                               // "high": thorough, for complex analysis
    }

    OutputSchemaConfig

    interface OutputSchemaConfig {
      schema: object;              // JSON Schema (draft-07) defining the output structure (required)
      strict?: boolean;            // Default: true. If true, fail if schema cannot be satisfied;
                                   // if false, return partial data.
      include_citations?: boolean; // Default: true. Include citations array in response.
      include_raw_answer?: boolean; // Default: false. Include raw text answer for debugging.
    }

    Citation

    interface Citation {
      page: number;         // Page number
      quote: string;        // Quoted text from the document
      type: string;         // Default: "text". Citation type.
    }

    JobResult

    interface JobResult {
      answer: string;                          // Answer text
      confidence: "high" | "medium" | "low" | "not_found";
      citations: Citation[];                   // Default: []. Supporting citations.
      structured_output?: object | null;       // Default: null. Conforms to the provided
                                               // output_schema if one was given.
    }

    UsageInfo

    interface UsageInfo {
      total_cost_cents: number;  // Total cost in cents.
      llm_cost_cents: number;    // LLM token cost in cents.
      fixed_fee_cents: number;   // Per-call fixed fee in cents.
    }

    JobResponse

    interface JobResponse {
      id: string;                        // Job UUID
      document_id: string;               // Document UUID
      question: string;                  // Question asked
      success: boolean;                  // Whether processing succeeded
      result: JobResult | null;          // Present if success=true
      error: string | null;              // Error message if failed
      processing_time_ms: number;        // Processing time in milliseconds
      usage: UsageInfo | null;           // Cost information
      created_at: string;                // ISO 8601 timestamp
    }

    JobList

    interface JobList {
      items: JobResponse[];   // List of jobs
      total: number;          // Total count
      skip: number;           // Offset
      limit: number;          // Page size
    }

    SSE Stream Event Types

    // Base event
    interface StreamEvent {
      event: string;
      message?: string;
    }
    
    // Download progress
    interface DownloadingEvent extends StreamEvent {
      event: "downloading";
      progress?: number;     // 0-100
    }
    
    // Processing progress
    interface ProcessingEvent extends StreamEvent {
      event: "processing";
      progress?: number;     // 0-100
      pages_done?: number;
      total_pages?: number;
    }
    
    // Agent reasoning
    interface ThinkingEvent extends StreamEvent {
      event: "thinking";
      iteration?: number;
    }
    
    // Tool execution (also sent as "tool_exec")
    interface StepEvent extends StreamEvent {
      event: "step";
    }
    
    // Citation verification
    interface VerifyingEvent extends StreamEvent {
      event: "verifying";
    }
    
    // Final result
    interface CompleteEvent extends StreamEvent {
      event: "complete";
      result: object;        // Full processing result
    }
    
    // Error
    interface ErrorEvent extends StreamEvent {
      event: "error";
      error_code?: string;   // Machine-readable error code
      result?: object;       // Error result details
    }

    Common Patterns

    Pagination

    All list endpoints follow the same pagination pattern:

    • Use skip for offset-based pagination
    • Use limit to control page size
    • Response includes total for calculating total pages

    Example: Fetching page 3 with 10 items per page

    GET /api/v1/documents?skip=20&limit=10
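
The offset arithmetic generalizes to any page number; a small sketch (the function name is ours):

```python
def page_offset(page: int, limit: int) -> int:
    # Pages are 1-indexed: page 3 with limit 10 starts at offset 20.
    return (page - 1) * limit
```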

    Error Response Format

    All error responses follow this structure:

    {
      "detail": "Error message describing what went wrong"
    }

    UUID Format

    All IDs use UUID v4 format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

    Timestamps

    All timestamps are returned in ISO 8601 format with timezone: 2026-02-02T10:30:00.000Z
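
These timestamps parse with the standard library; note that Python 3.10's fromisoformat() rejects a trailing "Z", so a sketch like this normalizes it first (the helper name is ours):

```python
from datetime import datetime

def parse_api_timestamp(ts: str) -> datetime:
    # Python 3.10's fromisoformat() does not accept "Z"; replace it with an offset.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))

dt = parse_api_timestamp("2026-02-02T10:30:00.000Z")
```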

    SDK (Python)

    The Pudding Python SDK (proofpudding) is the recommended way to integrate with the Pudding API. It provides both synchronous and asynchronous clients with automatic retries, streaming support, and fully typed data models.

    Package

    proofpudding

    Version

    0.1.6

    Status

    Production/Stable

    Installation

    pip install proofpudding

    Requirements

    Requirement  Details
    Python       3.10, 3.11, 3.12, 3.13
    httpx        >= 0.25.0
    pydantic     >= 2.0

    Quick Start

    Upload a document and ask a question in just a few lines:

    from pudding import PuddingClient
    
    with PuddingClient(access_token="pk_your_api_key") as client:
        # Upload a PDF
        doc = client.documents.upload(file_path="report.pdf")
        print(f"Uploaded: {doc.filename} ({doc.size_bytes} bytes)")
    
        # Ask a question
        job = client.jobs.create(
            document_id=doc.id,
            question="What were the key findings?"
        )
    
        if job.success:
            print(f"Answer: {job.result.answer}")
            print(f"Confidence: {job.result.confidence}")
            for citation in job.result.citations:
                print(f"  Page {citation.page}: {citation.quote}")
        else:
            print(f"Error: {job.error}")

    The same example using the async client:

    import asyncio
    from pudding import AsyncPuddingClient
    
    async def main():
        async with AsyncPuddingClient(access_token="pk_your_api_key") as client:
            doc = await client.documents.upload(file_path="report.pdf")
            job = await client.jobs.create(
                document_id=doc.id,
                question="What were the key findings?"
            )
            if job.success:
                print(job.result.answer)
    
    asyncio.run(main())

    Authentication

    All API calls (except health checks) require an API key. Pass your key via the access_token parameter when creating a client. API keys are prefixed with pk_.

    from pudding import PuddingClient
    
    client = PuddingClient(access_token="pk_your_api_key")
    
    # The SDK sends the key as a Bearer token automatically:
    # Authorization: Bearer pk_your_api_key

    If access_token is empty or missing, a ValueError is raised immediately at client construction.

    Client Configuration

    Constructor Parameters

    Parameter     Type   Default     Description
    access_token  str    (required)  API key for authentication (pk_... prefix)
    timeout       float  1800.0      Default request timeout in seconds (30 min for long processing jobs)
    max_retries   int    3           Max retries for transient errors (up to 4 total attempts)

    Context Managers

    Always use a context manager, or call close() (aclose() on the async client) explicitly, to release HTTP connections.

    # Recommended: context manager
    with PuddingClient(access_token="pk_...") as client:
        ...  # client is automatically closed
    
    # Or close manually
    client = PuddingClient(access_token="pk_...")
    try:
        ...
    finally:
        client.close()
    
    # Async equivalent
    async with AsyncPuddingClient(access_token="pk_...") as client:
        ...
    # or: await client.aclose()

    Retry Behavior

    The SDK automatically retries on transient failures. Client errors (4xx) are never retried.

    • Retried: 5xx server errors, network errors (connection reset, DNS), timeout errors
    • Not retried: 400 ValidationError, 401 AuthenticationError, 404 NotFoundError, 429 RateLimitError

    API Reference

    Both PuddingClient and AsyncPuddingClient expose the same API surface through three resource properties: client.health, client.documents, and client.jobs.

    Health (client.health)

    check() → HealthResponse

    Basic health check. No authentication required.

    health = client.health.check()
    print(health.status)       # "healthy"
    print(health.version)      # "0.1.0"
    print(health.environment)  # "production"

    ready() → ReadinessResponse

    Readiness check including database connectivity. No authentication required.

    readiness = client.health.ready()
    print(readiness.status)    # "ready" or "not_ready"
    print(readiness.database)  # "connected" or error message

    Documents (client.documents)

    upload(*, file_path, file, filename) → DocumentResponse

    Upload a document. Provide either file_path or both file and filename.

    Parameter  Type               Description
    file_path  str | Path | None  Path to a local file
    file       bytes | None       Raw file bytes (must also provide filename)
    filename   str | None         Filename when using bytes

    Constraints: max 100 MB, must not be empty.

    Raises: ValueError for invalid arguments, ValidationError for invalid files.

    # From file path
    doc = client.documents.upload(file_path="report.pdf")
    
    # From bytes
    with open("report.pdf", "rb") as f:
        doc = client.documents.upload(file=f.read(), filename="report.pdf")
    
    print(f"{doc.id} - {doc.filename} ({doc.size_bytes} bytes)")

    list(*, skip, limit) → DocumentList

    List documents for the authenticated team with pagination.

    Parameter  Type  Default  Description
    skip       int   0        Pagination offset
    limit      int   20       Page size

    docs = client.documents.list(skip=0, limit=10)
    print(f"Showing {len(docs.items)} of {docs.total} documents")
    for doc in docs.items:
        print(f"  {doc.filename} ({doc.size_bytes} bytes)")

    delete(document_id) → DocumentResponse

    Delete a document and all associated jobs.

    Parameter    Type  Description
    document_id  str   UUID of the document to delete

    Raises: NotFoundError if document not found or belongs to another team.

    deleted = client.documents.delete("550e8400-e29b-41d4-a716-446655440000")
    print(f"Deleted: {deleted.filename}")

    Jobs (client.jobs)

    create(*, document_id, question, config, timeout) → JobResponse

    Blocking call that waits for document processing to complete and returns the result.

    💡 Tip: Consider using create_stream() instead. Document processing can take time, and the streaming endpoint lets you show real-time progress (page processing, reasoning steps, citation verification) to your users rather than waiting silently for a response.

    Parameter    Type              Default     Description
    document_id  str               (required)  UUID of the document to process
    question     str               (required)  Question to ask about the document
    config       JobConfig | None  None        Processing configuration (citations, structured output)
    timeout      float | None      None        Timeout override in seconds (falls back to client timeout)

    Raises: NotFoundError, GatewayError, TimeoutError

    job = client.jobs.create(
        document_id=doc.id,
        question="What is the total revenue for Q4 2025?",
        timeout=300  # 5 minute timeout override
    )
    
    if job.success:
        print(f"Answer: {job.result.answer}")
        print(f"Confidence: {job.result.confidence}")
        print(f"Processing time: {job.processing_time_ms}ms")
    else:
        print(f"Failed: {job.error}")

    create_stream(*, document_id, question, config, timeout) → Iterator[StreamEvent]

    Stream processing progress via Server-Sent Events. Returns an iterator of typed StreamEvent objects. Same parameters as create(). Async client returns AsyncIterator[StreamEvent].

    Raises: NotFoundError, AuthenticationError, PuddingError

    from pudding import (
        DownloadingEvent, ProcessingEvent, ThinkingEvent,
        StepEvent, VerifyingEvent, CompleteEvent, ErrorEvent
    )
    
    for event in client.jobs.create_stream(
        document_id=doc.id,
        question="Summarize the findings"
    ):
        if isinstance(event, DownloadingEvent):
            print(f"Downloading: {event.progress}%")
        elif isinstance(event, ProcessingEvent):
            print(f"Processing: {event.pages_done}/{event.total_pages} pages")
        elif isinstance(event, ThinkingEvent):
            print(f"Thinking... (iteration {event.iteration})")
        elif isinstance(event, StepEvent):
            print(f"Step: {event.message}")
        elif isinstance(event, VerifyingEvent):
            print("Verifying citations...")
        elif isinstance(event, CompleteEvent):
            print(f"Done! Result: {event.result}")
        elif isinstance(event, ErrorEvent):
            print(f"Error: {event.error_code} - {event.message}")

    list(*, document_id, skip, limit) → JobList

    List past jobs with optional filtering by document.

    Parameter    Type        Default  Description
    document_id  str | None  None     Filter jobs for a specific document
    skip         int         0        Pagination offset
    limit        int         20       Page size

    # All jobs
    all_jobs = client.jobs.list()
    
    # Jobs for a specific document
    doc_jobs = client.jobs.list(document_id=doc.id, limit=50)
    for job in doc_jobs.items:
        print(f"Q: {job.question} → {job.result.confidence}")

    Data Models

    All models are Pydantic BaseModel subclasses. Import from pudding.models or directly from pudding.

    DocumentResponse

    class DocumentResponse(BaseModel):
        id: str                # Document UUID
        team_id: str           # Team UUID
        filename: str          # Original filename
        size_bytes: int        # File size in bytes
        created_at: datetime   # Creation timestamp

    DocumentList

    class DocumentList(BaseModel):
        items: list[DocumentResponse] = []
        total: int
        skip: int
        limit: int

    HealthResponse / ReadinessResponse

    class HealthResponse(BaseModel):
        status: str       # "healthy"
        version: str      # API version
        environment: str  # "production", "staging", etc.
    
    class ReadinessResponse(BaseModel):
        status: str       # "ready" or "not_ready"
        database: str     # "connected" or error message

    JobConfig

    class JobConfig(BaseModel):
        verify_citations: bool = True                    # Whether to verify citations
        reasoning_effort: str = "auto"                    # "auto", "low", or "high"
        output_schema: OutputSchemaConfig | None = None   # Structured output config

    OutputSchemaConfig

    class OutputSchemaConfig(BaseModel):
        schema_: dict[str, Any]  # JSON Schema (draft-07). Alias: "schema"
        strict: bool = True              # Fail if schema can't be satisfied
        include_citations: bool = True   # Include citations in response
        include_raw_answer: bool = False # Include raw text answer (debugging)
    
        # Both work in Python:
        # OutputSchemaConfig(schema={"type": "object", ...})
        # OutputSchemaConfig(schema_={"type": "object", ...})

    Citation

    class Citation(BaseModel):
        page: int          # Page number
        quote: str         # Quoted text from document
        type: str = "text" # Citation type

    JobResult

    class JobResult(BaseModel):
        answer: str                                # Answer text
        confidence: str                            # "high", "medium", "low", "not_found"
        citations: list[Citation] = []             # Supporting citations
        structured_output: dict[str, Any] | None = None  # Present when output_schema was provided

    UsageInfo

    class UsageInfo(BaseModel):
        total_cost_cents: float   # Total cost in cents
        llm_cost_cents: float     # LLM token cost in cents
        fixed_fee_cents: float    # Per-call fixed fee in cents

    JobResponse

    class JobResponse(BaseModel):
        id: str                          # Job UUID
        document_id: str                 # Document UUID
        question: str                    # Question asked
        success: bool                    # Whether processing succeeded
        result: JobResult | None         # Present if success=True
        error: str | None                # Present if success=False
        processing_time_ms: int          # Processing time in milliseconds
        usage: UsageInfo | None          # Cost information
        created_at: datetime             # Creation timestamp

    JobList

    class JobList(BaseModel):
        items: list[JobResponse] = []
        total: int
        skip: int
        limit: int

    Stream Event Types

    # Base class for all stream events
    class StreamEvent(BaseModel):
        event: str            # Event type
        message: str = ""     # Event message
    
    class DownloadingEvent(StreamEvent):
        event: str = "downloading"
        progress: int | None = None   # Download progress 0-100
    
    class ProcessingEvent(StreamEvent):
        event: str = "processing"
        progress: int | None = None   # Processing progress 0-100
        pages_done: int | None = None # Pages processed so far
        total_pages: int | None = None # Total pages in document
    
    class ThinkingEvent(StreamEvent):
        event: str = "thinking"
        iteration: int | None = None  # Reasoning iteration number
    
    class StepEvent(StreamEvent):
        event: str = "step"           # Also mapped from "tool_exec" SSE events
    
    class VerifyingEvent(StreamEvent):
        event: str = "verifying"
    
    class CompleteEvent(StreamEvent):
        event: str = "complete"
        result: dict[str, Any]        # Full processing result
    
    class ErrorEvent(StreamEvent):
        event: str = "error"
        error_code: str | None = None      # Machine-readable error code
        result: dict[str, Any] | None = None  # Error result details

    Streaming (SSE)

    Use client.jobs.create_stream() to receive real-time progress updates as your document is processed. Events are streamed as Server-Sent Events and automatically parsed into typed Python objects.

    Event Flow

    Event             When                              Key Fields
    DownloadingEvent  Document is being downloaded      progress
    ProcessingEvent   Pages are being read              progress, pages_done, total_pages
    ThinkingEvent     Agent is reasoning                iteration
    StepEvent         A tool/action is executed         message
    VerifyingEvent    Citations are being verified      (inherited from StreamEvent)
    CompleteEvent     Processing finished successfully  result (dict)
    ErrorEvent        An error occurred                 error_code, result

    Sync streaming

    from pudding import PuddingClient, ProcessingEvent, CompleteEvent
    
    with PuddingClient(access_token="pk_...") as client:
        for event in client.jobs.create_stream(
            document_id="...",
            question="Extract all financial data"
        ):
            if isinstance(event, ProcessingEvent) and event.total_pages:
                # pages_done/total_pages are Optional, so guard before dividing
                pct = (event.pages_done or 0) / event.total_pages * 100
                print(f"Progress: {pct:.0f}%")
            elif isinstance(event, CompleteEvent):
                print("Result:", event.result)

    Async streaming

    from pudding import AsyncPuddingClient, ProcessingEvent, CompleteEvent
    
    async with AsyncPuddingClient(access_token="pk_...") as client:
        async for event in client.jobs.create_stream(
            document_id="...",
            question="Extract all financial data"
        ):
            if isinstance(event, ProcessingEvent) and event.total_pages:
                # pages_done/total_pages are Optional, so guard before dividing
                pct = (event.pages_done or 0) / event.total_pages * 100
                print(f"Progress: {pct:.0f}%")
            elif isinstance(event, CompleteEvent):
                print("Result:", event.result)

    Structured Output

    Use JobConfig with an OutputSchemaConfig to get responses that conform to a JSON Schema (draft-07) you define. The result appears in job.result.structured_output.

    from pudding import PuddingClient, JobConfig, OutputSchemaConfig
    
    with PuddingClient(access_token="pk_...") as client:
        config = JobConfig(
            output_schema=OutputSchemaConfig(
                schema={
                    "type": "object",
                    "properties": {
                        "company_name": {"type": "string"},
                        "revenue": {"type": "number"},
                        "quarter": {"type": "string"},
                        "year": {"type": "integer"}
                    },
                    "required": ["company_name", "revenue", "quarter", "year"]
                },
                strict=True,  # Fail if schema cannot be satisfied
                include_citations=True,
                include_raw_answer=False
            )
        )
    
        job = client.jobs.create(
            document_id=doc.id,
            question="What was the company's quarterly revenue?",
            config=config
        )
    
        if job.success:
            data = job.result.structured_output
            print(f"{data['company_name']}: ${data['revenue']}M in {data['quarter']} {data['year']}")
            # Citations are still available
            for c in job.result.citations:
                print(f"  Source: page {c.page}")

    When strict=True (default), the job will fail if the document doesn't contain enough information to satisfy the schema. Set strict=False to get partial data instead.

    Error Handling

    All exceptions inherit from PuddingError. Each exception has message (str) and status_code (int | None) attributes. Import from pudding.exceptions or directly from pudding.

    Exception Hierarchy

    Exception            HTTP Status   When
    PuddingError         (base class)  Unmapped status codes, network errors
    AuthenticationError  401           Invalid, missing, or revoked API key
    ValidationError      400           Invalid file type, empty file, oversized file, bad fields
    NotFoundError        404           Document/job not found or belongs to another team
    RateLimitError       429           Too many requests
    ServerError          500           Internal server error
    GatewayError         502           Processing service unavailable
    TimeoutError         504           Processing or request timed out

    Catching errors

    from pudding import (
        PuddingClient,
        PuddingError,
        AuthenticationError,
        NotFoundError,
        ValidationError,
        TimeoutError,
        GatewayError,
    )
    
    with PuddingClient(access_token="pk_...") as client:
        try:
            job = client.jobs.create(
                document_id="...",
                question="Summarize the report"
            )
        except AuthenticationError:
            print("Invalid API key")
        except NotFoundError:
            print("Document not found")
        except TimeoutError:
            print("Processing timed out — try again or use streaming")
        except GatewayError:
            print("Processing service is temporarily unavailable")
        except PuddingError as e:
            # Catch-all for any other SDK error
            print(f"[{e.status_code}] {e.message}")

    Logging

    The SDK uses Python's standard logging module under the logger name "pudding". It logs request attempts, retries, errors, and client lifecycle events at DEBUG and WARNING levels.

    import logging
    
    # Enable debug logging for the SDK
    logging.basicConfig()
    logging.getLogger("pudding").setLevel(logging.DEBUG)

    Examples

    Upload and Query a Document

    from pudding import PuddingClient
    
    with PuddingClient(access_token="pk_...") as client:
        # Upload
        doc = client.documents.upload(file_path="contract.pdf")
        print(f"Uploaded {doc.filename} (id: {doc.id})")
    
        # Query
        job = client.jobs.create(
            document_id=doc.id,
            question="What is the termination clause?"
        )
    
        if job.success:
            print(f"\nAnswer ({job.result.confidence} confidence):")
            print(job.result.answer)
            print("\nCitations:")
            for c in job.result.citations:
                print(f"  p.{c.page}: \"{c.quote}\"")

    Paginate Through All Documents

    from pudding import PuddingClient
    
    with PuddingClient(access_token="pk_...") as client:
        skip = 0
        limit = 50
        while True:
            page = client.documents.list(skip=skip, limit=limit)
            for doc in page.items:
                print(f"{doc.filename} - {doc.created_at}")
            if skip + limit >= page.total:
                break
            skip += limit
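    The loop above can be wrapped in a reusable generator. This is a sketch (iter_all is our name, not an SDK helper), assuming each page exposes items and total exactly as in the listing example:

```python
from typing import Callable, Iterator


def iter_all(fetch_page: Callable[[int, int], object], limit: int = 50) -> Iterator[object]:
    """Yield every item across all pages.

    `fetch_page(skip, limit)` must return an object with `items` (list)
    and `total` (int), matching the page shape shown above."""
    skip = 0
    while True:
        page = fetch_page(skip, limit)
        yield from page.items
        if skip + limit >= page.total:
            break
        skip += limit
```

    You would then write for doc in iter_all(lambda skip, limit: client.documents.list(skip=skip, limit=limit)): ... and let the generator handle the paging bookkeeping.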

    Stream Processing Progress

    from pudding import (
        PuddingClient,
        DownloadingEvent, ProcessingEvent, ThinkingEvent,
        StepEvent, VerifyingEvent, CompleteEvent, ErrorEvent
    )
    
    with PuddingClient(access_token="pk_...") as client:
        doc = client.documents.upload(file_path="large-report.pdf")
    
        for event in client.jobs.create_stream(
            document_id=doc.id,
            question="List all action items mentioned in the report"
        ):
            match event:
                case DownloadingEvent():
                    print(f"⬇️  Downloading: {event.progress}%")
                case ProcessingEvent():
                    print(f"📄 Reading pages: {event.pages_done}/{event.total_pages}")
                case ThinkingEvent():
                    print(f"🤔 Reasoning (iteration {event.iteration})...")
                case StepEvent():
                    print(f"⚙️  {event.message}")
                case VerifyingEvent():
                    print("🔍 Verifying citations...")
                case CompleteEvent():
                    print("✅ Done!")
                    print(event.result)
                case ErrorEvent():
                    print(f"❌ {event.error_code}: {event.message}")

    Extract Structured Data

    from pudding import PuddingClient, JobConfig, OutputSchemaConfig
    
    with PuddingClient(access_token="pk_...") as client:
        doc = client.documents.upload(file_path="invoice.pdf")
    
        job = client.jobs.create(
            document_id=doc.id,
            question="Extract the invoice details",
            config=JobConfig(
                output_schema=OutputSchemaConfig(
                    schema={
                        "type": "object",
                        "properties": {
                            "invoice_number": {"type": "string"},
                            "date": {"type": "string", "format": "date"},
                            "total_amount": {"type": "number"},
                            "currency": {"type": "string"},
                            "line_items": {
                                "type": "array",
                                "items": {
                                    "type": "object",
                                    "properties": {
                                        "description": {"type": "string"},
                                        "quantity": {"type": "integer"},
                                        "unit_price": {"type": "number"}
                                    }
                                }
                            }
                        },
                        "required": ["invoice_number", "total_amount", "currency"]
                    }
                )
            )
        )
    
        if job.success:
            inv = job.result.structured_output
            print(f"Invoice #{inv['invoice_number']}")
            print(f"Total: {inv['currency']} {inv['total_amount']}")
            for item in inv.get("line_items", []):
                print(f"  {item['description']}: {item['quantity']} x {item['unit_price']}")
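    Extracted numbers are worth sanity-checking. One cheap check: sum the line items and compare against total_amount. A sketch, assuming structured_output follows the schema above (the helper name line_items_total is ours, not the SDK's):

```python
def line_items_total(invoice: dict) -> float:
    """Sum quantity * unit_price across line_items (0 if absent)."""
    return sum(
        item["quantity"] * item["unit_price"]
        for item in invoice.get("line_items", [])
    )
```

    Comparing abs(line_items_total(inv) - inv["total_amount"]) against a small tolerance flags extractions worth reviewing by hand; line items are optional in the schema, so an empty sum is expected for invoices without them.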

    Robust Error Handling

    from pudding import (
        PuddingClient,
        AuthenticationError,
        ValidationError,
        NotFoundError,
        TimeoutError,
        GatewayError,
        PuddingError,
    )
    
    def process_document(api_key: str, file_path: str, question: str):
        with PuddingClient(access_token=api_key, max_retries=5) as client:
            # Upload with validation handling
            try:
                doc = client.documents.upload(file_path=file_path)
            except ValidationError as e:
                print(f"Invalid file: {e.message}")
                return
            except AuthenticationError:
                print("Check your API key")
                return
    
            # Process with retry-aware error handling
            try:
                job = client.jobs.create(
                    document_id=doc.id,
                    question=question,
                    timeout=600  # 10 minutes
                )
            except TimeoutError:
                print("Processing timed out — document may be too large")
                return
            except GatewayError:
                print("Processing service unavailable — please retry later")
                return
            except PuddingError as e:
                print(f"Unexpected error [{e.status_code}]: {e.message}")
                return
    
            if job.success:
                return job.result
            else:
                print(f"Job failed: {job.error}")
                return None

    Async Batch Processing

    import asyncio
    from pudding import AsyncPuddingClient
    
    async def ask_questions(doc_id: str, questions: list[str]):
        async with AsyncPuddingClient(access_token="pk_...") as client:
            tasks = [
                client.jobs.create(document_id=doc_id, question=q)
                for q in questions
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
    
            for question, result in zip(questions, results):
                if isinstance(result, Exception):
                    print(f"Q: {question} → Error: {result}")
                elif result.success:
                    print(f"Q: {question} → {result.result.answer[:80]}...")
                else:
                    print(f"Q: {question} → Job failed: {result.error}")
    
    asyncio.run(ask_questions(
        doc_id="...",
        questions=[
            "What is the executive summary?",
            "What are the key risks?",
            "What is the financial outlook?",
        ]
    ))
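    asyncio.gather launches every job at once, which can trip the 429 rate limit on large batches. A bounded variant using asyncio.Semaphore keeps at most a few jobs in flight; this is a sketch (ask_with_limit is a hypothetical helper, not an SDK method), assuming the AsyncPuddingClient interface from the example above:

```python
import asyncio


async def ask_with_limit(client, doc_id: str, questions: list[str],
                         max_concurrent: int = 4):
    """Run one job per question with at most max_concurrent in flight."""
    sem = asyncio.Semaphore(max_concurrent)

    async def one(question: str):
        async with sem:  # waits while max_concurrent jobs are running
            return await client.jobs.create(document_id=doc_id, question=question)

    # return_exceptions=True keeps one failure from cancelling the rest;
    # results come back in the same order as the questions
    return await asyncio.gather(*(one(q) for q in questions),
                                return_exceptions=True)
```

    Tune max_concurrent to your rate limit; the ordering guarantee means you can still zip the results with the original question list.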