2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.system.internal;
23 import com.fasterxml.jackson.annotation.JsonIgnore;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Properties;
29 import java.util.stream.Collectors;
30 import org.onap.policy.common.endpoints.event.comm.Topic;
31 import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
32 import org.onap.policy.common.endpoints.event.comm.TopicListener;
33 import org.onap.policy.common.endpoints.event.comm.TopicSink;
34 import org.onap.policy.common.endpoints.event.comm.TopicSource;
35 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
36 import org.onap.policy.drools.controller.DroolsController;
37 import org.onap.policy.drools.controller.DroolsControllerFactory;
38 import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
39 import org.onap.policy.drools.persistence.SystemPersistence;
40 import org.onap.policy.drools.properties.DroolsProperties;
41 import org.onap.policy.drools.protocol.configuration.DroolsConfiguration;
42 import org.onap.policy.drools.system.PolicyController;
43 import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicyTypeIdentifier;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
48 * This implementation of the Policy Controller merely aggregates and tracks for management purposes
49 * all underlying resources that this controller depends upon.
51 public class AggregatedPolicyController implements PolicyController, TopicListener {
56 private static final Logger logger = LoggerFactory.getLogger(AggregatedPolicyController.class);
59 * identifier for this policy controller.
61 private final String name;
64 * Abstracted Event Sources List regardless communication technology.
66 private final List<? extends TopicSource> sources;
69 * Abstracted Event Sinks List regardless communication technology.
71 private final List<? extends TopicSink> sinks;
74 * Mapping topics to sinks.
78 private final HashMap<String, TopicSink> topic2Sinks = new HashMap<>();
81 * Is this Policy Controller running (alive) ? reflects invocation of start()/stop() only.
83 private volatile boolean alive;
86 * Is this Policy Controller locked ? reflects if i/o controller related operations and start
87 * are permitted, more specifically: start(), deliver() and onTopicEvent(). It does not affect
88 * the ability to stop the underlying drools infrastructure
90 private volatile boolean locked;
93 * Policy Drools Controller.
95 private volatile DroolsController droolsController;
98 * Properties used to initialize controller.
100 private final Properties properties;
105 private List<ToscaPolicyTypeIdentifier> policyTypes;
108 * Constructor version mainly used for bootstrapping at initialization time a policy engine
111 * @param name controller name
114 * @throws IllegalArgumentException when invalid arguments are provided
116 public AggregatedPolicyController(String name, Properties properties) {
121 * 1. Register read topics with network infrastructure (ueb, dmaap, rest) 2. Register write
122 * topics with network infrastructure (ueb, dmaap, rest) 3. Register with drools
126 // Create/Reuse Readers/Writers for all event sources endpoints
128 this.sources = getEndpointManager().addTopicSources(properties);
129 this.sinks = getEndpointManager().addTopicSinks(properties);
131 initDrools(properties);
134 /* persist new properties */
135 getPersistenceManager().storeController(name, properties);
136 this.properties = properties;
138 this.policyTypes = getPolicyTypesFromProperties();
142 public List<ToscaPolicyTypeIdentifier> getPolicyTypes() {
143 if (!policyTypes.isEmpty()) {
147 return droolsController
148 .getBaseDomainNames()
150 .map(d -> new ToscaPolicyTypeIdentifier(d, DroolsProperties.DEFAULT_CONTROLLER_POLICY_TYPE_VERSION))
151 .collect(Collectors.toList());
154 protected List<ToscaPolicyTypeIdentifier> getPolicyTypesFromProperties() {
155 List<ToscaPolicyTypeIdentifier> policyTypeIds = new ArrayList<>();
157 String ptiPropValue = properties.getProperty(DroolsProperties.PROPERTY_CONTROLLER_POLICY_TYPES);
158 if (ptiPropValue == null) {
159 return policyTypeIds;
162 List<String> ptiPropList = new ArrayList<>(Arrays.asList(ptiPropValue.split("\\s*,\\s*")));
163 for (String pti : ptiPropList) {
164 String[] ptv = pti.split(":");
165 if (ptv.length == 1) {
166 policyTypeIds.add(new ToscaPolicyTypeIdentifier(ptv[0],
167 DroolsProperties.DEFAULT_CONTROLLER_POLICY_TYPE_VERSION));
168 } else if (ptv.length == 2) {
169 policyTypeIds.add(new ToscaPolicyTypeIdentifier(ptv[0], ptv[1]));
173 return policyTypeIds;
177 * initialize drools layer.
179 * @throws IllegalArgumentException if invalid parameters are passed in
181 private void initDrools(Properties properties) {
183 // Register with drools infrastructure
184 this.droolsController = getDroolsFactory().build(properties, sources, sinks);
185 } catch (Exception | LinkageError e) {
186 logger.error("{}: cannot init-drools because of {}", this, e.getMessage(), e);
187 throw new IllegalArgumentException(e);
194 * @throws IllegalArgumentException if invalid parameters are passed in
196 private void initSinks() {
197 this.topic2Sinks.clear();
198 for (TopicSink sink : sinks) {
199 this.topic2Sinks.put(sink.getTopic(), sink);
207 public boolean updateDrools(DroolsConfiguration newDroolsConfiguration) {
209 DroolsConfiguration oldDroolsConfiguration = new DroolsConfiguration(this.droolsController.getArtifactId(),
210 this.droolsController.getGroupId(), this.droolsController.getVersion());
212 if (oldDroolsConfiguration.getGroupId().equalsIgnoreCase(newDroolsConfiguration.getGroupId())
213 && oldDroolsConfiguration.getArtifactId().equalsIgnoreCase(newDroolsConfiguration.getArtifactId())
214 && oldDroolsConfiguration.getVersion().equalsIgnoreCase(newDroolsConfiguration.getVersion())) {
215 logger.warn("{}: cannot update-drools: identical configuration {} vs {}", this, oldDroolsConfiguration,
216 newDroolsConfiguration);
221 /* Drools Controller created, update initialization properties for restarts */
223 this.properties.setProperty(DroolsProperties.RULES_GROUPID, newDroolsConfiguration.getGroupId());
224 this.properties.setProperty(DroolsProperties.RULES_ARTIFACTID, newDroolsConfiguration.getArtifactId());
225 this.properties.setProperty(DroolsProperties.RULES_VERSION, newDroolsConfiguration.getVersion());
227 getPersistenceManager().storeController(name, this.properties);
229 this.initDrools(this.properties);
231 /* set drools controller to current locked status */
233 if (this.isLocked()) {
234 this.droolsController.lock();
236 this.droolsController.unlock();
239 /* set drools controller to current alive status */
241 if (this.isAlive()) {
242 this.droolsController.start();
244 this.droolsController.stop();
247 } catch (IllegalArgumentException e) {
248 logger.error("{}: cannot update-drools because of {}", this, e.getMessage(), e);
259 public String getName() {
267 public boolean start() {
268 logger.info("{}: start", this);
270 for (PolicyControllerFeatureAPI feature : getProviders()) {
272 if (feature.beforeStart(this)) {
275 } catch (Exception e) {
276 logger.error("{}: feature {} before-start failure because of {}", this, feature.getClass().getName(),
281 if (this.isLocked()) {
282 throw new IllegalStateException("Policy Controller " + name + " is locked");
285 synchronized (this) {
293 final boolean success = this.droolsController.start();
295 // register for events
297 for (TopicSource source : sources) {
298 source.register(this);
301 for (TopicSink sink : sinks) {
304 } catch (Exception e) {
305 logger.error("{}: cannot start {} because of {}", this, sink, e.getMessage(), e);
309 for (PolicyControllerFeatureAPI feature : getProviders()) {
311 if (feature.afterStart(this)) {
314 } catch (Exception e) {
315 logger.error("{}: feature {} after-start failure because of {}", this, feature.getClass().getName(),
327 public boolean stop() {
328 logger.info("{}: stop", this);
330 for (PolicyControllerFeatureAPI feature : getProviders()) {
332 if (feature.beforeStop(this)) {
335 } catch (Exception e) {
336 logger.error("{}: feature {} before-stop failure because of {}", this, feature.getClass().getName(),
341 /* stop regardless locked state */
343 synchronized (this) {
351 // 1. Stop registration
353 for (TopicSource source : sources) {
354 source.unregister(this);
357 boolean success = this.droolsController.stop();
359 for (PolicyControllerFeatureAPI feature : getProviders()) {
361 if (feature.afterStop(this)) {
364 } catch (Exception e) {
365 logger.error("{}: feature {} after-stop failure because of {}", this, feature.getClass().getName(),
377 public void shutdown() {
378 logger.info("{}: shutdown", this);
380 for (PolicyControllerFeatureAPI feature : getProviders()) {
382 if (feature.beforeShutdown(this)) {
385 } catch (Exception e) {
386 logger.error("{}: feature {} before-shutdown failure because of {}", this, feature.getClass().getName(),
393 getDroolsFactory().shutdown(this.droolsController);
395 for (PolicyControllerFeatureAPI feature : getProviders()) {
397 if (feature.afterShutdown(this)) {
400 } catch (Exception e) {
401 logger.error("{}: feature {} after-shutdown failure because of {}", this, feature.getClass().getName(),
412 logger.info("{}: halt", this);
414 for (PolicyControllerFeatureAPI feature : getProviders()) {
416 if (feature.beforeHalt(this)) {
419 } catch (Exception e) {
420 logger.error("{}: feature {} before-halt failure because of {}", this, feature.getClass().getName(),
426 getDroolsFactory().destroy(this.droolsController);
427 getPersistenceManager().deleteController(this.name);
429 for (PolicyControllerFeatureAPI feature : getProviders()) {
431 if (feature.afterHalt(this)) {
434 } catch (Exception e) {
435 logger.error("{}: feature {} after-halt failure because of {}", this, feature.getClass().getName(),
445 public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event) {
446 logger.debug("{}: raw event offered from {}:{}: {}", this, commType, topic, event);
452 for (PolicyControllerFeatureAPI feature : getProviders()) {
454 if (feature.beforeOffer(this, commType, topic, event)) {
457 } catch (Exception e) {
458 logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
463 boolean success = this.droolsController.offer(topic, event);
465 for (PolicyControllerFeatureAPI feature : getProviders()) {
467 if (feature.afterOffer(this, commType, topic, event, success)) {
470 } catch (Exception e) {
471 logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
478 public <T> boolean offer(T event) {
479 logger.debug("{}: event offered: {}", this, event);
485 for (PolicyControllerFeatureAPI feature : getProviders()) {
487 if (feature.beforeOffer(this, event)) {
490 } catch (Exception e) {
491 logger.error("{}: feature {} before-offer failure because of {}", this, feature.getClass().getName(),
496 boolean success = this.droolsController.offer(event);
498 for (PolicyControllerFeatureAPI feature : getProviders()) {
500 if (feature.afterOffer(this, event, success)) {
503 } catch (Exception e) {
504 logger.error("{}: feature {} after-offer failure because of {}", this, feature.getClass().getName(),
512 private boolean skipOffer() {
513 return isLocked() || !isAlive();
520 public boolean deliver(Topic.CommInfrastructure commType, String topic, Object event) {
522 logger.debug("{}: deliver event to {}:{}: {}", this, commType, topic, event);
524 for (PolicyControllerFeatureAPI feature : getProviders()) {
526 if (feature.beforeDeliver(this, commType, topic, event)) {
529 } catch (Exception e) {
530 logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(),
535 if (topic == null || topic.isEmpty()) {
536 throw new IllegalArgumentException("Invalid Topic");
540 throw new IllegalArgumentException("Invalid Event");
543 if (!this.isAlive()) {
544 throw new IllegalStateException("Policy Engine is stopped");
547 if (this.isLocked()) {
548 throw new IllegalStateException("Policy Engine is locked");
551 if (!this.topic2Sinks.containsKey(topic)) {
552 logger.warn("{}: cannot deliver event because the sink {}:{} is not registered: {}", this, commType, topic,
554 throw new IllegalArgumentException("Unsupported topic " + topic + " for delivery");
557 boolean success = this.droolsController.deliver(this.topic2Sinks.get(topic), event);
559 for (PolicyControllerFeatureAPI feature : getProviders()) {
561 if (feature.afterDeliver(this, commType, topic, event, success)) {
564 } catch (Exception e) {
565 logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(),
577 public boolean isAlive() {
585 public boolean lock() {
586 logger.info("{}: lock", this);
588 for (PolicyControllerFeatureAPI feature : getProviders()) {
590 if (feature.beforeLock(this)) {
593 } catch (Exception e) {
594 logger.error("{}: feature {} before-lock failure because of {}", this, feature.getClass().getName(),
599 synchronized (this) {
607 // it does not affect associated sources/sinks, they are
608 // autonomous entities
610 boolean success = this.droolsController.lock();
612 for (PolicyControllerFeatureAPI feature : getProviders()) {
614 if (feature.afterLock(this)) {
617 } catch (Exception e) {
618 logger.error("{}: feature {} after-lock failure because of {}", this, feature.getClass().getName(),
630 public boolean unlock() {
632 logger.info("{}: unlock", this);
634 for (PolicyControllerFeatureAPI feature : getProviders()) {
636 if (feature.beforeUnlock(this)) {
639 } catch (Exception e) {
640 logger.error("{}: feature {} before-unlock failure because of {}", this, feature.getClass().getName(),
645 synchronized (this) {
653 boolean success = this.droolsController.unlock();
655 for (PolicyControllerFeatureAPI feature : getProviders()) {
657 if (feature.afterUnlock(this)) {
660 } catch (Exception e) {
661 logger.error("{}: feature {} after-unlock failure because of {}", this, feature.getClass().getName(),
673 public boolean isLocked() {
681 public List<? extends TopicSource> getTopicSources() {
689 public List<? extends TopicSink> getTopicSinks() {
697 public DroolsController getDrools() {
698 return this.droolsController;
708 public Properties getProperties() {
709 return this.properties;
713 public String toString() {
714 return "AggregatedPolicyController [name=" + name + ", alive=" + alive
715 + ", locked=" + locked + ", droolsController=" + droolsController + "]";
718 // the following methods may be overridden by junit tests
720 protected SystemPersistence getPersistenceManager() {
721 return SystemPersistence.manager;
724 protected TopicEndpoint getEndpointManager() {
725 return TopicEndpoint.manager;
728 protected DroolsControllerFactory getDroolsFactory() {
729 return DroolsController.factory;
732 protected List<PolicyControllerFeatureAPI> getProviders() {
733 return PolicyControllerFeatureAPI.providers.getList();