2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.service.engine.runtime.impl;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import org.onap.policy.apex.context.ContextException;
import org.onap.policy.apex.context.ContextRuntimeException;
import org.onap.policy.apex.context.SchemaHelper;
import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
import org.onap.policy.apex.core.engine.engine.ApexEngine;
import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
import org.onap.policy.apex.core.engine.event.EnEvent;
import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
import org.onap.policy.apex.model.basicmodel.service.ModelService;
import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
import org.onap.policy.apex.service.engine.event.ApexEvent;
import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
import org.onap.policy.apex.service.engine.main.ApexPolicyStatisticsManager;
import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
import org.onap.policy.apex.service.engine.runtime.EngineService;
import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
/**
 * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the
 * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModel}. Each policy is triggered by an Apex event, and
 * when the policy is triggered it runs through to completion in the ApexEngine.
 *
 * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving
 * events from it.
 *
 * @author Liam Fallon (liam.fallon@ericsson.com)
 */
76 final class EngineWorker implements EngineService {
77 // Logger for this class
78 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
80 // Recurring string constants
81 private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
82 private static final String ENGINE_SUFFIX = " of this engine";
83 private static final String BAD_KEY_MATCH_TAG = " does not match the key";
84 private static final String ENGINE_KEY_PREFIX = "engine key ";
86 // The ID of this engine
87 private final AxArtifactKey engineWorkerKey;
89 // The Apex engine which is running the policies in this worker
90 private final ApexEngine engine;
92 // The event processor is an inner class, an instance of which runs as a thread that reads
93 // incoming events from a queue and forwards them to the Apex engine
94 private EventProcessor processor = null;
96 // Thread handling for the worker
97 private final ApplicationThreadFactory threadFactory;
98 private Thread processorThread;
100 // Converts ApexEvent instances to and from EnEvent instances
101 private ApexEvent2EnEventConverter apexEnEventConverter = null;
104 private boolean isSubsequentInstance;
/**
 * Constructor that creates an Apex engine, an event processor that reads the events destined for that engine from
 * the given queue, and the converter used to translate {@link ApexEvent} instances for the engine.
 *
 * <p>The event processor is only created here; its thread is created and started by {@code start()}.
 *
 * @param engineWorkerKey the engine worker key
 * @param queue the queue on which events for this Apex worker will come
 * @param threadFactory the thread factory to use for creating the event processing thread
 */
protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
        final ApplicationThreadFactory threadFactory) {
    LOGGER.entry(engineWorkerKey);

    this.engineWorkerKey = engineWorkerKey;
    this.threadFactory = threadFactory;

    // Create the Apex engine
    engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);

    // Create the event processor, it is not run until the worker is started
    processor = new EventProcessor(queue);

    // Set the Event converter up
    apexEnEventConverter = new ApexEvent2EnEventConverter(engine);

    LOGGER.exit();
}
138 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
139 engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
146 public void deregisterActionListener(final String listenerName) {
147 engine.removeEventListener(listenerName);
154 public EngineServiceEventInterface getEngineServiceEventInterface() {
155 throw new UnsupportedOperationException(
156 "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
163 public AxArtifactKey getKey() {
164 return engineWorkerKey;
171 public Collection<AxArtifactKey> getEngineKeys() {
172 return Arrays.asList(engineWorkerKey);
179 public AxArtifactKey getApexModelKey() {
180 if (ModelService.existsModel(AxPolicyModel.class)) {
181 return ModelService.getModel(AxPolicyModel.class).getKey();
191 public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
192 throws ApexException {
193 LOGGER.entry(engineKey);
195 // Read the Apex model into memory using the Apex Model Reader
196 AxPolicyModel apexPolicyModel = null;
198 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
199 apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
200 } catch (final ApexModelException e) {
201 LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
202 throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
205 // Update the Apex model in the Apex engine
206 updateModel(engineKey, apexPolicyModel, forceFlag);
215 public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
216 throws ApexException {
217 LOGGER.entry(engineKey);
219 // Check if the key on the update request is correct
220 if (!engineWorkerKey.equals(engineKey)) {
221 String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
223 LOGGER.warn(message);
224 throw new ApexException(message);
227 // Check model compatibility
228 if (ModelService.existsModel(AxPolicyModel.class)) {
229 // The current policy model may or may not be defined
230 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
231 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
233 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
234 + "\" is not a compatible model update from the existing engine model with key \""
235 + currentModel.getKey().getId() + "\"");
237 throw new ContextException(
238 "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
239 + "\" is not a compatible model update from the existing engine model with key \""
240 + currentModel.getKey().getId() + "\"");
244 // Update the Apex model in the Apex engine
245 engine.updateModel(apexModel, isSubsequentInstance);
247 LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
255 public AxEngineState getState() {
256 return engine.getState();
263 public void startAll() throws ApexException {
264 start(this.getKey());
271 public void start(final AxArtifactKey engineKey) throws ApexException {
272 LOGGER.entry(engineKey);
274 // Check if the key on the start request is correct
275 if (!engineWorkerKey.equals(engineKey)) {
277 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
278 throw new ApexException(
279 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
282 // Starts the event processing thread that handles incoming events
283 if (processorThread != null && processorThread.isAlive()) {
284 String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state "
286 LOGGER.error(message);
287 throw new ApexException(message);
293 // Start a thread to process events for the engine
294 processorThread = threadFactory.newThread(processor);
295 processorThread.start();
297 LOGGER.exit(engineKey);
304 public void stop() throws ApexException {
312 public void stop(final AxArtifactKey engineKey) throws ApexException {
313 // Check if the key on the start request is correct
314 if (!engineWorkerKey.equals(engineKey)) {
316 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
317 throw new ApexException(
318 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
321 // Interrupt the worker to stop its thread
322 if (processorThread == null || !processorThread.isAlive()) {
323 processorThread = null;
326 .warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState());
330 // Interrupt the thread that is handling events toward the engine
331 processorThread.interrupt();
332 processorThread = null;
337 LOGGER.exit(engineKey);
344 public void clear() throws ApexException {
345 clear(this.getKey());
352 public void clear(final AxArtifactKey engineKey) throws ApexException {
353 // Check if the key on the start request is correct
354 if (!engineWorkerKey.equals(engineKey)) {
356 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
357 throw new ApexException(
358 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
361 // Interrupt the worker to stop its thread
362 if (processorThread != null && !processorThread.isAlive()) {
363 LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState());
370 LOGGER.exit(engineKey);
377 public boolean isStarted() {
378 return isStarted(this.getKey());
385 public boolean isStarted(final AxArtifactKey engineKey) {
386 final AxEngineState engstate = getState();
394 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
405 public boolean isStopped() {
406 return isStopped(this.getKey());
413 public boolean isStopped(final AxArtifactKey engineKey) {
414 final AxEngineState engstate = getState();
422 return processorThread == null || !processorThread.isAlive();
433 public void startPeriodicEvents(final long period) {
434 throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
441 public void stopPeriodicEvents() {
442 throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
449 public String getStatus(final AxArtifactKey engineKey) {
450 // Get the information from the engine that we want to return
451 final AxEngineModel apexEngineModel = engine.getEngineStatus();
452 apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
454 // Convert that information into a string
456 final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
457 final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
458 modelWriter.setJsonOutput(true);
459 modelWriter.write(apexEngineModel, baOutputStream);
460 return baOutputStream.toString();
461 } catch (final Exception e) {
462 LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
471 public List<AxEngineModel> getEngineStats() {
472 List<AxEngineModel> engineStats = new ArrayList<>();
473 engineStats.add(engine.getEngineStatus());
481 public String getRuntimeInfo(final AxArtifactKey engineKey) {
482 // We'll build up the JSON string for runtime information bit by bit
483 final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
485 // Get the engine information
486 final AxEngineModel engineModel = engine.getEngineStatus();
487 final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
489 // Use GSON to convert our context information into JSON
490 final Gson gson = new GsonBuilder().setPrettyPrinting().create();
492 // Get context into a JSON string
493 runtimeJsonStringBuilder.append("{\"TimeStamp\":");
494 runtimeJsonStringBuilder.append(engineModel.getTimestamp());
495 runtimeJsonStringBuilder.append(",\"State\":");
496 runtimeJsonStringBuilder.append(engineModel.getState());
497 runtimeJsonStringBuilder.append(",\"Stats\":");
498 runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
500 // Get context into a JSON string
501 runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
503 boolean firstAlbum = true;
504 for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
508 runtimeJsonStringBuilder.append(",");
511 runtimeJsonStringBuilder.append("{\"AlbumKey\":");
512 runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
513 runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
515 // Get the schema helper to use to marshal context album objects to JSON
516 final AxContextAlbum axContextAlbum = ModelService.getModel(AxContextAlbums.class)
517 .get(contextAlbumEntry.getKey());
518 SchemaHelper schemaHelper = null;
521 // Get a schema helper to manage the translations between objects on the album map
523 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
524 axContextAlbum.getItemSchema());
525 } catch (final ContextRuntimeException e) {
526 final String resultString = "could not find schema helper to marshal context album \"" + axContextAlbum
528 LOGGER.warn(resultString, e);
530 // End of context album entry
531 runtimeJsonStringBuilder.append(resultString);
532 runtimeJsonStringBuilder.append("]}");
537 boolean firstEntry = true;
538 for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
542 runtimeJsonStringBuilder.append(",");
544 runtimeJsonStringBuilder.append("{\"EntryName\":");
545 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
546 runtimeJsonStringBuilder.append(",\"EntryContent\":");
547 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
549 // End of context entry
550 runtimeJsonStringBuilder.append("}");
553 // End of context album entry
554 runtimeJsonStringBuilder.append("]}");
557 runtimeJsonStringBuilder.append("]}");
559 // Tidy up the JSON string
560 final JsonParser jsonParser = new JsonParser();
561 final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
562 final String tidiedRuntimeString = gson.toJson(jsonElement);
564 LOGGER.debug("runtime information={}", tidiedRuntimeString);
566 return tidiedRuntimeString;
570 * This is an event processor thread, this class decouples the events handling logic from core business logic. This
571 * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the
572 * worker for processing by the Apex engine.
574 * @author Liam Fallon (liam.fallon@ericsson.com)
576 private class EventProcessor implements Runnable {
577 private final boolean debugEnabled = LOGGER.isDebugEnabled();
579 private BlockingQueue<ApexEvent> eventProcessingQueue = null;
582 * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
584 * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events.
586 EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
587 this.eventProcessingQueue = eventProcessingQueue;
595 LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
597 // Take events from the event processing queue of the worker and pass them to the engine
599 boolean stopFlag = false;
600 while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) {
601 ApexEvent event = null;
603 event = eventProcessingQueue.take();
604 } catch (final InterruptedException e) {
605 // restore the interrupt status
606 Thread.currentThread().interrupt();
607 LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
610 boolean executedResult = false;
613 debugEventIfDebugEnabled(event);
615 final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
616 executedResult = engine.handleEvent(enevent);
618 } catch (final ApexException e) {
619 LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
620 } catch (final Exception e) {
621 LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
624 ApexPolicyStatisticsManager apexPolicyCounter = ApexPolicyStatisticsManager.getInstanceFromRegistry();
625 if (!stopFlag && apexPolicyCounter != null) {
626 apexPolicyCounter.updatePolicyExecutedCounter(executedResult);
629 LOGGER.debug("Engine {} completed processing", engineWorkerKey);
633 * Debug the event if debug is enabled.
635 * @param event the event to debug
637 private void debugEventIfDebugEnabled(ApexEvent event) {
639 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);