Skip to content

Commit

Permalink
feat: unified proto definition (#412)
Browse files Browse the repository at this point in the history
Signed-off-by: jyjiangkai <[email protected]>
  • Loading branch information
hwjiangkai committed Feb 2, 2023
1 parent 197c576 commit 1157347
Show file tree
Hide file tree
Showing 3 changed files with 822 additions and 254 deletions.
19 changes: 9 additions & 10 deletions internal/gateway/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/cloudevents/sdk-go/v2/types"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
vanuspb "github.com/linkall-labs/sdk/proto/pkg/vanus"
eb "github.com/linkall-labs/vanus/client"
"github.com/linkall-labs/vanus/client/pkg/api"
"github.com/linkall-labs/vanus/client/pkg/option"
Expand Down Expand Up @@ -95,7 +94,7 @@ type Config struct {
}

var (
_ vanuspb.ClientServer = &ControllerProxy{}
_ proxypb.StoreProxyServer = &ControllerProxy{}
)

type ackCallback func(bool)
Expand All @@ -108,12 +107,12 @@ type message struct {
type subscribeCache struct {
sequenceID uint64
subscriptionID string
subscribeStream vanuspb.Client_SubscribeServer
subscribeStream proxypb.StoreProxy_SubscribeServer
acks sync.Map
eventc chan message
}

func newSubscribeCache(subscriptionID string, stream vanuspb.Client_SubscribeServer) *subscribeCache {
func newSubscribeCache(subscriptionID string, stream proxypb.StoreProxy_SubscribeServer) *subscribeCache {
return &subscribeCache{
sequenceID: 0,
subscriptionID: subscriptionID,
Expand All @@ -127,7 +126,7 @@ func (s *subscribeCache) ch() chan message {
return s.eventc
}

func (s *subscribeCache) stream() vanuspb.Client_SubscribeServer {
func (s *subscribeCache) stream() proxypb.StoreProxy_SubscribeServer {
return s.subscribeStream
}

Expand All @@ -144,7 +143,7 @@ type ControllerProxy struct {
cache sync.Map
}

func (cp *ControllerProxy) Publish(ctx context.Context, req *vanuspb.PublishRequest) (*emptypb.Empty, error) {
func (cp *ControllerProxy) Publish(ctx context.Context, req *proxypb.PublishRequest) (*emptypb.Empty, error) {
if req.EventbusName == "" {
return nil, v2.NewHTTPResult(http.StatusBadRequest, "invalid eventbus name")
}
Expand Down Expand Up @@ -213,7 +212,7 @@ func (cp *ControllerProxy) Publish(ctx context.Context, req *vanuspb.PublishRequ
return &emptypb.Empty{}, nil
}

func (cp *ControllerProxy) Subscribe(req *vanuspb.SubscribeRequest, stream vanuspb.Client_SubscribeServer) error {
func (cp *ControllerProxy) Subscribe(req *proxypb.SubscribeRequest, stream proxypb.StoreProxy_SubscribeServer) error {
_ctx, span := cp.tracer.Start(context.Background(), "Subscribe")
defer span.End()

Expand Down Expand Up @@ -291,7 +290,7 @@ func (cp *ControllerProxy) Subscribe(req *vanuspb.SubscribeRequest, stream vanus
log.KeyError: err,
"eventpb": eventpb.String(),
})
err = subscribe.stream().Send(&vanuspb.SubscribeResponse{
err = subscribe.stream().Send(&proxypb.SubscribeResponse{
SequenceId: msg.sequenceID,
Events: &cloudevents.CloudEventBatch{
Events: []*cloudevents.CloudEvent{eventpb},
Expand All @@ -311,7 +310,7 @@ func (cp *ControllerProxy) Subscribe(req *vanuspb.SubscribeRequest, stream vanus
}
}

func (cp *ControllerProxy) Ack(stream vanuspb.Client_AckServer) error {
func (cp *ControllerProxy) Ack(stream proxypb.StoreProxy_AckServer) error {
_ctx, span := cp.tracer.Start(context.Background(), "Ack")
defer span.End()
for {
Expand Down Expand Up @@ -484,7 +483,7 @@ func (cp *ControllerProxy) Start() error {
}

proxypb.RegisterControllerProxyServer(cp.grpcSrv, cp)
vanuspb.RegisterClientServer(cp.grpcSrv, cp)
proxypb.RegisterStoreProxyServer(cp.grpcSrv, cp)

proxyListen, err := net.Listen("tcp", fmt.Sprintf(":%d", cp.cfg.ProxyPort))
if err != nil {
Expand Down
Loading

0 comments on commit 1157347

Please sign in to comment.