/*-
* ============LICENSE_START=======================================================
* policy-management
* ================================================================================
* Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.policy.drools.system;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.onap.policy.drools.controller.DroolsController;
import org.onap.policy.drools.core.PolicyContainer;
import org.onap.policy.drools.core.jmx.PdpJmxListener;
import org.onap.policy.drools.event.comm.Topic;
import org.onap.policy.drools.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.event.comm.TopicEndpoint;
import org.onap.policy.drools.event.comm.TopicListener;
import org.onap.policy.drools.event.comm.TopicSink;
import org.onap.policy.drools.event.comm.TopicSource;
import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
import org.onap.policy.drools.http.server.HttpServletServer;
import org.onap.policy.drools.persistence.SystemPersistence;
import org.onap.policy.drools.properties.Lockable;
import org.onap.policy.drools.properties.PolicyProperties;
import org.onap.policy.drools.properties.Startable;
import org.onap.policy.drools.protocol.coders.EventProtocolCoder;
import org.onap.policy.drools.protocol.configuration.ControllerConfiguration;
import org.onap.policy.drools.protocol.configuration.PdpdConfiguration;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
/**
 * Policy Engine, the top abstraction for the Drools PDP Policy Engine.
 * It abstracts away a Drools PDP Engine for management purposes.
 * This is the best place to start reading the code from a top-down approach.
 * Other managed entities can be obtained from the PolicyEngine, hierarchically.
 *
 * PolicyEngine 1 --- * PolicyController 1 --- 1 DroolsController 1 --- 1 PolicyContainer 1 --- * PolicySession
 *
 * PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicReader 1 --- 1 UebTopicReader
 *
 * PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicReader 1 --- 1 DmaapTopicReader
 *
 * PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicWriter 1 --- 1 DmaapTopicWriter
 *
 * PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicReader 1 --- 1 RestTopicReader
 *
 * PolicyEngine 1 --- 1 TopicEndpointManager 1 -- * TopicWriter 1 --- 1 RestTopicWriter
 *
 * PolicyEngine 1 --- 1 ManagementServer
 */
public interface PolicyEngine extends Startable, Lockable, TopicListener {

    /**
     * Default Config Server Port
     */
    public static final int CONFIG_SERVER_DEFAULT_PORT = 9696;

    /**
     * Default Config Server Hostname
     */
    public static final String CONFIG_SERVER_DEFAULT_HOST = "localhost";

    /**
     * Boot the engine
     *
     * @param cliArgs command line arguments
     */
    public void boot(String[] cliArgs);

    /**
     * configure the policy engine according to the given properties
     *
     * @param properties Policy Engine properties
     * @throws IllegalArgumentException when invalid or insufficient
     *         properties are provided
     */
    public void configure(Properties properties) throws IllegalArgumentException;

    /**
     * registers a new Policy Controller with the Policy Engine
     * initialized per properties.
     *
     * @param name name of the Policy Controller
     * @param properties properties to initialize the Policy Controller
     * @return the newly instantiated Policy Controller
     * @throws IllegalArgumentException when invalid or insufficient
     *         properties are provided
     * @throws IllegalStateException when the engine is in a state where
     *         this operation is not permitted.
     */
    public PolicyController createPolicyController(String name, Properties properties)
        throws IllegalArgumentException, IllegalStateException;

    /**
     * updates the Policy Engine with the given configuration
     *
     * @param configuration the configuration
     * @return success or failure
     * @throws IllegalArgumentException if invalid argument provided
     * @throws IllegalStateException if the system is in an invalid state
     */
    public boolean configure(PdpdConfiguration configuration)
        throws IllegalArgumentException, IllegalStateException;

    /**
     * updates a set of Policy Controllers with configuration information
     *
     * @param configuration list of controller configurations to apply
     * @return list of the Policy Controllers that were updated
     * @throws IllegalArgumentException if invalid argument provided
     * @throws IllegalStateException if the system is in an invalid state
     */
    public List<PolicyController> updatePolicyControllers(List<ControllerConfiguration> configuration)
        throws IllegalArgumentException, IllegalStateException;

    /**
     * updates an already existing Policy Controller with configuration information
     *
     * @param configuration configuration
     *
     * @return the updated Policy Controller
     * @throws IllegalArgumentException if the configuration is invalid
     * @throws IllegalStateException if the controller is in a bad state
     * @throws Exception any other reason
     */
    public PolicyController updatePolicyController(ControllerConfiguration configuration)
        throws Exception;

    /**
     * removes the Policy Controller identified by its name from the Policy Engine
     *
     * @param name name of the Policy Controller
     */
    public void removePolicyController(String name);

    /**
     * removes a Policy Controller from the Policy Engine
     *
     * @param controller the Policy Controller to remove from the Policy Engine
     */
    public void removePolicyController(PolicyController controller);

    /**
     * returns a list of the available Policy Controllers
     *
     * @return list of Policy Controllers
     */
    public List<PolicyController> getPolicyControllers();

    /**
     * get policy controller names
     *
     * @return list of controller names
     */
    public List<String> getPolicyControllerIds();

    /**
     * get unmanaged sources
     *
     * @return unmanaged sources
     */
    public List<TopicSource> getSources();

    /**
     * get unmanaged sinks
     *
     * @return unmanaged sinks
     */
    public List<TopicSink> getSinks();

    /**
     * get unmanaged http servers list
     *
     * @return http servers
     */
    public List<HttpServletServer> getHttpServers();

    /**
     * get properties configuration
     *
     * @return properties objects
     */
    public Properties getProperties();

    /**
     * get features attached to the Policy Engine
     *
     * @return list of features
     */
    public List<PolicyEngineFeatureAPI> getFeatureProviders();

