From: srinivasyanamadala Date: Fri, 16 May 2025 10:05:32 +0000 (+0200) Subject: Update the message to include header for patch operation X-Git-Tag: 1.0.5^0 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=23be260d297a39eb8b47ecd3fa2cecf4fbf46ad4;p=policy%2Fopa-pdp.git Update the message to include header for patch operation Issue-ID: POLICY-5358 Change-Id: I0722a4f9aed9498237bf74d14694e816848809c1 Signed-off-by: srinivasyanamadala --- diff --git a/pkg/kafkacomm/handler/patch_message_handler.go b/pkg/kafkacomm/handler/patch_message_handler.go index 25ca969..b520986 100644 --- a/pkg/kafkacomm/handler/patch_message_handler.go +++ b/pkg/kafkacomm/handler/patch_message_handler.go @@ -26,22 +26,10 @@ import ( "policy-opa-pdp/pkg/data" "policy-opa-pdp/pkg/kafkacomm" "policy-opa-pdp/pkg/log" - "policy-opa-pdp/pkg/opasdk" "policy-opa-pdp/pkg/model" ) -type LocalHeader struct { - MessageType string `json:"messageName"` - SourceID string `json:"source-id"` -} - -type PatchMessage struct { - Header LocalHeader `json:"header"` - PatchInfos []opasdk.PatchImpl `json:"patchInfos"` -} - - // This function handles the incoming kafka messages and dispatches them futher for data patch processing. func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error { log.Debug("Starting Patch Message Listener.....") @@ -58,7 +46,7 @@ func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message)) if message != nil { - var patchMsg PatchMessage + var patchMsg model.PatchMessage err = json.Unmarshal(message, &patchMsg) if err != nil { log.Warnf("Failed to UnMarshal PatchMessage: %v\n", err) diff --git a/pkg/kafkacomm/handler/patch_message_handler_test.go b/pkg/kafkacomm/handler/patch_message_handler_test.go index 93114d9..5fa58f6 100644 --- a/pkg/kafkacomm/handler/patch_message_handler_test.go +++ b/pkg/kafkacomm/handler/patch_message_handler_test.go @@ -30,6 +30,7 @@ import ( "net/http" "policy-opa-pdp/pkg/data" "policy-opa-pdp/pkg/kafkacomm" + "policy-opa-pdp/pkg/model" "policy-opa-pdp/pkg/opasdk" "testing" "time" @@ -56,7 +57,7 @@ func TestPatchMessageHandler_Success(t *testing.T) { return nil } - msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()}) + msgBytes, _ := json.Marshal(model.PatchMessage{PatchInfos: samplePatchData()}) mockKafkaMessage := &kafka.Message{ Value: []byte(msgBytes), @@ -83,7 +84,7 @@ func TestPatchMessageHandler_PatchFail(t *testing.T) { return errors.New("mock failure") } - msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()}) + msgBytes, _ := json.Marshal(model.PatchMessage{PatchInfos: samplePatchData()}) mockKafkaMessage := &kafka.Message{ Value: []byte(msgBytes), diff --git a/pkg/kafkacomm/publisher/patch_message_publisher.go b/pkg/kafkacomm/publisher/patch_message_publisher.go index 5cd3452..50c9a45 100644 --- a/pkg/kafkacomm/publisher/patch_message_publisher.go +++ b/pkg/kafkacomm/publisher/patch_message_publisher.go @@ -34,24 +34,12 @@ type RealPatchSender struct { Producer kafkacomm.KafkaProducerInterface } -// Define header structure -type LocalHeader struct { - MessageType model.PdpMessageType `json:"messageName"` - SourceID string `json:"source-id"` -} - -// Updated message structure to match the consumer -type PatchKafkaPayload struct { - Header LocalHeader `json:"header"` - PatchInfos []opasdk.PatchImpl `json:"patchInfos"` -} - func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error { log.Debugf("In SendPatchMessage") - kafkaPayload := PatchKafkaPayload{ - Header: LocalHeader{ - MessageType: model.OPA_PDP_DATA_PATCH_SYNC, + kafkaPayload := model.PatchMessage{ + Header: model.Header{ + MessageType: model.OPA_PDP_DATA_PATCH_SYNC.String(), SourceID: pdpattributes.PdpName, }, PatchInfos: patchInfos, diff --git a/pkg/model/mesages.go b/pkg/model/mesages.go index d49716a..7c4171d 100644 --- a/pkg/model/mesages.go +++ b/pkg/model/mesages.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ package model import ( "encoding/json" "fmt" + "policy-opa-pdp/pkg/opasdk" ) // PdpMessageType represents the type of PDP message. @@ -65,6 +66,16 @@ func (p PdpMessageType) MarshalJSON() ([]byte, error) { return json.Marshal(p.String()) } +type Header struct { + MessageType string `json:"messageName"` + SourceID string `json:"source-id"` +} + +type PatchMessage struct { + Header Header `json:"header"` + PatchInfos []opasdk.PatchImpl `json:"patchInfos"` +} + // PdpStatus represents the PDP_STATUS message sent from PDP to PAP. // https://github.com/onap/policy-models // models-pdp/src/main/java/org/onap/policy/models/pdp/concepts/PdpStatus.java