f69eae664f99da0d2473961f06a99dc4645d262f
[policy/apex-pdp.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  *  Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.apex.service.engine.runtime.impl;
24
25 import com.google.gson.GsonBuilder;
26 import com.google.gson.JsonParser;
27 import java.io.ByteArrayInputStream;
28 import java.io.ByteArrayOutputStream;
29 import java.util.ArrayList;
30 import java.util.Arrays;
31 import java.util.Collection;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.Map.Entry;
35 import java.util.concurrent.BlockingQueue;
36 import lombok.Setter;
37 import org.onap.policy.apex.context.ContextException;
38 import org.onap.policy.apex.context.ContextRuntimeException;
39 import org.onap.policy.apex.context.SchemaHelper;
40 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
41 import org.onap.policy.apex.core.engine.engine.ApexEngine;
42 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
43 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
44 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
45 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
46 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
47 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
48 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
49 import org.onap.policy.apex.model.basicmodel.service.ModelService;
50 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
51 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
52 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
53 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
54 import org.onap.policy.apex.service.engine.event.ApexEvent;
55 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
56 import org.onap.policy.apex.service.engine.main.ApexPolicyStatisticsManager;
57 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
58 import org.onap.policy.apex.service.engine.runtime.EngineService;
59 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
60 import org.slf4j.ext.XLogger;
61 import org.slf4j.ext.XLoggerFactory;
62
63 /**
64  * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the
65  * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy is triggered by an Apex event, and
66  * when the policy is triggered it runs through to completion in the ApexEngine.
67  *
68  * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving
69  * events from it.
70  *
71  * @author Liam Fallon (liam.fallon@ericsson.com)
72  */
73 final class EngineWorker implements EngineService {
74     // Logger for this class
75     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
76
77     // Recurring string constants
78     private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
79     private static final String ENGINE_SUFFIX = " of this engine";
80     private static final String BAD_KEY_MATCH_TAG = " does not match the key";
81     private static final String ENGINE_KEY_PREFIX = "engine key ";
82
83     // The ID of this engine
84     private final AxArtifactKey engineWorkerKey;
85
86     // The Apex engine which is running the policies in this worker
87     private final ApexEngine engine;
88
89     // The event processor is an inner class, an instance of which runs as a thread that reads
90     // incoming events from a queue and forwards them to the Apex engine
91     private EventProcessor processor = null;
92
93     // Thread handling for the worker
94     private final ApplicationThreadFactory threadFactory;
95     private Thread processorThread;
96
97     // Converts ApexEvent instances to and from EnEvent instances
98     private ApexEvent2EnEventConverter apexEnEventConverter = null;
99
100     @Setter
101     private boolean isSubsequentInstance;
102
103     /**
104      * Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an
105      * {@link ApexModelReader} instance to read Apex models using JAXB.
106      *
107      * @param engineWorkerKey the engine worker key
108      * @param queue the queue on which events for this Apex worker will come
109      * @param threadFactory the thread factory to use for creating the event processing thread
110      * @throws ApexException thrown on errors on worker instantiation
111      */
112     protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
113             final ApplicationThreadFactory threadFactory) {
114         LOGGER.entry(engineWorkerKey);
115
116         this.engineWorkerKey = engineWorkerKey;
117         this.threadFactory = threadFactory;
118
119         // Create the Apex engine
120         engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
121
122         // Create and run the event processor
123         processor = new EventProcessor(queue);
124
125         // Set the Event converter up
126         apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
127
128         LOGGER.exit();
129     }
130
131     /**
132      * {@inheritDoc}.
133      */
134     @Override
135     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
136         engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
137     }
138
139     /**
140      * {@inheritDoc}.
141      */
142     @Override
143     public void deregisterActionListener(final String listenerName) {
144         engine.removeEventListener(listenerName);
145     }
146
147     /**
148      * {@inheritDoc}.
149      */
150     @Override
151     public EngineServiceEventInterface getEngineServiceEventInterface() {
152         throw new UnsupportedOperationException(
153                 "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
154     }
155
156     /**
157      * {@inheritDoc}.
158      */
159     @Override
160     public AxArtifactKey getKey() {
161         return engineWorkerKey;
162     }
163
164     /**
165      * {@inheritDoc}.
166      */
167     @Override
168     public Collection<AxArtifactKey> getEngineKeys() {
169         return Arrays.asList(engineWorkerKey);
170     }
171
172     /**
173      * {@inheritDoc}.
174      */
175     @Override
176     public AxArtifactKey getApexModelKey() {
177         if (ModelService.existsModel(AxPolicyModel.class)) {
178             return ModelService.getModel(AxPolicyModel.class).getKey();
179         } else {
180             return null;
181         }
182     }
183
184     /**
185      * {@inheritDoc}.
186      */
187     @Override
188     public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
189             throws ApexException {
190         LOGGER.entry(engineKey);
191
192         // Read the Apex model into memory using the Apex Model Reader
193         AxPolicyModel apexPolicyModel = null;
194         try {
195             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
196             apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
197         } catch (final ApexModelException e) {
198             throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
199         }
200
201         // Update the Apex model in the Apex engine
202         updateModel(engineKey, apexPolicyModel, forceFlag);
203
204         LOGGER.exit();
205     }
206
207     /**
208      * {@inheritDoc}.
209      */
210     @Override
211     public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
212             throws ApexException {
213         LOGGER.entry(engineKey);
214
215         // Check if the key on the update request is correct
216         if (!engineWorkerKey.equals(engineKey)) {
217             String message =
218                     ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX;
219             throw new ApexException(message);
220         }
221
222         // Check model compatibility
223         if (ModelService.existsModel(AxPolicyModel.class)) {
224             // The current policy model may or may not be defined
225             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
226             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
227                 if (forceFlag) {
228                     LOGGER.warn(
229                             "apex model update forced, supplied model with key \"{}\" is not a compatible model update "
230                                     + "from the existing engine model with key \"{}\"",
231                             apexModel.getKey().getId(), currentModel.getKey().getId());
232                 } else {
233                     throw new ContextException(
234                             "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
235                                     + "\" is not a compatible model update from the existing engine model with key \""
236                                     + currentModel.getKey().getId() + "\"");
237                 }
238             }
239         }
240         // Update the Apex model in the Apex engine
241         engine.updateModel(apexModel, isSubsequentInstance);
242
243         LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
244         LOGGER.exit();
245     }
246
247     /**
248      * {@inheritDoc}.
249      */
250     @Override
251     public AxEngineState getState() {
252         return engine.getState();
253     }
254
255     /**
256      * {@inheritDoc}.
257      */
258     @Override
259     public void startAll() throws ApexException {
260         start(this.getKey());
261     }
262
263     /**
264      * {@inheritDoc}.
265      */
266     @Override
267     public void start(final AxArtifactKey engineKey) throws ApexException {
268         LOGGER.entry(engineKey);
269
270         // Check if the key on the start request is correct
271         if (!engineWorkerKey.equals(engineKey)) {
272             throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
273                     + ENGINE_SUFFIX);
274         }
275
276         // Starts the event processing thread that handles incoming events
277         if (processorThread != null && processorThread.isAlive()) {
278             String message =
279                     ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state " + getState();
280             LOGGER.error(message);
281             throw new ApexException(message);
282         }
283
284         // Start the engine
285         engine.start();
286
287         // Start a thread to process events for the engine
288         processorThread = threadFactory.newThread(processor);
289         processorThread.start();
290
291         LOGGER.exit(engineKey);
292     }
293
294     /**
295      * {@inheritDoc}.
296      */
297     @Override
298     public void stop() throws ApexException {
299         stop(this.getKey());
300     }
301
302     /**
303      * {@inheritDoc}.
304      */
305     @Override
306     public void stop(final AxArtifactKey engineKey) throws ApexException {
307         // Check if the key on the start request is correct
308         if (!engineWorkerKey.equals(engineKey)) {
309             LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
310                     + ENGINE_SUFFIX);
311             throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
312                     + ENGINE_SUFFIX);
313         }
314
315         // Interrupt the worker to stop its thread
316         if (processorThread == null || !processorThread.isAlive()) {
317             processorThread = null;
318
319             LOGGER.warn(
320                     ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState());
321             return;
322         }
323
324         // Interrupt the thread that is handling events toward the engine
325         processorThread.interrupt();
326         processorThread = null;
327
328         // Stop the engine
329         engine.stop();
330
331         LOGGER.exit(engineKey);
332     }
333
334     /**
335      * {@inheritDoc}.
336      */
337     @Override
338     public void clear() throws ApexException {
339         clear(this.getKey());
340     }
341
342     /**
343      * {@inheritDoc}.
344      */
345     @Override
346     public void clear(final AxArtifactKey engineKey) throws ApexException {
347         // Check if the key on the start request is correct
348         if (!engineWorkerKey.equals(engineKey)) {
349             LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
350                     + ENGINE_SUFFIX);
351             throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
352                     + ENGINE_SUFFIX);
353         }
354
355         // Interrupt the worker to stop its thread
356         if (processorThread != null && !processorThread.isAlive()) {
357             LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState());
358             return;
359         }
360
361         // Clear the engine
362         engine.clear();
363
364         LOGGER.exit(engineKey);
365     }
366
367     /**
368      * {@inheritDoc}.
369      */
370     @Override
371     public boolean isStarted() {
372         return isStarted(this.getKey());
373     }
374
375     /**
376      * {@inheritDoc}.
377      */
378     @Override
379     public boolean isStarted(final AxArtifactKey engineKey) {
380         final AxEngineState engstate = getState();
381         switch (engstate) {
382             case STOPPED:
383             case STOPPING:
384             case UNDEFINED:
385                 return false;
386             case EXECUTING:
387             case READY:
388                 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
389             default:
390                 break;
391         }
392         return false;
393     }
394
395     /**
396      * {@inheritDoc}.
397      */
398     @Override
399     public boolean isStopped() {
400         return isStopped(this.getKey());
401     }
402
403     /**
404      * {@inheritDoc}.
405      */
406     @Override
407     public boolean isStopped(final AxArtifactKey engineKey) {
408         final AxEngineState engstate = getState();
409         switch (engstate) {
410             case STOPPING:
411             case UNDEFINED:
412             case EXECUTING:
413             case READY:
414                 return false;
415             case STOPPED:
416                 return processorThread == null || !processorThread.isAlive();
417             default:
418                 break;
419         }
420         return false;
421     }
422
423     /**
424      * {@inheritDoc}.
425      */
426     @Override
427     public void startPeriodicEvents(final long period) {
428         throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
429     }
430
431     /**
432      * {@inheritDoc}.
433      */
434     @Override
435     public void stopPeriodicEvents() {
436         throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
437     }
438
439     /**
440      * {@inheritDoc}.
441      */
442     @Override
443     public String getStatus(final AxArtifactKey engineKey) {
444         // Get the information from the engine that we want to return
445         final AxEngineModel apexEngineModel = engine.getEngineStatus();
446         apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
447
448         // Convert that information into a string
449         try {
450             final var baOutputStream = new ByteArrayOutputStream();
451             final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
452             modelWriter.setJsonOutput(true);
453             modelWriter.write(apexEngineModel, baOutputStream);
454             return baOutputStream.toString();
455         } catch (final Exception e) {
456             LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
457             return null;
458         }
459     }
460
461     /**
462      * {@inheritDoc}.
463      */
464     @Override
465     public List<AxEngineModel> getEngineStats() {
466         List<AxEngineModel> engineStats = new ArrayList<>();
467         engineStats.add(engine.getEngineStatus());
468         return engineStats;
469     }
470
471     /**
472      * {@inheritDoc}.
473      */
474     @Override
475     public String getRuntimeInfo(final AxArtifactKey engineKey) {
476         // We'll build up the JSON string for runtime information bit by bit
477         final var runtimeJsonStringBuilder = new StringBuilder();
478
479         // Get the engine information
480         final AxEngineModel engineModel = engine.getEngineStatus();
481         final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
482
483         // Use GSON to convert our context information into JSON
484         final var gson = new GsonBuilder().setPrettyPrinting().create();
485
486         // Get context into a JSON string
487         runtimeJsonStringBuilder.append("{\"TimeStamp\":");
488         runtimeJsonStringBuilder.append(engineModel.getTimestamp());
489         runtimeJsonStringBuilder.append(",\"State\":");
490         runtimeJsonStringBuilder.append(engineModel.getState());
491         runtimeJsonStringBuilder.append(",\"Stats\":");
492         runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
493
494         // Get context into a JSON string
495         runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
496
497         var firstAlbum = true;
498         for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
499             if (firstAlbum) {
500                 firstAlbum = false;
501             } else {
502                 runtimeJsonStringBuilder.append(",");
503             }
504
505             runtimeJsonStringBuilder.append("{\"AlbumKey\":");
506             runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
507             runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
508
509             // Get the schema helper to use to marshal context album objects to JSON
510             final var axContextAlbum =
511                     ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey());
512             SchemaHelper schemaHelper = null;
513
514             try {
515                 // Get a schema helper to manage the translations between objects on the album map
516                 // for this album
517                 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
518                         axContextAlbum.getItemSchema());
519             } catch (final ContextRuntimeException e) {
520                 final var resultString =
521                         "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON";
522                 LOGGER.warn(resultString, e);
523
524                 // End of context album entry
525                 runtimeJsonStringBuilder.append(resultString);
526                 runtimeJsonStringBuilder.append("]}");
527
528                 continue;
529             }
530
531             var firstEntry = true;
532             for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
533                 if (firstEntry) {
534                     firstEntry = false;
535                 } else {
536                     runtimeJsonStringBuilder.append(",");
537                 }
538                 runtimeJsonStringBuilder.append("{\"EntryName\":");
539                 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
540                 runtimeJsonStringBuilder.append(",\"EntryContent\":");
541                 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
542
543                 // End of context entry
544                 runtimeJsonStringBuilder.append("}");
545             }
546
547             // End of context album entry
548             runtimeJsonStringBuilder.append("]}");
549         }
550
551         runtimeJsonStringBuilder.append("]}");
552
553         // Tidy up the JSON string
554         final var jsonElement = JsonParser.parseString(runtimeJsonStringBuilder.toString());
555         final var tidiedRuntimeString = gson.toJson(jsonElement);
556
557         LOGGER.debug("runtime information={}", tidiedRuntimeString);
558
559         return tidiedRuntimeString;
560     }
561
562     /**
563      * This is an event processor thread, this class decouples the events handling logic from core business logic. This
564      * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the
565      * worker for processing by the Apex engine.
566      *
567      * @author Liam Fallon (liam.fallon@ericsson.com)
568      */
569     private class EventProcessor implements Runnable {
570         private final boolean debugEnabled = LOGGER.isDebugEnabled();
571         // the events queue
572         private BlockingQueue<ApexEvent> eventProcessingQueue = null;
573
574         /**
575          * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
576          *
577          * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events.
578          */
579         EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
580             this.eventProcessingQueue = eventProcessingQueue;
581         }
582
583         /**
584          * {@inheritDoc}.
585          */
586         @Override
587         public void run() {
588             LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
589
590             // Take events from the event processing queue of the worker and pass them to the engine
591             // for processing
592             var stopFlag = false;
593             while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) {
594                 ApexEvent event = null;
595                 try {
596                     event = eventProcessingQueue.take();
597                 } catch (final InterruptedException e) {
598                     // restore the interrupt status
599                     Thread.currentThread().interrupt();
600                     LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
601                     break;
602                 }
603                 var executedResult = false;
604                 try {
605                     if (event != null) {
606                         debugEventIfDebugEnabled(event);
607
608                         final var enevent = apexEnEventConverter.fromApexEvent(event);
609                         executedResult = engine.handleEvent(enevent);
610                     }
611                 } catch (final ApexException e) {
612                     LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
613                 } catch (final Exception e) {
614                     LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
615                     stopFlag = true;
616                 }
617                 var apexPolicyCounter = ApexPolicyStatisticsManager.getInstanceFromRegistry();
618                 if (!stopFlag && apexPolicyCounter != null) {
619                     apexPolicyCounter.updatePolicyExecutedCounter(executedResult);
620                 }
621             }
622             LOGGER.debug("Engine {} completed processing", engineWorkerKey);
623         }
624
625         /**
626          * Debug the event if debug is enabled.
627          *
628          * @param event the event to debug
629          */
630         private void debugEventIfDebugEnabled(ApexEvent event) {
631             if (debugEnabled) {
632                 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);
633             }
634         }
635     }
636 }