2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.controller.internal;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
29 import org.apache.commons.collections4.queue.CircularFifoQueue;
30 import org.drools.core.ClassObjectFilter;
31 import org.kie.api.definition.KiePackage;
32 import org.kie.api.definition.rule.Query;
33 import org.kie.api.runtime.KieSession;
34 import org.kie.api.runtime.rule.FactHandle;
35 import org.kie.api.runtime.rule.QueryResults;
36 import org.kie.api.runtime.rule.QueryResultsRow;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39 import org.openecomp.policy.drools.controller.DroolsController;
40 import org.openecomp.policy.drools.core.PolicyContainer;
41 import org.openecomp.policy.drools.core.PolicySession;
42 import org.openecomp.policy.drools.core.jmx.PdpJmx;
43 import org.openecomp.policy.drools.event.comm.TopicSink;
44 import org.openecomp.policy.drools.protocol.coders.EventProtocolCoder;
45 import org.openecomp.policy.drools.protocol.coders.JsonProtocolFilter;
46 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
47 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
48 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder;
49 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
50 import org.openecomp.policy.drools.utils.ReflectionUtil;
52 import com.fasterxml.jackson.annotation.JsonIgnore;
53 import com.fasterxml.jackson.annotation.JsonProperty;
56 * Maven-based Drools Controller that interacts with the
57 * policy-core PolicyContainer and PolicySession to manage
58 * Drools containers instantiated using Maven.
60 public class MavenDroolsController implements DroolsController {
65 private static Logger logger = LoggerFactory.getLogger(MavenDroolsController.class);
68 * Policy Container, the access object to the policy-core layer
71 protected final PolicyContainer policyContainer;
74 * alive status of this drools controller,
75 * reflects invocation of start()/stop() only
77 protected volatile boolean alive = false;
80 * locked status of this drools controller,
81 * reflects if i/o drools related operations are permitted,
82 * more specifically: offer() and deliver().
83 * It does not affect the ability to start and stop
84 * underlying drools infrastructure
86 protected volatile boolean locked = false;
89 * list of topics, each with associated decoder classes, each
90 * with a list of associated filters.
92 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
95 * list of topics, each with associated encoder classes, each
96 * with a list of associated filters.
98 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
101 * recent source events processed
103 protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<Object>(10);
106 * recent sink events processed
108 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<String>(10);
111 * original Drools Model/Rules classloader hash
113 protected int modelClassLoaderHash;
116 * Expanded version of the constructor
118 * @param groupId maven group id
119 * @param artifactId maven artifact id
120 * @param version maven version
121 * @param decoderConfiguration list of topic -> decoders -> filters mapping
122 * @param encoderConfiguration list of topic -> encoders -> filters mapping
124 * @throws IllegalArgumentException invalid arguments passed in
126 public MavenDroolsController(String groupId,
129 List<TopicCoderFilterConfiguration> decoderConfigurations,
130 List<TopicCoderFilterConfiguration> encoderConfigurations)
131 throws IllegalArgumentException {
133 if (logger.isInfoEnabled())
134 logger.info("DROOLS CONTROLLER: instantiation " + this +
135 " -> {" + groupId + ":" + artifactId + ":" + version + "}");
137 if (groupId == null || artifactId == null || version == null ||
138 groupId.isEmpty() || artifactId.isEmpty() || version.isEmpty()) {
139 throw new IllegalArgumentException("Missing maven coordinates: " +
140 groupId + ":" + artifactId + ":" +
144 this.policyContainer= new PolicyContainer(groupId, artifactId, version);
145 this.init(decoderConfigurations, encoderConfigurations);
147 if (logger.isInfoEnabled())
148 logger.info("DROOLS CONTROLLER: instantiation completed " + this);
152 * init encoding/decoding configuration
153 * @param decoderConfiguration list of topic -> decoders -> filters mapping
154 * @param encoderConfiguration list of topic -> encoders -> filters mapping
156 protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
157 List<TopicCoderFilterConfiguration> encoderConfigurations) {
159 this.decoderConfigurations = decoderConfigurations;
160 this.encoderConfigurations = encoderConfigurations;
162 this.initCoders(decoderConfigurations, true);
163 this.initCoders(encoderConfigurations, false);
165 this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
172 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
173 List<TopicCoderFilterConfiguration> decoderConfigurations,
174 List<TopicCoderFilterConfiguration> encoderConfigurations)
175 throws IllegalArgumentException, LinkageError, Exception {
177 if (logger.isInfoEnabled())
178 logger.info("UPDATE-TO-VERSION: " + this + " -> {" + newGroupId + ":" + newArtifactId + ":" + newVersion + "}");
180 if (newGroupId == null || newArtifactId == null || newVersion == null ||
181 newGroupId.isEmpty() || newArtifactId.isEmpty() || newVersion.isEmpty()) {
182 throw new IllegalArgumentException("Missing maven coordinates: " +
183 newGroupId + ":" + newArtifactId + ":" +
187 if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) ||
188 newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) ||
189 newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
190 throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " +
191 newGroupId + ":" + newArtifactId + ":" +
195 if (newGroupId.equalsIgnoreCase(this.getGroupId()) &&
196 newArtifactId.equalsIgnoreCase(this.getArtifactId()) &&
197 newVersion.equalsIgnoreCase(this.getVersion())) {
198 logger.warn("Al in the right version: " + newGroupId + ":" +
199 newArtifactId + ":" + newVersion + " vs. " + this);
203 if (!newGroupId.equalsIgnoreCase(this.getGroupId()) ||
204 !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
205 throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " +
206 newGroupId + ":" + newArtifactId + ":" +
207 newVersion + " vs. " + this);
211 String messages = this.policyContainer.updateToVersion(newVersion);
212 if (logger.isWarnEnabled())
213 logger.warn(this + "UPGRADE results: " + messages);
216 * If all sucessful (can load new container), now we can remove all coders from previous sessions
223 this.init(decoderConfigurations, encoderConfigurations);
225 if (logger.isInfoEnabled())
226 logger.info("UPDATE-TO-VERSION: completed " + this);
230 * initialize decoders for all the topics supported by this controller
231 * Note this is critical to be done after the Policy Container is
232 * instantiated to be able to fetch the corresponding classes.
234 * @param decoderConfiguration list of topic -> decoders -> filters mapping
236 protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
238 throws IllegalArgumentException {
240 if (logger.isInfoEnabled())
241 logger.info("INIT-CODERS: " + this);
243 if (coderConfigurations == null) {
248 for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
249 String topic = coderConfig.getTopic();
251 CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
252 if (coderConfig.getCustomGsonCoder() != null &&
253 coderConfig.getCustomGsonCoder().getClassContainer() != null &&
254 !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
256 String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
257 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
258 customGsonCoderClass)) {
259 logger.error(customGsonCoderClass + " cannot be retrieved");
260 throw new IllegalArgumentException(customGsonCoderClass + " cannot be retrieved");
262 if (logger.isInfoEnabled())
263 logger.info("CLASS FETCHED " + customGsonCoderClass);
267 CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder();
268 if (coderConfig.getCustomJacksonCoder() != null &&
269 coderConfig.getCustomJacksonCoder().getClassContainer() != null &&
270 !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) {
272 String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer();
273 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
274 customJacksonCoderClass)) {
275 logger.error(customJacksonCoderClass + " cannot be retrieved");
276 throw new IllegalArgumentException(customJacksonCoderClass + " cannot be retrieved");
278 if (logger.isInfoEnabled())
279 logger.info("CLASS FETCHED " + customJacksonCoderClass);
283 List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
284 if (coderFilters == null || coderFilters.isEmpty()) {
288 for (PotentialCoderFilter coderFilter : coderFilters) {
289 String potentialCodedClass = coderFilter.getCodedClass();
290 JsonProtocolFilter protocolFilter = coderFilter.getFilter();
292 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
293 potentialCodedClass)) {
294 logger.error(potentialCodedClass + " cannot be retrieved");
295 throw new IllegalArgumentException(potentialCodedClass + " cannot be retrieved");
297 if (logger.isInfoEnabled())
298 logger.info("CLASS FETCHED " + potentialCodedClass);
302 EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(),
303 topic, potentialCodedClass, protocolFilter,
306 this.policyContainer.getClassLoader().hashCode());
308 EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(),
309 topic, potentialCodedClass, protocolFilter,
312 this.policyContainer.getClassLoader().hashCode());
321 protected void removeDecoders()
322 throws IllegalArgumentException {
323 if (logger.isInfoEnabled())
324 logger.info("REMOVE-DECODERS: " + this);
326 if (this.decoderConfigurations == null) {
331 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
332 String topic = coderConfig.getTopic();
333 EventProtocolCoder.manager.removeDecoders
334 (this.getGroupId(), this.getArtifactId(), topic);
341 protected void removeEncoders()
342 throws IllegalArgumentException {
344 if (logger.isInfoEnabled())
345 logger.info("REMOVE-ENCODERS: " + this);
347 if (this.encoderConfigurations == null)
351 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
352 String topic = coderConfig.getTopic();
353 EventProtocolCoder.manager.removeEncoders
354 (this.getGroupId(), this.getArtifactId(), topic);
363 public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) throws IllegalStateException {
364 if (!ReflectionUtil.isClass
365 (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
366 logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. ");
370 if (modelHash == this.modelClassLoaderHash) {
371 if (logger.isInfoEnabled())
372 logger.info(coderClass.getCanonicalName() +
373 this + " class loader matches original drools controller rules classloader " +
374 coderClass.getClassLoader());
377 if (logger.isWarnEnabled())
378 logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " +
379 coderClass.getClassLoader() + " vs " +
380 this.policyContainer.getClassLoader());
389 public boolean start() {
391 if (logger.isInfoEnabled())
392 logger.info("START: " + this);
394 synchronized (this) {
401 return this.policyContainer.start();
408 public boolean stop() {
410 logger.info("STOP: " + this);
412 synchronized (this) {
419 return this.policyContainer.stop();
427 public void shutdown() throws IllegalStateException {
428 logger.info("{}: SHUTDOWN", this);
433 } catch (Exception e) {
434 logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e);
436 this.policyContainer.shutdown();
446 public void halt() throws IllegalStateException {
447 logger.info("{}: HALT", this);
452 } catch (Exception e) {
453 logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e);
455 this.policyContainer.destroy();
460 * removes this drools controllers and encoders and decoders from operation
462 protected void removeCoders() {
463 logger.info("{}: REMOVE-CODERS", this);
466 this.removeDecoders();
467 } catch (IllegalArgumentException e) {
468 logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e);
472 this.removeEncoders();
473 } catch (IllegalArgumentException e) {
474 logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e);
482 public boolean isAlive() {
490 public boolean offer(String topic, String event) {
491 logger.debug("{}: OFFER: {} <- {}", this, topic, event);
499 // 0. Check if the policy container has any sessions
501 if (this.policyContainer.getPolicySessions().size() <= 0) {
506 // 1. Now, check if this topic has a decoder:
508 if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
509 this.getArtifactId(),
512 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", this,
513 topic, this.getGroupId(), this.getArtifactId());
521 anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
522 this.getArtifactId(),
525 } catch (UnsupportedOperationException uoe) {
526 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
527 event, uoe.getMessage(), uoe);
529 } catch (Exception e) {
530 logger.warn("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
531 event, e.getMessage(), e);
535 synchronized(this.recentSourceEvents) {
536 this.recentSourceEvents.add(anEvent);
539 // increment event count for Nagios monitoring
540 PdpJmx.getInstance().updateOccured();
544 if (logger.isInfoEnabled())
545 logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName());
547 if (!this.policyContainer.insertAll(anEvent))
548 logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames());
557 public boolean deliver(TopicSink sink, Object event)
558 throws IllegalArgumentException,
559 IllegalStateException,
560 UnsupportedOperationException {
562 if (logger.isInfoEnabled())
563 logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink);
566 throw new IllegalArgumentException
567 (this + " invalid sink");
570 throw new IllegalArgumentException
571 (this + " invalid event");
574 throw new IllegalStateException
575 (this + " is locked");
578 throw new IllegalStateException
579 (this + " is stopped");
582 EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
584 synchronized(this.recentSinkEvents) {
585 this.recentSinkEvents.add(json);
588 return sink.send(json);
596 public String getVersion() {
597 return this.policyContainer.getVersion();
604 public String getArtifactId() {
605 return this.policyContainer.getArtifactId();
612 public String getGroupId() {
613 return this.policyContainer.getGroupId();
617 * @return the modelClassLoaderHash
619 public int getModelClassLoaderHash() {
620 return modelClassLoaderHash;
627 public synchronized boolean lock() {
628 logger.info("LOCK: " + this);
638 public synchronized boolean unlock() {
639 logger.info("UNLOCK: " + this);
649 public boolean isLocked() {
657 public PolicyContainer getContainer() {
658 return this.policyContainer;
664 @JsonProperty("sessions")
666 public List<String> getSessionNames() {
667 return getSessionNames(true);
673 @JsonProperty("sessionCoordinates")
675 public List<String> getCanonicalSessionNames() {
676 return getSessionNames(false);
681 * @param abbreviated true for the short form, otherwise the long form
682 * @return session names
684 protected List<String> getSessionNames(boolean abbreviated) {
685 List<String> sessionNames = new ArrayList<String>();
687 for (PolicySession session: this.policyContainer.getPolicySessions()) {
689 sessionNames.add(session.getName());
691 sessionNames.add(session.getFullName());
693 } catch (Exception e) {
694 logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e);
695 sessionNames.add(e.getMessage());
701 * provides the underlying core layer container sessions
703 * @return the attached Policy Container
705 protected List<PolicySession> getSessions() {
706 List<PolicySession> sessions = new ArrayList<PolicySession>();
707 sessions.addAll(this.policyContainer.getPolicySessions());
712 * provides the underlying core layer container session with name sessionName
714 * @param sessionName session name
715 * @return the attached Policy Container
716 * @throws IllegalArgumentException when an invalid session name is provided
717 * @throws IllegalStateException when the drools controller is in an invalid state
719 protected PolicySession getSession(String sessionName) {
720 if (sessionName == null || sessionName.isEmpty())
721 throw new IllegalArgumentException("A Session Name must be provided");
723 List<PolicySession> sessions = this.getSessions();
724 for (PolicySession session : sessions) {
725 if (sessionName.equals(session.getName()) || sessionName.equals(session.getName()))
729 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
736 public Map<String,Integer> factClassNames(String sessionName) throws IllegalArgumentException {
737 if (sessionName == null || sessionName.isEmpty())
738 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
740 // List<String> classNames = new ArrayList<>();
741 Map<String,Integer> classNames = new HashMap<>();
743 PolicySession session = getSession(sessionName);
744 KieSession kieSession = session.getKieSession();
746 Collection<FactHandle> facts = session.getKieSession().getFactHandles();
747 for (FactHandle fact : facts) {
749 String className = kieSession.getObject(fact).getClass().getName();
750 if (classNames.containsKey(className))
751 classNames.put(className, classNames.get(className) + 1);
753 classNames.put(className, 1);
754 } catch (Exception e) {
755 if (logger.isInfoEnabled())
756 logger.info("Object cannot be retrieved from fact: " + fact);
767 public long factCount(String sessionName) throws IllegalArgumentException {
768 if (sessionName == null || sessionName.isEmpty())
769 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
771 PolicySession session = getSession(sessionName);
772 return session.getKieSession().getFactCount();
779 public List<Object> facts(String sessionName, String className, boolean delete) {
780 if (sessionName == null || sessionName.isEmpty())
781 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
783 if (className == null || className.isEmpty())
784 throw new IllegalArgumentException("Invalid Class Name: " + className);
787 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
788 if (factClass == null)
789 throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className);
791 PolicySession session = getSession(sessionName);
792 KieSession kieSession = session.getKieSession();
794 List<Object> factObjects = new ArrayList<>();
796 Collection<FactHandle> factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass));
797 for (FactHandle factHandle : factHandles) {
799 factObjects.add(kieSession.getObject(factHandle));
801 kieSession.delete(factHandle);
802 } catch (Exception e) {
803 if (logger.isInfoEnabled())
804 logger.info("Object cannot be retrieved from fact: " + factHandle);
815 public List<Object> factQuery(String sessionName, String queryName, String queriedEntity, boolean delete, Object... queryParams) {
816 if (sessionName == null || sessionName.isEmpty())
817 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
819 if (queryName == null || queryName.isEmpty())
820 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
822 if (queriedEntity == null || queriedEntity.isEmpty())
823 throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity);
825 PolicySession session = getSession(sessionName);
826 KieSession kieSession = session.getKieSession();
828 boolean found = false;
829 for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
830 for (Query q : kiePackage.getQueries()) {
831 if (q.getName() != null && q.getName().equals(queryName)) {
838 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
840 List<Object> factObjects = new ArrayList<>();
842 QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams);
843 for (QueryResultsRow row : queryResults) {
845 factObjects.add(row.get(queriedEntity));
847 kieSession.delete(row.getFactHandle(queriedEntity));
848 } catch (Exception e) {
849 if (logger.isInfoEnabled())
850 logger.info("Object cannot be retrieved from fact: " + row);
861 public Class<?> fetchModelClass(String className) throws IllegalStateException {
862 Class<?> modelClass =
863 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
868 * @return the recentSourceEvents
871 public Object[] getRecentSourceEvents() {
872 synchronized(this.recentSourceEvents) {
873 Object[] events = new Object[recentSourceEvents.size()];
874 return recentSourceEvents.toArray(events);
879 * @return the recentSinkEvents
882 public String[] getRecentSinkEvents() {
883 synchronized(this.recentSinkEvents) {
884 String[] events = new String[recentSinkEvents.size()];
885 return recentSinkEvents.toArray(events);
894 public boolean isBrained() {
900 public String toString() {
901 StringBuilder builder = new StringBuilder();
902 builder.append("MavenDroolsController [policyContainer=")
903 .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":")
905 .append(alive).append(", locked=")
906 .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]");
907 return builder.toString();