diff --git a/app/client/rpc/rpc_codec.go b/app/client/rpc/rpc_codec.go index c1dd0246..7d00416c 100644 --- a/app/client/rpc/rpc_codec.go +++ b/app/client/rpc/rpc_codec.go @@ -8,8 +8,6 @@ import ( raw "github.com/asim/nitro/app/codec/bytes" "github.com/asim/nitro/app/codec/json" "github.com/asim/nitro/app/codec/jsonrpc" - "github.com/asim/nitro/app/codec/proto" - "github.com/asim/nitro/app/codec/protorpc" "github.com/asim/nitro/app/errors" "github.com/asim/nitro/app/network" "github.com/asim/nitro/app/registry" @@ -52,20 +50,15 @@ var ( DefaultContentType = "application/json" DefaultCodecs = map[string]codec.NewCodec{ - "application/protobuf": proto.NewCodec, "application/json": json.NewCodec, "application/json-rpc": jsonrpc.NewCodec, - "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": raw.NewCodec, } // TODO: remove legacy codec list defaultCodecs = map[string]codec.NewCodec{ - "application/json": jsonrpc.NewCodec, - "application/json-rpc": jsonrpc.NewCodec, - "application/protobuf": protorpc.NewCodec, - "application/proto-rpc": protorpc.NewCodec, - "application/octet-stream": protorpc.NewCodec, + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, } ) @@ -139,8 +132,6 @@ func setupProtocol(msg *network.Message, node *registry.Node) codec.NewCodec { switch msg.Header["Content-Type"] { case "application/json": msg.Header["Content-Type"] = "application/json-rpc" - case "application/protobuf": - msg.Header["Content-Type"] = "application/proto-rpc" } // now return codec diff --git a/app/client/rpc/rpc_request_test.go b/app/client/rpc/rpc_request_test.go index 38dfbc4e..707fb564 100644 --- a/app/client/rpc/rpc_request_test.go +++ b/app/client/rpc/rpc_request_test.go @@ -18,8 +18,8 @@ func TestRequestOptions(t *testing.T) { t.Fatalf("expected 'endpoint' got %s", r.ContentType()) } - r2 := newRequest("service", "endpoint", nil, "application/json", client.WithContentType("application/protobuf")) - if r2.ContentType() != "application/protobuf" { + r2 := newRequest("service", "endpoint", nil, "application/octet", client.WithContentType("application/octet")) + if r2.ContentType() != "application/octet" { t.Fatalf("expected 'endpoint' got %s", r2.ContentType()) } } diff --git a/app/codec/codec_test.go b/app/codec/codec_test.go index f22aca43..684e11bd 100644 --- a/app/codec/codec_test.go +++ b/app/codec/codec_test.go @@ -8,8 +8,6 @@ import ( "github.com/asim/nitro/app/codec/bytes" "github.com/asim/nitro/app/codec/json" "github.com/asim/nitro/app/codec/jsonrpc" - "github.com/asim/nitro/app/codec/proto" - "github.com/asim/nitro/app/codec/protorpc" ) type testRWC struct{} @@ -28,11 +26,9 @@ func (rwc *testRWC) Close() error { func getCodecs(c io.ReadWriteCloser) map[string]codec.Codec { return map[string]codec.Codec{ - "bytes": bytes.NewCodec(c), - "json": json.NewCodec(c), - "jsonrpc": jsonrpc.NewCodec(c), - "proto": proto.NewCodec(c), - "protorpc": protorpc.NewCodec(c), + "bytes": bytes.NewCodec(c), + "json": json.NewCodec(c), + "jsonrpc": jsonrpc.NewCodec(c), } } diff --git a/app/codec/json/json.go b/app/codec/json/json.go index bb6f5f2e..ee09dce7 100644 --- a/app/codec/json/json.go +++ b/app/codec/json/json.go @@ -6,8 +6,6 @@ import ( "io" "github.com/asim/nitro/app/codec" - "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" ) type Codec struct { @@ -24,9 +22,6 @@ func (c *Codec) ReadBody(b interface{}) error { if b == nil { return nil } - if pb, ok := b.(proto.Message); ok { - return jsonpb.UnmarshalNext(c.Decoder, pb) - } return c.Decoder.Decode(b) } diff --git a/app/codec/json/marshaler.go b/app/codec/json/marshaler.go index 15f89466..8486befc 100644 --- a/app/codec/json/marshaler.go +++ b/app/codec/json/marshaler.go @@ -1,37 +1,21 @@ package json import ( - "bytes" "encoding/json" "github.com/asim/nitro/util/buf" - "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" ) -var jsonpbMarshaler = &jsonpb.Marshaler{} - // create buffer pool with 16 instances each preallocated with 256 bytes var bufferPool = buf.NewPool() type Marshaler struct{} func (j Marshaler) Marshal(v interface{}) ([]byte, error) { - if pb, ok := v.(proto.Message); ok { - buf := bufferPool.Get() - defer bufferPool.Put(buf) - if err := jsonpbMarshaler.Marshal(buf, pb); err != nil { - return nil, err - } - return buf.Bytes(), nil - } return json.Marshal(v) } func (j Marshaler) Unmarshal(d []byte, v interface{}) error { - if pb, ok := v.(proto.Message); ok { - return jsonpb.Unmarshal(bytes.NewReader(d), pb) - } return json.Unmarshal(d, v) } diff --git a/app/codec/proto/marshaler.go b/app/codec/proto/marshaler.go deleted file mode 100644 index 617b3608..00000000 --- a/app/codec/proto/marshaler.go +++ /dev/null @@ -1,47 +0,0 @@ -package proto - -import ( - "bytes" - - "github.com/asim/nitro/app/codec" - "github.com/asim/nitro/util/buf" - "github.com/golang/protobuf/proto" -) - -// create buffer pool with 16 instances each preallocated with 256 bytes -var bufferPool = buf.NewPool() - -type Marshaler struct{} - -func (Marshaler) Marshal(v interface{}) ([]byte, error) { - pb, ok := v.(proto.Message) - if !ok { - return nil, codec.ErrInvalidMessage - } - - // looks not good, but allows to reuse underlining bytes - buf := bufferPool.Get() - pbuf := proto.NewBuffer(buf.Bytes()) - defer func() { - bufferPool.Put(bytes.NewBuffer(pbuf.Bytes())) - }() - - if err := pbuf.Marshal(pb); err != nil { - return nil, err - } - - return pbuf.Bytes(), nil -} - -func (Marshaler) Unmarshal(data []byte, v interface{}) error { - pb, ok := v.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - - return proto.Unmarshal(data, pb) -} - -func (Marshaler) String() string { - return "proto" -} diff --git a/app/codec/proto/message.go b/app/codec/proto/message.go deleted file mode 100644 index fd5cc6fa..00000000 --- a/app/codec/proto/message.go +++ /dev/null @@ -1,37 +0,0 @@ -package proto - -type Message struct { - Data []byte -} - -func (m *Message) MarshalJSON() ([]byte, error) { - return m.Data, nil -} - -func (m *Message) UnmarshalJSON(data []byte) error { - m.Data = data - return nil -} - -func (m *Message) ProtoMessage() {} - -func (m *Message) Reset() { - *m = Message{} -} - -func (m *Message) String() string { - return string(m.Data) -} - -func (m *Message) Marshal() ([]byte, error) { - return m.Data, nil -} - -func (m *Message) Unmarshal(data []byte) error { - m.Data = data - return nil -} - -func NewMessage(data []byte) *Message { - return &Message{data} -} diff --git a/app/codec/proto/proto.go b/app/codec/proto/proto.go deleted file mode 100644 index d465c209..00000000 --- a/app/codec/proto/proto.go +++ /dev/null @@ -1,64 +0,0 @@ -// Package proto provides a proto codec -package proto - -import ( - "io" - "io/ioutil" - - "github.com/asim/nitro/app/codec" - "github.com/golang/protobuf/proto" -) - -type Codec struct { - Conn io.ReadWriteCloser -} - -func (c *Codec) ReadHeader(m *codec.Message, t codec.MessageType) error { - return nil -} - -func (c *Codec) ReadBody(b interface{}) error { - if b == nil { - return nil - } - buf, err := ioutil.ReadAll(c.Conn) - if err != nil { - return err - } - m, ok := b.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - return proto.Unmarshal(buf, m) -} - -func (c *Codec) Write(m *codec.Message, b interface{}) error { - if b == nil { - // Nothing to write - return nil - } - p, ok := b.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - buf, err := proto.Marshal(p) - if err != nil { - return err - } - _, err = c.Conn.Write(buf) - return err -} - -func (c *Codec) Close() error { - return c.Conn.Close() -} - -func (c *Codec) String() string { - return "proto" -} - -func NewCodec(c io.ReadWriteCloser) codec.Codec { - return &Codec{ - Conn: c, - } -} diff --git a/app/codec/protorpc/envelope.pb.go b/app/codec/protorpc/envelope.pb.go deleted file mode 100644 index b0e88066..00000000 --- a/app/codec/protorpc/envelope.pb.go +++ /dev/null @@ -1,144 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: codec/protorpc/envelope.proto - -package protorpc - -import ( - fmt "fmt" - proto "github.com/golang/protobuf/proto" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type Request struct { - ServiceMethod string `protobuf:"bytes,1,opt,name=service_method,json=serviceMethod,proto3" json:"service_method,omitempty"` - Seq uint64 `protobuf:"fixed64,2,opt,name=seq,proto3" json:"seq,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Request) Reset() { *m = Request{} } -func (m *Request) String() string { return proto.CompactTextString(m) } -func (*Request) ProtoMessage() {} -func (*Request) Descriptor() ([]byte, []int) { - return fileDescriptor_12fd17ed7ee86a33, []int{0} -} - -func (m *Request) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Request.Unmarshal(m, b) -} -func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Request.Marshal(b, m, deterministic) -} -func (m *Request) XXX_Merge(src proto.Message) { - xxx_messageInfo_Request.Merge(m, src) -} -func (m *Request) XXX_Size() int { - return xxx_messageInfo_Request.Size(m) -} -func (m *Request) XXX_DiscardUnknown() { - xxx_messageInfo_Request.DiscardUnknown(m) -} - -var xxx_messageInfo_Request proto.InternalMessageInfo - -func (m *Request) GetServiceMethod() string { - if m != nil { - return m.ServiceMethod - } - return "" -} - -func (m *Request) GetSeq() uint64 { - if m != nil { - return m.Seq - } - return 0 -} - -type Response struct { - ServiceMethod string `protobuf:"bytes,1,opt,name=service_method,json=serviceMethod,proto3" json:"service_method,omitempty"` - Seq uint64 `protobuf:"fixed64,2,opt,name=seq,proto3" json:"seq,omitempty"` - Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Response) Reset() { *m = Response{} } -func (m *Response) String() string { return proto.CompactTextString(m) } -func (*Response) ProtoMessage() {} -func (*Response) Descriptor() ([]byte, []int) { - return fileDescriptor_12fd17ed7ee86a33, []int{1} -} - -func (m *Response) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Response.Unmarshal(m, b) -} -func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Response.Marshal(b, m, deterministic) -} -func (m *Response) XXX_Merge(src proto.Message) { - xxx_messageInfo_Response.Merge(m, src) -} -func (m *Response) XXX_Size() int { - return xxx_messageInfo_Response.Size(m) -} -func (m *Response) XXX_DiscardUnknown() { - xxx_messageInfo_Response.DiscardUnknown(m) -} - -var xxx_messageInfo_Response proto.InternalMessageInfo - -func (m *Response) GetServiceMethod() string { - if m != nil { - return m.ServiceMethod - } - return "" -} - -func (m *Response) GetSeq() uint64 { - if m != nil { - return m.Seq - } - return 0 -} - -func (m *Response) GetError() string { - if m != nil { - return m.Error - } - return "" -} - -func init() { - proto.RegisterType((*Request)(nil), "protorpc.Request") - proto.RegisterType((*Response)(nil), "protorpc.Response") -} - -func init() { proto.RegisterFile("codec/protorpc/envelope.proto", fileDescriptor_12fd17ed7ee86a33) } - -var fileDescriptor_12fd17ed7ee86a33 = []byte{ - // 148 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x4d, 0xce, 0x4f, 0x49, - 0x4d, 0xd6, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x2f, 0x2a, 0x48, 0xd6, 0x4f, 0xcd, 0x2b, 0x4b, 0xcd, - 0xc9, 0x2f, 0x48, 0xd5, 0x03, 0x8b, 0x08, 0x71, 0xc0, 0x24, 0x94, 0x9c, 0xb8, 0xd8, 0x83, 0x52, - 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84, 0x54, 0xb9, 0xf8, 0x8a, 0x53, 0x8b, 0xca, 0x32, 0x93, 0x53, - 0xe3, 0x73, 0x53, 0x4b, 0x32, 0xf2, 0x53, 0x24, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0x78, 0xa1, - 0xa2, 0xbe, 0x60, 0x41, 0x21, 0x01, 0x2e, 0xe6, 0xe2, 0xd4, 0x42, 0x09, 0x26, 0x05, 0x46, 0x0d, - 0xb6, 0x20, 0x10, 0x53, 0x29, 0x92, 0x8b, 0x23, 0x28, 0xb5, 0xb8, 0x20, 0x3f, 0xaf, 0x38, 0x95, - 0x6c, 0x43, 0x84, 0x44, 0xb8, 0x58, 0x53, 0x8b, 0x8a, 0xf2, 0x8b, 0x24, 0x98, 0xc1, 0xea, 0x21, - 0x9c, 0x24, 0x36, 0xb0, 0x43, 0x8d, 0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0xe4, 0x73, 0x3a, 0x4b, - 0xd0, 0x00, 0x00, 0x00, -} diff --git a/app/codec/protorpc/envelope.pb.micro.go b/app/codec/protorpc/envelope.pb.micro.go deleted file mode 100644 index fcaa757c..00000000 --- a/app/codec/protorpc/envelope.pb.micro.go +++ /dev/null @@ -1,21 +0,0 @@ -// Code generated by protoc-gen-micro. DO NOT EDIT. -// source: codec/protorpc/envelope.proto - -package protorpc - -import ( - fmt "fmt" - proto "github.com/golang/protobuf/proto" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package diff --git a/app/codec/protorpc/envelope.proto b/app/codec/protorpc/envelope.proto deleted file mode 100644 index 5b2f9599..00000000 --- a/app/codec/protorpc/envelope.proto +++ /dev/null @@ -1,14 +0,0 @@ -syntax = "proto3"; - -package protorpc; - -message Request { - string service_method = 1; - fixed64 seq = 2; -} - -message Response { - string service_method = 1; - fixed64 seq = 2; - string error = 3; -} diff --git a/app/codec/protorpc/netstring.go b/app/codec/protorpc/netstring.go deleted file mode 100644 index 8204a0e2..00000000 --- a/app/codec/protorpc/netstring.go +++ /dev/null @@ -1,36 +0,0 @@ -package protorpc - -import ( - "encoding/binary" - "io" -) - -// WriteNetString writes data to a big-endian netstring on a Writer. -// Size is always a 32-bit unsigned int. -func WriteNetString(w io.Writer, data []byte) (written int, err error) { - size := make([]byte, 4) - binary.BigEndian.PutUint32(size, uint32(len(data))) - if written, err = w.Write(size); err != nil { - return - } - return w.Write(data) -} - -// ReadNetString reads data from a big-endian netstring. -func ReadNetString(r io.Reader) (data []byte, err error) { - sizeBuf := make([]byte, 4) - _, err = r.Read(sizeBuf) - if err != nil { - return nil, err - } - size := binary.BigEndian.Uint32(sizeBuf) - if size == 0 { - return nil, nil - } - data = make([]byte, size) - _, err = r.Read(data) - if err != nil { - return nil, err - } - return -} diff --git a/app/codec/protorpc/protorpc.go b/app/codec/protorpc/protorpc.go deleted file mode 100644 index d4c54058..00000000 --- a/app/codec/protorpc/protorpc.go +++ /dev/null @@ -1,186 +0,0 @@ -// Protorpc provides a net/rpc proto-rpc codec. See envelope.proto for the format. -package protorpc - -import ( - "bytes" - "fmt" - "io" - "strconv" - "sync" - - "github.com/asim/nitro/app/codec" - "github.com/golang/protobuf/proto" -) - -type flusher interface { - Flush() error -} - -type protoCodec struct { - sync.Mutex - rwc io.ReadWriteCloser - mt codec.MessageType - buf *bytes.Buffer -} - -func (c *protoCodec) Close() error { - c.buf.Reset() - return c.rwc.Close() -} - -func (c *protoCodec) String() string { - return "proto-rpc" -} - -func id(id string) uint64 { - p, err := strconv.ParseInt(id, 10, 64) - if err != nil { - p = 0 - } - i := uint64(p) - return i -} - -func (c *protoCodec) Write(m *codec.Message, b interface{}) error { - switch m.Type { - case codec.Request: - c.Lock() - defer c.Unlock() - // This is protobuf, of course we copy it. - pbr := &Request{ServiceMethod: m.Method, Seq: id(m.Id)} - data, err := proto.Marshal(pbr) - if err != nil { - return err - } - _, err = WriteNetString(c.rwc, data) - if err != nil { - return err - } - // dont trust or incoming message - m, ok := b.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - data, err = proto.Marshal(m) - if err != nil { - return err - } - _, err = WriteNetString(c.rwc, data) - if err != nil { - return err - } - if flusher, ok := c.rwc.(flusher); ok { - if err = flusher.Flush(); err != nil { - return err - } - } - case codec.Response, codec.Error: - c.Lock() - defer c.Unlock() - rtmp := &Response{ServiceMethod: m.Method, Seq: id(m.Id), Error: m.Error} - data, err := proto.Marshal(rtmp) - if err != nil { - return err - } - _, err = WriteNetString(c.rwc, data) - if err != nil { - return err - } - if pb, ok := b.(proto.Message); ok { - data, err = proto.Marshal(pb) - if err != nil { - return err - } - } else { - data = nil - } - _, err = WriteNetString(c.rwc, data) - if err != nil { - return err - } - if flusher, ok := c.rwc.(flusher); ok { - if err = flusher.Flush(); err != nil { - return err - } - } - case codec.Event: - m, ok := b.(proto.Message) - if !ok { - return codec.ErrInvalidMessage - } - data, err := proto.Marshal(m) - if err != nil { - return err - } - c.rwc.Write(data) - default: - return fmt.Errorf("Unrecognised message type: %v", m.Type) - } - return nil -} - -func (c *protoCodec) ReadHeader(m *codec.Message, mt codec.MessageType) error { - c.buf.Reset() - c.mt = mt - - switch mt { - case codec.Request: - data, err := ReadNetString(c.rwc) - if err != nil { - return err - } - rtmp := new(Request) - err = proto.Unmarshal(data, rtmp) - if err != nil { - return err - } - m.Method = rtmp.GetServiceMethod() - m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) - case codec.Response: - data, err := ReadNetString(c.rwc) - if err != nil { - return err - } - rtmp := new(Response) - err = proto.Unmarshal(data, rtmp) - if err != nil { - return err - } - m.Method = rtmp.GetServiceMethod() - m.Id = fmt.Sprintf("%d", rtmp.GetSeq()) - m.Error = rtmp.GetError() - case codec.Event: - _, err := io.Copy(c.buf, c.rwc) - return err - default: - return fmt.Errorf("Unrecognised message type: %v", mt) - } - return nil -} - -func (c *protoCodec) ReadBody(b interface{}) error { - var data []byte - switch c.mt { - case codec.Request, codec.Response: - var err error - data, err = ReadNetString(c.rwc) - if err != nil { - return err - } - case codec.Event: - data = c.buf.Bytes() - default: - return fmt.Errorf("Unrecognised message type: %v", c.mt) - } - if b != nil { - return proto.Unmarshal(data, b.(proto.Message)) - } - return nil -} - -func NewCodec(rwc io.ReadWriteCloser) codec.Codec { - return &protoCodec{ - buf: bytes.NewBuffer(nil), - rwc: rwc, - } -} diff --git a/app/server/rpc/rpc_codec.go b/app/server/rpc/rpc_codec.go index 191345dd..602b8025 100644 --- a/app/server/rpc/rpc_codec.go +++ b/app/server/rpc/rpc_codec.go @@ -8,8 +8,6 @@ import ( raw "github.com/asim/nitro/app/codec/bytes" "github.com/asim/nitro/app/codec/json" "github.com/asim/nitro/app/codec/jsonrpc" - "github.com/asim/nitro/app/codec/proto" - "github.com/asim/nitro/app/codec/protorpc" "github.com/asim/nitro/app/errors" "github.com/asim/nitro/app/network" "github.com/asim/nitro/util/buf" @@ -40,18 +38,13 @@ var ( DefaultCodecs = map[string]codec.NewCodec{ "application/json": json.NewCodec, "application/json-rpc": jsonrpc.NewCodec, - "application/protobuf": proto.NewCodec, - "application/proto-rpc": protorpc.NewCodec, "application/octet-stream": raw.NewCodec, } // TODO: remove legacy codec list defaultCodecs = map[string]codec.NewCodec{ - "application/json": jsonrpc.NewCodec, - "application/json-rpc": jsonrpc.NewCodec, - "application/protobuf": protorpc.NewCodec, - "application/proto-rpc": protorpc.NewCodec, - "application/octet-stream": protorpc.NewCodec, + "application/json": jsonrpc.NewCodec, + "application/json-rpc": jsonrpc.NewCodec, } // the local buffer pool diff --git a/app/server/rpc/rpc_stream_test.go b/app/server/rpc/rpc_stream_test.go index 902714d0..f374687d 100644 --- a/app/server/rpc/rpc_stream_test.go +++ b/app/server/rpc/rpc_stream_test.go @@ -4,25 +4,17 @@ import ( "bytes" "fmt" "io" - "math/rand" "sync" "testing" - "time" "github.com/asim/nitro/app/codec/json" - protoCodec "github.com/asim/nitro/app/codec/proto" - "github.com/golang/protobuf/proto" ) -// protoStruct implements proto.Message -type protoStruct struct { - Payload string `protobuf:"bytes,1,opt,name=service,proto3" json:"service,omitempty"` +// jsonStruct implements proto.Message +type jsonStruct struct { + Payload string } -func (m *protoStruct) Reset() { *m = protoStruct{} } -func (m *protoStruct) String() string { return proto.CompactTextString(m) } -func (*protoStruct) ProtoMessage() {} - // safeBuffer throws away everything and wont Read data back type safeBuffer struct { sync.RWMutex @@ -92,42 +84,3 @@ func TestRPCStream_Sequence(t *testing.T) { } } } - -func TestRPCStream_Concurrency(t *testing.T) { - buffer := new(safeBuffer) - codec := protoCodec.NewCodec(buffer) - streamServer := rpcStream{ - codec: codec, - request: &rpcRequest{ - codec: codec, - }, - } - - var wg sync.WaitGroup - // Check if race conditions happen - for i := 0; i < 10; i++ { - wg.Add(2) - - go func() { - for i := 0; i < 50; i++ { - msg := protoStruct{Payload: "test"} - <-time.After(time.Duration(rand.Intn(50)) * time.Millisecond) - if err := streamServer.Send(msg); err != nil { - t.Errorf("Unexpected Send error: %s", err) - } - } - wg.Done() - }() - - go func() { - for i := 0; i < 50; i++ { - <-time.After(time.Duration(rand.Intn(50)) * time.Millisecond) - if err := streamServer.Recv(&protoStruct{}); err != nil { - t.Errorf("Unexpected Recv error: %s", err) - } - } - wg.Done() - }() - } - wg.Wait() -} diff --git a/go.mod b/go.mod index 70f9fe54..9d8f9126 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,3 @@ module github.com/asim/nitro go 1.15 - -require github.com/golang/protobuf v1.4.3 diff --git a/go.sum b/go.sum index b0bf8852..e69de29b 100644 --- a/go.sum +++ b/go.sum @@ -1,20 +0,0 @@ -github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= -github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= -github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= -github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= -github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= -github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= -github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= -github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= -google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= -google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= -google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= -google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= -google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=