Merge "Watchdog-process that syncs 'ADVISED' CM Handles"
[cps.git] / cps-ri / src / main / java / org / onap / cps / spi / utils / SessionManager.java
index eb535ec..6f96cff 100644 (file)
@@ -1,6 +1,6 @@
 /*
  *  ============LICENSE_START=======================================================
- *  Copyright (C) 2021-2022 Nordix Foundation
+ *  Copyright (C) 2022 Nordix Foundation
  *  ================================================================================
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
 
 package org.onap.cps.spi.utils;
 
-import java.util.HashMap;
-import java.util.Map;
+import com.google.common.util.concurrent.TimeLimiter;
+import com.google.common.util.concurrent.UncheckedExecutionException;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.PostConstruct;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.hibernate.HibernateException;
+import org.hibernate.LockMode;
 import org.hibernate.Session;
-import org.hibernate.SessionException;
-import org.hibernate.SessionFactory;
-import org.hibernate.cfg.Configuration;
+import org.onap.cps.spi.config.CpsSessionFactory;
 import org.onap.cps.spi.entities.AnchorEntity;
 import org.onap.cps.spi.entities.DataspaceEntity;
-import org.onap.cps.spi.entities.SchemaSetEntity;
-import org.onap.cps.spi.entities.YangResourceEntity;
+import org.onap.cps.spi.exceptions.SessionManagerException;
+import org.onap.cps.spi.exceptions.SessionTimeoutException;
+import org.onap.cps.spi.repository.AnchorRepository;
+import org.onap.cps.spi.repository.DataspaceRepository;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
+@RequiredArgsConstructor
+@Slf4j
 @Component
+@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
 public class SessionManager {
 
-    private static SessionFactory sessionFactory;
-    private static Map<String, Session> sessionMap = new HashMap<>();
+    private final CpsSessionFactory cpsSessionFactory;
+    private final TimeLimiterProvider timeLimiterProvider;
+    private final DataspaceRepository dataspaceRepository;
+    private final AnchorRepository anchorRepository;
+    private final ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
+    public static final boolean WITH_COMMIT = true;
+    public static final boolean WITH_ROLLBACK = false;
 
-    private synchronized void buildSessionFactory() {
-        if (sessionFactory == null) {
-            sessionFactory = new Configuration().configure("hibernate.cfg.xml")
-                    .addAnnotatedClass(AnchorEntity.class)
-                    .addAnnotatedClass(DataspaceEntity.class)
-                    .addAnnotatedClass(SchemaSetEntity.class)
-                    .addAnnotatedClass(YangResourceEntity.class)
-                    .buildSessionFactory();
+    @PostConstruct
+    private void postConstruct() {
+        final Thread shutdownHook = new Thread(this::closeAllSessionsInShutdown);
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+    }
+
+    private void closeAllSessionsInShutdown() {
+        for (final String sessionId : sessionMap.keySet()) {
+            try {
+                closeSession(sessionId, WITH_ROLLBACK);
+                log.info("Session with session ID {} rolled back and closed", sessionId);
+            } catch (final Exception e) {
+                log.warn("Session with session ID {} failed to close", sessionId);
+            }
         }
+        cpsSessionFactory.closeSessionFactory();
     }
 
     /**
@@ -57,8 +85,7 @@ public class SessionManager {
      * @return Session ID string
      */
     public String startSession() {
-        buildSessionFactory();
-        final Session session = sessionFactory.openSession();
+        final Session session = cpsSessionFactory.openSession();
         final String sessionId = UUID.randomUUID().toString();
         sessionMap.put(sessionId, session);
         session.beginTransaction();
@@ -67,20 +94,88 @@ public class SessionManager {
 
     /**
      * Close session.
+     * Changes are committed when commit boolean is set to true.
+     * Rollback will execute when commit boolean is set to false.
      *
      * @param sessionId session ID
+     * @param commit indicator whether session will commit or rollback
      */
-    public void closeSession(final String sessionId) {
+    public void closeSession(final String sessionId, final boolean commit) {
         try {
-            final Session currentSession = sessionMap.get(sessionId);
-            currentSession.getTransaction().commit();
-            currentSession.close();
-        } catch (final NullPointerException e) {
-            throw new SessionException(String.format("Session with session ID %s does not exist", sessionId));
+            final Session session = getSession(sessionId);
+            if (commit) {
+                session.getTransaction().commit();
+            } else {
+                session.getTransaction().rollback();
+            }
+            session.close();
         } catch (final HibernateException e) {
-            throw new SessionException(String.format("Unable to close session with session ID %s", sessionId));
+            throw new SessionManagerException("Cannot close session",
+                String.format("Unable to close session with session ID '%s'", sessionId), e);
+        } finally {
+            sessionMap.remove(sessionId);
+        }
+    }
+
+    /**
+     * Lock Anchor.
+     * To release locks(s), the session holding the lock(s) must be closed.
+     *
+     * @param sessionId session ID
+     * @param dataspaceName dataspace name
+     * @param anchorName anchor name
+     * @param timeoutInMilliseconds lock attempt timeout in milliseconds
+     */
+    @SneakyThrows
+    public void lockAnchor(final String sessionId, final String dataspaceName,
+                           final String anchorName, final Long timeoutInMilliseconds) {
+        final ExecutorService executorService = Executors.newSingleThreadExecutor();
+        final TimeLimiter timeLimiter = timeLimiterProvider.getTimeLimiter(executorService);
+
+        try {
+            timeLimiter.callWithTimeout(() -> {
+                applyPessimisticWriteLockOnAnchor(sessionId, dataspaceName, anchorName);
+                return null;
+            }, timeoutInMilliseconds, TimeUnit.MILLISECONDS);
+        } catch (final TimeoutException e) {
+            throw new SessionTimeoutException(
+                    "Timeout: Anchor locking failed",
+                    "The error could be caused by another session holding a lock on the specified table. "
+                            + "Retrying the sending the request could be required.", e);
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new SessionManagerException("Operation interrupted", "This thread was interrupted.", e);
+        } catch (final ExecutionException | UncheckedExecutionException e) {
+            if (e.getCause() != null) {
+                throw e.getCause();
+            }
+            throw new SessionManagerException(
+                    "Operation Aborted",
+                    "The transaction request was aborted. "
+                            + "Retrying and checking all details are correct could be required", e);
+        } finally {
+            executorService.shutdownNow();
+        }
+    }
+
+    private void applyPessimisticWriteLockOnAnchor(final String sessionId, final String dataspaceName,
+                                                   final String anchorName) {
+        final Session session = getSession(sessionId);
+        final DataspaceEntity dataspaceEntity = dataspaceRepository.getByName(dataspaceName);
+        final AnchorEntity anchorEntity = anchorRepository.getByDataspaceAndName(dataspaceEntity, anchorName);
+        final int anchorId = anchorEntity.getId();
+        log.debug("Attempting to lock anchor {} for session {}", anchorName, sessionId);
+        session.get(AnchorEntity.class, anchorId, LockMode.PESSIMISTIC_WRITE);
+        log.info("Anchor {} successfully locked", anchorName);
+    }
+
+    private Session getSession(final String sessionId) {
+        final Session session = sessionMap.get(sessionId);
+        if (session == null) {
+            throw new SessionManagerException("Session not found",
+                String.format("Session with ID %s does not exist", sessionId));
         }
-        sessionMap.remove(sessionId);
+        return session;
     }
 
-}
\ No newline at end of file
+}