"policy-opa-pdp/pkg/kafkacomm/publisher"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/opasdk"
+ "policy-opa-pdp/pkg/pdpattributes"
"syscall"
"time"
)
time.Sleep(10 * time.Second)
- // pdp registration
- isRegistered := registerPDPFunc(sender)
- if !isRegistered {
- return
- }
+ pdpattributes.SetPdpHeartbeatInterval(int64(consts.DefaultHeartbeatMS))
+ go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, sender)
time.Sleep(10 * time.Second)
log.Debugf("After registration successful delay")
// OkCode - The Code for HealthCheck response
// HealthCheckMessage - The Healtcheck Message
// SingleHierarchy - The Counter indicates the length of datakey path
+// DefaultHeartbeatMS - The default interval for heartbeat signals in milliseconds.
var (
LogFilePath = "/var/logs/logs.log"
LogMaxSize = 10
OkCode = int32(200)
HealthCheckMessage = "alive"
SingleHierarchy = 4
+ DefaultHeartbeatMS = 60000
)
log.Debugf("PDP_UPDATE Message received: %s", string(message))
pdpattributes.SetPdpSubgroup(pdpUpdate.PdpSubgroup)
- pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
if len(pdpUpdate.PoliciesToBeDeployed) > 0 {
failureMessage, successfullyDeployedPolicies := handlePolicyDeploymentVar(pdpUpdate, p)
}
}
log.Infof("PDP_STATUS Message Sent Successfully")
- go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
+ log.Debug(pdpUpdate.PdpHeartbeatIntervalMs)
+
+ if pdpattributes.PdpHeartbeatInterval != pdpUpdate.PdpHeartbeatIntervalMs && pdpUpdate.PdpHeartbeatIntervalMs != 0 {
+ //restart the ticker.
+ publisher.StopTicker()
+ pdpattributes.SetPdpHeartbeatInterval(pdpUpdate.PdpHeartbeatIntervalMs)
+ go publisher.StartHeartbeatIntervalTimer(pdpattributes.PdpHeartbeatInterval, p)
+ }
+
return nil
}
import (
"fmt"
- "github.com/google/uuid"
"policy-opa-pdp/consts"
"policy-opa-pdp/pkg/log"
"policy-opa-pdp/pkg/model"
"policy-opa-pdp/pkg/policymap"
"sync"
"time"
+ "github.com/google/uuid"
)
var (
stopChan chan bool
currentInterval int64
mu sync.Mutex
+ wg sync.WaitGroup
)
// Initializes a timer that sends periodic heartbeat messages to indicate the health and state of the PDP.
return
} else if intervalMs == 0 {
intervalMs = currentInterval
-
}
if ticker != nil && intervalMs == currentInterval {
return
}
- if ticker != nil {
- ticker.Stop()
- }
-
currentInterval = intervalMs
ticker = time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
- log.Debugf("New Ticker %d", currentInterval)
+ log.Debugf("New Ticker started with interval %d", currentInterval)
stopChan = make(chan bool)
+ wg.Add(1)
go func() {
+ defer wg.Done()
for {
select {
case <-ticker.C:
log.Errorf("Failed to send PDP Heartbeat: %v", err)
}
case <-stopChan:
+ log.Debugf("Stopping ticker")
ticker.Stop()
return
}
}
}
+ pdpSubGroup := pdpattributes.GetPdpSubgroup()
+
+ if pdpSubGroup == "" || len(pdpSubGroup) == 0 {
+ pdpStatus.PdpSubgroup = nil
+ pdpStatus.Description = "Pdp Status Registration Message"
+ }
+
err := s.SendPdpStatus(pdpStatus)
log.Debugf("Sending Heartbeat ...")
if err != nil {
if ticker != nil && stopChan != nil {
stopChan <- true
close(stopChan)
- ticker.Stop()
- ticker = nil
stopChan = nil
+ wg.Wait() //wait for the goroutine to finish
+ log.Debugf("goroutine finsihed ...")
} else {
log.Debugf("Ticker is not Running")
}
import (
"errors"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/mock"
"policy-opa-pdp/pkg/kafkacomm/publisher/mocks"
"policy-opa-pdp/pkg/policymap"
"testing"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
)
/*
mockSender := new(mocks.PdpStatusSender)
mockSender.On("SendPdpStatus", mock.Anything).Return(nil)
StartHeartbeatIntervalTimer(1000, mockSender)
-
+ wg.Done()
StopTicker()
mu.Lock()
defer mu.Unlock()
- if ticker != nil {
+ if stopChan != nil {
t.Errorf("Expected ticker to be nil")
}
}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package pdpattributes
import (
- "github.com/google/uuid"
"policy-opa-pdp/pkg/log"
+ "github.com/google/uuid"
)
var (
}
// Retrieves the current PDP subgroup value.
-func getPdpSubgroup() string {
+func GetPdpSubgroup() string {
return PdpSubgroup
}
// -
// ========================LICENSE_START=================================
-// Copyright (C) 2024: Deutsche Telekom
+// Copyright (C) 2024-2025: Deutsche Telekom
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
t.Run("ValidSubgroup", func(t *testing.T) {
expectedSubgroup := "subgroup1"
SetPdpSubgroup(expectedSubgroup)
- assert.Equal(t, expectedSubgroup, getPdpSubgroup(), "Expected PDP subgroup to match set value")
+ assert.Equal(t, expectedSubgroup, GetPdpSubgroup(), "Expected PDP subgroup to match set value")
})
}
func TestSetPdpSubgroup_Failure(t *testing.T) {
t.Run("EmptySubgroup", func(t *testing.T) {
SetPdpSubgroup("")
- assert.Equal(t, "", getPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
+ assert.Equal(t, "", GetPdpSubgroup(), "Expected PDP subgroup to be empty when set to empty string")
})
t.Run("LargeSubgroup", func(t *testing.T) {
largeSubgroup[i] = 'a'
}
SetPdpSubgroup(string(largeSubgroup))
- assert.Equal(t, string(largeSubgroup), getPdpSubgroup(), "Expected large PDP subgroup to match set value")
+ assert.Equal(t, string(largeSubgroup), GetPdpSubgroup(), "Expected large PDP subgroup to match set value")
})
}