Python SDK for the VynFi synthetic financial data API
Project description
VynFi Python SDK
The official Python client for the VynFi synthetic financial data API. Generate realistic financial datasets -- journal entries, chart of accounts, document flows, banking/AML data, ESG metrics, and more -- for audit analytics, fraud detection, compliance testing, and ML training.
Installation
pip install vynfi
With optional integrations:
pip install vynfi[pandas] # pandas DataFrame support
pip install vynfi[polars] # polars DataFrame support
pip install vynfi[all] # all integrations
Quick Start
from vynfi import VynFi
client = VynFi(api_key="vf_live_...")
# Generate synthetic financial data
job = client.generate(
tables=[{"name": "journal_entries", "rows": 5000}],
sector_slug="retail",
)
print(f"Job {job.id} submitted ({job.credits_reserved} credits)")
# Wait for completion (built-in polling)
completed = client.jobs.wait(job.id)
# Download and explore the archive
archive = client.jobs.download_archive(completed.id)
print(archive) # JobArchive(84 files, 1.5 GB)
# Access specific files
entries = archive.json("journal_entries.json")
print(f"{len(entries)} journal entry documents")
# Or download raw bytes
data = client.jobs.download(completed.id)
See the examples/ directory for 7 Jupyter notebooks and 7 standalone scripts covering audit analytics, fraud detection, document flows, process mining, ESG reporting, and AML compliance testing.
Resources
Catalog & Templates
# Browse available sectors
sectors = client.catalog.list_sectors()
for s in sectors:
print(f"{s.name}: {s.table_count} tables (quality={s.quality_score})")
# Get sector detail with table schemas
sector = client.catalog.get_sector("retail")
for table in sector.tables:
print(f" {table.name}: {len(table.columns)} columns")
# Browse system templates
templates = client.catalog.list_templates(sector="retail")
for t in templates:
print(f" {t.name} ({t.framework}, tier={t.min_tier})")
Jobs
# Async generation (large datasets)
job = client.jobs.generate(
tables=[{"name": "journal_entries", "rows": 50000}],
sector_slug="retail",
)
completed = client.jobs.wait(job.id)
# Quick synchronous generation (up to 10k rows)
result = client.jobs.generate_quick(
tables=[{"name": "journal_entries", "rows": 100}],
sector_slug="retail",
)
# Config-based generation
job = client.jobs.generate_config(
config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
)
# List and filter jobs
jobs = client.jobs.list(status="completed", limit=10)
# Download specific artifacts
data = client.jobs.download_file(job_id, "journal_entries.json")
# Stream progress via SSE
for event in client.jobs.stream(job.id):
if event["event"] == "progress":
print(f"{event['data']['percent']}%")
Saved Configs
# Save a generation config for reuse
cfg = client.configs.create(
name="Monthly Retail",
config={"sector": "retail", "rows": 10000, "exportFormat": "csv"},
tags=["recurring", "retail"],
)
# Validate before running
result = client.configs.validate(config={"sector": "retail", "rows": 100})
print(f"Valid: {result.valid}, errors: {len(result.errors)}")
# Estimate cost before running
est = client.configs.estimate_cost(config={"sector": "retail", "rows": 50000})
print(f"Estimated: {est.total_credits} credits")
Multi-Period Sessions
# Create a fiscal-year session
session = client.sessions.create(
name="FY2026",
fiscal_year_start="2026-01-01",
period_length_months=3,
periods=4,
generation_config={"sector": "retail", "rows": 10000},
)
# Generate each period sequentially
for _ in range(session.periods_total):
resp = client.sessions.generate_next(session.id)
print(f"Period {resp.period_index}: job {resp.job_id}")
What-If Scenarios
# List causal graph templates
templates = client.scenarios.templates()
# Create a scenario
scenario = client.scenarios.create(
name="Fraud Spike",
template_id="supply-chain",
interventions={"fraudRate": 0.05},
generation_config={"sector": "retail", "rows": 10000},
)
# Run baseline vs counterfactual
scenario = client.scenarios.run(scenario.id)
# Get diff analysis
scenario = client.scenarios.diff(scenario.id)
Job Archives
# Download the output archive with easy file access
archive = client.jobs.download_archive(job_id)
# Explore contents
print(archive.backend) # "zip" (legacy) or "managed_blob" (TB-scale)
print(archive.files()) # all 80+ files
print(archive.categories()) # ['banking', 'document_flows', 'esg', ...]
print(archive.summary()) # file counts and sizes by category
# Access specific files (lazy fetch via presigned URL for managed_blob)
entries = archive.json("journal_entries.json")
coa = archive.json("chart_of_accounts.json")
# Find files by pattern
banking_files = archive.find("banking/*")
esg_files = archive.find("esg/*")
# Extract everything to disk
archive.extract_to("./output")
Scenario Packs (DataSynth 3.0+)
# List 11 built-in scenario packs
packs = client.scenarios.packs()
for p in packs:
print(f"{p.category}: {p.name} — {p.description}")
# Run a scenario pack
scenario = client.scenarios.create(
name="Q3 revenue stress test",
generation_config={
"sector": "retail", "rows": 10000,
"scenarios": {
"enabled": True,
"packs": ["channel_stuffing"],
"diffFormats": ["summary", "record_level"],
},
},
)
client.scenarios.run(scenario.id) # spawns baseline + counterfactual
diff = client.scenarios.diff(scenario.id)
AI Tuning & Co-pilot (DataSynth 3.0+, Scale+)
# LLM suggests config improvements based on quality scores
suggestion = client.jobs.tune(job_id, target_scores={"overall": 0.95})
print(suggestion.explanation)
print("Change rows:", suggestion.original_config.get("rows"),
"->", suggestion.suggested_config.get("rows"))
# Ask the dashboard co-pilot
reply = client.ai.chat("Which fraud packs give me the best audit training?")
print(reply.reply)
Fingerprint Synthesis (DataSynth 3.0+, Team+)
# Privacy-preserving synthesis from a .dsf fingerprint file
submission = client.fingerprint.synthesize(
"./my_data.dsf",
rows=10000,
backend="statistical", # or "neural" / "hybrid" (Scale+)
)
job = client.jobs.wait(submission.job_id)
Adversarial Probing (DataSynth 3.0+, Enterprise)
# Probe an ONNX fraud detector for decision-boundary weaknesses
probe = client.adversarial.probe(
"./my_model.onnx",
n_probes=10000,
perturbation_budget=0.05,
threshold=0.5,
)
# ... wait for probe to complete ...
results = client.adversarial.results(probe.id)
print(f"Mean margin: {results.mean_margin:.3f}")
print(f"Positive rate: {results.positive_rate:.1%}")
Pre-built Analytics (DataSynth 2.3+)
# Get statistical evaluations for a completed job
a = client.jobs.analytics(job_id)
# Benford's Law conformity on amounts
print(f"MAD: {a.benford_analysis.mad:.4f}")
print(f"Conformity: {a.benford_analysis.conformity}")
# Amount distribution statistics
print(f"Skewness: {a.amount_distribution.skewness:.2f}")
print(f"Round number ratio: {a.amount_distribution.round_number_ratio:.2%}")
# Process variants
print(f"Happy path: {a.process_variant_summary.happy_path_concentration:.2%}")
# Banking evaluation (KYC, AML, cross-layer, velocity, false-positive)
print(f"Banking passes: {a.banking_evaluation.passes}")
NDJSON Streaming (Scale tier+)
# Rate-controlled streaming for TB-scale jobs
for envelope in client.jobs.stream_ndjson(job_id, rate=500, progress_interval=1000):
if envelope.get("type") == "_progress":
print(f" {envelope['lines_emitted']:,} lines emitted")
else:
# Process each data record
my_pipeline.send(envelope)
Output Mode (DataSynth 2.3+)
# Use native JSON numbers and flat layout to skip conversion boilerplate
job = client.jobs.generate_config(config={
"sector": "retail",
"rows": 1000,
# ...
"output": {
"numericMode": "native", # numbers, not strings
"exportLayout": "flat", # one row per line, header merged
},
})
Storage Quota (TB-scale)
# Validate output size against tier quota before submitting
size = client.configs.estimate_size(config=my_config)
print(f"Estimated: {size.estimated_bytes / 1e9:.1f} GB across {size.estimated_files} files")
print(f"Quota: {size.tier_quota_bytes / 1e9:.0f} GB")
if size.exceeds_quota:
print(f"WARNING: {size.warning}")
for bucket in size.breakdown:
print(f" {bucket.domain}: {bucket.bytes / 1e6:.0f} MB")
Usage & Credits
# Usage summary
usage = client.usage.summary()
print(f"Balance: {usage.balance} credits, burn rate: {usage.burn_rate}/day")
# Daily breakdown
daily = client.usage.daily()
for d in daily.daily:
print(f" {d.date}: {d.credits} credits")
# Prepaid credit balance
balance = client.credits.balance()
print(f"Prepaid: {balance.total_prepaid_credits}")
# Purchase credits
resp = client.credits.purchase(pack="10k")
print(f"Checkout: {resp.checkout_url}")
Quality Scores
scores = client.quality.scores()
for s in scores:
print(f"Job {s.job_id}: overall={s.overall_score:.2f}")
timeline = client.quality.timeline(days=30)
API Keys, Webhooks, Billing, Notifications
# API keys
key = client.api_keys.create(name="CI pipeline", environment="test")
print(f"Key: {key.key}") # Only shown once!
# Webhooks
hook = client.webhooks.create(
url="https://example.com/webhook",
events=["job.completed", "job.failed"],
)
# Billing
sub = client.billing.subscription()
portal = client.billing.portal()
print(f"Manage billing: {portal.portal_url}")
# Notifications
unread = client.notifications.list(unread=True)
client.notifications.mark_read(all=True)
Ecosystem Integrations
pandas
from vynfi.integrations.pandas import (
job_to_dataframe,
archive_to_dataframes,
usage_to_dataframe,
)
# Convert a single file from an archive to a DataFrame
archive = client.jobs.download_archive(job_id)
df = job_to_dataframe(archive.read("journal_entries.json"))
# Convert ALL JSON files in the archive to DataFrames at once
frames = archive_to_dataframes(archive)
# {'journal_entries.json': DataFrame, 'banking/banking_customers.json': DataFrame, ...}
# Usage analytics as a time-indexed DataFrame
usage_df = usage_to_dataframe(client, days=30)
polars
from vynfi.integrations.polars import download_frame, usage_to_frame
df = download_frame(client, job_id, "journal_entries.json")
print(df.describe())
Error Handling
from vynfi import (
VynFi,
AuthenticationError,
ForbiddenError,
InsufficientCreditsError,
NotFoundError,
RateLimitError,
ValidationError,
)
try:
job = client.generate(tables=[{"name": "journal_entries", "rows": 1000000}])
except InsufficientCreditsError:
print("Not enough credits")
except RateLimitError:
print("Too many requests — automatic retry exhausted")
except ValidationError as e:
print(f"Invalid request: {e}")
Configuration
client = VynFi(
api_key="vf_live_...",
base_url="https://api.vynfi.com", # default
timeout=30.0, # request timeout in seconds
max_retries=2, # automatic retry on 429/5xx
)
# Context manager support
with VynFi(api_key="vf_live_...") as client:
usage = client.usage.summary()
License
Apache 2.0
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Filter files by name, interpreter, ABI, and platform.
If you're not sure about the file name format, learn more about wheel file names.
Copy a direct link to the current filters
File details
Details for the file vynfi-1.8.0.tar.gz.
File metadata
- Download URL: vynfi-1.8.0.tar.gz
- Upload date:
- Size: 4.0 MB
- Tags: Source
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | eba78f7860870a6d9a0eb275c60aed556d500a4967150bab57bc9cd04576cfae |
| MD5 | 496e92e52aac5d76d8402cbb2c25efdf |
| BLAKE2b-256 | 1493077664d6d64438da6f45d2b07d057f02396df3c7af68df4634ce53f7df1e |
Provenance
The following attestation bundles were made for vynfi-1.8.0.tar.gz:
Publisher: publish.yml on VynFi/VynFi-python

