2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.runtime.impl;
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.util.Arrays;
26 import java.util.Collection;
28 import java.util.Map.Entry;
29 import java.util.concurrent.BlockingQueue;
31 import org.onap.policy.apex.context.ContextException;
32 import org.onap.policy.apex.context.ContextRuntimeException;
33 import org.onap.policy.apex.context.SchemaHelper;
34 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
35 import org.onap.policy.apex.core.engine.engine.ApexEngine;
36 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
37 import org.onap.policy.apex.core.engine.event.EnEvent;
38 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
39 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
40 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
41 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
42 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
43 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
44 import org.onap.policy.apex.model.basicmodel.service.ModelService;
45 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
46 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
47 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
48 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
49 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
50 import org.onap.policy.apex.service.engine.event.ApexEvent;
51 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
52 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
53 import org.onap.policy.apex.service.engine.runtime.EngineService;
54 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
55 import org.slf4j.ext.XLogger;
56 import org.slf4j.ext.XLoggerFactory;
58 import com.google.gson.Gson;
59 import com.google.gson.GsonBuilder;
60 import com.google.gson.JsonElement;
61 import com.google.gson.JsonParser;
64 * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies
65 * defined in the {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy
66 * is triggered by an Apex event, and when the policy is triggered it runs through to completion in
69 * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it
70 * events, and receiving events from it.
72 * @author Liam Fallon (liam.fallon@ericsson.com)
74 final class EngineWorker implements EngineService {
75 // Logger for this class
76 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
78 // The ID of this engine
79 private final AxArtifactKey engineWorkerKey;
81 // The Apex engine which is running the policies in this worker
82 private final ApexEngine engine;
84 // The event processor is an inner class, an instance of which runs as a thread that reads
85 // incoming events from a queue and forwards them to the Apex engine
86 private EventProcessor processor = null;
88 // Thread handling for the worker
89 private final ApplicationThreadFactory threadFactory;
90 private Thread processorThread;
92 // Converts ApexEvent instances to and from EnEvent instances
93 private ApexEvent2EnEventConverter apexEnEventConverter = null;
96 * Constructor that creates an Apex engine, an event processor for events to be sent to that
97 * engine, and an {@link ApexModelReader} instance to read Apex models using JAXB.
99 * @param engineWorkerKey the engine worker key
100 * @param queue the queue on which events for this Apex worker will come
101 * @param threadFactory the thread factory to use for creating the event processing thread
102 * @throws ApexException thrown on errors on worker instantiation
104 EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
105 final ApplicationThreadFactory threadFactory) throws ApexException {
106 LOGGER.entry(engineWorkerKey);
108 this.engineWorkerKey = engineWorkerKey;
109 this.threadFactory = threadFactory;
111 // Create the Apex engine
112 engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
114 // Create and run the event processor
115 processor = new EventProcessor(queue);
117 // Set the Event converter up
118 apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
127 * org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang.
128 * String, org.onap.policy.apex.service.engine.runtime.ApexEventListener)
131 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
132 // Sanity checks on the Apex model
133 if (engine == null) {
134 LOGGER.warn("listener registration on engine with key " + engineWorkerKey.getID()
135 + ", failed, listener is null");
139 engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
146 * org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang.
150 public void deregisterActionListener(final String listenerName) {
151 // Sanity checks on the Apex model
152 if (engine == null) {
153 LOGGER.warn("listener deregistration on engine with key " + engineWorkerKey.getID()
154 + ", failed, listener is null");
158 engine.removeEventListener(listenerName);
165 * org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
168 public EngineServiceEventInterface getEngineServiceEventInterface() {
169 throw new UnsupportedOperationException(
170 "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
176 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
179 public AxArtifactKey getKey() {
180 return engineWorkerKey;
186 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
189 public Collection<AxArtifactKey> getEngineKeys() {
190 return Arrays.asList(engineWorkerKey);
196 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
199 public AxArtifactKey getApexModelKey() {
200 if (ModelService.existsModel(AxPolicyModel.class)) {
201 return ModelService.getModel(AxPolicyModel.class).getKey();
211 * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.
212 * model. basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
215 public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
216 throws ApexException {
217 LOGGER.entry(engineKey);
219 // Read the Apex model into memory using the Apex Model Reader
220 AxPolicyModel apexPolicyModel = null;
222 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
223 apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
224 } catch (final ApexModelException e) {
225 LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getID(), e);
226 throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getID(), e);
229 if (apexPolicyModel == null) {
230 LOGGER.error("apex model null on engine " + engineKey.getID());
231 throw new ApexException("apex model null on engine " + engineKey.getID());
234 // Update the Apex model in the Apex engine
235 updateModel(engineKey, apexPolicyModel, forceFlag);
244 * org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.
245 * model. basicmodel.concepts.AxArtifactKey,
246 * org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
249 public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
250 throws ApexException {
251 LOGGER.entry(engineKey);
253 // Check if the key on the update request is correct
254 if (!engineWorkerKey.equals(engineKey)) {
255 LOGGER.warn("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID()
256 + " of this engine");
257 throw new ApexException("engine key " + engineKey.getID() + " does not match the key"
258 + engineWorkerKey.getID() + " of this engine");
261 // Sanity checks on the Apex model
262 if (engine == null) {
263 LOGGER.warn("engine with key " + engineKey.getID() + " not initialized");
264 throw new ApexException("engine with key " + engineKey.getID() + " not initialized");
267 // Check model compatibility
268 if (ModelService.existsModel(AxPolicyModel.class)) {
269 // The current policy model may or may not be defined
270 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
271 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
273 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getID()
274 + "\" is not a compatible model update from the existing engine model with key \""
275 + currentModel.getKey().getID() + "\"");
277 throw new ContextException(
278 "apex model update failed, supplied model with key \"" + apexModel.getKey().getID()
279 + "\" is not a compatible model update from the existing engine model with key \""
280 + currentModel.getKey().getID() + "\"");
285 // Update the Apex model in the Apex engine
286 engine.updateModel(apexModel);
288 LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getID(), engineWorkerKey);
295 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
298 public AxEngineState getState() {
299 return engine.getState();
305 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
308 public void startAll() throws ApexException {
309 start(this.getKey());
316 * org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core.
317 * model. concepts.AxArtifactKey)
320 public void start(final AxArtifactKey engineKey) throws ApexException {
321 LOGGER.entry(engineKey);
323 // Check if the key on the start request is correct
324 if (!engineWorkerKey.equals(engineKey)) {
325 LOGGER.warn("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID()
326 + " of this engine");
327 throw new ApexException("engine key " + engineKey.getID() + " does not match the key"
328 + engineWorkerKey.getID() + " of this engine");
331 if (engine == null) {
332 LOGGER.error("apex engine for engine key" + engineWorkerKey.getID() + " null");
333 throw new ApexException("apex engine for engine key" + engineWorkerKey.getID() + " null");
336 // Starts the event processing thread that handles incoming events
337 if (processorThread != null && processorThread.isAlive()) {
338 LOGGER.error("apex engine for engine key" + engineWorkerKey.getID() + " is already running with state "
340 throw new ApexException("apex engine for engine key" + engineWorkerKey.getID()
341 + " is already running with state " + getState());
347 // Start a thread to process events for the engine
348 processorThread = threadFactory.newThread(processor);
349 processorThread.start();
351 LOGGER.exit(engineKey);
357 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
360 public void stop() throws ApexException {
368 * org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core.
369 * model. concepts.AxArtifactKey)
372 public void stop(final AxArtifactKey engineKey) throws ApexException {
373 // Check if the key on the start request is correct
374 if (!engineWorkerKey.equals(engineKey)) {
375 LOGGER.warn("engine key " + engineKey.getID() + " does not match the key" + engineWorkerKey.getID()
376 + " of this engine");
377 throw new ApexException("engine key " + engineKey.getID() + " does not match the key"
378 + engineWorkerKey.getID() + " of this engine");
381 if (engine == null) {
382 LOGGER.error("apex engine for engine key" + engineWorkerKey.getID() + " null");
383 throw new ApexException("apex engine for engine key" + engineWorkerKey.getID() + " null");
386 // Interrupt the worker to stop its thread
387 if (processorThread == null || !processorThread.isAlive()) {
388 processorThread = null;
390 LOGGER.warn("apex engine for engine key" + engineWorkerKey.getID() + " is already stopped with state "
395 // Interrupt the thread that is handling events toward the engine
396 processorThread.interrupt();
402 LOGGER.exit(engineKey);
408 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
411 public boolean isStarted() {
412 return isStarted(this.getKey());
419 * org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex.
420 * model. basicmodel.concepts.AxArtifactKey)
423 public boolean isStarted(final AxArtifactKey engineKey) {
424 final AxEngineState engstate = getState();
432 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
442 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
445 public boolean isStopped() {
446 return isStopped(this.getKey());
453 * org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex.
454 * model. basicmodel.concepts.AxArtifactKey)
457 public boolean isStopped(final AxArtifactKey engineKey) {
458 final AxEngineState engstate = getState();
466 return processorThread == null || !processorThread.isAlive();
476 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
479 public void startPeriodicEvents(final long period) {
480 throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
486 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
489 public void stopPeriodicEvents() {
490 throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
497 * org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core
498 * .model .concepts.AxArtifactKey)
501 public String getStatus(final AxArtifactKey engineKey) {
502 // Get the information from the engine that we want to return
503 final AxEngineModel apexEngineModel = engine.getEngineStatus();
504 apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
506 // Convert that information into a string
508 final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
509 final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
510 modelWriter.write(apexEngineModel, baOutputStream);
511 return baOutputStream.toString();
512 } catch (final Exception e) {
513 LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
522 * org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex
523 * .core.model.concepts.AxArtifactKey)
526 public String getRuntimeInfo(final AxArtifactKey engineKey) {
527 // We'll build up the JSON string for runtime information bit by bit
528 final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
530 // Get the engine information
531 final AxEngineModel engineModel = engine.getEngineStatus();
532 final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
534 // Use GSON to convert our context information into JSON
535 final Gson gson = new GsonBuilder().setPrettyPrinting().create();
537 // Get context into a JSON string
538 runtimeJsonStringBuilder.append("{\"TimeStamp\":");
539 runtimeJsonStringBuilder.append(engineModel.getTimestamp());
540 runtimeJsonStringBuilder.append(",\"State\":");
541 runtimeJsonStringBuilder.append(engineModel.getState());
542 runtimeJsonStringBuilder.append(",\"Stats\":");
543 runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
545 // Get context into a JSON string
546 runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
548 boolean firstAlbum = true;
549 for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
553 runtimeJsonStringBuilder.append(",");
556 runtimeJsonStringBuilder.append("{\"AlbumKey\":");
557 runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
558 runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
561 // Get the schema helper to use to marshal context album objects to JSON
562 final AxContextAlbum axContextAlbum =
563 ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey());
564 SchemaHelper schemaHelper = null;
567 // Get a schema helper to manage the translations between objects on the album map
569 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
570 axContextAlbum.getItemSchema());
571 } catch (final ContextRuntimeException e) {
572 final String resultString =
573 "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON";
574 LOGGER.warn(resultString, e);
576 // End of context album entry
577 runtimeJsonStringBuilder.append(resultString);
578 runtimeJsonStringBuilder.append("]}");
583 boolean firstEntry = true;
584 for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
588 runtimeJsonStringBuilder.append(",");
590 runtimeJsonStringBuilder.append("{\"EntryName\":");
591 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
592 runtimeJsonStringBuilder.append(",\"EntryContent\":");
593 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
595 // End of context entry
596 runtimeJsonStringBuilder.append("}");
599 // End of context album entry
600 runtimeJsonStringBuilder.append("]}");
603 runtimeJsonStringBuilder.append("]}");
605 // Tidy up the JSON string
606 final JsonParser jsonParser = new JsonParser();
607 final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
608 final String tidiedRuntimeString = gson.toJson(jsonElement);
610 LOGGER.debug("runtime information=" + tidiedRuntimeString);
612 return tidiedRuntimeString;
616 * This is an event processor thread, this class decouples the events handling logic from core
617 * business logic. This class runs its own thread and continuously querying the blocking queue
618 * for the events that have been sent to the worker for processing by the Apex engine.
620 * @author Liam Fallon (liam.fallon@ericsson.com)
622 private class EventProcessor implements Runnable {
623 private final boolean debugEnabled = LOGGER.isDebugEnabled();
625 private BlockingQueue<ApexEvent> eventProcessingQueue = null;
628 * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
630 * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger
633 EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
634 this.eventProcessingQueue = eventProcessingQueue;
640 * @see java.lang.Runnable#run()
644 LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
646 // Take events from the event processing queue of the worker and pass them to the engine
648 while (!processorThread.isInterrupted()) {
649 ApexEvent event = null;
651 event = eventProcessingQueue.take();
652 } catch (final InterruptedException e) {
653 // restore the interrupt status
654 Thread.currentThread().interrupt();
655 LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
662 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);
664 final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
665 engine.handleEvent(enevent);
667 } catch (final ApexException e) {
668 LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
669 } catch (final Exception e) {
670 LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
674 LOGGER.debug("Engine {} completed processing", engineWorkerKey);