The official Python client for the VynFi synthetic financial data API. Generate realistic financial datasets -- journal entries, chart of accounts, document flows, banking/AML data, ESG metrics, and more -- for audit analytics, fraud detection, compliance testing, and ML training.
pip install vynfi

With optional integrations:
pip install vynfi[pandas] # pandas DataFrame support
pip install vynfi[polars] # polars DataFrame support
pip install vynfi[all] # all integrations

from vynfi import VynFi
client = VynFi(api_key="vf_live_...")
# Generate synthetic financial data
job = client.generate(
tables=[{"name": "journal_entries", "rows": 5000}],
sector_slug="retail",
)
print(f"Job {job.id} submitted ({job.credits_reserved} credits)")
# Wait for completion (built-in polling)
completed = client.jobs.wait(job.id)
# Download and explore the archive
archive = client.jobs.download_archive(completed.id)
print(archive) # JobArchive(84 files, 1.5 GB)
# Access specific files
entries = archive.json("journal_entries.json")
print(f"{len(entries)} journal entry documents")
# Or download raw bytes
data = client.jobs.download(completed.id)

See the examples/ directory for 7 Jupyter notebooks and 7 standalone scripts covering audit analytics, fraud detection, document flows, process mining, ESG reporting, and AML compliance testing.
# Browse available sectors
sectors = client.catalog.list_sectors()
for s in sectors:
print(f"{s.name}: {s.table_count} tables (quality={s.quality_score})")
# Get sector detail with table schemas
sector = client.catalog.get_sector("retail")
for table in sector.tables:
print(f" {table.name}: {len(table.columns)} columns")
# Browse system templates
templates = client.catalog.list_templates(sector="retail")
for t in templates:
print(f" {t.name} ({t.framework}, tier={t.min_tier})")

# Async generation (large datasets)
job = client.jobs.generate(
tables=[{"name": "journal_entries", "rows": 50000}],
sector_slug="retail",
)
completed = client.jobs.wait(job.id)
# Quick synchronous generation (up to 10k rows)
result = client.jobs.generate_quick(
tables=[{"name": "journal_entries", "rows": 100}],
sector_slug="retail",
)
# Config-based generation
job = client.jobs.generate_config(
config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
)
# List and filter jobs
jobs = client.jobs.list(status="completed", limit=10)
# Download specific artifacts
data = client.jobs.download_file(job_id, "journal_entries.json")
# Stream progress via SSE
for event in client.jobs.stream(job.id):
if event["event"] == "progress":
print(f"{event['data']['percent']}%")

# Save a generation config for reuse
cfg = client.configs.create(
name="Monthly Retail",
config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
tags=["recurring", "retail"],
)
# Validate before running
result = client.configs.validate(config={"sector": "retail", "rows": 100})
print(f"Valid: {result.valid}, errors: {len(result.errors)}")
# Estimate cost before running
est = client.configs.estimate_cost(config={"sector": "retail", "rows": 50000})
print(f"Estimated: {est.total_credits} credits")

# Create a fiscal-year session
session = client.sessions.create(
name="FY2026",
fiscal_year_start="2026-01-01",
period_length_months=3,
periods=4,
generation_config={"sector": "retail", "rows": 10000},
)
# Generate each period sequentially
for _ in range(session.periods_total):
resp = client.sessions.generate_next(session.id)
print(f"Period {resp.period_index}: job {resp.job_id}")

# List causal graph templates
templates = client.scenarios.templates()
# Create a scenario
scenario = client.scenarios.create(
name="Fraud Spike",
template_id="supply-chain",
interventions={"fraudRate": 0.05},
generation_config={"sector": "retail", "rows": 10000},
)
# Run baseline vs counterfactual
scenario = client.scenarios.run(scenario.id)
# Get diff analysis
scenario = client.scenarios.diff(scenario.id)

# Download the output archive with easy file access
archive = client.jobs.download_archive(job_id)
# Explore contents
print(archive.backend) # "zip" (legacy) or "managed_blob" (TB-scale)
print(archive.files()) # all 80+ files
print(archive.categories()) # ['banking', 'document_flows', 'esg', ...]
print(archive.summary()) # file counts and sizes by category
# Access specific files (lazy fetch via presigned URL for managed_blob)
entries = archive.json("journal_entries.json")
coa = archive.json("chart_of_accounts.json")
# Find files by pattern
banking_files = archive.find("banking/*")
esg_files = archive.find("esg/*")
# Extract everything to disk
archive.extract_to("./output")

