2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.openecomp.sdc.be.dao.janusgraph;
23 import org.janusgraph.core.*;
24 import org.janusgraph.core.schema.ConsistencyModifier;
25 import org.janusgraph.core.schema.JanusGraphIndex;
26 import org.janusgraph.core.schema.JanusGraphManagement;
27 import org.janusgraph.diskstorage.BackendException;
28 import org.janusgraph.diskstorage.ResourceUnavailableException;
29 import org.janusgraph.diskstorage.locking.PermanentLockingException;
30 import org.janusgraph.graphdb.database.idassigner.IDPoolExhaustedException;
31 import fj.data.Either;
32 import org.apache.commons.configuration.BaseConfiguration;
33 import org.apache.tinkerpop.gremlin.structure.T;
34 import org.apache.tinkerpop.gremlin.structure.Vertex;
35 import org.openecomp.sdc.be.config.BeEcompErrorManager;
36 import org.openecomp.sdc.be.config.ConfigurationManager;
37 import org.openecomp.sdc.be.dao.DAOJanusGraphStrategy;
38 import org.openecomp.sdc.be.dao.JanusGraphClientStrategy;
39 import org.openecomp.sdc.be.dao.neo4j.GraphPropertiesDictionary;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.stereotype.Component;
44 import javax.annotation.PostConstruct;
45 import javax.annotation.PreDestroy;
46 import java.util.ArrayList;
47 import java.util.HashMap;
48 import java.util.concurrent.*;
51 @Component("janusgraph-client")
52 public class JanusGraphClient {
53 private static Logger logger = LoggerFactory.getLogger(JanusGraphClient.class.getName());
54 private static Logger healthLogger = LoggerFactory.getLogger("janusgraph.healthcheck");
56 private static final String HEALTH_CHECK = GraphPropertiesDictionary.HEALTH_CHECK.getProperty();
57 private static final String OK = "GOOD";
59 public JanusGraphClient() {
63 public void closeSession(){
64 if ( graph.isOpen() ){
66 logger.info("** JanusGraphClient session closed");
69 private class HealthCheckTask implements Callable<Vertex> {
71 public Vertex call() {
73 JanusGraphVertex v = (JanusGraphVertex) graph.query().has(HEALTH_CHECK, OK).vertices().iterator().next();
74 JanusGraphVertexProperty<String> property = v.property("healthcheck", OK + "_" + System.currentTimeMillis());
75 healthLogger.trace("Health Check Node Found...{}", v.property(HEALTH_CHECK));
82 private class HealthCheckScheduledTask implements Runnable {
85 healthLogger.trace("Executing janusGraph Health Check Task - Start");
86 boolean healthStatus = isGraphOpen();
87 healthLogger.trace("Executing janusGraph Health Check Task - Status = {}", healthStatus);
88 if (healthStatus != lastHealthState) {
89 logger.trace("janusGraph Health State Changed to {}. Issuing alarm / recovery alarm...", healthStatus);
90 lastHealthState = healthStatus;
96 private class ReconnectTask implements Runnable {
99 logger.trace("Trying to reconnect to JanusGraph...");
101 createGraph(janusGraphCfgFile);
106 private JanusGraph graph;
108 // Health Check Variables
111 * This executor will execute the health check task on a callable task that can be executed with a timeout.
113 ExecutorService healthCheckExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
115 public Thread newThread(Runnable r) {
116 return new Thread(r, "JanusGraph-Health-Check-Thread");
119 private long healthCheckReadTimeout = 2;
120 HealthCheckTask healthCallableTask = new HealthCheckTask();
121 HealthCheckScheduledTask healthCheckScheduledTask = new HealthCheckScheduledTask();
122 boolean lastHealthState = false;
124 // Reconnection variables
125 private ScheduledExecutorService reconnectScheduler = null;
126 private ScheduledExecutorService healthCheckScheduler = null;
127 private Runnable reconnectTask = null;
128 private long reconnectInterval = 3;
129 @SuppressWarnings("rawtypes")
130 private Future reconnectFuture;
132 private String janusGraphCfgFile = null;
133 JanusGraphClientStrategy janusGraphClientStrategy;
135 public JanusGraphClient(JanusGraphClientStrategy janusGraphClientStrategy) {
137 this.janusGraphClientStrategy = janusGraphClientStrategy;
139 // Initialize a single threaded scheduler for health-check
140 this.healthCheckScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
142 public Thread newThread(Runnable r) {
143 return new Thread(r, "JanusGraph-Health-Check-Task");
147 healthCheckReadTimeout = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphHealthCheckReadTimeout(2);
148 reconnectInterval = ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphReconnectIntervalInSeconds(3);
150 logger.info("** JanusGraphClient created");
154 public JanusGraphOperationStatus createGraph() {
156 logger.info("** createGraph started **");
158 if (ConfigurationManager.getConfigurationManager().getConfiguration().getJanusGraphInMemoryGraph()) {
159 BaseConfiguration conf = new BaseConfiguration();
160 conf.setProperty("storage.backend", "inmemory");
161 graph = JanusGraphFactory.open(conf);
162 createJanusGraphSchema();
163 logger.info("** in memory graph created");
164 return JanusGraphOperationStatus.OK;
166 this.janusGraphCfgFile = janusGraphClientStrategy.getConfigFile();
167 if (janusGraphCfgFile == null || janusGraphCfgFile.isEmpty()) {
168 janusGraphCfgFile = "config/janusgraph.properties";
172 // In case connection failed on init time, schedule a reconnect task
174 JanusGraphOperationStatus status = createGraph(janusGraphCfgFile);
175 logger.debug("Create JanusGraph graph status {}", status);
176 if (status != JanusGraphOperationStatus.OK) {
177 this.startReconnectTask();
184 private void startHealthCheckTask() {
185 this.healthCheckScheduler.scheduleAtFixedRate(healthCheckScheduledTask, 0, reconnectInterval, TimeUnit.SECONDS);
189 * This method will be invoked ONLY on init time in case JanusGraph storage is down.
191 private void startReconnectTask() {
192 this.reconnectTask = new ReconnectTask();
193 // Initialize a single threaded scheduler
194 this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
196 public Thread newThread(Runnable r) {
197 return new Thread(r, "JanusGraph-Reconnect-Task");
201 logger.info("Scheduling reconnect task {} with interval of {} seconds", reconnectTask, reconnectInterval);
202 reconnectFuture = this.reconnectScheduler.scheduleAtFixedRate(this.reconnectTask, 0, this.reconnectInterval, TimeUnit.SECONDS);
205 public void cleanupGraph() {
210 JanusGraphFactory.drop(graph);
211 } catch (BackendException e) {
217 private boolean graphInitialized(){
218 JanusGraphManagement graphMgmt = graph.openManagement();
219 return graphMgmt.containsPropertyKey(HEALTH_CHECK) && graphMgmt.containsGraphIndex(HEALTH_CHECK);
223 public JanusGraphOperationStatus createGraph(String janusGraphCfgFile) {
224 logger.info("** open graph with {} started", janusGraphCfgFile);
226 logger.info("openGraph : try to load file {}", janusGraphCfgFile);
227 graph = JanusGraphFactory.open(janusGraphCfgFile);
228 if (graph.isClosed() || !graphInitialized()) {
229 logger.error("janusgraph graph was not initialized");
230 return JanusGraphOperationStatus.NOT_CREATED;
233 } catch (Exception e) {
235 logger.info("createGraph : failed to open JanusGraph graph with configuration file: {}", janusGraphCfgFile);
236 logger.debug("createGraph : failed with exception.", e);
237 return JanusGraphOperationStatus.NOT_CONNECTED;
240 logger.info("** JanusGraph graph created ");
242 // Do some post creation actions
243 this.onGraphOpened();
245 return JanusGraphOperationStatus.OK;
248 private void onGraphOpened() {
249 // if a reconnect task is running, cancel it.
250 if (this.reconnectFuture != null) {
251 logger.info("** Cancelling JanusGraph reconnect task");
252 reconnectFuture.cancel(true);
255 // create health-check node
256 if (!graph.query().has(HEALTH_CHECK, OK).vertices().iterator().hasNext()) {
257 logger.trace("Healthcheck Singleton node does not exist, Creating healthcheck node...");
258 Vertex healthCheckNode = graph.addVertex();
259 healthCheckNode.property(HEALTH_CHECK, OK);
260 logger.trace("Healthcheck node created successfully. ID={}", healthCheckNode.property(T.id.getAccessor()));
263 logger.trace("Skipping Healthcheck Singleton node creation. Already exist...");
265 this.startHealthCheckTask();
269 public Either<JanusGraph, JanusGraphOperationStatus> getGraph() {
271 return Either.left(graph);
273 return Either.right(JanusGraphOperationStatus.NOT_CREATED);
277 public JanusGraphOperationStatus commit() {
281 return JanusGraphOperationStatus.OK;
282 } catch (Exception e) {
283 return handleJanusGraphException(e);
286 return JanusGraphOperationStatus.NOT_CREATED;
290 public JanusGraphOperationStatus rollback() {
294 graph.tx().rollback();
295 return JanusGraphOperationStatus.OK;
296 } catch (Exception e) {
297 return handleJanusGraphException(e);
300 return JanusGraphOperationStatus.NOT_CREATED;
304 public static JanusGraphOperationStatus handleJanusGraphException(Exception e) {
305 if (e instanceof JanusGraphConfigurationException) {
306 return JanusGraphOperationStatus.JANUSGRAPH_CONFIGURATION;
308 if (e instanceof SchemaViolationException) {
309 return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
311 if (e instanceof PermanentLockingException) {
312 return JanusGraphOperationStatus.JANUSGRAPH_SCHEMA_VIOLATION;
314 if (e instanceof IDPoolExhaustedException) {
315 return JanusGraphOperationStatus.GENERAL_ERROR;
317 if (e instanceof InvalidElementException) {
318 return JanusGraphOperationStatus.INVALID_ELEMENT;
320 if (e instanceof InvalidIDException) {
321 return JanusGraphOperationStatus.INVALID_ID;
323 if (e instanceof QueryException) {
324 return JanusGraphOperationStatus.INVALID_QUERY;
326 if (e instanceof ResourceUnavailableException) {
327 return JanusGraphOperationStatus.RESOURCE_UNAVAILABLE;
329 if (e instanceof IllegalArgumentException) {
330 // TODO check the error message??
331 return JanusGraphOperationStatus.ILLEGAL_ARGUMENT;
334 return JanusGraphOperationStatus.GENERAL_ERROR;
337 public boolean getHealth() {
338 return this.lastHealthState;
341 private boolean isGraphOpen() {
342 healthLogger.trace("Invoking JanusGraph health check ...");
346 Future<Vertex> future = healthCheckExecutor.submit(healthCallableTask);
347 v = future.get(this.healthCheckReadTimeout, TimeUnit.SECONDS);
348 healthLogger.trace("Health Check Node Found... {}", v.property(HEALTH_CHECK));
350 } catch (Exception e) {
351 String message = e.getMessage();
352 if (message == null) {
353 message = e.getClass().getName();
355 logger.error("JanusGraph Health Check Failed. {}", message);
365 public static void main(String[] args) throws InterruptedException {
366 JanusGraphClient client = new JanusGraphClient(new DAOJanusGraphStrategy());
367 client.createGraph();
370 boolean health = client.isGraphOpen();
371 System.err.println("health=" + health);
378 private static final String JANUSGRAPH_HEALTH_CHECK = "janusgraphHealthCheck";
380 private void logAlarm() {
381 if (lastHealthState) {
382 BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphRecovery(JANUSGRAPH_HEALTH_CHECK);
384 BeEcompErrorManager.getInstance().logBeHealthCheckJanusGraphError(JANUSGRAPH_HEALTH_CHECK);
388 private void createJanusGraphSchema() {
390 JanusGraphManagement graphMgt = graph.openManagement();
391 JanusGraphIndex index = null;
392 for (GraphPropertiesDictionary prop : GraphPropertiesDictionary.values()) {
393 PropertyKey propKey = null;
394 if (!graphMgt.containsPropertyKey(prop.getProperty())) {
395 Class<?> clazz = prop.getClazz();
396 if (!clazz.isAssignableFrom(ArrayList.class) && !clazz.isAssignableFrom(HashMap.class)) {
397 propKey = graphMgt.makePropertyKey(prop.getProperty()).dataType(prop.getClazz()).make();
400 propKey = graphMgt.getPropertyKey(prop.getProperty());
402 if (prop.isIndexed()) {
403 if (!graphMgt.containsGraphIndex(prop.getProperty())) {
404 if (prop.isUnique()) {
405 index = graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).unique().buildCompositeIndex();
406 // Ensures only one name per vertex
407 graphMgt.setConsistency(propKey, ConsistencyModifier.LOCK);
408 // Ensures name uniqueness in the graph
409 graphMgt.setConsistency(index, ConsistencyModifier.LOCK);
412 graphMgt.buildIndex(prop.getProperty(), Vertex.class).addKey(propKey).buildCompositeIndex();