Update the Kafka message to include a header for the patch operation
author    srinivasyanamadala <srinivas.yanamadala@techmahindra.com>
          Thu, 15 May 2025 12:26:28 +0000 (14:26 +0200)
committer srinivasyanamadala <srinivas.yanamadala@techmahindra.com>
          Thu, 15 May 2025 12:26:38 +0000 (14:26 +0200)
Issue-ID: POLICY-5358
Change-Id: Ia4df78be6e3d05bbdcb01ff65849701274670310
Signed-off-by: srinivasyanamadala <srinivas.yanamadala@techmahindra.com>
pkg/kafkacomm/handler/patch_message_handler.go
pkg/kafkacomm/publisher/patch_message_publisher.go
pkg/kafkacomm/publisher/patch_message_publisher_test.go
pkg/model/messages.go

pkg/kafkacomm/handler/patch_message_handler.go
index d537e61..25ca969 100644 (file)
@@ -19,6 +19,7 @@
 
 package handler
 
+
 import (
        "context"
        "encoding/json"
@@ -26,23 +27,29 @@ import (
        "policy-opa-pdp/pkg/kafkacomm"
        "policy-opa-pdp/pkg/log"
        "policy-opa-pdp/pkg/opasdk"
+       "policy-opa-pdp/pkg/model"
 )
 
+
+type LocalHeader struct {
+       MessageType string `json:"messageName"`
+       SourceID    string `json:"source-id"`
+}
+
 type PatchMessage struct {
-       PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
+       Header     LocalHeader         `json:"header"`
+       PatchInfos []opasdk.PatchImpl  `json:"patchInfos"`
 }
 
+
 // This function handles the incoming Kafka messages and dispatches them further for data patch processing.
 func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error {
-
        log.Debug("Starting Patch Message Listener.....")
-       var stopConsuming bool
-       for !stopConsuming {
+       for {
                select {
                case <-ctx.Done():
                        log.Debug("Stopping PDP Listener.....")
                        return nil
-                       stopConsuming = true ///Loop Exits
                default:
                        message, err := kafkacomm.ReadKafkaMessages(kc)
                        if err != nil {
@@ -54,11 +61,19 @@ func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic
                                var patchMsg PatchMessage
                                err = json.Unmarshal(message, &patchMsg)
                                if err != nil {
-                                       log.Warnf("Failed to UnMarshal Messages: %v\n", err)
+                                       log.Warnf("Failed to UnMarshal PatchMessage: %v\n", err)
                                        continue
                                }
                                log.Debugf("Received patch request")
 
+                               // check message type
+                               if patchMsg.Header.MessageType != model.OPA_PDP_DATA_PATCH_SYNC.String() {
+                                       log.Warnf("Ignoring message with unexpected type: %s", patchMsg.Header.MessageType)
+                                       continue
+                               }
+
+                               log.Debugf("Received patch request from source: %s", patchMsg.Header.SourceID)
+
                                if err := data.PatchDataVar(patchMsg.PatchInfos, nil); err != nil {
                                        log.Debugf("patchData failed: %v", err)
                                } else {
@@ -66,8 +81,6 @@ func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic
                                }
                        }
                }
-
        }
-       return nil
-
 }
+
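
For reference, a minimal standalone Go sketch of the header check the handler now performs; the sample message and the source-id value are illustrative, not taken from this change:

package main

import (
	"encoding/json"
	"fmt"
)

type LocalHeader struct {
	MessageType string `json:"messageName"`
	SourceID    string `json:"source-id"`
}

type PatchMessage struct {
	Header LocalHeader `json:"header"`
	// PatchInfos is omitted here for brevity
}

func main() {
	// Hypothetical payload as it might arrive on the patch topic.
	raw := []byte(`{"header":{"messageName":"OPA_PDP_DATA_PATCH_SYNC","source-id":"opa-pdp-1"}}`)

	var msg PatchMessage
	if err := json.Unmarshal(raw, &msg); err != nil {
		fmt.Println("unmarshal failed:", err)
		return
	}

	// Messages whose header carries a different type are skipped, mirroring the handler above.
	if msg.Header.MessageType != "OPA_PDP_DATA_PATCH_SYNC" {
		fmt.Println("ignoring message of type:", msg.Header.MessageType)
		return
	}
	fmt.Println("patch request accepted from source:", msg.Header.SourceID)
}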
pkg/kafkacomm/publisher/patch_message_publisher.go
index 2be1655..5cd3452 100644 (file)
@@ -25,22 +25,35 @@ import (
        "policy-opa-pdp/cfg"
        "policy-opa-pdp/pkg/kafkacomm"
        "policy-opa-pdp/pkg/log"
+       "policy-opa-pdp/pkg/model"
        "policy-opa-pdp/pkg/opasdk"
+       "policy-opa-pdp/pkg/pdpattributes"
 )
 
 type RealPatchSender struct {
        Producer kafkacomm.KafkaProducerInterface
 }
 
+// Define header structure
+type LocalHeader struct {
+       MessageType model.PdpMessageType `json:"messageName"`
+       SourceID    string               `json:"source-id"`
+}
+
+// Updated message structure to match the consumer
 type PatchKafkaPayload struct {
-       PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
+       Header     LocalHeader         `json:"header"`
+       PatchInfos []opasdk.PatchImpl  `json:"patchInfos"`
 }
 
 func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error {
        log.Debugf("In SendPatchMessage")
-       var topic string
-       topic = cfg.PatchTopic
+
        kafkaPayload := PatchKafkaPayload{
+               Header: LocalHeader{
+                       MessageType: model.OPA_PDP_DATA_PATCH_SYNC,
+                       SourceID:    pdpattributes.PdpName,
+               },
                PatchInfos: patchInfos,
        }
 
@@ -50,6 +63,7 @@ func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error
                return err
        }
 
+       topic := cfg.PatchTopic
        kafkaMessage := &kafka.Message{
                TopicPartition: kafka.TopicPartition{
                        Topic:     &topic,
@@ -57,14 +71,14 @@ func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error
                },
                Value: jsonMessage,
        }
+
        var eventChan chan kafka.Event = nil
        err = s.Producer.Produce(kafkaMessage, eventChan)
        if err != nil {
-               log.Warnf("Error producing message: %v\n", err)
+               log.Warnf("Error producing message: %v", err)
                return err
-       } else {
-               log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage))
        }
 
+       log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage))
        return nil
 }
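
For reference, a minimal Go sketch of the payload shape SendPatchMessage now serializes; the source-id value is illustrative (the real code uses pdpattributes.PdpName), PatchInfos is left empty, and MessageType is typed as a plain string here whereas the real header uses model.PdpMessageType:

package main

import (
	"encoding/json"
	"fmt"
)

type LocalHeader struct {
	MessageType string `json:"messageName"`
	SourceID    string `json:"source-id"`
}

type PatchKafkaPayload struct {
	Header     LocalHeader   `json:"header"`
	PatchInfos []interface{} `json:"patchInfos"`
}

func main() {
	payload := PatchKafkaPayload{
		Header: LocalHeader{
			MessageType: "OPA_PDP_DATA_PATCH_SYNC",
			SourceID:    "opa-pdp-1", // hypothetical; normally the PDP instance name
		},
	}

	out, err := json.Marshal(payload)
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	// Prints: {"header":{"messageName":"OPA_PDP_DATA_PATCH_SYNC","source-id":"opa-pdp-1"},"patchInfos":null}
	fmt.Println(string(out))
}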
pkg/kafkacomm/publisher/patch_message_publisher_test.go
index bff781d..fcecf4a 100644 (file)
 package publisher
 
 import (
-       "encoding/json"
        "errors"
        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        "github.com/open-policy-agent/opa/v1/storage"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/mock"
+       "policy-opa-pdp/cfg"
        "policy-opa-pdp/pkg/opasdk"
+       "strings"
        "testing"
 )
 
@@ -94,14 +95,13 @@ func TestSendPatchMessage_MarshalError(t *testing.T) {
 func TestSendPatchMessage_PayloadContent(t *testing.T) {
        sender, mockProducer := getMockSender()
 
-       mockProducer.On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool {
-               var payload PatchKafkaPayload
-               err := json.Unmarshal(msg.Value, &payload)
-               return err == nil &&
-                       len(payload.PatchInfos) == 1 &&
-                       payload.PatchInfos[0].Path.String() == "/policy/config/name" &&
-                       payload.PatchInfos[0].Value == "NewPolicyName"
-       })).Return(nil)
+       mockProducer.
+           On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool {
+               // match based on msg type
+               return strings.Contains(string(msg.Value), "OPA_PDP_DATA_PATCH_SYNC")
+           })).
+           Return(nil)
+       cfg.GroupId = "opa-pdp"
 
        err := sender.SendPatchMessage(samplePatchData())
        assert.NoError(t, err)
pkg/model/messages.go
index 966cd46..d49716a 100644 (file)
@@ -38,6 +38,7 @@ const (
        PDP_STATE_CHANGE
        PDP_HEALTH_CHECK
        PDP_TOPIC_CHECK
+       OPA_PDP_DATA_PATCH_SYNC
 )
 
 // String representation of PdpMessageType
@@ -53,6 +54,8 @@ func (msgType PdpMessageType) String() string {
                return "PDP_HEALTH_CHECK"
        case PDP_TOPIC_CHECK:
                return "PDP_TOPIC_CHECK"
+       case OPA_PDP_DATA_PATCH_SYNC:
+               return "OPA_PDP_DATA_PATCH_SYNC"
        default:
                return fmt.Sprintf("Unknown PdpMessageType: %d", msgType)
        }
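
For reference, a minimal Go sketch showing how the new enum value maps to the string the handler compares against; earlier constants in the real enum are omitted, so the numeric values in this sketch are not meaningful:

package main

import "fmt"

type PdpMessageType int

const (
	// Earlier constants from the real enum are omitted in this sketch;
	// only the relative ordering and the new value matter here.
	PDP_STATE_CHANGE PdpMessageType = iota
	PDP_HEALTH_CHECK
	PDP_TOPIC_CHECK
	OPA_PDP_DATA_PATCH_SYNC
)

func (msgType PdpMessageType) String() string {
	switch msgType {
	case OPA_PDP_DATA_PATCH_SYNC:
		return "OPA_PDP_DATA_PATCH_SYNC"
	default:
		return fmt.Sprintf("Unknown PdpMessageType: %d", msgType)
	}
}

func main() {
	// The handler compares the incoming header's messageName against this value.
	fmt.Println(OPA_PDP_DATA_PATCH_SYNC.String()) // OPA_PDP_DATA_PATCH_SYNC
}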