Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import (
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/x/authz"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
"github.com/gitopia/gitopia-go/logger"
gtypes "github.com/gitopia/gitopia/v5/x/gitopia/types"
rtypes "github.com/gitopia/gitopia/v5/x/rewards/types"
gtypes "github.com/gitopia/gitopia/v6/x/gitopia/types"
rtypes "github.com/gitopia/gitopia/v6/x/rewards/types"
storagetypes "github.com/gitopia/gitopia/v6/x/storage/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand All @@ -36,6 +38,8 @@ const (
type Query struct {
Gitopia gtypes.QueryClient
Rewards rtypes.QueryClient
Storage storagetypes.QueryClient
Bank banktypes.QueryClient
}
type Client struct {
cc client.Context
Expand Down Expand Up @@ -81,7 +85,9 @@ func GetQueryClient(addr string) (Query, error) {

gqc := gtypes.NewQueryClient(grpcConn)
rqc := rtypes.NewQueryClient(grpcConn)
return Query{gqc, rqc}, nil
sc := storagetypes.NewQueryClient(grpcConn)
bc := banktypes.NewQueryClient(grpcConn)
return Query{gqc, rqc, sc, bc}, nil
}

// implement io.Closer
Expand Down
6 changes: 3 additions & 3 deletions cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/cosmos/cosmos-sdk/version"
"github.com/cosmos/cosmos-sdk/x/auth/tx"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
gtypes "github.com/gitopia/gitopia/v5/x/gitopia/types"
otypes "github.com/gitopia/gitopia/v5/x/offchain/types"
rtypes "github.com/gitopia/gitopia/v5/x/rewards/types"
gtypes "github.com/gitopia/gitopia/v6/x/gitopia/types"
otypes "github.com/gitopia/gitopia/v6/x/offchain/types"
rtypes "github.com/gitopia/gitopia/v6/x/rewards/types"
"github.com/pkg/errors"
"github.com/spf13/cobra"
)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.21
require (
github.com/cometbft/cometbft v0.37.6
github.com/cosmos/cosmos-sdk v0.47.13
github.com/gitopia/gitopia/v5 v5.1.0
github.com/gitopia/gitopia/v6 v6.0.0-rc.6
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/sirupsen/logrus v1.9.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyT
github.com/getsentry/sentry-go v0.23.0 h1:dn+QRCeJv4pPt9OjVXiMcGIBIefaTJPw/h0bZWO05nE=
github.com/getsentry/sentry-go v0.23.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gitopia/gitopia/v5 v5.1.0 h1:a/RySnjfpRYJwbkqW76QFmkBW/FihLDJHEpCtbF85Zg=
github.com/gitopia/gitopia/v5 v5.1.0/go.mod h1:t1FjB6j0LlGB2Ka1+Hu+XiARQ4NHhoVx9uKiR8+yhwc=
github.com/gitopia/gitopia/v6 v6.0.0-rc.6 h1:z64t9EkTxuQ4SuoE0O8Zi/NGLVV5lAP0JDGwsqfeCqs=
github.com/gitopia/gitopia/v6 v6.0.0-rc.6/go.mod h1:HujUCHweDG8AoLctQPt8L7EtL+Eu7L1KpyjVqGVEDr8=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
Expand Down
9 changes: 0 additions & 9 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package logger

import (
"context"
"log"
"os"

"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

const LOG_FILE_EXT = ".log"
Expand All @@ -17,12 +14,6 @@ type logctx struct{}
func InitLogger(ctx context.Context, appName string) context.Context {
logger := logrus.New()

// current directory if WORKING_DIR is not configured
file, err := os.OpenFile(viper.GetString("WORKING_DIR")+appName+LOG_FILE_EXT, os.O_CREATE|os.O_WRONLY|os.O_APPEND, LOG_FILE_PERM)
if err != nil {
log.Fatalln("error opening log file: ", err.Error())
}
logger.SetOutput(file)
// todo: make configurable
logger.SetLevel(logrus.DebugLevel)
return ContextWithValue(ctx, logger)
Expand Down
2 changes: 1 addition & 1 deletion offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package gitopia

import (
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/gitopia/gitopia/v5/x/offchain/types"
"github.com/gitopia/gitopia/v6/x/offchain/types"
"github.com/pkg/errors"
)

Expand Down
148 changes: 97 additions & 51 deletions ws_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ package gitopia

import (
"context"
"sync"
"time"

"github.com/cometbft/cometbft/libs/log"
jsonrpcclient "github.com/cometbft/cometbft/rpc/jsonrpc/client"
jsonrpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types"
"github.com/gitopia/gitopia-go/logger"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

Expand All @@ -31,13 +30,14 @@ var (
type evenHandlerFunc func(context.Context, []byte) error

type WSEvents struct {
wsc *jsonrpcclient.WSClient
query string
wsc *jsonrpcclient.WSClient
queries map[string]bool // Track active subscriptions
mu sync.RWMutex // Protect queries map
}

func NewWSEvents(ctx context.Context, query string) (*WSEvents, error) {
func NewWSEvents(ctx context.Context) (*WSEvents, error) {
wse := &WSEvents{
query: query,
queries: make(map[string]bool),
}

var err error
Expand All @@ -46,24 +46,78 @@ func NewWSEvents(ctx context.Context, query string) (*WSEvents, error) {
jsonrpcclient.PingPeriod(TM_WS_PING_PERIOD),
jsonrpcclient.MaxReconnectAttempts(TM_WS_MAX_RECONNECT),
jsonrpcclient.OnReconnect(func() {
// resubscribe immediately
wse.subscribeAfter(0 * time.Second)
// Resubscribe to all queries after reconnection
wse.resubscribeAll()
}))
if err != nil {
return nil, errors.Wrap(err, "error creating ws client")
}

w := logger.FromContext(ctx).WriterLevel(logrus.DebugLevel)
l := log.NewTMLogger(log.NewSyncWriter(w))
wse.wsc.SetLogger(l)

if err := wse.wsc.Start(); err != nil {
return nil, errors.Wrap(err, "error connecting to WS")
}

return wse, nil
}

// Subscribe to a single query
func (wse *WSEvents) SubscribeQuery(ctx context.Context, query string) error {
wse.mu.Lock()
defer wse.mu.Unlock()

// Avoid duplicate subscriptions
if wse.queries[query] {
return nil // Already subscribed
}

err := wse.wsc.Subscribe(ctx, query)
if err != nil {
return errors.Wrap(err, "error sending subscribe request")
}

wse.queries[query] = true
return nil
}

// Subscribe to multiple queries at once
func (wse *WSEvents) SubscribeQueries(ctx context.Context, queries ...string) error {
for _, query := range queries {
if err := wse.SubscribeQuery(ctx, query); err != nil {
return err
}
}
return nil
}

// Unsubscribe from a specific query
func (wse *WSEvents) UnsubscribeQuery(ctx context.Context, query string) error {
wse.mu.Lock()
defer wse.mu.Unlock()

if !wse.queries[query] {
return nil // Not subscribed
}

if err := wse.wsc.Unsubscribe(ctx, query); err != nil {
return err
}

delete(wse.queries, query)
return nil
}

// Get list of active subscriptions
func (wse *WSEvents) GetActiveQueries() []string {
wse.mu.RLock()
defer wse.mu.RUnlock()

queries := make([]string, 0, len(wse.queries))
for query := range wse.queries {
queries = append(queries, query)
}
return queries
}

func terminateOnCancel(ctx context.Context) error {
select {
case <-ctx.Done():
Expand All @@ -73,53 +127,36 @@ func terminateOnCancel(ctx context.Context) error {
return nil
}

// processes events from tm
// cancel context to stop processing
// returns error on failure
// returns error when event handler returns error
// handler must handle all errors. it must return only fatal errors
func (wse *WSEvents) Subscribe(ctx context.Context, h evenHandlerFunc) (<-chan struct{}, chan error) {
// ProcessEvents handles events from all subscribed queries
// The handler receives all events and must filter/route them as needed
func (wse *WSEvents) ProcessEvents(ctx context.Context, h evenHandlerFunc) (<-chan struct{}, chan error) {
e := make(chan error)
done := make(chan struct{})

go func() {
defer func() { close(done) }()
logger.FromContext(ctx).Debug("subscribing to tm")
defer logger.FromContext(ctx).Debug("subscription done")

err := wse.wsc.Subscribe(ctx, wse.query)
if err != nil {
e <- errors.Wrap(err, "error sending subscribe request")
return
}
logger.FromContext(ctx).Debug("processing tm events")
defer logger.FromContext(ctx).Debug("event processing done")

//!! CAUTION!! all events are processed sequentially in order to support backfill!
// this might lead to event queue overflow on the chain and connection disconnection
for {
err = terminateOnCancel(ctx)
err := terminateOnCancel(ctx)
if err != nil {
e <- err
return
}

var event jsonrpctypes.RPCResponse
select {
case event = <-wse.wsc.ResponsesCh:
case <-wse.wsc.Quit():
e <- errors.New("ws conn closed")
return
}

if event.Error != nil {
logger.FromContext(ctx).Error("WS error", "err", event.Error.Error())
// Error can be ErrAlreadySubscribed or max client (subscriptions per
// client) reached or Tendermint exited.
// We can ignore ErrAlreadySubscribed, but need to retry in other
// cases.
// if !isErrAlreadySubscribed(event.Error) {
// // Resubscribe after 1 second to give Tendermint time to restart (if
// // crashed).
// wse.subscribeAfter(1 * time.Second)
// }
// OnReconnect handles this
mTmError.With(prometheus.Labels{"error": "ws_event_error"}).Inc()
continue
}
Expand All @@ -131,11 +168,13 @@ func (wse *WSEvents) Subscribe(ctx context.Context, h evenHandlerFunc) (<-chan s
mTmError.With(prometheus.Labels{"error": "parse_error"}).Inc()
continue
}

// hack: TM sends empty event to begin with. skipping
if string(jsonBuf) == "{}" {
logger.FromContext(ctx).Info("received empty event. continuing...")
// logger.FromContext(ctx).Info("received empty event. continuing...")
continue
}

err = h(ctx, jsonBuf)
if err != nil {
logger.FromContext(ctx).Error(errors.WithMessage(err, "error from event handler"))
Expand All @@ -148,21 +187,28 @@ func (wse *WSEvents) Subscribe(ctx context.Context, h evenHandlerFunc) (<-chan s
return ctx.Done(), e
}

func (wse *WSEvents) Unsubscribe(ctx context.Context, query string) error {
if err := wse.wsc.Unsubscribe(ctx, query); err != nil {
return err
// Resubscribe to all active queries (used after reconnection)
func (wse *WSEvents) resubscribeAll() {
wse.mu.RLock()
queries := make([]string, 0, len(wse.queries))
for query := range wse.queries {
queries = append(queries, query)
}
wse.mu.RUnlock()

return nil
}

// After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received.
func (wse *WSEvents) subscribeAfter(d time.Duration) {
time.Sleep(d)
time.Sleep(100 * time.Millisecond) // Small delay to ensure connection is ready

err := wse.wsc.Subscribe(context.Background(), wse.query)
if err != nil {
wse.wsc.Logger.Error("Failed to resubscribe", "err", err)
for _, query := range queries {
err := wse.wsc.Subscribe(context.Background(), query)
if err != nil {
wse.wsc.Logger.Error("Failed to resubscribe", "query", query, "err", err)
} else {
wse.wsc.Logger.Info("Resubscribed successfully", "query", query)
}
}
}

// Close the connection and cleanup
func (wse *WSEvents) Close() error {
return wse.wsc.Stop()
}