2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.runtime.impl;
23 import java.io.ByteArrayInputStream;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.LinkedHashMap;
28 import java.util.Map.Entry;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
32 import org.onap.policy.apex.context.ContextException;
33 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
38 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
41 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
42 import org.onap.policy.apex.service.engine.event.ApexEvent;
43 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
44 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
45 import org.onap.policy.apex.service.engine.runtime.EngineService;
46 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
47 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
48 import org.onap.policy.common.parameters.GroupValidationResult;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
53 * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
54 * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
55 * event forwarding to and from the engine workers.
57 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
58 * @author Liam Fallon (liam.fallon@ericsson.com)
59 * @author John Keeney (john.keeney@ericsson.com)
61 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
62 // Logging static variables
63 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
64 private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
66 // Recurring string constants
67 private static final String ENGINE_KEY_PREAMBLE = "engine with key ";
68 private static final String NOT_FOUND_SUFFIX = " not found in engine service";
70 // Constants for timing
71 private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
72 private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
73 private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
75 // The ID of this engine
76 private AxArtifactKey engineServiceKey = null;
78 // The Apex engine workers this engine service is handling
79 private final Map<AxArtifactKey, EngineService> engineWorkerMap = Collections
80 .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineService>());
82 // Event queue for events being sent into the Apex engines, it used by all engines within a
84 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
86 // Thread factory for thread management
87 private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
89 // Periodic event generator and its period in milliseconds
90 private ApexPeriodicEventGenerator periodicEventGenerator = null;
91 private long periodicEventPeriod;
94 * This constructor instantiates engine workers and adds them to the set of engine workers to be managed. The
95 * constructor is private to prevent subclassing.
97 * @param engineServiceKey the engine service key
98 * @param incomingThreadCount the thread count, the number of engine workers to start
99 * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
100 * @throws ApexException on worker instantiation errors
102 private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int incomingThreadCount,
103 final long periodicEventPeriod) {
104 LOGGER.entry(engineServiceKey, incomingThreadCount);
106 this.engineServiceKey = engineServiceKey;
107 this.periodicEventPeriod = periodicEventPeriod;
109 int threadCount = incomingThreadCount;
110 if (threadCount <= 0) {
111 // Just start one engine worker
115 // Start engine workers
116 for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
117 final AxArtifactKey engineWorkerKey = new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter,
118 engineServiceKey.getVersion());
119 engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
120 LOGGER.info("Created apex engine {} .", engineWorkerKey.getId());
123 LOGGER.info("APEX service created.");
128 * Create an Apex Engine Service instance. This method does not load the policy so
129 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
130 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
131 * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
133 * @param config the configuration for this Apex Engine Service.
134 * @return the Engine Service instance
135 * @throws ApexException on worker instantiation errors
137 public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
138 if (config == null) {
139 LOGGER.warn("Engine service configuration parameters is null");
140 throw new ApexException("engine service configuration parameters is null");
142 final GroupValidationResult validation = config.validate();
143 if (!validation.isValid()) {
144 LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
145 throw new ApexException("Invalid engine service configuration parameters: " + validation);
147 final AxArtifactKey engineServiceKey = config.getEngineKey();
148 final int threadCount = config.getInstanceCount();
150 // Check if the Apex model specified is sane
151 if (engineServiceKey == null) {
152 LOGGER.warn("engine service key is null");
153 throw new ApexException("engine service key is null");
156 return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
162 * @see org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. String,
163 * org.onap.policy.apex.service.engine.runtime.ApexEventListener)
166 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
167 LOGGER.entry(apexEventListener);
169 // Register the Apex event listener on all engine workers, each worker will return Apex
170 // events to the listening application
171 for (final EngineService engineWorker : engineWorkerMap.values()) {
172 engineWorker.registerActionListener(listenerName, apexEventListener);
175 LOGGER.info("Added the action listener to the engine");
182 * @see org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. String)
185 public void deregisterActionListener(final String listenerName) {
186 LOGGER.entry(listenerName);
188 // Register the Apex event listener on all engine workers, each worker will return Apex
189 // events to the listening application
190 for (final EngineService engineWorker : engineWorkerMap.values()) {
191 engineWorker.deregisterActionListener(listenerName);
194 LOGGER.info("Removed the action listener from the engine");
201 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
204 public EngineServiceEventInterface getEngineServiceEventInterface() {
211 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
214 public AxArtifactKey getKey() {
215 return engineServiceKey;
221 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
224 public Collection<AxArtifactKey> getEngineKeys() {
225 return engineWorkerMap.keySet();
231 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
234 public AxArtifactKey getApexModelKey() {
235 if (engineWorkerMap.size() == 0) {
239 return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
245 * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
246 * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
249 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
250 final boolean forceFlag) throws ApexException {
251 // Check if the Apex model specified is sane
252 if (apexModelString == null || apexModelString.trim().length() == 0) {
253 String emptyModelMessage = "model for updating engine service with key "
254 + incomingEngineServiceKey.getId() + " is empty";
255 LOGGER.warn(emptyModelMessage);
256 throw new ApexException(emptyModelMessage);
259 // Read the Apex model into memory using the Apex Model Reader
260 AxPolicyModel apexPolicyModel = null;
262 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
263 apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
264 } catch (final ApexModelException e) {
265 String message = "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId();
266 LOGGER.error(message, e);
267 throw new ApexException(message, e);
270 if (apexPolicyModel == null) {
271 String message = "apex model null on engine service " + incomingEngineServiceKey.getId();
272 LOGGER.error(message);
273 throw new ApexException(message);
277 updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
285 * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
286 * basicmodel.concepts.AxArtifactKey, org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
289 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
290 final boolean forceFlag) throws ApexException {
291 LOGGER.entry(incomingEngineServiceKey);
293 // Check if the Apex model specified is sane
294 if (apexModel == null) {
295 LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
297 throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
301 // Check if the key on the update request is correct
302 if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
303 LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
304 + engineServiceKey.getId() + " of this engine service");
305 throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
306 + engineServiceKey.getId() + " of this engine service");
309 // Check model compatibility
310 if (ModelService.existsModel(AxPolicyModel.class)) {
311 // The current policy model may or may not be defined
312 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
313 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
314 handleIncompatibility(apexModel, forceFlag, currentModel);
319 stopEngines(incomingEngineServiceKey);
322 // Update the engines
323 for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
324 LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
325 engineWorkerEntry.getValue().updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
328 // start all engines on this engine service if it was not stopped before the update
330 final long starttime = System.currentTimeMillis();
331 while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
332 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
334 // Check if all engines are running
335 final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
336 for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
337 if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
338 && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
339 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
340 notRunningEngineIdBuilder.append('(');
341 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
342 notRunningEngineIdBuilder.append(") ");
345 if (notRunningEngineIdBuilder.length() > 0) {
346 final String errorString = "engine start error on model update on engine service with key "
347 + incomingEngineServiceKey.getId() + ", engines not running are: "
348 + notRunningEngineIdBuilder.toString().trim();
349 LOGGER.warn(errorString);
350 throw new ApexException(errorString);
357 * Stop engines for a model update.
358 * @param incomingEngineServiceKey the engine service key for the engines that are to be stopped
359 * @throws ApexException on errors stopping engines
361 private void stopEngines(final AxArtifactKey incomingEngineServiceKey) throws ApexException {
362 // Stop all engines on this engine service
364 final long stoptime = System.currentTimeMillis();
365 while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
366 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
368 // Check if all engines are stopped
369 final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
370 for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
371 if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
372 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
373 notStoppedEngineIdBuilder.append('(');
374 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
375 notStoppedEngineIdBuilder.append(") ");
378 if (notStoppedEngineIdBuilder.length() > 0) {
379 final String errorString = "cannot update model on engine service with key "
380 + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
381 + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
382 LOGGER.warn(errorString);
383 throw new ApexException(errorString);
388 * Issue compatibility warning or error message.
389 * @param apexModel The model name
390 * @param forceFlag true if we are forcing the update
391 * @param currentModel the existing model that is loaded
392 * @throws ContextException on compatibility errors
394 private void handleIncompatibility(final AxPolicyModel apexModel, final boolean forceFlag,
395 final AxPolicyModel currentModel) throws ContextException {
397 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
398 + "\" is not a compatible model update from the existing engine model with key \""
399 + currentModel.getKey().getId() + "\"");
401 throw new ContextException("apex model update failed, supplied model with key \""
402 + apexModel.getKey().getId()
403 + "\" is not a compatible model update from the existing engine model with key \""
404 + currentModel.getKey().getId() + "\"");
411 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
414 public AxEngineState getState() {
415 // If one worker is running then we are running, otherwise we are stopped
416 for (final EngineService engine : engineWorkerMap.values()) {
417 if (engine.getState() != AxEngineState.STOPPED) {
418 return AxEngineState.EXECUTING;
422 return AxEngineState.STOPPED;
428 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
431 public void startAll() throws ApexException {
432 for (final EngineService engine : engineWorkerMap.values()) {
433 start(engine.getKey());
436 // Check if periodic events should be turned on
437 if (periodicEventPeriod > 0) {
438 startPeriodicEvents(periodicEventPeriod);
445 * @see org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core.model.
446 * concepts.AxArtifactKey)
449 public void start(final AxArtifactKey engineKey) throws ApexException {
450 LOGGER.entry(engineKey);
452 // Check if we have this key on our map
453 if (!engineWorkerMap.containsKey(engineKey)) {
454 String message = ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX;
455 LOGGER.warn(message);
456 throw new ApexException(message);
460 engineWorkerMap.get(engineKey).start(engineKey);
462 LOGGER.exit(engineKey);
468 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
471 public void stop() throws ApexException {
475 for (final EngineService engine : engineWorkerMap.values()) {
476 if (engine.getState() != AxEngineState.STOPPED) {
487 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core.model.
488 * concepts.AxArtifactKey)
491 public void stop(final AxArtifactKey engineKey) throws ApexException {
492 LOGGER.entry(engineKey);
494 // Check if we have this key on our map
495 if (!engineWorkerMap.containsKey(engineKey)) {
496 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
497 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
501 engineWorkerMap.get(engineKey).stop(engineKey);
503 LOGGER.exit(engineKey);
509 * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
512 public void clear() throws ApexException {
516 for (final EngineService engine : engineWorkerMap.values()) {
517 if (engine.getState() == AxEngineState.STOPPED) {
528 * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core.model.
529 * concepts.AxArtifactKey)
532 public void clear(final AxArtifactKey engineKey) throws ApexException {
533 LOGGER.entry(engineKey);
535 // Check if we have this key on our map
536 if (!engineWorkerMap.containsKey(engineKey)) {
537 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
538 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
542 if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
543 engineWorkerMap.get(engineKey).stop(engineKey);
546 LOGGER.exit(engineKey);
550 * Check all engines are started.
552 * @return true if <i>all</i> engines are started
553 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
556 public boolean isStarted() {
557 for (final EngineService engine : engineWorkerMap.values()) {
558 if (!engine.isStarted()) {
568 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex.model.
569 * basicmodel.concepts.AxArtifactKey)
572 public boolean isStarted(final AxArtifactKey engineKey) {
573 // Check if we have this key on our map
574 if (!engineWorkerMap.containsKey(engineKey)) {
575 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
577 return engineWorkerMap.get(engineKey).isStarted();
581 * Check all engines are stopped.
583 * @return true if <i>all</i> engines are stopped
584 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
587 public boolean isStopped() {
588 for (final EngineService engine : engineWorkerMap.values()) {
589 if (!engine.isStopped()) {
599 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex.model.
600 * basicmodel.concepts.AxArtifactKey)
603 public boolean isStopped(final AxArtifactKey engineKey) {
604 // Check if we have this key on our map
605 if (!engineWorkerMap.containsKey(engineKey)) {
606 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
608 return engineWorkerMap.get(engineKey).isStopped();
614 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
617 public void startPeriodicEvents(final long period) throws ApexException {
618 // Check if periodic events are already started
619 if (periodicEventGenerator != null) {
620 String message = "Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
621 + periodicEventGenerator.toString();
622 LOGGER.warn(message);
623 throw new ApexException(message);
626 // Set up periodic event execution, its a Java Timer/TimerTask
627 periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
629 // Record the periodic event period because it may have been set over the Web Socket admin
631 this.periodicEventPeriod = period;
637 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
640 public void stopPeriodicEvents() throws ApexException {
641 // Check if periodic events are already started
642 if (periodicEventGenerator == null) {
643 LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
644 throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
647 // Stop periodic events
648 periodicEventGenerator.cancel();
649 periodicEventGenerator = null;
655 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core.model
656 * .concepts.AxArtifactKey)
659 public String getStatus(final AxArtifactKey engineKey) throws ApexException {
660 // Check if we have this key on our map
661 if (!engineWorkerMap.containsKey(engineKey)) {
662 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
663 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
666 // Return the information for this worker
667 return engineWorkerMap.get(engineKey).getStatus(engineKey);
673 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex.core.
674 * model.concepts.AxArtifactKey)
677 public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
678 // Check if we have this key on our map
679 if (!engineWorkerMap.containsKey(engineKey)) {
680 LOGGER.warn(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
681 throw new ApexException(ENGINE_KEY_PREAMBLE + engineKey.getId() + NOT_FOUND_SUFFIX);
684 // Return the information for this worker
685 return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
691 * @see org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface#sendEvent(org.onap.policy.
692 * apex.service.engine.event.ApexEvent)
695 public void sendEvent(final ApexEvent event) {
696 // Check if we have this key on our map
697 if (getState() == AxEngineState.STOPPED) {
698 LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
699 + engineServiceKey.getId() + " are running");
704 LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
709 LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
712 // Add the incoming event to the queue, the next available worker will process it