2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.extension.system;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collections;
26 import java.util.HashMap;
27 import java.util.List;
29 import java.util.Properties;
30 import org.apache.commons.collections4.queue.CircularFifoQueue;
31 import org.checkerframework.checker.nullness.qual.NonNull;
32 import org.onap.policy.common.endpoints.event.comm.Topic;
33 import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
34 import org.onap.policy.common.endpoints.event.comm.TopicSink;
35 import org.onap.policy.common.endpoints.event.comm.TopicSource;
36 import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
37 import org.onap.policy.common.utils.services.OrderedServiceImpl;
38 import org.onap.policy.drools.controller.DroolsController;
39 import org.onap.policy.drools.core.PolicyContainer;
40 import org.onap.policy.drools.features.DroolsControllerFeatureApi;
41 import org.onap.policy.drools.features.DroolsControllerFeatureApiConstants;
42 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
43 import org.onap.policy.drools.protocol.coders.EventProtocolCoderConstants;
44 import org.onap.policy.drools.protocol.coders.EventProtocolParams;
45 import org.onap.policy.drools.protocol.coders.JsonProtocolFilter;
46 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
47 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
48 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
49 import org.onap.policy.drools.system.internal.AggregatedPolicyController;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
54 * This class combines the 'PolicyController' and 'DroolsController'
55 * interfaces, and provides a controller that does not have Drools running
56 * underneath. It also contains some code copied from 'MavenDroolsController'
57 * and 'NullDroolsController'. The goal is to have it look like other
58 * controllers, use the same style property file, and provide access to
59 * UEB/DMAAP message streams associated with the controller.
61 public class NonDroolsPolicyController extends AggregatedPolicyController implements DroolsController {
65 private static final Logger logger = LoggerFactory.getLogger(NonDroolsPolicyController.class);
68 * The PolicyController and DroolsController factories assume that the
69 * controllers are separate objects, but in this case, the same object
70 * is used for both. We want the DroolsController 'build' method to
71 * return the same object; however, at the point the DroolsController
72 * build is taking place, the PolicyController hasn't yet been placed
73 * in any tables. The following variable is used to pass this information
74 * from one stack frame to another within the same thread.
76 private static ThreadLocal<NonDroolsPolicyController> buildInProgress = new ThreadLocal<>();
79 * alive status of this drools controller,
80 * reflects invocation of start()/stop() only.
82 protected volatile boolean alive = false;
85 * locked status of this drools controller,
86 * reflects if i/o drools related operations are permitted,
87 * more specifically: offer() and deliver().
88 * It does not affect the ability to start and stop
89 * underlying drools infrastructure
91 protected volatile boolean locked = false;
94 * list of topics, each with associated decoder classes, each
95 * with a list of associated filters.
97 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
100 * list of topics, each with associated encoder classes, each
101 * with a list of associated filters.
103 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
106 * recent sink events processed.
108 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<>(10);
110 // this is used to avoid infinite recursion in a shutdown or halt operation
111 private boolean shutdownInProgress = false;
113 private static Properties convert(String name, Properties properties) {
115 Properties newProperties = new Properties();
116 for (String pname : properties.stringPropertyNames()) {
117 newProperties.setProperty(pname, properties.getProperty(pname));
120 newProperties.setProperty("rules.groupId", "NonDroolsPolicyController");
121 newProperties.setProperty("rules.artifactId", name);
122 newProperties.setProperty("rules.version", "1.0");
123 return newProperties;
127 * constructor -- pass parameters to superclass.
128 * @param name controller name
129 * @param properties contents of controller properties file
131 public NonDroolsPolicyController(String name, Properties properties) {
132 super(name, convert(name, properties));
136 * This is used to pass the 'NonDroolsPolicyController' object to the
137 * 'DroolsPolicyBuilder' object, as the same object is used for both
138 * 'PolicyController' and 'DroolsController'.
140 * @return the NonDroolsPolicyController object ('null' if not available)
142 public static NonDroolsPolicyController getBuildInProgress() {
143 return buildInProgress.get();
146 protected void initDrools(Properties properties) {
148 // Register with drools factory
149 buildInProgress.set(this);
150 this.droolsController.set(getDroolsFactory().build(properties, sources, sinks));
151 buildInProgress.set(null);
152 } catch (Exception | LinkageError e) {
153 logger.error("{}: cannot init-drools", this);
154 throw new IllegalArgumentException(e);
157 decoderConfigurations = codersAndFilters(properties, sources);
158 encoderConfigurations = codersAndFilters(properties, sinks);
160 // add to 'EventProtocolCoderConstants.getManager()' table
161 for (TopicCoderFilterConfiguration tcfc : decoderConfigurations) {
162 for (PotentialCoderFilter pcf : tcfc.getCoderFilters()) {
163 getCoderManager().addDecoder(
164 EventProtocolParams.builder()
165 .groupId(getGroupId())
166 .artifactId(getArtifactId())
167 .topic(tcfc.getTopic())
168 .eventClass(pcf.getCodedClass())
169 .protocolFilter(pcf.getFilter())
170 .customGsonCoder(tcfc.getCustomGsonCoder())
171 .modelClassLoaderHash(NonDroolsPolicyController.class.getClassLoader().hashCode()));
174 for (TopicCoderFilterConfiguration tcfc : encoderConfigurations) {
175 for (PotentialCoderFilter pcf : tcfc.getCoderFilters()) {
176 getCoderManager().addEncoder(
177 EventProtocolParams.builder()
178 .groupId(getGroupId())
179 .artifactId(getArtifactId())
180 .topic(tcfc.getTopic())
181 .eventClass(pcf.getCodedClass())
182 .protocolFilter(pcf.getFilter())
183 .customGsonCoder(tcfc.getCustomGsonCoder())
184 .modelClassLoaderHash(NonDroolsPolicyController.class.getClassLoader().hashCode()));
189 /*==============================*/
190 /* 'DroolsController' interface */
191 /*==============================*/
193 // methods copied from 'MavenDroolsController' and 'NullDroolsController'
196 public boolean start() {
198 logger.info("START: {}", this);
200 synchronized (this) {
211 public boolean stop() {
213 logger.info("STOP: {}", this);
215 synchronized (this) {
226 public void shutdown() {
227 if (shutdownInProgress) {
228 // avoid infinite recursion
231 logger.info("{}: SHUTDOWN", this);
236 shutdownInProgress = true;
238 // the following method calls 'this.shutdown' recursively
239 getDroolsFactory().shutdown(this);
240 } catch (Exception e) {
241 logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e);
243 shutdownInProgress = false;
249 if (shutdownInProgress) {
250 // avoid infinite recursion
253 logger.info("{}: HALT", this);
258 shutdownInProgress = true;
260 // the following method calls 'this.halt' recursively
261 getDroolsFactory().destroy(this);
262 } catch (Exception e) {
263 logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e);
265 shutdownInProgress = false;
270 public boolean isAlive() {
275 public boolean lock() {
276 logger.info("LOCK: {}", this);
283 public boolean unlock() {
284 logger.info("UNLOCK: {}", this);
291 public boolean isLocked() {
296 public String getGroupId() {
297 return "NonDroolsPolicyController";
301 public String getArtifactId() {
306 public String getVersion() {
311 public List<String> getSessionNames() {
312 return new ArrayList<>();
316 public List<String> getCanonicalSessionNames() {
317 return new ArrayList<>();
321 public List<String> getBaseDomainNames() {
322 return Collections.emptyList();
326 public boolean offer(String topic, String event) {
331 public <T> boolean offer(T event) {
336 public boolean deliver(TopicSink sink, Object event) {
338 // this one is from 'MavenDroolsController'
340 logger.info("{} DELIVER: {} FROM {} TO {}", this, event, this, sink);
342 for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
344 if (feature.beforeDeliver(this, sink, event)) {
347 } catch (Exception e) {
348 logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(),
354 throw new IllegalArgumentException(this + " invalid sink");
358 throw new IllegalArgumentException(this + " invalid event");
362 throw new IllegalStateException(this + " is locked");
366 throw new IllegalStateException(this + " is stopped");
370 getCoderManager().encode(sink.getTopic(), event, this);
372 synchronized (this.recentSinkEvents) {
373 this.recentSinkEvents.add(json);
376 boolean success = sink.send(json);
378 for (DroolsControllerFeatureApi feature : getDroolsProviders().getList()) {
380 if (feature.afterDeliver(this, sink, event, json, success)) {
383 } catch (Exception e) {
384 logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(),
394 public Object[] getRecentSourceEvents() {
395 return new String[0];
399 public PolicyContainer getContainer() {
404 public String[] getRecentSinkEvents() {
405 synchronized (this.recentSinkEvents) {
406 String[] events = new String[recentSinkEvents.size()];
407 return recentSinkEvents.toArray(events);
412 public boolean ownsCoder(Class<?> coderClass, int modelHash) {
413 //throw new IllegalStateException(makeInvokeMsg());
418 public Class<?> fetchModelClass(String className) {
420 return Class.forName(className);
421 } catch (ClassNotFoundException e) {
422 throw new IllegalArgumentException(makeInvokeMsg());
427 public boolean isBrained() {
432 public String toString() {
433 StringBuilder builder = new StringBuilder();
434 builder.append("NonDroolsPolicyController []");
435 return builder.toString();
439 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
440 List<TopicCoderFilterConfiguration> decoderConfigurations,
441 List<TopicCoderFilterConfiguration> encoderConfigurations)
442 throws LinkageError {
443 throw new IllegalStateException(makeInvokeMsg());
447 public Map<String, Integer> factClassNames(String sessionName) {
448 return new HashMap<>();
452 public long factCount(String sessionName) {
457 public List<Object> facts(String sessionName, String className, boolean delete) {
458 return new ArrayList<>();
462 public <T> List<T> facts(@NonNull String sessionName, @NonNull Class<T> clazz) {
463 return new ArrayList<>();
467 public List<Object> factQuery(String sessionName, String queryName,
468 String queriedEntity,
469 boolean delete, Object... queryParams) {
470 return new ArrayList<>();
474 public <T> boolean delete(@NonNull String sessionName, @NonNull T fact) {
479 public <T> boolean delete(@NonNull T fact) {
484 public <T> boolean delete(@NonNull String sessionName, @NonNull Class<T> fact) {
489 public <T> boolean delete(@NonNull Class<T> fact) {
493 private String makeInvokeMsg() {
494 return this.getClass().getName() + " invoked";
500 protected void removeDecoders() {
501 logger.info("REMOVE-DECODERS: {}", this);
503 if (this.decoderConfigurations == null) {
508 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
509 String topic = coderConfig.getTopic();
510 getCoderManager().removeDecoders(this.getGroupId(), this.getArtifactId(), topic);
517 protected void removeEncoders() {
519 logger.info("REMOVE-ENCODERS: {}", this);
521 if (this.encoderConfigurations == null) {
525 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
526 String topic = coderConfig.getTopic();
527 getCoderManager().removeEncoders(this.getGroupId(), this.getArtifactId(), topic);
532 * removes this drools controllers and encoders and decoders from operation.
534 protected void removeCoders() {
535 logger.info("{}: REMOVE-CODERS", this);
538 this.removeDecoders();
539 } catch (IllegalArgumentException e) {
540 logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e);
544 this.removeEncoders();
545 } catch (IllegalArgumentException e) {
546 logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e);
550 protected List<TopicCoderFilterConfiguration> codersAndFilters(Properties properties,
551 List<? extends Topic> topicEntities) {
553 List<TopicCoderFilterConfiguration> topics2DecodedClasses2Filters = new ArrayList<>();
555 if (topicEntities == null || topicEntities.isEmpty()) {
556 return topics2DecodedClasses2Filters;
559 for (Topic topic : topicEntities) {
561 // 1. first the topic
563 String firstTopic = topic.getTopic();
565 String propertyTopicEntityPrefix = getPropertyTopicPrefix(topic) + firstTopic;
567 // 2. check if there is a custom decoder for this topic that the user prefers to use
568 // instead of the ones provided in the platform
570 CustomGsonCoder customGsonCoder = getCustomCoder(properties, propertyTopicEntityPrefix);
572 // 3. second the list of classes associated with each topic
574 String eventClasses = properties
575 .getProperty(propertyTopicEntityPrefix + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX);
577 if (eventClasses == null || eventClasses.isEmpty()) {
578 logger.warn("There are no event classes for topic {}", firstTopic);
582 List<PotentialCoderFilter> classes2Filters =
583 getFilterExpressions(properties, propertyTopicEntityPrefix, eventClasses);
585 TopicCoderFilterConfiguration topic2Classes2Filters =
586 new TopicCoderFilterConfiguration(firstTopic, classes2Filters, customGsonCoder);
587 topics2DecodedClasses2Filters.add(topic2Classes2Filters);
590 return topics2DecodedClasses2Filters;
593 private String getPropertyTopicPrefix(Topic topic) {
594 boolean isSource = topic instanceof TopicSource;
595 CommInfrastructure commInfra = topic.getTopicCommInfrastructure();
596 if (commInfra == CommInfrastructure.UEB) {
598 return PolicyEndPointProperties.PROPERTY_UEB_SOURCE_TOPICS + ".";
600 return PolicyEndPointProperties.PROPERTY_UEB_SINK_TOPICS + ".";
602 } else if (commInfra == CommInfrastructure.DMAAP) {
604 return PolicyEndPointProperties.PROPERTY_DMAAP_SOURCE_TOPICS + ".";
606 return PolicyEndPointProperties.PROPERTY_DMAAP_SINK_TOPICS + ".";
608 } else if (commInfra == CommInfrastructure.NOOP) {
610 return PolicyEndPointProperties.PROPERTY_NOOP_SOURCE_TOPICS + ".";
612 return PolicyEndPointProperties.PROPERTY_NOOP_SINK_TOPICS + ".";
615 throw new IllegalArgumentException("Invalid Communication Infrastructure: " + commInfra);
619 private CustomGsonCoder getCustomCoder(Properties properties, String propertyPrefix) {
620 String customGson = properties.getProperty(propertyPrefix
621 + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_CUSTOM_MODEL_CODER_GSON_SUFFIX);
623 CustomGsonCoder customGsonCoder = null;
624 if (customGson != null && !customGson.isEmpty()) {
626 customGsonCoder = new CustomGsonCoder(customGson);
627 } catch (IllegalArgumentException e) {
628 logger.warn("{}: cannot create custom-gson-coder {} because of {}", this, customGson,
632 return customGsonCoder;
635 private List<PotentialCoderFilter> getFilterExpressions(Properties properties, String propertyPrefix,
636 String eventClasses) {
638 List<PotentialCoderFilter> classes2Filters = new ArrayList<>();
640 List<String> topicClasses = new ArrayList<>(Arrays.asList(eventClasses.split("\\s*,\\s*")));
642 for (String theClass : topicClasses) {
644 // 4. for each coder class, get the filter expression
646 String filter = properties
647 .getProperty(propertyPrefix
648 + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_SUFFIX
649 + "." + theClass + PolicyEndPointProperties.PROPERTY_TOPIC_EVENTS_FILTER_SUFFIX);
651 JsonProtocolFilter protocolFilter = new JsonProtocolFilter(filter);
652 PotentialCoderFilter class2Filters = new PotentialCoderFilter(theClass, protocolFilter);
653 classes2Filters.add(class2Filters);
656 return classes2Filters;
659 // these may be overridden by junit tests
661 protected EventProtocolCoder getCoderManager() {
662 return EventProtocolCoderConstants.getManager();
665 protected OrderedServiceImpl<DroolsControllerFeatureApi> getDroolsProviders() {
666 return DroolsControllerFeatureApiConstants.getProviders();