    /**
     * get named feature attached to the Policy Engine
     *
     * @param featureName name of the feature
     * @return the feature
     * @throws IllegalArgumentException if the feature name is missing or unknown
     */
    public PolicyEngineFeatureAPI getFeatureProvider(String featureName)
        throws IllegalArgumentException;

    /**
     * get the names of the features attached to the Policy Engine
     *
     * @return list of feature names
     */
    public List<String> getFeatures();

    /**
     * Attempts the dispatching of an "event" object
     *
     * @param topic topic
     * @param event the event object to send
     *
     * @return true if successful, false if a failure has occurred.
     * @throws IllegalArgumentException when invalid or insufficient
     *         properties are provided
     * @throws IllegalStateException when the engine is in a state where
     *         this operation is not permitted (ie. locked or stopped).
     */
    public boolean deliver(String topic, Object event)
        throws IllegalArgumentException, IllegalStateException;

    /**
     * Attempts the dispatching of an "event" object over communication
     * infrastructure "busType"
     *
     * @param busType Communication infrastructure identifier
     * @param topic topic
     * @param event the event object to send
     *
     * @return true if successful, false if a failure has occurred.
     * @throws IllegalArgumentException when invalid or insufficient
     *         properties are provided
     * @throws IllegalStateException when the engine is in a state where
     *         this operation is not permitted (ie. locked or stopped).
     * @throws UnsupportedOperationException when the engine cannot deliver due
     *         to the functionality missing (ie. communication infrastructure
     *         not supported).
     */
    public boolean deliver(String busType, String topic, Object event)
        throws IllegalArgumentException, IllegalStateException,
               UnsupportedOperationException;

    /**
     * Attempts the dispatching of an "event" object over communication
     * infrastructure "busType"
     *
     * @param busType Communication infrastructure enum
     * @param topic topic
     * @param event the event object to send
     *
     * @return true if successful, false if a failure has occurred.
     * @throws IllegalArgumentException when invalid or insufficient
     *         properties are provided
     * @throws IllegalStateException when the engine is in a state where
     *         this operation is not permitted (ie. locked or stopped).
     * @throws UnsupportedOperationException when the engine cannot deliver due
     *         to the functionality missing (ie. communication infrastructure
     *         not supported).
     */
    public boolean deliver(CommInfrastructure busType, String topic, Object event)
        throws IllegalArgumentException, IllegalStateException,
               UnsupportedOperationException;

    /**
     * Attempts delivering of a String over communication
     * infrastructure "busType"
     *
     * @param busType Communication infrastructure enum
     * @param topic topic
     * @param event the event string to send
     *
     * @return true if successful, false if a failure has occurred.
     * @throws IllegalArgumentException when invalid or insufficient
     *         properties are provided
     * @throws IllegalStateException when the engine is in a state where
     *         this operation is not permitted (ie. locked or stopped).
     * @throws UnsupportedOperationException when the engine cannot deliver due
     *         to the functionality missing (ie. communication infrastructure
     *         not supported).
     */
    public boolean deliver(CommInfrastructure busType, String topic,
                           String event)
        throws IllegalArgumentException, IllegalStateException,
               UnsupportedOperationException;

    /**
     * Invoked when the host goes into the active state.
     */
    public void activate();

    /**
     * Invoked when the host goes into the standby state.
     */
    public void deactivate();

    /**
     * Policy Engine Manager
     */
    public final static PolicyEngine manager = new PolicyEngineManager();
}
/**
* Policy Engine Manager Implementation
*/
class PolicyEngineManager implements PolicyEngine {
/**
* logger
*/
private static Logger logger = LoggerFactory.getLogger(PolicyEngineManager.class);
/**
* Is the Policy Engine running?
*/
protected boolean alive = false;
/**
* Is the engine locked?
*/
protected boolean locked = false;
/**
* Properties used to initialize the engine
*/
protected Properties properties;
/**
* Policy Engine Sources
*/
protected List extends TopicSource> sources = new ArrayList<>();
/**
* Policy Engine Sinks
*/
protected List extends TopicSink> sinks = new ArrayList<>();
/**
* Policy Engine HTTP Servers
*/
protected List httpServers = new ArrayList();
/**
* gson parser to decode configuration requests
*/
protected Gson decoder = new GsonBuilder().disableHtmlEscaping().create();
/**
 * {@inheritDoc}
 *
 * Runs the before-boot feature hooks, initializes the policy container and
 * then runs the after-boot feature hooks. A hook that returns true ends the
 * boot sequence at that point; a hook or initialization failure is logged
 * and the sequence continues.
 */
@Override
public synchronized void boot(String[] cliArgs) {
    /* policy-engine dispatch pre boot hook */
    for (PolicyEngineFeatureAPI provider : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            boolean intercepted = provider.beforeBoot(this, cliArgs);
            if (intercepted) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-boot failure because of {}",
                         this, provider.getClass().getName(), e.getMessage(), e);
        }
    }

    try {
        PolicyContainer.globalInit(cliArgs);
    } catch (Exception e) {
        logger.error("{}: cannot init policy-container because of {}", this, e.getMessage(), e);
    }

    /* policy-engine dispatch post boot hook */
    for (PolicyEngineFeatureAPI provider : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            boolean intercepted = provider.afterBoot(this);
            if (intercepted) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-boot failure because of {}",
                         this, provider.getClass().getName(), e.getMessage(), e);
        }
    }
}
/**
 * {@inheritDoc}
 *
 * Stores the properties and builds the engine's own topic sources (each one
 * gets this engine registered as listener), topic sinks and http servers
 * from them. Each of the three build steps is attempted independently; a
 * failure is logged and the previous value of that field is kept.
 */
