4da605dbcdfcae0b21b5fa1caa09d13a6ef7e7c3
[policy/apex-pdp.git] / services / services-engine / src / main / java / org / onap / policy / apex / service / engine / runtime / impl / EngineWorker.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  *  Modifications Copyright (C) 2019-2020 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.apex.service.engine.runtime.impl;
23
24 import com.google.gson.Gson;
25 import com.google.gson.GsonBuilder;
26 import com.google.gson.JsonElement;
27 import com.google.gson.JsonParser;
28
29 import java.io.ByteArrayInputStream;
30 import java.io.ByteArrayOutputStream;
31 import java.util.ArrayList;
32 import java.util.Arrays;
33 import java.util.Collection;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Map.Entry;
37 import java.util.concurrent.BlockingQueue;
38
39 import lombok.Setter;
40
41 import org.onap.policy.apex.context.ContextException;
42 import org.onap.policy.apex.context.ContextRuntimeException;
43 import org.onap.policy.apex.context.SchemaHelper;
44 import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory;
45 import org.onap.policy.apex.core.engine.engine.ApexEngine;
46 import org.onap.policy.apex.core.engine.engine.impl.ApexEngineFactory;
47 import org.onap.policy.apex.core.engine.event.EnEvent;
48 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
49 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
50 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
51 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
52 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
53 import org.onap.policy.apex.model.basicmodel.handling.ApexModelWriter;
54 import org.onap.policy.apex.model.basicmodel.service.ModelService;
55 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
56 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
57 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineModel;
58 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
59 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
60 import org.onap.policy.apex.service.engine.event.ApexEvent;
61 import org.onap.policy.apex.service.engine.event.impl.enevent.ApexEvent2EnEventConverter;
62 import org.onap.policy.apex.service.engine.main.ApexPolicyStatisticsManager;
63 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
64 import org.onap.policy.apex.service.engine.runtime.EngineService;
65 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
66 import org.slf4j.ext.XLogger;
67 import org.slf4j.ext.XLoggerFactory;
68
69 /**
70  * The Class EngineWorker encapsulates a core {@link ApexEngine} instance, which runs policies defined in the
71  * {@link org.onap.policy.apex.model.basicmodel.concepts.AxModelAxModel}. Each policy is triggered by an Apex event, and
72  * when the policy is triggered it runs through to completion in the ApexEngine.
73  *
74  * <p>This class acts as a container for an {@link ApexEngine}, running it in a thread, sending it events, and receiving
75  * events from it.
76  *
77  * @author Liam Fallon (liam.fallon@ericsson.com)
78  */
79 final class EngineWorker implements EngineService {
80     // Logger for this class
81     private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineService.class);
82
83     // Recurring string constants
84     private static final String ENGINE_FOR_KEY_PREFIX = "apex engine for engine key ";
85     private static final String ENGINE_SUFFIX = " of this engine";
86     private static final String BAD_KEY_MATCH_TAG = " does not match the key";
87     private static final String ENGINE_KEY_PREFIX = "engine key ";
88
89     // The ID of this engine
90     private final AxArtifactKey engineWorkerKey;
91
92     // The Apex engine which is running the policies in this worker
93     private final ApexEngine engine;
94
95     // The event processor is an inner class, an instance of which runs as a thread that reads
96     // incoming events from a queue and forwards them to the Apex engine
97     private EventProcessor processor = null;
98
99     // Thread handling for the worker
100     private final ApplicationThreadFactory threadFactory;
101     private Thread processorThread;
102
103     // Converts ApexEvent instances to and from EnEvent instances
104     private ApexEvent2EnEventConverter apexEnEventConverter = null;
105
106     @Setter
107     private boolean isSubsequentInstance;
108
109     /**
110      * Constructor that creates an Apex engine, an event processor for events to be sent to that engine, and an
111      * {@link ApexModelReader} instance to read Apex models using JAXB.
112      *
113      * @param engineWorkerKey the engine worker key
114      * @param queue the queue on which events for this Apex worker will come
115      * @param threadFactory the thread factory to use for creating the event processing thread
116      * @throws ApexException thrown on errors on worker instantiation
117      */
118     protected EngineWorker(final AxArtifactKey engineWorkerKey, final BlockingQueue<ApexEvent> queue,
119             final ApplicationThreadFactory threadFactory) {
120         LOGGER.entry(engineWorkerKey);
121
122         this.engineWorkerKey = engineWorkerKey;
123         this.threadFactory = threadFactory;
124
125         // Create the Apex engine
126         engine = new ApexEngineFactory().createApexEngine(engineWorkerKey);
127
128         // Create and run the event processor
129         processor = new EventProcessor(queue);
130
131         // Set the Event converter up
132         apexEnEventConverter = new ApexEvent2EnEventConverter(engine);
133
134         LOGGER.exit();
135     }
136
137     /**
138      * {@inheritDoc}.
139      */
140     @Override
141     public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
142         engine.addEventListener(listenerName, new EnEventListenerImpl(apexEventListener, apexEnEventConverter));
143     }
144
145     /**
146      * {@inheritDoc}.
147      */
148     @Override
149     public void deregisterActionListener(final String listenerName) {
150         engine.removeEventListener(listenerName);
151     }
152
153     /**
154      * {@inheritDoc}.
155      */
156     @Override
157     public EngineServiceEventInterface getEngineServiceEventInterface() {
158         throw new UnsupportedOperationException(
159                 "getEngineServiceEventInterface() call is not allowed on an Apex Engine Worker");
160     }
161
162     /**
163      * {@inheritDoc}.
164      */
165     @Override
166     public AxArtifactKey getKey() {
167         return engineWorkerKey;
168     }
169
170     /**
171      * {@inheritDoc}.
172      */
173     @Override
174     public Collection<AxArtifactKey> getEngineKeys() {
175         return Arrays.asList(engineWorkerKey);
176     }
177
178     /**
179      * {@inheritDoc}.
180      */
181     @Override
182     public AxArtifactKey getApexModelKey() {
183         if (ModelService.existsModel(AxPolicyModel.class)) {
184             return ModelService.getModel(AxPolicyModel.class).getKey();
185         } else {
186             return null;
187         }
188     }
189
190     /**
191      * {@inheritDoc}.
192      */
193     @Override
194     public void updateModel(final AxArtifactKey engineKey, final String engineModel, final boolean forceFlag)
195             throws ApexException {
196         LOGGER.entry(engineKey);
197
198         // Read the Apex model into memory using the Apex Model Reader
199         AxPolicyModel apexPolicyModel = null;
200         try {
201             final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
202             apexPolicyModel = modelReader.read(new ByteArrayInputStream(engineModel.getBytes()));
203         } catch (final ApexModelException e) {
204             throw new ApexException("failed to unmarshal the apex model on engine " + engineKey.getId(), e);
205         }
206
207         // Update the Apex model in the Apex engine
208         updateModel(engineKey, apexPolicyModel, forceFlag);
209
210         LOGGER.exit();
211     }
212
213     /**
214      * {@inheritDoc}.
215      */
216     @Override
217     public void updateModel(final AxArtifactKey engineKey, final AxPolicyModel apexModel, final boolean forceFlag)
218             throws ApexException {
219         LOGGER.entry(engineKey);
220
221         // Check if the key on the update request is correct
222         if (!engineWorkerKey.equals(engineKey)) {
223             String message =
224                     ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId() + ENGINE_SUFFIX;
225             throw new ApexException(message);
226         }
227
228         // Check model compatibility
229         if (ModelService.existsModel(AxPolicyModel.class)) {
230             // The current policy model may or may not be defined
231             final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
232             if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
233                 if (forceFlag) {
234                     LOGGER.warn(
235                             "apex model update forced, supplied model with key \"{}\" is not a compatible model update "
236                                     + "from the existing engine model with key \"{}\"",
237                             apexModel.getKey().getId(), currentModel.getKey().getId());
238                 } else {
239                     throw new ContextException(
240                             "apex model update failed, supplied model with key \"" + apexModel.getKey().getId()
241                                     + "\" is not a compatible model update from the existing engine model with key \""
242                                     + currentModel.getKey().getId() + "\"");
243                 }
244             }
245         }
246         // Update the Apex model in the Apex engine
247         engine.updateModel(apexModel, isSubsequentInstance);
248
249         LOGGER.debug("engine model {} added to the engine-{}", apexModel.getKey().getId(), engineWorkerKey);
250         LOGGER.exit();
251     }
252
253     /**
254      * {@inheritDoc}.
255      */
256     @Override
257     public AxEngineState getState() {
258         return engine.getState();
259     }
260
261     /**
262      * {@inheritDoc}.
263      */
264     @Override
265     public void startAll() throws ApexException {
266         start(this.getKey());
267     }
268
269     /**
270      * {@inheritDoc}.
271      */
272     @Override
273     public void start(final AxArtifactKey engineKey) throws ApexException {
274         LOGGER.entry(engineKey);
275
276         // Check if the key on the start request is correct
277         if (!engineWorkerKey.equals(engineKey)) {
278             throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
279                     + ENGINE_SUFFIX);
280         }
281
282         // Starts the event processing thread that handles incoming events
283         if (processorThread != null && processorThread.isAlive()) {
284             String message =
285                     ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already running with state " + getState();
286             LOGGER.error(message);
287             throw new ApexException(message);
288         }
289
290         // Start the engine
291         engine.start();
292
293         // Start a thread to process events for the engine
294         processorThread = threadFactory.newThread(processor);
295         processorThread.start();
296
297         LOGGER.exit(engineKey);
298     }
299
300     /**
301      * {@inheritDoc}.
302      */
303     @Override
304     public void stop() throws ApexException {
305         stop(this.getKey());
306     }
307
308     /**
309      * {@inheritDoc}.
310      */
311     @Override
312     public void stop(final AxArtifactKey engineKey) throws ApexException {
313         // Check if the key on the start request is correct
314         if (!engineWorkerKey.equals(engineKey)) {
315             LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
316                     + ENGINE_SUFFIX);
317             throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
318                     + ENGINE_SUFFIX);
319         }
320
321         // Interrupt the worker to stop its thread
322         if (processorThread == null || !processorThread.isAlive()) {
323             processorThread = null;
324
325             LOGGER.warn(
326                     ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is already stopped with state " + getState());
327             return;
328         }
329
330         // Interrupt the thread that is handling events toward the engine
331         processorThread.interrupt();
332         processorThread = null;
333
334         // Stop the engine
335         engine.stop();
336
337         LOGGER.exit(engineKey);
338     }
339
340     /**
341      * {@inheritDoc}.
342      */
343     @Override
344     public void clear() throws ApexException {
345         clear(this.getKey());
346     }
347
348     /**
349      * {@inheritDoc}.
350      */
351     @Override
352     public void clear(final AxArtifactKey engineKey) throws ApexException {
353         // Check if the key on the start request is correct
354         if (!engineWorkerKey.equals(engineKey)) {
355             LOGGER.warn(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
356                     + ENGINE_SUFFIX);
357             throw new ApexException(ENGINE_KEY_PREFIX + engineKey.getId() + BAD_KEY_MATCH_TAG + engineWorkerKey.getId()
358                     + ENGINE_SUFFIX);
359         }
360
361         // Interrupt the worker to stop its thread
362         if (processorThread != null && !processorThread.isAlive()) {
363             LOGGER.warn(ENGINE_FOR_KEY_PREFIX + engineWorkerKey.getId() + " is not stopped with state " + getState());
364             return;
365         }
366
367         // Clear the engine
368         engine.clear();
369
370         LOGGER.exit(engineKey);
371     }
372
373     /**
374      * {@inheritDoc}.
375      */
376     @Override
377     public boolean isStarted() {
378         return isStarted(this.getKey());
379     }
380
381     /**
382      * {@inheritDoc}.
383      */
384     @Override
385     public boolean isStarted(final AxArtifactKey engineKey) {
386         final AxEngineState engstate = getState();
387         switch (engstate) {
388             case STOPPED:
389             case STOPPING:
390             case UNDEFINED:
391                 return false;
392             case EXECUTING:
393             case READY:
394                 return processorThread != null && processorThread.isAlive() && !processorThread.isInterrupted();
395             default:
396                 break;
397         }
398         return false;
399     }
400
401     /**
402      * {@inheritDoc}.
403      */
404     @Override
405     public boolean isStopped() {
406         return isStopped(this.getKey());
407     }
408
409     /**
410      * {@inheritDoc}.
411      */
412     @Override
413     public boolean isStopped(final AxArtifactKey engineKey) {
414         final AxEngineState engstate = getState();
415         switch (engstate) {
416             case STOPPING:
417             case UNDEFINED:
418             case EXECUTING:
419             case READY:
420                 return false;
421             case STOPPED:
422                 return processorThread == null || !processorThread.isAlive();
423             default:
424                 break;
425         }
426         return false;
427     }
428
429     /**
430      * {@inheritDoc}.
431      */
432     @Override
433     public void startPeriodicEvents(final long period) {
434         throw new UnsupportedOperationException("startPeriodicEvents() call is not allowed on an Apex Engine Worker");
435     }
436
437     /**
438      * {@inheritDoc}.
439      */
440     @Override
441     public void stopPeriodicEvents() {
442         throw new UnsupportedOperationException("stopPeriodicEvents() call is not allowed on an Apex Engine Worker");
443     }
444
445     /**
446      * {@inheritDoc}.
447      */
448     @Override
449     public String getStatus(final AxArtifactKey engineKey) {
450         // Get the information from the engine that we want to return
451         final AxEngineModel apexEngineModel = engine.getEngineStatus();
452         apexEngineModel.getKeyInformation().generateKeyInfo(apexEngineModel);
453
454         // Convert that information into a string
455         try {
456             final ByteArrayOutputStream baOutputStream = new ByteArrayOutputStream();
457             final ApexModelWriter<AxEngineModel> modelWriter = new ApexModelWriter<>(AxEngineModel.class);
458             modelWriter.setJsonOutput(true);
459             modelWriter.write(apexEngineModel, baOutputStream);
460             return baOutputStream.toString();
461         } catch (final Exception e) {
462             LOGGER.warn("error outputting runtime information for engine {}", engineWorkerKey, e);
463             return null;
464         }
465     }
466
467     /**
468      * {@inheritDoc}.
469      */
470     @Override
471     public List<AxEngineModel> getEngineStats() {
472         List<AxEngineModel> engineStats = new ArrayList<>();
473         engineStats.add(engine.getEngineStatus());
474         return engineStats;
475     }
476
477     /**
478      * {@inheritDoc}.
479      */
480     @Override
481     public String getRuntimeInfo(final AxArtifactKey engineKey) {
482         // We'll build up the JSON string for runtime information bit by bit
483         final StringBuilder runtimeJsonStringBuilder = new StringBuilder();
484
485         // Get the engine information
486         final AxEngineModel engineModel = engine.getEngineStatus();
487         final Map<AxArtifactKey, Map<String, Object>> engineContextAlbums = engine.getEngineContext();
488
489         // Use GSON to convert our context information into JSON
490         final Gson gson = new GsonBuilder().setPrettyPrinting().create();
491
492         // Get context into a JSON string
493         runtimeJsonStringBuilder.append("{\"TimeStamp\":");
494         runtimeJsonStringBuilder.append(engineModel.getTimestamp());
495         runtimeJsonStringBuilder.append(",\"State\":");
496         runtimeJsonStringBuilder.append(engineModel.getState());
497         runtimeJsonStringBuilder.append(",\"Stats\":");
498         runtimeJsonStringBuilder.append(gson.toJson(engineModel.getStats()));
499
500         // Get context into a JSON string
501         runtimeJsonStringBuilder.append(",\"ContextAlbums\":[");
502
503         boolean firstAlbum = true;
504         for (final Entry<AxArtifactKey, Map<String, Object>> contextAlbumEntry : engineContextAlbums.entrySet()) {
505             if (firstAlbum) {
506                 firstAlbum = false;
507             } else {
508                 runtimeJsonStringBuilder.append(",");
509             }
510
511             runtimeJsonStringBuilder.append("{\"AlbumKey\":");
512             runtimeJsonStringBuilder.append(gson.toJson(contextAlbumEntry.getKey()));
513             runtimeJsonStringBuilder.append(",\"AlbumContent\":[");
514
515             // Get the schema helper to use to marshal context album objects to JSON
516             final AxContextAlbum axContextAlbum =
517                     ModelService.getModel(AxContextAlbums.class).get(contextAlbumEntry.getKey());
518             SchemaHelper schemaHelper = null;
519
520             try {
521                 // Get a schema helper to manage the translations between objects on the album map
522                 // for this album
523                 schemaHelper = new SchemaHelperFactory().createSchemaHelper(axContextAlbum.getKey(),
524                         axContextAlbum.getItemSchema());
525             } catch (final ContextRuntimeException e) {
526                 final String resultString =
527                         "could not find schema helper to marshal context album \"" + axContextAlbum + "\" to JSON";
528                 LOGGER.warn(resultString, e);
529
530                 // End of context album entry
531                 runtimeJsonStringBuilder.append(resultString);
532                 runtimeJsonStringBuilder.append("]}");
533
534                 continue;
535             }
536
537             boolean firstEntry = true;
538             for (final Entry<String, Object> contextEntry : contextAlbumEntry.getValue().entrySet()) {
539                 if (firstEntry) {
540                     firstEntry = false;
541                 } else {
542                     runtimeJsonStringBuilder.append(",");
543                 }
544                 runtimeJsonStringBuilder.append("{\"EntryName\":");
545                 runtimeJsonStringBuilder.append(gson.toJson(contextEntry.getKey()));
546                 runtimeJsonStringBuilder.append(",\"EntryContent\":");
547                 runtimeJsonStringBuilder.append(gson.toJson(schemaHelper.marshal2String(contextEntry.getValue())));
548
549                 // End of context entry
550                 runtimeJsonStringBuilder.append("}");
551             }
552
553             // End of context album entry
554             runtimeJsonStringBuilder.append("]}");
555         }
556
557         runtimeJsonStringBuilder.append("]}");
558
559         // Tidy up the JSON string
560         final JsonElement jsonElement = JsonParser.parseString(runtimeJsonStringBuilder.toString());
561         final String tidiedRuntimeString = gson.toJson(jsonElement);
562
563         LOGGER.debug("runtime information={}", tidiedRuntimeString);
564
565         return tidiedRuntimeString;
566     }
567
568     /**
569      * This is an event processor thread, this class decouples the events handling logic from core business logic. This
570      * class runs its own thread and continuously querying the blocking queue for the events that have been sent to the
571      * worker for processing by the Apex engine.
572      *
573      * @author Liam Fallon (liam.fallon@ericsson.com)
574      */
575     private class EventProcessor implements Runnable {
576         private final boolean debugEnabled = LOGGER.isDebugEnabled();
577         // the events queue
578         private BlockingQueue<ApexEvent> eventProcessingQueue = null;
579
580         /**
581          * Constructor accepts {@link ApexEngine} and {@link BlockingQueue} type objects.
582          *
583          * @param eventProcessingQueue is reference of {@link BlockingQueue} which contains trigger events.
584          */
585         EventProcessor(final BlockingQueue<ApexEvent> eventProcessingQueue) {
586             this.eventProcessingQueue = eventProcessingQueue;
587         }
588
589         /**
590          * {@inheritDoc}.
591          */
592         @Override
593         public void run() {
594             LOGGER.debug("Engine {} processing ... ", engineWorkerKey);
595
596             // Take events from the event processing queue of the worker and pass them to the engine
597             // for processing
598             boolean stopFlag = false;
599             while (processorThread != null && !processorThread.isInterrupted() && !stopFlag) {
600                 ApexEvent event = null;
601                 try {
602                     event = eventProcessingQueue.take();
603                 } catch (final InterruptedException e) {
604                     // restore the interrupt status
605                     Thread.currentThread().interrupt();
606                     LOGGER.debug("Engine {} processing interrupted ", engineWorkerKey);
607                     break;
608                 }
609                 boolean executedResult = false;
610                 try {
611                     if (event != null) {
612                         debugEventIfDebugEnabled(event);
613
614                         final EnEvent enevent = apexEnEventConverter.fromApexEvent(event);
615                         executedResult = engine.handleEvent(enevent);
616                     }
617                 } catch (final ApexException e) {
618                     LOGGER.warn("Engine {} failed to process event {}", engineWorkerKey, event.toString(), e);
619                 } catch (final Exception e) {
620                     LOGGER.warn("Engine {} terminated processing event {}", engineWorkerKey, event.toString(), e);
621                     stopFlag = true;
622                 }
623                 ApexPolicyStatisticsManager apexPolicyCounter = ApexPolicyStatisticsManager.getInstanceFromRegistry();
624                 if (!stopFlag && apexPolicyCounter != null) {
625                     apexPolicyCounter.updatePolicyExecutedCounter(executedResult);
626                 }
627             }
628             LOGGER.debug("Engine {} completed processing", engineWorkerKey);
629         }
630
631         /**
632          * Debug the event if debug is enabled.
633          *
634          * @param event the event to debug
635          */
636         private void debugEventIfDebugEnabled(ApexEvent event) {
637             if (debugEnabled) {
638                 LOGGER.debug("Trigger Event {} forwarded to the Apex engine", event);
639             }
640         }
641     }
642 }