Removing deprecated DMAAP library
[policy/drools-pdp.git] / feature-lifecycle / src / main / java / org / onap / policy / drools / lifecycle / LifecycleFsm.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2021, 2023-2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.lifecycle;
23
24 import com.google.re2j.Pattern;
25 import io.prometheus.client.Counter;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashMap;
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.Objects;
33 import java.util.Properties;
34 import java.util.Set;
35 import java.util.concurrent.ScheduledExecutorService;
36 import java.util.concurrent.ScheduledFuture;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.stream.Collectors;
40 import lombok.Getter;
41 import lombok.NonNull;
42 import lombok.Setter;
43 import org.apache.commons.lang3.StringUtils;
44 import org.onap.policy.common.capabilities.Startable;
45 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
46 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
47 import org.onap.policy.common.endpoints.event.comm.TopicSink;
48 import org.onap.policy.common.endpoints.event.comm.TopicSource;
49 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
50 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
51 import org.onap.policy.common.endpoints.listeners.ScoListener;
52 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
53 import org.onap.policy.common.utils.coder.StandardCoderObject;
54 import org.onap.policy.common.utils.resources.PrometheusUtils;
55 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
56 import org.onap.policy.drools.policies.DomainMaker;
57 import org.onap.policy.drools.system.PolicyController;
58 import org.onap.policy.drools.system.PolicyEngineConstants;
59 import org.onap.policy.models.pdp.concepts.PdpResponseDetails;
60 import org.onap.policy.models.pdp.concepts.PdpStateChange;
61 import org.onap.policy.models.pdp.concepts.PdpStatus;
62 import org.onap.policy.models.pdp.concepts.PdpUpdate;
63 import org.onap.policy.models.pdp.enums.PdpHealthStatus;
64 import org.onap.policy.models.pdp.enums.PdpMessageType;
65 import org.onap.policy.models.pdp.enums.PdpResponseStatus;
66 import org.onap.policy.models.pdp.enums.PdpState;
67 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
68 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
69 import org.slf4j.Logger;
70 import org.slf4j.LoggerFactory;
71
72 /**
73  * Lifecycle FSM.
74  */
75 public class LifecycleFsm implements Startable {
76
    /**
     * Default Status Timer in seconds.
     */
    public static final long DEFAULT_STATUS_TIMER_SECONDS = 120L;

    private static final Logger logger = LoggerFactory.getLogger(LifecycleFsm.class);

    // separator used to split the mandatory policy types property: a comma with optional surrounding whitespace
    private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");

    // name of the properties set loaded by the constructor
    protected static final String CONFIGURATION_PROPERTIES_NAME = "feature-lifecycle";
    // property keys read by the constructor
    protected static final String GROUP_NAME = "lifecycle.pdp.group";
    protected static final String PDP_TYPE = "lifecycle.pdp.type";
    protected static final String MANDATORY_POLICY_TYPES = "lifecycle.pdp.policytypes";

    // fallbacks used by the constructor when the group / type properties are not set
    protected static final String DEFAULT_PDP_GROUP = "defaultGroup";
    protected static final String DEFAULT_PDP_TYPE = "drools";

    // setStatusIntervalAction() rejects any non-zero interval that is less than or equal to this value
    protected static final long MIN_STATUS_INTERVAL_SECONDS = 5L;
    // handed to the MessageTypeDispatcher; presumably the message field carrying the PDP message type - TODO confirm
    protected static final String PDP_MESSAGE_NAME = "messageName";

    // policy type served by the PolicyTypeNativeArtifactController registered in the constructor
    protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_RULES =
            new ToscaConceptIdentifier("onap.policies.native.drools.Artifact", "1.0.0");

    // policy type served by the PolicyTypeNativeDroolsController registered in the constructor
    protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_CONTROLLER =
            new ToscaConceptIdentifier("onap.policies.native.drools.Controller", "1.0.0");

    protected static final String PROMETHEUS_NAMESPACE = "pdpd";

    // deployment counter labelled by state, operation and status; incremented by the
    // deployed/undeployed/failed policy actions of this class
    protected static final Counter deploymentsCounter =
            Counter.build().namespace(PROMETHEUS_NAMESPACE).name(PrometheusUtils.POLICY_DEPLOYMENTS_METRIC)
                    .labelNames(PrometheusUtils.STATE_METRIC_LABEL,
                            PrometheusUtils.OPERATION_METRIC_LABEL,
                            PrometheusUtils.STATUS_METRIC_LABEL)
                    .help(PrometheusUtils.POLICY_DEPLOYMENT_HELP)
                    .register();

    // feature configuration, loaded once in the constructor
    @Getter
    protected final Properties properties;

    // topic source the dispatcher listens on; assigned by source(), null until then
    @Getter
    protected TopicSource source;

    // client wrapping the topic sink used to send status messages; assigned by sink(), null until then
    @Getter
    protected TopicSinkClient client;

    // current state object; all FSM events are delegated to it
    protected LifecycleState state = new LifecycleStateTerminated(this);

    // runs the periodic status task and the submitted policy updates
    @GsonJsonIgnore
    protected ScheduledExecutorService scheduler = makeExecutor();

    // handle of the periodic status task; null until the timers are first started
    @GsonJsonIgnore
    protected ScheduledFuture<?> statusTask;

    // routes incoming messages to the feeds below, keyed on PDP_MESSAGE_NAME
    @GsonJsonIgnore
    protected MessageTypeDispatcher sourceDispatcher = new MessageTypeDispatcher(PDP_MESSAGE_NAME);

    // listener that forwards PDP_STATE_CHANGE messages to stateChange()
    @GsonJsonIgnore
    protected PdpStateChangeFeed stateChangeFeed = new PdpStateChangeFeed(PdpStateChange.class, this);

    // listener that forwards PDP_UPDATE messages to update()
    @GsonJsonIgnore
    protected PdpUpdateFeed updateFeed = new PdpUpdateFeed(PdpUpdate.class, this);

    // period of the status task, in seconds
    @Getter
    @Setter
    protected long statusTimerSeconds = DEFAULT_STATUS_TIMER_SECONDS;

    @Getter
    private String group;

    @Getter
    protected String subGroup;

    @Getter
    @Setter
    protected String pdpType;

    // set by start() and lazily by getPdpName()
    protected volatile String pdpName;

    // policy type names listed in the MANDATORY_POLICY_TYPES property
    @Getter
    protected Set<String> mandatoryPolicyTypes = new HashSet<>();

    // policy type controllers, keyed by policy type identifier
    @Getter
    protected final Map<ToscaConceptIdentifier, PolicyTypeController> policyTypesMap = new HashMap<>();

    // policies recorded as deployed, keyed by policy identifier
    @Getter
    protected final Map<ToscaConceptIdentifier, ToscaPolicy> policiesMap = new HashMap<>();
162
163     /**
164      * Constructor.
165      */
166     public LifecycleFsm() {
167         properties = SystemPersistenceConstants.getManager().getProperties(CONFIGURATION_PROPERTIES_NAME);
168         setGroup(properties.getProperty(GROUP_NAME, DEFAULT_PDP_GROUP));
169         setPdpType(properties.getProperty(PDP_TYPE, DEFAULT_PDP_TYPE));
170
171         policyTypesMap.put(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER,
172                 new PolicyTypeNativeDroolsController(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER, this));
173         policyTypesMap.put(
174                 POLICY_TYPE_DROOLS_NATIVE_RULES,
175                  new PolicyTypeNativeArtifactController(POLICY_TYPE_DROOLS_NATIVE_RULES, this));
176
177         String commaSeparatedPolicyTypes = properties.getProperty(MANDATORY_POLICY_TYPES);
178         if (!StringUtils.isBlank(commaSeparatedPolicyTypes)) {
179             Collections.addAll(mandatoryPolicyTypes, COMMA_SPACE_PAT.split(commaSeparatedPolicyTypes));
180         }
181
182         logger.info("The mandatory Policy Types are {}. Compliance is {}",
183                 mandatoryPolicyTypes, isMandatoryPolicyTypesCompliant());
184     }
185
186     @GsonJsonIgnore
187     public DomainMaker getDomainMaker() {
188         return PolicyEngineConstants.getManager().getDomainMaker();
189     }
190
191     @Override
192     public boolean isAlive() {
193         return client != null && client.getSink().isAlive();
194     }
195
196     /**
197      * Returns the PDP Name.
198      */
199     public String getPdpName() {
200         if (this.pdpName == null) {
201             this.pdpName = PolicyEngineConstants.getManager().getPdpName();
202         }
203
204         return this.pdpName;
205     }
206
207     /**
208      * Current state.
209      */
210     public PdpState state() {
211         return state.state();
212     }
213
214     /**
215      * set group.
216      */
217     public synchronized void setGroup(String group) {
218         this.group = group;
219     }
220
221     /**
222      * set subgroup.
223      */
224     public synchronized void setSubGroup(String subGroup) {
225         this.subGroup = subGroup;
226     }
227
228     /* ** FSM events - entry points of events into the FSM ** */
229
230     @Override
231     public synchronized boolean start() {
232         this.pdpName = PolicyEngineConstants.getManager().getPdpName();
233         logger.info("lifecycle event: start engine");
234         return state.start();
235     }
236
237     /**
238      * Start a controller event.
239      */
240     public synchronized void start(@NonNull PolicyController controller) {
241         logger.info("lifecycle event: start controller: {}", controller.getName());
242         if (!controller.getDrools().isBrained()) {
243             logger.warn("ignoring lifecycle event: start controller: {}", controller);
244             return;
245         }
246
247         for (ToscaConceptIdentifier id : controller.getPolicyTypes()) {
248             PolicyTypeDroolsController ptDc = (PolicyTypeDroolsController) policyTypesMap.get(id); //NOSONAR
249             if (ptDc == null) {
250                 policyTypesMap.put(id, new PolicyTypeDroolsController(id, this, controller));
251                 logger.info("policy-type {} added", id);
252             } else {
253                 ptDc.add(controller);
254             }
255         }
256     }
257
258     /**
259      * Patch a controller event.
260      */
261     public synchronized void patch(@NonNull PolicyController controller) {
262         logger.info("lifecycle event: patch controller: {}", controller.getName());
263         if (controller.getDrools().isBrained()) {
264             this.start(controller);
265         } else {
266             this.stop(controller);
267         }
268     }
269
270     @Override
271     public synchronized boolean stop() {
272         logger.info("lifecycle event: stop engine");
273         return state.stop();
274     }
275
276     /**
277      * Stop a controller event.
278      */
279     public synchronized void stop(@NonNull PolicyController controller) {
280         logger.info("lifecycle event: stop controller: {}", controller.getName());
281
282         List<PolicyTypeDroolsController> opControllers =
283             policyTypesMap.values().stream()
284                 .filter(PolicyTypeDroolsController.class::isInstance)
285                 .map(PolicyTypeDroolsController.class::cast)
286                 .filter(opController -> opController.getControllers().containsKey(controller.getName())).toList();
287
288         for (PolicyTypeDroolsController opController : opControllers) {
289             opController.remove(controller);
290             if (opController.controllers().isEmpty()) {
291                 policyTypesMap.remove(opController.getPolicyType());
292                 logger.info("policy-type {} removed", opController.getPolicyType());
293             }
294         }
295     }
296
297     @Override
298     public synchronized void shutdown() {
299         logger.info("lifecycle event: shutdown engine");
300         state.shutdown();
301     }
302
303     /**
304      * Status reporting event.
305      * @return true if successful
306      */
307     public synchronized boolean status() {
308         logger.info("lifecycle event: status");
309         return state.status();
310     }
311
312     public synchronized boolean stateChange(PdpStateChange stateChange) {
313         logger.info("lifecycle event: state-change");
314         return state.stateChange(stateChange);
315     }
316
317     public synchronized boolean update(PdpUpdate update) {
318         logger.info("lifecycle event: update");
319         return state.update(update);
320     }
321
322     /* FSM State Actions (executed sequentially) */
323
324     protected boolean startAction() {
325         if (isAlive()) {
326             return true;
327         }
328
329         return startIo() && startTimers();
330     }
331
332     protected boolean stopAction() {
333         if (!isAlive()) {
334             return true;
335         }
336
337         boolean successTimers = stopTimers();
338         boolean successIo = stopIo();
339         return successTimers && successIo;
340     }
341
342     protected void shutdownAction() {
343         shutdownIo();
344         shutdownTimers();
345     }
346
347     protected boolean statusAction() {
348         return statusAction(null);
349     }
350
351     protected boolean statusAction(PdpResponseDetails response) {
352         return statusAction(state(), response);
353     }
354
355     protected boolean statusAction(PdpState state, PdpResponseDetails response) {
356         if (!isAlive()) {
357             return false;
358         }
359
360         PdpStatus status = statusPayload(state);
361         if (response != null) {
362             status.setRequestId(response.getResponseTo());
363             status.setResponse(response);
364         }
365
366         return client.send(status);
367     }
368
369     protected synchronized void transitionToAction(@NonNull LifecycleState newState) {
370         state = newState;
371     }
372
373     protected boolean setStatusIntervalAction(long intervalSeconds) {
374         if (intervalSeconds == statusTimerSeconds || intervalSeconds == 0) {
375             return true;
376         }
377
378         if (intervalSeconds <= MIN_STATUS_INTERVAL_SECONDS) {
379             logger.warn("interval is too low (< {}): {}", MIN_STATUS_INTERVAL_SECONDS, intervalSeconds);
380             return false;
381         }
382
383         setStatusTimerSeconds(intervalSeconds);
384         return stopTimers() && startTimers();
385     }
386
387     protected List<ToscaPolicy> getDeployablePoliciesAction(@NonNull List<ToscaPolicy> policies) {
388         List<ToscaPolicy> deployPolicies = new ArrayList<>(policies);
389         deployPolicies.removeAll(getActivePolicies());
390
391         // Ensure that the sequence of policy deployments is sane to minimize potential errors,
392         // First policies to deploy are the controller related ones, those that affect the lifecycle of
393         // controllers, starting with the ones that affect the existence of the controller (native controller),
394         // second the ones that "brain" the controller with application logic (native artifacts).
395         // Lastly the application specific ones such as operational policies.
396
397         // group policies by policy types
398         Map<String, List<ToscaPolicy>> policyTypeGroups = groupPoliciesByPolicyType(deployPolicies);
399
400         // place native controller policies at the start of the list
401         List<ToscaPolicy> orderedDeployableList = new ArrayList<>(
402                 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList()));
403
404         // add to the working list the native controller policies
405         orderedDeployableList.addAll(
406             policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList()));
407
408         // place non-native policies to place at the end of the list
409         orderedDeployableList.addAll(getNonNativePolicies(policyTypeGroups));
410
411         return orderedDeployableList;
412     }
413
414     protected List<ToscaPolicy> getUndeployablePoliciesAction(@NonNull List<ToscaPolicy> policies) {
415         List<ToscaPolicy> undeployPolicies = new ArrayList<>(getActivePolicies());
416         undeployPolicies.removeAll(policies);
417         if (undeployPolicies.isEmpty()) {
418             return undeployPolicies;
419         }
420
421         // Ensure that the sequence of policy undeployments is sane to minimize potential errors,
422         // as it is assumed not smart ordering from the policies sent by the PAP.
423         // First policies to undeploy are those that are only of relevance within a drools container,
424         // such as the operational policies.   The next set of policies to undeploy are those that
425         // affect the overall PDP-D application support, firstly the ones that supports the
426         // application software wiring (native rules policies), and second those that relate
427         // to the PDP-D controllers lifecycle.
428
429         // group policies by policy types
430         Map<String, List<ToscaPolicy>> policyTypeGroups = groupPoliciesByPolicyType(undeployPolicies);
431
432         // place controller only (non-native policies) at the start of the list of the undeployment list
433         List<ToscaPolicy> orderedUndeployableList = getNonNativePolicies(policyTypeGroups);
434
435         // add to the working list the native rules policies if any
436         orderedUndeployableList.addAll(
437             policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList()));
438
439         // finally add to the working list native controller policies if any
440         orderedUndeployableList.addAll(
441             policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList()));
442
443         return orderedUndeployableList;
444     }
445
446     protected void deployedPolicyAction(@NonNull ToscaPolicy policy) {
447         policiesMap.computeIfAbsent(policy.getIdentifier(), key -> {
448             // avoid counting reapplies in a second pass when a mix of native and non-native
449             // policies are present.
450             deploymentsCounter.labels(state.state().name(),
451                     PrometheusUtils.DEPLOY_OPERATION,
452                     PdpResponseStatus.SUCCESS.name()).inc();
453             return policy;
454         });
455     }
456
457     protected void undeployedPolicyAction(@NonNull ToscaPolicy policy) {
458         policiesMap.computeIfPresent(policy.getIdentifier(), (key, value) -> {
459             // avoid counting reapplies in a second pass when a mix of native and non-native
460             // policies are present.
461             deploymentsCounter.labels(state.state().name(),
462                     PrometheusUtils.UNDEPLOY_OPERATION,
463                     PdpResponseStatus.SUCCESS.name()).inc();
464             return null;
465         });
466     }
467
468     protected void failedDeployPolicyAction(@NonNull ToscaPolicy failedPolicy) {    // NOSONAR
469         deploymentsCounter.labels(state.state().name(),
470                 PrometheusUtils.DEPLOY_OPERATION,
471                 PdpResponseStatus.FAIL.name()).inc();
472     }
473
474     protected void failedUndeployPolicyAction(ToscaPolicy failedPolicy) {
475         deploymentsCounter.labels(state.state().name(),
476                 PrometheusUtils.UNDEPLOY_OPERATION,
477                 PdpResponseStatus.FAIL.name()).inc();
478         policiesMap.remove(failedPolicy.getIdentifier());
479     }
480
481     protected List<ToscaPolicy> resetPoliciesAction() {
482         List<ToscaPolicy> policies = new ArrayList<>(getActivePolicies());
483         policiesMap.clear();
484         return policies;
485     }
486
487     protected void updatePoliciesAction(List<ToscaPolicy> toscaPolicies) {
488         this.scheduler.submit(() -> state.updatePolicies(toscaPolicies));
489     }
490
491     protected PolicyTypeController getController(ToscaConceptIdentifier policyType) {
492         return policyTypesMap.get(policyType);
493     }
494
495     protected Map<String, List<ToscaPolicy>> groupPoliciesByPolicyType(List<ToscaPolicy> deployPolicies) {
496         return deployPolicies.stream()
497             .distinct()
498             .collect(Collectors.groupingBy(policy -> policy.getTypeIdentifier().getName()));
499     }
500
501     protected List<ToscaPolicy> getNonNativePolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
502         return policyTypeGroups.entrySet().stream()
503             .filter(entry -> !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName())
504                 && !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName()))
505             .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
506     }
507
508     protected List<ToscaPolicy> getNativeArtifactPolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
509         return policyTypeGroups.entrySet().stream()
510             .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName()))
511             .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
512     }
513
514     protected List<ToscaPolicy> getNativeControllerPolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
515         return policyTypeGroups.entrySet().stream()
516                        .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName()))
517                        .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
518     }
519
520     /**
521      * Get the policy identifiers.
522      */
523     public List<ToscaConceptIdentifier> getPolicyIds(List<ToscaPolicy> policies) {
524         return policies.stream()
525                        .map(ToscaPolicy::getIdentifier)
526                        .distinct()
527                        .collect(Collectors.toList());
528     }
529
530     protected String getPolicyIdsMessage(List<ToscaPolicy> policies) {
531         return getPolicyIds(policies).toString();
532     }
533
534     protected List<ToscaPolicy> removeByPolicyId(@NonNull List<ToscaPolicy> policies,
535             @NonNull List<ToscaConceptIdentifier> toRemoveList) {
536         policies.removeIf(policy -> toRemoveList.contains(policy.getIdentifier()));
537         return policies;
538     }
539
540     protected List<ToscaPolicy> removeByPolicyId(@NonNull List<ToscaConceptIdentifier> toRemoveList) {
541         return removeByPolicyId(getActivePolicies(), toRemoveList);
542     }
543
544     protected List<ToscaPolicy> mergePolicies(@NonNull List<ToscaPolicy> addPolicies,
545             @NonNull List<ToscaConceptIdentifier> removePolicies) {
546
547         if (addPolicies.isEmpty() && removePolicies.isEmpty()) {
548             return getActivePolicies();
549         }
550
551         List<ToscaPolicy> policies = getActivePolicies();
552         policies.addAll(addPolicies);
553         return removeByPolicyId(new ArrayList<>(new HashSet<>(policies)), removePolicies);
554     }
555
556
557     /**
558      * Do I support the mandatory policy types?.
559      */
560     protected boolean isMandatoryPolicyTypesCompliant() {
561         return getCurrentPolicyTypes().containsAll(getMandatoryPolicyTypes());
562     }
563
564     protected Set<String> getCurrentPolicyTypes() {
565         return getPolicyTypesMap().keySet().stream()
566                        .map(ToscaConceptIdentifier::getName).collect(Collectors.toSet());
567     }
568
569     protected List<ToscaPolicy> getActivePolicies() {
570         return new ArrayList<>(policiesMap.values());
571     }
572
573     /* ** Action Helpers ** */
574
575     private boolean startIo() {
576         return source() && sink();
577     }
578
579     private boolean startTimers() {
580         statusTask =
581                 this.scheduler.scheduleAtFixedRate(this::status, 0, statusTimerSeconds, TimeUnit.SECONDS);
582         return !statusTask.isCancelled() && !statusTask.isDone();
583     }
584
585     private boolean stopIo() {
586         source.unregister(sourceDispatcher);
587         boolean successSource = source.stop();
588         boolean successSink = client.getSink().stop();
589         return successSource && successSink;
590     }
591
592     private boolean stopTimers() {
593         var success = true;
594         if (statusTask != null) {
595             success = statusTask.cancel(false);
596         }
597
598         return success;
599     }
600
601     private void shutdownIo() {
602         client.getSink().shutdown();
603         source.shutdown();
604     }
605
606     private void shutdownTimers() {
607         scheduler.shutdownNow();
608     }
609
610     protected PdpStatus statusPayload(@NonNull PdpState state) {
611         var status = new PdpStatus();
612         status.setName(getPdpName());
613         status.setPdpGroup(group);
614         status.setPdpSubgroup(subGroup);
615         status.setState(state);
616         status.setHealthy(isAlive() ? PdpHealthStatus.HEALTHY : PdpHealthStatus.NOT_HEALTHY);
617         status.setPdpType(getPdpType());
618         status.setPolicies(new ArrayList<>(policiesMap.keySet()));
619         return status;
620     }
621
622     private boolean source() {
623         List<TopicSource> sources = TopicEndpointManager.getManager().addTopicSources(properties);
624         if (sources.isEmpty()) {
625             return false;
626         }
627
628         if (sources.size() != 1) {
629             logger.warn("Lifecycle Manager: unexpected: more than one source configured ({})", sources.size());
630         }
631
632         this.source = sources.get(0);
633         this.source.register(this.sourceDispatcher);
634         this.sourceDispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), stateChangeFeed);
635         this.sourceDispatcher.register(PdpMessageType.PDP_UPDATE.name(), updateFeed);
636         return source.start();
637     }
638
639     private boolean sink() {
640         List<TopicSink> sinks = TopicEndpointManager.getManager().addTopicSinks(properties);
641         if (sinks.isEmpty()) {
642             logger.error("Lifecycle Manager sinks have not been configured");
643             return false;
644         }
645
646         if (sinks.size() != 1) {
647             logger.warn("Lifecycle Manager: unexpected: more than one sink configured ({})", sinks.size());
648         }
649
650         this.client = new TopicSinkClient(sinks.get(0));
651         return this.client.getSink().start();
652     }
653
654     protected boolean isItMe(String name, String group, String subgroup) {
655         if (Objects.equals(name, getPdpName())) {
656             return true;
657         }
658
659         return name == null && group != null
660             && Objects.equals(group, getGroup())
661             && Objects.equals(subgroup, getSubGroup());
662     }
663
664     /* **** IO listeners ***** */
665
666     /**
667      * PDP State Change Message Listener.
668      */
669     public static class PdpStateChangeFeed extends ScoListener<PdpStateChange> {
670
671         protected final LifecycleFsm fsm;
672
673         protected PdpStateChangeFeed(Class<PdpStateChange> clazz, LifecycleFsm fsm) {
674             super(clazz);
675             this.fsm = fsm;
676         }
677
678         @Override
679         public void onTopicEvent(CommInfrastructure comm, String topic,
680                                  StandardCoderObject coder, PdpStateChange stateChange) {
681
682             if (!isMine(stateChange)) {
683                 logger.warn("pdp-state-chage from {}:{} is invalid: {}", comm, topic, stateChange);
684                 return;
685             }
686
687             fsm.stateChange(stateChange);
688         }
689
690         protected boolean isMine(PdpStateChange change) {
691             if (change == null) {
692                 return false;
693             }
694
695             return fsm.isItMe(change.getName(), change.getPdpGroup(), change.getPdpSubgroup());
696         }
697     }
698
699     /**
700      * PDP Update Message Listener.
701      */
702     public static class PdpUpdateFeed extends ScoListener<PdpUpdate> {
703
704         protected final LifecycleFsm fsm;
705
706         public PdpUpdateFeed(Class<PdpUpdate> clazz, LifecycleFsm fsm) {
707             super(clazz);
708             this.fsm = fsm;
709         }
710
711         @Override
712         public void onTopicEvent(CommInfrastructure comm, String topic,
713                                  StandardCoderObject coder, PdpUpdate update) {
714
715             if (!isMine(update)) {
716                 logger.warn("pdp-update from {}:{} is invalid: {}", comm, topic, update);
717                 return;
718             }
719
720             fsm.update(update);
721         }
722
723         protected boolean isMine(PdpUpdate update) {
724             if (update == null) {
725                 return false;
726             }
727
728             return fsm.isItMe(update.getName(), update.getPdpGroup(), update.getPdpSubgroup());
729         }
730     }
731
732     // these may be overridden by junit tests
733
734     protected ScheduledExecutorService makeExecutor() {
735         var exec = new ScheduledThreadPoolExecutor(1);
736         exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
737         exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
738         exec.setRemoveOnCancelPolicy(true);
739
740         return exec;
741     }
742 }