# Get statistical evaluations for a completed job
a = client.jobs.analytics(job_id)
# Benford's Law conformity on amounts
print(f"MAD: {a.benford_analysis.mad:.4f}")
print(f"Conformity: {a.benford_analysis.conformity}")
# Amount distribution statistics
print(f"Skewness: {a.amount_distribution.skewness:.2f}")
print(f"Round number ratio: {a.amount_distribution.round_number_ratio:.2%}")
# Process variants
print(f"Happy path: {a.process_variant_summary.happy_path_concentration:.2%}")
# Banking evaluation (KYC, AML, cross-layer, velocity, false-positive)
print(f"Banking passes: {a.banking_evaluation.passes}")

# Rate-controlled streaming for TB-scale jobs
for envelope in client.jobs.stream_ndjson(job_id, rate=500, progress_interval=1000):
if envelope.get("type") == "_progress":
print(f" {envelope['lines_emitted']:,} lines emitted")
else:
# Process each data record
my_pipeline.send(envelope)

# Use native JSON numbers and flat layout to skip conversion boilerplate
job = client.jobs.generate_config(config={
"sector": "retail",
"rows": 1000,
# ...
"output": {
"numericMode": "native", # numbers, not strings
"exportLayout": "flat", # one row per line, header merged
},
})

# Validate output size against tier quota before submitting
size = client.configs.estimate_size(config=my_config)
print(f"Estimated: {size.estimated_bytes / 1e9:.1f} GB across {size.estimated_files} files")
print(f"Quota: {size.tier_quota_bytes / 1e9:.0f} GB")
if size.exceeds_quota:
print(f"WARNING: {size.warning}")
for bucket in size.breakdown:
print(f" {bucket.domain}: {bucket.bytes / 1e6:.0f} MB")

# Usage summary
usage = client.usage.summary()
print(f"Balance: {usage.balance} credits, burn rate: {usage.burn_rate}/day")
# Daily breakdown
daily = client.usage.daily()
for d in daily.daily:
print(f" {d.date}: {d.credits} credits")
# Prepaid credit balance
balance = client.credits.balance()
print(f"Prepaid: {balance.total_prepaid_credits}")
# Purchase credits
resp = client.credits.purchase(pack="10k")
print(f"Checkout: {resp.checkout_url}")

scores = client.quality.scores()
for s in scores:
print(f"Job {s.job_id}: overall={s.overall_score:.2f}")
timeline = client.quality.timeline(days=30)

# API keys
key = client.api_keys.create(name="CI pipeline", environment="test")
print(f"Key: {key.key}") # Only shown once!
# Webhooks
hook = client.webhooks.create(
url="https://example.com/webhook",
events=["job.completed", "job.failed"],
)
# Billing
sub = client.billing.subscription()
portal = client.billing.portal()
print(f"Manage billing: {portal.portal_url}")
# Notifications
unread = client.notifications.list(unread=True)
client.notifications.mark_read(all=True)

from vynfi.integrations.pandas import (
job_to_dataframe,
archive_to_dataframes,
usage_to_dataframe,
)
# Convert a single file from an archive to a DataFrame
archive = client.jobs.download_archive(job_id)
df = job_to_dataframe(archive.read("journal_entries.json"))
# Convert ALL JSON files in the archive to DataFrames at once
frames = archive_to_dataframes(archive)
# {'journal_entries.json': DataFrame, 'banking/banking_customers.json': DataFrame, ...}
# Usage analytics as a time-indexed DataFrame
usage_df = usage_to_dataframe(client, days=30)

from vynfi.integrations.polars import download_frame, usage_to_frame
df = download_frame(client, job_id, "journal_entries.json")
print(df.describe())

from vynfi import (
VynFi,
AuthenticationError,
ForbiddenError,
InsufficientCreditsError,
NotFoundError,
RateLimitError,
ValidationError,
)
try:
job = client.generate(tables=[{"name": "journal_entries", "rows": 1000000}])
except InsufficientCreditsError:
print("Not enough credits")
except RateLimitError:
print("Too many requests — automatic retry exhausted")
except ValidationError as e:
print(f"Invalid request: {e}")

client = VynFi(
api_key="vf_live_...",
base_url="https://api.vynfi.com", # default
timeout=30.0, # request timeout in seconds
max_retries=2, # automatic retry on 429/5xx
)
# Context manager support
with VynFi(api_key="vf_live_...") as client:
    usage = client.usage.summary()

Apache 2.0