2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2019 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.controller.internal;
23 import com.fasterxml.jackson.annotation.JsonIgnore;
24 import com.fasterxml.jackson.annotation.JsonProperty;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.List;
30 import org.apache.commons.collections4.queue.CircularFifoQueue;
31 import org.drools.core.ClassObjectFilter;
32 import org.kie.api.definition.KiePackage;
33 import org.kie.api.definition.rule.Query;
34 import org.kie.api.runtime.KieSession;
35 import org.kie.api.runtime.rule.FactHandle;
36 import org.kie.api.runtime.rule.QueryResults;
37 import org.kie.api.runtime.rule.QueryResultsRow;
38 import org.onap.policy.common.endpoints.event.comm.TopicSink;
39 import org.onap.policy.common.gson.annotation.GsonJsonIgnore;
40 import org.onap.policy.common.gson.annotation.GsonJsonProperty;
41 import org.onap.policy.drools.controller.DroolsController;
42 import org.onap.policy.drools.core.PolicyContainer;
43 import org.onap.policy.drools.core.PolicySession;
44 import org.onap.policy.drools.core.jmx.PdpJmx;
45 import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
46 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
47 import org.onap.policy.drools.protocol.coders.EventProtocolParams;
48 import org.onap.policy.drools.protocol.coders.JsonProtocolFilter;
49 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
50 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
51 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
52 import org.onap.policy.drools.utils.ReflectionUtil;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
57 * Maven-based Drools Controller that interacts with the
58 * policy-core PolicyContainer and PolicySession to manage
59 * Drools containers instantiated using Maven.
61 public class MavenDroolsController implements DroolsController {
66 private static Logger logger = LoggerFactory.getLogger(MavenDroolsController.class);
69 * Policy Container, the access object to the policy-core layer.
73 protected final PolicyContainer policyContainer;
76 * alive status of this drools controller,
77 * reflects invocation of start()/stop() only.
79 protected volatile boolean alive = false;
82 * locked status of this drools controller,
83 * reflects if i/o drools related operations are permitted,
84 * more specifically: offer() and deliver().
85 * It does not affect the ability to start and stop
86 * underlying drools infrastructure
88 protected volatile boolean locked = false;
91 * list of topics, each with associated decoder classes, each
92 * with a list of associated filters.
94 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
97 * list of topics, each with associated encoder classes, each
98 * with a list of associated filters.
100 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
103 * recent source events processed.
105 protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<>(10);
108 * recent sink events processed.
110 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<>(10);
113 * original Drools Model/Rules classloader hash.
115 protected int modelClassLoaderHash;
118 * Expanded version of the constructor.
120 * @param groupId maven group id
121 * @param artifactId maven artifact id
122 * @param version maven version
123 * @param decoderConfigurations list of topic -> decoders -> filters mapping
124 * @param encoderConfigurations list of topic -> encoders -> filters mapping
126 * @throws IllegalArgumentException invalid arguments passed in
128 public MavenDroolsController(String groupId,
131 List<TopicCoderFilterConfiguration> decoderConfigurations,
132 List<TopicCoderFilterConfiguration> encoderConfigurations) {
134 logger.info("drools-controller instantiation [{}:{}:{}]", groupId, artifactId, version);
136 if (groupId == null || groupId.isEmpty()) {
137 throw new IllegalArgumentException("Missing maven group-id coordinate");
140 if (artifactId == null || artifactId.isEmpty()) {
141 throw new IllegalArgumentException("Missing maven artifact-id coordinate");
144 if (version == null || version.isEmpty()) {
145 throw new IllegalArgumentException("Missing maven version coordinate");
148 this.policyContainer = new PolicyContainer(groupId, artifactId, version);
149 this.init(decoderConfigurations, encoderConfigurations);
151 logger.debug("{}: instantiation completed ", this);
155 * init encoding/decoding configuration.
157 * @param decoderConfigurations list of topic -> decoders -> filters mapping
158 * @param encoderConfigurations list of topic -> encoders -> filters mapping
160 protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
161 List<TopicCoderFilterConfiguration> encoderConfigurations) {
163 this.decoderConfigurations = decoderConfigurations;
164 this.encoderConfigurations = encoderConfigurations;
166 this.initCoders(decoderConfigurations, true);
167 this.initCoders(encoderConfigurations, false);
169 this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
// Upgrades this controller to a new maven version of the same group/artifact.
// Visible steps: log the request, reject null/empty coordinates, reject the
// DroolsController.NO_GROUP_ID / NO_ARTIFACT_ID / NO_VERSION placeholders, warn when the
// requested coordinates equal the current ones, reject a change of group or artifact id,
// ask the policy container to update to newVersion, then re-run init() with the new
// coder configurations.
// NOTE(review): several statements of this method are not present in this copy of the
// file (end of the "BRAINLESS" concatenation, whatever follows the "right version"
// warning, and the coder-removal step announced before init()) - confirm against the
// authoritative source before changing any logic here.
173 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
174 List<TopicCoderFilterConfiguration> decoderConfigurations,
175 List<TopicCoderFilterConfiguration> encoderConfigurations)
176 throws LinkageError {
178 logger.info("updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion);
// All three coordinates are mandatory.
180 if (newGroupId == null || newGroupId.isEmpty()) {
181 throw new IllegalArgumentException("Missing maven group-id coordinate");
184 if (newArtifactId == null || newArtifactId.isEmpty()) {
185 throw new IllegalArgumentException("Missing maven artifact-id coordinate");
188 if (newVersion == null || newVersion.isEmpty()) {
189 throw new IllegalArgumentException("Missing maven version coordinate");
// Placeholder ("no-...") coordinates are never a valid upgrade target.
192 if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID)
193 || newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID)
194 || newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
195 throw new IllegalArgumentException("BRAINLESS maven coordinates provided: "
196 + newGroupId + ":" + newArtifactId + ":"
// Same coordinates as the running controller: only a warning is visible here.
200 if (newGroupId.equalsIgnoreCase(this.getGroupId())
201 && newArtifactId.equalsIgnoreCase(this.getArtifactId())
202 && newVersion.equalsIgnoreCase(this.getVersion())) {
203 logger.warn("Al in the right version: " + newGroupId + ":"
204 + newArtifactId + ":" + newVersion + " vs. " + this);
// Only the version may change during an upgrade.
208 if (!newGroupId.equalsIgnoreCase(this.getGroupId())
209 || !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
210 throw new IllegalArgumentException(
211 "Group ID and Artifact ID maven coordinates must be identical for the upgrade: "
212 + newGroupId + ":" + newArtifactId + ":"
213 + newVersion + " vs. " + this);
// Delegate the actual upgrade to the container and log whatever it reports.
217 String messages = this.policyContainer.updateToVersion(newVersion);
218 if (logger.isWarnEnabled()) {
219 logger.warn("{} UPGRADE results: {}", this, messages);
223 * If all sucessful (can load new container), now we can remove all coders from previous sessions
// Re-register coders against the (possibly new) container class loader.
230 this.init(decoderConfigurations, encoderConfigurations);
232 if (logger.isInfoEnabled()) {
233 logger.info("UPDATE-TO-VERSION: completed " + this);
238 * initialize decoders for all the topics supported by this controller
239 * Note this is critical to be done after the Policy Container is
240 * instantiated to be able to fetch the corresponding classes.
242 * @param coderConfigurations list of topic -> decoders -> filters mapping
// Registers coders for every topic in coderConfigurations.
// Visible steps per configuration: read the topic and the optional custom Gson coder;
// when a custom coder names a non-empty class container, verify that class is resolvable
// through the policy container's class loader (otherwise throw the exception built by
// makeRetrieveEx); then, for every coder filter, verify the coded class the same way and
// register it through EventProtocolCoder.manager.addDecoder(...) or addEncoder(...).
// NOTE(review): the second parameter of this method, the bodies of the null/empty guards
// and the condition selecting addDecoder vs. addEncoder are not present in this copy of
// the file - callers pass true for decoders and false for encoders; confirm the rest.
244 protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
247 if (logger.isInfoEnabled()) {
248 logger.info("INIT-CODERS: " + this);
// Nothing to register when no configuration was supplied (guard body not visible).
251 if (coderConfigurations == null) {
256 for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
257 String topic = coderConfig.getTopic();
259 CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
// A custom Gson coder is validated only when it names a class container.
260 if (coderConfig.getCustomGsonCoder() != null
261 && coderConfig.getCustomGsonCoder().getClassContainer() != null
262 && !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
264 String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
265 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
266 customGsonCoderClass)) {
267 throw makeRetrieveEx(customGsonCoderClass);
269 if (logger.isInfoEnabled()) {
270 logClassFetched(customGsonCoderClass);
275 List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
// Guard for topics without coder filters (guard body not visible).
276 if (coderFilters == null || coderFilters.isEmpty()) {
280 for (PotentialCoderFilter coderFilter : coderFilters) {
281 String potentialCodedClass = coderFilter.getCodedClass();
282 JsonProtocolFilter protocolFilter = coderFilter.getFilter();
// The coded class must be loadable from the model's class loader.
284 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
285 potentialCodedClass)) {
286 throw makeRetrieveEx(potentialCodedClass);
288 if (logger.isInfoEnabled()) {
289 logClassFetched(potentialCodedClass);
// Decoder registration, keyed by group id, artifact id, event class and class loader hash.
294 EventProtocolCoder.manager.addDecoder(EventProtocolParams.builder()
295 .groupId(this.getGroupId())
296 .artifactId(this.getArtifactId())
298 .eventClass(potentialCodedClass)
299 .protocolFilter(protocolFilter)
300 .customGsonCoder(customGsonCoder)
301 .modelClassLoaderHash(this.policyContainer.getClassLoader().hashCode()));
// Encoder registration with the same parameters plus the topic.
303 EventProtocolCoder.manager.addEncoder(
304 EventProtocolParams.builder().groupId(this.getGroupId())
305 .artifactId(this.getArtifactId()).topic(topic)
306 .eventClass(potentialCodedClass).protocolFilter(protocolFilter)
307 .customGsonCoder(customGsonCoder)
308 .modelClassLoaderHash(this.policyContainer.getClassLoader().hashCode()));
315 * Logs an error and makes an exception for an item that cannot be retrieved.
316 * @param itemName the item to retrieve
317 * @return a new exception
319 private IllegalArgumentException makeRetrieveEx(String itemName) {
320 logger.error("{} cannot be retrieved", itemName);
321 return new IllegalArgumentException(itemName + " cannot be retrieved");
325 * Logs the name of the class that was fetched.
326 * @param className class name fetched
328 private void logClassFetched(String className) {
329 logger.info("CLASS FETCHED {}", className);
336 protected void removeDecoders() {
337 if (logger.isInfoEnabled()) {
338 logger.info("REMOVE-DECODERS: {}", this);
341 if (this.decoderConfigurations == null) {
346 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
347 String topic = coderConfig.getTopic();
348 EventProtocolCoder.manager.removeDecoders(this.getGroupId(), this.getArtifactId(), topic);
355 protected void removeEncoders() {
357 if (logger.isInfoEnabled()) {
358 logger.info("REMOVE-ENCODERS: {}", this);
361 if (this.encoderConfigurations == null) {
365 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
366 String topic = coderConfig.getTopic();
367 EventProtocolCoder.manager.removeEncoders(this.getGroupId(), this.getArtifactId(), topic);
// Checks whether the given coder class belongs to this controller's model.
// Visible logic: the class must be resolvable, by canonical name, through the policy
// container's class loader (ReflectionUtil.isClass); modelHash is then compared with
// modelClassLoaderHash - a match is logged at info level, a mismatch at warn level.
// NOTE(review): the return statement of each branch is not present in this copy of the
// file - confirm the returned values against the authoritative source.
373 public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) {
374 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
375 logger.error("{}{} cannot be retrieved. ", this, coderClass.getCanonicalName());
// Same class loader hash as recorded by init(): the coder comes from this model.
379 if (modelHash == this.modelClassLoaderHash) {
380 if (logger.isInfoEnabled()) {
381 logger.info(coderClass.getCanonicalName()
382 + this + " class loader matches original drools controller rules classloader "
383 + coderClass.getClassLoader());
// Hash mismatch: report both class loaders to help diagnose the difference.
387 if (logger.isWarnEnabled()) {
388 logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match "
389 + coderClass.getClassLoader() + " vs "
390 + this.policyContainer.getClassLoader());
// Starts this controller: logs the request, runs a block synchronized on this instance,
// and returns the result of starting the underlying policy container.
// NOTE(review): the statements inside the synchronized block are not present in this copy
// of the file (presumably they update the alive flag - TODO confirm).
397 public boolean start() {
399 if (logger.isInfoEnabled()) {
400 logger.info("START: {}", this);
403 synchronized (this) {
410 return this.policyContainer.start();
// Stops this controller: logs the request, runs a block synchronized on this instance,
// and returns the result of stopping the underlying policy container.
// NOTE(review): the statements inside the synchronized block are not present in this copy
// of the file (presumably they update the alive flag - TODO confirm).
414 public boolean stop() {
416 logger.info("STOP: {}", this);
418 synchronized (this) {
425 return this.policyContainer.stop();
// Shuts this controller down: logs the request, attempts some guarded work, logs any
// exception it raises, and calls policyContainer.shutdown().
// NOTE(review): the guarded statements (the try body) are not present in this copy of the
// file, nor is the construct that places policyContainer.shutdown() after the catch -
// confirm against the authoritative source.
429 public void shutdown() {
430 logger.info("{}: SHUTDOWN", this);
435 } catch (Exception e) {
436 logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e);
438 this.policyContainer.shutdown();
// Body of the HALT operation: logs the request, attempts some guarded work, logs any
// exception it raises, and calls policyContainer.destroy().
// NOTE(review): the method signature and the guarded statements (the try body) are not
// present in this copy of the file - the name "halt" is taken from the log message only;
// confirm against the authoritative source.
445 logger.info("{}: HALT", this);
450 } catch (Exception e) {
451 logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e);
453 this.policyContainer.destroy();
458 * removes this drools controllers and encoders and decoders from operation.
460 protected void removeCoders() {
461 logger.info("{}: REMOVE-CODERS", this);
464 this.removeDecoders();
465 } catch (IllegalArgumentException e) {
466 logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e);
470 this.removeEncoders();
471 } catch (IllegalArgumentException e) {
472 logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e);
// Reports the alive status of this controller.
// NOTE(review): the method body is not present in this copy of the file (presumably it
// returns the alive field - TODO confirm).
477 public boolean isAlive() {
// Offers a raw (still encoded) event received on a topic to this controller.
// Visible steps: bail out when the controller is locked, not alive, or has no policy
// sessions; check that decoding is supported for this group/artifact on the topic; decode
// the event through EventProtocolCoder.manager; hand the decoded object to offer(T).
// Decode failures are logged (debug for UnsupportedOperationException, warn otherwise).
// NOTE(review): the return statements, the declaration of anEvent and part of the
// isDecodingSupported/decode argument lists are not present in this copy of the file -
// confirm against the authoritative source.
482 public boolean offer(String topic, String event) {
483 logger.debug("{}: OFFER raw event from {}", this, topic);
// No processing while locked, stopped, or without sessions.
485 if (this.locked || !this.alive || this.policyContainer.getPolicySessions().isEmpty()) {
489 // 1. Now, check if this topic has a decoder:
491 if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
492 this.getArtifactId(),
495 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", this,
496 topic, this.getGroupId(), this.getArtifactId());
// Decode the raw event into a model object.
504 anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
505 this.getArtifactId(),
508 } catch (UnsupportedOperationException uoe) {
509 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
510 event, uoe.getMessage(), uoe);
512 } catch (Exception e) {
513 logger.warn("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
514 event, e.getMessage(), e);
// Continue with the decoded object.
518 return offer(anEvent);
// Offers an already decoded event to this controller.
// Visible steps: bail out when the controller is locked, not alive, or has no policy
// sessions; record the event in recentSourceEvents (under that queue's monitor); notify
// PdpJmx; give every DroolsControllerFeatureAPI provider a beforeInsert() hook; insert the
// event into the policy container; warn when the insertion fails; give every provider an
// afterInsert() hook. Exceptions thrown by a feature hook are logged and do not stop the
// remaining processing.
// NOTE(review): the return statements (including what happens when a hook returns true)
// are not present in this copy of the file - confirm against the authoritative source.
523 public <T> boolean offer(T event) {
524 logger.debug("{}: OFFER event", this);
// No processing while locked, stopped, or without sessions.
526 if (this.locked || !this.alive || this.policyContainer.getPolicySessions().isEmpty()) {
// The recent-events queue is guarded by its own monitor.
530 synchronized (this.recentSourceEvents) {
531 this.recentSourceEvents.add(event);
534 PdpJmx.getInstance().updateOccured();
// Pre-insertion feature hooks.
538 for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) {
540 if (feature.beforeInsert(this, event)) {
543 } catch (Exception e) {
544 logger.error("{}: feature {} before-insert failure because of {}",
545 this, feature.getClass().getName(), e.getMessage(), e);
// Insert into the policy container's sessions.
549 boolean successInject = this.policyContainer.insertAll(event);
550 if (!successInject) {
551 logger.warn(this + "Failed to inject into PolicyContainer {}", this.getSessionNames());
// Post-insertion feature hooks, told whether the insertion succeeded.
554 for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) {
556 if (feature.afterInsert(this, event, successInject)) {
559 } catch (Exception e) {
560 logger.error("{}: feature {} after-insert failure because of {}",
561 this, feature.getClass().getName(), e.getMessage(), e);
// Delivers an event to a topic sink.
// Visible steps: log the request; give every DroolsControllerFeatureAPI provider a
// beforeDeliver() hook; reject an invalid sink or event (IllegalArgumentException) and a
// locked or stopped controller (IllegalStateException); encode the event for the sink's
// topic through EventProtocolCoder.manager; record the encoded form in recentSinkEvents
// (under that queue's monitor); send it through the sink; give every provider an
// afterDeliver() hook. Exceptions thrown by a feature hook are logged.
// NOTE(review): the conditions guarding the four throw statements, the declaration of
// json, the tail of both logger.error calls and the return statements are not present in
// this copy of the file - confirm against the authoritative source.
570 public boolean deliver(TopicSink sink, Object event) {
572 if (logger.isInfoEnabled()) {
573 logger.info("{}DELIVER: {} FROM {} TO {}", this, event, this, sink);
// Pre-delivery feature hooks.
576 for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) {
578 if (feature.beforeDeliver(this, sink, event)) {
582 catch (Exception e) {
583 logger.error("{}: feature {} before-deliver failure because of {}", this, feature.getClass().getName(),
// Argument and state validation.
589 throw new IllegalArgumentException(this + " invalid sink");
593 throw new IllegalArgumentException(this + " invalid event");
597 throw new IllegalStateException(this + " is locked");
601 throw new IllegalStateException(this + " is stopped");
// Encode the event for the sink's topic.
605 EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
// The recent-events queue is guarded by its own monitor.
607 synchronized (this.recentSinkEvents) {
608 this.recentSinkEvents.add(json);
611 boolean success = sink.send(json);
// Post-delivery feature hooks, told whether the send succeeded.
613 for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) {
615 if (feature.afterDeliver(this, sink, event, json, success)) {
619 catch (Exception e) {
620 logger.error("{}: feature {} after-deliver failure because of {}", this, feature.getClass().getName(),
630 public String getVersion() {
631 return this.policyContainer.getVersion();
635 public String getArtifactId() {
636 return this.policyContainer.getArtifactId();
640 public String getGroupId() {
641 return this.policyContainer.getGroupId();
645 * Get model class loader hash.
647 * @return the modelClassLoaderHash
649 public int getModelClassLoaderHash() {
650 return modelClassLoaderHash;
// Locks this controller; the call is serialized on this instance and logged.
// NOTE(review): the statements after the log call are not present in this copy of the
// file (presumably they set the locked flag and return a status - TODO confirm).
654 public synchronized boolean lock() {
655 logger.info("LOCK: {}", this);
// Unlocks this controller; the call is serialized on this instance and logged.
// NOTE(review): the statements after the log call are not present in this copy of the
// file (presumably they clear the locked flag and return a status - TODO confirm).
662 public synchronized boolean unlock() {
663 logger.info("UNLOCK: {}", this);
// Reports the locked status of this controller.
// NOTE(review): the method body is not present in this copy of the file (presumably it
// returns the locked field - TODO confirm).
670 public boolean isLocked() {
677 public PolicyContainer getContainer() {
678 return this.policyContainer;
681 @JsonProperty("sessions")
682 @GsonJsonProperty("sessions")
684 public List<String> getSessionNames() {
685 return getSessionNames(true);
691 * @param abbreviated true for the short form, otherwise the long form
692 * @return session names
694 protected List<String> getSessionNames(boolean abbreviated) {
695 List<String> sessionNames = new ArrayList<>();
697 for (PolicySession session: this.policyContainer.getPolicySessions()) {
699 sessionNames.add(session.getName());
701 sessionNames.add(session.getFullName());
704 } catch (Exception e) {
705 logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e);
706 sessionNames.add(e.getMessage());
711 @JsonProperty("sessionCoordinates")
712 @GsonJsonProperty("sessionCoordinates")
714 public List<String> getCanonicalSessionNames() {
715 return getSessionNames(false);
719 public List<String> getBaseDomainNames() {
720 return new ArrayList<>(this.policyContainer.getKieContainer().getKieBaseNames());
724 * provides the underlying core layer container sessions.
726 * @return the attached Policy Container
728 protected List<PolicySession> getSessions() {
729 List<PolicySession> sessions = new ArrayList<>();
730 sessions.addAll(this.policyContainer.getPolicySessions());
735 * provides the underlying core layer container session with name sessionName.
737 * @param sessionName session name
738 * @return the attached Policy Container
739 * @throws IllegalArgumentException when an invalid session name is provided
740 * @throws IllegalStateException when the drools controller is in an invalid state
742 protected PolicySession getSession(String sessionName) {
743 if (sessionName == null || sessionName.isEmpty()) {
744 throw new IllegalArgumentException("A Session Name must be provided");
747 List<PolicySession> sessions = this.getSessions();
748 for (PolicySession session : sessions) {
749 if (sessionName.equals(session.getName()) || sessionName.equals(session.getFullName())) {
754 throw invalidSessNameEx(sessionName);
757 private IllegalArgumentException invalidSessNameEx(String sessionName) {
758 return new IllegalArgumentException("Invalid Session Name: " + sessionName);
762 public Map<String,Integer> factClassNames(String sessionName) {
763 if (sessionName == null || sessionName.isEmpty()) {
764 throw invalidSessNameEx(sessionName);
767 Map<String,Integer> classNames = new HashMap<>();
769 PolicySession session = getSession(sessionName);
770 KieSession kieSession = session.getKieSession();
772 Collection<FactHandle> facts = session.getKieSession().getFactHandles();
773 for (FactHandle fact : facts) {
775 String className = kieSession.getObject(fact).getClass().getName();
776 if (classNames.containsKey(className)) {
777 classNames.put(className, classNames.get(className) + 1);
779 classNames.put(className, 1);
781 } catch (Exception e) {
782 logger.warn("Object cannot be retrieved from fact {}", fact, e);
790 public long factCount(String sessionName) {
791 if (sessionName == null || sessionName.isEmpty()) {
792 throw invalidSessNameEx(sessionName);
795 PolicySession session = getSession(sessionName);
796 return session.getKieSession().getFactCount();
800 public List<Object> facts(String sessionName, String className, boolean delete) {
801 if (sessionName == null || sessionName.isEmpty()) {
802 throw invalidSessNameEx(sessionName);
805 if (className == null || className.isEmpty()) {
806 throw new IllegalArgumentException("Invalid Class Name: " + className);
810 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
811 if (factClass == null) {
812 throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className);
815 PolicySession session = getSession(sessionName);
816 KieSession kieSession = session.getKieSession();
818 List<Object> factObjects = new ArrayList<>();
820 Collection<FactHandle> factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass));
821 for (FactHandle factHandle : factHandles) {
823 factObjects.add(kieSession.getObject(factHandle));
825 kieSession.delete(factHandle);
827 } catch (Exception e) {
828 logger.warn("Object cannot be retrieved from fact {}", factHandle, e);
836 public List<Object> factQuery(String sessionName, String queryName, String queriedEntity,
837 boolean delete, Object... queryParams) {
838 if (sessionName == null || sessionName.isEmpty()) {
839 throw invalidSessNameEx(sessionName);
842 if (queryName == null || queryName.isEmpty()) {
843 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
846 if (queriedEntity == null || queriedEntity.isEmpty()) {
847 throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity);
850 PolicySession session = getSession(sessionName);
851 KieSession kieSession = session.getKieSession();
853 boolean found = false;
854 for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
855 for (Query q : kiePackage.getQueries()) {
856 if (q.getName() != null && q.getName().equals(queryName)) {
863 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
866 List<Object> factObjects = new ArrayList<>();
868 QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams);
869 for (QueryResultsRow row : queryResults) {
871 factObjects.add(row.get(queriedEntity));
873 kieSession.delete(row.getFactHandle(queriedEntity));
875 } catch (Exception e) {
876 logger.warn("Object cannot be retrieved from row: {}", row, e);
884 public Class<?> fetchModelClass(String className) {
885 return ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
889 * Get recent source events.
891 * @return the recentSourceEvents
894 public Object[] getRecentSourceEvents() {
895 synchronized (this.recentSourceEvents) {
896 Object[] events = new Object[recentSourceEvents.size()];
897 return recentSourceEvents.toArray(events);
902 * Get recent sink events.
904 * @return the recentSinkEvents
907 public String[] getRecentSinkEvents() {
908 synchronized (this.recentSinkEvents) {
909 String[] events = new String[recentSinkEvents.size()];
910 return recentSinkEvents.toArray(events);
// Reports whether this controller is "brained".
// NOTE(review): the method body is not present in this copy of the file, so the returned
// value cannot be stated here - confirm against the authoritative source.
915 public boolean isBrained() {
// Builds the textual description of this controller: it starts with
// "MavenDroolsController [policyContainer=" followed by the container name (or "NULL"
// when there is no container) and includes the modelClassLoaderHash value.
// NOTE(review): the receiver of the append chain and several append calls between the
// container name and the hash are not present in this copy of the file - confirm the
// full format against the authoritative source.
921 public String toString() {
922 StringBuilder builder = new StringBuilder();
924 .append("MavenDroolsController [policyContainer=")
925 .append((policyContainer != null) ? policyContainer.getName() : "NULL")
930 .append(", modelClassLoaderHash=")
931 .append(modelClassLoaderHash)
933 return builder.toString();