e00515bd0f6a4ff96e236455859d910618f05d14
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.runtime.impl;
23
24 import com.google.gson.Gson;
25 import com.google.gson.GsonBuilder;
26 import com.google.gson.JsonElement;
27 import com.google.gson.JsonParser;
28 import java.io.ByteArrayInputStream;
29 import java.io.ByteArrayOutputStream;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.Map;
33 import java.util.Map.Entry;
34 import java.util.concurrent.BlockingQueue;
35 import lombok.Setter;
36 import org.onap.policy.apex.context.ContextException;
37 import org.onap.policy.apex.context.ContextRuntimeException;
38 import org.onap.policy.apex.context.SchemaHelper;
39 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
40 import org.onap.policy.apex.core.engine.engine.ApexEngine;
41 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
42 import org.onap.policy.apex.core.engine.event.EnEvent;
43 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
44 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
45 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
46 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
47 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
48 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
49 import org.onap.policy.apex.model.basicmodel.service.ModelService;
50 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
51 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
52 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
53 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
54 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
55 import org.onap.policy.apex.service.engine.event.ApexEvent;
56 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
57 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
58 import org.onap.policy.apex.service.engine.runtime.EngineService;
59 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
60 import org.slf4j.ext.XLogger;
61 import org.slf4j.ext.XLoggerFactory;
62
63 /**
64  * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the
65  * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy is triggered by an Apex event, and
66  * when the policy is triggered it runs through to completion in the ApexEngine.
67  *
68  * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving
69  * events from it.
70  *
71  * @author Liam Fallon (liam.fallon@ericsson.com)
72  */
73 final class EngineWorker implements EngineService {
74     // Logger for this class
75     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
76
77     // Recurring string constants
78     private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
79     private static final String ENGINE_SUFFIX = " of this engine";
80     private static final String BAD_KEY_MATCH_TAG = " does not match the key";
81     private static final String ENGINE_KEY_PREFIX = "engine key ";
82
83     // The ID of this engine
84     private final AxArtifactKey engineWorkerKey;
85
86     // The Apex engine which is running the policies in this worker
87     private final ApexEngine engine;
88
89     // The event processor is an inner class, an instance of which runs as a thread that reads
90     // incoming events from a queue and forwards them to the Apex engine
91     private EventProcessor processor = null;
92
93     // Thread handling for the worker
94     private final ApplicationThreadFactory threadFactory;
95     private Thread processorThread;
96
97     // Converts ApexEvent instances to and from EnEvent instances
98     private ApexEvent2EnEventConverter apexEnEventConverter = null;
99
100     @Setter
101     private boolean isSubsequentInstance;
102
103     /**
104      * Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an
105      * {@link ApexModelReader} instance to read Apex models using JAXB.
106      *
107      * @param engineWorkerKey the engine worker key
108      * @param queue the queue on which events for this Apex worker will come
109      * @param threadFactory the thread factory to use for creating the event processing thread
110      * @throws ApexException thrown on errors on worker instantiation
111      */
112     protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
113         final ApplicationThreadFactory threadFactory) {
114         LOGGER.entry(engineWorkerKey);
115
116         this.engineWorkerKey = engineWorkerKey;
117         this.threadFactory = threadFactory;
118
119         // Create the Apex engine
120         engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
121
122         // Create and run the event processor
123         processor = new EventProcessor(queue);
124
125         // Set the Event converter up
126         apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
127
128         LOGGER.exit();
129     }
130
131     /**
132      * {@inheritDoc}.
133      */
134     @Override
135     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
136         engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
137     }
138
139     /**
140      * {@inheritDoc}.
141      */
142     @Override
143     public void deregisterActionListener(final String listenerName) {
144         engine.removeEventListener(listenerName);
145     }
146
147     /**
148      * {@inheritDoc}.
149      */
150     @Override
151     public EngineServiceEventInterface getEngineServiceEventInterface() {
152         throw new UnsupportedOperationException(
153             "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
154     }
155
156     /**
157      * {@inheritDoc}.
158      */
159     @Override
160     public AxArtifactKey getKey() {
161         return engineWorkerKey;
162     }
163
164     /**
165      * {@inheritDoc}.
166      */
167     @Override
168     public Collection<AxArtifactKey> getEngineKeys() {
169         return Arrays.asList(engineWorkerKey);
170     }
171
172     /**
173      * {@inheritDoc}.
174      */
175     @Override
176     public AxArtifactKey getApexModelKey() {
177         if (ModelService.existsModel(AxPolicyModel.class)) {
178             return ModelService.getModel(AxPolicyModel.class).getKey();
179         } else {
180             return null;
181         }
182     }
183
184     /**
185      * {@inheritDoc}.
186      */
187     @Override
188     public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
189         throws ApexException {
190         LOGGER.entry(engineKey);
191
192         // Read the Apex model into memory using the Apex Model Reader
193         AxPolicyModel apexPolicyModel = null;
194         try {
195             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
196             apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
197         } catch (final ApexModelException e) {
198             LOGGER.error("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
199             throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
200         }
201
202         // Update the Apex model in the Apex engine
203         updateModel(engineKey, apexPolicyModel, forceFlag);
204
205         LOGGER.exit();
206     }
207
208     /**
209      * {@inheritDoc}.
210      */
211     @Override
212     public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
213         throws ApexException {
214         LOGGER.entry(engineKey);
215
216         // Check if the key on the update request is correct
217         if (!engineWorkerKey.equals(engineKey)) {
218             String message = ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
219                 + ENGINE_SUFFIX;
220             LOGGER.warn(message);
221             throw new ApexException(message);
222         }
223
224         // Check model compatibility
225         if (ModelService.existsModel(AxPolicyModel.class)) {
226             // The current policy model may or may not be defined
227             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
228             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
229                 if (forceFlag) {
230                     LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
231                         + "\" is not a compatible model update from the existing engine model with key \""
232                         + currentModel.getKey().getId() + "\"");
233                 } else {
234                     throw new ContextException(
235                         "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
236                             + "\" is not a compatible model update from the existing engine model with key \""
237                             + currentModel.getKey().getId() + "\"");
238                 }
239             }
240         }
241         // Update the Apex model in the Apex engine
242         engine.updateModel(apexModel, isSubsequentInstance);
243
244         LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
245         LOGGER.exit();
246     }
247
248     /**
249      * {@inheritDoc}.
250      */
251     @Override
252     public AxEngineState getState() {
253         return engine.getState();
254     }
255
256     /**
257      * {@inheritDoc}.
258      */
259     @Override
260     public void startAll() throws ApexException {
261         start(this.getKey());
262     }
263
264     /**
265      * {@inheritDoc}.
266      */
267     @Override
268     public void start(final AxArtifactKey engineKey) throws ApexException {
269         LOGGER.entry(engineKey);
270
271         // Check if the key on the start request is correct
272         if (!engineWorkerKey.equals(engineKey)) {
273             LOGGER.warn(
274                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
275             throw new ApexException(
276                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
277         }
278
279         // Starts the event processing thread that handles incoming events
280         if (processorThread != null && processorThread.isAlive()) {
281             String message = ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state "
282                 + getState();
283             LOGGER.error(message);
284             throw new ApexException(message);
285         }
286
287         // Start the engine
288         engine.start();
289
290         // Start a thread to process events for the engine
291         processorThread = threadFactory.newThread(processor);
292         processorThread.start();
293
294         LOGGER.exit(engineKey);
295     }
296
297     /**
298      * {@inheritDoc}.
299      */
300     @Override
301     public void stop() throws ApexException {
302         stop(this.getKey());
303     }
304
305     /**
306      * {@inheritDoc}.
307      */
308     @Override
309     public void stop(final AxArtifactKey engineKey) throws ApexException {
310         // Check if the key on the start request is correct
311         if (!engineWorkerKey.equals(engineKey)) {
312             LOGGER.warn(
313                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
314             throw new ApexException(
315                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
316         }
317
318         // Interrupt the worker to stop its thread
319         if (processorThread == null || !processorThread.isAlive()) {
320             processorThread = null;
321
322             LOGGER
323                 .warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState());
324             return;
325         }
326
327         // Interrupt the thread that is handling events toward the engine
328         processorThread.interrupt();
329         processorThread = null;
330
331         // Stop the engine
332         engine.stop();
333
334         LOGGER.exit(engineKey);
335     }
336
337     /**
338      * {@inheritDoc}.
339      */
340     @Override
341     public void clear() throws ApexException {
342         clear(this.getKey());
343     }
344
345     /**
346      * {@inheritDoc}.
347      */
348     @Override
349     public void clear(final AxArtifactKey engineKey) throws ApexException {
350         // Check if the key on the start request is correct
351         if (!engineWorkerKey.equals(engineKey)) {
352             LOGGER.warn(
353                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
354             throw new ApexException(
355                 ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX);
356         }
357
358         // Interrupt the worker to stop its thread
359         if (processorThread != null && !processorThread.isAlive()) {
360             LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState());
361             return;
362         }
363
364         // Clear the engine
365         engine.clear();
366
367         LOGGER.exit(engineKey);
368     }
369
370     /**
371      * {@inheritDoc}.
372      */
373     @Override
374     public boolean isStarted() {
375         return isStarted(this.getKey());
376     }
377
378     /**
379      * {@inheritDoc}.
380      */
381     @Override
382     public boolean isStarted(final AxArtifactKey engineKey) {
383         final AxEngineState engstate = getState();
384         switch (engstate) {
385             case STOPPED:
386             case STOPPING:
387             case UNDEFINED:
388                 return false;
389             case EXECUTING:
390             case READY:
391                 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
392             default:
393                 break;
394         }
395         return false;
396     }
397
398     /**
399      * {@inheritDoc}.
400      */
401     @Override
402     public boolean isStopped() {
403         return isStopped(this.getKey());
404     }
405
406     /**
407      * {@inheritDoc}.
408      */
409     @Override
410     public boolean isStopped(final AxArtifactKey engineKey) {
411         final AxEngineState engstate = getState();
412         switch (engstate) {
413             case STOPPING:
414             case UNDEFINED:
415             case EXECUTING:
416             case READY:
417                 return false;
418             case STOPPED:
419                 return processorThread == null || !processorThread.isAlive();
420             default:
421                 break;
422         }
423         return false;
424     }
425
426     /**
427      * {@inheritDoc}.
428      */
429     @Override
430     public void startPeriodicEvents(final long period) {
431         throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
432     }
433
434     /**
435      * {@inheritDoc}.
436      */
437     @Override
438     public void stopPeriodicEvents() {
439         throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
440     }
441
442     /**
443      * {@inheritDoc}.
444      */
445     @Override
446     public String getStatus(final AxArtifactKey engineKey) {
447         // Get the information from the engine that we want to return
448         final AxEngineModel apexEngineModel = engine.getEngineStatus();
449         apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
450
451         // Convert that information into a string
452         try {
453             final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
454             final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
455             modelWriter.setJsonOutput(true);
456             modelWriter.write(apexEngineModel, baOutputStream);
457             return baOutputStream.toString();
458         } catch (final Exception e) {
459             LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
460             return null;
461         }
462     }
463
464     /**
465      * {@inheritDoc}.
466      */
467     @Override
468     public String getRuntimeInfo(final AxArtifactKey engineKey) {
469         // We'll build up the JSON string for runtime information bit by bit
470         final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
471
472         // Get the engine information
473         final AxEngineModel engineModel = engine.getEngineStatus();
474         final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
475
476         // Use GSON to convert our context information into JSON
477         final Gson gson = new GsonBuilder().setPrettyPrinting().create();
478
479         // Get context into a JSON string
480         runtimeJsonStringBuilder.append("{\"TimeStamp\":");
481         runtimeJsonStringBuilder.append(engineModel.getTimestamp());
482         runtimeJsonStringBuilder.append(",\"State\":");
483         runtimeJsonStringBuilder.append(engineModel.getState());
484         runtimeJsonStringBuilder.append(",\"Stats\":");
485         runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
486
487         // Get context into a JSON string
488         runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
489
490         boolean firstAlbum = true;
491         for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
492             if (firstAlbum) {
493                 firstAlbum = false;
494             } else {
495                 runtimeJsonStringBuilder.append(",");
496             }
497
498             runtimeJsonStringBuilder.append("{\"AlbumKey\":");
499             runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
500             runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
501
502             // Get the schema helper to use to marshal context album objects to JSON
503             final AxContextAlbum axContextAlbum = ModelService.getModel(AxContextAlbums.class)
504                 .get(contextAlbumEntry.getKey());
505             SchemaHelper schemaHelper = null;
506
507             try {
508                 // Get a schema helper to manage the translations between objects on the album map
509                 // for this album
510                 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
511                     axContextAlbum.getItemSchema());
512             } catch (final ContextRuntimeException e) {
513                 final String resultString = "could not find schema helper to marshal context album \"" + axContextAlbum
514                     + "\" to JSON";
515                 LOGGER.warn(resultString, e);
516
517                 // End of context album entry
518                 runtimeJsonStringBuilder.append(resultString);
519                 runtimeJsonStringBuilder.append("]}");
520
521                 continue;
522             }
523
524             boolean firstEntry = true;
525             for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
526                 if (firstEntry) {
527                     firstEntry = false;
528                 } else {
529                     runtimeJsonStringBuilder.append(",");
530                 }
531                 runtimeJsonStringBuilder.append("{\"EntryName\":");
532                 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
533                 runtimeJsonStringBuilder.append(",\"EntryContent\":");
534                 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
535
536                 // End of context entry
537                 runtimeJsonStringBuilder.append("}");
538             }
539
540             // End of context album entry
541             runtimeJsonStringBuilder.append("]}");
542         }
543
544         runtimeJsonStringBuilder.append("]}");
545
546         // Tidy up the JSON string
547         final JsonParser jsonParser = new JsonParser();
548         final JsonElement jsonElement = jsonParser.parse(runtimeJsonStringBuilder.toString());
549         final String tidiedRuntimeString = gson.toJson(jsonElement);
550
551         LOGGER.debug("runtime information={}", tidiedRuntimeString);
552
553         return tidiedRuntimeString;
554     }
555
556     /**
557      * This is an event processor thread, this class decouples the events handling logic from core business logic. This
558      * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the
559      * worker for processing by the Apex engine.
560      *
561      * @author Liam Fallon (liam.fallon@ericsson.com)
562      */
563     private class EventProcessor implements Runnable {
564         private final boolean debugEnabled = LOGGER.isDebugEnabled();
565         // the events queue
566         private BlockingQueue<ApexEvent> eventProcessingQueue = null;
567
568         /**
569          * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
570          *
571          * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events.
572          */
573         EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
574             this.eventProcessingQueue = eventProcessingQueue;
575         }
576
577         /**
578          * {@inheritDoc}.
579          */
580         @Override
581         public void run() {
582             LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
583
584             // Take events from the event processing queue of the worker and pass them to the engine
585             // for processing
586             boolean stopFlag = false;
587             while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) {
588                 ApexEvent event = null;
589                 try {
590                     event = eventProcessingQueue.take();
591                 } catch (final InterruptedException e) {
592                     // restore the interrupt status
593                     Thread.currentThread().interrupt();
594                     LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
595                     break;
596                 }
597
598                 try {
599                     if (event != null) {
600                         debugEventIfDebugEnabled(event);
601
602                         final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
603                         engine.handleEvent(enevent);
604                     }
605                 } catch (final ApexException e) {
606                     LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
607                 } catch (final Exception e) {
608                     LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
609                     stopFlag = true;
610                 }
611             }
612             LOGGER.debug("Engine {} completed processing", engineWorkerKey);
613         }
614
615         /**
616          * Debug the event if debug is enabled.
617          *
618          * @param event the event to debug
619          */
620         private void debugEventIfDebugEnabled(ApexEvent event) {
621             if (debugEnabled) {
622                 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);
623             }
624         }
625     }
626 }