/*- * ============LICENSE_START======================================================= * Copyright (C) 2016-2018 Ericsson. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * SPDX-License-Identifier: Apache-2.0 * ============LICENSE_END========================================================= */ package org.onap.policy.apex.service.engine.runtime.impl; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.Arrays; import java.util.Collection; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import org.onap.policy.apex.context.ContextException; import org.onap.policy.apex.context.ContextRuntimeException; import org.onap.policy.apex.context.SchemaHelper; import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory; import org.onap.policy.apex.core.engine.engine.ApexEngine; import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory; import org.onap.policy.apex.core.engine.event.EnEvent; import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.handling.ApexModelException; import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader; import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter; import org.onap.policy.apex.model.basicmodel.service.ModelService; import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum; import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel; import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState; import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel; import org.onap.policy.apex.service.engine.event.ApexEvent; import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter; import org.onap.policy.apex.service.engine.runtime.ApexEventListener; import org.onap.policy.apex.service.engine.runtime.EngineService; import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonParser; /** * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies * defined in the {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy * is triggered by an Apex event, and when the policy is triggered it runs through to completion in * the ApexEngine. * *

This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it * events, and receiving events from it. * * @author Liam Fallon (liam.fallon@ericsson.com) */ final class EngineWorker implements EngineService { // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class); // The ID of this engine private final AxArtifactKey engineWorkerKey; // The Apex engine which is running the policies in this worker private final ApexEngine engine; // The event processor is an inner class, an instance of which runs as a thread that reads // incoming events from a queue and forwards them to the Apex engine private EventProcessor processor = null; // Thread handling for the worker private final ApplicationThreadFactory threadFactory; private Thread processorThread; // Converts ApexEvent instances to and from EnEvent instances private ApexEvent2EnEventConverter apexEnEventConverter = null; /** * Constructor that creates an Apex engine, an event processor for events to be sent to that * engine, and an {@link ApexModelReader} instance to read Apex models using JAXB. * * @param engineWorkerKey the engine worker key * @param queue the queue on which events for this Apex worker will come * @param threadFactory the thread factory to use for creating the event processing thread * @throws ApexException thrown on errors on worker instantiation */ EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue queue, final ApplicationThreadFactory threadFactory) throws ApexException { LOGGER.entry(engineWorkerKey); this.engineWorkerKey = engineWorkerKey; this.threadFactory = threadFactory; // Create the Apex engine engine = new ApexEngineFactory().createApexEngine(engineWorkerKey); // Create and run the event processor processor = new EventProcessor(queue); // Set the Event converter up apexEnEventConverter = new ApexEvent2EnEventConverter(engine); LOGGER.exit(); } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. * String, org.onap.policy.apex.service.engine.runtime.ApexEventListener) */ @Override public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) { // Sanity checks on the Apex model if (engine == null) { LOGGER.warn("listener registration on engine with key " + engineWorkerKey.getID() + ", failed, listener is null"); return; } engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter)); } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. * String) */ @Override public void deregisterActionListener(final String listenerName) { // Sanity checks on the Apex model if (engine == null) { LOGGER.warn("listener deregistration on engine with key " + engineWorkerKey.getID() + ", failed, listener is null"); return; } engine.removeEventListener(listenerName); } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface() */ @Override public EngineServiceEventInterface getEngineServiceEventInterface() { throw new UnsupportedOperationException( "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker"); } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey() */ @Override public AxArtifactKey getKey() { return engineWorkerKey; } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo() */ @Override public Collection getEngineKeys() { return Arrays.asList(engineWorkerKey); } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey() */ @Override public AxArtifactKey getApexModelKey() { if (ModelService.existsModel(AxPolicyModel.class)) { return ModelService.getModel(AxPolicyModel.class).getKey(); } else { return null; } } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. * model. basicmodel.concepts.AxArtifactKey, java.lang.String, boolean) */ @Override public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag) throws ApexException { LOGGER.entry(engineKey); // Read the Apex model into memory using the Apex Model Reader AxPolicyModel apexPolicyModel = null; try { final ApexModelReader modelReader = new ApexModelReader<>(AxPolicyModel.class); apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes())); } catch (final ApexModelException e) { LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getID(), e); throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getID(), e); } if (apexPolicyModel == null) { LOGGER.error("apex model null on engine " + engineKey.getID()); throw new ApexException("apex model null on engine " + engineKey.getID()); } // Update the Apex model in the Apex engine updateModel(engineKey, apexPolicyModel, forceFlag); LOGGER.exit(); } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. * model. basicmodel.concepts.AxArtifactKey, * org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean) */ @Override public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag) throws ApexException { LOGGER.entry(engineKey); // Check if the key on the update request is correct if (!engineWorkerKey.equals(engineKey)) { LOGGER.warn("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID() + " of this engine"); throw new ApexException("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID() + " of this engine"); } // Sanity checks on the Apex model if (engine == null) { LOGGER.warn("engine with key " + engineKey.getID() + " not initialized"); throw new ApexException("engine with key " + engineKey.getID() + " not initialized"); } // Check model compatibility if (ModelService.existsModel(AxPolicyModel.class)) { // The current policy model may or may not be defined final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class); if (!currentModel.getKey().isCompatible(apexModel.getKey())) { if (forceFlag) { LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getID() + "\" is not a compatible model update from the existing engine model with key \"" + currentModel.getKey().getID() + "\""); } else { throw new ContextException( "apex model update failed, supplied model with key \"" + apexModel.getKey().getID() + "\" is not a compatible model update from the existing engine model with key \"" + currentModel.getKey().getID() + "\""); } } } // Update the Apex model in the Apex engine engine.updateModel(apexModel); LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getID(), engineWorkerKey); LOGGER.exit(); } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState() */ @Override public AxEngineState getState() { return engine.getState(); } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll() */ @Override public void startAll() throws ApexException { start(this.getKey()); } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core. * model. concepts.AxArtifactKey) */ @Override public void start(final AxArtifactKey engineKey) throws ApexException { LOGGER.entry(engineKey); // Check if the key on the start request is correct if (!engineWorkerKey.equals(engineKey)) { LOGGER.warn("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID() + " of this engine"); throw new ApexException("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID() + " of this engine"); } if (engine == null) { LOGGER.error("apex engine for engine key" + engineWorkerKey.getID() + " null"); throw new ApexException("apex engine for engine key" + engineWorkerKey.getID() + " null"); } // Starts the event processing thread that handles incoming events if (processorThread != null && processorThread.isAlive()) { LOGGER.error("apex engine for engine key" + engineWorkerKey.getID() + " is already running with state " + getState()); throw new ApexException("apex engine for engine key" + engineWorkerKey.getID() + " is already running with state " + getState()); } // Start the engine engine.start(); // Start a thread to process events for the engine processorThread = threadFactory.newThread(processor); processorThread.start(); LOGGER.exit(engineKey); } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop() */ @Override public void stop() throws ApexException { stop(this.getKey()); } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core. * model. concepts.AxArtifactKey) */ @Override public void stop(final AxArtifactKey engineKey) throws ApexException { // Check if the key on the start request is correct if (!engineWorkerKey.equals(engineKey)) { LOGGER.warn("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID() + " of this engine"); throw new ApexException("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID() + " of this engine"); } if (engine == null) { LOGGER.error("apex engine for engine key" + engineWorkerKey.getID() + " null"); throw new ApexException("apex engine for engine key" + engineWorkerKey.getID() + " null"); } // Interrupt the worker to stop its thread if (processorThread == null || !processorThread.isAlive()) { processorThread = null; LOGGER.warn("apex engine for engine key" + engineWorkerKey.getID() + " is already stopped with state " + getState()); return; } // Interrupt the thread that is handling events toward the engine processorThread.interrupt(); // Stop the engine engine.stop(); engine.clear(); LOGGER.exit(engineKey); } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted() */ @Override public boolean isStarted() { return isStarted(this.getKey()); } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex. * model. basicmodel.concepts.AxArtifactKey) */ @Override public boolean isStarted(final AxArtifactKey engineKey) { final AxEngineState engstate = getState(); switch (engstate) { case STOPPED: case STOPPING: case UNDEFINED: return false; case EXECUTING: case READY: return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted(); default: break; } return false; } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped() */ @Override public boolean isStopped() { return isStopped(this.getKey()); } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex. * model. basicmodel.concepts.AxArtifactKey) */ @Override public boolean isStopped(final AxArtifactKey engineKey) { final AxEngineState engstate = getState(); switch (engstate) { case STOPPING: case UNDEFINED: case EXECUTING: case READY: return false; case STOPPED: return processorThread == null || !processorThread.isAlive(); default: break; } return false; } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long) */ @Override public void startPeriodicEvents(final long period) { throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker"); } /* * (non-Javadoc) * * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents() */ @Override public void stopPeriodicEvents() { throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker"); } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core * .model .concepts.AxArtifactKey) */ @Override public String getStatus(final AxArtifactKey engineKey) { // Get the information from the engine that we want to return final AxEngineModel apexEngineModel = engine.getEngineStatus(); apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel); // Convert that information into a string try { final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream(); final ApexModelWriter modelWriter = new ApexModelWriter<>(AxEngineModel.class); modelWriter.write(apexEngineModel, baOutputStream); return baOutputStream.toString(); } catch (final Exception e) { LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e); return null; } } /* * (non-Javadoc) * * @see * org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex * .core.model.concepts.AxArtifactKey) */ @Override public String getRuntimeInfo(final AxArtifactKey engineKey) { // We'll build up the JSON string for runtime information bit by bit final StringBuilder runtimeJsonStringBuilder = new StringBuilder(); // Get the engine information final AxEngineModel engineModel = engine.getEngineStatus(); final Map> engineContextAlbums = engine.getEngineContext(); // Use GSON to convert our context information into JSON final Gson gson = new GsonBuilder().setPrettyPrinting().create(); // Get context into a JSON string runtimeJsonStringBuilder.append("{\"TimeStamp\":"); runtimeJsonStringBuilder.append(engineModel.getTimestamp()); runtimeJsonStringBuilder.append(",\"State\":"); runtimeJsonStringBuilder.append(engineModel.getState()); runtimeJsonStringBuilder.append(",\"Stats\":"); runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats())); // Get context into a JSON string runtimeJsonStringBuilder.append(",\"ContextAlbums\":["); boolean firstAlbum = true; for (final Entry> contextAlbumEntry : engineContextAlbums.entrySet()) { if (firstAlbum) { firstAlbum = false; } else { runtimeJsonStringBuilder.append(","); } runtimeJsonStringBuilder.append("{\"AlbumKey\":"); runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey())); runtimeJsonStringBuilder.append(",\"AlbumContent\":["); // Get the schema helper to use to marshal context album objects to JSON final AxContextAlbum axContextAlbum = ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey()); SchemaHelper schemaHelper = null; try { // Get a schema helper to manage the translations between objects on the album map // for this album schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(), axContextAlbum.getItemSchema()); } catch (final ContextRuntimeException e) { final String resultString = "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON"; LOGGER.warn(resultString, e); // End of context album entry runtimeJsonStringBuilder.append(resultString); runtimeJsonStringBuilder.append("]}"); continue; } boolean firstEntry = true; for (final Entry contextEntry : contextAlbumEntry.getValue().entrySet()) { if (firstEntry) { firstEntry = false; } else { runtimeJsonStringBuilder.append(","); } runtimeJsonStringBuilder.append("{\"EntryName\":"); runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey())); runtimeJsonStringBuilder.append(",\"EntryContent\":"); runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue()))); // End of context entry runtimeJsonStringBuilder.append("}"); } // End of context album entry runtimeJsonStringBuilder.append("]}"); } runtimeJsonStringBuilder.append("]}"); // Tidy up the JSON string final JsonParser jsonParser = new JsonParser(); final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString()); final String tidiedRuntimeString = gson.toJson(jsonElement); LOGGER.debug("runtime information=" + tidiedRuntimeString); return tidiedRuntimeString; } /** * This is an event processor thread, this class decouples the events handling logic from core * business logic. This class runs its own thread and continuously querying the blocking queue * for the events that have been sent to the worker for processing by the Apex engine. * * @author Liam Fallon (liam.fallon@ericsson.com) */ private class EventProcessor implements Runnable { private final boolean debugEnabled = LOGGER.isDebugEnabled(); // the events queue private BlockingQueue eventProcessingQueue = null; /** * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects. * * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger * events. */ EventProcessor(final BlockingQueue eventProcessingQueue) { this.eventProcessingQueue = eventProcessingQueue; } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { LOGGER.debug("Engine {} processing ... ", engineWorkerKey); // Take events from the event processing queue of the worker and pass them to the engine // for processing while (!processorThread.isInterrupted()) { ApexEvent event = null; try { event = eventProcessingQueue.take(); } catch (final InterruptedException e) { // restore the interrupt status Thread.currentThread().interrupt(); LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey); break; } try { if (event != null) { if (debugEnabled) { LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event); } final EnEvent enevent = apexEnEventConverter.fromApexEvent(event); engine.handleEvent(enevent); } } catch (final ApexException e) { LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e); } catch (final Exception e) { LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e); break; } } LOGGER.debug("Engine {} completed processing", engineWorkerKey); } } }