@Override
public synchronized void configure(Properties properties) throws IllegalArgumentException {
    if (properties == null) {
        logger.warn("No properties provided");
        throw new IllegalArgumentException("No properties provided");
    }

    /* policy-engine dispatch pre configure hook */
    for (PolicyEngineFeatureAPI provider : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            boolean intercepted = provider.beforeConfigure(this, properties);
            if (intercepted) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-configure failure because of {}",
                         this, provider.getClass().getName(), e.getMessage(), e);
        }
    }

    this.properties = properties;

    /* unmanaged topic sources: this engine listens on every one of them */
    try {
        this.sources = TopicEndpoint.manager.addTopicSources(properties);
        for (TopicSource engineSource : this.sources) {
            engineSource.register(this);
        }
    } catch (Exception e) {
        logger.error("{}: add-sources failed", this, e);
    }

    /* unmanaged topic sinks */
    try {
        this.sinks = TopicEndpoint.manager.addTopicSinks(properties);
    } catch (IllegalArgumentException e) {
        logger.error("{}: add-sinks failed", this, e);
    }

    /* unmanaged http servers */
    try {
        this.httpServers = HttpServletServer.factory.build(properties);
    } catch (IllegalArgumentException e) {
        logger.error("{}: add-http-servers failed", this, e);
    }

    /* policy-engine dispatch post configure hook */
    for (PolicyEngineFeatureAPI provider : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            boolean intercepted = provider.afterConfigure(this);
            if (intercepted) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-configure failure because of {}",
                         this, provider.getClass().getName(), e.getMessage(), e);
        }
    }
}
/**
 * {@inheritDoc}
 *
 * If the properties carry a non-empty controller name, it must be equal to
 * the given name. The first before-create feature hook that returns a
 * controller supplies the result; otherwise the controller is built by the
 * controller factory, locked when the engine is locked, and offered to the
 * after-create feature hooks.
 *
 * @throws IllegalArgumentException when no properties are provided
 * @throws IllegalStateException when the given name and the name found in
 *         the properties don't match
 */
@Override
public synchronized PolicyController createPolicyController(String name, Properties properties)
    throws IllegalArgumentException, IllegalStateException {

    /* fail with the documented exception type instead of a NullPointerException */
    if (properties == null) {
        throw new IllegalArgumentException("No properties provided");
    }

    // a PROPERTY_CONTROLLER_NAME property, when present, must agree with the given name
    String propertyControllerName = properties.getProperty(PolicyProperties.PROPERTY_CONTROLLER_NAME);
    if (propertyControllerName != null && !propertyControllerName.isEmpty()
            && !propertyControllerName.equals(name)) {
        throw new IllegalStateException("Proposed name (" + name +
                                        ") and properties name (" + propertyControllerName +
                                        ") don't match");
    }

    PolicyController controller;

    /* before-create feature hook: a feature may supply the controller itself */
    for (PolicyControllerFeatureAPI controllerFeature : PolicyControllerFeatureAPI.providers.getList()) {
        try {
            controller = controllerFeature.beforeCreate(name, properties);
            if (controller != null) {
                return controller;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-controller-create failure because of {}",
                         this, controllerFeature.getClass().getName(), e.getMessage(), e);
        }
    }

    controller = PolicyController.factory.build(name, properties);
    if (this.isLocked()) {
        controller.lock();
    }

    /* after-create feature hook */
    for (PolicyControllerFeatureAPI controllerFeature : PolicyControllerFeatureAPI.providers.getList()) {
        try {
            if (controllerFeature.afterCreate(controller)) {
                return controller;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-controller-create failure because of {}",
                         this, controllerFeature.getClass().getName(), e.getMessage(), e);
        }
    }

    return controller;
}
/**
 * {@inheritDoc}
 *
 * Only controller configurations are supported. The update is considered
 * successful when every controller configuration in the request resulted
 * in an updated Policy Controller.
 *
 * @throws IllegalArgumentException when no configuration is provided or its
 *         entity is missing or not supported
 */
@Override
public boolean configure(PdpdConfiguration config) throws IllegalArgumentException, IllegalStateException {

    if (config == null) {
        throw new IllegalArgumentException("No configuration provided");
    }

    /*
     * only the controller entity is supported for now; comparing from the
     * constant also rejects a null entity, which would otherwise raise a
     * NullPointerException when switched on
     */
    String entity = config.getEntity();
    if (!PdpdConfiguration.CONFIG_ENTITY_CONTROLLER.equals(entity)) {
        String msg = "Configuration Entity is not supported: " + entity;
        logger.warn(msg);
        throw new IllegalArgumentException(msg);
    }

    List<ControllerConfiguration> configControllers = config.getControllers();
    if (configControllers == null || configControllers.isEmpty()) {
        logger.info("No controller configuration provided: {}", config);
        return false;
    }

    List<PolicyController> policyControllers = this.updatePolicyControllers(configControllers);

    /* the request is non-empty here, so equal sizes imply every controller was updated */
    return policyControllers != null && policyControllers.size() == configControllers.size();
}
/**
 * {@inheritDoc}
 *
 * Each configuration is applied independently: a failure on one entry is
 * logged and the remaining entries are still processed, so the returned
 * list only holds the controllers that were updated. A null or empty
 * request yields an empty list.
 */
@Override
@SuppressWarnings("rawtypes")
public List<PolicyController> updatePolicyControllers(List configControllers)
    throws IllegalArgumentException, IllegalStateException {

    List<PolicyController> policyControllers = new ArrayList<>();
    if (configControllers == null || configControllers.isEmpty()) {
        logger.info("No controller configuration provided: {}", configControllers);
        return policyControllers;
    }

    /*
     * the parameter type is kept exactly as declared by the interface
     * method being overridden; the entries are narrowed one by one, inside
     * the try block, so an unexpected entry is reported like any other
     * failed update
     */
    for (Object entry : configControllers) {
        try {
            PolicyController policyController =
                this.updatePolicyController((ControllerConfiguration) entry);
            policyControllers.add(policyController);
        } catch (Exception e) {
            logger.error("{}: cannot update-policy-controllers because of {}", this, e.getMessage(), e);
        }
    }

    return policyControllers;
}
/**
 * {@inheritDoc}
 *
 * Looks the controller up by name. When it does not exist, lock and unlock
 * requests are rejected, and for any other operation the controller is
 * first re-created from its persisted properties with the rules
 * coordinates reset to the "no group / no artifact / no version"
 * placeholders. The requested operation is then applied.
 *
 * @throws IllegalArgumentException when the configuration, the controller
 *         name or the operation is missing, when the operation is not
 *         supported, or when the controller cannot be found or recovered
 * @throws IllegalStateException when a LinkageError is raised while updating
 * @throws Exception any other failure, rethrown after being logged
 */
