Update the message to include header for patch operation 30/140730/3 1.0.5
authorsrinivasyanamadala <srinivas.yanamadala@techmahindra.com>
Fri, 16 May 2025 10:05:32 +0000 (12:05 +0200)
committersrinivasyanamadala <srinivas.yanamadala@techmahindra.com>
Fri, 16 May 2025 11:01:30 +0000 (13:01 +0200)
Issue-ID: POLICY-5358
Change-Id: I0722a4f9aed9498237bf74d14694e816848809c1
Signed-off-by: srinivasyanamadala <srinivas.yanamadala@techmahindra.com>
pkg/kafkacomm/handler/patch_message_handler.go
pkg/kafkacomm/handler/patch_message_handler_test.go
pkg/kafkacomm/publisher/patch_message_publisher.go
pkg/model/mesages.go

index 25ca969..b520986 100644 (file)
@@ -26,22 +26,10 @@ import (
        "policy-opa-pdp/pkg/data"
        "policy-opa-pdp/pkg/kafkacomm"
        "policy-opa-pdp/pkg/log"
-       "policy-opa-pdp/pkg/opasdk"
        "policy-opa-pdp/pkg/model"
 )
 
 
-type LocalHeader struct {
-       MessageType string `json:"messageName"`
-       SourceID    string `json:"source-id"`
-}
-
-type PatchMessage struct {
-       Header     LocalHeader         `json:"header"`
-       PatchInfos []opasdk.PatchImpl  `json:"patchInfos"`
-}
-
-
 // This function handles the incoming kafka messages and dispatches them futher for data patch processing.
 func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error {
        log.Debug("Starting Patch Message Listener.....")
@@ -58,7 +46,7 @@ func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic
                        log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message))
 
                        if message != nil {
-                               var patchMsg PatchMessage
+                               var patchMsg model.PatchMessage
                                err = json.Unmarshal(message, &patchMsg)
                                if err != nil {
                                        log.Warnf("Failed to UnMarshal PatchMessage: %v\n", err)
index 93114d9..5fa58f6 100644 (file)
@@ -30,6 +30,7 @@ import (
        "net/http"
        "policy-opa-pdp/pkg/data"
        "policy-opa-pdp/pkg/kafkacomm"
+       "policy-opa-pdp/pkg/model"
        "policy-opa-pdp/pkg/opasdk"
        "testing"
        "time"
@@ -56,7 +57,7 @@ func TestPatchMessageHandler_Success(t *testing.T) {
                return nil
        }
 
-       msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()})
+       msgBytes, _ := json.Marshal(model.PatchMessage{PatchInfos: samplePatchData()})
 
        mockKafkaMessage := &kafka.Message{
                Value: []byte(msgBytes),
@@ -83,7 +84,7 @@ func TestPatchMessageHandler_PatchFail(t *testing.T) {
                return errors.New("mock failure")
        }
 
-       msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()})
+       msgBytes, _ := json.Marshal(model.PatchMessage{PatchInfos: samplePatchData()})
 
        mockKafkaMessage := &kafka.Message{
                Value: []byte(msgBytes),
index 5cd3452..50c9a45 100644 (file)
@@ -34,24 +34,12 @@ type RealPatchSender struct {
        Producer kafkacomm.KafkaProducerInterface
 }
 
-// Define header structure
-type LocalHeader struct {
-       MessageType model.PdpMessageType `json:"messageName"`
-       SourceID    string               `json:"source-id"`
-}
-
-// Updated message structure to match the consumer
-type PatchKafkaPayload struct {
-       Header     LocalHeader         `json:"header"`
-       PatchInfos []opasdk.PatchImpl  `json:"patchInfos"`
-}
-
 func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error {
        log.Debugf("In SendPatchMessage")
 
-       kafkaPayload := PatchKafkaPayload{
-               Header: LocalHeader{
-                       MessageType: model.OPA_PDP_DATA_PATCH_SYNC,
+       kafkaPayload := model.PatchMessage{
+               Header: model.Header{
+                       MessageType: model.OPA_PDP_DATA_PATCH_SYNC.String(),
                        SourceID:    pdpattributes.PdpName,
                },
                PatchInfos: patchInfos,
index d49716a..7c4171d 100644 (file)
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@ package model
 import (
        "encoding/json"
        "fmt"
+       "policy-opa-pdp/pkg/opasdk"
 )
 
 // PdpMessageType represents the type of PDP message.
@@ -65,6 +66,16 @@ func (p PdpMessageType) MarshalJSON() ([]byte, error) {
        return json.Marshal(p.String())
 }
 
+type Header struct {
+       MessageType string `json:"messageName"`
+       SourceID    string `json:"source-id"`
+}
+
+type PatchMessage struct {
+       Header     Header         `json:"header"`
+       PatchInfos []opasdk.PatchImpl  `json:"patchInfos"`
+}
+
 // PdpStatus represents the PDP_STATUS message sent from PDP to PAP.
 // https://github.com/onap/policy-models
 // models-pdp/src/main/java/org/onap/policy/models/pdp/concepts/PdpStatus.java