2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.drools.controller.internal;
23 import com.fasterxml.jackson.annotation.JsonIgnore;
24 import com.fasterxml.jackson.annotation.JsonProperty;
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.List;
30 import org.apache.commons.collections4.queue.CircularFifoQueue;
31 import org.drools.core.ClassObjectFilter;
32 import org.kie.api.definition.KiePackage;
33 import org.kie.api.definition.rule.Query;
34 import org.kie.api.runtime.KieSession;
35 import org.kie.api.runtime.rule.FactHandle;
36 import org.kie.api.runtime.rule.QueryResults;
37 import org.kie.api.runtime.rule.QueryResultsRow;
38 import org.onap.policy.drools.controller.DroolsController;
39 import org.onap.policy.drools.core.PolicyContainer;
40 import org.onap.policy.drools.core.PolicySession;
41 import org.onap.policy.drools.core.jmx.PdpJmx;
42 import org.onap.policy.drools.event.comm.TopicSink;
43 import org.onap.policy.drools.features.DroolsControllerFeatureAPI;
44 import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
45 import org.onap.policy.drools.protocol.coders.JsonProtocolFilter;
46 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
47 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
48 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder;
49 import org.onap.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
50 import org.onap.policy.drools.utils.ReflectionUtil;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Maven-based Drools Controller that interacts with the
56 * policy-core PolicyContainer and PolicySession to manage
57 * Drools containers instantiated using Maven.
59 public class MavenDroolsController implements DroolsController {
64 private static Logger logger = LoggerFactory.getLogger(MavenDroolsController.class);
67 * Policy Container, the access object to the policy-core layer
70 protected final PolicyContainer policyContainer;
73 * alive status of this drools controller,
74 * reflects invocation of start()/stop() only
76 protected volatile boolean alive = false;
79 * locked status of this drools controller,
80 * reflects if i/o drools related operations are permitted,
81 * more specifically: offer() and deliver().
82 * It does not affect the ability to start and stop
83 * underlying drools infrastructure
85 protected volatile boolean locked = false;
88 * list of topics, each with associated decoder classes, each
89 * with a list of associated filters.
91 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
94 * list of topics, each with associated encoder classes, each
95 * with a list of associated filters.
97 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
100 * recent source events processed
102 protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<>(10);
105 * recent sink events processed
107 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<>(10);
110 * original Drools Model/Rules classloader hash
112 protected int modelClassLoaderHash;
115 * Expanded version of the constructor
117 * @param groupId maven group id
118 * @param artifactId maven artifact id
119 * @param version maven version
120 * @param decoderConfigurations list of topic -> decoders -> filters mapping
121 * @param encoderConfigurations list of topic -> encoders -> filters mapping
123 * @throws IllegalArgumentException invalid arguments passed in
125 public MavenDroolsController(String groupId,
128 List<TopicCoderFilterConfiguration> decoderConfigurations,
129 List<TopicCoderFilterConfiguration> encoderConfigurations) {
131 logger.info("drools-controller instantiation [{}:{}:{}]", groupId, artifactId, version);
133 if (groupId == null || groupId.isEmpty())
134 throw new IllegalArgumentException("Missing maven group-id coordinate");
136 if (artifactId == null || artifactId.isEmpty())
137 throw new IllegalArgumentException("Missing maven artifact-id coordinate");
139 if (version == null || version.isEmpty())
140 throw new IllegalArgumentException("Missing maven version coordinate");
142 this.policyContainer= new PolicyContainer(groupId, artifactId, version);
143 this.init(decoderConfigurations, encoderConfigurations);
145 logger.debug("{}: instantiation completed ", this);
149 * init encoding/decoding configuration
150 * @param decoderConfigurations list of topic -> decoders -> filters mapping
151 * @param encoderConfigurations list of topic -> encoders -> filters mapping
153 protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
154 List<TopicCoderFilterConfiguration> encoderConfigurations) {
156 this.decoderConfigurations = decoderConfigurations;
157 this.encoderConfigurations = encoderConfigurations;
159 this.initCoders(decoderConfigurations, true);
160 this.initCoders(encoderConfigurations, false);
162 this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
166 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
167 List<TopicCoderFilterConfiguration> decoderConfigurations,
168 List<TopicCoderFilterConfiguration> encoderConfigurations)
169 throws LinkageError {
171 logger.info("{}: updating version -> [{}:{}:{}]", newGroupId, newArtifactId, newVersion);
173 if (newGroupId == null || newGroupId.isEmpty())
174 throw new IllegalArgumentException("Missing maven group-id coordinate");
176 if (newArtifactId == null || newArtifactId.isEmpty())
177 throw new IllegalArgumentException("Missing maven artifact-id coordinate");
179 if (newVersion == null || newVersion.isEmpty())
180 throw new IllegalArgumentException("Missing maven version coordinate");
182 if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) ||
183 newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) ||
184 newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
185 throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " +
186 newGroupId + ":" + newArtifactId + ":" +
190 if (newGroupId.equalsIgnoreCase(this.getGroupId()) &&
191 newArtifactId.equalsIgnoreCase(this.getArtifactId()) &&
192 newVersion.equalsIgnoreCase(this.getVersion())) {
193 logger.warn("Al in the right version: " + newGroupId + ":" +
194 newArtifactId + ":" + newVersion + " vs. " + this);
198 if (!newGroupId.equalsIgnoreCase(this.getGroupId()) ||
199 !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
200 throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " +
201 newGroupId + ":" + newArtifactId + ":" +
202 newVersion + " vs. " + this);
206 String messages = this.policyContainer.updateToVersion(newVersion);
207 if (logger.isWarnEnabled())
208 logger.warn(this + "UPGRADE results: " + messages);
211 * If all sucessful (can load new container), now we can remove all coders from previous sessions
218 this.init(decoderConfigurations, encoderConfigurations);
220 if (logger.isInfoEnabled())
221 logger.info("UPDATE-TO-VERSION: completed " + this);
225 * initialize decoders for all the topics supported by this controller
226 * Note this is critical to be done after the Policy Container is
227 * instantiated to be able to fetch the corresponding classes.
229 * @param coderConfigurations list of topic -> decoders -> filters mapping
231 protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
234 if (logger.isInfoEnabled())
235 logger.info("INIT-CODERS: " + this);
237 if (coderConfigurations == null) {
242 for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
243 String topic = coderConfig.getTopic();
245 CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
246 if (coderConfig.getCustomGsonCoder() != null &&
247 coderConfig.getCustomGsonCoder().getClassContainer() != null &&
248 !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
250 String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
251 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
252 customGsonCoderClass)) {
253 throw makeRetrieveEx(customGsonCoderClass);
255 if (logger.isInfoEnabled())
256 logClassFetched(customGsonCoderClass);
260 CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder();
261 if (coderConfig.getCustomJacksonCoder() != null &&
262 coderConfig.getCustomJacksonCoder().getClassContainer() != null &&
263 !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) {
265 String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer();
266 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
267 customJacksonCoderClass)) {
268 throw makeRetrieveEx(customJacksonCoderClass);
270 if (logger.isInfoEnabled())
271 logClassFetched(customJacksonCoderClass);
275 List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
276 if (coderFilters == null || coderFilters.isEmpty()) {
280 for (PotentialCoderFilter coderFilter : coderFilters) {
281 String potentialCodedClass = coderFilter.getCodedClass();
282 JsonProtocolFilter protocolFilter = coderFilter.getFilter();
284 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
285 potentialCodedClass)) {
286 throw makeRetrieveEx(potentialCodedClass);
288 if (logger.isInfoEnabled())
289 logClassFetched(potentialCodedClass);
293 EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(),
294 topic, potentialCodedClass, protocolFilter,
297 this.policyContainer.getClassLoader().hashCode());
299 EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(),
300 topic, potentialCodedClass, protocolFilter,
303 this.policyContainer.getClassLoader().hashCode());
309 * Logs an error and makes an exception for an item that cannot be retrieved.
311 * @return a new exception
313 private IllegalArgumentException makeRetrieveEx(String itemName) {
314 logger.error(itemName + " cannot be retrieved");
315 return new IllegalArgumentException(itemName + " cannot be retrieved");
319 * Logs the name of the class that was fetched.
322 private void logClassFetched(String className) {
323 logger.info("CLASS FETCHED " + className);
330 protected void removeDecoders(){
331 if (logger.isInfoEnabled())
332 logger.info("REMOVE-DECODERS: " + this);
334 if (this.decoderConfigurations == null) {
339 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
340 String topic = coderConfig.getTopic();
341 EventProtocolCoder.manager.removeDecoders
342 (this.getGroupId(), this.getArtifactId(), topic);
349 protected void removeEncoders() {
351 if (logger.isInfoEnabled())
352 logger.info("REMOVE-ENCODERS: " + this);
354 if (this.encoderConfigurations == null)
358 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
359 String topic = coderConfig.getTopic();
360 EventProtocolCoder.manager.removeEncoders
361 (this.getGroupId(), this.getArtifactId(), topic);
367 public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) {
368 if (!ReflectionUtil.isClass
369 (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
370 logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. ");
374 if (modelHash == this.modelClassLoaderHash) {
375 if (logger.isInfoEnabled())
376 logger.info(coderClass.getCanonicalName() +
377 this + " class loader matches original drools controller rules classloader " +
378 coderClass.getClassLoader());
381 if (logger.isWarnEnabled())
382 logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " +
383 coderClass.getClassLoader() + " vs " +
384 this.policyContainer.getClassLoader());
390 public boolean start() {
392 if (logger.isInfoEnabled())
393 logger.info("START: " + this);
395 synchronized (this) {
402 return this.policyContainer.start();
406 public boolean stop() {
408 logger.info("STOP: " + this);
410 synchronized (this) {
417 return this.policyContainer.stop();
421 public void shutdown() {
422 logger.info("{}: SHUTDOWN", this);
427 } catch (Exception e) {
428 logger.error("{} SHUTDOWN FAILED because of {}", this, e.getMessage(), e);
430 this.policyContainer.shutdown();
437 logger.info("{}: HALT", this);
442 } catch (Exception e) {
443 logger.error("{} HALT FAILED because of {}", this, e.getMessage(), e);
445 this.policyContainer.destroy();
450 * removes this drools controllers and encoders and decoders from operation
452 protected void removeCoders() {
453 logger.info("{}: REMOVE-CODERS", this);
456 this.removeDecoders();
457 } catch (IllegalArgumentException e) {
458 logger.error("{} REMOVE-DECODERS FAILED because of {}", this, e.getMessage(), e);
462 this.removeEncoders();
463 } catch (IllegalArgumentException e) {
464 logger.error("{} REMOVE-ENCODERS FAILED because of {}", this, e.getMessage(), e);
469 public boolean isAlive() {
474 public boolean offer(String topic, String event) {
475 logger.debug("{}: OFFER: {} <- {}", this, topic, event);
483 // 0. Check if the policy container has any sessions
485 if (this.policyContainer.getPolicySessions().isEmpty()) {
490 // 1. Now, check if this topic has a decoder:
492 if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
493 this.getArtifactId(),
496 logger.warn("{}: DECODING-UNSUPPORTED {}:{}:{}", this,
497 topic, this.getGroupId(), this.getArtifactId());
505 anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
506 this.getArtifactId(),
509 } catch (UnsupportedOperationException uoe) {
510 logger.debug("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
511 event, uoe.getMessage(), uoe);
513 } catch (Exception e) {
514 logger.warn("{}: DECODE FAILED: {} <- {} because of {}", this, topic,
515 event, e.getMessage(), e);
519 synchronized(this.recentSourceEvents) {
520 this.recentSourceEvents.add(anEvent);
523 // increment event count for Nagios monitoring
524 PdpJmx.getInstance().updateOccured();
528 if (logger.isInfoEnabled())
529 logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName());
531 for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) {
533 if (feature.beforeInsert(this, anEvent))
535 } catch (Exception e) {
536 logger.error("{}: feature {} before-insert failure because of {}",
537 this, feature.getClass().getName(), e.getMessage(), e);
541 boolean successInject = this.policyContainer.insertAll(anEvent);
543 logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames());
545 for (DroolsControllerFeatureAPI feature : DroolsControllerFeatureAPI.providers.getList()) {
547 if (feature.afterInsert(this, anEvent, successInject))
549 } catch (Exception e) {
550 logger.error("{}: feature {} after-insert failure because of {}",
551 this, feature.getClass().getName(), e.getMessage(), e);
559 public boolean deliver(TopicSink sink, Object event) {
561 if (logger.isInfoEnabled())
562 logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink);
565 throw new IllegalArgumentException
566 (this + " invalid sink");
569 throw new IllegalArgumentException
570 (this + " invalid event");
573 throw new IllegalStateException
574 (this + " is locked");
577 throw new IllegalStateException
578 (this + " is stopped");
581 EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
583 synchronized(this.recentSinkEvents) {
584 this.recentSinkEvents.add(json);
587 return sink.send(json);
592 public String getVersion() {
593 return this.policyContainer.getVersion();
597 public String getArtifactId() {
598 return this.policyContainer.getArtifactId();
602 public String getGroupId() {
603 return this.policyContainer.getGroupId();
607 * @return the modelClassLoaderHash
609 public int getModelClassLoaderHash() {
610 return modelClassLoaderHash;
614 public synchronized boolean lock() {
615 logger.info("LOCK: " + this);
622 public synchronized boolean unlock() {
623 logger.info("UNLOCK: " + this);
630 public boolean isLocked() {
636 public PolicyContainer getContainer() {
637 return this.policyContainer;
640 @JsonProperty("sessions")
642 public List<String> getSessionNames() {
643 return getSessionNames(true);
646 @JsonProperty("sessionCoordinates")
648 public List<String> getCanonicalSessionNames() {
649 return getSessionNames(false);
654 * @param abbreviated true for the short form, otherwise the long form
655 * @return session names
657 protected List<String> getSessionNames(boolean abbreviated) {
658 List<String> sessionNames = new ArrayList<>();
660 for (PolicySession session: this.policyContainer.getPolicySessions()) {
662 sessionNames.add(session.getName());
664 sessionNames.add(session.getFullName());
666 } catch (Exception e) {
667 logger.warn("Can't retrieve CORE sessions: " + e.getMessage(), e);
668 sessionNames.add(e.getMessage());
674 * provides the underlying core layer container sessions
676 * @return the attached Policy Container
678 protected List<PolicySession> getSessions() {
679 List<PolicySession> sessions = new ArrayList<>();
680 sessions.addAll(this.policyContainer.getPolicySessions());
685 * provides the underlying core layer container session with name sessionName
687 * @param sessionName session name
688 * @return the attached Policy Container
689 * @throws IllegalArgumentException when an invalid session name is provided
690 * @throws IllegalStateException when the drools controller is in an invalid state
692 protected PolicySession getSession(String sessionName) {
693 if (sessionName == null || sessionName.isEmpty())
694 throw new IllegalArgumentException("A Session Name must be provided");
696 List<PolicySession> sessions = this.getSessions();
697 for (PolicySession session : sessions) {
698 if (sessionName.equals(session.getName()) || sessionName.equals(session.getFullName()))
702 throw invalidSessNameEx(sessionName);
705 private IllegalArgumentException invalidSessNameEx(String sessionName) {
706 return new IllegalArgumentException("Invalid Session Name: " + sessionName);
710 public Map<String,Integer> factClassNames(String sessionName) {
711 if (sessionName == null || sessionName.isEmpty())
712 throw invalidSessNameEx(sessionName);
714 Map<String,Integer> classNames = new HashMap<>();
716 PolicySession session = getSession(sessionName);
717 KieSession kieSession = session.getKieSession();
719 Collection<FactHandle> facts = session.getKieSession().getFactHandles();
720 for (FactHandle fact : facts) {
722 String className = kieSession.getObject(fact).getClass().getName();
723 if (classNames.containsKey(className))
724 classNames.put(className, classNames.get(className) + 1);
726 classNames.put(className, 1);
727 } catch (Exception e) {
728 logger.warn("Object cannot be retrieved from fact {}", fact, e);
736 public long factCount(String sessionName) {
737 if (sessionName == null || sessionName.isEmpty())
738 throw invalidSessNameEx(sessionName);
740 PolicySession session = getSession(sessionName);
741 return session.getKieSession().getFactCount();
745 public List<Object> facts(String sessionName, String className, boolean delete) {
746 if (sessionName == null || sessionName.isEmpty())
747 throw invalidSessNameEx(sessionName);
749 if (className == null || className.isEmpty())
750 throw new IllegalArgumentException("Invalid Class Name: " + className);
753 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
754 if (factClass == null)
755 throw new IllegalArgumentException("Class cannot be fetched in model's classloader: " + className);
757 PolicySession session = getSession(sessionName);
758 KieSession kieSession = session.getKieSession();
760 List<Object> factObjects = new ArrayList<>();
762 Collection<FactHandle> factHandles = kieSession.getFactHandles(new ClassObjectFilter(factClass));
763 for (FactHandle factHandle : factHandles) {
765 factObjects.add(kieSession.getObject(factHandle));
767 kieSession.delete(factHandle);
768 } catch (Exception e) {
769 logger.warn("Object cannot be retrieved from fact {}", factHandle, e);
777 public List<Object> factQuery(String sessionName, String queryName, String queriedEntity, boolean delete, Object... queryParams) {
778 if (sessionName == null || sessionName.isEmpty())
779 throw invalidSessNameEx(sessionName);
781 if (queryName == null || queryName.isEmpty())
782 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
784 if (queriedEntity == null || queriedEntity.isEmpty())
785 throw new IllegalArgumentException("Invalid Queried Entity: " + queriedEntity);
787 PolicySession session = getSession(sessionName);
788 KieSession kieSession = session.getKieSession();
790 boolean found = false;
791 for (KiePackage kiePackage : kieSession.getKieBase().getKiePackages()) {
792 for (Query q : kiePackage.getQueries()) {
793 if (q.getName() != null && q.getName().equals(queryName)) {
800 throw new IllegalArgumentException("Invalid Query Name: " + queryName);
802 List<Object> factObjects = new ArrayList<>();
804 QueryResults queryResults = kieSession.getQueryResults(queryName, queryParams);
805 for (QueryResultsRow row : queryResults) {
807 factObjects.add(row.get(queriedEntity));
809 kieSession.delete(row.getFactHandle(queriedEntity));
810 } catch (Exception e) {
811 logger.warn("Object cannot be retrieved from row: {}", row, e);
819 public Class<?> fetchModelClass(String className) {
820 return ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
824 * @return the recentSourceEvents
827 public Object[] getRecentSourceEvents() {
828 synchronized(this.recentSourceEvents) {
829 Object[] events = new Object[recentSourceEvents.size()];
830 return recentSourceEvents.toArray(events);
835 * @return the recentSinkEvents
838 public String[] getRecentSinkEvents() {
839 synchronized(this.recentSinkEvents) {
840 String[] events = new String[recentSinkEvents.size()];
841 return recentSinkEvents.toArray(events);
846 public boolean isBrained() {
852 public String toString() {
853 StringBuilder builder = new StringBuilder();
854 builder.append("MavenDroolsController [policyContainer=")
855 .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":")
857 .append(alive).append(", locked=")
858 .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]");
859 return builder.toString();