@Override
public PolicyController updatePolicyController(ControllerConfiguration configController)
    throws Exception {

    if (configController == null) {
        throw new IllegalArgumentException("No controller configuration has been provided");
    }

    String controllerName = configController.getName();
    if (controllerName == null || controllerName.isEmpty()) {
        logger.warn("controller-name must be provided");
        throw new IllegalArgumentException("controller-name must be provided");
    }

    PolicyController policyController = null;
    try {
        String operation = configController.getOperation();
        if (operation == null || operation.isEmpty()) {
            logger.warn("operation must be provided");
            throw new IllegalArgumentException("operation must be provided");
        }

        try {
            policyController = PolicyController.factory.get(controllerName);
        } catch (IllegalArgumentException e) {
            // not found
            logger.warn("Policy Controller {} not found", controllerName);
        }

        if (policyController == null) {

            /* a controller that does not exist can neither be locked nor unlocked */
            if (operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK) ||
                operation.equalsIgnoreCase(ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK)) {
                throw new IllegalArgumentException(controllerName + " is not available for operation " + operation);
            }

            /* Recovery case */

            logger.warn("controller {} does not exist. Attempting recovery from disk", controllerName);

            Properties properties =
                SystemPersistence.manager.getControllerProperties(controllerName);

            /*
             * returned properties cannot be null (per implementation)
             * assert (properties != null)
             */

            if (properties == null) {
                throw new IllegalArgumentException(controllerName + " is invalid");
            }

            logger.warn("controller {} being recovered. Reset controller's bad maven coordinates to brainless",
                        controllerName);

            /*
             * try to bring up bad controller in brainless mode,
             * after having it working, apply the new create/update operation.
             */
            properties.setProperty(PolicyProperties.RULES_GROUPID, DroolsController.NO_GROUP_ID);
            properties.setProperty(PolicyProperties.RULES_ARTIFACTID, DroolsController.NO_ARTIFACT_ID);
            properties.setProperty(PolicyProperties.RULES_VERSION, DroolsController.NO_VERSION);

            policyController = PolicyEngine.manager.createPolicyController(controllerName, properties);

            /* fall through to do brain update operation*/
        }

        /*
         * NOTE(review): this switch is case-sensitive while the lock/unlock
         * check above ignores case - confirm which one is intended.
         */
        switch (operation) {
        case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_CREATE:
            PolicyController.factory.patch(policyController, configController.getDrools());
            break;
        case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UPDATE:
            /* the controller is unlocked before the new rules are applied */
            policyController.unlock();
            PolicyController.factory.patch(policyController, configController.getDrools());
            break;
        case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_LOCK:
            policyController.lock();
            break;
        case ControllerConfiguration.CONFIG_CONTROLLER_OPERATION_UNLOCK:
            policyController.unlock();
            break;
        default:
            String msg = "Controller Operation Configuration is not supported: " +
                         operation + " for " + controllerName;
            logger.warn(msg);
            throw new IllegalArgumentException(msg);
        }

        return policyController;
    } catch (Exception e) {
        logger.error("{}: cannot update-policy-controller because of {}", this, e.getMessage(), e);
        throw e;
    } catch (LinkageError e) {
        /* LinkageError is not an Exception: surface it as an unchecked exception, keeping the cause */
        logger.error("{}: cannot update-policy-controllers (rules) because of {}", this, e.getMessage(), e);
        throw new IllegalStateException(e);
    }
}
/**
 * {@inheritDoc}
 *
 * Starts, in this order: the engine's own http servers, topic sources and
 * topic sinks, every Policy Controller in the controller factory inventory,
 * the managed topic endpoints and the JMX listener. Every component is
 * attempted even when a previous one fails.
 *
 * @return true if every component started, false if any of them reported a
 *         failure or raised an exception; also true when a before-start
 *         feature hook takes over the operation
 * @throws IllegalStateException when the engine is locked
 */
