/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.openecomp.policy.drools.controller.internal;
23 import java.util.ArrayList;
24 import java.util.List;
26 import org.apache.commons.collections4.queue.CircularFifoQueue;
28 import org.openecomp.policy.common.logging.eelf.MessageCodes;
29 import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
30 import org.openecomp.policy.common.logging.flexlogger.Logger;
31 import org.openecomp.policy.drools.controller.DroolsController;
32 import org.openecomp.policy.drools.core.PolicyContainer;
33 import org.openecomp.policy.drools.core.PolicySession;
34 import org.openecomp.policy.drools.core.jmx.PdpJmx;
35 import org.openecomp.policy.drools.event.comm.TopicSink;
36 import org.openecomp.policy.drools.protocol.coders.EventProtocolCoder;
37 import org.openecomp.policy.drools.protocol.coders.JsonProtocolFilter;
38 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration;
39 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomGsonCoder;
40 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.CustomJacksonCoder;
41 import org.openecomp.policy.drools.protocol.coders.TopicCoderFilterConfiguration.PotentialCoderFilter;
42 import org.openecomp.policy.drools.utils.ReflectionUtil;
43 import com.fasterxml.jackson.annotation.JsonIgnore;
/**
 * Maven-based Drools Controller that interacts with the
 * policy-core PolicyContainer and PolicySession to manage
 * Drools containers instantiated using Maven.
 */
