"policy-opa-pdp/pkg/data"
"policy-opa-pdp/pkg/kafkacomm"
"policy-opa-pdp/pkg/log"
- "policy-opa-pdp/pkg/opasdk"
"policy-opa-pdp/pkg/model"
)
-type LocalHeader struct {
- MessageType string `json:"messageName"`
- SourceID string `json:"source-id"`
-}
-
-type PatchMessage struct {
- Header LocalHeader `json:"header"`
- PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
-}
-
-
// This function handles the incoming kafka messages and dispatches them futher for data patch processing.
func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error {
log.Debug("Starting Patch Message Listener.....")
log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message))
if message != nil {
- var patchMsg PatchMessage
+ var patchMsg model.PatchMessage
err = json.Unmarshal(message, &patchMsg)
if err != nil {
log.Warnf("Failed to UnMarshal PatchMessage: %v\n", err)
"net/http"
"policy-opa-pdp/pkg/data"
"policy-opa-pdp/pkg/kafkacomm"
+ "policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/opasdk"
"testing"
"time"
return nil
}
- msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()})
+ msgBytes, _ := json.Marshal(model.PatchMessage{PatchInfos: samplePatchData()})
mockKafkaMessage := &kafka.Message{
Value: []byte(msgBytes),
return errors.New("mock failure")
}
- msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()})
+ msgBytes, _ := json.Marshal(model.PatchMessage{PatchInfos: samplePatchData()})
mockKafkaMessage := &kafka.Message{
Value: []byte(msgBytes),
Producer kafkacomm.KafkaProducerInterface
}
-// Define header structure
-type LocalHeader struct {
- MessageType model.PdpMessageType `json:"messageName"`
- SourceID string `json:"source-id"`
-}
-
-// Updated message structure to match the consumer
-type PatchKafkaPayload struct {
- Header LocalHeader `json:"header"`
- PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
-}
-
func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error {
log.Debugf("In SendPatchMessage")
- kafkaPayload := PatchKafkaPayload{
- Header: LocalHeader{
- MessageType: model.OPA_PDP_DATA_PATCH_SYNC,
+ kafkaPayload := model.PatchMessage{
+ Header: model.Header{
+ MessageType: model.OPA_PDP_DATA_PATCH_SYNC.String(),
SourceID: pdpattributes.PdpName,
},
PatchInfos: patchInfos,
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
import (
"encoding/json"
"fmt"
+ "policy-opa-pdp/pkg/opasdk"
)
// PdpMessageType represents the type of PDP message.
return json.Marshal(p.String())
}
+type Header struct {
+ MessageType string `json:"messageName"`
+ SourceID string `json:"source-id"`
+}
+
+type PatchMessage struct {
+ Header Header `json:"header"`
+ PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
+}
+
// PdpStatus represents the PDP_STATUS message sent from PDP to PAP.
// https://github.com/onap/policy-models
// models-pdp/src/main/java/org/onap/policy/models/pdp/concepts/PdpStatus.java