2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.controller.internal;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
29 import org.apache.commons.collections4.queue.CircularFifoQueue;
30 import org.drools.core.ClassObjectFilter;
31 import org.kie.api.runtime.KieSession;
32 import org.kie.api.runtime.rule.FactHandle;
33 import org.kie.api.runtime.rule.QueryResults;
34 import org.kie.api.runtime.rule.QueryResultsRow;
35 import org.openecomp.policy.common.logging.eelf.MessageCodes;
36 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
37 import org.openecomp.policy.common.logging.flexlogger.Logger;
38 import org.openecomp.policy.drools.controller.DroolsController;
39 import org.openecomp.policy.drools.core.PolicyContainer;
40 import org.openecomp.policy.drools.core.PolicySession;
41 import org.openecomp.policy.drools.core.jmx.PdpJmx;
42 import org.openecomp.policy.drools.event.comm.TopicSink;
43 import org.openecomp.policy.drools.protocol.coders.EventProtocolCoder;
44 import org.openecomp.policy.drools.protocol.coders.JsonProtocolFilter;
45 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
46 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
47 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder;
48 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
49 import org.openecomp.policy.drools.utils.ReflectionUtil;
51 import com.fasterxml.jackson.annotation.JsonIgnore;
52 import com.fasterxml.jackson.annotation.JsonProperty;
55 * Maven-based Drools Controller that interacts with the
56 * policy-core PolicyContainer and PolicySession to manage
57 * Drools containers instantiated using Maven.
59 public class MavenDroolsController implements DroolsController {
64 private static Logger logger = FlexLogger.getLogger(MavenDroolsController.class);
67 * Policy Container, the access object to the policy-core layer
70 protected final PolicyContainer policyContainer;
73 * alive status of this drools controller,
74 * reflects invocation of start()/stop() only
76 protected volatile boolean alive = false;
79 * locked status of this drools controller,
80 * reflects if i/o drools related operations are permitted,
81 * more specifically: offer() and deliver().
82 * It does not affect the ability to start and stop
83 * underlying drools infrastructure
85 protected volatile boolean locked = false;
88 * list of topics, each with associated decoder classes, each
89 * with a list of associated filters.
91 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
94 * list of topics, each with associated encoder classes, each
95 * with a list of associated filters.
97 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
100 * recent source events processed
102 protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<Object>(10);
105 * recent sink events processed
107 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<String>(10);
110 * original Drools Model/Rules classloader hash
112 protected int modelClassLoaderHash;
115 * Expanded version of the constructor
117 * @param groupId maven group id
118 * @param artifactId maven artifact id
119 * @param version maven version
 * @param decoderConfigurations list of topic -> decoders -> filters mapping
 * @param encoderConfigurations list of topic -> encoders -> filters mapping
123 * @throws IllegalArgumentException invalid arguments passed in
125 public MavenDroolsController(String groupId,
128 List<TopicCoderFilterConfiguration> decoderConfigurations,
129 List<TopicCoderFilterConfiguration> encoderConfigurations)
130 throws IllegalArgumentException {
132 if (logger.isInfoEnabled())
133 logger.info("DROOLS CONTROLLER: instantiation " + this +
134 " -> {" + groupId + ":" + artifactId + ":" + version + "}");
136 if (groupId == null || artifactId == null || version == null ||
137 groupId.isEmpty() || artifactId.isEmpty() || version.isEmpty()) {
138 throw new IllegalArgumentException("Missing maven coordinates: " +
139 groupId + ":" + artifactId + ":" +
143 this.policyContainer= new PolicyContainer(groupId, artifactId, version);
144 this.init(decoderConfigurations, encoderConfigurations);
146 if (logger.isInfoEnabled())
147 logger.info("DROOLS CONTROLLER: instantiation completed " + this);
151 * init encoding/decoding configuration
 * @param decoderConfigurations list of topic -> decoders -> filters mapping
 * @param encoderConfigurations list of topic -> encoders -> filters mapping
155 protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
156 List<TopicCoderFilterConfiguration> encoderConfigurations) {
158 this.decoderConfigurations = decoderConfigurations;
159 this.encoderConfigurations = encoderConfigurations;
161 this.initCoders(decoderConfigurations, true);
162 this.initCoders(encoderConfigurations, false);
164 this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
171 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
172 List<TopicCoderFilterConfiguration> decoderConfigurations,
173 List<TopicCoderFilterConfiguration> encoderConfigurations)
174 throws IllegalArgumentException, LinkageError, Exception {
176 if (logger.isInfoEnabled())
177 logger.info("UPDATE-TO-VERSION: " + this + " -> {" + newGroupId + ":" + newArtifactId + ":" + newVersion + "}");
179 if (newGroupId == null || newArtifactId == null || newVersion == null ||
180 newGroupId.isEmpty() || newArtifactId.isEmpty() || newVersion.isEmpty()) {
181 throw new IllegalArgumentException("Missing maven coordinates: " +
182 newGroupId + ":" + newArtifactId + ":" +
186 if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) ||
187 newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) ||
188 newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
189 throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " +
190 newGroupId + ":" + newArtifactId + ":" +
194 if (newGroupId.equalsIgnoreCase(this.getGroupId()) &&
195 newArtifactId.equalsIgnoreCase(this.getArtifactId()) &&
196 newVersion.equalsIgnoreCase(this.getVersion())) {
197 logger.warn("Al in the right version: " + newGroupId + ":" +
198 newArtifactId + ":" + newVersion + " vs. " + this);
202 if (!newGroupId.equalsIgnoreCase(this.getGroupId()) ||
203 !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
204 throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " +
205 newGroupId + ":" + newArtifactId + ":" +
206 newVersion + " vs. " + this);
210 String messages = this.policyContainer.updateToVersion(newVersion);
211 if (logger.isWarnEnabled())
212 logger.warn(this + "UPGRADE results: " + messages);
 * If all successful (can load new container), now we can remove all coders from previous sessions
222 this.init(decoderConfigurations, encoderConfigurations);
224 if (logger.isInfoEnabled())
225 logger.info("UPDATE-TO-VERSION: completed " + this);
 * initialize the coders (decoders or encoders) for all the topics supported
 * by this controller. Note this is critical to be done after the Policy
 * Container is instantiated to be able to fetch the corresponding classes.
 * @param coderConfigurations list of topic -> coders -> filters mapping
235 protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
237 throws IllegalArgumentException {
239 if (logger.isInfoEnabled())
240 logger.info("INIT-CODERS: " + this);
242 if (coderConfigurations == null) {
247 for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
248 String topic = coderConfig.getTopic();
250 CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
251 if (coderConfig.getCustomGsonCoder() != null &&
252 coderConfig.getCustomGsonCoder().getClassContainer() != null &&
253 !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
255 String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
256 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
257 customGsonCoderClass)) {
258 logger.error(customGsonCoderClass + " cannot be retrieved");
259 throw new IllegalArgumentException(customGsonCoderClass + " cannot be retrieved");
261 if (logger.isInfoEnabled())
262 logger.info("CLASS FETCHED " + customGsonCoderClass);
266 CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder();
267 if (coderConfig.getCustomJacksonCoder() != null &&
268 coderConfig.getCustomJacksonCoder().getClassContainer() != null &&
269 !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) {
271 String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer();
272 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
273 customJacksonCoderClass)) {
274 logger.error(customJacksonCoderClass + " cannot be retrieved");
275 throw new IllegalArgumentException(customJacksonCoderClass + " cannot be retrieved");
277 if (logger.isInfoEnabled())
278 logger.info("CLASS FETCHED " + customJacksonCoderClass);
282 List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
283 if (coderFilters == null || coderFilters.isEmpty()) {
287 for (PotentialCoderFilter coderFilter : coderFilters) {
288 String potentialCodedClass = coderFilter.getCodedClass();
289 JsonProtocolFilter protocolFilter = coderFilter.getFilter();
291 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
292 potentialCodedClass)) {
293 logger.error(potentialCodedClass + " cannot be retrieved");
294 throw new IllegalArgumentException(potentialCodedClass + " cannot be retrieved");
296 if (logger.isInfoEnabled())
297 logger.info("CLASS FETCHED " + potentialCodedClass);
301 EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(),
302 topic, potentialCodedClass, protocolFilter,
305 this.policyContainer.getClassLoader().hashCode());
307 EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(),
308 topic, potentialCodedClass, protocolFilter,
311 this.policyContainer.getClassLoader().hashCode());
320 protected void removeDecoders()
321 throws IllegalArgumentException {
322 if (logger.isInfoEnabled())
323 logger.info("REMOVE-DECODERS: " + this);
325 if (this.decoderConfigurations == null) {
330 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
331 String topic = coderConfig.getTopic();
332 EventProtocolCoder.manager.removeDecoders
333 (this.getGroupId(), this.getArtifactId(), topic);
340 protected void removeEncoders()
341 throws IllegalArgumentException {
343 if (logger.isInfoEnabled())
344 logger.info("REMOVE-ENCODERS: " + this);
346 if (this.encoderConfigurations == null)
350 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
351 String topic = coderConfig.getTopic();
352 EventProtocolCoder.manager.removeEncoders
353 (this.getGroupId(), this.getArtifactId(), topic);
362 public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) throws IllegalStateException {
363 if (!ReflectionUtil.isClass
364 (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
365 logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. ");
369 if (modelHash == this.modelClassLoaderHash) {
370 if (logger.isInfoEnabled())
371 logger.info(coderClass.getCanonicalName() +
372 this + " class loader matches original drools controller rules classloader " +
373 coderClass.getClassLoader());
376 if (logger.isWarnEnabled())
377 logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " +
378 coderClass.getClassLoader() + " vs " +
379 this.policyContainer.getClassLoader());
388 public boolean start() {
390 if (logger.isInfoEnabled())
391 logger.info("START: " + this);
393 synchronized (this) {
400 return this.policyContainer.start();
407 public boolean stop() {
409 logger.info("STOP: " + this);
411 synchronized (this) {
418 return this.policyContainer.stop();
426 public void shutdown() throws IllegalStateException {
428 if (logger.isInfoEnabled())
429 logger.info(this + "SHUTDOWN");
434 } catch (Exception e) {
435 logger.error(MessageCodes.EXCEPTION_ERROR, e, "stop", this.toString());
437 this.policyContainer.shutdown();
447 public void halt() throws IllegalStateException {
448 if (logger.isInfoEnabled())
449 logger.info(this + "SHUTDOWN");
454 } catch (Exception e) {
455 logger.error(MessageCodes.EXCEPTION_ERROR, e, "halt", this.toString());
457 this.policyContainer.destroy();
 * removes this drools controller's encoders and decoders from operation
464 protected void removeCoders() {
466 if (logger.isInfoEnabled())
467 logger.info(this + "REMOVE-CODERS");
470 this.removeDecoders();
471 } catch (IllegalArgumentException e) {
472 logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeDecoders", this.toString());
476 this.removeEncoders();
477 } catch (IllegalArgumentException e) {
478 logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeEncoders", this.toString());
486 public boolean isAlive() {
494 public boolean offer(String topic, String event) {
496 if (logger.isInfoEnabled())
497 logger.info("OFFER: " + topic + ":" + event + " INTO " + this);
505 // 0. Check if the policy container has any sessions
507 if (this.policyContainer.getPolicySessions().size() <= 0) {
512 // 1. Now, check if this topic has a decoder:
514 if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
515 this.getArtifactId(),
518 logger.warn("DECODING-UNSUPPORTED: " + ":" + this.getGroupId() +
519 ":" + this.getArtifactId() + ":" + topic + " IN " + this);
527 anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
528 this.getArtifactId(),
531 } catch (UnsupportedOperationException uoe) {
532 if (logger.isInfoEnabled())
533 logger.info("DECODE:"+ this + ":" + topic + ":" + event);
535 } catch (Exception e) {
537 logger.error("DECODE:"+ this + ":" + topic + ":" + event);
541 synchronized(this.recentSourceEvents) {
542 this.recentSourceEvents.add(anEvent);
545 // increment event count for Nagios monitoring
546 PdpJmx.getInstance().updateOccured();
550 if (logger.isInfoEnabled())
551 logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName());
553 if (!this.policyContainer.insertAll(anEvent))
554 logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames());
563 public boolean deliver(TopicSink sink, Object event)
564 throws IllegalArgumentException,
565 IllegalStateException,
566 UnsupportedOperationException {
568 if (logger.isInfoEnabled())
569 logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink);
572 throw new IllegalArgumentException
573 (this + " invalid sink");
576 throw new IllegalArgumentException
577 (this + " invalid event");
580 throw new IllegalStateException
581 (this + " is locked");
584 throw new IllegalStateException
585 (this + " is stopped");
588 EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
590 synchronized(this.recentSinkEvents) {
591 this.recentSinkEvents.add(json);
594 return sink.send(json);
602 public String getVersion() {
603 return this.policyContainer.getVersion();
610 public String getArtifactId() {
611 return this.policyContainer.getArtifactId();
618 public String getGroupId() {
619 return this.policyContainer.getGroupId();
623 * @return the modelClassLoaderHash
625 public int getModelClassLoaderHash() {
626 return modelClassLoaderHash;
633 public synchronized boolean lock() {
634 logger.info("LOCK: " + this);
644 public synchronized boolean unlock() {
645 logger.info("UNLOCK: " + this);
655 public boolean isLocked() {
660 * gets the policy container
661 * @return the underlying container
664 protected PolicyContainer getContainer() {
665 return this.policyContainer;
671 @JsonProperty("sessions")
673 public List<String> getSessionNames() {
674 return getSessionNames(true);
680 @JsonProperty("sessionCoordinates")
682 public List<String> getCanonicalSessionNames() {
683 return getSessionNames(false);
688 * @param abbreviated true for the short form, otherwise the long form
689 * @return session names
691 protected List<String> getSessionNames(boolean abbreviated) {
692 List<String> sessionNames = new ArrayList<String>();
694 for (PolicySession session: this.policyContainer.getPolicySessions()) {
696 sessionNames.add(session.getName());
698 sessionNames.add(session.getFullName());
700 } catch (Exception e) {
701 logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e);
702 sessionNames.add(e.getMessage());
708 * provides the underlying core layer container sessions
710 * @return the attached Policy Container
712 protected List<PolicySession> getSessions() {
713 List<PolicySession> sessions = new ArrayList<PolicySession>();
714 sessions.addAll(this.policyContainer.getPolicySessions());
719 * provides the underlying core layer container session with name sessionName
721 * @param sessionName session name
722 * @return the attached Policy Container
723 * @throws IllegalArgumentException when an invalid session name is provided
724 * @throws IllegalStateException when the drools controller is in an invalid state
726 protected PolicySession getSession(String sessionName) {
727 if (sessionName == null || sessionName.isEmpty())
728 throw new IllegalArgumentException("A Session Name must be provided");
730 List<PolicySession> sessions = this.getSessions();
731 for (PolicySession session : sessions) {
732 if (sessionName.equals(session.getName()) || sessionName.equals(session.getName()))
736 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
743 public Map<String,Integer> factClassNames(String sessionName) throws IllegalArgumentException {
744 if (sessionName == null || sessionName.isEmpty())
745 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
747 // List<String> classNames = new ArrayList<>();
748 Map<String,Integer> classNames = new HashMap<>();
750 PolicySession session = getSession(sessionName);
751 KieSession kieSession = session.getKieSession();
753 Collection<FactHandle> facts = session.getKieSession().getFactHandles();
754 for (FactHandle fact : facts) {
756 String className = kieSession.getObject(fact).getClass().getName();
757 if (classNames.containsKey(className))
758 classNames.put(className, classNames.get(className) + 1);
760 classNames.put(className, 1);
761 } catch (Exception e) {
762 if (logger.isInfoEnabled())
763 logger.info("Object cannot be retrieved from fact: " + fact);
774 public long factCount(String sessionName) throws IllegalArgumentException {
775 if (sessionName == null || sessionName.isEmpty())
776 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
778 PolicySession session = getSession(sessionName);
779 return session.getKieSession().getFactCount();
786 public List<Object> facts(String sessionName, String className, boolean delete) {
787 if (sessionName == null || sessionName.isEmpty())
788 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
790 if (className == null || className.isEmpty())
791 throw new IllegalArgumentException("Invalid Class Name: " + className);
794 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
795 if (factClass == null)
796 throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className);
798 PolicySession session = getSession(sessionName);
799 KieSession kieSession = session.getKieSession();
801 List<Object> factObjects = new ArrayList<>();
803 Collection<FactHandle> factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass));
804 for (FactHandle factHandle : factHandles) {
806 factObjects.add(kieSession.getObject(factHandle));
808 kieSession.delete(factHandle);
809 } catch (Exception e) {
810 if (logger.isInfoEnabled())
811 logger.info("Object cannot be retrieved from fact: " + factHandle);
822 public List<Object> factQuery(String sessionName, String queryName, String queriedEntity, boolean delete, Object... queryParams) {
823 if (sessionName == null || sessionName.isEmpty())
824 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
826 if (queryName == null || queryName.isEmpty())
827 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
829 if (queriedEntity == null || queriedEntity.isEmpty())
830 throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity);
832 PolicySession session = getSession(sessionName);
833 KieSession kieSession = session.getKieSession();
835 List<Object> factObjects = new ArrayList<>();
837 QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams);
838 for (QueryResultsRow row : queryResults) {
840 factObjects.add(row.get(queriedEntity));
842 kieSession.delete(row.getFactHandle(queriedEntity));
843 } catch (Exception e) {
844 if (logger.isInfoEnabled())
845 logger.info("Object cannot be retrieved from fact: " + row);
856 public Class<?> fetchModelClass(String className) throws IllegalStateException {
857 Class<?> modelClass =
858 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
863 * @return the recentSourceEvents
866 public Object[] getRecentSourceEvents() {
867 synchronized(this.recentSourceEvents) {
868 Object[] events = new Object[recentSourceEvents.size()];
869 return recentSourceEvents.toArray(events);
874 * @return the recentSinkEvents
877 public String[] getRecentSinkEvents() {
878 synchronized(this.recentSinkEvents) {
879 String[] events = new String[recentSinkEvents.size()];
880 return recentSinkEvents.toArray(events);
889 public boolean isBrained() {
895 public String toString() {
896 StringBuilder builder = new StringBuilder();
897 builder.append("MavenDroolsController [policyContainer=")
898 .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":")
900 .append(alive).append(", locked=")
901 .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]");
902 return builder.toString();