diff --git a/pkg/chipingress/go.mod b/pkg/chipingress/go.mod index 0730579539..602f035521 100644 --- a/pkg/chipingress/go.mod +++ b/pkg/chipingress/go.mod @@ -11,6 +11,7 @@ require ( go.opentelemetry.io/otel/metric v1.43.0 go.opentelemetry.io/otel/trace v1.43.0 go.uber.org/zap v1.27.0 + google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 google.golang.org/grpc v1.79.3 google.golang.org/protobuf v1.36.10 ) @@ -33,7 +34,6 @@ require ( golang.org/x/net v0.48.0 // indirect golang.org/x/sys v0.42.0 // indirect golang.org/x/text v0.32.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/pkg/chipingress/pb/chip_ingress.pb.go b/pkg/chipingress/pb/chip_ingress.pb.go index 7117f287c6..eb60a9f348 100644 --- a/pkg/chipingress/pb/chip_ingress.pb.go +++ b/pkg/chipingress/pb/chip_ingress.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.6 +// protoc-gen-go v1.36.11 // protoc v5.29.3 // source: pb/chip_ingress.proto @@ -8,6 +8,7 @@ package pb import ( pb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" + status "google.golang.org/genproto/googleapis/rpc/status" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" @@ -22,17 +23,67 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +// PublishOptions controls optional behaviour of PublishBatch. +type PublishOptions struct { + state protoimpl.MessageState `protogen:"open.v1"` + // allowPartialSuccess makes the batch atomic: either all events are committed or none are. + // When unset, the server preserves the original atomic behaviour. + // Set to true to allow partial success, where individual results carry per-event errors. + AllowPartialSuccess *bool `protobuf:"varint,1,opt,name=allowPartialSuccess,proto3,oneof" json:"allowPartialSuccess,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PublishOptions) Reset() { + *x = PublishOptions{} + mi := &file_pb_chip_ingress_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PublishOptions) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PublishOptions) ProtoMessage() {} + +func (x *PublishOptions) ProtoReflect() protoreflect.Message { + mi := &file_pb_chip_ingress_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PublishOptions.ProtoReflect.Descriptor instead. +func (*PublishOptions) Descriptor() ([]byte, []int) { + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} +} + +func (x *PublishOptions) GetAllowPartialSuccess() bool { + if x != nil && x.AllowPartialSuccess != nil { + return *x.AllowPartialSuccess + } + return false +} + // CloudEventBatch is used to send many ChipIngress type CloudEventBatch struct { - state protoimpl.MessageState `protogen:"open.v1"` - Events []*pb.CloudEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + Events []*pb.CloudEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + // options are optional publish settings. When omitted, allOrNothing defaults to true. + Options *PublishOptions `protobuf:"bytes,2,opt,name=options,proto3" json:"options,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *CloudEventBatch) Reset() { *x = CloudEventBatch{} - mi := &file_pb_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -44,7 +95,7 @@ func (x *CloudEventBatch) String() string { func (*CloudEventBatch) ProtoMessage() {} func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[0] + mi := &file_pb_chip_ingress_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -57,7 +108,7 @@ func (x *CloudEventBatch) ProtoReflect() protoreflect.Message { // Deprecated: Use CloudEventBatch.ProtoReflect.Descriptor instead. func (*CloudEventBatch) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{0} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{1} } func (x *CloudEventBatch) GetEvents() []*pb.CloudEvent { @@ -67,6 +118,13 @@ func (x *CloudEventBatch) GetEvents() []*pb.CloudEvent { return nil } +func (x *CloudEventBatch) GetOptions() *PublishOptions { + if x != nil { + return x.Options + } + return nil +} + type PublishResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Results []*PublishResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` @@ -76,7 +134,7 @@ type PublishResponse struct { func (x *PublishResponse) Reset() { *x = PublishResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -88,7 +146,7 @@ func (x *PublishResponse) String() string { func (*PublishResponse) ProtoMessage() {} func (x *PublishResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[1] + mi := &file_pb_chip_ingress_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -101,7 +159,7 @@ func (x *PublishResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResponse.ProtoReflect.Descriptor instead. func (*PublishResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{1} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{2} } func (x *PublishResponse) GetResults() []*PublishResult { @@ -114,13 +172,14 @@ func (x *PublishResponse) GetResults() []*PublishResult { type PublishResult struct { state protoimpl.MessageState `protogen:"open.v1"` EventId string `protobuf:"bytes,1,opt,name=eventId,proto3" json:"eventId,omitempty"` + Error *status.Status `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } func (x *PublishResult) Reset() { *x = PublishResult{} - mi := &file_pb_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -132,7 +191,7 @@ func (x *PublishResult) String() string { func (*PublishResult) ProtoMessage() {} func (x *PublishResult) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[2] + mi := &file_pb_chip_ingress_proto_msgTypes[3] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -145,7 +204,7 @@ func (x *PublishResult) ProtoReflect() protoreflect.Message { // Deprecated: Use PublishResult.ProtoReflect.Descriptor instead. func (*PublishResult) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{2} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{3} } func (x *PublishResult) GetEventId() string { @@ -155,6 +214,13 @@ func (x *PublishResult) GetEventId() string { return "" } +func (x *PublishResult) GetError() *status.Status { + if x != nil { + return x.Error + } + return nil +} + // EmptyRequest is just an empty request type EmptyRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -164,7 +230,7 @@ type EmptyRequest struct { func (x *EmptyRequest) Reset() { *x = EmptyRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -176,7 +242,7 @@ func (x *EmptyRequest) String() string { func (*EmptyRequest) ProtoMessage() {} func (x *EmptyRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[3] + mi := &file_pb_chip_ingress_proto_msgTypes[4] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -189,7 +255,7 @@ func (x *EmptyRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use EmptyRequest.ProtoReflect.Descriptor instead. func (*EmptyRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{3} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{4} } // PingResponse responds to pings @@ -202,7 +268,7 @@ type PingResponse struct { func (x *PingResponse) Reset() { *x = PingResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -214,7 +280,7 @@ func (x *PingResponse) String() string { func (*PingResponse) ProtoMessage() {} func (x *PingResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[4] + mi := &file_pb_chip_ingress_proto_msgTypes[5] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -227,7 +293,7 @@ func (x *PingResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. func (*PingResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{4} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{5} } func (x *PingResponse) GetMessage() string { @@ -247,7 +313,7 @@ type StreamEventsRequest struct { func (x *StreamEventsRequest) Reset() { *x = StreamEventsRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -259,7 +325,7 @@ func (x *StreamEventsRequest) String() string { func (*StreamEventsRequest) ProtoMessage() {} func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[5] + mi := &file_pb_chip_ingress_proto_msgTypes[6] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -272,7 +338,7 @@ func (x *StreamEventsRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamEventsRequest.ProtoReflect.Descriptor instead. func (*StreamEventsRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{5} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{6} } func (x *StreamEventsRequest) GetEvent() *pb.CloudEvent { @@ -292,7 +358,7 @@ type StreamEventsResponse struct { func (x *StreamEventsResponse) Reset() { *x = StreamEventsResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -304,7 +370,7 @@ func (x *StreamEventsResponse) String() string { func (*StreamEventsResponse) ProtoMessage() {} func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[6] + mi := &file_pb_chip_ingress_proto_msgTypes[7] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -317,7 +383,7 @@ func (x *StreamEventsResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use StreamEventsResponse.ProtoReflect.Descriptor instead. func (*StreamEventsResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{6} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{7} } func (x *StreamEventsResponse) GetEventId() string { @@ -344,7 +410,7 @@ type RegisterSchemaRequest struct { func (x *RegisterSchemaRequest) Reset() { *x = RegisterSchemaRequest{} - mi := &file_pb_chip_ingress_proto_msgTypes[7] + mi := &file_pb_chip_ingress_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -356,7 +422,7 @@ func (x *RegisterSchemaRequest) String() string { func (*RegisterSchemaRequest) ProtoMessage() {} func (x *RegisterSchemaRequest) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[7] + mi := &file_pb_chip_ingress_proto_msgTypes[8] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -369,7 +435,7 @@ func (x *RegisterSchemaRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterSchemaRequest.ProtoReflect.Descriptor instead. func (*RegisterSchemaRequest) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{7} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{8} } func (x *RegisterSchemaRequest) GetSchemas() []*Schema { @@ -389,7 +455,7 @@ type RegisterSchemaResponse struct { func (x *RegisterSchemaResponse) Reset() { *x = RegisterSchemaResponse{} - mi := &file_pb_chip_ingress_proto_msgTypes[8] + mi := &file_pb_chip_ingress_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -401,7 +467,7 @@ func (x *RegisterSchemaResponse) String() string { func (*RegisterSchemaResponse) ProtoMessage() {} func (x *RegisterSchemaResponse) ProtoReflect() protoreflect.Message { - mi := &file_pb_chip_ingress_proto_msgTypes[8] + mi := &file_pb_chip_ingress_proto_msgTypes[9] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -414,7 +480,7 @@ func (x *RegisterSchemaResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use RegisterSchemaResponse.ProtoReflect.Descriptor instead. func (*RegisterSchemaResponse) Descriptor() ([]byte, []int) { - return file_pb_chip_ingress_proto_rawDescGZIP(), []int{8} + return file_pb_chip_ingress_proto_rawDescGZIP(), []int{9} } func (x *RegisterSchemaResponse) GetRegistered() []*RegisteredSchema { @@ -428,13 +494,18 @@ var File_pb_chip_ingress_proto protoreflect.FileDescriptor const file_pb_chip_ingress_proto_rawDesc = "" + "\n" + - "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\"H\n" + + "\x15pb/chip_ingress.proto\x12\x0echipingress.pb\x1aLgithub.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto\x1a\x14pb/chip_common.proto\x1a\x17google/rpc/status.proto\"_\n" + + "\x0ePublishOptions\x125\n" + + "\x13allowPartialSuccess\x18\x01 \x01(\bH\x00R\x13allowPartialSuccess\x88\x01\x01B\x16\n" + + "\x14_allowPartialSuccess\"\x82\x01\n" + "\x0fCloudEventBatch\x125\n" + - "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\"J\n" + + "\x06events\x18\x01 \x03(\v2\x1d.io.cloudevents.v1.CloudEventR\x06events\x128\n" + + "\aoptions\x18\x02 \x01(\v2\x1e.chipingress.pb.PublishOptionsR\aoptions\"J\n" + "\x0fPublishResponse\x127\n" + - "\aresults\x18\x01 \x03(\v2\x1d.chipingress.pb.PublishResultR\aresults\")\n" + + "\aresults\x18\x01 \x03(\v2\x1d.chipingress.pb.PublishResultR\aresults\"S\n" + "\rPublishResult\x12\x18\n" + - "\aeventId\x18\x01 \x01(\tR\aeventId\"\x0e\n" + + "\aeventId\x18\x01 \x01(\tR\aeventId\x12(\n" + + "\x05error\x18\x02 \x01(\v2\x12.google.rpc.StatusR\x05error\"\x0e\n" + "\fEmptyRequest\"(\n" + "\fPingResponse\x12\x18\n" + "\amessage\x18\x01 \x01(\tR\amessage\"J\n" + @@ -468,42 +539,46 @@ func file_pb_chip_ingress_proto_rawDescGZIP() []byte { return file_pb_chip_ingress_proto_rawDescData } -var file_pb_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_pb_chip_ingress_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_pb_chip_ingress_proto_goTypes = []any{ - (*CloudEventBatch)(nil), // 0: chipingress.pb.CloudEventBatch - (*PublishResponse)(nil), // 1: chipingress.pb.PublishResponse - (*PublishResult)(nil), // 2: chipingress.pb.PublishResult - (*EmptyRequest)(nil), // 3: chipingress.pb.EmptyRequest - (*PingResponse)(nil), // 4: chipingress.pb.PingResponse - (*StreamEventsRequest)(nil), // 5: chipingress.pb.StreamEventsRequest - (*StreamEventsResponse)(nil), // 6: chipingress.pb.StreamEventsResponse - (*RegisterSchemaRequest)(nil), // 7: chipingress.pb.RegisterSchemaRequest - (*RegisterSchemaResponse)(nil), // 8: chipingress.pb.RegisterSchemaResponse - (*pb.CloudEvent)(nil), // 9: io.cloudevents.v1.CloudEvent - (*Schema)(nil), // 10: chip_common.Schema - (*RegisteredSchema)(nil), // 11: chip_common.RegisteredSchema + (*PublishOptions)(nil), // 0: chipingress.pb.PublishOptions + (*CloudEventBatch)(nil), // 1: chipingress.pb.CloudEventBatch + (*PublishResponse)(nil), // 2: chipingress.pb.PublishResponse + (*PublishResult)(nil), // 3: chipingress.pb.PublishResult + (*EmptyRequest)(nil), // 4: chipingress.pb.EmptyRequest + (*PingResponse)(nil), // 5: chipingress.pb.PingResponse + (*StreamEventsRequest)(nil), // 6: chipingress.pb.StreamEventsRequest + (*StreamEventsResponse)(nil), // 7: chipingress.pb.StreamEventsResponse + (*RegisterSchemaRequest)(nil), // 8: chipingress.pb.RegisterSchemaRequest + (*RegisterSchemaResponse)(nil), // 9: chipingress.pb.RegisterSchemaResponse + (*pb.CloudEvent)(nil), // 10: io.cloudevents.v1.CloudEvent + (*status.Status)(nil), // 11: google.rpc.Status + (*Schema)(nil), // 12: chip_common.Schema + (*RegisteredSchema)(nil), // 13: chip_common.RegisteredSchema } var file_pb_chip_ingress_proto_depIdxs = []int32{ - 9, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent - 2, // 1: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult - 9, // 2: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent - 10, // 3: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema - 11, // 4: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema - 9, // 5: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent - 0, // 6: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch - 3, // 7: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest - 5, // 8: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest - 7, // 9: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest - 1, // 10: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse - 1, // 11: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse - 4, // 12: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse - 6, // 13: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse - 8, // 14: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse - 10, // [10:15] is the sub-list for method output_type - 5, // [5:10] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 10, // 0: chipingress.pb.CloudEventBatch.events:type_name -> io.cloudevents.v1.CloudEvent + 0, // 1: chipingress.pb.CloudEventBatch.options:type_name -> chipingress.pb.PublishOptions + 3, // 2: chipingress.pb.PublishResponse.results:type_name -> chipingress.pb.PublishResult + 11, // 3: chipingress.pb.PublishResult.error:type_name -> google.rpc.Status + 10, // 4: chipingress.pb.StreamEventsRequest.event:type_name -> io.cloudevents.v1.CloudEvent + 12, // 5: chipingress.pb.RegisterSchemaRequest.schemas:type_name -> chip_common.Schema + 13, // 6: chipingress.pb.RegisterSchemaResponse.registered:type_name -> chip_common.RegisteredSchema + 10, // 7: chipingress.pb.ChipIngress.Publish:input_type -> io.cloudevents.v1.CloudEvent + 1, // 8: chipingress.pb.ChipIngress.PublishBatch:input_type -> chipingress.pb.CloudEventBatch + 4, // 9: chipingress.pb.ChipIngress.Ping:input_type -> chipingress.pb.EmptyRequest + 6, // 10: chipingress.pb.ChipIngress.StreamEvents:input_type -> chipingress.pb.StreamEventsRequest + 8, // 11: chipingress.pb.ChipIngress.RegisterSchema:input_type -> chipingress.pb.RegisterSchemaRequest + 2, // 12: chipingress.pb.ChipIngress.Publish:output_type -> chipingress.pb.PublishResponse + 2, // 13: chipingress.pb.ChipIngress.PublishBatch:output_type -> chipingress.pb.PublishResponse + 5, // 14: chipingress.pb.ChipIngress.Ping:output_type -> chipingress.pb.PingResponse + 7, // 15: chipingress.pb.ChipIngress.StreamEvents:output_type -> chipingress.pb.StreamEventsResponse + 9, // 16: chipingress.pb.ChipIngress.RegisterSchema:output_type -> chipingress.pb.RegisterSchemaResponse + 12, // [12:17] is the sub-list for method output_type + 7, // [7:12] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_pb_chip_ingress_proto_init() } @@ -512,13 +587,14 @@ func file_pb_chip_ingress_proto_init() { return } file_pb_chip_common_proto_init() + file_pb_chip_ingress_proto_msgTypes[0].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_pb_chip_ingress_proto_rawDesc), len(file_pb_chip_ingress_proto_rawDesc)), NumEnums: 0, - NumMessages: 9, + NumMessages: 10, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/chipingress/pb/chip_ingress.proto b/pkg/chipingress/pb/chip_ingress.proto index 1675f5d9c6..9b13ca8868 100644 --- a/pkg/chipingress/pb/chip_ingress.proto +++ b/pkg/chipingress/pb/chip_ingress.proto @@ -2,6 +2,7 @@ syntax = "proto3"; import "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb/cloudevent.proto"; import "pb/chip_common.proto"; +import "google/rpc/status.proto"; package chipingress.pb; @@ -28,9 +29,19 @@ service ChipIngress { rpc RegisterSchema(RegisterSchemaRequest) returns (RegisterSchemaResponse) {} } +// PublishOptions controls optional behaviour of PublishBatch. +message PublishOptions { + // allowPartialSuccess makes the batch atomic: either all events are committed or none are. + // When unset, the server preserves the original atomic behaviour. + // Set to true to allow partial success, where individual results carry per-event errors. + optional bool allowPartialSuccess = 1; +} + // CloudEventBatch is used to send many ChipIngress -message CloudEventBatch{ +message CloudEventBatch { repeated io.cloudevents.v1.CloudEvent events = 1; + // options are optional publish settings. When omitted, allOrNothing defaults to true. + PublishOptions options = 2; } message PublishResponse { @@ -39,6 +50,7 @@ message PublishResponse { message PublishResult { string eventId = 1; + google.rpc.Status error = 2; } // EmptyRequest is just an empty request diff --git a/pkg/chipingress/types.go b/pkg/chipingress/types.go index 15f94c04a3..e3f72541bb 100644 --- a/pkg/chipingress/types.go +++ b/pkg/chipingress/types.go @@ -22,6 +22,7 @@ type ( PingResponse = pb.PingResponse PublishResponse = pb.PublishResponse PublishResult = pb.PublishResult + PublishOptions = pb.PublishOptions StreamEventsRequest = pb.StreamEventsRequest StreamEventsResponse = pb.StreamEventsResponse )