FROM curlimages/curl:7.78.0 AS build
# Get OPA
-RUN curl -Lo /tmp/opa https://github.com/open-policy-agent/opa/releases/download/v0.69.0/opa_linux_amd64
+RUN curl --proto "=https" -Lo /tmp/opa https://github.com/open-policy-agent/opa/releases/download/v0.69.0/opa_linux_amd64
FROM golang:1.23 AS compile
COPY go.mod go.sum /app/
-COPY . .
+# Copy individual files and directories
+COPY Dockerfile /go/
+COPY api /go/api
+COPY cfg /go/cfg
+COPY cmd /go/cmd
+COPY consts /go/consts
+COPY go.mod /go/
+COPY go.sum /go/
+COPY pkg /go/pkg
+COPY sonar-project.properties /go/
+COPY version /go/
+COPY version.properties /go/
RUN mkdir -p /app/cfg /app/consts /app/api /app/cmd /app/pkg /app/bundles
COPY cfg /app/cfg
# Build the binary
RUN GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" -o /app/opa-pdp /app/cmd/opa-pdp/opa-pdp.go
-FROM ubuntu
+FROM ubuntu:24.04
-RUN apt-get update && apt-get install -y netcat-openbsd curl && rm -rf /var/lib/apt/lists/*\
+RUN apt-get update && apt-get install -y --no-install-recommends netcat-openbsd curl && rm -rf /var/lib/apt/lists/*\
&& mkdir -p /app /opt/policies /opt/data /var/logs \
&& chown -R ubuntu:ubuntu /app /opt/policies /opt/data /var/logs
# Copy our opa executable from build stage
COPY --from=build /tmp/opa /app/opa
-RUN chmod +x /app/opa-pdp && chmod 755 /app/opa && chmod 777 /app/bundles
+RUN chown 1000:1000 /app/opa-pdp && chown 1000:1000 /app/opa && chown 1000:1000 /app/bundles
+RUN chmod u+x /app/opa-pdp && chmod u+x /app/opa && chmod u+x /app/bundles
# Switch to the non-root user (UID 1000 is the default ubuntu user in this base image)
# Command to run OPA with the policies
CMD ["/app/opa-pdp"]
-
- basicAuth: []
x-interface info:
last-mod-release: Paris
- pdpo-version: 1.0.0
+ pdpo-version: 1.0.3
x-codegen-request-body-name: body
/healthcheck:
get:
- basicAuth: []
x-interface info:
last-mod-release: Paris
- pdpo-version: 1.0.0
+ pdpo-version: 1.0.3
/statistics:
get:
tags:
- basicAuth: []
x-interface info:
last-mod-release: Paris
- pdpo-version: 1.0.0
+ pdpo-version: 1.0.3
/data/{path}:
patch:
tags:
- basicAuth: []
x-interface info:
last-mod-release: Paris
- pdpo-version: 1.0.0
+ pdpo-version: 1.0.3
x-codegen-request-body-name: body
get:
tags:
500:
description: Internal Server Error
content: {}
+ security:
+ - basicAuth: []
+ x-interface info:
+ last-mod-release: Paris
+ pdpo-version: 1.0.3
components:
schemas:
ErrorResponse:
decisionFailureCount:
type: integer
format: int64
+ dynamicDataUpdateSuccessCount:
+ type: integer
+ format: int64
+ dynamicDataUpdateFailureCount:
+ type: integer
+ format: int64
OPADataResponse:
type: object
properties:
type: http
description: ""
scheme: basic
-
package api
import (
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
"policy-opa-pdp/cfg"
"policy-opa-pdp/pkg/data"
"policy-opa-pdp/pkg/decision"
"policy-opa-pdp/pkg/healthcheck"
+ "policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/metrics"
"policy-opa-pdp/pkg/opasdk"
"time"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promhttp"
)
// RegisterHandlers registers the HTTP handlers for the service.
http.Handle("/policy/pdpo/v1/data", basicAuth(trackDataResponseTime(dataHandler)))
- http.Handle("/metrics", basicAuth(http.HandlerFunc(metricsHandler)))
+ http.Handle("/metrics", basicAuth(http.HandlerFunc(metricsHandler)))
}
promhttp.Handler().ServeHTTP(w, r)
}
-//Track Decision response time metrics
+// Track Decision response time metrics
func trackDecisionResponseTime(next http.HandlerFunc) http.HandlerFunc {
- return trackResponseTime(metrics.DecisionResponseTime, next)
+ return trackResponseTime(metrics.DecisionResponseTime_Prom, next)
}
-//Track Data response time metrics
+// Track Data response time metrics
func trackDataResponseTime(next http.HandlerFunc) http.HandlerFunc {
- return trackResponseTime(metrics.DataResponseTime, next)
+ return trackResponseTime(metrics.DataResponseTime_Prom, next)
}
func trackResponseTime(metricCollector prometheus.Observer, next http.HandlerFunc) http.HandlerFunc {
validPass := cfg.Password
return username == validUser && password == validPass
}
+
+// readinessProbe handles the readiness probe endpoint, replying 200 OK with a plain "Ready" body
+func readinessProbe(res http.ResponseWriter, req *http.Request) {
+ res.WriteHeader(http.StatusOK)
+ _, err := res.Write([]byte("Ready"))
+ if err != nil {
+ log.Errorf("Failed to write response: %v", err)
+ }
+}
package api
import (
+ "github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"policy-opa-pdp/cfg"
"policy-opa-pdp/pkg/healthcheck"
"testing"
"time"
- "github.com/stretchr/testify/assert"
)
// Mock configuration
}
}
-
type mockObserver struct {
observedDuration float64
}
handler(res, req)
assert.NotNil(t, observer.observedDuration)
}
+
+func TestMetricsHandler(t *testing.T) {
+ req := httptest.NewRequest(http.MethodGet, "/metrics", nil)
+ rr := httptest.NewRecorder()
+
+ metricsHandler(rr, req)
+
+ resp := rr.Result()
+ defer resp.Body.Close()
+
+ assert.Equal(t, http.StatusOK, resp.StatusCode, "expected status OK")
+
+ contentType := resp.Header.Get("Content-Type")
+ assert.Contains(t, contentType, "text/plain", "expected Prometheus content type")
+
+}
+
+func TestReadinessProbe(t *testing.T) {
+ req := httptest.NewRequest(http.MethodGet, "/ready", nil)
+ rr := httptest.NewRecorder()
+
+ readinessProbe(rr, req)
+
+ resp := rr.Result()
+ defer resp.Body.Close()
+
+ assert.Equal(t, http.StatusOK, resp.StatusCode, "expected HTTP 200 OK")
+
+ body := rr.Body.String()
+ assert.Equal(t, "Ready", body, "expected response body to be 'Ready'")
+}
// KAFKA_USERNAME - The Kafka username for SASL authentication.
// KAFKA_PASSWORD - The Kafka password for SASL authentication.
var (
- LogLevel string
- BootstrapServer string
- Topic string
- GroupId string
- Username string
- Password string
- UseSASLForKAFKA string
- KAFKA_USERNAME string
- KAFKA_PASSWORD string
- JAASLOGIN string
+ LogLevel string
+ BootstrapServer string
+ Topic string
+ PatchTopic string
+ GroupId string
+ Username string
+ Password string
+ UseSASLForKAFKA string
+ KAFKA_USERNAME string
+ KAFKA_PASSWORD string
+ JAASLOGIN string
+ UseKafkaForPatch bool
+ PatchGroupId string
)
// Initializes the configuration settings.
LogLevel = getEnv("LOG_LEVEL", "info")
BootstrapServer = getEnv("KAFKA_URL", "kafka:9092")
Topic = getEnv("PAP_TOPIC", "policy-pdp-pap")
+ PatchTopic = getEnv("PATCH_TOPIC", "opa-pdp-data")
GroupId = getEnv("GROUPID", "opa-pdp-"+uuid.New().String())
+ PatchGroupId = getEnv("PATCH_GROUPID", "opa-pdp-data-"+uuid.New().String())
Username = getEnv("API_USER", "policyadmin")
Password = getEnv("API_PASSWORD", "zb!XztG34")
UseSASLForKAFKA = getEnv("UseSASLForKAFKA", "false")
KAFKA_USERNAME, KAFKA_PASSWORD = getSaslJAASLOGINFromEnv(JAASLOGIN)
log.Debugf("Username: %s", KAFKA_USERNAME)
log.Debugf("Password: %s", KAFKA_PASSWORD)
-
+ UseKafkaForPatch = getEnvAsBool("USE_KAFKA_FOR_PATCH", false)
log.Debug("Configuration module: environment initialised")
}
}
}
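+// getEnvAsBool reads a boolean environment variable, falling back to defaultVal
+// when the variable is unset or cannot be parsed.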
+func getEnvAsBool(key string, defaultVal bool) bool {
+ if value, exists := os.LookupEnv(key); exists {
+ parsed, err := strconv.ParseBool(value)
+ if err != nil {
+ log.Warnf("%v is set but not a valid bool (%v), using default: %v", key, value, defaultVal)
+ return defaultVal
+ }
+ return parsed
+ }
+ log.Warnf("%v not defined, using default value: %v", key, defaultVal)
+ return defaultVal
+}
+
func getSaslJAASLOGINFromEnv(JAASLOGIN string) (string, string) {
// Retrieve the value of the environment variable
decodingConfigBytes := getEnv("JAASLOGIN", "JAASLOGIN")
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
assert.Empty(t, username, "Expected username to be empty for missing environment variable")
assert.Empty(t, password, "Expected password to be empty for missing environment variable")
}
+
+func TestGetEnvAsBool(t *testing.T) {
+ t.Run("valid boolean true", func(t *testing.T) {
+ os.Setenv("USE_KAFKA_FOR_PATCH", "true")
+ defer os.Unsetenv("USE_KAFKA_FOR_PATCH")
+
+ result := getEnvAsBool("USE_KAFKA_FOR_PATCH", false)
+ assert.True(t, result)
+ })
+
+ t.Run("valid boolean false", func(t *testing.T) {
+ os.Setenv("USE_KAFKA_FOR_PATCH", "false")
+ defer os.Unsetenv("USE_KAFKA_FOR_PATCH")
+
+ result := getEnvAsBool("USE_KAFKA_FOR_PATCH", true)
+ assert.False(t, result)
+ })
+
+ t.Run("invalid boolean value", func(t *testing.T) {
+ os.Setenv("USE_KAFKA_FOR_PATCH", "notabool")
+ defer os.Unsetenv("USE_KAFKA_FOR_PATCH")
+
+ result := getEnvAsBool("USE_KAFKA_FOR_PATCH", true)
+ assert.True(t, result) // should return default (true) because parsing fails
+ })
+
+ t.Run("missing env variable", func(t *testing.T) {
+ os.Unsetenv("USE_KAFKA_FOR_PATCH") // ensure it's not set
+
+ result := getEnvAsBool("USE_KAFKA_FOR_PATCH", false)
+ assert.False(t, result) // should return default (false)
+ })
+}
h "policy-opa-pdp/api"
"policy-opa-pdp/cfg"
"policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/data"
"policy-opa-pdp/pkg/kafkacomm"
"policy-opa-pdp/pkg/kafkacomm/handler"
"policy-opa-pdp/pkg/kafkacomm/publisher"
var (
bootstrapServers = cfg.BootstrapServer //The Kafka bootstrap server address.
topic = cfg.Topic //The Kafka topic to subscribe to.
+ patchTopic = cfg.PatchTopic
+ patchMsgProducer *kafkacomm.KafkaProducer
+ patchMsgConsumer *kafkacomm.KafkaConsumer
+ groupId = cfg.GroupId
+ patchGroupId = cfg.PatchGroupId
)
// Declare function variables for dependency injection makes it more testable
var (
- initializeHandlersFunc = initializeHandlers
- startHTTPServerFunc = startHTTPServer
- shutdownHTTPServerFunc = shutdownHTTPServer
- waitForServerFunc = waitForServer
- initializeOPAFunc = initializeOPA
- startKafkaConsAndProdFunc = startKafkaConsAndProd
- handleMessagesFunc = handleMessages
- handleShutdownFunc = handleShutdown
+ initializeHandlersFunc = initializeHandlers
+ startHTTPServerFunc = startHTTPServer
+ shutdownHTTPServerFunc = shutdownHTTPServer
+ waitForServerFunc = waitForServer
+ initializeOPAFunc = initializeOPA
+ startKafkaConsAndProdFunc = startKafkaConsAndProd
+ handleMessagesFunc = handleMessages
+ handleShutdownFunc = handleShutdown
+ startPatchKafkaConsAndProdFunc = startPatchKafkaConsAndProd
+ handlePatchMessagesFunc = handlePatchMessages
)
// main function
func main() {
+ var useKafkaForPatch = cfg.UseKafkaForPatch
log.Debugf("Starting OPA PDP Service")
ctx, cancel := context.WithCancel(context.Background())
// start pdp message handler in a separate routine
handleMessagesFunc(ctx, kc, sender)
+	if useKafkaForPatch {
+		var patchErr error
+		// assign the package-level consumer/producer so shutdown can close them
+		patchMsgConsumer, patchMsgProducer, patchErr = startPatchKafkaConsAndProdFunc()
+		if patchErr != nil || patchMsgConsumer == nil {
+			log.Warnf("Patch Kafka consumer initialization failed: %v", patchErr)
+		} else {
+			log.Debugf("Patch producer initialized: %v", patchMsgProducer)
+			// start patch message handler in a separate routine
+			handlePatchMessagesFunc(ctx, patchMsgConsumer)
+		}
+	}
+
time.Sleep(10 * time.Second)
pdpattributes.SetPdpHeartbeatInterval(int64(consts.DefaultHeartbeatMS))
// Handle OS Interrupts and Graceful Shutdown
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
- handleShutdownFunc(kc, interruptChannel, cancel, producer)
+ consumers := []*kafkacomm.KafkaConsumer{kc, patchMsgConsumer}
+ producers := []*kafkacomm.KafkaProducer{producer, patchMsgProducer}
+ handleShutdownFunc(consumers, interruptChannel, cancel, producers)
}
type PdpMessageHandlerFunc func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error
}()
}
+type PatchMessageHandlerFunc func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error
+
+var PatchMessageHandler PatchMessageHandlerFunc = handler.PatchMessageHandler
+
+// handlePatchMessages starts the patch message handler in a separate goroutine, which processes incoming messages on the Kafka patch topic
+func handlePatchMessages(ctx context.Context, kc *kafkacomm.KafkaConsumer) {
+
+ go func() {
+ err := PatchMessageHandler(ctx, kc, patchTopic)
+ if err != nil {
+			log.Warnf("Error in Patch Message Handler: %v", err)
+ }
+ }()
+}
+
// Register Handlers
func initializeHandlers() {
h.RegisterHandlers()
return nil
}
-type NewKafkaConsumerFunc func() (*kafkacomm.KafkaConsumer, error)
+type NewKafkaConsumerFunc func(topic string, groupid string) (*kafkacomm.KafkaConsumer, error)
var NewKafkaConsumer NewKafkaConsumerFunc = kafkacomm.NewKafkaConsumer
var GetKafkaProducer GetKafkaProducerFunc = kafkacomm.GetKafkaProducer
func startKafkaConsAndProd() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
- kc, err := NewKafkaConsumer()
+	log.Debugf("Starting Kafka consumer for topic: %s", topic)
+ kc, err := NewKafkaConsumer(topic, groupId)
if err != nil {
log.Warnf("Failed to create Kafka consumer: %v", err)
return nil, nil, err
return kc, producer, nil
}
-func handleShutdown(kc *kafkacomm.KafkaConsumer, interruptChannel chan os.Signal, cancel context.CancelFunc, producer *kafkacomm.KafkaProducer) {
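+// startPatchKafkaConsAndProd creates the Kafka consumer and producer for the
+// dynamic data patch topic and shares the producer with the data package.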
+func startPatchKafkaConsAndProd() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
+	log.Debugf("Starting Kafka consumer for patch topic: %s", patchTopic)
+ kc, err := NewKafkaConsumer(patchTopic, patchGroupId)
+ if err != nil {
+ log.Warnf("Failed to create Kafka consumer: %v", err)
+ return nil, nil, err
+ }
+	patchProducer, err := GetKafkaProducer(bootstrapServers, patchTopic)
+	if err != nil {
+		log.Warnf("Failed to create Kafka producer: %v", err)
+		return nil, nil, err
+	}
+	data.PatchProducer = patchProducer
+	return kc, patchProducer, nil
+}
+
+func handleShutdown(consumers []*kafkacomm.KafkaConsumer, interruptChannel chan os.Signal, cancel context.CancelFunc, producers []*kafkacomm.KafkaProducer) {
myLoop:
for {
signal.Stop(interruptChannel)
publisher.StopTicker()
- producer.Close()
- if kc == nil {
- log.Debugf("kc is nil so skipping")
- return
+ for _, producer := range producers {
+ if producer != nil {
+ producer.Close()
+ }
}
- if err := kc.Consumer.Unsubscribe(); err != nil {
- log.Warnf("Failed to unsubscribe consumer: %v", err)
- } else {
- log.Debugf("Consumer Unsubscribed....")
- }
- if err := kc.Consumer.Close(); err != nil {
- log.Debug("Failed to close consumer......")
- } else {
- log.Debugf("Consumer closed....")
+ for _, consumer := range consumers {
+ if consumer == nil {
+			log.Debugf("consumer is nil, skipping")
+ continue
+ }
+
+ if err := consumer.Consumer.Unsubscribe(); err != nil {
+ log.Warnf("Failed to unsubscribe consumer: %v", err)
+ } else {
+ log.Debugf("Consumer Unsubscribed....")
+ }
+ if err := consumer.Consumer.Close(); err != nil {
+ log.Debug("Failed to close consumer......")
+ } else {
+ log.Debugf("Consumer closed....")
+ }
}
handler.SetShutdownFlag()
}()
done := make(chan bool)
go func() {
- handleShutdown(mockKafkaConsumer, interruptChannel, cancel, kafkaProducer)
+ handleShutdown([]*kafkacomm.KafkaConsumer{mockKafkaConsumer, nil}, interruptChannel, cancel, []*kafkacomm.KafkaProducer{kafkaProducer})
done <- true
}()
// Mock handleShutdown
interruptChannel := make(chan os.Signal, 1)
- handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, kp *kafkacomm.KafkaProducer) {
+ handleShutdownFunc = func(consumers []*kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, producers []*kafkacomm.KafkaProducer) {
interruptChannel <- os.Interrupt
cancel()
}
done := make(chan bool)
go func() {
- handleShutdown(mockKafkaConsumer, interruptChannel, cancel, kafkaProducer)
+ handleShutdown([]*kafkacomm.KafkaConsumer{mockKafkaConsumer}, interruptChannel, cancel, []*kafkacomm.KafkaProducer{kafkaProducer})
done <- true
}()
}
+// TestHandlePatchMessages verifies that patch messages are consumed and dispatched
+func TestHandlePatchMessages(t *testing.T) {
+ message := `{"patchInfos":[{"Path":["node","testcell","testconsistency","testmaxPCI"],"Op":0,"Value":6778}]}`
+ mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
+ expectedError := error(nil)
+
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+ mockConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockKafkaConsumer,
+ }
+
+ ctx := context.Background()
+ handlePatchMessages(ctx, mockConsumer)
+
+}
+
// Test to simulate a Kafka initialization failure in the main function.
func TestMain_KafkaInitializationFailure(t *testing.T) {
startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
// Test to validate the main function's handling of shutdown signals.
func TestMain_HandleShutdownWithSignals(t *testing.T) {
- handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, producer *kafkacomm.KafkaProducer) {
+ handleShutdownFunc = func(consumers []*kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc, producers []*kafkacomm.KafkaProducer) {
go func() {
interruptChan <- os.Interrupt // Simulate SIGTERM
}()
t.Run("Kafka consumer creation failure", func(t *testing.T) {
originalNewKafkaConsumer := NewKafkaConsumer
originalGetKafkaProducer := GetKafkaProducer
- NewKafkaConsumer = func() (*kafkacomm.KafkaConsumer, error) {
+ NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
return nil, errors.New("Kafka consumer creation error")
}
t.Run("Kafka producer creation failure", func(t *testing.T) {
originalNewKafkaConsumer := NewKafkaConsumer
originalGetKafkaProducer := GetKafkaProducer
- NewKafkaConsumer = func() (*kafkacomm.KafkaConsumer, error) {
+ NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
return mockConsumer, nil
}
t.Run("Kafka consumer and producer creation success", func(t *testing.T) {
originalNewKafkaConsumer := NewKafkaConsumer
originalGetKafkaProducer := GetKafkaProducer
- NewKafkaConsumer = func() (*kafkacomm.KafkaConsumer, error) {
+ NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
return mockConsumer, nil
}
GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
}()
done := make(chan bool)
go func() {
- handleShutdown(nil, interruptChannel, cancel, kafkaProducer) // Pass nil as kc
+ handleShutdown(nil, interruptChannel, cancel, []*kafkacomm.KafkaProducer{kafkaProducer}) // Pass nil as kc
done <- true
}()
shutdownHTTPServer(server)
assert.True(t, true, "Shutdown error")
}
+
+// Test to verify that both the Kafka consumer and producer start successfully
+func TestPatchStartKafkaAndProdSuccess(t *testing.T) {
+ t.Run("Kafka consumer and producer creation success", func(t *testing.T) {
+ originalNewKafkaConsumer := NewKafkaConsumer
+ originalGetKafkaProducer := GetKafkaProducer
+ NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
+ return mockConsumer, nil
+ }
+ GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+ return mockProducer, nil
+ }
+
+ // Call the function under test
+ consumer, producer, err := startPatchKafkaConsAndProd()
+
+ // Assertions
+ assert.NoError(t, err)
+ assert.NotNil(t, consumer)
+ assert.NotNil(t, producer)
+
+ NewKafkaConsumer = originalNewKafkaConsumer
+ GetKafkaProducer = originalGetKafkaProducer
+ })
+}
+
+// Test to verify error scenarios during Kafka consumer and producer start
+func TestPatchStartKafkaAndProdFailure(t *testing.T) {
+ t.Run("Kafka consumer creation failure", func(t *testing.T) {
+ originalNewKafkaConsumer := NewKafkaConsumer
+ originalGetKafkaProducer := GetKafkaProducer
+ defer func() {
+ NewKafkaConsumer = originalNewKafkaConsumer
+ GetKafkaProducer = originalGetKafkaProducer
+ }()
+
+ NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
+ return nil, errors.New("consumer creation failed")
+ }
+ GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+ return mockProducer, nil
+ }
+
+ consumer, producer, err := startPatchKafkaConsAndProd()
+
+ assert.Error(t, err)
+ assert.Nil(t, consumer)
+ assert.Nil(t, producer)
+ })
+
+ t.Run("Kafka producer creation failure", func(t *testing.T) {
+ originalNewKafkaConsumer := NewKafkaConsumer
+ originalGetKafkaProducer := GetKafkaProducer
+ defer func() {
+ NewKafkaConsumer = originalNewKafkaConsumer
+ GetKafkaProducer = originalGetKafkaProducer
+ }()
+
+ NewKafkaConsumer = func(topic string, groupId string) (*kafkacomm.KafkaConsumer, error) {
+ return mockConsumer, nil
+ }
+ GetKafkaProducer = func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+ return nil, errors.New("producer creation failed")
+ }
+
+ consumer, producer, err := startPatchKafkaConsAndProd()
+
+ assert.Error(t, err)
+ assert.Nil(t, consumer)
+ assert.Nil(t, producer)
+ })
+}
// Variables:
//
-// LogFilePath - The file path for the log file.
-// LogMaxSize - The maximum size of the log file in megabytes.
-// LogMaxBackups - The maximum number of backup log files to retain.
-// OpasdkConfigPath - The file path for the OPA SDK configuration.
-// Opa - The file path for the OPA binary.
-// BuildBundle - The command to build the bundle.
-// Policies - The directory path for policies.
-// Data - The directory path for policy data.
-// DataNode - The directory path for policy data with node.
-// Output - The output flag for bundle commands.
-// BundleTarGz - The name of the bundle tar.gz file.
-// BundleTarGzFile - The file path for the bundle tar.gz file.
-// PdpGroup - The default PDP group.
-// PdpType - The type of PDP.
-// ServerPort - The port on which the server listens.
-// ServerWaitUpTime - The time to wait for the server to be up, in seconds.
-// ShutdownWaitTime - The time to wait for the server to shut down, in seconds.
-// V1Compatible - The flag for v1 compatibility.
-// LatestVersion - The Version set in response for decision
-// MinorVersion - The Minor version set in response header for decision
-// PatchVersion - The Patch Version set in response header for decison
-// OpaPdpUrl - The Healthcheck url for response
-// HealtCheckStatus - The bool flag for Healthy field in HealtCheck response
-// OkCode - The Code for HealthCheck response
-// HealthCheckMessage - The Healtcheck Message
-// DefaultHeartbeatMS - The default interval for heartbeat signals in milliseconds.
-// SingleHierarchy - The Counter indicates the length of datakey path
-// PolicyVersion - constant declared for policy-version
-// PolicyID - constant declared for policy-id
-// RequestId - constant declared for ONAP Request-ID
-// MaxOutputResponseLength - constant declared for maximum length of output in response message
+// LogFilePath - The file path for the log file.
+// LogMaxSize - The maximum size of the log file in megabytes.
+// LogMaxBackups - The maximum number of backup log files to retain.
+// OpasdkConfigPath - The file path for the OPA SDK configuration.
+// Opa - The file path for the OPA binary.
+// BuildBundle - The command to build the bundle.
+// Policies - The directory path for policies.
+// Data - The directory path for policy data.
+// DataNode - The directory path for policy data with node.
+// Output - The output flag for bundle commands.
+// BundleTarGz - The name of the bundle tar.gz file.
+// BundleTarGzFile - The file path for the bundle tar.gz file.
+// PdpGroup - The default PDP group.
+// PdpType - The type of PDP.
+// ServerPort - The port on which the server listens.
+// ServerWaitUpTime - The time to wait for the server to be up, in seconds.
+// ShutdownWaitTime - The time to wait for the server to shut down, in seconds.
+// V1Compatible - The flag for v1 compatibility.
+// LatestVersion - The Version set in response for decision
+// MinorVersion - The Minor version set in response header for decision
+// PatchVersion - The Patch Version set in response header for decision
+// OpaPdpUrl - The Healthcheck url for response
+// HealtCheckStatus - The bool flag for Healthy field in HealthCheck response
+// OkCode - The Code for HealthCheck response
+// HealthCheckMessage - The Healthcheck Message
+// DefaultHeartbeatMS - The default interval for heartbeat signals in milliseconds.
+// SingleHierarchy - The counter indicating the length of the data key path
+// PolicyVersion - constant declared for policy-version
+// PolicyID - constant declared for policy-id
+// RequestId - constant declared for ONAP Request-ID
+// MaxOutputResponseLength - constant declared for maximum length of output in response message
+// ContentType - constant for response Content Type
var (
- LogFilePath = "/var/logs/logs.log"
- LogMaxSize = 10
- LogMaxBackups = 3
- OpasdkConfigPath = "/app/config/config.json"
- Opa = "/app/opa"
- BuildBundle = "build"
- Policies = "/opt/policies"
- Data = "/opt/data"
- DataNode = "/opt/data/node"
- Output = "-o"
- BundleTarGz = "bundle.tar.gz"
- BundleTarGzFile = "/app/bundles/bundle.tar.gz"
- PdpGroup = "opaGroup"
- PdpType = "opa"
- ServerPort = ":8282"
- ServerWaitUpTime = 5
- ShutdownWaitTime = 5
- V1Compatible = "--v1-compatible"
- LatestVersion = "1.0.0"
- MinorVersion = "0"
- PatchVersion = "0"
- OpaPdpUrl = "self"
- HealtCheckStatus = true
- OkCode = int32(200)
- HealthCheckMessage = "alive"
- DefaultHeartbeatMS = 60000
- SingleHierarchy = 4
- PolicyVersion = "policy-version"
- PolicyId = "policy-id"
- RequestId = "X-ONAP-RequestID"
+ LogFilePath = "/var/logs/logs.log"
+ LogMaxSize = 10
+ LogMaxBackups = 3
+ OpasdkConfigPath = "/app/config/config.json"
+ Opa = "/app/opa"
+ BuildBundle = "build"
+ Policies = "/opt/policies"
+ Data = "/opt/data"
+ DataNode = "/opt/data/node"
+ Output = "-o"
+ BundleTarGz = "bundle.tar.gz"
+ BundleTarGzFile = "/app/bundles/bundle.tar.gz"
+ PdpGroup = "opaGroup"
+ PdpType = "opa"
+ ServerPort = ":8282"
+ ServerWaitUpTime = 5
+ ShutdownWaitTime = 5
+ V1Compatible = "--v1-compatible"
+ LatestVersion = "1.0.0"
+ MinorVersion = "0"
+ PatchVersion = "0"
+ OpaPdpUrl = "self"
+ HealtCheckStatus = true
+ OkCode = int32(200)
+ HealthCheckMessage = "alive"
+ DefaultHeartbeatMS = 60000
+ SingleHierarchy = 4
+ PolicyVersion = "policy-version"
+ PolicyId = "policy-id"
+ RequestId = "X-ONAP-RequestID"
MaxOutputResponseLength = 200
+ ContentType = "Content-Type"
+ ApplicationJson = "application/json"
)
"github.com/open-policy-agent/opa/storage"
"net/http"
"path/filepath"
+ "policy-opa-pdp/cfg"
"policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/kafkacomm"
+ "policy-opa-pdp/pkg/kafkacomm/publisher"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/metrics"
"policy-opa-pdp/pkg/model/oapicodegen"
type (
checkIfPolicyAlreadyExistsFunc func(policyId string) bool
+ validateRequestFunc func(requestBody *oapicodegen.OPADataUpdateRequest) error
)
var (
checkIfPolicyAlreadyExistsVar checkIfPolicyAlreadyExistsFunc = policymap.CheckIfPolicyAlreadyExists
getPolicyByIDVar = getPolicyByID
extractPatchInfoVar = extractPatchInfo
+ bootstrapServers = cfg.BootstrapServer //The Kafka bootstrap server address.
+ PatchProducer kafkacomm.KafkaProducerInterface
+ patchTopic = cfg.PatchTopic
+ PatchDataVar = PatchData
+ getOperationTypeVar = getOperationType
)
// creates a response code map to OPADataUpdateResponse
// writes a Error JSON response to the HTTP response writer for OPADataUpdate
func writeOPADataUpdateErrorJSONResponse(res http.ResponseWriter, status int, errorDescription string, dataErrorRes oapicodegen.ErrorResponse) {
- res.Header().Set("Content-Type", "application/json")
+ res.Header().Set(consts.ContentType, consts.ApplicationJson)
res.WriteHeader(status)
if err := json.NewEncoder(res).Encode(dataErrorRes); err != nil {
http.Error(res, err.Error(), status)
}
func patchHandler(res http.ResponseWriter, req *http.Request) {
+ var useKafkaForPatch = cfg.UseKafkaForPatch
log.Infof("PDP received a request to update data through API")
constructResponseHeader(res, req)
var requestBody oapicodegen.OPADataUpdateRequest
- if err := json.NewDecoder(req.Body).Decode(&requestBody); err != nil {
- errMsg := "Error in decoding the request data - " + err.Error()
+
+ requestBody, err := decodeRequest(req)
+ if err != nil {
+ sendErrorResponse(res, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ dataDir, dirParts := extractDataDir(req)
+
+ if err := validateRequest(&requestBody); err != nil {
+ sendErrorResponse(res, err.Error(), http.StatusBadRequest)
+ return
+ }
+ log.Debug("All fields are valid!")
+ // Access the data part
+ data := requestBody.Data
+ log.Infof("data : %s", data)
+ policyId := requestBody.PolicyName
+ if policyId == nil {
+ errMsg := "Policy Id is nil"
+ sendErrorResponse(res, errMsg, http.StatusBadRequest)
+ return
+ }
+ log.Infof("policy name : %s", *policyId)
+ isExists := policymap.CheckIfPolicyAlreadyExists(*policyId)
+ if !isExists {
+		errMsg := "Policy associated with the patch request does not exist"
sendErrorResponse(res, errMsg, http.StatusBadRequest)
log.Errorf(errMsg)
return
}
- path := strings.TrimPrefix(req.URL.Path, "/policy/pdpo/v1/data")
- dirParts := strings.Split(path, "/")
- dataDir := filepath.Join(dirParts...)
- log.Infof("dataDir : %s", dataDir)
-
- // Validate the request
- validationErrors := utils.ValidateOPADataRequest(&requestBody)
- // Validate Data field (ensure it's not nil and has items)
- if !(utils.IsValidData(requestBody.Data)) {
- validationErrors = append(validationErrors, "Data is required and cannot be empty")
+ matchFound := validatePolicyDataPathMatched(dirParts, *policyId, res)
+ if !matchFound {
+ return
}
- // Print validation errors
- if len(validationErrors) > 0 {
- errMsg := strings.Join(validationErrors, ", ")
- log.Errorf("Facing validation error in requestbody - %s", errMsg)
- sendErrorResponse(res, errMsg, http.StatusBadRequest)
+ patchInfos, err := getPatchInfo(requestBody.Data, dataDir, res)
+ if err != nil {
+		log.Warnf("Failed to get patch info: %v", err)
return
- } else {
- log.Debug("All fields are valid!")
- // Access the data part
- data := requestBody.Data
- log.Infof("data : %s", data)
- policyId := requestBody.PolicyName
- if policyId == nil {
- errMsg := "Policy Id is nil"
- sendErrorResponse(res, errMsg, http.StatusBadRequest)
- return
- }
- log.Infof("policy name : %s", *policyId)
- isExists := policymap.CheckIfPolicyAlreadyExists(*policyId)
- if !isExists {
- errMsg := "Policy associated with the patch request does not exists"
- sendErrorResponse(res, errMsg, http.StatusBadRequest)
- log.Errorf(errMsg)
+ }
+
+ if useKafkaForPatch {
+ err := handleDynamicUpdateRequestWithKafka(patchInfos, res)
+ if err != nil {
+			log.Warnf("Error in handling dynamic update request with kafka: %v", err)
return
}
+ res.Header().Set(consts.ContentType, consts.ApplicationJson)
+ res.WriteHeader(http.StatusAccepted)
+		_, _ = res.Write([]byte(`{"message": "Patch request accepted for processing via kafka. Use the get data url to fetch the latest data. In case of errors, check the logs.", "uri": "/policy/pdpo/v1/data/"}`))
+ metrics.IncrementDynamicDataUpdateSuccessCount()
+ return
+ }
+ if err := PatchData(patchInfos, res); err != nil {
+ // Handle the error, for example, log it or return an appropriate response
+ log.Errorf("Error encoding JSON response: %s", err)
+ return
+
+ }
+ metrics.IncrementDynamicDataUpdateSuccessCount()
- matchFound := validatePolicyDataPathMatched(dirParts, *policyId, res)
+}
- if matchFound {
- if err := patchData(dataDir, data, res); err != nil {
- // Handle the error, for example, log it or return an appropriate response
- log.Errorf("Error encoding JSON response: %s", err)
- }
- }
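+// decodeRequest parses the JSON request body into an OPADataUpdateRequest.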
+func decodeRequest(req *http.Request) (oapicodegen.OPADataUpdateRequest, error) {
+ var requestBody oapicodegen.OPADataUpdateRequest
+ if err := json.NewDecoder(req.Body).Decode(&requestBody); err != nil {
+ return requestBody, fmt.Errorf("Error in decoding request data: %v", err)
+ }
+ return requestBody, nil
+}
+
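+// extractDataDir derives the target data directory and its path segments from the request URL.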
+func extractDataDir(req *http.Request) (string, []string) {
+ path := strings.TrimPrefix(req.URL.Path, "/policy/pdpo/v1/data")
+ dirParts := strings.Split(path, "/")
+ return filepath.Join(dirParts...), dirParts
+}
+
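+// validateRequest validates the OPADataUpdateRequest, including that the Data payload is present and non-empty.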
+func validateRequest(requestBody *oapicodegen.OPADataUpdateRequest) error {
+ validationErrors := utils.ValidateOPADataRequest(requestBody)
+ if !utils.IsValidData(requestBody.Data) {
+ validationErrors = append(validationErrors, "Data is required and cannot be empty")
+ }
+ if len(validationErrors) > 0 {
+		return fmt.Errorf("%s", strings.Join(validationErrors, ", "))
+ }
+ return nil
+}
+
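+// getPatchInfo normalizes the data directory into a root path and extracts the patch operations from the request data.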
+func getPatchInfo(data *[]map[string]interface{}, dataDir string, res http.ResponseWriter) ([]opasdk.PatchImpl, error) {
+ root := "/" + strings.Trim(dataDir, "/")
+ patchInfos, err := extractPatchInfoVar(res, data, root)
+ if patchInfos == nil || err != nil {
+		return nil, fmt.Errorf("Error in extracting patch info: %v", err)
}
+ return patchInfos, nil
+}
+
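+// handleDynamicUpdateRequestWithKafka publishes the extracted patch operations
+// to the patch topic, so they are applied asynchronously by the patch message consumer.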
+func handleDynamicUpdateRequestWithKafka(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) error {
+	if PatchProducer == nil {
+		log.Warnf("Failed to initialize Kafka producer")
+		return fmt.Errorf("Failed to initialize Kafka producer")
+	}
+ sender := &publisher.RealPatchSender{
+ Producer: PatchProducer,
+ }
+ if err := sender.SendPatchMessage(patchInfos); err != nil {
+		log.Warnf("Failed to send Patch Message: %v", err)
+ return err
+ }
+
+ return nil
}
func DataHandler(res http.ResponseWriter, req *http.Request) {
optypeString, opTypeErr := op["op"].(string)
if !opTypeErr {
opTypeErrMsg := "Error in getting op type. Op type is not given in request body"
- sendErrorResponse(res, opTypeErrMsg, http.StatusInternalServerError)
+ sendErrorResponse(res, opTypeErrMsg, http.StatusBadRequest)
log.Errorf(opTypeErrMsg)
return nil, fmt.Errorf("Error in getting op type. Op type is not given in request body")
}
- opType, err := getOperationType(optypeString, res)
+ opType, err := getOperationTypeVar(optypeString, res)
if err != nil {
log.Warnf("Error in getting opType: %v", err)
value, valueErr = op["value"]
if !valueErr || isEmpty(value) {
valueErrMsg := "Error in getting data value. Value is not given in request body"
- sendErrorResponse(res, valueErrMsg, http.StatusInternalServerError)
+ sendErrorResponse(res, valueErrMsg, http.StatusBadRequest)
log.Errorf(valueErrMsg)
return nil, fmt.Errorf("Error in getting data value. Value is not given in request body")
}
}
} else {
valueErrMsg := "Error in getting data path - Invalid path (/) is used."
- sendErrorResponse(res, valueErrMsg, http.StatusInternalServerError)
+ sendErrorResponse(res, valueErrMsg, http.StatusBadRequest)
log.Errorf(valueErrMsg)
return nil
}
opPath, opPathErr := op["path"].(string)
if !opPathErr || len(opPath) == 0 {
opPathErrMsg := "Error in getting data path. Path is not given in request body"
- sendErrorResponse(res, opPathErrMsg, http.StatusInternalServerError)
+ sendErrorResponse(res, opPathErrMsg, http.StatusBadRequest)
log.Errorf(opPathErrMsg)
return nil
}
var NewOpaSDKPatch NewOpaSDKPatchFunc = opasdk.PatchData
-func patchData(root string, ops *[]map[string]interface{}, res http.ResponseWriter) (err error) {
- root = "/" + strings.Trim(root, "/")
- patchInfos, err := extractPatchInfoVar(res, ops, root)
- if err != nil {
- log.Warnf("Failed to extarct Patch Info")
- return err
- }
-
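+// PatchData applies the given patch operations through the OPA SDK; res may be
+// nil when the call originates from the Kafka patch consumer.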
+func PatchData(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) (err error) {
if patchInfos != nil {
patchErr := NewOpaSDKPatch(context.Background(), patchInfos)
if patchErr != nil {
errCode = http.StatusNotFound
}
errMsg := "Error in updating data - " + patchErr.Error()
- sendErrorResponse(res, errMsg, errCode)
+ if res != nil {
+ sendErrorResponse(res, errMsg, errCode)
+ }
log.Errorf(errMsg)
return patchErr
}
log.Infof("Updated the data in the corresponding path successfully\n")
- res.WriteHeader(http.StatusNoContent)
+ if res != nil {
+ res.WriteHeader(http.StatusNoContent)
+ }
}
// handled all error scenarios in extractPatchInfo method
return nil
func sendErrorResponse(res http.ResponseWriter, errMsg string, statusCode int) {
dataExc := createOPADataUpdateExceptionResponse(statusCode, errMsg, "")
+ metrics.IncrementDynamicDataUpdateFailureCount()
metrics.IncrementTotalErrorCount()
writeOPADataUpdateErrorJSONResponse(res, statusCode, errMsg, *dataExc)
}
dataResponse.Data = data
}
- res.Header().Set("Content-Type", "application/json")
+ res.Header().Set(consts.ContentType, consts.ApplicationJson)
res.WriteHeader(http.StatusOK)
if err := json.NewEncoder(res).Encode(dataResponse); err != nil {
"encoding/json"
"errors"
"fmt"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
openapi_types "github.com/oapi-codegen/runtime/types"
+ "github.com/open-policy-agent/opa/v1/storage"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
data = nil
root := "/test"
res := httptest.NewRecorder()
- result := patchData(root, &data, res)
+ patchImpl, _ := extractPatchInfo(res, &data, root)
+ result := PatchData(patchImpl, res)
assert.Nil(t, result)
}
root := "/test"
res := httptest.NewRecorder()
- result := patchData(root, &data, res)
+ patchImpl, _ := extractPatchInfo(res, &data, root)
+ result := PatchData(patchImpl, res)
assert.Equal(t, http.StatusNotFound, res.Code)
assert.Error(t, result)
}
root := "/test"
res := httptest.NewRecorder()
extractPatchInfo(res, &data, root)
- assert.Equal(t, http.StatusInternalServerError, res.Code)
+ assert.Equal(t, http.StatusBadRequest, res.Code)
}
func Test_extractPatchInfo_Pathfail(t *testing.T) {
root := "/test"
res := httptest.NewRecorder()
extractPatchInfo(res, &data, root)
- assert.Equal(t, http.StatusInternalServerError, res.Code)
+ assert.Equal(t, http.StatusBadRequest, res.Code)
}
func Test_extractPatchInfo_valuefail(t *testing.T) {
root := "/test"
res := httptest.NewRecorder()
extractPatchInfo(res, &data, root)
- assert.Equal(t, http.StatusInternalServerError, res.Code)
+ assert.Equal(t, http.StatusBadRequest, res.Code)
}
func TestPatchData_success(t *testing.T) {
root := "/test"
res := httptest.NewRecorder()
- patchData(root, &data, res)
+ patchImpl, _ := extractPatchInfo(res, &data, root)
+ PatchData(patchImpl, res)
assert.Equal(t, http.StatusNoContent, res.Code)
}
expectedStatus: http.StatusInternalServerError,
expectedBody: "Error in getting data - internal server failure",
},
+ {
+			name:           "Error - JSON Encoding Failure",
+			requestURL:     "/policy/pdpo/v1/data/bad/json",
+ mockResponse: map[string]interface{}{"bad": make(chan int)},
+ mockError: nil,
+ expectedStatus: http.StatusInternalServerError,
+ expectedBody: "Error in getting data - json: unsupported type: chan int",
+ },
}
for _, tt := range tests {
return policy, nil
}
- // Simulating HTTP request and response recorder
- // req := httptest.NewRequest("GET", "/policy/test-policy", nil)
res := httptest.NewRecorder()
// Processing dirParts
t.Errorf("expected patchInfos to be nil, got: %v", patchInfos)
}
}
+
+func TestHandleDynamicUpdateRequestWithKafka_KafkaDisabled_Error(t *testing.T) {
+ PatchDataVar = func(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) error {
+ return errors.New("mock error")
+ }
+
+ req := httptest.NewRecorder()
+ patchInfos := []opasdk.PatchImpl{{}}
+
+ handleDynamicUpdateRequestWithKafka(patchInfos, req)
+ // Optionally assert on req.Body or req.Code if needed
+}
+
+// --- Sample PatchImpl for testing ---
+func samplePatchData() []opasdk.PatchImpl {
+ return []opasdk.PatchImpl{
+ {
+ Path: storage.MustParsePath("/policy/config/name"),
+ Op: storage.ReplaceOp,
+ Value: "NewPolicyName",
+ },
+ }
+}
+
+var originalExtractPatchInfoVar = extractPatchInfoVar
+
+func TestGetPatchInfo_Success(t *testing.T) {
+ defer func() { extractPatchInfoVar = originalExtractPatchInfoVar }()
+
+ mockPatch := samplePatchData()
+ extractPatchInfoVar = func(res http.ResponseWriter, data *[]map[string]interface{}, root string) ([]opasdk.PatchImpl, error) {
+ return mockPatch, nil
+ }
+
+ res := httptest.NewRecorder()
+
+ data := &[]map[string]interface{}{{"key": "val"}}
+ patches, err := getPatchInfo(data, "/test/dir", res)
+
+ assert.NoError(t, err)
+ assert.Equal(t, mockPatch, patches)
+}
+func TestGetPatchInfo_NilPatchInfos(t *testing.T) {
+ defer func() { extractPatchInfoVar = originalExtractPatchInfoVar }()
+
+ extractPatchInfoVar = func(res http.ResponseWriter, data *[]map[string]interface{}, root string) ([]opasdk.PatchImpl, error) {
+ return nil, nil
+ }
+
+ res := httptest.NewRecorder()
+
+ data := &[]map[string]interface{}{{"key": "val"}}
+ patches, err := getPatchInfo(data, "/test/dir", res)
+
+ assert.Error(t, err)
+ assert.Nil(t, patches)
+}
+func TestGetPatchInfo_ExtractError(t *testing.T) {
+ defer func() { extractPatchInfoVar = originalExtractPatchInfoVar }()
+
+ extractPatchInfoVar = func(res http.ResponseWriter, data *[]map[string]interface{}, root string) ([]opasdk.PatchImpl, error) {
+ return nil, fmt.Errorf("mock error")
+ }
+
+ data := &[]map[string]interface{}{{"key": "val"}}
+ res := httptest.NewRecorder()
+
+ patches, err := getPatchInfo(data, "/test/dir", res)
+
+ assert.Error(t, err)
+ assert.Nil(t, patches)
+}
+
+func TestHandleDynamicUpdateRequestWithKafka_KafkaDisabled_Success(t *testing.T) {
+ // Set test version of PatchDataVar
+ var patchCalled bool
+ PatchDataVar = func(patchInfos []opasdk.PatchImpl, res http.ResponseWriter) error {
+ patchCalled = true
+ return nil
+ }
+
+ req := httptest.NewRecorder()
+ patchInfos := []opasdk.PatchImpl{{}}
+
+ handleDynamicUpdateRequestWithKafka(patchInfos, req)
+
+ if patchCalled {
+		t.Errorf("Expected PatchData not to be called")
+ }
+}
+
+// MockKafkaProducer implements kafkacomm.KafkaProducerInterface.
+type MockKafkaProducer struct {
+ ProduceCalled bool
+ ProducedMsg *kafka.Message
+ ProduceErr error
+ CloseCalled bool
+ FlushCalled bool
+ FlushTimeout int
+}
+
+func (m *MockKafkaProducer) Produce(msg *kafka.Message, events chan kafka.Event) error {
+ m.ProduceCalled = true
+ m.ProducedMsg = msg
+ return m.ProduceErr
+}
+
+func (m *MockKafkaProducer) Close() { m.CloseCalled = true }
+
+func (m *MockKafkaProducer) Flush(timeout int) int {
+ m.FlushCalled = true
+ m.FlushTimeout = timeout
+ return 0
+}
+
+// Test successful Produce through the interface
+func TestHandleDynamicUpdateRequestWithKafka_ProduceSuccess(t *testing.T) {
+ // Arrange
+ patches := samplePatchData()
+ mockProd := &MockKafkaProducer{}
+ PatchProducer = mockProd
+
+ resp := httptest.NewRecorder()
+
+ // Act
+ err := handleDynamicUpdateRequestWithKafka(patches, resp)
+
+ // Assert
+ assert.NoError(t, err)
+ assert.True(t, mockProd.ProduceCalled, "expected Produce to be called")
+
+}
+
+// Test that a nil producer interface returns an initialization error
+func TestHandleDynamicUpdateRequestWithKafka_ProducerNil(t *testing.T) {
+ // Arrange: clear the global producer
+ PatchProducer = nil
+
+ // Act
+ err := handleDynamicUpdateRequestWithKafka(nil, httptest.NewRecorder())
+
+ // Assert
+ assert.EqualError(t, err, "Failed to initialize Kafka producer")
+}
+
+// Test that a Produce error is propagated
+func TestHandleDynamicUpdateRequestWithKafka_ProduceError(t *testing.T) {
+ // Arrange
+ mockProd := &MockKafkaProducer{ProduceErr: errors.New("produce failed")}
+ PatchProducer = mockProd
+
+ // Act
+ err := handleDynamicUpdateRequestWithKafka(nil, httptest.NewRecorder())
+
+ // Assert
+ assert.EqualError(t, err, "produce failed")
+ assert.True(t, mockProd.ProduceCalled, "Produce should be called even on error")
+}
+
+type errorWriter struct{}
+
+func (e *errorWriter) Header() http.Header {
+ return http.Header{}
+}
+
+func (e *errorWriter) Write([]byte) (int, error) {
+ return 0, errors.New("write error")
+}
+
+func (e *errorWriter) WriteHeader(statusCode int) {}
+
+func TestWriteOPADataUpdateErrorJSONResponse_EncodeFails(t *testing.T) {
+ mockRes := &errorWriter{}
+
+ respMessage := "Failed to process"
+ respCode := oapicodegen.ErrorResponseResponseCode("500")
+ errorResp := oapicodegen.ErrorResponse{
+ ErrorMessage: &respMessage,
+ ResponseCode: &respCode,
+ }
+
+ // Call the function with the mock writer that fails on encode
+ writeOPADataUpdateErrorJSONResponse(mockRes, http.StatusInternalServerError, "fail", errorResp)
+
+}
+
+func TestConstructPath_BadPatchPath(t *testing.T) {
+ rec := httptest.NewRecorder()
+ storagePath := constructPath("???", "add", "/root", rec)
+
+ assert.NotNil(t, storagePath)
+ assert.Equal(t, http.StatusOK, rec.Code)
+}
+
+func TestConstructPath_InvalidPath(t *testing.T) {
+ rec := httptest.NewRecorder()
+ storagePath := constructPath("", "add", "/root", rec)
+
+ assert.Nil(t, storagePath)
+ assert.Equal(t, http.StatusBadRequest, rec.Code)
+ assert.True(t, strings.Contains(rec.Body.String(), "Invalid path"))
+}
+func TestConstructPath_RootSlash(t *testing.T) {
+ rec := httptest.NewRecorder()
+ storagePath := constructPath("sub/path", "add", "/", rec)
+
+ assert.NotNil(t, storagePath)
+ assert.Equal(t, "/sub/path", storagePath.String())
+}
+
+func TestGetDataInfo_EmptyDataPath(t *testing.T) {
+ // Backup original function
+ originalOpaSDKGetDataInfo := NewOpaSDK
+ NewOpaSDK = func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error) {
+ assert.Equal(t, "/", dataPath) // Ensure "/" is passed
+ return nil, errors.New("storage_not_found_error")
+ }
+ defer func() { NewOpaSDK = originalOpaSDKGetDataInfo }()
+
+ // Create a mock request with empty data path
+ req := httptest.NewRequest("GET", "/policy/pdpo/v1/data", nil)
+ res := httptest.NewRecorder()
+
+ // Call the function under test
+ getDataInfo(res, req)
+
+ // Validate response
+ assert.Equal(t, http.StatusNotFound, res.Code)
+ errorMessage := strings.TrimSpace(res.Body.String())
+ assert.Contains(t, errorMessage, "storage_not_found_error")
+}
+
+func TestDataHandler_GET_Success(t *testing.T) {
+ original := NewOpaSDK
+ defer func() { NewOpaSDK = original }()
+
+ NewOpaSDK = func(ctx context.Context, dataPath string) (data *oapicodegen.OPADataResponse_Data, err error) {
+ assert.Equal(t, "/some/path", dataPath)
+ return &oapicodegen.OPADataResponse_Data{}, nil
+ }
+
+ req := httptest.NewRequest(http.MethodGet, "/policy/pdpo/v1/data/some/path", nil)
+ w := httptest.NewRecorder()
+
+	DataHandler(w, req)
+
+ res := w.Result()
+ defer res.Body.Close()
+
+ if res.StatusCode != http.StatusOK {
+ t.Errorf("expected status 200 OK, got %d", res.StatusCode)
+ }
+}
+
+func TestExtractPatchInfo_OperationTypeError(t *testing.T) {
+ // Arrange
+ reqOps := []map[string]interface{}{
+ {
+ "op": "invalidOp", // simulate invalid op
+ },
+ }
+ w := httptest.NewRecorder()
+
+ // Mock
+ original := getOperationTypeVar
+ defer func() { getOperationTypeVar = original }()
+ getOperationTypeVar = func(opType string, res http.ResponseWriter) (*storage.PatchOp, error) {
+ return nil, fmt.Errorf("forced error") // force error
+ }
+
+ // Act
+ result, err := extractPatchInfo(w, &reqOps, "/root")
+
+ // Assert
+ assert.Nil(t, result)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "operation type")
+}
+
+func TestExtractPatchInfo_InvalidOpFieldType(t *testing.T) {
+ // Arrange
+ reqOps := []map[string]interface{}{
+ {
+ "wrongField": "add", // no "op" field
+ },
+ }
+ w := httptest.NewRecorder()
+
+ // Act
+ result, err := extractPatchInfo(w, &reqOps, "/root")
+
+ // Assert
+ assert.Nil(t, result)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "op type")
+}
+
+func TestExtractPatchInfo_GetOperationTypeError(t *testing.T) {
+ // Arrange
+ reqOps := []map[string]interface{}{
+ {
+ "op": "invalidOp",
+ },
+ }
+ w := httptest.NewRecorder()
+
+ // Mock getOperationTypeVar to simulate error
+ original := getOperationTypeVar
+ defer func() { getOperationTypeVar = original }()
+ getOperationTypeVar = func(opType string, res http.ResponseWriter) (*storage.PatchOp, error) {
+ return nil, errors.New("mock getOperationType error")
+ }
+
+ // Act
+ result, err := extractPatchInfo(w, &reqOps, "/root")
+
+ // Assert
+ assert.Nil(t, result)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "operation type")
+}
+
+func TestExtractPatchInfo_NilOpType(t *testing.T) {
+ // Arrange
+ reqOps := []map[string]interface{}{
+ {
+ "op": "add",
+ },
+ }
+ w := httptest.NewRecorder()
+
+ // Mock getOperationTypeVar to return nil
+ original := getOperationTypeVar
+ defer func() { getOperationTypeVar = original }()
+ getOperationTypeVar = func(opType string, res http.ResponseWriter) (*storage.PatchOp, error) {
+ return nil, nil // returning nil without error
+ }
+
+ // Act
+ result, err := extractPatchInfo(w, &reqOps, "/root")
+
+ // Assert
+ assert.Nil(t, result)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "opType is Missing")
+}
--- /dev/null
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2025: Deutsche Telekom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
+//
+
+package handler
+
+import (
+ "context"
+ "encoding/json"
+ "policy-opa-pdp/pkg/data"
+ "policy-opa-pdp/pkg/kafkacomm"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/opasdk"
+)
+
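+// PatchMessage is the JSON envelope exchanged on the patch topic.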
+type PatchMessage struct {
+ PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
+}
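+
+// An illustrative payload on the patch topic, mirroring the message used in
+// the unit tests (values are sample data):
+//
+//	{"patchInfos":[{"Path":["node","testcell","testconsistency","testmaxPCI"],"Op":0,"Value":6778}]}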
+
+// PatchMessageHandler consumes incoming Kafka messages and dispatches them further for data patch processing.
+func PatchMessageHandler(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string) error {
+
+ log.Debug("Starting Patch Message Listener.....")
+	for {
+		select {
+		case <-ctx.Done():
+			log.Debug("Stopping Patch Message Listener.....")
+			return nil
+ default:
+ message, err := kafkacomm.ReadKafkaMessages(kc)
+ if err != nil {
+ continue
+ }
+ log.Debugf("[IN|KAFKA|%s]\n%s", topic, string(message))
+
+ if message != nil {
+ var patchMsg PatchMessage
+ err = json.Unmarshal(message, &patchMsg)
+ if err != nil {
+ log.Warnf("Failed to UnMarshal Messages: %v\n", err)
+ continue
+ }
+ log.Debugf("Received patch request")
+
+ if err := data.PatchDataVar(patchMsg.PatchInfos, nil); err != nil {
+ log.Debugf("patchData failed: %v", err)
+ } else {
+ log.Debugf("Successfully patched data")
+ }
+ }
+ }
+
+ }
+
+}
--- /dev/null
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2025: Deutsche Telekom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
+//
+
+package handler
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+ "github.com/open-policy-agent/opa/v1/storage"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "net/http"
+ "policy-opa-pdp/pkg/data"
+ "policy-opa-pdp/pkg/kafkacomm"
+ "policy-opa-pdp/pkg/opasdk"
+ "testing"
+ "time"
+)
+
+// --- Sample PatchImpl for testing ---
+func samplePatchData() []opasdk.PatchImpl {
+ return []opasdk.PatchImpl{
+ {
+ Path: storage.MustParsePath("/policy/config/name"),
+ Op: storage.ReplaceOp,
+ Value: "NewPolicyName",
+ },
+ }
+}
+
+var originalPatchDataVar = data.PatchDataVar
+
+func TestPatchMessageHandler_Success(t *testing.T) {
+ defer func() { data.PatchDataVar = originalPatchDataVar }()
+
+ // Mock PatchDataVar to simulate success
+ data.PatchDataVar = func(patchInfos []opasdk.PatchImpl, _ http.ResponseWriter) error {
+ return nil
+ }
+
+ msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()})
+
+ mockKafkaMessage := &kafka.Message{
+ Value: []byte(msgBytes),
+ }
+ mockConsumer := new(MockKafkaConsumer)
+ mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(mockKafkaMessage, nil)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+ defer cancel()
+
+ err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+ assert.NoError(t, err)
+ mockConsumer.AssertExpectations(t)
+}
+
+func TestPatchMessageHandler_PatchFail(t *testing.T) {
+ defer func() { data.PatchDataVar = originalPatchDataVar }()
+
+ data.PatchDataVar = func(patchInfos []opasdk.PatchImpl, _ http.ResponseWriter) error {
+ return errors.New("mock failure")
+ }
+
+ msgBytes, _ := json.Marshal(PatchMessage{PatchInfos: samplePatchData()})
+
+ mockKafkaMessage := &kafka.Message{
+ Value: []byte(msgBytes),
+ }
+
+ mockConsumer := new(MockKafkaConsumer)
+ mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(mockKafkaMessage, nil)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+ defer cancel()
+
+ err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+ assert.NoError(t, err)
+ mockConsumer.AssertExpectations(t)
+}
+
+func TestPatchMessageHandler_ReadError(t *testing.T) {
+ defer func() { data.PatchDataVar = originalPatchDataVar }()
+
+ mockConsumer := new(MockKafkaConsumer)
+ mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).
+ Return(nil, errors.New("read error"))
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{Consumer: mockConsumer}
+
+ ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+ defer cancel()
+
+ err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+ assert.NoError(t, err)
+ mockConsumer.AssertExpectations(t)
+}
+
+func TestPatchMessageHandler_UnmarshalFail(t *testing.T) {
+ defer func() { data.PatchDataVar = originalPatchDataVar }()
+
+ invalidJSON := []byte(`invalid json`)
+ mockKafkaMessage := &kafka.Message{Value: invalidJSON}
+
+ mockConsumer := new(MockKafkaConsumer)
+ mockConsumer.On("ReadMessage", mock.AnythingOfType("time.Duration")).Return(mockKafkaMessage, nil)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{Consumer: mockConsumer}
+
+ ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+ defer cancel()
+
+ err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+ assert.NoError(t, err)
+ mockConsumer.AssertExpectations(t)
+}
+
+func TestPatchMessageHandler_ContextDone(t *testing.T) {
+ mockConsumer := new(MockKafkaConsumer)
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{Consumer: mockConsumer}
+
+ // Context is cancelled immediately
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel()
+
+ err := PatchMessageHandler(ctx, mockKafkaConsumer, "patch-topic")
+ assert.NoError(t, err)
+}
)
type KafkaConsumerInterface interface {
- ReadMessage() ([]byte, error)
+	ReadMessage(time.Duration) (*kafka.Message, error)
ReadKafkaMessages() ([]byte, error)
}
mock.Mock
}
-func (m *MockKafkaConsumer) Unsubscribe() {
- m.Called()
+func (m *MockKafkaConsumer) Unsubscribe() error {
+ args := m.Called()
+ return args.Error(0)
}
-func (m *MockKafkaConsumer) Close() {
- m.Called()
+func (m *MockKafkaConsumer) Close() error {
+ args := m.Called()
+ return args.Error(0)
}
-func (m *MockKafkaConsumer) ReadMessage(kc *kafkacomm.KafkaConsumer) ([]byte, error) {
- args := m.Called(kc)
- return args.Get(0).([]byte), args.Error(0)
+func (m *MockKafkaConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+ args := m.Called(timeout)
+ msg := args.Get(0)
+ if msg == nil {
+ return nil, args.Error(1)
+ }
+ return msg.(*kafka.Message), args.Error(1)
}
func (m *MockKafkaConsumer) pdpUpdateMessageHandler(msg string) error {
return decodedData, keys, nil
}
-
// upsert policy to sdk.
func upsertPolicy(policy model.ToscaPolicy) error {
decodedContent, keys, _ := extractAndDecodePoliciesVar(policy)
"context"
"encoding/base64"
"errors"
+ "fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"policy-opa-pdp/pkg/utils"
"strings"
"testing"
- "fmt"
)
func TestValidatePackageName(t *testing.T) {
handlePdpUpdateDeploymentVar handlePdpUpdateDeploymentFunc = handlePdpUpdateDeployment
handlePdpUpdateUndeploymentVar handlePdpUpdateUndeploymentFunc = handlePdpUpdateUndeployment
sendPDPStatusResponseFunc = sendPDPStatusResponse
-
)
// Handles messages of type PDP_UPDATE sent from the Policy Administration Point (PAP).
failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|")
resMessage := fmt.Errorf("PDP Update Failed as failed to format successfullyDeployedPolicies json %v", failureMessages)
if err = sendFailureResponseVar(p, &pdpUpdate, resMessage); err != nil {
- log.Debugf("Failed to send update internal map error response: %v", err)
- return "", err, failureMessages
+ log.Debugf("Failed to send update internal map error response: %v", err)
+ return "", err, failureMessages
}
}
failureMessages = append(failureMessages, "|Internal Map Error:"+err.Error()+"|")
resMessage := fmt.Errorf("PDP Update Failed as failed to format successfullyUnDeployedPolicies json %v", failureMessages)
if err = sendFailureResponseVar(p, &pdpUpdate, resMessage); err != nil {
- log.Debugf("Failed to send update error response: %v", err)
- return "", err, failureMessages
+ log.Debugf("Failed to send update error response: %v", err)
+ return "", err, failureMessages
}
}
}
t.Run(tt.name, func(t *testing.T) {
// Set mock behavior
removePolicyFromSdkandDirFunc = func(policy map[string]interface{}) []string {
- return tt.mockPolicyErrors
+ return tt.mockPolicyErrors
}
removeDataFromSdkandDirFunc = func(policy map[string]interface{}) []string {
- return tt.mockDataErrors
+ return tt.mockDataErrors
}
// Call the function under test
name: "Empty JSON",
input: map[string]interface{}{},
expected: map[string]int{
- // No child nodes
+ // No child nodes
},
},
{
name: "Single Level JSON",
input: map[string]interface{}{
- "key1": map[string]interface{}{
- "child1": "value1",
- "child2": "value2",
- },
- "key2": map[string]interface{}{
- "childA": "valueA",
- },
+ "key1": map[string]interface{}{
+ "child1": "value1",
+ "child2": "value2",
+ },
+ "key2": map[string]interface{}{
+ "childA": "valueA",
+ },
},
expected: map[string]int{
- "node/key1": 2, // key1 has 2 children
- "node/key2": 1, // key2 has 1 child
+ "node/key1": 2, // key1 has 2 children
+ "node/key2": 1, // key2 has 1 child
},
},
{
name: "Nested JSON",
input: map[string]interface{}{
- "root": map[string]interface{}{
- "level1": map[string]interface{}{
- "level2": map[string]interface{}{
- "child1": "value1",
- "child2": "value2",
- },
- },
- },
+ "root": map[string]interface{}{
+ "level1": map[string]interface{}{
+ "level2": map[string]interface{}{
+ "child1": "value1",
+ "child2": "value2",
+ },
+ },
+ },
},
expected: map[string]int{
- "node/root": 1, // root has 1 child (level1)
- "node/root/level1": 1, // level1 has 1 child (level2)
- "node/root/level1/level2": 2, // level2 has 2 children
+ "node/root": 1, // root has 1 child (level1)
+ "node/root/level1": 1, // level1 has 1 child (level2)
+ "node/root/level1/level2": 2, // level2 has 2 children
},
},
{
name: "Mixed Data Types",
input: map[string]interface{}{
- "parent": map[string]interface{}{
- "child1": "string",
- "child2": 42,
- "child3": map[string]interface{}{
- "subchild1": true,
- "subchild2": nil,
- },
- },
+ "parent": map[string]interface{}{
+ "child1": "string",
+ "child2": 42,
+ "child3": map[string]interface{}{
+ "subchild1": true,
+ "subchild2": nil,
+ },
+ },
},
expected: map[string]int{
- "node/parent": 3, // parent has 3 children
- "node/parent/child3": 2, // child3 has 2 children
+ "node/parent": 3, // parent has 3 children
+ "node/parent/child3": 2, // child3 has 2 children
},
},
}
t.Run(tt.name, func(t *testing.T) {
got := countChildKeysFromJSON(tt.input)
if !reflect.DeepEqual(got, tt.expected) {
- t.Errorf("countChildKeysFromJSON() = %v, expected %v", got, tt.expected)
+ t.Errorf("countChildKeysFromJSON() = %v, expected %v", got, tt.expected)
}
})
}
result, err := analyzeHierarchy(tt.parentDataJson, tt.dataPath)
if tt.expectedErr {
- assert.Error(t, err)
+ assert.Error(t, err)
} else {
- assert.NoError(t, err)
- assert.Equal(t, tt.expectedPath, result)
+ assert.NoError(t, err)
+ assert.Equal(t, tt.expectedPath, result)
}
})
}
name: "Success - Valid Parent Data Exists",
inputPath: "/parent/child",
mockResponse: map[string]interface{}{
- "child": "data",
+ "child": "data",
},
mockError: nil,
expectedOutput: "/parent/child",
t.Run(tt.name, func(t *testing.T) {
// Mock function behavior
opasdkGetData = func(ctx context.Context, dataPath string) (*oapicodegen.OPADataResponse_Data, error) {
- if tt.mockResponse != nil {
- jsonData, _ := json.Marshal(tt.mockResponse)
- var resData oapicodegen.OPADataResponse_Data
- _ = json.Unmarshal(jsonData, &resData)
- return &resData, tt.mockError
- }
- return nil, tt.mockError
+ if tt.mockResponse != nil {
+ jsonData, _ := json.Marshal(tt.mockResponse)
+ var resData oapicodegen.OPADataResponse_Data
+ _ = json.Unmarshal(jsonData, &resData)
+ return &resData, tt.mockError
+ }
+ return nil, tt.mockError
}
// Call function
// Validate results
if tt.expectError {
- assert.Error(t, err, "Expected error but got none")
+ assert.Error(t, err, "Expected error but got none")
} else {
- assert.NoError(t, err, "Expected no error but got one")
- assert.Equal(t, tt.expectedOutput, output, "Unexpected output")
+ assert.NoError(t, err, "Expected no error but got one")
+ assert.Equal(t, tt.expectedOutput, output, "Unexpected output")
}
})
}
t.Run(tt.name, func(t *testing.T) {
// Mock function variables
analyseEmptyParentNodesFunc = func(dataPath string) (string, error) {
- return dataPath, tt.mockAnalyseErr
+ return dataPath, tt.mockAnalyseErr
}
deleteDataSdkFunc = func(ctx context.Context, dataPath string) error {
- return tt.mockDeleteErr
+ return tt.mockDeleteErr
}
removeDataDirectoryFunc = func(keyPath string) error {
- return tt.mockRemoveErr
+ return tt.mockRemoveErr
}
// Call function
// Normalize nil vs empty slice
if failureMessages == nil {
- failureMessages = []string{}
+ failureMessages = []string{}
}
// Validate results
{
name: "Valid data keys",
policy: map[string]interface{}{
- "data": []interface{}{"policy.rule1", "policy.rule2"},
+ "data": []interface{}{"policy.rule1", "policy.rule2"},
},
expectedFailures: nil,
},
{
name: "Invalid data key type",
policy: map[string]interface{}{
- "data": []interface{}{"policy.rule1", 123}, // Invalid integer key
+ "data": []interface{}{"policy.rule1", 123}, // Invalid integer key
},
expectedFailures: []string{"Invalid Key :123"},
},
{
name: "Invalid JSON structure",
policy: map[string]interface{}{
- "policyId": "test-policy",
- "policyVersion": "1.0",
+ "policyId": "test-policy",
+ "policyVersion": "1.0",
},
expectedFailures: []string{": Invalid JSON structure: 'data' is missing or not an array"},
},
{
name: "Deletion failure",
policy: map[string]interface{}{
- "data": []interface{}{"invalid.path"},
+ "data": []interface{}{"invalid.path"},
},
expectedFailures: []string{"Failed to delete: /invalid/path"},
},
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"policy-opa-pdp/cfg"
"policy-opa-pdp/pkg/log"
- "sync"
"time"
)
var (
	// Declare a global variable that caches the most recently created KafkaConsumer
consumerInstance *KafkaConsumer
- consumerOnce sync.Once // sync.Once ensures that the consumer is created only once
)
// KafkaConsumerInterface defines the interface for a Kafka consumer.
var KafkaNewConsumer KafkaNewConsumerFunc = kafka.NewConsumer
// NewKafkaConsumer creates a new Kafka consumer and returns
-func NewKafkaConsumer() (*KafkaConsumer, error) {
+func NewKafkaConsumer(topic string, groupid string) (*KafkaConsumer, error) {
	// Initialize the consumer instance
- consumerOnce.Do(func() {
- log.Debugf("Creating Kafka Consumer singleton instance")
- brokers := cfg.BootstrapServer
- groupid := cfg.GroupId
- topic := cfg.Topic
- useSASL := cfg.UseSASLForKAFKA
- username := cfg.KAFKA_USERNAME
- password := cfg.KAFKA_PASSWORD
-
- // Add Kafka connection properties
- configMap := &kafka.ConfigMap{
- "bootstrap.servers": brokers,
- "group.id": groupid,
- "auto.offset.reset": "latest",
- }
- fmt.Print(configMap)
- // If SASL is enabled, add SASL properties
- if useSASL == "true" {
- configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104
- configMap.SetKey("sasl.username", username) // #nosec G104
- configMap.SetKey("sasl.password", password) // #nosec G104
- configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104
- configMap.SetKey("fetch.max.bytes", 50*1024*1024) // #nosec G104
- configMap.SetKey("max.partition.fetch.bytes", 50*1024*1024) // #nosec G104
- configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024) // #nosec G104
- configMap.SetKey("session.timeout.ms", "30000") // #nosec G104
- configMap.SetKey("max.poll.interval.ms", "300000") // #nosec G104
- configMap.SetKey("enable.partition.eof", true) // #nosec G104
- configMap.SetKey("enable.auto.commit", true) // #nosec G104
- // configMap.SetKey("debug", "all") // Uncomment for debug
- }
+ log.Debugf("Creating Kafka Consumer singleton instance")
+ brokers := cfg.BootstrapServer
+ useSASL := cfg.UseSASLForKAFKA
+ username := cfg.KAFKA_USERNAME
+ password := cfg.KAFKA_PASSWORD
+
+ // Add Kafka connection properties
+ configMap := &kafka.ConfigMap{
+ "bootstrap.servers": brokers,
+ "group.id": groupid,
+ "auto.offset.reset": "latest",
+ }
+	log.Debugf("Kafka consumer configMap: %v", configMap)
+ // If SASL is enabled, add SASL properties
+ if useSASL == "true" {
+ configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104
+ configMap.SetKey("sasl.username", username) // #nosec G104
+ configMap.SetKey("sasl.password", password) // #nosec G104
+ configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104
+ configMap.SetKey("fetch.max.bytes", 50*1024*1024) // #nosec G104
+ configMap.SetKey("max.partition.fetch.bytes", 50*1024*1024) // #nosec G104
+ configMap.SetKey("socket.receive.buffer.bytes", 50*1024*1024) // #nosec G104
+ configMap.SetKey("session.timeout.ms", "30000") // #nosec G104
+ configMap.SetKey("max.poll.interval.ms", "300000") // #nosec G104
+ configMap.SetKey("enable.partition.eof", true) // #nosec G104
+ configMap.SetKey("enable.auto.commit", true) // #nosec G104
+ // configMap.SetKey("debug", "all") // Uncomment for debug
+ }
- // Create a new Kafka consumer
- consumer, err := KafkaNewConsumer(configMap)
- if err != nil {
- log.Warnf("Error creating consumer: %v", err)
- return
- }
- if consumer == nil {
- log.Warnf("Kafka Consumer is nil after creation")
- return
- }
+ // Create a new Kafka consumer
+ consumer, err := KafkaNewConsumer(configMap)
+ if err != nil {
+ log.Warnf("Error creating consumer: %v", err)
+ return nil, fmt.Errorf("error creating consumer: %w", err)
+ }
+ if consumer == nil {
+ log.Warnf("Kafka Consumer is nil after creation")
+ return nil, fmt.Errorf("Kafka Consumer is nil after creation")
+ }
- // Subscribe to the topic
- err = consumer.SubscribeTopics([]string{topic}, nil)
- if err != nil {
- log.Warnf("Error subscribing to topic: %v", err)
- return
- }
- log.Debugf("Topic Subscribed: %v", topic)
+ // Subscribe to the topic
+ err = consumer.SubscribeTopics([]string{topic}, nil)
+ if err != nil {
+ log.Warnf("Error subscribing to topic: %v", err)
+ return nil, fmt.Errorf("error subscribing to topic: %w", err)
+ }
+ log.Debugf("Topic Subscribed: %v", topic)
- // Assign the consumer instance
- consumerInstance = &KafkaConsumer{Consumer: consumer}
- log.Debugf("Created SIngleton consumer instance")
- })
+ // Assign the consumer instance
+ consumerInstance = &KafkaConsumer{Consumer: consumer}
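+	// Note: without sync.Once, this package-level variable simply caches the most recently created consumer.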
+ log.Debugf("Created SIngleton consumer instance")
// Return the singleton consumer instance
if consumerInstance == nil {
"github.com/stretchr/testify/mock"
"policy-opa-pdp/cfg"
"policy-opa-pdp/pkg/kafkacomm/mocks"
- "sync"
"testing"
)
mockConsumer := new(MockKafkaConsumer)
- consumer, err := NewKafkaConsumer()
+ consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId)
assert.NoError(t, err)
assert.NotNil(t, consumer)
// Assuming configuration is correctly loaded from cfg package
// You can mock or override cfg values here if needed
- consumer, err := NewKafkaConsumer()
+ consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId)
assert.NoError(t, err, "Expected no error when creating Kafka consumer")
assert.NotNil(t, consumer, "Expected a non-nil KafkaConsumer")
// Helper function to reset
func resetKafkaConsumerSingleton() {
- consumerOnce = sync.Once{}
consumerInstance = nil
}
return nil, fmt.Errorf("mock error creating consumer")
}
- consumer, err := NewKafkaConsumer()
+ consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId)
assert.Nil(t, consumer)
- assert.EqualError(t, err, "Kafka Consumer instance not created")
+ assert.EqualError(t, err, "error creating consumer: mock error creating consumer")
KafkaNewConsumer = originalNewKafkaConsumer
}
return nil, nil
}
- consumer, err := NewKafkaConsumer()
+ consumer, err := NewKafkaConsumer(cfg.Topic, cfg.GroupId)
assert.Nil(t, consumer)
- assert.EqualError(t, err, "Kafka Consumer instance not created")
+ assert.EqualError(t, err, "Kafka Consumer is nil after creation")
KafkaNewConsumer = originalNewKafkaConsumer
}
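With the sync.Once guard removed, NewKafkaConsumer is no longer a true singleton: each call builds and subscribes a fresh consumer for the given topic and group, and the package-level variable merely caches the last one created. A minimal wiring sketch for separate update and patch consumers; cfg.PatchGroupId is a hypothetical constant here, since only cfg.PatchTopic appears in this change:

    updateConsumer, err := kafkacomm.NewKafkaConsumer(cfg.Topic, cfg.GroupId)
    if err != nil {
        log.Warnf("update consumer init failed: %v", err)
        return
    }
    // PatchGroupId is assumed, not part of this change
    patchConsumer, err := kafkacomm.NewKafkaConsumer(cfg.PatchTopic, cfg.PatchGroupId)
    if err != nil {
        log.Warnf("patch consumer init failed: %v", err)
        return
    }
    defer updateConsumer.Consumer.Close()
    defer patchConsumer.Consumer.Close()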
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"log"
"policy-opa-pdp/cfg"
- "sync"
)
type KafkaProducerInterface interface {
var (
instance *KafkaProducer
- once sync.Once
)
// GetKafkaProducer initializes and returns a KafkaProducer instance which is a singleton.
// If SASL authentication is enabled via the configuration, the necessary credentials
// are set in the producer configuration.
//
-//nolint:gosec
+
func GetKafkaProducer(bootstrapServers, topic string) (*KafkaProducer, error) {
var err error
- once.Do(func() {
- brokers := cfg.BootstrapServer
- useSASL := cfg.UseSASLForKAFKA
- username := cfg.KAFKA_USERNAME
- password := cfg.KAFKA_PASSWORD
-
- // Add Kafka Connection Properties ....
- configMap := &kafka.ConfigMap{
- "bootstrap.servers": brokers,
- }
+ instance, err = initializeKafkaProducer(topic)
+ return instance, err
+}
- if useSASL == "true" {
- configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104
- configMap.SetKey("sasl.username", username) // #nosec G104
- configMap.SetKey("sasl.password", password) // #nosec G104
- configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104
- }
+//nolint:gosec
+func initializeKafkaProducer(topic string) (*KafkaProducer, error) {
+ brokers := cfg.BootstrapServer
+ useSASL := cfg.UseSASLForKAFKA
+ username := cfg.KAFKA_USERNAME
+ password := cfg.KAFKA_PASSWORD
- p, err := kafka.NewProducer(configMap)
- if err != nil {
- return
- }
- instance = &KafkaProducer{
- producer: p,
- topic: topic,
- }
+ configMap := &kafka.ConfigMap{
+ "bootstrap.servers": brokers,
+ }
- })
- return instance, err
+ if useSASL == "true" {
+ configMap.SetKey("sasl.mechanism", "SCRAM-SHA-512") // #nosec G104
+ configMap.SetKey("sasl.username", username) // #nosec G104
+ configMap.SetKey("sasl.password", password) // #nosec G104
+ configMap.SetKey("security.protocol", "SASL_PLAINTEXT") // #nosec G104
+ }
+
+ p, err := kafka.NewProducer(configMap)
+ if err != nil {
+ return nil, err
+ }
+
+ return &KafkaProducer{
+ producer: p,
+ topic: topic,
+ }, nil
}
// Produce sends a message to the configured Kafka topic.
// It takes the message payload as a byte slice and returns any errors
func (kp *KafkaProducer) Produce(kafkaMessage *kafka.Message, eventChan chan kafka.Event) error {
+ log.Println("KafkaProducer or producer produce message")
if kafkaMessage.TopicPartition.Topic == nil {
kafkaMessage.TopicPartition = kafka.TopicPartition{
Topic: &kp.topic,
--- /dev/null
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2025: Deutsche Telekom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
+//
+
+package publisher
+
+import (
+ "encoding/json"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+ "policy-opa-pdp/cfg"
+ "policy-opa-pdp/pkg/kafkacomm"
+ "policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/opasdk"
+)
+
+type RealPatchSender struct {
+ Producer kafkacomm.KafkaProducerInterface
+}
+
+type PatchKafkaPayload struct {
+ PatchInfos []opasdk.PatchImpl `json:"patchInfos"`
+}
+
+func (s *RealPatchSender) SendPatchMessage(patchInfos []opasdk.PatchImpl) error {
+ log.Debugf("In SendPatchMessage")
+	topic := cfg.PatchTopic
+ kafkaPayload := PatchKafkaPayload{
+ PatchInfos: patchInfos,
+ }
+
+ jsonMessage, err := json.Marshal(kafkaPayload)
+ if err != nil {
+ log.Warnf("failed to marshal Patch Payload to JSON: %v", err)
+ return err
+ }
+
+ kafkaMessage := &kafka.Message{
+ TopicPartition: kafka.TopicPartition{
+ Topic: &topic,
+ Partition: kafka.PartitionAny,
+ },
+ Value: jsonMessage,
+ }
+	var eventChan chan kafka.Event // nil: no delivery-event channel is used
+	if err = s.Producer.Produce(kafkaMessage, eventChan); err != nil {
+		log.Warnf("Error producing message: %v", err)
+		return err
+	}
+	log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage))
+
+ return nil
+}
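A minimal usage sketch for the new sender, assuming *kafkacomm.KafkaProducer satisfies KafkaProducerInterface and reusing the GetKafkaProducer refactor from this change (error handling abbreviated):

    producer, err := kafkacomm.GetKafkaProducer(cfg.BootstrapServer, cfg.PatchTopic)
    if err != nil {
        log.Warnf("producer init failed: %v", err)
        return
    }
    sender := &publisher.RealPatchSender{Producer: producer}
    patches := []opasdk.PatchImpl{{
        Path:  storage.MustParsePath("/policy/config/name"),
        Op:    storage.ReplaceOp,
        Value: "NewPolicyName",
    }}
    if err := sender.SendPatchMessage(patches); err != nil {
        log.Warnf("patch publish failed: %v", err)
    }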
--- /dev/null
+// -
+// ========================LICENSE_START=================================
+// Copyright (C) 2025: Deutsche Telekom
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
+//
+
+package publisher
+
+import (
+ "encoding/json"
+ "errors"
+ "github.com/confluentinc/confluent-kafka-go/v2/kafka"
+ "github.com/open-policy-agent/opa/v1/storage"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "policy-opa-pdp/pkg/opasdk"
+ "testing"
+)
+
+// --- Sample PatchImpl for testing ---
+func samplePatchData() []opasdk.PatchImpl {
+ return []opasdk.PatchImpl{
+ {
+ Path: storage.MustParsePath("/policy/config/name"),
+ Op: storage.ReplaceOp,
+ Value: "NewPolicyName",
+ },
+ }
+}
+
+// --- Helper to get mock sender ---
+func getMockSender() (*RealPatchSender, *MockKafkaProducer) {
+ mockProducer := new(MockKafkaProducer)
+ sender := &RealPatchSender{
+ Producer: mockProducer,
+ }
+ return sender, mockProducer
+}
+
+// --- Test: Successful message send ---
+func TestSendPatchMessage_Success(t *testing.T) {
+ sender, mockProducer := getMockSender()
+
+ mockProducer.On("Produce", mock.Anything).Return(nil)
+
+ err := sender.SendPatchMessage(samplePatchData())
+ assert.NoError(t, err)
+ mockProducer.AssertExpectations(t)
+}
+
+// --- Test: Kafka produce failure ---
+func TestSendPatchMessage_ProduceError(t *testing.T) {
+ sender, mockProducer := getMockSender()
+
+ mockProducer.On("Produce", mock.Anything).Return(errors.New("kafka error"))
+
+ err := sender.SendPatchMessage(samplePatchData())
+ assert.Error(t, err)
+ assert.EqualError(t, err, "kafka error")
+ mockProducer.AssertExpectations(t)
+}
+
+// --- Test: JSON marshal error ---
+func TestSendPatchMessage_MarshalError(t *testing.T) {
+ sender, _ := getMockSender()
+
+ badData := []opasdk.PatchImpl{
+ {
+ Path: storage.MustParsePath("/invalid"),
+ Op: storage.AddOp,
+ Value: make(chan int), // JSON marshal fails on channels
+ },
+ }
+
+ err := sender.SendPatchMessage(badData)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "json: unsupported type: chan int")
+}
+
+// --- Test: Validate payload content ---
+func TestSendPatchMessage_PayloadContent(t *testing.T) {
+ sender, mockProducer := getMockSender()
+
+ mockProducer.On("Produce", mock.MatchedBy(func(msg *kafka.Message) bool {
+ var payload PatchKafkaPayload
+ err := json.Unmarshal(msg.Value, &payload)
+ return err == nil &&
+ len(payload.PatchInfos) == 1 &&
+ payload.PatchInfos[0].Path.String() == "/policy/config/name" &&
+ payload.PatchInfos[0].Value == "NewPolicyName"
+ })).Return(nil)
+
+ err := sender.SendPatchMessage(samplePatchData())
+ assert.NoError(t, err)
+ mockProducer.AssertExpectations(t)
+}
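The tests above rely on a MockKafkaProducer defined elsewhere in the package. A minimal sketch consistent with the single-argument Produce expectations used here (an assumption, since the mock itself is not part of this hunk):

    type MockKafkaProducer struct {
        mock.Mock
    }

    func (m *MockKafkaProducer) Produce(msg *kafka.Message, eventChan chan kafka.Event) error {
        // Only the message is recorded, matching On("Produce", mock.Anything)
        // and the mock.MatchedBy matcher in the payload-content test.
        args := m.Called(msg)
        return args.Error(0)
    }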
import (
"fmt"
+ "github.com/google/uuid"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/policymap"
"sync"
"time"
- "github.com/google/uuid"
)
var (
// -
-// ========================LICENSE_START=================================
-// Copyright (C) 2024-2025: Deutsche Telekom
//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
+// ========================LICENSE_START=================================
+// Copyright (C) 2024-2025: Deutsche Telekom
//
-// http://www.apache.org/licenses/LICENSE-2.0
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-// SPDX-License-Identifier: Apache-2.0
-// ========================LICENSE_END===================================
+// http://www.apache.org/licenses/LICENSE-2.0
//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// SPDX-License-Identifier: Apache-2.0
+// ========================LICENSE_END===================================
package publisher
+
import (
"errors"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
"policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
"policy-opa-pdp/pkg/policymap"
"testing"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/mock"
)
+
/*
Success Case 1
TestStartHeartbeatIntervalTimer_ValidInterval
t.Errorf("Expected currentInterval to be %d, got %d", intervalMs, currentInterval)
}
}
+
/*
Failure Case 1
TestStartHeartbeatIntervalTimer_InvalidInterval
t.Log("Expected ticker to be nil for invalid interval")
}
}
+
/*
TestSendPDPHeartBeat_Success 2
Description: Test sending a heartbeat successfully.
err := sendPDPHeartBeat(mockSender)
assert.NoError(t, err)
}
+
/*
TestSendPDPHeartBeat_Failure 2
Description: Test failing to send a heartbeat.
err := sendPDPHeartBeat(mockSender)
assert.Error(t, err)
}
+
/*
TestsendPDPHeartBeat_Success 3
Description: Test sending a heartbeat successfully with some deployed policies.
err := sendPDPHeartBeat(mockSender)
assert.NoError(t, err)
}
+
/*
TestsendPDPHeartBeat_Success 4
Description: Test sending a heartbeat successfully with no deployed policies.
err := sendPDPHeartBeat(mockSender)
assert.NoError(t, err)
}
+
/*
TestStopTicker_Success 3
Description: Test stopping the ticker.
t.Errorf("Expected ticker to be nil")
}
}
+
/*
TestStopTicker_NotRunning 3
Description: Test stopping the ticker when it is not running.
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
"github.com/google/uuid"
)
-type(
+type (
SendPdpUpdateResponseFunc func(s PdpStatusSender, pdpUpdate *model.PdpUpdate, resMessage string) error
)
package metrics
import (
- "sync"
- "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus"
+ "sync"
)
+
// global counter variables
var TotalErrorCount int64
var DecisionSuccessCount int64
var UndeployFailureCount int64
var UndeploySuccessCount int64
var TotalPoliciesCount int64
+var DynamicDataUpdateSuccessCount int64
+var DynamicDataUpdateFailureCount int64
var mu sync.Mutex
-//Decision and Data counters to be used in prometheus
+// Decision and Data counters to be used in prometheus
var (
- DecisionResponseTime = prometheus.NewSummary(prometheus.SummaryOpts{
- Name: "opa_decision_response_time_seconds",
- Help: "Response time of OPA decision handler",
+ DecisionResponseTime_Prom = prometheus.NewSummary(prometheus.SummaryOpts{
+ Name: "opa_decision_response_time_seconds",
+ Help: "Response time of OPA decision handler",
+ })
+ DataResponseTime_Prom = prometheus.NewSummary(prometheus.SummaryOpts{
+ Name: "opa_data_response_time_seconds",
+ Help: "Response time of OPA data handler",
+ })
+ DecisionHandlerCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pdpo_policy_decisions_total",
+ Help: "Total Number of Decision Handler hits for OPA",
+ })
+ DeploymentSuccessCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pdpo_policy_deployments_total",
+ Help: "Total Number of Successful Deployment for OPA",
+ })
+ DeploymentFailureCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pdpo_policy_failures_total",
+ Help: "Total Number of Deployment Failures for OPA",
+ })
+ DynamicDataUpdatesSuccessCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pdpo_dynamic_data_success_total",
+ Help: "Total Number of Successful Dynamic Data Updates for OPA",
+ })
+ DynamicDataUpdatesFailureCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pdpo_dynamic_data_failures_total",
+ Help: "Total Number of Failed Dynamic Data Updates for OPA",
})
- DataResponseTime = prometheus.NewSummary(prometheus.SummaryOpts{
- Name: "opa_data_response_time_seconds",
- Help: "Response time of OPA data handler",
+ UndeploymentSuccessCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pdpo_policy_undeployments_success_total",
+ Help: "Total Number of Successful Deployment for OPA",
+ })
+ UndeploymentFailureCount_Prom = prometheus.NewCounter(prometheus.CounterOpts{
+ Name: "pdpo_policy_undeployments_failures_total",
+ Help: "Total Number of Deployment Failures for OPA",
})
)
-//register counters in init
+// register counters in init
func init() {
- prometheus.MustRegister(DecisionResponseTime)
- prometheus.MustRegister(DataResponseTime)
+ prometheus.MustRegister(DecisionResponseTime_Prom)
+ prometheus.MustRegister(DataResponseTime_Prom)
+ prometheus.MustRegister(DecisionHandlerCount_Prom)
+ prometheus.MustRegister(DeploymentSuccessCount_Prom)
+ prometheus.MustRegister(DeploymentFailureCount_Prom)
+ prometheus.MustRegister(DynamicDataUpdatesSuccessCount_Prom)
+ prometheus.MustRegister(DynamicDataUpdatesFailureCount_Prom)
+ prometheus.MustRegister(UndeploymentSuccessCount_Prom)
+ prometheus.MustRegister(UndeploymentFailureCount_Prom)
}
// Increment counter
return &TotalErrorCount
}
+func IncrementDynamicDataUpdateSuccessCount() {
+ mu.Lock()
+ DynamicDataUpdateSuccessCount++
+ DynamicDataUpdatesSuccessCount_Prom.Inc()
+ mu.Unlock()
+}
+
+func totalDynamicDataUpdateSuccessCountRef() *int64 {
+ mu.Lock()
+ defer mu.Unlock()
+	return &DynamicDataUpdateSuccessCount
+}
+
+func IncrementDynamicDataUpdateFailureCount() {
+ mu.Lock()
+ DynamicDataUpdateFailureCount++
+ DynamicDataUpdatesFailureCount_Prom.Inc()
+ mu.Unlock()
+}
+
+func totalDynamicDataUpdateFailureCountRef() *int64 {
+ mu.Lock()
+ defer mu.Unlock()
+	return &DynamicDataUpdateFailureCount
+}
+
// Increment counter
func IncrementDecisionSuccessCount() {
mu.Lock()
DecisionSuccessCount++
+ DecisionHandlerCount_Prom.Inc()
mu.Unlock()
}
func IncrementDecisionFailureCount() {
mu.Lock()
DecisionFailureCount++
+ DecisionHandlerCount_Prom.Inc()
mu.Unlock()
}
func IncrementDeploySuccessCount() {
mu.Lock()
DeploySuccessCount++
+ DeploymentSuccessCount_Prom.Inc()
mu.Unlock()
}
func IncrementDeployFailureCount() {
mu.Lock()
DeployFailureCount++
+ DeploymentFailureCount_Prom.Inc()
mu.Unlock()
}
func IncrementUndeploySuccessCount() {
mu.Lock()
UndeploySuccessCount++
+ UndeploymentSuccessCount_Prom.Inc()
mu.Unlock()
}
func IncrementUndeployFailureCount() {
mu.Lock()
UndeployFailureCount++
+ UndeploymentFailureCount_Prom.Inc()
mu.Unlock()
}
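Taken together, each increment updates both the mutex-guarded int64 (reported via /statistics) and the matching Prometheus counter. A sketch of the intended call pattern for the new dynamic-data counters; applyPatch is a hypothetical stand-in for the patch-handling code:

    if err := applyPatch(patchInfos); err != nil {
        metrics.IncrementDynamicDataUpdateFailureCount()
        return err
    }
    metrics.IncrementDynamicDataUpdateSuccessCount()
    // After one success, a /metrics scrape would include:
    //   pdpo_dynamic_data_success_total 1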
import (
"encoding/json"
+ "github.com/google/uuid"
+ openapi_types "github.com/oapi-codegen/runtime/types"
"net/http"
+ "policy-opa-pdp/consts"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/model/oapicodegen"
"policy-opa-pdp/pkg/utils"
- "policy-opa-pdp/consts"
- "github.com/google/uuid"
- openapi_types "github.com/oapi-codegen/runtime/types"
)
func FetchCurrentStatistics(res http.ResponseWriter, req *http.Request) {
statReport.UndeployFailureCount = totalUndeployFailureCountRef()
statReport.UndeploySuccessCount = totalUndeploySuccessCountRef()
statReport.TotalPoliciesCount = totalPoliciesCountRef()
+ statReport.DynamicDataUpdateFailureCount = totalDynamicDataUpdateFailureCountRef()
+ statReport.DynamicDataUpdateSuccessCount = totalDynamicDataUpdateSuccessCountRef()
	// not implemented; hardcoding the values to zero
	// will be implemented in phase-2
// StatisticsReport defines model for StatisticsReport.
type StatisticsReport struct {
- Code *int32 `json:"code,omitempty"`
- DecisionFailureCount *int64 `json:"decisionFailureCount,omitempty"`
- DecisionSuccessCount *int64 `json:"decisionSuccessCount,omitempty"`
- DeployFailureCount *int64 `json:"deployFailureCount,omitempty"`
- DeploySuccessCount *int64 `json:"deploySuccessCount,omitempty"`
- TotalErrorCount *int64 `json:"totalErrorCount,omitempty"`
- TotalPoliciesCount *int64 `json:"totalPoliciesCount,omitempty"`
- TotalPolicyTypesCount *int64 `json:"totalPolicyTypesCount,omitempty"`
- UndeployFailureCount *int64 `json:"undeployFailureCount,omitempty"`
- UndeploySuccessCount *int64 `json:"undeploySuccessCount,omitempty"`
+ Code *int32 `json:"code,omitempty"`
+ DecisionFailureCount *int64 `json:"decisionFailureCount,omitempty"`
+ DecisionSuccessCount *int64 `json:"decisionSuccessCount,omitempty"`
+ DeployFailureCount *int64 `json:"deployFailureCount,omitempty"`
+ DeploySuccessCount *int64 `json:"deploySuccessCount,omitempty"`
+ DynamicDataUpdateFailureCount *int64 `json:"dynamicDataUpdateFailureCount,omitempty"`
+ DynamicDataUpdateSuccessCount *int64 `json:"dynamicDataUpdateSuccessCount,omitempty"`
+ TotalErrorCount *int64 `json:"totalErrorCount,omitempty"`
+ TotalPoliciesCount *int64 `json:"totalPoliciesCount,omitempty"`
+ TotalPolicyTypesCount *int64 `json:"totalPolicyTypesCount,omitempty"`
+ UndeployFailureCount *int64 `json:"undeployFailureCount,omitempty"`
+ UndeploySuccessCount *int64 `json:"undeploySuccessCount,omitempty"`
}
// DataGetParams defines parameters for DataGet.
package pdpattributes
import (
- "policy-opa-pdp/pkg/log"
"github.com/google/uuid"
+ "policy-opa-pdp/pkg/log"
)
var (
)
type (
- CreateDirectoryFunc func(dirPath string) error
+ CreateDirectoryFunc func(dirPath string) error
ValidateFieldsStructsFunc func(pdpUpdate model.PdpUpdate) error
)
var (
- CreateDirectoryVar CreateDirectoryFunc = CreateDirectory
- removeAll = os.RemoveAll
+ CreateDirectoryVar CreateDirectoryFunc = CreateDirectory
+ removeAll = os.RemoveAll
ValidateFieldsStructsVar ValidateFieldsStructsFunc = ValidateFieldsStructs
)
return string(output), nil
}
-
type CommonFields struct {
CurrentDate *string
CurrentDateTime *time.Time
}
commonFields := CommonFields{
-	CurrentDate: &currentDate,
- CurrentDateTime: updateReq.CurrentDateTime,
- CurrentTime: updateReq.CurrentTime,
- TimeOffset: updateReq.TimeOffset,
- TimeZone: updateReq.TimeZone,
- OnapComponent: updateReq.OnapComponent,
- OnapInstance: updateReq.OnapInstance,
- OnapName: updateReq.OnapName,
- PolicyName: convertPtrToString(updateReq.PolicyName),
-
+		CurrentDate:     &currentDate,
+ CurrentDateTime: updateReq.CurrentDateTime,
+ CurrentTime: updateReq.CurrentTime,
+ TimeOffset: updateReq.TimeOffset,
+ TimeZone: updateReq.TimeZone,
+ OnapComponent: updateReq.OnapComponent,
+ OnapInstance: updateReq.OnapInstance,
+ OnapName: updateReq.OnapName,
+ PolicyName: convertPtrToString(updateReq.PolicyName),
}
- return validateCommonFields(commonFields)
+ return validateCommonFields(commonFields)
}
}
commonFields := CommonFields{
-	CurrentDate: &currentDate,
- CurrentDateTime: decisionReq.CurrentDateTime,
- CurrentTime: decisionReq.CurrentTime,
- TimeOffset: decisionReq.TimeOffset,
- TimeZone: decisionReq.TimeZone,
- OnapComponent: decisionReq.OnapComponent,
- OnapInstance: decisionReq.OnapInstance,
- OnapName: decisionReq.OnapName,
- PolicyName: decisionReq.PolicyName,
+		CurrentDate:     &currentDate,
+ CurrentDateTime: decisionReq.CurrentDateTime,
+ CurrentTime: decisionReq.CurrentTime,
+ TimeOffset: decisionReq.TimeOffset,
+ TimeZone: decisionReq.TimeZone,
+ OnapComponent: decisionReq.OnapComponent,
+ OnapInstance: decisionReq.OnapInstance,
+ OnapName: decisionReq.OnapName,
+ PolicyName: decisionReq.PolicyName,
}
return validateCommonFields(commonFields)