2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.policy.drools.controller.internal;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashMap;
26 import java.util.List;
29 import org.apache.commons.collections4.queue.CircularFifoQueue;
30 import org.drools.core.ClassObjectFilter;
31 import org.kie.api.runtime.KieSession;
32 import org.kie.api.runtime.rule.FactHandle;
33 import org.kie.api.runtime.rule.QueryResults;
34 import org.kie.api.runtime.rule.QueryResultsRow;
35 import org.openecomp.policy.common.logging.eelf.MessageCodes;
36 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
37 import org.openecomp.policy.common.logging.flexlogger.Logger;
38 import org.openecomp.policy.drools.controller.DroolsController;
39 import org.openecomp.policy.drools.core.PolicyContainer;
40 import org.openecomp.policy.drools.core.PolicySession;
41 import org.openecomp.policy.drools.core.jmx.PdpJmx;
42 import org.openecomp.policy.drools.event.comm.TopicSink;
43 import org.openecomp.policy.drools.protocol.coders.EventProtocolCoder;
44 import org.openecomp.policy.drools.protocol.coders.JsonProtocolFilter;
45 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
46 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
47 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder;
48 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
49 import org.openecomp.policy.drools.utils.ReflectionUtil;
51 import com.fasterxml.jackson.annotation.JsonIgnore;
54 * Maven-based Drools Controller that interacts with the
55 * policy-core PolicyContainer and PolicySession to manage
56 * Drools containers instantiated using Maven.
58 public class MavenDroolsController implements DroolsController {
63 private static Logger logger = FlexLogger.getLogger(MavenDroolsController.class);
66 * Policy Container, the access object to the policy-core layer
69 protected final PolicyContainer policyContainer;
72 * alive status of this drools controller,
73 * reflects invocation of start()/stop() only
75 protected volatile boolean alive = false;
78 * locked status of this drools controller,
79 * reflects if i/o drools related operations are permitted,
80 * more specifically: offer() and deliver().
81 * It does not affect the ability to start and stop
82 * underlying drools infrastructure
84 protected volatile boolean locked = false;
87 * list of topics, each with associated decoder classes, each
88 * with a list of associated filters.
90 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
93 * list of topics, each with associated encoder classes, each
94 * with a list of associated filters.
96 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
99 * recent source events processed
101 protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<Object>(10);
104 * recent sink events processed
106 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<String>(10);
109 * original Drools Model/Rules classloader hash
111 protected int modelClassLoaderHash;
114 * Expanded version of the constructor
116 * @param groupId maven group id
117 * @param artifactId maven artifact id
118 * @param version maven version
119 * @param decoderConfiguration list of topic -> decoders -> filters mapping
120 * @param encoderConfiguration list of topic -> encoders -> filters mapping
122 * @throws IllegalArgumentException invalid arguments passed in
124 public MavenDroolsController(String groupId,
127 List<TopicCoderFilterConfiguration> decoderConfigurations,
128 List<TopicCoderFilterConfiguration> encoderConfigurations)
129 throws IllegalArgumentException {
131 if (logger.isInfoEnabled())
132 logger.info("DROOLS CONTROLLER: instantiation " + this +
133 " -> {" + groupId + ":" + artifactId + ":" + version + "}");
135 if (groupId == null || artifactId == null || version == null ||
136 groupId.isEmpty() || artifactId.isEmpty() || version.isEmpty()) {
137 throw new IllegalArgumentException("Missing maven coordinates: " +
138 groupId + ":" + artifactId + ":" +
142 this.policyContainer= new PolicyContainer(groupId, artifactId, version);
143 this.init(decoderConfigurations, encoderConfigurations);
145 if (logger.isInfoEnabled())
146 logger.info("DROOLS CONTROLLER: instantiation completed " + this);
150 * init encoding/decoding configuration
151 * @param decoderConfiguration list of topic -> decoders -> filters mapping
152 * @param encoderConfiguration list of topic -> encoders -> filters mapping
154 protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
155 List<TopicCoderFilterConfiguration> encoderConfigurations) {
157 this.decoderConfigurations = decoderConfigurations;
158 this.encoderConfigurations = encoderConfigurations;
160 this.initCoders(decoderConfigurations, true);
161 this.initCoders(encoderConfigurations, false);
163 this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
170 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
171 List<TopicCoderFilterConfiguration> decoderConfigurations,
172 List<TopicCoderFilterConfiguration> encoderConfigurations)
173 throws IllegalArgumentException, LinkageError, Exception {
175 if (logger.isInfoEnabled())
176 logger.info("UPDATE-TO-VERSION: " + this + " -> {" + newGroupId + ":" + newArtifactId + ":" + newVersion + "}");
178 if (newGroupId == null || newArtifactId == null || newVersion == null ||
179 newGroupId.isEmpty() || newArtifactId.isEmpty() || newVersion.isEmpty()) {
180 throw new IllegalArgumentException("Missing maven coordinates: " +
181 newGroupId + ":" + newArtifactId + ":" +
185 if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) ||
186 newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) ||
187 newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
188 throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " +
189 newGroupId + ":" + newArtifactId + ":" +
193 if (newGroupId.equalsIgnoreCase(this.getGroupId()) &&
194 newArtifactId.equalsIgnoreCase(this.getArtifactId()) &&
195 newVersion.equalsIgnoreCase(this.getVersion())) {
196 logger.warn("Al in the right version: " + newGroupId + ":" +
197 newArtifactId + ":" + newVersion + " vs. " + this);
201 if (!newGroupId.equalsIgnoreCase(this.getGroupId()) ||
202 !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
203 throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " +
204 newGroupId + ":" + newArtifactId + ":" +
205 newVersion + " vs. " + this);
209 String messages = this.policyContainer.updateToVersion(newVersion);
210 if (logger.isWarnEnabled())
211 logger.warn(this + "UPGRADE results: " + messages);
214 * If all sucessful (can load new container), now we can remove all coders from previous sessions
221 this.init(decoderConfigurations, encoderConfigurations);
223 if (logger.isInfoEnabled())
224 logger.info("UPDATE-TO-VERSION: completed " + this);
228 * initialize decoders for all the topics supported by this controller
229 * Note this is critical to be done after the Policy Container is
230 * instantiated to be able to fetch the corresponding classes.
232 * @param decoderConfiguration list of topic -> decoders -> filters mapping
234 protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
236 throws IllegalArgumentException {
238 if (logger.isInfoEnabled())
239 logger.info("INIT-CODERS: " + this);
241 if (coderConfigurations == null) {
246 for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
247 String topic = coderConfig.getTopic();
249 CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
250 if (coderConfig.getCustomGsonCoder() != null &&
251 coderConfig.getCustomGsonCoder().getClassContainer() != null &&
252 !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
254 String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
255 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
256 customGsonCoderClass)) {
257 logger.error(customGsonCoderClass + " cannot be retrieved");
258 throw new IllegalArgumentException(customGsonCoderClass + " cannot be retrieved");
260 if (logger.isInfoEnabled())
261 logger.info("CLASS FETCHED " + customGsonCoderClass);
265 CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder();
266 if (coderConfig.getCustomJacksonCoder() != null &&
267 coderConfig.getCustomJacksonCoder().getClassContainer() != null &&
268 !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) {
270 String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer();
271 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
272 customJacksonCoderClass)) {
273 logger.error(customJacksonCoderClass + " cannot be retrieved");
274 throw new IllegalArgumentException(customJacksonCoderClass + " cannot be retrieved");
276 if (logger.isInfoEnabled())
277 logger.info("CLASS FETCHED " + customJacksonCoderClass);
281 List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
282 if (coderFilters == null || coderFilters.isEmpty()) {
286 for (PotentialCoderFilter coderFilter : coderFilters) {
287 String potentialCodedClass = coderFilter.getCodedClass();
288 JsonProtocolFilter protocolFilter = coderFilter.getFilter();
290 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
291 potentialCodedClass)) {
292 logger.error(potentialCodedClass + " cannot be retrieved");
293 throw new IllegalArgumentException(potentialCodedClass + " cannot be retrieved");
295 if (logger.isInfoEnabled())
296 logger.info("CLASS FETCHED " + potentialCodedClass);
300 EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(),
301 topic, potentialCodedClass, protocolFilter,
304 this.policyContainer.getClassLoader().hashCode());
306 EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(),
307 topic, potentialCodedClass, protocolFilter,
310 this.policyContainer.getClassLoader().hashCode());
319 protected void removeDecoders()
320 throws IllegalArgumentException {
321 if (logger.isInfoEnabled())
322 logger.info("REMOVE-DECODERS: " + this);
324 if (this.decoderConfigurations == null) {
329 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
330 String topic = coderConfig.getTopic();
331 EventProtocolCoder.manager.removeDecoders
332 (this.getGroupId(), this.getArtifactId(), topic);
339 protected void removeEncoders()
340 throws IllegalArgumentException {
342 if (logger.isInfoEnabled())
343 logger.info("REMOVE-ENCODERS: " + this);
345 if (this.encoderConfigurations == null)
349 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
350 String topic = coderConfig.getTopic();
351 EventProtocolCoder.manager.removeEncoders
352 (this.getGroupId(), this.getArtifactId(), topic);
361 public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) throws IllegalStateException {
362 if (!ReflectionUtil.isClass
363 (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
364 logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. ");
368 if (modelHash == this.modelClassLoaderHash) {
369 if (logger.isInfoEnabled())
370 logger.info(coderClass.getCanonicalName() +
371 this + " class loader matches original drools controller rules classloader " +
372 coderClass.getClassLoader());
375 if (logger.isWarnEnabled())
376 logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " +
377 coderClass.getClassLoader() + " vs " +
378 this.policyContainer.getClassLoader());
387 public boolean start() {
389 if (logger.isInfoEnabled())
390 logger.info("START: " + this);
392 synchronized (this) {
399 return this.policyContainer.start();
406 public boolean stop() {
408 logger.info("STOP: " + this);
410 synchronized (this) {
417 return this.policyContainer.stop();
425 public void shutdown() throws IllegalStateException {
427 if (logger.isInfoEnabled())
428 logger.info(this + "SHUTDOWN");
433 } catch (Exception e) {
434 logger.error(MessageCodes.EXCEPTION_ERROR, e, "stop", this.toString());
436 this.policyContainer.shutdown();
446 public void halt() throws IllegalStateException {
447 if (logger.isInfoEnabled())
448 logger.info(this + "SHUTDOWN");
453 } catch (Exception e) {
454 logger.error(MessageCodes.EXCEPTION_ERROR, e, "halt", this.toString());
456 this.policyContainer.destroy();
461 * removes this drools controllers and encoders and decoders from operation
463 protected void removeCoders() {
465 if (logger.isInfoEnabled())
466 logger.info(this + "REMOVE-CODERS");
469 this.removeDecoders();
470 } catch (IllegalArgumentException e) {
471 logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeDecoders", this.toString());
475 this.removeEncoders();
476 } catch (IllegalArgumentException e) {
477 logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeEncoders", this.toString());
485 public boolean isAlive() {
493 public boolean offer(String topic, String event) {
495 if (logger.isInfoEnabled())
496 logger.info("OFFER: " + topic + ":" + event + " INTO " + this);
504 // 0. Check if the policy container has any sessions
506 if (this.policyContainer.getPolicySessions().size() <= 0) {
511 // 1. Now, check if this topic has a decoder:
513 if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
514 this.getArtifactId(),
517 logger.warn("DECODING-UNSUPPORTED: " + ":" + this.getGroupId() +
518 ":" + this.getArtifactId() + ":" + topic + " IN " + this);
526 anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
527 this.getArtifactId(),
530 } catch (UnsupportedOperationException uoe) {
531 if (logger.isInfoEnabled())
532 logger.info("DECODE:"+ this + ":" + topic + ":" + event);
534 } catch (Exception e) {
536 logger.error("DECODE:"+ this + ":" + topic + ":" + event);
540 synchronized(this.recentSourceEvents) {
541 this.recentSourceEvents.add(anEvent);
544 // increment event count for Nagios monitoring
545 PdpJmx.getInstance().updateOccured();
549 if (logger.isInfoEnabled())
550 logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName());
552 if (!this.policyContainer.insertAll(anEvent))
553 logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames());
562 public boolean deliver(TopicSink sink, Object event)
563 throws IllegalArgumentException,
564 IllegalStateException,
565 UnsupportedOperationException {
567 if (logger.isInfoEnabled())
568 logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink);
571 throw new IllegalArgumentException
572 (this + " invalid sink");
575 throw new IllegalArgumentException
576 (this + " invalid event");
579 throw new IllegalStateException
580 (this + " is locked");
583 throw new IllegalStateException
584 (this + " is stopped");
587 EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
589 synchronized(this.recentSinkEvents) {
590 this.recentSinkEvents.add(json);
593 return sink.send(json);
601 public String getVersion() {
602 return this.policyContainer.getVersion();
609 public String getArtifactId() {
610 return this.policyContainer.getArtifactId();
617 public String getGroupId() {
618 return this.policyContainer.getGroupId();
622 * @return the modelClassLoaderHash
624 public int getModelClassLoaderHash() {
625 return modelClassLoaderHash;
632 public synchronized boolean lock() {
633 logger.info("LOCK: " + this);
643 public synchronized boolean unlock() {
644 logger.info("UNLOCK: " + this);
654 public boolean isLocked() {
659 * gets the policy container
660 * @return the underlying container
663 protected PolicyContainer getContainer() {
664 return this.policyContainer;
671 public List<String> getSessionNames() {
672 return getSessionNames(true);
679 public List<String> getCanonicalSessionNames() {
680 return getSessionNames(false);
685 * @param abbreviated true for the short form, otherwise the long form
686 * @return session names
688 protected List<String> getSessionNames(boolean abbreviated) {
689 List<String> sessionNames = new ArrayList<String>();
691 for (PolicySession session: this.policyContainer.getPolicySessions()) {
693 sessionNames.add(session.getName());
695 sessionNames.add(session.getFullName());
697 } catch (Exception e) {
698 logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e);
699 sessionNames.add(e.getMessage());
705 * provides the underlying core layer container sessions
707 * @return the attached Policy Container
709 protected List<PolicySession> getSessions() {
710 List<PolicySession> sessions = new ArrayList<PolicySession>();
711 sessions.addAll(this.policyContainer.getPolicySessions());
716 * provides the underlying core layer container session with name sessionName
718 * @param sessionName session name
719 * @return the attached Policy Container
720 * @throws IllegalArgumentException when an invalid session name is provided
721 * @throws IllegalStateException when the drools controller is in an invalid state
723 protected PolicySession getSession(String sessionName) {
724 if (sessionName == null || sessionName.isEmpty())
725 throw new IllegalArgumentException("A Session Name must be provided");
727 List<PolicySession> sessions = this.getSessions();
728 for (PolicySession session : sessions) {
729 if (sessionName.equals(session.getName()) || sessionName.equals(session.getName()))
733 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
740 public Map<String,Integer> factClassNames(String sessionName) throws IllegalArgumentException {
741 if (sessionName == null || sessionName.isEmpty())
742 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
744 // List<String> classNames = new ArrayList<>();
745 Map<String,Integer> classNames = new HashMap<>();
747 PolicySession session = getSession(sessionName);
748 KieSession kieSession = session.getKieSession();
750 Collection<FactHandle> facts = session.getKieSession().getFactHandles();
751 for (FactHandle fact : facts) {
753 String className = kieSession.getObject(fact).getClass().getName();
754 if (classNames.containsKey(className))
755 classNames.put(className, classNames.get(className) + 1);
757 classNames.put(className, 1);
758 } catch (Exception e) {
759 if (logger.isInfoEnabled())
760 logger.info("Object cannot be retrieved from fact: " + fact);
771 public long factCount(String sessionName) throws IllegalArgumentException {
772 if (sessionName == null || sessionName.isEmpty())
773 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
775 PolicySession session = getSession(sessionName);
776 return session.getKieSession().getFactCount();
783 public List<Object> facts(String sessionName, String className, boolean delete) {
784 if (sessionName == null || sessionName.isEmpty())
785 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
787 if (className == null || className.isEmpty())
788 throw new IllegalArgumentException("Invalid Class Name: " + className);
791 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
792 if (factClass == null)
793 throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className);
795 PolicySession session = getSession(sessionName);
796 KieSession kieSession = session.getKieSession();
798 List<Object> factObjects = new ArrayList<>();
800 Collection<FactHandle> factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass));
801 for (FactHandle factHandle : factHandles) {
803 factObjects.add(kieSession.getObject(factHandle));
805 kieSession.delete(factHandle);
806 } catch (Exception e) {
807 if (logger.isInfoEnabled())
808 logger.info("Object cannot be retrieved from fact: " + factHandle);
819 public List<Object> factQuery(String sessionName, String queryName, String queriedEntity, boolean delete, Object... queryParams) {
820 if (sessionName == null || sessionName.isEmpty())
821 throw new IllegalArgumentException("Invalid Session Name: " + sessionName);
823 if (queryName == null || queryName.isEmpty())
824 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
826 if (queriedEntity == null || queriedEntity.isEmpty())
827 throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity);
829 PolicySession session = getSession(sessionName);
830 KieSession kieSession = session.getKieSession();
832 List<Object> factObjects = new ArrayList<>();
834 QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams);
835 for (QueryResultsRow row : queryResults) {
837 factObjects.add(row.get(queriedEntity));
839 kieSession.delete(row.getFactHandle(queriedEntity));
840 } catch (Exception e) {
841 if (logger.isInfoEnabled())
842 logger.info("Object cannot be retrieved from fact: " + row);
853 public Class<?> fetchModelClass(String className) throws IllegalStateException {
854 Class<?> modelClass =
855 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
860 * @return the recentSourceEvents
863 public Object[] getRecentSourceEvents() {
864 synchronized(this.recentSourceEvents) {
865 Object[] events = new Object[recentSourceEvents.size()];
866 return recentSourceEvents.toArray(events);
871 * @return the recentSinkEvents
874 public String[] getRecentSinkEvents() {
875 synchronized(this.recentSinkEvents) {
876 String[] events = new String[recentSinkEvents.size()];
877 return recentSinkEvents.toArray(events);
886 public boolean isBrained() {
892 public String toString() {
893 StringBuilder builder = new StringBuilder();
894 builder.append("MavenDroolsController [policyContainer=")
895 .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":")
897 .append(alive).append(", locked=")
898 .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]");
899 return builder.toString();