@@ -36,6 +36,7 @@ import (
3636
3737 "github.com/IBM/sarama"
3838 v1 "github.com/dataflow-operator/dataflow/api/v1"
39+ "github.com/dataflow-operator/dataflow/internal/logkeys"
3940 "github.com/dataflow-operator/dataflow/internal/metrics"
4041 "github.com/dataflow-operator/dataflow/internal/retry"
4142 "github.com/dataflow-operator/dataflow/internal/types"
@@ -237,6 +238,7 @@ func (k *KafkaSourceConnector) Connect(ctx context.Context) error {
237238 }(), err )
238239 }
239240 k .consumer = consumer
241+ k .logger .Info ("Successfully connected to Kafka" , "brokers" , k .config .Brokers , "topic" , k .config .Topic , "group" , consumerGroup )
240242
241243 // Record connection status
242244 if k .namespace != "" && k .name != "" {
@@ -607,6 +609,7 @@ func (k *KafkaSourceConnector) Close() error {
607609 return nil
608610 }
609611
612+ k .logger .Info ("Closing Kafka source connection" , "brokers" , k .config .Brokers , "topic" , k .config .Topic )
610613 k .closed = true
611614 if k .consumer != nil {
612615 // Record connection status
@@ -668,6 +671,7 @@ func (h *kafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSes
668671 msgData , err = h .connector .deserializeAvro (session .Context (), message .Value )
669672 if err != nil {
670673 h .connector .logger .Error (err , "Failed to deserialize Avro message" ,
674+ logkeys .MessageID , fmt .Sprintf ("%d/%d" , message .Partition , message .Offset ),
671675 "topic" , message .Topic ,
672676 "partition" , message .Partition ,
673677 "offset" , message .Offset )
@@ -685,11 +689,12 @@ func (h *kafkaConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSes
685689 msg .Metadata ["partition" ] = message .Partition
686690 msg .Metadata ["offset" ] = message .Offset
687691 msg .Metadata ["key" ] = string (message .Key )
692+ // Commit offset only after the message is successfully written to the sink (called by sink connectors)
693+ msg .Ack = func () { session .MarkMessage (message , "" ) }
688694
689695 select {
690696 case h .msgChan <- msg :
691- session .MarkMessage (message , "" )
692- // Record metrics
697+ // Record metrics (offset will be committed by sink after successful write)
693698 if h .connector .namespace != "" && h .connector .name != "" {
694699 metrics .RecordConnectorMessageRead (h .connector .namespace , h .connector .name , "kafka" , "source" )
695700 }
@@ -887,6 +892,7 @@ func (k *KafkaSinkConnector) Connect(ctx context.Context) error {
887892 }(), err )
888893 }
889894 k .producer = producer
895+ k .logger .Info ("Successfully connected to Kafka" , "brokers" , k .config .Brokers , "topic" , k .config .Topic )
890896
891897 // Record connection status
892898 if k .namespace != "" && k .name != "" {
@@ -981,6 +987,10 @@ func (k *KafkaSinkConnector) Write(ctx context.Context, messages <-chan *types.M
981987
982988 msg .Metadata ["partition" ] = partition
983989 msg .Metadata ["offset" ] = offset
990+
991+ if msg .Ack != nil {
992+ msg .Ack ()
993+ }
984994 }
985995 }
986996}
@@ -994,6 +1004,7 @@ func (k *KafkaSinkConnector) Close() error {
9941004 return nil
9951005 }
9961006
1007+ k .logger .Info ("Closing Kafka sink connection" , "brokers" , k .config .Brokers , "topic" , k .config .Topic )
9971008 k .closed = true
9981009 if k .producer != nil {
9991010 // Record connection status
0 commit comments