@Override
public synchronized boolean start() throws IllegalStateException {

    /* policy-engine dispatch pre start hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.beforeStart(this)) {
                return true;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-start failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    boolean success = true;
    if (this.locked) {
        throw new IllegalStateException("Engine is locked");
    }

    this.alive = true;

    /* Start Policy Engine exclusively-owned (unmanaged) http servers */
    for (HttpServletServer httpServer : this.httpServers) {
        try {
            if (!httpServer.waitedStart(5 * 1000L)) {
                success = false;
            }
        } catch (Exception e) {
            logger.error("{}: cannot start http-server {} because of {}", this,
                         httpServer, e.getMessage(), e);
            /* an exception is a start failure too, same as for controllers below */
            success = false;
        }
    }

    /* Start Policy Engine exclusively-owned (unmanaged) sources */
    for (TopicSource source : this.sources) {
        try {
            if (!source.start()) {
                success = false;
            }
        } catch (Exception e) {
            logger.error("{}: cannot start topic-source {} because of {}", this,
                         source, e.getMessage(), e);
            success = false;
        }
    }

    /* Start Policy Engine owned (unmanaged) sinks */
    for (TopicSink sink : this.sinks) {
        try {
            if (!sink.start()) {
                success = false;
            }
        } catch (Exception e) {
            logger.error("{}: cannot start topic-sink {} because of {}", this,
                         sink, e.getMessage(), e);
            success = false;
        }
    }

    /* Start Policy Controllers */
    List<PolicyController> controllers = PolicyController.factory.inventory();
    for (PolicyController controller : controllers) {
        try {
            if (!controller.start()) {
                success = false;
            }
        } catch (Exception e) {
            logger.error("{}: cannot start policy-controller {} because of {}", this,
                         controller, e.getMessage(), e);
            success = false;
        }
    }

    /* Start managed Topic Endpoints */
    try {
        if (!TopicEndpoint.manager.start()) {
            success = false;
        }
    } catch (IllegalStateException e) {
        logger.warn("{}: Topic Endpoint Manager is in an invalid state because of {}", this, e.getMessage(), e);
    }

    // Start the JMX listener
    PdpJmxListener.start();

    /* policy-engine dispatch after start hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.afterStart(this)) {
                return success;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-start failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    return success;
}
/**
 * {@inheritDoc}
 *
 * Stops, in this order: every Policy Controller in the controller factory
 * inventory, the engine's own topic sources and sinks, the managed topic
 * endpoints and the engine's own http servers. The lock state is not
 * checked, and every component is attempted even when a previous one fails.
 *
 * @return true if the engine was not running or every component stopped,
 *         false if any of them reported a failure or raised an exception;
 *         also true when a before-stop feature hook takes over the operation
 */
@Override
public synchronized boolean stop() {

    /* policy-engine dispatch pre stop hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.beforeStop(this)) {
                return true;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-stop failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    /* stop regardless of the lock state */

    boolean success = true;
    if (!this.alive) {
        return true;
    }

    this.alive = false;

    /* Stop Policy Controllers */
    List<PolicyController> controllers = PolicyController.factory.inventory();
    for (PolicyController controller : controllers) {
        try {
            if (!controller.stop()) {
                success = false;
            }
        } catch (Exception e) {
            logger.error("{}: cannot stop policy-controller {} because of {}", this,
                         controller, e.getMessage(), e);
            success = false;
        }
    }

    /* Stop Policy Engine owned (unmanaged) sources */
    for (TopicSource source : this.sources) {
        try {
            if (!source.stop()) {
                success = false;
            }
        } catch (Exception e) {
            logger.error("{}: cannot stop topic-source {} because of {}", this,
                         source, e.getMessage(), e);
            /* an exception is a stop failure too, same as for controllers above */
            success = false;
        }
    }

    /* Stop Policy Engine owned (unmanaged) sinks */
    for (TopicSink sink : this.sinks) {
        try {
            if (!sink.stop()) {
                success = false;
            }
        } catch (Exception e) {
            logger.error("{}: cannot stop topic-sink {} because of {}", this,
                         sink, e.getMessage(), e);
            success = false;
        }
    }

    /* stop all managed topics sources and sinks */
    if (!TopicEndpoint.manager.stop()) {
        success = false;
    }

    /* stop all unmanaged http servers */
    for (HttpServletServer httpServer : this.httpServers) {
        try {
            if (!httpServer.stop()) {
                success = false;
            }
        } catch (Exception e) {
            logger.error("{}: cannot stop http-server {} because of {}", this,
                         httpServer, e.getMessage(), e);
            success = false;
        }
    }

    /* policy-engine dispatch post stop hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.afterStop(this)) {
                return success;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-stop failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    return success;
}
/**
 * {@inheritDoc}
 *
 * Shuts down the engine's own topic sources and sinks, the controller
 * factory, the managed topic endpoints, the http server factory and the JMX
 * listener. Unless a feature hook takes over, a new thread is then started
 * that waits 5 seconds, shuts down the engine's own http servers, waits
 * another 5 seconds and terminates the JVM with exit status 0.
 */
@Override
public synchronized void shutdown() throws IllegalStateException {

    /* policy-engine dispatch pre shutdown hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.beforeShutdown(this)) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-shutdown failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    this.alive = false;

    /* Shutdown Policy Engine owned (unmanaged) sources */
    for (TopicSource source : this.sources) {
        try {
            source.shutdown();
        } catch (Exception e) {
            logger.error("{}: cannot shutdown topic-source {} because of {}", this,
                         source, e.getMessage(), e);
        }
    }

    /* Shutdown Policy Engine owned (unmanaged) sinks */
    for (TopicSink sink : this.sinks) {
        try {
            sink.shutdown();
        } catch (Exception e) {
            logger.error("{}: cannot shutdown topic-sink {} because of {}", this,
                         sink, e.getMessage(), e);
        }
    }

    /* Shutdown managed resources */
    PolicyController.factory.shutdown();
    TopicEndpoint.manager.shutdown();
    HttpServletServer.factory.destroy();

    // Stop the JMX listener
    PdpJmxListener.stop();

    /* policy-engine dispatch post shutdown hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.afterShutdown(this)) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-shutdown failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    /*
     * inside the anonymous Runnable "this" is the Runnable itself, so the
     * engine is captured explicitly to keep it as the subject of the logs
     */
    final PolicyEngineManager engine = this;

    /*
     * NOTE(review): presumably the http servers are shut down from a separate,
     * delayed thread so that a shutdown request received through one of them
     * can still be answered - confirm.
     */
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                /* keep going: this thread terminates the JVM below in any case */
                logger.warn("{}: interrupted-exception while shutting down management server: ", engine, e);
            }

            /* shutdown all unmanaged http servers */
            List<HttpServletServer> servers = getHttpServers();
            for (HttpServletServer httpServer : servers) {
                try {
                    httpServer.shutdown();
                } catch (Exception e) {
                    logger.error("{}: cannot shutdown http-server {} because of {}", engine,
                                 httpServer, e.getMessage(), e);
                }
            }

            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                logger.warn("{}: interrupted-exception while shutting down management server: ", engine, e);
            }

            System.exit(0);
        }
    }).start();
}
/**
 * {@inheritDoc}
 *
 * @return the current value of the engine's alive flag
 */
