refactor: refactor generic streaming by yqaty · Pull Request #1770 · cloudwego/kitex · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

refactor: refactor generic streaming #1770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 182 additions & 2 deletions client/genericclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ package genericclient
import (
"context"
"runtime"
"sync"

"github.com/cloudwego/kitex/client"
"github.com/cloudwego/kitex/client/callopt"
"github.com/cloudwego/kitex/client/callopt/streamcall"
"github.com/cloudwego/kitex/pkg/generic"
"github.com/cloudwego/kitex/pkg/serviceinfo"
"github.com/cloudwego/kitex/pkg/streaming"
"github.com/cloudwego/kitex/transport"
)

var _ Client = &genericServiceClient{}
Expand All @@ -40,25 +44,40 @@ func NewClientWithServiceInfo(destService string, g generic.Generic, svcInfo *se
var options []client.Option
options = append(options, client.WithGeneric(g))
options = append(options, client.WithDestService(destService))
options = append(options, client.WithTransportProtocol(transport.TTHeaderStreaming))
options = append(options, opts...)

kc, err := client.NewClient(svcInfo, options...)
if err != nil {
return nil, err
}
var mp *sync.Map
if !generic.HasIDLInfo(g) {
mp = &sync.Map{}
}
cli := &genericServiceClient{
svcInfo: svcInfo,
kClient: kc,
sClient: kc.(client.Streaming),
g: g,
modeMap: mp,
}
runtime.SetFinalizer(cli, (*genericServiceClient).Close)

svcInfo.GenericMethod = func(name string) serviceinfo.MethodInfo {
m := svcInfo.Methods[serviceinfo.GenericMethod]
key := serviceinfo.GenericMethod
if mp != nil {
if mode, ok := mp.Load(name); ok {
key = getGenericStreamingMethodInfoKey(mode.(serviceinfo.StreamingMode))
}
return svcInfo.Methods[key]
}
n, err := g.GetMethod(nil, name)
if err != nil {
return m
return svcInfo.Methods[key]
}
key = getGenericStreamingMethodInfoKey(n.StreamingMode)
m := svcInfo.Methods[key]
return &methodInfo{
MethodInfo: m,
oneway: n.Oneway,
Expand All @@ -84,12 +103,23 @@ type Client interface {

// GenericCall generic call
GenericCall(ctx context.Context, method string, request interface{}, callOptions ...callopt.Option) (response interface{}, err error)
// ClientStreaming creates an implementation of ClientStreamingClient
ClientStreaming(ctx context.Context, method string, callOptions ...streamcall.Option) (ClientStreamingClient, error)
// ServerStreaming creates an implementation of ServerStreamingClient
ServerStreaming(ctx context.Context, method string, req interface{}, callOptions ...streamcall.Option) (ServerStreamingClient, error)
// BidirectionalStreaming creates an implementation of BidiStreamingClient
BidirectionalStreaming(ctx context.Context, method string, callOptions ...streamcall.Option) (BidiStreamingClient, error)
}

// genericServiceClient is the Client implementation in this file. It bundles
// the kitex client created for a service with the generic codec used to build
// requests and results.
type genericServiceClient struct {
// svcInfo is the service description; the streaming methods look up
// per-method info through svcInfo.MethodInfo.
svcInfo *serviceinfo.ServiceInfo
// kClient is the client returned by client.NewClient.
kClient client.Client
// sClient is the same client asserted to client.Streaming; it is used to
// open streams via StreamX.
sClient client.Streaming
// g is the generic implementation; Close closes it.
g generic.Generic
// modeMap stores the streaming mode of each method for binary generic.
// Binary generic has no IDL info, so the streaming mode of a method is not
// known up front, but it is known at the moment a specific kind of
// streaming client is created. The field is nil when the generic has IDL info.
modeMap *sync.Map // map[string]serviceinfo.StreamingMode
}

func (gc *genericServiceClient) GenericCall(ctx context.Context, method string, request interface{}, callOptions ...callopt.Option) (response interface{}, err error) {
Expand Down Expand Up @@ -121,3 +151,153 @@ func (gc *genericServiceClient) Close() error {
// Notice: don't need to close kClient because finalizer will close it.
return gc.g.Close()
}

// ClientStreaming opens a stream for method and wraps it in a
// ClientStreamingClient.
//
// The stream call options are attached to ctx before the stream is created.
// When modeMap is in use, method is recorded as serviceinfo.StreamingClient
// unless a mode has already been stored under the same name.
func (gc *genericServiceClient) ClientStreaming(ctx context.Context, method string, callOptions ...streamcall.Option) (ClientStreamingClient, error) {
	opts := streamcall.GetCallOptions(callOptions)
	ctx = client.NewCtxWithCallOptions(ctx, opts)

	if modes := gc.modeMap; modes != nil {
		modes.LoadOrStore(method, serviceinfo.StreamingClient)
	}

	stream, err := gc.sClient.StreamX(ctx, method)
	if err != nil {
		return nil, err
	}

	info := gc.svcInfo.MethodInfo(method)
	return newClientStreamingClient(info, method, stream), nil
}

// ServerStreaming opens a stream for method, sends req as the only request
// message, closes the send side, and returns a ServerStreamingClient from
// which the responses can be received.
//
// The stream call options are attached to ctx before the stream is created.
// When modeMap is in use, method is recorded as serviceinfo.StreamingServer
// unless a mode has already been stored under the same name. Any error from
// creating the stream, sending the request, or closing the send side is
// returned with a nil client.
func (gc *genericServiceClient) ServerStreaming(ctx context.Context, method string, req interface{}, callOptions ...streamcall.Option) (ServerStreamingClient, error) {
	opts := streamcall.GetCallOptions(callOptions)
	ctx = client.NewCtxWithCallOptions(ctx, opts)

	if modes := gc.modeMap; modes != nil {
		modes.LoadOrStore(method, serviceinfo.StreamingServer)
	}

	raw, err := gc.sClient.StreamX(ctx, method)
	if err != nil {
		return nil, err
	}

	info := gc.svcInfo.MethodInfo(method)
	stream := newServerStreamingClient(info, method, raw).(*serverStreamingClient)

	// Build the single request message carried by a server-streaming call.
	msg := stream.methodInfo.NewArgs().(*generic.Args)
	msg.Method, msg.Request = stream.method, req

	// Send the request, then close the send side only if the send succeeded.
	if err = raw.SendMsg(ctx, msg); err == nil {
		err = stream.CloseSend(ctx)
	}
	if err != nil {
		return nil, err
	}
	return stream, nil
}

// BidirectionalStreaming opens a stream for method and wraps it in a
// BidiStreamingClient.
//
// The stream call options are attached to ctx before the stream is created.
// When modeMap is in use, method is recorded as
// serviceinfo.StreamingBidirectional unless a mode has already been stored
// under the same name.
func (gc *genericServiceClient) BidirectionalStreaming(ctx context.Context, method string, callOptions ...streamcall.Option) (BidiStreamingClient, error) {
	ctx = client.NewCtxWithCallOptions(
		ctx,
		streamcall.GetCallOptions(callOptions),
	)

	if gc.modeMap != nil {
		const mode = serviceinfo.StreamingBidirectional
		gc.modeMap.LoadOrStore(method, mode)
	}

	underlying, err := gc.sClient.StreamX(ctx, method)
	if err != nil {
		return nil, err
	}

	mi := gc.svcInfo.MethodInfo(method)
	return newBidiStreamingClient(mi, method, underlying), nil
}

// ClientStreamingClient defines the client-side generic client streaming APIs.
type ClientStreamingClient interface {
// Send sends one request message on the stream.
Send(ctx context.Context, req interface{}) error
// CloseAndRecv closes the send side of the stream and then receives the
// response message.
CloseAndRecv(ctx context.Context) (interface{}, error)
// ClientStream is embedded, so the underlying stream methods are also
// available on this interface.
streaming.ClientStream
}

// clientStreamingClient is the ClientStreamingClient implementation returned
// by newClientStreamingClient.
type clientStreamingClient struct {
// methodInfo creates the generic args and result containers for each message.
methodInfo serviceinfo.MethodInfo
// method is the method name written into every outgoing generic.Args.
method string
// ClientStream is the underlying stream that messages are sent and received on.
streaming.ClientStream
}

// newClientStreamingClient wraps st in a clientStreamingClient bound to the
// given method name and method info.
func newClientStreamingClient(methodInfo serviceinfo.MethodInfo, method string, st streaming.ClientStream) ClientStreamingClient {
	c := new(clientStreamingClient)
	c.methodInfo = methodInfo
	c.method = method
	c.ClientStream = st
	return c
}

// Send wraps req in a fresh generic.Args tagged with the stream's method name
// and sends it on the underlying stream.
func (c *clientStreamingClient) Send(ctx context.Context, req interface{}) error {
	msg := c.methodInfo.NewArgs().(*generic.Args)
	msg.Method, msg.Request = c.method, req
	return c.ClientStream.SendMsg(ctx, msg)
}

// CloseAndRecv closes the send side of the stream, then receives one message
// into a fresh generic.Result and returns its success value. The first error
// from either step is returned with a nil response.
func (c *clientStreamingClient) CloseAndRecv(ctx context.Context) (interface{}, error) {
	err := c.ClientStream.CloseSend(ctx)
	if err != nil {
		return nil, err
	}

	// The result container is created only after the send side closed cleanly.
	result := c.methodInfo.NewResult().(*generic.Result)
	err = c.ClientStream.RecvMsg(ctx, result)
	if err != nil {
		return nil, err
	}
	return result.GetSuccess(), nil
}

// ServerStreamingClient defines the client-side generic server streaming APIs.
type ServerStreamingClient interface {
// Recv receives one response message from the stream.
Recv(ctx context.Context) (interface{}, error)
// ClientStream is embedded, so the underlying stream methods are also
// available on this interface.
streaming.ClientStream
}

// serverStreamingClient is the ServerStreamingClient implementation returned
// by newServerStreamingClient.
type serverStreamingClient struct {
// methodInfo creates the generic args and result containers for each message.
methodInfo serviceinfo.MethodInfo
// method is the method name the stream was opened for.
method string
// ClientStream is the underlying stream that messages are received on.
streaming.ClientStream
}

// newServerStreamingClient wraps st in a serverStreamingClient bound to the
// given method name and method info.
func newServerStreamingClient(methodInfo serviceinfo.MethodInfo, method string, st streaming.ClientStream) ServerStreamingClient {
	wrapped := serverStreamingClient{
		ClientStream: st,
		method:       method,
		methodInfo:   methodInfo,
	}
	return &wrapped
}

// Recv receives one message from the underlying stream into a fresh
// generic.Result and returns its success value, or a nil response together
// with the receive error.
func (c *serverStreamingClient) Recv(ctx context.Context) (interface{}, error) {
	result := c.methodInfo.NewResult().(*generic.Result)
	err := c.ClientStream.RecvMsg(ctx, result)
	if err == nil {
		return result.GetSuccess(), nil
	}
	return nil, err
}

// BidiStreamingClient defines the client-side generic bidirectional streaming APIs.
type BidiStreamingClient interface {
// Send sends one request message on the stream.
Send(ctx context.Context, req interface{}) error
// Recv receives one response message from the stream.
Recv(ctx context.Context) (interface{}, error)
// ClientStream is embedded, so the underlying stream methods are also
// available on this interface.
streaming.ClientStream
}

// bidiStreamingClient is the BidiStreamingClient implementation returned by
// newBidiStreamingClient.
type bidiStreamingClient struct {
// methodInfo creates the generic args and result containers for each message.
methodInfo serviceinfo.MethodInfo
// method is the method name written into every outgoing generic.Args.
method string
// ClientStream is the underlying stream that messages are sent and received on.
streaming.ClientStream
}

// newBidiStreamingClient wraps st in a bidiStreamingClient bound to the given
// method name and method info.
func newBidiStreamingClient(methodInfo serviceinfo.MethodInfo, method string, st streaming.ClientStream) BidiStreamingClient {
	var bidi bidiStreamingClient
	bidi.methodInfo = methodInfo
	bidi.method = method
	bidi.ClientStream = st
	return &bidi
}

// Send wraps req in a fresh generic.Args tagged with the stream's method name
// and sends it on the underlying stream.
func (c *bidiStreamingClient) Send(ctx context.Context, req interface{}) error {
	outgoing := c.methodInfo.NewArgs().(*generic.Args)
	outgoing.Request = req
	outgoing.Method = c.method
	err := c.ClientStream.SendMsg(ctx, outgoing)
	return err
}

// Recv receives one message from the underlying stream into a fresh
// generic.Result and returns its success value, or a nil response together
// with the receive error.
func (c *bidiStreamingClient) Recv(ctx context.Context) (interface{}, error) {
	incoming := c.methodInfo.NewResult().(*generic.Result)
	recvErr := c.ClientStream.RecvMsg(ctx, incoming)
	if recvErr != nil {
		return nil, recvErr
	}
	payload := incoming.GetSuccess()
	return payload, nil
}
88 changes: 2 additions & 86 deletions client/genericclient/generic_stream_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,91 +21,7 @@ import (
"github.com/cloudwego/kitex/pkg/serviceinfo"
)

// Deprecated: use generic.ServiceInfoWithGeneric instead.
func StreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
return newClientStreamingServiceInfo(g)
}

func newClientStreamingServiceInfo(g generic.Generic) *serviceinfo.ServiceInfo {
if g.PayloadCodec() != nil {
// TODO: support grpc binary generic
panic("binary generic streaming is not supported")
}

methods := map[string]serviceinfo.MethodInfo{
serviceinfo.GenericClientStreamingMethod: serviceinfo.NewMethodInfo(
nil,
func() interface{} {
args := &generic.Args{}
args.SetCodec(g.MessageReaderWriter())
return args
},
func() interface{} {
result := &generic.Result{}
result.SetCodec(g.MessageReaderWriter())
return result
},
false,
serviceinfo.WithStreamingMode(serviceinfo.StreamingClient),
),
serviceinfo.GenericServerStreamingMethod: serviceinfo.NewMethodInfo(
nil,
func() interface{} {
args := &generic.Args{}
args.SetCodec(g.MessageReaderWriter())
return args
},
func() interface{} {
result := &generic.Result{}
result.SetCodec(g.MessageReaderWriter())
return result
},
false,
serviceinfo.WithStreamingMode(serviceinfo.StreamingServer),
),
serviceinfo.GenericBidirectionalStreamingMethod: serviceinfo.NewMethodInfo(
nil,
func() interface{} {
args := &generic.Args{}
args.SetCodec(g.MessageReaderWriter())
return args
},
func() interface{} {
result := &generic.Result{}
result.SetCodec(g.MessageReaderWriter())
return result
},
false,
serviceinfo.WithStreamingMode(serviceinfo.StreamingBidirectional),
),
serviceinfo.GenericMethod: serviceinfo.NewMethodInfo(
nil,
func() interface{} {
args := &generic.Args{}
args.SetCodec(g.MessageReaderWriter())
return args
},
func() interface{} {
result := &generic.Result{}
result.SetCodec(g.MessageReaderWriter())
return result
},
false,
),
}
svcInfo := &serviceinfo.ServiceInfo{
ServiceName: g.IDLServiceName(),
Methods: methods,
PayloadCodec: g.PayloadCodecType(),
Extra: make(map[string]interface{}),
}
svcInfo.Extra["generic"] = true
if extra, ok := g.(generic.ExtraProvider); ok {
if extra.GetExtra(generic.CombineServiceKey) == "true" {
svcInfo.Extra["combine_service"] = true
}
if pkg := extra.GetExtra("PackageName"); pkg != "" {
svcInfo.Extra["PackageName"] = pkg
}
}
return svcInfo
return generic.ServiceInfoWithGeneric(g)
}
59 changes: 0 additions & 59 deletions client/genericclient/generic_stream_service_test.go

This file was deleted.

Loading
0