A service to assist with running Goroutines against the limits of Shopify's GraphQL API. It pauses running Goroutines once the configured threshold for the point balance has been reached. The pause lasts for a calculated duration, allowing the point balance to refill completely before work resumes.
go get github.com/gnikyt/shopifysemaphore
No external dependencies for this package.
Create a new Semaphore instance by supplying the number of Goroutines you wish to run concurrently (the capacity) and information about the GraphQL point balance.
The two key methods are:
Acquire(ctx context.Context), which accepts a context and returns an error if one has occurred (such as a context timeout).
Release(pts *int32), which accepts an optional pointer to the remaining point balance returned by Shopify's GraphQL API response. Pass nil when the balance is already being updated externally.
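The remaining balance handed to Release has to be read from the API response first. The sketch below shows one way to do that, assuming the response body carries Shopify's `extensions.cost.throttleStatus.currentlyAvailable` field. The `costEnvelope` type and `remainingPoints` helper are hypothetical names for illustration and are not part of this package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// costEnvelope mirrors only the part of a GraphQL response body that
// carries the throttle status; all other fields are ignored.
type costEnvelope struct {
	Extensions struct {
		Cost struct {
			ThrottleStatus *struct {
				// Decoded as float64 because the value may be encoded
				// with a decimal point (for example 954.0).
				CurrentlyAvailable float64 `json:"currentlyAvailable"`
			} `json:"throttleStatus"`
		} `json:"cost"`
	} `json:"extensions"`
}

// remainingPoints (hypothetical helper) returns a pointer suitable for
// passing to Release, or nil when the body carries no throttle status.
func remainingPoints(body []byte) (*int32, error) {
	var env costEnvelope
	if err := json.Unmarshal(body, &env); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}
	ts := env.Extensions.Cost.ThrottleStatus
	if ts == nil {
		return nil, nil
	}
	pts := int32(ts.CurrentlyAvailable)
	return &pts, nil
}

func main() {
	// Sample body shaped like a GraphQL response with a cost extension.
	body := []byte(`{"data":{},"extensions":{"cost":{"throttleStatus":{"maximumAvailable":2000,"currentlyAvailable":1840,"restoreRate":100}}}}`)
	pts, err := remainingPoints(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("remaining:", *pts)
}
```

Returning nil when no throttle status is present keeps the result directly usable as the argument to Release.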
Example usage when the worker updates the balance directly:
package main
import (
	"context"
	"log"
	"sync"
	"time"
	// Import path matches the module installed with go get above.
	ssem "github.com/gnikyt/shopifysemaphore"
)
func work(id int, wg *sync.WaitGroup, ctx context.Context, sem *ssem.Semaphore) {
	// Mark this Goroutine as finished on every return path.
	defer wg.Done()
	if err := sem.Acquire(ctx); err != nil {
		// Possible context timeout.
		log.Printf("work: %v\n", err)
		return
	}
	// Stays nil if the call below fails.
	var points *int32
	defer func() {
		sem.Release(points)
	}()
	// Return remaining points from call.
	remaining, err := graphQLCall()
	if err != nil {
		log.Printf("work: %v\n", err)
		return
	}
	points = &remaining
	log.Printf("remaining: %d points\n", remaining)
}
func main() {
	log.Println("started!")
	done := make(chan bool)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	// Release the timeout's resources when main returns.
	defer cancel()
	// Semaphore with a concurrent capacity of 10.
	// The point balance is set up with a threshold to pause at 200 points,
	// a maximum of 2000 points available, and a refill rate of 100 points per second.
	sem := ssem.New(
		10,
		ssem.NewBalance(200, 2000, 100),
		ssem.WithPauseFunc(func(pts int32, dur time.Duration) {
			log.Printf("pausing for %s due to remaining points of %d...\n", dur, pts)
		}),
		ssem.WithResumeFunc(func() {
			log.Println("resuming...")
		}),
	)
	// Run 100 Goroutines.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go work(i, &wg, ctx, sem)
	}
	// Wait for completion of Goroutines.
	go func() {
		wg.Wait()
		done <- true
	}()
	select {
	case <-ctx.Done():
		log.Println("timeout happened.")
	case <-done:
		log.Println("work finished.")
	}
	log.Println("completed.")
}
Example usage when something else, such as an HTTP transport layer, updates the balance externally by reading the GraphQL response:
func work(ctx context.Context, sem *ssem.Semaphore) error {
	if err := sem.Acquire(ctx); err != nil {
		return err
	}
	defer sem.Release(nil)
	return graphQLCall(ctx)
}
Example output:
started!
remaining: 1840 points
remaining: 1710 points
remaining: 1660 points
...
remaining: 280 points
remaining: 190 points
pausing for 18 seconds due to remaining points of 190...
resuming...
remaining: 1890 points
remaining: 1810 points
...
work finished.
completed.
The package also supports:
WithFailOnFull() to return ErrAtCapacity immediately instead of waiting for capacity.
WithBackoffStrategy(...) to customize pause duration calculation. Built-ins include FullRefillStrategy, ExponentialBackoffStrategy, and JitterBackoffStrategy.
ContextWithSemaphore, SemaphoreFromContext, and WithFunc helpers for passing a semaphore through context.Context. WithFunc accepts a callback that returns *int32, so it can either hand back points or nil.
go test -v ./...
Run go doc -all to see the current exported API.
This project is released under the MIT license.