2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.runtime.impl;
23 import java.io.ByteArrayInputStream;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.LinkedHashMap;
28 import java.util.Map.Entry;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
32 import org.onap.policy.apex.context.ContextException;
33 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
38 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
41 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
42 import org.onap.policy.apex.service.engine.event.ApexEvent;
43 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
44 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
45 import org.onap.policy.apex.service.engine.runtime.EngineService;
46 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
47 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
48 import org.onap.policy.common.parameters.GroupValidationResult;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
53 * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each of which is running
54 * on an identical Apex model. This class handles the management of the engine worker instances, their threads, and
55 * event forwarding to and from the engine workers.
57 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
58 * @author Liam Fallon (liam.fallon@ericsson.com)
59 * @author John Keeney (john.keeney@ericsson.com)
61 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
62 // Logging static variables
63 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
64 private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
66 // Constants for timing
67 private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
68 private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
69 private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
71 // The ID of this engine
72 private AxArtifactKey engineServiceKey = null;
74 // The Apex engine workers this engine service is handling
75 private final Map<AxArtifactKey, EngineService> engineWorkerMap = Collections
76 .synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineService>());
78 // Event queue for events being sent into the Apex engines, it used by all engines within a
80 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
82 // Thread factory for thread management
83 private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
85 // Periodic event generator and its period in milliseconds
86 private ApexPeriodicEventGenerator periodicEventGenerator = null;
87 private long periodicEventPeriod;
/**
 * Creates the engine service and the engine workers it manages. One worker is created per requested thread, and a
 * non-positive thread count results in a single worker. The constructor is private; instances are obtained
 * through the static {@code create} methods.
 *
 * @param engineServiceKey the engine service key
 * @param incomingThreadCount the thread count, the number of engine workers to create
 * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
 * @throws ApexException on worker instantiation errors
 */
