2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.system.internal;
23 import com.fasterxml.jackson.annotation.JsonIgnore;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Properties;
29 import java.util.concurrent.atomic.AtomicReference;
30 import java.util.stream.Collectors;
31 import org.onap.policy.common.endpoints.event.comm.Topic;
32 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
33 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
34 import org.onap.policy.common.endpoints.event.comm.TopicListener;
35 import org.onap.policy.common.endpoints.event.comm.TopicSink;
36 import org.onap.policy.common.endpoints.event.comm.TopicSource;
37 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
38 import org.onap.policy.common.utils.services.FeatureApiUtils;
39 import org.onap.policy.drools.controller.DroolsController;
40 import org.onap.policy.drools.controller.DroolsControllerConstants;
41 import org.onap.policy.drools.controller.DroolsControllerFactory;
42 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
43 import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants;
44 import org.onap.policy.drools.persistence.SystemPersistence;
45 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
46 import org.onap.policy.drools.properties.DroolsPropertyConstants;
47 import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
48 import org.onap.policy.drools.system.PolicyController;
49 import org.onap.policy.drools.utils.PropertyUtil;
50 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyTypeIdentifier;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
/**
 * This implementation of the Policy Controller merely aggregates and tracks for management purposes
 * all underlying resources that this controller depends upon.
 */
