From 4ba2362a24a38f29f46553f28e188ac7d1868401 Mon Sep 17 00:00:00 2001 From: Phil Lu Date: Tue, 8 Sep 2015 12:56:36 -0500 Subject: [PATCH 1/4] added Call and Go to rpc pool --- client/pool.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/client/pool.go b/client/pool.go index 3100de9..e802b3a 100644 --- a/client/pool.go +++ b/client/pool.go @@ -3,6 +3,7 @@ package rpc_client import ( "crypto/tls" "net" + "net/rpc" "strings" "sync" "time" @@ -169,3 +170,43 @@ func (p *ConnPool) RPC(addr net.Addr, stream_type string, version rpc_stream.Mux clnt_stream.Release() return err } + +// Call is used to make an RPC call to a remote host +func (p *ConnPool) Call(addr net.Addr, stream_type string, version rpc_stream.MuxVersion, + method string, args interface{}, reply interface{}) error { + + call, clnt_stream := p.Go(addr, stream_type, version, method, args, reply, nil) + call = <-call.Done + if clnt_stream != nil { + clnt_stream.Release() + } + return call.Error +} + +// Go is used to make an RPC Go call to a remote host +func (p *ConnPool) Go(addr net.Addr, stream_type string, version rpc_stream.MuxVersion, + method string, args interface{}, reply interface{}, done chan *rpc.Call) (*rpc.Call, *Conn) { + + st := strings.ToUpper(stream_type) + // sLog.Printf("Go: pool->%p addr: %s stream: %s method: %s", p, addr, st, method) + if reply == nil { + return &rpc.Call{ServiceMethod: method, Args: args, Reply: reply, Done: done, Error: ErrNeedReply}, nil + } + clnt_stream, err := p.getClnt(addr, st) + if err != nil { + sLog.Printf("rpc error: getClnt() %v", err) + return &rpc.Call{ServiceMethod: method, Args: args, Reply: reply, Done: done, Error: ErrNoClient}, clnt_stream + } + // sLog.Printf("@%p -> Go(%s, %s, %d, %s: Args: %#v)", clnt_stream, addr, st, version, method, args) + call := clnt_stream.rpc_clnt.Go(method, args, reply, done) + if call.Error != nil { + p.Shutdown(clnt_stream) + sLog.Printf("error on Go(): %v", err) + return &rpc.Call{ServiceMethod: method, Args: args, Reply: reply, Done: done, Error: ErrCallFailed}, clnt_stream + } + + // caller of this method needs to call this: + // clnt_stream.Release() + + return call, clnt_stream +} From dde67c01ef4ad0677736e27031ca163d6790c23a Mon Sep 17 00:00:00 2001 From: Daniel Garcia Date: Thu, 10 Sep 2015 13:26:19 +0000 Subject: [PATCH 2/4] allow message pack handle to be set at init time --- server/server.go | 13 ++++++++++--- server/svcrpc.go | 3 ++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/server/server.go b/server/server.go index a1ca094..5cccddc 100644 --- a/server/server.go +++ b/server/server.go @@ -13,6 +13,7 @@ import ( "github.com/stackengine/selog" "github.com/stackengine/serpc" "github.com/stackengine/ssltls" + "github.com/ugorji/go/codec" ) var ( @@ -64,6 +65,7 @@ type RPCImpl struct { rpc_l net.Listener rpc_svr *netrpc.Server lck sync.Mutex + mh *codec.MsgpackHandle shutdown bool } @@ -75,9 +77,10 @@ func (impl *RPCImpl) Server() *netrpc.Server { return impl.rpc_svr } -func (impl *RPCImpl) Init(tlscfg *ssltls.Cfg, enforce_secure bool, port int) error { +func (impl *RPCImpl) Init(tlscfg *ssltls.Cfg, enforce_secure bool, port int, mh *codec.MsgpackHandle) error { var err error + impl.mh = mh if tlscfg != nil { if impl.outboundTLS, err = tlscfg.OutgoingTLSConfig(); err != nil { return err @@ -225,9 +228,13 @@ func (impl *RPCImpl) MuxRPC(conn net.Conn, isTLS bool) { } func (impl *RPCImpl) serviceRPC(conn net.Conn) { - // codec := codec.GoRpc.ServerCodec(conn, impl.mh) sLog.Printf("Processing connection from %s", conn.RemoteAddr()) - impl.rpc_svr.ServeConn(conn) + if impl.mh == nil { + impl.rpc_svr.ServeConn(conn) + } else { + c := codec.GoRpc.ServerCodec(conn, impl.mh) + impl.rpc_svr.ServeCodec(c) + } sLog.Printf("Close connection from %s", conn.RemoteAddr()) conn.Close() } diff --git a/server/svcrpc.go b/server/svcrpc.go index f5de7e3..7007d32 100644 --- a/server/svcrpc.go +++ b/server/svcrpc.go @@ -4,10 +4,11 @@ import ( netrpc "net/rpc" "github.com/stackengine/ssltls" + "github.com/ugorji/go/codec" ) type SvcRPC interface { - Init(*ssltls.Cfg, bool, int) error + Init(*ssltls.Cfg, bool, int, *codec.MsgpackHandle) error Start() error Shutdown() Server() *netrpc.Server From 8264e410d0ff6ef8f7ccb618db3211f148fedb16 Mon Sep 17 00:00:00 2001 From: Daniel Garcia Date: Thu, 10 Sep 2015 13:43:40 +0000 Subject: [PATCH 3/4] make codec generic --- server/server.go | 30 +++++++++++++++--------------- server/svcrpc.go | 6 ++++-- 2 files changed, 19 insertions(+), 17 deletions(-) diff --git a/server/server.go b/server/server.go index 5cccddc..3896662 100644 --- a/server/server.go +++ b/server/server.go @@ -13,7 +13,6 @@ import ( "github.com/stackengine/selog" "github.com/stackengine/serpc" "github.com/stackengine/ssltls" - "github.com/ugorji/go/codec" ) var ( @@ -57,16 +56,18 @@ func Register(name string, obj interface{}) error { return nil } +var _ SvcRPC = &RPCImpl{} + type RPCImpl struct { - inboundTLS *tls.Config - isTLS bool - secure bool - outboundTLS *tls.Config - rpc_l net.Listener - rpc_svr *netrpc.Server - lck sync.Mutex - mh *codec.MsgpackHandle - shutdown bool + inboundTLS *tls.Config + isTLS bool + secure bool + outboundTLS *tls.Config + rpc_l net.Listener + rpc_svr *netrpc.Server + lck sync.Mutex + newServerCodec NewServerCodec + shutdown bool } func NewServer() *RPCImpl { @@ -77,10 +78,10 @@ func (impl *RPCImpl) Server() *netrpc.Server { return impl.rpc_svr } -func (impl *RPCImpl) Init(tlscfg *ssltls.Cfg, enforce_secure bool, port int, mh *codec.MsgpackHandle) error { +func (impl *RPCImpl) Init(tlscfg *ssltls.Cfg, enforce_secure bool, port int, newServerCodec NewServerCodec) error { var err error - impl.mh = mh + impl.newServerCodec = newServerCodec if tlscfg != nil { if impl.outboundTLS, err = tlscfg.OutgoingTLSConfig(); err != nil { return err @@ -229,11 +230,10 @@ func (impl *RPCImpl) MuxRPC(conn net.Conn, isTLS bool) { func (impl *RPCImpl) serviceRPC(conn net.Conn) { sLog.Printf("Processing connection from %s", conn.RemoteAddr()) - if impl.mh == nil { + if impl.newServerCodec == nil { impl.rpc_svr.ServeConn(conn) } else { - c := codec.GoRpc.ServerCodec(conn, impl.mh) - impl.rpc_svr.ServeCodec(c) + impl.rpc_svr.ServeCodec(impl.newServerCodec(conn)) } sLog.Printf("Close connection from %s", conn.RemoteAddr()) conn.Close() diff --git a/server/svcrpc.go b/server/svcrpc.go index 7007d32..eb9c176 100644 --- a/server/svcrpc.go +++ b/server/svcrpc.go @@ -1,14 +1,16 @@ package rpc_server import ( + "io" netrpc "net/rpc" "github.com/stackengine/ssltls" - "github.com/ugorji/go/codec" ) +type NewServerCodec func(conn io.ReadWriteCloser) netrpc.ServerCodec + type SvcRPC interface { - Init(*ssltls.Cfg, bool, int, *codec.MsgpackHandle) error + Init(*ssltls.Cfg, bool, int, NewServerCodec) error Start() error Shutdown() Server() *netrpc.Server From cd696dd87b8c95cf6359ad4560d482dc50fd1622 Mon Sep 17 00:00:00 2001 From: Daniel Garcia Date: Thu, 10 Sep 2015 13:50:20 +0000 Subject: [PATCH 4/4] make client codec generic --- client/client.go | 33 +++++++++++++++++---------------- client/pool.go | 33 ++++++++++++++++----------------- 2 files changed, 33 insertions(+), 33 deletions(-) diff --git a/client/client.go b/client/client.go index bf68e28..2704e56 100644 --- a/client/client.go +++ b/client/client.go @@ -3,6 +3,7 @@ package rpc_client import ( "crypto/tls" "fmt" + "io" "net" netrpc "net/rpc" "sync" @@ -12,25 +13,26 @@ import ( "github.com/stackengine/selog" "github.com/stackengine/serpc" "github.com/stackengine/ssltls" - "github.com/ugorji/go/codec" ) var sLog = selog.Register("clntrpc", 0) +type NewClientCodec func(conn io.ReadWriteCloser) netrpc.ClientCodec + type Conn struct { sync.Mutex - addr net.Addr - key string - lastUsed time.Time - mh *codec.MsgpackHandle - net_con net.Conn - pool *ConnPool - refCount int32 - rpc_clnt *netrpc.Client - shutdown int32 - stream_type string - version int + addr net.Addr + key string + lastUsed time.Time + newClientCodec NewClientCodec + net_con net.Conn + pool *ConnPool + refCount int32 + rpc_clnt *netrpc.Client + shutdown int32 + stream_type string + version int } func (c *Conn) String() string { @@ -38,7 +40,7 @@ func (c *Conn) String() string { c, c.stream_type, c.refCount, c.key, c.addr.String(), c.shutdown) } -func NewConn(mh *codec.MsgpackHandle, +func NewConn(newClientCodec NewClientCodec, addr net.Addr, stream_type string, key string, @@ -92,9 +94,8 @@ func NewConn(mh *codec.MsgpackHandle, // sLog.Printf("Wrote stream type for: '%s'", stream_type) var clnt *netrpc.Client - if mh != nil { - codec := codec.GoRpc.ClientCodec(conn, mh) - clnt = netrpc.NewClientWithCodec(codec) + if newClientCodec != nil { + clnt = netrpc.NewClientWithCodec(newClientCodec(conn)) } else { clnt = netrpc.NewClient(conn) } diff --git a/client/pool.go b/client/pool.go index e802b3a..c4a22a7 100644 --- a/client/pool.go +++ b/client/pool.go @@ -9,20 +9,19 @@ import ( "time" "github.com/stackengine/serpc" - "github.com/ugorji/go/codec" ) type ConnPool struct { sync.Mutex - maxTime time.Duration // The maximum time to keep a connection open - timo time.Duration // The maximum time to attempt net.Dail() - pool map[string]*Conn // Pool maps an address to a open connection - tlsConfig *tls.Config // TLS settings - shutdown bool // Used to indicate the pool is shutdown - shutdownCh chan struct{} - wg sync.WaitGroup - mh *codec.MsgpackHandle + maxTime time.Duration // The maximum time to keep a connection open + timo time.Duration // The maximum time to attempt net.Dail() + pool map[string]*Conn // Pool maps an address to a open connection + tlsConfig *tls.Config // TLS settings + shutdown bool // Used to indicate the pool is shutdown + shutdownCh chan struct{} + wg sync.WaitGroup + newClientCodec NewClientCodec } // Reap is used to close unused conns open over maxTime @@ -65,18 +64,18 @@ func (p *ConnPool) reap() { // Maintain at most one connection per host, for up to maxTime. // Set maxTime to 0 to disable reaping. // If TLS settings are provided outgoing connections use TLS. -func NewPool(mh *codec.MsgpackHandle, +func NewPool(newClientCodec NewClientCodec, maxTime time.Duration, timo time.Duration, tlsConfig *tls.Config) *ConnPool { pool := &ConnPool{ - maxTime: maxTime, - timo: timo, - pool: make(map[string]*Conn), - tlsConfig: tlsConfig, - shutdownCh: make(chan struct{}), - mh: mh, + maxTime: maxTime, + timo: timo, + pool: make(map[string]*Conn), + tlsConfig: tlsConfig, + shutdownCh: make(chan struct{}), + newClientCodec: newClientCodec, } if maxTime > 0 { go pool.reap() @@ -137,7 +136,7 @@ func (p *ConnPool) getClnt(addr net.Addr, st string) (*Conn, error) { key := addr.String() + "/" + st c = p.getConn(key) if c == nil { - c, err = NewConn(p.mh, addr, st, key, p.timo, p.tlsConfig) + c, err = NewConn(p.newClientCodec, addr, st, key, p.timo, p.tlsConfig) if err != nil { return nil, err }