50 public class MavenDroolsController implements DroolsController {
55 private static Logger logger = FlexLogger.getLogger(MavenDroolsController.class);
58 * Policy Container, the access object to the policy-core layer
61 protected final PolicyContainer policyContainer;
64 * alive status of this drools controller,
65 * reflects invocation of start()/stop() only
67 protected volatile boolean alive = false;
70 * locked status of this drools controller,
71 * reflects if i/o drools related operations are permitted,
72 * more specifically: offer() and deliver().
73 * It does not affect the ability to start and stop
74 * underlying drools infrastructure
76 protected volatile boolean locked = false;
79 * list of topics, each with associated decoder classes, each
80 * with a list of associated filters.
82 protected List<TopicCoderFilterConfiguration> decoderConfigurations;
85 * list of topics, each with associated encoder classes, each
86 * with a list of associated filters.
88 protected List<TopicCoderFilterConfiguration> encoderConfigurations;
91 * recent source events processed
93 protected final CircularFifoQueue<Object> recentSourceEvents = new CircularFifoQueue<Object>(10);
96 * recent sink events processed
98 protected final CircularFifoQueue<String> recentSinkEvents = new CircularFifoQueue<String>(10);
101 * original Drools Model/Rules classloader hash
103 protected int modelClassLoaderHash;
106 * Expanded version of the constructor
108 * @param groupId maven group id
109 * @param artifactId maven artifact id
110 * @param version maven version
111 * @param decoderConfiguration list of topic -> decoders -> filters mapping
112 * @param encoderConfiguration list of topic -> encoders -> filters mapping
114 * @throws IllegalArgumentException invalid arguments passed in
116 public MavenDroolsController(String groupId,
119 List<TopicCoderFilterConfiguration> decoderConfigurations,
120 List<TopicCoderFilterConfiguration> encoderConfigurations)
121 throws IllegalArgumentException {
123 if (logger.isInfoEnabled())
124 logger.info("DROOLS CONTROLLER: instantiation " + this +
125 " -> {" + groupId + ":" + artifactId + ":" + version + "}");
127 if (groupId == null || artifactId == null || version == null ||
128 groupId.isEmpty() || artifactId.isEmpty() || version.isEmpty()) {
129 throw new IllegalArgumentException("Missing maven coordinates: " +
130 groupId + ":" + artifactId + ":" +
134 this.policyContainer= new PolicyContainer(groupId, artifactId, version);
135 this.init(decoderConfigurations, encoderConfigurations);
137 if (logger.isInfoEnabled())
138 logger.info("DROOLS CONTROLLER: instantiation completed " + this);
142 * init encoding/decoding configuration
143 * @param decoderConfiguration list of topic -> decoders -> filters mapping
144 * @param encoderConfiguration list of topic -> encoders -> filters mapping
146 protected void init(List<TopicCoderFilterConfiguration> decoderConfigurations,
147 List<TopicCoderFilterConfiguration> encoderConfigurations) {
149 this.decoderConfigurations = decoderConfigurations;
150 this.encoderConfigurations = encoderConfigurations;
152 this.initCoders(decoderConfigurations, true);
153 this.initCoders(encoderConfigurations, false);
155 this.modelClassLoaderHash = this.policyContainer.getClassLoader().hashCode();
162 public void updateToVersion(String newGroupId, String newArtifactId, String newVersion,
163 List<TopicCoderFilterConfiguration> decoderConfigurations,
164 List<TopicCoderFilterConfiguration> encoderConfigurations)
165 throws IllegalArgumentException, LinkageError, Exception {
167 if (logger.isInfoEnabled())
168 logger.info("UPDATE-TO-VERSION: " + this + " -> {" + newGroupId + ":" + newArtifactId + ":" + newVersion + "}");
170 if (newGroupId == null || newArtifactId == null || newVersion == null ||
171 newGroupId.isEmpty() || newArtifactId.isEmpty() || newVersion.isEmpty()) {
172 throw new IllegalArgumentException("Missing maven coordinates: " +
173 newGroupId + ":" + newArtifactId + ":" +
177 if (newGroupId.equalsIgnoreCase(DroolsController.NO_GROUP_ID) ||
178 newArtifactId.equalsIgnoreCase(DroolsController.NO_ARTIFACT_ID) ||
179 newVersion.equalsIgnoreCase(DroolsController.NO_VERSION)) {
180 throw new IllegalArgumentException("BRAINLESS maven coordinates provided: " +
181 newGroupId + ":" + newArtifactId + ":" +
185 if (newGroupId.equalsIgnoreCase(this.getGroupId()) &&
186 newArtifactId.equalsIgnoreCase(this.getArtifactId()) &&
187 newVersion.equalsIgnoreCase(this.getVersion())) {
188 logger.warn("Al in the right version: " + newGroupId + ":" +
189 newArtifactId + ":" + newVersion + " vs. " + this);
193 if (!newGroupId.equalsIgnoreCase(this.getGroupId()) ||
194 !newArtifactId.equalsIgnoreCase(this.getArtifactId())) {
195 throw new IllegalArgumentException("Group ID and Artifact ID maven coordinates must be identical for the upgrade: " +
196 newGroupId + ":" + newArtifactId + ":" +
197 newVersion + " vs. " + this);
201 String messages = this.policyContainer.updateToVersion(newVersion);
202 if (logger.isWarnEnabled())
203 logger.warn(this + "UPGRADE results: " + messages);
206 * If all sucessful (can load new container), now we can remove all coders from previous sessions
213 this.init(decoderConfigurations, encoderConfigurations);
215 if (logger.isInfoEnabled())
216 logger.info("UPDATE-TO-VERSION: completed " + this);
220 * initialize decoders for all the topics supported by this controller
221 * Note this is critical to be done after the Policy Container is
222 * instantiated to be able to fetch the corresponding classes.
224 * @param decoderConfiguration list of topic -> decoders -> filters mapping
226 protected void initCoders(List<TopicCoderFilterConfiguration> coderConfigurations,
228 throws IllegalArgumentException {
230 if (logger.isInfoEnabled())
231 logger.info("INIT-CODERS: " + this);
233 if (coderConfigurations == null) {
238 for (TopicCoderFilterConfiguration coderConfig: coderConfigurations) {
239 String topic = coderConfig.getTopic();
241 CustomGsonCoder customGsonCoder = coderConfig.getCustomGsonCoder();
242 if (coderConfig.getCustomGsonCoder() != null &&
243 coderConfig.getCustomGsonCoder().getClassContainer() != null &&
244 !coderConfig.getCustomGsonCoder().getClassContainer().isEmpty()) {
246 String customGsonCoderClass = coderConfig.getCustomGsonCoder().getClassContainer();
247 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
248 customGsonCoderClass)) {
249 logger.error(customGsonCoderClass + " cannot be retrieved");
250 throw new IllegalArgumentException(customGsonCoderClass + " cannot be retrieved");
252 if (logger.isInfoEnabled())
253 logger.info("CLASS FETCHED " + customGsonCoderClass);
257 CustomJacksonCoder customJacksonCoder = coderConfig.getCustomJacksonCoder();
258 if (coderConfig.getCustomJacksonCoder() != null &&
259 coderConfig.getCustomJacksonCoder().getClassContainer() != null &&
260 !coderConfig.getCustomJacksonCoder().getClassContainer().isEmpty()) {
262 String customJacksonCoderClass = coderConfig.getCustomJacksonCoder().getClassContainer();
263 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
264 customJacksonCoderClass)) {
265 logger.error(customJacksonCoderClass + " cannot be retrieved");
266 throw new IllegalArgumentException(customJacksonCoderClass + " cannot be retrieved");
268 if (logger.isInfoEnabled())
269 logger.info("CLASS FETCHED " + customJacksonCoderClass);
273 List<PotentialCoderFilter> coderFilters = coderConfig.getCoderFilters();
274 if (coderFilters == null || coderFilters.isEmpty()) {
278 for (PotentialCoderFilter coderFilter : coderFilters) {
279 String potentialCodedClass = coderFilter.getCodedClass();
280 JsonProtocolFilter protocolFilter = coderFilter.getFilter();
282 if (!ReflectionUtil.isClass(this.policyContainer.getClassLoader(),
283 potentialCodedClass)) {
284 logger.error(potentialCodedClass + " cannot be retrieved");
285 throw new IllegalArgumentException(potentialCodedClass + " cannot be retrieved");
287 if (logger.isInfoEnabled())
288 logger.info("CLASS FETCHED " + potentialCodedClass);
292 EventProtocolCoder.manager.addDecoder(this.getGroupId(), this.getArtifactId(),
293 topic, potentialCodedClass, protocolFilter,
296 this.policyContainer.getClassLoader().hashCode());
298 EventProtocolCoder.manager.addEncoder(this.getGroupId(), this.getArtifactId(),
299 topic, potentialCodedClass, protocolFilter,
302 this.policyContainer.getClassLoader().hashCode());
311 protected void removeDecoders()
312 throws IllegalArgumentException {
313 if (logger.isInfoEnabled())
314 logger.info("REMOVE-DECODERS: " + this);
316 if (this.decoderConfigurations == null) {
321 for (TopicCoderFilterConfiguration coderConfig: decoderConfigurations) {
322 String topic = coderConfig.getTopic();
323 EventProtocolCoder.manager.removeDecoders
324 (this.getGroupId(), this.getArtifactId(), topic);
331 protected void removeEncoders()
332 throws IllegalArgumentException {
334 if (logger.isInfoEnabled())
335 logger.info("REMOVE-ENCODERS: " + this);
337 if (this.encoderConfigurations == null)
341 for (TopicCoderFilterConfiguration coderConfig: encoderConfigurations) {
342 String topic = coderConfig.getTopic();
343 EventProtocolCoder.manager.removeEncoders
344 (this.getGroupId(), this.getArtifactId(), topic);
353 public boolean ownsCoder(Class<? extends Object> coderClass, int modelHash) throws IllegalStateException {
354 if (!ReflectionUtil.isClass
355 (this.policyContainer.getClassLoader(), coderClass.getCanonicalName())) {
356 logger.error(this + coderClass.getCanonicalName() + " cannot be retrieved. ");
360 if (modelHash == this.modelClassLoaderHash) {
361 if (logger.isInfoEnabled())
362 logger.info(coderClass.getCanonicalName() +
363 this + " class loader matches original drools controller rules classloader " +
364 coderClass.getClassLoader());
367 if (logger.isWarnEnabled())
368 logger.warn(this + coderClass.getCanonicalName() + " class loaders don't match " +
369 coderClass.getClassLoader() + " vs " +
370 this.policyContainer.getClassLoader());
379 public boolean start() {
381 if (logger.isInfoEnabled())
382 logger.info("START: " + this);
384 synchronized (this) {
391 return this.policyContainer.start();
398 public boolean stop() {
400 logger.info("STOP: " + this);
402 synchronized (this) {
409 return this.policyContainer.stop();
417 public void shutdown() throws IllegalStateException {
419 if (logger.isInfoEnabled())
420 logger.info(this + "SHUTDOWN");
425 } catch (Exception e) {
426 logger.error(MessageCodes.EXCEPTION_ERROR, e, "stop", this.toString());
428 this.policyContainer.shutdown();
438 public void halt() throws IllegalStateException {
439 if (logger.isInfoEnabled())
440 logger.info(this + "SHUTDOWN");
445 } catch (Exception e) {
446 logger.error(MessageCodes.EXCEPTION_ERROR, e, "halt", this.toString());
448 this.policyContainer.destroy();
453 * removes this drools controllers and encoders and decoders from operation
455 protected void removeCoders() {
457 if (logger.isInfoEnabled())
458 logger.info(this + "REMOVE-CODERS");
461 this.removeDecoders();
462 } catch (IllegalArgumentException e) {
463 logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeDecoders", this.toString());
467 this.removeEncoders();
468 } catch (IllegalArgumentException e) {
469 logger.error(MessageCodes.EXCEPTION_ERROR, e, "removeEncoders", this.toString());
477 public boolean isAlive() {
485 public boolean offer(String topic, String event) {
487 if (logger.isInfoEnabled())
488 logger.info("OFFER: " + topic + ":" + event + " INTO " + this);
496 // 0. Check if the policy container has any sessions
498 if (this.policyContainer.getPolicySessions().size() <= 0) {
503 // 1. Now, check if this topic has a decoder:
505 if (!EventProtocolCoder.manager.isDecodingSupported(this.getGroupId(),
506 this.getArtifactId(),
509 logger.warn("DECODING-UNSUPPORTED: " + ":" + this.getGroupId() +
510 ":" + this.getArtifactId() + ":" + topic + " IN " + this);
518 anEvent = EventProtocolCoder.manager.decode(this.getGroupId(),
519 this.getArtifactId(),
522 } catch (UnsupportedOperationException uoe) {
523 if (logger.isInfoEnabled())
524 logger.info("DECODE:"+ this + ":" + topic + ":" + event);
526 } catch (Exception e) {
528 logger.error("DECODE:"+ this + ":" + topic + ":" + event);
532 synchronized(this.recentSourceEvents) {
533 this.recentSourceEvents.add(anEvent);
536 // increment event count for Nagios monitoring
537 PdpJmx.getInstance().updateOccured();
541 if (logger.isInfoEnabled())
542 logger.info(this + "BROADCAST-INJECT of " + event + " FROM " + topic + " INTO " + this.policyContainer.getName());
544 if (!this.policyContainer.insertAll(anEvent))
545 logger.warn(this + "Failed to inject into PolicyContainer " + this.getSessionNames());
554 public boolean deliver(TopicSink sink, Object event)
555 throws IllegalArgumentException,
556 IllegalStateException,
557 UnsupportedOperationException {
559 if (logger.isInfoEnabled())
560 logger.info(this + "DELIVER: " + event + " FROM " + this + " TO " + sink);
563 throw new IllegalArgumentException
564 (this + " invalid sink");
567 throw new IllegalArgumentException
568 (this + " invalid event");
571 throw new IllegalStateException
572 (this + " is locked");
575 throw new IllegalStateException
576 (this + " is stopped");
579 EventProtocolCoder.manager.encode(sink.getTopic(), event, this);
581 synchronized(this.recentSinkEvents) {
582 this.recentSinkEvents.add(json);
585 return sink.send(json);
593 public String getVersion() {
594 return this.policyContainer.getVersion();
601 public String getArtifactId() {
602 return this.policyContainer.getArtifactId();
609 public String getGroupId() {
610 return this.policyContainer.getGroupId();
614 * @return the modelClassLoaderHash
616 public int getModelClassLoaderHash() {
617 return modelClassLoaderHash;
624 public synchronized boolean lock() {
625 logger.info("LOCK: " + this);
635 public synchronized boolean unlock() {
636 logger.info("UNLOCK: " + this);
646 public boolean isLocked() {
651 public List<String> getSessionNames() {
652 List<String> sessionNames = new ArrayList<String>();
654 for (PolicySession session: this.policyContainer.getPolicySessions()) {
655 sessionNames.add(session.getFullName());
657 } catch (Exception e) {
658 logger.warn(MessageCodes.EXCEPTION_ERROR, e,
659 "Can't retrieve POLICY-CORE sessions: " + e.getMessage(),
661 sessionNames.add(e.getMessage());
670 public Class<?> fetchModelClass(String className) throws IllegalStateException {
671 Class<?> modelClass =
672 ReflectionUtil.fetchClass(this.policyContainer.getClassLoader(), className);
677 * @return the recentSourceEvents
680 public Object[] getRecentSourceEvents() {
681 synchronized(this.recentSourceEvents) {
682 Object[] events = new Object[recentSourceEvents.size()];
683 return recentSourceEvents.toArray(events);
688 * @return the recentSinkEvents
691 public String[] getRecentSinkEvents() {
692 synchronized(this.recentSinkEvents) {
693 String[] events = new String[recentSinkEvents.size()];
694 return recentSinkEvents.toArray(events);
703 public boolean isBrained() {
709 public String toString() {
710 StringBuilder builder = new StringBuilder();
711 builder.append("MavenDroolsController [policyContainer=")
712 .append((policyContainer != null) ? policyContainer.getName() : "NULL").append(":")
714 .append(alive).append(", locked=")
715 .append(", modelClassLoaderHash=").append(modelClassLoaderHash).append("]");
716 return builder.toString();