58 public class AggregatedPolicyController implements PolicyController, TopicListener {
60 private static final String BEFORE_OFFER_FAILURE = "{}: feature {} before-offer failure because of {}";
61 private static final String AFTER_OFFER_FAILURE = "{}: feature {} after-offer failure because of {}";
66 private static final Logger logger = LoggerFactory.getLogger(AggregatedPolicyController.class);
69 * identifier for this policy controller.
71 private final String name;
74 * Abstracted Event Sources List regardless communication technology.
76 private final List<TopicSource> sources;
79 * Abstracted Event Sinks List regardless communication technology.
81 private final List<TopicSink> sinks;
84 * Mapping topics to sinks.
88 private final HashMap<String, TopicSink> topic2Sinks = new HashMap<>();
91 * Is this Policy Controller running (alive) ? reflects invocation of start()/stop() only.
93 private volatile boolean alive;
96 * Is this Policy Controller locked ? reflects if i/o controller related operations and start
97 * are permitted, more specifically: start(), deliver() and onTopicEvent(). It does not affect
98 * the ability to stop the underlying drools infrastructure
100 private volatile boolean locked;
103 * Policy Drools Controller.
105 private final AtomicReference<DroolsController> droolsController = new AtomicReference<>();
108 * Properties used to initialize controller.
110 private final Properties properties;
115 private List<ToscaPolicyTypeIdentifier> policyTypes;
/**
 * Constructor version mainly used for bootstrapping at initialization time a policy engine
 * controller.
 *
 * @param name controller name
 * @param properties properties used to create the topic sources, topic sinks and the drools
 *        controller; they are persisted under the controller name
 *
 * @throws IllegalArgumentException when invalid arguments are provided
 */
public AggregatedPolicyController(String name, Properties properties) {

    this.name = name;

    /*
     * 1. Register read topics with network infrastructure (ueb, dmaap, rest)
     * 2. Register write topics with network infrastructure (ueb, dmaap, rest)
     * 3. Register with drools infrastructure
     */

    // Create/Reuse Readers/Writers for all event sources endpoints

    this.sources = getEndpointManager().addTopicSources(properties);
    this.sinks = getEndpointManager().addTopicSinks(properties);

    initDrools(properties);

    // index the sinks by topic so that deliver() can resolve them
    initSinks();

    /* persist new properties */
    getPersistenceManager().storeController(name, properties);
    this.properties = PropertyUtil.getInterpolatedProperties(properties);

    // must run after this.properties is assigned: the policy types are read from it
    this.policyTypes = getPolicyTypesFromProperties();
}
152 public List<ToscaPolicyTypeIdentifier> getPolicyTypes() {
153 if (!policyTypes.isEmpty()) {
157 return droolsController
159 .getBaseDomainNames()
161 .map(d -> new ToscaPolicyTypeIdentifier(d,
162 DroolsPropertyConstants.DEFAULT_CONTROLLER_POLICY_TYPE_VERSION))
163 .collect(Collectors.toList());
166 protected List<ToscaPolicyTypeIdentifier> getPolicyTypesFromProperties() {
167 List<ToscaPolicyTypeIdentifier> policyTypeIds = new ArrayList<>();
169 String ptiPropValue = properties.getProperty(DroolsPropertyConstants.PROPERTY_CONTROLLER_POLICY_TYPES);
170 if (ptiPropValue == null) {
171 return policyTypeIds;
174 List<String> ptiPropList = new ArrayList<>(Arrays.asList(ptiPropValue.split("\\s*,\\s*")));
175 for (String pti : ptiPropList) {
176 String[] ptv = pti.split(":");
177 if (ptv.length == 1) {
178 policyTypeIds.add(new ToscaPolicyTypeIdentifier(ptv[0],
179 DroolsPropertyConstants.DEFAULT_CONTROLLER_POLICY_TYPE_VERSION));
180 } else if (ptv.length == 2) {
181 policyTypeIds.add(new ToscaPolicyTypeIdentifier(ptv[0], ptv[1]));
185 return policyTypeIds;
189 * initialize drools layer.
191 * @throws IllegalArgumentException if invalid parameters are passed in
193 private void initDrools(Properties properties) {
195 // Register with drools infrastructure
196 this.droolsController.set(getDroolsFactory().build(properties, sources, sinks));
197 } catch (Exception | LinkageError e) {
198 logger.error("{}: cannot init-drools", this);
199 throw new IllegalArgumentException(e);
206 * @throws IllegalArgumentException if invalid parameters are passed in
208 private void initSinks() {
209 this.topic2Sinks.clear();
210 for (TopicSink sink : sinks) {
211 this.topic2Sinks.put(sink.getTopic(), sink);
219 public boolean updateDrools(DroolsConfiguration newDroolsConfiguration) {
220 DroolsController controller = this.droolsController.get();
221 DroolsConfiguration oldDroolsConfiguration = new DroolsConfiguration(controller.getArtifactId(),
222 controller.getGroupId(), controller.getVersion());
224 if (oldDroolsConfiguration.getGroupId().equalsIgnoreCase(newDroolsConfiguration.getGroupId())
225 && oldDroolsConfiguration.getArtifactId().equalsIgnoreCase(newDroolsConfiguration.getArtifactId())
226 && oldDroolsConfiguration.getVersion().equalsIgnoreCase(newDroolsConfiguration.getVersion())) {
227 logger.warn("{}: cannot update-drools: identical configuration {} vs {}", this, oldDroolsConfiguration,
228 newDroolsConfiguration);
232 if (FeatureApiUtils.apply(getProviders(),
233 feature -> feature.beforePatch(this, oldDroolsConfiguration, newDroolsConfiguration),
234 (feature, ex) -> logger.error("{}: feature {} before-patch failure because of {}", this,
235 feature.getClass().getName(), ex.getMessage(), ex))) {
239 if (controller.isBrained()
240 && (newDroolsConfiguration.getArtifactId() == null
241 || DroolsControllerConstants.NO_ARTIFACT_ID.equals(newDroolsConfiguration.getArtifactId()))) {
242 // detach maven artifact
243 DroolsControllerConstants.getFactory().destroy(controller);
246 boolean success = true;
248 this.properties.setProperty(DroolsPropertyConstants.RULES_GROUPID, newDroolsConfiguration.getGroupId());
249 this.properties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
250 newDroolsConfiguration.getArtifactId());
251 this.properties.setProperty(DroolsPropertyConstants.RULES_VERSION, newDroolsConfiguration.getVersion());
252 getPersistenceManager().storeController(name, this.properties);
254 this.initDrools(this.properties);
256 // have a new controller now - get it
257 controller = this.droolsController.get();
270 } catch (RuntimeException e) {
271 logger.error("{}: cannot update-drools because of {}", this, e.getMessage(), e);
275 boolean finalSuccess = success;
276 FeatureApiUtils.apply(getProviders(),
277 feature -> feature.afterPatch(this, oldDroolsConfiguration, newDroolsConfiguration, finalSuccess),
278 (feature, ex) -> logger.error("{}: feature {} after-patch failure because of {}", this,
279 feature.getClass().getName(), ex.getMessage(), ex));
288 public String getName() {
296 public boolean start() {
297 logger.info("{}: start", this);
299 if (FeatureApiUtils.apply(getProviders(),
300 feature -> feature.beforeStart(this),
301 (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
302 feature.getClass().getName(), ex.getMessage(), ex))) {
306 if (this.isLocked()) {
307 throw new IllegalStateException("Policy Controller " + name + " is locked");
310 synchronized (this) {
318 final boolean success = this.droolsController.get().start();
320 // register for events
322 for (TopicSource source : sources) {
323 source.register(this);
326 for (TopicSink sink : sinks) {
329 } catch (Exception e) {
330 logger.error("{}: cannot start {} because of {}", this, sink, e.getMessage(), e);
334 FeatureApiUtils.apply(getProviders(),
335 feature -> feature.afterStart(this),
336 (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
337 feature.getClass().getName(), ex.getMessage(), ex));
346 public boolean stop() {
347 logger.info("{}: stop", this);
349 if (FeatureApiUtils.apply(getProviders(),
350 feature -> feature.beforeStop(this),
351 (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
352 feature.getClass().getName(), ex.getMessage(), ex))) {
356 /* stop regardless locked state */
358 synchronized (this) {
366 // 1. Stop registration
368 for (TopicSource source : sources) {
369 source.unregister(this);
372 boolean success = this.droolsController.get().stop();
374 FeatureApiUtils.apply(getProviders(),
375 feature -> feature.afterStop(this),
376 (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
377 feature.getClass().getName(), ex.getMessage(), ex));
386 public void shutdown() {
387 logger.info("{}: shutdown", this);
389 if (FeatureApiUtils.apply(getProviders(),
390 feature -> feature.beforeShutdown(this),
391 (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
392 feature.getClass().getName(), ex.getMessage(), ex))) {
398 getDroolsFactory().shutdown(this.droolsController.get());
400 FeatureApiUtils.apply(getProviders(),
401 feature -> feature.afterShutdown(this),
402 (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
403 feature.getClass().getName(), ex.getMessage(), ex));
411 logger.info("{}: halt", this);
413 if (FeatureApiUtils.apply(getProviders(),
414 feature -> feature.beforeHalt(this),
415 (feature, ex) -> logger.error("{}: feature {} before-halt failure because of {}", this,
416 feature.getClass().getName(), ex.getMessage(), ex))) {
421 getDroolsFactory().destroy(this.droolsController.get());
422 getPersistenceManager().deleteController(this.name);
424 FeatureApiUtils.apply(getProviders(),
425 feature -> feature.afterHalt(this),
426 (feature, ex) -> logger.error("{}: feature {} after-halt failure because of {}", this,
427 feature.getClass().getName(), ex.getMessage(), ex));
434 public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event) {
435 logger.debug("{}: raw event offered from {}:{}: {}", this, commType, topic, event);
441 if (FeatureApiUtils.apply(getProviders(),
442 feature -> feature.beforeOffer(this, commType, topic, event),
443 (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
444 feature.getClass().getName(), ex.getMessage(), ex))) {
448 boolean success = this.droolsController.get().offer(topic, event);
450 FeatureApiUtils.apply(getProviders(),
451 feature -> feature.afterOffer(this, commType, topic, event, success),
452 (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
453 feature.getClass().getName(), ex.getMessage(), ex));
457 public <T> boolean offer(T event) {
458 logger.debug("{}: event offered: {}", this, event);
464 if (FeatureApiUtils.apply(getProviders(),
465 feature -> feature.beforeOffer(this, event),
466 (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
467 feature.getClass().getName(), ex.getMessage(), ex))) {
471 boolean success = this.droolsController.get().offer(event);
473 FeatureApiUtils.apply(getProviders(),
474 feature -> feature.afterOffer(this, event, success),
475 (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
476 feature.getClass().getName(), ex.getMessage(), ex));
481 private boolean skipOffer() {
482 return isLocked() || !isAlive();
489 public boolean deliver(Topic.CommInfrastructure commType, String topic, Object event) {
491 logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event);
493 if (FeatureApiUtils.apply(getProviders(),
494 feature -> feature.beforeDeliver(this, commType, topic, event),
495 (feature, ex) -> logger.error("{}: feature {} before-deliver failure because of {}", this,
496 feature.getClass().getName(), ex.getMessage(), ex))) {
500 if (topic == null || topic.isEmpty()) {
501 throw new IllegalArgumentException("Invalid Topic");
505 throw new IllegalArgumentException("Invalid Event");
508 if (!this.isAlive()) {
509 throw new IllegalStateException("Policy Engine is stopped");
512 if (this.isLocked()) {
513 throw new IllegalStateException("Policy Engine is locked");
516 if (!this.topic2Sinks.containsKey(topic)) {
517 logger.warn("{}: cannot deliver event because the sink {}:{} is not registered: {}", this, commType, topic,
519 throw new IllegalArgumentException("Unsupported topic " + topic + " for delivery");
522 boolean success = this.droolsController.get().deliver(this.topic2Sinks.get(topic), event);
524 FeatureApiUtils.apply(getProviders(),
525 feature -> feature.afterDeliver(this, commType, topic, event, success),
526 (feature, ex) -> logger.error("{}: feature {} after-deliver failure because of {}", this,
527 feature.getClass().getName(), ex.getMessage(), ex));
536 public boolean isAlive() {
544 public boolean lock() {
545 logger.info("{}: lock", this);
547 if (FeatureApiUtils.apply(getProviders(),
548 feature -> feature.beforeLock(this),
549 (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
550 feature.getClass().getName(), ex.getMessage(), ex))) {
554 synchronized (this) {
562 // it does not affect associated sources/sinks, they are
563 // autonomous entities
565 boolean success = this.droolsController.get().lock();
567 FeatureApiUtils.apply(getProviders(),
568 feature -> feature.afterLock(this),
569 (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
570 feature.getClass().getName(), ex.getMessage(), ex));
579 public boolean unlock() {
581 logger.info("{}: unlock", this);
583 if (FeatureApiUtils.apply(getProviders(),
584 feature -> feature.beforeUnlock(this),
585 (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
586 feature.getClass().getName(), ex.getMessage(), ex))) {
590 synchronized (this) {
598 boolean success = this.droolsController.get().unlock();
600 FeatureApiUtils.apply(getProviders(),
601 feature -> feature.afterUnlock(this),
602 (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
603 feature.getClass().getName(), ex.getMessage(), ex));
612 public boolean isLocked() {
620 public List<TopicSource> getTopicSources() {
628 public List<TopicSink> getTopicSinks() {
636 public DroolsController getDrools() {
637 return this.droolsController.get();
647 public Properties getProperties() {
648 return this.properties;
652 public String toString() {
653 return "AggregatedPolicyController [name=" + name + ", alive=" + alive
654 + ", locked=" + locked + ", droolsController=" + droolsController + "]";
657 // the following methods may be overridden by junit tests
659 protected SystemPersistence getPersistenceManager() {
660 return SystemPersistenceConstants.getManager();
663 protected TopicEndpoint getEndpointManager() {
664 return TopicEndpointManager.getManager();
667 protected DroolsControllerFactory getDroolsFactory() {
668 return DroolsControllerConstants.getFactory();
671 protected List<PolicyControllerFeatureApi> getProviders() {
672 return PolicyControllerFeatureApiConstants.getProviders().getList();