From: gururajarao79 Date: Tue, 25 Mar 2025 09:14:38 +0000 (+0100) Subject: retry registration using heartbeat ticker X-Git-Tag: 1.0.4^0 X-Git-Url: https://gerrit.onap.org/r/gitweb?a=commitdiff_plain;h=3773a47d2a2d58a13c9d29ecdf2ddbb4ceead45d;p=policy%2Fopa-pdp.git retry registration using heartbeat ticker Issue-ID: POLICY-5325 Change-Id: I5f5aba2d758a53ea9f44ef19a25c87f448cb4c2f Signed-off-by: gururajarao79 --- diff --git a/cmd/opa-pdp/opa-pdp.go b/cmd/opa-pdp/opa-pdp.go index fa83cdc..6880f0e 100644 --- a/cmd/opa-pdp/opa-pdp.go +++ b/cmd/opa-pdp/opa-pdp.go @@ -34,6 +34,7 @@ import ( "policy-opa-pdp/pkg/kafkacomm/publisher" "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/opasdk" + "policy-opa-pdp/pkg/pdpattributes" "syscall" "time" ) @@ -93,11 +94,8 @@ func main() { time.Sleep(10 * time.Second) - // pdp registration - isRegistered := registerPDPFunc(sender) - if !isRegistered { - return - } + pdpattributes.SetPdpHeartbeatInterval(int64(consts.DefaultHeartbeatMS)) + go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, sender) time.Sleep(10 * time.Second) log.Debugf("After registration successful delay") diff --git a/consts/constants.go b/consts/constants.go index d345895..6e2eedb 100644 --- a/consts/constants.go +++ b/consts/constants.go @@ -49,6 +49,7 @@ package consts // OkCode - The Code for HealthCheck response // HealthCheckMessage - The Healtcheck Message // SingleHierarchy - The Counter indicates the length of datakey path +// DefaultHeartbeatMS - The default interval for heartbeat signals in milliseconds. var ( LogFilePath = "/var/logs/logs.log" LogMaxSize = 10 @@ -77,4 +78,5 @@ var ( OkCode = int32(200) HealthCheckMessage = "alive" SingleHierarchy = 4 + DefaultHeartbeatMS = 60000 ) diff --git a/pkg/kafkacomm/handler/pdp_update_message_handler.go b/pkg/kafkacomm/handler/pdp_update_message_handler.go index 58ee1b0..9268115 100644 --- a/pkg/kafkacomm/handler/pdp_update_message_handler.go +++ b/pkg/kafkacomm/handler/pdp_update_message_handler.go @@ -79,7 +79,6 @@ func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error log.Debugf("PDP_UPDATE Message received: %s", string(message)) pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup) - pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs) if len(pdpUpdate.PoliciesToBeDeployed) > 0 { failureMessage, successfullyDeployedPolicies := handlePolicyDeploymentVar(pdpUpdate, p) @@ -131,7 +130,15 @@ func pdpUpdateMessageHandler(message []byte, p publisher.PdpStatusSender) error } } log.Infof("PDP_STATUS Message Sent Successfully") - go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p) + log.Debug(pdpUpdate.PdpHeartbeatIntervalMs) + + if pdpattributes.PdpHeartbeatInterval != pdpUpdate.PdpHeartbeatIntervalMs && pdpUpdate.PdpHeartbeatIntervalMs != 0 { + //restart the ticker. + publisher.StopTicker() + pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs) + go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p) + } + return nil } diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat.go b/pkg/kafkacomm/publisher/pdp-heartbeat.go index 5d5165c..da4888f 100644 --- a/pkg/kafkacomm/publisher/pdp-heartbeat.go +++ b/pkg/kafkacomm/publisher/pdp-heartbeat.go @@ -24,7 +24,6 @@ package publisher import ( "fmt" - "github.com/google/uuid" "policy-opa-pdp/consts" "policy-opa-pdp/pkg/log" "policy-opa-pdp/pkg/model" @@ -33,6 +32,7 @@ import ( "policy-opa-pdp/pkg/policymap" "sync" "time" + "github.com/google/uuid" ) var ( @@ -40,6 +40,7 @@ var ( stopChan chan bool currentInterval int64 mu sync.Mutex + wg sync.WaitGroup ) // Initializes a timer that sends periodic heartbeat messages to indicate the health and state of the PDP. @@ -53,7 +54,6 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) { return } else if intervalMs == 0 { intervalMs = currentInterval - } if ticker != nil && intervalMs == currentInterval { @@ -61,16 +61,14 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) { return } - if ticker != nil { - ticker.Stop() - } - currentInterval = intervalMs ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond) - log.Debugf("New Ticker %d", currentInterval) + log.Debugf("New Ticker started with interval %d", currentInterval) stopChan = make(chan bool) + wg.Add(1) go func() { + defer wg.Done() for { select { case <-ticker.C: @@ -78,6 +76,7 @@ func StartHeartbeatIntervalTimer(intervalMs int64, s PdpStatusSender) { log.Errorf("Failed to send PDP Heartbeat: %v", err) } case <-stopChan: + log.Debugf("Stopping ticker") ticker.Stop() return } @@ -111,6 +110,13 @@ func sendPDPHeartBeat(s PdpStatusSender) error { } } + pdpSubGroup := pdpattributes.GetPdpSubgroup() + + if pdpSubGroup == "" || len(pdpSubGroup) == 0 { + pdpStatus.PdpSubgroup = nil + pdpStatus.Description = "Pdp Status Registration Message" + } + err := s.SendPdpStatus(pdpStatus) log.Debugf("Sending Heartbeat ...") if err != nil { @@ -128,9 +134,9 @@ func StopTicker() { if ticker != nil && stopChan != nil { stopChan <- true close(stopChan) - ticker.Stop() - ticker = nil stopChan = nil + wg.Wait() //wait for the goroutine to finish + log.Debugf("goroutine finsihed ...") } else { log.Debugf("Ticker is not Running") } diff --git a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go index 9309728..ac0ad02 100644 --- a/pkg/kafkacomm/publisher/pdp-heartbeat_test.go +++ b/pkg/kafkacomm/publisher/pdp-heartbeat_test.go @@ -21,11 +21,11 @@ package publisher import ( "errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "policy-opa-pdp/pkg/kafkacomm/publisher/mocks" "policy-opa-pdp/pkg/policymap" "testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) /* @@ -151,11 +151,11 @@ func TestStopTicker_Success(t *testing.T) { mockSender := new(mocks.PdpStatusSender) mockSender.On("SendPdpStatus", mock.Anything).Return(nil) StartHeartbeatIntervalTimer(1000, mockSender) - + wg.Done() StopTicker() mu.Lock() defer mu.Unlock() - if ticker != nil { + if stopChan != nil { t.Errorf("Expected ticker to be nil") } } diff --git a/pkg/pdpattributes/pdpattributes.go b/pkg/pdpattributes/pdpattributes.go index 005dd03..a40a55a 100644 --- a/pkg/pdpattributes/pdpattributes.go +++ b/pkg/pdpattributes/pdpattributes.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,8 +22,8 @@ package pdpattributes import ( - "github.com/google/uuid" "policy-opa-pdp/pkg/log" + "github.com/google/uuid" ) var ( @@ -48,7 +48,7 @@ func SetPdpSubgroup(pdpsubgroup string) { } // Retrieves the current PDP subgroup value. -func getPdpSubgroup() string { +func GetPdpSubgroup() string { return PdpSubgroup } diff --git a/pkg/pdpattributes/pdpattributes_test.go b/pkg/pdpattributes/pdpattributes_test.go index c9a41c7..700a264 100644 --- a/pkg/pdpattributes/pdpattributes_test.go +++ b/pkg/pdpattributes/pdpattributes_test.go @@ -1,6 +1,6 @@ // - // ========================LICENSE_START================================= -// Copyright (C) 2024: Deutsche Telekom +// Copyright (C) 2024-2025: Deutsche Telekom // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -46,14 +46,14 @@ func TestSetPdpSubgroup_Success(t *testing.T) { t.Run("ValidSubgroup", func(t *testing.T) { expectedSubgroup := "subgroup1" SetPdpSubgroup(expectedSubgroup) - assert.Equal(t, expectedSubgroup, getPdpSubgroup(), "Expected PDP subgroup to match set value") + assert.Equal(t, expectedSubgroup, GetPdpSubgroup(), "Expected PDP subgroup to match set value") }) } func TestSetPdpSubgroup_Failure(t *testing.T) { t.Run("EmptySubgroup", func(t *testing.T) { SetPdpSubgroup("") - assert.Equal(t, "", getPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string") + assert.Equal(t, "", GetPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string") }) t.Run("LargeSubgroup", func(t *testing.T) { @@ -62,7 +62,7 @@ func TestSetPdpSubgroup_Failure(t *testing.T) { largeSubgroup[i] = 'a' } SetPdpSubgroup(string(largeSubgroup)) - assert.Equal(t, string(largeSubgroup), getPdpSubgroup(), "Expected large PDP subgroup to match set value") + assert.Equal(t, string(largeSubgroup), GetPdpSubgroup(), "Expected large PDP subgroup to match set value") }) }