2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.controller.internal;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
29 import org.apache.commons.collections4.queue.CircularFifoQueue;
30 import org.drools.core.ClassObjectFilter;
31 import org.kie.api.definition.KiePackage;
32 import org.kie.api.definition.rule.Query;
33 import org.kie.api.runtime.KieSession;
34 import org.kie.api.runtime.rule.FactHandle;
35 import org.kie.api.runtime.rule.QueryResults;
36 import org.kie.api.runtime.rule.QueryResultsRow;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.onap.policy.drools.controller.DroolsController;
40 import org.onap.policy.drools.core.PolicyContainer;
41 import org.onap.policy.drools.core.PolicySession;
42 import org.onap.policy.drools.core.jmx.PdpJmx;
43 import org.onap.policy.drools.event.comm.TopicSink;
44 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
45 import org.onap.policy.drools.protocol.coders.JsonProtocolFilter;
46 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
47 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
48 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder;
49 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
50 import org.onap.policy.drools.utils.ReflectionUtil;
52 import com.fasterxml.jackson.annotation.JsonIgnore;
53 import com.fasterxml.jackson.annotation.JsonProperty;
56 * Maven-based Drools Controller that interacts with the
57 * policy-core PolicyContainer and PolicySession to manage
58 * Drools containers instantiated using Maven.
60 public class MavenDroolsController implements DroolsController {
65 private static Logger logger = LoggerFactory.getLogger(MavenDroolsController.class);
68 * Policy Container, the access object to the policy-core layer
71 protected final PolicyContainer policyContainer;
74 * alive status of this drools controller,
75 * reflects invocation of start()/stop() only
77 protected volatile boolean alive = false;
80 * locked status of this drools controller,
81 * reflects if i/o drools related operations are permitted,
82 * more specifically: offer() and deliver().
83 * It does not affect the ability to start and stop
84 * underlying drools infrastructure
86 protected volatile boolean locked = false;
89 * list of topics, each with associated decoder classes, each
90 * with a list of associated filters.
92 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
95 * list of topics, each with associated encoder classes, each
96 * with a list of associated filters.
98 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
101 * recent source events processed
103 protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<>(10);
106 * recent sink events processed
108 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<>(10);
111 * original Drools Model/Rules classloader hash
113 protected int modelClassLoaderHash;
/**
 * Expanded version of the constructor.
 *
 * <p>Validates the maven coordinates, creates the backing {@link PolicyContainer}
 * and then initializes the coders through {@code init}.
 *
 * @param groupId maven group id
 * @param artifactId maven artifact id
 * @param version maven version
 * @param decoderConfigurations list of topic -> decoders -> filters mapping
 * @param encoderConfigurations list of topic -> encoders -> filters mapping
 *
 * @throws IllegalArgumentException invalid arguments passed in
 */
public MavenDroolsController(String groupId,
                             String artifactId,
                             String version,
                             List<TopicCoderFilterConfiguration> decoderConfigurations,
                             List<TopicCoderFilterConfiguration> encoderConfigurations) {

    logger.info("drools-controller instantiation [{}:{}:{}]", groupId, artifactId, version);

    // coordinates are checked in group, artifact, version order
    if (groupId == null || groupId.isEmpty()) {
        throw new IllegalArgumentException("Missing maven group-id coordinate");
    }

    if (artifactId == null || artifactId.isEmpty()) {
        throw new IllegalArgumentException("Missing maven artifact-id coordinate");
    }

    if (version == null || version.isEmpty()) {
        throw new IllegalArgumentException("Missing maven version coordinate");
    }

    this.policyContainer = new PolicyContainer(groupId, artifactId, version);
    this.init(decoderConfigurations, encoderConfigurations);

    logger.debug("{}: instantiation completed ", this);
}
150 * init encoding/decoding configuration
151 * @param decoderConfiguration list of topic -> decoders -> filters mapping
152 * @param encoderConfiguration list of topic -> encoders -> filters mapping
154 protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
155 List<TopicCoderFilterConfiguration> encoderConfigurations) {
157 this.decoderConfigurations = decoderConfigurations;
158 this.encoderConfigurations = encoderConfigurations;
160 this.initCoders(decoderConfigurations, true);
161 this.initCoders(encoderConfigurations, false);
163 this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
167 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
168 List<TopicCoderFilterConfiguration> decoderConfigurations,
169 List<TopicCoderFilterConfiguration> encoderConfigurations)
170 throws LinkageError {
172 logger.info("{}: updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion);
174 if (newGroupId == null || newGroupId.isEmpty())
175 throw new IllegalArgumentException("Missing maven group-id coordinate");
177 if (newArtifactId == null || newArtifactId.isEmpty())
178 throw new IllegalArgumentException("Missing maven artifact-id coordinate");
180 if (newVersion == null || newVersion.isEmpty())
181 throw new IllegalArgumentException("Missing maven version coordinate");
183 if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) ||
184 newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) ||
185 newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
186 throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " +
187 newGroupId + ":" + newArtifactId + ":" +
191 if (newGroupId.equalsIgnoreCase(this.getGroupId()) &&
192 newArtifactId.equalsIgnoreCase(this.getArtifactId()) &&
193 newVersion.equalsIgnoreCase(this.getVersion())) {
194 logger.warn("Al in the right version: " + newGroupId + ":" +
195 newArtifactId + ":" + newVersion + " vs. " + this);
199 if (!newGroupId.equalsIgnoreCase(this.getGroupId()) ||
200 !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
201 throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " +
202 newGroupId + ":" + newArtifactId + ":" +
203 newVersion + " vs. " + this);
207 String messages = this.policyContainer.updateToVersion(newVersion);
208 if (logger.isWarnEnabled())
209 logger.warn(this + "UPGRADE results: " + messages);
212 * If all sucessful (can load new container), now we can remove all coders from previous sessions
219 this.init(decoderConfigurations, encoderConfigurations);
221 if (logger.isInfoEnabled())
222 logger.info("UPDATE-TO-VERSION: completed " + this);
226 * initialize the coders (decoders or encoders) for all the topics supported by this controller.
227 * Note this is critical to be done after the Policy Container is
228 * instantiated to be able to fetch the corresponding classes.
230 * @param coderConfigurations list of topic -> coders -> filters mapping
// Validates and registers the coders described by coderConfigurations.
// For every topic configuration, each configured class name (custom Gson coder container,
// custom Jackson coder container, and every filter's coded class) must be resolvable through
// the policy container's class loader; otherwise an IllegalArgumentException is thrown.
// Each coder filter is then registered with EventProtocolCoder.manager.
// NOTE(review): the rest of the parameter list is not visible in this view; init() calls this
// method with a boolean second argument (true for decoder configurations, false for encoders).
232 protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
235 if (logger.isInfoEnabled())
236 logger.info("INIT-CODERS: " + this);
// a null configuration list is special-cased; the statements taken in that case are not visible here
238 if (coderConfigurations == null) {
243 for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
244 String topic = coderConfig.getTopic();
// optional custom Gson coder: when a class container name is configured it must be loadable
246 CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
247 if (coderConfig.getCustomGsonCoder() != null &&
248 coderConfig.getCustomGsonCoder().getClassContainer() != null &&
249 !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
251 String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
252 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
253 customGsonCoderClass)) {
254 logger.error(customGsonCoderClass + " cannot be retrieved");
255 throw new IllegalArgumentException(customGsonCoderClass + " cannot be retrieved");
257 if (logger.isInfoEnabled())
258 logger.info("CLASS FETCHED " + customGsonCoderClass);
// optional custom Jackson coder: same validation as for the Gson coder
262 CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder();
263 if (coderConfig.getCustomJacksonCoder() != null &&
264 coderConfig.getCustomJacksonCoder().getClassContainer() != null &&
265 !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) {
267 String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer();
268 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
269 customJacksonCoderClass)) {
270 logger.error(customJacksonCoderClass + " cannot be retrieved");
271 throw new IllegalArgumentException(customJacksonCoderClass + " cannot be retrieved");
273 if (logger.isInfoEnabled())
274 logger.info("CLASS FETCHED " + customJacksonCoderClass);
// topics without coder filters are special-cased; the statements taken in that case are not visible here
278 List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
279 if (coderFilters == null || coderFilters.isEmpty()) {
283 for (PotentialCoderFilter coderFilter : coderFilters) {
284 String potentialCodedClass = coderFilter.getCodedClass();
285 JsonProtocolFilter protocolFilter = coderFilter.getFilter();
// the coded class itself must be loadable from the model class loader
287 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
288 potentialCodedClass)) {
289 logger.error(potentialCodedClass + " cannot be retrieved");
290 throw new IllegalArgumentException(potentialCodedClass + " cannot be retrieved");
292 if (logger.isInfoEnabled())
293 logger.info("CLASS FETCHED " + potentialCodedClass);
// registration passes the class loader hash as the last argument
// NOTE(review): the condition selecting addDecoder vs addEncoder and the arguments between
// protocolFilter and the class loader hash are not visible in this view.
297 EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(),
298 topic, potentialCodedClass, protocolFilter,
301 this.policyContainer.getClassLoader().hashCode());
303 EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(),
304 topic, potentialCodedClass, protocolFilter,
307 this.policyContainer.getClassLoader().hashCode());
316 protected void removeDecoders(){
317 if (logger.isInfoEnabled())
318 logger.info("REMOVE-DECODERS: " + this);
320 if (this.decoderConfigurations == null) {
325 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
326 String topic = coderConfig.getTopic();
327 EventProtocolCoder.manager.removeDecoders
328 (this.getGroupId(), this.getArtifactId(), topic);
335 protected void removeEncoders() {
337 if (logger.isInfoEnabled())
338 logger.info("REMOVE-ENCODERS: " + this);
340 if (this.encoderConfigurations == null)
344 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
345 String topic = coderConfig.getTopic();
346 EventProtocolCoder.manager.removeEncoders
347 (this.getGroupId(), this.getArtifactId(), topic);
353 public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) {
354 if (!ReflectionUtil.isClass
355 (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
356 logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. ");
360 if (modelHash == this.modelClassLoaderHash) {
361 if (logger.isInfoEnabled())
362 logger.info(coderClass.getCanonicalName() +
363 this + " class loader matches original drools controller rules classloader " +
364 coderClass.getClassLoader());
367 if (logger.isWarnEnabled())
368 logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " +
369 coderClass.getClassLoader() + " vs " +
370 this.policyContainer.getClassLoader());
// Starts this controller: logs "START: <this>" when INFO is enabled, runs a block
// synchronized on this instance, and returns the result of policyContainer.start().
// NOTE(review): the statements inside the synchronized block are not visible in this view;
// the 'alive' field is documented as reflecting start()/stop() invocations.
376 public boolean start() {
378 if (logger.isInfoEnabled())
379 logger.info("START: " + this);
381 synchronized (this) {
388 return this.policyContainer.start();
// Stops this controller: logs "STOP: <this>", runs a block synchronized on this
// instance, and returns the result of policyContainer.stop().
// NOTE(review): the statements inside the synchronized block are not visible in this view;
// the 'alive' field is documented as reflecting start()/stop() invocations.
392 public boolean stop() {
394 logger.info("STOP: " + this);
396 synchronized (this) {
403 return this.policyContainer.stop();
// Shuts this controller down: logs the request, runs guarded teardown steps whose failure
// is logged as "SHUTDOWN FAILED" rather than propagated, and shuts the policy container down.
// NOTE(review): the guarded statements before the catch are not visible in this view.
407 public void shutdown() {
408 logger.info("{}: SHUTDOWN", this);
413 } catch (Exception e) {
414 logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e);
// presumably executed from a finally clause so it runs even after a failure - TODO confirm
416 this.policyContainer.shutdown();
423 logger.info("{}: HALT", this);
428 } catch (Exception e) {
429 logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e);
431 this.policyContainer.destroy();
436 * removes this drools controllers and encoders and decoders from operation
438 protected void removeCoders() {
439 logger.info("{}: REMOVE-CODERS", this);
442 this.removeDecoders();
443 } catch (IllegalArgumentException e) {
444 logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e);
448 this.removeEncoders();
449 } catch (IllegalArgumentException e) {
450 logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e);
455 public boolean isAlive() {
// Offers a raw event received on a topic to this controller: the event is decoded through
// EventProtocolCoder.manager, remembered in recentSourceEvents, counted in PdpJmx and then
// inserted into the policy container.
// NOTE(review): several short statements of this method (early-exit conditions before step 0,
// the return statements, the declaration of anEvent and the remaining decode arguments) are
// not visible in this view.
460 public boolean offer(String topic, String event) {
461 logger.debug("{}: OFFER: {} <- {}", this, topic, event);
469 // 0. Check if the policy container has any sessions
471 if (this.policyContainer.getPolicySessions().isEmpty()) {
476 // 1. Now, check if this topic has a decoder:
478 if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
479 this.getArtifactId(),
482 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", this,
483 topic, this.getGroupId(), this.getArtifactId());
// decode the raw event into a model object
491 anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
492 this.getArtifactId(),
// an unsupported decode is only logged at debug level; any other failure is logged as a warning
495 } catch (UnsupportedOperationException uoe) {
496 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
497 event, uoe.getMessage(), uoe);
499 } catch (Exception e) {
500 logger.warn("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
501 event, e.getMessage(), e);
// the recent-events queue is guarded by its own monitor
505 synchronized(this.recentSourceEvents) {
506 this.recentSourceEvents.add(anEvent);
509 // increment event count for Nagios monitoring
510 PdpJmx.getInstance().updateOccured();
514 if (logger.isInfoEnabled())
515 logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName());
// a failed insertion is only reported as a warning
517 if (!this.policyContainer.insertAll(anEvent))
518 logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames());
524 public boolean deliver(TopicSink sink, Object event)
525 throws UnsupportedOperationException {
527 if (logger.isInfoEnabled())
528 logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink);
531 throw new IllegalArgumentException
532 (this + " invalid sink");
535 throw new IllegalArgumentException
536 (this + " invalid event");
539 throw new IllegalStateException
540 (this + " is locked");
543 throw new IllegalStateException
544 (this + " is stopped");
547 EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
549 synchronized(this.recentSinkEvents) {
550 this.recentSinkEvents.add(json);
553 return sink.send(json);
558 public String getVersion() {
559 return this.policyContainer.getVersion();
563 public String getArtifactId() {
564 return this.policyContainer.getArtifactId();
568 public String getGroupId() {
569 return this.policyContainer.getGroupId();
573 * @return the modelClassLoaderHash
575 public int getModelClassLoaderHash() {
576 return modelClassLoaderHash;
// Locks this controller; the method is synchronized on the instance and logs "LOCK: <this>".
// NOTE(review): the statements that update state and produce the boolean result are not
// visible in this view; deliver() rejects events while the 'locked' field is set.
580 public synchronized boolean lock() {
581 logger.info("LOCK: " + this);
// Unlocks this controller; the method is synchronized on the instance and logs "UNLOCK: <this>".
// NOTE(review): the statements that update state and produce the boolean result are not
// visible in this view.
588 public synchronized boolean unlock() {
589 logger.info("UNLOCK: " + this);
596 public boolean isLocked() {
// Returns the PolicyContainer this controller wraps (the final policyContainer field).
// NOTE(review): com.fasterxml.jackson.annotation.JsonIgnore is imported by this file but no
// use of it is visible in this view - confirm whether this accessor is meant to carry it.
602 public PolicyContainer getContainer() {
603 return this.policyContainer;
606 @JsonProperty("sessions")
608 public List<String> getSessionNames() {
609 return getSessionNames(true);
612 @JsonProperty("sessionCoordinates")
614 public List<String> getCanonicalSessionNames() {
615 return getSessionNames(false);
620 * @param abbreviated true for the short form, otherwise the long form
621 * @return session names
623 protected List<String> getSessionNames(boolean abbreviated) {
624 List<String> sessionNames = new ArrayList<>();
626 for (PolicySession session: this.policyContainer.getPolicySessions()) {
628 sessionNames.add(session.getName());
630 sessionNames.add(session.getFullName());
632 } catch (Exception e) {
633 logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e);
634 sessionNames.add(e.getMessage());
640 * provides the underlying core layer container sessions
642 * @return the attached Policy Container
644 protected List<PolicySession> getSessions() {
645 List<PolicySession> sessions = new ArrayList<>();
646 sessions.addAll(this.policyContainer.getPolicySessions());
651 * provides the underlying core layer container session with name sessionName
653 * @param sessionName session name
654 * @return the attached Policy Container
655 * @throws IllegalArgumentException when an invalid session name is provided
656 * @throws IllegalStateException when the drools controller is in an invalid state
658 protected PolicySession getSession(String sessionName) {
659 if (sessionName == null || sessionName.isEmpty())
660 throw new IllegalArgumentException("A Session Name must be provided");
662 List<PolicySession> sessions = this.getSessions();
663 for (PolicySession session : sessions) {
664 if (sessionName.equals(session.getName()) || sessionName.equals(session.getFullName()))
668 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
672 public Map<String,Integer> factClassNames(String sessionName) {
673 if (sessionName == null || sessionName.isEmpty())
674 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
676 Map<String,Integer> classNames = new HashMap<>();
678 PolicySession session = getSession(sessionName);
679 KieSession kieSession = session.getKieSession();
681 Collection<FactHandle> facts = session.getKieSession().getFactHandles();
682 for (FactHandle fact : facts) {
684 String className = kieSession.getObject(fact).getClass().getName();
685 if (classNames.containsKey(className))
686 classNames.put(className, classNames.get(className) + 1);
688 classNames.put(className, 1);
689 } catch (Exception e) {
690 logger.warn("Object cannot be retrieved from fact {}", fact, e);
698 public long factCount(String sessionName) {
699 if (sessionName == null || sessionName.isEmpty())
700 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
702 PolicySession session = getSession(sessionName);
703 return session.getKieSession().getFactCount();
707 public List<Object> facts(String sessionName, String className, boolean delete) {
708 if (sessionName == null || sessionName.isEmpty())
709 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
711 if (className == null || className.isEmpty())
712 throw new IllegalArgumentException("Invalid Class Name: " + className);
715 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
716 if (factClass == null)
717 throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className);
719 PolicySession session = getSession(sessionName);
720 KieSession kieSession = session.getKieSession();
722 List<Object> factObjects = new ArrayList<>();
724 Collection<FactHandle> factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass));
725 for (FactHandle factHandle : factHandles) {
727 factObjects.add(kieSession.getObject(factHandle));
729 kieSession.delete(factHandle);
730 } catch (Exception e) {
731 logger.warn("Object cannot be retrieved from fact {}", factHandle, e);
739 public List<Object> factQuery(String sessionName, String queryName, String queriedEntity, boolean delete, Object... queryParams) {
740 if (sessionName == null || sessionName.isEmpty())
741 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
743 if (queryName == null || queryName.isEmpty())
744 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
746 if (queriedEntity == null || queriedEntity.isEmpty())
747 throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity);
749 PolicySession session = getSession(sessionName);
750 KieSession kieSession = session.getKieSession();
752 boolean found = false;
753 for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
754 for (Query q : kiePackage.getQueries()) {
755 if (q.getName() != null && q.getName().equals(queryName)) {
762 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
764 List<Object> factObjects = new ArrayList<>();
766 QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams);
767 for (QueryResultsRow row : queryResults) {
769 factObjects.add(row.get(queriedEntity));
771 kieSession.delete(row.getFactHandle(queriedEntity));
772 } catch (Exception e) {
773 logger.warn("Object cannot be retrieved from row: {}", row, e);
781 public Class<?> fetchModelClass(String className) {
782 Class<?> modelClass =
783 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
788 * @return the recentSourceEvents
791 public Object[] getRecentSourceEvents() {
792 synchronized(this.recentSourceEvents) {
793 Object[] events = new Object[recentSourceEvents.size()];
794 return recentSourceEvents.toArray(events);
799 * @return the recentSinkEvents
802 public String[] getRecentSinkEvents() {
803 synchronized(this.recentSinkEvents) {
804 String[] events = new String[recentSinkEvents.size()];
805 return recentSinkEvents.toArray(events);
// Reports whether this controller is "brained".
// NOTE(review): the returned value is not visible in this view; updateToVersion() rejects the
// DroolsController NO_* placeholder coordinates with a "BRAINLESS maven coordinates" error.
810 public boolean isBrained() {
816 public String toString() {
817 StringBuilder builder = new StringBuilder();
818 builder.append("MavenDroolsController [policyContainer=")
819 .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":")
821 .append(alive).append(", locked=")
822 .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]");
823 return builder.toString();