Upgrade SDC from Titan to Janus Graph
[sdc.git] / catalog-model / src / main / java / org / openecomp / sdc / be / model / operations / impl / CacheMangerOperation.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * SDC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.openecomp.sdc.be.model.operations.impl;
22
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import org.openecomp.sdc.be.config.Configuration;
25 import org.openecomp.sdc.be.config.ConfigurationManager;
26 import org.openecomp.sdc.be.dao.janusgraph.JanusGraphGenericDao;
27 import org.openecomp.sdc.be.datatypes.enums.NodeTypeEnum;
28 import org.openecomp.sdc.be.model.cache.ComponentCache;
29 import org.openecomp.sdc.be.model.cache.DaoInfo;
30 import org.openecomp.sdc.be.model.cache.jobs.*;
31 import org.openecomp.sdc.be.model.cache.workers.CacheWorker;
32 import org.openecomp.sdc.be.model.cache.workers.IWorker;
33 import org.openecomp.sdc.be.model.cache.workers.SyncWorker;
34 import org.openecomp.sdc.be.model.jsonjanusgraph.operations.ToscaOperationFacade;
35 import org.openecomp.sdc.be.model.operations.api.ICacheMangerOperation;
36 import org.openecomp.sdc.common.log.wrappers.Logger;
37 import org.springframework.beans.factory.annotation.Autowired;
38 import org.springframework.stereotype.Component;
39
40 import javax.annotation.PostConstruct;
41 import javax.annotation.PreDestroy;
42 import java.util.LinkedList;
43 import java.util.concurrent.*;
44
45 /**
46  * Created by mlando on 9/5/2016. the class is responsible for handling all cache update operations asynchronously including sync between the graph and cache and on demand update requests
47  */
48 @Component("cacheManger-operation")
49 public class CacheMangerOperation implements ICacheMangerOperation {
50     @Autowired
51     private ToscaOperationFacade toscaOperationFacade;
52     @Autowired
53     private JanusGraphGenericDao janusGraphGenericDao;
54     @Autowired
55     private ComponentCache componentCache;
56
57     private static final Logger log = Logger.getLogger(CacheMangerOperation.class.getName());
58     private LinkedBlockingQueue<Job> jobQueue = null;
59     private int waitOnShutDownInMinutes;
60     private ScheduledExecutorService syncExecutor;
61     private ExecutorService workerExecutor;
62     private LinkedList<IWorker> workerList = new LinkedList<>();
63     private DaoInfo daoInfo;
64
65     /**
66      * constructor
67      */
68     public CacheMangerOperation() {
69     }
70
71     /**
72      * the method checks in the cache is enabled, if it is, it initializes all the workers according to the configuration values.
73      */
74     @PostConstruct
75     public void init() {
76
77         daoInfo = new DaoInfo(toscaOperationFacade, componentCache);
78
79         Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
80         if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
81             Integer numberOfWorkers = applicationL2CacheConfig.getQueue().getNumberOfCacheWorkers();
82             this.waitOnShutDownInMinutes = applicationL2CacheConfig.getQueue().getWaitOnShutDownInMinutes();
83             jobQueue = new LinkedBlockingQueue<>();
84             log.info("L2 Cache is enabled initializing queue");
85             log.debug("initializing SyncWorker, creating {} workers", numberOfWorkers);
86             ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Sync-Cache-Worker-%d").build();
87             this.syncExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
88             log.debug("initializing workers, creating {} cacheWorkers", numberOfWorkers);
89             threadFactory = new ThreadFactoryBuilder().setNameFormat("Cache-Worker-%d").build();
90             String workerName = "Sync-Worker";
91             Integer syncWorkerExacutionIntrval = applicationL2CacheConfig.getQueue().getSyncIntervalInSecondes();
92             log.debug("starting Sync worker:{} with executions interval:{} ", workerName, syncWorkerExacutionIntrval);
93             SyncWorker syncWorker = new SyncWorker(workerName, this);
94             this.syncExecutor.scheduleAtFixedRate(syncWorker, 5 * 60L, syncWorkerExacutionIntrval, TimeUnit.SECONDS);
95             this.workerExecutor = Executors.newFixedThreadPool(numberOfWorkers, threadFactory);
96             CacheWorker cacheWorker;
97             for (int i = 0; i < numberOfWorkers; i++) {
98                 workerName = "Cache-Worker-" + i;
99                 log.debug("starting Cache worker:{}", workerName);
100                 cacheWorker = new CacheWorker(workerName, jobQueue);
101                 this.workerExecutor.submit(cacheWorker);
102                 this.workerList.add(cacheWorker);
103             }
104         } else {
105             log.info("L2 Cache is disabled");
106         }
107         log.info("L2 Cache has been initialized and the workers are running");
108     }
109
110     /**
111      * the method creates a job to check it the given component is in the cach and if so is it valid if the value in the cache is not valid it will be updated.
112      *
113      * @param componentId
114      *            the uid of the component we want to update
115      * @param timestamp
116      *            the time of the component update
117      * @param nodeTypeEnum
118      *            the type of the component resource/service/product
119      */
120     @Override
121     public void updateComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
122         Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
123         if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
124             this.jobQueue.add(new CheckAndUpdateJob(daoInfo, componentId, nodeTypeEnum, timestamp));
125         }
126     }
127
128     public void overideComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
129         Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
130         if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
131             this.jobQueue.add(new OverrideJob(daoInfo, componentId, nodeTypeEnum, timestamp));
132         }
133     }
134
135     public void deleteComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
136         Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
137         if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
138             this.jobQueue.add(new DeleteJob(daoInfo, componentId, nodeTypeEnum, timestamp));
139         }
140     }
141
142     /**
143      * the method stores the given component in the cache
144      *
145      * @param component
146      *            componet to store in cache
147      * @param nodeTypeEnum
148      *            the type of the component we want to store
149      */
150     @Override
151     public void storeComponentInCache(org.openecomp.sdc.be.model.Component component, NodeTypeEnum nodeTypeEnum) {
152         Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
153         if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
154             this.jobQueue.add(new StoreJob(daoInfo, component, nodeTypeEnum));
155         }
156     }
157
158     /**
159      * the method shutdown's all the worker's. the method has a pre set of how long it will wait for the workers to shutdown. the pre defined value is taken from the configuration.
160      */
161     @PreDestroy
162     public void shutDown() {
163         workerExecutor.shutdown();
164         syncExecutor.shutdown();
165         this.workerList.forEach(IWorker::shutDown);
166         try {
167             if (!workerExecutor.awaitTermination(this.waitOnShutDownInMinutes, TimeUnit.MINUTES)) {
168                 log.error("timer elapsed while waiting for Cache workers to finish, forcing a shutdown. ");
169             }
170             log.debug("all Cache workers finished");
171         } catch (InterruptedException e) {
172             log.error("failed while waiting for Cache worker", e);
173             Thread.currentThread().interrupt();
174         }
175         try {
176             if (!workerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
177                 log.error("timer elapsed while waiting for the Sync worker's to finish, forcing a shutdown. ");
178             }
179             log.debug("sync worker finished");
180         } catch (InterruptedException e) {
181             log.error("failed while waiting for sync worker", e);
182             Thread.currentThread().interrupt();
183         }
184     }
185
186     public JanusGraphGenericDao getJanusGraphGenericDao() {
187         return janusGraphGenericDao;
188     }
189
190     public ComponentCache getComponentCache() {
191         return componentCache;
192     }
193 }