Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -471,5 +471,4 @@ rocksdb-clean:
@if [ -d "third_party/rocksdb/src/v10.4.2" ]; then \
cd third_party/rocksdb/src/v10.4.2 && $(MAKE) clean; \
fi
@echo "$(GREEN)Clean complete$(NO_COLOR)"

@echo "$(GREEN)Clean complete$(NO_COLOR)"
73 changes: 73 additions & 0 deletions api/etcdgateway/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package etcdgateway

import (
"context"
"net/http"
"strings"

gw "go.etcd.io/etcd/api/v3/etcdserverpb/gw"

"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
)

// NewHandler creates an HTTP handler that serves the etcd v3 HTTP/JSON gateway routes
// (e.g. /v3/kv/range) and proxies them to the provided gRPC endpoint.
//
// The endpoint should be a gRPC dial target (typically host:port). If a port-only
// address like ":2379" is provided, it will be normalized to "127.0.0.1:2379".
func NewHandler(ctx context.Context, endpoint string, dialOpts []grpc.DialOption, muxOpts ...runtime.ServeMuxOption) (http.Handler, error) {
Comment thread
yantingqiu marked this conversation as resolved.
endpoint = normalizeEndpoint(endpoint)

// 参考 etcd
Comment thread
yantingqiu marked this conversation as resolved.
mux := runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard,
&runtime.HTTPBodyMarshaler{
Marshaler: &runtime.JSONPb{
MarshalOptions: protojson.MarshalOptions{
UseProtoNames: true,
EmitUnpopulated: false,
},
UnmarshalOptions: protojson.UnmarshalOptions{
DiscardUnknown: true,
},
},
},
),
)

// Register handlers for services implemented by MetaStore.
if err := gw.RegisterKVHandlerFromEndpoint(ctx, mux, endpoint, dialOpts); err != nil {
return nil, err
}
if err := gw.RegisterWatchHandlerFromEndpoint(ctx, mux, endpoint, dialOpts); err != nil {
return nil, err
}
if err := gw.RegisterLeaseHandlerFromEndpoint(ctx, mux, endpoint, dialOpts); err != nil {
return nil, err
}
if err := gw.RegisterClusterHandlerFromEndpoint(ctx, mux, endpoint, dialOpts); err != nil {
return nil, err
}
if err := gw.RegisterMaintenanceHandlerFromEndpoint(ctx, mux, endpoint, dialOpts); err != nil {
return nil, err
}
if err := gw.RegisterAuthHandlerFromEndpoint(ctx, mux, endpoint, dialOpts); err != nil {
return nil, err
}
Comment thread
yantingqiu marked this conversation as resolved.

return mux, nil
}

// TODO 使用 listen-client-urls 配置来规范化 endpoint
func normalizeEndpoint(endpoint string) string {
endpoint = strings.TrimSpace(endpoint)
if strings.HasPrefix(endpoint, ":") {
return "127.0.0.1" + endpoint
}
if strings.HasPrefix(endpoint, "0.0.0.0:") {
return "127.0.0.1" + strings.TrimPrefix(endpoint, "0.0.0.0")
}
return endpoint
}
Comment on lines +1 to +73
Copy link

Copilot AI Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new etcd grpc-gateway functionality lacks test coverage. Given that the repository has comprehensive automated testing for other API endpoints (HTTP, MySQL, etcd gRPC), tests should be added for the new gateway endpoints to verify the HTTP/JSON to gRPC proxying works correctly and handles error cases appropriately.

Copilot uses AI. Check for mistakes.
24 changes: 24 additions & 0 deletions api/etcdgateway/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package etcdgateway

import (
"net/http"

"metaStore/pkg/log"

"go.uber.org/zap"
)

// WithLogging wraps an HTTP handler and emits a structured log line per request.
func WithLogging(next http.Handler) http.Handler {
if next == nil {
return nil
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
log.Info("grpc-gateway request received",
zap.String("method", r.Method),
zap.String("path", r.URL.Path),
zap.String("component", "grpc-gateway"),
)
next.ServeHTTP(w, r)
})
}
15 changes: 12 additions & 3 deletions api/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,23 @@ type Server struct {
store kvstore.Store
confChangeC chan<- raftpb.ConfChange
httpServer *http.Server
etcdGateway http.Handler
}

// Config HTTP API configuration
type Config struct {
Store kvstore.Store
Port int
ConfChangeC chan<- raftpb.ConfChange
Store kvstore.Store
Port int
ConfChangeC chan<- raftpb.ConfChange
EtcdGatewayHandler http.Handler
}

