2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2021, 2023-2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.lifecycle;
import com.google.re2j.Pattern;
import io.prometheus.client.Counter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.onap.policy.common.capabilities.Startable;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
import org.onap.policy.common.endpoints.event.comm.TopicSink;
import org.onap.policy.common.endpoints.event.comm.TopicSource;
import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
import org.onap.policy.common.endpoints.listeners.ScoListener;
import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
import org.onap.policy.common.utils.coder.StandardCoderObject;
import org.onap.policy.common.utils.resources.PrometheusUtils;
import org.onap.policy.drools.persistence.SystemPersistenceConstants;
import org.onap.policy.drools.policies.DomainMaker;
import org.onap.policy.drools.system.PolicyController;
import org.onap.policy.drools.system.PolicyEngineConstants;
import org.onap.policy.models.pdp.concepts.PdpResponseDetails;
import org.onap.policy.models.pdp.concepts.PdpStateChange;
import org.onap.policy.models.pdp.concepts.PdpStatus;
import org.onap.policy.models.pdp.concepts.PdpUpdate;
import org.onap.policy.models.pdp.enums.PdpHealthStatus;
import org.onap.policy.models.pdp.enums.PdpMessageType;
import org.onap.policy.models.pdp.enums.PdpResponseStatus;
import org.onap.policy.models.pdp.enums.PdpState;
import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
75 public class LifecycleFsm implements Startable {
78 * Default Status Timer in seconds.
80 public static final long DEFAULT_STATUS_TIMER_SECONDS = 120L;
82 private static final Logger logger = LoggerFactory.getLogger(LifecycleFsm.class);
83 private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*");
85 protected static final String CONFIGURATION_PROPERTIES_NAME = "feature-lifecycle";
86 protected static final String GROUP_NAME = "lifecycle.pdp.group";
87 protected static final String PDP_TYPE = "lifecycle.pdp.type";
88 protected static final String MANDATORY_POLICY_TYPES = "lifecycle.pdp.policytypes";
90 protected static final String DEFAULT_PDP_GROUP = "defaultGroup";
91 protected static final String DEFAULT_PDP_TYPE = "drools";
93 protected static final long MIN_STATUS_INTERVAL_SECONDS = 5L;
94 protected static final String PDP_MESSAGE_NAME = "messageName";
96 protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_RULES =
97 new ToscaConceptIdentifier("onap.policies.native.drools.Artifact", "1.0.0");
99 protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_CONTROLLER =
100 new ToscaConceptIdentifier("onap.policies.native.drools.Controller", "1.0.0");
102 protected static final String PROMETHEUS_NAMESPACE = "pdpd";
104 protected static final Counter deploymentsCounter =
105 Counter.build().namespace(PROMETHEUS_NAMESPACE).name(PrometheusUtils.POLICY_DEPLOYMENTS_METRIC)
106 .labelNames(PrometheusUtils.STATE_METRIC_LABEL,
107 PrometheusUtils.OPERATION_METRIC_LABEL,
108 PrometheusUtils.STATUS_METRIC_LABEL)
109 .help(PrometheusUtils.POLICY_DEPLOYMENT_HELP)
113 protected final Properties properties;
116 protected TopicSource source;
119 protected TopicSinkClient client;
121 protected LifecycleState state = new LifecycleStateTerminated(this);
124 protected ScheduledExecutorService scheduler = makeExecutor();
127 protected ScheduledFuture<?> statusTask;
130 protected MessageTypeDispatcher sourceDispatcher = new MessageTypeDispatcher(PDP_MESSAGE_NAME);
133 protected PdpStateChangeFeed stateChangeFeed = new PdpStateChangeFeed(PdpStateChange.class, this);
136 protected PdpUpdateFeed updateFeed = new PdpUpdateFeed(PdpUpdate.class, this);
140 protected long statusTimerSeconds = DEFAULT_STATUS_TIMER_SECONDS;
143 private String group;
146 protected String subGroup;
150 protected String pdpType;
152 protected volatile String pdpName;
155 protected Set<String> mandatoryPolicyTypes = new HashSet<>();
158 protected final Map<ToscaConceptIdentifier, PolicyTypeController> policyTypesMap = new HashMap<>();
161 protected final Map<ToscaConceptIdentifier, ToscaPolicy> policiesMap = new HashMap<>();
166 public LifecycleFsm() {
167 properties = SystemPersistenceConstants.getManager().getProperties(CONFIGURATION_PROPERTIES_NAME);
168 setGroup(properties.getProperty(GROUP_NAME, DEFAULT_PDP_GROUP));
169 setPdpType(properties.getProperty(PDP_TYPE, DEFAULT_PDP_TYPE));
171 policyTypesMap.put(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER,
172 new PolicyTypeNativeDroolsController(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER, this));
174 POLICY_TYPE_DROOLS_NATIVE_RULES,
175 new PolicyTypeNativeArtifactController(POLICY_TYPE_DROOLS_NATIVE_RULES, this));
177 String commaSeparatedPolicyTypes = properties.getProperty(MANDATORY_POLICY_TYPES);
178 if (!StringUtils.isBlank(commaSeparatedPolicyTypes)) {
179 Collections.addAll(mandatoryPolicyTypes, COMMA_SPACE_PAT.split(commaSeparatedPolicyTypes));
182 logger.info("The mandatory Policy Types are {}. Compliance is {}",
183 mandatoryPolicyTypes, isMandatoryPolicyTypesCompliant());
187 public DomainMaker getDomainMaker() {
188 return PolicyEngineConstants.getManager().getDomainMaker();
192 public boolean isAlive() {
193 return client != null && client.getSink().isAlive();
197 * Returns the PDP Name.
199 public String getPdpName() {
200 if (this.pdpName == null) {
201 this.pdpName = PolicyEngineConstants.getManager().getPdpName();
210 public PdpState state() {
211 return state.state();
217 public synchronized void setGroup(String group) {
224 public synchronized void setSubGroup(String subGroup) {
225 this.subGroup = subGroup;
228 /* ** FSM events - entry points of events into the FSM ** */
231 public synchronized boolean start() {
232 this.pdpName = PolicyEngineConstants.getManager().getPdpName();
233 logger.info("lifecycle event: start engine");
234 return state.start();
238 * Start a controller event.
240 public synchronized void start(@NonNull PolicyController controller) {
241 logger.info("lifecycle event: start controller: {}", controller.getName());
242 if (!controller.getDrools().isBrained()) {
243 logger.warn("ignoring lifecycle event: start controller: {}", controller);
247 for (ToscaConceptIdentifier id : controller.getPolicyTypes()) {
248 PolicyTypeDroolsController ptDc = (PolicyTypeDroolsController) policyTypesMap.get(id); //NOSONAR
250 policyTypesMap.put(id, new PolicyTypeDroolsController(id, this, controller));
251 logger.info("policy-type {} added", id);
253 ptDc.add(controller);
259 * Patch a controller event.
261 public synchronized void patch(@NonNull PolicyController controller) {
262 logger.info("lifecycle event: patch controller: {}", controller.getName());
263 if (controller.getDrools().isBrained()) {
264 this.start(controller);
266 this.stop(controller);
271 public synchronized boolean stop() {
272 logger.info("lifecycle event: stop engine");
277 * Stop a controller event.
279 public synchronized void stop(@NonNull PolicyController controller) {
280 logger.info("lifecycle event: stop controller: {}", controller.getName());
282 List<PolicyTypeDroolsController> opControllers =
283 policyTypesMap.values().stream()
284 .filter(PolicyTypeDroolsController.class::isInstance)
285 .map(PolicyTypeDroolsController.class::cast)
286 .filter(opController -> opController.getControllers().containsKey(controller.getName())).toList();
288 for (PolicyTypeDroolsController opController : opControllers) {
289 opController.remove(controller);
290 if (opController.controllers().isEmpty()) {
291 policyTypesMap.remove(opController.getPolicyType());
292 logger.info("policy-type {} removed", opController.getPolicyType());
298 public synchronized void shutdown() {
299 logger.info("lifecycle event: shutdown engine");
304 * Status reporting event.
305 * @return true if successful
307 public synchronized boolean status() {
308 logger.info("lifecycle event: status");
309 return state.status();
312 public synchronized boolean stateChange(PdpStateChange stateChange) {
313 logger.info("lifecycle event: state-change");
314 return state.stateChange(stateChange);
317 public synchronized boolean update(PdpUpdate update) {
318 logger.info("lifecycle event: update");
319 return state.update(update);
322 /* FSM State Actions (executed sequentially) */
324 protected boolean startAction() {
329 return startIo() && startTimers();
332 protected boolean stopAction() {
337 boolean successTimers = stopTimers();
338 boolean successIo = stopIo();
339 return successTimers && successIo;
342 protected void shutdownAction() {
347 protected boolean statusAction() {
348 return statusAction(null);
351 protected boolean statusAction(PdpResponseDetails response) {
352 return statusAction(state(), response);
355 protected boolean statusAction(PdpState state, PdpResponseDetails response) {
360 PdpStatus status = statusPayload(state);
361 if (response != null) {
362 status.setRequestId(response.getResponseTo());
363 status.setResponse(response);
366 return client.send(status);
369 protected synchronized void transitionToAction(@NonNull LifecycleState newState) {
373 protected boolean setStatusIntervalAction(long intervalSeconds) {
374 if (intervalSeconds == statusTimerSeconds || intervalSeconds == 0) {
378 if (intervalSeconds <= MIN_STATUS_INTERVAL_SECONDS) {
379 logger.warn("interval is too low (< {}): {}", MIN_STATUS_INTERVAL_SECONDS, intervalSeconds);
383 setStatusTimerSeconds(intervalSeconds);
384 return stopTimers() && startTimers();
387 protected List<ToscaPolicy> getDeployablePoliciesAction(@NonNull List<ToscaPolicy> policies) {
388 List<ToscaPolicy> deployPolicies = new ArrayList<>(policies);
389 deployPolicies.removeAll(getActivePolicies());
391 // Ensure that the sequence of policy deployments is sane to minimize potential errors,
392 // First policies to deploy are the controller related ones, those that affect the lifecycle of
393 // controllers, starting with the ones that affect the existence of the controller (native controller),
394 // second the ones that "brain" the controller with application logic (native artifacts).
395 // Lastly the application specific ones such as operational policies.
397 // group policies by policy types
398 Map<String, List<ToscaPolicy>> policyTypeGroups = groupPoliciesByPolicyType(deployPolicies);
400 // place native controller policies at the start of the list
401 List<ToscaPolicy> orderedDeployableList = new ArrayList<>(
402 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList()));
404 // add to the working list the native controller policies
405 orderedDeployableList.addAll(
406 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList()));
408 // place non-native policies to place at the end of the list
409 orderedDeployableList.addAll(getNonNativePolicies(policyTypeGroups));
411 return orderedDeployableList;
414 protected List<ToscaPolicy> getUndeployablePoliciesAction(@NonNull List<ToscaPolicy> policies) {
415 List<ToscaPolicy> undeployPolicies = new ArrayList<>(getActivePolicies());
416 undeployPolicies.removeAll(policies);
417 if (undeployPolicies.isEmpty()) {
418 return undeployPolicies;
421 // Ensure that the sequence of policy undeployments is sane to minimize potential errors,
422 // as it is assumed not smart ordering from the policies sent by the PAP.
423 // First policies to undeploy are those that are only of relevance within a drools container,
424 // such as the operational policies. The next set of policies to undeploy are those that
425 // affect the overall PDP-D application support, firstly the ones that supports the
426 // application software wiring (native rules policies), and second those that relate
427 // to the PDP-D controllers lifecycle.
429 // group policies by policy types
430 Map<String, List<ToscaPolicy>> policyTypeGroups = groupPoliciesByPolicyType(undeployPolicies);
432 // place controller only (non-native policies) at the start of the list of the undeployment list
433 List<ToscaPolicy> orderedUndeployableList = getNonNativePolicies(policyTypeGroups);
435 // add to the working list the native rules policies if any
436 orderedUndeployableList.addAll(
437 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList()));
439 // finally add to the working list native controller policies if any
440 orderedUndeployableList.addAll(
441 policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList()));
443 return orderedUndeployableList;
446 protected void deployedPolicyAction(@NonNull ToscaPolicy policy) {
447 policiesMap.computeIfAbsent(policy.getIdentifier(), key -> {
448 // avoid counting reapplies in a second pass when a mix of native and non-native
449 // policies are present.
450 deploymentsCounter.labels(state.state().name(),
451 PrometheusUtils.DEPLOY_OPERATION,
452 PdpResponseStatus.SUCCESS.name()).inc();
457 protected void undeployedPolicyAction(@NonNull ToscaPolicy policy) {
458 policiesMap.computeIfPresent(policy.getIdentifier(), (key, value) -> {
459 // avoid counting reapplies in a second pass when a mix of native and non-native
460 // policies are present.
461 deploymentsCounter.labels(state.state().name(),
462 PrometheusUtils.UNDEPLOY_OPERATION,
463 PdpResponseStatus.SUCCESS.name()).inc();
468 protected void failedDeployPolicyAction(@NonNull ToscaPolicy failedPolicy) { // NOSONAR
469 deploymentsCounter.labels(state.state().name(),
470 PrometheusUtils.DEPLOY_OPERATION,
471 PdpResponseStatus.FAIL.name()).inc();
474 protected void failedUndeployPolicyAction(ToscaPolicy failedPolicy) {
475 deploymentsCounter.labels(state.state().name(),
476 PrometheusUtils.UNDEPLOY_OPERATION,
477 PdpResponseStatus.FAIL.name()).inc();
478 policiesMap.remove(failedPolicy.getIdentifier());
481 protected List<ToscaPolicy> resetPoliciesAction() {
482 List<ToscaPolicy> policies = new ArrayList<>(getActivePolicies());
487 protected void updatePoliciesAction(List<ToscaPolicy> toscaPolicies) {
488 this.scheduler.submit(() -> state.updatePolicies(toscaPolicies));
491 protected PolicyTypeController getController(ToscaConceptIdentifier policyType) {
492 return policyTypesMap.get(policyType);
495 protected Map<String, List<ToscaPolicy>> groupPoliciesByPolicyType(List<ToscaPolicy> deployPolicies) {
496 return deployPolicies.stream()
498 .collect(Collectors.groupingBy(policy -> policy.getTypeIdentifier().getName()));
501 protected List<ToscaPolicy> getNonNativePolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
502 return policyTypeGroups.entrySet().stream()
503 .filter(entry -> !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName())
504 && !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName()))
505 .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
508 protected List<ToscaPolicy> getNativeArtifactPolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
509 return policyTypeGroups.entrySet().stream()
510 .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName()))
511 .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
514 protected List<ToscaPolicy> getNativeControllerPolicies(@NonNull Map<String, List<ToscaPolicy>> policyTypeGroups) {
515 return policyTypeGroups.entrySet().stream()
516 .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName()))
517 .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList());
521 * Get the policy identifiers.
523 public List<ToscaConceptIdentifier> getPolicyIds(List<ToscaPolicy> policies) {
524 return policies.stream()
525 .map(ToscaPolicy::getIdentifier)
527 .collect(Collectors.toList());
530 protected String getPolicyIdsMessage(List<ToscaPolicy> policies) {
531 return getPolicyIds(policies).toString();
534 protected List<ToscaPolicy> removeByPolicyId(@NonNull List<ToscaPolicy> policies,
535 @NonNull List<ToscaConceptIdentifier> toRemoveList) {
536 policies.removeIf(policy -> toRemoveList.contains(policy.getIdentifier()));
540 protected List<ToscaPolicy> removeByPolicyId(@NonNull List<ToscaConceptIdentifier> toRemoveList) {
541 return removeByPolicyId(getActivePolicies(), toRemoveList);
544 protected List<ToscaPolicy> mergePolicies(@NonNull List<ToscaPolicy> addPolicies,
545 @NonNull List<ToscaConceptIdentifier> removePolicies) {
547 if (addPolicies.isEmpty() && removePolicies.isEmpty()) {
548 return getActivePolicies();
551 List<ToscaPolicy> policies = getActivePolicies();
552 policies.addAll(addPolicies);
553 return removeByPolicyId(new ArrayList<>(new HashSet<>(policies)), removePolicies);
558 * Do I support the mandatory policy types?.
560 protected boolean isMandatoryPolicyTypesCompliant() {
561 return getCurrentPolicyTypes().containsAll(getMandatoryPolicyTypes());
564 protected Set<String> getCurrentPolicyTypes() {
565 return getPolicyTypesMap().keySet().stream()
566 .map(ToscaConceptIdentifier::getName).collect(Collectors.toSet());
569 protected List<ToscaPolicy> getActivePolicies() {
570 return new ArrayList<>(policiesMap.values());
573 /* ** Action Helpers ** */
575 private boolean startIo() {
576 return source() && sink();
579 private boolean startTimers() {
581 this.scheduler.scheduleAtFixedRate(this::status, 0, statusTimerSeconds, TimeUnit.SECONDS);
582 return !statusTask.isCancelled() && !statusTask.isDone();
585 private boolean stopIo() {
586 source.unregister(sourceDispatcher);
587 boolean successSource = source.stop();
588 boolean successSink = client.getSink().stop();
589 return successSource && successSink;
592 private boolean stopTimers() {
594 if (statusTask != null) {
595 success = statusTask.cancel(false);
601 private void shutdownIo() {
602 client.getSink().shutdown();
606 private void shutdownTimers() {
607 scheduler.shutdownNow();
610 protected PdpStatus statusPayload(@NonNull PdpState state) {
611 var status = new PdpStatus();
612 status.setName(getPdpName());
613 status.setPdpGroup(group);
614 status.setPdpSubgroup(subGroup);
615 status.setState(state);
616 status.setHealthy(isAlive() ? PdpHealthStatus.HEALTHY : PdpHealthStatus.NOT_HEALTHY);
617 status.setPdpType(getPdpType());
618 status.setPolicies(new ArrayList<>(policiesMap.keySet()));
622 private boolean source() {
623 List<TopicSource> sources = TopicEndpointManager.getManager().addTopicSources(properties);
624 if (sources.isEmpty()) {
628 if (sources.size() != 1) {
629 logger.warn("Lifecycle Manager: unexpected: more than one source configured ({})", sources.size());
632 this.source = sources.get(0);
633 this.source.register(this.sourceDispatcher);
634 this.sourceDispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), stateChangeFeed);
635 this.sourceDispatcher.register(PdpMessageType.PDP_UPDATE.name(), updateFeed);
636 return source.start();
639 private boolean sink() {
640 List<TopicSink> sinks = TopicEndpointManager.getManager().addTopicSinks(properties);
641 if (sinks.isEmpty()) {
642 logger.error("Lifecycle Manager sinks have not been configured");
646 if (sinks.size() != 1) {
647 logger.warn("Lifecycle Manager: unexpected: more than one sink configured ({})", sinks.size());
650 this.client = new TopicSinkClient(sinks.get(0));
651 return this.client.getSink().start();
654 protected boolean isItMe(String name, String group, String subgroup) {
655 if (Objects.equals(name, getPdpName())) {
659 return name == null && group != null
660 && Objects.equals(group, getGroup())
661 && Objects.equals(subgroup, getSubGroup());
664 /* **** IO listeners ***** */
667 * PDP State Change Message Listener.
669 public static class PdpStateChangeFeed extends ScoListener<PdpStateChange> {
671 protected final LifecycleFsm fsm;
673 protected PdpStateChangeFeed(Class<PdpStateChange> clazz, LifecycleFsm fsm) {
679 public void onTopicEvent(CommInfrastructure comm, String topic,
680 StandardCoderObject coder, PdpStateChange stateChange) {
682 if (!isMine(stateChange)) {
683 logger.warn("pdp-state-chage from {}:{} is invalid: {}", comm, topic, stateChange);
687 fsm.stateChange(stateChange);
690 protected boolean isMine(PdpStateChange change) {
691 if (change == null) {
695 return fsm.isItMe(change.getName(), change.getPdpGroup(), change.getPdpSubgroup());
700 * PDP Update Message Listener.
702 public static class PdpUpdateFeed extends ScoListener<PdpUpdate> {
704 protected final LifecycleFsm fsm;
706 public PdpUpdateFeed(Class<PdpUpdate> clazz, LifecycleFsm fsm) {
712 public void onTopicEvent(CommInfrastructure comm, String topic,
713 StandardCoderObject coder, PdpUpdate update) {
715 if (!isMine(update)) {
716 logger.warn("pdp-update from {}:{} is invalid: {}", comm, topic, update);
723 protected boolean isMine(PdpUpdate update) {
724 if (update == null) {
728 return fsm.isItMe(update.getName(), update.getPdpGroup(), update.getPdpSubgroup());
732 // these may be overridden by junit tests
734 protected ScheduledExecutorService makeExecutor() {
735 var exec = new ScheduledThreadPoolExecutor(1);
736 exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
737 exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
738 exec.setRemoveOnCancelPolicy(true);