Python SDK Reference

Official Python SDK for Teckel AI. Handles batching, retries, and error handling automatically.

Field constraints and validation rules: See HTTP API Reference for complete field specifications.

Installation

pip install teckel-ai

Requirements: Python 3.10 or later (the repository is developed and tested against Python 3.11)

Frameworks: FastAPI, Flask, Django, LangChain, LlamaIndex, AWS Lambda, Google Cloud Functions

Quick Start

from teckel import TeckelTracer

tracer = TeckelTracer(api_key="tk_live_...")

tracer.trace({
    "query": "How do I reset my password?",
    "response": "Go to Settings > Security...",
    "sessionId": "chat-session-42",
    "userId": "user@example.com",
    "model": "gpt-5",
    "latencyMs": 234,
    "documents": [{
        "id": "password-reset-guide.md",
        "name": "Password Reset Guide",
        "text": "To reset your password...",
        "similarity": 0.92
    }]
})

# Serverless: REQUIRED before returning
tracer.flush(timeout_s=5.0)

API Reference

Constructor

TeckelTracer(
    api_key: str,             # Required: tk_live_...
    endpoint: str = None,     # Default: "https://app.teckel.ai/api"
    debug: bool = False,
    timeout_s: float = 5.0,
    batch: BatchConfig = None,
    on_error: Callable = None,
)

from teckel import BatchConfig

BatchConfig(
    max_size=100,             # Default: 100 traces
    max_bytes=5_000_000,      # Default: 5MB
    flush_interval_s=0.1,     # Default: 100ms
)

Python config uses seconds (timeout_s, flush_interval_s). Trace payload timing fields still use milliseconds (latencyMs, durationMs) to match the HTTP API.
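For visibility into dropped traces, combine debug with an on_error callback. The callback shape below (a single exception argument) is an assumption for illustration, not confirmed by this reference:

```python
# ASSUMPTION: on_error is invoked with the exception from a failed delivery.
def log_teckel_error(exc: Exception) -> None:
    print(f"Teckel delivery failed: {exc!r}")

# Passed at construction time:
# tracer = TeckelTracer(api_key="tk_live_...", debug=True, on_error=log_teckel_error)
```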

tracer.trace(data)

Submit a trace. Fire-and-forget, non-blocking.

tracer.trace(data: TraceData | dict) -> None

See TraceData type below. For field constraints, see HTTP API Reference.

Token auto-aggregation: If you provide spans but not tokens, the SDK sums promptTokens and completionTokens from all spans automatically.

Cost calculation: Cost is automatically calculated server-side from token counts. Provide costUsd on traces or spans to override with your own values.
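When tokens is omitted, the aggregation is equivalent to summing the span counts yourself. A plain-Python sketch of what the SDK computes (illustrative only, not SDK internals):

```python
# Spans as they would appear in a trace payload (camelCase field names).
spans = [
    {"name": "retrieve-docs", "type": "retrieval"},  # no token fields
    {"name": "generate", "type": "llm_call",
     "promptTokens": 812, "completionTokens": 143},
]

# Equivalent of auto-aggregation: spans without token fields count as 0.
prompt = sum(s.get("promptTokens", 0) for s in spans)
completion = sum(s.get("completionTokens", 0) for s in spans)
tokens = {"prompt": prompt, "completion": completion, "total": prompt + completion}
print(tokens)  # {'prompt': 812, 'completion': 143, 'total': 955}
```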

tracer.feedback(data)

Submit user feedback for a trace or session.

tracer.feedback(data: FeedbackData | dict) -> None

See FeedbackData type below.

tracer.flush(timeout_s?)

Wait for queued traces to send. Required in serverless.

tracer.flush(timeout_s: float = 5.0) -> None

Throws on timeout. Default: 5.0s.

tracer.destroy()

Cleanup tracer resources. Flushes pending traces and stops auto-flush timer.

tracer.destroy() -> None

Also available as a context manager:

with TeckelTracer(api_key="tk_live_...") as tracer:
    tracer.trace({"query": "...", "response": "..."})
    # Flushes and cleans up automatically on exit

Types

Accepts both snake_case and camelCase field names.
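The two accepted spellings follow the standard snake_case-to-camelCase mapping. A small illustrative helper (not part of the SDK) showing how the names line up:

```python
def to_camel(name: str) -> str:
    """Convert a snake_case field name to its camelCase equivalent."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)

# The same fields, in both accepted spellings:
print(to_camel("session_id"))  # sessionId
print(to_camel("latency_ms"))  # latencyMs
```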

TraceData

from teckel import TraceData

TraceData(
    query: str,                        # User's question
    response: str,                     # AI response
    trace_id: str = None,              # UUID (auto-generated if omitted)
    session_id: str = None,            # Groups traces into conversations
    user_id: str = None,               # End-user identifier
    agent_name: str = None,            # Agent/workflow identifier
    model: str = None,                 # LLM model name
    latency_ms: int = None,            # Response time in ms
    tokens: TokenUsage = None,         # Token counts (auto-calculated from spans)
    cost_usd: float = None,            # Cost in USD (auto-calculated from tokens)
    system_prompt: str = None,         # LLM system instructions
    documents: list[Document] = None,  # RAG sources
    spans: list[SpanData] = None,      # OTel spans
    metadata: dict = None,
)

Document

from teckel import Document

Document(
    id: str,                   # Your document identifier (any format)
    name: str,                 # Human-readable name
    text: str,                 # Chunk content sent to LLM
    last_updated: str = None,  # ISO 8601 - for freshness analysis
    url: str = None,           # Link to source
    source: str = None,        # Platform: 'confluence', 'slack', 'gdrive'
    file_format: str = None,   # Format: 'pdf', 'md', 'docx'
    similarity: float = None,  # 0-1 relevance score
    rank: int = None,          # Position in results (0 = first)
    owner_email: str = None,   # Document owner
)

SpanData

For automatic span collection, see OpenTelemetry Integration.

from teckel import SpanData

SpanData(
    name: str,                        # Span name
    started_at: str,                  # ISO 8601
    type: SpanType = "custom",        # 'llm_call' | 'tool_call' | 'retrieval' | 'agent' | 'guardrail' | 'custom'
    span_id: str = None,              # Client-provided span ID (1-64 chars)
    parent_span_id: str = None,       # Parent for nesting
    ended_at: str = None,             # ISO 8601
    duration_ms: int = None,          # Duration in ms
    status: SpanStatus = "completed", # 'running' | 'completed' | 'error'
    status_message: str = None,       # Error message

    # LLM calls
    model: str = None,
    prompt_tokens: int = None,        # Summed for trace-level tokens
    completion_tokens: int = None,
    cost_usd: float = None,           # Cost for this span

    # Tool calls
    tool_name: str = None,
    tool_arguments: dict = None,
    tool_result: dict = None,

    # Generic I/O (for non-tool spans)
    input: dict = None,
    output: dict = None,

    metadata: dict = None,
)
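Nesting via parent_span_id can be sketched as a payload fragment. The IDs, names, and values below are illustrative:

```python
from datetime import datetime, timezone

started = datetime.now(timezone.utc).isoformat()

# An agent span with an LLM call and a tool call nested under it.
spans = [
    {"spanId": "agent-1", "name": "support-agent", "type": "agent",
     "startedAt": started},
    {"spanId": "llm-1", "parentSpanId": "agent-1", "name": "draft-answer",
     "type": "llm_call", "startedAt": started, "model": "gpt-5",
     "promptTokens": 512, "completionTokens": 96},
    {"spanId": "tool-1", "parentSpanId": "agent-1", "name": "lookup-account",
     "type": "tool_call", "startedAt": started,
     "toolName": "get_account", "toolArguments": {"userId": "user@example.com"}},
]
```

Passing this list as "spans" in a trace gives trace-level tokens summed from the llm_call span.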

FeedbackData

from teckel import FeedbackData

FeedbackData(
    type: FeedbackType,      # 'thumbs_up' | 'thumbs_down' | 'flag' | 'rating'
    trace_id: str = None,    # Target trace (UUID) - one of trace_id/session_id required
    session_id: str = None,  # Target session - one of trace_id/session_id required
    value: str = None,       # For ratings: '1'-'5'
    comment: str = None,     # User explanation
)
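For example, a 1-5 rating targeting a whole session (payload shown as a dict; the values are illustrative, and the dict would be passed to tracer.feedback()):

```python
# Session-level rating; for type 'rating', 'value' is a string '1'-'5'.
feedback = {
    "sessionId": "user-123-conv-456",
    "type": "rating",
    "value": "4",
    "comment": "Mostly helpful, but the last step was unclear.",
}
# tracer.feedback(feedback)
```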

TokenUsage

from teckel import TokenUsage

TokenUsage(
    prompt: int,
    completion: int,
    total: int,
)

Patterns

Serverless (Critical)

Functions terminate immediately after returning. Without flush(), traces are lost.

# AWS Lambda, Google Cloud Functions, etc.
def handler(event, context):
    response = generate_response(event["query"])

    tracer.trace({"query": event["query"], "response": response})
    tracer.flush(timeout_s=5.0)  # REQUIRED

    return {"body": response}

Long-Running Servers

No flush is needed; traces are sent in the background.

# FastAPI
@app.post("/chat")
async def chat(request: ChatRequest):
    response = await generate_response(request.query)
    tracer.trace({"query": request.query, "response": response})
    return {"response": response}  # Trace sends async

High-Throughput Batching

Default config handles most cases. For high volume:

import os

from teckel import TeckelTracer, BatchConfig

tracer = TeckelTracer(
    api_key=os.environ["TECKEL_API_KEY"],
    batch=BatchConfig(
        max_size=100,            # Flush after N traces
        max_bytes=5_000_000,     # Flush after 5MB
        flush_interval_s=0.1,    # Auto-flush interval
    ),
)

Multi-Turn Conversations

Group conversation turns with sessionId:

session_id = "user-123-conv-456"

# Turn 1
tracer.trace({"sessionId": session_id, "query": "What is X?", "response": "X is..."})

# Turn 2
tracer.trace({"sessionId": session_id, "query": "Tell me more", "response": "More about X..."})

# Session-level feedback
tracer.feedback({"sessionId": session_id, "type": "thumbs_up"})

RAG Document Tracking

Include retrieved documents for quality analysis:

chunks = vector_search(query, limit=5)
response = generate_with_context(query, chunks)

tracer.trace({
    "query": query,
    "response": response,
    "documents": [
        {
            "id": chunk.id,
            "name": chunk.title,
            "text": chunk.content,
            "similarity": chunk.score,
            "rank": i,
            "url": chunk.source_url,
            "lastUpdated": chunk.modified_at.isoformat() if chunk.modified_at else None,
        }
        for i, chunk in enumerate(chunks)
    ],
})

OpenTelemetry Integration

Collect spans automatically with OTel:

from teckel import TeckelTracer
from teckel.otel import TeckelSpanCollector

tracer = TeckelTracer(api_key="tk_live_...")
collector = TeckelSpanCollector()

# Pass collector.get_tracer() to your OTel-instrumented library
otel_tracer = collector.get_tracer()

# After your instrumented code runs:
tracer.trace({
    "query": user_query,
    "response": result_text,
    "spans": collector.get_spans(),  # Tokens auto-calculated
})

collector.shutdown()
tracer.flush()

Install OTel extras: pip install teckel-ai[otel]

See OpenTelemetry Integration for complete documentation.

Error Handling

  • trace() never throws - logs failures in debug mode
  • feedback() never throws - logs failures in debug mode
  • flush() throws on timeout - catch to monitor data loss
  • Auto-retries once on 429/5xx/network errors (250-350ms jitter)
try:
    tracer.flush(timeout_s=5.0)
except TimeoutError:
    print("Trace flush timeout - some traces may be lost")

Troubleshooting

Issue                  Solution
Traces not appearing   Add tracer.flush() in serverless
DateTime errors        Use datetime.now(timezone.utc).isoformat() for timestamps
Debug issues           Set debug=True in constructor
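Timestamp fields (started_at, ended_at, last_updated) expect ISO 8601 strings; a timezone-aware UTC timestamp avoids the DateTime errors above:

```python
from datetime import datetime, timezone

# Timezone-aware UTC timestamp in ISO 8601,
# e.g. '2025-06-01T12:00:00.123456+00:00'
ts = datetime.now(timezone.utc).isoformat()
print(ts)
```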

See Troubleshooting Guide for more.


See also: HTTP API Reference | OpenTelemetry Integration | Getting Started

Version 0.1.0