Shopify Semaphore

A service to assist with running Goroutines against the limits of Shopify's GraphQL API. When the remaining point balance reaches the configured threshold, the service pauses running Goroutines for a calculated duration, allowing the point balance to refill completely before they resume.

Installation

go get github.com/gnikyt/shopifysemaphore

No external dependencies for this package.

Usage

Create a new Semaphore instance by supplying the capacity (the number of Goroutines you wish to run concurrently) and information about the GraphQL point balance.

The two key methods are:

  • Acquire(ctx context.Context), which accepts a context and returns an error if one occurs (such as a context timeout).
  • Release(pts *int32), which accepts an optional pointer to the remaining point balance returned in Shopify's GraphQL API response. Pass nil when the balance is already being updated externally.

Example usage when the worker updates the balance directly:

package main

import (
  "context"
  "log"
  "sync"
  "time"

  ssem "github.com/gnikyt/shopifysemaphore"
)

func work(id int, wg *sync.WaitGroup, ctx context.Context, sem *ssem.Semaphore) {
  err := sem.Acquire(ctx)
  if err != nil {
    // Possible context timeout.
    log.Printf("work: %v\n", err)
    wg.Done()
    return
  }
  var points *int32
  defer func() {
    sem.Release(points)
  }()

  // graphQLCall is a placeholder for your own request; it returns the remaining points (int32).
  remaining, err := graphQLCall()
  if err != nil {
    log.Printf("work: %v\n", err)
  } else {
    points = &remaining
    log.Printf("remaining: %d points\n", remaining)
  }

  wg.Done()
}

func main() {
  log.Println("started!")
  done := make(chan bool)
  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  defer cancel()

  // Semaphore with a concurrent capacity of 10.
  // Including a point balance setup with a threshold to pause at 200 points,
  // a maximum of 2000 points available, and a refill rate of 100 points per second.
  sem := ssem.New(
    10,
    ssem.NewBalance(200, 2000, 100),
    ssem.WithPauseFunc(func (pts int32, dur time.Duration) {
      log.Printf("pausing for %s due to remaining points of %d...\n", dur, pts)
    }),
    ssem.WithResumeFunc(func () {
      log.Println("resuming...")
    }),
  )

  // Run 100 Goroutines.
  var wg sync.WaitGroup
  for i := 0; i < 100; i += 1 {
    wg.Add(1)
    go work(i, &wg, ctx, sem)
  }

  // Wait for completion of Goroutines.
  go func() {
    wg.Wait()
    done <- true
  }()

  select {
    case <-ctx.Done():
      log.Println("timeout happened.")
    case <-done:
      log.Println("work finished.")
  }
  log.Println("completed.")
}

Example usage when something else, such as an HTTP transport layer, updates the balance externally by reading the GraphQL response:

func work(ctx context.Context, sem *ssem.Semaphore) error {
  if err := sem.Acquire(ctx); err != nil {
    return err
  }
  defer sem.Release(nil)

  return graphQLCall(ctx)
}
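As a rough sketch of what such a transport layer could look like, the following http.RoundTripper reads extensions.cost.throttleStatus.currentlyAvailable from each GraphQL response and hands it to a callback. The costTransport type, the remainingFrom helper, and the onPoints callback are hypothetical names for this illustration. How the value is then fed into the semaphore's balance is not shown in this README, so that step is left to the callback.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// remainingFrom extracts the remaining points from a GraphQL response body.
// It reports false when the throttle status is absent or the body is not JSON.
func remainingFrom(body []byte) (int32, bool) {
	var payload struct {
		Extensions struct {
			Cost struct {
				ThrottleStatus struct {
					CurrentlyAvailable *float64 `json:"currentlyAvailable"`
				} `json:"throttleStatus"`
			} `json:"cost"`
		} `json:"extensions"`
	}
	if err := json.Unmarshal(body, &payload); err != nil {
		return 0, false
	}
	avail := payload.Extensions.Cost.ThrottleStatus.CurrentlyAvailable
	if avail == nil {
		return 0, false
	}
	return int32(*avail), true
}

// costTransport is a hypothetical RoundTripper that reports the remaining
// points of every response to onPoints.
type costTransport struct {
	base     http.RoundTripper
	onPoints func(remaining int32)
}

func (t costTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := t.base.RoundTrip(req)
	if err != nil {
		return nil, err
	}
	body, err := io.ReadAll(resp.Body)
	resp.Body.Close()
	if err != nil {
		return nil, err
	}
	// Restore the body so the caller can still decode the response.
	resp.Body = io.NopCloser(bytes.NewReader(body))
	if pts, ok := remainingFrom(body); ok {
		t.onPoints(pts)
	}
	return resp, nil
}

func main() {
	// A local test server stands in for Shopify's GraphQL endpoint.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, `{"data":{},"extensions":{"cost":{"throttleStatus":{"currentlyAvailable":1840}}}}`)
	}))
	defer srv.Close()

	client := &http.Client{Transport: costTransport{
		base:     http.DefaultTransport,
		onPoints: func(pts int32) { fmt.Println("remaining:", pts) },
	}}
	resp, err := client.Get(srv.URL)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	resp.Body.Close()
}
```

With the balance updated this way, workers never see the point count themselves, which is why they pass nil to Release.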

Example output:

started!
remaining: 1840 points
remaining: 1710 points
remaining: 1660 points
...
remaining: 280 points
remaining: 190 points
pausing for 18s due to remaining points of 190...
resuming...
remaining: 1890 points
remaining: 1810 points
...
work finished.
completed.

Additional options

The package also supports:

  • WithFailOnFull() to return ErrAtCapacity immediately instead of waiting for capacity.
  • WithBackoffStrategy(...) to customize pause duration calculation. Built-ins include FullRefillStrategy, ExponentialBackoffStrategy, and JitterBackoffStrategy.
  • ContextWithSemaphore, SemaphoreFromContext, and WithFunc helpers for passing a semaphore through context.Context. WithFunc accepts a callback that returns *int32, so it can either hand back points or nil.

Testing

go test -v ./...

Documentation

Run go doc -all to see the current exported API.

LICENSE

This project is released under the MIT license.
