Support Data Consistency for Dynamic Data Update 13/140713/4
author     Shalini Shivam <ss00765416@techmahindra.com>
           Thu, 1 May 2025 11:33:57 +0000 (13:33 +0200)
committer  Shalini Shivam <ss00765416@techmahindra.com>
           Tue, 6 May 2025 10:36:44 +0000 (12:36 +0200)
Issue-ID: POLICY-5349
Change-Id: I2192f6b8d0c1d0593c324def2945596fd08ac946
Signed-off-by: Shalini Shivam <ss00765416@techmahindra.com>
31 files changed:
Dockerfile
api/openapi.yaml
api/register-handlers.go
api/register-handlers_test.go
cfg/config.go
cfg/config_test.go
cmd/opa-pdp/opa-pdp.go
cmd/opa-pdp/opa-pdp_test.go
consts/constants.go
pkg/data/data-handler.go
pkg/data/data-handler_test.go
pkg/kafkacomm/handler/patch_message_handler.go [new file with mode: 0644]
pkg/kafkacomm/handler/patch_message_handler_test.go [new file with mode: 0644]
pkg/kafkacomm/handler/pdp_message_handler_test.go
pkg/kafkacomm/handler/pdp_update_deploy_policy.go
pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go
pkg/kafkacomm/handler/pdp_update_message_handler.go
pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go
pkg/kafkacomm/pdp_topic_consumer.go
pkg/kafkacomm/pdp_topic_consumer_test.go
pkg/kafkacomm/pdp_topic_producer.go
pkg/kafkacomm/publisher/patch_message_publisher.go [new file with mode: 0644]
pkg/kafkacomm/publisher/patch_message_publisher_test.go [new file with mode: 0644]
pkg/kafkacomm/publisher/pdp-heartbeat.go
pkg/kafkacomm/publisher/pdp-heartbeat_test.go
pkg/kafkacomm/publisher/pdp-status-publisher.go
pkg/metrics/counters.go
pkg/metrics/statistics-provider.go
pkg/model/oapicodegen/models.go
pkg/pdpattributes/pdpattributes.go
pkg/utils/utils.go

diff --git a/Dockerfile b/Dockerfile
index 5d2c43e..3b14c62 100644
@@ -19,7 +19,7 @@
 FROM curlimages/curl:7.78.0 AS build
 
 # Get OPA
-RUN curl -Lo /tmp/opa https://github.com/open-policy-agent/opa/releases/download/v0.69.0/opa_linux_amd64
+RUN curl --proto "=https" -Lo /tmp/opa https://github.com/open-policy-agent/opa/releases/download/v0.69.0/opa_linux_amd64
 
 FROM golang:1.23 AS compile
 
@@ -27,7 +27,18 @@ RUN mkdir /app
 
 COPY go.mod go.sum /app/
 
-COPY . .
+# Copy individual files and directories
+COPY Dockerfile /go/
+COPY api /go/api
+COPY cfg /go/cfg
+COPY cmd /go/cmd
+COPY consts /go/consts
+COPY go.mod /go/
+COPY go.sum /go/
+COPY pkg /go/pkg
+COPY sonar-project.properties /go/
+COPY version /go/
+COPY version.properties /go/
 
 RUN mkdir -p /app/cfg /app/consts /app/api /app/cmd /app/pkg /app/bundles
 COPY cfg /app/cfg
@@ -42,9 +53,9 @@ WORKDIR /app
 # Build the binary
 RUN GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o /app/opa-pdp /app/cmd/opa-pdp/opa-pdp.go
 
-FROM ubuntu
+FROM ubuntu:24.04
 
-RUN apt-get update && apt-get install -y netcat-openbsd curl && rm -rf /var/lib/apt/lists/*\
+RUN apt-get update && apt-get --no-install-recommends install -y netcat-openbsd curl && rm -rf /var/lib/apt/lists/*\
     && mkdir -p /app /opt/policies /opt/data /var/logs \
     && chown -R ubuntu:ubuntu /app /opt/policies /opt/data /var/logs
 
@@ -52,7 +63,8 @@ COPY --from=compile /app /app
 # Copy our opa executable from build stage
 COPY --from=build /tmp/opa /app/opa
 
-RUN chmod +x /app/opa-pdp && chmod 755 /app/opa && chmod 777 /app/bundles
+RUN chown 1000:1000 /app/opa-pdp && chown 1000:1000 /app/opa && chown 1000:1000 /app/bundles
+RUN chmod u+x /app/opa-pdp && chmod u+x /app/opa && chmod u+x /app/bundles
 
 
 # Switch to the non-root user and 1000 is for ubuntu
@@ -63,4 +75,3 @@ EXPOSE 8282
 
 # Command to run OPA with the policies
 CMD ["/app/opa-pdp"]
-
diff --git a/api/openapi.yaml b/api/openapi.yaml
index d8afb2f..e448ab0 100644
@@ -112,7 +112,7 @@ paths:
       - basicAuth: []
       x-interface info:
         last-mod-release: Paris
-        pdpo-version: 1.0.0
+        pdpo-version: 1.0.3
       x-codegen-request-body-name: body
   /healthcheck:
     get:
@@ -172,7 +172,7 @@ paths:
       - basicAuth: []
       x-interface info:
         last-mod-release: Paris
-        pdpo-version: 1.0.0
+        pdpo-version: 1.0.3
   /statistics:
     get:
       tags:
@@ -231,7 +231,7 @@ paths:
       - basicAuth: []
       x-interface info:
         last-mod-release: Paris
-        pdpo-version: 1.0.0
+        pdpo-version: 1.0.3
   /data/{path}:
     patch:
       tags:
@@ -325,7 +325,7 @@ paths:
       - basicAuth: []
       x-interface info:
         last-mod-release: Paris
-        pdpo-version: 1.0.0
+        pdpo-version: 1.0.3
       x-codegen-request-body-name: body
     get:
       tags:
@@ -387,6 +387,11 @@ paths:
         500:
           description: Internal Server Error
           content: {}
+      security:
+        - basicAuth: []
+      x-interface info:
+        last-mod-release: Paris
+        pdpo-version: 1.0.3
 components:
   schemas:
     ErrorResponse:
@@ -552,6 +557,12 @@ components:
         decisionFailureCount:
           type: integer
           format: int64
+        dynamicDataUpdateSuccessCount:
+          type: integer
+          format: int64
+        dynamicDataUpdateFailureCount:
+          type: integer
+          format: int64
     OPADataResponse:
       type: object
       properties:
@@ -565,4 +576,3 @@ components:
       type: http
       description: ""
       scheme: basic
-
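
The two dynamicDataUpdate*Count fields added to the statistics schema are backed by new counters in pkg/metrics/counters.go (changed in this commit but not shown in this excerpt); the data handler below increments them via metrics.IncrementDynamicDataUpdateSuccessCount() and metrics.IncrementDynamicDataUpdateFailureCount(). A minimal sketch of those helpers, assuming the plain int64 counter style implied by the schema; the real definitions may differ:

    // Sketch only; assumes "sync/atomic" is imported in pkg/metrics.
    var dynamicDataUpdateSuccessCount int64
    var dynamicDataUpdateFailureCount int64

    // IncrementDynamicDataUpdateSuccessCount bumps the success counter atomically.
    func IncrementDynamicDataUpdateSuccessCount() {
            atomic.AddInt64(&dynamicDataUpdateSuccessCount, 1)
    }

    // IncrementDynamicDataUpdateFailureCount bumps the failure counter atomically.
    func IncrementDynamicDataUpdateFailureCount() {
            atomic.AddInt64(&dynamicDataUpdateFailureCount, 1)
    }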
diff --git a/api/register-handlers.go b/api/register-handlers.go
index 1bd1815..25f1688 100644
 package api
 
 import (
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
        "net/http"
        "policy-opa-pdp/cfg"
        "policy-opa-pdp/pkg/data"
        "policy-opa-pdp/pkg/decision"
        "policy-opa-pdp/pkg/healthcheck"
+       "policy-opa-pdp/pkg/log"
        "policy-opa-pdp/pkg/metrics"
        "policy-opa-pdp/pkg/opasdk"
        "time"
-       "github.com/prometheus/client_golang/prometheus"
-       "github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
 // RegisterHandlers registers the HTTP handlers for the service.
@@ -57,7 +58,7 @@ func RegisterHandlers() {
 
        http.Handle("/policy/pdpo/v1/data", basicAuth(trackDataResponseTime(dataHandler)))
 
-        http.Handle("/metrics", basicAuth(http.HandlerFunc(metricsHandler)))
+       http.Handle("/metrics", basicAuth(http.HandlerFunc(metricsHandler)))
 
 }
 
@@ -66,14 +67,14 @@ func metricsHandler(w http.ResponseWriter, r *http.Request) {
        promhttp.Handler().ServeHTTP(w, r)
 }
 
-//Track Decision response time metrics
+// Track Decision response time metrics
 func trackDecisionResponseTime(next http.HandlerFunc) http.HandlerFunc {
-       return trackResponseTime(metrics.DecisionResponseTime, next)
+       return trackResponseTime(metrics.DecisionResponseTime_Prom, next)
 }
 
-//Track Data response time metrics
+// Track Data response time metrics
 func trackDataResponseTime(next http.HandlerFunc) http.HandlerFunc {
-       return trackResponseTime(metrics.DataResponseTime, next)
+       return trackResponseTime(metrics.DataResponseTime_Prom, next)
 }
 
 func trackResponseTime(metricCollector prometheus.Observer, next http.HandlerFunc) http.HandlerFunc {
@@ -104,3 +105,12 @@ func validateCredentials(username, password string) bool {
        validPass := cfg.Password
        return username == validUser && password == validPass
 }
+
+// readinessProbe handles the readiness probe endpoint
+func readinessProbe(res http.ResponseWriter, req *http.Request) {
+       res.WriteHeader(http.StatusOK)
+       _, err := res.Write([]byte("Ready"))
+       if err != nil {
+               log.Errorf("Failed to write response: %v", err)
+       }
+}
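
readinessProbe is defined above, but its route registration falls outside the shown hunks; TestReadinessProbe below exercises it at /ready. A one-line sketch of the assumed wiring inside RegisterHandlers (the exact path and whether it sits behind basicAuth are assumptions):

    http.HandleFunc("/ready", readinessProbe) // hypothetical registration; actual wiring not shown in this excerpt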
diff --git a/api/register-handlers_test.go b/api/register-handlers_test.go
index 92ad776..4905eed 100644
@@ -20,6 +20,7 @@
 package api
 
 import (
+       "github.com/stretchr/testify/assert"
        "net/http"
        "net/http/httptest"
        "policy-opa-pdp/cfg"
@@ -27,7 +28,6 @@ import (
        "policy-opa-pdp/pkg/healthcheck"
        "testing"
        "time"
-       "github.com/stretchr/testify/assert"
 )
 
 // Mock configuration
@@ -94,7 +94,6 @@ func TestBasicAuth(t *testing.T) {
        }
 }
 
-
 type mockObserver struct {
        observedDuration float64
 }
@@ -149,3 +148,34 @@ func TestTrackResponseTime(t *testing.T) {
        handler(res, req)
        assert.NotNil(t, observer.observedDuration)
 }
+
+func TestMetricsHandler(t *testing.T) {
+       req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
+       rr := httptest.NewRecorder()
+
+       metricsHandler(rr, req)
+
+       resp := rr.Result()
+       defer resp.Body.Close()
+
+       assert.Equal(t, http.StatusOK, resp.StatusCode, "expected status OK")
+
+       contentType := resp.Header.Get("Content-Type")
+       assert.Contains(t, contentType, "text/plain", "expected Prometheus content type")
+
+}
+
+func TestReadinessProbe(t *testing.T) {
+       req := httptest.NewRequest(http.MethodGet, "/ready", nil)
+       rr := httptest.NewRecorder()
+
+       readinessProbe(rr, req)
+
+       resp := rr.Result()
+       defer resp.Body.Close()
+
+       assert.Equal(t, http.StatusOK, resp.StatusCode, "expected HTTP 200 OK")
+
+       body := rr.Body.String()
+       assert.Equal(t, "Ready", body, "expected response body to be 'Ready'")
+}
diff --git a/cfg/config.go b/cfg/config.go
index 62d4853..459d338 100644
@@ -41,16 +41,19 @@ import (
 // KAFKA_USERNAME  - The Kafka username for SASL authentication.
 // KAFKA_PASSWORD  - The Kafka password for SASL authentication.
 var (
-       LogLevel        string
-       BootstrapServer string
-       Topic           string
-       GroupId         string
-       Username        string
-       Password        string
-       UseSASLForKAFKA string
-       KAFKA_USERNAME  string
-       KAFKA_PASSWORD  string
-       JAASLOGIN       string
+       LogLevel         string
+       BootstrapServer  string
+       Topic            string
+       PatchTopic       string
+       GroupId          string
+       Username         string
+       Password         string
+       UseSASLForKAFKA  string
+       KAFKA_USERNAME   string
+       KAFKA_PASSWORD   string
+       JAASLOGIN        string
+       UseKafkaForPatch bool
+       PatchGroupId     string
 )
 
 // Initializes the configuration settings.
@@ -66,14 +69,16 @@ func init() {
        LogLevel = getEnv("LOG_LEVEL", "info")
        BootstrapServer = getEnv("KAFKA_URL", "kafka:9092")
        Topic = getEnv("PAP_TOPIC", "policy-pdp-pap")
+       PatchTopic = getEnv("PATCH_TOPIC", "opa-pdp-data")
        GroupId = getEnv("GROUPID", "opa-pdp-"+uuid.New().String())
+       PatchGroupId = getEnv("PATCH_GROUPID", "opa-pdp-data-"+uuid.New().String())
        Username = getEnv("API_USER", "policyadmin")
        Password = getEnv("API_PASSWORD", "zb!XztG34")
        UseSASLForKAFKA = getEnv("UseSASLForKAFKA", "false")
        KAFKA_USERNAME, KAFKA_PASSWORD = getSaslJAASLOGINFromEnv(JAASLOGIN)
        log.Debugf("Username: %s", KAFKA_USERNAME)
        log.Debugf("Password: %s", KAFKA_PASSWORD)
-
+       UseKafkaForPatch = getEnvAsBool("USE_KAFKA_FOR_PATCH", false)
        log.Debug("Configuration module: environment initialised")
 }
 
@@ -109,6 +114,19 @@ func getLogLevel(key string, defaultVal string) log.Level {
        }
 }
 
+func getEnvAsBool(key string, defaultVal bool) bool {
+       if value, exists := os.LookupEnv(key); exists {
+               parsed, err := strconv.ParseBool(value)
+               if err != nil {
+                       log.Warnf("%v is set but not a valid bool (%v), using default: %v", key, value, defaultVal)
+                       return defaultVal
+               }
+               return parsed
+       }
+       log.Warnf("%v not defined, using default value: %v", key, defaultVal)
+       return defaultVal
+}
+
 func getSaslJAASLOGINFromEnv(JAASLOGIN string) (string, string) {
        // Retrieve the value of the environment variable
        decodingConfigBytes := getEnv("JAASLOGIN", "JAASLOGIN")
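
getEnvAsBool delegates to strconv.ParseBool, so only 1/0, t/f, T/F, true/false, TRUE/FALSE and True/False parse; any other value logs a warning and falls back to the default, as does an unset variable. For illustration (from within package cfg, since the helper is unexported):

    os.Setenv("USE_KAFKA_FOR_PATCH", "True")
    _ = getEnvAsBool("USE_KAFKA_FOR_PATCH", false) // true

    os.Setenv("USE_KAFKA_FOR_PATCH", "yes")
    _ = getEnvAsBool("USE_KAFKA_FOR_PATCH", false) // "yes" fails strconv.ParseBool: warns, returns false

    os.Unsetenv("USE_KAFKA_FOR_PATCH")
    _ = getEnvAsBool("USE_KAFKA_FOR_PATCH", true)  // unset: warns, returns the default true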
diff --git a/cfg/config_test.go b/cfg/config_test.go
index 092a67e..652aed0 100644
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -131,3 +131,36 @@ func TestGetSaslJAASLOGINFromEnv_MissingEnv(t *testing.T) {
        assert.Empty(t, username, "Expected username to be empty for missing environment variable")
        assert.Empty(t, password, "Expected password to be empty for missing environment variable")
 }
+
+func TestGetEnvAsBool(t *testing.T) {
+       t.Run("valid boolean true", func(t *testing.T) {
+               os.Setenv("USE_KAFKA_FOR_PATCH", "true")
+               defer os.Unsetenv("USE_KAFKA_FOR_PATCH")
+
+               result := getEnvAsBool("USE_KAFKA_FOR_PATCH", false)
+               assert.True(t, result)
+       })
+
+       t.Run("valid boolean false", func(t *testing.T) {
+               os.Setenv("USE_KAFKA_FOR_PATCH", "false")
+               defer os.Unsetenv("USE_KAFKA_FOR_PATCH")
+
+               result := getEnvAsBool("USE_KAFKA_FOR_PATCH", true)
+               assert.False(t, result)
+       })
+
+       t.Run("invalid boolean value", func(t *testing.T) {
+               os.Setenv("USE_KAFKA_FOR_PATCH", "notabool")
+               defer os.Unsetenv("USE_KAFKA_FOR_PATCH")
+
+               result := getEnvAsBool("USE_KAFKA_FOR_PATCH", true)
+               assert.True(t, result) // should return default (true) because parsing fails
+       })
+
+       t.Run("missing env variable", func(t *testing.T) {
+               os.Unsetenv("USE_KAFKA_FOR_PATCH") // ensure it's not set
+
+               result := getEnvAsBool("USE_KAFKA_FOR_PATCH", false)
+               assert.False(t, result) // should return default (false)
+       })
+}
diff --git a/cmd/opa-pdp/opa-pdp.go b/cmd/opa-pdp/opa-pdp.go
index f4b8093..4005481 100644
@@ -29,6 +29,7 @@ import (
        h "policy-opa-pdp/api"
        "policy-opa-pdp/cfg"
        "policy-opa-pdp/consts"
+       "policy-opa-pdp/pkg/data"
        "policy-opa-pdp/pkg/kafkacomm"
        "policy-opa-pdp/pkg/kafkacomm/handler"
        "policy-opa-pdp/pkg/kafkacomm/publisher"
@@ -42,22 +43,30 @@ import (
 var (
        bootstrapServers = cfg.BootstrapServer //The Kafka bootstrap server address.
        topic            = cfg.Topic           //The Kafka topic to subscribe to.
+       patchTopic       = cfg.PatchTopic
+       patchMsgProducer *kafkacomm.KafkaProducer
+       patchMsgConsumer *kafkacomm.KafkaConsumer
+       groupId          = cfg.GroupId
+       patchGroupId     = cfg.PatchGroupId
 )
 
 // Declare function variables for dependency injection makes it more testable
 var (
-       initializeHandlersFunc    = initializeHandlers
-       startHTTPServerFunc       = startHTTPServer
-       shutdownHTTPServerFunc    = shutdownHTTPServer
-       waitForServerFunc         = waitForServer
-       initializeOPAFunc         = initializeOPA
-       startKafkaConsAndProdFunc = startKafkaConsAndProd
-       handleMessagesFunc        = handleMessages
-       handleShutdownFunc        = handleShutdown
+       initializeHandlersFunc         = initializeHandlers
+       startHTTPServerFunc            = startHTTPServer
+       shutdownHTTPServerFunc         = shutdownHTTPServer
+       waitForServerFunc              = waitForServer
+       initializeOPAFunc              = initializeOPA
+       startKafkaConsAndProdFunc      = startKafkaConsAndProd
+       handleMessagesFunc             = handleMessages
+       handleShutdownFunc             = handleShutdown
+       startPatchKafkaConsAndProdFunc = startPatchKafkaConsAndProd
+       handlePatchMessagesFunc        = handlePatchMessages
 )
 
 // main function
 func main() {
+       var useKafkaForPatch = cfg.UseKafkaForPatch
        log.Debugf("Starting OPA PDP Service")
 
        ctx, cancel := context.WithCancel(context.Background())
@@ -91,6 +100,17 @@
        // start pdp message handler in a separate routine
        handleMessagesFunc(ctx, kc, sender)
+
+       if useKafkaForPatch {
+               var err error
+               patchMsgConsumer, patchMsgProducer, err = startPatchKafkaConsAndProdFunc()
+               if err != nil || patchMsgConsumer == nil {
+                       log.Warnf("Kafka consumer initialization failed: %v", err)
+               }
+               log.Debugf("Patch producer initialized: %v", patchMsgProducer)
+               // start patch message handler in a separate routine
+               handlePatchMessagesFunc(ctx, patchMsgConsumer)
+       }
+
        time.Sleep(10 * time.Second)
 
        pdpattributes.SetPdpHeartbeatInterval(int64(consts.DefaultHeartbeatMS))
@@ -101,7 +120,9 @@ func main() {
        // Handle OS Interrupts and Graceful Shutdown
        interruptChannel := make(chan os.Signal, 1)
        signal.Notify(interruptChannel, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
-       handleShutdownFunc(kc, interruptChannel, cancel, producer)
+       consumers := []*kafkacomm.KafkaConsumer{kc, patchMsgConsumer}
+       producers := []*kafkacomm.KafkaProducer{producer, patchMsgProducer}
+       handleShutdownFunc(consumers, interruptChannel, cancel, producers)
 }
 
 type PdpMessageHandlerFunc func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error
@@ -119,6 +140,21 @@ func handleMessages(ctx context.Context, kc *kafkacomm.KafkaConsumer, sender *pu
        }()
 }
 
+type PatchMessageHandlerFunc func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error
+
+var PatchMessageHandler PatchMessageHandlerFunc = handler.PatchMessageHandler
+
+// starts the patch message handler in a separate routine which handles incoming messages on the Kafka topic
+func handlePatchMessages(ctx context.Context, kc *kafkacomm.KafkaConsumer) {
+
+       go func() {
+               err := PatchMessageHandler(ctx, kc, patchTopic)
+               if err != nil {
+                       log.Warnf("Error in Patch Message Handler: %v", err)
+               }
+       }()
+}
+
 // Register Handlers
 func initializeHandlers() {
        h.RegisterHandlers()
@@ -163,7 +199,7 @@ func initializeOPA() error {
        return nil
 }
 
-type NewKafkaConsumerFunc func() (*kafkacomm.KafkaConsumer, error)
+type NewKafkaConsumerFunc func(topic string, groupid string) (*kafkacomm.KafkaConsumer, error)
 
 var NewKafkaConsumer NewKafkaConsumerFunc = kafkacomm.NewKafkaConsumer
 
@@ -172,7 +208,8 @@ type GetKafkaProducerFunc func(bootstrapServers string, topic string) (*kafkacom
 var GetKafkaProducer GetKafkaProducerFunc = kafkacomm.GetKafkaProducer
 
 func startKafkaConsAndProd() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
-       kc, err := NewKafkaConsumer()
+       log.Debugf("Topic start :::: %s", topic)
+       kc, err := NewKafkaConsumer(topic, groupId)
        if err != nil {
                log.Warnf("Failed to create Kafka consumer: %v", err)
                return nil, nil, err
@@ -185,7 +222,23 @@ func startKafkaConsAndProd() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer
        return kc, producer, nil
 }
 
-func handleShutdown(kc *kafkacomm.KafkaConsumer, interruptChannel chan os.Signal, cancel context.CancelFunc, producer *kafkacomm.KafkaProducer) {
+func startPatchKafkaConsAndProd() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
+       log.Debugf("Topic start :::: %s", patchTopic)
+       kc, err := NewKafkaConsumer(patchTopic, patchGroupId)
+       if err != nil {
+               log.Warnf("Failed to create Kafka consumer: %v", err)
+               return nil, nil, err
+       }
+       PatchProducer, err := GetKafkaProducer(bootstrapServers, patchTopic)
+       if err != nil {
+               log.Warnf("Failed to create Kafka producer: %v", err)
+               return nil, nil, err
+       }
+       data.PatchProducer = PatchProducer
+       return kc, PatchProducer, nil
+}
+
+func handleShutdown(consumers []*kafkacomm.KafkaConsumer, interruptChannel chan os.Signal, cancel context.CancelFunc, producers []*kafkacomm.KafkaProducer) {
 
 myLoop:
        for {
@@ -200,21 +253,28 @@ myLoop:
        signal.Stop(interruptChannel)
 
        publisher.StopTicker()
-       producer.Close()
-       if kc == nil {
-               log.Debugf("kc is nil so skipping")
-               return
+       for _, producer := range producers {
+               if producer != nil {
+                       producer.Close()
+               }
        }
 
-       if err := kc.Consumer.Unsubscribe(); err != nil {
-               log.Warnf("Failed to unsubscribe consumer: %v", err)
-       } else {
-               log.Debugf("Consumer Unsubscribed....")
-       }
-       if err := kc.Consumer.Close(); err != nil {
-               log.Debug("Failed to close consumer......")
-       } else {
-               log.Debugf("Consumer closed....")
+       for _, consumer := range consumers {
+               if consumer == nil {
+                       log.Debugf("kc is nil so skipping")
+                       continue
+               }
+
+               if err := consumer.Consumer.Unsubscribe(); err != nil {
+                       log.Warnf("Failed to unsubscribe consumer: %v", err)
+               } else {
+                       log.Debugf("Consumer Unsubscribed....")
+               }
+               if err := consumer.Consumer.Close(); err != nil {
+                       log.Debug("Failed to close consumer......")
+               } else {
+                       log.Debugf("Consumer closed....")
+               }
        }
 
        handler.SetShutdownFlag()
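
handlePatchMessages delegates to handler.PatchMessageHandler in the new pkg/kafkacomm/handler/patch_message_handler.go, which is listed among the changed files but not shown in this excerpt. A rough sketch of its expected shape, assuming the same ReadMessage loop style as the existing PDP message handler; the nil ResponseWriter passed into the data layer relies on the nil checks added to PatchData in pkg/data/data-handler.go below:

    func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error {
            for {
                    select {
                    case <-ctx.Done():
                            return nil
                    default:
                            msg, err := kc.Consumer.ReadMessage(-1) // block until a message arrives
                            if err != nil {
                                    continue
                            }
                            // Wire shape taken from the TestHandlePatchMessages fixture below.
                            var pm struct {
                                    PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
                            }
                            if err := json.Unmarshal(msg.Value, &pm); err != nil {
                                    log.Warnf("Failed to decode patch message: %v", err)
                                    continue
                            }
                            if err := data.PatchDataVar(pm.PatchInfos, nil); err != nil {
                                    log.Warnf("Failed to apply patch: %v", err)
                            }
                    }
            }
    }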
diff --git a/cmd/opa-pdp/opa-pdp_test.go b/cmd/opa-pdp/opa-pdp_test.go
index e421f12..d350c97 100644
@@ -128,7 +128,7 @@ func TestHandleShutdown(t *testing.T) {
        }()
        done := make(chan bool)
        go func() {
-               handleShutdown(mockKafkaConsumer, interruptChannel, cancel, kafkaProducer)
+               handleShutdown([]*kafkacomm.KafkaConsumer{mockKafkaConsumer, nil}, interruptChannel, cancel, []*kafkacomm.KafkaProducer{kafkaProducer})
                done <- true
        }()
 
@@ -185,7 +185,7 @@ func SetupMocks() {
 
        // Mock handleShutdown
        interruptChannel := make(chan os.Signal, 1)
-       handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, kp *kafkacomm.KafkaProducer) {
+       handleShutdownFunc = func(consumers []*kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, producers []*kafkacomm.KafkaProducer) {
                interruptChannel <- os.Interrupt
                cancel()
        }
@@ -348,7 +348,7 @@ func TestHandleShutdown_ErrorScenario(t *testing.T) {
 
        done := make(chan bool)
        go func() {
-               handleShutdown(mockKafkaConsumer, interruptChannel, cancel, kafkaProducer)
+               handleShutdown([]*kafkacomm.KafkaConsumer{mockKafkaConsumer}, interruptChannel, cancel, []*kafkacomm.KafkaProducer{kafkaProducer})
                done <- true
        }()
 
@@ -436,6 +436,25 @@ func TestHandleMessages(t *testing.T) {
 
 }
 
+// TestHandlePatchMessages
+func TestHandlePatchMessages(t *testing.T) {
+       message := `{"patchInfos":[{"Path":["node","testcell","testconsistency","testmaxPCI"],"Op":0,"Value":6778}]}`
+       mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
+       expectedError := error(nil)
+
+       kafkaMsg := &kafka.Message{
+               Value: []byte(message),
+       }
+       mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+       mockConsumer := &kafkacomm.KafkaConsumer{
+               Consumer: mockKafkaConsumer,
+       }
+
+       ctx := context.Background()
+       handlePatchMessages(ctx, mockConsumer)
+
+}
+
 // Test to simulate a Kafka initialization failure in the main function.
 func TestMain_KafkaInitializationFailure(t *testing.T) {
        startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
@@ -458,7 +477,7 @@ func TestMain_KafkaInitializationFailure(t *testing.T) {
 
 // Test to validate the main function's handling of shutdown signals.
 func TestMain_HandleShutdownWithSignals(t *testing.T) {
-       handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, producer *kafkacomm.KafkaProducer) {
+       handleShutdownFunc = func(consumers []*kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, producers []*kafkacomm.KafkaProducer) {
                go func() {
                        interruptChan <- os.Interrupt // Simulate SIGTERM
                }()
@@ -487,7 +506,7 @@ func TestStartKafkaConsumerFailure(t *testing.T) {
        t.Run("Kafka consumer creation failure", func(t *testing.T) {
                originalNewKafkaConsumer := NewKafkaConsumer
                originalGetKafkaProducer := GetKafkaProducer
-               NewKafkaConsumer = func() (*kafkacomm.KafkaConsumer, error) {
+               NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
                        return nil, errors.New("Kafka consumer creation error")
                }
 
@@ -514,7 +533,7 @@ func TestStartKafkaProducerFailure(t *testing.T) {
        t.Run("Kafka producer creation failure", func(t *testing.T) {
                originalNewKafkaConsumer := NewKafkaConsumer
                originalGetKafkaProducer := GetKafkaProducer
-               NewKafkaConsumer = func() (*kafkacomm.KafkaConsumer, error) {
+               NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
                        return mockConsumer, nil
                }
 
@@ -540,7 +559,7 @@ func TestStartKafkaAndProdSuccess(t *testing.T) {
        t.Run("Kafka consumer and producer creation success", func(t *testing.T) {
                originalNewKafkaConsumer := NewKafkaConsumer
                originalGetKafkaProducer := GetKafkaProducer
-               NewKafkaConsumer = func() (*kafkacomm.KafkaConsumer, error) {
+               NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
                        return mockConsumer, nil
                }
                GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
@@ -575,7 +594,7 @@ func TestHandleShutdownWithNilConsumer(t *testing.T) {
        }()
        done := make(chan bool)
        go func() {
-               handleShutdown(nil, interruptChannel, cancel, kafkaProducer) // Pass nil as kc
+               handleShutdown(nil, interruptChannel, cancel, []*kafkacomm.KafkaProducer{kafkaProducer}) // Pass nil as kc
                done <- true
        }()
 
@@ -632,3 +651,75 @@ func TestShutdownHTTPServer_Errors(t *testing.T) {
        shutdownHTTPServer(server)
        assert.True(t, true, "Shutdown error")
 }
+
+// Test to verify that both the Kafka consumer and producer start successfully
+func TestPatchStartKafkaAndProdSuccess(t *testing.T) {
+       t.Run("Kafka consumer and producer creation success", func(t *testing.T) {
+               originalNewKafkaConsumer := NewKafkaConsumer
+               originalGetKafkaProducer := GetKafkaProducer
+               NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
+                       return mockConsumer, nil
+               }
+               GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+                       return mockProducer, nil
+               }
+
+               // Call the function under test
+               consumer, producer, err := startPatchKafkaConsAndProd()
+
+               // Assertions
+               assert.NoError(t, err)
+               assert.NotNil(t, consumer)
+               assert.NotNil(t, producer)
+
+               NewKafkaConsumer = originalNewKafkaConsumer
+               GetKafkaProducer = originalGetKafkaProducer
+       })
+}
+
+// Test to verify error scenarios during Kafka consumer and producer start
+func TestPatchStartKafkaAndProdFailure(t *testing.T) {
+       t.Run("Kafka consumer creation failure", func(t *testing.T) {
+               originalNewKafkaConsumer := NewKafkaConsumer
+               originalGetKafkaProducer := GetKafkaProducer
+               defer func() {
+                       NewKafkaConsumer = originalNewKafkaConsumer
+                       GetKafkaProducer = originalGetKafkaProducer
+               }()
+
+               NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
+                       return nil, errors.New("consumer creation failed")
+               }
+               GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+                       return mockProducer, nil
+               }
+
+               consumer, producer, err := startPatchKafkaConsAndProd()
+
+               assert.Error(t, err)
+               assert.Nil(t, consumer)
+               assert.Nil(t, producer)
+       })
+
+       t.Run("Kafka producer creation failure", func(t *testing.T) {
+               originalNewKafkaConsumer := NewKafkaConsumer
+               originalGetKafkaProducer := GetKafkaProducer
+               defer func() {
+                       NewKafkaConsumer = originalNewKafkaConsumer
+                       GetKafkaProducer = originalGetKafkaProducer
+               }()
+
+               NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
+                       return mockConsumer, nil
+               }
+               GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+                       return nil, errors.New("producer creation failed")
+               }
+
+               consumer, producer, err := startPatchKafkaConsAndProd()
+
+               assert.Error(t, err)
+               assert.Nil(t, consumer)
+               assert.Nil(t, producer)
+       })
+}
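
The fixture in TestHandlePatchMessages above shows the JSON wire shape of a patch message: Path carries the storage.Path segments, Op the numeric storage.PatchOp (0 = AddOp, 1 = RemoveOp, 2 = ReplaceOp), and Value the data to apply. A sketch of the corresponding envelope type, assuming a wrapper like this lives in the new patch publisher/handler files:

    type PatchMessage struct {
            PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
    }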
diff --git a/consts/constants.go b/consts/constants.go
index 3623f93..37407eb 100644
@@ -23,67 +23,71 @@ package consts
 
 // Variables:
 //
-//             LogFilePath         - The file path for the log file.
-//             LogMaxSize          - The maximum size of the log file in megabytes.
-//             LogMaxBackups       - The maximum number of backup log files to retain.
-//             OpasdkConfigPath    - The file path for the OPA SDK configuration.
-//             Opa                 - The file path for the OPA binary.
-//             BuildBundle         - The command to build the bundle.
-//             Policies            - The directory path for policies.
-//             Data                - The directory path for policy data.
-//             DataNode            - The directory path for policy data with node.
-//             Output              - The output flag for bundle commands.
-//             BundleTarGz         - The name of the bundle tar.gz file.
-//             BundleTarGzFile     - The file path for the bundle tar.gz file.
-//             PdpGroup            - The default PDP group.
-//             PdpType             - The type of PDP.
-//             ServerPort          - The port on which the server listens.
-//             ServerWaitUpTime    - The time to wait for the server to be up, in seconds.
-//             ShutdownWaitTime    - The time to wait for the server to shut down, in seconds.
-//             V1Compatible        - The flag for v1 compatibility.
-//             LatestVersion       - The Version set in response for decision
-//             MinorVersion        - The Minor version set in response header for decision
-//             PatchVersion        - The Patch Version set in response header for decison
-//             OpaPdpUrl           - The Healthcheck url for response
-//             HealtCheckStatus    - The bool flag for Healthy field in HealtCheck response
-//             OkCode              - The Code for HealthCheck response
-//             HealthCheckMessage  - The Healtcheck Message
-//              DefaultHeartbeatMS  - The default interval for heartbeat signals in milliseconds.
-//             SingleHierarchy     - The Counter indicates the length of datakey path
-//             PolicyVersion       - constant declared for policy-version
-//             PolicyID            - constant declared for policy-id
-//             RequestId           - constant declared for ONAP Request-ID
-//              MaxOutputResponseLength - constant declared for maximum length of output in response message
+//                     LogFilePath         - The file path for the log file.
+//                     LogMaxSize          - The maximum size of the log file in megabytes.
+//                     LogMaxBackups       - The maximum number of backup log files to retain.
+//                     OpasdkConfigPath    - The file path for the OPA SDK configuration.
+//                     Opa                 - The file path for the OPA binary.
+//                     BuildBundle         - The command to build the bundle.
+//                     Policies            - The directory path for policies.
+//                     Data                - The directory path for policy data.
+//                     DataNode            - The directory path for policy data with node.
+//                     Output              - The output flag for bundle commands.
+//                     BundleTarGz         - The name of the bundle tar.gz file.
+//                     BundleTarGzFile     - The file path for the bundle tar.gz file.
+//                     PdpGroup            - The default PDP group.
+//                     PdpType             - The type of PDP.
+//                     ServerPort          - The port on which the server listens.
+//                     ServerWaitUpTime    - The time to wait for the server to be up, in seconds.
+//                     ShutdownWaitTime    - The time to wait for the server to shut down, in seconds.
+//                     V1Compatible        - The flag for v1 compatibility.
+//                     LatestVersion       - The Version set in response for decision
+//                     MinorVersion        - The Minor version set in response header for decision
+//                     PatchVersion        - The Patch Version set in response header for decision
+//                     OpaPdpUrl           - The Healthcheck url for response
+//                     HealtCheckStatus    - The bool flag for Healthy field in HealthCheck response
+//                     OkCode              - The Code for HealthCheck response
+//                     HealthCheckMessage  - The Healthcheck Message
+//                     DefaultHeartbeatMS  - The default interval for heartbeat signals in milliseconds.
+//                     SingleHierarchy     - The Counter indicates the length of datakey path
+//                     PolicyVersion       - constant declared for policy-version
+//                     PolicyId            - constant declared for policy-id
+//                     RequestId           - constant declared for ONAP Request-ID
+//                     MaxOutputResponseLength - constant declared for maximum length of output in response message
+//                     ContentType         - constant declared for the Content-Type response header name
+//                     ApplicationJson     - constant declared for the application/json content type value
 var (
-       LogFilePath      = "/var/logs/logs.log"
-       LogMaxSize       = 10
-       LogMaxBackups    = 3
-       OpasdkConfigPath = "/app/config/config.json"
-       Opa              = "/app/opa"
-       BuildBundle      = "build"
-       Policies         = "/opt/policies"
-       Data             = "/opt/data"
-       DataNode         = "/opt/data/node"
-       Output           = "-o"
-       BundleTarGz      = "bundle.tar.gz"
-       BundleTarGzFile  = "/app/bundles/bundle.tar.gz"
-       PdpGroup         = "opaGroup"
-       PdpType             = "opa"
-       ServerPort          = ":8282"
-       ServerWaitUpTime    = 5
-       ShutdownWaitTime    = 5
-       V1Compatible       = "--v1-compatible"
-       LatestVersion       = "1.0.0"
-       MinorVersion        = "0"
-       PatchVersion        = "0"
-       OpaPdpUrl           = "self"
-       HealtCheckStatus    = true
-       OkCode              = int32(200)
-       HealthCheckMessage  = "alive"
-       DefaultHeartbeatMS  = 60000
-       SingleHierarchy     = 4
-       PolicyVersion       = "policy-version"
-       PolicyId            = "policy-id"
-       RequestId          = "X-ONAP-RequestID"
+       LogFilePath             = "/var/logs/logs.log"
+       LogMaxSize              = 10
+       LogMaxBackups           = 3
+       OpasdkConfigPath        = "/app/config/config.json"
+       Opa                     = "/app/opa"
+       BuildBundle             = "build"
+       Policies                = "/opt/policies"
+       Data                    = "/opt/data"
+       DataNode                = "/opt/data/node"
+       Output                  = "-o"
+       BundleTarGz             = "bundle.tar.gz"
+       BundleTarGzFile         = "/app/bundles/bundle.tar.gz"
+       PdpGroup                = "opaGroup"
+       PdpType                 = "opa"
+       ServerPort              = ":8282"
+       ServerWaitUpTime        = 5
+       ShutdownWaitTime        = 5
+       V1Compatible            = "--v1-compatible"
+       LatestVersion           = "1.0.0"
+       MinorVersion            = "0"
+       PatchVersion            = "0"
+       OpaPdpUrl               = "self"
+       HealtCheckStatus        = true
+       OkCode                  = int32(200)
+       HealthCheckMessage      = "alive"
+       DefaultHeartbeatMS      = 60000
+       SingleHierarchy         = 4
+       PolicyVersion           = "policy-version"
+       PolicyId                = "policy-id"
+       RequestId               = "X-ONAP-RequestID"
        MaxOutputResponseLength = 200
+       ContentType             = "Content-Type"
+       ApplicationJson         = "application/json"
 )
diff --git a/pkg/data/data-handler.go b/pkg/data/data-handler.go
index e1dcf17..3267408 100644
@@ -28,7 +28,10 @@ import (
        "github.com/open-policy-agent/opa/storage"
        "net/http"
        "path/filepath"
+       "policy-opa-pdp/cfg"
        "policy-opa-pdp/consts"
+       "policy-opa-pdp/pkg/kafkacomm"
+       "policy-opa-pdp/pkg/kafkacomm/publisher"
        "policy-opa-pdp/pkg/log"
        "policy-opa-pdp/pkg/metrics"
        "policy-opa-pdp/pkg/model/oapicodegen"
@@ -40,6 +43,7 @@ import (
 
 type (
        checkIfPolicyAlreadyExistsFunc func(policyId string) bool
+       validateRequestFunc            func(requestBody *oapicodegen.OPADataUpdateRequest) error
 )
 
 var (
@@ -49,6 +53,11 @@ var (
        checkIfPolicyAlreadyExistsVar checkIfPolicyAlreadyExistsFunc = policymap.CheckIfPolicyAlreadyExists
        getPolicyByIDVar                                             = getPolicyByID
        extractPatchInfoVar                                          = extractPatchInfo
+       bootstrapServers                                             = cfg.BootstrapServer //The Kafka bootstrap server address.
+       PatchProducer                 kafkacomm.KafkaProducerInterface
+       patchTopic                    = cfg.PatchTopic
+       PatchDataVar                  = PatchData
+       getOperationTypeVar           = getOperationType
 )
 
 // creates a response code map to OPADataUpdateResponse
@@ -69,7 +78,7 @@ func getErrorResponseCodeForOPADataUpdate(httpStatus int) oapicodegen.ErrorRespo
 
 // writes a Error JSON response to the HTTP response writer for OPADataUpdate
 func writeOPADataUpdateErrorJSONResponse(res http.ResponseWriter, status int, errorDescription string, dataErrorRes oapicodegen.ErrorResponse) {
-       res.Header().Set("Content-Type", "application/json")
+       res.Header().Set(consts.ContentType, consts.ApplicationJson)
        res.WriteHeader(status)
        if err := json.NewEncoder(res).Encode(dataErrorRes); err != nil {
                http.Error(res, err.Error(), status)
@@ -113,63 +122,125 @@ func getPolicyByID(policiesMap string, policyId string) (*Policy, error) {
 }
 
 func patchHandler(res http.ResponseWriter, req *http.Request) {
+       var useKafkaForPatch = cfg.UseKafkaForPatch
        log.Infof("PDP received a request to update data through API")
        constructResponseHeader(res, req)
        var requestBody oapicodegen.OPADataUpdateRequest
-       if err := json.NewDecoder(req.Body).Decode(&requestBody); err != nil {
-               errMsg := "Error in decoding the request data - " + err.Error()
+
+       requestBody, err := decodeRequest(req)
+       if err != nil {
+               sendErrorResponse(res, err.Error(), http.StatusBadRequest)
+               return
+       }
+
+       dataDir, dirParts := extractDataDir(req)
+
+       if err := validateRequest(&requestBody); err != nil {
+               sendErrorResponse(res, err.Error(), http.StatusBadRequest)
+               return
+       }
+       log.Debug("All fields are valid!")
+       // Access the data part
+       data := requestBody.Data
+       log.Infof("data : %s", data)
+       policyId := requestBody.PolicyName
+       if policyId == nil {
+               errMsg := "Policy Id is nil"
+               sendErrorResponse(res, errMsg, http.StatusBadRequest)
+               return
+       }
+       log.Infof("policy name : %s", *policyId)
+       isExists := policymap.CheckIfPolicyAlreadyExists(*policyId)
+       if !isExists {
+               errMsg := "Policy associated with the patch request does not exist"
                sendErrorResponse(res, errMsg, http.StatusBadRequest)
                log.Errorf(errMsg)
                return
        }
-       path := strings.TrimPrefix(req.URL.Path, "/policy/pdpo/v1/data")
-       dirParts := strings.Split(path, "/")
-       dataDir := filepath.Join(dirParts...)
-       log.Infof("dataDir : %s", dataDir)
-
-       // Validate the request
-       validationErrors := utils.ValidateOPADataRequest(&requestBody)
 
-       // Validate Data field (ensure it's not nil and has items)
-       if !(utils.IsValidData(requestBody.Data)) {
-               validationErrors = append(validationErrors, "Data is required and cannot be empty")
+       matchFound := validatePolicyDataPathMatched(dirParts, *policyId, res)
+       if !matchFound {
+               return
        }
 
-       // Print validation errors
-       if len(validationErrors) > 0 {
-               errMsg := strings.Join(validationErrors, ", ")
-               log.Errorf("Facing validation error in requestbody - %s", errMsg)
-               sendErrorResponse(res, errMsg, http.StatusBadRequest)
+       patchInfos, err := getPatchInfo(requestBody.Data, dataDir, res)
+       if err != nil {
+               log.Warnf("Failed to get Patch Info : %v", err)
                return
-       } else {
-               log.Debug("All fields are valid!")
-               // Access the data part
-               data := requestBody.Data
-               log.Infof("data : %s", data)
-               policyId := requestBody.PolicyName
-               if policyId == nil {
-                       errMsg := "Policy Id is nil"
-                       sendErrorResponse(res, errMsg, http.StatusBadRequest)
-                       return
-               }
-               log.Infof("policy name : %s", *policyId)
-               isExists := policymap.CheckIfPolicyAlreadyExists(*policyId)
-               if !isExists {
-                       errMsg := "Policy associated with the patch request does not exists"
-                       sendErrorResponse(res, errMsg, http.StatusBadRequest)
-                       log.Errorf(errMsg)
+       }
+
+       if useKafkaForPatch {
+               err := handleDynamicUpdateRequestWithKafka(patchInfos, res)
+               if err != nil {
+                       log.Warnf("Error in handling dynamic update request with Kafka: %v", err)
                        return
                }
+               res.Header().Set(consts.ContentType, consts.ApplicationJson)
+               res.WriteHeader(http.StatusAccepted)
+               _, _ = res.Write([]byte(`{"message": "Patch request accepted for processing via Kafka. Use the GET data URL to fetch the latest data; in case of errors, check the logs.", "uri": "/policy/pdpo/v1/data/"}`))
+               metrics.IncrementDynamicDataUpdateSuccessCount()
+               return
+       }
+       if err := PatchData(patchInfos, res); err != nil {
+               // Handle the error, for example, log it or return an appropriate response
+               log.Errorf("Error encoding JSON response: %s", err)
+               return
+
+       }
+       metrics.IncrementDynamicDataUpdateSuccessCount()
 
-               matchFound := validatePolicyDataPathMatched(dirParts, *policyId, res)
+}
 
-               if matchFound {
-                       if err := patchData(dataDir, data, res); err != nil {
-                               // Handle the error, for example, log it or return an appropriate response
-                               log.Errorf("Error encoding JSON response: %s", err)
-                       }
-               }
+func decodeRequest(req *http.Request) (oapicodegen.OPADataUpdateRequest, error) {
+       var requestBody oapicodegen.OPADataUpdateRequest
+       if err := json.NewDecoder(req.Body).Decode(&requestBody); err != nil {
+               return requestBody, fmt.Errorf("Error in decoding request data: %v", err)
+       }
+       return requestBody, nil
+}
+
+func extractDataDir(req *http.Request) (string, []string) {
+       path := strings.TrimPrefix(req.URL.Path, "/policy/pdpo/v1/data")
+       dirParts := strings.Split(path, "/")
+       return filepath.Join(dirParts...), dirParts
+}
+
+func validateRequest(requestBody *oapicodegen.OPADataUpdateRequest) error {
+       validationErrors := utils.ValidateOPADataRequest(requestBody)
+       if !utils.IsValidData(requestBody.Data) {
+               validationErrors = append(validationErrors, "Data is required and cannot be empty")
+       }
+       if len(validationErrors) > 0 {
+               return fmt.Errorf("%s", strings.Join(validationErrors, ", "))
+       }
+       return nil
+}
+
+func getPatchInfo(data *[]map[string]interface{}, dataDir string, res http.ResponseWriter) ([]opasdk.PatchImpl, error) {
+       root := "/" + strings.Trim(dataDir, "/")
+       patchInfos, err := extractPatchInfoVar(res, data, root)
+       if patchInfos == nil || err != nil {
+               return nil, fmt.Errorf("Error in extracting Patch Info : %v", err)
        }
+       return patchInfos, nil
+}
+
+func handleDynamicUpdateRequestWithKafka(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) error {
+
+       if PatchProducer == nil {
+               log.Warnf("Failed to initialize Kafka producer")
+               return fmt.Errorf("Failed to initialize Kafka producer")
+
+       }
+       sender := &publisher.RealPatchSender{
+               Producer: PatchProducer,
+       }
+       if err := sender.SendPatchMessage(patchInfos); err != nil {
+               log.Warnf("Failed to send Patch Message: %v", err)
+               return err
+       }
+
+       return nil
 }
 
 func DataHandler(res http.ResponseWriter, req *http.Request) {
@@ -191,11 +262,11 @@ func extractPatchInfo(res http.ResponseWriter, ops *[]map[string]interface{}, ro
                optypeString, opTypeErr := op["op"].(string)
                if !opTypeErr {
                        opTypeErrMsg := "Error in getting op type. Op type is not given in request body"
-                       sendErrorResponse(res, opTypeErrMsg, http.StatusInternalServerError)
+                       sendErrorResponse(res, opTypeErrMsg, http.StatusBadRequest)
                        log.Errorf(opTypeErrMsg)
                        return nil, fmt.Errorf("Error in getting op type. Op type is not given in request body")
                }
-               opType, err := getOperationType(optypeString, res)
+               opType, err := getOperationTypeVar(optypeString, res)
 
                if err != nil {
                        log.Warnf("Error in getting opType: %v", err)
@@ -235,7 +306,7 @@ func getPatchValue(op map[string]interface{}, res http.ResponseWriter) (interfac
        value, valueErr = op["value"]
        if !valueErr || isEmpty(value) {
                valueErrMsg := "Error in getting data value. Value is not given in request body"
-               sendErrorResponse(res, valueErrMsg, http.StatusInternalServerError)
+               sendErrorResponse(res, valueErrMsg, http.StatusBadRequest)
                log.Errorf(valueErrMsg)
                return nil, fmt.Errorf("Error in getting data value. Value is not given in request body")
        }
@@ -298,7 +369,7 @@ func constructPath(opPath string, opType string, root string, res http.ResponseW
                }
        } else {
                valueErrMsg := "Error in getting data path - Invalid path (/) is used."
-               sendErrorResponse(res, valueErrMsg, http.StatusInternalServerError)
+               sendErrorResponse(res, valueErrMsg, http.StatusBadRequest)
                log.Errorf(valueErrMsg)
                return nil
        }
@@ -355,7 +426,7 @@ func constructOpStoragePath(op map[string]interface{}, root string, res http.Res
        opPath, opPathErr := op["path"].(string)
        if !opPathErr || len(opPath) == 0 {
                opPathErrMsg := "Error in getting data path. Path is not given in request body"
-               sendErrorResponse(res, opPathErrMsg, http.StatusInternalServerError)
+               sendErrorResponse(res, opPathErrMsg, http.StatusBadRequest)
                log.Errorf(opPathErrMsg)
                return nil
        }
@@ -389,14 +460,7 @@ type NewOpaSDKPatchFunc func(ctx context.Context, patches []opasdk.PatchImpl) er
 
 var NewOpaSDKPatch NewOpaSDKPatchFunc = opasdk.PatchData
 
-func patchData(root string, ops *[]map[string]interface{}, res http.ResponseWriter) (err error) {
-       root = "/" + strings.Trim(root, "/")
-       patchInfos, err := extractPatchInfoVar(res, ops, root)
-       if err != nil {
-               log.Warnf("Failed to extarct Patch Info")
-               return err
-       }
-
+func PatchData(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) (err error) {
        if patchInfos != nil {
                patchErr := NewOpaSDKPatch(context.Background(), patchInfos)
                if patchErr != nil {
@@ -406,12 +470,16 @@ func patchData(root string, ops *[]map[string]interface{}, res http.ResponseWrit
                                errCode = http.StatusNotFound
                        }
                        errMsg := "Error in updating data - " + patchErr.Error()
-                       sendErrorResponse(res, errMsg, errCode)
+                       if res != nil {
+                               sendErrorResponse(res, errMsg, errCode)
+                       }
                        log.Errorf(errMsg)
                        return patchErr
                }
                log.Infof("Updated the data in the corresponding path successfully\n")
-               res.WriteHeader(http.StatusNoContent)
+               if res != nil {
+                       res.WriteHeader(http.StatusNoContent)
+               }
        }
        // handled all error scenarios in extractPatchInfo method
        return nil
@@ -419,6 +487,7 @@ func patchData(root string, ops *[]map[string]interface{}, res http.ResponseWrit
 
 func sendErrorResponse(res http.ResponseWriter, errMsg string, statusCode int) {
        dataExc := createOPADataUpdateExceptionResponse(statusCode, errMsg, "")
+       metrics.IncrementDynamicDataUpdateFailureCount()
        metrics.IncrementTotalErrorCount()
        writeOPADataUpdateErrorJSONResponse(res, statusCode, errMsg, *dataExc)
 }
@@ -500,7 +569,7 @@ func getData(res http.ResponseWriter, dataPath string) {
                dataResponse.Data = data
        }
 
-       res.Header().Set("Content-Type", "application/json")
+       res.Header().Set(consts.ContentType, consts.ApplicationJson)
        res.WriteHeader(http.StatusOK)
 
        if err := json.NewEncoder(res).Encode(dataResponse); err != nil {
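
When USE_KAFKA_FOR_PATCH is enabled, handleDynamicUpdateRequestWithKafka hands the extracted patches to RealPatchSender.SendPatchMessage, implemented in the new pkg/kafkacomm/publisher/patch_message_publisher.go (not shown in this excerpt). A plausible sketch, assuming it marshals the patches and produces them to the patch topic through the KafkaProducerInterface mocked in the tests below:

    func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error {
            payload, err := json.Marshal(map[string]interface{}{"patchInfos": patchInfos})
            if err != nil {
                    return err
            }
            topic := cfg.PatchTopic // the patch topic configured in cfg/config.go above
            // Produce matches the interface exercised by MockKafkaProducer below:
            // Produce(msg *kafka.Message, events chan kafka.Event) error.
            return s.Producer.Produce(&kafka.Message{
                    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                    Value:          payload,
            }, nil)
    }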
diff --git a/pkg/data/data-handler_test.go b/pkg/data/data-handler_test.go
index 819f6d2..6cd5e5c 100644
@@ -24,7 +24,9 @@ import (
        "encoding/json"
        "errors"
        "fmt"
+       "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        openapi_types "github.com/oapi-codegen/runtime/types"
+       "github.com/open-policy-agent/opa/v1/storage"
        "github.com/stretchr/testify/assert"
        "net/http"
        "net/http/httptest"
@@ -166,7 +168,8 @@ func TestPatchData_failure(t *testing.T) {
        data = nil
        root := "/test"
        res := httptest.NewRecorder()
-       result := patchData(root, &data, res)
+       patchImpl, _ := extractPatchInfo(res, &data, root)
+       result := PatchData(patchImpl, res)
        assert.Nil(t, result)
 }
 
@@ -182,7 +185,8 @@ func TestPatchData_storageFail(t *testing.T) {
 
        root := "/test"
        res := httptest.NewRecorder()
-       result := patchData(root, &data, res)
+       patchImpl, _ := extractPatchInfo(res, &data, root)
+       result := PatchData(patchImpl, res)
        assert.Equal(t, http.StatusNotFound, res.Code)
        assert.Error(t, result)
 }
@@ -194,7 +198,7 @@ func Test_extractPatchInfo_OPTypefail(t *testing.T) {
        root := "/test"
        res := httptest.NewRecorder()
        extractPatchInfo(res, &data, root)
-       assert.Equal(t, http.StatusInternalServerError, res.Code)
+       assert.Equal(t, http.StatusBadRequest, res.Code)
 }
 
 func Test_extractPatchInfo_Pathfail(t *testing.T) {
@@ -204,7 +208,7 @@ func Test_extractPatchInfo_Pathfail(t *testing.T) {
        root := "/test"
        res := httptest.NewRecorder()
        extractPatchInfo(res, &data, root)
-       assert.Equal(t, http.StatusInternalServerError, res.Code)
+       assert.Equal(t, http.StatusBadRequest, res.Code)
 }
 
 func Test_extractPatchInfo_valuefail(t *testing.T) {
@@ -214,7 +218,7 @@ func Test_extractPatchInfo_valuefail(t *testing.T) {
        root := "/test"
        res := httptest.NewRecorder()
        extractPatchInfo(res, &data, root)
-       assert.Equal(t, http.StatusInternalServerError, res.Code)
+       assert.Equal(t, http.StatusBadRequest, res.Code)
 }
 
 func TestPatchData_success(t *testing.T) {
@@ -229,7 +233,8 @@ func TestPatchData_success(t *testing.T) {
 
        root := "/test"
        res := httptest.NewRecorder()
-       patchData(root, &data, res)
+       patchImpl, _ := extractPatchInfo(res, &data, root)
+       PatchData(patchImpl, res)
        assert.Equal(t, http.StatusNoContent, res.Code)
 }
 
@@ -420,6 +425,14 @@ func TestGetData(t *testing.T) {
                        expectedStatus: http.StatusInternalServerError,
                        expectedBody:   "Error in getting data - internal server failure",
                },
+               {
+                       name:           "Error - JSON Encoding Failure",
+                       requestURL:     "/policy/pdpo/v1/datai/bad/json",
+                       mockResponse:   map[string]interface{}{"bad": make(chan int)},
+                       mockError:      nil,
+                       expectedStatus: http.StatusInternalServerError,
+                       expectedBody:   "Error in getting data - json: unsupported type: chan int",
+               },
        }
 
        for _, tt := range tests {
@@ -675,8 +688,6 @@ func Test_GetPolicyByIDFunc_Success(t *testing.T) {
                return policy, nil
        }
 
-       // Simulating HTTP request and response recorder
-       // req := httptest.NewRequest("GET", "/policy/test-policy", nil)
        res := httptest.NewRecorder()
 
        // Processing dirParts
@@ -882,3 +893,350 @@ func TestPatchInfos_ExtractPatchInfo_Error(t *testing.T) {
                t.Errorf("expected patchInfos to be nil, got: %v", patchInfos)
        }
 }
+
+func TestHandleDynamicUpdateRequestWithKafka_KafkaDisabled_Error(t *testing.T) {
+       PatchDataVar = func(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) error {
+               return errors.New("mock error")
+       }
+
+       req := httptest.NewRecorder()
+       patchInfos := []opasdk.PatchImpl{{}}
+
+       handleDynamicUpdateRequestWithKafka(patchInfos, req)
+       // Optionally assert on req.Body or req.Code if needed
+}
+
+// --- Sample PatchImpl for testing ---
+func samplePatchData() []opasdk.PatchImpl {
+       return []opasdk.PatchImpl{
+               {
+                       Path:  storage.MustParsePath("/policy/config/name"),
+                       Op:    storage.ReplaceOp,
+                       Value: "NewPolicyName",
+               },
+       }
+}
+
+var originalExtractPatchInfoVar = extractPatchInfoVar
+
+func TestGetPatchInfo_Success(t *testing.T) {
+       defer func() { extractPatchInfoVar = originalExtractPatchInfoVar }()
+
+       mockPatch := samplePatchData()
+       extractPatchInfoVar = func(res http.ResponseWriter, data *[]map[string]interface{}, root string) ([]opasdk.PatchImpl, error) {
+               return mockPatch, nil
+       }
+
+       res := httptest.NewRecorder()
+
+       data := &[]map[string]interface{}{{"key": "val"}}
+       patches, err := getPatchInfo(data, "/test/dir", res)
+
+       assert.NoError(t, err)
+       assert.Equal(t, mockPatch, patches)
+}
+func TestGetPatchInfo_NilPatchInfos(t *testing.T) {
+       defer func() { extractPatchInfoVar = originalExtractPatchInfoVar }()
+
+       extractPatchInfoVar = func(res http.ResponseWriter, data *[]map[string]interface{}, root string) ([]opasdk.PatchImpl, error) {
+               return nil, nil
+       }
+
+       res := httptest.NewRecorder()
+
+       data := &[]map[string]interface{}{{"key": "val"}}
+       patches, err := getPatchInfo(data, "/test/dir", res)
+
+       assert.Error(t, err)
+       assert.Nil(t, patches)
+}
+func TestGetPatchInfo_ExtractError(t *testing.T) {
+       defer func() { extractPatchInfoVar = originalExtractPatchInfoVar }()
+
+       extractPatchInfoVar = func(res http.ResponseWriter, data *[]map[string]interface{}, root string) ([]opasdk.PatchImpl, error) {
+               return nil, fmt.Errorf("mock error")
+       }
+
+       data := &[]map[string]interface{}{{"key": "val"}}
+       res := httptest.NewRecorder()
+
+       patches, err := getPatchInfo(data, "/test/dir", res)
+
+       assert.Error(t, err)
+       assert.Nil(t, patches)
+}
+
+func TestHandleDynamicUpdateRequestWithKafka_KafkaDisabled_Success(t *testing.T) {
+       // Set test version of PatchDataVar
+       var patchCalled bool
+       PatchDataVar = func(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) error {
+               patchCalled = true
+               return nil
+       }
+
+       req := httptest.NewRecorder()
+       patchInfos := []opasdk.PatchImpl{{}}
+
+       handleDynamicUpdateRequestWithKafka(patchInfos, req)
+
+       if !patchCalled {
+               t.Errorf("Expected PatchData to be called")
+       }
+}
+
+// MockKafkaProducer implements kafkacomm.KafkaProducerInterface.
+type MockKafkaProducer struct {
+       ProduceCalled bool
+       ProducedMsg   *kafka.Message
+       ProduceErr    error
+       CloseCalled   bool
+       FlushCalled   bool
+       FlushTimeout  int
+}
+
+func (m *MockKafkaProducer) Produce(msg *kafka.Message, events chan kafka.Event) error {
+       m.ProduceCalled = true
+       m.ProducedMsg = msg
+       return m.ProduceErr
+}
+
+func (m *MockKafkaProducer) Close() { m.CloseCalled = true }
+
+func (m *MockKafkaProducer) Flush(timeout int) int {
+       m.FlushCalled = true
+       m.FlushTimeout = timeout
+       return 0
+}
+
+// Test successful Produce through the interface
+func TestHandleDynamicUpdateRequestWithKafka_ProduceSuccess(t *testing.T) {
+       // Arrange
+       patches := samplePatchData()
+       mockProd := &MockKafkaProducer{}
+       PatchProducer = mockProd
+
+       resp := httptest.NewRecorder()
+
+       // Act
+       err := handleDynamicUpdateRequestWithKafka(patches, resp)
+
+       // Assert
+       assert.NoError(t, err)
+       assert.True(t, mockProd.ProduceCalled, "expected Produce to be called")
+
+}
+
+// Test that a nil producer interface returns an initialization error
+func TestHandleDynamicUpdateRequestWithKafka_ProducerNil(t *testing.T) {
+       // Arrange: clear the global producer
+       PatchProducer = nil
+
+       // Act
+       err := handleDynamicUpdateRequestWithKafka(nil, httptest.NewRecorder())
+
+       // Assert
+       assert.EqualError(t, err, "Failed to initialize Kafka producer")
+}
+
+// Test that a Produce error is propagated
+func TestHandleDynamicUpdateRequestWithKafka_ProduceError(t *testing.T) {
+       // Arrange
+       mockProd := &MockKafkaProducer{ProduceErr: errors.New("produce failed")}
+       PatchProducer = mockProd
+
+       // Act
+       err := handleDynamicUpdateRequestWithKafka(nil, httptest.NewRecorder())
+
+       // Assert
+       assert.EqualError(t, err, "produce failed")
+       assert.True(t, mockProd.ProduceCalled, "Produce should be called even on error")
+}
+
+type errorWriter struct{}
+
+func (e *errorWriter) Header() http.Header {
+       return http.Header{}
+}
+
+func (e *errorWriter) Write([]byte) (int, error) {
+       return 0, errors.New("write error")
+}
+
+func (e *errorWriter) WriteHeader(statusCode int) {}
+
+func TestWriteOPADataUpdateErrorJSONResponse_EncodeFails(t *testing.T) {
+       mockRes := &errorWriter{}
+
+       respMessage := "Failed to process"
+       respCode := oapicodegen.ErrorResponseResponseCode("500")
+       errorResp := oapicodegen.ErrorResponse{
+               ErrorMessage: &respMessage,
+               ResponseCode: &respCode,
+       }
+
+       // Call the function with the mock writer that fails on encode
+       writeOPADataUpdateErrorJSONResponse(mockRes, http.StatusInternalServerError, "fail", errorResp)
+
+}
+
+func TestConstructPath_BadPatchPath(t *testing.T) {
+       rec := httptest.NewRecorder()
+       storagePath := constructPath("???", "add", "/root", rec)
+
+       assert.NotNil(t, storagePath)
+       assert.Equal(t, http.StatusOK, rec.Code)
+       assert.Empty(t, rec.Body.String())
+}
+
+func TestConstructPath_InvalidPath(t *testing.T) {
+       rec := httptest.NewRecorder()
+       storagePath := constructPath("", "add", "/root", rec)
+
+       assert.Nil(t, storagePath)
+       assert.Equal(t, http.StatusBadRequest, rec.Code)
+       assert.True(t, strings.Contains(rec.Body.String(), "Invalid path"))
+}
+func TestConstructPath_RootSlash(t *testing.T) {
+       rec := httptest.NewRecorder()
+       storagePath := constructPath("sub/path", "add", "/", rec)
+
+       assert.NotNil(t, storagePath)
+       assert.Equal(t, "/sub/path", storagePath.String())
+}
+
+func TestGetDataInfo_EmptyDataPath(t *testing.T) {
+       // Backup original function
+       originalOpaSDKGetDataInfo := NewOpaSDK
+       NewOpaSDK = func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error) {
+               assert.Equal(t, "/", dataPath) // Ensure "/" is passed
+               return nil, errors.New("storage_not_found_error")
+       }
+       defer func() { NewOpaSDK = originalOpaSDKGetDataInfo }()
+
+       // Create a mock request with empty data path
+       req := httptest.NewRequest("GET", "/policy/pdpo/v1/data", nil)
+       res := httptest.NewRecorder()
+
+       // Call the function under test
+       getDataInfo(res, req)
+
+       // Validate response
+       assert.Equal(t, http.StatusNotFound, res.Code)
+       errorMessage := strings.TrimSpace(res.Body.String())
+       assert.Contains(t, errorMessage, "storage_not_found_error")
+}
+
+func TestDataHandler_GET_Success(t *testing.T) {
+       original := NewOpaSDK
+       defer func() { NewOpaSDK = original }()
+
+       NewOpaSDK = func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error) {
+               assert.Equal(t, "/some/path", dataPath)
+               return &oapicodegen.OPADataResponse_Data{}, nil
+       }
+
+       req := httptest.NewRequest(http.MethodGet, "/policy/pdpo/v1/data/some/path", nil)
+       w := httptest.NewRecorder()
+
+       DataHandler(w, req) // invoke the handler directly
+
+       res := w.Result()
+       defer res.Body.Close()
+
+       if res.StatusCode != http.StatusOK {
+               t.Errorf("expected status 200 OK, got %d", res.StatusCode)
+       }
+}
+
+func TestExtractPatchInfo_OperationTypeError(t *testing.T) {
+       // Arrange
+       reqOps := []map[string]interface{}{
+               {
+                       "op": "invalidOp", // simulate invalid op
+               },
+       }
+       w := httptest.NewRecorder()
+
+       // Mock
+       original := getOperationTypeVar
+       defer func() { getOperationTypeVar = original }()
+       getOperationTypeVar = func(opType string, res http.ResponseWriter) (*storage.PatchOp, error) {
+               return nil, fmt.Errorf("forced error") // force error
+       }
+
+       // Act
+       result, err := extractPatchInfo(w, &reqOps, "/root")
+
+       // Assert
+       assert.Nil(t, result)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "operation type")
+}
+
+func TestExtractPatchInfo_InvalidOpFieldType(t *testing.T) {
+       // Arrange
+       reqOps := []map[string]interface{}{
+               {
+                       "wrongField": "add", // no "op" field
+               },
+       }
+       w := httptest.NewRecorder()
+
+       // Act
+       result, err := extractPatchInfo(w, &reqOps, "/root")
+
+       // Assert
+       assert.Nil(t, result)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "op type")
+}
+
+func TestExtractPatchInfo_GetOperationTypeError(t *testing.T) {
+       // Arrange
+       reqOps := []map[string]interface{}{
+               {
+                       "op": "invalidOp",
+               },
+       }
+       w := httptest.NewRecorder()
+
+       // Mock getOperationTypeVar to simulate error
+       original := getOperationTypeVar
+       defer func() { getOperationTypeVar = original }()
+       getOperationTypeVar = func(opType string, res http.ResponseWriter) (*storage.PatchOp, error) {
+               return nil, errors.New("mock getOperationType error")
+       }
+
+       // Act
+       result, err := extractPatchInfo(w, &reqOps, "/root")
+
+       // Assert
+       assert.Nil(t, result)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "operation type")
+}
+
+func TestExtractPatchInfo_NilOpType(t *testing.T) {
+       // Arrange
+       reqOps := []map[string]interface{}{
+               {
+                       "op": "add",
+               },
+       }
+       w := httptest.NewRecorder()
+
+       // Mock getOperationTypeVar to return nil
+       original := getOperationTypeVar
+       defer func() { getOperationTypeVar = original }()
+       getOperationTypeVar = func(opType string, res http.ResponseWriter) (*storage.PatchOp, error) {
+               return nil, nil // returning nil without error
+       }
+
+       // Act
+       result, err := extractPatchInfo(w, &reqOps, "/root")
+
+       // Assert
+       assert.Nil(t, result)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "opType is Missing")
+}
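The tests above exercise the reworked patch flow: extractPatchInfo (wrapped by getPatchInfo) now returns the parsed []opasdk.PatchImpl instead of applying it, PatchData applies a patch set, and handleDynamicUpdateRequestWithKafka forwards it over Kafka. A minimal sketch of how these pieces compose, assuming the signatures used in the tests; the useKafka flag stands in for the real configuration check:

    // Sketch only; error handling abbreviated.
    func patchFlow(res http.ResponseWriter, body *[]map[string]interface{}, root string, useKafka bool) {
            patchInfos, err := getPatchInfo(body, root, res)
            if err != nil {
                    return // getPatchInfo has already written an HTTP error
            }
            if useKafka {
                    // Publish so every PDP replica applies the same update.
                    _ = handleDynamicUpdateRequestWithKafka(patchInfos, res)
                    return
            }
            _ = PatchDataVar(patchInfos, res) // apply to the local OPA store only
    }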
diff --git a/pkg/kafkacomm/handler/patch_message_handler.go b/pkg/kafkacomm/handler/patch_message_handler.go
new file mode 100644 (file)
index 0000000..d537e61
--- /dev/null
@@ -0,0 +1,73 @@
+// -
+//   ========================LICENSE_START=================================
+//   Copyright (C) 2025: Deutsche Telekom
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   SPDX-License-Identifier: Apache-2.0
+//   ========================LICENSE_END===================================
+//
+
+package handler
+
+import (
+       "context"
+       "encoding/json"
+       "policy-opa-pdp/pkg/data"
+       "policy-opa-pdp/pkg/kafkacomm"
+       "policy-opa-pdp/pkg/log"
+       "policy-opa-pdp/pkg/opasdk"
+)
+
+type PatchMessage struct {
+       PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
+}
+
+// PatchMessageHandler handles incoming Kafka messages and dispatches them further for data patch processing.
+func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error {
+
+       log.Debug("Starting Patch Message Listener.....")
+       var stopConsuming bool
+       for !stopConsuming {
+               select {
+               case <-ctx.Done():
+                       log.Debug("Stopping Patch Message Listener.....")
+                       return nil
+               default:
+                       message, err := kafkacomm.ReadKafkaMessages(kc)
+                       if err != nil {
+                               continue
+                       }
+                       log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message))
+
+                       if message != nil {
+                               var patchMsg PatchMessage
+                               err = json.Unmarshal(message, &patchMsg)
+                               if err != nil {
+                                       log.Warnf("Failed to unmarshal message: %v\n", err)
+                                       continue
+                               }
+                               log.Debugf("Received patch request")
+
+                               if err := data.PatchDataVar(patchMsg.PatchInfos, nil); err != nil {
+                                       log.Debugf("patchData failed: %v", err)
+                               } else {
+                                       log.Debugf("Successfully patched data")
+                               }
+                       }
+               }
+
+       }
+       return nil
+
+}
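A listener like this is started once at PDP startup and runs until its context is cancelled. A hedged wiring sketch, reusing NewKafkaConsumer(topic, groupid) from pkg/kafkacomm as refactored below together with the cfg.PatchTopic setting:

    // Sketch only: starting the patch listener alongside the main PDP listener.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    kc, err := kafkacomm.NewKafkaConsumer(cfg.PatchTopic, cfg.GroupId)
    if err != nil {
            log.Warnf("patch consumer not started: %v", err)
            return
    }
    go func() {
            if err := handler.PatchMessageHandler(ctx, kc, cfg.PatchTopic); err != nil {
                    log.Warnf("patch listener stopped: %v", err)
            }
    }()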
diff --git a/pkg/kafkacomm/handler/patch_message_handler_test.go b/pkg/kafkacomm/handler/patch_message_handler_test.go
new file mode 100644 (file)
index 0000000..93114d9
--- /dev/null
@@ -0,0 +1,153 @@
+// -
+//   ========================LICENSE_START=================================
+//   Copyright (C) 2025: Deutsche Telekom
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   SPDX-License-Identifier: Apache-2.0
+//   ========================LICENSE_END===================================
+//
+
+package handler
+
+import (
+       "context"
+       "encoding/json"
+       "errors"
+       "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+       "github.com/open-policy-agent/opa/v1/storage"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/mock"
+       "net/http"
+       "policy-opa-pdp/pkg/data"
+       "policy-opa-pdp/pkg/kafkacomm"
+       "policy-opa-pdp/pkg/opasdk"
+       "testing"
+       "time"
+)
+
+// --- Sample PatchImpl for testing ---
+func samplePatchData() []opasdk.PatchImpl {
+       return []opasdk.PatchImpl{
+               {
+                       Path:  storage.MustParsePath("/policy/config/name"),
+                       Op:    storage.ReplaceOp,
+                       Value: "NewPolicyName",
+               },
+       }
+}
+
+var originalPatchDataVar = data.PatchDataVar
+
+func TestPatchMessageHandler_Success(t *testing.T) {
+       defer func() { data.PatchDataVar = originalPatchDataVar }()
+
+       // Mock PatchDataVar to simulate success
+       data.PatchDataVar = func(patchInfos []opasdk.PatchImpl, _ http.ResponseWriter) error {
+               return nil
+       }
+
+       msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()})
+
+       mockKafkaMessage := &kafka.Message{
+               Value: []byte(msgBytes),
+       }
+       mockConsumer := new(MockKafkaConsumer)
+       mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(mockKafkaMessage, nil)
+
+       mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+               Consumer: mockConsumer,
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+       defer cancel()
+
+       err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+       assert.NoError(t, err)
+       mockConsumer.AssertExpectations(t)
+}
+
+func TestPatchMessageHandler_PatchFail(t *testing.T) {
+       defer func() { data.PatchDataVar = originalPatchDataVar }()
+
+       data.PatchDataVar = func(patchInfos []opasdk.PatchImpl, _ http.ResponseWriter) error {
+               return errors.New("mock failure")
+       }
+
+       msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()})
+
+       mockKafkaMessage := &kafka.Message{
+               Value: []byte(msgBytes),
+       }
+
+       mockConsumer := new(MockKafkaConsumer)
+       mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(mockKafkaMessage, nil)
+
+       mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+               Consumer: mockConsumer,
+       }
+
+       ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+       defer cancel()
+
+       err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+       assert.NoError(t, err)
+       mockConsumer.AssertExpectations(t)
+}
+
+func TestPatchMessageHandler_ReadError(t *testing.T) {
+       defer func() { data.PatchDataVar = originalPatchDataVar }()
+
+       mockConsumer := new(MockKafkaConsumer)
+       mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).
+               Return(nil, errors.New("read error"))
+
+       mockKafkaConsumer := &kafkacomm.KafkaConsumer{Consumer: mockConsumer}
+
+       ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+       defer cancel()
+
+       err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+       assert.NoError(t, err)
+       mockConsumer.AssertExpectations(t)
+}
+
+func TestPatchMessageHandler_UnmarshalFail(t *testing.T) {
+       defer func() { data.PatchDataVar = originalPatchDataVar }()
+
+       invalidJSON := []byte(`invalid json`)
+       mockKafkaMessage := &kafka.Message{Value: invalidJSON}
+
+       mockConsumer := new(MockKafkaConsumer)
+       mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(mockKafkaMessage, nil)
+
+       mockKafkaConsumer := &kafkacomm.KafkaConsumer{Consumer: mockConsumer}
+
+       ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+       defer cancel()
+
+       err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+       assert.NoError(t, err)
+       mockConsumer.AssertExpectations(t)
+}
+
+func TestPatchMessageHandler_ContextDone(t *testing.T) {
+       mockConsumer := new(MockKafkaConsumer)
+       mockKafkaConsumer := &kafkacomm.KafkaConsumer{Consumer: mockConsumer}
+
+       // Context is cancelled immediately
+       ctx, cancel := context.WithCancel(context.Background())
+       cancel()
+
+       err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+       assert.NoError(t, err)
+}
index 594498d..7a3b0af 100644 (file)
@@ -35,7 +35,7 @@ import (
 )
 
 type KafkaConsumerInterface interface {
-       ReadMessage() ([]byte, error)
+       ReadMessage(time.Duration) ([]byte, error)
        ReadKafkaMessages() ([]byte, error)
 }
 
@@ -43,17 +43,23 @@ type MockKafkaConsumer struct {
        mock.Mock
 }
 
-func (m *MockKafkaConsumer) Unsubscribe() {
-       m.Called()
+func (m *MockKafkaConsumer) Unsubscribe() error {
+       args := m.Called()
+       return args.Error(0)
 }
 
-func (m *MockKafkaConsumer) Close() {
-       m.Called()
+func (m *MockKafkaConsumer) Close() error {
+       args := m.Called()
+       return args.Error(0)
 }
 
-func (m *MockKafkaConsumer) ReadMessage(kc *kafkacomm.KafkaConsumer) ([]byte, error) {
-       args := m.Called(kc)
-       return args.Get(0).([]byte), args.Error(0)
+func (m *MockKafkaConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+       args := m.Called(timeout)
+       msg := args.Get(0)
+       if msg == nil {
+               return nil, args.Error(1)
+       }
+       return msg.(*kafka.Message), args.Error(1)
 }
 
 func (m *MockKafkaConsumer) pdpUpdateMessageHandler(msg string) error {
index 5c07ddd..54d33be 100644 (file)
@@ -221,7 +221,6 @@ func extractAndDecodeData(policy model.ToscaPolicy) (map[string]string, []string
        return decodedData, keys, nil
 }
 
-
 // upsert policy to sdk.
 func upsertPolicy(policy model.ToscaPolicy) error {
        decodedContent, keys, _ := extractAndDecodePoliciesVar(policy)
index 47efdef..8ca0c29 100644 (file)
@@ -22,6 +22,7 @@ import (
        "context"
        "encoding/base64"
        "errors"
+       "fmt"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/mock"
        "github.com/stretchr/testify/require"
@@ -35,7 +36,6 @@ import (
        "policy-opa-pdp/pkg/utils"
        "strings"
        "testing"
-       "fmt"
 )
 
 func TestValidatePackageName(t *testing.T) {
index 77475d6..550eb98 100644 (file)
@@ -52,7 +52,6 @@ var (
        handlePdpUpdateDeploymentVar   handlePdpUpdateDeploymentFunc   = handlePdpUpdateDeployment
        handlePdpUpdateUndeploymentVar handlePdpUpdateUndeploymentFunc = handlePdpUpdateUndeployment
        sendPDPStatusResponseFunc                                      = sendPDPStatusResponse
-
 )
 
 // Handles messages of type PDP_UPDATE sent from the Policy Administration Point (PAP).
@@ -140,8 +139,8 @@ func handlePdpUpdateDeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusS
                        failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|")
                        resMessage := fmt.Errorf("PDP Update Failed as failed to format successfullyDeployedPolicies json %v", failureMessages)
                        if err = sendFailureResponseVar(p, &pdpUpdate, resMessage); err != nil {
-       log.Debugf("Failed to send update internal map  error response: %v", err)
-       return "", err, failureMessages
+                               log.Debugf("Failed to send update internal map  error response: %v", err)
+                               return "", err, failureMessages
                        }
                }
 
@@ -167,8 +166,8 @@ func handlePdpUpdateUndeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatu
                        failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|")
                        resMessage := fmt.Errorf("PDP Update Failed as failed to format successfullyUnDeployedPolicies json %v", failureMessages)
                        if err = sendFailureResponseVar(p, &pdpUpdate, resMessage); err != nil {
-                               log.Debugf("Failed to send update error response: %v", err)
-                               return "", err, failureMessages
+                               log.Debugf("Failed to send update error response: %v", err)
+                               return "", err, failureMessages
                        }
                }
        }
index 9936c6c..9b0412f 100644 (file)
@@ -320,10 +320,10 @@ func TestPolicyUndeploymentAction(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        // Set mock behavior
                        removePolicyFromSdkandDirFunc = func(policy map[string]interface{}) []string {
-       return tt.mockPolicyErrors
+                               return tt.mockPolicyErrors
                        }
                        removeDataFromSdkandDirFunc = func(policy map[string]interface{}) []string {
-       return tt.mockDataErrors
+                               return tt.mockDataErrors
                        }
 
                        // Call the function under test
@@ -395,58 +395,58 @@ func TestCountChildKeysFromJSON(t *testing.T) {
                        name:     "Empty JSON",
                        input:    map[string]interface{}{},
                        expected: map[string]int{
-       // No child nodes
+                               // No child nodes
                        },
                },
                {
                        name: "Single Level JSON",
                        input: map[string]interface{}{
-       "key1": map[string]interface{}{
-               "child1": "value1",
-               "child2": "value2",
-       },
-       "key2": map[string]interface{}{
-               "childA": "valueA",
-       },
+                               "key1": map[string]interface{}{
+                                       "child1": "value1",
+                                       "child2": "value2",
+                               },
+                               "key2": map[string]interface{}{
+                                       "childA": "valueA",
+                               },
                        },
                        expected: map[string]int{
-       "node/key1": 2, // key1 has 2 children
-       "node/key2": 1, // key2 has 1 child
+                               "node/key1": 2, // key1 has 2 children
+                               "node/key2": 1, // key2 has 1 child
                        },
                },
                {
                        name: "Nested JSON",
                        input: map[string]interface{}{
-       "root": map[string]interface{}{
-               "level1": map[string]interface{}{
-                       "level2": map[string]interface{}{
-                               "child1": "value1",
-                               "child2": "value2",
-                       },
-               },
-       },
+                               "root": map[string]interface{}{
+                                       "level1": map[string]interface{}{
+                                               "level2": map[string]interface{}{
+                                                       "child1": "value1",
+                                                       "child2": "value2",
+                                               },
+                                       },
+                               },
                        },
                        expected: map[string]int{
-       "node/root":               1, // root has 1 child (level1)
-       "node/root/level1":        1, // level1 has 1 child (level2)
-       "node/root/level1/level2": 2, // level2 has 2 children
+                               "node/root":               1, // root has 1 child (level1)
+                               "node/root/level1":        1, // level1 has 1 child (level2)
+                               "node/root/level1/level2": 2, // level2 has 2 children
                        },
                },
                {
                        name: "Mixed Data Types",
                        input: map[string]interface{}{
-       "parent": map[string]interface{}{
-               "child1": "string",
-               "child2": 42,
-               "child3": map[string]interface{}{
-                       "subchild1": true,
-                       "subchild2": nil,
-               },
-       },
+                               "parent": map[string]interface{}{
+                                       "child1": "string",
+                                       "child2": 42,
+                                       "child3": map[string]interface{}{
+                                               "subchild1": true,
+                                               "subchild2": nil,
+                                       },
+                               },
                        },
                        expected: map[string]int{
-       "node/parent":        3, // parent has 3 children
-       "node/parent/child3": 2, // child3 has 2 children
+                               "node/parent":        3, // parent has 3 children
+                               "node/parent/child3": 2, // child3 has 2 children
                        },
                },
        }
@@ -455,7 +455,7 @@ func TestCountChildKeysFromJSON(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        got := countChildKeysFromJSON(tt.input)
                        if !reflect.DeepEqual(got, tt.expected) {
-       t.Errorf("countChildKeysFromJSON() = %v, expected %v", got, tt.expected)
+                               t.Errorf("countChildKeysFromJSON() = %v, expected %v", got, tt.expected)
                        }
                })
        }
@@ -536,10 +536,10 @@ func TestAnalyzeHierarchy(t *testing.T) {
                        result, err := analyzeHierarchy(tt.parentDataJson, tt.dataPath)
 
                        if tt.expectedErr {
-       assert.Error(t, err)
+                               assert.Error(t, err)
                        } else {
-       assert.NoError(t, err)
-       assert.Equal(t, tt.expectedPath, result)
+                               assert.NoError(t, err)
+                               assert.Equal(t, tt.expectedPath, result)
                        }
                })
        }
@@ -580,7 +580,7 @@ func TestAnalyseEmptyParentNodes(t *testing.T) {
                        name:      "Success - Valid Parent Data Exists",
                        inputPath: "/parent/child",
                        mockResponse: map[string]interface{}{
-       "child": "data",
+                               "child": "data",
                        },
                        mockError:      nil,
                        expectedOutput: "/parent/child",
@@ -601,13 +601,13 @@ func TestAnalyseEmptyParentNodes(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        // Mock function behavior
                        opasdkGetData = func(ctx context.Context, dataPath string) (*oapicodegen.OPADataResponse_Data, error) {
-       if tt.mockResponse != nil {
-               jsonData, _ := json.Marshal(tt.mockResponse)
-               var resData oapicodegen.OPADataResponse_Data
-               _ = json.Unmarshal(jsonData, &resData)
-               return &resData, tt.mockError
-       }
-       return nil, tt.mockError
+                               if tt.mockResponse != nil {
+                                       jsonData, _ := json.Marshal(tt.mockResponse)
+                                       var resData oapicodegen.OPADataResponse_Data
+                                       _ = json.Unmarshal(jsonData, &resData)
+                                       return &resData, tt.mockError
+                               }
+                               return nil, tt.mockError
                        }
 
                        // Call function
@@ -615,10 +615,10 @@ func TestAnalyseEmptyParentNodes(t *testing.T) {
 
                        // Validate results
                        if tt.expectError {
-       assert.Error(t, err, "Expected error but got none")
+                               assert.Error(t, err, "Expected error but got none")
                        } else {
-       assert.NoError(t, err, "Expected no error but got one")
-       assert.Equal(t, tt.expectedOutput, output, "Unexpected output")
+                               assert.NoError(t, err, "Expected no error but got one")
+                               assert.Equal(t, tt.expectedOutput, output, "Unexpected output")
                        }
                })
        }
@@ -679,13 +679,13 @@ func TestProcessDataDeletionFromSdkAndDir(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        // Mock function variables
                        analyseEmptyParentNodesFunc = func(dataPath string) (string, error) {
-       return dataPath, tt.mockAnalyseErr
+                               return dataPath, tt.mockAnalyseErr
                        }
                        deleteDataSdkFunc = func(ctx context.Context, dataPath string) error {
-       return tt.mockDeleteErr
+                               return tt.mockDeleteErr
                        }
                        removeDataDirectoryFunc = func(keyPath string) error {
-       return tt.mockRemoveErr
+                               return tt.mockRemoveErr
                        }
 
                        // Call function
@@ -693,7 +693,7 @@ func TestProcessDataDeletionFromSdkAndDir(t *testing.T) {
 
                        // Normalize nil vs empty slice
                        if failureMessages == nil {
-       failureMessages = []string{}
+                               failureMessages = []string{}
                        }
 
                        // Validate results
@@ -724,22 +724,22 @@ func TestRemoveDataFromSdkandDir(t *testing.T) {
                {
                        name: "Valid data keys",
                        policy: map[string]interface{}{
-       "data": []interface{}{"policy.rule1", "policy.rule2"},
+                               "data": []interface{}{"policy.rule1", "policy.rule2"},
                        },
                        expectedFailures: nil,
                },
                {
                        name: "Invalid data key type",
                        policy: map[string]interface{}{
-       "data": []interface{}{"policy.rule1", 123}, // Invalid integer key
+                               "data": []interface{}{"policy.rule1", 123}, // Invalid integer key
                        },
                        expectedFailures: []string{"Invalid Key :123"},
                },
                {
                        name: "Invalid JSON structure",
                        policy: map[string]interface{}{
-       "policyId":      "test-policy",
-       "policyVersion": "1.0",
+                               "policyId":      "test-policy",
+                               "policyVersion": "1.0",
                        },
                        expectedFailures: []string{": Invalid JSON structure: 'data' is missing or not an array"},
                },
@@ -747,7 +747,7 @@ func TestRemoveDataFromSdkandDir(t *testing.T) {
                {
                        name: "Deletion failure",
                        policy: map[string]interface{}{
-       "data": []interface{}{"invalid.path"},
+                               "data": []interface{}{"invalid.path"},
                        },
                        expectedFailures: []string{"Failed to delete: /invalid/path"},
                },
index a16bd4a..ae56e21 100644 (file)
@@ -25,14 +25,12 @@ import (
        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        "policy-opa-pdp/cfg"
        "policy-opa-pdp/pkg/log"
-       "sync"
        "time"
 )
 
 var (
        // Declare a global variable to hold the most recently created KafkaConsumer
        consumerInstance *KafkaConsumer
-       consumerOnce     sync.Once // sync.Once ensures that the consumer is created only once
 )
 
 // KafkaConsumerInterface defines the interface for a Kafka consumer.
@@ -76,63 +74,59 @@ type KafkaNewConsumerFunc func(*kafka.ConfigMap) (*kafka.Consumer, error)
 var KafkaNewConsumer KafkaNewConsumerFunc = kafka.NewConsumer
 
 // NewKafkaConsumer creates a new Kafka consumer and returns
-func NewKafkaConsumer() (*KafkaConsumer, error) {
+func NewKafkaConsumer(topic string, groupid string) (*KafkaConsumer, error) {
        // Initialize the consumer instance
-       consumerOnce.Do(func() {
-               log.Debugf("Creating Kafka Consumer singleton instance")
-               brokers := cfg.BootstrapServer
-               groupid := cfg.GroupId
-               topic := cfg.Topic
-               useSASL := cfg.UseSASLForKAFKA
-               username := cfg.KAFKA_USERNAME
-               password := cfg.KAFKA_PASSWORD
-
-               // Add Kafka connection properties
-               configMap := &kafka.ConfigMap{
-                       "bootstrap.servers": brokers,
-                       "group.id":          groupid,
-                       "auto.offset.reset": "latest",
-               }
-               fmt.Print(configMap)
-               // If SASL is enabled, add SASL properties
-               if useSASL == "true" {
-                       configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512")           // #nosec G104
-                       configMap.SetKey("sasl.username", username)                   // #nosec G104
-                       configMap.SetKey("sasl.password", password)                   // #nosec G104
-                       configMap.SetKey("security.protocol", "SASL_PLAINTEXT")       // #nosec G104
-                       configMap.SetKey("fetch.max.bytes", 50*1024*1024)             // #nosec G104
-                       configMap.SetKey("max.partition.fetch.bytes", 50*1024*1024)   // #nosec G104
-                       configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024) // #nosec G104
-                       configMap.SetKey("session.timeout.ms", "30000")               // #nosec G104
-                       configMap.SetKey("max.poll.interval.ms", "300000")            // #nosec G104
-                       configMap.SetKey("enable.partition.eof", true)                // #nosec G104
-                       configMap.SetKey("enable.auto.commit", true)                  // #nosec G104
-                       // configMap.SetKey("debug", "all") // Uncomment for debug
-               }
+       log.Debugf("Creating Kafka Consumer instance")
+       brokers := cfg.BootstrapServer
+       useSASL := cfg.UseSASLForKAFKA
+       username := cfg.KAFKA_USERNAME
+       password := cfg.KAFKA_PASSWORD
+
+       // Add Kafka connection properties
+       configMap := &kafka.ConfigMap{
+               "bootstrap.servers": brokers,
+               "group.id":          groupid,
+               "auto.offset.reset": "latest",
+       }
+       log.Debugf("Kafka consumer config: %v", configMap)
+       // If SASL is enabled, add SASL properties
+       if useSASL == "true" {
+               configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512")           // #nosec G104
+               configMap.SetKey("sasl.username", username)                   // #nosec G104
+               configMap.SetKey("sasl.password", password)                   // #nosec G104
+               configMap.SetKey("security.protocol", "SASL_PLAINTEXT")       // #nosec G104
+               configMap.SetKey("fetch.max.bytes", 50*1024*1024)             // #nosec G104
+               configMap.SetKey("max.partition.fetch.bytes", 50*1024*1024)   // #nosec G104
+               configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024) // #nosec G104
+               configMap.SetKey("session.timeout.ms", "30000")               // #nosec G104
+               configMap.SetKey("max.poll.interval.ms", "300000")            // #nosec G104
+               configMap.SetKey("enable.partition.eof", true)                // #nosec G104
+               configMap.SetKey("enable.auto.commit", true)                  // #nosec G104
+               // configMap.SetKey("debug", "all") // Uncomment for debug
+       }
 
-               // Create a new Kafka consumer
-               consumer, err := KafkaNewConsumer(configMap)
-               if err != nil {
-                       log.Warnf("Error creating consumer: %v", err)
-                       return
-               }
-               if consumer == nil {
-                       log.Warnf("Kafka Consumer is nil after creation")
-                       return
-               }
+       // Create a new Kafka consumer
+       consumer, err := KafkaNewConsumer(configMap)
+       if err != nil {
+               log.Warnf("Error creating consumer: %v", err)
+               return nil, fmt.Errorf("error creating consumer: %w", err)
+       }
+       if consumer == nil {
+               log.Warnf("Kafka Consumer is nil after creation")
+               return nil, fmt.Errorf("Kafka Consumer is nil after creation")
+       }
 
-               // Subscribe to the topic
-               err = consumer.SubscribeTopics([]string{topic}, nil)
-               if err != nil {
-                       log.Warnf("Error subscribing to topic: %v", err)
-                       return
-               }
-               log.Debugf("Topic Subscribed: %v", topic)
+       // Subscribe to the topic
+       err = consumer.SubscribeTopics([]string{topic}, nil)
+       if err != nil {
+               log.Warnf("Error subscribing to topic: %v", err)
+               return nil, fmt.Errorf("error subscribing to topic: %w", err)
+       }
+       log.Debugf("Topic Subscribed: %v", topic)
 
-               // Assign the consumer instance
-               consumerInstance = &KafkaConsumer{Consumer: consumer}
-               log.Debugf("Created SIngleton consumer instance")
-       })
+       // Assign the consumer instance
+       consumerInstance = &KafkaConsumer{Consumer: consumer}
+       log.Debugf("Created consumer instance")

        // Return the consumer instance
        if consumerInstance == nil {
index 36a2aa4..48575df 100644 (file)
@@ -27,7 +27,6 @@ import (
        "github.com/stretchr/testify/mock"
        "policy-opa-pdp/cfg"
        "policy-opa-pdp/pkg/kafkacomm/mocks"
-       "sync"
        "testing"
 )
 
@@ -58,7 +57,7 @@ func TestNewKafkaConsumer_SASLTest(t *testing.T) {
 
        mockConsumer := new(MockKafkaConsumer)
 
-       consumer, err := NewKafkaConsumer()
+       consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId)
 
        assert.NoError(t, err)
        assert.NotNil(t, consumer)
@@ -69,7 +68,7 @@ func TestNewKafkaConsumer(t *testing.T) {
        // Assuming configuration is correctly loaded from cfg package
        // You can mock or override cfg values here if needed
 
-       consumer, err := NewKafkaConsumer()
+       consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId)
        assert.NoError(t, err, "Expected no error when creating Kafka consumer")
        assert.NotNil(t, consumer, "Expected a non-nil KafkaConsumer")
 
@@ -191,7 +190,6 @@ func TestKafkaConsumer_Unsubscribe_Nil_Error(t *testing.T) {
 
 // Helper function to reset
 func resetKafkaConsumerSingleton() {
-       consumerOnce = sync.Once{}
        consumerInstance = nil
 }
 
@@ -203,9 +201,9 @@ func TestNewKafkaConsumer_ErrorCreatingConsumer(t *testing.T) {
                return nil, fmt.Errorf("mock error creating consumer")
        }
 
-       consumer, err := NewKafkaConsumer()
+       consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId)
        assert.Nil(t, consumer)
-       assert.EqualError(t, err, "Kafka Consumer instance not created")
+       assert.EqualError(t, err, "error creating consumer: mock error creating consumer")
        KafkaNewConsumer = originalNewKafkaConsumer
 }
 
@@ -217,8 +215,8 @@ func TestNewKafkaConsumer_NilConsumer(t *testing.T) {
                return nil, nil
        }
 
-       consumer, err := NewKafkaConsumer()
+       consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId)
        assert.Nil(t, consumer)
-       assert.EqualError(t, err, "Kafka Consumer instance not created")
+       assert.EqualError(t, err, "Kafka Consumer is nil after creation")
        KafkaNewConsumer = originalNewKafkaConsumer
 }
index 8685a34..bc8ce42 100644 (file)
@@ -25,7 +25,6 @@ import (
        "github.com/confluentinc/confluent-kafka-go/v2/kafka"
        "log"
        "policy-opa-pdp/cfg"
-       "sync"
 )
 
 type KafkaProducerInterface interface {
@@ -43,7 +42,6 @@ type KafkaProducer struct {
 
 var (
        instance *KafkaProducer
-       once     sync.Once
 )
 
 // GetKafkaProducer initializes and returns a KafkaProducer instance.
@@ -51,43 +49,46 @@ var (
 // If SASL authentication is enabled via the configuration, the necessary credentials
 // are set in the producer configuration.
 //
-//nolint:gosec
+
 func GetKafkaProducer(bootstrapServers, topic string) (*KafkaProducer, error) {
        var err error
-       once.Do(func() {
-               brokers := cfg.BootstrapServer
-               useSASL := cfg.UseSASLForKAFKA
-               username := cfg.KAFKA_USERNAME
-               password := cfg.KAFKA_PASSWORD
-
-               // Add Kafka Connection Properties ....
-               configMap := &kafka.ConfigMap{
-                       "bootstrap.servers": brokers,
-               }
+       instance, err = initializeKafkaProducer(topic)
+       return instance, err
+}
 
-               if useSASL == "true" {
-                       configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512")     // #nosec G104
-                       configMap.SetKey("sasl.username", username)             // #nosec G104
-                       configMap.SetKey("sasl.password", password)             // #nosec G104
-                       configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104
-               }
+//nolint:gosec
+func initializeKafkaProducer(topic string) (*KafkaProducer, error) {
+       brokers := cfg.BootstrapServer
+       useSASL := cfg.UseSASLForKAFKA
+       username := cfg.KAFKA_USERNAME
+       password := cfg.KAFKA_PASSWORD
 
-               p, err := kafka.NewProducer(configMap)
-               if err != nil {
-                       return
-               }
-               instance = &KafkaProducer{
-                       producer: p,
-                       topic:    topic,
-               }
+       configMap := &kafka.ConfigMap{
+               "bootstrap.servers": brokers,
+       }
 
-       })
-       return instance, err
+       if useSASL == "true" {
+               configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512")     // #nosec G104
+               configMap.SetKey("sasl.username", username)             // #nosec G104
+               configMap.SetKey("sasl.password", password)             // #nosec G104
+               configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104
+       }
+
+       p, err := kafka.NewProducer(configMap)
+       if err != nil {
+               return nil, err
+       }
+
+       return &KafkaProducer{
+               producer: p,
+               topic:    topic,
+       }, nil
 }
 
 // Produce sends a message to the configured Kafka topic.
 // It takes the message payload as a byte slice and returns any errors
 func (kp *KafkaProducer) Produce(kafkaMessage *kafka.Message, eventChan chan kafka.Event) error {
+       log.Println("KafkaProducer: producing message")
        if kafkaMessage.TopicPartition.Topic == nil {
                kafkaMessage.TopicPartition = kafka.TopicPartition{
                        Topic:     &kp.topic,
diff --git a/pkg/kafkacomm/publisher/patch_message_publisher.go b/pkg/kafkacomm/publisher/patch_message_publisher.go
new file mode 100644 (file)
index 0000000..2be1655
--- /dev/null
@@ -0,0 +1,70 @@
+// -
+//   ========================LICENSE_START=================================
+//   Copyright (C) 2025: Deutsche Telekom
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   SPDX-License-Identifier: Apache-2.0
+//   ========================LICENSE_END===================================
+//
+
+package publisher
+
+import (
+       "encoding/json"
+       "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+       "policy-opa-pdp/cfg"
+       "policy-opa-pdp/pkg/kafkacomm"
+       "policy-opa-pdp/pkg/log"
+       "policy-opa-pdp/pkg/opasdk"
+)
+
+type RealPatchSender struct {
+       Producer kafkacomm.KafkaProducerInterface
+}
+
+type PatchKafkaPayload struct {
+       PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
+}
+
+func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error {
+       log.Debugf("In SendPatchMessage")
+       topic := cfg.PatchTopic
+       kafkaPayload := PatchKafkaPayload{
+               PatchInfos: patchInfos,
+       }
+
+       jsonMessage, err := json.Marshal(kafkaPayload)
+       if err != nil {
+               log.Warnf("failed to marshal Patch Payload to JSON: %v", err)
+               return err
+       }
+
+       kafkaMessage := &kafka.Message{
+               TopicPartition: kafka.TopicPartition{
+                       Topic:     &topic,
+                       Partition: kafka.PartitionAny,
+               },
+               Value: jsonMessage,
+       }
+       var eventChan chan kafka.Event
+       err = s.Producer.Produce(kafkaMessage, eventChan)
+       if err != nil {
+               log.Warnf("Error producing message: %v\n", err)
+               return err
+       }
+       log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage))
+
+       return nil
+}
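The payload marshalled here uses the same patchInfos JSON key that PatchMessage in pkg/kafkacomm/handler unmarshals, so publisher and consumer agree on the wire format. A hedged usage sketch, assuming GetKafkaProducer as refactored above supplies the producer:

    // Sketch only: publishing a patch set to the patch topic.
    producer, err := kafkacomm.GetKafkaProducer(cfg.BootstrapServer, cfg.PatchTopic)
    if err != nil {
            return err
    }
    sender := &publisher.RealPatchSender{Producer: producer}
    if err := sender.SendPatchMessage(patchInfos); err != nil {
            log.Warnf("failed to publish patch: %v", err)
    }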
diff --git a/pkg/kafkacomm/publisher/patch_message_publisher_test.go b/pkg/kafkacomm/publisher/patch_message_publisher_test.go
new file mode 100644 (file)
index 0000000..bff781d
--- /dev/null
@@ -0,0 +1,109 @@
+// -
+//   ========================LICENSE_START=================================
+//   Copyright (C) 2025: Deutsche Telekom
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   SPDX-License-Identifier: Apache-2.0
+//   ========================LICENSE_END===================================
+//
+
+package publisher
+
+import (
+       "encoding/json"
+       "errors"
+       "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+       "github.com/open-policy-agent/opa/v1/storage"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/mock"
+       "policy-opa-pdp/pkg/opasdk"
+       "testing"
+)
+
+// --- Sample PatchImpl for testing ---
+func samplePatchData() []opasdk.PatchImpl {
+       return []opasdk.PatchImpl{
+               {
+                       Path:  storage.MustParsePath("/policy/config/name"),
+                       Op:    storage.ReplaceOp,
+                       Value: "NewPolicyName",
+               },
+       }
+}
+
+// --- Helper to get mock sender ---
+func getMockSender() (*RealPatchSender, *MockKafkaProducer) {
+       mockProducer := new(MockKafkaProducer)
+       sender := &RealPatchSender{
+               Producer: mockProducer,
+       }
+       return sender, mockProducer
+}
+
+// --- Test: Successful message send ---
+func TestSendPatchMessage_Success(t *testing.T) {
+       sender, mockProducer := getMockSender()
+
+       mockProducer.On("Produce", mock.Anything).Return(nil)
+
+       err := sender.SendPatchMessage(samplePatchData())
+       assert.NoError(t, err)
+       mockProducer.AssertExpectations(t)
+}
+
+// --- Test: Kafka produce failure ---
+func TestSendPatchMessage_ProduceError(t *testing.T) {
+       sender, mockProducer := getMockSender()
+
+       mockProducer.On("Produce", mock.Anything).Return(errors.New("kafka error"))
+
+       err := sender.SendPatchMessage(samplePatchData())
+       assert.Error(t, err)
+       assert.EqualError(t, err, "kafka error")
+       mockProducer.AssertExpectations(t)
+}
+
+// --- Test: JSON marshal error ---
+func TestSendPatchMessage_MarshalError(t *testing.T) {
+       sender, _ := getMockSender()
+
+       badData := []opasdk.PatchImpl{
+               {
+                       Path:  storage.MustParsePath("/invalid"),
+                       Op:    storage.AddOp,
+                       Value: make(chan int), // JSON marshal fails on channels
+               },
+       }
+
+       err := sender.SendPatchMessage(badData)
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "json: unsupported type: chan int")
+}
+
+// --- Test: Validate payload content ---
+func TestSendPatchMessage_PayloadContent(t *testing.T) {
+       sender, mockProducer := getMockSender()
+
+       mockProducer.On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool {
+               var payload PatchKafkaPayload
+               err := json.Unmarshal(msg.Value, &payload)
+               return err == nil &&
+                       len(payload.PatchInfos) == 1 &&
+                       payload.PatchInfos[0].Path.String() == "/policy/config/name" &&
+                       payload.PatchInfos[0].Value == "NewPolicyName"
+       })).Return(nil)
+
+       err := sender.SendPatchMessage(samplePatchData())
+       assert.NoError(t, err)
+       mockProducer.AssertExpectations(t)
+}
index da4888f..71f87ec 100644 (file)
@@ -24,6 +24,7 @@ package publisher
 
 import (
        "fmt"
+       "github.com/google/uuid"
        "policy-opa-pdp/consts"
        "policy-opa-pdp/pkg/log"
        "policy-opa-pdp/pkg/model"
@@ -32,7 +33,6 @@ import (
        "policy-opa-pdp/pkg/policymap"
        "sync"
        "time"
-       "github.com/google/uuid"
 )
 
 var (
index 7a8fe55..ece8b6f 100644 (file)
@@ -1,30 +1,32 @@
 // -
-//   ========================LICENSE_START=================================
-//   Copyright (C) 2024-2025: Deutsche Telekom
 //
-//   Licensed under the Apache License, Version 2.0 (the "License");
-//   you may not use this file except in compliance with the License.
-//   You may obtain a copy of the License at
+//     ========================LICENSE_START=================================
+//     Copyright (C) 2024-2025: Deutsche Telekom
 //
-//        http://www.apache.org/licenses/LICENSE-2.0
+//     Licensed under the Apache License, Version 2.0 (the "License");
+//     you may not use this file except in compliance with the License.
+//     You may obtain a copy of the License at
 //
-//   Unless required by applicable law or agreed to in writing, software
-//   distributed under the License is distributed on an "AS IS" BASIS,
-//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-//   See the License for the specific language governing permissions and
-//   limitations under the License.
-//   SPDX-License-Identifier: Apache-2.0
-//   ========================LICENSE_END===================================
+//          http://www.apache.org/licenses/LICENSE-2.0
 //
+//     Unless required by applicable law or agreed to in writing, software
+//     distributed under the License is distributed on an "AS IS" BASIS,
+//     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//     See the License for the specific language governing permissions and
+//     limitations under the License.
+//     SPDX-License-Identifier: Apache-2.0
+//     ========================LICENSE_END===================================
 package publisher
+
 import (
        "errors"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/mock"
        "policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
        "policy-opa-pdp/pkg/policymap"
        "testing"
-       "github.com/stretchr/testify/assert"
-       "github.com/stretchr/testify/mock"
 )
+
 /*
 Success Case 1
 TestStartHeartbeatIntervalTimer_ValidInterval
@@ -46,6 +48,7 @@ func TestStartHeartbeatIntervalTimer_ValidInterval(t *testing.T) {
                t.Errorf("Expected currentInterval to be %d, got %d", intervalMs, currentInterval)
        }
 }
+
 /*
 Failure Case 1
 TestStartHeartbeatIntervalTimer_InvalidInterval
@@ -64,6 +67,7 @@ func TestStartHeartbeatIntervalTimer_InvalidInterval(t *testing.T) {
                t.Log("Expected ticker to be nil for invalid interval")
        }
 }
+
 /*
 TestSendPDPHeartBeat_Success 2
 Description: Test sending a heartbeat successfully.
@@ -76,6 +80,7 @@ func TestSendPDPHeartBeat_Success(t *testing.T) {
        err := sendPDPHeartBeat(mockSender)
        assert.NoError(t, err)
 }
+
 /*
 TestSendPDPHeartBeat_Failure 2
 Description: Test failing to send a heartbeat.
@@ -89,6 +94,7 @@ func TestSendPDPHeartBeat_Failure(t *testing.T) {
        err := sendPDPHeartBeat(mockSender)
        assert.Error(t, err)
 }
+
 /*
 TestsendPDPHeartBeat_Success 3
 Description: Test sending a heartbeat successfully with some deployed policies.
@@ -106,6 +112,7 @@ func TestSendPDPHeartBeat_SuccessSomeDeployedPolicies(t *testing.T) {
        err := sendPDPHeartBeat(mockSender)
        assert.NoError(t, err)
 }
+
 /*
 TestsendPDPHeartBeat_Success 4
 Description: Test sending a heartbeat successfully with no deployed policies.
@@ -123,6 +130,7 @@ func TestSendPDPHeartBeat_SuccessNoDeployedPolicies(t *testing.T) {
        err := sendPDPHeartBeat(mockSender)
        assert.NoError(t, err)
 }
+
 /*
 TestStopTicker_Success 3
 Description: Test stopping the ticker.
@@ -141,6 +149,7 @@ func TestStopTicker_Success(t *testing.T) {
                t.Errorf("Expected ticker to be nil")
        }
 }
+
 /*
 TestStopTicker_NotRunning 3
 Description: Test stopping the ticker when it is not running.
index a9aa6e0..801db2d 100644 (file)
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -35,7 +35,7 @@ import (
        "github.com/google/uuid"
 )
 
-type(
+type (
        SendPdpUpdateResponseFunc func(s PdpStatusSender, pdpUpdate *model.PdpUpdate, resMessage string) error
 )
 
index b2b03d8..cf42a98 100644 (file)
 package metrics
 
 import (
-        "sync"
-        "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus"
+       "sync"
 )
+
 // global counter variables
 var TotalErrorCount int64
 var DecisionSuccessCount int64
@@ -32,24 +33,61 @@ var DeploySuccessCount int64
 var UndeployFailureCount int64
 var UndeploySuccessCount int64
 var TotalPoliciesCount int64
+var DynamicDataUpdateSuccessCount int64
+var DynamicDataUpdateFailureCount int64
 var mu sync.Mutex
 
-//Decision and Data counters to be used in prometheus
+// Decision and Data counters to be used in prometheus
 var (
-       DecisionResponseTime = prometheus.NewSummary(prometheus.SummaryOpts{
-               Name:    "opa_decision_response_time_seconds",
-               Help:    "Response time of OPA decision handler",
+       DecisionResponseTime_Prom = prometheus.NewSummary(prometheus.SummaryOpts{
+               Name: "opa_decision_response_time_seconds",
+               Help: "Response time of OPA decision handler",
+       })
+       DataResponseTime_Prom = prometheus.NewSummary(prometheus.SummaryOpts{
+               Name: "opa_data_response_time_seconds",
+               Help: "Response time of OPA data handler",
+       })
+       DecisionHandlerCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+               Name: "pdpo_policy_decisions_total",
+               Help: "Total Number of Decision Handler hits for OPA",
+       })
+       DeploymentSuccessCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+               Name: "pdpo_policy_deployments_total",
+               Help: "Total Number of Successful Deployments for OPA",
+       })
+       DeploymentFailureCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+               Name: "pdpo_policy_failures_total",
+               Help: "Total Number of Deployment Failures for OPA",
+       })
+       DynamicDataUpdatesSuccessCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+               Name: "pdpo_dynamic_data_success_total",
+               Help: "Total Number of Successful Dynamic Data Updates for OPA",
+       })
+       DynamicDataUpdatesFailureCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+               Name: "pdpo_dynamic_data_failures_total",
+               Help: "Total Number of Failed Dynamic Data Updates for OPA",
        })
-       DataResponseTime = prometheus.NewSummary(prometheus.SummaryOpts{
-               Name:    "opa_data_response_time_seconds",
-               Help:    "Response time of OPA data handler",
+       UndeploymentSuccessCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+               Name: "pdpo_policy_undeployments_success_total",
+               Help: "Total Number of Successful Undeployments for OPA",
+       })
+       UndeploymentFailureCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+               Name: "pdpo_policy_undeployments_failures_total",
+               Help: "Total Number of Undeployment Failures for OPA",
        })
 )
 
-//register counters in init
+// register counters in init
 func init() {
-       prometheus.MustRegister(DecisionResponseTime)
-       prometheus.MustRegister(DataResponseTime)
+       prometheus.MustRegister(DecisionResponseTime_Prom)
+       prometheus.MustRegister(DataResponseTime_Prom)
+       prometheus.MustRegister(DecisionHandlerCount_Prom)
+       prometheus.MustRegister(DeploymentSuccessCount_Prom)
+       prometheus.MustRegister(DeploymentFailureCount_Prom)
+       prometheus.MustRegister(DynamicDataUpdatesSuccessCount_Prom)
+       prometheus.MustRegister(DynamicDataUpdatesFailureCount_Prom)
+       prometheus.MustRegister(UndeploymentSuccessCount_Prom)
+       prometheus.MustRegister(UndeploymentFailureCount_Prom)
 }
 
 // Increment counter
@@ -66,10 +104,39 @@ func totalErrorCountRef() *int64 {
        return &TotalErrorCount
 }
 
+func IncrementDynamicDataUpdateSuccessCount() {
+       mu.Lock()
+       DynamicDataUpdateSuccessCount++
+       DynamicDataUpdatesSuccessCount_Prom.Inc()
+       mu.Unlock()
+}
+
+func totalDynamicDataUpdateSuccessCountRef() *int64 {
+       mu.Lock()
+       defer mu.Unlock()
+       return &DynamicDataUpdateSuccessCount
+}
+
+func IncrementDynamicDataUpdateFailureCount() {
+       mu.Lock()
+       DynamicDataUpdateFailureCount++
+       DynamicDataUpdatesFailureCount_Prom.Inc()
+       mu.Unlock()
+}
+
+func totalDynamicDataUpdateFailureCountRef() *int64 {
+       mu.Lock()
+       defer mu.Unlock()
+       return &DynamicDataUpdateFailureCount
+}
+
 // Increment counter
 func IncrementDecisionSuccessCount() {
        mu.Lock()
        DecisionSuccessCount++
+       DecisionHandlerCount_Prom.Inc()
        mu.Unlock()
 }
 
@@ -85,6 +152,7 @@ func totalDecisionSuccessCountRef() *int64 {
 func IncrementDecisionFailureCount() {
        mu.Lock()
        DecisionFailureCount++
+       DecisionHandlerCount_Prom.Inc()
        mu.Unlock()
 }
 
@@ -100,6 +168,7 @@ func TotalDecisionFailureCountRef() *int64 {
 func IncrementDeploySuccessCount() {
        mu.Lock()
        DeploySuccessCount++
+       DeploymentSuccessCount_Prom.Inc()
        mu.Unlock()
 }
 
@@ -116,6 +185,7 @@ func totalDeploySuccessCountRef() *int64 {
 func IncrementDeployFailureCount() {
        mu.Lock()
        DeployFailureCount++
+       DeploymentFailureCount_Prom.Inc()
        mu.Unlock()
 }
 
@@ -132,6 +202,7 @@ func totalDeployFailureCountRef() *int64 {
 func IncrementUndeploySuccessCount() {
        mu.Lock()
        UndeploySuccessCount++
+       UndeploymentSuccessCount_Prom.Inc()
        mu.Unlock()
 }
 
@@ -148,6 +219,7 @@ func totalUndeploySuccessCountRef() *int64 {
 func IncrementUndeployFailureCount() {
        mu.Lock()
        UndeployFailureCount++
+       UndeploymentFailureCount_Prom.Inc()
        mu.Unlock()
 }
 
index 9776511..51b2ec3 100644 (file)
@@ -23,13 +23,13 @@ package metrics
 
 import (
        "encoding/json"
+       "github.com/google/uuid"
+       openapi_types "github.com/oapi-codegen/runtime/types"
        "net/http"
+       "policy-opa-pdp/consts"
        "policy-opa-pdp/pkg/log"
        "policy-opa-pdp/pkg/model/oapicodegen"
        "policy-opa-pdp/pkg/utils"
-       "policy-opa-pdp/consts"
-       "github.com/google/uuid"
-       openapi_types "github.com/oapi-codegen/runtime/types"
 )
 
 func FetchCurrentStatistics(res http.ResponseWriter, req *http.Request) {
@@ -65,6 +65,8 @@ func FetchCurrentStatistics(res http.ResponseWriter, req *http.Request) {
        statReport.UndeployFailureCount = totalUndeployFailureCountRef()
        statReport.UndeploySuccessCount = totalUndeploySuccessCountRef()
        statReport.TotalPoliciesCount = totalPoliciesCountRef()
+       statReport.DynamicDataUpdateFailureCount = totalDynamicDataUpdateFailureCountRef()
+       statReport.DynamicDataUpdateSuccessCount = totalDynamicDataUpdateSuccessCountRef()
 
        // not implemented; hardcoding the values to zero
        // will be implemented in phase-2
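
With the two new refs wired into the report, the handler can be smoke-tested without a server. A minimal sketch (inside this package's tests, with net/http and net/http/httptest imported), assuming FetchCurrentStatistics tolerates a request without the optional ONAP request-id header:

	// Sketch only: exercise the statistics handler via httptest.
	req := httptest.NewRequest(http.MethodGet, "/statistics", nil)
	rec := httptest.NewRecorder()
	FetchCurrentStatistics(rec, req)
	// The response body should now include dynamicDataUpdateSuccessCount
	// and dynamicDataUpdateFailureCount alongside the existing counters.
	fmt.Println(rec.Code, rec.Body.String())
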
index 34d88cd..59d9a74 100644 (file)
@@ -121,16 +121,18 @@ type OPADecisionResponse struct {
 
 // StatisticsReport defines model for StatisticsReport.
 type StatisticsReport struct {
-       Code                  *int32 `json:"code,omitempty"`
-       DecisionFailureCount  *int64 `json:"decisionFailureCount,omitempty"`
-       DecisionSuccessCount  *int64 `json:"decisionSuccessCount,omitempty"`
-       DeployFailureCount    *int64 `json:"deployFailureCount,omitempty"`
-       DeploySuccessCount    *int64 `json:"deploySuccessCount,omitempty"`
-       TotalErrorCount       *int64 `json:"totalErrorCount,omitempty"`
-       TotalPoliciesCount    *int64 `json:"totalPoliciesCount,omitempty"`
-       TotalPolicyTypesCount *int64 `json:"totalPolicyTypesCount,omitempty"`
-       UndeployFailureCount  *int64 `json:"undeployFailureCount,omitempty"`
-       UndeploySuccessCount  *int64 `json:"undeploySuccessCount,omitempty"`
+       Code                          *int32 `json:"code,omitempty"`
+       DecisionFailureCount          *int64 `json:"decisionFailureCount,omitempty"`
+       DecisionSuccessCount          *int64 `json:"decisionSuccessCount,omitempty"`
+       DeployFailureCount            *int64 `json:"deployFailureCount,omitempty"`
+       DeploySuccessCount            *int64 `json:"deploySuccessCount,omitempty"`
+       DynamicDataUpdateFailureCount *int64 `json:"dynamicDataUpdateFailureCount,omitempty"`
+       DynamicDataUpdateSuccessCount *int64 `json:"dynamicDataUpdateSuccessCount,omitempty"`
+       TotalErrorCount               *int64 `json:"totalErrorCount,omitempty"`
+       TotalPoliciesCount            *int64 `json:"totalPoliciesCount,omitempty"`
+       TotalPolicyTypesCount         *int64 `json:"totalPolicyTypesCount,omitempty"`
+       UndeployFailureCount          *int64 `json:"undeployFailureCount,omitempty"`
+       UndeploySuccessCount          *int64 `json:"undeploySuccessCount,omitempty"`
 }
 
 // DataGetParams defines parameters for DataGet.
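
The StatisticsReport counters are pointer fields serialized with omitempty, so only counters that are actually set appear on the wire. A minimal sketch of the JSON produced for the two new fields (encoding/json assumed imported):

	// Sketch only: marshal a report carrying just the new counters.
	succ, fail := int64(3), int64(1)
	report := oapicodegen.StatisticsReport{
		DynamicDataUpdateSuccessCount: &succ,
		DynamicDataUpdateFailureCount: &fail,
	}
	b, _ := json.Marshal(report)
	fmt.Println(string(b))
	// {"dynamicDataUpdateFailureCount":1,"dynamicDataUpdateSuccessCount":3}
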
index a40a55a..ca18d79 100644 (file)
@@ -22,8 +22,8 @@
 package pdpattributes
 
 import (
-       "policy-opa-pdp/pkg/log"
        "github.com/google/uuid"
+       "policy-opa-pdp/pkg/log"
 )
 
 var (
index fd09bd5..aa52503 100644 (file)
@@ -37,13 +37,13 @@ import (
 )
 
 type (
-       CreateDirectoryFunc func(dirPath string) error
+       CreateDirectoryFunc       func(dirPath string) error
        ValidateFieldsStructsFunc func(pdpUpdate model.PdpUpdate) error
 )
 
 var (
-       CreateDirectoryVar CreateDirectoryFunc = CreateDirectory
-       removeAll                              = os.RemoveAll
+       CreateDirectoryVar       CreateDirectoryFunc       = CreateDirectory
+       removeAll                                          = os.RemoveAll
        ValidateFieldsStructsVar ValidateFieldsStructsFunc = ValidateFieldsStructs
 )
 
@@ -361,7 +361,6 @@ func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) (string, error) {
        return string(output), nil
 }
 
-
 type CommonFields struct {
        CurrentDate     *string
        CurrentDateTime *time.Time
@@ -389,18 +388,17 @@ func ValidateOPADataRequest(request interface{}) []string {
                }
 
                commonFields := CommonFields{
-                       CurrentDate: &currentDate, 
-                       CurrentDateTime: updateReq.CurrentDateTime, 
-                       CurrentTime: updateReq.CurrentTime, 
-                       TimeOffset: updateReq.TimeOffset, 
-                       TimeZone: updateReq.TimeZone, 
-                       OnapComponent: updateReq.OnapComponent, 
-                       OnapInstance: updateReq.OnapInstance, 
-                       OnapName: updateReq.OnapName, 
-                       PolicyName: convertPtrToString(updateReq.PolicyName),
-
+                       CurrentDate:     &currentDate,
+                       CurrentDateTime: updateReq.CurrentDateTime,
+                       CurrentTime:     updateReq.CurrentTime,
+                       TimeOffset:      updateReq.TimeOffset,
+                       TimeZone:        updateReq.TimeZone,
+                       OnapComponent:   updateReq.OnapComponent,
+                       OnapInstance:    updateReq.OnapInstance,
+                       OnapName:        updateReq.OnapName,
+                       PolicyName:      convertPtrToString(updateReq.PolicyName),
                }
-               return validateCommonFields(commonFields)
+               return validateCommonFields(commonFields)
 
        }
 
@@ -412,15 +410,15 @@ func ValidateOPADataRequest(request interface{}) []string {
                }
 
                commonFields := CommonFields{
-                       CurrentDate: &currentDate,
-                       CurrentDateTime: decisionReq.CurrentDateTime,
-                       CurrentTime: decisionReq.CurrentTime,
-                       TimeOffset: decisionReq.TimeOffset,
-                       TimeZone: decisionReq.TimeZone,
-                       OnapComponent: decisionReq.OnapComponent,
-                       OnapInstance: decisionReq.OnapInstance,
-                       OnapName: decisionReq.OnapName,
-                       PolicyName: decisionReq.PolicyName,
+                       CurrentDate:     &currentDate,
+                       CurrentDateTime: decisionReq.CurrentDateTime,
+                       CurrentTime:     decisionReq.CurrentTime,
+                       TimeOffset:      decisionReq.TimeOffset,
+                       TimeZone:        decisionReq.TimeZone,
+                       OnapComponent:   decisionReq.OnapComponent,
+                       OnapInstance:    decisionReq.OnapInstance,
+                       OnapName:        decisionReq.OnapName,
+                       PolicyName:      decisionReq.PolicyName,
                }
                return validateCommonFields(commonFields)
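
A minimal usage sketch for the validator above; the updateReq variable and the surrounding error handling are illustrative only:

	// Sketch only: reject a dynamic data update whose common fields
	// (dates, times, ONAP identifiers, policy name) fail validation.
	if errs := utils.ValidateOPADataRequest(updateReq); len(errs) > 0 {
		return fmt.Errorf("invalid OPA data update request: %v", errs)
	}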