From: srinivasyanamadala Date: Thu, 15 May 2025 12:26:28 +0000 (+0200) Subject: Update the kafka message to include header for patch operation X-Git-Tag: 1.0.5~1 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=72c1e46905b450331d04f50a9af325f509fc5749;p=policy%2Fopa-pdp.git Update the kafka message to include header for patch operation Issue-ID: POLICY-5358 Change-Id: Ia4df78be6e3d05bbdcb01ff65849701274670310 Signed-off-by: srinivasyanamadala --- diff --git a/pkg/kafkacomm/handler/patch_message_handler.go b/pkg/kafkacomm/handler/patch_message_handler.go index d537e61..25ca969 100644 --- a/pkg/kafkacomm/handler/patch_message_handler.go +++ b/pkg/kafkacomm/handler/patch_message_handler.go @@ -19,6 +19,7 @@ package handler + import ( "context" "encoding/json" @@ -26,23 +27,29 @@ import ( "policy-opa-pdp/pkg/kafkacomm" "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/opasdk" + "policy-opa-pdp/pkg/model" ) + +type LocalHeader struct { + MessageType string `json:"messageName"` + SourceID string `json:"source-id"` +} + type PatchMessage struct { - PatchInfos []opasdk.PatchImpl `json:"patchInfos"` + Header LocalHeader `json:"header"` + PatchInfos []opasdk.PatchImpl `json:"patchInfos"` } + // This function handles the incoming kafka messages and dispatches them futher for data patch processing. func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error { - log.Debug("Starting Patch Message Listener.....") - var stopConsuming bool - for !stopConsuming { + for { select { case <-ctx.Done(): log.Debug("Stopping PDP Listener.....") return nil - stopConsuming = true ///Loop Exits default: message, err := kafkacomm.ReadKafkaMessages(kc) if err != nil { @@ -54,11 +61,19 @@ func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic var patchMsg PatchMessage err = json.Unmarshal(message, &patchMsg) if err != nil { - log.Warnf("Failed to UnMarshal Messages: %v\n", err) + log.Warnf("Failed to UnMarshal PatchMessage: %v\n", err) continue } log.Debugf("Received patch request") + // check message type + if patchMsg.Header.MessageType != model.OPA_PDP_DATA_PATCH_SYNC.String() { + log.Warnf("Ignoring message with unexpected type: %s", patchMsg.Header.MessageType) + continue + } + + log.Debugf("Received patch request from source: %s", patchMsg.Header.SourceID) + if err := data.PatchDataVar(patchMsg.PatchInfos, nil); err != nil { log.Debugf("patchData failed: %v", err) } else { @@ -66,8 +81,6 @@ func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic } } } - } - return nil - } + diff --git a/pkg/kafkacomm/publisher/patch_message_publisher.go b/pkg/kafkacomm/publisher/patch_message_publisher.go index 2be1655..5cd3452 100644 --- a/pkg/kafkacomm/publisher/patch_message_publisher.go +++ b/pkg/kafkacomm/publisher/patch_message_publisher.go @@ -25,22 +25,35 @@ import ( "policy-opa-pdp/cfg" "policy-opa-pdp/pkg/kafkacomm" "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/model" "policy-opa-pdp/pkg/opasdk" + "policy-opa-pdp/pkg/pdpattributes" ) type RealPatchSender struct { Producer kafkacomm.KafkaProducerInterface } +// Define header structure +type LocalHeader struct { + MessageType model.PdpMessageType `json:"messageName"` + SourceID string `json:"source-id"` +} + +// Updated message structure to match the consumer type PatchKafkaPayload struct { - PatchInfos []opasdk.PatchImpl `json:"patchInfos"` + Header LocalHeader `json:"header"` + PatchInfos []opasdk.PatchImpl `json:"patchInfos"` } func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error { log.Debugf("In SendPatchMessage") - var topic string - topic = cfg.PatchTopic + kafkaPayload := PatchKafkaPayload{ + Header: LocalHeader{ + MessageType: model.OPA_PDP_DATA_PATCH_SYNC, + SourceID: pdpattributes.PdpName, + }, PatchInfos: patchInfos, } @@ -50,6 +63,7 @@ func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error return err } + topic := cfg.PatchTopic kafkaMessage := &kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &topic, @@ -57,14 +71,14 @@ func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error }, Value: jsonMessage, } + var eventChan chan kafka.Event = nil err = s.Producer.Produce(kafkaMessage, eventChan) if err != nil { - log.Warnf("Error producing message: %v\n", err) + log.Warnf("Error producing message: %v", err) return err - } else { - log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage)) } + log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage)) return nil } diff --git a/pkg/kafkacomm/publisher/patch_message_publisher_test.go b/pkg/kafkacomm/publisher/patch_message_publisher_test.go index bff781d..fcecf4a 100644 --- a/pkg/kafkacomm/publisher/patch_message_publisher_test.go +++ b/pkg/kafkacomm/publisher/patch_message_publisher_test.go @@ -20,13 +20,14 @@ package publisher import ( - "encoding/json" "errors" "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/open-policy-agent/opa/v1/storage" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "policy-opa-pdp/cfg" "policy-opa-pdp/pkg/opasdk" + "strings" "testing" ) @@ -94,14 +95,13 @@ func TestSendPatchMessage_MarshalError(t *testing.T) { func TestSendPatchMessage_PayloadContent(t *testing.T) { sender, mockProducer := getMockSender() - mockProducer.On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool { - var payload PatchKafkaPayload - err := json.Unmarshal(msg.Value, &payload) - return err == nil && - len(payload.PatchInfos) == 1 && - payload.PatchInfos[0].Path.String() == "/policy/config/name" && - payload.PatchInfos[0].Value == "NewPolicyName" - })).Return(nil) + mockProducer. + On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool { + // match based on msg type + return strings.Contains(string(msg.Value), "OPA_PDP_DATA_PATCH_SYNC") + })). + Return(nil) + cfg.GroupId = "opa-pdp" err := sender.SendPatchMessage(samplePatchData()) assert.NoError(t, err) diff --git a/pkg/model/mesages.go b/pkg/model/mesages.go index 966cd46..d49716a 100644 --- a/pkg/model/mesages.go +++ b/pkg/model/mesages.go @@ -38,6 +38,7 @@ const ( PDP_STATE_CHANGE PDP_HEALTH_CHECK PDP_TOPIC_CHECK + OPA_PDP_DATA_PATCH_SYNC ) // String representation of PdpMessageType @@ -53,6 +54,8 @@ func (msgType PdpMessageType) String() string { return "PDP_HEALTH_CHECK" case PDP_TOPIC_CHECK: return "PDP_TOPIC_CHECK" + case OPA_PDP_DATA_PATCH_SYNC: + return "OPA_PDP_DATA_PATCH_SYNC" default: return fmt.Sprintf("Unknown PdpMessageType: %d", msgType) }