nshkrdotcom/slither

Slither

Bounded, observable BEAM<->Python pipelines with shared state on ETS

Slither is a low-level concurrency substrate for BEAM + Python systems, built on top of SnakeBridge and Snakepit.

It gives you three primitives:

  • Store: ETS-backed shared state (lock-free reads, serialized writes), exposed to Python via views.
  • Dispatch: batched fan-out to Python workers with bounded in-flight work (max_in_flight).
  • Pipe: a DSL to compose BEAM stages, Python stages, and routing in one supervised flow.

Why use Slither

Use Slither when you want Python to keep doing compute, while BEAM owns concurrency, state, and coordination.

Common wins:

  • no unbounded queue growth (backpressure)
  • no shared-state races between threads in Python workers
  • run-scoped session affinity for Python state
  • consistent telemetry across runs, stages, and batches
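
The backpressure point can be illustrated with the Elixir standard library alone. The following is a minimal sketch, not Slither's implementation: it splits items into batches and uses Task.async_stream/3 with max_concurrency to cap how many batches run at once, which is the idea that max_in_flight expresses. The module name BoundedFanout and the work_fun parameter are hypothetical.

```elixir
defmodule BoundedFanout do
  @moduledoc """
  Illustration only: bounded, batched fan-out using the standard library.
  This is NOT Slither's implementation.
  """

  # Splits `items` into batches of `batch_size` and runs `work_fun` on each
  # batch, with at most `max_in_flight` batches running concurrently.
  # Results come back in input order.
  def run(items, work_fun, opts \\ []) do
    batch_size = Keyword.get(opts, :batch_size, 2)
    max_in_flight = Keyword.get(opts, :max_in_flight, 2)

    items
    |> Stream.chunk_every(batch_size)
    |> Task.async_stream(work_fun,
      max_concurrency: max_in_flight,
      ordered: true,
      timeout: 5_000
    )
    |> Enum.flat_map(fn {:ok, batch_result} -> batch_result end)
  end
end

# Stand-in for a Python batch call: doubles every item in the batch.
double_batch = fn batch -> Enum.map(batch, &(&1 * 2)) end

results = BoundedFanout.run([1, 2, 3, 4, 5], double_batch, batch_size: 2, max_in_flight: 2)
```

Because Task.async_stream/3 only pulls the next batch from the stream when a slot frees up, pending work never piles up beyond the concurrency limit.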

Installation

defp deps do
  [
    {:slither, "~> 0.1.0"}
  ]
end

Then:

mix deps.get
mix compile

Quick Start

1) Configure a Python worker pool

# config/runtime.exs
import Config

SnakeBridge.ConfigHelper.configure_snakepit!(pool_size: 2)

2) Define a pipeline

defmodule MyApp.ScorePipe do
  use Slither.Pipe

  pipe :score do
    stage :prepare, :beam,
      handler: fn item, _ctx ->
        %{"text" => item.payload}
      end

    stage :predict, :python,
      executor: Slither.Dispatch.Executors.SnakeBridge,
      module: "my_model",
      function: "predict_batch",
      pool: :default,
      batch_size: 32,
      max_in_flight: 4

    stage :route, :router,
      routes: [
        {fn item -> item.payload["score"] >= 0.8 end, :accept},
        {fn item -> item.payload["score"] >= 0.4 end, :review}
      ]

    output :accept
    output :review
    output :default

    on_error :predict, :skip
    on_error :*, :halt
  end
end

3) Run it

{:ok, outputs} = Slither.run_pipe(MyApp.ScorePipe, ["first", "second", "third"])

accepted_payloads = Enum.map(outputs.accept, & &1.payload)
review_payloads = Enum.map(outputs.review, & &1.payload)
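
To make the shape of outputs more concrete, here is a plain-Elixir sketch of how a router such as the :route stage could sort payloads into buckets. It assumes routes are tried in order, the first matching predicate wins, and unmatched items fall into :default. The README does not spell out these semantics, so treat them as assumptions. RouteSketch is a hypothetical module, not part of Slither.

```elixir
defmodule RouteSketch do
  # Illustration only, not Slither's router.
  # Assumption: routes are checked in order and the first match wins;
  # anything that matches no route goes to :default.
  def route(payloads, routes) do
    Enum.group_by(payloads, fn payload ->
      Enum.find_value(routes, :default, fn {pred, bucket} ->
        if pred.(payload), do: bucket
      end)
    end)
  end
end

routes = [
  {fn p -> p["score"] >= 0.8 end, :accept},
  {fn p -> p["score"] >= 0.4 end, :review}
]

buckets =
  RouteSketch.route(
    [%{"score" => 0.9}, %{"score" => 0.5}, %{"score" => 0.1}],
    routes
  )
```

Under these assumptions a score of 0.9 lands in :accept even though it also satisfies the :review predicate, so route order matters. Enum.group_by/2 omits empty buckets, so Map.get(buckets, :accept, []) is the safer way to read one in this sketch.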

Run the built-in demos

mix slither.example
mix slither.example text_analysis
mix slither.example batch_stats
mix slither.example data_etl
mix slither.example ml_scoring
mix slither.example image_pipeline
mix slither.example --all
mix slither.example --no-baseline

ml_scoring and image_pipeline auto-install Python deps via Snakepit/uv.

Core APIs

Pipe (orchestration)

  • Slither.Pipe.Runner.run/3: run a pipe and return output buckets.
  • Slither.Pipe.Runner.stream/3: stream outputs lazily.
  • Slither.run_pipe/3: convenience wrapper.

Dispatch (batched Python calls)

items = Slither.Item.wrap_many([1, 2, 3, 4])

{:ok, results} =
  Slither.dispatch(items,
    executor: Slither.Dispatch.Executors.SnakeBridge,
    module: "my_model",
    function: "predict_batch",
    pool: :default,
    batch_size: 64,
    max_in_flight: 8,
    ordering: :preserve,
    on_error: :halt
  )

For large workloads, use Slither.Dispatch.stream/2.

Store (shared state on ETS)

defmodule MyApp.FeatureStore do
  @behaviour Slither.Store

  @impl true
  def tables do
    [
      %{name: :features, type: :set, read_concurrency: true}
    ]
  end

  @impl true
  def views do
    [
      %{
        name: :lookup_feature,
        mode: :scalar,
        scope: :session,
        handler: fn %{"key" => key}, _ctx ->
          case Slither.Store.Server.get(__MODULE__, :features, key) do
            nil -> %{"error" => "not_found"}
            value -> %{"value" => value}
          end
        end,
        timeout_ms: 5_000
      }
    ]
  end

  @impl true
  def load(tables) do
    :ets.insert(tables[:features], {"user:42", %{tier: "gold"}})
    :ok
  end
end

Start store processes by listing modules in config:

config :slither,
  stores: [MyApp.FeatureStore]

Read/write API:

Slither.Store.Server.get(MyApp.FeatureStore, :features, "user:42")
Slither.Store.Server.put(MyApp.FeatureStore, :features, "user:99", %{tier: "silver"})
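
The "lock-free reads, serialized writes" pattern can be sketched with a GenServer that owns an ETS table. This is a minimal illustration of the general technique, not Slither.Store.Server itself. The module name StoreSketch and the table name are hypothetical.

```elixir
defmodule StoreSketch do
  @moduledoc "Illustration only: direct ETS reads, writes funneled through one process."
  use GenServer

  @table :store_sketch_features

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Reads go straight to ETS from the calling process; no message is sent
  # to the server, so readers never queue behind writers in its mailbox.
  def get(key) do
    case :ets.lookup(@table, key) do
      [{^key, value}] -> value
      [] -> nil
    end
  end

  # Writes are serialized: each one is a call handled by the single owner.
  def put(key, value), do: GenServer.call(__MODULE__, {:put, key, value})

  @impl true
  def init(:ok) do
    # :protected means only the owner process can write; any process can read.
    :ets.new(@table, [:set, :named_table, :protected, read_concurrency: true])
    {:ok, %{}}
  end

  @impl true
  def handle_call({:put, key, value}, _from, state) do
    :ets.insert(@table, {key, value})
    {:reply, :ok, state}
  end
end

{:ok, _pid} = StoreSketch.start_link()
:ok = StoreSketch.put("user:42", %{tier: "gold"})
StoreSketch.get("user:42")
```

The design choice is that the table owner is the only writer, which keeps updates ordered, while read-heavy callers pay only the cost of an ETS lookup.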

Configuration knobs

config :slither,
  stores: [],
  dispatch: [
    default_batch_size: 64,
    default_max_in_flight: 8,
    default_ordering: :preserve,
    default_on_error: :halt
  ],
  bridge: [
    default_scope: :session
  ]

Telemetry events

Slither emits telemetry events under the [:slither, ...] prefix.

  • dispatch: [:slither, :dispatch, :batch, :start|:stop|:exception]
  • pipe run: [:slither, :pipe, :run, :start|:stop|:exception]
  • pipe stage: [:slither, :pipe, :stage, :start|:stop|:exception]
  • store: [:slither, :store, :write], [:slither, :store, :reload, :start|:stop]
  • bridge view callbacks: [:slither, :bridge, :view, :start|:stop]
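
A handler for some of the events listed above might be attached as follows. This is a sketch that assumes the :telemetry package is available in your application. The README does not document which measurement keys each event carries, so the handler prints whatever map arrives. The module name SlitherTelemetrySketch and the handler id are hypothetical.

```elixir
defmodule SlitherTelemetrySketch do
  # Illustration only. Assumes the :telemetry package is a dependency.

  # Stop events taken from the list above.
  @stop_events [
    [:slither, :dispatch, :batch, :stop],
    [:slither, :pipe, :run, :stop],
    [:slither, :pipe, :stage, :stop]
  ]

  # Call once at application start to register the handler.
  def attach do
    :telemetry.attach_many(
      "slither-telemetry-sketch",
      @stop_events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Assumption: the measurement keys are not documented here, so this
  # handler just prints the event name and the raw measurements map.
  def handle_event(event, measurements, _metadata, _config) do
    IO.puts(format(event, measurements))
  end

  # Joins the event name with dots and appends the inspected measurements.
  def format(event, measurements) do
    "#{Enum.join(event, ".")} #{inspect(measurements)}"
  end
end
```

Using a named function capture rather than an anonymous function follows the usual :telemetry guidance for handlers.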

Troubleshooting

  • Python module not found: run examples through mix slither.example ... so PYTHONPATH is set.
  • SnakeBridge/Snakepit calls failing: verify runtime pool config in config/runtime.exs.
  • Optional package import errors: run mix slither.example ml_scoring or mix slither.example image_pipeline to auto-install deps.

Guides

  • guides/getting-started.md
  • guides/store.md
  • guides/dispatch.md
  • guides/pipe.md
  • guides/examples.md
  • guides/operations.md

License

MIT

About

Lightweight Elixir runtime for composing and executing Python-backed data pipelines with automatic dependency resolution, lazy evaluation, fault-tolerant stage supervision, and seamless SnakeBridge/Snakepit integration for production-grade cross-language AI and scientific computing workflows on the BEAM.
