From 8f564a29f90875c395a60b1dbeaef0b9d1ea8996 Mon Sep 17 00:00:00 2001 From: Shalini Shivam Date: Thu, 1 May 2025 13:33:57 +0200 Subject: [PATCH] Support Data Consistency for Dynamic Data Update Issue-ID: POLICY-5349 Change-Id: I2192f6b8d0c1d0593c324def2945596fd08ac946 Signed-off-by: Shalini Shivam --- Dockerfile | 23 +- api/openapi.yaml | 20 +- api/register-handlers.go | 24 +- api/register-handlers_test.go | 34 +- cfg/config.go | 40 ++- cfg/config_test.go | 35 +- cmd/opa-pdp/opa-pdp.go | 110 ++++-- cmd/opa-pdp/opa-pdp_test.go | 107 +++++- consts/constants.go | 125 +++---- pkg/data/data-handler.go | 185 ++++++---- pkg/data/data-handler_test.go | 374 ++++++++++++++++++++- pkg/kafkacomm/handler/patch_message_handler.go | 73 ++++ .../handler/patch_message_handler_test.go | 153 +++++++++ pkg/kafkacomm/handler/pdp_message_handler_test.go | 22 +- pkg/kafkacomm/handler/pdp_update_deploy_policy.go | 1 - .../handler/pdp_update_deploy_policy_test.go | 2 +- .../handler/pdp_update_message_handler.go | 9 +- .../handler/pdp_update_undeploy_policy_test.go | 114 +++---- pkg/kafkacomm/pdp_topic_consumer.go | 104 +++--- pkg/kafkacomm/pdp_topic_consumer_test.go | 14 +- pkg/kafkacomm/pdp_topic_producer.go | 59 ++-- pkg/kafkacomm/publisher/patch_message_publisher.go | 70 ++++ .../publisher/patch_message_publisher_test.go | 109 ++++++ pkg/kafkacomm/publisher/pdp-heartbeat.go | 2 +- pkg/kafkacomm/publisher/pdp-heartbeat_test.go | 39 ++- pkg/kafkacomm/publisher/pdp-status-publisher.go | 4 +- pkg/metrics/counters.go | 96 +++++- pkg/metrics/statistics-provider.go | 8 +- pkg/model/oapicodegen/models.go | 22 +- pkg/pdpattributes/pdpattributes.go | 2 +- pkg/utils/utils.go | 46 ++- 31 files changed, 1602 insertions(+), 424 deletions(-) create mode 100644 pkg/kafkacomm/handler/patch_message_handler.go create mode 100644 pkg/kafkacomm/handler/patch_message_handler_test.go create mode 100644 pkg/kafkacomm/publisher/patch_message_publisher.go create mode 100644 pkg/kafkacomm/publisher/patch_message_publisher_test.go diff --git a/Dockerfile b/Dockerfile index 5d2c43e..3b14c62 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,7 +19,7 @@ FROM curlimages/curl:7.78.0 AS build # Get OPA -RUN curl -Lo /tmp/opa https://github.com/open-policy-agent/opa/releases/download/v0.69.0/opa_linux_amd64 +RUN curl --proto "=https" -Lo /tmp/opa https://github.com/open-policy-agent/opa/releases/download/v0.69.0/opa_linux_amd64 FROM golang:1.23 AS compile @@ -27,7 +27,18 @@ RUN mkdir /app COPY go.mod go.sum /app/ -COPY . . +# Copy individual files and directories +COPY Dockerfile /go/ +COPY api /go/api +COPY cfg /go/cfg +COPY cmd /go/cmd +COPY consts /go/consts +COPY go.mod /go/ +COPY go.sum /go/ +COPY pkg /go/pkg +COPY sonar-project.properties /go/ +COPY version /go/ +COPY version.properties /go/ RUN mkdir -p /app/cfg /app/consts /app/api /app/cmd /app/pkg /app/bundles COPY cfg /app/cfg @@ -42,9 +53,9 @@ WORKDIR /app # Build the binary RUN GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o /app/opa-pdp /app/cmd/opa-pdp/opa-pdp.go -FROM ubuntu +FROM ubuntu:24.04 -RUN apt-get update && apt-get install -y netcat-openbsd curl && rm -rf /var/lib/apt/lists/*\ +RUN apt-get update && apt-get --no-install-recommends install -y netcat-openbsd curl && rm -rf /var/lib/apt/lists/*\ && mkdir -p /app /opt/policies /opt/data /var/logs \ && chown -R ubuntu:ubuntu /app /opt/policies /opt/data /var/logs @@ -52,7 +63,8 @@ COPY --from=compile /app /app # Copy our opa executable from build stage COPY --from=build /tmp/opa /app/opa -RUN chmod +x /app/opa-pdp && chmod 755 /app/opa && chmod 777 /app/bundles +RUN chown 1000:1000 /app/opa-pdp && chown 1000:1000 /app/opa && chown 1000:1000 /app/bundles +RUN chmod u+x /app/opa-pdp && chmod u+x /app/opa && chmod u+x /app/bundles # Switch to the non-root user and 1000 is for ubuntu @@ -63,4 +75,3 @@ EXPOSE 8282 # Command to run OPA with the policies CMD ["/app/opa-pdp"] - diff --git a/api/openapi.yaml b/api/openapi.yaml index d8afb2f..e448ab0 100644 --- a/api/openapi.yaml +++ b/api/openapi.yaml @@ -112,7 +112,7 @@ paths: - basicAuth: [] x-interface info: last-mod-release: Paris - pdpo-version: 1.0.0 + pdpo-version: 1.0.3 x-codegen-request-body-name: body /healthcheck: get: @@ -172,7 +172,7 @@ paths: - basicAuth: [] x-interface info: last-mod-release: Paris - pdpo-version: 1.0.0 + pdpo-version: 1.0.3 /statistics: get: tags: @@ -231,7 +231,7 @@ paths: - basicAuth: [] x-interface info: last-mod-release: Paris - pdpo-version: 1.0.0 + pdpo-version: 1.0.3 /data/{path}: patch: tags: @@ -325,7 +325,7 @@ paths: - basicAuth: [] x-interface info: last-mod-release: Paris - pdpo-version: 1.0.0 + pdpo-version: 1.0.3 x-codegen-request-body-name: body get: tags: @@ -387,6 +387,11 @@ paths: 500: description: Internal Server Error content: {} + security: + - basicAuth: [] + x-interface info: + last-mod-release: Paris + pdpo-version: 1.0.3 components: schemas: ErrorResponse: @@ -552,6 +557,12 @@ components: decisionFailureCount: type: integer format: int64 + dynamicDataUpdateSuccessCount: + type: integer + format: int64 + dynamicDataUpdateFailureCount: + type: integer + format: int64 OPADataResponse: type: object properties: @@ -565,4 +576,3 @@ components: type: http description: "" scheme: basic - diff --git a/api/register-handlers.go b/api/register-handlers.go index 1bd1815..25f1688 100644 --- a/api/register-handlers.go +++ b/api/register-handlers.go @@ -22,16 +22,17 @@ package api import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "net/http" "policy-opa-pdp/cfg" "policy-opa-pdp/pkg/data" "policy-opa-pdp/pkg/decision" "policy-opa-pdp/pkg/healthcheck" + "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/metrics" "policy-opa-pdp/pkg/opasdk" "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" ) // RegisterHandlers registers the HTTP handlers for the service. @@ -57,7 +58,7 @@ func RegisterHandlers() { http.Handle("/policy/pdpo/v1/data", basicAuth(trackDataResponseTime(dataHandler))) - http.Handle("/metrics", basicAuth(http.HandlerFunc(metricsHandler))) + http.Handle("/metrics", basicAuth(http.HandlerFunc(metricsHandler))) } @@ -66,14 +67,14 @@ func metricsHandler(w http.ResponseWriter, r *http.Request) { promhttp.Handler().ServeHTTP(w, r) } -//Track Decision response time metrics +// Track Decision response time metrics func trackDecisionResponseTime(next http.HandlerFunc) http.HandlerFunc { - return trackResponseTime(metrics.DecisionResponseTime, next) + return trackResponseTime(metrics.DecisionResponseTime_Prom, next) } -//Track Data response time metrics +// Track Data response time metrics func trackDataResponseTime(next http.HandlerFunc) http.HandlerFunc { - return trackResponseTime(metrics.DataResponseTime, next) + return trackResponseTime(metrics.DataResponseTime_Prom, next) } func trackResponseTime(metricCollector prometheus.Observer, next http.HandlerFunc) http.HandlerFunc { @@ -104,3 +105,12 @@ func validateCredentials(username, password string) bool { validPass := cfg.Password return username == validUser && password == validPass } + +// handles readiness probe endpoint +func readinessProbe(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(http.StatusOK) + _, err := res.Write([]byte("Ready")) + if err != nil { + log.Errorf("Failed to write response: %v", err) + } +} diff --git a/api/register-handlers_test.go b/api/register-handlers_test.go index 92ad776..4905eed 100644 --- a/api/register-handlers_test.go +++ b/api/register-handlers_test.go @@ -20,6 +20,7 @@ package api import ( + "github.com/stretchr/testify/assert" "net/http" "net/http/httptest" "policy-opa-pdp/cfg" @@ -27,7 +28,6 @@ import ( "policy-opa-pdp/pkg/healthcheck" "testing" "time" - "github.com/stretchr/testify/assert" ) // Mock configuration @@ -94,7 +94,6 @@ func TestBasicAuth(t *testing.T) { } } - type mockObserver struct { observedDuration float64 } @@ -149,3 +148,34 @@ func TestTrackResponseTime(t *testing.T) { handler(res, req) assert.NotNil(t, observer.observedDuration) } + +func TestMetricsHandler(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/metrics", nil) + rr := httptest.NewRecorder() + + metricsHandler(rr, req) + + resp := rr.Result() + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode, "expected status OK") + + contentType := resp.Header.Get("Content-Type") + assert.Contains(t, contentType, "text/plain", "expected Prometheus content type") + +} + +func TestReadinessProbe(t *testing.T) { + req := httptest.NewRequest(http.MethodGet, "/ready", nil) + rr := httptest.NewRecorder() + + readinessProbe(rr, req) + + resp := rr.Result() + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode, "expected HTTP 200 OK") + + body := rr.Body.String() + assert.Equal(t, "Ready", body, "expected response body to be 'Ready'") +} diff --git a/cfg/config.go b/cfg/config.go index 62d4853..459d338 100644 --- a/cfg/config.go +++ b/cfg/config.go @@ -41,16 +41,19 @@ import ( // KAFKA_USERNAME - The Kafka username for SASL authentication. // KAFKA_PASSWORD - The Kafka password for SASL authentication. var ( - LogLevel string - BootstrapServer string - Topic string - GroupId string - Username string - Password string - UseSASLForKAFKA string - KAFKA_USERNAME string - KAFKA_PASSWORD string - JAASLOGIN string + LogLevel string + BootstrapServer string + Topic string + PatchTopic string + GroupId string + Username string + Password string + UseSASLForKAFKA string + KAFKA_USERNAME string + KAFKA_PASSWORD string + JAASLOGIN string + UseKafkaForPatch bool + PatchGroupId string ) // Initializes the configuration settings. @@ -66,14 +69,16 @@ func init() { LogLevel = getEnv("LOG_LEVEL", "info") BootstrapServer = getEnv("KAFKA_URL", "kafka:9092") Topic = getEnv("PAP_TOPIC", "policy-pdp-pap") + PatchTopic = getEnv("PATCH_TOPIC", "opa-pdp-data") GroupId = getEnv("GROUPID", "opa-pdp-"+uuid.New().String()) + PatchGroupId = getEnv("PATCH_GROUPID", "opa-pdp-data-"+uuid.New().String()) Username = getEnv("API_USER", "policyadmin") Password = getEnv("API_PASSWORD", "zb!XztG34") UseSASLForKAFKA = getEnv("UseSASLForKAFKA", "false") KAFKA_USERNAME, KAFKA_PASSWORD = getSaslJAASLOGINFromEnv(JAASLOGIN) log.Debugf("Username: %s", KAFKA_USERNAME) log.Debugf("Password: %s", KAFKA_PASSWORD) - + UseKafkaForPatch = getEnvAsBool("USE_KAFKA_FOR_PATCH", false) log.Debug("Configuration module: environment initialised") } @@ -109,6 +114,19 @@ func getLogLevel(key string, defaultVal string) log.Level { } } +func getEnvAsBool(key string, defaultVal bool) bool { + if value, exists := os.LookupEnv(key); exists { + parsed, err := strconv.ParseBool(value) + if err != nil { + log.Warnf("%v is set but not a valid bool (%v), using default: %v", key, value, defaultVal) + return defaultVal + } + return parsed + } + log.Warnf("%v not defined, using default value: %v", key, defaultVal) + return defaultVal +} + func getSaslJAASLOGINFromEnv(JAASLOGIN string) (string, string) { // Retrieve the value of the environment variable decodingConfigBytes := getEnv("JAASLOGIN", "JAASLOGIN") diff --git a/cfg/config_test.go b/cfg/config_test.go index 092a67e..652aed0 100644 --- a/cfg/config_test.go +++ b/cfg/config_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -131,3 +131,36 @@ func TestGetSaslJAASLOGINFromEnv_MissingEnv(t *testing.T) { assert.Empty(t, username, "Expected username to be empty for missing environment variable") assert.Empty(t, password, "Expected password to be empty for missing environment variable") } + +func TestGetEnvAsBool(t *testing.T) { + t.Run("valid boolean true", func(t *testing.T) { + os.Setenv("USE_KAFKA_FOR_PATCH", "true") + defer os.Unsetenv("USE_KAFKA_FOR_PATCH") + + result := getEnvAsBool("USE_KAFKA_FOR_PATCH", false) + assert.True(t, result) + }) + + t.Run("valid boolean false", func(t *testing.T) { + os.Setenv("USE_KAFKA_FOR_PATCH", "false") + defer os.Unsetenv("USE_KAFKA_FOR_PATCH") + + result := getEnvAsBool("USE_KAFKA_FOR_PATCH", true) + assert.False(t, result) + }) + + t.Run("invalid boolean value", func(t *testing.T) { + os.Setenv("USE_KAFKA_FOR_PATCH", "notabool") + defer os.Unsetenv("USE_KAFKA_FOR_PATCH") + + result := getEnvAsBool("USE_KAFKA_FOR_PATCH", true) + assert.True(t, result) // should return default (true) because parsing fails + }) + + t.Run("missing env variable", func(t *testing.T) { + os.Unsetenv("USE_KAFKA_FOR_PATCH") // ensure it's not set + + result := getEnvAsBool("USE_KAFKA_FOR_PATCH", false) + assert.False(t, result) // should return default (false) + }) +} diff --git a/cmd/opa-pdp/opa-pdp.go b/cmd/opa-pdp/opa-pdp.go index f4b8093..4005481 100644 --- a/cmd/opa-pdp/opa-pdp.go +++ b/cmd/opa-pdp/opa-pdp.go @@ -29,6 +29,7 @@ import ( h "policy-opa-pdp/api" "policy-opa-pdp/cfg" "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/data" "policy-opa-pdp/pkg/kafkacomm" "policy-opa-pdp/pkg/kafkacomm/handler" "policy-opa-pdp/pkg/kafkacomm/publisher" @@ -42,22 +43,30 @@ import ( var ( bootstrapServers = cfg.BootstrapServer //The Kafka bootstrap server address. topic = cfg.Topic //The Kafka topic to subscribe to. + patchTopic = cfg.PatchTopic + patchMsgProducer *kafkacomm.KafkaProducer + patchMsgConsumer *kafkacomm.KafkaConsumer + groupId = cfg.GroupId + patchGroupId = cfg.PatchGroupId ) // Declare function variables for dependency injection makes it more testable var ( - initializeHandlersFunc = initializeHandlers - startHTTPServerFunc = startHTTPServer - shutdownHTTPServerFunc = shutdownHTTPServer - waitForServerFunc = waitForServer - initializeOPAFunc = initializeOPA - startKafkaConsAndProdFunc = startKafkaConsAndProd - handleMessagesFunc = handleMessages - handleShutdownFunc = handleShutdown + initializeHandlersFunc = initializeHandlers + startHTTPServerFunc = startHTTPServer + shutdownHTTPServerFunc = shutdownHTTPServer + waitForServerFunc = waitForServer + initializeOPAFunc = initializeOPA + startKafkaConsAndProdFunc = startKafkaConsAndProd + handleMessagesFunc = handleMessages + handleShutdownFunc = handleShutdown + startPatchKafkaConsAndProdFunc = startPatchKafkaConsAndProd + handlePatchMessagesFunc = handlePatchMessages ) // main function func main() { + var useKafkaForPatch = cfg.UseKafkaForPatch log.Debugf("Starting OPA PDP Service") ctx, cancel := context.WithCancel(context.Background()) @@ -91,6 +100,16 @@ func main() { // start pdp message handler in a seperate routine handleMessagesFunc(ctx, kc, sender) + if useKafkaForPatch { + patchMsgConsumer, patchMsgProducer, err := startPatchKafkaConsAndProdFunc() + if err != nil || patchMsgConsumer == nil { + log.Warnf("Kafka consumer initialization failed: %v", err) + } + log.Debugf("Producer initialized is: %v", patchMsgProducer) + // start patch message handler in a seperate routine + handlePatchMessagesFunc(ctx, patchMsgConsumer) + } + time.Sleep(10 * time.Second) pdpattributes.SetPdpHeartbeatInterval(int64(consts.DefaultHeartbeatMS)) @@ -101,7 +120,9 @@ func main() { // Handle OS Interrupts and Graceful Shutdown interruptChannel := make(chan os.Signal, 1) signal.Notify(interruptChannel, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP) - handleShutdownFunc(kc, interruptChannel, cancel, producer) + consumers := []*kafkacomm.KafkaConsumer{kc, patchMsgConsumer} + producers := []*kafkacomm.KafkaProducer{producer, patchMsgProducer} + handleShutdownFunc(consumers, interruptChannel, cancel, producers) } type PdpMessageHandlerFunc func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error @@ -119,6 +140,21 @@ func handleMessages(ctx context.Context, kc *kafkacomm.KafkaConsumer, sender *pu }() } +type PatchMessageHandlerFunc func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error + +var PatchMessageHandler PatchMessageHandlerFunc = handler.PatchMessageHandler + +// starts patchMessage Handler in a seperate routine which handles incoming messages on Kfka topic +func handlePatchMessages(ctx context.Context, kc *kafkacomm.KafkaConsumer) { + + go func() { + err := PatchMessageHandler(ctx, kc, patchTopic) + if err != nil { + log.Warnf("Erro in Patch Message Handler: %v", err) + } + }() +} + // Register Handlers func initializeHandlers() { h.RegisterHandlers() @@ -163,7 +199,7 @@ func initializeOPA() error { return nil } -type NewKafkaConsumerFunc func() (*kafkacomm.KafkaConsumer, error) +type NewKafkaConsumerFunc func(topic string, groupid string) (*kafkacomm.KafkaConsumer, error) var NewKafkaConsumer NewKafkaConsumerFunc = kafkacomm.NewKafkaConsumer @@ -172,7 +208,8 @@ type GetKafkaProducerFunc func(bootstrapServers string, topic string) (*kafkacom var GetKafkaProducer GetKafkaProducerFunc = kafkacomm.GetKafkaProducer func startKafkaConsAndProd() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) { - kc, err := NewKafkaConsumer() + log.Debugf("Topic start :::: %s", topic) + kc, err := NewKafkaConsumer(topic, groupId) if err != nil { log.Warnf("Failed to create Kafka consumer: %v", err) return nil, nil, err @@ -185,7 +222,23 @@ func startKafkaConsAndProd() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer return kc, producer, nil } -func handleShutdown(kc *kafkacomm.KafkaConsumer, interruptChannel chan os.Signal, cancel context.CancelFunc, producer *kafkacomm.KafkaProducer) { +func startPatchKafkaConsAndProd() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) { + log.Debugf("Topic start :::: %s", patchTopic) + kc, err := NewKafkaConsumer(patchTopic, patchGroupId) + if err != nil { + log.Warnf("Failed to create Kafka consumer: %v", err) + return nil, nil, err + } + PatchProducer, err := GetKafkaProducer(bootstrapServers, patchTopic) + if err != nil { + log.Warnf("Failed to create Kafka producer: %v", err) + return nil, nil, err + } + data.PatchProducer = PatchProducer + return kc, PatchProducer, nil +} + +func handleShutdown(consumers []*kafkacomm.KafkaConsumer, interruptChannel chan os.Signal, cancel context.CancelFunc, producers []*kafkacomm.KafkaProducer) { myLoop: for { @@ -200,21 +253,28 @@ myLoop: signal.Stop(interruptChannel) publisher.StopTicker() - producer.Close() - if kc == nil { - log.Debugf("kc is nil so skipping") - return + for _, producer := range producers { + if producer != nil { + producer.Close() + } } - if err := kc.Consumer.Unsubscribe(); err != nil { - log.Warnf("Failed to unsubscribe consumer: %v", err) - } else { - log.Debugf("Consumer Unsubscribed....") - } - if err := kc.Consumer.Close(); err != nil { - log.Debug("Failed to close consumer......") - } else { - log.Debugf("Consumer closed....") + for _, consumer := range consumers { + if consumer == nil { + log.Debugf("kc is nil so skipping") + continue + } + + if err := consumer.Consumer.Unsubscribe(); err != nil { + log.Warnf("Failed to unsubscribe consumer: %v", err) + } else { + log.Debugf("Consumer Unsubscribed....") + } + if err := consumer.Consumer.Close(); err != nil { + log.Debug("Failed to close consumer......") + } else { + log.Debugf("Consumer closed....") + } } handler.SetShutdownFlag() diff --git a/cmd/opa-pdp/opa-pdp_test.go b/cmd/opa-pdp/opa-pdp_test.go index e421f12..d350c97 100644 --- a/cmd/opa-pdp/opa-pdp_test.go +++ b/cmd/opa-pdp/opa-pdp_test.go @@ -128,7 +128,7 @@ func TestHandleShutdown(t *testing.T) { }() done := make(chan bool) go func() { - handleShutdown(mockKafkaConsumer, interruptChannel, cancel, kafkaProducer) + handleShutdown([]*kafkacomm.KafkaConsumer{mockKafkaConsumer, nil}, interruptChannel, cancel, []*kafkacomm.KafkaProducer{kafkaProducer}) done <- true }() @@ -185,7 +185,7 @@ func SetupMocks() { // Mock handleShutdown interruptChannel := make(chan os.Signal, 1) - handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, kp *kafkacomm.KafkaProducer) { + handleShutdownFunc = func(consumers []*kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, producers []*kafkacomm.KafkaProducer) { interruptChannel <- os.Interrupt cancel() } @@ -348,7 +348,7 @@ func TestHandleShutdown_ErrorScenario(t *testing.T) { done := make(chan bool) go func() { - handleShutdown(mockKafkaConsumer, interruptChannel, cancel, kafkaProducer) + handleShutdown([]*kafkacomm.KafkaConsumer{mockKafkaConsumer}, interruptChannel, cancel, []*kafkacomm.KafkaProducer{kafkaProducer}) done <- true }() @@ -436,6 +436,25 @@ func TestHandleMessages(t *testing.T) { } +// TestHandleMessages +func TestHandlePatchMessages(t *testing.T) { + message := `{"patchInfos":[{"Path":["node","testcell","testconsistency","testmaxPCI"],"Op":0,"Value":6778}]}` + mockKafkaConsumer := new(mocks.KafkaConsumerInterface) + expectedError := error(nil) + + kafkaMsg := &kafka.Message{ + Value: []byte(message), + } + mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError) + mockConsumer := &kafkacomm.KafkaConsumer{ + Consumer: mockKafkaConsumer, + } + + ctx := context.Background() + handlePatchMessages(ctx, mockConsumer) + +} + // Test to simulate a Kafka initialization failure in the main function. func TestMain_KafkaInitializationFailure(t *testing.T) { startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) { @@ -458,7 +477,7 @@ func TestMain_KafkaInitializationFailure(t *testing.T) { // Test to validate the main function's handling of shutdown signals. func TestMain_HandleShutdownWithSignals(t *testing.T) { - handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, producer *kafkacomm.KafkaProducer) { + handleShutdownFunc = func(consumers []*kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, producers []*kafkacomm.KafkaProducer) { go func() { interruptChan <- os.Interrupt // Simulate SIGTERM }() @@ -487,7 +506,7 @@ func TestStartKafkaConsumerFailure(t *testing.T) { t.Run("Kafka consumer creation failure", func(t *testing.T) { originalNewKafkaConsumer := NewKafkaConsumer originalGetKafkaProducer := GetKafkaProducer - NewKafkaConsumer = func() (*kafkacomm.KafkaConsumer, error) { + NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) { return nil, errors.New("Kafka consumer creation error") } @@ -514,7 +533,7 @@ func TestStartKafkaProducerFailure(t *testing.T) { t.Run("Kafka producer creation failure", func(t *testing.T) { originalNewKafkaConsumer := NewKafkaConsumer originalGetKafkaProducer := GetKafkaProducer - NewKafkaConsumer = func() (*kafkacomm.KafkaConsumer, error) { + NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) { return mockConsumer, nil } @@ -540,7 +559,7 @@ func TestStartKafkaAndProdSuccess(t *testing.T) { t.Run("Kafka consumer and producer creation success", func(t *testing.T) { originalNewKafkaConsumer := NewKafkaConsumer originalGetKafkaProducer := GetKafkaProducer - NewKafkaConsumer = func() (*kafkacomm.KafkaConsumer, error) { + NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) { return mockConsumer, nil } GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { @@ -575,7 +594,7 @@ func TestHandleShutdownWithNilConsumer(t *testing.T) { }() done := make(chan bool) go func() { - handleShutdown(nil, interruptChannel, cancel, kafkaProducer) // Pass nil as kc + handleShutdown(nil, interruptChannel, cancel, []*kafkacomm.KafkaProducer{kafkaProducer}) // Pass nil as kc done <- true }() @@ -632,3 +651,75 @@ func TestShutdownHTTPServer_Errors(t *testing.T) { shutdownHTTPServer(server) assert.True(t, true, "Shutdown error") } + +// Test to verify that both the Kafka consumer and producer start successfully +func TestPatchStartKafkaAndProdSuccess(t *testing.T) { + t.Run("Kafka consumer and producer creation success", func(t *testing.T) { + originalNewKafkaConsumer := NewKafkaConsumer + originalGetKafkaProducer := GetKafkaProducer + NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) { + return mockConsumer, nil + } + GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { + return mockProducer, nil + } + + // Call the function under test + consumer, producer, err := startPatchKafkaConsAndProd() + + // Assertions + assert.NoError(t, err) + assert.NotNil(t, consumer) + assert.NotNil(t, producer) + + NewKafkaConsumer = originalNewKafkaConsumer + GetKafkaProducer = originalGetKafkaProducer + }) +} + +// Test to verify error scenarios during Kafka consumer and producer start +func TestPatchStartKafkaAndProdFailure(t *testing.T) { + t.Run("Kafka consumer creation failure", func(t *testing.T) { + originalNewKafkaConsumer := NewKafkaConsumer + originalGetKafkaProducer := GetKafkaProducer + defer func() { + NewKafkaConsumer = originalNewKafkaConsumer + GetKafkaProducer = originalGetKafkaProducer + }() + + NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) { + return nil, errors.New("consumer creation failed") + } + GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { + return mockProducer, nil + } + + consumer, producer, err := startPatchKafkaConsAndProd() + + assert.Error(t, err) + assert.Nil(t, consumer) + assert.Nil(t, producer) + }) + + t.Run("Kafka producer creation failure", func(t *testing.T) { + originalNewKafkaConsumer := NewKafkaConsumer + originalGetKafkaProducer := GetKafkaProducer + defer func() { + NewKafkaConsumer = originalNewKafkaConsumer + GetKafkaProducer = originalGetKafkaProducer + }() + + NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) { + return mockConsumer, nil + } + GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) { + return nil, errors.New("producer creation failed") + } + + consumer, producer, err := startPatchKafkaConsAndProd() + + assert.Error(t, err) + assert.Nil(t, consumer) + assert.Nil(t, producer) + }) +} diff --git a/consts/constants.go b/consts/constants.go index 3623f93..37407eb 100644 --- a/consts/constants.go +++ b/consts/constants.go @@ -23,67 +23,70 @@ package consts // Variables: // -// LogFilePath - The file path for the log file. -// LogMaxSize - The maximum size of the log file in megabytes. -// LogMaxBackups - The maximum number of backup log files to retain. -// OpasdkConfigPath - The file path for the OPA SDK configuration. -// Opa - The file path for the OPA binary. -// BuildBundle - The command to build the bundle. -// Policies - The directory path for policies. -// Data - The directory path for policy data. -// DataNode - The directory path for policy data with node. -// Output - The output flag for bundle commands. -// BundleTarGz - The name of the bundle tar.gz file. -// BundleTarGzFile - The file path for the bundle tar.gz file. -// PdpGroup - The default PDP group. -// PdpType - The type of PDP. -// ServerPort - The port on which the server listens. -// ServerWaitUpTime - The time to wait for the server to be up, in seconds. -// ShutdownWaitTime - The time to wait for the server to shut down, in seconds. -// V1Compatible - The flag for v1 compatibility. -// LatestVersion - The Version set in response for decision -// MinorVersion - The Minor version set in response header for decision -// PatchVersion - The Patch Version set in response header for decison -// OpaPdpUrl - The Healthcheck url for response -// HealtCheckStatus - The bool flag for Healthy field in HealtCheck response -// OkCode - The Code for HealthCheck response -// HealthCheckMessage - The Healtcheck Message -// DefaultHeartbeatMS - The default interval for heartbeat signals in milliseconds. -// SingleHierarchy - The Counter indicates the length of datakey path -// PolicyVersion - constant declared for policy-version -// PolicyID - constant declared for policy-id -// RequestId - constant declared for ONAP Request-ID -// MaxOutputResponseLength - constant declared for maximum length of output in response message +// LogFilePath - The file path for the log file. +// LogMaxSize - The maximum size of the log file in megabytes. +// LogMaxBackups - The maximum number of backup log files to retain. +// OpasdkConfigPath - The file path for the OPA SDK configuration. +// Opa - The file path for the OPA binary. +// BuildBundle - The command to build the bundle. +// Policies - The directory path for policies. +// Data - The directory path for policy data. +// DataNode - The directory path for policy data with node. +// Output - The output flag for bundle commands. +// BundleTarGz - The name of the bundle tar.gz file. +// BundleTarGzFile - The file path for the bundle tar.gz file. +// PdpGroup - The default PDP group. +// PdpType - The type of PDP. +// ServerPort - The port on which the server listens. +// ServerWaitUpTime - The time to wait for the server to be up, in seconds. +// ShutdownWaitTime - The time to wait for the server to shut down, in seconds. +// V1Compatible - The flag for v1 compatibility. +// LatestVersion - The Version set in response for decision +// MinorVersion - The Minor version set in response header for decision +// PatchVersion - The Patch Version set in response header for decison +// OpaPdpUrl - The Healthcheck url for response +// HealtCheckStatus - The bool flag for Healthy field in HealtCheck response +// OkCode - The Code for HealthCheck response +// HealthCheckMessage - The Healtcheck Message +// DefaultHeartbeatMS - The default interval for heartbeat signals in milliseconds. +// SingleHierarchy - The Counter indicates the length of datakey path +// PolicyVersion - constant declared for policy-version +// PolicyID - constant declared for policy-id +// RequestId - constant declared for ONAP Request-ID +// MaxOutputResponseLength - constant declared for maximum length of output in response message +// ContentType - constant for response Content Type var ( - LogFilePath = "/var/logs/logs.log" - LogMaxSize = 10 - LogMaxBackups = 3 - OpasdkConfigPath = "/app/config/config.json" - Opa = "/app/opa" - BuildBundle = "build" - Policies = "/opt/policies" - Data = "/opt/data" - DataNode = "/opt/data/node" - Output = "-o" - BundleTarGz = "bundle.tar.gz" - BundleTarGzFile = "/app/bundles/bundle.tar.gz" - PdpGroup = "opaGroup" - PdpType = "opa" - ServerPort = ":8282" - ServerWaitUpTime = 5 - ShutdownWaitTime = 5 - V1Compatible = "--v1-compatible" - LatestVersion = "1.0.0" - MinorVersion = "0" - PatchVersion = "0" - OpaPdpUrl = "self" - HealtCheckStatus = true - OkCode = int32(200) - HealthCheckMessage = "alive" - DefaultHeartbeatMS = 60000 - SingleHierarchy = 4 - PolicyVersion = "policy-version" - PolicyId = "policy-id" - RequestId = "X-ONAP-RequestID" + LogFilePath = "/var/logs/logs.log" + LogMaxSize = 10 + LogMaxBackups = 3 + OpasdkConfigPath = "/app/config/config.json" + Opa = "/app/opa" + BuildBundle = "build" + Policies = "/opt/policies" + Data = "/opt/data" + DataNode = "/opt/data/node" + Output = "-o" + BundleTarGz = "bundle.tar.gz" + BundleTarGzFile = "/app/bundles/bundle.tar.gz" + PdpGroup = "opaGroup" + PdpType = "opa" + ServerPort = ":8282" + ServerWaitUpTime = 5 + ShutdownWaitTime = 5 + V1Compatible = "--v1-compatible" + LatestVersion = "1.0.0" + MinorVersion = "0" + PatchVersion = "0" + OpaPdpUrl = "self" + HealtCheckStatus = true + OkCode = int32(200) + HealthCheckMessage = "alive" + DefaultHeartbeatMS = 60000 + SingleHierarchy = 4 + PolicyVersion = "policy-version" + PolicyId = "policy-id" + RequestId = "X-ONAP-RequestID" MaxOutputResponseLength = 200 + ContentType = "Content-Type" + ApplicationJson = "application/json" ) diff --git a/pkg/data/data-handler.go b/pkg/data/data-handler.go index e1dcf17..3267408 100644 --- a/pkg/data/data-handler.go +++ b/pkg/data/data-handler.go @@ -28,7 +28,10 @@ import ( "github.com/open-policy-agent/opa/storage" "net/http" "path/filepath" + "policy-opa-pdp/cfg" "policy-opa-pdp/consts" + "policy-opa-pdp/pkg/kafkacomm" + "policy-opa-pdp/pkg/kafkacomm/publisher" "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/metrics" "policy-opa-pdp/pkg/model/oapicodegen" @@ -40,6 +43,7 @@ import ( type ( checkIfPolicyAlreadyExistsFunc func(policyId string) bool + validateRequestFunc func(requestBody *oapicodegen.OPADataUpdateRequest) error ) var ( @@ -49,6 +53,11 @@ var ( checkIfPolicyAlreadyExistsVar checkIfPolicyAlreadyExistsFunc = policymap.CheckIfPolicyAlreadyExists getPolicyByIDVar = getPolicyByID extractPatchInfoVar = extractPatchInfo + bootstrapServers = cfg.BootstrapServer //The Kafka bootstrap server address. + PatchProducer kafkacomm.KafkaProducerInterface + patchTopic = cfg.PatchTopic + PatchDataVar = PatchData + getOperationTypeVar = getOperationType ) // creates a response code map to OPADataUpdateResponse @@ -69,7 +78,7 @@ func getErrorResponseCodeForOPADataUpdate(httpStatus int) oapicodegen.ErrorRespo // writes a Error JSON response to the HTTP response writer for OPADataUpdate func writeOPADataUpdateErrorJSONResponse(res http.ResponseWriter, status int, errorDescription string, dataErrorRes oapicodegen.ErrorResponse) { - res.Header().Set("Content-Type", "application/json") + res.Header().Set(consts.ContentType, consts.ApplicationJson) res.WriteHeader(status) if err := json.NewEncoder(res).Encode(dataErrorRes); err != nil { http.Error(res, err.Error(), status) @@ -113,63 +122,125 @@ func getPolicyByID(policiesMap string, policyId string) (*Policy, error) { } func patchHandler(res http.ResponseWriter, req *http.Request) { + var useKafkaForPatch = cfg.UseKafkaForPatch log.Infof("PDP received a request to update data through API") constructResponseHeader(res, req) var requestBody oapicodegen.OPADataUpdateRequest - if err := json.NewDecoder(req.Body).Decode(&requestBody); err != nil { - errMsg := "Error in decoding the request data - " + err.Error() + + requestBody, err := decodeRequest(req) + if err != nil { + sendErrorResponse(res, err.Error(), http.StatusBadRequest) + return + } + + dataDir, dirParts := extractDataDir(req) + + if err := validateRequest(&requestBody); err != nil { + sendErrorResponse(res, err.Error(), http.StatusBadRequest) + return + } + log.Debug("All fields are valid!") + // Access the data part + data := requestBody.Data + log.Infof("data : %s", data) + policyId := requestBody.PolicyName + if policyId == nil { + errMsg := "Policy Id is nil" + sendErrorResponse(res, errMsg, http.StatusBadRequest) + return + } + log.Infof("policy name : %s", *policyId) + isExists := policymap.CheckIfPolicyAlreadyExists(*policyId) + if !isExists { + errMsg := "Policy associated with the patch request does not exists" sendErrorResponse(res, errMsg, http.StatusBadRequest) log.Errorf(errMsg) return } - path := strings.TrimPrefix(req.URL.Path, "/policy/pdpo/v1/data") - dirParts := strings.Split(path, "/") - dataDir := filepath.Join(dirParts...) - log.Infof("dataDir : %s", dataDir) - - // Validate the request - validationErrors := utils.ValidateOPADataRequest(&requestBody) - // Validate Data field (ensure it's not nil and has items) - if !(utils.IsValidData(requestBody.Data)) { - validationErrors = append(validationErrors, "Data is required and cannot be empty") + matchFound := validatePolicyDataPathMatched(dirParts, *policyId, res) + if !matchFound { + return } - // Print validation errors - if len(validationErrors) > 0 { - errMsg := strings.Join(validationErrors, ", ") - log.Errorf("Facing validation error in requestbody - %s", errMsg) - sendErrorResponse(res, errMsg, http.StatusBadRequest) + patchInfos, err := getPatchInfo(requestBody.Data, dataDir, res) + if err != nil { + log.Warnf("Failed to get Patch Info : %v", err) return - } else { - log.Debug("All fields are valid!") - // Access the data part - data := requestBody.Data - log.Infof("data : %s", data) - policyId := requestBody.PolicyName - if policyId == nil { - errMsg := "Policy Id is nil" - sendErrorResponse(res, errMsg, http.StatusBadRequest) - return - } - log.Infof("policy name : %s", *policyId) - isExists := policymap.CheckIfPolicyAlreadyExists(*policyId) - if !isExists { - errMsg := "Policy associated with the patch request does not exists" - sendErrorResponse(res, errMsg, http.StatusBadRequest) - log.Errorf(errMsg) + } + + if useKafkaForPatch { + err := handleDynamicUpdateRequestWithKafka(patchInfos, res) + if err != nil { + log.Warnf("Error in handling dynamic update request wit kafka: %v", err) return } + res.Header().Set(consts.ContentType, consts.ApplicationJson) + res.WriteHeader(http.StatusAccepted) + _, _ = res.Write([]byte(`{"message": "Patch request accepted for processing via kafka and Use the get data url to fetch the latest data. In case of errors, Check logs."uri":"/policy/pdpo/v1/data/""}`)) + metrics.IncrementDynamicDataUpdateSuccessCount() + return + } + if err := PatchData(patchInfos, res); err != nil { + // Handle the error, for example, log it or return an appropriate response + log.Errorf("Error encoding JSON response: %s", err) + return + + } + metrics.IncrementDynamicDataUpdateSuccessCount() - matchFound := validatePolicyDataPathMatched(dirParts, *policyId, res) +} - if matchFound { - if err := patchData(dataDir, data, res); err != nil { - // Handle the error, for example, log it or return an appropriate response - log.Errorf("Error encoding JSON response: %s", err) - } - } +func decodeRequest(req *http.Request) (oapicodegen.OPADataUpdateRequest, error) { + var requestBody oapicodegen.OPADataUpdateRequest + if err := json.NewDecoder(req.Body).Decode(&requestBody); err != nil { + return requestBody, fmt.Errorf("Error in decoding request data: %v", err) + } + return requestBody, nil +} + +func extractDataDir(req *http.Request) (string, []string) { + path := strings.TrimPrefix(req.URL.Path, "/policy/pdpo/v1/data") + dirParts := strings.Split(path, "/") + return filepath.Join(dirParts...), dirParts +} + +func validateRequest(requestBody *oapicodegen.OPADataUpdateRequest) error { + validationErrors := utils.ValidateOPADataRequest(requestBody) + if !utils.IsValidData(requestBody.Data) { + validationErrors = append(validationErrors, "Data is required and cannot be empty") + } + if len(validationErrors) > 0 { + return fmt.Errorf(strings.Join(validationErrors, ", ")) + } + return nil +} + +func getPatchInfo(data *[]map[string]interface{}, dataDir string, res http.ResponseWriter) ([]opasdk.PatchImpl, error) { + root := "/" + strings.Trim(dataDir, "/") + patchInfos, err := extractPatchInfoVar(res, data, root) + if patchInfos == nil || err != nil { + return nil, fmt.Errorf("Error in extracting Patch Info : %v", err) } + return patchInfos, nil +} + +func handleDynamicUpdateRequestWithKafka(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) error { + + if PatchProducer == nil { + log.Warnf("Failed to initialize Kafka producer") + return fmt.Errorf("Failed to initialize Kafka producer") + + } + sender := &publisher.RealPatchSender{ + Producer: PatchProducer, + } + if err := sender.SendPatchMessage(patchInfos); err != nil { + log.Warnf("Failed to send Patch Messge, %v", err) + return err + } + + return nil } func DataHandler(res http.ResponseWriter, req *http.Request) { @@ -191,11 +262,11 @@ func extractPatchInfo(res http.ResponseWriter, ops *[]map[string]interface{}, ro optypeString, opTypeErr := op["op"].(string) if !opTypeErr { opTypeErrMsg := "Error in getting op type. Op type is not given in request body" - sendErrorResponse(res, opTypeErrMsg, http.StatusInternalServerError) + sendErrorResponse(res, opTypeErrMsg, http.StatusBadRequest) log.Errorf(opTypeErrMsg) return nil, fmt.Errorf("Error in getting op type. Op type is not given in request body") } - opType, err := getOperationType(optypeString, res) + opType, err := getOperationTypeVar(optypeString, res) if err != nil { log.Warnf("Error in getting opType: %v", err) @@ -235,7 +306,7 @@ func getPatchValue(op map[string]interface{}, res http.ResponseWriter) (interfac value, valueErr = op["value"] if !valueErr || isEmpty(value) { valueErrMsg := "Error in getting data value. Value is not given in request body" - sendErrorResponse(res, valueErrMsg, http.StatusInternalServerError) + sendErrorResponse(res, valueErrMsg, http.StatusBadRequest) log.Errorf(valueErrMsg) return nil, fmt.Errorf("Error in getting data value. Value is not given in request body") } @@ -298,7 +369,7 @@ func constructPath(opPath string, opType string, root string, res http.ResponseW } } else { valueErrMsg := "Error in getting data path - Invalid path (/) is used." - sendErrorResponse(res, valueErrMsg, http.StatusInternalServerError) + sendErrorResponse(res, valueErrMsg, http.StatusBadRequest) log.Errorf(valueErrMsg) return nil } @@ -355,7 +426,7 @@ func constructOpStoragePath(op map[string]interface{}, root string, res http.Res opPath, opPathErr := op["path"].(string) if !opPathErr || len(opPath) == 0 { opPathErrMsg := "Error in getting data path. Path is not given in request body" - sendErrorResponse(res, opPathErrMsg, http.StatusInternalServerError) + sendErrorResponse(res, opPathErrMsg, http.StatusBadRequest) log.Errorf(opPathErrMsg) return nil } @@ -389,14 +460,7 @@ type NewOpaSDKPatchFunc func(ctx context.Context, patches []opasdk.PatchImpl) er var NewOpaSDKPatch NewOpaSDKPatchFunc = opasdk.PatchData -func patchData(root string, ops *[]map[string]interface{}, res http.ResponseWriter) (err error) { - root = "/" + strings.Trim(root, "/") - patchInfos, err := extractPatchInfoVar(res, ops, root) - if err != nil { - log.Warnf("Failed to extarct Patch Info") - return err - } - +func PatchData(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) (err error) { if patchInfos != nil { patchErr := NewOpaSDKPatch(context.Background(), patchInfos) if patchErr != nil { @@ -406,12 +470,16 @@ func patchData(root string, ops *[]map[string]interface{}, res http.ResponseWrit errCode = http.StatusNotFound } errMsg := "Error in updating data - " + patchErr.Error() - sendErrorResponse(res, errMsg, errCode) + if res != nil { + sendErrorResponse(res, errMsg, errCode) + } log.Errorf(errMsg) return patchErr } log.Infof("Updated the data in the corresponding path successfully\n") - res.WriteHeader(http.StatusNoContent) + if res != nil { + res.WriteHeader(http.StatusNoContent) + } } // handled all error scenarios in extractPatchInfo method return nil @@ -419,6 +487,7 @@ func patchData(root string, ops *[]map[string]interface{}, res http.ResponseWrit func sendErrorResponse(res http.ResponseWriter, errMsg string, statusCode int) { dataExc := createOPADataUpdateExceptionResponse(statusCode, errMsg, "") + metrics.IncrementDynamicDataUpdateFailureCount() metrics.IncrementTotalErrorCount() writeOPADataUpdateErrorJSONResponse(res, statusCode, errMsg, *dataExc) } @@ -500,7 +569,7 @@ func getData(res http.ResponseWriter, dataPath string) { dataResponse.Data = data } - res.Header().Set("Content-Type", "application/json") + res.Header().Set(consts.ContentType, consts.ApplicationJson) res.WriteHeader(http.StatusOK) if err := json.NewEncoder(res).Encode(dataResponse); err != nil { diff --git a/pkg/data/data-handler_test.go b/pkg/data/data-handler_test.go index 819f6d2..6cd5e5c 100644 --- a/pkg/data/data-handler_test.go +++ b/pkg/data/data-handler_test.go @@ -24,7 +24,9 @@ import ( "encoding/json" "errors" "fmt" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" openapi_types "github.com/oapi-codegen/runtime/types" + "github.com/open-policy-agent/opa/v1/storage" "github.com/stretchr/testify/assert" "net/http" "net/http/httptest" @@ -166,7 +168,8 @@ func TestPatchData_failure(t *testing.T) { data = nil root := "/test" res := httptest.NewRecorder() - result := patchData(root, &data, res) + patchImpl, _ := extractPatchInfo(res, &data, root) + result := PatchData(patchImpl, res) assert.Nil(t, result) } @@ -182,7 +185,8 @@ func TestPatchData_storageFail(t *testing.T) { root := "/test" res := httptest.NewRecorder() - result := patchData(root, &data, res) + patchImpl, _ := extractPatchInfo(res, &data, root) + result := PatchData(patchImpl, res) assert.Equal(t, http.StatusNotFound, res.Code) assert.Error(t, result) } @@ -194,7 +198,7 @@ func Test_extractPatchInfo_OPTypefail(t *testing.T) { root := "/test" res := httptest.NewRecorder() extractPatchInfo(res, &data, root) - assert.Equal(t, http.StatusInternalServerError, res.Code) + assert.Equal(t, http.StatusBadRequest, res.Code) } func Test_extractPatchInfo_Pathfail(t *testing.T) { @@ -204,7 +208,7 @@ func Test_extractPatchInfo_Pathfail(t *testing.T) { root := "/test" res := httptest.NewRecorder() extractPatchInfo(res, &data, root) - assert.Equal(t, http.StatusInternalServerError, res.Code) + assert.Equal(t, http.StatusBadRequest, res.Code) } func Test_extractPatchInfo_valuefail(t *testing.T) { @@ -214,7 +218,7 @@ func Test_extractPatchInfo_valuefail(t *testing.T) { root := "/test" res := httptest.NewRecorder() extractPatchInfo(res, &data, root) - assert.Equal(t, http.StatusInternalServerError, res.Code) + assert.Equal(t, http.StatusBadRequest, res.Code) } func TestPatchData_success(t *testing.T) { @@ -229,7 +233,8 @@ func TestPatchData_success(t *testing.T) { root := "/test" res := httptest.NewRecorder() - patchData(root, &data, res) + patchImpl, _ := extractPatchInfo(res, &data, root) + PatchData(patchImpl, res) assert.Equal(t, http.StatusNoContent, res.Code) } @@ -420,6 +425,14 @@ func TestGetData(t *testing.T) { expectedStatus: http.StatusInternalServerError, expectedBody: "Error in getting data - internal server failure", }, + { + name: "Error- JSON ENcoding Failure", + requestURL: "/policy/pdpo/v1/datai/bad/json", + mockResponse: map[string]interface{}{"bad": make(chan int)}, + mockError: nil, + expectedStatus: http.StatusInternalServerError, + expectedBody: "Error in getting data - json: unsupported type: chan int", + }, } for _, tt := range tests { @@ -675,8 +688,6 @@ func Test_GetPolicyByIDFunc_Success(t *testing.T) { return policy, nil } - // Simulating HTTP request and response recorder - // req := httptest.NewRequest("GET", "/policy/test-policy", nil) res := httptest.NewRecorder() // Processing dirParts @@ -882,3 +893,350 @@ func TestPatchInfos_ExtractPatchInfo_Error(t *testing.T) { t.Errorf("expected patchInfos to be nil, got: %v", patchInfos) } } + +func TestHandleDynamicUpdateRequestWithKafka_KafkaDisabled_Error(t *testing.T) { + PatchDataVar = func(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) error { + return errors.New("mock error") + } + + req := httptest.NewRecorder() + patchInfos := []opasdk.PatchImpl{{}} + + handleDynamicUpdateRequestWithKafka(patchInfos, req) + // Optionally assert on req.Body or req.Code if needed +} + +// --- Sample PatchImpl for testing --- +func samplePatchData() []opasdk.PatchImpl { + return []opasdk.PatchImpl{ + { + Path: storage.MustParsePath("/policy/config/name"), + Op: storage.ReplaceOp, + Value: "NewPolicyName", + }, + } +} + +var originalExtractPatchInfoVar = extractPatchInfoVar + +func TestGetPatchInfo_Success(t *testing.T) { + defer func() { extractPatchInfoVar = originalExtractPatchInfoVar }() + + mockPatch := samplePatchData() + extractPatchInfoVar = func(res http.ResponseWriter, data *[]map[string]interface{}, root string) ([]opasdk.PatchImpl, error) { + return mockPatch, nil + } + + res := httptest.NewRecorder() + + data := &[]map[string]interface{}{{"key": "val"}} + patches, err := getPatchInfo(data, "/test/dir", res) + + assert.NoError(t, err) + assert.Equal(t, mockPatch, patches) +} +func TestGetPatchInfo_NilPatchInfos(t *testing.T) { + defer func() { extractPatchInfoVar = originalExtractPatchInfoVar }() + + extractPatchInfoVar = func(res http.ResponseWriter, data *[]map[string]interface{}, root string) ([]opasdk.PatchImpl, error) { + return nil, nil + } + + res := httptest.NewRecorder() + + data := &[]map[string]interface{}{{"key": "val"}} + patches, err := getPatchInfo(data, "/test/dir", res) + + assert.Error(t, err) + assert.Nil(t, patches) +} +func TestGetPatchInfo_ExtractError(t *testing.T) { + defer func() { extractPatchInfoVar = originalExtractPatchInfoVar }() + + extractPatchInfoVar = func(res http.ResponseWriter, data *[]map[string]interface{}, root string) ([]opasdk.PatchImpl, error) { + return nil, fmt.Errorf("mock error") + } + + data := &[]map[string]interface{}{{"key": "val"}} + res := httptest.NewRecorder() + + patches, err := getPatchInfo(data, "/test/dir", res) + + assert.Error(t, err) + assert.Nil(t, patches) +} + +func TestHandleDynamicUpdateRequestWithKafka_KafkaDisabled_Success(t *testing.T) { + // Set test version of PatchDataVar + var patchCalled bool + PatchDataVar = func(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) error { + patchCalled = true + return nil + } + + req := httptest.NewRecorder() + patchInfos := []opasdk.PatchImpl{{}} + + handleDynamicUpdateRequestWithKafka(patchInfos, req) + + if patchCalled { + t.Errorf("Expected PatchData to be called") + } +} + +// MockKafkaProducer implements kafkacomm.KafkaProducerInterface. +type MockKafkaProducer struct { + ProduceCalled bool + ProducedMsg *kafka.Message + ProduceErr error + CloseCalled bool + FlushCalled bool + FlushTimeout int +} + +func (m *MockKafkaProducer) Produce(msg *kafka.Message, events chan kafka.Event) error { + m.ProduceCalled = true + m.ProducedMsg = msg + return m.ProduceErr +} + +func (m *MockKafkaProducer) Close() { m.CloseCalled = true } + +func (m *MockKafkaProducer) Flush(timeout int) int { + m.FlushCalled = true + m.FlushTimeout = timeout + return 0 +} + +// Test successful Produce through the interface +func TestHandleDynamicUpdateRequestWithKafka_ProduceSuccess(t *testing.T) { + // Arrange + patches := samplePatchData() + mockProd := &MockKafkaProducer{} + PatchProducer = mockProd + + resp := httptest.NewRecorder() + + // Act + err := handleDynamicUpdateRequestWithKafka(patches, resp) + + // Assert + assert.NoError(t, err) + assert.True(t, mockProd.ProduceCalled, "expected Produce to be called") + +} + +// Test nil interface returns initialization error testing.NamePreamble +func TestHandleDynamicUpdateRequestWithKafka_ProducerNil(t *testing.T) { + // Arrange: clear the global producer + PatchProducer = nil + + // Act + err := handleDynamicUpdateRequestWithKafka(nil, httptest.NewRecorder()) + + // Assert + assert.EqualError(t, err, "Failed to initialize Kafka producer") +} + +// Test Produce error is propagated testing.NamePreamble +func TestHandleDynamicUpdateRequestWithKafka_ProduceError(t *testing.T) { + // Arrange + mockProd := &MockKafkaProducer{ProduceErr: errors.New("produce failed")} + PatchProducer = mockProd + + // Act + err := handleDynamicUpdateRequestWithKafka(nil, httptest.NewRecorder()) + + // Assert + assert.EqualError(t, err, "produce failed") + assert.True(t, mockProd.ProduceCalled, "Produce should be called even on error") +} + +type errorWriter struct{} + +func (e *errorWriter) Header() http.Header { + return http.Header{} +} + +func (e *errorWriter) Write([]byte) (int, error) { + return 0, errors.New("write error") +} + +func (e *errorWriter) WriteHeader(statusCode int) {} + +func TestWriteOPADataUpdateErrorJSONResponse_EncodeFails(t *testing.T) { + mockRes := &errorWriter{} + + respMessage := "Failed to process" + respCode := oapicodegen.ErrorResponseResponseCode("500") + errorResp := oapicodegen.ErrorResponse{ + ErrorMessage: &respMessage, + ResponseCode: &respCode, + } + + // Call the function with the mock writer that fails on encode + writeOPADataUpdateErrorJSONResponse(mockRes, http.StatusInternalServerError, "fail", errorResp) + +} + +func TestConstructPath_BadPatchPath(t *testing.T) { + rec := httptest.NewRecorder() + storagePath := constructPath("???", "add", "/root", rec) + + assert.NotNil(t, storagePath) + assert.Equal(t, http.StatusOK, rec.Code) + assert.Contains(t, rec.Body.String(), "") +} + +func TestConstructPath_InvalidPath(t *testing.T) { + rec := httptest.NewRecorder() + storagePath := constructPath("", "add", "/root", rec) + + assert.Nil(t, storagePath) + assert.Equal(t, http.StatusBadRequest, rec.Code) + assert.True(t, strings.Contains(rec.Body.String(), "Invalid path")) +} +func TestConstructPath_RootSlash(t *testing.T) { + rec := httptest.NewRecorder() + storagePath := constructPath("sub/path", "add", "/", rec) + + assert.NotNil(t, storagePath) + assert.Equal(t, "/sub/path", storagePath.String()) +} + +func TestGetDataInfo_EmptyDataPath(t *testing.T) { + // Backup original function + originalOpaSDKGetDataInfo := NewOpaSDK + NewOpaSDK = func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error) { + assert.Equal(t, "/", dataPath) // Ensure "/" is passed + return nil, errors.New("storage_not_found_error") + } + defer func() { NewOpaSDK = originalOpaSDKGetDataInfo }() + + // Create a mock request with empty data path + req := httptest.NewRequest("GET", "/policy/pdpo/v1/data", nil) + res := httptest.NewRecorder() + + // Call the function under test + getDataInfo(res, req) + + // Validate response + assert.Equal(t, http.StatusNotFound, res.Code) + errorMessage := strings.TrimSpace(res.Body.String()) + assert.Contains(t, errorMessage, "storage_not_found_error") +} + +func TestDataHandler_GET_Success(t *testing.T) { + original := NewOpaSDK + defer func() { NewOpaSDK = original }() + + NewOpaSDK = func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error) { + assert.Equal(t, "/some/path", dataPath) + return &oapicodegen.OPADataResponse_Data{}, nil + } + + req := httptest.NewRequest(http.MethodGet, "/policy/pdpo/v1/data/some/path", nil) + w := httptest.NewRecorder() + + DataHandler(w, req) // <---- Only this + + res := w.Result() + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + t.Errorf("expected status 200 OK, got %d", res.StatusCode) + } +} + +func TestExtractPatchInfo_OperationTypeError(t *testing.T) { + // Arrange + reqOps := []map[string]interface{}{ + { + "op": "invalidOp", // simulate invalid op + }, + } + w := httptest.NewRecorder() + + // Mock + original := getOperationTypeVar + defer func() { getOperationTypeVar = original }() + getOperationTypeVar = func(opType string, res http.ResponseWriter) (*storage.PatchOp, error) { + return nil, fmt.Errorf("forced error") // force error + } + + // Act + result, err := extractPatchInfo(w, &reqOps, "/root") + + // Assert + assert.Nil(t, result) + assert.Error(t, err) + assert.Contains(t, err.Error(), "operation type") +} + +func TestExtractPatchInfo_InvalidOpFieldType(t *testing.T) { + // Arrange + reqOps := []map[string]interface{}{ + { + "wrongField": "add", // no "op" field + }, + } + w := httptest.NewRecorder() + + // Act + result, err := extractPatchInfo(w, &reqOps, "/root") + + // Assert + assert.Nil(t, result) + assert.Error(t, err) + assert.Contains(t, err.Error(), "op type") +} + +func TestExtractPatchInfo_GetOperationTypeError(t *testing.T) { + // Arrange + reqOps := []map[string]interface{}{ + { + "op": "invalidOp", + }, + } + w := httptest.NewRecorder() + + // Mock getOperationTypeVar to simulate error + original := getOperationTypeVar + defer func() { getOperationTypeVar = original }() + getOperationTypeVar = func(opType string, res http.ResponseWriter) (*storage.PatchOp, error) { + return nil, errors.New("mock getOperationType error") + } + + // Act + result, err := extractPatchInfo(w, &reqOps, "/root") + + // Assert + assert.Nil(t, result) + assert.Error(t, err) + assert.Contains(t, err.Error(), "operation type") +} + +func TestExtractPatchInfo_NilOpType(t *testing.T) { + // Arrange + reqOps := []map[string]interface{}{ + { + "op": "add", + }, + } + w := httptest.NewRecorder() + + // Mock getOperationTypeVar to return nil + original := getOperationTypeVar + defer func() { getOperationTypeVar = original }() + getOperationTypeVar = func(opType string, res http.ResponseWriter) (*storage.PatchOp, error) { + return nil, nil // returning nil without error + } + + // Act + result, err := extractPatchInfo(w, &reqOps, "/root") + + // Assert + assert.Nil(t, result) + assert.Error(t, err) + assert.Contains(t, err.Error(), "opType is Missing") +} diff --git a/pkg/kafkacomm/handler/patch_message_handler.go b/pkg/kafkacomm/handler/patch_message_handler.go new file mode 100644 index 0000000..d537e61 --- /dev/null +++ b/pkg/kafkacomm/handler/patch_message_handler.go @@ -0,0 +1,73 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2025: Deutsche Telekom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// ========================LICENSE_END=================================== +// + +package handler + +import ( + "context" + "encoding/json" + "policy-opa-pdp/pkg/data" + "policy-opa-pdp/pkg/kafkacomm" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/opasdk" +) + +type PatchMessage struct { + PatchInfos []opasdk.PatchImpl `json:"patchInfos"` +} + +// This function handles the incoming kafka messages and dispatches them futher for data patch processing. +func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error { + + log.Debug("Starting Patch Message Listener.....") + var stopConsuming bool + for !stopConsuming { + select { + case <-ctx.Done(): + log.Debug("Stopping PDP Listener.....") + return nil + stopConsuming = true ///Loop Exits + default: + message, err := kafkacomm.ReadKafkaMessages(kc) + if err != nil { + continue + } + log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message)) + + if message != nil { + var patchMsg PatchMessage + err = json.Unmarshal(message, &patchMsg) + if err != nil { + log.Warnf("Failed to UnMarshal Messages: %v\n", err) + continue + } + log.Debugf("Received patch request") + + if err := data.PatchDataVar(patchMsg.PatchInfos, nil); err != nil { + log.Debugf("patchData failed: %v", err) + } else { + log.Debugf("Successfully patched data") + } + } + } + + } + return nil + +} diff --git a/pkg/kafkacomm/handler/patch_message_handler_test.go b/pkg/kafkacomm/handler/patch_message_handler_test.go new file mode 100644 index 0000000..93114d9 --- /dev/null +++ b/pkg/kafkacomm/handler/patch_message_handler_test.go @@ -0,0 +1,153 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2025: Deutsche Telekom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// ========================LICENSE_END=================================== +// + +package handler + +import ( + "context" + "encoding/json" + "errors" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/open-policy-agent/opa/v1/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "net/http" + "policy-opa-pdp/pkg/data" + "policy-opa-pdp/pkg/kafkacomm" + "policy-opa-pdp/pkg/opasdk" + "testing" + "time" +) + +// --- Sample PatchImpl for testing --- +func samplePatchData() []opasdk.PatchImpl { + return []opasdk.PatchImpl{ + { + Path: storage.MustParsePath("/policy/config/name"), + Op: storage.ReplaceOp, + Value: "NewPolicyName", + }, + } +} + +var originalPatchDataVar = data.PatchDataVar + +func TestPatchMessageHandler_Success(t *testing.T) { + defer func() { data.PatchDataVar = originalPatchDataVar }() + + // Mock PatchDataVar to simulate success + data.PatchDataVar = func(patchInfos []opasdk.PatchImpl, _ http.ResponseWriter) error { + return nil + } + + msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()}) + + mockKafkaMessage := &kafka.Message{ + Value: []byte(msgBytes), + } + mockConsumer := new(MockKafkaConsumer) + mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(mockKafkaMessage, nil) + + mockKafkaConsumer := &kafkacomm.KafkaConsumer{ + Consumer: mockConsumer, + } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic") + assert.NoError(t, err) + mockConsumer.AssertExpectations(t) +} + +func TestPatchMessageHandler_PatchFail(t *testing.T) { + defer func() { data.PatchDataVar = originalPatchDataVar }() + + data.PatchDataVar = func(patchInfos []opasdk.PatchImpl, _ http.ResponseWriter) error { + return errors.New("mock failure") + } + + msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()}) + + mockKafkaMessage := &kafka.Message{ + Value: []byte(msgBytes), + } + + mockConsumer := new(MockKafkaConsumer) + mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(mockKafkaMessage, nil) + + mockKafkaConsumer := &kafkacomm.KafkaConsumer{ + Consumer: mockConsumer, + } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic") + assert.NoError(t, err) + mockConsumer.AssertExpectations(t) +} + +func TestPatchMessageHandler_ReadError(t *testing.T) { + defer func() { data.PatchDataVar = originalPatchDataVar }() + + mockConsumer := new(MockKafkaConsumer) + mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")). + Return(nil, errors.New("read error")) + + mockKafkaConsumer := &kafkacomm.KafkaConsumer{Consumer: mockConsumer} + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic") + assert.NoError(t, err) + mockConsumer.AssertExpectations(t) +} + +func TestPatchMessageHandler_UnmarshalFail(t *testing.T) { + defer func() { data.PatchDataVar = originalPatchDataVar }() + + invalidJSON := []byte(`invalid json`) + mockKafkaMessage := &kafka.Message{Value: invalidJSON} + + mockConsumer := new(MockKafkaConsumer) + mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(mockKafkaMessage, nil) + + mockKafkaConsumer := &kafkacomm.KafkaConsumer{Consumer: mockConsumer} + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic") + assert.NoError(t, err) + mockConsumer.AssertExpectations(t) +} + +func TestPatchMessageHandler_ContextDone(t *testing.T) { + mockConsumer := new(MockKafkaConsumer) + mockKafkaConsumer := &kafkacomm.KafkaConsumer{Consumer: mockConsumer} + + // Context is cancelled immediately + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic") + assert.NoError(t, err) +} diff --git a/pkg/kafkacomm/handler/pdp_message_handler_test.go b/pkg/kafkacomm/handler/pdp_message_handler_test.go index 594498d..7a3b0af 100644 --- a/pkg/kafkacomm/handler/pdp_message_handler_test.go +++ b/pkg/kafkacomm/handler/pdp_message_handler_test.go @@ -35,7 +35,7 @@ import ( ) type KafkaConsumerInterface interface { - ReadMessage() ([]byte, error) + ReadMessage(time.Duration) ([]byte, error) ReadKafkaMessages() ([]byte, error) } @@ -43,17 +43,23 @@ type MockKafkaConsumer struct { mock.Mock } -func (m *MockKafkaConsumer) Unsubscribe() { - m.Called() +func (m *MockKafkaConsumer) Unsubscribe() error { + args := m.Called() + return args.Error(0) } -func (m *MockKafkaConsumer) Close() { - m.Called() +func (m *MockKafkaConsumer) Close() error { + args := m.Called() + return args.Error(0) } -func (m *MockKafkaConsumer) ReadMessage(kc *kafkacomm.KafkaConsumer) ([]byte, error) { - args := m.Called(kc) - return args.Get(0).([]byte), args.Error(0) +func (m *MockKafkaConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) { + args := m.Called(timeout) + msg := args.Get(0) + if msg == nil { + return nil, args.Error(1) + } + return msg.(*kafka.Message), args.Error(1) } func (m *MockKafkaConsumer) pdpUpdateMessageHandler(msg string) error { diff --git a/pkg/kafkacomm/handler/pdp_update_deploy_policy.go b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go index 5c07ddd..54d33be 100644 --- a/pkg/kafkacomm/handler/pdp_update_deploy_policy.go +++ b/pkg/kafkacomm/handler/pdp_update_deploy_policy.go @@ -221,7 +221,6 @@ func extractAndDecodeData(policy model.ToscaPolicy) (map[string]string, []string return decodedData, keys, nil } - // upsert policy to sdk. func upsertPolicy(policy model.ToscaPolicy) error { decodedContent, keys, _ := extractAndDecodePoliciesVar(policy) diff --git a/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go b/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go index 47efdef..8ca0c29 100644 --- a/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go +++ b/pkg/kafkacomm/handler/pdp_update_deploy_policy_test.go @@ -22,6 +22,7 @@ import ( "context" "encoding/base64" "errors" + "fmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -35,7 +36,6 @@ import ( "policy-opa-pdp/pkg/utils" "strings" "testing" - "fmt" ) func TestValidatePackageName(t *testing.T) { diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go index 77475d6..550eb98 100644 --- a/pkg/kafkacomm/handler/pdp_update_message_handler.go +++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go @@ -52,7 +52,6 @@ var ( handlePdpUpdateDeploymentVar handlePdpUpdateDeploymentFunc = handlePdpUpdateDeployment handlePdpUpdateUndeploymentVar handlePdpUpdateUndeploymentFunc = handlePdpUpdateUndeployment sendPDPStatusResponseFunc = sendPDPStatusResponse - ) // Handles messages of type PDP_UPDATE sent from the Policy Administration Point (PAP). @@ -140,8 +139,8 @@ func handlePdpUpdateDeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatusS failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|") resMessage := fmt.Errorf("PDP Update Failed as failed to format successfullyDeployedPolicies json %v", failureMessages) if err = sendFailureResponseVar(p, &pdpUpdate, resMessage); err != nil { - log.Debugf("Failed to send update internal map error response: %v", err) - return "", err, failureMessages + log.Debugf("Failed to send update internal map error response: %v", err) + return "", err, failureMessages } } @@ -167,8 +166,8 @@ func handlePdpUpdateUndeployment(pdpUpdate model.PdpUpdate, p publisher.PdpStatu failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|") resMessage := fmt.Errorf("PDP Update Failed as failed to format successfullyUnDeployedPolicies json %v", failureMessages) if err = sendFailureResponseVar(p, &pdpUpdate, resMessage); err != nil { - log.Debugf("Failed to send update error response: %v", err) - return "", err, failureMessages + log.Debugf("Failed to send update error response: %v", err) + return "", err, failureMessages } } } diff --git a/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go b/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go index 9936c6c..9b0412f 100644 --- a/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go +++ b/pkg/kafkacomm/handler/pdp_update_undeploy_policy_test.go @@ -320,10 +320,10 @@ func TestPolicyUndeploymentAction(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Set mock behavior removePolicyFromSdkandDirFunc = func(policy map[string]interface{}) []string { - return tt.mockPolicyErrors + return tt.mockPolicyErrors } removeDataFromSdkandDirFunc = func(policy map[string]interface{}) []string { - return tt.mockDataErrors + return tt.mockDataErrors } // Call the function under test @@ -395,58 +395,58 @@ func TestCountChildKeysFromJSON(t *testing.T) { name: "Empty JSON", input: map[string]interface{}{}, expected: map[string]int{ - // No child nodes + // No child nodes }, }, { name: "Single Level JSON", input: map[string]interface{}{ - "key1": map[string]interface{}{ - "child1": "value1", - "child2": "value2", - }, - "key2": map[string]interface{}{ - "childA": "valueA", - }, + "key1": map[string]interface{}{ + "child1": "value1", + "child2": "value2", + }, + "key2": map[string]interface{}{ + "childA": "valueA", + }, }, expected: map[string]int{ - "node/key1": 2, // key1 has 2 children - "node/key2": 1, // key2 has 1 child + "node/key1": 2, // key1 has 2 children + "node/key2": 1, // key2 has 1 child }, }, { name: "Nested JSON", input: map[string]interface{}{ - "root": map[string]interface{}{ - "level1": map[string]interface{}{ - "level2": map[string]interface{}{ - "child1": "value1", - "child2": "value2", - }, - }, - }, + "root": map[string]interface{}{ + "level1": map[string]interface{}{ + "level2": map[string]interface{}{ + "child1": "value1", + "child2": "value2", + }, + }, + }, }, expected: map[string]int{ - "node/root": 1, // root has 1 child (level1) - "node/root/level1": 1, // level1 has 1 child (level2) - "node/root/level1/level2": 2, // level2 has 2 children + "node/root": 1, // root has 1 child (level1) + "node/root/level1": 1, // level1 has 1 child (level2) + "node/root/level1/level2": 2, // level2 has 2 children }, }, { name: "Mixed Data Types", input: map[string]interface{}{ - "parent": map[string]interface{}{ - "child1": "string", - "child2": 42, - "child3": map[string]interface{}{ - "subchild1": true, - "subchild2": nil, - }, - }, + "parent": map[string]interface{}{ + "child1": "string", + "child2": 42, + "child3": map[string]interface{}{ + "subchild1": true, + "subchild2": nil, + }, + }, }, expected: map[string]int{ - "node/parent": 3, // parent has 3 children - "node/parent/child3": 2, // child3 has 2 children + "node/parent": 3, // parent has 3 children + "node/parent/child3": 2, // child3 has 2 children }, }, } @@ -455,7 +455,7 @@ func TestCountChildKeysFromJSON(t *testing.T) { t.Run(tt.name, func(t *testing.T) { got := countChildKeysFromJSON(tt.input) if !reflect.DeepEqual(got, tt.expected) { - t.Errorf("countChildKeysFromJSON() = %v, expected %v", got, tt.expected) + t.Errorf("countChildKeysFromJSON() = %v, expected %v", got, tt.expected) } }) } @@ -536,10 +536,10 @@ func TestAnalyzeHierarchy(t *testing.T) { result, err := analyzeHierarchy(tt.parentDataJson, tt.dataPath) if tt.expectedErr { - assert.Error(t, err) + assert.Error(t, err) } else { - assert.NoError(t, err) - assert.Equal(t, tt.expectedPath, result) + assert.NoError(t, err) + assert.Equal(t, tt.expectedPath, result) } }) } @@ -580,7 +580,7 @@ func TestAnalyseEmptyParentNodes(t *testing.T) { name: "Success - Valid Parent Data Exists", inputPath: "/parent/child", mockResponse: map[string]interface{}{ - "child": "data", + "child": "data", }, mockError: nil, expectedOutput: "/parent/child", @@ -601,13 +601,13 @@ func TestAnalyseEmptyParentNodes(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Mock function behavior opasdkGetData = func(ctx context.Context, dataPath string) (*oapicodegen.OPADataResponse_Data, error) { - if tt.mockResponse != nil { - jsonData, _ := json.Marshal(tt.mockResponse) - var resData oapicodegen.OPADataResponse_Data - _ = json.Unmarshal(jsonData, &resData) - return &resData, tt.mockError - } - return nil, tt.mockError + if tt.mockResponse != nil { + jsonData, _ := json.Marshal(tt.mockResponse) + var resData oapicodegen.OPADataResponse_Data + _ = json.Unmarshal(jsonData, &resData) + return &resData, tt.mockError + } + return nil, tt.mockError } // Call function @@ -615,10 +615,10 @@ func TestAnalyseEmptyParentNodes(t *testing.T) { // Validate results if tt.expectError { - assert.Error(t, err, "Expected error but got none") + assert.Error(t, err, "Expected error but got none") } else { - assert.NoError(t, err, "Expected no error but got one") - assert.Equal(t, tt.expectedOutput, output, "Unexpected output") + assert.NoError(t, err, "Expected no error but got one") + assert.Equal(t, tt.expectedOutput, output, "Unexpected output") } }) } @@ -679,13 +679,13 @@ func TestProcessDataDeletionFromSdkAndDir(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Mock function variables analyseEmptyParentNodesFunc = func(dataPath string) (string, error) { - return dataPath, tt.mockAnalyseErr + return dataPath, tt.mockAnalyseErr } deleteDataSdkFunc = func(ctx context.Context, dataPath string) error { - return tt.mockDeleteErr + return tt.mockDeleteErr } removeDataDirectoryFunc = func(keyPath string) error { - return tt.mockRemoveErr + return tt.mockRemoveErr } // Call function @@ -693,7 +693,7 @@ func TestProcessDataDeletionFromSdkAndDir(t *testing.T) { // Normalize nil vs empty slice if failureMessages == nil { - failureMessages = []string{} + failureMessages = []string{} } // Validate results @@ -724,22 +724,22 @@ func TestRemoveDataFromSdkandDir(t *testing.T) { { name: "Valid data keys", policy: map[string]interface{}{ - "data": []interface{}{"policy.rule1", "policy.rule2"}, + "data": []interface{}{"policy.rule1", "policy.rule2"}, }, expectedFailures: nil, }, { name: "Invalid data key type", policy: map[string]interface{}{ - "data": []interface{}{"policy.rule1", 123}, // Invalid integer key + "data": []interface{}{"policy.rule1", 123}, // Invalid integer key }, expectedFailures: []string{"Invalid Key :123"}, }, { name: "Invalid JSON structure", policy: map[string]interface{}{ - "policyId": "test-policy", - "policyVersion": "1.0", + "policyId": "test-policy", + "policyVersion": "1.0", }, expectedFailures: []string{": Invalid JSON structure: 'data' is missing or not an array"}, }, @@ -747,7 +747,7 @@ func TestRemoveDataFromSdkandDir(t *testing.T) { { name: "Deletion failure", policy: map[string]interface{}{ - "data": []interface{}{"invalid.path"}, + "data": []interface{}{"invalid.path"}, }, expectedFailures: []string{"Failed to delete: /invalid/path"}, }, diff --git a/pkg/kafkacomm/pdp_topic_consumer.go b/pkg/kafkacomm/pdp_topic_consumer.go index a16bd4a..ae56e21 100644 --- a/pkg/kafkacomm/pdp_topic_consumer.go +++ b/pkg/kafkacomm/pdp_topic_consumer.go @@ -25,14 +25,12 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" "policy-opa-pdp/cfg" "policy-opa-pdp/pkg/log" - "sync" "time" ) var ( // Declare a global variable to hold the singleton KafkaConsumer consumerInstance *KafkaConsumer - consumerOnce sync.Once // sync.Once ensures that the consumer is created only once ) // KafkaConsumerInterface defines the interface for a Kafka consumer. @@ -76,63 +74,59 @@ type KafkaNewConsumerFunc func(*kafka.ConfigMap) (*kafka.Consumer, error) var KafkaNewConsumer KafkaNewConsumerFunc = kafka.NewConsumer // NewKafkaConsumer creates a new Kafka consumer and returns -func NewKafkaConsumer() (*KafkaConsumer, error) { +func NewKafkaConsumer(topic string, groupid string) (*KafkaConsumer, error) { // Initialize the consumer instance only once - consumerOnce.Do(func() { - log.Debugf("Creating Kafka Consumer singleton instance") - brokers := cfg.BootstrapServer - groupid := cfg.GroupId - topic := cfg.Topic - useSASL := cfg.UseSASLForKAFKA - username := cfg.KAFKA_USERNAME - password := cfg.KAFKA_PASSWORD - - // Add Kafka connection properties - configMap := &kafka.ConfigMap{ - "bootstrap.servers": brokers, - "group.id": groupid, - "auto.offset.reset": "latest", - } - fmt.Print(configMap) - // If SASL is enabled, add SASL properties - if useSASL == "true" { - configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104 - configMap.SetKey("sasl.username", username) // #nosec G104 - configMap.SetKey("sasl.password", password) // #nosec G104 - configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104 - configMap.SetKey("fetch.max.bytes", 50*1024*1024) // #nosec G104 - configMap.SetKey("max.partition.fetch.bytes", 50*1024*1024) // #nosec G104 - configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024) // #nosec G104 - configMap.SetKey("session.timeout.ms", "30000") // #nosec G104 - configMap.SetKey("max.poll.interval.ms", "300000") // #nosec G104 - configMap.SetKey("enable.partition.eof", true) // #nosec G104 - configMap.SetKey("enable.auto.commit", true) // #nosec G104 - // configMap.SetKey("debug", "all") // Uncomment for debug - } + log.Debugf("Creating Kafka Consumer singleton instance") + brokers := cfg.BootstrapServer + useSASL := cfg.UseSASLForKAFKA + username := cfg.KAFKA_USERNAME + password := cfg.KAFKA_PASSWORD + + // Add Kafka connection properties + configMap := &kafka.ConfigMap{ + "bootstrap.servers": brokers, + "group.id": groupid, + "auto.offset.reset": "latest", + } + fmt.Print(configMap) + // If SASL is enabled, add SASL properties + if useSASL == "true" { + configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104 + configMap.SetKey("sasl.username", username) // #nosec G104 + configMap.SetKey("sasl.password", password) // #nosec G104 + configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104 + configMap.SetKey("fetch.max.bytes", 50*1024*1024) // #nosec G104 + configMap.SetKey("max.partition.fetch.bytes", 50*1024*1024) // #nosec G104 + configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024) // #nosec G104 + configMap.SetKey("session.timeout.ms", "30000") // #nosec G104 + configMap.SetKey("max.poll.interval.ms", "300000") // #nosec G104 + configMap.SetKey("enable.partition.eof", true) // #nosec G104 + configMap.SetKey("enable.auto.commit", true) // #nosec G104 + // configMap.SetKey("debug", "all") // Uncomment for debug + } - // Create a new Kafka consumer - consumer, err := KafkaNewConsumer(configMap) - if err != nil { - log.Warnf("Error creating consumer: %v", err) - return - } - if consumer == nil { - log.Warnf("Kafka Consumer is nil after creation") - return - } + // Create a new Kafka consumer + consumer, err := KafkaNewConsumer(configMap) + if err != nil { + log.Warnf("Error creating consumer: %v", err) + return nil, fmt.Errorf("error creating consumer: %w", err) + } + if consumer == nil { + log.Warnf("Kafka Consumer is nil after creation") + return nil, fmt.Errorf("Kafka Consumer is nil after creation") + } - // Subscribe to the topic - err = consumer.SubscribeTopics([]string{topic}, nil) - if err != nil { - log.Warnf("Error subscribing to topic: %v", err) - return - } - log.Debugf("Topic Subscribed: %v", topic) + // Subscribe to the topic + err = consumer.SubscribeTopics([]string{topic}, nil) + if err != nil { + log.Warnf("Error subscribing to topic: %v", err) + return nil, fmt.Errorf("error subscribing to topic: %w", err) + } + log.Debugf("Topic Subscribed: %v", topic) - // Assign the consumer instance - consumerInstance = &KafkaConsumer{Consumer: consumer} - log.Debugf("Created SIngleton consumer instance") - }) + // Assign the consumer instance + consumerInstance = &KafkaConsumer{Consumer: consumer} + log.Debugf("Created SIngleton consumer instance") // Return the singleton consumer instance if consumerInstance == nil { diff --git a/pkg/kafkacomm/pdp_topic_consumer_test.go b/pkg/kafkacomm/pdp_topic_consumer_test.go index 36a2aa4..48575df 100644 --- a/pkg/kafkacomm/pdp_topic_consumer_test.go +++ b/pkg/kafkacomm/pdp_topic_consumer_test.go @@ -27,7 +27,6 @@ import ( "github.com/stretchr/testify/mock" "policy-opa-pdp/cfg" "policy-opa-pdp/pkg/kafkacomm/mocks" - "sync" "testing" ) @@ -58,7 +57,7 @@ func TestNewKafkaConsumer_SASLTest(t *testing.T) { mockConsumer := new(MockKafkaConsumer) - consumer, err := NewKafkaConsumer() + consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId) assert.NoError(t, err) assert.NotNil(t, consumer) @@ -69,7 +68,7 @@ func TestNewKafkaConsumer(t *testing.T) { // Assuming configuration is correctly loaded from cfg package // You can mock or override cfg values here if needed - consumer, err := NewKafkaConsumer() + consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId) assert.NoError(t, err, "Expected no error when creating Kafka consumer") assert.NotNil(t, consumer, "Expected a non-nil KafkaConsumer") @@ -191,7 +190,6 @@ func TestKafkaConsumer_Unsubscribe_Nil_Error(t *testing.T) { // Helper function to reset func resetKafkaConsumerSingleton() { - consumerOnce = sync.Once{} consumerInstance = nil } @@ -203,9 +201,9 @@ func TestNewKafkaConsumer_ErrorCreatingConsumer(t *testing.T) { return nil, fmt.Errorf("mock error creating consumer") } - consumer, err := NewKafkaConsumer() + consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId) assert.Nil(t, consumer) - assert.EqualError(t, err, "Kafka Consumer instance not created") + assert.EqualError(t, err, "error creating consumer: mock error creating consumer") KafkaNewConsumer = originalNewKafkaConsumer } @@ -217,8 +215,8 @@ func TestNewKafkaConsumer_NilConsumer(t *testing.T) { return nil, nil } - consumer, err := NewKafkaConsumer() + consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId) assert.Nil(t, consumer) - assert.EqualError(t, err, "Kafka Consumer instance not created") + assert.EqualError(t, err, "Kafka Consumer is nil after creation") KafkaNewConsumer = originalNewKafkaConsumer } diff --git a/pkg/kafkacomm/pdp_topic_producer.go b/pkg/kafkacomm/pdp_topic_producer.go index 8685a34..bc8ce42 100644 --- a/pkg/kafkacomm/pdp_topic_producer.go +++ b/pkg/kafkacomm/pdp_topic_producer.go @@ -25,7 +25,6 @@ import ( "github.com/confluentinc/confluent-kafka-go/v2/kafka" "log" "policy-opa-pdp/cfg" - "sync" ) type KafkaProducerInterface interface { @@ -43,7 +42,6 @@ type KafkaProducer struct { var ( instance *KafkaProducer - once sync.Once ) // GetKafkaProducer initializes and returns a KafkaProducer instance which is a singleton. @@ -51,43 +49,46 @@ var ( // If SASL authentication is enabled via the configuration, the necessary credentials // are set in the producer configuration. // -//nolint:gosec + func GetKafkaProducer(bootstrapServers, topic string) (*KafkaProducer, error) { var err error - once.Do(func() { - brokers := cfg.BootstrapServer - useSASL := cfg.UseSASLForKAFKA - username := cfg.KAFKA_USERNAME - password := cfg.KAFKA_PASSWORD - - // Add Kafka Connection Properties .... - configMap := &kafka.ConfigMap{ - "bootstrap.servers": brokers, - } + instance, err = initializeKafkaProducer(topic) + return instance, err +} - if useSASL == "true" { - configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104 - configMap.SetKey("sasl.username", username) // #nosec G104 - configMap.SetKey("sasl.password", password) // #nosec G104 - configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104 - } +//nolint:gosec +func initializeKafkaProducer(topic string) (*KafkaProducer, error) { + brokers := cfg.BootstrapServer + useSASL := cfg.UseSASLForKAFKA + username := cfg.KAFKA_USERNAME + password := cfg.KAFKA_PASSWORD - p, err := kafka.NewProducer(configMap) - if err != nil { - return - } - instance = &KafkaProducer{ - producer: p, - topic: topic, - } + configMap := &kafka.ConfigMap{ + "bootstrap.servers": brokers, + } - }) - return instance, err + if useSASL == "true" { + configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104 + configMap.SetKey("sasl.username", username) // #nosec G104 + configMap.SetKey("sasl.password", password) // #nosec G104 + configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104 + } + + p, err := kafka.NewProducer(configMap) + if err != nil { + return nil, err + } + + return &KafkaProducer{ + producer: p, + topic: topic, + }, nil } // Produce sends a message to the configured Kafka topic. // It takes the message payload as a byte slice and returns any errors func (kp *KafkaProducer) Produce(kafkaMessage *kafka.Message, eventChan chan kafka.Event) error { + log.Println("KafkaProducer or producer produce message") if kafkaMessage.TopicPartition.Topic == nil { kafkaMessage.TopicPartition = kafka.TopicPartition{ Topic: &kp.topic, diff --git a/pkg/kafkacomm/publisher/patch_message_publisher.go b/pkg/kafkacomm/publisher/patch_message_publisher.go new file mode 100644 index 0000000..2be1655 --- /dev/null +++ b/pkg/kafkacomm/publisher/patch_message_publisher.go @@ -0,0 +1,70 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2025: Deutsche Telekom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// ========================LICENSE_END=================================== +// + +package publisher + +import ( + "encoding/json" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "policy-opa-pdp/cfg" + "policy-opa-pdp/pkg/kafkacomm" + "policy-opa-pdp/pkg/log" + "policy-opa-pdp/pkg/opasdk" +) + +type RealPatchSender struct { + Producer kafkacomm.KafkaProducerInterface +} + +type PatchKafkaPayload struct { + PatchInfos []opasdk.PatchImpl `json:"patchInfos"` +} + +func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error { + log.Debugf("In SendPatchMessage") + var topic string + topic = cfg.PatchTopic + kafkaPayload := PatchKafkaPayload{ + PatchInfos: patchInfos, + } + + jsonMessage, err := json.Marshal(kafkaPayload) + if err != nil { + log.Warnf("failed to marshal Patch Payload to JSON: %v", err) + return err + } + + kafkaMessage := &kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &topic, + Partition: kafka.PartitionAny, + }, + Value: jsonMessage, + } + var eventChan chan kafka.Event = nil + err = s.Producer.Produce(kafkaMessage, eventChan) + if err != nil { + log.Warnf("Error producing message: %v\n", err) + return err + } else { + log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage)) + } + + return nil +} diff --git a/pkg/kafkacomm/publisher/patch_message_publisher_test.go b/pkg/kafkacomm/publisher/patch_message_publisher_test.go new file mode 100644 index 0000000..bff781d --- /dev/null +++ b/pkg/kafkacomm/publisher/patch_message_publisher_test.go @@ -0,0 +1,109 @@ +// - +// ========================LICENSE_START================================= +// Copyright (C) 2025: Deutsche Telekom +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// ========================LICENSE_END=================================== +// + +package publisher + +import ( + "encoding/json" + "errors" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/open-policy-agent/opa/v1/storage" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "policy-opa-pdp/pkg/opasdk" + "testing" +) + +// --- Sample PatchImpl for testing --- +func samplePatchData() []opasdk.PatchImpl { + return []opasdk.PatchImpl{ + { + Path: storage.MustParsePath("/policy/config/name"), + Op: storage.ReplaceOp, + Value: "NewPolicyName", + }, + } +} + +// --- Helper to get mock sender --- +func getMockSender() (*RealPatchSender, *MockKafkaProducer) { + mockProducer := new(MockKafkaProducer) + sender := &RealPatchSender{ + Producer: mockProducer, + } + return sender, mockProducer +} + +// --- Test: Successful message send --- +func TestSendPatchMessage_Success(t *testing.T) { + sender, mockProducer := getMockSender() + + mockProducer.On("Produce", mock.Anything).Return(nil) + + err := sender.SendPatchMessage(samplePatchData()) + assert.NoError(t, err) + mockProducer.AssertExpectations(t) +} + +// --- Test: Kafka produce failure --- +func TestSendPatchMessage_ProduceError(t *testing.T) { + sender, mockProducer := getMockSender() + + mockProducer.On("Produce", mock.Anything).Return(errors.New("kafka error")) + + err := sender.SendPatchMessage(samplePatchData()) + assert.Error(t, err) + assert.EqualError(t, err, "kafka error") + mockProducer.AssertExpectations(t) +} + +// --- Test: JSON marshal error --- +func TestSendPatchMessage_MarshalError(t *testing.T) { + sender, _ := getMockSender() + + badData := []opasdk.PatchImpl{ + { + Path: storage.MustParsePath("/invalid"), + Op: storage.AddOp, + Value: make(chan int), // JSON marshal fails on channels + }, + } + + err := sender.SendPatchMessage(badData) + assert.Error(t, err) + assert.Contains(t, err.Error(), "json: unsupported type: chan int") +} + +// --- Test: Validate payload content --- +func TestSendPatchMessage_PayloadContent(t *testing.T) { + sender, mockProducer := getMockSender() + + mockProducer.On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool { + var payload PatchKafkaPayload + err := json.Unmarshal(msg.Value, &payload) + return err == nil && + len(payload.PatchInfos) == 1 && + payload.PatchInfos[0].Path.String() == "/policy/config/name" && + payload.PatchInfos[0].Value == "NewPolicyName" + })).Return(nil) + + err := sender.SendPatchMessage(samplePatchData()) + assert.NoError(t, err) + mockProducer.AssertExpectations(t) +} diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go index da4888f..71f87ec 100644 --- a/pkg/kafkacomm/publisher/pdp-heartbeat.go +++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go @@ -24,6 +24,7 @@ package publisher import ( "fmt" + "github.com/google/uuid" "policy-opa-pdp/consts" "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/model" @@ -32,7 +33,6 @@ import ( "policy-opa-pdp/pkg/policymap" "sync" "time" - "github.com/google/uuid" ) var ( diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go index 7a8fe55..ece8b6f 100644 --- a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go +++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go @@ -1,30 +1,32 @@ // - -// ========================LICENSE_START================================= -// Copyright (C) 2024-2025: Deutsche Telekom // -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +// ========================LICENSE_START================================= +// Copyright (C) 2024-2025: Deutsche Telekom // -// http://www.apache.org/licenses/LICENSE-2.0 +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at // -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// SPDX-License-Identifier: Apache-2.0 -// ========================LICENSE_END=================================== +// http://www.apache.org/licenses/LICENSE-2.0 // +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// SPDX-License-Identifier: Apache-2.0 +// ========================LICENSE_END=================================== package publisher + import ( "errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "policy-opa-pdp/pkg/kafkacomm/publisher/mocks" "policy-opa-pdp/pkg/policymap" "testing" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) + /* Success Case 1 TestStartHeartbeatIntervalTimer_ValidInterval @@ -46,6 +48,7 @@ func TestStartHeartbeatIntervalTimer_ValidInterval(t *testing.T) { t.Errorf("Expected currentInterval to be %d, got %d", intervalMs, currentInterval) } } + /* Failure Case 1 TestStartHeartbeatIntervalTimer_InvalidInterval @@ -64,6 +67,7 @@ func TestStartHeartbeatIntervalTimer_InvalidInterval(t *testing.T) { t.Log("Expected ticker to be nil for invalid interval") } } + /* TestSendPDPHeartBeat_Success 2 Description: Test sending a heartbeat successfully. @@ -76,6 +80,7 @@ func TestSendPDPHeartBeat_Success(t *testing.T) { err := sendPDPHeartBeat(mockSender) assert.NoError(t, err) } + /* TestSendPDPHeartBeat_Failure 2 Description: Test failing to send a heartbeat. @@ -89,6 +94,7 @@ func TestSendPDPHeartBeat_Failure(t *testing.T) { err := sendPDPHeartBeat(mockSender) assert.Error(t, err) } + /* TestsendPDPHeartBeat_Success 3 Description: Test sending a heartbeat successfully with some deployed policies. @@ -106,6 +112,7 @@ func TestSendPDPHeartBeat_SuccessSomeDeployedPolicies(t *testing.T) { err := sendPDPHeartBeat(mockSender) assert.NoError(t, err) } + /* TestsendPDPHeartBeat_Success 4 Description: Test sending a heartbeat successfully with no deployed policies. @@ -123,6 +130,7 @@ func TestSendPDPHeartBeat_SuccessNoDeployedPolicies(t *testing.T) { err := sendPDPHeartBeat(mockSender) assert.NoError(t, err) } + /* TestStopTicker_Success 3 Description: Test stopping the ticker. @@ -141,6 +149,7 @@ func TestStopTicker_Success(t *testing.T) { t.Errorf("Expected ticker to be nil") } } + /* TestStopTicker_NotRunning 3 Description: Test stopping the ticker when it is not running. diff --git a/pkg/kafkacomm/publisher/pdp-status-publisher.go b/pkg/kafkacomm/publisher/pdp-status-publisher.go index a9aa6e0..801db2d 100644 --- a/pkg/kafkacomm/publisher/pdp-status-publisher.go +++ b/pkg/kafkacomm/publisher/pdp-status-publisher.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -35,7 +35,7 @@ import ( "github.com/google/uuid" ) -type( +type ( SendPdpUpdateResponseFunc func(s PdpStatusSender, pdpUpdate *model.PdpUpdate, resMessage string) error ) diff --git a/pkg/metrics/counters.go b/pkg/metrics/counters.go index b2b03d8..cf42a98 100644 --- a/pkg/metrics/counters.go +++ b/pkg/metrics/counters.go @@ -20,9 +20,10 @@ package metrics import ( - "sync" - "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus" + "sync" ) + // global counter variables var TotalErrorCount int64 var DecisionSuccessCount int64 @@ -32,24 +33,61 @@ var DeploySuccessCount int64 var UndeployFailureCount int64 var UndeploySuccessCount int64 var TotalPoliciesCount int64 +var DynamicDataUpdateSuccessCount int64 +var DynamicDataUpdateFailureCount int64 var mu sync.Mutex -//Decision and Data counters to be used in prometheus +// Decision and Data counters to be used in prometheus var ( - DecisionResponseTime = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "opa_decision_response_time_seconds", - Help: "Response time of OPA decision handler", + DecisionResponseTime_Prom = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "opa_decision_response_time_seconds", + Help: "Response time of OPA decision handler", + }) + DataResponseTime_Prom = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "opa_data_response_time_seconds", + Help: "Response time of OPA data handler", + }) + DecisionHandlerCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pdpo_policy_decisions_total", + Help: "Total Number of Decision Handler hits for OPA", + }) + DeploymentSuccessCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pdpo_policy_deployments_total", + Help: "Total Number of Successful Deployment for OPA", + }) + DeploymentFailureCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pdpo_policy_failures_total", + Help: "Total Number of Deployment Failures for OPA", + }) + DynamicDataUpdatesSuccessCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pdpo_dynamic_data_success_total", + Help: "Total Number of Successful Dynamic Data Updates for OPA", + }) + DynamicDataUpdatesFailureCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pdpo_dynamic_data_failures_total", + Help: "Total Number of Failed Dynamic Data Updates for OPA", }) - DataResponseTime = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "opa_data_response_time_seconds", - Help: "Response time of OPA data handler", + UndeploymentSuccessCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pdpo_policy_undeployments_success_total", + Help: "Total Number of Successful Deployment for OPA", + }) + UndeploymentFailureCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "pdpo_policy_undeployments_failures_total", + Help: "Total Number of Deployment Failures for OPA", }) ) -//register counters in init +// register counters in init func init() { - prometheus.MustRegister(DecisionResponseTime) - prometheus.MustRegister(DataResponseTime) + prometheus.MustRegister(DecisionResponseTime_Prom) + prometheus.MustRegister(DataResponseTime_Prom) + prometheus.MustRegister(DecisionHandlerCount_Prom) + prometheus.MustRegister(DeploymentSuccessCount_Prom) + prometheus.MustRegister(DeploymentFailureCount_Prom) + prometheus.MustRegister(DynamicDataUpdatesSuccessCount_Prom) + prometheus.MustRegister(DynamicDataUpdatesFailureCount_Prom) + prometheus.MustRegister(UndeploymentSuccessCount_Prom) + prometheus.MustRegister(UndeploymentFailureCount_Prom) } // Increment counter @@ -66,10 +104,39 @@ func totalErrorCountRef() *int64 { return &TotalErrorCount } +func IncrementDynamicDataUpdateSuccessCount() { + mu.Lock() + DynamicDataUpdateSuccessCount++ + DynamicDataUpdatesSuccessCount_Prom.Inc() + mu.Unlock() +} + +func totalDynamicDataUpdateSuccessCountRef() *int64 { + mu.Lock() + defer mu.Unlock() + return &DynamicDataUpdateSuccessCount + +} + +func IncrementDynamicDataUpdateFailureCount() { + mu.Lock() + DynamicDataUpdateFailureCount++ + DynamicDataUpdatesFailureCount_Prom.Inc() + mu.Unlock() +} + +func totalDynamicDataUpdateFailureCountRef() *int64 { + mu.Lock() + defer mu.Unlock() + return &DynamicDataUpdateFailureCount + +} + // Increment counter func IncrementDecisionSuccessCount() { mu.Lock() DecisionSuccessCount++ + DecisionHandlerCount_Prom.Inc() mu.Unlock() } @@ -85,6 +152,7 @@ func totalDecisionSuccessCountRef() *int64 { func IncrementDecisionFailureCount() { mu.Lock() DecisionFailureCount++ + DecisionHandlerCount_Prom.Inc() mu.Unlock() } @@ -100,6 +168,7 @@ func TotalDecisionFailureCountRef() *int64 { func IncrementDeploySuccessCount() { mu.Lock() DeploySuccessCount++ + DeploymentSuccessCount_Prom.Inc() mu.Unlock() } @@ -116,6 +185,7 @@ func totalDeploySuccessCountRef() *int64 { func IncrementDeployFailureCount() { mu.Lock() DeployFailureCount++ + DeploymentFailureCount_Prom.Inc() mu.Unlock() } @@ -132,6 +202,7 @@ func totalDeployFailureCountRef() *int64 { func IncrementUndeploySuccessCount() { mu.Lock() UndeploySuccessCount++ + UndeploymentSuccessCount_Prom.Inc() mu.Unlock() } @@ -148,6 +219,7 @@ func totalUndeploySuccessCountRef() *int64 { func IncrementUndeployFailureCount() { mu.Lock() UndeployFailureCount++ + UndeploymentFailureCount_Prom.Inc() mu.Unlock() } diff --git a/pkg/metrics/statistics-provider.go b/pkg/metrics/statistics-provider.go index 9776511..51b2ec3 100644 --- a/pkg/metrics/statistics-provider.go +++ b/pkg/metrics/statistics-provider.go @@ -23,13 +23,13 @@ package metrics import ( "encoding/json" + "github.com/google/uuid" + openapi_types "github.com/oapi-codegen/runtime/types" "net/http" + "policy-opa-pdp/consts" "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/model/oapicodegen" "policy-opa-pdp/pkg/utils" - "policy-opa-pdp/consts" - "github.com/google/uuid" - openapi_types "github.com/oapi-codegen/runtime/types" ) func FetchCurrentStatistics(res http.ResponseWriter, req *http.Request) { @@ -65,6 +65,8 @@ func FetchCurrentStatistics(res http.ResponseWriter, req *http.Request) { statReport.UndeployFailureCount = totalUndeployFailureCountRef() statReport.UndeploySuccessCount = totalUndeploySuccessCountRef() statReport.TotalPoliciesCount = totalPoliciesCountRef() + statReport.DynamicDataUpdateFailureCount = totalDynamicDataUpdateFailureCountRef() + statReport.DynamicDataUpdateSuccessCount = totalDynamicDataUpdateSuccessCountRef() // not implemented hardcoding the values to zero // will be implemeneted in phase-2 diff --git a/pkg/model/oapicodegen/models.go b/pkg/model/oapicodegen/models.go index 34d88cd..59d9a74 100644 --- a/pkg/model/oapicodegen/models.go +++ b/pkg/model/oapicodegen/models.go @@ -121,16 +121,18 @@ type OPADecisionResponse struct { // StatisticsReport defines model for StatisticsReport. type StatisticsReport struct { - Code *int32 `json:"code,omitempty"` - DecisionFailureCount *int64 `json:"decisionFailureCount,omitempty"` - DecisionSuccessCount *int64 `json:"decisionSuccessCount,omitempty"` - DeployFailureCount *int64 `json:"deployFailureCount,omitempty"` - DeploySuccessCount *int64 `json:"deploySuccessCount,omitempty"` - TotalErrorCount *int64 `json:"totalErrorCount,omitempty"` - TotalPoliciesCount *int64 `json:"totalPoliciesCount,omitempty"` - TotalPolicyTypesCount *int64 `json:"totalPolicyTypesCount,omitempty"` - UndeployFailureCount *int64 `json:"undeployFailureCount,omitempty"` - UndeploySuccessCount *int64 `json:"undeploySuccessCount,omitempty"` + Code *int32 `json:"code,omitempty"` + DecisionFailureCount *int64 `json:"decisionFailureCount,omitempty"` + DecisionSuccessCount *int64 `json:"decisionSuccessCount,omitempty"` + DeployFailureCount *int64 `json:"deployFailureCount,omitempty"` + DeploySuccessCount *int64 `json:"deploySuccessCount,omitempty"` + DynamicDataUpdateFailureCount *int64 `json:"dynamicDataUpdateFailureCount,omitempty"` + DynamicDataUpdateSuccessCount *int64 `json:"dynamicDataUpdateSuccessCount,omitempty"` + TotalErrorCount *int64 `json:"totalErrorCount,omitempty"` + TotalPoliciesCount *int64 `json:"totalPoliciesCount,omitempty"` + TotalPolicyTypesCount *int64 `json:"totalPolicyTypesCount,omitempty"` + UndeployFailureCount *int64 `json:"undeployFailureCount,omitempty"` + UndeploySuccessCount *int64 `json:"undeploySuccessCount,omitempty"` } // DataGetParams defines parameters for DataGet. diff --git a/pkg/pdpattributes/pdpattributes.go b/pkg/pdpattributes/pdpattributes.go index a40a55a..ca18d79 100644 --- a/pkg/pdpattributes/pdpattributes.go +++ b/pkg/pdpattributes/pdpattributes.go @@ -22,8 +22,8 @@ package pdpattributes import ( - "policy-opa-pdp/pkg/log" "github.com/google/uuid" + "policy-opa-pdp/pkg/log" ) var ( diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index fd09bd5..aa52503 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -37,13 +37,13 @@ import ( ) type ( - CreateDirectoryFunc func(dirPath string) error + CreateDirectoryFunc func(dirPath string) error ValidateFieldsStructsFunc func(pdpUpdate model.PdpUpdate) error ) var ( - CreateDirectoryVar CreateDirectoryFunc = CreateDirectory - removeAll = os.RemoveAll + CreateDirectoryVar CreateDirectoryFunc = CreateDirectory + removeAll = os.RemoveAll ValidateFieldsStructsVar ValidateFieldsStructsFunc = ValidateFieldsStructs ) @@ -361,7 +361,6 @@ func BuildBundle(cmdFunc func(string, ...string) *exec.Cmd) (string, error) { return string(output), nil } - type CommonFields struct { CurrentDate *string CurrentDateTime *time.Time @@ -389,18 +388,17 @@ func ValidateOPADataRequest(request interface{}) []string { } commonFields := CommonFields{ - CurrentDate: ¤tDate, - CurrentDateTime: updateReq.CurrentDateTime, - CurrentTime: updateReq.CurrentTime, - TimeOffset: updateReq.TimeOffset, - TimeZone: updateReq.TimeZone, - OnapComponent: updateReq.OnapComponent, - OnapInstance: updateReq.OnapInstance, - OnapName: updateReq.OnapName, - PolicyName: convertPtrToString(updateReq.PolicyName), - + CurrentDate: ¤tDate, + CurrentDateTime: updateReq.CurrentDateTime, + CurrentTime: updateReq.CurrentTime, + TimeOffset: updateReq.TimeOffset, + TimeZone: updateReq.TimeZone, + OnapComponent: updateReq.OnapComponent, + OnapInstance: updateReq.OnapInstance, + OnapName: updateReq.OnapName, + PolicyName: convertPtrToString(updateReq.PolicyName), } - return validateCommonFields(commonFields) + return validateCommonFields(commonFields) } @@ -412,15 +410,15 @@ func ValidateOPADataRequest(request interface{}) []string { } commonFields := CommonFields{ - CurrentDate: ¤tDate, - CurrentDateTime: decisionReq.CurrentDateTime, - CurrentTime: decisionReq.CurrentTime, - TimeOffset: decisionReq.TimeOffset, - TimeZone: decisionReq.TimeZone, - OnapComponent: decisionReq.OnapComponent, - OnapInstance: decisionReq.OnapInstance, - OnapName: decisionReq.OnapName, - PolicyName: decisionReq.PolicyName, + CurrentDate: ¤tDate, + CurrentDateTime: decisionReq.CurrentDateTime, + CurrentTime: decisionReq.CurrentTime, + TimeOffset: decisionReq.TimeOffset, + TimeZone: decisionReq.TimeZone, + OnapComponent: decisionReq.OnapComponent, + OnapInstance: decisionReq.OnapInstance, + OnapName: decisionReq.OnapName, + PolicyName: decisionReq.PolicyName, } return validateCommonFields(commonFields) -- 2.16.6