a24d9618adc8ccc344d91fe5525b9ee018c8eb4d
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / runtime / impl / EngineWorker.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  * 
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  * 
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  * 
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.runtime.impl;
23
24 import com.google.gson.Gson;
25 import com.google.gson.GsonBuilder;
26 import com.google.gson.JsonElement;
27 import com.google.gson.JsonParser;
28
29 import java.io.ByteArrayInputStream;
30 import java.io.ByteArrayOutputStream;
31 import java.util.Arrays;
32 import java.util.Collection;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.concurrent.BlockingQueue;
36
37 import org.onap.policy.apex.context.ContextException;
38 import org.onap.policy.apex.context.ContextRuntimeException;
39 import org.onap.policy.apex.context.SchemaHelper;
40 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
41 import org.onap.policy.apex.core.engine.engine.ApexEngine;
42 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
43 import org.onap.policy.apex.core.engine.event.EnEvent;
44 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
45 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
46 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
47 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
48 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
49 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
50 import org.onap.policy.apex.model.basicmodel.service.ModelService;
51 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
52 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
53 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
54 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
55 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
56 import org.onap.policy.apex.service.engine.event.ApexEvent;
57 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
58 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
59 import org.onap.policy.apex.service.engine.runtime.EngineService;
60 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
61 import org.slf4j.ext.XLogger;
62 import org.slf4j.ext.XLoggerFactory;
63
64 /**
65  * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the
66  * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy is triggered by an Apex event, and
67  * when the policy is triggered it runs through to completion in the ApexEngine.
68  *
69  * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving
70  * events from it.
71  *
72  * @author Liam Fallon (liam.fallon@ericsson.com)
73  */
74 final class EngineWorker implements EngineService {
75     // Logger for this class
76     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
77
78     // Recurring string constants
79     private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
80     private static final String ENGINE_SUFFIX = " of this engine";
81     private static final String BAD_KEY_MATCH_TAG = " does not match the key";
82     private static final String ENGINE_KEY_PREFIX = "engine key ";
83
84     // The ID of this engine
85     private final AxArtifactKey engineWorkerKey;
86
87     // The Apex engine which is running the policies in this worker
88     private final ApexEngine engine;
89
90     // The event processor is an inner class, an instance of which runs as a thread that reads
91     // incoming events from a queue and forwards them to the Apex engine
92     private EventProcessor processor = null;
93
94     // Thread handling for the worker
95     private final ApplicationThreadFactory threadFactory;
96     private Thread processorThread;
97
98     // Converts ApexEvent instances to and from EnEvent instances
99     private ApexEvent2EnEventConverter apexEnEventConverter = null;
100
101     /**
102      * Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an
103      * {@link ApexModelReader} instance to read Apex models using JAXB.
104      *
105      * @param engineWorkerKey the engine worker key
106      * @param queue the queue on which events for this Apex worker will come
107      * @param threadFactory the thread factory to use for creating the event processing thread
108      * @throws ApexException thrown on errors on worker instantiation
109      */
110     protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
111         final ApplicationThreadFactory threadFactory) {
112         LOGGER.entry(engineWorkerKey);
113
114         this.engineWorkerKey = engineWorkerKey;
115         this.threadFactory = threadFactory;
116
117         // Create the Apex engine
118         engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
119
120         // Create and run the event processor
121         processor = new EventProcessor(queue);
122
123         // Set the Event converter up
124         apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
125
126         LOGGER.exit();
127     }
128
129     /**
130      * {@inheritDoc}.
131      */
132     @Override
133     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
134         engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
135     }
136
137     /**
138      * {@inheritDoc}.
139      */
140     @Override
141     public void deregisterActionListener(final String listenerName) {
142         engine.removeEventListener(listenerName);
143     }
144
145     /**
146      * {@inheritDoc}.
147      */
148     @Override
149     public EngineServiceEventInterface getEngineServiceEventInterface() {
150         throw new UnsupportedOperationException(
151             "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
152     }
153
154     /**
155      * {@inheritDoc}.
156      */
157     @Override
158     public AxArtifactKey getKey() {
159         return engineWorkerKey;
160     }
161
162     /**
163      * {@inheritDoc}.
164      */
165     @Override
166     public Collection<AxArtifactKey> getEngineKeys() {
167         return Arrays.asList(engineWorkerKey);
168     }
169
170     /**
171      * {@inheritDoc}.
172      */
173     @Override
174     public AxArtifactKey getApexModelKey() {
175         if (ModelService.existsModel(AxPolicyModel.class)) {
176             return ModelService.getModel(AxPolicyModel.class).getKey();
177         } else {
178             return null;
179         }
180     }
181
182     /**
183      * {@inheritDoc}.
184      */
185     @Override
186     public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
187         throws ApexException {
188         LOGGER.entry(engineKey);
189
190         // Read the Apex model into memory using the Apex Model Reader
191         AxPolicyModel apexPolicyModel = null;
192         try {
193             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
194             apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
195         } catch (final ApexModelException e) {
196             LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
197             throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
198         }
199
200         // Update the Apex model in the Apex engine
201         updateModel(engineKey, apexPolicyModel, forceFlag);
202
203         LOGGER.exit();
204     }
205
206     /**
207      * {@inheritDoc}.
208      */
209     @Override
210     public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
211         throws ApexException {
212         LOGGER.entry(engineKey);
213
214         // Check if the key on the update request is correct
215         if (!engineWorkerKey.equals(engineKey)) {
216             String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
217                 + ENGINE_SUFFIX;
218             LOGGER.warn(message);
219             throw new ApexException(message);
220         }
221
222         // Check model compatibility
223         if (ModelService.existsModel(AxPolicyModel.class)) {
224             // The current policy model may or may not be defined
225             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
226             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
227                 if (forceFlag) {
228                     LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
229                         + "\" is not a compatible model update from the existing engine model with key \""
230                         + currentModel.getKey().getId() + "\"");
231                 } else {
232                     throw new ContextException(
233                         "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
234                             + "\" is not a compatible model update from the existing engine model with key \""
235                             + currentModel.getKey().getId() + "\"");
236                 }
237             }
238         }
239
240         // Update the Apex model in the Apex engine
241         engine.updateModel(apexModel);
242
243         LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
244         LOGGER.exit();
245     }
246
247     /**
248      * {@inheritDoc}.
249      */
250     @Override
251     public AxEngineState getState() {
252         return engine.getState();
253     }
254
255     /**
256      * {@inheritDoc}.
257      */
258     @Override
259     public void startAll() throws ApexException {
260         start(this.getKey());
261     }
262
263     /**
264      * {@inheritDoc}.
265      */
266     @Override
267     public void start(final AxArtifactKey engineKey) throws ApexException {
268         LOGGER.entry(engineKey);
269
270         // Check if the key on the start request is correct
271         if (!engineWorkerKey.equals(engineKey)) {
272             LOGGER.warn(
273                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
274             throw new ApexException(
275                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
276         }
277
278         // Starts the event processing thread that handles incoming events
279         if (processorThread != null && processorThread.isAlive()) {
280             String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state "
281                 + getState();
282             LOGGER.error(message);
283             throw new ApexException(message);
284         }
285
286         // Start the engine
287         engine.start();
288
289         // Start a thread to process events for the engine
290         processorThread = threadFactory.newThread(processor);
291         processorThread.start();
292
293         LOGGER.exit(engineKey);
294     }
295
296     /**
297      * {@inheritDoc}.
298      */
299     @Override
300     public void stop() throws ApexException {
301         stop(this.getKey());
302     }
303
304     /**
305      * {@inheritDoc}.
306      */
307     @Override
308     public void stop(final AxArtifactKey engineKey) throws ApexException {
309         // Check if the key on the start request is correct
310         if (!engineWorkerKey.equals(engineKey)) {
311             LOGGER.warn(
312                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
313             throw new ApexException(
314                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
315         }
316
317         // Interrupt the worker to stop its thread
318         if (processorThread == null || !processorThread.isAlive()) {
319             processorThread = null;
320
321             LOGGER
322                 .warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState());
323             return;
324         }
325
326         // Interrupt the thread that is handling events toward the engine
327         processorThread.interrupt();
328         processorThread = null;
329
330         // Stop the engine
331         engine.stop();
332
333         LOGGER.exit(engineKey);
334     }
335
336     /**
337      * {@inheritDoc}.
338      */
339     @Override
340     public void clear() throws ApexException {
341         clear(this.getKey());
342     }
343
344     /**
345      * {@inheritDoc}.
346      */
347     @Override
348     public void clear(final AxArtifactKey engineKey) throws ApexException {
349         // Check if the key on the start request is correct
350         if (!engineWorkerKey.equals(engineKey)) {
351             LOGGER.warn(
352                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
353             throw new ApexException(
354                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
355         }
356
357         // Interrupt the worker to stop its thread
358         if (processorThread != null && !processorThread.isAlive()) {
359             LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState());
360             return;
361         }
362
363         // Clear the engine
364         engine.clear();
365
366         LOGGER.exit(engineKey);
367     }
368
369     /**
370      * {@inheritDoc}.
371      */
372     @Override
373     public boolean isStarted() {
374         return isStarted(this.getKey());
375     }
376
377     /**
378      * {@inheritDoc}.
379      */
380     @Override
381     public boolean isStarted(final AxArtifactKey engineKey) {
382         final AxEngineState engstate = getState();
383         switch (engstate) {
384             case STOPPED:
385             case STOPPING:
386             case UNDEFINED:
387                 return false;
388             case EXECUTING:
389             case READY:
390                 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
391             default:
392                 break;
393         }
394         return false;
395     }
396
397     /**
398      * {@inheritDoc}.
399      */
400     @Override
401     public boolean isStopped() {
402         return isStopped(this.getKey());
403     }
404
405     /**
406      * {@inheritDoc}.
407      */
408     @Override
409     public boolean isStopped(final AxArtifactKey engineKey) {
410         final AxEngineState engstate = getState();
411         switch (engstate) {
412             case STOPPING:
413             case UNDEFINED:
414             case EXECUTING:
415             case READY:
416                 return false;
417             case STOPPED:
418                 return processorThread == null || !processorThread.isAlive();
419             default:
420                 break;
421         }
422         return false;
423     }
424
425     /**
426      * {@inheritDoc}.
427      */
428     @Override
429     public void startPeriodicEvents(final long period) {
430         throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
431     }
432
433     /**
434      * {@inheritDoc}.
435      */
436     @Override
437     public void stopPeriodicEvents() {
438         throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
439     }
440
441     /**
442      * {@inheritDoc}.
443      */
444     @Override
445     public String getStatus(final AxArtifactKey engineKey) {
446         // Get the information from the engine that we want to return
447         final AxEngineModel apexEngineModel = engine.getEngineStatus();
448         apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
449
450         // Convert that information into a string
451         try {
452             final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
453             final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
454             modelWriter.setJsonOutput(true);
455             modelWriter.write(apexEngineModel, baOutputStream);
456             return baOutputStream.toString();
457         } catch (final Exception e) {
458             LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
459             return null;
460         }
461     }
462
463     /**
464      * {@inheritDoc}.
465      */
466     @Override
467     public String getRuntimeInfo(final AxArtifactKey engineKey) {
468         // We'll build up the JSON string for runtime information bit by bit
469         final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
470
471         // Get the engine information
472         final AxEngineModel engineModel = engine.getEngineStatus();
473         final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
474
475         // Use GSON to convert our context information into JSON
476         final Gson gson = new GsonBuilder().setPrettyPrinting().create();
477
478         // Get context into a JSON string
479         runtimeJsonStringBuilder.append("{\"TimeStamp\":");
480         runtimeJsonStringBuilder.append(engineModel.getTimestamp());
481         runtimeJsonStringBuilder.append(",\"State\":");
482         runtimeJsonStringBuilder.append(engineModel.getState());
483         runtimeJsonStringBuilder.append(",\"Stats\":");
484         runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
485
486         // Get context into a JSON string
487         runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
488
489         boolean firstAlbum = true;
490         for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
491             if (firstAlbum) {
492                 firstAlbum = false;
493             } else {
494                 runtimeJsonStringBuilder.append(",");
495             }
496
497             runtimeJsonStringBuilder.append("{\"AlbumKey\":");
498             runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
499             runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
500
501             // Get the schema helper to use to marshal context album objects to JSON
502             final AxContextAlbum axContextAlbum = ModelService.getModel(AxContextAlbums.class)
503                 .get(contextAlbumEntry.getKey());
504             SchemaHelper schemaHelper = null;
505
506             try {
507                 // Get a schema helper to manage the translations between objects on the album map
508                 // for this album
509                 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
510                     axContextAlbum.getItemSchema());
511             } catch (final ContextRuntimeException e) {
512                 final String resultString = "could not find schema helper to marshal context album \"" + axContextAlbum
513                     + "\" to JSON";
514                 LOGGER.warn(resultString, e);
515
516                 // End of context album entry
517                 runtimeJsonStringBuilder.append(resultString);
518                 runtimeJsonStringBuilder.append("]}");
519
520                 continue;
521             }
522
523             boolean firstEntry = true;
524             for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
525                 if (firstEntry) {
526                     firstEntry = false;
527                 } else {
528                     runtimeJsonStringBuilder.append(",");
529                 }
530                 runtimeJsonStringBuilder.append("{\"EntryName\":");
531                 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
532                 runtimeJsonStringBuilder.append(",\"EntryContent\":");
533                 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
534
535                 // End of context entry
536                 runtimeJsonStringBuilder.append("}");
537             }
538
539             // End of context album entry
540             runtimeJsonStringBuilder.append("]}");
541         }
542
543         runtimeJsonStringBuilder.append("]}");
544
545         // Tidy up the JSON string
546         final JsonParser jsonParser = new JsonParser();
547         final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
548         final String tidiedRuntimeString = gson.toJson(jsonElement);
549
550         LOGGER.debug("runtime information={}", tidiedRuntimeString);
551
552         return tidiedRuntimeString;
553     }
554
555     /**
556      * This is an event processor thread, this class decouples the events handling logic from core business logic. This
557      * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the
558      * worker for processing by the Apex engine.
559      *
560      * @author Liam Fallon (liam.fallon@ericsson.com)
561      */
562     private class EventProcessor implements Runnable {
563         private final boolean debugEnabled = LOGGER.isDebugEnabled();
564         // the events queue
565         private BlockingQueue<ApexEvent> eventProcessingQueue = null;
566
567         /**
568          * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
569          *
570          * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events.
571          */
572         EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
573             this.eventProcessingQueue = eventProcessingQueue;
574         }
575
576         /**
577          * {@inheritDoc}.
578          */
579         @Override
580         public void run() {
581             LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
582
583             // Take events from the event processing queue of the worker and pass them to the engine
584             // for processing
585             boolean stopFlag = false;
586             while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) {
587                 ApexEvent event = null;
588                 try {
589                     event = eventProcessingQueue.take();
590                 } catch (final InterruptedException e) {
591                     // restore the interrupt status
592                     Thread.currentThread().interrupt();
593                     LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
594                     break;
595                 }
596
597                 try {
598                     if (event != null) {
599                         debugEventIfDebugEnabled(event);
600
601                         final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
602                         engine.handleEvent(enevent);
603                     }
604                 } catch (final ApexException e) {
605                     LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
606                 } catch (final Exception e) {
607                     LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
608                     stopFlag = true;
609                 }
610             }
611             LOGGER.debug("Engine {} completed processing", engineWorkerKey);
612         }
613
614         /**
615          * Debug the event if debug is enabled.
616          * 
617          * @param event the event to debug
618          */
619         private void debugEventIfDebugEnabled(ApexEvent event) {
620             if (debugEnabled) {
621                 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);
622             }
623         }
624     }
625 }