2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * Modifications Copyright (C) 2021 Nordix Foundation.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.drools.system.internal;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Properties;
29 import java.util.concurrent.atomic.AtomicReference;
30 import java.util.stream.Collectors;
31 import org.onap.policy.common.endpoints.event.comm.Topic;
32 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
33 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
34 import org.onap.policy.common.endpoints.event.comm.TopicListener;
35 import org.onap.policy.common.endpoints.event.comm.TopicSink;
36 import org.onap.policy.common.endpoints.event.comm.TopicSource;
37 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
38 import org.onap.policy.common.utils.services.FeatureApiUtils;
39 import org.onap.policy.drools.controller.DroolsController;
40 import org.onap.policy.drools.controller.DroolsControllerConstants;
41 import org.onap.policy.drools.controller.DroolsControllerFactory;
42 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
43 import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants;
44 import org.onap.policy.drools.persistence.SystemPersistence;
45 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
46 import org.onap.policy.drools.properties.DroolsPropertyConstants;
47 import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
48 import org.onap.policy.drools.system.PolicyController;
49 import org.onap.policy.drools.utils.PropertyUtil;
50 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * This implementation of the Policy Controller merely aggregates and tracks for management purposes
56 * all underlying resources that this controller depends upon.
58 public class AggregatedPolicyController implements PolicyController, TopicListener {
60 private static final String BEFORE_OFFER_FAILURE = "{}: feature {} before-offer failure because of {}";
61 private static final String AFTER_OFFER_FAILURE = "{}: feature {} after-offer failure because of {}";
66 private static final Logger logger = LoggerFactory.getLogger(AggregatedPolicyController.class);
69 * identifier for this policy controller.
71 private final String name;
74 * Abstracted Event Sources List regardless communication technology.
76 protected final List<TopicSource> sources;
79 * Abstracted Event Sinks List regardless communication technology.
81 protected final List<TopicSink> sinks;
84 * Mapping topics to sinks.
87 private final HashMap<String, TopicSink> topic2Sinks = new HashMap<>();
90 * Is this Policy Controller running (alive) ? reflects invocation of start()/stop() only.
92 private volatile boolean alive;
95 * Is this Policy Controller locked ? reflects if i/o controller related operations and start
96 * are permitted, more specifically: start(), deliver() and onTopicEvent(). It does not affect
97 * the ability to stop the underlying drools infrastructure
99 private volatile boolean locked;
102 * Policy Drools Controller.
104 protected final AtomicReference<DroolsController> droolsController = new AtomicReference<>();
107 * Properties used to initialize controller.
109 private final Properties properties;
114 private List<ToscaConceptIdentifier> policyTypes;
117 * Constructor version mainly used for bootstrapping at initialization time a policy engine
120 * @param name controller name
123 * @throws IllegalArgumentException when invalid arguments are provided
125 public AggregatedPolicyController(String name, Properties properties) {
130 * 1. Register read topics with network infrastructure (ueb, dmaap, rest) 2. Register write
131 * topics with network infrastructure (ueb, dmaap, rest) 3. Register with drools
135 // Create/Reuse Readers/Writers for all event sources endpoints
137 this.sources = getEndpointManager().addTopicSources(properties);
138 this.sinks = getEndpointManager().addTopicSinks(properties);
140 initDrools(properties);
143 /* persist new properties */
144 getPersistenceManager().storeController(name, properties);
145 this.properties = PropertyUtil.getInterpolatedProperties(properties);
147 this.policyTypes = getPolicyTypesFromProperties();
151 public List<ToscaConceptIdentifier> getPolicyTypes() {
152 if (!policyTypes.isEmpty()) {
156 return droolsController
158 .getBaseDomainNames()
160 .map(d -> new ToscaConceptIdentifier(d,
161 DroolsPropertyConstants.DEFAULT_CONTROLLER_POLICY_TYPE_VERSION))
162 .collect(Collectors.toList());
165 protected List<ToscaConceptIdentifier> getPolicyTypesFromProperties() {
166 List<ToscaConceptIdentifier> policyTypeIds = new ArrayList<>();
168 String ptiPropValue = properties.getProperty(DroolsPropertyConstants.PROPERTY_CONTROLLER_POLICY_TYPES);
169 if (ptiPropValue == null) {
170 return policyTypeIds;
173 List<String> ptiPropList = new ArrayList<>(Arrays.asList(ptiPropValue.split("\\s*,\\s*")));
174 for (String pti : ptiPropList) {
175 String[] ptv = pti.split(":");
176 if (ptv.length == 1) {
177 policyTypeIds.add(new ToscaConceptIdentifier(ptv[0],
178 DroolsPropertyConstants.DEFAULT_CONTROLLER_POLICY_TYPE_VERSION));
179 } else if (ptv.length == 2) {
180 policyTypeIds.add(new ToscaConceptIdentifier(ptv[0], ptv[1]));
184 return policyTypeIds;
188 * initialize drools layer.
190 * @throws IllegalArgumentException if invalid parameters are passed in
192 protected void initDrools(Properties properties) {
194 // Register with drools infrastructure
195 this.droolsController.set(getDroolsFactory().build(properties, sources, sinks));
196 } catch (Exception | LinkageError e) {
197 logger.error("{}: cannot init-drools", this);
198 throw new IllegalArgumentException(e);
205 * @throws IllegalArgumentException if invalid parameters are passed in
207 private void initSinks() {
208 this.topic2Sinks.clear();
209 for (TopicSink sink : sinks) {
210 this.topic2Sinks.put(sink.getTopic(), sink);
218 public boolean updateDrools(DroolsConfiguration newDroolsConfiguration) {
219 DroolsController controller = this.droolsController.get();
220 DroolsConfiguration oldDroolsConfiguration = new DroolsConfiguration(controller.getArtifactId(),
221 controller.getGroupId(), controller.getVersion());
223 if (oldDroolsConfiguration.getGroupId().equalsIgnoreCase(newDroolsConfiguration.getGroupId())
224 && oldDroolsConfiguration.getArtifactId().equalsIgnoreCase(newDroolsConfiguration.getArtifactId())
225 && oldDroolsConfiguration.getVersion().equalsIgnoreCase(newDroolsConfiguration.getVersion())) {
226 logger.warn("{}: cannot update-drools: identical configuration {} vs {}", this, oldDroolsConfiguration,
227 newDroolsConfiguration);
231 if (FeatureApiUtils.apply(getProviders(),
232 feature -> feature.beforePatch(this, oldDroolsConfiguration, newDroolsConfiguration),
233 (feature, ex) -> logger.error("{}: feature {} before-patch failure because of {}", this,
234 feature.getClass().getName(), ex.getMessage(), ex))) {
238 if (controller.isBrained()
239 && (newDroolsConfiguration.getArtifactId() == null
240 || DroolsControllerConstants.NO_ARTIFACT_ID.equals(newDroolsConfiguration.getArtifactId()))) {
241 // detach maven artifact
242 DroolsControllerConstants.getFactory().destroy(controller);
245 boolean success = true;
247 this.properties.setProperty(DroolsPropertyConstants.RULES_GROUPID, newDroolsConfiguration.getGroupId());
248 this.properties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
249 newDroolsConfiguration.getArtifactId());
250 this.properties.setProperty(DroolsPropertyConstants.RULES_VERSION, newDroolsConfiguration.getVersion());
251 getPersistenceManager().storeController(name, this.properties);
253 this.initDrools(this.properties);
255 // have a new controller now - get it
256 controller = this.droolsController.get();
269 } catch (RuntimeException e) {
270 logger.error("{}: cannot update-drools because of {}", this, e.getMessage(), e);
274 boolean finalSuccess = success;
275 FeatureApiUtils.apply(getProviders(),
276 feature -> feature.afterPatch(this, oldDroolsConfiguration, newDroolsConfiguration, finalSuccess),
277 (feature, ex) -> logger.error("{}: feature {} after-patch failure because of {}", this,
278 feature.getClass().getName(), ex.getMessage(), ex));
287 public String getName() {
295 public boolean start() {
296 logger.info("{}: start", this);
298 if (FeatureApiUtils.apply(getProviders(),
299 feature -> feature.beforeStart(this),
300 (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
301 feature.getClass().getName(), ex.getMessage(), ex))) {
305 if (this.isLocked()) {
306 throw new IllegalStateException("Policy Controller " + name + " is locked");
309 synchronized (this) {
317 final boolean success = this.droolsController.get().start();
319 // register for events
321 for (TopicSource source : sources) {
322 source.register(this);
325 for (TopicSink sink : sinks) {
328 } catch (Exception e) {
329 logger.error("{}: cannot start {} because of {}", this, sink, e.getMessage(), e);
333 FeatureApiUtils.apply(getProviders(),
334 feature -> feature.afterStart(this),
335 (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
336 feature.getClass().getName(), ex.getMessage(), ex));
345 public boolean stop() {
346 logger.info("{}: stop", this);
348 if (FeatureApiUtils.apply(getProviders(),
349 feature -> feature.beforeStop(this),
350 (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
351 feature.getClass().getName(), ex.getMessage(), ex))) {
355 /* stop regardless locked state */
357 synchronized (this) {
365 // 1. Stop registration
367 for (TopicSource source : sources) {
368 source.unregister(this);
371 boolean success = this.droolsController.get().stop();
373 FeatureApiUtils.apply(getProviders(),
374 feature -> feature.afterStop(this),
375 (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
376 feature.getClass().getName(), ex.getMessage(), ex));
385 public void shutdown() {
386 logger.info("{}: shutdown", this);
388 if (FeatureApiUtils.apply(getProviders(),
389 feature -> feature.beforeShutdown(this),
390 (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
391 feature.getClass().getName(), ex.getMessage(), ex))) {
397 getDroolsFactory().shutdown(this.droolsController.get());
399 FeatureApiUtils.apply(getProviders(),
400 feature -> feature.afterShutdown(this),
401 (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
402 feature.getClass().getName(), ex.getMessage(), ex));
410 logger.info("{}: halt", this);
412 if (FeatureApiUtils.apply(getProviders(),
413 feature -> feature.beforeHalt(this),
414 (feature, ex) -> logger.error("{}: feature {} before-halt failure because of {}", this,
415 feature.getClass().getName(), ex.getMessage(), ex))) {
420 getDroolsFactory().destroy(this.droolsController.get());
421 getPersistenceManager().deleteController(this.name);
423 FeatureApiUtils.apply(getProviders(),
424 feature -> feature.afterHalt(this),
425 (feature, ex) -> logger.error("{}: feature {} after-halt failure because of {}", this,
426 feature.getClass().getName(), ex.getMessage(), ex));
433 public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event) {
434 logger.debug("{}: raw event offered from {}:{}: {}", this, commType, topic, event);
440 if (FeatureApiUtils.apply(getProviders(),
441 feature -> feature.beforeOffer(this, commType, topic, event),
442 (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
443 feature.getClass().getName(), ex.getMessage(), ex))) {
447 boolean success = this.droolsController.get().offer(topic, event);
449 FeatureApiUtils.apply(getProviders(),
450 feature -> feature.afterOffer(this, commType, topic, event, success),
451 (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
452 feature.getClass().getName(), ex.getMessage(), ex));
456 public <T> boolean offer(T event) {
457 logger.debug("{}: event offered: {}", this, event);
463 if (FeatureApiUtils.apply(getProviders(),
464 feature -> feature.beforeOffer(this, event),
465 (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
466 feature.getClass().getName(), ex.getMessage(), ex))) {
470 boolean success = this.droolsController.get().offer(event);
472 FeatureApiUtils.apply(getProviders(),
473 feature -> feature.afterOffer(this, event, success),
474 (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
475 feature.getClass().getName(), ex.getMessage(), ex));
480 private boolean skipOffer() {
481 return isLocked() || !isAlive();
488 public boolean deliver(Topic.CommInfrastructure commType, String topic, Object event) {
490 logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event);
492 if (FeatureApiUtils.apply(getProviders(),
493 feature -> feature.beforeDeliver(this, commType, topic, event),
494 (feature, ex) -> logger.error("{}: feature {} before-deliver failure because of {}", this,
495 feature.getClass().getName(), ex.getMessage(), ex))) {
499 if (topic == null || topic.isEmpty()) {
500 throw new IllegalArgumentException("Invalid Topic");
504 throw new IllegalArgumentException("Invalid Event");
507 if (!this.isAlive()) {
508 throw new IllegalStateException("Policy Engine is stopped");
511 if (this.isLocked()) {
512 throw new IllegalStateException("Policy Engine is locked");
515 if (!this.topic2Sinks.containsKey(topic)) {
516 logger.warn("{}: cannot deliver event because the sink {}:{} is not registered: {}", this, commType, topic,
518 throw new IllegalArgumentException("Unsupported topic " + topic + " for delivery");
521 boolean success = this.droolsController.get().deliver(this.topic2Sinks.get(topic), event);
523 FeatureApiUtils.apply(getProviders(),
524 feature -> feature.afterDeliver(this, commType, topic, event, success),
525 (feature, ex) -> logger.error("{}: feature {} after-deliver failure because of {}", this,
526 feature.getClass().getName(), ex.getMessage(), ex));
535 public boolean isAlive() {
543 public boolean lock() {
544 logger.info("{}: lock", this);
546 if (FeatureApiUtils.apply(getProviders(),
547 feature -> feature.beforeLock(this),
548 (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
549 feature.getClass().getName(), ex.getMessage(), ex))) {
553 synchronized (this) {
561 // it does not affect associated sources/sinks, they are
562 // autonomous entities
564 boolean success = this.droolsController.get().lock();
566 FeatureApiUtils.apply(getProviders(),
567 feature -> feature.afterLock(this),
568 (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
569 feature.getClass().getName(), ex.getMessage(), ex));
578 public boolean unlock() {
580 logger.info("{}: unlock", this);
582 if (FeatureApiUtils.apply(getProviders(),
583 feature -> feature.beforeUnlock(this),
584 (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
585 feature.getClass().getName(), ex.getMessage(), ex))) {
589 synchronized (this) {
597 boolean success = this.droolsController.get().unlock();
599 FeatureApiUtils.apply(getProviders(),
600 feature -> feature.afterUnlock(this),
601 (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
602 feature.getClass().getName(), ex.getMessage(), ex));
611 public boolean isLocked() {
619 public List<TopicSource> getTopicSources() {
627 public List<TopicSink> getTopicSinks() {
635 public DroolsController getDrools() {
636 return this.droolsController.get();
645 public Properties getProperties() {
646 return this.properties;
650 public String toString() {
651 return "AggregatedPolicyController [name=" + name + ", alive=" + alive
652 + ", locked=" + locked + ", droolsController=" + droolsController + "]";
655 // the following methods may be overridden by junit tests
657 protected SystemPersistence getPersistenceManager() {
658 return SystemPersistenceConstants.getManager();
661 protected TopicEndpoint getEndpointManager() {
662 return TopicEndpointManager.getManager();
665 protected DroolsControllerFactory getDroolsFactory() {
666 return DroolsControllerConstants.getFactory();
669 protected List<PolicyControllerFeatureApi> getProviders() {
670 return PolicyControllerFeatureApiConstants.getProviders().getList();