2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.model.operations.impl;
23 import com.google.common.util.concurrent.ThreadFactoryBuilder;
24 import org.openecomp.sdc.be.config.Configuration;
25 import org.openecomp.sdc.be.config.ConfigurationManager;
26 import org.openecomp.sdc.be.dao.janusgraph.JanusGraphGenericDao;
27 import org.openecomp.sdc.be.datatypes.enums.NodeTypeEnum;
28 import org.openecomp.sdc.be.model.cache.ComponentCache;
29 import org.openecomp.sdc.be.model.cache.DaoInfo;
30 import org.openecomp.sdc.be.model.cache.jobs.*;
31 import org.openecomp.sdc.be.model.cache.workers.CacheWorker;
32 import org.openecomp.sdc.be.model.cache.workers.IWorker;
33 import org.openecomp.sdc.be.model.cache.workers.SyncWorker;
34 import org.openecomp.sdc.be.model.jsonjanusgraph.operations.ToscaOperationFacade;
35 import org.openecomp.sdc.be.model.operations.api.ICacheMangerOperation;
36 import org.openecomp.sdc.common.log.wrappers.Logger;
37 import org.springframework.beans.factory.annotation.Autowired;
38 import org.springframework.stereotype.Component;
40 import javax.annotation.PostConstruct;
41 import javax.annotation.PreDestroy;
42 import java.util.LinkedList;
43 import java.util.concurrent.*;
/**
 * Created by mlando on 9/5/2016.
 * <p>
 * This class is responsible for handling all cache update operations asynchronously, including the
 * sync between the graph and the cache and on-demand update requests.
 */
48 @Component("cacheManger-operation")
49 public class CacheMangerOperation implements ICacheMangerOperation {
51 private ToscaOperationFacade toscaOperationFacade;
53 private JanusGraphGenericDao janusGraphGenericDao;
55 private ComponentCache componentCache;
57 private static final Logger log = Logger.getLogger(CacheMangerOperation.class.getName());
58 private LinkedBlockingQueue<Job> jobQueue = null;
59 private int waitOnShutDownInMinutes;
60 private ScheduledExecutorService syncExecutor;
61 private ExecutorService workerExecutor;
62 private LinkedList<IWorker> workerList = new LinkedList<>();
63 private DaoInfo daoInfo;
68 public CacheMangerOperation() {
72 * the method checks in the cache is enabled, if it is, it initializes all the workers according to the configuration values.
77 daoInfo = new DaoInfo(toscaOperationFacade, componentCache);
79 Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
80 if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
81 Integer numberOfWorkers = applicationL2CacheConfig.getQueue().getNumberOfCacheWorkers();
82 this.waitOnShutDownInMinutes = applicationL2CacheConfig.getQueue().getWaitOnShutDownInMinutes();
83 jobQueue = new LinkedBlockingQueue<>();
84 log.info("L2 Cache is enabled initializing queue");
85 log.debug("initializing SyncWorker, creating {} workers", numberOfWorkers);
86 ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Sync-Cache-Worker-%d").build();
87 this.syncExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
88 log.debug("initializing workers, creating {} cacheWorkers", numberOfWorkers);
89 threadFactory = new ThreadFactoryBuilder().setNameFormat("Cache-Worker-%d").build();
90 String workerName = "Sync-Worker";
91 Integer syncWorkerExacutionIntrval = applicationL2CacheConfig.getQueue().getSyncIntervalInSecondes();
92 log.debug("starting Sync worker:{} with executions interval:{} ", workerName, syncWorkerExacutionIntrval);
93 SyncWorker syncWorker = new SyncWorker(workerName, this);
94 this.syncExecutor.scheduleAtFixedRate(syncWorker, 5 * 60L, syncWorkerExacutionIntrval, TimeUnit.SECONDS);
95 this.workerExecutor = Executors.newFixedThreadPool(numberOfWorkers, threadFactory);
96 CacheWorker cacheWorker;
97 for (int i = 0; i < numberOfWorkers; i++) {
98 workerName = "Cache-Worker-" + i;
99 log.debug("starting Cache worker:{}", workerName);
100 cacheWorker = new CacheWorker(workerName, jobQueue);
101 this.workerExecutor.submit(cacheWorker);
102 this.workerList.add(cacheWorker);
105 log.info("L2 Cache is disabled");
107 log.info("L2 Cache has been initialized and the workers are running");
111 * the method creates a job to check it the given component is in the cach and if so is it valid if the value in the cache is not valid it will be updated.
114 * the uid of the component we want to update
116 * the time of the component update
117 * @param nodeTypeEnum
118 * the type of the component resource/service/product
121 public void updateComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
122 Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
123 if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
124 this.jobQueue.add(new CheckAndUpdateJob(daoInfo, componentId, nodeTypeEnum, timestamp));
128 public void overideComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
129 Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
130 if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
131 this.jobQueue.add(new OverrideJob(daoInfo, componentId, nodeTypeEnum, timestamp));
135 public void deleteComponentInCache(String componentId, long timestamp, NodeTypeEnum nodeTypeEnum) {
136 Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
137 if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
138 this.jobQueue.add(new DeleteJob(daoInfo, componentId, nodeTypeEnum, timestamp));
143 * the method stores the given component in the cache
146 * componet to store in cache
147 * @param nodeTypeEnum
148 * the type of the component we want to store
151 public void storeComponentInCache(org.openecomp.sdc.be.model.Component component, NodeTypeEnum nodeTypeEnum) {
152 Configuration.ApplicationL2CacheConfig applicationL2CacheConfig = ConfigurationManager.getConfigurationManager().getConfiguration().getApplicationL2Cache();
153 if (applicationL2CacheConfig != null && applicationL2CacheConfig.isEnabled()) {
154 this.jobQueue.add(new StoreJob(daoInfo, component, nodeTypeEnum));
159 * the method shutdown's all the worker's. the method has a pre set of how long it will wait for the workers to shutdown. the pre defined value is taken from the configuration.
162 public void shutDown() {
163 workerExecutor.shutdown();
164 syncExecutor.shutdown();
165 this.workerList.forEach(IWorker::shutDown);
167 if (!workerExecutor.awaitTermination(this.waitOnShutDownInMinutes, TimeUnit.MINUTES)) {
168 log.error("timer elapsed while waiting for Cache workers to finish, forcing a shutdown. ");
170 log.debug("all Cache workers finished");
171 } catch (InterruptedException e) {
172 log.error("failed while waiting for Cache worker", e);
173 Thread.currentThread().interrupt();
176 if (!workerExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
177 log.error("timer elapsed while waiting for the Sync worker's to finish, forcing a shutdown. ");
179 log.debug("sync worker finished");
180 } catch (InterruptedException e) {
181 log.error("failed while waiting for sync worker", e);
182 Thread.currentThread().interrupt();
186 public JanusGraphGenericDao getJanusGraphGenericDao() {
187 return janusGraphGenericDao;
190 public ComponentCache getComponentCache() {
191 return componentCache;