@Override
public synchronized boolean isAlive() {
    return alive;
}
/**
 * {@inheritDoc}
 *
 * Marks the engine as locked, then locks every Policy Controller in the
 * controller factory inventory and the managed topic endpoints. Every
 * component is attempted even when a previous one fails.
 *
 * @return true if the engine was already locked or every component locked,
 *         false otherwise; also true when a before-lock feature hook takes
 *         over the operation
 */
@Override
public synchronized boolean lock() {

    /* policy-engine dispatch pre lock hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.beforeLock(this)) {
                return true;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-lock failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    if (this.locked) {
        return true;
    }

    this.locked = true;

    boolean success = true;
    List<PolicyController> controllers = PolicyController.factory.inventory();
    for (PolicyController controller : controllers) {
        try {
            /* lock() is the left operand so it runs even after an earlier failure */
            success = controller.lock() && success;
        } catch (Exception e) {
            logger.error("{}: cannot lock policy-controller {} because of {}", this,
                         controller, e.getMessage(), e);
            success = false;
        }
    }

    success = TopicEndpoint.manager.lock() && success;

    /* policy-engine dispatch post lock hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.afterLock(this)) {
                return success;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-lock failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    return success;
}
/**
 * {@inheritDoc}
 *
 * Marks the engine as unlocked, then unlocks every Policy Controller in the
 * controller factory inventory and the managed topic endpoints. Every
 * component is attempted even when a previous one fails.
 *
 * @return true if the engine was not locked or every component unlocked,
 *         false otherwise; also true when a before-unlock feature hook
 *         takes over the operation
 */
@Override
public synchronized boolean unlock() {

    /* policy-engine dispatch pre unlock hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.beforeUnlock(this)) {
                return true;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-unlock failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    if (!this.locked) {
        return true;
    }

    this.locked = false;

    boolean success = true;
    List<PolicyController> controllers = PolicyController.factory.inventory();
    for (PolicyController controller : controllers) {
        try {
            /* unlock() is the left operand so it runs even after an earlier failure */
            success = controller.unlock() && success;
        } catch (Exception e) {
            logger.error("{}: cannot unlock policy-controller {} because of {}", this,
                         controller, e.getMessage(), e);
            success = false;
        }
    }

    success = TopicEndpoint.manager.unlock() && success;

    /* policy-engine dispatch after unlock hook */
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (feature.afterUnlock(this)) {
                return success;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-unlock failure because of {}",
                         this, feature.getClass().getName(), e.getMessage(), e);
        }
    }

    return success;
}
/**
 * {@inheritDoc}
 *
 * @return the current value of the engine's locked flag
 */
@Override
public synchronized boolean isLocked() {
    return locked;
}
/**
 * {@inheritDoc}
 *
 * Delegates to the Policy Controller factory, which destroys the controller
 * registered under the given name.
 */
@Override
public void removePolicyController(String name) {
    /* the factory owns the controller inventory */
    PolicyController.factory.destroy(name);
}
/**
 * {@inheritDoc}
 *
 * Delegates to the Policy Controller factory, which destroys the given
 * controller.
 */
@Override
public void removePolicyController(PolicyController controller) {
    /* the factory owns the controller inventory */
    PolicyController.factory.destroy(controller);
}
/**
 * {@inheritDoc}
 *
 * The list is the inventory of the Policy Controller factory. It is
 * excluded from the JSON representation of the engine.
 */
@JsonIgnore
@Override
public List<PolicyController> getPolicyControllers() {
    return PolicyController.factory.inventory();
}
/**
 * {@inheritDoc}
 *
 * Collects the name of every controller in the Policy Controller factory
 * inventory into a new list. Serialized to JSON under "controllers".
 */
@JsonProperty("controllers")
@Override
public List<String> getPolicyControllerIds() {
    List<String> controllerNames = new ArrayList<>();
    for (PolicyController controller : PolicyController.factory.inventory()) {
        controllerNames.add(controller.getName());
    }
    return controllerNames;
}
/**
 * {@inheritDoc}
 *
 * Returns the properties object last stored by
 * {@link #configure(Properties)}. It is excluded from the JSON
 * representation of the engine.
 */
@Override
@JsonIgnore
public Properties getProperties() {
    return properties;
}
/**
 * {@inheritDoc}
 *
 * Returns the engine's own source list itself, not a copy.
 */
@SuppressWarnings("unchecked")
@Override
public List<TopicSource> getSources() {
    /* unchecked: narrows the wildcard element type of the field to TopicSource */
    return (List<TopicSource>) this.sources;
}
/**
 * {@inheritDoc}
 *
 * Returns the engine's own sink list itself, not a copy.
 */
@SuppressWarnings("unchecked")
@Override
public List<TopicSink> getSinks() {
    /* unchecked: narrows the wildcard element type of the field to TopicSink */
    return (List<TopicSink>) this.sinks;
}
/**
 * {@inheritDoc}
 *
 * Returns the engine's own http server list itself, not a copy.
 */
@Override
public List<HttpServletServer> getHttpServers() {
    return this.httpServers;
}
/**
 * {@inheritDoc}
 *
 * Collects the name of every registered Policy Engine feature provider
 * into a new list.
 */
@Override
public List<String> getFeatures() {
    List<String> features = new ArrayList<>();
    for (PolicyEngineFeatureAPI feature : PolicyEngineFeatureAPI.providers.getList()) {
        features.add(feature.getName());
    }
    return features;
}
/**
 * {@inheritDoc}
 *
 * Returns the list held by the Policy Engine feature providers registry.
 * It is excluded from the JSON representation of the engine.
 */
