retry registration using heartbeat ticker 75/140575/3 1.0.4
authorgururajarao79 <gb00566633@techmahindra.com>
Tue, 25 Mar 2025 09:14:38 +0000 (10:14 +0100)
committergururajarao79 <gb00566633@techmahindra.com>
Tue, 25 Mar 2025 10:09:51 +0000 (11:09 +0100)
Issue-ID: POLICY-5325
Change-Id: I5f5aba2d758a53ea9f44ef19a25c87f448cb4c2f
Signed-off-by: gururajarao79 <gb00566633@techmahindra.com>
cmd/opa-pdp/opa-pdp.go
consts/constants.go
pkg/kafkacomm/handler/pdp_update_message_handler.go
pkg/kafkacomm/publisher/pdp-heartbeat.go
pkg/kafkacomm/publisher/pdp-heartbeat_test.go
pkg/pdpattributes/pdpattributes.go
pkg/pdpattributes/pdpattributes_test.go

index fa83cdc..6880f0e 100644 (file)
@@ -34,6 +34,7 @@ import (
        "policy-opa-pdp/pkg/kafkacomm/publisher"
        "policy-opa-pdp/pkg/log"
        "policy-opa-pdp/pkg/opasdk"
+       "policy-opa-pdp/pkg/pdpattributes"
        "syscall"
        "time"
 )
@@ -93,11 +94,8 @@ func main() {
 
        time.Sleep(10 * time.Second)
 
-       // pdp registration
-       isRegistered := registerPDPFunc(sender)
-       if !isRegistered {
-               return
-       }
+       pdpattributes.SetPdpHeartbeatInterval(int64(consts.DefaultHeartbeatMS))
+       go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, sender)
 
        time.Sleep(10 * time.Second)
        log.Debugf("After registration successful delay")
index d345895..6e2eedb 100644 (file)
@@ -49,6 +49,7 @@ package consts
 //     OkCode              - The Code for HealthCheck response
 //     HealthCheckMessage  - The Healtcheck Message
 //     SingleHierarchy     - The Counter indicates the length of datakey path
+//      DefaultHeartbeatMS  - The default interval for heartbeat signals in milliseconds.
 var (
        LogFilePath      = "/var/logs/logs.log"
        LogMaxSize       = 10
@@ -77,4 +78,5 @@ var (
        OkCode             = int32(200)
        HealthCheckMessage = "alive"
        SingleHierarchy    = 4
+       DefaultHeartbeatMS = 60000
 )
index 58ee1b0..9268115 100644 (file)
@@ -79,7 +79,6 @@ func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error
        log.Debugf("PDP_UPDATE Message received: %s", string(message))
 
        pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup)
-       pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
 
        if len(pdpUpdate.PoliciesToBeDeployed) > 0 {
                failureMessage, successfullyDeployedPolicies := handlePolicyDeploymentVar(pdpUpdate, p)
@@ -131,7 +130,15 @@ func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error
                }
        }
        log.Infof("PDP_STATUS Message Sent Successfully")
-       go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
+       log.Debug(pdpUpdate.PdpHeartbeatIntervalMs)
+
+       if pdpattributes.PdpHeartbeatInterval != pdpUpdate.PdpHeartbeatIntervalMs && pdpUpdate.PdpHeartbeatIntervalMs != 0 {
+               //restart the ticker.
+               publisher.StopTicker()
+               pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
+               go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
+       }
+
        return nil
 }
 
index 5d5165c..da4888f 100644 (file)
@@ -24,7 +24,6 @@ package publisher
 
 import (
        "fmt"
-       "github.com/google/uuid"
        "policy-opa-pdp/consts"
        "policy-opa-pdp/pkg/log"
        "policy-opa-pdp/pkg/model"
@@ -33,6 +32,7 @@ import (
        "policy-opa-pdp/pkg/policymap"
        "sync"
        "time"
+       "github.com/google/uuid"
 )
 
 var (
@@ -40,6 +40,7 @@ var (
        stopChan        chan bool
        currentInterval int64
        mu              sync.Mutex
+       wg              sync.WaitGroup
 )
 
 // Initializes a timer that sends periodic heartbeat messages to indicate the health and state of the PDP.
@@ -53,7 +54,6 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
                return
        } else if intervalMs == 0 {
                intervalMs = currentInterval
-
        }
 
        if ticker != nil && intervalMs == currentInterval {
@@ -61,16 +61,14 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
                return
        }
 
-       if ticker != nil {
-               ticker.Stop()
-       }
-
        currentInterval = intervalMs
 
        ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
-       log.Debugf("New Ticker %d", currentInterval)
+       log.Debugf("New Ticker started with interval %d", currentInterval)
        stopChan = make(chan bool)
+       wg.Add(1)
        go func() {
+               defer wg.Done()
                for {
                        select {
                        case <-ticker.C:
@@ -78,6 +76,7 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) {
                                        log.Errorf("Failed to send PDP Heartbeat: %v", err)
                                }
                        case <-stopChan:
+                               log.Debugf("Stopping ticker")
                                ticker.Stop()
                                return
                        }
@@ -111,6 +110,13 @@ func sendPDPHeartBeat(s PdpStatusSender) error {
                }
        }
 
+       pdpSubGroup := pdpattributes.GetPdpSubgroup()
+
+       if pdpSubGroup == "" || len(pdpSubGroup) == 0 {
+               pdpStatus.PdpSubgroup = nil
+               pdpStatus.Description = "Pdp Status Registration Message"
+       }
+
        err := s.SendPdpStatus(pdpStatus)
        log.Debugf("Sending Heartbeat ...")
        if err != nil {
@@ -128,9 +134,9 @@ func StopTicker() {
        if ticker != nil && stopChan != nil {
                stopChan <- true
                close(stopChan)
-               ticker.Stop()
-               ticker = nil
                stopChan = nil
+               wg.Wait() //wait for the goroutine to finish
+               log.Debugf("goroutine finsihed ...")
        } else {
                log.Debugf("Ticker is not Running")
        }
index 9309728..ac0ad02 100644 (file)
@@ -21,11 +21,11 @@ package publisher
 
 import (
        "errors"
-       "github.com/stretchr/testify/assert"
-       "github.com/stretchr/testify/mock"
        "policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
        "policy-opa-pdp/pkg/policymap"
        "testing"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/mock"
 )
 
 /*
@@ -151,11 +151,11 @@ func TestStopTicker_Success(t *testing.T) {
        mockSender := new(mocks.PdpStatusSender)
        mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
        StartHeartbeatIntervalTimer(1000, mockSender)
-
+       wg.Done()
        StopTicker()
        mu.Lock()
        defer mu.Unlock()
-       if ticker != nil {
+       if stopChan != nil {
                t.Errorf("Expected ticker to be nil")
        }
 }
index 005dd03..a40a55a 100644 (file)
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -22,8 +22,8 @@
 package pdpattributes
 
 import (
-       "github.com/google/uuid"
        "policy-opa-pdp/pkg/log"
+       "github.com/google/uuid"
 )
 
 var (
@@ -48,7 +48,7 @@ func SetPdpSubgroup(pdpsubgroup string) {
 }
 
 // Retrieves the current PDP subgroup value.
-func getPdpSubgroup() string {
+func GetPdpSubgroup() string {
        return PdpSubgroup
 }
 
index c9a41c7..700a264 100644 (file)
@@ -1,6 +1,6 @@
 // -
 //   ========================LICENSE_START=================================
-//   Copyright (C) 2024: Deutsche Telekom
+//   Copyright (C) 2024-2025: Deutsche Telekom
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -46,14 +46,14 @@ func TestSetPdpSubgroup_Success(t *testing.T) {
        t.Run("ValidSubgroup", func(t *testing.T) {
                expectedSubgroup := "subgroup1"
                SetPdpSubgroup(expectedSubgroup)
-               assert.Equal(t, expectedSubgroup, getPdpSubgroup(), "Expected PDP subgroup to match set value")
+               assert.Equal(t, expectedSubgroup, GetPdpSubgroup(), "Expected PDP subgroup to match set value")
        })
 }
 
 func TestSetPdpSubgroup_Failure(t *testing.T) {
        t.Run("EmptySubgroup", func(t *testing.T) {
                SetPdpSubgroup("")
-               assert.Equal(t, "", getPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
+               assert.Equal(t, "", GetPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
        })
 
        t.Run("LargeSubgroup", func(t *testing.T) {
@@ -62,7 +62,7 @@ func TestSetPdpSubgroup_Failure(t *testing.T) {
                        largeSubgroup[i] = 'a'
                }
                SetPdpSubgroup(string(largeSubgroup))
-               assert.Equal(t, string(largeSubgroup), getPdpSubgroup(), "Expected large PDP subgroup to match set value")
+               assert.Equal(t, string(largeSubgroup), GetPdpSubgroup(), "Expected large PDP subgroup to match set value")
        })
 }