2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2021 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.lifecycle;
24 import com.google.re2j.Pattern;
25 import java.lang.reflect.InvocationTargetException;
26 import java.time.Instant;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
33 import java.util.Objects;
34 import java.util.Properties;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import java.util.stream.Collectors;
42 import lombok.NonNull;
44 import org.apache.commons.beanutils.BeanUtils;
45 import org.apache.commons.lang3.StringUtils;
46 import org.onap.policy.common.capabilities.Startable;
47 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
48 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
49 import org.onap.policy.common.endpoints.event.comm.TopicSink;
50 import org.onap.policy.common.endpoints.event.comm.TopicSource;
51 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
52 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
53 import org.onap.policy.common.endpoints.listeners.ScoListener;
54 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
55 import org.onap.policy.common.utils.coder.StandardCoderObject;
56 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
57 import org.onap.policy.drools.policies.DomainMaker;
58 import org.onap.policy.drools.system.PolicyController;
59 import org.onap.policy.drools.system.PolicyEngineConstants;
60 import org.onap.policy.models.pdp.concepts.PdpResponseDetails;
61 import org.onap.policy.models.pdp.concepts.PdpStateChange;
62 import org.onap.policy.models.pdp.concepts.PdpStatistics;
63 import org.onap.policy.models.pdp.concepts.PdpStatus;
64 import org.onap.policy.models.pdp.concepts.PdpUpdate;
65 import org.onap.policy.models.pdp.enums.PdpHealthStatus;
66 import org.onap.policy.models.pdp.enums.PdpMessageType;
67 import org.onap.policy.models.pdp.enums.PdpState;
68 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
69 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
76 public class LifecycleFsm implements Startable {
79 * Default Status Timer in seconds.
81 public static final long DEFAULT_STATUS_TIMER_SECONDS = 120L;
83 private static final Logger logger = LoggerFactory.getLogger(LifecycleFsm.class);
84 private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
86 protected static final String CONFIGURATION_PROPERTIES_NAME = "feature-lifecycle";
87 protected static final String GROUP_NAME = "lifecycle.pdp.group";
88 protected static final String PDP_TYPE = "lifecycle.pdp.type";
89 protected static final String MANDATORY_POLICY_TYPES = "lifecycle.pdp.policytypes";
91 protected static final String DEFAULT_PDP_GROUP = "defaultGroup";
92 protected static final String DEFAULT_PDP_TYPE = "drools";
94 protected static final long MIN_STATUS_INTERVAL_SECONDS = 5L;
95 protected static final String PDP_MESSAGE_NAME = "messageName";
97 protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_RULES =
98 new ToscaConceptIdentifier("onap.policies.native.drools.Artifact", "1.0.0");
100 protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_CONTROLLER =
101 new ToscaConceptIdentifier("onap.policies.native.drools.Controller", "1.0.0");
104 protected final Properties properties;
107 protected TopicSource source;
110 protected TopicSinkClient client;
113 protected final String name = PolicyEngineConstants.PDP_NAME;
115 protected LifecycleState state = new LifecycleStateTerminated(this);
118 protected ScheduledExecutorService scheduler = makeExecutor();
121 protected ScheduledFuture<?> statusTask;
124 protected MessageTypeDispatcher sourceDispatcher = new MessageTypeDispatcher(PDP_MESSAGE_NAME);
127 protected PdpStateChangeFeed stateChangeFeed = new PdpStateChangeFeed(PdpStateChange.class, this);
130 protected PdpUpdateFeed updateFeed = new PdpUpdateFeed(PdpUpdate.class, this);
134 protected long statusTimerSeconds = DEFAULT_STATUS_TIMER_SECONDS;
137 private String group;
140 protected String subGroup;
144 protected String pdpType;
147 protected Set<String> mandatoryPolicyTypes = new HashSet<>();
150 protected final Map<ToscaConceptIdentifier, PolicyTypeController> policyTypesMap = new HashMap<>();
153 protected final Map<ToscaConceptIdentifier, ToscaPolicy> policiesMap = new HashMap<>();
156 protected final PdpStatistics stats = new PdpStatistics();
161 public LifecycleFsm() {
162 properties = SystemPersistenceConstants.getManager().getProperties(CONFIGURATION_PROPERTIES_NAME);
163 setGroup(properties.getProperty(GROUP_NAME, DEFAULT_PDP_GROUP));
164 setPdpType(properties.getProperty(PDP_TYPE, DEFAULT_PDP_TYPE));
166 policyTypesMap.put(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER,
167 new PolicyTypeNativeDroolsController(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER, this));
169 POLICY_TYPE_DROOLS_NATIVE_RULES,
170 new PolicyTypeNativeArtifactController(POLICY_TYPE_DROOLS_NATIVE_RULES, this));
172 String commaSeparatedPolicyTypes = properties.getProperty(MANDATORY_POLICY_TYPES);
173 if (!StringUtils.isBlank(commaSeparatedPolicyTypes)) {
174 Collections.addAll(mandatoryPolicyTypes, COMMA_SPACE_PAT.split(commaSeparatedPolicyTypes));
177 logger.info("The mandatory Policy Types are {}. Compliance is {}",
178 mandatoryPolicyTypes, isMandatoryPolicyTypesCompliant());
180 stats.setPdpInstanceId(PolicyEngineConstants.PDP_NAME);
184 public DomainMaker getDomainMaker() {
185 return PolicyEngineConstants.getManager().getDomainMaker();
189 public boolean isAlive() {
190 return client != null && client.getSink().isAlive();
196 public PdpState state() {
197 return state.state();
203 public synchronized void setGroup(String group) {
205 this.stats.setPdpGroupName(group);
211 public synchronized void setSubGroup(String subGroup) {
212 this.subGroup = subGroup;
213 this.stats.setPdpSubGroupName(subGroup);
216 /* ** FSM events - entry points of events into the FSM ** */
219 public synchronized boolean start() {
220 logger.info("lifecycle event: start engine");
221 return state.start();
225 * Start a controller event.
227 public synchronized void start(@NonNull PolicyController controller) {
228 logger.info("lifecycle event: start controller: {}", controller.getName());
229 if (!controller.getDrools().isBrained()) {
230 logger.warn("ignoring lifecycle event: start controller: {}", controller);
234 for (ToscaConceptIdentifier id : controller.getPolicyTypes()) {
235 PolicyTypeDroolsController ptDc = (PolicyTypeDroolsController) policyTypesMap.get(id); //NOSONAR
237 policyTypesMap.put(id, new PolicyTypeDroolsController(id, this, controller));
238 logger.info("policy-type {} added", id);
240 ptDc.add(controller);
246 * Patch a controller event.
248 public synchronized void patch(@NonNull PolicyController controller) {
249 logger.info("lifecycle event: patch controller: {}", controller.getName());
250 if (controller.getDrools().isBrained()) {
251 this.start(controller);
253 this.stop(controller);
258 public synchronized boolean stop() {
259 logger.info("lifecycle event: stop engine");
264 * Stop a controller event.
266 public synchronized void stop(@NonNull PolicyController controller) {
267 logger.info("lifecycle event: stop controller: {}", controller.getName());
269 List<PolicyTypeDroolsController> opControllers =
270 policyTypesMap.values().stream()
271 .filter(PolicyTypeDroolsController.class::isInstance)
272 .map(PolicyTypeDroolsController.class::cast)
273 .filter(opController -> opController.getControllers().containsKey(controller.getName()))
274 .collect(Collectors.toList());
276 for (PolicyTypeDroolsController opController : opControllers) {
277 opController.remove(controller);
278 if (opController.controllers().isEmpty()) {
279 policyTypesMap.remove(opController.getPolicyType());
280 logger.info("policy-type {} removed", opController.getPolicyType());
286 public synchronized void shutdown() {
287 logger.info("lifecycle event: shutdown engine");
292 * Status reporting event.
293 * @return true if successful
295 public synchronized boolean status() {
296 logger.info("lifecycle event: status");
297 return state.status();
300 public synchronized boolean stateChange(PdpStateChange stateChange) {
301 logger.info("lifecycle event: state-change");
302 return state.stateChange(stateChange);
305 public synchronized boolean update(PdpUpdate update) {
306 logger.info("lifecycle event: update");
307 return state.update(update);
310 /* FSM State Actions (executed sequentially) */
312 protected boolean startAction() {
317 return startIo() && startTimers();
320 protected boolean stopAction() {
325 boolean successTimers = stopTimers();
326 boolean successIo = stopIo();
327 return successTimers && successIo;
330 protected void shutdownAction() {
335 protected boolean statusAction() {
336 return statusAction(null);
339 protected boolean statusAction(PdpResponseDetails response) {
340 return statusAction(state(), response);
343 protected boolean statusAction(PdpState state, PdpResponseDetails response) {
348 PdpStatus status = statusPayload(state);
349 if (response != null) {
350 status.setRequestId(response.getResponseTo());
351 status.setResponse(response);
354 return client.send(status);
357 protected synchronized void transitionToAction(@NonNull LifecycleState newState) {
361 protected boolean setStatusIntervalAction(long intervalSeconds) {
362 if (intervalSeconds == statusTimerSeconds || intervalSeconds == 0) {
366 if (intervalSeconds <= MIN_STATUS_INTERVAL_SECONDS) {
367 logger.warn("interval is too low (< {}): {}", MIN_STATUS_INTERVAL_SECONDS, intervalSeconds);
371 setStatusTimerSeconds(intervalSeconds);
372 return stopTimers() && startTimers();
375 protected List<ToscaPolicy> getDeployablePoliciesAction(@NonNull List<ToscaPolicy> policies) {
376 List<ToscaPolicy> deployPolicies = new ArrayList<>(policies);
377 deployPolicies.removeAll(getActivePolicies());
379 // Ensure that the sequence of policy deployments is sane to minimize potential errors,
380 // First policies to deploy are the controller related ones, those that affect the lifecycle of
381 // controllers, starting with the ones that affect the existence of the controller (native controller),
382 // second the ones that "brain" the controller with application logic (native artifacts).
383 // Lastly the application specific ones such as operational policies.
385 // group policies by policy types
386 Map<String, List<ToscaPolicy>> policyTypeGroups = groupPoliciesByPolicyType(deployPolicies);
388 // place native controller policies at the start of the list
389 List<ToscaPolicy> orderedDeployableList = new ArrayList<>(
390 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList()));
392 // add to the working list the native controller policies
393 orderedDeployableList.addAll(
394 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList()));
396 // place non-native policies to place at the end of the list
397 orderedDeployableList.addAll(getNonNativePolicies(policyTypeGroups));
399 return orderedDeployableList;
402 protected List<ToscaPolicy> getUndeployablePoliciesAction(@NonNull List<ToscaPolicy> policies) {
403 List<ToscaPolicy> undeployPolicies = new ArrayList<>(getActivePolicies());
404 undeployPolicies.removeAll(policies);
405 if (undeployPolicies.isEmpty()) {
406 return undeployPolicies;
409 // Ensure that the sequence of policy undeployments is sane to minimize potential errors,
410 // as it is assumed not smart ordering from the policies sent by the PAP.
411 // First policies to undeploy are those that are only of relevance within a drools container,
412 // such as the operational policies. The next set of policies to undeploy are those that
413 // affect the overall PDP-D application support, firstly the ones that supports the
414 // application software wiring (native rules policies), and second those that relate
415 // to the PDP-D controllers lifecycle.
417 // group policies by policy types
418 Map<String, List<ToscaPolicy>> policyTypeGroups = groupPoliciesByPolicyType(undeployPolicies);
420 // place controller only (non-native policies) at the start of the list of the undeployment list
421 List<ToscaPolicy> orderedUndeployableList = getNonNativePolicies(policyTypeGroups);
423 // add to the working list the native rules policies if any
424 orderedUndeployableList.addAll(
425 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList()));
427 // finally add to the working list native controller policies if any
428 orderedUndeployableList.addAll(
429 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList()));
431 return orderedUndeployableList;
434 protected void deployedPolicyAction(@NonNull ToscaPolicy policy) {
435 policiesMap.computeIfAbsent(policy.getIdentifier(), key -> {
436 // avoid counting reapplies in a second pass when a mix of native and non-native
437 // policies are present.
438 getStats().setPolicyDeployCount(getStats().getPolicyDeployCount() + 1);
439 getStats().setPolicyDeploySuccessCount(getStats().getPolicyDeploySuccessCount() + 1);
444 protected void undeployedPolicyAction(@NonNull ToscaPolicy policy) {
445 policiesMap.computeIfPresent(policy.getIdentifier(), (key, value) -> {
446 // avoid counting reapplies in a second pass when a mix of native and non-native
447 // policies are present.
448 getStats().setPolicyUndeployCount(getStats().getPolicyUndeployCount() + 1);
449 getStats().setPolicyUndeploySuccessCount(getStats().getPolicyUndeploySuccessCount() + 1);
454 protected void failedDeployPolicyAction(@NonNull ToscaPolicy failedPolicy) { // NOSONAR
455 getStats().setPolicyDeployCount(getStats().getPolicyDeployCount() + 1);
456 getStats().setPolicyDeployFailCount(getStats().getPolicyDeployFailCount() + 1);
459 protected void failedUndeployPolicyAction(ToscaPolicy failedPolicy) {
460 getStats().setPolicyUndeployCount(getStats().getPolicyUndeployCount() + 1);
461 getStats().setPolicyUndeployFailCount(getStats().getPolicyUndeployFailCount() + 1);
462 policiesMap.remove(failedPolicy.getIdentifier());
465 protected void updateDeployCountsAction(Long deployCount, Long deploySuccesses, Long deployFailures) {
466 PdpStatistics statistics = getStats();
467 if (deployCount != null) {
468 statistics.setPolicyDeployCount(deployCount);
471 if (deploySuccesses != null) {
472 statistics.setPolicyDeploySuccessCount(deploySuccesses);
475 if (deployFailures != null) {
476 statistics.setPolicyDeployFailCount(deployFailures);
480 protected void updateUndeployCountsAction(Long undeployCount, Long undeploySuccesses, Long undeployFailures) {
481 PdpStatistics statistics = getStats();
482 if (undeployCount != null) {
483 statistics.setPolicyUndeployCount(undeployCount);
486 if (undeploySuccesses != null) {
487 statistics.setPolicyUndeploySuccessCount(undeploySuccesses);
490 if (undeployFailures != null) {
491 statistics.setPolicyUndeployFailCount(undeployFailures);
495 protected List<ToscaPolicy> resetPoliciesAction() {
496 updateDeployCountsAction(0L, 0L, 0L);
497 updateUndeployCountsAction(0L, 0L, 0L);
498 List<ToscaPolicy> policies = new ArrayList<>(getActivePolicies());
503 protected void updatePoliciesAction(List<ToscaPolicy> toscaPolicies) {
504 this.scheduler.submit(() -> state.updatePolicies(toscaPolicies));
507 protected PolicyTypeController getController(ToscaConceptIdentifier policyType) {
508 return policyTypesMap.get(policyType);
511 protected Map<String, List<ToscaPolicy>> groupPoliciesByPolicyType(List<ToscaPolicy> deployPolicies) {
512 return deployPolicies.stream()
514 .collect(Collectors.groupingBy(policy -> policy.getTypeIdentifier().getName()));
517 protected List<ToscaPolicy> getNonNativePolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
518 return policyTypeGroups.entrySet().stream()
519 .filter(entry -> !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName())
520 && !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName()))
521 .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
524 protected List<ToscaPolicy> getNativeArtifactPolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
525 return policyTypeGroups.entrySet().stream()
526 .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName()))
527 .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
530 protected List<ToscaPolicy> getNativeControllerPolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
531 return policyTypeGroups.entrySet().stream()
532 .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName()))
533 .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
537 * Get the policy identifiers.
539 public List<ToscaConceptIdentifier> getPolicyIds(List<ToscaPolicy> policies) {
540 return policies.stream()
541 .map(ToscaPolicy::getIdentifier)
543 .collect(Collectors.toList());
546 protected String getPolicyIdsMessage(List<ToscaPolicy> policies) {
547 return getPolicyIds(policies).toString();
550 protected List<ToscaPolicy> removeByPolicyId(@NonNull List<ToscaPolicy> policies,
551 @NonNull List<ToscaConceptIdentifier> toRemoveList) {
552 policies.removeIf(policy -> toRemoveList.contains(policy.getIdentifier()));
556 protected List<ToscaPolicy> removeByPolicyId(@NonNull List<ToscaConceptIdentifier> toRemoveList) {
557 return removeByPolicyId(getActivePolicies(), toRemoveList);
560 protected List<ToscaPolicy> mergePolicies(@NonNull List<ToscaPolicy> addPolicies,
561 @NonNull List<ToscaConceptIdentifier> removePolicies) {
563 if (addPolicies.isEmpty() && removePolicies.isEmpty()) {
564 return getActivePolicies();
567 List<ToscaPolicy> policies = getActivePolicies();
568 policies.addAll(addPolicies);
569 return removeByPolicyId(new ArrayList<>(new HashSet<>(policies)), removePolicies);
574 * Do I support the mandatory policy types?.
576 protected boolean isMandatoryPolicyTypesCompliant() {
577 return getCurrentPolicyTypes().containsAll(getMandatoryPolicyTypes());
580 protected Set<String> getCurrentPolicyTypes() {
581 return getPolicyTypesMap().keySet().stream()
582 .map(ToscaConceptIdentifier::getName).collect(Collectors.toSet());
585 protected List<ToscaPolicy> getActivePolicies() {
586 return new ArrayList<>(policiesMap.values());
589 /* ** Action Helpers ** */
591 private boolean startIo() {
592 return source() && sink();
595 private boolean startTimers() {
597 this.scheduler.scheduleAtFixedRate(this::status, 0, statusTimerSeconds, TimeUnit.SECONDS);
598 return !statusTask.isCancelled() && !statusTask.isDone();
601 private boolean stopIo() {
602 source.unregister(sourceDispatcher);
603 boolean successSource = source.stop();
604 boolean successSink = client.getSink().stop();
605 return successSource && successSink;
608 private boolean stopTimers() {
610 if (statusTask != null) {
611 success = statusTask.cancel(false);
617 private void shutdownIo() {
618 client.getSink().shutdown();
622 private void shutdownTimers() {
623 scheduler.shutdownNow();
626 protected PdpStatus statusPayload(@NonNull PdpState state) {
627 var status = new PdpStatus();
628 status.setName(name);
629 status.setPdpGroup(group);
630 status.setPdpSubgroup(subGroup);
631 status.setState(state);
632 status.setHealthy(isAlive() ? PdpHealthStatus.HEALTHY : PdpHealthStatus.NOT_HEALTHY);
633 status.setPdpType(getPdpType());
634 status.setPolicies(new ArrayList<>(policiesMap.keySet()));
635 status.setStatistics(statisticsPayload());
640 * It provides a snapshot of the current statistics.
642 public PdpStatistics statisticsPayload() {
643 var updateStats = new PdpStatistics(stats);
644 updateStats.setTimeStamp(Instant.now());
647 BeanUtils.copyProperties(updateStats, PolicyEngineConstants.getManager().getStats().getGroupStat());
648 } catch (IllegalAccessException | InvocationTargetException ex) {
649 logger.debug("statistics mapping failure", ex);
655 private boolean source() {
656 List<TopicSource> sources = TopicEndpointManager.getManager().addTopicSources(properties);
657 if (sources.isEmpty()) {
661 if (sources.size() != 1) {
662 logger.warn("Lifecycle Manager: unexpected: more than one source configured ({})", sources.size());
665 this.source = sources.get(0);
666 this.source.register(this.sourceDispatcher);
667 this.sourceDispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), stateChangeFeed);
668 this.sourceDispatcher.register(PdpMessageType.PDP_UPDATE.name(), updateFeed);
669 return source.start();
672 private boolean sink() {
673 List<TopicSink> sinks = TopicEndpointManager.getManager().addTopicSinks(properties);
674 if (sinks.isEmpty()) {
675 logger.error("Lifecycle Manager sinks have not been configured");
679 if (sinks.size() != 1) {
680 logger.warn("Lifecycle Manager: unexpected: more than one sink configured ({})", sinks.size());
683 this.client = new TopicSinkClient(sinks.get(0));
684 return this.client.getSink().start();
687 protected boolean isItMe(String name, String group, String subgroup) {
688 if (Objects.equals(name, getName())) {
692 return name == null && group != null
693 && Objects.equals(group, getGroup())
694 && Objects.equals(subgroup, getSubGroup());
697 /* **** IO listeners ***** */
700 * PDP State Change Message Listener.
702 public static class PdpStateChangeFeed extends ScoListener<PdpStateChange> {
704 protected final LifecycleFsm fsm;
706 protected PdpStateChangeFeed(Class<PdpStateChange> clazz, LifecycleFsm fsm) {
712 public void onTopicEvent(CommInfrastructure comm, String topic,
713 StandardCoderObject coder, PdpStateChange stateChange) {
715 if (!isMine(stateChange)) {
716 logger.warn("pdp-state-chage from {}:{} is invalid: {}", comm, topic, stateChange);
720 fsm.stateChange(stateChange);
723 protected boolean isMine(PdpStateChange change) {
724 if (change == null) {
728 return fsm.isItMe(change.getName(), change.getPdpGroup(), change.getPdpSubgroup());
733 * PDP Update Message Listener.
735 public static class PdpUpdateFeed extends ScoListener<PdpUpdate> {
737 protected final LifecycleFsm fsm;
739 public PdpUpdateFeed(Class<PdpUpdate> clazz, LifecycleFsm fsm) {
745 public void onTopicEvent(CommInfrastructure comm, String topic,
746 StandardCoderObject coder, PdpUpdate update) {
748 if (!isMine(update)) {
749 logger.warn("pdp-update from {}:{} is invalid: {}", comm, topic, update);
756 protected boolean isMine(PdpUpdate update) {
757 if (update == null) {
761 return fsm.isItMe(update.getName(), update.getPdpGroup(), update.getPdpSubgroup());
765 // these may be overridden by junit tests
767 protected ScheduledExecutorService makeExecutor() {
768 var exec = new ScheduledThreadPoolExecutor(1);
769 exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
770 exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
771 exec.setRemoveOnCancelPolicy(true);