Attestation statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: vynfi-1.8.0.tar.gz
- Subject digest: eba78f7860870a6d9a0eb275c60aed556d500a4967150bab57bc9cd04576cfae
- Sigstore transparency entry: 1361418138
- Sigstore integration time:

Source repository:
- Permalink: VynFi/VynFi-python@04b08b3b2a4b481710964d7078d7877467fc26fe
- Branch / Tag: refs/tags/v1.8.0
- Owner: https://github.com/VynFi
- Access: public

Publication details:
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@04b08b3b2a4b481710964d7078d7877467fc26fe
- Trigger Event: release
-
Statement type:
File details
Details for the file vynfi-1.8.0-py3-none-any.whl.
File metadata
- Download URL: vynfi-1.8.0-py3-none-any.whl
- Upload date:
- Size: 54.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? Yes
- Uploaded via: twine/6.1.0 CPython/3.13.12
File hashes
| Algorithm | Hash digest |
|---|---|
| SHA256 | 381ab4059512f5011c601d810799d448c6107ea3a350a2e850a41110abf9fb33 |
| MD5 | 7b2491388619e08a59876c64437b6602 |
| BLAKE2b-256 | 1aa85d518207f1028890a87d0cdd6801e43ab7fa59b73e7e321021385324aa42 |
Provenance
The following attestation bundles were made for vynfi-1.8.0-py3-none-any.whl:
Publisher: publish.yml on VynFi/VynFi-python

Attestation statement:
- Statement type: https://in-toto.io/Statement/v1
- Predicate type: https://docs.pypi.org/attestations/publish/v1
- Subject name: vynfi-1.8.0-py3-none-any.whl
- Subject digest: 381ab4059512f5011c601d810799d448c6107ea3a350a2e850a41110abf9fb33
- Sigstore transparency entry: 1361418152
- Sigstore integration time:

Source repository:
- Permalink: VynFi/VynFi-python@04b08b3b2a4b481710964d7078d7877467fc26fe
- Branch / Tag: refs/tags/v1.8.0
- Owner: https://github.com/VynFi
- Access: public

Publication details:
- Token Issuer: https://token.actions.githubusercontent.com
- Runner Environment: github-hosted
- Publication workflow: publish.yml@04b08b3b2a4b481710964d7078d7877467fc26fe
- Trigger Event: release
-
Statement type: