2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019-2020 Nordix Foundation.
5 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.service.engine.runtime.impl;
25 import com.google.gson.GsonBuilder;
26 import com.google.gson.JsonParser;
27 import java.io.ByteArrayInputStream;
28 import java.io.ByteArrayOutputStream;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.List;
34 import java.util.Map.Entry;
35 import java.util.concurrent.BlockingQueue;
37 import org.onap.policy.apex.context.ContextException;
38 import org.onap.policy.apex.context.ContextRuntimeException;
39 import org.onap.policy.apex.context.SchemaHelper;
40 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
41 import org.onap.policy.apex.core.engine.engine.ApexEngine;
42 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
43 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
44 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
45 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
46 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
47 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
48 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
49 import org.onap.policy.apex.model.basicmodel.service.ModelService;
50 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
51 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
52 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
53 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
54 import org.onap.policy.apex.service.engine.event.ApexEvent;
55 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
56 import org.onap.policy.apex.service.engine.main.ApexPolicyStatisticsManager;
57 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
58 import org.onap.policy.apex.service.engine.runtime.EngineService;
59 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
60 import org.slf4j.ext.XLogger;
61 import org.slf4j.ext.XLoggerFactory;
/**
 * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the
 * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy is triggered by an Apex event, and
 * when the policy is triggered it runs through to completion in the ApexEngine.
 *
 * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving
 * events back from it through registered listeners.
 *
 * @author Liam Fallon (liam.fallon@ericsson.com)
 */
