Python SDK

Official Klime SDK for Python. Track events, identify users, and associate them with groups.

Installation

pip install klime

Quick Start

from klime import KlimeClient

client = KlimeClient(
    write_key='your-write-key'
)

# Identify a user
client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})

# Track an event
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')

# Associate user with a group and set group traits
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')

# Or just link the user to a group (if traits are already set)
client.group('org_456', user_id='user_123')

# Shutdown gracefully
client.shutdown()

Good to know

  • No anonymous tracking. Every track() call needs a user_id or group_id. Wait until the user is identified before sending events.
  • Name your events in past tense with an Object + Action pattern: "Report Generated", "User Invited", "Export Completed".
  • Always set email and name traits in identify() and group(). These are used for display and search in the dashboard.
  • Traits can be strings, numbers, booleans, or ISO 8601 date strings. Use snake_case for trait keys.
  • In Group mode, your dashboard stays empty until you call group(). Events from identify() and track() are recorded, but customers won't appear until users are linked to groups.
  • Order doesn't matter. Events sent before identify() or group() are retroactively attributed once the relationships are established.
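The trait rules above (snake_case keys; string, number, boolean, or ISO 8601 date values) can be checked before calling identify() or group(). The following is a minimal sketch, not part of the SDK: normalize_traits is a hypothetical helper name, and converting date objects with isoformat() is an assumption about how you might produce the ISO 8601 strings.

```python
import re
from datetime import date, datetime
from typing import Any, Dict

# Lowercase words separated by single underscores, e.g. "employee_count".
_SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")


def normalize_traits(traits: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: validate trait keys and coerce dates to ISO 8601 strings."""
    cleaned: Dict[str, Any] = {}
    for key, value in traits.items():
        if not _SNAKE_CASE.match(key):
            raise ValueError(f"trait key {key!r} is not snake_case")
        if isinstance(value, (datetime, date)):
            # date/datetime objects become ISO 8601 strings
            value = value.isoformat()
        elif not isinstance(value, (str, int, float, bool)):
            raise TypeError(f"trait {key!r} has unsupported type {type(value).__name__}")
        cleaned[key] = value
    return cleaned
```

You would then pass the result straight through, for example client.identify('user_123', normalize_traits({...})).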

Installation Prompt

Copy and paste this prompt into Cursor, Copilot, or your favorite AI editor to integrate Klime:

Integrate Klime for customer analytics. Klime tracks user activity to identify which customers are healthy vs at risk of churning.

ANALYTICS MODES (determine which applies):
- Companies & Teams: Your customers are companies with multiple team members (SaaS, enterprise tools)
  → Use identify() + group() + track()
- Individual Customers: Your customers are individuals with private accounts (consumer apps, creator tools)
  → Use identify() + track() only (no group() needed)

KEY CONCEPTS:
- Every track() call requires either user_id OR group_id (no anonymous events)
- Use group_id alone for org-level events (webhooks, cron jobs, system metrics)
- group() links a user to a company AND sets company traits (only for Companies & Teams mode)
- Order doesn't matter - events before identify/group still get attributed correctly

BEST PRACTICES:
- Initialize client ONCE at app startup (singleton pattern)
- Store write key in KLIME_WRITE_KEY environment variable
- Call shutdown() on process exit to flush remaining events (auto-registered via atexit)

Install: pip install klime

import os
from klime import KlimeClient

client = KlimeClient(write_key=os.environ['KLIME_WRITE_KEY'])

# Identify users at signup/login:
client.identify('usr_abc123', {'email': 'jane@example.com', 'name': 'Jane Smith'})

# Track key activities:
client.track('Report Generated', {'report_type': 'revenue'}, user_id='usr_abc123')
client.track('Feature Used', {'feature': 'export', 'format': 'csv'}, user_id='usr_abc123')
client.track('Teammate Invited', {'role': 'member'}, user_id='usr_abc123')

# If Companies & Teams mode: link user to their company and set company traits
client.group('org_456', {'name': 'Acme Inc', 'plan': 'enterprise'}, user_id='usr_abc123')

INTEGRATION WORKFLOW:

Phase 1: Discover
Explore the codebase to understand:
1. What framework is used? (Django, Flask, FastAPI, Starlette, etc.)
2. Where is user identity available? (e.g., request.user.id, current_user.id, g.user, Depends() injection)
3. Is this Companies & Teams or Individual Customers?
   - Look for: organization, workspace, tenant, team, account models → Companies & Teams (use group())
   - No company/org concept, just individual users → Individual Customers (skip group())
4. Where do core user actions happen? (views, routes, API endpoints, services)
5. Is there existing analytics? (search: segment, posthog, mixpanel, amplitude, track)
Match your integration style to the framework's conventions.

Phase 2: Instrument
Add these calls using idiomatic patterns for the framework:
- Initialize client once (Django: apps.py/settings, Flask: app factory, FastAPI: lifespan/startup)
- identify() in auth/login success handler
- group() when user-org association is established (Companies & Teams mode only)
- track() for key user actions (see below)

WHAT TO TRACK:
Active engagement (primary): feature usage, resource creation, collaboration, completing flows
Session signals (secondary): login/session start, dashboard access - distinguishes "low usage" from "churned"
Do NOT track: every request, health checks, middleware passthrough, background tasks

Phase 3: Verify
Confirm: client initialized, shutdown handled, identify/group/track calls added

Phase 4: Summarize
Report what you added:
- Files modified and what was added to each
- Events being tracked (list event names and what triggers them)
- How user_id is obtained (and group_id if Companies & Teams mode)
- Any assumptions made or questions

API Reference

Constructor

KlimeClient(
    write_key: str,                             # Required: Your Klime write key
    endpoint: Optional[str] = None,             # Optional: API endpoint (default: https://i.klime.com)
    flush_interval: Optional[int] = None,       # Optional: Milliseconds between flushes (default: 2000)
    max_batch_size: Optional[int] = None,       # Optional: Max events per batch (default: 20, max: 100)
    max_queue_size: Optional[int] = None,       # Optional: Max queued events (default: 1000)
    retry_max_attempts: Optional[int] = None,   # Optional: Max retry attempts (default: 5)
    retry_initial_delay: Optional[int] = None,  # Optional: Initial retry delay in ms (default: 1000)
    flush_on_shutdown: Optional[bool] = None,   # Optional: Auto-flush on exit (default: True)
    logger: Optional[logging.Logger] = None,    # Optional: Custom logger (default: logging.getLogger("klime"))
    on_error: Optional[Callable] = None,        # Optional: Callback for batch failures
    on_success: Optional[Callable] = None       # Optional: Callback for successful sends
)
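One way to keep constructor options out of code is to read them from the environment. The sketch below only builds a keyword-argument dict. KLIME_WRITE_KEY is the variable name suggested elsewhere in this document, while KLIME_FLUSH_INTERVAL, KLIME_MAX_BATCH_SIZE, and the helper name client_kwargs_from_env are assumptions made for illustration.

```python
import os
from typing import Any, Dict, Mapping


def client_kwargs_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Hypothetical helper: build KlimeClient keyword arguments from environment variables."""
    kwargs: Dict[str, Any] = {"write_key": env["KLIME_WRITE_KEY"]}

    # Optional integer settings; these variable names are illustrative, not defined by the SDK.
    optional_ints = {
        "KLIME_FLUSH_INTERVAL": "flush_interval",   # milliseconds
        "KLIME_MAX_BATCH_SIZE": "max_batch_size",   # events per batch
    }
    for variable, parameter in optional_ints.items():
        if variable in env:
            kwargs[parameter] = int(env[variable])

    # The constructor documents a maximum batch size of 100.
    batch_size = kwargs.get("max_batch_size")
    if batch_size is not None and not 1 <= batch_size <= 100:
        raise ValueError("max_batch_size must be between 1 and 100")
    return kwargs
```

The result can be unpacked into the constructor: client = KlimeClient(**client_kwargs_from_env()). Parameters left out of the dict fall back to the SDK defaults.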

Methods

track(event: str, properties: Optional[Dict] = None, user_id: Optional[str] = None, group_id: Optional[str] = None) -> None

Track an event. Events can be attributed in two ways:

  • User events: Provide user_id to track user activity (most common)
  • Group events: Provide group_id without user_id for organization-level events

# User event (most common)
client.track('Button Clicked', {
    'button_name': 'Sign up',
    'plan': 'pro'
}, user_id='user_123')

# Group event (for webhooks, cron jobs, system events)
client.track('Events Received', {
    'count': 100,
    'source': 'webhook'
}, group_id='org_456')

Note: The group_id parameter can also be combined with user_id for multi-tenant scenarios where you need to specify which organization context a user event occurred in.
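Because every event needs a user_id or a group_id, it can help to enforce that rule at the call site. This is a minimal sketch of a wrapper around track(); track_attributed is a hypothetical name, and this document does not say how the SDK itself reacts to a call with neither identifier.

```python
from typing import Any, Dict, Optional


def track_attributed(
    client: Any,
    event: str,
    properties: Optional[Dict[str, Any]] = None,
    *,
    user_id: Optional[str] = None,
    group_id: Optional[str] = None,
) -> None:
    """Hypothetical wrapper: refuse to send an event that has no attribution."""
    if user_id is None and group_id is None:
        raise ValueError(f"event {event!r} needs a user_id or a group_id")
    # Both identifiers may be given together for multi-tenant user events.
    client.track(event, properties, user_id=user_id, group_id=group_id)
```

For example, track_attributed(client, 'Report Generated', {'report_type': 'revenue'}, user_id='user_123') forwards to client.track(), while omitting both identifiers fails fast in your own code.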

identify(user_id: str, traits: Optional[Dict] = None) -> None

Identify a user with traits.

client.identify('user_123', {
    'email': 'user@example.com',
    'name': 'Stefan'
})

group(group_id: str, traits: Optional[Dict] = None, user_id: Optional[str] = None) -> None

Associate a user with a group and/or set group traits.

# Associate user with a group and set group traits (most common)
client.group('org_456', {
    'name': 'Acme Inc',
    'plan': 'enterprise'
}, user_id='user_123')

# Just link a user to a group (traits already set or not needed)
client.group('org_456', user_id='user_123')

# Just update group traits (e.g., from a webhook or background job)
client.group('org_456', {
    'plan': 'enterprise',
    'employee_count': 50
})

flush() -> None

Manually flush queued events immediately.

client.flush()

shutdown() -> None

Gracefully shutdown the client, flushing remaining events.

client.shutdown()

queue_size() -> int

Return the number of events currently in the queue.

pending = client.queue_size()
print(f"{pending} events waiting to be sent")
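queue_size() can also drive a simple backpressure check. The sketch below flushes early once the queue passes a fraction of its capacity. The helper name flush_if_backed_up and the 80% threshold are assumptions, and the capacity is passed in by hand because this document does not show a way to read max_queue_size back from the client.

```python
from typing import Any


def flush_if_backed_up(client: Any, max_queue_size: int = 1000, threshold: float = 0.8) -> bool:
    """Hypothetical helper: flush early when the queue is nearly full.

    Returns True if a flush was triggered.
    """
    if client.queue_size() >= max_queue_size * threshold:
        client.flush()  # blocks until queued events are sent
        return True
    return False
```

Since flush() blocks, a check like this fits better in a background job or worker loop than in a request handler.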

Synchronous Methods

For cases where you need confirmation that events were sent (e.g., before process exit, in tests), use the synchronous variants:

track_sync(event, properties, user_id, group_id) -> BatchResponse

Track an event synchronously. Raises SendError on failure.

from klime import SendError

try:
    response = client.track_sync('Critical Action', {'key': 'value'}, user_id='user_123')
    print(f"Sent! Accepted: {response.accepted}")
except SendError as e:
    print(f"Failed to send: {e.message}")

identify_sync(user_id, traits) -> BatchResponse

Identify a user synchronously. Raises SendError on failure.

group_sync(group_id, traits, user_id) -> BatchResponse

Associate a user with a group synchronously. Raises SendError on failure.

Features

  • Automatic Batching: Events are automatically batched and sent every 2 seconds or when the batch size reaches 20 events
  • Automatic Retries: Failed requests are automatically retried with exponential backoff
  • Thread-Safe: Safe to use from multiple threads
  • Process Exit Handling: Automatically flushes events on process exit (via atexit)
  • Zero Dependencies: Uses only Python standard library

Performance

When you call track(), identify(), or group(), the SDK:

  1. Adds the event to a thread-safe queue (microseconds)
  2. Returns immediately without waiting for network I/O

Events are sent to Klime's servers in background threads. This means:

  • No network blocking: HTTP requests happen asynchronously in background threads
  • Negligible latency impact: Tracking calls add less than 1 ms to your request handling time
  • Automatic batching: Events are queued and sent in batches (default: every 2 seconds or 20 events)

# Inside a request handler (the function name is illustrative)
def handle_signup_click():
    # This returns immediately - no HTTP request is made here
    client.track('Button Clicked', {'button': 'signup'}, user_id='user_123')

    # Your code continues without waiting
    return {'success': True}

The only blocking calls are flush() and shutdown(), which wait for all queued events to be sent, and the *_sync() methods, which wait for the server's response. flush() is typically only called during graceful shutdown.
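The queue-and-worker pattern described in this section can be sketched with the standard library alone. This is an illustration of the general technique, not Klime's actual implementation: the class name BackgroundBatcher and the send callable are hypothetical, and retries and error reporting are left out.

```python
import queue
import threading
import time
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class BackgroundBatcher:
    """Illustrative only: queue events and send them in batches from a worker thread."""

    def __init__(
        self,
        send: Callable[[List[Event]], None],
        max_batch_size: int = 20,
        flush_interval: float = 2.0,  # seconds
    ) -> None:
        self._send = send
        self._max_batch_size = max_batch_size
        self._flush_interval = flush_interval
        self._queue: "queue.Queue[Event]" = queue.Queue()
        threading.Thread(target=self._run, daemon=True).start()

    def enqueue(self, event: Event) -> None:
        # Returns immediately; no network I/O happens on the caller's thread.
        self._queue.put(event)

    def _run(self) -> None:
        while True:
            batch = [self._queue.get()]  # block until at least one event exists
            deadline = time.monotonic() + self._flush_interval
            # Fill the batch until it is full or the flush interval has elapsed.
            while len(batch) < self._max_batch_size:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                try:
                    batch.append(self._queue.get(timeout=remaining))
                except queue.Empty:
                    break
            try:
                self._send(batch)
            except Exception:
                pass  # a real client would retry or report the failure here
            finally:
                for _ in batch:
                    self._queue.task_done()

    def flush(self) -> None:
        # Blocks until every queued event has been handed to send().
        self._queue.join()
```

In this sketch enqueue() is the only call made on the request path, which is why tracking adds so little latency. flush() is the one place where the caller waits on the worker.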

Configuration

Default Values

  • flush_interval: 2000ms
  • max_batch_size: 20 events
  • max_queue_size: 1000 events
  • retry_max_attempts: 5 attempts
  • retry_initial_delay: 1000ms
  • flush_on_shutdown: True

Logging

The SDK uses Python's built-in logging module. By default, it logs to logging.getLogger("klime").

import logging

# Enable debug logging
logging.getLogger("klime").setLevel(logging.DEBUG)

# Or provide a custom logger
client = KlimeClient(
    write_key='your-write-key',
    logger=logging.getLogger("myapp.klime")
)

Because the SDK uses the standard logging module, it picks up your Django or Flask app's existing logging configuration without extra setup.
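As an example of configuring the "klime" logger through an application-wide logging configuration, here is a minimal sketch using logging.config.dictConfig, the same dictionary format Django's LOGGING setting uses. The handler name "console" and the choice of DEBUG level are illustrative.

```python
import logging
import logging.config

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        # Writes to stderr by default
        "console": {"class": "logging.StreamHandler"},
    },
    "loggers": {
        # The SDK's default logger name, per this section
        "klime": {"handlers": ["console"], "level": "DEBUG", "propagate": False},
    },
}

logging.config.dictConfig(LOGGING)
```

In Django you would put the dictionary in settings.py as LOGGING instead of calling dictConfig yourself.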

Callbacks

import sentry_sdk  # only needed if you report errors to Sentry
from klime import KlimeClient

def handle_error(error, events):
    # Report to your error tracking service
    sentry_sdk.capture_exception(error)
    print(f"Failed to send {len(events)} events: {error}")

def handle_success(response):
    print(f"Sent {response.accepted} events")

client = KlimeClient(
    write_key='your-write-key',
    on_error=handle_error,
    on_success=handle_success
)
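An on_error callback can also keep failed events around for later inspection or replay. The sketch below assumes the (error, events) signature shown above and assumes the callback may be invoked from a background thread, hence the lock. FailedEventLog is a hypothetical class, not part of the SDK.

```python
import threading
from typing import Any, Dict, List


class FailedEventLog:
    """Hypothetical collector for events that could not be delivered."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.failed: List[Dict[str, Any]] = []

    def on_error(self, error: Exception, events: List[Any]) -> None:
        # Record each failed event together with the error message.
        with self._lock:
            for event in events:
                self.failed.append({"error": str(error), "event": event})
```

You would pass the bound method to the constructor, for example KlimeClient(write_key='your-write-key', on_error=failed_log.on_error), and inspect failed_log.failed afterwards.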

Error Handling

The SDK automatically handles:

  • Transient errors (429, 503, network failures): Retries with exponential backoff
  • Permanent errors (400, 401): Logs error and drops event
  • Rate limiting: Respects Retry-After header
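To get a feel for the retry timing, the sketch below computes a backoff schedule from the default retry_initial_delay (1000 ms) and retry_max_attempts (5). It assumes the delay doubles on each attempt, that all five attempts are retries, and that Retry-After is given in seconds. The SDK's actual multiplier and any jitter are not documented here, and both function names are hypothetical.

```python
from typing import List, Optional


def backoff_delay_ms(
    attempt: int,
    initial_delay_ms: int = 1000,
    retry_after_s: Optional[float] = None,
) -> int:
    """Delay before retry number `attempt` (1-based), assuming the delay doubles each time."""
    if retry_after_s is not None:
        # A server-provided Retry-After value takes precedence over the computed delay.
        return int(retry_after_s * 1000)
    return initial_delay_ms * 2 ** (attempt - 1)


def retry_schedule_ms(max_attempts: int = 5, initial_delay_ms: int = 1000) -> List[int]:
    """Delays for each retry when no Retry-After header is present."""
    return [backoff_delay_ms(n, initial_delay_ms) for n in range(1, max_attempts + 1)]
```

Under these assumptions retry_schedule_ms() gives 1, 2, 4, 8, and 16 seconds, so a batch would be retried for about 31 seconds in total before being given up on.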

For synchronous operations, use the *_sync() methods, which raise SendError on failure:

from klime import KlimeClient, SendError

try:
    response = client.track_sync('Event', user_id='user_123')
except SendError as e:
    print(f"Failed: {e.message}, events: {len(e.events)}")