package handler
+
import (
"context"
"encoding/json"
"policy-opa-pdp/pkg/kafkacomm"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/opasdk"
+ "policy-opa-pdp/pkg/model"
)
+
+type LocalHeader struct {
+ MessageType string `json:"messageName"`
+ SourceID string `json:"source-id"`
+}
+
type PatchMessage struct {
- PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
+ Header LocalHeader `json:"header"`
+ PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
}
+
// This function handles the incoming kafka messages and dispatches them futher for data patch processing.
func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error {
-
log.Debug("Starting Patch Message Listener.....")
- var stopConsuming bool
- for !stopConsuming {
+ for {
select {
case <-ctx.Done():
log.Debug("Stopping PDP Listener.....")
return nil
- stopConsuming = true ///Loop Exits
default:
message, err := kafkacomm.ReadKafkaMessages(kc)
if err != nil {
var patchMsg PatchMessage
err = json.Unmarshal(message, &patchMsg)
if err != nil {
- log.Warnf("Failed to UnMarshal Messages: %v\n", err)
+ log.Warnf("Failed to UnMarshal PatchMessage: %v\n", err)
continue
}
log.Debugf("Received patch request")
+ // check message type
+ if patchMsg.Header.MessageType != model.OPA_PDP_DATA_PATCH_SYNC.String() {
+ log.Warnf("Ignoring message with unexpected type: %s", patchMsg.Header.MessageType)
+ continue
+ }
+
+ log.Debugf("Received patch request from source: %s", patchMsg.Header.SourceID)
+
if err := data.PatchDataVar(patchMsg.PatchInfos, nil); err != nil {
log.Debugf("patchData failed: %v", err)
} else {
}
}
}
-
}
- return nil
-
}
+
"policy-opa-pdp/cfg"
"policy-opa-pdp/pkg/kafkacomm"
"policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/opasdk"
+ "policy-opa-pdp/pkg/pdpattributes"
)
type RealPatchSender struct {
Producer kafkacomm.KafkaProducerInterface
}
+// Define header structure
+type LocalHeader struct {
+ MessageType model.PdpMessageType `json:"messageName"`
+ SourceID string `json:"source-id"`
+}
+
+// Updated message structure to match the consumer
type PatchKafkaPayload struct {
- PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
+ Header LocalHeader `json:"header"`
+ PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
}
func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error {
log.Debugf("In SendPatchMessage")
- var topic string
- topic = cfg.PatchTopic
+
kafkaPayload := PatchKafkaPayload{
+ Header: LocalHeader{
+ MessageType: model.OPA_PDP_DATA_PATCH_SYNC,
+ SourceID: pdpattributes.PdpName,
+ },
PatchInfos: patchInfos,
}
return err
}
+ topic := cfg.PatchTopic
kafkaMessage := &kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
},
Value: jsonMessage,
}
+
var eventChan chan kafka.Event = nil
err = s.Producer.Produce(kafkaMessage, eventChan)
if err != nil {
- log.Warnf("Error producing message: %v\n", err)
+ log.Warnf("Error producing message: %v", err)
return err
- } else {
- log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage))
}
+ log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage))
return nil
}
package publisher
import (
- "encoding/json"
"errors"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/open-policy-agent/opa/v1/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
+ "policy-opa-pdp/cfg"
"policy-opa-pdp/pkg/opasdk"
+ "strings"
"testing"
)
func TestSendPatchMessage_PayloadContent(t *testing.T) {
sender, mockProducer := getMockSender()
- mockProducer.On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool {
- var payload PatchKafkaPayload
- err := json.Unmarshal(msg.Value, &payload)
- return err == nil &&
- len(payload.PatchInfos) == 1 &&
- payload.PatchInfos[0].Path.String() == "/policy/config/name" &&
- payload.PatchInfos[0].Value == "NewPolicyName"
- })).Return(nil)
+ mockProducer.
+ On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool {
+ // match based on msg type
+ return strings.Contains(string(msg.Value), "OPA_PDP_DATA_PATCH_SYNC")
+ })).
+ Return(nil)
+ cfg.GroupId = "opa-pdp"
err := sender.SendPatchMessage(samplePatchData())
assert.NoError(t, err)
PDP_STATE_CHANGE
PDP_HEALTH_CHECK
PDP_TOPIC_CHECK
+ OPA_PDP_DATA_PATCH_SYNC
)
// String representation of PdpMessageType
return "PDP_HEALTH_CHECK"
case PDP_TOPIC_CHECK:
return "PDP_TOPIC_CHECK"
+ case OPA_PDP_DATA_PATCH_SYNC:
+ return "OPA_PDP_DATA_PATCH_SYNC"
default:
return fmt.Sprintf("Unknown PdpMessageType: %d", msgType)
}