@JsonIgnore
@Override
public List<PolicyEngineFeatureAPI> getFeatureProviders() {
    return PolicyEngineFeatureAPI.providers.getList();
}
/**
 * {@inheritDoc}
 *
 * Scans the registered feature providers in order and returns the first
 * one whose name equals the requested name.
 *
 * @throws IllegalArgumentException when the name is null or empty, or when
 *         no registered feature carries that name
 */
@Override
public PolicyEngineFeatureAPI getFeatureProvider(String featureName) throws IllegalArgumentException {
    boolean nameMissing = (featureName == null) || featureName.isEmpty();
    if (nameMissing) {
        throw new IllegalArgumentException("A feature name must be provided");
    }

    for (PolicyEngineFeatureAPI candidate : PolicyEngineFeatureAPI.providers.getList()) {
        if (candidate.getName().equals(featureName)) {
            return candidate;
        }
    }

    throw new IllegalArgumentException("Invalid Feature Name: " + featureName);
}
/**
 * {@inheritDoc}
 *
 * Treats the event as a JSON encoded configuration request: it is decoded
 * into a {@link PdpdConfiguration} and applied to this engine. Any failure
 * while decoding or configuring is logged and not propagated.
 */
@Override
public void onTopicEvent(CommInfrastructure commType, String topic, String event) {
    /* configuration request */
    try {
        this.configure(this.decoder.fromJson(event, PdpdConfiguration.class));
    } catch (Exception e) {
        logger.error("{}: configuration-error due to {} because of {}",
                     this, event, e.getMessage(), e);
    }
}
/**
* {@inheritDoc}
*/
@Override
public boolean deliver(String topic, Object event)
throws IllegalArgumentException, IllegalStateException {
/*
* Note this entry point is usually from the DRL
*/
if (topic == null || topic.isEmpty())
throw new IllegalArgumentException("Invalid Topic");
if (event == null)
throw new IllegalArgumentException("Invalid Event");
if (!this.isAlive())
throw new IllegalStateException("Policy Engine is stopped");
if (this.isLocked())
throw new IllegalStateException("Policy Engine is locked");
List extends TopicSink> sinks =
TopicEndpoint.manager.getTopicSinks(topic);
if (sinks == null || sinks.isEmpty() || sinks.size() > 1)
throw new IllegalStateException
("Cannot ensure correct delivery on topic " + topic + ": " + sinks);
return this.deliver(sinks.get(0).getTopicCommInfrastructure(),
topic, event);
}
/**
 * {@inheritDoc}
 *
 * The bus type must be the exact name of one of the
 * {@link Topic.CommInfrastructure} constants. The arguments are validated
 * before the engine state is checked.
 *
 * @throws IllegalArgumentException when the bus type or the topic is null
 *         or empty, the event is null, or the bus type does not name a
 *         communication infrastructure
 * @throws IllegalStateException when the engine is stopped or locked
 */
@Override
public boolean deliver(String busType, String topic, Object event)
    throws IllegalArgumentException, IllegalStateException,
           UnsupportedOperationException {

    /*
     * Note this entry point is usually from the DRL (one of the reasons
     * busType is String).
     */

    if (busType == null || busType.isEmpty()) {
        throw new IllegalArgumentException
            ("Invalid Communication Infrastructure");
    }

    if (topic == null || topic.isEmpty()) {
        throw new IllegalArgumentException("Invalid Topic");
    }

    if (event == null) {
        throw new IllegalArgumentException("Invalid Event");
    }

    /* resolve the name to its constant; it stays null when nothing matches */
    Topic.CommInfrastructure infrastructure = null;
    for (Topic.CommInfrastructure candidate : Topic.CommInfrastructure.values()) {
        if (candidate.name().equals(busType)) {
            infrastructure = candidate;
            break;
        }
    }

    if (infrastructure == null) {
        throw new IllegalArgumentException
            ("Invalid Communication Infrastructure: " + busType);
    }

    if (!this.isAlive()) {
        throw new IllegalStateException("Policy Engine is stopped");
    }

    if (this.isLocked()) {
        throw new IllegalStateException("Policy Engine is locked");
    }

    return this.deliver(infrastructure, topic, event);
}
/**
 * {@inheritDoc}
 *
 * <p>Delivery is attempted in two stages. First the event is routed through
 * the policy controller associated with the drools controller that handles
 * this topic/event pair. If no such controller is found, or that attempt
 * throws, the failure is logged (when thrown) and the event is encoded to
 * a string and sent through the string-based overload instead.
 *
 * @param busType communication infrastructure to deliver over
 * @param topic name of the topic to deliver to
 * @param event event object to deliver
 * @return the result reported by the controller or by the direct delivery
 * @throws IllegalArgumentException if {@code topic} is null or empty, or
 *         {@code event} is null
 * @throws IllegalStateException if the engine is not alive or is locked
 */
