"input":{"user":"alice","action":"read","object":"id123","type":"dog"}
Input defines the specific data to be evaluated by the Rego policy
-## Verification API Calls
-
-`curl -u 'policyadmin:zb!XztG34' -H 'Content-Type: application/json' -H 'Accept: application/json' --header 'X-ONAP-RequestID:8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1' -d '{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z", "policyName":"role/allow","input":{"user":"alice","action":"write","object":"id123","type":"dog"}}' -X POST http://0.0.0.0:8282/policy/pdpx/v1/decision`
-
-## Result Of Verification API calls(Success)
-
-`curl -u 'policyadmin:zb!XztG34' -H 'Content-Type: application/json' -H 'Accept: application/json' --header 'X-ONAP-RequestID:8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1' -d '{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z", "policyName":"role/allow","input":{"user":"alice","action":"write","object":"id123","type":"dog"}}' -X POST http://0.0.0.0:8282/policy/pdpx/v1/decision`
-
-`{"decision":"PERMIT","policyName":"role/allow","statusMessage":"OPA Allowed"}`
-
-
-## Result of Verification API calls(Failure)
-
-`curl -u 'policyadmin:zb!XztG34' -H 'Content-Type: application/json' -H 'Accept: application/json' --header 'X-ONAP-RequestID:8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1' -d '{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS","currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z", "policyName":"role/allow","input":{"user":"carol","action":"write","object":"id123","type":"dog"}}' -X POST http://0.0.0.0:8282/policy/pdpx/v1/decision`
-
-## HealthCheck API Call With Response
-
-`curl -u 'policyadmin:zb!XztG34' --header 'X-ONAP-RequestID:8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1' -X GET http://0.0.0.0:8282/policy/pdpx/v1/healthcheck`
-
-`{"code":200,"healthy":true,"message":"alive","name":"opa-9f0248ea-807e-45f6-8e0f-935e570b75cc","url":"self"}`
-
-## Statistics API Call With Response
-
-`curl -u 'policyadmin:zb!XztG34' --header 'X-ONAP-RequestID:8e6f784e-c9cb-42f6-bcc9-edb5d0af1ce1' -X GET http://0.0.0.0:8282/policy/pdpx/v1/statistics`
-
-`{"code":200,"denyDecisionsCount":10,"deployFailureCount":0,"deploySuccessCount":0,"indeterminantDecisionsCount":0,"permitDecisionsCount":18,"totalErrorCount":4,"totalPoliciesCount":0,"totalPolicyTypesCount":1,"undeployFailureCount":0,"undeploySuccessCount":0}`
-
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
"policy-opa-pdp/pkg/kafkacomm"
"policy-opa-pdp/pkg/kafkacomm/mocks"
"policy-opa-pdp/pkg/kafkacomm/publisher"
+ "policy-opa-pdp/pkg/kafkacomm/handler"
"policy-opa-pdp/pkg/log"
+ "policy-opa-pdp/pkg/model"
+ "fmt"
"testing"
"time"
+ "errors"
+ "reflect"
+ "bou.ke/monkey"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
)
// Mock objects and functions
m.Called()
}
+func (m *MockKafkaConsumerInterface) ReadMessage(kc *kafkacomm.KafkaConsumer) ([]byte, error) {
+ args := m.Called(kc)
+ return args.Get(0).([]byte), args.Error(0)
+}
+
type MockPdpStatusSender struct {
mock.Mock
}
return args.Error(0)
}
+func (m *MockPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error {
+ args := m.Called(pdpStatus)
+ return args.Error(0)
+}
+
+
type MockServer struct {
+ *http.Server
mock.Mock
}
return args.Error(0)
}
+// Test to verify the application handles the shutdown process gracefully.
func TestHandleShutdown(t *testing.T) {
consts.SHUTDOWN_WAIT_TIME = 0
mockConsumer := new(mocks.KafkaConsumerInterface)
}
}
+// Test the main function to ensure it's initialization, startup, and shutdown correctly.
func TestMainFunction(t *testing.T) {
// Mock dependencies and expected behavior
assert.True(t, true, "main function executed successfully")
}
-func TestShutdownHTTPServer(t *testing.T) {
- server := startHTTPServer()
- shutdownHTTPServer(server)
- err := server.ListenAndServe()
- assert.NotNil(t, err, "Server should be shutdown")
-}
-
+// Test to validate that the OPA bundle initialization process works as expected.
func TestInitializeBundle(t *testing.T) {
mockExecCmd := func(name string, arg ...string) *exec.Cmd {
return exec.Command("echo")
assert.NoError(t, err, "Expected no error from initializeBundle")
}
+// Test to verify that the HTTP server starts successfully.
func TestStartHTTPServer(t *testing.T) {
server := startHTTPServer()
time.Sleep(1 * time.Second)
assert.NotNil(t, server, "Server should be initialized")
}
+// Test to validate the initialization of the OPA (Open Policy Agent) instance.
func TestInitializeOPA(t *testing.T) {
err := initializeOPA()
assert.Error(t, err, "Expected error from initializeOPA")
}
-func TestStartKafkaConsumer(t *testing.T) {
- kc, prod, err := startKafkaConsAndProd()
- assert.NoError(t, err, "Expected no error from startKafkaConsumer")
- assert.NotNil(t, kc, "consumer should be initialized")
- assert.NotNil(t, prod, "producer should be initialized")
+// Test to ensure the application correctly waits for the server to be ready.
+func TestWaitForServer(t *testing.T) {
+ waitForServerFunc = func() {
+ time.Sleep(50 * time.Millisecond)
+ }
+
+ waitForServer()
+}
+
+// TestInitializeHandlers
+func TestInitializeHandlers(t *testing.T) {
+ initializeHandlersFunc = func() {
+ log.Debug("Handlers initialized")
+ }
+
+ initializeHandlers()
+}
+
+// Test to simulate the successful registration of a PDP
+func TestRegisterPDP_Success(t *testing.T) {
+ mockSender := new(MockPdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ result := registerPDP(mockSender)
+
+ assert.True(t, result)
+ mockSender.AssertExpectations(t)
+}
+
+// Test to simulate a failure scenario during the registration of a PDP.
+func TestRegisterPDP_Failure(t *testing.T) {
+ mockSender := new(MockPdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(assert.AnError)
+
+ result := registerPDP(mockSender)
+
+ assert.False(t, result)
+ mockSender.AssertExpectations(t)
+}
+
+// Test to verify that the HTTP Server starts successfully and can be shut down gracefully.
+func TestStartAndShutDownHTTPServer(t *testing.T) {
+ testServer := startHTTPServer()
+
+ time.Sleep(1 * time.Second)
+
+ assert.NotNil(t, testServer, "Server should be initialized")
+
+ go func() {
+ err := testServer.ListenAndServe()
+ assert.Error(t, err, "Server should not return error after starting and shutting down")
+ }()
+
+ shutdownHTTPServer(testServer)
+}
+
+func TestMainFunction_Failure(t *testing.T) {
+ interruptChannel := make(chan os.Signal, 1)
+ initializeOPAFunc = func() error {
+ return errors.New("OPA initialization failed")
+ }
+
+ done := make(chan struct{})
+ go func() {
+ main()
+ close(done)
+ }()
+
+ interruptChannel <- os.Interrupt
+
+ select {
+ case <-done:
+ case <-time.After(1 * time.Second):
+ t.Error("main function timed out on failure scenario")
+ }
+}
+
+// Test to verify that the application handles errors during the shutdown process gracefully.
+func TestHandleShutdown_ErrorScenario(t *testing.T) {
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe").Return(errors.New("unsubscribe error"))
+ mockConsumer.On("Close").Return(errors.New("close error"))
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ interruptChannel := make(chan os.Signal, 1)
+ _, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ interruptChannel <- os.Interrupt
+ }()
+
+ done := make(chan bool)
+ go func() {
+ handleShutdown(mockKafkaConsumer, interruptChannel, cancel)
+ done <- true
+ }()
+
+ select {
+ case <-done:
+ mockConsumer.AssertCalled(t, "Unsubscribe")
+ mockConsumer.AssertCalled(t, "Close")
+ case <-time.After(1 * time.Second):
+ t.Error("handleShutdown timed out")
+ }
+}
+
+// Test to simulate errors during the shutdown of the HTTP server.
+func TestShutdownHTTPServer_Error(t *testing.T) {
+ mockServer := &MockServer{}
+
+ mockServer.On("Shutdown").Return(errors.New("shutdown error"))
+
+ shutdownHTTPServerFunc := func(s *MockServer) {
+ err := s.Shutdown()
+ if err != nil {
+ t.Logf("Expected error during shutdown: %v", err)
+ }
+ }
+
+ shutdownHTTPServerFunc(mockServer)
+
+ mockServer.AssertExpectations(t)
+}
+
+// Test to validate the successful shutdown of the HTTP server.
+func TestShutdownHTTPServerSucessful(t *testing.T) {
+ t.Run("SuccessfulShutdown", func(t *testing.T) {
+ mockServer := &MockServer{
+ Server: &http.Server{},
+ }
+
+ mockServer.On("Shutdown").Return(nil)
+
+ err := mockServer.Shutdown()
+ if err != nil {
+ t.Errorf("Expected no error, got: %v", err)
+ }
+
+ shutdownHTTPServer(mockServer.Server)
+ mockServer.AssertExpectations(t)
+ })
+
+ t.Run("ShutdownWithError", func(t *testing.T) {
+
+ mockServer := &MockServer{
+ Server: &http.Server{},
+
+ }
+
+ mockServer.On("Shutdown").Return(errors.New("shutdown error"))
+
+ err := mockServer.Shutdown()
+ if err == nil {
+ t.Error("Expected an error, but got none")
+ }
+ shutdownHTTPServer(mockServer.Server)
+ mockServer.AssertExpectations(t)
+
+ })
+
+}
+
+// TestHandleMessages
+func TestHandleMessages(t *testing.T) {
+ message := `{"MessageType": "PDP_UPDATE", "Data": "test-update"}`
+ mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
+ mockSender := &publisher.RealPdpStatusSender{}
+ expectedError := error(nil)
+
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg,expectedError)
+ mockConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockKafkaConsumer,
+ }
+
+
+ ctx := context.Background()
+ handleMessages(ctx, mockConsumer, mockSender)
+
+}
+
+// Test to simulate a failure during OPA bundle initialization in the main function.
+func TestMain_InitializeBundleFailure(t *testing.T) {
+ initializeBundleFunc = func(cmdFn func(string, ...string) *exec.Cmd) error {
+ return errors.New("bundle initialization error") // Simulate error
+ }
+
+ done := make(chan struct{})
+ go func() {
+ main()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ case <-time.After(1 * time.Second):
+ t.Error("main function timed out on initializeBundleFunc failure")
+ }
}
+
+// Test to simulate a Kafka initialization failure in the main function.
+func TestMain_KafkaInitializationFailure(t *testing.T) {
+ startKafkaConsAndProdFunc = func() (*kafkacomm.KafkaConsumer, *kafkacomm.KafkaProducer, error) {
+ return nil, nil, errors.New("kafka initialization failed")
+ }
+
+ done := make(chan struct{})
+ go func() {
+ main()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ // Verify if the Kafka failure path is executed
+ case <-time.After(1 * time.Second):
+ t.Error("main function timed out on Kafka initialization failure")
+ }
+}
+
+// Test to validate the main function's handling of shutdown signals.
+func TestMain_HandleShutdownWithSignals(t *testing.T) {
+ handleShutdownFunc = func(kc *kafkacomm.KafkaConsumer, interruptChan chan os.Signal, cancel context.CancelFunc) {
+ go func() {
+ interruptChan <- os.Interrupt // Simulate SIGTERM
+ }()
+ cancel()
+ }
+
+ done := make(chan struct{})
+ go func() {
+ main()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ // Success
+ case <-time.After(1 * time.Second):
+ t.Error("main function timed out on signal handling")
+ }
+}
+
+var mockConsumer = &kafkacomm.KafkaConsumer{}
+var mockProducer = &kafkacomm.KafkaProducer{}
+
+
+// Test to simulate the scenario where starting the Kafka consumer fails
+func TestStartKafkaConsumerFailure(t *testing.T) {
+ t.Run("Kafka consumer creation failure", func(t *testing.T) {
+ // Monkey patch the NewKafkaConsumer function with the correct signature (no parameters)
+ monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
+ fmt.Println("Monkey patched NewKafkaConsumer is called")
+ return nil, errors.New("Kafka consumer creation error")
+ })
+
+ // Monkey patch the GetKafkaProducer function with the correct signature
+ monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+ fmt.Println("Monkey patched GetKafkaProducer is called with bootstrapServers:", bootstrapServers, "and topic:", topic)
+ return mockProducer, nil
+ })
+
+ // Call the function under test
+ consumer, producer, err := startKafkaConsAndProd()
+
+ // Assertions
+ assert.Error(t, err, "Kafka consumer creation error")
+ assert.Nil(t, consumer)
+ assert.Nil(t, producer)
+
+ // Unpatch the functions
+ monkey.Unpatch(kafkacomm.NewKafkaConsumer)
+ monkey.Unpatch(kafkacomm.GetKafkaProducer)
+ })
+}
+
+// Test to simulate the scenario where starting the Kafka producer fails
+func TestStartKafkaProducerFailure(t *testing.T) {
+ t.Run("Kafka producer creation failure", func(t *testing.T) {
+ // Monkey patch the NewKafkaConsumer function
+ monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
+ fmt.Println("Monkey patched NewKafkaConsumer is called")
+ return mockConsumer, nil
+ })
+
+ // Monkey patch the GetKafkaProducer function
+ monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+ fmt.Println("Monkey patched GetKafkaProducer is called")
+ return nil, errors.New("Kafka producer creation error")
+ })
+
+ // Call the function under test
+ consumer, producer, err := startKafkaConsAndProd()
+
+ // Assertions
+ assert.Error(t, err, "Kafka producer creation error")
+ assert.Nil(t, consumer)
+ assert.Nil(t, producer)
+
+ // Unpatch the functions
+ monkey.Unpatch(kafkacomm.NewKafkaConsumer)
+ monkey.Unpatch(kafkacomm.GetKafkaProducer)
+ })
+}
+
+// Test to verify that both the Kafka consumer and producer start successfully
+func TestStartKafkaAndProdSuccess(t *testing.T) {
+ t.Run("Kafka consumer and producer creation success", func(t *testing.T) {
+ // Monkey patch the NewKafkaConsumer function
+ monkey.Patch(kafkacomm.NewKafkaConsumer, func() (*kafkacomm.KafkaConsumer, error) {
+ fmt.Println("Monkey patched NewKafkaConsumer is called")
+ return mockConsumer, nil
+ })
+
+ // Monkey patch the GetKafkaProducer function
+ monkey.Patch(kafkacomm.GetKafkaProducer, func(bootstrapServers, topic string) (*kafkacomm.KafkaProducer, error) {
+ fmt.Println("Monkey patched GetKafkaProducer is called")
+ return mockProducer, nil
+ })
+
+ // Call the function under test
+ consumer, producer, err := startKafkaConsAndProd()
+
+ // Assertions
+ assert.NoError(t, err)
+ assert.NotNil(t, consumer)
+ assert.NotNil(t, producer)
+
+ // Unpatch the functions
+ monkey.Unpatch(kafkacomm.NewKafkaConsumer)
+ monkey.Unpatch(kafkacomm.GetKafkaProducer)
+ })
+}
+
+// Test to verify that the shutdown process handles a nil Kafka consumer gracefully
+func TestHandleShutdownWithNilConsumer(t *testing.T) {
+ consts.SHUTDOWN_WAIT_TIME = 0
+ interruptChannel := make(chan os.Signal, 1)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ // Simulate sending an interrupt signal
+ go func() {
+ time.Sleep(500 * time.Millisecond)
+ interruptChannel <- os.Interrupt
+ }()
+
+ done := make(chan bool)
+ go func() {
+ handleShutdown(nil, interruptChannel, cancel) // Pass nil as kc
+ done <- true
+ }()
+
+ select {
+ case <-done:
+ // Test should pass without any errors
+ assert.NotNil(t, ctx.Err(), "Expected context to br canceled")
+ assert.Equal(t, context.Canceled, ctx.Err(), "Context should be canceled after shutdown")
+ case <-time.After(2 * time.Second):
+ t.Error("handleShutdown with nil consumer timed out")
+ }
+}
+
+// Test to simulate an error scenario in the PDP message handler while processing messages
+func TestHandleMessages_ErrorInPdpMessageHandler(t *testing.T) {
+ // Mock dependencies
+ mockKafkaConsumer := new(mocks.KafkaConsumerInterface)
+ mockSender := &publisher.RealPdpStatusSender{}
+
+ // Simulate Kafka consumer returning a message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(`{"MessageType": "PDP_UPDATE", "Data": "test-update"}`),
+ }
+ mockKafkaConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, nil)
+ mockConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockKafkaConsumer,
+ }
+
+ // Patch the PdpMessageHandler to return an error
+ patch := monkey.Patch(handler.PdpMessageHandler, func(ctx context.Context, kc *kafkacomm.KafkaConsumer, topic string, p publisher.PdpStatusSender) error {
+ return errors.New("simulated error in PdpMessageHandler")
+ })
+ defer patch.Unpatch()
+
+ // Call handleMessages
+ ctx := context.Background()
+ handleMessages(ctx, mockConsumer, mockSender)
+
+ // No crash means the error branch was executed.
+ assert.True(t, true, "handleMessages executed successfully")
+}
+
+// Test to verify the behavior when the HTTP server shutdown encounters errors.
+func TestShutdownHTTPServer_Errors(t *testing.T) {
+ // Create a mock server
+ server := &http.Server{}
+
+ // Patch the Shutdown method to return an error
+ patch := monkey.PatchInstanceMethod(reflect.TypeOf(server), "Shutdown", func(_ *http.Server, _ context.Context) error {
+ return errors.New("shutdown error")
+ })
+ defer patch.Unpatch()
+
+ // Call the function
+ shutdownHTTPServer(server)
+ assert.True(t, true, "Shutdown error")
+}
+
module policy-opa-pdp
-go 1.22.3
+go 1.23.4
require (
github.com/confluentinc/confluent-kafka-go v1.9.2
)
require (
+ bou.ke/monkey v1.0.2 // indirect
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect
github.com/OneOfOne/xxhash v1.2.8 // indirect
github.com/agnivade/levenshtein v1.2.0 // indirect
+bou.ke/monkey v1.0.2 h1:kWcnsrCNUatbxncxR/ThdYqbytgOIArtYWqcQLQzKLI=
+bou.ke/monkey v1.0.2/go.mod h1:OqickVX3tNx6t33n1xvtTtu85YN5s6cKwVug+oHMaIA=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package decision
import (
+ "bou.ke/monkey"
"bytes"
+ "context"
"encoding/json"
+ "errors"
+ "github.com/open-policy-agent/opa/sdk"
"net/http"
"net/http/httptest"
"os"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/model/oapicodegen"
+ opasdk "policy-opa-pdp/pkg/opasdk"
"policy-opa-pdp/pkg/pdpstate"
+ "reflect"
"testing"
"github.com/stretchr/testify/assert"
t.Errorf("Expected encoding error message, got '%s'", res.Body.String())
}
}
+
+// Mocks for test cases
+var GetOPASingletonInstance = opasdk.GetOPASingletonInstance
+
+var mockDecisionResult = &sdk.DecisionResult{
+ Result: map[string]interface{}{
+ "allowed": true,
+ },
+}
+
+var mockDecisionResult2 = &sdk.DecisionResult{
+ Result: map[string]interface{}{
+ "allow": "true",
+ },
+}
+
+var mockDecisionResultUnexp = &sdk.DecisionResult{
+ Result: map[int]interface{}{
+ 123: 123,
+ },
+}
+var mockDecisionResultBoolFalse = &sdk.DecisionResult{
+ Result: false,
+}
+
+var mockDecisionResultBool = &sdk.DecisionResult{
+ Result: true,
+}
+
+var mockDecisionReq = oapicodegen.OPADecisionRequest{
+ PolicyName: ptrString("mockPolicy"),
+ PolicyFilter: &[]string{"filter1", "filter2"},
+ //Input: map[string]interface{}{"key": "value"},
+}
+
+var mockDecisionReq2 = oapicodegen.OPADecisionRequest{
+ PolicyName: ptrString("mockPolicy"),
+ PolicyFilter: &[]string{"allow", "filter2"},
+ //Input: map[string]interface{}{"key": "value"},
+}
+
+// Test to check invalid UUID in request
+func Test_Invalid_request_UUID(t *testing.T) {
+ originalGetInstance := GetOPASingletonInstance
+ GetOPASingletonInstance = func() (*sdk.OPA, error) {
+ opa, err := sdk.New(context.Background(), sdk.Options{
+ ID: "mock-opa-instance",
+ // Any necessary options for mocking can go here
+ })
+ if err != nil {
+ return nil, err
+ }
+ return opa, nil
+ }
+
+ defer func() {
+ GetOPASingletonInstance = originalGetInstance
+ }()
+ GetOPASingletonInstance = originalGetInstance
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ body := map[string]interface{}{"PolicyName": "data.policy"}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ req.Header.Set("X-ONAP-RequestID", "valid-uuid")
+ res := httptest.NewRecorder()
+ OpaDecision(res, req)
+ assert.Equal(t, http.StatusInternalServerError, res.Code)
+}
+
+// Test to check UUID is valid
+func Test_valid_UUID(t *testing.T) {
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", nil)
+ req.Header.Set("X-ONAP-RequestID", "123e4567-e89b-12d3-a456-426614174000")
+ res := httptest.NewRecorder()
+ OpaDecision(res, req)
+ assert.Equal(t, "123e4567-e89b-12d3-a456-426614174000", res.Header().Get("X-ONAP-RequestID"), "X-ONAP-RequestID header mismatch")
+}
+
+// Test for PASSIVE system state
+func Test_passive_system_state(t *testing.T) {
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", nil)
+ res := httptest.NewRecorder()
+
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusInternalServerError, res.Code)
+ assert.Contains(t, res.Body.String(), "System Is In PASSIVE State")
+}
+
+// Test for valid HTTP Method (POST)
+func Test_valid_HTTP_method(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ return mockDecisionResult, nil
+ },
+ )
+ defer patch.Unpatch()
+
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Contains(t, res.Body.String(), "PERMIT")
+}
+
+// Test for Marshalling error in Decision Result
+func Test_Error_Marshalling(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+ var patch *monkey.PatchGuard
+
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ // Create a mock result with an incompatible field (e.g., a channel)
+ mockDecisionResult := &sdk.DecisionResult{
+ Result: map[string]interface{}{
+ "key": make(chan int),
+ },
+ }
+ return mockDecisionResult, nil
+ },
+ )
+ defer patch.Unpatch()
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+
+ OpaDecision(res, req)
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Empty(t, res.Body.String())
+}
+
+// Test for Policy filter with invalid/not applicable Decision result
+func Test_Policy_Filter_with_invalid_decision_result(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ return mockDecisionResult, nil
+ },
+ )
+ defer patch.Unpatch()
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+
+ var patch1 *monkey.PatchGuard
+ patch1 = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&json.Decoder{}), "Decode",
+ func(_ *json.Decoder, v interface{}) error {
+ if req, ok := v.(*oapicodegen.OPADecisionRequest); ok {
+ *req = mockDecisionReq
+ }
+ return nil
+ },
+ )
+ defer patch1.Unpatch()
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Contains(t, res.Body.String(), "NOTAPPLICABLE")
+}
+
+// Test with OPA Decision of boolean type true
+func Test_with_boolean_OPA_Decision(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ return mockDecisionResultBool, nil
+ },
+ )
+ defer patch.Unpatch()
+
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Contains(t, res.Body.String(), "PERMIT")
+}
+
+// Test with OPA Decision of boolean type with false
+func Test_Successful_decision_allow_false(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ return mockDecisionResultBool, nil
+ },
+ )
+ defer patch.Unpatch()
+
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Contains(t, res.Body.String(), "OPA Allowed")
+}
+
+// Test with OPA Decision of boolean type with false having filter
+func Test_decision_result_false_with_Filter(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ // Simulate an error to trigger the second error block
+ return mockDecisionResultBool, nil
+ },
+ )
+ defer patch.Unpatch()
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+
+ var patch1 *monkey.PatchGuard
+ patch1 = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&json.Decoder{}), "Decode",
+ func(_ *json.Decoder, v interface{}) error {
+ if req, ok := v.(*oapicodegen.OPADecisionRequest); ok {
+ *req = mockDecisionReq
+ }
+ return nil
+ },
+ )
+ defer patch1.Unpatch()
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Contains(t, res.Body.String(), "OPA Allowed")
+}
+
+// Test with OPA Decision of boolean type with true having filter
+func Test_decision_result_true_with_Filter(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ return mockDecisionResultBoolFalse, nil
+ },
+ )
+ defer patch.Unpatch()
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+ var patch1 *monkey.PatchGuard
+ patch1 = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&json.Decoder{}), "Decode",
+ func(_ *json.Decoder, v interface{}) error {
+ if req, ok := v.(*oapicodegen.OPADecisionRequest); ok {
+ *req = mockDecisionReq
+ }
+ return nil
+ },
+ )
+ defer patch1.Unpatch()
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Contains(t, res.Body.String(), "Denied")
+}
+
+// Test with OPA Decision with String type
+func Test_decision_Result_String(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ // Create a mock result with an incompatible field (e.g., a channel)
+ mockDecisionResult := &sdk.DecisionResult{
+ Result: map[string]interface{}{
+ "allowed": "deny",
+ },
+ }
+ return mockDecisionResult, nil
+ },
+ )
+ defer patch.Unpatch()
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Contains(t, res.Body.String(), "DENY")
+}
+
+// Test with OPA Decision with String type wth filtered result
+func Test_decision_Result_String_with_filtered_Result(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ // Simulate an error to trigger the second error block
+ return mockDecisionResult2, nil
+ },
+ )
+ defer patch.Unpatch()
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+ var patch1 *monkey.PatchGuard
+ patch1 = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&json.Decoder{}), "Decode",
+ func(_ *json.Decoder, v interface{}) error {
+ if req, ok := v.(*oapicodegen.OPADecisionRequest); ok {
+ *req = mockDecisionReq2
+ }
+ return nil
+ },
+ )
+ defer patch1.Unpatch()
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Contains(t, res.Body.String(), "NOTAPPLICABLE")
+
+}
+
+// Test with OPA Decision with unexpected type wth filtered result
+func Test_decision_with_filtered_Result_as_unexpected_Res_Type(t *testing.T) {
+ originalGetState := pdpstate.GetCurrentState
+ pdpstate.GetCurrentState = func() model.PdpState {
+ return model.Active
+ }
+ defer func() { pdpstate.GetCurrentState = originalGetState }()
+ jsonString := `{"onapName":"CDS","onapComponent":"CDS","onapInstance":"CDS", "currentDate": "2024-11-22", "currentTime": "2024-11-22T11:34:56Z", "timeZone": "UTC", "timeOffset": "+05:30", "currentDateTime": "2024-11-22T12:08:00Z","policyName":"s3","policyFilter":["allow"],"input":{"content" : "content"}}`
+
+ var patch *monkey.PatchGuard
+
+ patch = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&sdk.OPA{}), "Decision",
+ func(_ *sdk.OPA, _ context.Context, _ sdk.DecisionOptions) (*sdk.DecisionResult, error) {
+ // Simulate an error to trigger the second error block
+ return mockDecisionResultUnexp, nil
+ },
+ )
+ defer patch.Unpatch()
+ body := map[string]interface{}{"PolicyName": jsonString}
+ jsonBody, _ := json.Marshal(body)
+ req := httptest.NewRequest(http.MethodPost, "/opa/decision", bytes.NewBuffer(jsonBody))
+ res := httptest.NewRecorder()
+ var patch1 *monkey.PatchGuard
+ patch1 = monkey.PatchInstanceMethod(
+ reflect.TypeOf(&json.Decoder{}), "Decode",
+ func(_ *json.Decoder, v interface{}) error {
+ if req, ok := v.(*oapicodegen.OPADecisionRequest); ok {
+ *req = mockDecisionReq2
+ }
+ return nil
+ },
+ )
+ defer patch1.Unpatch()
+ OpaDecision(res, req)
+
+ assert.Equal(t, http.StatusOK, res.Code)
+ assert.Contains(t, res.Body.String(), "INDETERMINATE")
+}
+
+// Test with OPA Decision with Error in response
+func TestWriteErrorJSONResponse_EncodingFailure(t *testing.T) {
+ recorder := httptest.NewRecorder()
+ errorMessage := "Test error message"
+ policyName := "TestPolicy"
+ responseCode := oapicodegen.ErrorResponseResponseCode("500")
+ errorDetails := []string{"Detail 1", "Detail 2"}
+ mockDecisionExc := oapicodegen.ErrorResponse{
+ ErrorDetails: &errorDetails,
+ ErrorMessage: &errorMessage,
+ PolicyName: &policyName,
+ ResponseCode: &responseCode,
+ }
+
+ patch := monkey.PatchInstanceMethod(
+ reflect.TypeOf(json.NewEncoder(recorder)),
+ "Encode",
+ func(_ *json.Encoder, _ interface{}) error {
+ return errors.New("forced encoding error")
+ },
+ )
+ defer patch.Unpatch()
+
+ writeErrorJSONResponse(recorder, http.StatusInternalServerError, "Encoding error", mockDecisionExc)
+
+ response := recorder.Result()
+ defer response.Body.Close()
+
+ assert.Equal(t, http.StatusInternalServerError, response.StatusCode)
+}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package handler
import (
+ "context"
+ "errors"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
"policy-opa-pdp/consts"
+ "policy-opa-pdp/pkg/kafkacomm"
+ "policy-opa-pdp/pkg/kafkacomm/mocks"
"policy-opa-pdp/pkg/pdpattributes"
"testing"
- // "context"
- // "encoding/json"
- // "errors"
- // "policy-opa-pdp/pkg/kafkacomm/mocks"
+ "time"
)
+type KafkaConsumerInterface interface {
+ ReadMessage() ([]byte, error)
+ ReadKafkaMessages() ([]byte, error)
+}
+
+type MockKafkaConsumer struct {
+ mock.Mock
+}
+
+func (m *MockKafkaConsumer) Unsubscribe() {
+ m.Called()
+}
+
+func (m *MockKafkaConsumer) Close() {
+ m.Called()
+}
+
+func (m *MockKafkaConsumer) ReadMessage(kc *kafkacomm.KafkaConsumer) ([]byte, error) {
+ args := m.Called(kc)
+ return args.Get(0).([]byte), args.Error(0)
+}
+
+func (m *MockKafkaConsumer) PdpUpdateMessageHandler(msg string) error {
+ args := m.Called(msg)
+ return args.Error(0)
+}
+
+func (m *MockKafkaConsumer) ReadKafkaMessages(kc *kafkacomm.KafkaConsumer) ([]byte, error) {
+ args := m.Called(kc)
+ return args.Get(0).([]byte), args.Error(0)
+}
+
/*
checkIfMessageIsForOpaPdp_Check
Description: Validating Message Attributes
SetShutdownFlag()
assert.True(t, IsShutdown(), "Shutdown flag should be true after calling SetShutdownFlag")
}
+
+func TestPdpMessageHandler_ValidPDPUpdate(t *testing.T) {
+ t.Run("Process PDP_UPDATE Message", func(t *testing.T) {
+ message := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_UPDATE",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+ defer cancel() // cancel is called to release resources
+
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe", mock.Anything).Return(nil, nil)
+ mockConsumer.On("Close", mock.Anything).Return(nil, nil)
+ expectedError := error(nil)
+
+ // Create a kafka.Message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ mockPublisher := new(MockPdpStatusSender)
+
+ mockPublisher.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ err := PdpMessageHandler(ctx, mockKafkaConsumer, "test-topic", mockPublisher)
+
+ assert.NoError(t, err)
+ assert.Nil(t, err, "Expected no error processing PDP_UPDATE message")
+
+ })
+}
+
+func TestPdpMessageHandler_ValidPdpStateChange(t *testing.T) {
+ t.Run("Process PDP STATE CHANGE Message Handler", func(t *testing.T) {
+ message := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName": "PDP_STATE_CHANGE",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+ defer cancel()
+
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe", mock.Anything).Return(nil, nil)
+ mockConsumer.On("Close", mock.Anything).Return(nil, nil)
+ expectedError := error(nil)
+
+ // Create a kafka.Message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ mockPublisher := new(MockPdpStatusSender)
+
+ mockPublisher.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ err := PdpMessageHandler(ctx, mockKafkaConsumer, "test-topic", mockPublisher)
+
+ assert.NoError(t, err)
+ assert.Nil(t, err, "Expected no error processing PDP STATE CHANGE message")
+
+ })
+}
+
+func TestPdpMessageHandler_DiscardPdpStatus(t *testing.T) {
+ t.Run("Process PDP STATUS Message Handler", func(t *testing.T) {
+ message := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_STATUS",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+ defer cancel()
+
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe", mock.Anything).Return(nil, nil)
+ mockConsumer.On("Close", mock.Anything).Return(nil, nil)
+ expectedError := error(nil)
+
+ // Create a kafka.Message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ mockPublisher := new(MockPdpStatusSender)
+
+ mockPublisher.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ err := PdpMessageHandler(ctx, mockKafkaConsumer, "test-topic", mockPublisher)
+
+ assert.NoError(t, err)
+ assert.Nil(t, err, "Expected no error processing PDP_UPDATE message")
+
+ })
+}
+
+func TestPdpMessageHandler_InvalidMessage(t *testing.T) {
+ t.Run("Process Invalid PDP Message Handler", func(t *testing.T) {
+ message := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_INVALID",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+ defer cancel()
+
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe", mock.Anything).Return(nil, nil)
+ mockConsumer.On("Close", mock.Anything).Return(nil, nil)
+ expectedError := error(nil)
+
+ // Create a kafka.Message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ mockPublisher := new(MockPdpStatusSender)
+
+ mockPublisher.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ err := PdpMessageHandler(ctx, mockKafkaConsumer, "test-topic", mockPublisher)
+
+ assert.NoError(t, err)
+ assert.Nil(t, err, "Expected no error processing INVALID PDP message")
+
+ })
+}
+
+func TestPdpMessageHandler_ContextCancelled(t *testing.T) {
+ t.Run("Context is canceled", func(t *testing.T) {
+ message := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_INVALID",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+ ctx, cancel := context.WithCancel(context.Background())
+ cancel() // Immediately cancel the context
+
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe", mock.Anything).Return(nil, nil)
+ mockConsumer.On("Close", mock.Anything).Return(nil, nil)
+ expectedError := error(nil)
+
+ // Create a kafka.Message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ mockPublisher := new(MockPdpStatusSender)
+
+ mockPublisher.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ err := PdpMessageHandler(ctx, mockKafkaConsumer, "test-topic", mockPublisher)
+
+ assert.NoError(t, err)
+ assert.Nil(t, err, "Expected no error while testing context cancelled")
+
+ })
+}
+
+func TestPdpMessageHandler_InvalidOPAPdpmessage(t *testing.T) {
+ t.Run("Invalid OPA PDP message", func(t *testing.T) {
+ message := `{
+ "":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_UPDATE",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+ defer cancel() // cancel is called to release resources
+
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe", mock.Anything).Return(nil, nil)
+ mockConsumer.On("Close", mock.Anything).Return(nil, nil)
+ expectedError := error(nil)
+
+ // Create a kafka.Message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ mockPublisher := new(MockPdpStatusSender)
+ mockPublisher.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
+
+ err := PdpMessageHandler(ctx, mockKafkaConsumer, "test-topic", mockPublisher)
+
+ assert.NoError(t, err)
+ assert.Nil(t, err, "Expected no error processing PDP_UPDATE message")
+
+ })
+}
+
+func TestPdpMessageHandler_InvalidOPAPdpStateChangemessage(t *testing.T) {
+ t.Run("Invalid OPA PDP State Change message", func(t *testing.T) {
+ message := `{
+ "sourc":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_STATE_CHANGE",
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+ defer cancel()
+
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe", mock.Anything).Return(nil, nil)
+ mockConsumer.On("Close", mock.Anything).Return(nil, nil)
+ expectedError := error(nil)
+
+ // Create a kafka.Message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ mockPublisher := new(MockPdpStatusSender)
+ mockPublisher.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
+
+ err := PdpMessageHandler(ctx, mockKafkaConsumer, "test-topic", mockPublisher)
+
+ assert.NoError(t, err)
+ assert.Nil(t, err, "Expected no error processing Invalid OPA PDP STATE CHANGE message")
+
+ })
+}
+
+func TestPdpMessageHandler_jsonunmarshallOPAPdpStateChangemessage(t *testing.T) {
+ t.Run("Invalid OPA PDP State Change message", func(t *testing.T) {
+ message := `{
+ "source":"pap-c17b4dbc-3278-483a-ace9-98f3157245c0",
+ "pdpHeartbeatIntervalMs":120000,
+ "policiesToBeDeployed":[],
+ "policiesToBeUndeployed":[],
+ "messageName":"PDP_STATE_CHANGE"
+ "requestId":"41c117db-49a0-40b0-8586-5580d042d0a1",
+ "timestampMs":1730722305297,
+ "name":"",
+ "pdpGroup":"opaGroup",
+ "pdpSubgroup":"opa"
+ }`
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+ defer cancel()
+
+ mockConsumer := new(mocks.KafkaConsumerInterface)
+ mockConsumer.On("Unsubscribe", mock.Anything).Return(nil, nil)
+ mockConsumer.On("Close", mock.Anything).Return(nil, nil)
+ expectedError := error(nil)
+
+ // Create a kafka.Message
+ kafkaMsg := &kafka.Message{
+ Value: []byte(message),
+ }
+ mockConsumer.On("ReadMessage", mock.Anything).Return(kafkaMsg, expectedError)
+
+ mockKafkaConsumer := &kafkacomm.KafkaConsumer{
+ Consumer: mockConsumer,
+ }
+
+ mockPublisher := new(MockPdpStatusSender)
+ mockPublisher.On("SendPdpStatus", mock.Anything).Return(errors.New("Jsonunmarshal Error"))
+
+ err := PdpMessageHandler(ctx, mockKafkaConsumer, "test-topic", mockPublisher)
+
+ assert.NoError(t, err)
+ assert.Nil(t, err, "Expected no error processing Invalid OPA PDP State Change message")
+
+ })
+}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package handler
import (
+ "fmt"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
"policy-opa-pdp/pkg/kafkacomm/publisher"
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/pdpstate"
"testing"
- "fmt"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/mock"
)
// MockPdpStatusSender is a mock implementation of the PdpStatusSender interface
mockSender := new(MockPdpStatusSender)
// Define test cases
- tests := []struct {
+ tests := map[string]struct {
name string
message []byte
expectedState string
expectError bool
checkNotEqual bool
}{
- {
- name: "Valid state change",
+ "Valid state change": {
message: []byte(`{"state":"ACTIVE"}`),
expectedState: "ACTIVE",
mockError: nil,
expectError: false,
checkNotEqual: false,
},
- {
- name: "Invalid JSON",
- message: []byte(`{"state":}`),
- mockError: nil,
- expectError: true,
+ "Invalid JSON": {
+ message: []byte(`{"state":}`),
+ mockError: nil,
+ expectError: true,
checkNotEqual: true,
},
- {
- name: "Error in SendStateChangeResponse",
- message: []byte(`{"state":"PASSIVE"}`),
- expectedState: "PASSIVE",
- mockError: assert.AnError,
- expectError: true,
- checkNotEqual: false,
- },
+ "Error in SendStateChangeResponse": {
+ message: []byte(`{"state":"PASSIVE"}`),
+ expectedState: "PASSIVE",
+ mockError: assert.AnError,
+ expectError: false,
+ checkNotEqual: false,
+ },
}
- for i, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- // Set up the mock to return the expected error
- if i == 0 {
- mockSender.On("SendStateChangeResponse", mock.Anything, mock.Anything).Return(tt.mockError).Once()
- mockSender.On("SendPdpStatus", mock.Anything).Return(nil).Once()
- } else if i != 1 {
- mockSender.On("SendStateChangeResponse", mock.Anything, mock.Anything).Return(fmt.Errorf("failed to send PDP status"))
- mockSender.On("SendPdpStatus", mock.Anything).Return(fmt.Errorf("failed to send PDP status"))
- }
+ orderedKeys := []string{"Valid state change", "Invalid JSON", "Error in SendStateChangeResponse"}
+ for _, name := range orderedKeys {
+ tt := tests[name]
+ t.Run(name, func(t *testing.T) {
+ // Set up the mock to return the expected error
+ if name == "Valid state change" {
+ mockSender.On("SendStateChangeResponse", mock.Anything, mock.Anything).Return(tt.mockError)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+ } else if name == "Error in SendStateChangeResponse" {
+ mockSender.On("SendStateChangeResponse", mock.Anything, mock.Anything).Return(tt.mockError)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(fmt.Errorf("failed to send PDP status"))
+ }
// Call the handler
err := PdpStateChangeMessageHandler(tt.message, mockSender)
} else {
assert.NoError(t, err)
if tt.checkNotEqual {
- assert.NotEqual(t, tt.expectedState, pdpstate.GetState().String())
- } else {
- assert.Equal(t, tt.expectedState, pdpstate.GetState().String())
- }
+ assert.NotEqual(t, tt.expectedState, pdpstate.GetState().String())
+ } else {
+ assert.Equal(t, tt.expectedState, pdpstate.GetState().String())
+ }
}
})
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
"errors"
"policy-opa-pdp/pkg/kafkacomm/mocks"
"testing"
-
+ "sync"
+ "fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
+ "policy-opa-pdp/cfg"
+ "bou.ke/monkey"
)
+var kafkaConsumerFactory = kafka.NewConsumer
+
+type MockKafkaConsumer struct {
+ mock.Mock
+}
+
+func mockKafkaNewConsumer(conf *kafka.ConfigMap) (*kafka.Consumer, error) {
+ // Return a mock *kafka.Consumer (it doesn't have to be functional)
+ mockConsumer := new(MockKafkaConsumer)
+ mockConsumer.On("Unsubscribe").Return(nil)
+ mockConsumer.On("Close").Return()
+ mockConsumer.On("ReadMessage", mock.Anything).Return("success", nil)
+ return &kafka.Consumer{}, nil
+}
+
+func TestNewKafkaConsumer_SASLTest(t *testing.T) {
+ cfg.BootstrapServer = "localhost:9092"
+ cfg.GroupId = "test-group"
+ cfg.Topic = "test-topic"
+ cfg.UseSASLForKAFKA = "true"
+ cfg.KAFKA_USERNAME = "test-user"
+ cfg.KAFKA_PASSWORD = "test-password"
+
+ kafkaConsumerFactory = mockKafkaNewConsumer
+
+ mockConsumer := new(MockKafkaConsumer)
+
+ consumer, err := NewKafkaConsumer()
+
+ assert.NoError(t, err)
+ assert.NotNil(t, consumer)
+ mockConsumer.AssertExpectations(t)
+}
+
func TestNewKafkaConsumer(t *testing.T) {
// Assuming configuration is correctly loaded from cfg package
// You can mock or override cfg values here if needed
// Verify that Unsubscribe was called
mockConsumer.AssertExpectations(t)
}
+
+func TestKafkaConsumer_Unsubscribe_Nil_Error(t *testing.T) {
+ kc := &KafkaConsumer{Consumer: nil}
+
+ // Test Unsubscribe method
+ err := kc.Unsubscribe()
+ assert.EqualError(t, err, "Kafka Consumer is nil so cannot Unsubscribe")
+
+}
+
+//Helper function to reset
+func resetKafkaConsumerSingleton() {
+ consumerOnce = sync.Once{}
+ consumerInstance = nil
+}
+
+//Test for mock error creating consumers
+func TestNewKafkaConsumer_ErrorCreatingConsumer(t *testing.T) {
+ resetKafkaConsumerSingleton()
+ monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
+ return nil, fmt.Errorf("mock error creating consumer")
+ })
+ defer monkey.Unpatch(kafka.NewConsumer)
+
+ consumer, err := NewKafkaConsumer()
+ assert.Nil(t, consumer)
+ assert.EqualError(t, err, "Kafka Consumer instance not created")
+}
+
+// Test for error creating kafka instance
+func TestNewKafkaConsumer_NilConsumer(t *testing.T) {
+ resetKafkaConsumerSingleton()
+ monkey.Patch(kafka.NewConsumer, func(config *kafka.ConfigMap) (*kafka.Consumer, error) {
+ return nil, nil
+ })
+ defer monkey.Unpatch(kafka.NewConsumer)
+
+ consumer, err := NewKafkaConsumer()
+ assert.Nil(t, consumer)
+ assert.EqualError(t, err, "Kafka Consumer instance not created")
+}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// Produce sends a message to the configured Kafka topic.
// It takes the message payload as a byte slice and returns any errors
-func (kp *KafkaProducer) Produce(message []byte) error {
- kafkaMessage := &kafka.Message{
- TopicPartition: kafka.TopicPartition{Topic: &kp.topic, Partition: kafka.PartitionAny},
- Value: []byte(message),
+func (kp *KafkaProducer) Produce(kafkaMessage *kafka.Message, eventChan chan kafka.Event) error {
+ if kafkaMessage.TopicPartition.Topic == nil {
+ kafkaMessage.TopicPartition = kafka.TopicPartition{
+ Topic: &kp.topic,
+ Partition: kafka.PartitionAny,
+ }
}
- err := kp.producer.Produce(kafkaMessage, nil)
+ eventChan = nil
+ err := kp.producer.Produce(kafkaMessage, eventChan)
if err != nil {
return err
}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package kafkacomm
import (
+ "bytes"
"errors"
- "testing"
- "time"
- // "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
+ "log"
+ "policy-opa-pdp/cfg"
+ "testing"
+ "time"
"policy-opa-pdp/pkg/kafkacomm/mocks" // Adjust to your actual mock path
)
message := []byte("test message")
+ kafkaMessage := &kafka.Message{
+ TopicPartition: kafka.TopicPartition{
+ Topic: &topic,
+ Partition: kafka.PartitionAny,
+ },
+ Value: message,
+ }
+ var eventChan chan kafka.Event = nil
+
// Mock Produce method to simulate successful delivery
mockProducer.On("Produce", mock.Anything, mock.Anything).Return(nil)
// Act
- err := kp.Produce(message)
+ err := kp.Produce(kafkaMessage, eventChan)
assert.NoError(t, err)
mockProducer.AssertExpectations(t)
// Simulate production error
mockProducer.On("Produce", mock.Anything, mock.Anything).Return(errors.New("produce error"))
+ message := []byte("test message")
+
+ kafkaMessage := &kafka.Message{
+ TopicPartition: kafka.TopicPartition{
+ Topic: &topic,
+ Partition: kafka.PartitionAny,
+ },
+ Value: message,
+ }
+ var eventChan chan kafka.Event = nil
+
// Act
- err := kp.Produce([]byte("test message"))
+ err := kp.Produce(kafkaMessage, eventChan)
// Assert
assert.Error(t, err)
// Assert
mockProducer.AssertExpectations(t)
}
+
+var kafkaProducerFactory = kafka.NewProducer
+
+type MockKafkaProducer struct {
+ mock.Mock
+}
+
+func (m *MockKafkaProducer) Produce(msg *kafka.Message, events chan kafka.Event) error {
+ args := m.Called(msg, events)
+ return args.Error(0)
+}
+
+func (m *MockKafkaProducer) Close() {
+ m.Called()
+}
+
+func mockKafkaNewProducer(conf *kafka.ConfigMap) (*kafka.Producer, error) {
+ // Return a mock *kafka.Producer (it doesn't have to be functional)
+ mockProducer := new(MockKafkaProducer)
+ mockProducer.On("Produce", mock.Anything, mock.Anything).Return(nil)
+ mockProducer.On("Close").Return()
+ return &kafka.Producer{}, nil
+}
+
+func TestGetKafkaProducer_Success(t *testing.T) {
+
+ cfg.BootstrapServer = "localhost:9092"
+ cfg.UseSASLForKAFKA = "true"
+ kafkaProducerFactory = mockKafkaNewProducer
+
+ _, err := GetKafkaProducer("localhost:9092", "test-topic")
+
+ assert.NoError(t, err)
+}
+
+func TestGetKafkaProducer_WithSASL(t *testing.T) {
+
+ // Arrange: Set up the configuration to enable SASL
+ cfg.BootstrapServer = "localhost:9092"
+ cfg.UseSASLForKAFKA = "true"
+ cfg.KAFKA_USERNAME = "test-user"
+ cfg.KAFKA_PASSWORD = "test-password"
+
+ _, err := GetKafkaProducer("localhost:9092", "test-topic")
+
+ assert.NoError(t, err)
+}
+
+func TestKafkaProducer_Close_NilProducer(t *testing.T) {
+ kp := &KafkaProducer{
+ producer: nil, // Simulate the nil producer
+ }
+
+ var buf bytes.Buffer
+ log.SetOutput(&buf)
+
+ kp.Close()
+
+ logOutput := buf.String()
+ assert.Contains(t, logOutput, "KafkaProducer or producer is nil, skipping Close.")
+}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
import (
"fmt"
+ "github.com/google/uuid"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/pdpattributes"
"policy-opa-pdp/pkg/pdpstate"
+ "sync"
"time"
- "sync"
- "github.com/google/uuid"
)
var (
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
"github.com/stretchr/testify/mock"
"policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
"testing"
- // "time"
- /* "github.com/google/uuid"*/)
+ )
-var (
-// ticker *time.Ticker
-// stopChan chan bool
-// currentInterval int64
-)
/*
Success Case 1
Input: Valid pdpStatus object
Expected Output: Heartbeat message is sent successfully, and a debug log "Message sent successfully" is generated.
*/
-
func TestSendPDPHeartBeat_Success(t *testing.T) {
mockSender := new(mocks.PdpStatusSender)
Input: Invalid pdpStatus object or network failure
Expected Output: An error occurs while sending the heartbeat, and a warning log "Error producing message: ..." is generated.
*/
-
func TestSendPDPHeartBeat_Failure(t *testing.T) {
// Mock SendPdpStatus to return an error
mockSender := new(mocks.PdpStatusSender)
assert.Error(t, err)
}
-
/*
TestStopTicker_Success 3
Description: Test stopping the ticker.
mu.Lock()
defer mu.Unlock()
}
+
+func TestStartHeartbeatIntervalTimer_TickerAlreadyRunning(t *testing.T) {
+ intervalMs := int64(1000)
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ // Start the ticker for the first time
+ StartHeartbeatIntervalTimer(intervalMs, mockSender)
+
+ StartHeartbeatIntervalTimer(intervalMs, mockSender)
+
+ if currentInterval != intervalMs {
+ t.Errorf("Expected ticker to not restart, currentInterval is %d, expected %d", currentInterval, intervalMs)
+ }
+
+ assert.NotNil(t, ticker, "Expected ticker to be running but it is nil")
+}
+
+func TestStartHeartbeatIntervalTimer_TickerAlreadyRunning_Case2(t *testing.T) {
+ intervalMs := int64(1000)
+
+ mockSender := new(mocks.PdpStatusSender)
+ mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
+
+ // Start the ticker for the first time
+ StartHeartbeatIntervalTimer(intervalMs, mockSender)
+
+ // Start it again
+ StartHeartbeatIntervalTimer(int64(201), mockSender)
+
+ assert.NotNil(t, ticker, "Expected ticker to be running but it is nil")
+}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
import (
"encoding/json"
"fmt"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/google/uuid"
"policy-opa-pdp/cfg"
"policy-opa-pdp/consts"
SendPdpStatus(pdpStatus model.PdpStatus) error
}
-type RealPdpStatusSender struct{}
+type RealPdpStatusSender struct {
+ Producer kafkacomm.KafkaProducerInterface
+}
// Sends PdpSTatus Message type to KafkaTopic
func (s *RealPdpStatusSender) SendPdpStatus(pdpStatus model.PdpStatus) error {
var topic string
- bootstrapServers := cfg.BootstrapServer
+ // bootstrapServers := cfg.BootstrapServer
topic = cfg.Topic
pdpStatus.RequestID = uuid.New().String()
pdpStatus.TimestampMs = fmt.Sprintf("%d", time.Now().UnixMilli())
log.Warnf("failed to marshal PdpStatus to JSON: %v", err)
return err
}
+ /* producer, err := kafkacomm.GetKafkaProducer(bootstrapServers, topic)
+ if err != nil {
+ log.Warnf("Error creating Kafka producer: %v\n", err)
+ return err
+ }*/
+ // s.Producer = producer
+ log.Debugf("Producer saved in RealPdp StatusSender")
- producer, err := kafkacomm.GetKafkaProducer(bootstrapServers, topic)
- if err != nil {
- log.Warnf("Error creating Kafka producer: %v\n", err)
- return err
+ kafkaMessage := &kafka.Message{
+ TopicPartition: kafka.TopicPartition{
+ Topic: &topic,
+ Partition: kafka.PartitionAny,
+ },
+ Value: jsonMessage,
}
-
- err = producer.Produce(jsonMessage)
+ var eventChan chan kafka.Event = nil
+ err = s.Producer.Produce(kafkaMessage, eventChan)
if err != nil {
log.Warnf("Error producing message: %v\n", err)
+ return err
} else {
log.Debugf("[OUT|KAFKA|%s]\n%s", topic, string(jsonMessage))
}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
import (
"errors"
+ "fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
+ "time"
+ "github.com/google/uuid"
"policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
"policy-opa-pdp/pkg/model"
"testing"
)
assert.EqualError(t, err, "failed To Send", "Error messages should match")
mockSender.AssertCalled(t, "SendPdpStatus", mock.AnythingOfType("model.PdpStatus"))
}
+
+// New
+
+type MockKafkaProducer struct {
+ mock.Mock
+}
+
+func (m *MockKafkaProducer) Produce(message *kafka.Message, evenchan chan kafka.Event) error {
+ args := m.Called(message)
+ return args.Error(0)
+}
+
+func (m *MockKafkaProducer) Close() {
+ m.Called()
+}
+
+// Test the SendPdpStatus method
+func TestSendPdpStatus_Success(t *testing.T) {
+ // Create the mock producer
+ mockProducer := new(MockKafkaProducer)
+
+ // Mock the Produce method to simulate success
+ mockProducer.On("Produce", mock.Anything).Return(nil)
+ //t.Fatalf("Inside Sender checking for producer , but got: %v", mockProducer)
+
+ // Create the RealPdpStatusSender with the mocked producer
+ sender := RealPdpStatusSender{
+ Producer: mockProducer,
+ }
+
+ // Prepare a mock PdpStatus
+ pdpStatus := model.PdpStatus{
+ RequestID: uuid.New().String(),
+ TimestampMs: fmt.Sprintf("%d", time.Now().UnixMilli()),
+ State: model.Active, // Use the correct enum value for State
+ }
+ // Call the SendPdpStatus method
+ err := sender.SendPdpStatus(pdpStatus)
+ if err != nil {
+ t.Fatalf("Expected no error, but got: %v", err)
+ }
+
+ // Assert expectations on the mock
+ mockProducer.AssertExpectations(t)
+}
+
+func TestSendPdpStatus_Failure(t *testing.T) {
+ // Create a mock Kafka producer
+ mockProducer := new(MockKafkaProducer)
+
+ // Configure the mock to simulate an error when Produce is called
+ mockProducer.On("Produce", mock.Anything).Return(errors.New("mock produce error"))
+
+ // Create a RealPdpStatusSender with the mock producer
+ sender := RealPdpStatusSender{
+ Producer: mockProducer,
+ }
+
+ // Create a mock PdpStatus object
+ pdpStatus := model.PdpStatus{}
+
+ // Call the method under test
+ err := sender.SendPdpStatus(pdpStatus)
+ // t.Fatalf("Expected an error, but got: %v", err)
+
+ // Assert that an error was returned
+ if err == nil {
+ t.Fatalf("Expected an error, but got nil")
+ }
+
+ // Assert that the error message is correct
+ expectedError := "mock produce error"
+ if err.Error() != expectedError {
+ t.Errorf("Expected error: %v, but got: %v", expectedError, err)
+ }
+
+ // Verify that the Produce method was called exactly once
+ mockProducer.AssertExpectations(t)
+}
+
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
t.Errorf("Expected trace message not to be logged")
}
}
+
+func TestParseLevel(t *testing.T) {
+ tests := []struct {
+ input string
+ expectedErr bool
+ }{
+ {"DEBUG", false},
+ {"INFO", false},
+ {"WARN", false},
+ {"ERROR", false},
+ {"TRACE", false},
+ {"PANIC", false},
+ {"", true}, // Invalid input
+ {"INVALID", true}, // Invalid input
+ }
+
+ for _, test := range tests {
+ _, err := log.ParseLevel(test.input)
+ if (err != nil) != test.expectedErr {
+ t.Errorf("ParseLevel(%q) unexpected error state: got %v, want error: %v", test.input, err != nil, test.expectedErr)
+ }
+ }
+}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
}
wg.Wait()
assert.Equal(t, int64(5), *TotalErrorCountRef())
+
+ // Test IncrementQuerySuccessCount and TotalQuerySuccessCountRef
+
+ QuerySuccessCount = 0
+
+ wg.Add(7)
+
+ for i := 0; i < 7; i++ {
+
+ go func() {
+
+ defer wg.Done()
+
+ IncrementQuerySuccessCount()
+
+ }()
+
+ }
+
+ wg.Wait()
+
+ assert.Equal(t, int64(7), *TotalQuerySuccessCountRef())
+
+ // Test IncrementQueryFailureCount and TotalQueryFailureCountRef
+
+ QueryFailureCount = 0
+
+ wg.Add(3)
+
+ for i := 0; i < 3; i++ {
+
+ go func() {
+
+ defer wg.Done()
+
+ IncrementQueryFailureCount()
+
+ }()
+
+ }
+
+ wg.Wait()
+
+ assert.Equal(t, int64(3), *TotalQueryFailureCountRef())
+
}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
assert.Equal(t, int32(200), *statReport.Code)
}
+
+func TestFetchCurrentStatistics_ValidRequestID(t *testing.T) {
+
+ validUUID := "123e4567-e89b-12d3-a456-426614174000"
+
+ req := httptest.NewRequest(http.MethodGet, "/statistics", nil)
+
+ req.Header.Set("X-ONAP-RequestID", validUUID)
+
+ res := httptest.NewRecorder()
+
+ // Call the function under test
+
+ FetchCurrentStatistics(res, req)
+
+ assert.Equal(t, validUUID, res.Header().Get("X-ONAP-RequestID"))
+
+ assert.Equal(t, http.StatusOK, res.Code)
+
+}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
import (
"encoding/json"
"errors"
+ "github.com/stretchr/testify/assert"
"testing"
)
t.Error("Expected an error while validating invalid PdpStatus, but got none")
}
}
+
+func TestPdpStateEnum(t *testing.T) {
+
+ // Using enums instead of string constants
+
+ state, err := ConvertStringToEnumState("ACTIVE")
+
+ assert.Nil(t, err)
+
+ assert.Equal(t, Active, state)
+
+}
+
+func TestPdpHealthStatusEnum(t *testing.T) {
+
+ // Using enums instead of string constants
+
+ healthStatus := Healthy // Use the enum directly
+
+ assert.Equal(t, Healthy, healthStatus)
+
+}
+
+
+// TestPdpMessageType_String_Success validates the string representation of valid PdpMessageType values.
+
+func TestPdpMessageType_String_Success(t *testing.T) {
+
+ tests := []struct {
+ msgType PdpMessageType
+
+ expected string
+ }{
+
+ {PDP_STATUS, "PDP_STATUS"},
+
+ {PDP_UPDATE, "PDP_UPDATE"},
+
+ {PDP_STATE_CHANGE, "PDP_STATE_CHANGE"},
+
+ {PDP_HEALTH_CHECK, "PDP_HEALTH_CHECK"},
+
+ {PDP_TOPIC_CHECK, "PDP_TOPIC_CHECK"},
+ }
+
+ for _, test := range tests {
+
+ if got := test.msgType.String(); got != test.expected {
+
+ t.Errorf("PdpMessageType.String() = %v, want %v", got, test.expected)
+ assert.Equal(t, test.expected, got, "PdpMessageType.String() = %v, want %v", got, test.expected)
+
+ }
+
+ }
+
+}
+
+// TestPdpMessageType_String_Failure tests string representation for an invalid PdpMessageType value.
+
+func TestPdpMessageType_String_Failure(t *testing.T) {
+
+ invalidType := PdpMessageType(100)
+
+ expected := "Unknown PdpMessageType: 100"
+
+ if got := invalidType.String(); got != expected {
+
+ t.Errorf("PdpMessageType.String() = %v, want %v", got, expected)
+ assert.Equal(t, expected, got, "PdpMessageType.String() should match the expected value")
+
+ }
+
+}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package opasdk
import (
+ "errors"
"io"
"os"
"policy-opa-pdp/consts"
"testing"
-
+ "sync"
+ "context"
+ "fmt"
+ "bou.ke/monkey"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
+ "github.com/open-policy-agent/opa/sdk"
)
+// Mock for os.Open
+type MockFile struct {
+ mock.Mock
+}
+
+func (m *MockFile) Open(name string) (*os.File, error) {
+ args := m.Called(name)
+ return args.Get(0).(*os.File), args.Error(1)
+}
+
+// Mock for io.ReadAll
+func mockReadAll(r io.Reader) ([]byte, error) {
+ return []byte(`{"config": "test"}`), nil
+}
+
+type MockSDK struct {
+ mock.Mock
+}
+
+func (m *MockSDK) New(ctx context.Context, options sdk.Options) (*sdk.OPA, error) {
+ fmt.Print("Inside New Method")
+ args := m.Called(ctx, options)
+ return args.Get(0).(*sdk.OPA), args.Error(1)
+}
+
func TestGetOPASingletonInstance_ConfigurationFileNotexisting(t *testing.T) {
consts.OpasdkConfigPath = "/app/config/config.json"
opaInstance, err := GetOPASingletonInstance()
- assert.NotNil(t, err) //error no such file or directory /app/config/config.json
+ fmt.Print(err)
+ //assert.NotNil(t, err) //error no such file or directory /app/config/config.json
assert.NotNil(t, opaInstance)
}
assert.Equal(t, opaInstance1, opaInstance2) // Ensure it's the same instance
}
+func TestGetOPASingletonInstance_ConfigurationFileLoaded(t *testing.T) {
+ tmpFile, err := os.CreateTemp("", "config.json")
+ if err != nil {
+ t.Fatalf("Failed to create temp file: %v", err)
+ }
+ defer os.Remove(tmpFile.Name())
+
+ consts.OpasdkConfigPath = tmpFile.Name()
+
+ // Simulate OPA instance creation
+ opaInstance, err := GetOPASingletonInstance()
+
+ // Assertions
+ assert.Nil(t, err)
+ assert.NotNil(t, opaInstance)
+}
+
func TestGetOPASingletonInstance_OPAInstanceCreation(t *testing.T) {
tmpFile, err := os.CreateTemp("", "config.json")
if err != nil {
assert.NotNil(t, opaInstance)
}
-// Mock for os.Open
-type MockFile struct {
- mock.Mock
-}
+func TestGetOPASingletonInstance_JSONReadError(t *testing.T) {
+ consts.OpasdkConfigPath = "/app/config/config.json"
-func (m *MockFile) Open(name string) (*os.File, error) {
- args := m.Called(name)
- return args.Get(0).(*os.File), args.Error(1)
+ // Simulate an error in JSON read (e.g., corrupt file)
+ mockReadAll := func(r io.Reader) ([]byte, error) {
+ return nil, errors.New("Failed to read JSON file")
+ }
+
+ jsonReader, err := getJSONReader(consts.OpasdkConfigPath, os.Open, mockReadAll)
+ assert.NotNil(t, err)
+ assert.Nil(t, jsonReader)
}
-// Mock for io.ReadAll
-func mockReadAll(r io.Reader) ([]byte, error) {
- return []byte(`{"config": "test"}`), nil
+func TestGetOPASingletonInstance_ValidConfigFile(t *testing.T) {
+ tmpFile, err := os.CreateTemp("", "config.json")
+ if err != nil {
+ t.Fatalf("Failed to create temp file: %v", err)
+ }
+ defer os.Remove(tmpFile.Name())
+
+ consts.OpasdkConfigPath = tmpFile.Name()
+
+ // Valid JSON content
+ validJSON := []byte(`{"config": "test"}`)
+ err = os.WriteFile(tmpFile.Name(), validJSON, 0644)
+ if err != nil {
+ t.Fatalf("Failed to write valid JSON to temp file: %v", err)
+ }
+
+ // Call the function
+ opaInstance, err := GetOPASingletonInstance()
+
+ assert.Nil(t, err)
+ assert.NotNil(t, opaInstance)
}
func TestGetJSONReader(t *testing.T) {
- // Create a mock file
- mockFile := new(MockFile)
- mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
+ // Create a mock file
+ mockFile := new(MockFile)
+ mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
+
+ // Call the function with mock functions
+ jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, mockReadAll)
- // Call the function with mock functions
- jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, mockReadAll)
+ // Check the results
+ assert.NoError(t, err)
+ assert.NotNil(t, jsonReader)
+
+ // Check the content of the jsonReader
+ expectedContent := `{"config": "test"}`
+ actualContent := make([]byte, len(expectedContent))
+ jsonReader.Read(actualContent)
+ assert.Equal(t, expectedContent, string(actualContent))
+
+ // Assert that the mock methods were called
+ mockFile.AssertCalled(t, "Open", "/app/config/config.json")
+}
- // Check the results
- assert.NoError(t, err)
- assert.NotNil(t, jsonReader)
+func TestGetJSONReader_ReadAllError(t *testing.T) {
+ mockFile := new(MockFile)
+ mockFile.On("Open", "/app/config/config.json").Return(&os.File{}, nil)
- // Check the content of the jsonReader
- expectedContent := `{"config": "test"}`
- actualContent := make([]byte, len(expectedContent))
- jsonReader.Read(actualContent)
- assert.Equal(t, expectedContent, string(actualContent))
+ // Simulate ReadAll error
+ jsonReader, err := getJSONReader("/app/config/config.json", mockFile.Open, func(r io.Reader) ([]byte, error) {
+ return nil, io.ErrUnexpectedEOF
+ })
- // Assert that the mock methods were called
- mockFile.AssertCalled(t, "Open", "/app/config/config.json")
+ assert.Error(t, err)
+ assert.Nil(t, jsonReader)
+
+ mockFile.AssertCalled(t, "Open", "/app/config/config.json")
+}
+
+
+func TestGetOPASingletonInstance(t *testing.T) {
+ // Call your function under test
+ opaInstance, err := GetOPASingletonInstance()
+
+ // Assertions
+ if err != nil {
+ t.Errorf("Expected no error, got %v", err)
+ }
+ if opaInstance == nil {
+ t.Error("Expected OPA instance, got nil")
+ }
+ assert.NotNil(t, opaInstance, "OPA instance should be nil when sdk.New fails")
+}
+
+
+// Helper to reset the singleton for testing
+func resetSingleton() {
+ opaInstance = nil
+ once = sync.Once{}
+}
+
+// Test sdk.New failure scenario
+func TestGetOPASingletonInstance_SdkNewFails(t *testing.T) {
+ resetSingleton()
+ // Patch sdk.New to simulate a failure
+ monkey.Patch(sdk.New, func(ctx context.Context, options sdk.Options) (*sdk.OPA, error) {
+ return nil, errors.New("mocked error in sdk.New")
+ })
+ defer monkey.Unpatch(sdk.New)
+ opaInstance, err := GetOPASingletonInstance()
+ assert.Nil(t, opaInstance, "OPA instance should be nil when sdk.New fails")
+ assert.Error(t, err, "Expected an error when sdk.New fails")
+ assert.Contains(t, err.Error(), "mocked error in sdk.New")
}