2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.controller.internal;
23 import com.fasterxml.jackson.annotation.JsonIgnore;
24 import com.fasterxml.jackson.annotation.JsonProperty;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.List;
30 import org.apache.commons.collections4.queue.CircularFifoQueue;
31 import org.drools.core.ClassObjectFilter;
32 import org.kie.api.definition.KiePackage;
33 import org.kie.api.definition.rule.Query;
34 import org.kie.api.runtime.KieSession;
35 import org.kie.api.runtime.rule.FactHandle;
36 import org.kie.api.runtime.rule.QueryResults;
37 import org.kie.api.runtime.rule.QueryResultsRow;
38 import org.onap.policy.common.endpoints.event.comm.TopicSink;
39 import org.onap.policy.drools.controller.DroolsController;
40 import org.onap.policy.drools.core.PolicyContainer;
41 import org.onap.policy.drools.core.PolicySession;
42 import org.onap.policy.drools.core.jmx.PdpJmx;
43 import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
44 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
45 import org.onap.policy.drools.protocol.coders.EventProtocolParams;
46 import org.onap.policy.drools.protocol.coders.JsonProtocolFilter;
47 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
48 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
49 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder;
50 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
51 import org.onap.policy.drools.utils.ReflectionUtil;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
56 * Maven-based Drools Controller that interacts with the
57 * policy-core PolicyContainer and PolicySession to manage
58 * Drools containers instantiated using Maven.
60 public class MavenDroolsController implements DroolsController {
65 private static Logger logger = LoggerFactory.getLogger(MavenDroolsController.class);
68 * Policy Container, the access object to the policy-core layer.
71 protected final PolicyContainer policyContainer;
74 * alive status of this drools controller,
75 * reflects invocation of start()/stop() only.
77 protected volatile boolean alive = false;
80 * locked status of this drools controller,
81 * reflects if i/o drools related operations are permitted,
82 * more specifically: offer() and deliver().
83 * It does not affect the ability to start and stop
84 * underlying drools infrastructure
86 protected volatile boolean locked = false;
89 * list of topics, each with associated decoder classes, each
90 * with a list of associated filters.
92 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
95 * list of topics, each with associated encoder classes, each
96 * with a list of associated filters.
98 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
101 * recent source events processed.
103 protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<>(10);
106 * recent sink events processed.
108 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<>(10);
111 * original Drools Model/Rules classloader hash.
113 protected int modelClassLoaderHash;
116 * Expanded version of the constructor.
118 * @param groupId maven group id
119 * @param artifactId maven artifact id
120 * @param version maven version
121 * @param decoderConfigurations list of topic -> decoders -> filters mapping
122 * @param encoderConfigurations list of topic -> encoders -> filters mapping
124 * @throws IllegalArgumentException invalid arguments passed in
126 public MavenDroolsController(String groupId,
129 List<TopicCoderFilterConfiguration> decoderConfigurations,
130 List<TopicCoderFilterConfiguration> encoderConfigurations) {
132 logger.info("drools-controller instantiation [{}:{}:{}]", groupId, artifactId, version);
134 if (groupId == null || groupId.isEmpty()) {
135 throw new IllegalArgumentException("Missing maven group-id coordinate");
138 if (artifactId == null || artifactId.isEmpty()) {
139 throw new IllegalArgumentException("Missing maven artifact-id coordinate");
142 if (version == null || version.isEmpty()) {
143 throw new IllegalArgumentException("Missing maven version coordinate");
146 this.policyContainer = new PolicyContainer(groupId, artifactId, version);
147 this.init(decoderConfigurations, encoderConfigurations);
149 logger.debug("{}: instantiation completed ", this);
153 * init encoding/decoding configuration.
155 * @param decoderConfigurations list of topic -> decoders -> filters mapping
156 * @param encoderConfigurations list of topic -> encoders -> filters mapping
158 protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
159 List<TopicCoderFilterConfiguration> encoderConfigurations) {
161 this.decoderConfigurations = decoderConfigurations;
162 this.encoderConfigurations = encoderConfigurations;
164 this.initCoders(decoderConfigurations, true);
165 this.initCoders(encoderConfigurations, false);
167 this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
171 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
172 List<TopicCoderFilterConfiguration> decoderConfigurations,
173 List<TopicCoderFilterConfiguration> encoderConfigurations)
174 throws LinkageError {
176 logger.info("updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion);
178 if (newGroupId == null || newGroupId.isEmpty()) {
179 throw new IllegalArgumentException("Missing maven group-id coordinate");
182 if (newArtifactId == null || newArtifactId.isEmpty()) {
183 throw new IllegalArgumentException("Missing maven artifact-id coordinate");
186 if (newVersion == null || newVersion.isEmpty()) {
187 throw new IllegalArgumentException("Missing maven version coordinate");
190 if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID)
191 || newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID)
192 || newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
193 throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
194 + newGroupId + ":" + newArtifactId + ":"
198 if (newGroupId.equalsIgnoreCase(this.getGroupId())
199 && newArtifactId.equalsIgnoreCase(this.getArtifactId())
200 && newVersion.equalsIgnoreCase(this.getVersion())) {
201 logger.warn("Al in the right version: " + newGroupId + ":"
202 + newArtifactId + ":" + newVersion + " vs. " + this);
206 if (!newGroupId.equalsIgnoreCase(this.getGroupId())
207 || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
208 throw new IllegalArgumentException(
209 "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
210 + newGroupId + ":" + newArtifactId + ":"
211 + newVersion + " vs. " + this);
215 String messages = this.policyContainer.updateToVersion(newVersion);
216 if (logger.isWarnEnabled()) {
217 logger.warn("{} UPGRADE results: {}", this, messages);
221 * If all sucessful (can load new container), now we can remove all coders from previous sessions
228 this.init(decoderConfigurations, encoderConfigurations);
230 if (logger.isInfoEnabled()) {
231 logger.info("UPDATE-TO-VERSION: completed " + this);
236 * initialize decoders for all the topics supported by this controller
237 * Note this is critical to be done after the Policy Container is
238 * instantiated to be able to fetch the corresponding classes.
240 * @param coderConfigurations list of topic -> decoders -> filters mapping
242 protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
245 if (logger.isInfoEnabled()) {
246 logger.info("INIT-CODERS: " + this);
249 if (coderConfigurations == null) {
254 for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
255 String topic = coderConfig.getTopic();
257 CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
258 if (coderConfig.getCustomGsonCoder() != null
259 && coderConfig.getCustomGsonCoder().getClassContainer() != null
260 && !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
262 String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
263 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
264 customGsonCoderClass)) {
265 throw makeRetrieveEx(customGsonCoderClass);
267 if (logger.isInfoEnabled()) {
268 logClassFetched(customGsonCoderClass);
273 CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder();
274 if (coderConfig.getCustomJacksonCoder() != null
275 && coderConfig.getCustomJacksonCoder().getClassContainer() != null
276 && !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) {
278 String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer();
279 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
280 customJacksonCoderClass)) {
281 throw makeRetrieveEx(customJacksonCoderClass);
283 if (logger.isInfoEnabled()) {
284 logClassFetched(customJacksonCoderClass);
289 List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
290 if (coderFilters == null || coderFilters.isEmpty()) {
294 for (PotentialCoderFilter coderFilter : coderFilters) {
295 String potentialCodedClass = coderFilter.getCodedClass();
296 JsonProtocolFilter protocolFilter = coderFilter.getFilter();
298 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
299 potentialCodedClass)) {
300 throw makeRetrieveEx(potentialCodedClass);
302 if (logger.isInfoEnabled()) {
303 logClassFetched(potentialCodedClass);
308 EventProtocolCoder.manager.addDecoder(EventProtocolParams.builder()
309 .groupId(this.getGroupId())
310 .artifactId(this.getArtifactId())
312 .eventClass(potentialCodedClass)
313 .protocolFilter(protocolFilter)
314 .customGsonCoder(customGsonCoder)
315 .customJacksonCoder(customJacksonCoder)
316 .modelClassLoaderHash(this.policyContainer.getClassLoader().hashCode()));
318 EventProtocolCoder.manager.addEncoder(
319 EventProtocolParams.builder().groupId(this.getGroupId())
320 .artifactId(this.getArtifactId()).topic(topic)
321 .eventClass(potentialCodedClass).protocolFilter(protocolFilter)
322 .customGsonCoder(customGsonCoder).customJacksonCoder(customJacksonCoder)
323 .modelClassLoaderHash(this.policyContainer.getClassLoader().hashCode()));
330 * Logs an error and makes an exception for an item that cannot be retrieved.
331 * @param itemName the item to retrieve
332 * @return a new exception
334 private IllegalArgumentException makeRetrieveEx(String itemName) {
335 logger.error("{} cannot be retrieved", itemName);
336 return new IllegalArgumentException(itemName + " cannot be retrieved");
340 * Logs the name of the class that was fetched.
341 * @param className class name fetched
343 private void logClassFetched(String className) {
344 logger.info("CLASS FETCHED {}", className);
351 protected void removeDecoders() {
352 if (logger.isInfoEnabled()) {
353 logger.info("REMOVE-DECODERS: {}", this);
356 if (this.decoderConfigurations == null) {
361 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
362 String topic = coderConfig.getTopic();
363 EventProtocolCoder.manager.removeDecoders(this.getGroupId(), this.getArtifactId(), topic);
370 protected void removeEncoders() {
372 if (logger.isInfoEnabled()) {
373 logger.info("REMOVE-ENCODERS: {}", this);
376 if (this.encoderConfigurations == null) {
380 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
381 String topic = coderConfig.getTopic();
382 EventProtocolCoder.manager.removeEncoders(this.getGroupId(), this.getArtifactId(), topic);
388 public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) {
389 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
390 logger.error("{}{} cannot be retrieved. ", this, coderClass.getCanonicalName());
394 if (modelHash == this.modelClassLoaderHash) {
395 if (logger.isInfoEnabled()) {
396 logger.info(coderClass.getCanonicalName()
397 + this + " class loader matches original drools controller rules classloader "
398 + coderClass.getClassLoader());
402 if (logger.isWarnEnabled()) {
403 logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match "
404 + coderClass.getClassLoader() + " vs "
405 + this.policyContainer.getClassLoader());
412 public boolean start() {
414 if (logger.isInfoEnabled()) {
415 logger.info("START: {}", this);
418 synchronized (this) {
425 return this.policyContainer.start();
429 public boolean stop() {
431 logger.info("STOP: {}", this);
433 synchronized (this) {
440 return this.policyContainer.stop();
444 public void shutdown() {
445 logger.info("{}: SHUTDOWN", this);
450 } catch (Exception e) {
451 logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e);
453 this.policyContainer.shutdown();
460 logger.info("{}: HALT", this);
465 } catch (Exception e) {
466 logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e);
468 this.policyContainer.destroy();
473 * removes this drools controllers and encoders and decoders from operation.
475 protected void removeCoders() {
476 logger.info("{}: REMOVE-CODERS", this);
479 this.removeDecoders();
480 } catch (IllegalArgumentException e) {
481 logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e);
485 this.removeEncoders();
486 } catch (IllegalArgumentException e) {
487 logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e);
492 public boolean isAlive() {
497 public boolean offer(String topic, String event) {
498 logger.debug("{}: OFFER: {} <- {}", this, topic, event);
507 // 0. Check if the policy container has any sessions
509 if (this.policyContainer.getPolicySessions().isEmpty()) {
514 // 1. Now, check if this topic has a decoder:
516 if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
517 this.getArtifactId(),
520 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", this,
521 topic, this.getGroupId(), this.getArtifactId());
529 anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
530 this.getArtifactId(),
533 } catch (UnsupportedOperationException uoe) {
534 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
535 event, uoe.getMessage(), uoe);
537 } catch (Exception e) {
538 logger.warn("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
539 event, e.getMessage(), e);
543 synchronized (this.recentSourceEvents) {
544 this.recentSourceEvents.add(anEvent);
547 // increment event count for Nagios monitoring
548 PdpJmx.getInstance().updateOccured();
552 if (logger.isInfoEnabled()) {
553 logger.info("{} BROADCAST-INJECT of {} FROM {} INTO {}",
554 this, event, topic, this.policyContainer.getName());
557 for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) {
559 if (feature.beforeInsert(this, anEvent)) {
562 } catch (Exception e) {
563 logger.error("{}: feature {} before-insert failure because of {}",
564 this, feature.getClass().getName(), e.getMessage(), e);
568 boolean successInject = this.policyContainer.insertAll(anEvent);
569 if (!successInject) {
570 logger.warn(this + "Failed to inject into PolicyContainer {}", this.getSessionNames());
573 for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) {
575 if (feature.afterInsert(this, anEvent, successInject)) {
578 } catch (Exception e) {
579 logger.error("{}: feature {} after-insert failure because of {}",
580 this, feature.getClass().getName(), e.getMessage(), e);
588 public boolean deliver(TopicSink sink, Object event) {
590 if (logger.isInfoEnabled()) {
591 logger.info("{}DELIVER: {} FROM {} TO {}", this, event, this, sink);
595 throw new IllegalArgumentException(this + " invalid sink");
599 throw new IllegalArgumentException(this + " invalid event");
603 throw new IllegalStateException(this + " is locked");
607 throw new IllegalStateException(this + " is stopped");
611 EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
613 synchronized (this.recentSinkEvents) {
614 this.recentSinkEvents.add(json);
617 return sink.send(json);
622 public String getVersion() {
623 return this.policyContainer.getVersion();
627 public String getArtifactId() {
628 return this.policyContainer.getArtifactId();
632 public String getGroupId() {
633 return this.policyContainer.getGroupId();
637 * Get model class loader hash.
639 * @return the modelClassLoaderHash
641 public int getModelClassLoaderHash() {
642 return modelClassLoaderHash;
646 public synchronized boolean lock() {
647 logger.info("LOCK: {}", this);
654 public synchronized boolean unlock() {
655 logger.info("UNLOCK: {}", this);
662 public boolean isLocked() {
668 public PolicyContainer getContainer() {
669 return this.policyContainer;
672 @JsonProperty("sessions")
674 public List<String> getSessionNames() {
675 return getSessionNames(true);
681 * @param abbreviated true for the short form, otherwise the long form
682 * @return session names
684 protected List<String> getSessionNames(boolean abbreviated) {
685 List<String> sessionNames = new ArrayList<>();
687 for (PolicySession session: this.policyContainer.getPolicySessions()) {
689 sessionNames.add(session.getName());
691 sessionNames.add(session.getFullName());
694 } catch (Exception e) {
695 logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e);
696 sessionNames.add(e.getMessage());
701 @JsonProperty("sessionCoordinates")
703 public List<String> getCanonicalSessionNames() {
704 return getSessionNames(false);
708 * provides the underlying core layer container sessions.
710 * @return the attached Policy Container
712 protected List<PolicySession> getSessions() {
713 List<PolicySession> sessions = new ArrayList<>();
714 sessions.addAll(this.policyContainer.getPolicySessions());
719 * provides the underlying core layer container session with name sessionName.
721 * @param sessionName session name
722 * @return the attached Policy Container
723 * @throws IllegalArgumentException when an invalid session name is provided
724 * @throws IllegalStateException when the drools controller is in an invalid state
726 protected PolicySession getSession(String sessionName) {
727 if (sessionName == null || sessionName.isEmpty()) {
728 throw new IllegalArgumentException("A Session Name must be provided");
731 List<PolicySession> sessions = this.getSessions();
732 for (PolicySession session : sessions) {
733 if (sessionName.equals(session.getName()) || sessionName.equals(session.getFullName())) {
738 throw invalidSessNameEx(sessionName);
741 private IllegalArgumentException invalidSessNameEx(String sessionName) {
742 return new IllegalArgumentException("Invalid Session Name: " + sessionName);
746 public Map<String,Integer> factClassNames(String sessionName) {
747 if (sessionName == null || sessionName.isEmpty()) {
748 throw invalidSessNameEx(sessionName);
751 Map<String,Integer> classNames = new HashMap<>();
753 PolicySession session = getSession(sessionName);
754 KieSession kieSession = session.getKieSession();
756 Collection<FactHandle> facts = session.getKieSession().getFactHandles();
757 for (FactHandle fact : facts) {
759 String className = kieSession.getObject(fact).getClass().getName();
760 if (classNames.containsKey(className)) {
761 classNames.put(className, classNames.get(className) + 1);
763 classNames.put(className, 1);
765 } catch (Exception e) {
766 logger.warn("Object cannot be retrieved from fact {}", fact, e);
774 public long factCount(String sessionName) {
775 if (sessionName == null || sessionName.isEmpty()) {
776 throw invalidSessNameEx(sessionName);
779 PolicySession session = getSession(sessionName);
780 return session.getKieSession().getFactCount();
784 public List<Object> facts(String sessionName, String className, boolean delete) {
785 if (sessionName == null || sessionName.isEmpty()) {
786 throw invalidSessNameEx(sessionName);
789 if (className == null || className.isEmpty()) {
790 throw new IllegalArgumentException("Invalid Class Name: " + className);
794 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
795 if (factClass == null) {
796 throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className);
799 PolicySession session = getSession(sessionName);
800 KieSession kieSession = session.getKieSession();
802 List<Object> factObjects = new ArrayList<>();
804 Collection<FactHandle> factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass));
805 for (FactHandle factHandle : factHandles) {
807 factObjects.add(kieSession.getObject(factHandle));
809 kieSession.delete(factHandle);
811 } catch (Exception e) {
812 logger.warn("Object cannot be retrieved from fact {}", factHandle, e);
820 public List<Object> factQuery(String sessionName, String queryName, String queriedEntity,
821 boolean delete, Object... queryParams) {
822 if (sessionName == null || sessionName.isEmpty()) {
823 throw invalidSessNameEx(sessionName);
826 if (queryName == null || queryName.isEmpty()) {
827 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
830 if (queriedEntity == null || queriedEntity.isEmpty()) {
831 throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity);
834 PolicySession session = getSession(sessionName);
835 KieSession kieSession = session.getKieSession();
837 boolean found = false;
838 for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
839 for (Query q : kiePackage.getQueries()) {
840 if (q.getName() != null && q.getName().equals(queryName)) {
847 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
850 List<Object> factObjects = new ArrayList<>();
852 QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams);
853 for (QueryResultsRow row : queryResults) {
855 factObjects.add(row.get(queriedEntity));
857 kieSession.delete(row.getFactHandle(queriedEntity));
859 } catch (Exception e) {
860 logger.warn("Object cannot be retrieved from row: {}", row, e);
868 public Class<?> fetchModelClass(String className) {
869 return ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
873 * Get recent source events.
875 * @return the recentSourceEvents
878 public Object[] getRecentSourceEvents() {
879 synchronized (this.recentSourceEvents) {
880 Object[] events = new Object[recentSourceEvents.size()];
881 return recentSourceEvents.toArray(events);
886 * Get recent sink events.
888 * @return the recentSinkEvents
891 public String[] getRecentSinkEvents() {
892 synchronized (this.recentSinkEvents) {
893 String[] events = new String[recentSinkEvents.size()];
894 return recentSinkEvents.toArray(events);
899 public boolean isBrained() {
905 public String toString() {
906 StringBuilder builder = new StringBuilder();
908 .append("MavenDroolsController [policyContainer=")
909 .append((policyContainer != null) ? policyContainer.getName() : "NULL")
914 .append(", modelClassLoaderHash=")
915 .append(modelClassLoaderHash)
917 return builder.toString();