@Override
public boolean deliver(Topic.CommInfrastructure busType,
        String topic, Object event)
        throws IllegalArgumentException, IllegalStateException,
        UnsupportedOperationException {
    if (topic == null || topic.isEmpty()) {
        throw new IllegalArgumentException("Invalid Topic");
    }
    if (event == null) {
        throw new IllegalArgumentException("Invalid Event");
    }
    if (!this.isAlive()) {
        throw new IllegalStateException("Policy Engine is stopped");
    }
    if (this.isLocked()) {
        throw new IllegalStateException("Policy Engine is locked");
    }

    // Preferred path: go through the owning controller, since it may want
    // to apply additional processing.
    try {
        final DroolsController drools =
                EventProtocolCoder.manager.getDroolsController(topic, event);
        final PolicyController owner = PolicyController.factory.get(drools);
        if (owner != null) {
            return owner.deliver(busType, topic, event);
        }
    } catch (Exception e) {
        logger.warn("{}: cannot find policy-controller to deliver {} over {}:{} because of {}",
                this, event, busType, topic, e.getMessage(), e);
        // fall through and try without routing through the controller
    }

    // Fallback path: cannot route through the controller, so encode the
    // event and send it directly through the topic sink.
    try {
        final String encoded = EventProtocolCoder.manager.encode(topic, event);
        return this.deliver(busType, topic, encoded);
    } catch (Exception e) {
        logger.warn("{}: cannot deliver {} over {}:{} because of {}",
                this, event, busType, topic, e.getMessage(), e);
        throw e;
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Sends an already-encoded event straight to the topic sink registered
 * for the given infrastructure and topic. Failures are logged and then
 * rethrown to the caller.
 *
 * @param busType communication infrastructure to deliver over
 * @param topic name of the topic to deliver to
 * @param event encoded event text to send
 * @return the value returned by the sink's {@code send}
 * @throws IllegalArgumentException if {@code topic} or {@code event} is
 *         null or empty
 * @throws IllegalStateException if the engine is not alive, is locked, or
 *         no sink is found for the infrastructure/topic pair
 */
@Override
public boolean deliver(Topic.CommInfrastructure busType,
        String topic, String event)
        throws IllegalArgumentException, IllegalStateException,
        UnsupportedOperationException {
    if (topic == null || topic.isEmpty()) {
        throw new IllegalArgumentException("Invalid Topic");
    }
    if (event == null || event.isEmpty()) {
        throw new IllegalArgumentException("Invalid Event");
    }
    if (!this.isAlive()) {
        throw new IllegalStateException("Policy Engine is stopped");
    }
    if (this.isLocked()) {
        throw new IllegalStateException("Policy Engine is locked");
    }

    try {
        final TopicSink target = TopicEndpoint.manager.getTopicSink(busType, topic);
        if (target == null) {
            throw new IllegalStateException("Inconsistent State: " + this);
        }
        return target.send(event);
    } catch (Exception e) {
        // log for diagnostics, then let the caller see the original failure
        logger.warn("{}: cannot deliver {} over {}:{} because of {}",
                this, event, busType, topic, e.getMessage(), e);
        throw e;
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Sequence performed while holding this object's monitor:
 * <ol>
 * <li>each engine feature's {@code beforeActivate} hook is called; the
 *     first one returning {@code true} ends the activation immediately;</li>
 * <li>every policy controller is unlocked and started, with failures
 *     logged per controller so the remaining ones are still processed;</li>
 * <li>the engine itself is unlocked;</li>
 * <li>each engine feature's {@code afterActivate} hook is called; the
 *     first one returning {@code true} stops the remaining hooks.</li>
 * </ol>
 * Exceptions thrown by a feature hook are logged and the next feature is
 * consulted.
 */
@Override
public synchronized void activate() {

    /* policy-engine dispatch pre activate hook */
    for (PolicyEngineFeatureAPI hook : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (hook.beforeActivate(this)) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-activate failure because of {}",
                    this, hook.getClass().getName(), e.getMessage(), e);
        }
    }

    /* activate 'policy-management' */
    for (PolicyController controller : getPolicyControllers()) {
        try {
            controller.unlock();
            controller.start();
        } catch (Exception e) {
            logger.error("{}: cannot activate of policy-controller {} because of {}",
                    this, controller, e.getMessage(), e);
        } catch (LinkageError e) {
            // reported separately: the message attributes this to rules compilation
            logger.error("{}: cannot activate (rules compilation) of policy-controller {} because of {}",
                    this, controller, e.getMessage(), e);
        }
    }

    this.unlock();

    /* policy-engine dispatch post activate hook */
    for (PolicyEngineFeatureAPI hook : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (hook.afterActivate(this)) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-activate failure because of {}",
                    this, hook.getClass().getName(), e.getMessage(), e);
        }
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Sequence performed while holding this object's monitor:
 * <ol>
 * <li>each engine feature's {@code beforeDeactivate} hook is called; the
 *     first one returning {@code true} ends the deactivation immediately;</li>
 * <li>the engine is locked;</li>
 * <li>every policy controller is stopped, with failures logged per
 *     controller so the remaining ones are still processed;</li>
 * <li>each engine feature's {@code afterDeactivate} hook is called; the
 *     first one returning {@code true} stops the remaining hooks.</li>
 * </ol>
 * Exceptions thrown by a feature hook are logged and the next feature is
 * consulted.
 */
@Override
public synchronized void deactivate() {

    /* policy-engine dispatch pre deactivate hook */
    for (PolicyEngineFeatureAPI hook : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (hook.beforeDeactivate(this)) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} before-deactivate failure because of {}",
                    this, hook.getClass().getName(), e.getMessage(), e);
        }
    }

    this.lock();

    for (PolicyController controller : getPolicyControllers()) {
        try {
            controller.stop();
        } catch (Exception | LinkageError e) {
            // both failure kinds are reported with the same message
            logger.error("{}: cannot deactivate (stop) policy-controller {} because of {}",
                    this, controller, e.getMessage(), e);
        }
    }

    /* policy-engine dispatch post deactivate hook */
    for (PolicyEngineFeatureAPI hook : PolicyEngineFeatureAPI.providers.getList()) {
        try {
            if (hook.afterDeactivate(this)) {
                return;
            }
        } catch (Exception e) {
            logger.error("{}: feature {} after-deactivate failure because of {}",
                    this, hook.getClass().getName(), e.getMessage(), e);
        }
    }
}
/**
 * Renders a short description of this engine made up of its
 * {@code alive} and {@code locked} fields.
 *
 * @return text of the form {@code PolicyEngineManager [alive=..., locked=...]}
 */
@Override
public String toString() {
    return "PolicyEngineManager [alive=" + alive + ", locked=" + locked + "]";
}
}