// NewServer creates a new HTTP API server
func NewServer(cfg Config) *Server {
s := &Server{
store: cfg.Store,
confChangeC: cfg.ConfChangeC,
etcdGateway: cfg.EtcdGatewayHandler,
}

mux := http.NewServeMux()
Expand Down Expand Up @@ -80,6 +83,12 @@ func (s *Server) Stop() error {

// ServeHTTP handles HTTP requests
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// etcd v3 grpc-gateway routes take precedence when enabled.
if s.etcdGateway != nil && strings.HasPrefix(r.URL.Path, "/v3/") {
s.etcdGateway.ServeHTTP(w, r)
return
}

log.Info("HTTP request received",
zap.String("method", r.Method),
zap.String("uri", r.RequestURI),
Expand Down
61 changes: 53 additions & 8 deletions cmd/metastore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@
package main

import (
"context"
"flag"
"fmt"
"net"
nethttp "net/http"
"os"
"strings"

"github.com/soheilhy/cmux"

// "metaStore/internal/batch" // 已禁用 BatchProposer
"metaStore/api/etcd"
"metaStore/api/etcdgateway"
"metaStore/api/http"
"metaStore/api/mysql"
"metaStore/internal/memory"
Expand All @@ -37,6 +40,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/raft/v3/raftpb"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
// "time" // disabled BatchProposer,no longer needed
)

Expand Down Expand Up @@ -204,14 +209,34 @@ func main() {
}

m := cmux.New(l)
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
grpcL := m.Match(cmux.HTTP2())
Comment thread
yantingqiu marked this conversation as resolved.
httpL := m.Match(cmux.HTTP1Fast())

// Start HTTP API server
var etcdGatewayHandler nethttp.Handler
if cfg.Server.EtcdGateway.Enable {
dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(cfg.Server.GRPC.MaxRecvMsgSize),
grpc.MaxCallSendMsgSize(cfg.Server.GRPC.MaxSendMsgSize),
),
}
h, err := etcdgateway.NewHandler(context.Background(), cfg.Server.EtcdGateway.Endpoint, dialOpts)
Comment thread
yantingqiu marked this conversation as resolved.
if err != nil {
log.Fatalf("Failed to create etcd grpc-gateway handler: %v", err)
os.Exit(-1)
return
Comment thread
yantingqiu marked this conversation as resolved.
}
etcdGatewayHandler = etcdgateway.WithLogging(h)
log.Info("etcd grpc-gateway enabled", zap.String("endpoint", cfg.Server.EtcdGateway.Endpoint), zap.String("component", "main"))
}
Comment thread
yantingqiu marked this conversation as resolved.

httpServer := http.NewServer(http.Config{
Store: kvs,
Port: *kvport, // Configured port (mostly for display/struct init)
ConfChangeC: confChangeC,
Store: kvs,
Port: *kvport, // Configured port (mostly for display/struct init)
ConfChangeC: confChangeC,
EtcdGatewayHandler: etcdGatewayHandler,
})

go func() {
Expand Down Expand Up @@ -304,14 +329,34 @@ func main() {
}

m := cmux.New(l)
grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc"))
grpcL := m.Match(cmux.HTTP2())
Comment thread
yantingqiu marked this conversation as resolved.
httpL := m.Match(cmux.HTTP1Fast())

// Start HTTP API server
var etcdGatewayHandler nethttp.Handler
if cfg.Server.EtcdGateway.Enable {
dialOpts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(cfg.Server.GRPC.MaxRecvMsgSize),
grpc.MaxCallSendMsgSize(cfg.Server.GRPC.MaxSendMsgSize),
),
}
h, err := etcdgateway.NewHandler(context.Background(), cfg.Server.EtcdGateway.Endpoint, dialOpts)
Comment thread
yantingqiu marked this conversation as resolved.
if err != nil {
log.Fatalf("Failed to create etcd grpc-gateway handler: %v", err)
os.Exit(-1)
return
}
Comment thread
yantingqiu marked this conversation as resolved.
etcdGatewayHandler = etcdgateway.WithLogging(h)
log.Info("etcd grpc-gateway enabled", zap.String("endpoint", cfg.Server.EtcdGateway.Endpoint), zap.String("component", "main"))
}

httpServer := http.NewServer(http.Config{
Store: kvs,
Port: *kvport,
ConfChangeC: confChangeC,
Store: kvs,
Port: *kvport,
ConfChangeC: confChangeC,
EtcdGatewayHandler: etcdGatewayHandler,
})

go func() {
Expand Down
8 changes: 7 additions & 1 deletion configs/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ server:

# etcd gRPC 协议配置
etcd:
address: ":2379" # etcd gRPC 监听地址
address: ":22379" # etcd gRPC 监听地址

# etcd v3 HTTP/JSON grpc-gateway(可选)
# 当启用时,HTTP 请求 `/v3/**` 将被代理到 etcd gRPC 服务
etcd_gateway:
enable: true
endpoint: ":22379" # gRPC dial 目标地址(通常与 etcd.address 一致)

# HTTP REST API 配置
http:
Expand Down
Loading