2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.system.internal;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Properties;
28 import java.util.concurrent.atomic.AtomicReference;
29 import java.util.stream.Collectors;
30 import org.onap.policy.common.endpoints.event.comm.Topic;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
32 import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
33 import org.onap.policy.common.endpoints.event.comm.TopicListener;
34 import org.onap.policy.common.endpoints.event.comm.TopicSink;
35 import org.onap.policy.common.endpoints.event.comm.TopicSource;
36 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
37 import org.onap.policy.common.utils.services.FeatureApiUtils;
38 import org.onap.policy.drools.controller.DroolsController;
39 import org.onap.policy.drools.controller.DroolsControllerConstants;
40 import org.onap.policy.drools.controller.DroolsControllerFactory;
41 import org.onap.policy.drools.features.PolicyControllerFeatureApi;
42 import org.onap.policy.drools.features.PolicyControllerFeatureApiConstants;
43 import org.onap.policy.drools.persistence.SystemPersistence;
44 import org.onap.policy.drools.persistence.SystemPersistenceConstants;
45 import org.onap.policy.drools.properties.DroolsPropertyConstants;
46 import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
47 import org.onap.policy.drools.system.PolicyController;
48 import org.onap.policy.drools.utils.PropertyUtil;
49 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyTypeIdentifier;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
54 * This implementation of the Policy Controller merely aggregates and tracks for management purposes
55 * all underlying resources that this controller depends upon.
57 public class AggregatedPolicyController implements PolicyController, TopicListener {
59 private static final String BEFORE_OFFER_FAILURE = "{}: feature {} before-offer failure because of {}";
60 private static final String AFTER_OFFER_FAILURE = "{}: feature {} after-offer failure because of {}";
65 private static final Logger logger = LoggerFactory.getLogger(AggregatedPolicyController.class);
68 * identifier for this policy controller.
70 private final String name;
73 * Abstracted Event Sources List regardless communication technology.
75 protected final List<TopicSource> sources;
78 * Abstracted Event Sinks List regardless communication technology.
80 protected final List<TopicSink> sinks;
83 * Mapping topics to sinks.
86 private final HashMap<String, TopicSink> topic2Sinks = new HashMap<>();
89 * Is this Policy Controller running (alive) ? reflects invocation of start()/stop() only.
91 private volatile boolean alive;
94 * Is this Policy Controller locked ? reflects if i/o controller related operations and start
95 * are permitted, more specifically: start(), deliver() and onTopicEvent(). It does not affect
96 * the ability to stop the underlying drools infrastructure
98 private volatile boolean locked;
101 * Policy Drools Controller.
103 protected final AtomicReference<DroolsController> droolsController = new AtomicReference<>();
106 * Properties used to initialize controller.
108 private final Properties properties;
113 private List<ToscaPolicyTypeIdentifier> policyTypes;
116 * Constructor version mainly used for bootstrapping at initialization time a policy engine
119 * @param name controller name
122 * @throws IllegalArgumentException when invalid arguments are provided
124 public AggregatedPolicyController(String name, Properties properties) {
129 * 1. Register read topics with network infrastructure (ueb, dmaap, rest) 2. Register write
130 * topics with network infrastructure (ueb, dmaap, rest) 3. Register with drools
134 // Create/Reuse Readers/Writers for all event sources endpoints
136 this.sources = getEndpointManager().addTopicSources(properties);
137 this.sinks = getEndpointManager().addTopicSinks(properties);
139 initDrools(properties);
142 /* persist new properties */
143 getPersistenceManager().storeController(name, properties);
144 this.properties = PropertyUtil.getInterpolatedProperties(properties);
146 this.policyTypes = getPolicyTypesFromProperties();
150 public List<ToscaPolicyTypeIdentifier> getPolicyTypes() {
151 if (!policyTypes.isEmpty()) {
155 return droolsController
157 .getBaseDomainNames()
159 .map(d -> new ToscaPolicyTypeIdentifier(d,
160 DroolsPropertyConstants.DEFAULT_CONTROLLER_POLICY_TYPE_VERSION))
161 .collect(Collectors.toList());
164 protected List<ToscaPolicyTypeIdentifier> getPolicyTypesFromProperties() {
165 List<ToscaPolicyTypeIdentifier> policyTypeIds = new ArrayList<>();
167 String ptiPropValue = properties.getProperty(DroolsPropertyConstants.PROPERTY_CONTROLLER_POLICY_TYPES);
168 if (ptiPropValue == null) {
169 return policyTypeIds;
172 List<String> ptiPropList = new ArrayList<>(Arrays.asList(ptiPropValue.split("\\s*,\\s*")));
173 for (String pti : ptiPropList) {
174 String[] ptv = pti.split(":");
175 if (ptv.length == 1) {
176 policyTypeIds.add(new ToscaPolicyTypeIdentifier(ptv[0],
177 DroolsPropertyConstants.DEFAULT_CONTROLLER_POLICY_TYPE_VERSION));
178 } else if (ptv.length == 2) {
179 policyTypeIds.add(new ToscaPolicyTypeIdentifier(ptv[0], ptv[1]));
183 return policyTypeIds;
187 * initialize drools layer.
189 * @throws IllegalArgumentException if invalid parameters are passed in
191 protected void initDrools(Properties properties) {
193 // Register with drools infrastructure
194 this.droolsController.set(getDroolsFactory().build(properties, sources, sinks));
195 } catch (Exception | LinkageError e) {
196 logger.error("{}: cannot init-drools", this);
197 throw new IllegalArgumentException(e);
204 * @throws IllegalArgumentException if invalid parameters are passed in
206 private void initSinks() {
207 this.topic2Sinks.clear();
208 for (TopicSink sink : sinks) {
209 this.topic2Sinks.put(sink.getTopic(), sink);
217 public boolean updateDrools(DroolsConfiguration newDroolsConfiguration) {
218 DroolsController controller = this.droolsController.get();
219 DroolsConfiguration oldDroolsConfiguration = new DroolsConfiguration(controller.getArtifactId(),
220 controller.getGroupId(), controller.getVersion());
222 if (oldDroolsConfiguration.getGroupId().equalsIgnoreCase(newDroolsConfiguration.getGroupId())
223 && oldDroolsConfiguration.getArtifactId().equalsIgnoreCase(newDroolsConfiguration.getArtifactId())
224 && oldDroolsConfiguration.getVersion().equalsIgnoreCase(newDroolsConfiguration.getVersion())) {
225 logger.warn("{}: cannot update-drools: identical configuration {} vs {}", this, oldDroolsConfiguration,
226 newDroolsConfiguration);
230 if (FeatureApiUtils.apply(getProviders(),
231 feature -> feature.beforePatch(this, oldDroolsConfiguration, newDroolsConfiguration),
232 (feature, ex) -> logger.error("{}: feature {} before-patch failure because of {}", this,
233 feature.getClass().getName(), ex.getMessage(), ex))) {
237 if (controller.isBrained()
238 && (newDroolsConfiguration.getArtifactId() == null
239 || DroolsControllerConstants.NO_ARTIFACT_ID.equals(newDroolsConfiguration.getArtifactId()))) {
240 // detach maven artifact
241 DroolsControllerConstants.getFactory().destroy(controller);
244 boolean success = true;
246 this.properties.setProperty(DroolsPropertyConstants.RULES_GROUPID, newDroolsConfiguration.getGroupId());
247 this.properties.setProperty(DroolsPropertyConstants.RULES_ARTIFACTID,
248 newDroolsConfiguration.getArtifactId());
249 this.properties.setProperty(DroolsPropertyConstants.RULES_VERSION, newDroolsConfiguration.getVersion());
250 getPersistenceManager().storeController(name, this.properties);
252 this.initDrools(this.properties);
254 // have a new controller now - get it
255 controller = this.droolsController.get();
268 } catch (RuntimeException e) {
269 logger.error("{}: cannot update-drools because of {}", this, e.getMessage(), e);
273 boolean finalSuccess = success;
274 FeatureApiUtils.apply(getProviders(),
275 feature -> feature.afterPatch(this, oldDroolsConfiguration, newDroolsConfiguration, finalSuccess),
276 (feature, ex) -> logger.error("{}: feature {} after-patch failure because of {}", this,
277 feature.getClass().getName(), ex.getMessage(), ex));
286 public String getName() {
294 public boolean start() {
295 logger.info("{}: start", this);
297 if (FeatureApiUtils.apply(getProviders(),
298 feature -> feature.beforeStart(this),
299 (feature, ex) -> logger.error("{}: feature {} before-start failure because of {}", this,
300 feature.getClass().getName(), ex.getMessage(), ex))) {
304 if (this.isLocked()) {
305 throw new IllegalStateException("Policy Controller " + name + " is locked");
308 synchronized (this) {
316 final boolean success = this.droolsController.get().start();
318 // register for events
320 for (TopicSource source : sources) {
321 source.register(this);
324 for (TopicSink sink : sinks) {
327 } catch (Exception e) {
328 logger.error("{}: cannot start {} because of {}", this, sink, e.getMessage(), e);
332 FeatureApiUtils.apply(getProviders(),
333 feature -> feature.afterStart(this),
334 (feature, ex) -> logger.error("{}: feature {} after-start failure because of {}", this,
335 feature.getClass().getName(), ex.getMessage(), ex));
344 public boolean stop() {
345 logger.info("{}: stop", this);
347 if (FeatureApiUtils.apply(getProviders(),
348 feature -> feature.beforeStop(this),
349 (feature, ex) -> logger.error("{}: feature {} before-stop failure because of {}", this,
350 feature.getClass().getName(), ex.getMessage(), ex))) {
354 /* stop regardless locked state */
356 synchronized (this) {
364 // 1. Stop registration
366 for (TopicSource source : sources) {
367 source.unregister(this);
370 boolean success = this.droolsController.get().stop();
372 FeatureApiUtils.apply(getProviders(),
373 feature -> feature.afterStop(this),
374 (feature, ex) -> logger.error("{}: feature {} after-stop failure because of {}", this,
375 feature.getClass().getName(), ex.getMessage(), ex));
384 public void shutdown() {
385 logger.info("{}: shutdown", this);
387 if (FeatureApiUtils.apply(getProviders(),
388 feature -> feature.beforeShutdown(this),
389 (feature, ex) -> logger.error("{}: feature {} before-shutdown failure because of {}", this,
390 feature.getClass().getName(), ex.getMessage(), ex))) {
396 getDroolsFactory().shutdown(this.droolsController.get());
398 FeatureApiUtils.apply(getProviders(),
399 feature -> feature.afterShutdown(this),
400 (feature, ex) -> logger.error("{}: feature {} after-shutdown failure because of {}", this,
401 feature.getClass().getName(), ex.getMessage(), ex));
409 logger.info("{}: halt", this);
411 if (FeatureApiUtils.apply(getProviders(),
412 feature -> feature.beforeHalt(this),
413 (feature, ex) -> logger.error("{}: feature {} before-halt failure because of {}", this,
414 feature.getClass().getName(), ex.getMessage(), ex))) {
419 getDroolsFactory().destroy(this.droolsController.get());
420 getPersistenceManager().deleteController(this.name);
422 FeatureApiUtils.apply(getProviders(),
423 feature -> feature.afterHalt(this),
424 (feature, ex) -> logger.error("{}: feature {} after-halt failure because of {}", this,
425 feature.getClass().getName(), ex.getMessage(), ex));
432 public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event) {
433 logger.debug("{}: raw event offered from {}:{}: {}", this, commType, topic, event);
439 if (FeatureApiUtils.apply(getProviders(),
440 feature -> feature.beforeOffer(this, commType, topic, event),
441 (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
442 feature.getClass().getName(), ex.getMessage(), ex))) {
446 boolean success = this.droolsController.get().offer(topic, event);
448 FeatureApiUtils.apply(getProviders(),
449 feature -> feature.afterOffer(this, commType, topic, event, success),
450 (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
451 feature.getClass().getName(), ex.getMessage(), ex));
455 public <T> boolean offer(T event) {
456 logger.debug("{}: event offered: {}", this, event);
462 if (FeatureApiUtils.apply(getProviders(),
463 feature -> feature.beforeOffer(this, event),
464 (feature, ex) -> logger.error(BEFORE_OFFER_FAILURE, this,
465 feature.getClass().getName(), ex.getMessage(), ex))) {
469 boolean success = this.droolsController.get().offer(event);
471 FeatureApiUtils.apply(getProviders(),
472 feature -> feature.afterOffer(this, event, success),
473 (feature, ex) -> logger.error(AFTER_OFFER_FAILURE, this,
474 feature.getClass().getName(), ex.getMessage(), ex));
479 private boolean skipOffer() {
480 return isLocked() || !isAlive();
487 public boolean deliver(Topic.CommInfrastructure commType, String topic, Object event) {
489 logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event);
491 if (FeatureApiUtils.apply(getProviders(),
492 feature -> feature.beforeDeliver(this, commType, topic, event),
493 (feature, ex) -> logger.error("{}: feature {} before-deliver failure because of {}", this,
494 feature.getClass().getName(), ex.getMessage(), ex))) {
498 if (topic == null || topic.isEmpty()) {
499 throw new IllegalArgumentException("Invalid Topic");
503 throw new IllegalArgumentException("Invalid Event");
506 if (!this.isAlive()) {
507 throw new IllegalStateException("Policy Engine is stopped");
510 if (this.isLocked()) {
511 throw new IllegalStateException("Policy Engine is locked");
514 if (!this.topic2Sinks.containsKey(topic)) {
515 logger.warn("{}: cannot deliver event because the sink {}:{} is not registered: {}", this, commType, topic,
517 throw new IllegalArgumentException("Unsupported topic " + topic + " for delivery");
520 boolean success = this.droolsController.get().deliver(this.topic2Sinks.get(topic), event);
522 FeatureApiUtils.apply(getProviders(),
523 feature -> feature.afterDeliver(this, commType, topic, event, success),
524 (feature, ex) -> logger.error("{}: feature {} after-deliver failure because of {}", this,
525 feature.getClass().getName(), ex.getMessage(), ex));
534 public boolean isAlive() {
542 public boolean lock() {
543 logger.info("{}: lock", this);
545 if (FeatureApiUtils.apply(getProviders(),
546 feature -> feature.beforeLock(this),
547 (feature, ex) -> logger.error("{}: feature {} before-lock failure because of {}", this,
548 feature.getClass().getName(), ex.getMessage(), ex))) {
552 synchronized (this) {
560 // it does not affect associated sources/sinks, they are
561 // autonomous entities
563 boolean success = this.droolsController.get().lock();
565 FeatureApiUtils.apply(getProviders(),
566 feature -> feature.afterLock(this),
567 (feature, ex) -> logger.error("{}: feature {} after-lock failure because of {}", this,
568 feature.getClass().getName(), ex.getMessage(), ex));
577 public boolean unlock() {
579 logger.info("{}: unlock", this);
581 if (FeatureApiUtils.apply(getProviders(),
582 feature -> feature.beforeUnlock(this),
583 (feature, ex) -> logger.error("{}: feature {} before-unlock failure because of {}", this,
584 feature.getClass().getName(), ex.getMessage(), ex))) {
588 synchronized (this) {
596 boolean success = this.droolsController.get().unlock();
598 FeatureApiUtils.apply(getProviders(),
599 feature -> feature.afterUnlock(this),
600 (feature, ex) -> logger.error("{}: feature {} after-unlock failure because of {}", this,
601 feature.getClass().getName(), ex.getMessage(), ex));
610 public boolean isLocked() {
618 public List<TopicSource> getTopicSources() {
626 public List<TopicSink> getTopicSinks() {
634 public DroolsController getDrools() {
635 return this.droolsController.get();
644 public Properties getProperties() {
645 return this.properties;
649 public String toString() {
650 return "AggregatedPolicyController [name=" + name + ", alive=" + alive
651 + ", locked=" + locked + ", droolsController=" + droolsController + "]";
654 // the following methods may be overridden by junit tests
656 protected SystemPersistence getPersistenceManager() {
657 return SystemPersistenceConstants.getManager();
660 protected TopicEndpoint getEndpointManager() {
661 return TopicEndpointManager.getManager();
664 protected DroolsControllerFactory getDroolsFactory() {
665 return DroolsControllerConstants.getFactory();
668 protected List<PolicyControllerFeatureApi> getProviders() {
669 return PolicyControllerFeatureApiConstants.getProviders().getList();