2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * ================================================================================
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
17 * SPDX-License-Identifier: Apache-2.0
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.apex.service.engine.runtime.impl;
23 import java.io.ByteArrayInputStream;
24 import java.util.Collection;
25 import java.util.Collections;
26 import java.util.LinkedHashMap;
28 import java.util.Map.Entry;
29 import java.util.concurrent.BlockingQueue;
30 import java.util.concurrent.LinkedBlockingQueue;
32 import org.onap.policy.apex.context.ContextException;
33 import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory;
34 import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities;
35 import org.onap.policy.apex.model.basicmodel.concepts.ApexException;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.handling.ApexModelException;
38 import org.onap.policy.apex.model.basicmodel.handling.ApexModelReader;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.enginemodel.concepts.AxEngineState;
41 import org.onap.policy.apex.model.policymodel.concepts.AxPolicyModel;
42 import org.onap.policy.apex.service.engine.event.ApexEvent;
43 import org.onap.policy.apex.service.engine.event.ApexPeriodicEventGenerator;
44 import org.onap.policy.apex.service.engine.runtime.ApexEventListener;
45 import org.onap.policy.apex.service.engine.runtime.EngineService;
46 import org.onap.policy.apex.service.engine.runtime.EngineServiceEventInterface;
47 import org.onap.policy.apex.service.parameters.engineservice.EngineServiceParameters;
48 import org.slf4j.ext.XLogger;
49 import org.slf4j.ext.XLoggerFactory;
52 * The Class EngineServiceImpl controls a thread pool that runs a set of Apex engine workers, each
53 * of which is running on an identical Apex model. This class handles the management of the engine
54 * worker instances, their threads, and event forwarding to and from the engine workers.
56 * @author Sajeevan Achuthan (sajeevan.achuthan@ericsson.com)
57 * @author Liam Fallon (liam.fallon@ericsson.com)
58 * @author John Keeney (john.keeney@ericsson.com)
60 public final class EngineServiceImpl implements EngineService, EngineServiceEventInterface {
61 // Logging static variables
62 private static final XLogger LOGGER = XLoggerFactory.getXLogger(EngineServiceImpl.class);
63 private static final boolean DEBUG_ENABLED = LOGGER.isDebugEnabled();
65 // Constants for timing
66 private static final long MAX_START_WAIT_TIME = 5000; // 5 seconds
67 private static final long MAX_STOP_WAIT_TIME = 5000; // 5 seconds
68 private static final int ENGINE_SERVICE_STOP_START_WAIT_INTERVAL = 200;
70 // The ID of this engine
71 private AxArtifactKey engineServiceKey = null;
73 // The Apex engine workers this engine service is handling
74 private final Map<AxArtifactKey, EngineService> engineWorkerMap =
75 Collections.synchronizedMap(new LinkedHashMap<AxArtifactKey, EngineService>());
77 // Event queue for events being sent into the Apex engines, it used by all engines within a
79 private final BlockingQueue<ApexEvent> queue = new LinkedBlockingQueue<>();
81 // Thread factory for thread management
82 private final ApplicationThreadFactory atFactory = new ApplicationThreadFactory("apex-engine-service", 512);
84 // Periodic event generator and its period in milliseconds
85 private ApexPeriodicEventGenerator periodicEventGenerator = null;
86 private long periodicEventPeriod;
89 * This constructor instantiates engine workers and adds them to the set of engine workers to be
90 * managed. The constructor is private to prevent subclassing.
92 * @param engineServiceKey the engine service key
93 * @param incomingThreadCount the thread count, the number of engine workers to start
94 * @param periodicEventPeriod the period in milliseconds at which periodic events are generated
95 * @throws ApexException on worker instantiation errors
97 private EngineServiceImpl(final AxArtifactKey engineServiceKey, final int incomingThreadCount,
98 final long periodicEventPeriod) throws ApexException {
99 LOGGER.entry(engineServiceKey, incomingThreadCount);
101 this.engineServiceKey = engineServiceKey;
102 this.periodicEventPeriod = periodicEventPeriod;
104 int threadCount = incomingThreadCount;
105 if (threadCount <= 0) {
106 // Just start one engine worker
110 // Start engine workers
111 for (int engineCounter = 0; engineCounter < threadCount; engineCounter++) {
112 final AxArtifactKey engineWorkerKey =
113 new AxArtifactKey(engineServiceKey.getName() + '-' + engineCounter, engineServiceKey.getVersion());
114 engineWorkerMap.put(engineWorkerKey, new EngineWorker(engineWorkerKey, queue, atFactory));
115 LOGGER.info("Created apex engine {} .", engineWorkerKey.getID());
118 LOGGER.info("APEX service created.");
123 * Create an Apex Engine Service instance. This method is deprecated and will be removed in the
126 * @param engineServiceKey the engine service key
127 * @param threadCount the thread count, the number of engine workers to start
128 * @return the Engine Service instance
129 * @throws ApexException on worker instantiation errors
130 * @deprecated Do not use this version. Use {@link #create(EngineServiceParameters)}
133 public static EngineServiceImpl create(final AxArtifactKey engineServiceKey, final int threadCount)
134 throws ApexException {
135 // Check if the Apex model specified is sane
136 if (engineServiceKey == null) {
137 LOGGER.warn("engine service key is null");
138 throw new ApexException("engine service key is null");
140 return new EngineServiceImpl(engineServiceKey, threadCount, 0);
144 * Create an Apex Engine Service instance. This method does not load the policy so
145 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} or
146 * {@link #updateModel(AxArtifactKey, AxPolicyModel, boolean)} must be used to load a model.
147 * This method does not start the Engine Service so {@link #start(AxArtifactKey)} or
148 * {@link #startAll()} must be used.
150 * @param config the configuration for this Apex Engine Service.
151 * @return the Engine Service instance
152 * @throws ApexException on worker instantiation errors
154 public static EngineServiceImpl create(final EngineServiceParameters config) throws ApexException {
155 if (config == null) {
156 LOGGER.warn("Engine service configuration parameters is null");
157 throw new ApexException("engine service configuration parameters is null");
159 final String validation = config.validate();
160 if (validation != null && validation.length() > 0) {
161 LOGGER.warn("Invalid engine service configuration parameters: " + validation);
162 throw new ApexException("Invalid engine service configuration parameters: " + validation);
164 final AxArtifactKey engineServiceKey = config.getEngineKey();
165 final int threadCount = config.getInstanceCount();
167 // Check if the Apex model specified is sane
168 if (engineServiceKey == null) {
169 LOGGER.warn("engine service key is null");
170 throw new ApexException("engine service key is null");
173 return new EngineServiceImpl(engineServiceKey, threadCount, config.getPeriodicEventPeriod());
179 * @see com.ericsson.apex.service.engine.runtime.EngineService#registerActionListener(java.lang.
180 * String, com.ericsson.apex.service.engine.runtime.ApexEventListener)
183 public void registerActionListener(final String listenerName, final ApexEventListener apexEventListener) {
184 LOGGER.entry(apexEventListener);
186 // Register the Apex event listener on all engine workers, each worker will return Apex
187 // events to the listening application
188 for (final EngineService engineWorker : engineWorkerMap.values()) {
189 engineWorker.registerActionListener(listenerName, apexEventListener);
192 LOGGER.info("Added the action listener to the engine");
200 * com.ericsson.apex.service.engine.runtime.EngineService#deregisterActionListener(java.lang.
204 public void deregisterActionListener(final String listenerName) {
205 LOGGER.entry(listenerName);
207 // Register the Apex event listener on all engine workers, each worker will return Apex
208 // events to the listening application
209 for (final EngineService engineWorker : engineWorkerMap.values()) {
210 engineWorker.deregisterActionListener(listenerName);
213 LOGGER.info("Removed the action listener from the engine");
220 * @see com.ericsson.apex.service.engine.runtime.EngineService#getEngineServiceEventInterface()
223 public EngineServiceEventInterface getEngineServiceEventInterface() {
230 * @see com.ericsson.apex.service.engine.runtime.EngineService#getKey()
233 public AxArtifactKey getKey() {
234 return engineServiceKey;
240 * @see com.ericsson.apex.service.engine.runtime.EngineService#getInfo()
243 public Collection<AxArtifactKey> getEngineKeys() {
244 return engineWorkerMap.keySet();
250 * @see com.ericsson.apex.service.engine.runtime.EngineService#getApexModelKey()
253 public AxArtifactKey getApexModelKey() {
254 if (engineWorkerMap.size() == 0) {
258 return engineWorkerMap.entrySet().iterator().next().getValue().getApexModelKey();
265 * com.ericsson.apex.service.engine.runtime.EngineService#updateModel(com.ericsson.apex.model.
266 * basicmodel.concepts.AxArtifactKey, java.lang.String, boolean)
269 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final String apexModelString,
270 final boolean forceFlag) throws ApexException {
271 // Check if the Apex model specified is sane
272 if (apexModelString == null || apexModelString.trim().length() == 0) {
274 "model for updating on engine service with key " + incomingEngineServiceKey.getID() + " is empty");
275 throw new ApexException(
276 "model for updating on engine service with key " + incomingEngineServiceKey.getID() + " is empty");
279 // Read the Apex model into memory using the Apex Model Reader
280 AxPolicyModel apexPolicyModel = null;
282 final ApexModelReader<AxPolicyModel> modelReader = new ApexModelReader<>(AxPolicyModel.class);
283 apexPolicyModel = modelReader.read(new ByteArrayInputStream(apexModelString.getBytes()));
284 } catch (final ApexModelException e) {
285 LOGGER.error("failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getID(), e);
286 throw new ApexException(
287 "failed to unmarshal the apex model on engine service " + incomingEngineServiceKey.getID(), e);
290 if (apexPolicyModel == null) {
291 LOGGER.error("apex model null on engine service " + incomingEngineServiceKey.getID());
292 throw new ApexException("apex model null on engine service " + incomingEngineServiceKey.getID());
296 updateModel(incomingEngineServiceKey, apexPolicyModel, forceFlag);
305 * com.ericsson.apex.service.engine.runtime.EngineService#updateModel(com.ericsson.apex.model.
306 * basicmodel.concepts.AxArtifactKey,
307 * com.ericsson.apex.model.policymodel.concepts.AxPolicyModel, boolean)
310 public void updateModel(final AxArtifactKey incomingEngineServiceKey, final AxPolicyModel apexModel,
311 final boolean forceFlag) throws ApexException {
312 LOGGER.entry(incomingEngineServiceKey);
314 // Check if the Apex model specified is sane
315 if (apexModel == null) {
317 "model for updating on engine service with key " + incomingEngineServiceKey.getID() + " is null");
318 throw new ApexException(
319 "model for updating on engine service with key " + incomingEngineServiceKey.getID() + " is null");
322 // Check if the key on the update request is correct
323 if (!this.engineServiceKey.equals(incomingEngineServiceKey)) {
324 LOGGER.warn("engine service key " + incomingEngineServiceKey.getID() + " does not match the key"
325 + engineServiceKey.getID() + " of this engine service");
326 throw new ApexException("engine service key " + incomingEngineServiceKey.getID() + " does not match the key"
327 + engineServiceKey.getID() + " of this engine service");
330 // Check model compatibility
331 if (ModelService.existsModel(AxPolicyModel.class)) {
332 // The current policy model may or may not be defined
333 final AxPolicyModel currentModel = ModelService.getModel(AxPolicyModel.class);
334 if (!currentModel.getKey().isCompatible(apexModel.getKey())) {
336 LOGGER.warn("apex model update forced, supplied model with key \"" + apexModel.getKey().getID()
337 + "\" is not a compatible model update from the existing engine model with key \""
338 + currentModel.getKey().getID() + "\"");
340 throw new ContextException(
341 "apex model update failed, supplied model with key \"" + apexModel.getKey().getID()
342 + "\" is not a compatible model update from the existing engine model with key \""
343 + currentModel.getKey().getID() + "\"");
348 final boolean wasstopped = isStopped();
351 // Stop all engines on this engine service
353 final long stoptime = System.currentTimeMillis();
354 while (!isStopped() && System.currentTimeMillis() - stoptime < MAX_STOP_WAIT_TIME) {
355 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
357 // Check if all engines are stopped
358 final StringBuilder notStoppedEngineIDBuilder = new StringBuilder();
359 for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
360 if (engineWorkerEntry.getValue().getState() != AxEngineState.STOPPED) {
361 notStoppedEngineIDBuilder.append(engineWorkerEntry.getKey().getID());
362 notStoppedEngineIDBuilder.append('(');
363 notStoppedEngineIDBuilder.append(engineWorkerEntry.getValue().getState());
364 notStoppedEngineIDBuilder.append(") ");
367 if (notStoppedEngineIDBuilder.length() > 0) {
368 final String errorString = "cannot update model on engine service with key "
369 + incomingEngineServiceKey.getID() + ", engines not stopped after " + MAX_STOP_WAIT_TIME
370 + "ms are: " + notStoppedEngineIDBuilder.toString().trim();
371 LOGGER.warn(errorString);
372 throw new ApexException(errorString);
376 // Update the engines
377 for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
378 LOGGER.info("Registering apex model on engine {}", engineWorkerEntry.getKey().getID());
379 engineWorkerEntry.getValue().updateModel(engineWorkerEntry.getKey(), apexModel, forceFlag);
383 // start all engines on this engine service if it was not stopped before the update
385 final long starttime = System.currentTimeMillis();
386 while (!isStarted() && System.currentTimeMillis() - starttime < MAX_START_WAIT_TIME) {
387 ThreadUtilities.sleep(ENGINE_SERVICE_STOP_START_WAIT_INTERVAL);
389 // Check if all engines are running
390 final StringBuilder notRunningEngineIDBuilder = new StringBuilder();
391 for (final Entry<AxArtifactKey, EngineService> engineWorkerEntry : engineWorkerMap.entrySet()) {
392 if (engineWorkerEntry.getValue().getState() != AxEngineState.READY
393 && engineWorkerEntry.getValue().getState() != AxEngineState.EXECUTING) {
394 notRunningEngineIDBuilder.append(engineWorkerEntry.getKey().getID());
395 notRunningEngineIDBuilder.append('(');
396 notRunningEngineIDBuilder.append(engineWorkerEntry.getValue().getState());
397 notRunningEngineIDBuilder.append(") ");
400 if (notRunningEngineIDBuilder.length() > 0) {
401 final String errorString = "engine start error on model update on engine service with key "
402 + incomingEngineServiceKey.getID() + ", engines not running are: "
403 + notRunningEngineIDBuilder.toString().trim();
404 LOGGER.warn(errorString);
405 throw new ApexException(errorString);
415 * @see com.ericsson.apex.service.engine.runtime.EngineService#getState()
418 public AxEngineState getState() {
419 // If one worker is running then we are running, otherwise we are stopped
420 for (final EngineService engine : engineWorkerMap.values()) {
421 if (engine.getState() != AxEngineState.STOPPED) {
422 return AxEngineState.EXECUTING;
426 return AxEngineState.STOPPED;
432 * @see com.ericsson.apex.service.engine.runtime.EngineService#startAll()
435 public void startAll() throws ApexException {
436 for (final EngineService engine : engineWorkerMap.values()) {
437 start(engine.getKey());
440 // Check if periodic events should be turned on
441 if (periodicEventPeriod > 0) {
442 startPeriodicEvents(periodicEventPeriod);
450 * com.ericsson.apex.service.engine.runtime.EngineService#start(com.ericsson.apex.core.model.
451 * concepts.AxArtifactKey)
454 public void start(final AxArtifactKey engineKey) throws ApexException {
455 LOGGER.entry(engineKey);
457 // Check if we have this key on our map
458 if (!engineWorkerMap.containsKey(engineKey)) {
459 LOGGER.warn("engine with key " + engineKey.getID() + " not found in engine service");
460 throw new ApexException("engine with key " + engineKey.getID() + " not found in engine service");
464 engineWorkerMap.get(engineKey).start(engineKey);
466 LOGGER.exit(engineKey);
472 * @see com.ericsson.apex.service.engine.runtime.EngineService#stop()
475 public void stop() throws ApexException {
479 for (final EngineService engine : engineWorkerMap.values()) {
480 if (engine.getState() != AxEngineState.STOPPED) {
492 * com.ericsson.apex.service.engine.runtime.EngineService#stop(com.ericsson.apex.core.model.
493 * concepts.AxArtifactKey)
496 public void stop(final AxArtifactKey engineKey) throws ApexException {
497 LOGGER.entry(engineKey);
499 // Check if we have this key on our map
500 if (!engineWorkerMap.containsKey(engineKey)) {
501 LOGGER.warn("engine with key " + engineKey.getID() + " not found in engine service");
502 throw new ApexException("engine with key " + engineKey.getID() + " not found in engine service");
506 engineWorkerMap.get(engineKey).stop(engineKey);
508 LOGGER.exit(engineKey);
512 * Check all engines are started.
514 * @return true if <i>all</i> engines are started
515 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStarted()
518 public boolean isStarted() {
519 for (final EngineService engine : engineWorkerMap.values()) {
520 if (!engine.isStarted()) {
531 * com.ericsson.apex.service.engine.runtime.EngineService#isStarted(com.ericsson.apex.model.
532 * basicmodel.concepts.AxArtifactKey)
535 public boolean isStarted(final AxArtifactKey engineKey) {
536 // Check if we have this key on our map
537 if (!engineWorkerMap.containsKey(engineKey)) {
538 LOGGER.warn("engine with key " + engineKey.getID() + " not found in engine service");
540 return engineWorkerMap.get(engineKey).isStarted();
544 * Check all engines are stopped.
546 * @return true if <i>all</i> engines are stopped
547 * @see org.onap.policy.apex.service.engine.runtime.EngineService#isStopped()
550 public boolean isStopped() {
551 for (final EngineService engine : engineWorkerMap.values()) {
552 if (!engine.isStopped()) {
563 * com.ericsson.apex.service.engine.runtime.EngineService#isStopped(com.ericsson.apex.model.
564 * basicmodel.concepts.AxArtifactKey)
567 public boolean isStopped(final AxArtifactKey engineKey) {
568 // Check if we have this key on our map
569 if (!engineWorkerMap.containsKey(engineKey)) {
570 LOGGER.warn("engine with key " + engineKey.getID() + " not found in engine service");
572 return engineWorkerMap.get(engineKey).isStopped();
578 * @see com.ericsson.apex.service.engine.runtime.EngineService#startPeriodicEvents(long)
581 public void startPeriodicEvents(final long period) throws ApexException {
582 // Check if periodic events are already started
583 if (periodicEventGenerator != null) {
584 LOGGER.warn("Peiodic event geneation already running on engine " + engineServiceKey.getID() + ", "
585 + periodicEventGenerator.toString());
586 throw new ApexException("Peiodic event geneation already running on engine " + engineServiceKey.getID()
587 + ", " + periodicEventGenerator.toString());
590 // Set up periodic event execution, its a Java Timer/TimerTask
591 periodicEventGenerator = new ApexPeriodicEventGenerator(this.getEngineServiceEventInterface(), period);
593 // Record the periodic event period because it may have been set over the Web Socket admin
595 this.periodicEventPeriod = period;
601 * @see com.ericsson.apex.service.engine.runtime.EngineService#stopPeriodicEvents()
604 public void stopPeriodicEvents() throws ApexException {
605 // Check if periodic events are already started
606 if (periodicEventGenerator == null) {
607 LOGGER.warn("Peiodic event geneation not running on engine " + engineServiceKey.getID());
608 throw new ApexException("Peiodic event geneation not running on engine " + engineServiceKey.getID());
611 // Stop periodic events
612 periodicEventGenerator.cancel();
613 periodicEventGenerator = null;
620 * com.ericsson.apex.service.engine.runtime.EngineService#getStatus(com.ericsson.apex.core.model
621 * .concepts.AxArtifactKey)
624 public String getStatus(final AxArtifactKey engineKey) throws ApexException {
625 // Check if we have this key on our map
626 if (!engineWorkerMap.containsKey(engineKey)) {
627 LOGGER.warn("engine with key " + engineKey.getID() + " not found in engine service");
628 throw new ApexException("engine with key " + engineKey.getID() + " not found in engine service");
631 // Return the information for this worker
632 return engineWorkerMap.get(engineKey).getStatus(engineKey);
639 * com.ericsson.apex.service.engine.runtime.EngineService#getRuntimeInfo(com.ericsson.apex.core.
640 * model.concepts.AxArtifactKey)
643 public String getRuntimeInfo(final AxArtifactKey engineKey) throws ApexException {
644 // Check if we have this key on our map
645 if (!engineWorkerMap.containsKey(engineKey)) {
646 LOGGER.warn("engine with key " + engineKey.getID() + " not found in engine service");
647 throw new ApexException("engine with key " + engineKey.getID() + " not found in engine service");
650 // Return the information for this worker
651 return engineWorkerMap.get(engineKey).getRuntimeInfo(engineKey);
658 * com.ericsson.apex.service.engine.runtime.EngineServiceEventInterface#sendEvent(com.ericsson.
659 * apex.service.engine.event.ApexEvent)
662 public void sendEvent(final ApexEvent event) {
663 // Check if we have this key on our map
664 if (getState() == AxEngineState.STOPPED) {
665 LOGGER.warn("event " + event.getName() + " not processed, no engines on engine service "
666 + engineServiceKey.getID() + " are running");
671 LOGGER.warn("Null events cannot be processed, in engine service " + engineServiceKey.getID());
676 LOGGER.debug("Forwarding Apex Event {} to the processing engine", event);
679 // Add the incoming event to the queue, the next available worker will process it