ProofPudding Documentation
ProofPudding provides two ways to integrate document processing and AI-powered question answering into your applications: a REST API and a Python SDK.
API
The REST API allows you to upload documents and ask questions about their content. Our agent will analyze the document and provide accurate answers with citations and optional structured output.
API Version
0.1.0
Base URL
https://api.proofpudding.ai
Authentication
All endpoints except health checks require a Bearer token in the Authorization header.
- Revoked keys are rejected.
- On auth failure the API returns 401 Unauthorized.
Header Format
Authorization: Bearer <api_key>
API Key Format
- API keys have a prefix format: pk_xxxxxxxx...
- Each API key is associated with your team
- You can generate and manage API keys from your dashboard
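As a sketch, the required header can be assembled like this in Python; the helper function is illustrative, not part of any ProofPudding library, and the key shown is a placeholder:

```python
def auth_headers(api_key: str) -> dict[str, str]:
    """Build the Authorization header the API expects."""
    if not api_key:
        raise ValueError("API key is required")
    return {"Authorization": f"Bearer {api_key}"}

headers = auth_headers("pk_xxxxxxxx")
# Sends: Authorization: Bearer pk_xxxxxxxx
```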
Authentication Errors
| Status Code | Error | Description |
|---|---|---|
401 | Invalid authorization header format | Missing `Bearer ` prefix |
401 | API key is required | Empty API key after Bearer |
401 | Invalid API key | Key not found or revoked |
API Endpoints
Health check endpoints have no prefix. All document and job endpoints are versioned under /api/v1.
Health Endpoints
Health check endpoints do not require authentication and have no URL prefix.
GET/health
Basic health check.
Authentication: None
Response 200 OK
{
"status": "healthy",
"version": "0.1.0",
"environment": "prod"
}
Response Fields
| Field | Type | Description |
|---|---|---|
status | string | "healthy" |
version | string | Current API version |
environment | string | "dev" or "prod" |
GET/health/ready
Readiness check. Verifies database connection.
Authentication: None
Response 200 OK
{
"status": "ready",
"database": "connected"
}
Response Fields
| Field | Type | Description |
|---|---|---|
status | string | "ready" or "not_ready" |
database | string | "connected" or "unavailable" |
Documents Endpoints
All document endpoints require authentication and operate within the scope of the authenticated team.
POST/api/v1/documents
Upload a document. Send as multipart/form-data with a "file" field.
Authentication: Required
Content-Type: multipart/form-data
Request Body
| Field | Type | Required | Description |
|---|---|---|---|
file | file (binary) | Yes | Document file to upload |
Accepted File Types
Constraints
- Maximum file size: 100 MB
- File must not be empty
Response 200 OK
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"team_id": "123e4567-e89b-12d3-a456-426614174000",
"filename": "quarterly-report.pdf",
"size_bytes": 2048576,
"created_at": "2026-02-02T10:30:00.000Z"
}
Error Responses
| Status | Detail | Cause |
|---|---|---|
400 | Filename is missing | No filename provided |
400 | File type not allowed | Unsupported file type uploaded |
400 | File is empty | Zero-byte file |
400 | File too large. Maximum size is 100MB | Exceeds size limit |
500 | Failed to upload file to storage | Upload failed |
GET/api/v1/documents
List all documents for the authenticated team with pagination.
Query Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
skip | integer | 0 | Number of records to skip (offset) |
limit | integer | 20 | Maximum number of records to return |
Response 200 OK
{
"items": [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"team_id": "123e4567-e89b-12d3-a456-426614174000",
"filename": "quarterly-report.pdf",
"size_bytes": 2048576,
"created_at": "2026-02-02T10:30:00.000Z"
}
],
"total": 42,
"skip": 0,
"limit": 20
}
DELETE/api/v1/documents/{document_id}
Delete a document by its UUID. Associated jobs are cascade-deleted.
Path Parameters
| Parameter | Type | Description |
|---|---|---|
document_id | string (UUID) | UUID of the document to delete |
Response 200 OK (DocumentResponse)
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"team_id": "123e4567-e89b-12d3-a456-426614174000",
"filename": "quarterly-report.pdf",
"size_bytes": 2048576,
"created_at": "2026-02-02T10:30:00.000Z"
}
Error Responses
| Status | Detail | Cause |
|---|---|---|
404 | Document not found | Document not found or does not belong to the team |
500 | Failed to delete document | Server error during deletion |
Jobs Endpoints
Jobs represent document processing tasks. When a job is created, it synchronously processes the document and returns results.
POST/api/v1/jobs
Create a document processing job. Blocks until processing is complete, then returns the result. The connection stays open while the document is processed.
Request Body application/json (JobCreate)
{
"document_id": "550e8400-e29b-41d4-a716-446655440000",
"question": "What were the total revenues in Q4?",
"config": {
"verify_citations": true,
"reasoning_effort": "auto",
"output_schema": {
"schema": {
"type": "object",
"properties": {
"revenue": { "type": "number" },
"quarter": { "type": "string" }
},
"required": ["revenue", "quarter"]
},
"strict": true,
"include_citations": true,
"include_raw_answer": false
}
}
}
Request Fields
| Field | Type | Required | Description |
|---|---|---|---|
document_id | string (UUID) | Yes | Document to process. Must belong to authenticated team. |
question | string | Yes | Question to ask about the document |
config | JobConfig | No | Processing configuration (see Data Models) |
Response 200 OK (JobResponse)
{
"id": "770e8400-e29b-41d4-a716-446655440002",
"document_id": "550e8400-e29b-41d4-a716-446655440000",
"question": "What were the total revenues in Q4?",
"success": true,
"result": {
"answer": "The total revenues in Q4 were $12.5 million...",
"confidence": "high",
"citations": [
{
"page": 3,
"quote": "Q4 revenues reached $12.5M, up 15% QoQ",
"type": "text"
}
],
"structured_output": {
"revenue": 12500000,
"quarter": "Q4"
}
},
"error": null,
"processing_time_ms": 4523,
"usage": {
"total_cost_cents": 5.2,
"llm_cost_cents": 4.0,
"fixed_fee_cents": 1.2
},
"created_at": "2026-02-02T10:35:00.000Z"
}
Error Responses
| Status | Detail | Cause |
|---|---|---|
404 | Document not found | Document not found or does not belong to the team |
500 | Failed to save job result | Saving the result failed |
502 | Failed to process document | Processing service fails or is unreachable |
504 | Processing timed out | Processing timeout |
POST/api/v1/jobs/stream
Create a document processing job with streaming progress updates. Request body is a JobCreate object. Returns a text/event-stream (Server-Sent Events) response. Each event is a JSON object on a data: line.
Request Body application/json (JobCreate)
Same as POST /api/v1/jobs: document_id, question, and optional config.
Response: text/event-stream (SSE)
SSE Event Types
| Event | Fields | Description |
|---|---|---|
downloading | progress (int) | Document download progress |
processing | progress (int), pages_done (int), total_pages (int) | Progress updates as pages are processed |
thinking | iteration (int) | Agent reasoning iterations |
tool_exec / step | message (string) | Tool execution messages |
verifying | (none) | Citation verification in progress |
complete | result (object) | Final result including answer, citations, and structured_output if output_schema was provided |
error | error_code (string), result (object) | Error occurred during processing |
Example Stream
data: {"event": "downloading", "progress": 50}
data: {"event": "processing", "pages_done": 1, "total_pages": 5}
data: {"event": "processing", "pages_done": 3, "total_pages": 5}
data: {"event": "thinking", "iteration": 1}
data: {"event": "step", "message": "Searching for revenue data..."}
data: {"event": "verifying"}
data: {"event": "complete", "result": {"answer": "...", "citations": [...], "structured_output": {...}}}
Error Responses
| Status | Detail | Cause |
|---|---|---|
404 | Document not found | Document not found or does not belong to the team |
GET/api/v1/jobs
List past jobs for the authenticated team. Returns a JobList object.
Query Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
document_id | string (UUID) | null | Optional. Filter jobs by document UUID |
skip | integer | 0 | Number of records to skip (offset) |
limit | integer | 20 | Maximum number of records to return |
Data Models
DocumentResponse
interface DocumentResponse {
id: string; // Document UUID
team_id: string; // Team UUID that owns this document
filename: string; // Original filename
size_bytes: number; // File size in bytes
created_at: string; // ISO 8601 timestamp
}
DocumentList
interface DocumentList {
items: DocumentResponse[]; // List of documents
total: number; // Total count
skip: number; // Offset
limit: number; // Page size
}
JobCreate
interface JobCreate {
document_id: string; // UUID of the document to process (required)
question: string; // Question to ask about the document (required)
config?: JobConfig | null; // Optional processing configuration (default: null)
}
JobConfig
interface JobConfig {
verify_citations?: boolean; // Default: true. Whether to verify citations.
output_schema?: OutputSchemaConfig | null; // Structured output schema configuration
reasoning_effort?: "auto" | "low" | "high"; // Default: "auto".
// "auto": AI decides analysis depth
// "low": fast, for simple questions
// "high": thorough, for complex analysis
}
OutputSchemaConfig
interface OutputSchemaConfig {
schema: object; // JSON Schema (draft-07) defining the output structure (required)
strict?: boolean; // Default: true. If true, fail if schema cannot be satisfied;
// if false, return partial data.
include_citations?: boolean; // Default: true. Include citations array in response.
include_raw_answer?: boolean; // Default: false. Include raw text answer for debugging.
}
Citation
interface Citation {
page: number; // Page number
quote: string; // Quoted text from the document
type: string; // Default: "text". Citation type.
}
JobResult
interface JobResult {
answer: string; // Answer text
confidence: "high" | "medium" | "low" | "not_found";
citations: Citation[]; // Default: []. Supporting citations.
structured_output?: object | null; // Default: null. Conforms to the provided
// output_schema if one was given.
}
UsageInfo
interface UsageInfo {
total_cost_cents: number; // Total cost in cents.
llm_cost_cents: number; // LLM token cost in cents.
fixed_fee_cents: number; // Per-call fixed fee in cents.
}
JobResponse
interface JobResponse {
id: string; // Job UUID
document_id: string; // Document UUID
question: string; // Question asked
success: boolean; // Whether processing succeeded
result: JobResult | null; // Present if success=true
error: string | null; // Error message if failed
processing_time_ms: number; // Processing time in milliseconds
usage: UsageInfo | null; // Cost information
created_at: string; // ISO 8601 timestamp
}
JobList
interface JobList {
items: JobResponse[]; // List of jobs
total: number; // Total count
skip: number; // Offset
limit: number; // Page size
}
SSE Stream Event Types
// Base event
interface StreamEvent {
event: string;
message?: string;
}
// Download progress
interface DownloadingEvent extends StreamEvent {
event: "downloading";
progress?: number; // 0-100
}
// Processing progress
interface ProcessingEvent extends StreamEvent {
event: "processing";
progress?: number; // 0-100
pages_done?: number;
total_pages?: number;
}
// Agent reasoning
interface ThinkingEvent extends StreamEvent {
event: "thinking";
iteration?: number;
}
// Tool execution (also sent as "tool_exec")
interface StepEvent extends StreamEvent {
event: "step";
}
// Citation verification
interface VerifyingEvent extends StreamEvent {
event: "verifying";
}
// Final result
interface CompleteEvent extends StreamEvent {
event: "complete";
result: object; // Full processing result
}
// Error
interface ErrorEvent extends StreamEvent {
event: "error";
error_code?: string; // Machine-readable error code
result?: object; // Error result details
}
Common Patterns
Pagination
All list endpoints follow the same pagination pattern:
- Use skip for offset-based pagination
- Use limit to control page size
- Response includes total for calculating total pages
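The offset arithmetic above can be sketched as two small helpers (illustrative names, not part of the API):

```python
import math

def page_params(page: int, page_size: int) -> dict[str, int]:
    """Translate a 1-based page number into skip/limit query parameters."""
    return {"skip": (page - 1) * page_size, "limit": page_size}

def total_pages(total: int, page_size: int) -> int:
    """Number of pages implied by a list response's total field."""
    return math.ceil(total / page_size)

page_params(3, 10)   # {"skip": 20, "limit": 10}
total_pages(42, 20)  # 3
```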
Example: Fetching page 3 with 10 items per page
GET /api/v1/documents?skip=20&limit=10
Error Response Format
All error responses follow this structure:
{
"detail": "Error message describing what went wrong"
}
UUID Format
All IDs use UUID v4 format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
Timestamps
All timestamps are returned in ISO 8601 format with timezone: 2026-02-02T10:30:00.000Z
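Both formats can be handled with the Python standard library. A sketch (the helper names are illustrative; note that datetime.fromisoformat only accepts a trailing 'Z' from Python 3.11 on, so it is rewritten here for 3.10 compatibility):

```python
import uuid
from datetime import datetime, timezone

def parse_id(value: str) -> uuid.UUID:
    """Validate a resource ID as a UUID."""
    return uuid.UUID(value)

def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp with a trailing 'Z' (UTC)."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))

ts = parse_timestamp("2026-02-02T10:30:00.000Z")
# ts.tzinfo == timezone.utc
```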
SDK (Python)
The Pudding Python SDK (proofpudding) is the recommended way to integrate with the Pudding API. It provides both synchronous and asynchronous clients with automatic retries, streaming support, and fully typed data models.
Package
proofpudding
Version
0.1.6
Status
Production/Stable
Installation
pip install proofpudding
Requirements
| Requirement | Details |
|---|---|
Python | 3.10, 3.11, 3.12, 3.13 |
httpx | >= 0.25.0 |
pydantic | >= 2.0 |
Quick Start
Upload a document and ask a question in just a few lines:
from pudding import PuddingClient

with PuddingClient(access_token="pk_your_api_key") as client:
    # Upload a PDF
    doc = client.documents.upload(file_path="report.pdf")
    print(f"Uploaded: {doc.filename} ({doc.size_bytes} bytes)")

    # Ask a question
    job = client.jobs.create(
        document_id=doc.id,
        question="What were the key findings?"
    )

    if job.success:
        print(f"Answer: {job.result.answer}")
        print(f"Confidence: {job.result.confidence}")
        for citation in job.result.citations:
            print(f"  Page {citation.page}: {citation.quote}")
    else:
        print(f"Error: {job.error}")
The same example using the async client:
import asyncio
from pudding import AsyncPuddingClient

async def main():
    async with AsyncPuddingClient(access_token="pk_your_api_key") as client:
        doc = await client.documents.upload(file_path="report.pdf")
        job = await client.jobs.create(
            document_id=doc.id,
            question="What were the key findings?"
        )
        if job.success:
            print(job.result.answer)

asyncio.run(main())
Authentication
All API calls (except health checks) require an API key. Pass your key via the access_token parameter when creating a client. API keys are prefixed with pk_.
from pudding import PuddingClient
client = PuddingClient(access_token="pk_your_api_key")
# The SDK sends the key as a Bearer token automatically:
# Authorization: Bearer pk_your_api_key
If access_token is empty or missing, a ValueError is raised immediately at client construction.
Client Configuration
Constructor Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
access_token | str | (required) | API key for authentication (pk_... prefix) |
timeout | float | 1800.0 | Default request timeout in seconds (30 min for long processing jobs) |
max_retries | int | 3 | Max retries for transient errors (up to 4 total attempts) |
Context Managers
Always use a context manager, or call close() (aclose() for the async client) explicitly, to release HTTP connections.
# Recommended: context manager
with PuddingClient(access_token="pk_...") as client:
    ...  # client is automatically closed

# Or close manually
client = PuddingClient(access_token="pk_...")
try:
    ...
finally:
    client.close()

# Async equivalent
async with AsyncPuddingClient(access_token="pk_...") as client:
    ...
# or: await client.aclose()
Retry Behavior
The SDK automatically retries on transient failures. Client errors (4xx) are never retried.
| Retried | Not Retried |
|---|---|
5xx server errors | 400 ValidationError |
Network errors (connection reset, DNS) | 401 AuthenticationError |
Timeout errors | 404 NotFoundError |
| 429 RateLimitError |
API Reference
Both PuddingClient and AsyncPuddingClient expose the same API surface through three resource properties: client.health, client.documents, and client.jobs.
Health (client.health)
check() → HealthResponse
Basic health check. No authentication required.
health = client.health.check()
print(health.status) # "healthy"
print(health.version) # "0.1.0"
print(health.environment)  # "production"
ready() → ReadinessResponse
Readiness check including database connectivity. No authentication required.
readiness = client.health.ready()
print(readiness.status) # "ready" or "not_ready"
print(readiness.database)  # "connected" or error message
Documents (client.documents)
upload(*, file_path, file, filename) → DocumentResponse
Upload a document. Provide either file_path or both file and filename.
| Parameter | Type | Description |
|---|---|---|
file_path | str | Path | None | Path to a local file |
file | bytes | None | Raw file bytes (must also provide filename) |
filename | str | None | Filename when using bytes |
Constraints: supported file types only, max 100 MB, must not be empty.
Raises: ValueError for invalid arguments, ValidationError for invalid files.
# From file path
doc = client.documents.upload(file_path="report.pdf")

# From bytes
with open("report.pdf", "rb") as f:
    doc = client.documents.upload(file=f.read(), filename="report.pdf")

print(f"{doc.id} - {doc.filename} ({doc.size_bytes} bytes)")
list(*, skip, limit) → DocumentList
List documents for the authenticated team with pagination.
| Parameter | Type | Default | Description |
|---|---|---|---|
skip | int | 0 | Pagination offset |
limit | int | 20 | Page size |
docs = client.documents.list(skip=0, limit=10)
print(f"Showing {len(docs.items)} of {docs.total} documents")
for doc in docs.items:
    print(f"  {doc.filename} ({doc.size_bytes} bytes)")
delete(document_id) → DocumentResponse
Delete a document and all associated jobs.
| Parameter | Type | Description |
|---|---|---|
document_id | str | UUID of the document to delete |
Raises: NotFoundError if document not found or belongs to another team.
deleted = client.documents.delete("550e8400-e29b-41d4-a716-446655440000")
print(f"Deleted: {deleted.filename}")
Jobs (client.jobs)
create(*, document_id, question, config, timeout) → JobResponse
Blocking call that waits for document processing to complete and returns the result.
💡 Tip: Consider using create_stream() instead. Document processing can take time, and the streaming endpoint lets you show real-time progress (page processing, reasoning steps, citation verification) to your users rather than waiting silently for a response.
| Parameter | Type | Default | Description |
|---|---|---|---|
document_id | str | (required) | UUID of the document to process |
question | str | (required) | Question to ask about the document |
config | JobConfig | None | None | Processing configuration (citations, structured output) |
timeout | float | None | None | Timeout override in seconds (falls back to client timeout) |
Raises: NotFoundError, GatewayError, TimeoutError
job = client.jobs.create(
    document_id=doc.id,
    question="What is the total revenue for Q4 2025?",
    timeout=300  # 5 minute timeout override
)

if job.success:
    print(f"Answer: {job.result.answer}")
    print(f"Confidence: {job.result.confidence}")
    print(f"Processing time: {job.processing_time_ms}ms")
else:
    print(f"Failed: {job.error}")
create_stream(*, document_id, question, config, timeout) → Iterator[StreamEvent]
Stream processing progress via Server-Sent Events. Returns an iterator of typed StreamEvent objects. Same parameters as create(). Async client returns AsyncIterator[StreamEvent].
Raises: NotFoundError, AuthenticationError, PuddingError
from pudding import (
    DownloadingEvent, ProcessingEvent, ThinkingEvent,
    StepEvent, VerifyingEvent, CompleteEvent, ErrorEvent
)

for event in client.jobs.create_stream(
    document_id=doc.id,
    question="Summarize the findings"
):
    if isinstance(event, DownloadingEvent):
        print(f"Downloading: {event.progress}%")
    elif isinstance(event, ProcessingEvent):
        print(f"Processing: {event.pages_done}/{event.total_pages} pages")
    elif isinstance(event, ThinkingEvent):
        print(f"Thinking... (iteration {event.iteration})")
    elif isinstance(event, StepEvent):
        print(f"Step: {event.message}")
    elif isinstance(event, VerifyingEvent):
        print("Verifying citations...")
    elif isinstance(event, CompleteEvent):
        print(f"Done! Result: {event.result}")
    elif isinstance(event, ErrorEvent):
        print(f"Error: {event.error_code} - {event.message}")
list(*, document_id, skip, limit) → JobList
List past jobs with optional filtering by document.
| Parameter | Type | Default | Description |
|---|---|---|---|
document_id | str | None | None | Filter jobs for a specific document |
skip | int | 0 | Pagination offset |
limit | int | 20 | Page size |
# All jobs
all_jobs = client.jobs.list()

# Jobs for a specific document
doc_jobs = client.jobs.list(document_id=doc.id, limit=50)
for job in doc_jobs.items:
    if job.success:  # result is None for failed jobs
        print(f"Q: {job.question} → {job.result.confidence}")
Data Models
All models are Pydantic BaseModel subclasses. Import from pudding.models or directly from pudding.
DocumentResponse
class DocumentResponse(BaseModel):
    id: str               # Document UUID
    team_id: str          # Team UUID
    filename: str         # Original filename
    size_bytes: int       # File size in bytes
    created_at: datetime  # Creation timestamp
DocumentList
class DocumentList(BaseModel):
    items: list[DocumentResponse] = []
    total: int
    skip: int
    limit: int
HealthResponse / ReadinessResponse
class HealthResponse(BaseModel):
    status: str       # "healthy"
    version: str      # API version
    environment: str  # "production", "staging", etc.

class ReadinessResponse(BaseModel):
    status: str    # "ready" or "not_ready"
    database: str  # "connected" or error message
JobConfig
class JobConfig(BaseModel):
    verify_citations: bool = True                    # Whether to verify citations
    reasoning_effort: str = "auto"                   # "auto", "low", or "high"
    output_schema: OutputSchemaConfig | None = None  # Structured output config
OutputSchemaConfig
class OutputSchemaConfig(BaseModel):
    schema_: dict[str, Any]           # JSON Schema (draft-07). Alias: "schema"
    strict: bool = True               # Fail if schema can't be satisfied
    include_citations: bool = True    # Include citations in response
    include_raw_answer: bool = False  # Include raw text answer (debugging)

# Both work in Python:
# OutputSchemaConfig(schema={"type": "object", ...})
# OutputSchemaConfig(schema_={"type": "object", ...})
Citation
class Citation(BaseModel):
    page: int           # Page number
    quote: str          # Quoted text from document
    type: str = "text"  # Citation type
JobResult
class JobResult(BaseModel):
    answer: str                     # Answer text
    confidence: str                 # "high", "medium", "low", "not_found"
    citations: list[Citation] = []  # Supporting citations
    structured_output: dict[str, Any] | None = None  # Present when output_schema was provided
UsageInfo
class UsageInfo(BaseModel):
    total_cost_cents: float  # Total cost in cents
    llm_cost_cents: float    # LLM token cost in cents
    fixed_fee_cents: float   # Per-call fixed fee in cents
JobResponse
class JobResponse(BaseModel):
    id: str                   # Job UUID
    document_id: str          # Document UUID
    question: str             # Question asked
    success: bool             # Whether processing succeeded
    result: JobResult | None  # Present if success=True
    error: str | None         # Present if success=False
    processing_time_ms: int   # Processing time in milliseconds
    usage: UsageInfo | None   # Cost information
    created_at: datetime      # Creation timestamp
JobList
class JobList(BaseModel):
    items: list[JobResponse] = []
    total: int
    skip: int
    limit: int
Stream Event Types
# Base class for all stream events
class StreamEvent(BaseModel):
    event: str         # Event type
    message: str = ""  # Event message

class DownloadingEvent(StreamEvent):
    event: str = "downloading"
    progress: int | None = None  # Download progress 0-100

class ProcessingEvent(StreamEvent):
    event: str = "processing"
    progress: int | None = None     # Processing progress 0-100
    pages_done: int | None = None   # Pages processed so far
    total_pages: int | None = None  # Total pages in document

class ThinkingEvent(StreamEvent):
    event: str = "thinking"
    iteration: int | None = None  # Reasoning iteration number

class StepEvent(StreamEvent):
    event: str = "step"  # Also mapped from "tool_exec" SSE events

class VerifyingEvent(StreamEvent):
    event: str = "verifying"

class CompleteEvent(StreamEvent):
    event: str = "complete"
    result: dict[str, Any]  # Full processing result

class ErrorEvent(StreamEvent):
    event: str = "error"
    error_code: str | None = None         # Machine-readable error code
    result: dict[str, Any] | None = None  # Error result details
Streaming (SSE)
Use client.jobs.create_stream() to receive real-time progress updates as your document is processed. Events are streamed as Server-Sent Events and automatically parsed into typed Python objects.
Event Flow
| Event | When | Key Fields |
|---|---|---|
DownloadingEvent | Document is being downloaded | progress |
ProcessingEvent | Pages are being read | progress, pages_done, total_pages |
ThinkingEvent | Agent is reasoning | iteration |
StepEvent | A tool/action is executed | message |
VerifyingEvent | Citations are being verified | (inherited from StreamEvent) |
CompleteEvent | Processing finished successfully | result (dict) |
ErrorEvent | An error occurred | error_code, result |
Sync streaming
from pudding import PuddingClient, ProcessingEvent, CompleteEvent

with PuddingClient(access_token="pk_...") as client:
    for event in client.jobs.create_stream(
        document_id="...",
        question="Extract all financial data"
    ):
        if isinstance(event, ProcessingEvent) and event.total_pages:
            pct = event.pages_done / event.total_pages * 100
            print(f"Progress: {pct:.0f}%")
        elif isinstance(event, CompleteEvent):
            print("Result:", event.result)
Async streaming
from pudding import AsyncPuddingClient, ProcessingEvent, CompleteEvent

async with AsyncPuddingClient(access_token="pk_...") as client:
    async for event in client.jobs.create_stream(
        document_id="...",
        question="Extract all financial data"
    ):
        if isinstance(event, ProcessingEvent) and event.total_pages:
            pct = event.pages_done / event.total_pages * 100
            print(f"Progress: {pct:.0f}%")
        elif isinstance(event, CompleteEvent):
            print("Result:", event.result)
Structured Output
Use JobConfig with an OutputSchemaConfig to get responses that conform to a JSON Schema (draft-07) you define. The result appears in job.result.structured_output.
from pudding import PuddingClient, JobConfig, OutputSchemaConfig

with PuddingClient(access_token="pk_...") as client:
    config = JobConfig(
        output_schema=OutputSchemaConfig(
            schema={
                "type": "object",
                "properties": {
                    "company_name": {"type": "string"},
                    "revenue": {"type": "number"},
                    "quarter": {"type": "string"},
                    "year": {"type": "integer"}
                },
                "required": ["company_name", "revenue", "quarter", "year"]
            },
            strict=True,  # Fail if schema cannot be satisfied
            include_citations=True,
            include_raw_answer=False
        )
    )

    job = client.jobs.create(
        document_id=doc.id,
        question="What was the company's quarterly revenue?",
        config=config
    )

    if job.success:
        data = job.result.structured_output
        print(f"{data['company_name']}: ${data['revenue']}M in {data['quarter']} {data['year']}")
        # Citations are still available
        for c in job.result.citations:
            print(f"  Source: page {c.page}")
When strict=True (default), the job will fail if the document doesn't contain enough information to satisfy the schema. Set strict=False to get partial data instead.
Error Handling
All exceptions inherit from PuddingError. Each exception has message (str) and status_code (int | None) attributes. Import from pudding.exceptions or directly from pudding.
Exception Hierarchy
| Exception | HTTP Status | When |
|---|---|---|
PuddingError | (base class) | Unmapped status codes, network errors |
AuthenticationError | 401 | Invalid, missing, or revoked API key |
ValidationError | 400 | Invalid file type, empty file, oversized file, bad fields |
NotFoundError | 404 | Document/job not found or belongs to another team |
RateLimitError | 429 | Too many requests |
ServerError | 500 | Internal server error |
GatewayError | 502 | Processing service unavailable |
TimeoutError | 504 | Processing or request timed out |
Catching errors
from pudding import (
    PuddingClient,
    PuddingError,
    AuthenticationError,
    NotFoundError,
    ValidationError,
    TimeoutError,
    GatewayError,
)

with PuddingClient(access_token="pk_...") as client:
    try:
        job = client.jobs.create(
            document_id="...",
            question="Summarize the report"
        )
    except AuthenticationError:
        print("Invalid API key")
    except NotFoundError:
        print("Document not found")
    except TimeoutError:
        print("Processing timed out — try again or use streaming")
    except GatewayError:
        print("Processing service is temporarily unavailable")
    except PuddingError as e:
        # Catch-all for any other SDK error
        print(f"[{e.status_code}] {e.message}")
Logging
The SDK uses Python's standard logging module under the logger name "pudding". It logs request attempts, retries, errors, and client lifecycle events at DEBUG and WARNING levels.
import logging

# Enable debug logging for the SDK
logging.basicConfig()
logging.getLogger("pudding").setLevel(logging.DEBUG)
Examples
Upload and Query a Document
from pudding import PuddingClient

with PuddingClient(access_token="pk_...") as client:
    # Upload
    doc = client.documents.upload(file_path="contract.pdf")
    print(f"Uploaded {doc.filename} (id: {doc.id})")

    # Query
    job = client.jobs.create(
        document_id=doc.id,
        question="What is the termination clause?"
    )

    if job.success:
        print(f"\nAnswer ({job.result.confidence} confidence):")
        print(job.result.answer)
        print("\nCitations:")
        for c in job.result.citations:
            print(f"  p.{c.page}: \"{c.quote}\"")
Paginate Through All Documents
from pudding import PuddingClient

with PuddingClient(access_token="pk_...") as client:
    skip = 0
    limit = 50
    while True:
        page = client.documents.list(skip=skip, limit=limit)
        for doc in page.items:
            print(f"{doc.filename} - {doc.created_at}")
        if skip + limit >= page.total:
            break
        skip += limit
Stream Processing Progress
from pudding import (
    PuddingClient,
    DownloadingEvent, ProcessingEvent, ThinkingEvent,
    StepEvent, VerifyingEvent, CompleteEvent, ErrorEvent
)

with PuddingClient(access_token="pk_...") as client:
    doc = client.documents.upload(file_path="large-report.pdf")

    for event in client.jobs.create_stream(
        document_id=doc.id,
        question="List all action items mentioned in the report"
    ):
        match event:
            case DownloadingEvent():
                print(f"⬇️ Downloading: {event.progress}%")
            case ProcessingEvent():
                print(f"📄 Reading pages: {event.pages_done}/{event.total_pages}")
            case ThinkingEvent():
                print(f"🤔 Reasoning (iteration {event.iteration})...")
            case StepEvent():
                print(f"⚙️ {event.message}")
            case VerifyingEvent():
                print("🔍 Verifying citations...")
            case CompleteEvent():
                print("✅ Done!")
                print(event.result)
            case ErrorEvent():
                print(f"❌ {event.error_code}: {event.message}")
Extract Structured Data
from pudding import PuddingClient, JobConfig, OutputSchemaConfig

with PuddingClient(access_token="pk_...") as client:
    doc = client.documents.upload(file_path="invoice.pdf")

    job = client.jobs.create(
        document_id=doc.id,
        question="Extract the invoice details",
        config=JobConfig(
            output_schema=OutputSchemaConfig(
                schema={
                    "type": "object",
                    "properties": {
                        "invoice_number": {"type": "string"},
                        "date": {"type": "string", "format": "date"},
                        "total_amount": {"type": "number"},
                        "currency": {"type": "string"},
                        "line_items": {
                            "type": "array",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "description": {"type": "string"},
                                    "quantity": {"type": "integer"},
                                    "unit_price": {"type": "number"}
                                }
                            }
                        }
                    },
                    "required": ["invoice_number", "total_amount", "currency"]
                }
            )
        )
    )

    if job.success:
        inv = job.result.structured_output
        print(f"Invoice #{inv['invoice_number']}")
        print(f"Total: {inv['currency']} {inv['total_amount']}")
        for item in inv.get("line_items", []):
            print(f"  {item['description']}: {item['quantity']} x {item['unit_price']}")
Robust Error Handling
from pudding import (
    PuddingClient,
    AuthenticationError,
    ValidationError,
    NotFoundError,
    TimeoutError,
    GatewayError,
    PuddingError,
)

def process_document(api_key: str, file_path: str, question: str):
    with PuddingClient(access_token=api_key, max_retries=5) as client:
        # Upload with validation handling
        try:
            doc = client.documents.upload(file_path=file_path)
        except ValidationError as e:
            print(f"Invalid file: {e.message}")
            return None
        except AuthenticationError:
            print("Check your API key")
            return None

        # Process with retry-aware error handling
        try:
            job = client.jobs.create(
                document_id=doc.id,
                question=question,
                timeout=600  # 10 minutes
            )
        except TimeoutError:
            print("Processing timed out — document may be too large")
            return None
        except GatewayError:
            print("Processing service unavailable — please retry later")
            return None
        except PuddingError as e:
            print(f"Unexpected error [{e.status_code}]: {e.message}")
            return None

        if job.success:
            return job.result
        print(f"Job failed: {job.error}")
        return None
Async Batch Processing
import asyncio
from pudding import AsyncPuddingClient

async def ask_questions(doc_id: str, questions: list[str]):
    async with AsyncPuddingClient(access_token="pk_...") as client:
        tasks = [
            client.jobs.create(document_id=doc_id, question=q)
            for q in questions
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        for question, result in zip(questions, results):
            if isinstance(result, Exception):
                print(f"Q: {question} → Error: {result}")
            elif result.success:
                print(f"Q: {question} → {result.result.answer[:80]}...")

asyncio.run(ask_questions(
    doc_id="...",
    questions=[
        "What is the executive summary?",
        "What are the key risks?",
        "What is the financial outlook?",
    ]
))