2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2024 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.system;
24 import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_HOST;
25 import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_NAME;
26 import static org.onap.policy.drools.system.PolicyEngineConstants.TELEMETRY_SERVER_DEFAULT_PORT;
28 import com.google.gson.Gson;
29 import com.google.gson.GsonBuilder;
30 import io.prometheus.client.Summary;
31 import java.util.ArrayList;
32 import java.util.List;
33 import java.util.Objects;
34 import java.util.Properties;
35 import java.util.UUID;
36 import java.util.concurrent.ScheduledExecutorService;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.atomic.AtomicReference;
39 import java.util.function.BiConsumer;
40 import java.util.function.Consumer;
41 import java.util.stream.Collectors;
42 import java.util.stream.Stream;
43 import lombok.AccessLevel;
45 import lombok.NonNull;
47 import lombok.Synchronized;
48 import lombok.ToString;
49 import org.apache.commons.lang3.StringUtils;
50 import org.onap.policy.common.endpoints.event.comm.Topic;
51 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
52 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
53 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
54 import org.onap.policy.common.endpoints.event.comm.TopicSink;
55 import org.onap.policy.common.endpoints.event.comm.TopicSource;
56 import org.onap.policy.common.endpoints.http.client.HttpClient;
57 import org.onap.policy.common.endpoints.http.client.HttpClientFactory;
58 import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance;
59 import org.onap.policy.common.endpoints.http.server.HttpServletServer;
60 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactory;
61 import org.onap.policy.common.endpoints.http.server.HttpServletServerFactoryInstance;
62 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
63 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
64 import org.onap.policy.common.gson.annotation.GsonJsonProperty;
65 import org.onap.policy.common.utils.logging.LoggerUtils;
66 import org.onap.policy.common.utils.network.NetworkUtil;
67 import org.onap.policy.common.utils.resources.PrometheusUtils;
68 import org.onap.policy.common.utils.services.FeatureApiUtils;
69 import org.onap.policy.drools.controller.DroolsControllerConstants;
70 import org.onap.policy.drools.core.PolicyContainer;
71 import org.onap.policy.drools.core.jmx.PdpJmxListener;
72 import org.onap.policy.drools.core.lock.Lock;
73 import org.onap.policy.drools.core.lock.LockCallback;
74 import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
75 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
76 import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants;
77 import org.onap.policy.drools.features.PolicyEngineFeatureApi;
78 import org.onap.policy.drools.features.PolicyEngineFeatureApiConstants;
79 import org.onap.policy.drools.metrics.Metric;
80 import org.onap.policy.drools.persistence.SystemPersistence;
81 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
82 import org.onap.policy.drools.policies.DomainMaker;
83 import org.onap.policy.drools.properties.DroolsPropertyConstants;
84 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
85 import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
86 import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
87 import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
88 import org.onap.policy.drools.server.restful.RestManager;
89 import org.onap.policy.drools.stats.PolicyStatsManager;
90 import org.onap.policy.drools.system.internal.SimpleLockManager;
91 import org.onap.policy.drools.utils.PropertyUtil;
92 import org.onap.policy.drools.utils.logging.MdcTransaction;
93 import org.onap.policy.models.pdp.enums.PdpResponseStatus;
94 import org.slf4j.Logger;
95 import org.slf4j.LoggerFactory;
98 * Policy Engine Manager Implementation.
100 @ToString(onlyExplicitlyIncluded = true)
101 class PolicyEngineManager implements PolicyEngine {
105 private static final String INVALID_TOPIC_MSG = "Invalid Topic";
106 private static final String INVALID_EVENT_MSG = "Invalid Event";
108 private static final String ENGINE_STOPPED_MSG = "Policy Engine is stopped";
109 private static final String ENGINE_LOCKED_MSG = "Policy Engine is locked";
111 public static final String EXECUTOR_THREAD_PROP = "executor.threads";
112 protected static final int DEFAULT_EXECUTOR_THREADS = 5;
114 public static final String CLUSTER_NAME_PROP = "engine.cluster";
119 private static final Logger logger = LoggerFactory.getLogger(PolicyEngineManager.class);
122 * Is the Policy Engine running.
126 private volatile boolean alive = false;
129 * Is the engine locked.
133 private volatile boolean locked = false;
136 * Properties used to initialize the engine.
138 private Properties properties;
141 * Environment Properties.
143 private final Properties environment = new Properties();
146 * Policy Engine Sources.
149 private List<TopicSource> sources = new ArrayList<>();
152 * Policy Engine Sinks.
155 private List<TopicSink> sinks = new ArrayList<>();
158 * Policy Engine HTTP Servers.
161 private List<HttpServletServer> httpServers = new ArrayList<>();
164 * Thread pool used to execute background tasks.
166 private ScheduledExecutorService executorService = null;
169 * Lock manager used to create locks.
171 @Getter(AccessLevel.PROTECTED)
172 private PolicyResourceLockManager lockManager = null;
174 private final DomainMaker domainMaker = new DomainMaker();
177 private final PolicyStatsManager stats = new PolicyStatsManager();
179 @Getter(onMethod_ = {@Synchronized})
180 @Setter(onMethod_ = {@Synchronized})
181 private String clusterName = UUID.randomUUID().toString();
183 @Getter(onMethod_ = {@Synchronized})
184 @Setter(onMethod_ = {@Synchronized})
185 private String hostName = NetworkUtil.getHostname();
187 @Getter(onMethod_ = {@Synchronized})
188 @Setter(onMethod_ = {@Synchronized})
189 private String pdpName;
192 * gson parser to decode configuration requests.
194 private final Gson decoder = new GsonBuilder().disableHtmlEscaping().create();
196 protected static final String CONTROLLOOP_NAME_LABEL = "controlloop";
197 protected static final String CONTROLLER_LABEL = "controller";
198 protected static final String POLICY_LABEL = "policy";
200 protected static final Summary transLatencySecsSummary =
201 Summary.build().namespace(PrometheusUtils.PdpType.PDPD.getNamespace())
202 .name(PrometheusUtils.POLICY_EXECUTIONS_LATENCY_SECONDS_METRIC)
203 .labelNames(CONTROLLER_LABEL,
204 CONTROLLOOP_NAME_LABEL,
206 PrometheusUtils.STATUS_METRIC_LABEL)
207 .help(PrometheusUtils.POLICY_EXECUTIONS_LATENCY_SECONDS_HELP)
212 public synchronized void boot(String[] cliArgs) {
214 if (FeatureApiUtils.apply(getEngineProviders(),
215 feature -> feature.beforeBoot(this, cliArgs),
216 (feature, ex) -> logger.error("{}: feature {} before-boot failure because of {}", this,
217 feature.getClass().getName(), ex.getMessage(), ex))) {
222 globalInitContainer(cliArgs);
223 } catch (final Exception e) {
224 logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e);
227 FeatureApiUtils.apply(getEngineProviders(),
228 feature -> feature.afterBoot(this),
229 (feature, ex) -> logger.error("{}: feature {} after-boot failure because of {}", this,
230 feature.getClass().getName(), ex.getMessage(), ex));
234 public synchronized void setEnvironment(Properties properties) {
235 this.environment.putAll(PropertyUtil.getInterpolatedProperties(properties));
240 public synchronized Properties getEnvironment() {
241 return this.environment;
246 public DomainMaker getDomainMaker() {
247 return this.domainMaker;
251 public synchronized String getEnvironmentProperty(String envKey) {
252 String value = this.environment.getProperty(envKey);
254 value = System.getProperty(envKey);
256 value = System.getenv(envKey);
263 public synchronized String setEnvironmentProperty(String envKey, String envValue) {
264 return (String) this.environment.setProperty(envKey, envValue);
268 public final Properties defaultTelemetryConfig() {
269 final var defaultConfig = new Properties();
271 defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES, "TELEMETRY");
272 defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
273 + PolicyEndPointProperties.PROPERTY_HTTP_HOST_SUFFIX, TELEMETRY_SERVER_DEFAULT_HOST);
275 PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
276 + PolicyEndPointProperties.PROPERTY_HTTP_PORT_SUFFIX,
277 "" + TELEMETRY_SERVER_DEFAULT_PORT);
279 PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
280 + PolicyEndPointProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX,
281 RestManager.class.getPackage().getName());
282 defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
283 + PolicyEndPointProperties.PROPERTY_HTTP_SWAGGER_SUFFIX, "" + Boolean.TRUE);
284 defaultConfig.put(PolicyEndPointProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + TELEMETRY_SERVER_DEFAULT_NAME
285 + PolicyEndPointProperties.PROPERTY_MANAGED_SUFFIX, "" + Boolean.FALSE);
287 return defaultConfig;
291 public void metric(String controllerName, String policyName, Metric metric) {
292 // sub-operations are not being tracked
296 public void transaction(@NonNull String controllerName, // NOSONAR placeholder
297 @NonNull String controlLoopName, @NonNull Metric transaction) {
299 // keeping stats on a per control loop name,
300 // applications must report the controller name too
301 // for completeness and to avoid being modified when/if
302 // the controller name is used for tracking purposes
304 getStats().stat(controlLoopName, transaction);
306 Long elapsedTime = transaction.getElapsedTime();
307 String policyName = transaction.getServiceInstanceId();
308 if (Objects.isNull(elapsedTime) || StringUtils.isEmpty(policyName)) {
309 logger.warn("{} transaction in controller {} incomplete transaction object: {}",
310 controlLoopName, controllerName, transaction);
314 transLatencySecsSummary
315 .labels(controllerName,
318 transaction.isSuccess() ? PdpResponseStatus.SUCCESS.name() : PdpResponseStatus.FAIL.name())
319 .observe(transaction.getElapsedTime() / 1000D);
324 public ScheduledExecutorService getExecutorService() {
325 return executorService;
328 private ScheduledExecutorService makeExecutorService(Properties properties) {
329 int nthreads = DEFAULT_EXECUTOR_THREADS;
332 nthreads = Integer.parseInt(
333 properties.getProperty(EXECUTOR_THREAD_PROP, String.valueOf(DEFAULT_EXECUTOR_THREADS)));
335 } catch (NumberFormatException e) {
336 logger.error("invalid number for " + EXECUTOR_THREAD_PROP + " property", e);
339 return makeScheduledExecutor(nthreads);
342 private void createLockManager(Properties properties) {
343 for (PolicyEngineFeatureApi feature : getEngineProviders()) {
345 this.lockManager = feature.beforeCreateLockManager(this, properties);
346 if (this.lockManager != null) {
347 logger.info("overridden lock manager is {}", this.lockManager);
350 } catch (RuntimeException e) {
351 logger.error("{}: feature {} before-create-lock-manager failure because of {}", this,
352 feature.getClass().getName(), e.getMessage(), e);
357 this.lockManager = new SimpleLockManager(this, properties);
358 } catch (RuntimeException e) {
359 logger.error("{}: cannot create simple lock manager because of {}", this, e.getMessage(), e);
360 this.lockManager = new SimpleLockManager(this, new Properties());
363 logger.info("lock manager is {}", this.lockManager);
365 /* policy-engine dispatch post operation hook */
366 FeatureApiUtils.apply(getEngineProviders(),
367 feature -> feature.afterCreateLockManager(this, properties, this.lockManager),
368 (feature, ex) -> logger.error("{}: feature {} after-create-lock-manager failure because of {}",
369 this, feature.getClass().getName(), ex.getMessage(), ex));
373 public synchronized void configure(Properties properties) {
375 if (properties == null) {
376 logger.warn("No properties provided");
377 throw new IllegalArgumentException("No properties provided");
380 /* policy-engine dispatch pre configure hook */
381 if (FeatureApiUtils.apply(getEngineProviders(),
382 feature -> feature.beforeConfigure(this, properties),
383 (feature, ex) -> logger.error("{}: feature {} before-configure failure because of {}", this,
384 feature.getClass().getName(), ex.getMessage(), ex))) {
388 this.properties = properties;
389 if (!StringUtils.isBlank(this.properties.getProperty(CLUSTER_NAME_PROP))) {
390 this.clusterName = this.properties.getProperty(CLUSTER_NAME_PROP, this.clusterName);
392 this.pdpName = hostName + "." + this.clusterName;
395 this.sources = getTopicEndpointManager().addTopicSources(properties);
396 for (final TopicSource source : this.sources) {
397 source.register(this);
399 } catch (final Exception e) {
400 logger.error("{}: add-sources failed", this, e);
404 this.sinks = getTopicEndpointManager().addTopicSinks(properties);
405 } catch (final IllegalArgumentException e) {
406 logger.error("{}: add-sinks failed", this, e);
410 this.httpServers = getServletFactory().build(properties);
411 } catch (final IllegalArgumentException e) {
412 logger.error("{}: add-http-servers failed", this, e);
415 executorService = makeExecutorService(properties);
417 createLockManager(properties);
419 /* policy-engine dispatch post configure hook */
420 FeatureApiUtils.apply(getEngineProviders(),
421 feature -> feature.afterConfigure(this),
422 (feature, ex) -> logger.error("{}: feature {} after-configure failure because of {}", this,
423 feature.getClass().getName(), ex.getMessage(), ex));
427 public boolean configure(PdpdConfiguration config) {
429 if (config == null) {
430 throw new IllegalArgumentException("No configuration provided");
433 final String entity = config.getEntity();
435 var mdcTrans = MdcTransaction.newTransaction(config.getRequestId(), "brmsgw");
436 if (this.getSources().size() == 1) {
437 Topic topic = this.getSources().get(0);
438 mdcTrans.setServiceName(topic.getTopic()).setRemoteHost(topic.getServers().toString())
439 .setTargetEntity(config.getEntity());
443 if (PdpdConfiguration.CONFIG_ENTITY_CONTROLLER.equals(entity)) {
444 boolean success = controllerConfig(config);
445 mdcTrans.resetSubTransaction().setStatusCode(success).transaction();
449 final String msg = "Configuration Entity is not supported: " + entity;
450 mdcTrans.resetSubTransaction().setStatusCode(false).setResponseDescription(msg).flush();
451 logger.warn(LoggerUtils.TRANSACTION_LOG_MARKER, msg);
452 throw new IllegalArgumentException(msg);
457 public synchronized PolicyController createPolicyController(String name, Properties properties) {
458 String tempName = name;
459 // check if a PROPERTY_CONTROLLER_NAME property is present
460 // if so, override the given name
462 final String propertyControllerName = properties.getProperty(DroolsPropertyConstants.PROPERTY_CONTROLLER_NAME);
463 if (propertyControllerName != null && !propertyControllerName.isEmpty()) {
464 if (!propertyControllerName.equals(tempName)) {
465 throw new IllegalStateException("Proposed name (" + tempName + ") and properties name ("
466 + propertyControllerName + ") don't match");
468 tempName = propertyControllerName;
471 PolicyController controller;
472 for (final PolicyControllerFeatureApi controllerFeature : getControllerProviders()) {
474 controller = controllerFeature.beforeCreate(tempName, properties);
475 if (controller != null) {
478 } catch (final Exception e) {
479 logger.error("{}: feature {} before-controller-create failure because of {}", this,
480 controllerFeature.getClass().getName(), e.getMessage(), e);
484 controller = getControllerFactory().build(tempName, properties);
485 if (this.isLocked()) {
490 PolicyController controller2 = controller;
491 FeatureApiUtils.apply(getControllerProviders(),
492 feature -> feature.afterCreate(controller2),
493 (feature, ex) -> logger.error("{}: feature {} after-controller-create failure because of {}",
494 this, feature.getClass().getName(), ex.getMessage(), ex));
501 public List<PolicyController> updatePolicyControllers(List<ControllerConfiguration> configControllers) {
503 final List<PolicyController> policyControllers = new ArrayList<>();
504 if (configControllers == null || configControllers.isEmpty()) {
505 logger.info("No controller configuration provided: {}", configControllers);
506 return policyControllers;
509 for (final ControllerConfiguration configController : configControllers) {
510 MdcTransaction mdcTrans = MdcTransaction.newSubTransaction(null).setTargetEntity(configController.getName())
511 .setTargetServiceName(configController.getOperation())
512 .setTargetVirtualEntity("" + configController.getDrools());
514 var policyController = this.updatePolicyController(configController);
515 policyControllers.add(policyController);
516 mdcTrans.setStatusCode(true).transaction();
517 } catch (final Exception e) {
518 mdcTrans.setStatusCode(false).setResponseCode(e.getClass().getName())
519 .setResponseDescription(e.getMessage()).flush();
520 logger.error(LoggerUtils.TRANSACTION_LOG_MARKER,
521 "{}: cannot update-policy-controllers because of {}", this, e.getMessage(), e);
525 return policyControllers;
529 public synchronized PolicyController updatePolicyController(ControllerConfiguration configController) {
531 if (configController == null) {
532 throw new IllegalArgumentException("No controller configuration has been provided");
535 final String controllerName = configController.getName();
536 if (controllerName == null || controllerName.isEmpty()) {
537 logger.warn("controller-name must be provided");
538 throw new IllegalArgumentException("No controller configuration has been provided");
542 final String operation = configController.getOperation();
543 if (operation == null || operation.isEmpty()) {
544 logger.warn("operation must be provided");
545 throw new IllegalArgumentException("operation must be provided");
548 var policyController = getController(controllerName);
549 if (policyController == null) {
550 policyController = findController(controllerName, operation);
552 /* fall through to do brain update operation */
555 updateController(controllerName, policyController, operation, configController);
557 return policyController;
558 } catch (final Exception e) {
559 logger.error("{}: cannot update-policy-controller", this);
561 } catch (final LinkageError e) {
562 logger.error("{}: cannot update-policy-controllers (rules)", this);
563 throw new IllegalStateException(e);
567 private PolicyController getController(final String controllerName) {
568 PolicyController policyController = null;
570 policyController = getControllerFactory().get(controllerName);
571 } catch (final IllegalArgumentException e) {
573 logger.warn("Policy Controller {} not found", controllerName, e);
575 return policyController;
578 private PolicyController findController(final String controllerName, final String operation) {
579 if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK)
580 || operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
581 throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
586 logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);
588 var controllerProperties =
589 getPersistenceManager().getControllerProperties(controllerName);
592 * returned properties cannot be null (per implementation) assert (properties !=
596 if (controllerProperties == null) {
597 throw new IllegalArgumentException(controllerName + " is invalid");
600 logger.warn("controller being recovered. {} Reset controller's bad maven coordinates to brainless",
604 * try to bring up bad controller in brainless mode, after having it
605 * working, apply the new create/update operation.
607 controllerProperties.setProperty(DroolsPropertyConstants.RULES_GROUPID,
608 DroolsControllerConstants.NO_GROUP_ID);
609 controllerProperties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
610 DroolsControllerConstants.NO_ARTIFACT_ID);
611 controllerProperties.setProperty(DroolsPropertyConstants.RULES_VERSION,
612 DroolsControllerConstants.NO_VERSION);
614 return getPolicyEngine().createPolicyController(controllerName, controllerProperties);
617 private void updateController(final String controllerName, PolicyController policyController,
618 final String operation, ControllerConfiguration configController) {
620 case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
621 getControllerFactory().patch(policyController, configController.getDrools());
623 case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
624 policyController.unlock();
625 getControllerFactory().patch(policyController, configController.getDrools());
627 case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
628 policyController.lock();
630 case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
631 policyController.unlock();
634 final String msg = "Controller Operation Configuration is not supported: " + operation + " for "
637 throw new IllegalArgumentException(msg);
642 public synchronized boolean start() {
644 /* policy-engine dispatch pre start hook */
645 if (FeatureApiUtils.apply(getEngineProviders(),
646 feature -> feature.beforeStart(this),
647 (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
648 feature.getClass().getName(), ex.getMessage(), ex))) {
653 throw new IllegalStateException(ENGINE_LOCKED_MSG);
658 AtomicReference<Boolean> success = new AtomicReference<>(true);
661 success.compareAndSet(true, this.lockManager.start());
662 } catch (final RuntimeException e) {
663 logger.warn("{}: cannot start lock manager because of {}", this, e.getMessage(), e);
667 /* Start managed and unmanaged http servers */
670 Stream.concat(getServletFactory().inventory().stream(), this.httpServers.stream())
671 .collect(Collectors.toList()),
672 httpServer -> httpServer.waitedStart(10 * 1000L),
673 (item, ex) -> logger.error("{}: cannot start http-server {} because of {}", this, item,
674 ex.getMessage(), ex));
676 /* Start managed Http Clients */
678 attempt(success, getHttpClientFactory().inventory(),
680 (item, ex) -> logger.error("{}: cannot start http-client {} because of {}",
681 this, item, ex.getMessage(), ex));
683 /* Start Policy Controllers */
685 attempt(success, getControllerFactory().inventory(),
686 PolicyController::start,
688 logger.error("{}: cannot start policy-controller {} because of {}", this, item,
689 ex.getMessage(), ex);
693 /* Start managed Topic Endpoints */
696 if (!getTopicEndpointManager().start()) {
699 } catch (final IllegalStateException e) {
700 logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e);
703 // Start the JMX listener
705 startPdpJmxListener();
707 /* policy-engine dispatch after start hook */
708 FeatureApiUtils.apply(getEngineProviders(),
709 feature -> feature.afterStart(this),
710 (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
711 feature.getClass().getName(), ex.getMessage(), ex));
713 return success.get();
717 public synchronized boolean open() {
720 if (FeatureApiUtils.apply(getEngineProviders(),
721 feature -> feature.beforeOpen(this),
722 (feature, ex) -> logger.error("{}: feature {} before-open failure because of {}", this,
723 feature.getClass().getName(), ex.getMessage(), ex))) {
728 throw new IllegalStateException(ENGINE_LOCKED_MSG);
732 throw new IllegalStateException(ENGINE_STOPPED_MSG);
735 AtomicReference<Boolean> success = new AtomicReference<>(true);
737 /* Open the unmanaged topics to external components for configuration purposes */
739 attempt(success, this.sources,
741 (item, ex) -> logger.error("{}: cannot start topic-source {} because of {}",
742 this, item, ex.getMessage(), ex));
744 attempt(success, this.sinks,
746 (item, ex) -> logger.error("{}: cannot start topic-sink {} because of {}",
747 this, item, ex.getMessage(), ex));
750 FeatureApiUtils.apply(getEngineProviders(),
751 feature -> feature.afterOpen(this),
752 (feature, ex) -> logger.error("{}: feature {} after-open failure because of {}", this,
753 feature.getClass().getName(), ex.getMessage(), ex));
755 return success.get();
759 private interface PredicateWithEx<T> {
760 boolean test(T value) throws InterruptedException;
764 public synchronized boolean stop() {
766 /* policy-engine dispatch pre stop hook */
767 if (FeatureApiUtils.apply(getEngineProviders(),
768 feature -> feature.beforeStop(this),
769 (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
770 feature.getClass().getName(), ex.getMessage(), ex))) {
774 /* stop regardless of the lock state */
782 AtomicReference<Boolean> success = new AtomicReference<>(true);
784 attempt(success, getControllerFactory().inventory(),
785 PolicyController::stop,
787 logger.error("{}: cannot stop policy-controller {} because of {}", this, item,
788 ex.getMessage(), ex);
792 /* Stop Policy Engine owned (unmanaged) sources */
793 attempt(success, this.sources,
795 (item, ex) -> logger.error("{}: cannot stop topic-source {} because of {}", this, item,
796 ex.getMessage(), ex));
798 /* Stop Policy Engine owned (unmanaged) sinks */
799 attempt(success, this.sinks,
801 (item, ex) -> logger.error("{}: cannot stop topic-sink {} because of {}", this, item,
802 ex.getMessage(), ex));
804 /* stop all managed topics sources and sinks */
805 if (!getTopicEndpointManager().stop()) {
809 /* stop all managed and unmanaged http servers */
811 Stream.concat(getServletFactory().inventory().stream(), this.httpServers.stream())
812 .collect(Collectors.toList()),
813 HttpServletServer::stop,
814 (item, ex) -> logger.error("{}: cannot stop http-server {} because of {}", this, item,
815 ex.getMessage(), ex));
817 /* stop all managed http clients */
819 getHttpClientFactory().inventory(),
821 (item, ex) -> logger.error("{}: cannot stop http-client {} because of {}", this, item,
822 ex.getMessage(), ex));
825 success.compareAndSet(true, this.lockManager.stop());
826 } catch (final RuntimeException e) {
827 logger.warn("{}: cannot stop lock manager because of {}", this, e.getMessage(), e);
833 /* policy-engine dispatch post stop hook */
834 FeatureApiUtils.apply(getEngineProviders(),
835 feature -> feature.afterStop(this),
836 (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
837 feature.getClass().getName(), ex.getMessage(), ex));
839 return success.get();
843 public synchronized void shutdown() {
846 * shutdown activity even when underlying subcomponents (features, controllers, topics, etc
850 var exitThread = makeShutdownThread();
853 /* policy-engine dispatch pre shutdown hook */
854 if (FeatureApiUtils.apply(getEngineProviders(),
855 feature -> feature.beforeShutdown(this),
856 (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
857 feature.getClass().getName(), ex.getMessage(), ex))) {
863 /* Shutdown Policy Engine owned (unmanaged) sources */
864 applyAll(this.sources,
865 TopicSource::shutdown,
866 (item, ex) -> logger.error("{}: cannot shutdown topic-source {} because of {}", this, item,
867 ex.getMessage(), ex));
869 /* Shutdown Policy Engine owned (unmanaged) sinks */
872 (item, ex) -> logger.error("{}: cannot shutdown topic-sink {} because of {}", this, item,
873 ex.getMessage(), ex));
875 /* Shutdown managed resources */
876 getControllerFactory().shutdown();
877 getTopicEndpointManager().shutdown();
878 getServletFactory().destroy();
879 getHttpClientFactory().destroy();
882 this.lockManager.shutdown();
883 } catch (final RuntimeException e) {
884 logger.warn("{}: cannot shutdown lock manager because of {}", this, e.getMessage(), e);
887 executorService.shutdownNow();
889 // Stop the JMX listener
891 stopPdpJmxListener();
893 /* policy-engine dispatch post shutdown hook */
894 FeatureApiUtils.apply(getEngineProviders(),
895 feature -> feature.afterShutdown(this),
896 (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
897 feature.getClass().getName(), ex.getMessage(), ex));
899 exitThread.interrupt();
900 logger.info("{}: normal termination", this);
903 private <T> void attempt(AtomicReference<Boolean> success, List<T> items, PredicateWithEx<T> pred,
904 BiConsumer<T, Exception> handleEx) {
906 for (T item : items) {
908 if (!pred.test(item)) {
912 } catch (InterruptedException ex) {
913 handleEx.accept(item, ex);
914 Thread.currentThread().interrupt();
916 } catch (RuntimeException ex) {
917 handleEx.accept(item, ex);
922 private <T> void applyAll(List<T> items, Consumer<T> function,
923 BiConsumer<T, Exception> handleEx) {
925 for (T item : items) {
927 function.accept(item);
929 } catch (RuntimeException ex) {
930 handleEx.accept(item, ex);
936 * Thread that shuts down http servers.
938 protected class ShutdownThread extends Thread {
939 private static final long SHUTDOWN_MAX_GRACE_TIME = 30000L;
944 doSleep(SHUTDOWN_MAX_GRACE_TIME);
945 logger.warn("{}: abnormal termination - shutdown graceful time period expiration",
946 PolicyEngineManager.this);
947 } catch (final InterruptedException e) {
948 synchronized (PolicyEngineManager.this) {
949 /* courtesy to shutdown() to allow it to return */
950 Thread.currentThread().interrupt();
952 logger.info("{}: finishing a graceful shutdown ", PolicyEngineManager.this, e);
955 * shut down the Policy Engine owned http servers as the very last thing
957 applyAll(PolicyEngineManager.this.getHttpServers(),
958 HttpServletServer::shutdown,
959 (item, ex) -> logger.error("{}: cannot shutdown http-server {} because of {}", this, item,
960 ex.getMessage(), ex));
962 logger.info("{}: exit", PolicyEngineManager.this);
967 // these may be overridden by junit tests
969 protected void doSleep(long sleepMs) throws InterruptedException {
970 Thread.sleep(sleepMs);
973 protected void doExit(int code) {
979 public synchronized boolean lock() {
981 /* policy-engine dispatch pre lock hook */
982 if (FeatureApiUtils.apply(getEngineProviders(),
983 feature -> feature.beforeLock(this),
984 (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
985 feature.getClass().getName(), ex.getMessage(), ex))) {
996 final List<PolicyController> controllers = getControllerFactory().inventory();
997 for (final PolicyController controller : controllers) {
999 success = controller.lock() && success;
1000 } catch (final Exception e) {
1001 logger.error("{}: cannot lock policy-controller {} because of {}", this, controller, e.getMessage(), e);
1006 success = getTopicEndpointManager().lock() && success;
1009 success = (this.lockManager == null || this.lockManager.lock()) && success;
1010 } catch (final RuntimeException e) {
1011 logger.warn("{}: cannot lock() lock manager because of {}", this, e.getMessage(), e);
1015 /* policy-engine dispatch post lock hook */
1016 FeatureApiUtils.apply(getEngineProviders(),
1017 feature -> feature.afterLock(this),
1018 (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
1019 feature.getClass().getName(), ex.getMessage(), ex));
1025 public synchronized boolean unlock() {
1027 /* policy-engine dispatch pre unlock hook */
1028 if (FeatureApiUtils.apply(getEngineProviders(),
1029 feature -> feature.beforeUnlock(this),
1030 (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
1031 feature.getClass().getName(), ex.getMessage(), ex))) {
1039 this.locked = false;
1044 success = this.lockManager == null || this.lockManager.unlock();
1045 } catch (final RuntimeException e) {
1046 logger.warn("{}: cannot unlock() lock manager because of {}", this, e.getMessage(), e);
1050 final List<PolicyController> controllers = getControllerFactory().inventory();
1051 for (final PolicyController controller : controllers) {
1053 success = controller.unlock() && success;
1054 } catch (final Exception e) {
1055 logger.error("{}: cannot unlock policy-controller {} because of {}", this, controller, e.getMessage(),
1061 success = getTopicEndpointManager().unlock() && success;
1063 /* policy-engine dispatch after unlock hook */
1064 FeatureApiUtils.apply(getEngineProviders(),
1065 feature -> feature.afterUnlock(this),
1066 (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
1067 feature.getClass().getName(), ex.getMessage(), ex));
1073 public void removePolicyController(String name) {
1074 getControllerFactory().destroy(name);
1078 public void removePolicyController(PolicyController controller) {
1079 getControllerFactory().destroy(controller);
1084 public List<PolicyController> getPolicyControllers() {
1085 return getControllerFactory().inventory();
1088 @GsonJsonProperty("controllers")
1090 public List<String> getPolicyControllerIds() {
1091 final List<String> controllerNames = new ArrayList<>();
1092 for (final PolicyController controller : getControllerFactory().inventory()) {
1093 controllerNames.add(controller.getName());
1095 return controllerNames;
1100 public Properties getProperties() {
1101 return this.properties;
1105 public List<String> getFeatures() {
1106 final List<String> features = new ArrayList<>();
1107 for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
1108 features.add(feature.getName());
1115 public List<PolicyEngineFeatureApi> getFeatureProviders() {
1116 return getEngineProviders();
1120 public PolicyEngineFeatureApi getFeatureProvider(String featureName) {
1121 if (featureName == null || featureName.isEmpty()) {
1122 throw new IllegalArgumentException("A feature name must be provided");
1125 for (final PolicyEngineFeatureApi feature : getEngineProviders()) {
1126 if (feature.getName().equals(featureName)) {
1131 throw new IllegalArgumentException("Invalid Feature Name: " + featureName);
1135 public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
1136 /* policy-engine pre topic event hook */
1137 if (FeatureApiUtils.apply(getFeatureProviders(),
1138 feature -> feature.beforeOnTopicEvent(this, commType, topic, event),
1139 (feature, ex) -> logger.error(
1140 "{}: feature {} beforeOnTopicEvent failure on event {} because of {}", this,
1141 feature.getClass().getName(), event, ex.getMessage(), ex))) {
1145 /* configuration request */
1146 PdpdConfiguration configuration = null;
1148 configuration = this.decoder.fromJson(event, PdpdConfiguration.class);
1149 this.configure(configuration);
1150 } catch (final Exception e) {
1151 logger.error("{}: configuration-error due to {} because of {}", this, event, e.getMessage(), e);
1154 /* policy-engine after topic event hook */
1155 PdpdConfiguration configuration2 = configuration;
1156 FeatureApiUtils.apply(getFeatureProviders(),
1157 feature -> feature.afterOnTopicEvent(this, configuration2, commType, topic, event),
1158 (feature, ex) -> logger.error("{}: feature {} afterOnTopicEvent failure on event {} because of {}", this,
1159 feature.getClass().getName(), event, ex.getMessage(), ex));
1163 public boolean deliver(String topic, Object event) {
1166 * Note this entry point is usually from the DRL
1169 if (topic == null || topic.isEmpty()) {
1170 throw new IllegalArgumentException(INVALID_TOPIC_MSG);
1173 if (event == null) {
1174 throw new IllegalArgumentException(INVALID_EVENT_MSG);
1177 if (!this.isAlive()) {
1178 throw new IllegalStateException(ENGINE_STOPPED_MSG);
1181 if (this.isLocked()) {
1182 throw new IllegalStateException(ENGINE_LOCKED_MSG);
1185 final List<TopicSink> topicSinks = getTopicEndpointManager().getTopicSinks(topic);
1186 if (topicSinks == null || topicSinks.size() != 1) {
1187 throw new IllegalStateException("Cannot ensure correct delivery on topic " + topic + ": " + topicSinks);
1190 return this.deliver(topicSinks.get(0).getTopicCommInfrastructure(), topic, event);
1194 public boolean deliver(String busType, String topic, Object event) {
1197 * Note this entry point is usually from the DRL (one of the reasons busType is String.
1200 if (StringUtils.isBlank(busType)) {
1201 throw new IllegalArgumentException("Invalid Communication Infrastructure");
1204 if (StringUtils.isBlank(topic)) {
1205 throw new IllegalArgumentException(INVALID_TOPIC_MSG);
1208 if (event == null) {
1209 throw new IllegalArgumentException(INVALID_EVENT_MSG);
1212 boolean valid = Stream.of(Topic.CommInfrastructure.values()).map(Enum::name)
1213 .anyMatch(name -> name.equals(busType));
1216 throw new IllegalArgumentException("Invalid Communication Infrastructure: " + busType);
1220 if (!this.isAlive()) {
1221 throw new IllegalStateException(ENGINE_STOPPED_MSG);
1224 if (this.isLocked()) {
1225 throw new IllegalStateException(ENGINE_LOCKED_MSG);
1229 return this.deliver(Topic.CommInfrastructure.valueOf(busType), topic, event);
1233 public boolean deliver(Topic.CommInfrastructure busType, String topic, Object event) {
1235 if (topic == null || topic.isEmpty()) {
1236 throw new IllegalArgumentException(INVALID_TOPIC_MSG);
1239 if (event == null) {
1240 throw new IllegalArgumentException(INVALID_EVENT_MSG);
1243 if (!this.isAlive()) {
1244 throw new IllegalStateException(ENGINE_STOPPED_MSG);
1247 if (this.isLocked()) {
1248 throw new IllegalStateException(ENGINE_LOCKED_MSG);
1252 * Try to send through the controller, this is the preferred way, since it may want to apply
1253 * additional processing
1256 var droolsController = getProtocolCoder().getDroolsController(topic, event);
1257 final PolicyController controller = getControllerFactory().get(droolsController);
1258 if (controller != null) {
1259 return controller.deliver(busType, topic, event);
1261 } catch (final Exception e) {
1262 logger.warn("{}: cannot find policy-controller to deliver {} over {}:{} because of {}", this, event,
1263 busType, topic, e.getMessage(), e);
1265 /* continue (try without routing through the controller) */
1269 * cannot route through the controller, send directly through the topic sink
1272 final String json = getProtocolCoder().encode(topic, event);
1273 return this.deliver(busType, topic, json);
1275 } catch (final Exception e) {
1276 logger.warn("{}: cannot deliver {} over {}:{}", this, event, busType, topic);
1282 public boolean deliver(Topic.CommInfrastructure busType, String topic, String event) {
1284 if (topic == null || topic.isEmpty()) {
1285 throw new IllegalArgumentException(INVALID_TOPIC_MSG);
1288 if (event == null || event.isEmpty()) {
1289 throw new IllegalArgumentException(INVALID_EVENT_MSG);
1292 if (!this.isAlive()) {
1293 throw new IllegalStateException(ENGINE_STOPPED_MSG);
1296 if (this.isLocked()) {
1297 throw new IllegalStateException(ENGINE_LOCKED_MSG);
1301 var sink = getTopicEndpointManager().getTopicSink(busType, topic);
1304 throw new IllegalStateException("Inconsistent State: " + this);
1307 return sink.send(event);
1309 } catch (final Exception e) {
1310 logger.warn("{}: cannot deliver {} over {}:{}", this, event, busType, topic);
1316 public synchronized void activate() {
1318 /* policy-engine dispatch pre activate hook */
1319 if (FeatureApiUtils.apply(getEngineProviders(),
1320 feature -> feature.beforeActivate(this),
1321 (feature, ex) -> logger.error("{}: feature {} before-activate failure because of {}", this,
1322 feature.getClass().getName(), ex.getMessage(), ex))) {
1326 // activate 'policy-management'
1327 for (final PolicyController policyController : this.getPolicyControllers()) {
1329 policyController.unlock();
1330 policyController.start();
1331 } catch (final Exception e) {
1332 logger.error("{}: cannot activate of policy-controller {} because of {}", this, policyController,
1334 } catch (final LinkageError e) {
1335 logger.error("{}: cannot activate (rules compilation) of policy-controller {} because of {}", this,
1336 policyController, e.getMessage(), e);
1342 /* policy-engine dispatch post activate hook */
1343 FeatureApiUtils.apply(getEngineProviders(),
1344 feature -> feature.afterActivate(this),
1345 (feature, ex) -> logger.error("{}: feature {} after-activate failure because of {}", this,
1346 feature.getClass().getName(), ex.getMessage(), ex));
1350 public synchronized void deactivate() {
1352 /* policy-engine dispatch pre deactivate hook */
1353 if (FeatureApiUtils.apply(getEngineProviders(),
1354 feature -> feature.beforeDeactivate(this),
1355 (feature, ex) -> logger.error("{}: feature {} before-deactivate failure because of {}", this,
1356 feature.getClass().getName(), ex.getMessage(), ex))) {
1362 for (final PolicyController policyController : this.getPolicyControllers()) {
1364 policyController.stop();
1365 } catch (final Exception | LinkageError e) {
1366 logger.error("{}: cannot deactivate (stop) policy-controller {} because of {}", this, policyController,
1371 /* policy-engine dispatch post deactivate hook */
1372 FeatureApiUtils.apply(getEngineProviders(),
1373 feature -> feature.afterDeactivate(this),
1374 (feature, ex) -> logger.error("{}: feature {} after-deactivate failure because of {}", this,
1375 feature.getClass().getName(), ex.getMessage(), ex));
1379 public Lock createLock(@NonNull String resourceId, @NonNull String ownerKey, int holdSec,
1380 @NonNull LockCallback callback, boolean waitForLock) {
1383 throw new IllegalArgumentException("holdSec is negative");
1386 if (lockManager == null) {
1387 throw new IllegalStateException("lock manager has not been initialized");
1390 return lockManager.createLock(resourceId, ownerKey, holdSec, callback, waitForLock);
1393 private boolean controllerConfig(PdpdConfiguration config) {
1394 /* only this one supported for now */
1395 final List<ControllerConfiguration> configControllers = config.getControllers();
1396 if (configControllers == null || configControllers.isEmpty()) {
1397 logger.info("No controller configuration provided: {}", config);
1401 final List<PolicyController> policyControllers = this.updatePolicyControllers(config.getControllers());
1402 return (policyControllers != null && !policyControllers.isEmpty()
1403 && policyControllers.size() == configControllers.size());
1406 // these methods may be overridden by junit tests
1408 protected List<PolicyEngineFeatureApi> getEngineProviders() {
1409 return PolicyEngineFeatureApiConstants.getProviders().getList();
1412 protected List<PolicyControllerFeatureApi> getControllerProviders() {
1413 return PolicyControllerFeatureApiConstants.getProviders().getList();
1416 protected void globalInitContainer(String[] cliArgs) {
1417 PolicyContainer.globalInit(cliArgs);
1420 protected TopicEndpoint getTopicEndpointManager() {
1421 return TopicEndpointManager.getManager();
1424 protected HttpServletServerFactory getServletFactory() {
1425 return HttpServletServerFactoryInstance.getServerFactory();
1428 protected HttpClientFactory getHttpClientFactory() {
1429 return HttpClientFactoryInstance.getClientFactory();
1432 protected PolicyControllerFactory getControllerFactory() {
1433 return PolicyControllerConstants.getFactory();
1436 protected void startPdpJmxListener() {
1437 PdpJmxListener.start();
1440 protected void stopPdpJmxListener() {
1441 PdpJmxListener.stop();
1444 protected Thread makeShutdownThread() {
1445 return new ShutdownThread();
1448 protected EventProtocolCoder getProtocolCoder() {
1449 return EventProtocolCoderConstants.getManager();
1452 protected SystemPersistence getPersistenceManager() {
1453 return SystemPersistenceConstants.getManager();
1456 protected PolicyEngine getPolicyEngine() {
1457 return PolicyEngineConstants.getManager();
1460 protected ScheduledExecutorService makeScheduledExecutor(int nthreads) {
1461 var exsvc = new ScheduledThreadPoolExecutor(nthreads);
1462 exsvc.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
1463 exsvc.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
1464 exsvc.setRemoveOnCancelPolicy(true);