private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int incomingThreadCount,
        final long periodicEventPeriod) throws ApexException {
    LOGGER.entry(engineServiceKey, incomingThreadCount);

    this.engineServiceKey = engineServiceKey;
    this.periodicEventPeriod = periodicEventPeriod;

    // A non-positive request falls back to exactly one engine worker
    final int workerCount = Math.max(incomingThreadCount, 1);

    // Each worker key is the service name suffixed with "-<index>" and carries the service version
    for (int workerIndex = 0; workerIndex < workerCount; workerIndex++) {
        final String workerName = engineServiceKey.getName() + '-' + workerIndex;
        final AxArtifactKey workerKey = new AxArtifactKey(workerName, engineServiceKey.getVersion());
        engineWorkerMap.put(workerKey, new EngineWorker(workerKey, queue, atFactory));
        LOGGER.info("Created apex engine {} .", workerKey.getId());
    }

    LOGGER.info("APEX service created.");
    LOGGER.exit();
}
124 * Create an Apex Engine Service instance. This method is deprecated and will be removed in the next version.
126 * @param engineServiceKey the engine service key
127 * @param threadCount the thread count, the number of engine workers to start
128 * @return the Engine Service instance
129 * @throws ApexException on worker instantiation errors
130 * @deprecated Do not use this version. Use {@link #create(EngineServiceParameters)}
133 public static EngineServiceImpl create(final AxArtifactKey engineServiceKey, final int threadCount)
134 throws ApexException {
135 // Check if the Apex model specified is sane
136 if (engineServiceKey == null) {
137 LOGGER.warn("engine service key is null");
138 throw new ApexException("engine service key is null");
140 return new EngineServiceImpl(engineServiceKey, threadCount, 0);
144 * Create an Apex Engine Service instance. This method does not load the policy so
145 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
146 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model. This method does not
147 * start the Engine Service so {@link #start(AxArtifactKey)} or {@link #startAll()} must be used.
149 * @param config the configuration for this Apex Engine Service.
150 * @return the Engine Service instance
151 * @throws ApexException on worker instantiation errors
153 public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
154 if (config == null) {
155 LOGGER.warn("Engine service configuration parameters is null");
156 throw new ApexException("engine service configuration parameters is null");
158 final GroupValidationResult validation = config.validate();
159 if (!validation.isValid()) {
160 LOGGER.warn("Invalid engine service configuration parameters: {}" + validation.getResult());
161 throw new ApexException("Invalid engine service configuration parameters: " + validation);
163 final AxArtifactKey engineServiceKey = config.getEngineKey();
164 final int threadCount = config.getInstanceCount();
166 // Check if the Apex model specified is sane
167 if (engineServiceKey == null) {
168 LOGGER.warn("engine service key is null");
169 throw new ApexException("engine service key is null");
172 return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
178 * @see org.onap.policy.apex.service.engine.runtime.EngineService#registerActionListener(java.lang. String,
179 * org.onap.policy.apex.service.engine.runtime.ApexEventListener)
182 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
183 LOGGER.entry(apexEventListener);
185 // Register the Apex event listener on all engine workers, each worker will return Apex
186 // events to the listening application
187 for (final EngineService engineWorker : engineWorkerMap.values()) {
188 engineWorker.registerActionListener(listenerName, apexEventListener);
191 LOGGER.info("Added the action listener to the engine");
198 * @see org.onap.policy.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang. String)
201 public void deregisterActionListener(final String listenerName) {
202 LOGGER.entry(listenerName);
204 // Register the Apex event listener on all engine workers, each worker will return Apex
205 // events to the listening application
206 for (final EngineService engineWorker : engineWorkerMap.values()) {
207 engineWorker.deregisterActionListener(listenerName);
210 LOGGER.info("Removed the action listener from the engine");
217 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
220 public EngineServiceEventInterface getEngineServiceEventInterface() {
227 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getKey()
230 public AxArtifactKey getKey() {
231 return engineServiceKey;
237 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getInfo()
240 public Collection<AxArtifactKey> getEngineKeys() {
241 return engineWorkerMap.keySet();
247 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getApexModelKey()
250 public AxArtifactKey getApexModelKey() {
251 if (engineWorkerMap.size() == 0) {
255 return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
261 * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
262 * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
265 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
266 final boolean forceFlag) throws ApexException {
267 // Check if the Apex model specified is sane
268 if (apexModelString == null || apexModelString.trim().length() == 0) {
269 LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
271 throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
275 // Read the Apex model into memory using the Apex Model Reader
276 AxPolicyModel apexPolicyModel = null;
278 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
279 apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
280 } catch (final ApexModelException e) {
281 LOGGER.error("failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId(), e);
282 throw new ApexException(
283 "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getId(),
287 if (apexPolicyModel == null) {
288 LOGGER.error("apex model null on engine service " + incomingEngineServiceKey.getId());
289 throw new ApexException("apex model null on engine service " + incomingEngineServiceKey.getId());
293 updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
301 * @see org.onap.policy.apex.service.engine.runtime.EngineService#updateModel(org.onap.policy.apex.model.
302 * basicmodel.concepts.AxArtifactKey, org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel, boolean)
305 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
306 final boolean forceFlag) throws ApexException {
307 LOGGER.entry(incomingEngineServiceKey);
309 // Check if the Apex model specified is sane
310 if (apexModel == null) {
311 LOGGER.warn("model for updating on engine service with key " + incomingEngineServiceKey.getId()
313 throw new ApexException("model for updating on engine service with key " + incomingEngineServiceKey.getId()
317 // Check if the key on the update request is correct
318 if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
319 LOGGER.warn("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
320 + engineServiceKey.getId() + " of this engine service");
321 throw new ApexException("engine service key " + incomingEngineServiceKey.getId() + " does not match the key"
322 + engineServiceKey.getId() + " of this engine service");
325 // Check model compatibility
326 if (ModelService.existsModel(AxPolicyModel.class)) {
327 // The current policy model may or may not be defined
328 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
329 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
331 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getId()
332 + "\" is not a compatible model update from the existing engine model with key \""
333 + currentModel.getKey().getId() + "\"");
335 throw new ContextException("apex model update failed, supplied model with key \""
336 + apexModel.getKey().getId()
337 + "\" is not a compatible model update from the existing engine model with key \""
338 + currentModel.getKey().getId() + "\"");
344 // Stop all engines on this engine service
346 final long stoptime = System.currentTimeMillis();
347 while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
348 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
350 // Check if all engines are stopped
351 final StringBuilder notStoppedEngineIdBuilder = new StringBuilder();
352 for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
353 if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
354 notStoppedEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
355 notStoppedEngineIdBuilder.append('(');
356 notStoppedEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
357 notStoppedEngineIdBuilder.append(") ");
360 if (notStoppedEngineIdBuilder.length() > 0) {
361 final String errorString = "cannot update model on engine service with key "
362 + incomingEngineServiceKey.getId() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
363 + "ms are: " + notStoppedEngineIdBuilder.toString().trim();
364 LOGGER.warn(errorString);
365 throw new ApexException(errorString);
369 // Update the engines
370 for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
371 LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getId());
372 engineWorkerEntry.getValue().updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
375 // start all engines on this engine service if it was not stopped before the update
377 final long starttime = System.currentTimeMillis();
378 while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
379 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
381 // Check if all engines are running
382 final StringBuilder notRunningEngineIdBuilder = new StringBuilder();
383 for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
384 if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
385 && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
386 notRunningEngineIdBuilder.append(engineWorkerEntry.getKey().getId());
387 notRunningEngineIdBuilder.append('(');
388 notRunningEngineIdBuilder.append(engineWorkerEntry.getValue().getState());
389 notRunningEngineIdBuilder.append(") ");
392 if (notRunningEngineIdBuilder.length() > 0) {
393 final String errorString = "engine start error on model update on engine service with key "
394 + incomingEngineServiceKey.getId() + ", engines not running are: "
395 + notRunningEngineIdBuilder.toString().trim();
396 LOGGER.warn(errorString);
397 throw new ApexException(errorString);
406 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getState()
409 public AxEngineState getState() {
410 // If one worker is running then we are running, otherwise we are stopped
411 for (final EngineService engine : engineWorkerMap.values()) {
412 if (engine.getState() != AxEngineState.STOPPED) {
413 return AxEngineState.EXECUTING;
417 return AxEngineState.STOPPED;
423 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startAll()
426 public void startAll() throws ApexException {
427 for (final EngineService engine : engineWorkerMap.values()) {
428 start(engine.getKey());
431 // Check if periodic events should be turned on
432 if (periodicEventPeriod > 0) {
433 startPeriodicEvents(periodicEventPeriod);
440 * @see org.onap.policy.apex.service.engine.runtime.EngineService#start(org.onap.policy.apex.core.model.
441 * concepts.AxArtifactKey)
444 public void start(final AxArtifactKey engineKey) throws ApexException {
445 LOGGER.entry(engineKey);
447 // Check if we have this key on our map
448 if (!engineWorkerMap.containsKey(engineKey)) {
449 LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
450 throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
454 engineWorkerMap.get(engineKey).start(engineKey);
456 LOGGER.exit(engineKey);
462 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop()
465 public void stop() throws ApexException {
469 for (final EngineService engine : engineWorkerMap.values()) {
470 if (engine.getState() != AxEngineState.STOPPED) {
481 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stop(org.onap.policy.apex.core.model.
482 * concepts.AxArtifactKey)
485 public void stop(final AxArtifactKey engineKey) throws ApexException {
486 LOGGER.entry(engineKey);
488 // Check if we have this key on our map
489 if (!engineWorkerMap.containsKey(engineKey)) {
490 LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
491 throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
495 engineWorkerMap.get(engineKey).stop(engineKey);
497 LOGGER.exit(engineKey);
503 * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear()
506 public void clear() throws ApexException {
510 for (final EngineService engine : engineWorkerMap.values()) {
511 if (engine.getState() == AxEngineState.STOPPED) {
522 * @see org.onap.policy.apex.service.engine.runtime.EngineService#clear(org.onap.policy.apex.core.model.
523 * concepts.AxArtifactKey)
526 public void clear(final AxArtifactKey engineKey) throws ApexException {
527 LOGGER.entry(engineKey);
529 // Check if we have this key on our map
530 if (!engineWorkerMap.containsKey(engineKey)) {
531 LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
532 throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
536 if (engineWorkerMap.get(engineKey).getState() == AxEngineState.STOPPED) {
537 engineWorkerMap.get(engineKey).stop(engineKey);
540 LOGGER.exit(engineKey);
544 * Check all engines are started.
546 * @return true if <i>all</i> engines are started
547 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
550 public boolean isStarted() {
551 for (final EngineService engine : engineWorkerMap.values()) {
552 if (!engine.isStarted()) {
562 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted(org.onap.policy.apex.model.
563 * basicmodel.concepts.AxArtifactKey)
566 public boolean isStarted(final AxArtifactKey engineKey) {
567 // Check if we have this key on our map
568 if (!engineWorkerMap.containsKey(engineKey)) {
569 LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
571 return engineWorkerMap.get(engineKey).isStarted();
575 * Check all engines are stopped.
577 * @return true if <i>all</i> engines are stopped
578 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
581 public boolean isStopped() {
582 for (final EngineService engine : engineWorkerMap.values()) {
583 if (!engine.isStopped()) {
593 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped(org.onap.policy.apex.model.
594 * basicmodel.concepts.AxArtifactKey)
597 public boolean isStopped(final AxArtifactKey engineKey) {
598 // Check if we have this key on our map
599 if (!engineWorkerMap.containsKey(engineKey)) {
600 LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
602 return engineWorkerMap.get(engineKey).isStopped();
608 * @see org.onap.policy.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
611 public void startPeriodicEvents(final long period) throws ApexException {
612 // Check if periodic events are already started
613 if (periodicEventGenerator != null) {
614 LOGGER.warn("Peiodic event geneation already running on engine " + engineServiceKey.getId() + ", "
615 + periodicEventGenerator.toString());
616 throw new ApexException("Peiodic event geneation already running on engine " + engineServiceKey.getId()
617 + ", " + periodicEventGenerator.toString());
620 // Set up periodic event execution, its a Java Timer/TimerTask
621 periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
623 // Record the periodic event period because it may have been set over the Web Socket admin
625 this.periodicEventPeriod = period;
631 * @see org.onap.policy.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
634 public void stopPeriodicEvents() throws ApexException {
635 // Check if periodic events are already started
636 if (periodicEventGenerator == null) {
637 LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getId());
638 throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getId());
641 // Stop periodic events
642 periodicEventGenerator.cancel();
643 periodicEventGenerator = null;
649 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getStatus(org.onap.policy.apex.core.model
650 * .concepts.AxArtifactKey)
653 public String getStatus(final AxArtifactKey engineKey) throws ApexException {
654 // Check if we have this key on our map
655 if (!engineWorkerMap.containsKey(engineKey)) {
656 LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
657 throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
660 // Return the information for this worker
661 return engineWorkerMap.get(engineKey).getStatus(engineKey);
667 * @see org.onap.policy.apex.service.engine.runtime.EngineService#getRuntimeInfo(org.onap.policy.apex.core.
668 * model.concepts.AxArtifactKey)
671 public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
672 // Check if we have this key on our map
673 if (!engineWorkerMap.containsKey(engineKey)) {
674 LOGGER.warn("engine with key " + engineKey.getId() + " not found in engine service");
675 throw new ApexException("engine with key " + engineKey.getId() + " not found in engine service");
678 // Return the information for this worker
679 return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
685 * @see org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface#sendEvent(org.onap.policy.
686 * apex.service.engine.event.ApexEvent)
689 public void sendEvent(final ApexEvent event) {
690 // Check if we have this key on our map
691 if (getState() == AxEngineState.STOPPED) {
692 LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
693 + engineServiceKey.getId() + " are running");
698 LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getId());
703 LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
706 // Add the incoming event to the queue, the next available worker will process it