re base code
[sdc.git] / catalog-model / src / main / java / org / openecomp / sdc / be / model / cache / workers / SyncWorker.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.model.cache.workers;
22
23 import fj.data.Either;
24 import org.openecomp.sdc.be.dao.api.ActionStatus;
25 import org.openecomp.sdc.be.dao.titan.TitanOperationStatus;
26 import org.openecomp.sdc.be.datatypes.enums.NodeTypeEnum;
27 import org.openecomp.sdc.be.model.Component;
28 import org.openecomp.sdc.be.model.operations.impl.CacheMangerOperation;
29 import org.openecomp.sdc.be.model.operations.impl.UniqueIdBuilder;
30 import org.openecomp.sdc.be.resources.data.ComponentCacheData;
31 import org.openecomp.sdc.be.resources.data.ComponentMetadataData;
32 import org.openecomp.sdc.common.log.wrappers.Logger;
33
34 import java.util.ArrayList;
35 import java.util.HashMap;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.stream.Collectors;
39
40 /**
41  * the class creates a worker that is used to update cache date, in case of
42  * failures and inconsistencies
43  */
44 public class SyncWorker implements Runnable, IWorker {
45
46     private static final Logger log = Logger.getLogger(SyncWorker.class.getName());
47     private final CacheMangerOperation cacheMangerOperation;
48     private final String workerName;
49     private volatile boolean shutdown = false;
50     private Map<String, ComponentCacheData> cacheIdAndTimeMap;
51     private long updateDelayInMilliseconds = 60 * 60 * 1000L;
52
53     /**
54      * creates the sync worker
55      *
56      * @param workerName
57      *            the name of the worker
58      * @param cacheMangerOperation
59      *            responsible for all persistence's operations to graph and the
60      *            cache
61      */
62     public SyncWorker(String workerName, CacheMangerOperation cacheMangerOperation) {
63         this.workerName = workerName;
64         this.cacheMangerOperation = cacheMangerOperation;
65     }
66
67     /**
68      * the method collects all the resources/services/products from graph and
69      * checks if the component representing them in the cache is valid logic: if
70      * the record is present in the graph but not in cache -> create a job that
71      * will update the record oin cache if the timestamp of the record in cache
72      * is older than the timestamp on the graph -> create a job that will update
73      * the record oin cache otherwise no update is required
74      */
75     @Override
76     public void run() {
77         try {
78             collectAllCacheRecords();
79             syncCacheByComponentType(NodeTypeEnum.Resource);
80             syncCacheByComponentType(NodeTypeEnum.Service);
81             syncCacheByComponentType(NodeTypeEnum.Product);
82             clearCacheRecords();
83
84         } catch (Exception e) {
85             log.debug("sync worker:{} encounered an exception", workerName);
86             log.debug("exception", e);
87         } finally {
88             this.cacheMangerOperation.getTitanGenericDao().commit();
89         }
90     }
91
92     /**
93      * the method checks for each component in the cache except the ones that
94      * were update during the sync, if they exist on the graph if not a job to
95      * remove them is created
96      */
97     private void clearCacheRecords() {
98         cacheIdAndTimeMap.forEach((k, v) -> {
99             try {
100                 Either<ComponentMetadataData, TitanOperationStatus> componentFromGraphRes = getComponentMetaData(k,
101                         NodeTypeEnum.getByName(v.getType()));
102                 if (componentFromGraphRes.isRight()) {
103                     TitanOperationStatus error = componentFromGraphRes.right().value();
104                     if (TitanOperationStatus.NOT_FOUND.equals(error)) {
105                         long delay = System.currentTimeMillis() - v.getModificationTime().getTime();
106                         if (delay > updateDelayInMilliseconds) {
107                             this.cacheMangerOperation.deleteComponentInCache(k, v.getModificationTime().getTime(),
108                                     NodeTypeEnum.getByName(v.getType()));
109                         } else {
110                             log.trace(
111                                     "no delete done because an hour did not pass since the delete was done  timeSinceUpdate {} < updateDelayInMilliseconds {} ",
112                                     delay, updateDelayInMilliseconds);
113                         }
114                     } else {
115                         log.debug("failed to get metadata for id:{} from graph error:{}", k, error);
116                     }
117                 } else {
118                     log.trace("id {} is in graph nothing to do", k);
119                 }
120             } catch (Exception e) {
121                 log.debug("during clean cache records an exception was thrown", e);
122             }
123         });
124     }
125
126     /**
127      * the method collects all the records from cache except the component
128      * itself
129      */
130     public void collectAllCacheRecords() {
131         Either<List<ComponentCacheData>, ActionStatus> getAllRes = this.cacheMangerOperation.getComponentCache()
132                 .getAllComponentIdTimeAndType();
133         if (getAllRes.isRight()) {
134             log.debug("error while trying to get all records from cache error:{}", getAllRes.right().value());
135             cacheIdAndTimeMap = new HashMap<>();
136         } else {
137             cacheIdAndTimeMap = getAllRes.left().value().stream().collect(Collectors.toMap(ComponentCacheData::getId, e -> e));
138         }
139     }
140
141     /**
142      * the method checks that the records ot the given type are sync between the
143      * cache and the graph
144      *
145      * @param nodeTypeEnum
146      *            the type of components we want to sync
147      */
148     private void syncCacheByComponentType(NodeTypeEnum nodeTypeEnum) {
149         if (!this.shutdown) {
150             log.trace("syncCache records of type:{} .", nodeTypeEnum);
151             Either<List<ComponentMetadataData>, TitanOperationStatus> getAllResult = getAllComponentsMetaData(
152                     nodeTypeEnum);
153             List<ComponentMetadataData> componentList = new ArrayList<>();
154             if (getAllResult.isRight() && !TitanOperationStatus.NOT_FOUND.equals(getAllResult.right().value())) {
155                 log.debug("error while trying to get all components of type:{} TitanOperationStatus:{}.", nodeTypeEnum,
156                         getAllResult.right().value());
157                 return;
158             }
159             if (getAllResult.isLeft()) {
160                 componentList = getAllResult.left().value();
161                 log.trace("get all components of type:{} returned:{} components.", nodeTypeEnum, componentList.size());
162             }
163             componentList.forEach(this::checkAndUpdateCacheComponent);
164             log.trace("syncCache records of type:{} was successful.", nodeTypeEnum);
165         }
166     }
167
168     /**
169      * the method compares the given component to the record in the cache if the
170      * record is not in the cache a job to update the cache for this record will
171      * be created. if the record is present in the graph but not in cache ->
172      * create a job that will update the record oin cache if the timestamp of
173      * the record in cache is older than the timestamp on the graph -> create a
174      * job that will update the record oin cache if the retried component from
175      * cache fails to be deserialized -> create job to override it otherwise no
176      * update is required
177      *
178      * @param metadataData
179      *            the date of the node we want to compare to the value in the
180      *            cache
181      */
182     private void checkAndUpdateCacheComponent(ComponentMetadataData metadataData) {
183         long timeSinceUpdate = System.currentTimeMillis()
184                 - metadataData.getMetadataDataDefinition().getLastUpdateDate();
185         if (timeSinceUpdate >= updateDelayInMilliseconds) {
186             String uid = metadataData.getMetadataDataDefinition().getUniqueId();
187             log.trace("checking cache if record for uid:{} needs to be updated.", uid);
188             Either<Component, ActionStatus> cacheResult = this.cacheMangerOperation.getComponentCache()
189                     .getComponent(uid);
190             if (cacheResult.isRight()) {
191                 ActionStatus actionStatus = cacheResult.right().value();
192                 if (ActionStatus.RESOURCE_NOT_FOUND.equals(actionStatus)) {
193                     log.trace("record for uid:{} not found in cache. creating an update job.", uid);
194                     this.cacheMangerOperation.updateComponentInCache(uid,
195                             metadataData.getMetadataDataDefinition().getLastUpdateDate(),
196                             NodeTypeEnum.getByName(metadataData.getLabel()));
197                 } else if (ActionStatus.CONVERT_COMPONENT_ERROR.equals(actionStatus)) {
198                     log.trace("uid:{} found in cache but we failed deserializing it. creating an override job  .", uid);
199                     this.cacheMangerOperation.overideComponentInCache(uid,
200                             metadataData.getMetadataDataDefinition().getLastUpdateDate(),
201                             NodeTypeEnum.getByName(metadataData.getLabel()));
202                 } else {
203                     log.debug("during lookup for uid:{} an error accords status:{} .", uid, actionStatus);
204                 }
205             } else {
206                 log.trace("uid:{} found in cache.", uid);
207                 this.cacheIdAndTimeMap.remove(uid);
208                 Component cacheComponent = cacheResult.left().value();
209                 Long cacheTimestamp = cacheComponent.getLastUpdateDate();
210                 Long graphTimestamp = metadataData.getMetadataDataDefinition().getLastUpdateDate();
211                 if (cacheTimestamp < graphTimestamp) {
212                     log.trace("uid:{} found in cache. cache Timestamp {} < graph timestamp {} , creating an update job  .",
213                             uid, cacheTimestamp, graphTimestamp);
214                     this.cacheMangerOperation.updateComponentInCache(uid, graphTimestamp,
215                             NodeTypeEnum.getByName(metadataData.getLabel()));
216                 } else {
217                     log.trace("uid:{} found in cache. cache Timestamp {} => graph timestamp {}, no update is needed .",
218                             uid, cacheTimestamp, graphTimestamp);
219                 }
220             }
221         } else {
222             log.trace(
223                     "no update done because an hour did not pass since the update was done  timeSinceUpdate {} < updateDelayInMilliseconds {} ",
224                     timeSinceUpdate, updateDelayInMilliseconds);
225         }
226     }
227
228     /**
229      * the method sets the shutdown flag, when set the worker will stop it's
230      * execution as soon as possible with out completing its work
231      */
232     @Override
233     public void shutDown() {
234         log.debug("syncWorker {} shuting down.", workerName);
235         this.shutdown = true;
236     }
237
238     /**
239      * the method retrives all nodes matching the given node type from the graph
240      *
241      * @param nodeTypeEnum
242      *            node type we want to lookup on the graph
243      * @return a list of retrieved nodes matching the given type or not found in
244      *         case no nodes were found or error in case of failure
245      */
246     private Either<List<ComponentMetadataData>, TitanOperationStatus> getAllComponentsMetaData(
247             NodeTypeEnum nodeTypeEnum) {
248         return this.cacheMangerOperation.getTitanGenericDao().getByCriteria(nodeTypeEnum, null,
249                 ComponentMetadataData.class);
250     }
251
252     /**
253      * the method retrieves the metadata from graph for the given id
254      *
255      * @param uid
256      *            the unique id of the component we want to retrieve
257      * @param nodeTypeEnum
258      *            the type of the recored we want to retrieve
259      * @return the meta dat of the component or the error encountered during the
260      *         get
261      */
262     private Either<ComponentMetadataData, TitanOperationStatus> getComponentMetaData(String uid,
263             NodeTypeEnum nodeTypeEnum) {
264         return this.cacheMangerOperation.getTitanGenericDao().getNode(UniqueIdBuilder.getKeyByNodeType(nodeTypeEnum),
265                 uid, ComponentMetadataData.class);
266     }
267 }