Use lombok in drools-pdp #3
[policy/drools-pdp.git] / feature-lifecycle / src / main / java / org / onap / policy / drools / lifecycle / LifecycleFsm.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2021 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2021 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.lifecycle;
23
24 import com.google.re2j.Pattern;
25 import java.lang.reflect.InvocationTargetException;
26 import java.time.Instant;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Objects;
34 import java.util.Properties;
35 import java.util.Set;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledFuture;
38 import java.util.concurrent.ScheduledThreadPoolExecutor;
39 import java.util.concurrent.TimeUnit;
40 import java.util.stream.Collectors;
41 import lombok.Getter;
42 import lombok.NonNull;
43 import lombok.Setter;
44 import org.apache.commons.beanutils.BeanUtils;
45 import org.apache.commons.lang3.StringUtils;
46 import org.onap.policy.common.capabilities.Startable;
47 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
48 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
49 import org.onap.policy.common.endpoints.event.comm.TopicSink;
50 import org.onap.policy.common.endpoints.event.comm.TopicSource;
51 import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
52 import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
53 import org.onap.policy.common.endpoints.listeners.ScoListener;
54 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
55 import org.onap.policy.common.utils.coder.StandardCoderObject;
56 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
57 import org.onap.policy.drools.policies.DomainMaker;
58 import org.onap.policy.drools.system.PolicyController;
59 import org.onap.policy.drools.system.PolicyEngineConstants;
60 import org.onap.policy.models.pdp.concepts.PdpResponseDetails;
61 import org.onap.policy.models.pdp.concepts.PdpStateChange;
62 import org.onap.policy.models.pdp.concepts.PdpStatistics;
63 import org.onap.policy.models.pdp.concepts.PdpStatus;
64 import org.onap.policy.models.pdp.concepts.PdpUpdate;
65 import org.onap.policy.models.pdp.enums.PdpHealthStatus;
66 import org.onap.policy.models.pdp.enums.PdpMessageType;
67 import org.onap.policy.models.pdp.enums.PdpState;
68 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
69 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
70 import org.slf4j.Logger;
71 import org.slf4j.LoggerFactory;
72
73 /**
74  * Lifecycle FSM.
75  */
76 public class LifecycleFsm implements Startable {
77
78     /**
79      * Default Status Timer in seconds.
80      */
81     public static final long DEFAULT_STATUS_TIMER_SECONDS = 120L;
82
83     private static final Logger logger = LoggerFactory.getLogger(LifecycleFsm.class);
84     private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
85
86     protected static final String CONFIGURATION_PROPERTIES_NAME = "feature-lifecycle";
87     protected static final String GROUP_NAME = "lifecycle.pdp.group";
88     protected static final String PDP_TYPE = "lifecycle.pdp.type";
89     protected static final String MANDATORY_POLICY_TYPES = "lifecycle.pdp.policytypes";
90
91     protected static final String DEFAULT_PDP_GROUP = "defaultGroup";
92     protected static final String DEFAULT_PDP_TYPE = "drools";
93
94     protected static final long MIN_STATUS_INTERVAL_SECONDS = 5L;
95     protected static final String PDP_MESSAGE_NAME = "messageName";
96
97     protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_RULES =
98             new ToscaConceptIdentifier("onap.policies.native.drools.Artifact", "1.0.0");
99
100     protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_CONTROLLER =
101             new ToscaConceptIdentifier("onap.policies.native.drools.Controller", "1.0.0");
102
103     @Getter
104     protected final Properties properties;
105
106     @Getter
107     protected TopicSource source;
108
109     @Getter
110     protected TopicSinkClient client;
111
112     @Getter
113     protected final String name = PolicyEngineConstants.PDP_NAME;
114
115     protected LifecycleState state = new LifecycleStateTerminated(this);
116
117     @GsonJsonIgnore
118     protected ScheduledExecutorService scheduler = makeExecutor();
119
120     @GsonJsonIgnore
121     protected ScheduledFuture<?> statusTask;
122
123     @GsonJsonIgnore
124     protected MessageTypeDispatcher sourceDispatcher = new MessageTypeDispatcher(PDP_MESSAGE_NAME);
125
126     @GsonJsonIgnore
127     protected PdpStateChangeFeed stateChangeFeed = new PdpStateChangeFeed(PdpStateChange.class, this);
128
129     @GsonJsonIgnore
130     protected PdpUpdateFeed updateFeed = new PdpUpdateFeed(PdpUpdate.class, this);
131
132     @Getter
133     @Setter
134     protected long statusTimerSeconds = DEFAULT_STATUS_TIMER_SECONDS;
135
136     @Getter
137     private String group;
138
139     @Getter
140     protected String subGroup;
141
142     @Getter
143     @Setter
144     protected String pdpType;
145
146     @Getter
147     protected Set<String> mandatoryPolicyTypes = new HashSet<>();
148
149     @Getter
150     protected final Map<ToscaConceptIdentifier, PolicyTypeController> policyTypesMap = new HashMap<>();
151
152     @Getter
153     protected final Map<ToscaConceptIdentifier, ToscaPolicy> policiesMap = new HashMap<>();
154
155     @Getter
156     protected final PdpStatistics stats = new PdpStatistics();
157
158     /**
159      * Constructor.
160      */
161     public LifecycleFsm() {
162         properties = SystemPersistenceConstants.getManager().getProperties(CONFIGURATION_PROPERTIES_NAME);
163         setGroup(properties.getProperty(GROUP_NAME, DEFAULT_PDP_GROUP));
164         setPdpType(properties.getProperty(PDP_TYPE, DEFAULT_PDP_TYPE));
165
166         policyTypesMap.put(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER,
167                 new PolicyTypeNativeDroolsController(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER, this));
168         policyTypesMap.put(
169                 POLICY_TYPE_DROOLS_NATIVE_RULES,
170                  new PolicyTypeNativeArtifactController(POLICY_TYPE_DROOLS_NATIVE_RULES, this));
171
172         String commaSeparatedPolicyTypes = properties.getProperty(MANDATORY_POLICY_TYPES);
173         if (!StringUtils.isBlank(commaSeparatedPolicyTypes)) {
174             Collections.addAll(mandatoryPolicyTypes, COMMA_SPACE_PAT.split(commaSeparatedPolicyTypes));
175         }
176
177         logger.info("The mandatory Policy Types are {}. Compliance is {}",
178                 mandatoryPolicyTypes, isMandatoryPolicyTypesCompliant());
179
180         stats.setPdpInstanceId(PolicyEngineConstants.PDP_NAME);
181     }
182
183     @GsonJsonIgnore
184     public DomainMaker getDomainMaker() {
185         return PolicyEngineConstants.getManager().getDomainMaker();
186     }
187
188     @Override
189     public boolean isAlive() {
190         return client != null && client.getSink().isAlive();
191     }
192
193     /**
194      * Current state.
195      */
196     public PdpState state() {
197         return state.state();
198     }
199
200     /**
201      * set group.
202      */
203     public synchronized void setGroup(String group) {
204         this.group = group;
205         this.stats.setPdpGroupName(group);
206     }
207
208     /**
209      * set subgroup.
210      */
211     public synchronized void setSubGroup(String subGroup) {
212         this.subGroup = subGroup;
213         this.stats.setPdpSubGroupName(subGroup);
214     }
215
216     /* ** FSM events - entry points of events into the FSM ** */
217
218     @Override
219     public synchronized boolean start() {
220         logger.info("lifecycle event: start engine");
221         return state.start();
222     }
223
224     /**
225      * Start a controller event.
226      */
227     public synchronized void start(@NonNull PolicyController controller) {
228         logger.info("lifecycle event: start controller: {}", controller.getName());
229         if (!controller.getDrools().isBrained()) {
230             logger.warn("ignoring lifecycle event: start controller: {}", controller);
231             return;
232         }
233
234         for (ToscaConceptIdentifier id : controller.getPolicyTypes()) {
235             PolicyTypeDroolsController ptDc = (PolicyTypeDroolsController) policyTypesMap.get(id); //NOSONAR
236             if (ptDc == null) {
237                 policyTypesMap.put(id, new PolicyTypeDroolsController(id, this, controller));
238                 logger.info("policy-type {} added", id);
239             } else {
240                 ptDc.add(controller);
241             }
242         }
243     }
244
245     /**
246      * Patch a controller event.
247      */
248     public synchronized void patch(@NonNull PolicyController controller) {
249         logger.info("lifecycle event: patch controller: {}", controller.getName());
250         if (controller.getDrools().isBrained()) {
251             this.start(controller);
252         } else {
253             this.stop(controller);
254         }
255     }
256
257     @Override
258     public synchronized boolean stop() {
259         logger.info("lifecycle event: stop engine");
260         return state.stop();
261     }
262
263     /**
264      * Stop a controller event.
265      */
266     public synchronized void stop(@NonNull PolicyController controller) {
267         logger.info("lifecycle event: stop controller: {}", controller.getName());
268
269         List<PolicyTypeDroolsController> opControllers =
270             policyTypesMap.values().stream()
271                 .filter(PolicyTypeDroolsController.class::isInstance)
272                 .map(PolicyTypeDroolsController.class::cast)
273                 .filter(opController -> opController.getControllers().containsKey(controller.getName()))
274                 .collect(Collectors.toList());
275
276         for (PolicyTypeDroolsController opController : opControllers) {
277             opController.remove(controller);
278             if (opController.controllers().isEmpty()) {
279                 policyTypesMap.remove(opController.getPolicyType());
280                 logger.info("policy-type {} removed", opController.getPolicyType());
281             }
282         }
283     }
284
285     @Override
286     public synchronized void shutdown() {
287         logger.info("lifecycle event: shutdown engine");
288         state.shutdown();
289     }
290
291     /**
292      * Status reporting event.
293      * @return true if successful
294      */
295     public synchronized boolean status() {
296         logger.info("lifecycle event: status");
297         return state.status();
298     }
299
300     public synchronized boolean stateChange(PdpStateChange stateChange) {
301         logger.info("lifecycle event: state-change");
302         return state.stateChange(stateChange);
303     }
304
305     public synchronized boolean update(PdpUpdate update) {
306         logger.info("lifecycle event: update");
307         return state.update(update);
308     }
309
310     /* FSM State Actions (executed sequentially) */
311
312     protected boolean startAction() {
313         if (isAlive()) {
314             return true;
315         }
316
317         return startIo() && startTimers();
318     }
319
320     protected boolean stopAction() {
321         if (!isAlive()) {
322             return true;
323         }
324
325         boolean successTimers = stopTimers();
326         boolean successIo = stopIo();
327         return successTimers && successIo;
328     }
329
330     protected void shutdownAction() {
331         shutdownIo();
332         shutdownTimers();
333     }
334
335     protected boolean statusAction() {
336         return statusAction(null);
337     }
338
339     protected boolean statusAction(PdpResponseDetails response) {
340         return statusAction(state(), response);
341     }
342
343     protected boolean statusAction(PdpState state, PdpResponseDetails response) {
344         if (!isAlive()) {
345             return false;
346         }
347
348         PdpStatus status = statusPayload(state);
349         if (response != null) {
350             status.setRequestId(response.getResponseTo());
351             status.setResponse(response);
352         }
353
354         return client.send(status);
355     }
356
357     protected synchronized void transitionToAction(@NonNull LifecycleState newState) {
358         state = newState;
359     }
360
361     protected boolean setStatusIntervalAction(long intervalSeconds) {
362         if (intervalSeconds == statusTimerSeconds || intervalSeconds == 0) {
363             return true;
364         }
365
366         if (intervalSeconds <= MIN_STATUS_INTERVAL_SECONDS) {
367             logger.warn("interval is too low (< {}): {}", MIN_STATUS_INTERVAL_SECONDS, intervalSeconds);
368             return false;
369         }
370
371         setStatusTimerSeconds(intervalSeconds);
372         return stopTimers() && startTimers();
373     }
374
375     protected List<ToscaPolicy> getDeployablePoliciesAction(@NonNull List<ToscaPolicy> policies) {
376         List<ToscaPolicy> deployPolicies = new ArrayList<>(policies);
377         deployPolicies.removeAll(getActivePolicies());
378
379         // Ensure that the sequence of policy deployments is sane to minimize potential errors,
380         // First policies to deploy are the controller related ones, those that affect the lifecycle of
381         // controllers, starting with the ones that affect the existence of the controller (native controller),
382         // second the ones that "brain" the controller with application logic (native artifacts).
383         // Lastly the application specific ones such as operational policies.
384
385         // group policies by policy types
386         Map<String, List<ToscaPolicy>> policyTypeGroups = groupPoliciesByPolicyType(deployPolicies);
387
388         // place native controller policies at the start of the list
389         List<ToscaPolicy> orderedDeployableList = new ArrayList<>(
390                 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList()));
391
392         // add to the working list the native controller policies
393         orderedDeployableList.addAll(
394             policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList()));
395
396         // place non-native policies to place at the end of the list
397         orderedDeployableList.addAll(getNonNativePolicies(policyTypeGroups));
398
399         return orderedDeployableList;
400     }
401
402     protected List<ToscaPolicy> getUndeployablePoliciesAction(@NonNull List<ToscaPolicy> policies) {
403         List<ToscaPolicy> undeployPolicies = new ArrayList<>(getActivePolicies());
404         undeployPolicies.removeAll(policies);
405         if (undeployPolicies.isEmpty()) {
406             return undeployPolicies;
407         }
408
409         // Ensure that the sequence of policy undeployments is sane to minimize potential errors,
410         // as it is assumed not smart ordering from the policies sent by the PAP.
411         // First policies to undeploy are those that are only of relevance within a drools container,
412         // such as the operational policies.   The next set of policies to undeploy are those that
413         // affect the overall PDP-D application support, firstly the ones that supports the
414         // application software wiring (native rules policies), and second those that relate
415         // to the PDP-D controllers lifecycle.
416
417         // group policies by policy types
418         Map<String, List<ToscaPolicy>> policyTypeGroups = groupPoliciesByPolicyType(undeployPolicies);
419
420         // place controller only (non-native policies) at the start of the list of the undeployment list
421         List<ToscaPolicy> orderedUndeployableList = getNonNativePolicies(policyTypeGroups);
422
423         // add to the working list the native rules policies if any
424         orderedUndeployableList.addAll(
425             policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList()));
426
427         // finally add to the working list native controller policies if any
428         orderedUndeployableList.addAll(
429             policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList()));
430
431         return orderedUndeployableList;
432     }
433
434     protected void deployedPolicyAction(@NonNull ToscaPolicy policy) {
435         policiesMap.computeIfAbsent(policy.getIdentifier(), key -> {
436             // avoid counting reapplies in a second pass when a mix of native and non-native
437             // policies are present.
438             getStats().setPolicyDeployCount(getStats().getPolicyDeployCount() + 1);
439             getStats().setPolicyDeploySuccessCount(getStats().getPolicyDeploySuccessCount() + 1);
440             return policy;
441         });
442     }
443
444     protected void undeployedPolicyAction(@NonNull ToscaPolicy policy) {
445         policiesMap.computeIfPresent(policy.getIdentifier(), (key, value) -> {
446             // avoid counting reapplies in a second pass when a mix of native and non-native
447             // policies are present.
448             getStats().setPolicyUndeployCount(getStats().getPolicyUndeployCount() + 1);
449             getStats().setPolicyUndeploySuccessCount(getStats().getPolicyUndeploySuccessCount() + 1);
450             return null;
451         });
452     }
453
454     protected void failedDeployPolicyAction(@NonNull ToscaPolicy failedPolicy) {    // NOSONAR
455         getStats().setPolicyDeployCount(getStats().getPolicyDeployCount() + 1);
456         getStats().setPolicyDeployFailCount(getStats().getPolicyDeployFailCount() + 1);
457     }
458
459     protected void failedUndeployPolicyAction(ToscaPolicy failedPolicy) {
460         getStats().setPolicyUndeployCount(getStats().getPolicyUndeployCount() + 1);
461         getStats().setPolicyUndeployFailCount(getStats().getPolicyUndeployFailCount() + 1);
462         policiesMap.remove(failedPolicy.getIdentifier());
463     }
464
465     protected void updateDeployCountsAction(Long deployCount, Long deploySuccesses, Long deployFailures) {
466         PdpStatistics statistics = getStats();
467         if (deployCount != null) {
468             statistics.setPolicyDeployCount(deployCount);
469         }
470
471         if (deploySuccesses != null) {
472             statistics.setPolicyDeploySuccessCount(deploySuccesses);
473         }
474
475         if (deployFailures != null) {
476             statistics.setPolicyDeployFailCount(deployFailures);
477         }
478     }
479
480     protected void updateUndeployCountsAction(Long undeployCount, Long undeploySuccesses, Long undeployFailures) {
481         PdpStatistics statistics = getStats();
482         if (undeployCount != null) {
483             statistics.setPolicyUndeployCount(undeployCount);
484         }
485
486         if (undeploySuccesses != null) {
487             statistics.setPolicyUndeploySuccessCount(undeploySuccesses);
488         }
489
490         if (undeployFailures != null) {
491             statistics.setPolicyUndeployFailCount(undeployFailures);
492         }
493     }
494
495     protected List<ToscaPolicy> resetPoliciesAction() {
496         updateDeployCountsAction(0L, 0L, 0L);
497         updateUndeployCountsAction(0L, 0L, 0L);
498         List<ToscaPolicy> policies = new ArrayList<>(getActivePolicies());
499         policiesMap.clear();
500         return policies;
501     }
502
503     protected void updatePoliciesAction(List<ToscaPolicy> toscaPolicies) {
504         this.scheduler.submit(() -> state.updatePolicies(toscaPolicies));
505     }
506
507     protected PolicyTypeController getController(ToscaConceptIdentifier policyType) {
508         return policyTypesMap.get(policyType);
509     }
510
511     protected Map<String, List<ToscaPolicy>> groupPoliciesByPolicyType(List<ToscaPolicy> deployPolicies) {
512         return deployPolicies.stream()
513             .distinct()
514             .collect(Collectors.groupingBy(policy -> policy.getTypeIdentifier().getName()));
515     }
516
517     protected List<ToscaPolicy> getNonNativePolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
518         return policyTypeGroups.entrySet().stream()
519             .filter(entry -> !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName())
520                 && !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName()))
521             .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
522     }
523
524     protected List<ToscaPolicy> getNativeArtifactPolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
525         return policyTypeGroups.entrySet().stream()
526             .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName()))
527             .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
528     }
529
530     protected List<ToscaPolicy> getNativeControllerPolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
531         return policyTypeGroups.entrySet().stream()
532                        .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName()))
533                        .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
534     }
535
536     /**
537      * Get the policy identifiers.
538      */
539     public List<ToscaConceptIdentifier> getPolicyIds(List<ToscaPolicy> policies) {
540         return policies.stream()
541                        .map(ToscaPolicy::getIdentifier)
542                        .distinct()
543                        .collect(Collectors.toList());
544     }
545
546     protected String getPolicyIdsMessage(List<ToscaPolicy> policies) {
547         return getPolicyIds(policies).toString();
548     }
549
550     protected List<ToscaPolicy> removeByPolicyId(@NonNull List<ToscaPolicy> policies,
551             @NonNull List<ToscaConceptIdentifier> toRemoveList) {
552         policies.removeIf(policy -> toRemoveList.contains(policy.getIdentifier()));
553         return policies;
554     }
555
556     protected List<ToscaPolicy> removeByPolicyId(@NonNull List<ToscaConceptIdentifier> toRemoveList) {
557         return removeByPolicyId(getActivePolicies(), toRemoveList);
558     }
559
560     protected List<ToscaPolicy> mergePolicies(@NonNull List<ToscaPolicy> addPolicies,
561             @NonNull List<ToscaConceptIdentifier> removePolicies) {
562
563         if (addPolicies.isEmpty() && removePolicies.isEmpty()) {
564             return getActivePolicies();
565         }
566
567         List<ToscaPolicy> policies = getActivePolicies();
568         policies.addAll(addPolicies);
569         return removeByPolicyId(new ArrayList<>(new HashSet<>(policies)), removePolicies);
570     }
571
572
573     /**
574      * Do I support the mandatory policy types?.
575      */
576     protected boolean isMandatoryPolicyTypesCompliant() {
577         return getCurrentPolicyTypes().containsAll(getMandatoryPolicyTypes());
578     }
579
580     protected Set<String> getCurrentPolicyTypes() {
581         return getPolicyTypesMap().keySet().stream()
582                        .map(ToscaConceptIdentifier::getName).collect(Collectors.toSet());
583     }
584
585     protected List<ToscaPolicy> getActivePolicies() {
586         return new ArrayList<>(policiesMap.values());
587     }
588
589     /* ** Action Helpers ** */
590
591     private boolean startIo() {
592         return source() && sink();
593     }
594
595     private boolean startTimers() {
596         statusTask =
597                 this.scheduler.scheduleAtFixedRate(this::status, 0, statusTimerSeconds, TimeUnit.SECONDS);
598         return !statusTask.isCancelled() && !statusTask.isDone();
599     }
600
601     private boolean stopIo() {
602         source.unregister(sourceDispatcher);
603         boolean successSource = source.stop();
604         boolean successSink = client.getSink().stop();
605         return successSource && successSink;
606     }
607
608     private boolean stopTimers() {
609         var success = true;
610         if (statusTask != null) {
611             success = statusTask.cancel(false);
612         }
613
614         return success;
615     }
616
617     private void shutdownIo() {
618         client.getSink().shutdown();
619         source.shutdown();
620     }
621
622     private void shutdownTimers() {
623         scheduler.shutdownNow();
624     }
625
626     protected PdpStatus statusPayload(@NonNull PdpState state) {
627         var status = new PdpStatus();
628         status.setName(name);
629         status.setPdpGroup(group);
630         status.setPdpSubgroup(subGroup);
631         status.setState(state);
632         status.setHealthy(isAlive() ? PdpHealthStatus.HEALTHY : PdpHealthStatus.NOT_HEALTHY);
633         status.setPdpType(getPdpType());
634         status.setPolicies(new ArrayList<>(policiesMap.keySet()));
635         status.setStatistics(statisticsPayload());
636         return status;
637     }
638
639     /**
640      * It provides a snapshot of the current statistics.
641      */
642     public PdpStatistics statisticsPayload() {
643         var updateStats = new PdpStatistics(stats);
644         updateStats.setTimeStamp(Instant.now());
645
646         try {
647             BeanUtils.copyProperties(updateStats, PolicyEngineConstants.getManager().getStats().getGroupStat());
648         } catch (IllegalAccessException | InvocationTargetException ex) {
649             logger.debug("statistics mapping failure", ex);
650         }
651
652         return updateStats;
653     }
654
655     private boolean source() {
656         List<TopicSource> sources = TopicEndpointManager.getManager().addTopicSources(properties);
657         if (sources.isEmpty()) {
658             return false;
659         }
660
661         if (sources.size() != 1) {
662             logger.warn("Lifecycle Manager: unexpected: more than one source configured ({})", sources.size());
663         }
664
665         this.source = sources.get(0);
666         this.source.register(this.sourceDispatcher);
667         this.sourceDispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), stateChangeFeed);
668         this.sourceDispatcher.register(PdpMessageType.PDP_UPDATE.name(), updateFeed);
669         return source.start();
670     }
671
672     private boolean sink() {
673         List<TopicSink> sinks = TopicEndpointManager.getManager().addTopicSinks(properties);
674         if (sinks.isEmpty()) {
675             logger.error("Lifecycle Manager sinks have not been configured");
676             return false;
677         }
678
679         if (sinks.size() != 1) {
680             logger.warn("Lifecycle Manager: unexpected: more than one sink configured ({})", sinks.size());
681         }
682
683         this.client = new TopicSinkClient(sinks.get(0));
684         return this.client.getSink().start();
685     }
686
687     protected boolean isItMe(String name, String group, String subgroup) {
688         if (Objects.equals(name, getName())) {
689             return true;
690         }
691
692         return name == null && group != null
693             && Objects.equals(group, getGroup())
694             && Objects.equals(subgroup, getSubGroup());
695     }
696
697     /* **** IO listeners ***** */
698
699     /**
700      * PDP State Change Message Listener.
701      */
702     public static class PdpStateChangeFeed extends ScoListener<PdpStateChange> {
703
704         protected final LifecycleFsm fsm;
705
706         protected PdpStateChangeFeed(Class<PdpStateChange> clazz, LifecycleFsm fsm) {
707             super(clazz);
708             this.fsm = fsm;
709         }
710
711         @Override
712         public void onTopicEvent(CommInfrastructure comm, String topic,
713                                  StandardCoderObject coder, PdpStateChange stateChange) {
714
715             if (!isMine(stateChange)) {
716                 logger.warn("pdp-state-chage from {}:{} is invalid: {}", comm, topic, stateChange);
717                 return;
718             }
719
720             fsm.stateChange(stateChange);
721         }
722
723         protected boolean isMine(PdpStateChange change) {
724             if (change == null) {
725                 return false;
726             }
727
728             return fsm.isItMe(change.getName(), change.getPdpGroup(), change.getPdpSubgroup());
729         }
730     }
731
732     /**
733      * PDP Update Message Listener.
734      */
735     public static class PdpUpdateFeed extends ScoListener<PdpUpdate> {
736
737         protected final LifecycleFsm fsm;
738
739         public PdpUpdateFeed(Class<PdpUpdate> clazz, LifecycleFsm fsm) {
740             super(clazz);
741             this.fsm = fsm;
742         }
743
744         @Override
745         public void onTopicEvent(CommInfrastructure comm, String topic,
746                                  StandardCoderObject coder, PdpUpdate update) {
747
748             if (!isMine(update)) {
749                 logger.warn("pdp-update from {}:{} is invalid: {}", comm, topic, update);
750                 return;
751             }
752
753             fsm.update(update);
754         }
755
756         protected boolean isMine(PdpUpdate update) {
757             if (update == null) {
758                 return false;
759             }
760
761             return fsm.isItMe(update.getName(), update.getPdpGroup(), update.getPdpSubgroup());
762         }
763     }
764
765     // these may be overridden by junit tests
766
767     protected ScheduledExecutorService makeExecutor() {
768         var exec = new ScheduledThreadPoolExecutor(1);
769         exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
770         exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
771         exec.setRemoveOnCancelPolicy(true);
772
773         return exec;
774     }
775 }