This guide provides comprehensive documentation for using Splintr's Python and Rust APIs. For a quick start, see the main README.
The Tokenizer class is the main entry point for tokenization in Python.
Load a pretrained model:
from splintr import Tokenizer
# Load pretrained model (includes vocabulary and special tokens)
tokenizer = Tokenizer.from_pretrained("cl100k_base") # OpenAI GPT-4/3.5
tokenizer = Tokenizer.from_pretrained("o200k_base") # OpenAI GPT-4o
tokenizer = Tokenizer.from_pretrained("llama3") # Meta Llama 3 family
tokenizer = Tokenizer.from_pretrained("deepseek_v3") # DeepSeek V3/R1
tokenizer = Tokenizer.from_pretrained("mistral_v1") # Mistral 7B v0.1/v0.2, Mixtral 8x7B
tokenizer = Tokenizer.from_pretrained("mistral_v2") # Mistral 7B v0.3, Codestral, Mixtral 8x22B
tokenizer = Tokenizer.from_pretrained("mistral_v3") # Mistral NeMo, Large 2, Pixtral
Load from a custom vocabulary file:
from splintr import Tokenizer, CL100K_BASE_PATTERN
tokenizer = Tokenizer(
vocab_path="path/to/vocab.tiktoken",
pattern=CL100K_BASE_PATTERN,
special_tokens={"<|endoftext|>": 100257}
)
Encode text to token IDs using sequential processing. This is optimal for most use cases with texts under 1MB.
tokens = tokenizer.encode("Hello, world!")
print(tokens) # [9906, 11, 1917, 0]
Encode text while recognizing special tokens in the input. Special tokens are matched and encoded as single tokens rather than being split.
text = "<|endoftext|>This is a test"
tokens = tokenizer.encode_with_special(text)
# Special token <|endoftext|> becomes a single token ID
Encode multiple texts in parallel using Rayon. This is where Splintr really shines, achieving 10-12x speedup over sequential processing.
texts = ["Hello, world!", "How are you?", "Machine learning is fun!"]
batch_tokens = tokenizer.encode_batch(texts)
# Returns: [[9906, 11, 1917, 0], [4438, 527, 499, 30], ...]
Encode a single text using Rayon's internal parallelization. This is only beneficial for very large texts (>1MB). For typical use cases, encode() is faster.
# Only useful for very large texts
large_text = "..." * 1000000 # >1MB of text
tokens = tokenizer.encode_rayon(large_text)
Decode token IDs back to text. Raises an error if the decoded bytes are not valid UTF-8.
tokens = [9906, 11, 1917, 0]
text = tokenizer.decode(tokens)
print(text) # "Hello, world!"Decode token IDs to raw bytes without UTF-8 validation.
tokens = [9906, 11, 1917, 0]
raw_bytes = tokenizer.decode_bytes(tokens)
print(raw_bytes) # b'Hello, world!'
Decode token IDs to text, replacing any invalid UTF-8 sequences with the replacement character (�).
tokens = [9906, 11, 1917, 0]
text = tokenizer.decode_lossy(tokens)
# Invalid UTF-8 sequences become �
The total vocabulary size, including special tokens.
print(tokenizer.vocab_size) # e.g., 100311 for cl100k_base with agent tokens
The number of entries currently in the LRU cache.
print(tokenizer.cache_len) # Number of cached text chunks
Clear the LRU encoding cache. Useful if memory pressure is a concern.
tokenizer.clear_cache()
The SentencePieceTokenizer class provides unigram tokenization for models using SentencePiece (e.g., loaded from GGUF files).
from splintr import SentencePieceTokenizer
# Create from raw vocabulary data
tokenizer = SentencePieceTokenizer(
tokens=["<unk>", "<s>", "</s>", "▁Hello", "▁world"],
scores=[0.0, 0.0, 0.0, -1.2, -1.5],
eos_token_id=2,
bos_token_id=1, # optional
)
Encode text using greedy longest-match with score-based tie-breaking. Prepends BOS if configured.
ids = tokenizer.encode("Hello world")
# [1, 3, 4] (BOS + ▁Hello + ▁world)
Decode token IDs to text. Skips BOS/EOS tokens, converts ▁ back to spaces.
text = tokenizer.decode([1, 3, 4])
# "Hello world"Decode token IDs, silently skipping any invalid (out-of-range) IDs.
text = tokenizer.decode_lossy([1, 3, 999, 4])
# "Hello world" (999 is skipped)vocab_size: int— Total vocabulary sizeeos_token_id: int— End-of-sequence token IDbos_token_id: int | None— Beginning-of-sequence token ID (if configured)
is_eos(token_id: int) -> bool— Check if a token is the EOS token
Streaming decoders are essential for real-time LLM applications where tokens arrive one at a time. They handle the critical problem of BPE tokens not aligning with UTF-8 character boundaries.
Use streaming_decoder() for standard tokenizers (cl100k_base, o200k_base, llama3).
BPE tokens don't align with UTF-8 character boundaries. A multi-byte Unicode character like "世" (3 bytes: 0xE4 0xB8 0x96) might split across tokens. The streaming decoder:
- Buffers incomplete byte sequences across token boundaries
- Only outputs text when complete UTF-8 characters are available
- Prevents display corruption in streaming LLM output
- Handles edge cases automatically
# Create a streaming decoder
decoder = tokenizer.streaming_decoder()
# Process tokens one at a time (typical LLM streaming scenario)
for token_id in token_stream:
# Returns text only when complete UTF-8 characters are available
if text := decoder.add_token(token_id):
print(text, end="", flush=True)
# Flush any remaining buffered bytes at the end
print(decoder.flush())
import openai
from splintr import Tokenizer
tokenizer = Tokenizer.from_pretrained("cl100k_base")
decoder = tokenizer.streaming_decoder()
# Stream tokens from OpenAI API
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": "Tell me a story"}],
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
# Process each token as it arrives
token_ids = get_token_ids(chunk) # pseudo-code
for token_id in token_ids:
if text := decoder.add_token(token_id):
print(text, end="", flush=True)
# Don't forget to flush at the end
print(decoder.flush())
Core operations:
- add_token(token_id: int) -> str | None: Add a token, return complete characters or None if buffering
- add_tokens(token_ids: list[int]) -> str | None: Add multiple tokens at once
- flush() -> str: Flush buffered bytes (incomplete sequences become �)
- reset(): Clear the buffer and start fresh
Properties:
- has_pending: bool: Whether there are buffered bytes waiting
- pending_bytes: int: Number of bytes currently buffered
For tokenizers using ByteLevel BPE encoding (DeepSeek V3, GPT-2), use byte_level_streaming_decoder() instead.
ByteLevel BPE encodes raw bytes (0-255) as printable Unicode characters (e.g., space 0x20 becomes Ġ). The ByteLevel streaming decoder handles this extra decoding step automatically:
- Decodes ByteLevel-encoded token bytes back to raw bytes
- Buffers incomplete UTF-8 sequences across token boundaries
- Only outputs text when complete UTF-8 characters are available
See bytelevel_bpe.md for details on ByteLevel encoding.
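For intuition, the GPT-2-style byte-to-unicode table that ByteLevel decoding reverses can be rebuilt with the stdlib. bytes_to_unicode below is a local illustration, not a Splintr API:

```python
# Stdlib-only reconstruction of the GPT-2-style ByteLevel table:
# every raw byte 0-255 gets a printable Unicode character.
def bytes_to_unicode() -> dict[int, str]:
    # Printable/latin bytes map to themselves; the rest shift to 256+
    bs = (list(range(ord("!"), ord("~") + 1))
          + list(range(ord("¡"), ord("¬") + 1))
          + list(range(ord("®"), ord("ÿ") + 1)))
    cs = bs[:]
    n = 0
    for b in range(256):
        if b not in bs:
            bs.append(b)
            cs.append(256 + n)
            n += 1
    return dict(zip(bs, map(chr, cs)))

enc = bytes_to_unicode()
assert enc[0x20] == "Ġ"  # space is encoded as Ġ, as noted above
dec = {v: k for k, v in enc.items()}
# A ByteLevel-encoded token string decodes back to raw bytes:
assert bytes(dec[c] for c in "Ġhello") == b" hello"
```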
from splintr import Tokenizer
# DeepSeek V3 uses ByteLevel BPE encoding
tokenizer = Tokenizer.from_pretrained("deepseek_v3")
decoder = tokenizer.byte_level_streaming_decoder()
# Process tokens one at a time
for token_id in token_stream:
if text := decoder.add_token(token_id):
print(text, end="", flush=True)
print(decoder.flush())
The ByteLevel streaming decoder has the same API as the regular streaming decoder:
- add_token(token_id: int) -> str | None
- add_tokens(token_ids: list[int]) -> str | None
- flush() -> str
- reset()
- has_pending: bool
- pending_bytes: int
The Rust API provides similar functionality with strongly-typed interfaces. For complete documentation, see docs.rs/splintr.
Add Splintr to your Cargo.toml:
[dependencies]
splintr = "*" # or pin to a specific versionAll tokenizer backends implement the Tokenize trait, enabling generic code:
use splintr::Tokenize;
fn count_tokens(tokenizer: &dyn Tokenize, text: &str) -> usize {
tokenizer.encode(text).len()
}
Methods:
- encode(&self, text: &str) -> Vec<u32>: Encode text to token IDs
- decode(&self, ids: &[u32]) -> Result<String, TokenizeError>: Decode token IDs to text
- vocab_size(&self) -> usize: Vocabulary size
Implemented by Tokenizer (BPE), SentencePieceTokenizer (unigram), and WordPieceTokenizer (WordPiece).
use splintr::{Tokenizer, CL100K_BASE_PATTERN};
use rustc_hash::FxHashMap;
// Load vocabulary and create tokenizer
let encoder = load_tiktoken_bpe_file("cl100k_base.tiktoken")?;
let special_tokens = FxHashMap::default();
let tokenizer = Tokenizer::new(encoder, special_tokens, CL100K_BASE_PATTERN)?;
// Encode text
let tokens = tokenizer.encode("Hello, world!");
println!("{:?}", tokens);
// Batch encode
let texts = vec!["Hello".to_string(), "World".to_string()];
let batch_tokens = tokenizer.encode_batch(&texts);
Encoding methods:
- encode(&self, text: &str) -> Vec<u32>: Sequential encoding (optimal for texts <1MB)
- encode_with_special(&self, text: &str) -> Vec<u32>: Encode with special token recognition
- encode_batch(&self, texts: &[String]) -> Vec<Vec<u32>>: Parallel encoding across texts
- encode_rayon(&self, text: &str) -> Vec<u32>: Parallel encoding within a text (for texts >1MB)
Decoding methods:
- decode(&self, tokens: &[u32]) -> Result<String, TokenizerError>: Decode to a UTF-8 string
- decode_bytes(&self, tokens: &[u32]) -> Vec<u8>: Decode to raw bytes
- decode_lossy(&self, tokens: &[u32]) -> String: Decode with replacement for invalid UTF-8
For models using SentencePiece unigram tokenization (e.g., Mistral V1/V2):
use splintr::SentencePieceTokenizer;
// Create from raw vocabulary data
let tokenizer = SentencePieceTokenizer::new(
tokens, // Vec<String> — token strings indexed by ID
scores, // Vec<f32> — scores for tie-breaking (empty for uniform)
Some(1), // Optional BOS token ID
2, // EOS token ID
)?;
// Encode (prepends BOS if configured, uses ▁ word boundaries)
let ids = tokenizer.encode("Hello world");
// Decode (skips BOS/EOS, converts ▁ back to spaces)
let text = tokenizer.decode(&ids)?;
// Lossy decode (skips invalid token IDs instead of erroring)
let text = tokenizer.decode_lossy(&ids);
Methods:
- encode(&self, text: &str) -> Vec<u32>: Greedy longest-match encoding with score-based tie-breaking
- decode(&self, ids: &[u32]) -> Result<String, SentencePieceError>: Decode to a UTF-8 string
- decode_lossy(&self, ids: &[u32]) -> String: Decode, skipping invalid token IDs
- vocab_size(&self) -> usize: Vocabulary size
- is_eos(&self, token_id: u32) -> bool: Check if a token is the EOS token
- eos_token_id(&self) -> u32: Get the EOS token ID
- bos_token_id(&self) -> Option<u32>: Get the BOS token ID
For BERT-family models using WordPiece subword tokenization:
use splintr::{WordPieceTokenizer, Tokenize};
// Create from a flat vocabulary (index = token ID)
let vocab = vec![
"[PAD]", "[UNK]", "[CLS]", "[SEP]",
"hello", "world", "##ing", "##s",
].into_iter().map(String::from).collect();
let tokenizer = WordPieceTokenizer::new(
vocab, // Vec<String> — token strings indexed by ID
1, // UNK token ID
200, // Max word length before mapping to UNK
true, // Lowercase and strip accents (for uncased models)
);
// Encode (BasicTokenizer + WordPiece greedy longest-match)
let ids = tokenizer.encode("Hello world");
// Decode (reconstructs text, skips [CLS]/[SEP]/[PAD] special tokens)
let text = tokenizer.decode(&ids)?;
Methods:
- encode(&self, text: &str) -> Vec<u32>: BasicTokenizer + WordPiece subword tokenization
- decode(&self, ids: &[u32]) -> Result<String, TokenizeError>: Decode, joining subwords and removing ## prefixes
- vocab_size(&self) -> usize: Vocabulary size
- cls_token_id(&self) -> Option<u32>: [CLS] token ID
- sep_token_id(&self) -> Option<u32>: [SEP] token ID
- pad_token_id(&self) -> Option<u32>: [PAD] token ID
- unk_token_id(&self) -> u32: [UNK] token ID
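The WordPiece greedy longest-match loop itself is language-agnostic. As an illustration, here is a stdlib-only Python sketch (wordpiece is a hypothetical helper, not part of either API) using the toy vocabulary above:

```python
# Stdlib-only sketch of WordPiece greedy longest-match over one
# lowercased word. Hypothetical helper for illustration only.
def wordpiece(word: str, vocab: dict[str, int], unk: int) -> list[int]:
    ids, pos = [], 0
    while pos < len(word):
        # Continuation pieces carry a ## prefix
        prefix = "" if pos == 0 else "##"
        for end in range(len(word), pos, -1):  # longest match first
            piece = prefix + word[pos:end]
            if piece in vocab:
                ids.append(vocab[piece])
                pos = end
                break
        else:
            return [unk]  # no match at this position: whole word becomes UNK
    return ids

vocab = {"[PAD]": 0, "[UNK]": 1, "[CLS]": 2, "[SEP]": 3,
         "hello": 4, "world": 5, "##ing": 6, "##s": 7}
assert wordpiece("hello", vocab, 1) == [4]
assert wordpiece("worlds", vocab, 1) == [5, 7]
assert wordpiece("xyz", vocab, 1) == [1]
```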
The Rust API uses Result types for operations that can fail:
match tokenizer.decode(&tokens) {
Ok(text) => println!("Decoded: {}", text),
Err(e) => eprintln!("Decoding error: {}", e),
}
from splintr import Tokenizer
# Load tokenizer
tokenizer = Tokenizer.from_pretrained("cl100k_base")
# Simple encoding
text = "The quick brown fox jumps over the lazy dog."
tokens = tokenizer.encode(text)
print(f"Text: {text}")
print(f"Tokens: {tokens}")
print(f"Token count: {len(tokens)}")
# Simple decoding
decoded = tokenizer.decode(tokens)
print(f"Decoded: {decoded}")
assert decoded == text
# Handle different languages
chinese = "你好世界"
tokens_cn = tokenizer.encode(chinese)
print(f"Chinese tokens: {tokens_cn}")
decoded_cn = tokenizer.decode(tokens_cn)
print(f"Decoded Chinese: {decoded_cn}")from splintr import Tokenizer
import time
tokenizer = Tokenizer.from_pretrained("cl100k_base")
# Prepare a batch of texts
texts = [
"First text to encode",
"Second text to encode",
"Third text with different content",
"Fourth text for batch processing",
] * 100 # 400 texts
# Measure batch encoding performance
start = time.time()
batch_tokens = tokenizer.encode_batch(texts)
elapsed = time.time() - start
print(f"Encoded {len(texts)} texts in {elapsed:.3f}s")
print(f"Throughput: {len(texts)/elapsed:.1f} texts/second")
# Process results
for i, tokens in enumerate(batch_tokens[:5]):
print(f"Text {i}: {len(tokens)} tokens")from splintr import Tokenizer
tokenizer = Tokenizer.from_pretrained("cl100k_base")
# Encode without special token recognition
# The special token gets split into multiple tokens
text = "Start <|endoftext|> End"
tokens_no_special = tokenizer.encode(text)
print(f"Without special tokens: {len(tokens_no_special)} tokens")
# Encode with special token recognition
# The special token becomes a single token
tokens_with_special = tokenizer.encode_with_special(text)
print(f"With special tokens: {len(tokens_with_special)} tokens")
# Verify the difference
decoded = tokenizer.decode(tokens_with_special)
print(f"Decoded: {decoded}")from splintr import Tokenizer, CL100K_AGENT_TOKENS, LLAMA3_AGENT_TOKENS, DEEPSEEK_V3_AGENT_TOKENS
# OpenAI models with agent tokens
tokenizer_openai = Tokenizer.from_pretrained("cl100k_base")
# Chain-of-Thought reasoning
cot_text = "<|think|>Let me break this down step by step...<|/think|>The answer is 42."
tokens = tokenizer_openai.encode_with_special(cot_text)
print(f"Thinking token ID: {CL100K_AGENT_TOKENS.THINK}")
print(f"Thinking end token ID: {CL100K_AGENT_TOKENS.THINK_END}")
# ReAct agent pattern
react_text = """<|plan|>I need to search for information
<|act|>search("climate change")
<|observe|>Found 10 results...
<|think|>Based on these results..."""
tokens = tokenizer_openai.encode_with_special(react_text)
print(f"Encoded {len(tokens)} tokens")
# Function calling
function_text = """<|function|>calculate_sum
<|result|>42
<|/result|>"""
tokens = tokenizer_openai.encode_with_special(function_text)
print(f"Function token ID: {CL100K_AGENT_TOKENS.FUNCTION}")
print(f"Result token ID: {CL100K_AGENT_TOKENS.RESULT}")
# RAG with citations
rag_text = """<|context|>This is source material...
<|cite|>According to the documentation...
<|source|>docs.example.com"""
tokens = tokenizer_openai.encode_with_special(rag_text)
print(f"Context token ID: {CL100K_AGENT_TOKENS.CONTEXT}")
print(f"Cite token ID: {CL100K_AGENT_TOKENS.CITE}")
# Llama 3 models
tokenizer_llama = Tokenizer.from_pretrained("llama3")
# Use Llama 3 native tokens
llama_text = "<|begin_of_text|><|start_header_id|>user<|end_header_id|>\nHello<|eot_id|>"
tokens = tokenizer_llama.encode_with_special(llama_text)
print(f"Llama begin_of_text: {LLAMA3_AGENT_TOKENS.BEGIN_OF_TEXT}")
print(f"Llama start_header_id: {LLAMA3_AGENT_TOKENS.START_HEADER_ID}")
# DeepSeek V3 models with native thinking tokens
tokenizer_deepseek = Tokenizer.from_pretrained("deepseek_v3")
# Use DeepSeek's native thinking tokens for R1-style reasoning
deepseek_text = "<think>Let me reason through this problem step by step...</think>The solution is X."
tokens = tokenizer_deepseek.encode_with_special(deepseek_text)
print(f"DeepSeek think token (native): {DEEPSEEK_V3_AGENT_TOKENS.THINK_NATIVE}")
print(f"DeepSeek think_end token (native): {DEEPSEEK_V3_AGENT_TOKENS.THINK_END_NATIVE}")
# DeepSeek V3 also has tool calling tokens
tool_text = """<|tool▁calls▁begin|>
<|tool▁call▁begin|>
function_name
<|tool▁call▁end|>
<|tool▁calls▁end|>"""
tokens = tokenizer_deepseek.encode_with_special(tool_text)
print(f"Encoded tool calling pattern with {len(tokens)} tokens")from splintr import Tokenizer
import time
tokenizer = Tokenizer.from_pretrained("cl100k_base")
# Simulate streaming token generation
text = "Hello, 世界! This is a test of streaming decoding with Unicode characters: 你好"
tokens = tokenizer.encode(text)
# Create streaming decoder
decoder = tokenizer.streaming_decoder()
print("Streaming output:")
for token in tokens:
# Simulate network delay
time.sleep(0.05)
# Add token and print if we get complete characters
if chunk := decoder.add_token(token):
print(chunk, end="", flush=True)
# Flush any remaining bytes
if remaining := decoder.flush():
print(remaining, end="", flush=True)
print("\n\nStreaming complete!")from splintr import Tokenizer
import time
tokenizer = Tokenizer.from_pretrained("deepseek_v3")
# Test text with Unicode
text = "DeepSeek V3 supports ByteLevel BPE! 中文测试"
tokens = tokenizer.encode(text)
# Create ByteLevel streaming decoder
decoder = tokenizer.byte_level_streaming_decoder()
print("ByteLevel streaming output:")
for token in tokens:
time.sleep(0.05)
if chunk := decoder.add_token(token):
print(chunk, end="", flush=True)
# Flush remaining
if remaining := decoder.flush():
print(remaining, end="", flush=True)
print("\n\nByteLevel streaming complete!")
# Check pending state
print(f"Has pending bytes: {decoder.has_pending}")
print(f"Pending byte count: {decoder.pending_bytes}")from splintr import Tokenizer
tokenizer = Tokenizer.from_pretrained("cl100k_base")
decoder = tokenizer.streaming_decoder()
def stream_tokens(token_generator):
"""Stream tokens with proper error handling."""
try:
for token_id in token_generator:
try:
if text := decoder.add_token(token_id):
yield text
except Exception as e:
print(f"\nError processing token {token_id}: {e}")
# Reset decoder and continue
decoder.reset()
continue
# Always flush at the end
if remaining := decoder.flush():
yield remaining
except Exception as e:
print(f"\nFatal streaming error: {e}")
# Final flush attempt
try:
if remaining := decoder.flush():
yield remaining
except:
pass
# Use the streaming function
text = "Test streaming with proper error handling"
tokens = tokenizer.encode(text)
for chunk in stream_tokens(iter(tokens)):
print(chunk, end="", flush=True)
print("\nDone!")-
Use
encode_batch()for multiple texts: This is where Splintr achieves 10-12x speedup. Always prefer batch encoding when you have multiple texts. -
Use
encode()for single texts: Don't useencode_rayon()unless your text is >1MB. The sequential implementation is faster for typical use cases. -
Cache frequently encoded text: Splintr includes an LRU cache. If you're encoding the same text repeatedly, the cache will speed things up automatically.
-
Clear cache if memory is tight: Use
clear_cache()if you're processing millions of unique texts and memory becomes a concern. -
Use streaming decoders for real-time output: Don't decode each token individually. Use
streaming_decoder()orbyte_level_streaming_decoder()to handle UTF-8 boundaries correctly. -
Choose the right special token encoding: Use
encode_with_special()only when your text actually contains special tokens. For regular text,encode()is faster.
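The cache-related tips above behave much like functools.lru_cache. This stdlib-only sketch (fake_encode is a stand-in for real BPE encoding, not a Splintr API) illustrates the hit and clear behavior:

```python
from functools import lru_cache

@lru_cache(maxsize=4)
def fake_encode(chunk: str) -> tuple[int, ...]:
    # Stand-in for BPE merging of a pretokenized text chunk
    return tuple(ord(c) for c in chunk)

fake_encode("hello")
fake_encode("hello")                        # second call served from cache
info = fake_encode.cache_info()
assert info.hits == 1 and info.misses == 1  # analogous to cache_len growing
fake_encode.cache_clear()                   # analogous to tokenizer.clear_cache()
assert fake_encode.cache_info().currsize == 0
```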
- Main README - Quick start and overview
- Special Tokens Documentation - Complete agent tokens reference
- ByteLevel BPE Documentation - ByteLevel encoding details
- API Documentation (Rust) - Complete Rust API reference
- GitHub Repository - Source code and examples