2 * ============LICENSE_START=======================================================
3 * Copyright (C) 2016-2018 Ericsson. All rights reserved.
4 * Modifications Copyright (C) 2019 Nordix Foundation.
5 * Modifications Copyright (C) 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
19 * SPDX-License-Identifier: Apache-2.0
20 * ============LICENSE_END=========================================================
23 package org.onap.policy.apex.context.impl.distribution;
25 import java.util.Collections;
26 import java.util.HashMap;
28 import java.util.Map.Entry;
29 import lombok.AccessLevel;
32 import org.onap.policy.apex.context.ContextAlbum;
33 import org.onap.policy.apex.context.ContextException;
34 import org.onap.policy.apex.context.Distributor;
35 import org.onap.policy.apex.context.LockManager;
36 import org.onap.policy.apex.context.Persistor;
37 import org.onap.policy.apex.context.impl.ContextAlbumImpl;
38 import org.onap.policy.apex.context.impl.locking.LockManagerFactory;
39 import org.onap.policy.apex.context.impl.persistence.PersistorFactory;
40 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
41 import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation;
42 import org.onap.policy.apex.model.basicmodel.concepts.AxValidationResult;
43 import org.onap.policy.apex.model.basicmodel.service.ModelService;
44 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
45 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
46 import org.onap.policy.apex.model.contextmodel.concepts.AxContextModel;
47 import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema;
48 import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas;
49 import org.slf4j.ext.XLogger;
50 import org.slf4j.ext.XLoggerFactory;
53 * This context distributor implements the mechanism-neutral parts of a context distributor.
55 * @author Liam Fallon (liam.fallon@ericsson.com)
57 public abstract class AbstractDistributor implements Distributor {
59 // Logger for this class
60 private static final XLogger LOGGER = XLoggerFactory.getXLogger(AbstractDistributor.class);
62 // The context albums for this context set indexed by their keys
63 private static Map<AxArtifactKey, ContextAlbum> albumMaps = Collections
64 .synchronizedMap(new HashMap<AxArtifactKey, ContextAlbum>());
66 // Lock manager for this distributor
67 @Setter(AccessLevel.PRIVATE)
68 private static LockManager lockManager = null;
70 // Hold a flush timer for this context distributor
71 @Setter(AccessLevel.PRIVATE)
72 private static DistributorFlushTimerTask flushTimer = null;
74 // The key of this distributor
76 private AxArtifactKey key = null;
78 // Hold a persistor for this distributor
79 private Persistor persistor = null;
82 * Create an instance of an abstract Context Distributor.
84 protected AbstractDistributor() {
85 LOGGER.entry("AbstractContextDistributor()");
86 LOGGER.exit("AbstractContextDistributor()");
93 public void init(final AxArtifactKey distributorKey) throws ContextException {
94 LOGGER.entry("init(" + distributorKey + ")");
96 // Record parameters and key
97 this.key = distributorKey;
99 // Create the lock manager if it doesn't already exist
100 if (lockManager == null) {
101 setLockManager(new LockManagerFactory().createLockManager(key));
104 // Set up flushing on the context distributor if its not set up already
105 if (flushTimer == null) {
106 setFlushTimer(new DistributorFlushTimerTask(this));
109 // Create a new persistor for this key
110 persistor = new PersistorFactory().createPersistor(key);
111 LOGGER.exit("init(" + key + ")");
118 public abstract void shutdown();
121 * Create a context album using whatever underlying mechanism we are using for albums.
123 * @param contextAlbumKey The key of the album
124 * @return The album as a string-object map
126 public abstract Map<String, Object> getContextAlbumMap(AxArtifactKey contextAlbumKey);
132 public void registerModel(final AxContextModel contextModel) throws ContextException {
133 ModelService.registerModel(AxKeyInformation.class, contextModel.getKeyInformation());
134 ModelService.registerModel(AxContextSchemas.class, contextModel.getSchemas());
135 ModelService.registerModel(AxContextAlbums.class, contextModel.getAlbums());
142 public synchronized ContextAlbum createContextAlbum(final AxArtifactKey axContextAlbumKey) throws ContextException {
143 // Get the context album definition
144 final AxContextAlbum album = ModelService.getModel(AxContextAlbums.class).get(axContextAlbumKey);
146 final String resultString = "context album " + axContextAlbumKey.getId() + " does not exist";
147 LOGGER.warn(resultString);
148 throw new ContextException(resultString);
151 // Check if the context album is valid
152 final AxValidationResult result = album.validate(new AxValidationResult());
153 if (!result.isValid()) {
154 final String resultString = "context album definition for " + album.getKey().getId() + " is invalid"
156 LOGGER.warn(resultString);
157 throw new ContextException(resultString);
160 // Get the schema of the context album
161 final AxContextSchema schema = ModelService.getModel(AxContextSchemas.class).get(album.getItemSchema());
162 if (schema == null) {
163 final String resultString = "schema \"" + album.getItemSchema().getId() + "\" for context album "
164 + album.getKey().getId() + " does not exist";
165 LOGGER.warn(resultString);
166 throw new ContextException(resultString);
169 synchronized (albumMaps) {
170 // Check if the map has already been instantiated
171 if (!albumMaps.containsKey(album.getKey())) {
172 // Instantiate the album map for this context album that we'll distribute using the distribution
174 final Map<String, Object> newContextAlbumMap = getContextAlbumMap(album.getKey());
176 // The distributed context album will have content from another process instance if the album exists in
177 // another process, if not, we have to try to read the content from persistence
178 if (newContextAlbumMap.isEmpty()) {
179 // Read entries from persistence, (Not implemented yet)
182 // Create the context album and put the context album object onto the distributor
183 albumMaps.put(album.getKey(), new ContextAlbumImpl(album, this, newContextAlbumMap));
186 return albumMaps.get(album.getKey());
194 public void removeContextAlbum(final AxArtifactKey axContextAlbumKey) throws ContextException {
195 synchronized (albumMaps) {
196 // Remove the map from the distributor
197 if (null == albumMaps.remove(axContextAlbumKey)) {
198 throw new ContextException("map update failed, supplied map is null");
207 public void flush() throws ContextException {
208 synchronized (albumMaps) {
209 // Flush all the maps
210 for (final Entry<AxArtifactKey, ContextAlbum> distributorMapEntry : albumMaps.entrySet()) {
211 // Let the persistor write each of the entries
212 for (final Object contextItem : distributorMapEntry.getValue().values()) {
213 persistor.writeContextItem(contextItem);
223 public void flushContextAlbum(final ContextAlbum contextAlbum) throws ContextException {
224 synchronized (albumMaps) {
225 // Check if the map already exists, if not return
226 if (!albumMaps.containsKey(contextAlbum.getKey())) {
227 LOGGER.warn("map flush failed, supplied map is null");
228 throw new ContextException("map flush failed, supplied map is null");
231 // Let the persistor flush the items on the map
232 for (final Object contextItem : albumMaps.get(contextAlbum.getKey()).values()) {
233 persistor.writeContextItem(contextItem);
242 public synchronized void lockForReading(final AxArtifactKey mapKey, final String itemKey) throws ContextException {
243 // Lock using the lock manager
244 lockManager.lockForReading(mapKey.getId(), itemKey);
251 public synchronized void lockForWriting(final AxArtifactKey mapKey, final String itemKey) throws ContextException {
252 // Lock using the lock manager
253 lockManager.lockForWriting(mapKey.getId(), itemKey);
260 public void unlockForReading(final AxArtifactKey mapKey, final String itemKey) throws ContextException {
261 // Unlock using the lock manager
262 lockManager.unlockForReading(mapKey.getId(), itemKey);
269 public void unlockForWriting(final AxArtifactKey mapKey, final String itemKey) throws ContextException {
270 // Unlock using the lock manager
271 lockManager.unlockForWriting(mapKey.getId(), itemKey);
278 public void clear() {
279 // Shut down the lock manager
280 if (lockManager != null) {
281 lockManager.shutdown();
282 setLockManager(null);
285 synchronized (albumMaps) {
289 // Turn off the flush timer
292 // Shut down the specialization of the context distributor