2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Copyright (C) 2019 Nordix Foundation.
5 * ================================================================================
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
18 * SPDX-License-Identifier: Apache-2.0
19 * ============LICENSE_END=========================================================
22 package org.onap.policy.apex.service.engine.runtime.impl;
24 import com.google.gson.Gson;
25 import com.google.gson.GsonBuilder;
26 import com.google.gson.JsonElement;
27 import com.google.gson.JsonParser;
29 import java.io.ByteArrayInputStream;
30 import java.io.ByteArrayOutputStream;
31 import java.util.Arrays;
32 import java.util.Collection;
34 import java.util.Map.Entry;
35 import java.util.concurrent.BlockingQueue;
37 import org.onap.policy.apex.context.ContextException;
38 import org.onap.policy.apex.context.ContextRuntimeException;
39 import org.onap.policy.apex.context.SchemaHelper;
40 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
41 import org.onap.policy.apex.core.engine.engine.ApexEngine;
42 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
43 import org.onap.policy.apex.core.engine.event.EnEvent;
44 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
45 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
46 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
47 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
48 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
49 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
50 import org.onap.policy.apex.model.basicmodel.service.ModelService;
51 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
52 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
53 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
54 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
55 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
56 import org.onap.policy.apex.service.engine.event.ApexEvent;
57 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
58 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
59 import org.onap.policy.apex.service.engine.runtime.EngineService;
60 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
61 import org.slf4j.ext.XLogger;
62 import org.slf4j.ext.XLoggerFactory;
65 * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the
66 * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy is triggered by an Apex event, and
67 * when the policy is triggered it runs through to completion in the ApexEngine.
69 * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving
72 * @author Liam Fallon (liam.fallon@ericsson.com)
74 final class EngineWorker implements EngineService {
75 // Logger for this class
76 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
78 // Recurring string constants
79 private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
80 private static final String ENGINE_SUFFIX = " of this engine";
81 private static final String BAD_KEY_MATCH_TAG = " does not match the key";
82 private static final String ENGINE_KEY_PREFIX = "engine key ";
84 // The ID of this engine
85 private final AxArtifactKey engineWorkerKey;
87 // The Apex engine which is running the policies in this worker
88 private final ApexEngine engine;
90 // The event processor is an inner class, an instance of which runs as a thread that reads
91 // incoming events from a queue and forwards them to the Apex engine
92 private EventProcessor processor = null;
94 // Thread handling for the worker
95 private final ApplicationThreadFactory threadFactory;
96 private Thread processorThread;
98 // Converts ApexEvent instances to and from EnEvent instances
99 private ApexEvent2EnEventConverter apexEnEventConverter = null;
102 * Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an
103 * {@link ApexModelReader} instance to read Apex models using JAXB.
105 * @param engineWorkerKey the engine worker key
106 * @param queue the queue on which events for this Apex worker will come
107 * @param threadFactory the thread factory to use for creating the event processing thread
108 * @throws ApexException thrown on errors on worker instantiation
110 protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
111 final ApplicationThreadFactory threadFactory) {
112 LOGGER.entry(engineWorkerKey);
114 this.engineWorkerKey = engineWorkerKey;
115 this.threadFactory = threadFactory;
117 // Create the Apex engine
118 engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
120 // Create and run the event processor
121 processor = new EventProcessor(queue);
123 // Set the Event converter up
124 apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
132 * @see org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. String,
133 * org.onap.policy.apex.service.engine.runtime.ApexEventListener)
136 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
137 engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
143 * @see org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. String)
146 public void deregisterActionListener(final String listenerName) {
147 engine.removeEventListener(listenerName);
153 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
156 public EngineServiceEventInterface getEngineServiceEventInterface() {
157 throw new UnsupportedOperationException(
158 "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
164 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
167 public AxArtifactKey getKey() {
168 return engineWorkerKey;
174 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
177 public Collection<AxArtifactKey> getEngineKeys() {
178 return Arrays.asList(engineWorkerKey);
184 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
187 public AxArtifactKey getApexModelKey() {
188 if (ModelService.existsModel(AxPolicyModel.class)) {
189 return ModelService.getModel(AxPolicyModel.class).getKey();
198 * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. model.
199 * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
202 public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
203 throws ApexException {
204 LOGGER.entry(engineKey);
206 // Read the Apex model into memory using the Apex Model Reader
207 AxPolicyModel apexPolicyModel = null;
209 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
210 apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
211 } catch (final ApexModelException e) {
212 LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
213 throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
216 // Update the Apex model in the Apex engine
217 updateModel(engineKey, apexPolicyModel, forceFlag);
225 * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex. model.
226 * basicmodel.concepts.AxArtifactKey, org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
229 public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
230 throws ApexException {
231 LOGGER.entry(engineKey);
233 // Check if the key on the update request is correct
234 if (!engineWorkerKey.equals(engineKey)) {
235 String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
237 LOGGER.warn(message);
238 throw new ApexException(message);
241 // Check model compatibility
242 if (ModelService.existsModel(AxPolicyModel.class)) {
243 // The current policy model may or may not be defined
244 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
245 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
247 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
248 + "\" is not a compatible model update from the existing engine model with key \""
249 + currentModel.getKey().getId() + "\"");
251 throw new ContextException(
252 "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
253 + "\" is not a compatible model update from the existing engine model with key \""
254 + currentModel.getKey().getId() + "\"");
259 // Update the Apex model in the Apex engine
260 engine.updateModel(apexModel);
262 LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
269 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
272 public AxEngineState getState() {
273 return engine.getState();
279 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
282 public void startAll() throws ApexException {
283 start(this.getKey());
289 * @see org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core. model.
290 * concepts.AxArtifactKey)
293 public void start(final AxArtifactKey engineKey) throws ApexException {
294 LOGGER.entry(engineKey);
296 // Check if the key on the start request is correct
297 if (!engineWorkerKey.equals(engineKey)) {
299 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
300 throw new ApexException(
301 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
304 // Starts the event processing thread that handles incoming events
305 if (processorThread != null && processorThread.isAlive()) {
306 String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state "
308 LOGGER.error(message);
309 throw new ApexException(message);
315 // Start a thread to process events for the engine
316 processorThread = threadFactory.newThread(processor);
317 processorThread.start();
319 LOGGER.exit(engineKey);
325 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
328 public void stop() throws ApexException {
335 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core. model.
336 * concepts.AxArtifactKey)
339 public void stop(final AxArtifactKey engineKey) throws ApexException {
340 // Check if the key on the start request is correct
341 if (!engineWorkerKey.equals(engineKey)) {
343 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
344 throw new ApexException(
345 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
348 // Interrupt the worker to stop its thread
349 if (processorThread == null || !processorThread.isAlive()) {
350 processorThread = null;
353 .warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState());
357 // Interrupt the thread that is handling events toward the engine
358 processorThread.interrupt();
359 processorThread = null;
364 LOGGER.exit(engineKey);
370 * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
373 public void clear() throws ApexException {
374 clear(this.getKey());
380 * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core. model.
381 * concepts.AxArtifactKey)
384 public void clear(final AxArtifactKey engineKey) throws ApexException {
385 // Check if the key on the start request is correct
386 if (!engineWorkerKey.equals(engineKey)) {
388 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
389 throw new ApexException(
390 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
393 // Interrupt the worker to stop its thread
394 if (processorThread != null && !processorThread.isAlive()) {
395 LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState());
402 LOGGER.exit(engineKey);
408 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
411 public boolean isStarted() {
412 return isStarted(this.getKey());
418 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex. model.
419 * basicmodel.concepts.AxArtifactKey)
422 public boolean isStarted(final AxArtifactKey engineKey) {
423 final AxEngineState engstate = getState();
431 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
441 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
444 public boolean isStopped() {
445 return isStopped(this.getKey());
451 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex. model.
452 * basicmodel.concepts.AxArtifactKey)
455 public boolean isStopped(final AxArtifactKey engineKey) {
456 final AxEngineState engstate = getState();
464 return processorThread == null || !processorThread.isAlive();
474 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
477 public void startPeriodicEvents(final long period) {
478 throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
484 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
487 public void stopPeriodicEvents() {
488 throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
494 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core .model
495 * .concepts.AxArtifactKey)
498 public String getStatus(final AxArtifactKey engineKey) {
499 // Get the information from the engine that we want to return
500 final AxEngineModel apexEngineModel = engine.getEngineStatus();
501 apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
503 // Convert that information into a string
505 final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
506 final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
507 modelWriter.setJsonOutput(true);
508 modelWriter.write(apexEngineModel, baOutputStream);
509 return baOutputStream.toString();
510 } catch (final Exception e) {
511 LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
519 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex
520 * .core.model.concepts.AxArtifactKey)
523 public String getRuntimeInfo(final AxArtifactKey engineKey) {
524 // We'll build up the JSON string for runtime information bit by bit
525 final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
527 // Get the engine information
528 final AxEngineModel engineModel = engine.getEngineStatus();
529 final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
531 // Use GSON to convert our context information into JSON
532 final Gson gson = new GsonBuilder().setPrettyPrinting().create();
534 // Get context into a JSON string
535 runtimeJsonStringBuilder.append("{\"TimeStamp\":");
536 runtimeJsonStringBuilder.append(engineModel.getTimestamp());
537 runtimeJsonStringBuilder.append(",\"State\":");
538 runtimeJsonStringBuilder.append(engineModel.getState());
539 runtimeJsonStringBuilder.append(",\"Stats\":");
540 runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
542 // Get context into a JSON string
543 runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
545 boolean firstAlbum = true;
546 for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
550 runtimeJsonStringBuilder.append(",");
553 runtimeJsonStringBuilder.append("{\"AlbumKey\":");
554 runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
555 runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
557 // Get the schema helper to use to marshal context album objects to JSON
558 final AxContextAlbum axContextAlbum = ModelService.getModel(AxContextAlbums.class)
559 .get(contextAlbumEntry.getKey());
560 SchemaHelper schemaHelper = null;
563 // Get a schema helper to manage the translations between objects on the album map
565 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
566 axContextAlbum.getItemSchema());
567 } catch (final ContextRuntimeException e) {
568 final String resultString = "could not find schema helper to marshal context album \"" + axContextAlbum
570 LOGGER.warn(resultString, e);
572 // End of context album entry
573 runtimeJsonStringBuilder.append(resultString);
574 runtimeJsonStringBuilder.append("]}");
579 boolean firstEntry = true;
580 for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
584 runtimeJsonStringBuilder.append(",");
586 runtimeJsonStringBuilder.append("{\"EntryName\":");
587 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
588 runtimeJsonStringBuilder.append(",\"EntryContent\":");
589 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
591 // End of context entry
592 runtimeJsonStringBuilder.append("}");
595 // End of context album entry
596 runtimeJsonStringBuilder.append("]}");
599 runtimeJsonStringBuilder.append("]}");
601 // Tidy up the JSON string
602 final JsonParser jsonParser = new JsonParser();
603 final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
604 final String tidiedRuntimeString = gson.toJson(jsonElement);
606 LOGGER.debug("runtime information={}", tidiedRuntimeString);
608 return tidiedRuntimeString;
612 * This is an event processor thread, this class decouples the events handling logic from core business logic. This
613 * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the
614 * worker for processing by the Apex engine.
616 * @author Liam Fallon (liam.fallon@ericsson.com)
618 private class EventProcessor implements Runnable {
619 private final boolean debugEnabled = LOGGER.isDebugEnabled();
621 private BlockingQueue<ApexEvent> eventProcessingQueue = null;
624 * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
626 * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events.
628 EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
629 this.eventProcessingQueue = eventProcessingQueue;
635 * @see java.lang.Runnable#run()
639 LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
641 // Take events from the event processing queue of the worker and pass them to the engine
643 boolean stopFlag = false;
644 while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) {
645 ApexEvent event = null;
647 event = eventProcessingQueue.take();
648 } catch (final InterruptedException e) {
649 // restore the interrupt status
650 Thread.currentThread().interrupt();
651 LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
657 debugEventIfDebugEnabled(event);
659 final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
660 engine.handleEvent(enevent);
662 } catch (final ApexException e) {
663 LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
664 } catch (final Exception e) {
665 LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
669 LOGGER.debug("Engine {} completed processing", engineWorkerKey);
673 * Debug the event if debug is enabled.
675 * @param event the event to debug
677 private void debugEventIfDebugEnabled(ApexEvent event) {
679 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);