2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.runtime.impl;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25 import com.google.gson.JsonElement;
26 import com.google.gson.JsonParser;
28 import java.io.ByteArrayInputStream;
29 import java.io.ByteArrayOutputStream;
30 import java.util.Arrays;
31 import java.util.Collection;
33 import java.util.Map.Entry;
34 import java.util.concurrent.BlockingQueue;
36 import org.onap.policy.apex.context.ContextException;
37 import org.onap.policy.apex.context.ContextRuntimeException;
38 import org.onap.policy.apex.context.SchemaHelper;
39 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
40 import org.onap.policy.apex.core.engine.engine.ApexEngine;
41 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
42 import org.onap.policy.apex.core.engine.event.EnEvent;
43 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
44 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
45 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
46 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
47 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
48 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
49 import org.onap.policy.apex.model.basicmodel.service.ModelService;
50 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
51 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
52 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
53 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
54 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
55 import org.onap.policy.apex.service.engine.event.ApexEvent;
56 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
57 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
58 import org.onap.policy.apex.service.engine.runtime.EngineService;
59 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
60 import org.slf4j.ext.XLogger;
61 import org.slf4j.ext.XLoggerFactory;
64 * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies
65 * defined in the {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy
66 * is triggered by an Apex event, and when the policy is triggered it runs through to completion in
69 * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it
70 * events, and receiving events from it.
72 * @author Liam Fallon (liam.fallon@ericsson.com)
74 final class EngineWorker implements EngineService {
75 // Logger for this class
76 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
78 // Recurring string constants
79 private static final String IS_NULL_SUFFIX = " is null";
80 private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
81 private static final String ENGINE_SUFFIX = " of this engine";
82 private static final String BAD_KEY_MATCH_TAG = " does not match the key";
83 private static final String ENGINE_KEY_PREFIX = "engine key ";
85 // The ID of this engine
86 private final AxArtifactKey engineWorkerKey;
88 // The Apex engine which is running the policies in this worker
89 private final ApexEngine engine;
91 // The event processor is an inner class, an instance of which runs as a thread that reads
92 // incoming events from a queue and forwards them to the Apex engine
93 private EventProcessor processor = null;
95 // Thread handling for the worker
96 private final ApplicationThreadFactory threadFactory;
97 private Thread processorThread;
99 // Converts ApexEvent instances to and from EnEvent instances
100 private ApexEvent2EnEventConverter apexEnEventConverter = null;
103 * Constructor that creates an Apex engine, an event processor for events to be sent to that
104 * engine, and an {@link ApexModelReader} instance to read Apex models using JAXB.
106 * @param engineWorkerKey the engine worker key
107 * @param queue the queue on which events for this Apex worker will come
108 * @param threadFactory the thread factory to use for creating the event processing thread
109 * @throws ApexException thrown on errors on worker instantiation
111 EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
112 final ApplicationThreadFactory threadFactory) {
113 LOGGER.entry(engineWorkerKey);
115 this.engineWorkerKey = engineWorkerKey;
116 this.threadFactory = threadFactory;
118 // Create the Apex engine
119 engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
121 // Create and run the event processor
122 processor = new EventProcessor(queue);
124 // Set the Event converter up
125 apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
134 * org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang.
135 * String, org.onap.policy.apex.service.engine.runtime.ApexEventListener)
138 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
139 // Sanity checks on the Apex model
140 if (engine == null) {
141 LOGGER.warn("listener registration on engine with key " + engineWorkerKey.getId()
142 + ", failed, listener is null");
146 engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
153 * org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang.
157 public void deregisterActionListener(final String listenerName) {
158 // Sanity checks on the Apex model
159 if (engine == null) {
160 LOGGER.warn("listener deregistration on engine with key " + engineWorkerKey.getId()
161 + ", failed, listener is null");
165 engine.removeEventListener(listenerName);
172 * org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
175 public EngineServiceEventInterface getEngineServiceEventInterface() {
176 throw new UnsupportedOperationException(
177 "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
183 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
186 public AxArtifactKey getKey() {
187 return engineWorkerKey;
193 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
196 public Collection<AxArtifactKey> getEngineKeys() {
197 return Arrays.asList(engineWorkerKey);
203 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
206 public AxArtifactKey getApexModelKey() {
207 if (ModelService.existsModel(AxPolicyModel.class)) {
208 return ModelService.getModel(AxPolicyModel.class).getKey();
218 * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.
219 * model. basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
222 public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
223 throws ApexException {
224 LOGGER.entry(engineKey);
226 // Read the Apex model into memory using the Apex Model Reader
227 AxPolicyModel apexPolicyModel = null;
229 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
230 apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
231 } catch (final ApexModelException e) {
232 LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
233 throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
236 if (apexPolicyModel == null) {
237 LOGGER.error("apex model null on engine " + engineKey.getId());
238 throw new ApexException("apex model null on engine " + engineKey.getId());
241 // Update the Apex model in the Apex engine
242 updateModel(engineKey, apexPolicyModel, forceFlag);
251 * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.
252 * model. basicmodel.concepts.AxArtifactKey,
253 * org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
256 public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
257 throws ApexException {
258 LOGGER.entry(engineKey);
260 // Check if the key on the update request is correct
261 if (!engineWorkerKey.equals(engineKey)) {
262 String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
264 LOGGER.warn(message);
265 throw new ApexException(message);
268 // Sanity checks on the Apex model
269 if (engine == null) {
270 LOGGER.warn("engine with key " + engineKey.getId() + " not initialized");
271 throw new ApexException("engine with key " + engineKey.getId() + " not initialized");
274 // Check model compatibility
275 if (ModelService.existsModel(AxPolicyModel.class)) {
276 // The current policy model may or may not be defined
277 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
278 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
280 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
281 + "\" is not a compatible model update from the existing engine model with key \""
282 + currentModel.getKey().getId() + "\"");
284 throw new ContextException(
285 "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
286 + "\" is not a compatible model update from the existing engine model with key \""
287 + currentModel.getKey().getId() + "\"");
292 // Update the Apex model in the Apex engine
294 engine.updateModel(apexModel);
296 LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
303 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
306 public AxEngineState getState() {
307 return engine.getState();
313 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
316 public void startAll() throws ApexException {
317 start(this.getKey());
324 * org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core.
325 * model. concepts.AxArtifactKey)
328 public void start(final AxArtifactKey engineKey) throws ApexException {
329 LOGGER.entry(engineKey);
331 // Check if the key on the start request is correct
332 if (!engineWorkerKey.equals(engineKey)) {
333 LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
335 throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG
336 + engineWorkerKey.getId() + ENGINE_SUFFIX);
339 if (engine == null) {
340 String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is null";
341 LOGGER.error(message);
342 throw new ApexException(message);
345 // Starts the event processing thread that handles incoming events
346 if (processorThread != null && processorThread.isAlive()) {
347 String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state "
349 LOGGER.error(message);
350 throw new ApexException(message);
356 // Start a thread to process events for the engine
357 processorThread = threadFactory.newThread(processor);
358 processorThread.start();
360 LOGGER.exit(engineKey);
366 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
369 public void stop() throws ApexException {
377 * org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core.
378 * model. concepts.AxArtifactKey)
381 public void stop(final AxArtifactKey engineKey) throws ApexException {
382 // Check if the key on the start request is correct
383 if (!engineWorkerKey.equals(engineKey)) {
384 LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
386 throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG
387 + engineWorkerKey.getId() + ENGINE_SUFFIX);
390 if (engine == null) {
391 String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is null";
392 LOGGER.error(message);
393 throw new ApexException(message);
396 // Interrupt the worker to stop its thread
397 if (processorThread == null || !processorThread.isAlive()) {
398 processorThread = null;
400 LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state "
405 // Interrupt the thread that is handling events toward the engine
406 processorThread.interrupt();
407 processorThread = null;
412 LOGGER.exit(engineKey);
418 * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
421 public void clear() throws ApexException {
422 clear(this.getKey());
429 * org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core.
430 * model. concepts.AxArtifactKey)
433 public void clear(final AxArtifactKey engineKey) throws ApexException {
434 // Check if the key on the start request is correct
435 if (!engineWorkerKey.equals(engineKey)) {
436 LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
438 throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG
439 + engineWorkerKey.getId() + ENGINE_SUFFIX);
442 if (engine == null) {
443 LOGGER.error(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + IS_NULL_SUFFIX);
444 throw new ApexException(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + IS_NULL_SUFFIX);
447 // Interrupt the worker to stop its thread
448 if (processorThread != null && !processorThread.isAlive()) {
449 LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state "
457 LOGGER.exit(engineKey);
463 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
466 public boolean isStarted() {
467 return isStarted(this.getKey());
474 * org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex.
475 * model. basicmodel.concepts.AxArtifactKey)
478 public boolean isStarted(final AxArtifactKey engineKey) {
479 final AxEngineState engstate = getState();
487 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
497 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
500 public boolean isStopped() {
501 return isStopped(this.getKey());
508 * org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex.
509 * model. basicmodel.concepts.AxArtifactKey)
512 public boolean isStopped(final AxArtifactKey engineKey) {
513 final AxEngineState engstate = getState();
521 return processorThread == null || !processorThread.isAlive();
531 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
534 public void startPeriodicEvents(final long period) {
535 throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
541 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
544 public void stopPeriodicEvents() {
545 throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
552 * org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core
553 * .model .concepts.AxArtifactKey)
556 public String getStatus(final AxArtifactKey engineKey) {
557 // Get the information from the engine that we want to return
558 final AxEngineModel apexEngineModel = engine.getEngineStatus();
559 apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
561 // Convert that information into a string
563 final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
564 final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
565 modelWriter.write(apexEngineModel, baOutputStream);
566 return baOutputStream.toString();
567 } catch (final Exception e) {
568 LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
577 * org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex
578 * .core.model.concepts.AxArtifactKey)
581 public String getRuntimeInfo(final AxArtifactKey engineKey) {
582 // We'll build up the JSON string for runtime information bit by bit
583 final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
585 // Get the engine information
586 final AxEngineModel engineModel = engine.getEngineStatus();
587 final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
589 // Use GSON to convert our context information into JSON
590 final Gson gson = new GsonBuilder().setPrettyPrinting().create();
592 // Get context into a JSON string
593 runtimeJsonStringBuilder.append("{\"TimeStamp\":");
594 runtimeJsonStringBuilder.append(engineModel.getTimestamp());
595 runtimeJsonStringBuilder.append(",\"State\":");
596 runtimeJsonStringBuilder.append(engineModel.getState());
597 runtimeJsonStringBuilder.append(",\"Stats\":");
598 runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
600 // Get context into a JSON string
601 runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
603 boolean firstAlbum = true;
604 for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
608 runtimeJsonStringBuilder.append(",");
611 runtimeJsonStringBuilder.append("{\"AlbumKey\":");
612 runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
613 runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
616 // Get the schema helper to use to marshal context album objects to JSON
617 final AxContextAlbum axContextAlbum =
618 ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey());
619 SchemaHelper schemaHelper = null;
622 // Get a schema helper to manage the translations between objects on the album map
624 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
625 axContextAlbum.getItemSchema());
626 } catch (final ContextRuntimeException e) {
627 final String resultString =
628 "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON";
629 LOGGER.warn(resultString, e);
631 // End of context album entry
632 runtimeJsonStringBuilder.append(resultString);
633 runtimeJsonStringBuilder.append("]}");
638 boolean firstEntry = true;
639 for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
643 runtimeJsonStringBuilder.append(",");
645 runtimeJsonStringBuilder.append("{\"EntryName\":");
646 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
647 runtimeJsonStringBuilder.append(",\"EntryContent\":");
648 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
650 // End of context entry
651 runtimeJsonStringBuilder.append("}");
654 // End of context album entry
655 runtimeJsonStringBuilder.append("]}");
658 runtimeJsonStringBuilder.append("]}");
660 // Tidy up the JSON string
661 final JsonParser jsonParser = new JsonParser();
662 final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
663 final String tidiedRuntimeString = gson.toJson(jsonElement);
665 LOGGER.debug("runtime information={}", tidiedRuntimeString);
667 return tidiedRuntimeString;
671 * This is an event processor thread, this class decouples the events handling logic from core
672 * business logic. This class runs its own thread and continuously querying the blocking queue
673 * for the events that have been sent to the worker for processing by the Apex engine.
675 * @author Liam Fallon (liam.fallon@ericsson.com)
677 private class EventProcessor implements Runnable {
678 private final boolean debugEnabled = LOGGER.isDebugEnabled();
680 private BlockingQueue<ApexEvent> eventProcessingQueue = null;
683 * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
685 * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger
688 EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
689 this.eventProcessingQueue = eventProcessingQueue;
695 * @see java.lang.Runnable#run()
699 LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
701 // Take events from the event processing queue of the worker and pass them to the engine
703 boolean stopFlag = false;
704 while (!processorThread.isInterrupted() && ! stopFlag) {
705 ApexEvent event = null;
707 event = eventProcessingQueue.take();
708 } catch (final InterruptedException e) {
709 // restore the interrupt status
710 Thread.currentThread().interrupt();
711 LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
717 debugEventIfDebugEnabled(event);
719 final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
720 engine.handleEvent(enevent);
722 } catch (final ApexException e) {
723 LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
724 } catch (final Exception e) {
725 LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
729 LOGGER.debug("Engine {} completed processing", engineWorkerKey);
733 * Debug the event if debug is enabled.
735 * @param event the event to debug
737 private void debugEventIfDebugEnabled(ApexEvent event) {
739 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);