Fix bugs in context flushing
[policy/apex-pdp.git] / context / context-management / src / main / java / org / onap / policy / apex / context / impl / distribution / AbstractDistributor.java
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2016-2018 Ericsson. All rights reserved.
4  * ================================================================================
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * SPDX-License-Identifier: Apache-2.0
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.policy.apex.context.impl.distribution;
22
23 import java.util.Collections;
24 import java.util.HashMap;
25 import java.util.Map;
26 import java.util.Map.Entry;
27
28 import org.onap.policy.apex.context.ContextAlbum;
29 import org.onap.policy.apex.context.ContextException;
30 import org.onap.policy.apex.context.Distributor;
31 import org.onap.policy.apex.context.LockManager;
32 import org.onap.policy.apex.context.Persistor;
33 import org.onap.policy.apex.context.impl.ContextAlbumImpl;
34 import org.onap.policy.apex.context.impl.locking.LockManagerFactory;
35 import org.onap.policy.apex.context.impl.persistence.PersistorFactory;
36 import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey;
37 import org.onap.policy.apex.model.basicmodel.concepts.AxKeyInformation;
38 import org.onap.policy.apex.model.basicmodel.concepts.AxValidationResult;
39 import org.onap.policy.apex.model.basicmodel.service.ModelService;
40 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbum;
41 import org.onap.policy.apex.model.contextmodel.concepts.AxContextAlbums;
42 import org.onap.policy.apex.model.contextmodel.concepts.AxContextModel;
43 import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchema;
44 import org.onap.policy.apex.model.contextmodel.concepts.AxContextSchemas;
45 import org.slf4j.ext.XLogger;
46 import org.slf4j.ext.XLoggerFactory;
47
48 /**
49  * This context distributor implements the mechanism-neutral parts of a context distributor.
50  *
51  * @author Liam Fallon (liam.fallon@ericsson.com)
52  */
53 public abstract class AbstractDistributor implements Distributor {
54
55     // Logger for this class
56     private static final XLogger LOGGER = XLoggerFactory.getXLogger(AbstractDistributor.class);
57
58     // The key of this distributor
59     private AxArtifactKey key = null;
60
61     // The context albums for this context set indexed by their keys
62     private static Map<AxArtifactKey, ContextAlbum> albumMaps = Collections
63                     .synchronizedMap(new HashMap<AxArtifactKey, ContextAlbum>());
64
65     // Lock manager for this distributor
66     private static LockManager lockManager = null;
67
68     // Hold a persistor for this distributor
69     private Persistor persistor = null;
70
71     // Hold a flush timer for this context distributor
72     private static DistributorFlushTimerTask flushTimer = null;
73
74     /**
75      * Create an instance of an abstract Context Distributor.
76      */
77     public AbstractDistributor() {
78         LOGGER.entry("AbstractContextDistributor()");
79         LOGGER.exit("AbstractContextDistributor()");
80     }
81
82     /*
83      * (non-Javadoc)
84      *
85      * @see org.onap.policy.apex.context.ContextDistributor#init(org.onap.policy.apex.model.basicmodel.concepts.
86      * AxArtifactKey)
87      */
88     @Override
89     public void init(final AxArtifactKey distributorKey) throws ContextException {
90         LOGGER.entry("init(" + distributorKey + ")");
91
92         // Record parameters and key
93         this.key = distributorKey;
94
95         // Create the lock manager if it doesn't already exist
96         if (lockManager == null) {
97             setLockManager(new LockManagerFactory().createLockManager(key));
98         }
99
100         // Set up flushing on the context distributor if its not set up already
101         if (flushTimer == null) {
102             setFlushTimer(new DistributorFlushTimerTask(this));
103         }
104
105         // Create a new persistor for this key
106         persistor = new PersistorFactory().createPersistor(key);
107         LOGGER.exit("init(" + key + ")");
108     }
109
110     /**
111      * Set the static lock manager.
112      *
113      * @param incomingLockManager the lock manager value
114      */
115     private static void setLockManager(final LockManager incomingLockManager) {
116         lockManager = incomingLockManager;
117     }
118
119     /**
120      * Set the static flush timer.
121      *
122      * @param incomingFlushTimer the flush timer value
123      */
124     private static void setFlushTimer(final DistributorFlushTimerTask incomingFlushTimer) {
125         flushTimer = incomingFlushTimer;
126     }
127
128     /*
129      * (non-Javadoc)
130      *
131      * @see org.onap.policy.apex.context.ContextDistributor#shutdown()
132      */
133     @Override
134     public abstract void shutdown();
135
136     /*
137      * (non-Javadoc)
138      *
139      * @see org.onap.policy.apex.context.ContextDistributor#getKey()
140      */
141     @Override
142     public AxArtifactKey getKey() {
143         return key;
144     }
145
146     /**
147      * Create a context album using whatever underlying mechanism we are using for albums.
148      *
149      * @param contextAlbumKey The key of the album
150      * @return The album as a string-object map
151      */
152     public abstract Map<String, Object> getContextAlbumMap(AxArtifactKey contextAlbumKey);
153
154     /*
155      * (non-Javadoc)
156      *
157      * @see org.onap.policy.apex.context.Distributor#registerModel(org.onap.policy.apex.model.contextmodel.concepts.
158      * AxContextModel)
159      */
160     @Override
161     public void registerModel(final AxContextModel contextModel) throws ContextException {
162         ModelService.registerModel(AxKeyInformation.class, contextModel.getKeyInformation());
163         ModelService.registerModel(AxContextSchemas.class, contextModel.getSchemas());
164         ModelService.registerModel(AxContextAlbums.class, contextModel.getAlbums());
165     }
166
167     /*
168      * (non-Javadoc)
169      *
170      * @see
171      * org.onap.policy.apex.core.context.ContextDistributor#createContextAlbum(org.onap.policy.apex.core.basicmodel.
172      * concepts. AxArtifactKey)
173      */
174     @Override
175     public synchronized ContextAlbum createContextAlbum(final AxArtifactKey axContextAlbumKey) throws ContextException {
176         // Get the context album definition
177         final AxContextAlbum album = ModelService.getModel(AxContextAlbums.class).get(axContextAlbumKey);
178         if (album == null) {
179             final String resultString = "context album " + axContextAlbumKey.getId() + " does not exist";
180             LOGGER.warn(resultString);
181             throw new ContextException(resultString);
182         }
183
184         // Check if the context album is valid
185         final AxValidationResult result = album.validate(new AxValidationResult());
186         if (!result.isValid()) {
187             final String resultString = "context album definition for " + album.getKey().getId() + " is invalid"
188                             + result;
189             LOGGER.warn(resultString);
190             throw new ContextException(resultString);
191         }
192
193         // Get the schema of the context album
194         final AxContextSchema schema = ModelService.getModel(AxContextSchemas.class).get(album.getItemSchema());
195         if (schema == null) {
196             final String resultString = "schema \"" + album.getItemSchema().getId() + "\" for context album "
197                             + album.getKey().getId() + " does not exist";
198             LOGGER.warn(resultString);
199             throw new ContextException(resultString);
200         }
201
202         synchronized (albumMaps) {
203             // Check if the map has already been instantiated
204             if (!albumMaps.containsKey(album.getKey())) {
205                 // Instantiate the album map for this context album that we'll distribute using the distribution
206                 // mechanism
207                 final Map<String, Object> newContextAlbumMap = getContextAlbumMap(album.getKey());
208
209                 // The distributed context album will have content from another process instance if the album exists in
210                 // another process, if not, we have to try to read the content from persistence
211                 if (newContextAlbumMap.isEmpty()) {
212                     // Read entries from persistence, (Not implemented yet)
213                 }
214
215                 // Create the context album and put the context album object onto the distributor
216                 albumMaps.put(album.getKey(), new ContextAlbumImpl(album, this, newContextAlbumMap));
217             }
218
219             return albumMaps.get(album.getKey());
220         }
221     }
222
223     /*
224      * (non-Javadoc)
225      *
226      * @see
227      * org.onap.policy.apex.core.context.ContextDistributor#removeContextAlbum(org.onap.policy.apex.core.basicmodel.
228      * concepts. AxArtifactKey)
229      */
230     @Override
231     public void removeContextAlbum(final AxContextAlbum contextAlbum) throws ContextException {
232         synchronized (albumMaps) {
233             // Check if the map already exists, if not return
234             if (!albumMaps.containsKey(contextAlbum.getKey())) {
235                 LOGGER.warn("map remove failed, supplied map is null");
236                 throw new ContextException("map update failed, supplied map is null");
237             }
238
239             // Remove the map from the distributor
240             albumMaps.remove(contextAlbum.getKey());
241         }
242     }
243
244     /*
245      * (non-Javadoc)
246      *
247      * @see org.onap.policy.apex.core.context.ContextDistributor#flush()
248      */
249     @Override
250     public void flush() throws ContextException {
251         synchronized (albumMaps) {
252             // Flush all the maps
253             for (final Entry<AxArtifactKey, ContextAlbum> distributorMapEntry : albumMaps.entrySet()) {
254                 // Let the persistor write each of the entries
255                 for (final Object contextItem : distributorMapEntry.getValue().values()) {
256                     persistor.writeContextItem(contextItem);
257                 }
258             }
259         }
260     }
261
262     /*
263      * (non-Javadoc)
264      *
265      * @see org.onap.policy.apex.core.context.ContextDistributor#flushContextAlbum(org.onap.policy.apex.core.context.
266      * ContextAlbum)
267      */
268     @Override
269     public void flushContextAlbum(final ContextAlbum contextAlbum) throws ContextException {
270         synchronized (albumMaps) {
271             // Check if the map already exists, if not return
272             if (!albumMaps.containsKey(contextAlbum.getKey())) {
273                 LOGGER.warn("map flush failed, supplied map is null");
274                 throw new ContextException("map flush failed, supplied map is null");
275             }
276
277             // Let the persistor flush the items on the map
278             for (final Object contextItem : albumMaps.get(contextAlbum.getKey()).values()) {
279                 persistor.writeContextItem(contextItem);
280             }
281         }
282     }
283
284     /*
285      * (non-Javadoc)
286      *
287      * @see org.onap.policy.apex.core.context.ContextDistributor#lockForReading(java.lang.String)
288      */
289     @Override
290     public synchronized void lockForReading(final AxArtifactKey mapKey, final String itemKey) throws ContextException {
291         // Lock using the lock manager
292         lockManager.lockForReading(mapKey.getId(), itemKey);
293     }
294
295     /*
296      * (non-Javadoc)
297      *
298      * @see org.onap.policy.apex.core.context.ContextDistributor#lockForWriting(java.lang.String)
299      */
300     @Override
301     public synchronized void lockForWriting(final AxArtifactKey mapKey, final String itemKey) throws ContextException {
302         // Lock using the lock manager
303         lockManager.lockForWriting(mapKey.getId(), itemKey);
304     }
305
306     /*
307      * (non-Javadoc)
308      *
309      * @see org.onap.policy.apex.core.context.ContextDistributor#unlockForReading(java.lang.String)
310      */
311     @Override
312     public void unlockForReading(final AxArtifactKey mapKey, final String itemKey) throws ContextException {
313         // Unlock using the lock manager
314         lockManager.unlockForReading(mapKey.getId(), itemKey);
315     }
316
317     /*
318      * (non-Javadoc)
319      *
320      * @see org.onap.policy.apex.core.context.ContextDistributor#unlockForWriting(java.lang.String)
321      */
322     @Override
323     public void unlockForWriting(final AxArtifactKey mapKey, final String itemKey) throws ContextException {
324         // Unlock using the lock manager
325         lockManager.unlockForWriting(mapKey.getId(), itemKey);
326     }
327
328     /*
329      * (non-Javadoc)
330      *
331      * @see org.onap.policy.apex.core.context.ContextDistributor#clear()
332      */
333     @Override
334     public void clear() {
335         // Shut down the lock manager
336         if (lockManager != null) {
337             lockManager.shutdown();
338             setLockManager(null);
339         }
340
341         synchronized (albumMaps) {
342             albumMaps.clear();
343         }
344
345         // Turn off the flush timer
346         flushTimer.cancel();
347
348         // Shut down the specialization of the context distributor
349         shutdown();
350     }
351 }