Removing deprecated DMAAP library
[policy/drools-pdp.git] / policy-management / src / main / java / org / onap / policy / drools / system / PolicyEngineManager.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP
4  * ================================================================================
5  * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6  * Modifications Copyright (C) 2024 Nordix Foundation.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.drools.system;
23
24 import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_HOST;
25 import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
26 import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
27
28 import com.google.gson.Gson;
29 import com.google.gson.GsonBuilder;
30 import io.prometheus.client.Summary;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Objects;
34 import java.util.Properties;
35 import java.util.UUID;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.atomic.AtomicReference;
39 import java.util.function.BiConsumer;
40 import java.util.function.Consumer;
41 import java.util.stream.Collectors;
42 import java.util.stream.Stream;
43 import lombok.AccessLevel;
44 import lombok.Getter;
45 import lombok.NonNull;
46 import lombok.Setter;
47 import lombok.Synchronized;
48 import lombok.ToString;
49 import org.apache.commons.lang3.StringUtils;
50 import org.onap.policy.common.endpoints.event.comm.Topic;
51 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
52 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
53 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
54 import org.onap.policy.common.endpoints.event.comm.TopicSink;
55 import org.onap.policy.common.endpoints.event.comm.TopicSource;
56 import org.onap.policy.common.endpoints.http.client.HttpClient;
57 import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
58 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
59 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
60 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory;
61 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
62 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
63 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
64 import org.onap.policy.common.gson.annotation.GsonJsonProperty;
65 import org.onap.policy.common.utils.logging.LoggerUtils;
66 import org.onap.policy.common.utils.network.NetworkUtil;
67 import org.onap.policy.common.utils.resources.PrometheusUtils;
68 import org.onap.policy.common.utils.services.FeatureApiUtils;
69 import org.onap.policy.drools.controller.DroolsControllerConstants;
70 import org.onap.policy.drools.core.PolicyContainer;
71 import org.onap.policy.drools.core.jmx.PdpJmxListener;
72 import org.onap.policy.drools.core.lock.Lock;
73 import org.onap.policy.drools.core.lock.LockCallback;
74 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
75 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
76 import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants;
77 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
78 import org.onap.policy.drools.features.PolicyEngineFeatureApiConstants;
79 import org.onap.policy.drools.metrics.Metric;
80 import org.onap.policy.drools.persistence.SystemPersistence;
81 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
82 import org.onap.policy.drools.policies.DomainMaker;
83 import org.onap.policy.drools.properties.DroolsPropertyConstants;
84 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
85 import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
86 import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
87 import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
88 import org.onap.policy.drools.server.restful.RestManager;
89 import org.onap.policy.drools.stats.PolicyStatsManager;
90 import org.onap.policy.drools.system.internal.SimpleLockManager;
91 import org.onap.policy.drools.utils.PropertyUtil;
92 import org.onap.policy.drools.utils.logging.MdcTransaction;
93 import org.onap.policy.models.pdp.enums.PdpResponseStatus;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
96
97 /**
98  * Policy Engine Manager Implementation.
99  */
100 @ToString(onlyExplicitlyIncluded = true)
101 class PolicyEngineManager implements PolicyEngine {
102     /**
103      * String literals.
104      */
105     private static final String INVALID_TOPIC_MSG = "Invalid Topic";
106     private static final String INVALID_EVENT_MSG = "Invalid Event";
107
108     private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped";
109     private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked";
110
111     public static final String EXECUTOR_THREAD_PROP = "executor.threads";
112     protected static final int DEFAULT_EXECUTOR_THREADS = 5;
113
114     public static final String CLUSTER_NAME_PROP = "engine.cluster";
115
116     /**
117      * logger.
118      */
119     private static final Logger logger = LoggerFactory.getLogger(PolicyEngineManager.class);
120
121     /**
122      * Is the Policy Engine running.
123      */
124     @Getter
125     @ToString.Include
126     private volatile boolean alive = false;
127
128     /**
129      * Is the engine locked.
130      */
131     @Getter
132     @ToString.Include
133     private volatile boolean locked = false;
134
135     /**
136      * Properties used to initialize the engine.
137      */
138     private Properties properties;
139
140     /**
141      * Environment Properties.
142      */
143     private final Properties environment = new Properties();
144
145     /**
146      * Policy Engine Sources.
147      */
148     @Getter
149     private List<TopicSource> sources = new ArrayList<>();
150
151     /**
152      * Policy Engine Sinks.
153      */
154     @Getter
155     private List<TopicSink> sinks = new ArrayList<>();
156
157     /**
158      * Policy Engine HTTP Servers.
159      */
160     @Getter
161     private List<HttpServletServer> httpServers = new ArrayList<>();
162
163     /**
164      * Thread pool used to execute background tasks.
165      */
166     private ScheduledExecutorService executorService = null;
167
168     /**
169      * Lock manager used to create locks.
170      */
171     @Getter(AccessLevel.PROTECTED)
172     private PolicyResourceLockManager lockManager = null;
173
174     private final DomainMaker domainMaker = new DomainMaker();
175
176     @Getter
177     private final PolicyStatsManager stats = new PolicyStatsManager();
178
179     @Getter(onMethod_ = {@Synchronized})
180     @Setter(onMethod_ = {@Synchronized})
181     private String clusterName = UUID.randomUUID().toString();
182
183     @Getter(onMethod_ = {@Synchronized})
184     @Setter(onMethod_ = {@Synchronized})
185     private String hostName = NetworkUtil.getHostname();
186
187     @Getter(onMethod_ = {@Synchronized})
188     @Setter(onMethod_ = {@Synchronized})
189     private String pdpName;
190
191     /**
192      * gson parser to decode configuration requests.
193      */
194     private final Gson decoder = new GsonBuilder().disableHtmlEscaping().create();
195
196     protected static final String CONTROLLOOP_NAME_LABEL = "controlloop";
197     protected static final String CONTROLLER_LABEL = "controller";
198     protected static final String POLICY_LABEL = "policy";
199
200     protected static final Summary transLatencySecsSummary =
201             Summary.build().namespace(PrometheusUtils.PdpType.PDPD.getNamespace())
202                     .name(PrometheusUtils.POLICY_EXECUTIONS_LATENCY_SECONDS_METRIC)
203                     .labelNames(CONTROLLER_LABEL,
204                             CONTROLLOOP_NAME_LABEL,
205                             POLICY_LABEL,
206                             PrometheusUtils.STATUS_METRIC_LABEL)
207                     .help(PrometheusUtils.POLICY_EXECUTIONS_LATENCY_SECONDS_HELP)
208                     .register();
209
210
211     @Override
212     public synchronized void boot(String[] cliArgs) {
213
214         if (FeatureApiUtils.apply(getEngineProviders(),
215             feature -> feature.beforeBoot(this, cliArgs),
216             (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this,
217                             feature.getClass().getName(), ex.getMessage(), ex))) {
218             return;
219         }
220
221         try {
222             globalInitContainer(cliArgs);
223         } catch (final Exception e) {
224             logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e);
225         }
226
227         FeatureApiUtils.apply(getEngineProviders(),
228             feature -> feature.afterBoot(this),
229             (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this,
230                             feature.getClass().getName(), ex.getMessage(), ex));
231     }
232
233     @Override
234     public synchronized void setEnvironment(Properties properties) {
235         this.environment.putAll(PropertyUtil.getInterpolatedProperties(properties));
236     }
237
238     @GsonJsonIgnore
239     @Override
240     public synchronized Properties getEnvironment() {
241         return this.environment;
242     }
243
244     @GsonJsonIgnore
245     @Override
246     public DomainMaker getDomainMaker() {
247         return this.domainMaker;
248     }
249
250     @Override
251     public synchronized String getEnvironmentProperty(String envKey) {
252         String value = this.environment.getProperty(envKey);
253         if (value == null) {
254             value = System.getProperty(envKey);
255             if (value == null) {
256                 value = System.getenv(envKey);
257             }
258         }
259         return value;
260     }
261
262     @Override
263     public synchronized String setEnvironmentProperty(String envKey, String envValue) {
264         return (String) this.environment.setProperty(envKey, envValue);
265     }
266
267     @Override
268     public final Properties defaultTelemetryConfig() {
269         final var defaultConfig = new Properties();
270
271         defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, "TELEMETRY");
272         defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
273                 + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, TELEMETRY_SERVER_DEFAULT_HOST);
274         defaultConfig.put(
275                 PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
276                         + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
277                 "" + TELEMETRY_SERVER_DEFAULT_PORT);
278         defaultConfig.put(
279                 PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
280                         + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX,
281                 RestManager.class.getPackage().getName());
282         defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
283                 + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "" + Boolean.TRUE);
284         defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
285                 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "" + Boolean.FALSE);
286
287         return defaultConfig;
288     }
289
290     @Override
291     public void metric(String controllerName, String policyName, Metric metric) {
292         // sub-operations are not being tracked
293     }
294
295     @Override
296     public void transaction(@NonNull String controllerName,                         // NOSONAR placeholder
297             @NonNull String controlLoopName, @NonNull Metric transaction) {
298
299         // keeping stats on a per control loop name,
300         // applications must report the controller name too
301         // for completeness and to avoid being modified when/if
302         // the controller name is used for tracking purposes
303
304         getStats().stat(controlLoopName, transaction);
305
306         Long elapsedTime = transaction.getElapsedTime();
307         String policyName = transaction.getServiceInstanceId();
308         if (Objects.isNull(elapsedTime) || StringUtils.isEmpty(policyName)) {
309             logger.warn("{} transaction in controller {} incomplete transaction object: {}",
310                     controlLoopName, controllerName, transaction);
311             return;
312         }
313
314         transLatencySecsSummary
315             .labels(controllerName,
316                     controlLoopName,
317                     policyName,
318                     transaction.isSuccess() ? PdpResponseStatus.SUCCESS.name() : PdpResponseStatus.FAIL.name())
319             .observe(transaction.getElapsedTime() / 1000D);
320     }
321
322     @Override
323     @GsonJsonIgnore
324     public ScheduledExecutorService getExecutorService() {
325         return executorService;
326     }
327
328     private ScheduledExecutorService makeExecutorService(Properties properties) {
329         int nthreads = DEFAULT_EXECUTOR_THREADS;
330
331         try {
332             nthreads = Integer.parseInt(
333                             properties.getProperty(EXECUTOR_THREAD_PROP, String.valueOf(DEFAULT_EXECUTOR_THREADS)));
334
335         } catch (NumberFormatException e) {
336             logger.error("invalid number for " + EXECUTOR_THREAD_PROP + " property", e);
337         }
338
339         return makeScheduledExecutor(nthreads);
340     }
341
342     private void createLockManager(Properties properties) {
343         for (PolicyEngineFeatureApi feature : getEngineProviders()) {
344             try {
345                 this.lockManager = feature.beforeCreateLockManager(this, properties);
346                 if (this.lockManager != null) {
347                     logger.info("overridden lock manager is {}", this.lockManager);
348                     return;
349                 }
350             } catch (RuntimeException e) {
351                 logger.error("{}: feature {} before-create-lock-manager failure because of {}", this,
352                                 feature.getClass().getName(), e.getMessage(), e);
353             }
354         }
355
356         try {
357             this.lockManager = new SimpleLockManager(this, properties);
358         } catch (RuntimeException e) {
359             logger.error("{}: cannot create simple lock manager because of {}", this, e.getMessage(), e);
360             this.lockManager = new SimpleLockManager(this, new Properties());
361         }
362
363         logger.info("lock manager is {}", this.lockManager);
364
365         /* policy-engine dispatch post operation hook */
366         FeatureApiUtils.apply(getEngineProviders(),
367             feature -> feature.afterCreateLockManager(this, properties, this.lockManager),
368             (feature, ex) -> logger.error("{}: feature {} after-create-lock-manager failure because of {}",
369                             this, feature.getClass().getName(), ex.getMessage(), ex));
370     }
371
372     @Override
373     public synchronized void configure(Properties properties) {
374
375         if (properties == null) {
376             logger.warn("No properties provided");
377             throw new IllegalArgumentException("No properties provided");
378         }
379
380         /* policy-engine dispatch pre configure hook */
381         if (FeatureApiUtils.apply(getEngineProviders(),
382             feature -> feature.beforeConfigure(this, properties),
383             (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this,
384                             feature.getClass().getName(), ex.getMessage(), ex))) {
385             return;
386         }
387
388         this.properties = properties;
389         if (!StringUtils.isBlank(this.properties.getProperty(CLUSTER_NAME_PROP))) {
390             this.clusterName = this.properties.getProperty(CLUSTER_NAME_PROP, this.clusterName);
391         }
392         this.pdpName = hostName + "." + this.clusterName;
393
394         try {
395             this.sources = getTopicEndpointManager().addTopicSources(properties);
396             for (final TopicSource source : this.sources) {
397                 source.register(this);
398             }
399         } catch (final Exception e) {
400             logger.error("{}: add-sources failed", this, e);
401         }
402
403         try {
404             this.sinks = getTopicEndpointManager().addTopicSinks(properties);
405         } catch (final IllegalArgumentException e) {
406             logger.error("{}: add-sinks failed", this, e);
407         }
408
409         try {
410             this.httpServers = getServletFactory().build(properties);
411         } catch (final IllegalArgumentException e) {
412             logger.error("{}: add-http-servers failed", this, e);
413         }
414
415         executorService = makeExecutorService(properties);
416
417         createLockManager(properties);
418
419         /* policy-engine dispatch post configure hook */
420         FeatureApiUtils.apply(getEngineProviders(),
421             feature -> feature.afterConfigure(this),
422             (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this,
423                             feature.getClass().getName(), ex.getMessage(), ex));
424     }
425
426     @Override
427     public boolean configure(PdpdConfiguration config) {
428
429         if (config == null) {
430             throw new IllegalArgumentException("No configuration provided");
431         }
432
433         final String entity = config.getEntity();
434
435         var mdcTrans = MdcTransaction.newTransaction(config.getRequestId(), "brmsgw");
436         if (this.getSources().size() == 1) {
437             Topic topic = this.getSources().get(0);
438             mdcTrans.setServiceName(topic.getTopic()).setRemoteHost(topic.getServers().toString())
439                     .setTargetEntity(config.getEntity());
440         }
441
442
443         if (PdpdConfiguration.CONFIG_ENTITY_CONTROLLER.equals(entity)) {
444             boolean success = controllerConfig(config);
445             mdcTrans.resetSubTransaction().setStatusCode(success).transaction();
446             return success;
447
448         } else {
449             final String msg = "Configuration Entity is not supported: " + entity;
450             mdcTrans.resetSubTransaction().setStatusCode(false).setResponseDescription(msg).flush();
451             logger.warn(LoggerUtils.TRANSACTION_LOG_MARKER, msg);
452             throw new IllegalArgumentException(msg);
453         }
454     }
455
456     @Override
457     public synchronized PolicyController createPolicyController(String name, Properties properties) {
458         String tempName = name;
459         // check if a PROPERTY_CONTROLLER_NAME property is present
460         // if so, override the given name
461
462         final String propertyControllerName = properties.getProperty(DroolsPropertyConstants.PROPERTY_CONTROLLER_NAME);
463         if (propertyControllerName != null && !propertyControllerName.isEmpty()) {
464             if (!propertyControllerName.equals(tempName)) {
465                 throw new IllegalStateException("Proposed name (" + tempName + ") and properties name ("
466                         + propertyControllerName + ") don't match");
467             }
468             tempName = propertyControllerName;
469         }
470
471         PolicyController controller;
472         for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) {
473             try {
474                 controller = controllerFeature.beforeCreate(tempName, properties);
475                 if (controller != null) {
476                     return controller;
477                 }
478             } catch (final Exception e) {
479                 logger.error("{}: feature {} before-controller-create failure because of {}", this,
480                         controllerFeature.getClass().getName(), e.getMessage(), e);
481             }
482         }
483
484         controller = getControllerFactory().build(tempName, properties);
485         if (this.isLocked()) {
486             controller.lock();
487         }
488
489         // feature hook
490         PolicyController controller2 = controller;
491         FeatureApiUtils.apply(getControllerProviders(),
492             feature -> feature.afterCreate(controller2),
493             (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}",
494                             this, feature.getClass().getName(), ex.getMessage(), ex));
495
496         return controller;
497     }
498
499
500     @Override
501     public List<PolicyController> updatePolicyControllers(List<ControllerConfiguration> configControllers) {
502
503         final List<PolicyController> policyControllers = new ArrayList<>();
504         if (configControllers == null || configControllers.isEmpty()) {
505             logger.info("No controller configuration provided: {}", configControllers);
506             return policyControllers;
507         }
508
509         for (final ControllerConfiguration configController : configControllers) {
510             MdcTransaction mdcTrans = MdcTransaction.newSubTransaction(null).setTargetEntity(configController.getName())
511                     .setTargetServiceName(configController.getOperation())
512                     .setTargetVirtualEntity("" + configController.getDrools());
513             try {
514                 var policyController = this.updatePolicyController(configController);
515                 policyControllers.add(policyController);
516                 mdcTrans.setStatusCode(true).transaction();
517             } catch (final Exception e) {
518                 mdcTrans.setStatusCode(false).setResponseCode(e.getClass().getName())
519                         .setResponseDescription(e.getMessage()).flush();
520                 logger.error(LoggerUtils.TRANSACTION_LOG_MARKER,
521                         "{}: cannot update-policy-controllers because of {}", this, e.getMessage(), e);
522             }
523         }
524
525         return policyControllers;
526     }
527
528     @Override
529     public synchronized PolicyController updatePolicyController(ControllerConfiguration configController) {
530
531         if (configController == null) {
532             throw new IllegalArgumentException("No controller configuration has been provided");
533         }
534
535         final String controllerName = configController.getName();
536         if (controllerName == null || controllerName.isEmpty()) {
537             logger.warn("controller-name  must be provided");
538             throw new IllegalArgumentException("No controller configuration has been provided");
539         }
540
541         try {
542             final String operation = configController.getOperation();
543             if (operation == null || operation.isEmpty()) {
544                 logger.warn("operation must be provided");
545                 throw new IllegalArgumentException("operation must be provided");
546             }
547
548             var policyController = getController(controllerName);
549             if (policyController == null) {
550                 policyController = findController(controllerName, operation);
551
552                 /* fall through to do brain update operation */
553             }
554
555             updateController(controllerName, policyController, operation, configController);
556
557             return policyController;
558         } catch (final Exception e) {
559             logger.error("{}: cannot update-policy-controller", this);
560             throw e;
561         } catch (final LinkageError e) {
562             logger.error("{}: cannot update-policy-controllers (rules)", this);
563             throw new IllegalStateException(e);
564         }
565     }
566
567     private PolicyController getController(final String controllerName) {
568         PolicyController policyController = null;
569         try {
570             policyController = getControllerFactory().get(controllerName);
571         } catch (final IllegalArgumentException e) {
572             // not found
573             logger.warn("Policy Controller {} not found", controllerName, e);
574         }
575         return policyController;
576     }
577
578     private PolicyController findController(final String controllerName, final String operation) {
579         if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
580                 || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
581             throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
582         }
583
584         /* Recovery case */
585
586         logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
587
588         var controllerProperties =
589                 getPersistenceManager().getControllerProperties(controllerName);
590
591         /*
592          * returned properties cannot be null (per implementation) assert (properties !=
593          * null)
594          */
595
596         if (controllerProperties == null) {
597             throw new IllegalArgumentException(controllerName + " is invalid");
598         }
599
600         logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
601                 controllerName);
602
603         /*
604          * try to bring up bad controller in brainless mode, after having it
605          * working, apply the new create/update operation.
606          */
607         controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
608                         DroolsControllerConstants.NO_GROUP_ID);
609         controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
610                         DroolsControllerConstants.NO_ARTIFACT_ID);
611         controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
612                         DroolsControllerConstants.NO_VERSION);
613
614         return getPolicyEngine().createPolicyController(controllerName, controllerProperties);
615     }
616
617     private void updateController(final String controllerName, PolicyController policyController,
618                     final String operation, ControllerConfiguration configController) {
619         switch (operation) {
620             case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
621                 getControllerFactory().patch(policyController, configController.getDrools());
622                 break;
623             case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
624                 policyController.unlock();
625                 getControllerFactory().patch(policyController, configController.getDrools());
626                 break;
627             case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
628                 policyController.lock();
629                 break;
630             case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
631                 policyController.unlock();
632                 break;
633             default:
634                 final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
635                         + controllerName;
636                 logger.warn(msg);
637                 throw new IllegalArgumentException(msg);
638         }
639     }
640
641     @Override
642     public synchronized boolean start() {
643
644         /* policy-engine dispatch pre start hook */
645         if (FeatureApiUtils.apply(getEngineProviders(),
646             feature -> feature.beforeStart(this),
647             (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
648                             feature.getClass().getName(), ex.getMessage(), ex))) {
649             return true;
650         }
651
652         if (this.locked) {
653             throw new IllegalStateException(ENGINE_LOCKED_MSG);
654         }
655
656         this.alive = true;
657
658         AtomicReference<Boolean> success = new AtomicReference<>(true);
659
660         try {
661             success.compareAndSet(true, this.lockManager.start());
662         } catch (final RuntimeException e) {
663             logger.warn("{}: cannot start lock manager because of {}", this, e.getMessage(), e);
664             success.set(false);
665         }
666
667         /* Start managed and unmanaged http servers */
668
669         attempt(success,
670             Stream.concat(getServletFactory().inventory().stream(), this.httpServers.stream())
671                 .collect(Collectors.toList()),
672             httpServer -> httpServer.waitedStart(10 * 1000L),
673             (item, ex) -> logger.error("{}: cannot start http-server {} because of {}", this, item,
674                 ex.getMessage(), ex));
675
676         /* Start managed Http Clients */
677
678         attempt(success, getHttpClientFactory().inventory(),
679             HttpClient::start,
680             (item, ex) -> logger.error("{}: cannot start http-client {} because of {}",
681                 this, item, ex.getMessage(), ex));
682
683         /* Start Policy Controllers */
684
685         attempt(success, getControllerFactory().inventory(),
686             PolicyController::start,
687             (item, ex) -> {
688                 logger.error("{}: cannot start policy-controller {} because of {}", this, item,
689                                 ex.getMessage(), ex);
690                 success.set(false);
691             });
692
693         /* Start managed Topic Endpoints */
694
695         try {
696             if (!getTopicEndpointManager().start()) {
697                 success.set(false);
698             }
699         } catch (final IllegalStateException e) {
700             logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e);
701         }
702
703         // Start the JMX listener
704
705         startPdpJmxListener();
706
707         /* policy-engine dispatch after start hook */
708         FeatureApiUtils.apply(getEngineProviders(),
709             feature -> feature.afterStart(this),
710             (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
711                             feature.getClass().getName(), ex.getMessage(), ex));
712
713         return success.get();
714     }
715
716     @Override
717     public synchronized boolean open() {
718
719         /* pre-open hook */
720         if (FeatureApiUtils.apply(getEngineProviders(),
721             feature -> feature.beforeOpen(this),
722             (feature, ex) -> logger.error("{}: feature {} before-open failure because of {}", this,
723                 feature.getClass().getName(), ex.getMessage(), ex))) {
724             return true;
725         }
726
727         if (this.locked) {
728             throw new IllegalStateException(ENGINE_LOCKED_MSG);
729         }
730
731         if (!this.alive) {
732             throw new IllegalStateException(ENGINE_STOPPED_MSG);
733         }
734
735         AtomicReference<Boolean> success = new AtomicReference<>(true);
736
737         /* Open the unmanaged topics to external components for configuration purposes */
738
739         attempt(success, this.sources,
740             TopicSource::start,
741             (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}",
742                 this, item, ex.getMessage(), ex));
743
744         attempt(success, this.sinks,
745             TopicSink::start,
746             (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}",
747                 this, item, ex.getMessage(), ex));
748
749         /* post-open hook */
750         FeatureApiUtils.apply(getEngineProviders(),
751             feature -> feature.afterOpen(this),
752             (feature, ex) -> logger.error("{}: feature {} after-open failure because of {}", this,
753                 feature.getClass().getName(), ex.getMessage(), ex));
754
755         return success.get();
756     }
757
758     @FunctionalInterface
759     private interface PredicateWithEx<T> {
760         boolean test(T value) throws InterruptedException;
761     }
762
763     @Override
764     public synchronized boolean stop() {
765
766         /* policy-engine dispatch pre stop hook */
767         if (FeatureApiUtils.apply(getEngineProviders(),
768             feature -> feature.beforeStop(this),
769             (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
770                             feature.getClass().getName(), ex.getMessage(), ex))) {
771             return true;
772         }
773
774         /* stop regardless of the lock state */
775
776         if (!this.alive) {
777             return true;
778         }
779
780         this.alive = false;
781
782         AtomicReference<Boolean> success = new AtomicReference<>(true);
783
784         attempt(success, getControllerFactory().inventory(),
785             PolicyController::stop,
786             (item, ex) -> {
787                 logger.error("{}: cannot stop policy-controller {} because of {}", this, item,
788                                 ex.getMessage(), ex);
789                 success.set(false);
790             });
791
792         /* Stop Policy Engine owned (unmanaged) sources */
793         attempt(success, this.sources,
794             TopicSource::stop,
795             (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item,
796                             ex.getMessage(), ex));
797
798         /* Stop Policy Engine owned (unmanaged) sinks */
799         attempt(success, this.sinks,
800             TopicSink::stop,
801             (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item,
802                             ex.getMessage(), ex));
803
804         /* stop all managed topics sources and sinks */
805         if (!getTopicEndpointManager().stop()) {
806             success.set(false);
807         }
808
809         /* stop all managed and unmanaged http servers */
810         attempt(success,
811             Stream.concat(getServletFactory().inventory().stream(), this.httpServers.stream())
812                     .collect(Collectors.toList()),
813             HttpServletServer::stop,
814             (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
815                 ex.getMessage(), ex));
816
817         /* stop all managed http clients */
818         attempt(success,
819             getHttpClientFactory().inventory(),
820             HttpClient::stop,
821             (item, ex) -> logger.error("{}: cannot stop http-client {} because of {}", this, item,
822                 ex.getMessage(), ex));
823
824         try {
825             success.compareAndSet(true, this.lockManager.stop());
826         } catch (final RuntimeException e) {
827             logger.warn("{}: cannot stop lock manager because of {}", this, e.getMessage(), e);
828             success.set(false);
829         }
830
831         // stop JMX?
832
833         /* policy-engine dispatch post stop hook */
834         FeatureApiUtils.apply(getEngineProviders(),
835             feature -> feature.afterStop(this),
836             (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
837                             feature.getClass().getName(), ex.getMessage(), ex));
838
839         return success.get();
840     }
841
842     @Override
843     public synchronized void shutdown() {
844
845         /*
846          * shutdown activity even when underlying subcomponents (features, controllers, topics, etc
847          * ..) are stuck
848          */
849
850         var exitThread = makeShutdownThread();
851         exitThread.start();
852
853         /* policy-engine dispatch pre shutdown hook */
854         if (FeatureApiUtils.apply(getEngineProviders(),
855             feature -> feature.beforeShutdown(this),
856             (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
857                             feature.getClass().getName(), ex.getMessage(), ex))) {
858             return;
859         }
860
861         this.alive = false;
862
863         /* Shutdown Policy Engine owned (unmanaged) sources */
864         applyAll(this.sources,
865             TopicSource::shutdown,
866             (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item,
867                             ex.getMessage(), ex));
868
869         /* Shutdown Policy Engine owned (unmanaged) sinks */
870         applyAll(this.sinks,
871             TopicSink::shutdown,
872             (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item,
873                             ex.getMessage(), ex));
874
875         /* Shutdown managed resources */
876         getControllerFactory().shutdown();
877         getTopicEndpointManager().shutdown();
878         getServletFactory().destroy();
879         getHttpClientFactory().destroy();
880
881         try {
882             this.lockManager.shutdown();
883         } catch (final RuntimeException e) {
884             logger.warn("{}: cannot shutdown lock manager because of {}", this, e.getMessage(), e);
885         }
886
887         executorService.shutdownNow();
888
889         // Stop the JMX listener
890
891         stopPdpJmxListener();
892
893         /* policy-engine dispatch post shutdown hook */
894         FeatureApiUtils.apply(getEngineProviders(),
895             feature -> feature.afterShutdown(this),
896             (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
897                             feature.getClass().getName(), ex.getMessage(), ex));
898
899         exitThread.interrupt();
900         logger.info("{}: normal termination", this);
901     }
902
903     private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred,
904                     BiConsumer<T, Exception> handleEx) {
905
906         for (T item : items) {
907             try {
908                 if (!pred.test(item)) {
909                     success.set(false);
910                 }
911
912             } catch (InterruptedException ex) {
913                 handleEx.accept(item, ex);
914                 Thread.currentThread().interrupt();
915
916             } catch (RuntimeException ex) {
917                 handleEx.accept(item, ex);
918             }
919         }
920     }
921
922     private <T> void applyAll(List<T> items, Consumer<T> function,
923                     BiConsumer<T, Exception> handleEx) {
924
925         for (T item : items) {
926             try {
927                 function.accept(item);
928
929             } catch (RuntimeException ex) {
930                 handleEx.accept(item, ex);
931             }
932         }
933     }
934
935     /**
936      * Thread that shuts down http servers.
937      */
938     protected class ShutdownThread extends Thread {
939         private static final long SHUTDOWN_MAX_GRACE_TIME = 30000L;
940
941         @Override
942         public void run() {
943             try {
944                 doSleep(SHUTDOWN_MAX_GRACE_TIME);
945                 logger.warn("{}: abnormal termination - shutdown graceful time period expiration",
946                         PolicyEngineManager.this);
947             } catch (final InterruptedException e) {
948                 synchronized (PolicyEngineManager.this) {
949                     /* courtesy to shutdown() to allow it to return */
950                     Thread.currentThread().interrupt();
951                 }
952                 logger.info("{}: finishing a graceful shutdown ", PolicyEngineManager.this, e);
953             } finally {
954                 /*
955                  * shut down the Policy Engine owned http servers as the very last thing
956                  */
957                 applyAll(PolicyEngineManager.this.getHttpServers(),
958                     HttpServletServer::shutdown,
959                     (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item,
960                                     ex.getMessage(), ex));
961
962                 logger.info("{}: exit", PolicyEngineManager.this);
963                 doExit(0);
964             }
965         }
966
967         // these may be overridden by junit tests
968
969         protected void doSleep(long sleepMs) throws InterruptedException {
970             Thread.sleep(sleepMs);
971         }
972
973         protected void doExit(int code) {
974             System.exit(code);
975         }
976     }
977
978     @Override
979     public synchronized boolean lock() {
980
981         /* policy-engine dispatch pre lock hook */
982         if (FeatureApiUtils.apply(getEngineProviders(),
983             feature -> feature.beforeLock(this),
984             (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
985                             feature.getClass().getName(), ex.getMessage(), ex))) {
986             return true;
987         }
988
989         if (this.locked) {
990             return true;
991         }
992
993         this.locked = true;
994
995         var success = true;
996         final List<PolicyController> controllers = getControllerFactory().inventory();
997         for (final PolicyController controller : controllers) {
998             try {
999                 success = controller.lock() && success;
1000             } catch (final Exception e) {
1001                 logger.error("{}: cannot lock policy-controller {} because of {}", this, controller, e.getMessage(), e);
1002                 success = false;
1003             }
1004         }
1005
1006         success = getTopicEndpointManager().lock() && success;
1007
1008         try {
1009             success = (this.lockManager == null || this.lockManager.lock()) && success;
1010         } catch (final RuntimeException e) {
1011             logger.warn("{}: cannot lock() lock manager because of {}", this, e.getMessage(), e);
1012             success = false;
1013         }
1014
1015         /* policy-engine dispatch post lock hook */
1016         FeatureApiUtils.apply(getEngineProviders(),
1017             feature -> feature.afterLock(this),
1018             (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
1019                             feature.getClass().getName(), ex.getMessage(), ex));
1020
1021         return success;
1022     }
1023
1024     @Override
1025     public synchronized boolean unlock() {
1026
1027         /* policy-engine dispatch pre unlock hook */
1028         if (FeatureApiUtils.apply(getEngineProviders(),
1029             feature -> feature.beforeUnlock(this),
1030             (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
1031                             feature.getClass().getName(), ex.getMessage(), ex))) {
1032             return true;
1033         }
1034
1035         if (!this.locked) {
1036             return true;
1037         }
1038
1039         this.locked = false;
1040
1041         boolean success;
1042
1043         try {
1044             success = this.lockManager == null || this.lockManager.unlock();
1045         } catch (final RuntimeException e) {
1046             logger.warn("{}: cannot unlock() lock manager because of {}", this, e.getMessage(), e);
1047             success = false;
1048         }
1049
1050         final List<PolicyController> controllers = getControllerFactory().inventory();
1051         for (final PolicyController controller : controllers) {
1052             try {
1053                 success = controller.unlock() && success;
1054             } catch (final Exception e) {
1055                 logger.error("{}: cannot unlock policy-controller {} because of {}", this, controller, e.getMessage(),
1056                         e);
1057                 success = false;
1058             }
1059         }
1060
1061         success = getTopicEndpointManager().unlock() && success;
1062
1063         /* policy-engine dispatch after unlock hook */
1064         FeatureApiUtils.apply(getEngineProviders(),
1065             feature -> feature.afterUnlock(this),
1066             (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
1067                             feature.getClass().getName(), ex.getMessage(), ex));
1068
1069         return success;
1070     }
1071
1072     @Override
1073     public void removePolicyController(String name) {
1074         getControllerFactory().destroy(name);
1075     }
1076
1077     @Override
1078     public void removePolicyController(PolicyController controller) {
1079         getControllerFactory().destroy(controller);
1080     }
1081
1082     @GsonJsonIgnore
1083     @Override
1084     public List<PolicyController> getPolicyControllers() {
1085         return getControllerFactory().inventory();
1086     }
1087
1088     @GsonJsonProperty("controllers")
1089     @Override
1090     public List<String> getPolicyControllerIds() {
1091         final List<String> controllerNames = new ArrayList<>();
1092         for (final PolicyController controller : getControllerFactory().inventory()) {
1093             controllerNames.add(controller.getName());
1094         }
1095         return controllerNames;
1096     }
1097
1098     @Override
1099     @GsonJsonIgnore
1100     public Properties getProperties() {
1101         return this.properties;
1102     }
1103
1104     @Override
1105     public List<String> getFeatures() {
1106         final List<String> features = new ArrayList<>();
1107         for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
1108             features.add(feature.getName());
1109         }
1110         return features;
1111     }
1112
1113     @GsonJsonIgnore
1114     @Override
1115     public List<PolicyEngineFeatureApi> getFeatureProviders() {
1116         return getEngineProviders();
1117     }
1118
1119     @Override
1120     public PolicyEngineFeatureApi getFeatureProvider(String featureName) {
1121         if (featureName == null || featureName.isEmpty()) {
1122             throw new IllegalArgumentException("A feature name must be provided");
1123         }
1124
1125         for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
1126             if (feature.getName().equals(featureName)) {
1127                 return feature;
1128             }
1129         }
1130
1131         throw new IllegalArgumentException("Invalid Feature Name: " + featureName);
1132     }
1133
1134     @Override
1135     public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
1136         /* policy-engine pre topic event hook */
1137         if (FeatureApiUtils.apply(getFeatureProviders(),
1138             feature -> feature.beforeOnTopicEvent(this, commType, topic, event),
1139             (feature, ex) -> logger.error(
1140                             "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
1141                             feature.getClass().getName(), event, ex.getMessage(), ex))) {
1142             return;
1143         }
1144
1145         /* configuration request */
1146         PdpdConfiguration configuration = null;
1147         try {
1148             configuration = this.decoder.fromJson(event, PdpdConfiguration.class);
1149             this.configure(configuration);
1150         } catch (final Exception e) {
1151             logger.error("{}: configuration-error due to {} because of {}", this, event, e.getMessage(), e);
1152         }
1153
1154         /* policy-engine after topic event hook */
1155         PdpdConfiguration configuration2 = configuration;
1156         FeatureApiUtils.apply(getFeatureProviders(),
1157             feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event),
1158             (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
1159                             feature.getClass().getName(), event, ex.getMessage(), ex));
1160     }
1161
1162     @Override
1163     public boolean deliver(String topic, Object event) {
1164
1165         /*
1166          * Note this entry point is usually from the DRL
1167          */
1168
1169         if (topic == null || topic.isEmpty()) {
1170             throw new IllegalArgumentException(INVALID_TOPIC_MSG);
1171         }
1172
1173         if (event == null) {
1174             throw new IllegalArgumentException(INVALID_EVENT_MSG);
1175         }
1176
1177         if (!this.isAlive()) {
1178             throw new IllegalStateException(ENGINE_STOPPED_MSG);
1179         }
1180
1181         if (this.isLocked()) {
1182             throw new IllegalStateException(ENGINE_LOCKED_MSG);
1183         }
1184
1185         final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
1186         if (topicSinks == null || topicSinks.size() != 1) {
1187             throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks);
1188         }
1189
1190         return this.deliver(topicSinks.get(0).getTopicCommInfrastructure(), topic, event);
1191     }
1192
1193     @Override
1194     public boolean deliver(String busType, String topic, Object event) {
1195
1196         /*
1197          * Note this entry point is usually from the DRL (one of the reasons busType is String.
1198          */
1199
1200         if (StringUtils.isBlank(busType)) {
1201             throw new IllegalArgumentException("Invalid Communication Infrastructure");
1202         }
1203
1204         if (StringUtils.isBlank(topic)) {
1205             throw new IllegalArgumentException(INVALID_TOPIC_MSG);
1206         }
1207
1208         if (event == null) {
1209             throw new IllegalArgumentException(INVALID_EVENT_MSG);
1210         }
1211
1212         boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name)
1213                         .anyMatch(name -> name.equals(busType));
1214
1215         if (!valid) {
1216             throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType);
1217         }
1218
1219
1220         if (!this.isAlive()) {
1221             throw new IllegalStateException(ENGINE_STOPPED_MSG);
1222         }
1223
1224         if (this.isLocked()) {
1225             throw new IllegalStateException(ENGINE_LOCKED_MSG);
1226         }
1227
1228
1229         return this.deliver(Topic.CommInfrastructure.valueOf(busType), topic, event);
1230     }
1231
1232     @Override
1233     public boolean deliver(Topic.CommInfrastructure busType, String topic, Object event) {
1234
1235         if (topic == null || topic.isEmpty()) {
1236             throw new IllegalArgumentException(INVALID_TOPIC_MSG);
1237         }
1238
1239         if (event == null) {
1240             throw new IllegalArgumentException(INVALID_EVENT_MSG);
1241         }
1242
1243         if (!this.isAlive()) {
1244             throw new IllegalStateException(ENGINE_STOPPED_MSG);
1245         }
1246
1247         if (this.isLocked()) {
1248             throw new IllegalStateException(ENGINE_LOCKED_MSG);
1249         }
1250
1251         /*
1252          * Try to send through the controller, this is the preferred way, since it may want to apply
1253          * additional processing
1254          */
1255         try {
1256             var droolsController = getProtocolCoder().getDroolsController(topic, event);
1257             final PolicyController controller = getControllerFactory().get(droolsController);
1258             if (controller != null) {
1259                 return controller.deliver(busType, topic, event);
1260             }
1261         } catch (final Exception e) {
1262             logger.warn("{}: cannot find policy-controller to deliver {} over {}:{} because of {}", this, event,
1263                     busType, topic, e.getMessage(), e);
1264
1265             /* continue (try without routing through the controller) */
1266         }
1267
1268         /*
1269          * cannot route through the controller, send directly through the topic sink
1270          */
1271         try {
1272             final String json = getProtocolCoder().encode(topic, event);
1273             return this.deliver(busType, topic, json);
1274
1275         } catch (final Exception e) {
1276             logger.warn("{}: cannot deliver {} over {}:{}", this, event, busType, topic);
1277             throw e;
1278         }
1279     }
1280
1281     @Override
1282     public boolean deliver(Topic.CommInfrastructure busType, String topic, String event) {
1283
1284         if (topic == null || topic.isEmpty()) {
1285             throw new IllegalArgumentException(INVALID_TOPIC_MSG);
1286         }
1287
1288         if (event == null || event.isEmpty()) {
1289             throw new IllegalArgumentException(INVALID_EVENT_MSG);
1290         }
1291
1292         if (!this.isAlive()) {
1293             throw new IllegalStateException(ENGINE_STOPPED_MSG);
1294         }
1295
1296         if (this.isLocked()) {
1297             throw new IllegalStateException(ENGINE_LOCKED_MSG);
1298         }
1299
1300         try {
1301             var sink = getTopicEndpointManager().getTopicSink(busType, topic);
1302
1303             if (sink == null) {
1304                 throw new IllegalStateException("Inconsistent State: " + this);
1305             }
1306
1307             return sink.send(event);
1308
1309         } catch (final Exception e) {
1310             logger.warn("{}: cannot deliver {} over {}:{}", this, event, busType, topic);
1311             throw e;
1312         }
1313     }
1314
1315     @Override
1316     public synchronized void activate() {
1317
1318         /* policy-engine dispatch pre activate hook */
1319         if (FeatureApiUtils.apply(getEngineProviders(),
1320             feature -> feature.beforeActivate(this),
1321             (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this,
1322                             feature.getClass().getName(), ex.getMessage(), ex))) {
1323             return;
1324         }
1325
1326         // activate 'policy-management'
1327         for (final PolicyController policyController : this.getPolicyControllers()) {
1328             try {
1329                 policyController.unlock();
1330                 policyController.start();
1331             } catch (final Exception e) {
1332                 logger.error("{}: cannot activate of policy-controller {} because of {}", this, policyController,
1333                         e.getMessage(), e);
1334             } catch (final LinkageError e) {
1335                 logger.error("{}: cannot activate (rules compilation) of policy-controller {} because of {}", this,
1336                         policyController, e.getMessage(), e);
1337             }
1338         }
1339
1340         this.unlock();
1341
1342         /* policy-engine dispatch post activate hook */
1343         FeatureApiUtils.apply(getEngineProviders(),
1344             feature -> feature.afterActivate(this),
1345             (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this,
1346                             feature.getClass().getName(), ex.getMessage(), ex));
1347     }
1348
1349     @Override
1350     public synchronized void deactivate() {
1351
1352         /* policy-engine dispatch pre deactivate hook */
1353         if (FeatureApiUtils.apply(getEngineProviders(),
1354             feature -> feature.beforeDeactivate(this),
1355             (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this,
1356                             feature.getClass().getName(), ex.getMessage(), ex))) {
1357             return;
1358         }
1359
1360         this.lock();
1361
1362         for (final PolicyController policyController : this.getPolicyControllers()) {
1363             try {
1364                 policyController.stop();
1365             } catch (final Exception | LinkageError e) {
1366                 logger.error("{}: cannot deactivate (stop) policy-controller {} because of {}", this, policyController,
1367                         e.getMessage(), e);
1368             }
1369         }
1370
1371         /* policy-engine dispatch post deactivate hook */
1372         FeatureApiUtils.apply(getEngineProviders(),
1373             feature -> feature.afterDeactivate(this),
1374             (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this,
1375                             feature.getClass().getName(), ex.getMessage(), ex));
1376     }
1377
1378     @Override
1379     public Lock createLock(@NonNull String resourceId, @NonNull String ownerKey, int holdSec,
1380                     @NonNull LockCallback callback, boolean waitForLock) {
1381
1382         if (holdSec < 0) {
1383             throw new IllegalArgumentException("holdSec is negative");
1384         }
1385
1386         if (lockManager == null) {
1387             throw new IllegalStateException("lock manager has not been initialized");
1388         }
1389
1390         return lockManager.createLock(resourceId, ownerKey, holdSec, callback, waitForLock);
1391     }
1392
1393     private boolean controllerConfig(PdpdConfiguration config) {
1394         /* only this one supported for now */
1395         final List<ControllerConfiguration> configControllers = config.getControllers();
1396         if (configControllers == null || configControllers.isEmpty()) {
1397             logger.info("No controller configuration provided: {}", config);
1398             return false;
1399         }
1400
1401         final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers());
1402         return (policyControllers != null && !policyControllers.isEmpty()
1403                         && policyControllers.size() == configControllers.size());
1404     }
1405
1406     // these methods may be overridden by junit tests
1407
1408     protected List<PolicyEngineFeatureApi> getEngineProviders() {
1409         return PolicyEngineFeatureApiConstants.getProviders().getList();
1410     }
1411
1412     protected List<PolicyControllerFeatureApi> getControllerProviders() {
1413         return PolicyControllerFeatureApiConstants.getProviders().getList();
1414     }
1415
1416     protected void globalInitContainer(String[] cliArgs) {
1417         PolicyContainer.globalInit(cliArgs);
1418     }
1419
1420     protected TopicEndpoint getTopicEndpointManager() {
1421         return TopicEndpointManager.getManager();
1422     }
1423
1424     protected HttpServletServerFactory getServletFactory() {
1425         return HttpServletServerFactoryInstance.getServerFactory();
1426     }
1427
1428     protected HttpClientFactory getHttpClientFactory() {
1429         return HttpClientFactoryInstance.getClientFactory();
1430     }
1431
1432     protected PolicyControllerFactory getControllerFactory() {
1433         return PolicyControllerConstants.getFactory();
1434     }
1435
1436     protected void startPdpJmxListener() {
1437         PdpJmxListener.start();
1438     }
1439
1440     protected void stopPdpJmxListener() {
1441         PdpJmxListener.stop();
1442     }
1443
1444     protected Thread makeShutdownThread() {
1445         return new ShutdownThread();
1446     }
1447
1448     protected EventProtocolCoder getProtocolCoder() {
1449         return EventProtocolCoderConstants.getManager();
1450     }
1451
1452     protected SystemPersistence getPersistenceManager() {
1453         return SystemPersistenceConstants.getManager();
1454     }
1455
1456     protected PolicyEngine getPolicyEngine() {
1457         return PolicyEngineConstants.getManager();
1458     }
1459
1460     protected ScheduledExecutorService makeScheduledExecutor(int nthreads) {
1461         var exsvc = new ScheduledThreadPoolExecutor(nthreads);
1462         exsvc.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
1463         exsvc.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
1464         exsvc.setRemoveOnCancelPolicy(true);
1465
1466         return exsvc;
1467     }
1468 }