73 final class EngineWorker implements EngineService {
74 // Logger for this class
75 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
77 // Recurring string constants
78 private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
79 private static final String ENGINE_SUFFIX = " of this engine";
80 private static final String BAD_KEY_MATCH_TAG = " does not match the key";
81 private static final String ENGINE_KEY_PREFIX = "engine key ";
83 // The ID of this engine
84 private final AxArtifactKey engineWorkerKey;
86 // The Apex engine which is running the policies in this worker
87 private final ApexEngine engine;
89 // The event processor is an inner class, an instance of which runs as a thread that reads
90 // incoming events from a queue and forwards them to the Apex engine
91 private EventProcessor processor = null;
93 // Thread handling for the worker
94 private final ApplicationThreadFactory threadFactory;
95 private Thread processorThread;
97 // Converts ApexEvent instances to and from EnEvent instances
98 private ApexEvent2EnEventConverter apexEnEventConverter = null;
101 private boolean isSubsequentInstance;
/**
 * Creates a worker: builds the Apex engine for the given key, sets up the converter between {@link ApexEvent}
 * instances and engine events, and prepares the event processor that will read from the supplied queue. No thread
 * is started here; that happens in {@link #start(AxArtifactKey)}.
 *
 * @param engineWorkerKey the engine worker key
 * @param queue the queue on which events for this Apex worker will come
 * @param threadFactory the thread factory to use for creating the event processing thread
 */
protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
    final ApplicationThreadFactory threadFactory) {
    LOGGER.entry(engineWorkerKey);

    this.threadFactory = threadFactory;
    this.engineWorkerKey = engineWorkerKey;

    // The engine has to exist first because the converter is constructed around it
    this.engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
    this.apexEnEventConverter = new ApexEvent2EnEventConverter(this.engine);

    // Only the runnable is created here; it is attached to a thread when the worker is started
    this.processor = new EventProcessor(queue);

    LOGGER.exit();
}
135 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
136 engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
143 public void deregisterActionListener(final String listenerName) {
144 engine.removeEventListener(listenerName);
151 public EngineServiceEventInterface getEngineServiceEventInterface() {
152 throw new UnsupportedOperationException(
153 "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
160 public AxArtifactKey getKey() {
161 return engineWorkerKey;
168 public Collection<AxArtifactKey> getEngineKeys() {
169 return Arrays.asList(engineWorkerKey);
176 public AxArtifactKey getApexModelKey() {
177 if (ModelService.existsModel(AxPolicyModel.class)) {
178 return ModelService.getModel(AxPolicyModel.class).getKey();
188 public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
189 throws ApexException {
190 LOGGER.entry(engineKey);
192 // Read the Apex model into memory using the Apex Model Reader
193 AxPolicyModel apexPolicyModel = null;
195 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
196 apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
197 } catch (final ApexModelException e) {
198 throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
201 // Update the Apex model in the Apex engine
202 updateModel(engineKey, apexPolicyModel, forceFlag);
211 public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
212 throws ApexException {
213 LOGGER.entry(engineKey);
215 // Check if the key on the update request is correct
216 if (!engineWorkerKey.equals(engineKey)) {
218 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX;
219 throw new ApexException(message);
222 // Check model compatibility
223 if (ModelService.existsModel(AxPolicyModel.class)) {
224 // The current policy model may or may not be defined
225 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
226 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
229 "apex model update forced, supplied model with key \"{}\" is not a compatible model update "
230 + "from the existing engine model with key \"{}\"",
231 apexModel.getKey().getId(), currentModel.getKey().getId());
233 throw new ContextException(
234 "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
235 + "\" is not a compatible model update from the existing engine model with key \""
236 + currentModel.getKey().getId() + "\"");
240 // Update the Apex model in the Apex engine
241 engine.updateModel(apexModel, isSubsequentInstance);
243 LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
251 public AxEngineState getState() {
252 return engine.getState();
259 public void startAll() throws ApexException {
260 start(this.getKey());
267 public void start(final AxArtifactKey engineKey) throws ApexException {
268 LOGGER.entry(engineKey);
270 // Check if the key on the start request is correct
271 if (!engineWorkerKey.equals(engineKey)) {
272 throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
276 // Starts the event processing thread that handles incoming events
277 if (processorThread != null && processorThread.isAlive()) {
279 ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state " + getState();
280 LOGGER.error(message);
281 throw new ApexException(message);
287 // Start a thread to process events for the engine
288 processorThread = threadFactory.newThread(processor);
289 processorThread.start();
291 LOGGER.exit(engineKey);
298 public void stop() throws ApexException {
306 public void stop(final AxArtifactKey engineKey) throws ApexException {
307 // Check if the key on the start request is correct
308 if (!engineWorkerKey.equals(engineKey)) {
309 LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
311 throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
315 // Interrupt the worker to stop its thread
316 if (processorThread == null || !processorThread.isAlive()) {
317 processorThread = null;
320 ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState());
324 // Interrupt the thread that is handling events toward the engine
325 processorThread.interrupt();
326 processorThread = null;
331 LOGGER.exit(engineKey);
338 public void clear() throws ApexException {
339 clear(this.getKey());
346 public void clear(final AxArtifactKey engineKey) throws ApexException {
347 // Check if the key on the start request is correct
348 if (!engineWorkerKey.equals(engineKey)) {
349 LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
351 throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
355 // Interrupt the worker to stop its thread
356 if (processorThread != null && !processorThread.isAlive()) {
357 LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState());
364 LOGGER.exit(engineKey);
371 public boolean isStarted() {
372 return isStarted(this.getKey());
379 public boolean isStarted(final AxArtifactKey engineKey) {
380 final AxEngineState engstate = getState();
388 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
399 public boolean isStopped() {
400 return isStopped(this.getKey());
407 public boolean isStopped(final AxArtifactKey engineKey) {
408 final AxEngineState engstate = getState();
416 return processorThread == null || !processorThread.isAlive();
427 public void startPeriodicEvents(final long period) {
428 throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
435 public void stopPeriodicEvents() {
436 throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
443 public String getStatus(final AxArtifactKey engineKey) {
444 // Get the information from the engine that we want to return
445 final AxEngineModel apexEngineModel = engine.getEngineStatus();
446 apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
448 // Convert that information into a string
450 final var baOutputStream = new ByteArrayOutputStream();
451 final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
452 modelWriter.setJsonOutput(true);
453 modelWriter.write(apexEngineModel, baOutputStream);
454 return baOutputStream.toString();
455 } catch (final Exception e) {
456 LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
465 public List<AxEngineModel> getEngineStats() {
466 List<AxEngineModel> engineStats = new ArrayList<>();
467 engineStats.add(engine.getEngineStatus());
475 public String getRuntimeInfo(final AxArtifactKey engineKey) {
476 // We'll build up the JSON string for runtime information bit by bit
477 final var runtimeJsonStringBuilder = new StringBuilder();
479 // Get the engine information
480 final AxEngineModel engineModel = engine.getEngineStatus();
481 final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
483 // Use GSON to convert our context information into JSON
484 final var gson = new GsonBuilder().setPrettyPrinting().create();
486 // Get context into a JSON string
487 runtimeJsonStringBuilder.append("{\"TimeStamp\":");
488 runtimeJsonStringBuilder.append(engineModel.getTimestamp());
489 runtimeJsonStringBuilder.append(",\"State\":");
490 runtimeJsonStringBuilder.append(engineModel.getState());
491 runtimeJsonStringBuilder.append(",\"Stats\":");
492 runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
494 // Get context into a JSON string
495 runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
497 var firstAlbum = true;
498 for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
502 runtimeJsonStringBuilder.append(",");
505 runtimeJsonStringBuilder.append("{\"AlbumKey\":");
506 runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
507 runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
509 // Get the schema helper to use to marshal context album objects to JSON
510 final var axContextAlbum =
511 ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey());
512 SchemaHelper schemaHelper = null;
515 // Get a schema helper to manage the translations between objects on the album map
517 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
518 axContextAlbum.getItemSchema());
519 } catch (final ContextRuntimeException e) {
520 final var resultString =
521 "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON";
522 LOGGER.warn(resultString, e);
524 // End of context album entry
525 runtimeJsonStringBuilder.append(resultString);
526 runtimeJsonStringBuilder.append("]}");
531 var firstEntry = true;
532 for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
536 runtimeJsonStringBuilder.append(",");
538 runtimeJsonStringBuilder.append("{\"EntryName\":");
539 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
540 runtimeJsonStringBuilder.append(",\"EntryContent\":");
541 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
543 // End of context entry
544 runtimeJsonStringBuilder.append("}");
547 // End of context album entry
548 runtimeJsonStringBuilder.append("]}");
551 runtimeJsonStringBuilder.append("]}");
553 // Tidy up the JSON string
554 final var jsonElement = JsonParser.parseString(runtimeJsonStringBuilder.toString());
555 final var tidiedRuntimeString = gson.toJson(jsonElement);
557 LOGGER.debug("runtime information={}", tidiedRuntimeString);
559 return tidiedRuntimeString;
563 * This is an event processor thread, this class decouples the events handling logic from core business logic. This
564 * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the
565 * worker for processing by the Apex engine.
567 * @author Liam Fallon (liam.fallon@ericsson.com)
569 private class EventProcessor implements Runnable {
570 private final boolean debugEnabled = LOGGER.isDebugEnabled();
572 private BlockingQueue<ApexEvent> eventProcessingQueue = null;
575 * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
577 * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events.
579 EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
580 this.eventProcessingQueue = eventProcessingQueue;
588 LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
590 // Take events from the event processing queue of the worker and pass them to the engine
592 var stopFlag = false;
593 while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) {
594 ApexEvent event = null;
596 event = eventProcessingQueue.take();
597 } catch (final InterruptedException e) {
598 // restore the interrupt status
599 Thread.currentThread().interrupt();
600 LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
603 var executedResult = false;
606 debugEventIfDebugEnabled(event);
608 final var enevent = apexEnEventConverter.fromApexEvent(event);
609 executedResult = engine.handleEvent(enevent);
611 } catch (final ApexException e) {
612 LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
613 } catch (final Exception e) {
614 LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
617 var apexPolicyCounter = ApexPolicyStatisticsManager.getInstanceFromRegistry();
618 if (!stopFlag && apexPolicyCounter != null) {
619 apexPolicyCounter.updatePolicyExecutedCounter(executedResult);
622 LOGGER.debug("Engine {} completed processing", engineWorkerKey);
626 * Debug the event if debug is enabled.
628 * @param event the event to debug
630 private void debugEventIfDebugEnabled(ApexEvent event) {
632 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);