2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.controller.internal;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
29 import org.apache.commons.collections4.queue.CircularFifoQueue;
30 import org.drools.core.ClassObjectFilter;
31 import org.kie.api.runtime.KieSession;
32 import org.kie.api.runtime.rule.FactHandle;
33 import org.kie.api.runtime.rule.QueryResults;
34 import org.kie.api.runtime.rule.QueryResultsRow;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.openecomp.policy.drools.controller.DroolsController;
38 import org.openecomp.policy.drools.core.PolicyContainer;
39 import org.openecomp.policy.drools.core.PolicySession;
40 import org.openecomp.policy.drools.core.jmx.PdpJmx;
41 import org.openecomp.policy.drools.event.comm.TopicSink;
42 import org.openecomp.policy.drools.protocol.coders.EventProtocolCoder;
43 import org.openecomp.policy.drools.protocol.coders.JsonProtocolFilter;
44 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
45 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
46 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder;
47 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
48 import org.openecomp.policy.drools.utils.ReflectionUtil;
50 import com.fasterxml.jackson.annotation.JsonIgnore;
51 import com.fasterxml.jackson.annotation.JsonProperty;
54 * Maven-based Drools Controller that interacts with the
55 * policy-core PolicyContainer and PolicySession to manage
56 * Drools containers instantiated using Maven.
58 public class MavenDroolsController implements DroolsController {
63 private static Logger logger = LoggerFactory.getLogger(MavenDroolsController.class);
66 * Policy Container, the access object to the policy-core layer
69 protected final PolicyContainer policyContainer;
72 * alive status of this drools controller,
73 * reflects invocation of start()/stop() only
75 protected volatile boolean alive = false;
78 * locked status of this drools controller,
79 * reflects if i/o drools related operations are permitted,
80 * more specifically: offer() and deliver().
81 * It does not affect the ability to start and stop
82 * underlying drools infrastructure
84 protected volatile boolean locked = false;
87 * list of topics, each with associated decoder classes, each
88 * with a list of associated filters.
90 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
93 * list of topics, each with associated encoder classes, each
94 * with a list of associated filters.
96 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
99 * recent source events processed
101 protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<Object>(10);
104 * recent sink events processed
106 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<String>(10);
109 * original Drools Model/Rules classloader hash
111 protected int modelClassLoaderHash;
114 * Expanded version of the constructor
116 * @param groupId maven group id
117 * @param artifactId maven artifact id
118 * @param version maven version
119 * @param decoderConfiguration list of topic -> decoders -> filters mapping
120 * @param encoderConfiguration list of topic -> encoders -> filters mapping
122 * @throws IllegalArgumentException invalid arguments passed in
124 public MavenDroolsController(String groupId,
127 List<TopicCoderFilterConfiguration> decoderConfigurations,
128 List<TopicCoderFilterConfiguration> encoderConfigurations)
129 throws IllegalArgumentException {
131 if (logger.isInfoEnabled())
132 logger.info("DROOLS CONTROLLER: instantiation " + this +
133 " -> {" + groupId + ":" + artifactId + ":" + version + "}");
135 if (groupId == null || artifactId == null || version == null ||
136 groupId.isEmpty() || artifactId.isEmpty() || version.isEmpty()) {
137 throw new IllegalArgumentException("Missing maven coordinates: " +
138 groupId + ":" + artifactId + ":" +
142 this.policyContainer= new PolicyContainer(groupId, artifactId, version);
143 this.init(decoderConfigurations, encoderConfigurations);
145 if (logger.isInfoEnabled())
146 logger.info("DROOLS CONTROLLER: instantiation completed " + this);
150 * init encoding/decoding configuration
151 * @param decoderConfiguration list of topic -> decoders -> filters mapping
152 * @param encoderConfiguration list of topic -> encoders -> filters mapping
154 protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
155 List<TopicCoderFilterConfiguration> encoderConfigurations) {
157 this.decoderConfigurations = decoderConfigurations;
158 this.encoderConfigurations = encoderConfigurations;
160 this.initCoders(decoderConfigurations, true);
161 this.initCoders(encoderConfigurations, false);
163 this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
170 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
171 List<TopicCoderFilterConfiguration> decoderConfigurations,
172 List<TopicCoderFilterConfiguration> encoderConfigurations)
173 throws IllegalArgumentException, LinkageError, Exception {
175 if (logger.isInfoEnabled())
176 logger.info("UPDATE-TO-VERSION: " + this + " -> {" + newGroupId + ":" + newArtifactId + ":" + newVersion + "}");
178 if (newGroupId == null || newArtifactId == null || newVersion == null ||
179 newGroupId.isEmpty() || newArtifactId.isEmpty() || newVersion.isEmpty()) {
180 throw new IllegalArgumentException("Missing maven coordinates: " +
181 newGroupId + ":" + newArtifactId + ":" +
185 if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) ||
186 newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) ||
187 newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
188 throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " +
189 newGroupId + ":" + newArtifactId + ":" +
193 if (newGroupId.equalsIgnoreCase(this.getGroupId()) &&
194 newArtifactId.equalsIgnoreCase(this.getArtifactId()) &&
195 newVersion.equalsIgnoreCase(this.getVersion())) {
196 logger.warn("Al in the right version: " + newGroupId + ":" +
197 newArtifactId + ":" + newVersion + " vs. " + this);
201 if (!newGroupId.equalsIgnoreCase(this.getGroupId()) ||
202 !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
203 throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " +
204 newGroupId + ":" + newArtifactId + ":" +
205 newVersion + " vs. " + this);
209 String messages = this.policyContainer.updateToVersion(newVersion);
210 if (logger.isWarnEnabled())
211 logger.warn(this + "UPGRADE results: " + messages);
214 * If all sucessful (can load new container), now we can remove all coders from previous sessions
221 this.init(decoderConfigurations, encoderConfigurations);
223 if (logger.isInfoEnabled())
224 logger.info("UPDATE-TO-VERSION: completed " + this);
228 * initialize decoders for all the topics supported by this controller
229 * Note this is critical to be done after the Policy Container is
230 * instantiated to be able to fetch the corresponding classes.
232 * @param decoderConfiguration list of topic -> decoders -> filters mapping
234 protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
236 throws IllegalArgumentException {
238 if (logger.isInfoEnabled())
239 logger.info("INIT-CODERS: " + this);
241 if (coderConfigurations == null) {
246 for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
247 String topic = coderConfig.getTopic();
249 CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
250 if (coderConfig.getCustomGsonCoder() != null &&
251 coderConfig.getCustomGsonCoder().getClassContainer() != null &&
252 !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
254 String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
255 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
256 customGsonCoderClass)) {
257 logger.error(customGsonCoderClass + " cannot be retrieved");
258 throw new IllegalArgumentException(customGsonCoderClass + " cannot be retrieved");
260 if (logger.isInfoEnabled())
261 logger.info("CLASS FETCHED " + customGsonCoderClass);
265 CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder();
266 if (coderConfig.getCustomJacksonCoder() != null &&
267 coderConfig.getCustomJacksonCoder().getClassContainer() != null &&
268 !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) {
270 String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer();
271 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
272 customJacksonCoderClass)) {
273 logger.error(customJacksonCoderClass + " cannot be retrieved");
274 throw new IllegalArgumentException(customJacksonCoderClass + " cannot be retrieved");
276 if (logger.isInfoEnabled())
277 logger.info("CLASS FETCHED " + customJacksonCoderClass);
281 List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
282 if (coderFilters == null || coderFilters.isEmpty()) {
286 for (PotentialCoderFilter coderFilter : coderFilters) {
287 String potentialCodedClass = coderFilter.getCodedClass();
288 JsonProtocolFilter protocolFilter = coderFilter.getFilter();
290 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
291 potentialCodedClass)) {
292 logger.error(potentialCodedClass + " cannot be retrieved");
293 throw new IllegalArgumentException(potentialCodedClass + " cannot be retrieved");
295 if (logger.isInfoEnabled())
296 logger.info("CLASS FETCHED " + potentialCodedClass);
300 EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(),
301 topic, potentialCodedClass, protocolFilter,
304 this.policyContainer.getClassLoader().hashCode());
306 EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(),
307 topic, potentialCodedClass, protocolFilter,
310 this.policyContainer.getClassLoader().hashCode());
319 protected void removeDecoders()
320 throws IllegalArgumentException {
321 if (logger.isInfoEnabled())
322 logger.info("REMOVE-DECODERS: " + this);
324 if (this.decoderConfigurations == null) {
329 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
330 String topic = coderConfig.getTopic();
331 EventProtocolCoder.manager.removeDecoders
332 (this.getGroupId(), this.getArtifactId(), topic);
339 protected void removeEncoders()
340 throws IllegalArgumentException {
342 if (logger.isInfoEnabled())
343 logger.info("REMOVE-ENCODERS: " + this);
345 if (this.encoderConfigurations == null)
349 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
350 String topic = coderConfig.getTopic();
351 EventProtocolCoder.manager.removeEncoders
352 (this.getGroupId(), this.getArtifactId(), topic);
361 public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) throws IllegalStateException {
362 if (!ReflectionUtil.isClass
363 (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
364 logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. ");
368 if (modelHash == this.modelClassLoaderHash) {
369 if (logger.isInfoEnabled())
370 logger.info(coderClass.getCanonicalName() +
371 this + " class loader matches original drools controller rules classloader " +
372 coderClass.getClassLoader());
375 if (logger.isWarnEnabled())
376 logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " +
377 coderClass.getClassLoader() + " vs " +
378 this.policyContainer.getClassLoader());
387 public boolean start() {
389 if (logger.isInfoEnabled())
390 logger.info("START: " + this);
392 synchronized (this) {
399 return this.policyContainer.start();
406 public boolean stop() {
408 logger.info("STOP: " + this);
410 synchronized (this) {
417 return this.policyContainer.stop();
425 public void shutdown() throws IllegalStateException {
426 logger.info("{}: SHUTDOWN", this);
431 } catch (Exception e) {
432 logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e);
434 this.policyContainer.shutdown();
444 public void halt() throws IllegalStateException {
445 logger.info("{}: HALT", this);
450 } catch (Exception e) {
451 logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e);
453 this.policyContainer.destroy();
458 * removes this drools controllers and encoders and decoders from operation
460 protected void removeCoders() {
461 logger.info("{}: REMOVE-CODERS", this);
464 this.removeDecoders();
465 } catch (IllegalArgumentException e) {
466 logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e);
470 this.removeEncoders();
471 } catch (IllegalArgumentException e) {
472 logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e);
480 public boolean isAlive() {
488 public boolean offer(String topic, String event) {
489 logger.debug("{}: OFFER: {} <- {}", this, topic, event);
497 // 0. Check if the policy container has any sessions
499 if (this.policyContainer.getPolicySessions().size() <= 0) {
504 // 1. Now, check if this topic has a decoder:
506 if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
507 this.getArtifactId(),
510 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", this,
511 topic, this.getGroupId(), this.getArtifactId());
519 anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
520 this.getArtifactId(),
523 } catch (UnsupportedOperationException uoe) {
524 logger.info("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
525 event, uoe.getMessage(), uoe);
527 } catch (Exception e) {
528 logger.warn("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
529 event, e.getMessage(), e);
533 synchronized(this.recentSourceEvents) {
534 this.recentSourceEvents.add(anEvent);
537 // increment event count for Nagios monitoring
538 PdpJmx.getInstance().updateOccured();
542 if (logger.isInfoEnabled())
543 logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName());
545 if (!this.policyContainer.insertAll(anEvent))
546 logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames());
555 public boolean deliver(TopicSink sink, Object event)
556 throws IllegalArgumentException,
557 IllegalStateException,
558 UnsupportedOperationException {
560 if (logger.isInfoEnabled())
561 logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink);
564 throw new IllegalArgumentException
565 (this + " invalid sink");
568 throw new IllegalArgumentException
569 (this + " invalid event");
572 throw new IllegalStateException
573 (this + " is locked");
576 throw new IllegalStateException
577 (this + " is stopped");
580 EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
582 synchronized(this.recentSinkEvents) {
583 this.recentSinkEvents.add(json);
586 return sink.send(json);
594 public String getVersion() {
595 return this.policyContainer.getVersion();
602 public String getArtifactId() {
603 return this.policyContainer.getArtifactId();
610 public String getGroupId() {
611 return this.policyContainer.getGroupId();
615 * @return the modelClassLoaderHash
617 public int getModelClassLoaderHash() {
618 return modelClassLoaderHash;
625 public synchronized boolean lock() {
626 logger.info("LOCK: " + this);
636 public synchronized boolean unlock() {
637 logger.info("UNLOCK: " + this);
647 public boolean isLocked() {
652 * gets the policy container
653 * @return the underlying container
656 protected PolicyContainer getContainer() {
657 return this.policyContainer;
663 @JsonProperty("sessions")
665 public List<String> getSessionNames() {
666 return getSessionNames(true);
672 @JsonProperty("sessionCoordinates")
674 public List<String> getCanonicalSessionNames() {
675 return getSessionNames(false);
680 * @param abbreviated true for the short form, otherwise the long form
681 * @return session names
683 protected List<String> getSessionNames(boolean abbreviated) {
684 List<String> sessionNames = new ArrayList<String>();
686 for (PolicySession session: this.policyContainer.getPolicySessions()) {
688 sessionNames.add(session.getName());
690 sessionNames.add(session.getFullName());
692 } catch (Exception e) {
693 logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e);
694 sessionNames.add(e.getMessage());
700 * provides the underlying core layer container sessions
702 * @return the attached Policy Container
704 protected List<PolicySession> getSessions() {
705 List<PolicySession> sessions = new ArrayList<PolicySession>();
706 sessions.addAll(this.policyContainer.getPolicySessions());
711 * provides the underlying core layer container session with name sessionName
713 * @param sessionName session name
714 * @return the attached Policy Container
715 * @throws IllegalArgumentException when an invalid session name is provided
716 * @throws IllegalStateException when the drools controller is in an invalid state
718 protected PolicySession getSession(String sessionName) {
719 if (sessionName == null || sessionName.isEmpty())
720 throw new IllegalArgumentException("A Session Name must be provided");
722 List<PolicySession> sessions = this.getSessions();
723 for (PolicySession session : sessions) {
724 if (sessionName.equals(session.getName()) || sessionName.equals(session.getName()))
728 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
735 public Map<String,Integer> factClassNames(String sessionName) throws IllegalArgumentException {
736 if (sessionName == null || sessionName.isEmpty())
737 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
739 // List<String> classNames = new ArrayList<>();
740 Map<String,Integer> classNames = new HashMap<>();
742 PolicySession session = getSession(sessionName);
743 KieSession kieSession = session.getKieSession();
745 Collection<FactHandle> facts = session.getKieSession().getFactHandles();
746 for (FactHandle fact : facts) {
748 String className = kieSession.getObject(fact).getClass().getName();
749 if (classNames.containsKey(className))
750 classNames.put(className, classNames.get(className) + 1);
752 classNames.put(className, 1);
753 } catch (Exception e) {
754 if (logger.isInfoEnabled())
755 logger.info("Object cannot be retrieved from fact: " + fact);
766 public long factCount(String sessionName) throws IllegalArgumentException {
767 if (sessionName == null || sessionName.isEmpty())
768 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
770 PolicySession session = getSession(sessionName);
771 return session.getKieSession().getFactCount();
778 public List<Object> facts(String sessionName, String className, boolean delete) {
779 if (sessionName == null || sessionName.isEmpty())
780 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
782 if (className == null || className.isEmpty())
783 throw new IllegalArgumentException("Invalid Class Name: " + className);
786 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
787 if (factClass == null)
788 throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className);
790 PolicySession session = getSession(sessionName);
791 KieSession kieSession = session.getKieSession();
793 List<Object> factObjects = new ArrayList<>();
795 Collection<FactHandle> factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass));
796 for (FactHandle factHandle : factHandles) {
798 factObjects.add(kieSession.getObject(factHandle));
800 kieSession.delete(factHandle);
801 } catch (Exception e) {
802 if (logger.isInfoEnabled())
803 logger.info("Object cannot be retrieved from fact: " + factHandle);
814 public List<Object> factQuery(String sessionName, String queryName, String queriedEntity, boolean delete, Object... queryParams) {
815 if (sessionName == null || sessionName.isEmpty())
816 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
818 if (queryName == null || queryName.isEmpty())
819 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
821 if (queriedEntity == null || queriedEntity.isEmpty())
822 throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity);
824 PolicySession session = getSession(sessionName);
825 KieSession kieSession = session.getKieSession();
827 List<Object> factObjects = new ArrayList<>();
829 QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams);
830 for (QueryResultsRow row : queryResults) {
832 factObjects.add(row.get(queriedEntity));
834 kieSession.delete(row.getFactHandle(queriedEntity));
835 } catch (Exception e) {
836 if (logger.isInfoEnabled())
837 logger.info("Object cannot be retrieved from fact: " + row);
848 public Class<?> fetchModelClass(String className) throws IllegalStateException {
849 Class<?> modelClass =
850 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
855 * @return the recentSourceEvents
858 public Object[] getRecentSourceEvents() {
859 synchronized(this.recentSourceEvents) {
860 Object[] events = new Object[recentSourceEvents.size()];
861 return recentSourceEvents.toArray(events);
866 * @return the recentSinkEvents
869 public String[] getRecentSinkEvents() {
870 synchronized(this.recentSinkEvents) {
871 String[] events = new String[recentSinkEvents.size()];
872 return recentSinkEvents.toArray(events);
881 public boolean isBrained() {
887 public String toString() {
888 StringBuilder builder = new StringBuilder();
889 builder.append("MavenDroolsController [policyContainer=")
890 .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":")
892 .append(alive).append(", locked=")
893 .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]");
894 return builder.toString();