Bounded, observable BEAM<->Python pipelines with shared state on ETS
Slither is a low-level concurrency substrate for BEAM + Python systems, built on top of SnakeBridge and Snakepit.
It gives you three primitives:
Store: ETS-backed shared state (lock-free reads, serialized writes), exposed to Python via views.Dispatch: batched fan-out to Python workers with bounded in-flight work (max_in_flight).Pipe: a DSL to compose BEAM stages, Python stages, and routing in one supervised flow.
Use Slither when you want Python to keep doing compute, while BEAM owns concurrency, state, and coordination.
Common wins:
- no unbounded queue growth (backpressure)
- no shared-thread state races in Python workers
- run-scoped session affinity for Python state
- consistent telemetry across runs, stages, and batches
defp deps do
[
{:slither, "~> 0.1.0"}
]
endThen:
mix deps.get
mix compile# config/runtime.exs
import Config
SnakeBridge.ConfigHelper.configure_snakepit!(pool_size: 2)defmodule MyApp.ScorePipe do
use Slither.Pipe
pipe :score do
stage :prepare, :beam,
handler: fn item, _ctx ->
%{"text" => item.payload}
end
stage :predict, :python,
executor: Slither.Dispatch.Executors.SnakeBridge,
module: "my_model",
function: "predict_batch",
pool: :default,
batch_size: 32,
max_in_flight: 4
stage :route, :router,
routes: [
{fn item -> item.payload["score"] >= 0.8 end, :accept},
{fn item -> item.payload["score"] >= 0.4 end, :review}
]
output :accept
output :review
output :default
on_error :predict, :skip
on_error :*, :halt
end
end{:ok, outputs} = Slither.run_pipe(MyApp.ScorePipe, ["first", "second", "third"])
accepted_payloads = Enum.map(outputs.accept, & &1.payload)
review_payloads = Enum.map(outputs.review, & &1.payload)mix slither.example
mix slither.example text_analysis
mix slither.example batch_stats
mix slither.example data_etl
mix slither.example ml_scoring
mix slither.example image_pipeline
mix slither.example --all
mix slither.example --no-baselineml_scoring and image_pipeline auto-install Python deps via Snakepit/uv.
Slither.Pipe.Runner.run/3: run a pipe and return output buckets.Slither.Pipe.Runner.stream/3: stream outputs lazily.Slither.run_pipe/3: convenience wrapper.
items = Slither.Item.wrap_many([1, 2, 3, 4])
{:ok, results} =
Slither.dispatch(items,
executor: Slither.Dispatch.Executors.SnakeBridge,
module: "my_model",
function: "predict_batch",
pool: :default,
batch_size: 64,
max_in_flight: 8,
ordering: :preserve,
on_error: :halt
)For large workloads, use Slither.Dispatch.stream/2.
defmodule MyApp.FeatureStore do
@behaviour Slither.Store
@impl true
def tables do
[
%{name: :features, type: :set, read_concurrency: true}
]
end
@impl true
def views do
[
%{
name: :lookup_feature,
mode: :scalar,
scope: :session,
handler: fn %{"key" => key}, _ctx ->
case Slither.Store.Server.get(__MODULE__, :features, key) do
nil -> %{"error" => "not_found"}
value -> %{"value" => value}
end
end,
timeout_ms: 5_000
}
]
end
@impl true
def load(tables) do
:ets.insert(tables[:features], {"user:42", %{tier: "gold"}})
:ok
end
endStart store processes by listing modules in config:
config :slither,
stores: [MyApp.FeatureStore]Read/write API:
Slither.Store.Server.get(MyApp.FeatureStore, :features, "user:42")
Slither.Store.Server.put(MyApp.FeatureStore, :features, "user:99", %{tier: "silver"})config :slither,
stores: [],
dispatch: [
default_batch_size: 64,
default_max_in_flight: 8,
default_ordering: :preserve,
default_on_error: :halt
],
bridge: [
default_scope: :session
]Slither emits under [:slither, ...].
- dispatch:
[:slither, :dispatch, :batch, :start|:stop|:exception] - pipe run:
[:slither, :pipe, :run, :start|:stop|:exception] - pipe stage:
[:slither, :pipe, :stage, :start|:stop|:exception] - store:
[:slither, :store, :write],[:slither, :store, :reload, :start|:stop] - bridge view callbacks:
[:slither, :bridge, :view, :start|:stop]
- Python module not found:
run examples through
mix slither.example ...soPYTHONPATHis set. - SnakeBridge/Snakepit calls failing:
verify runtime pool config in
config/runtime.exs. - Optional package import errors:
run
mix slither.example ml_scoringormix slither.example image_pipelineto auto-install deps.
guides/getting-started.mdguides/store.mdguides/dispatch.mdguides/pipe.mdguides/examples.mdguides/operations.md
MIT