Implementation of distributed locking feature 91/37591/26
authorMagnusen, Drew (dm741q) <dm741q@att.com>
Wed, 21 Mar 2018 21:44:45 +0000 (16:44 -0500)
committerMagnusen, Drew (dm741q) <dm741q@att.com>
Tue, 3 Apr 2018 19:05:18 +0000 (14:05 -0500)
This feature is a very basic implementation of a distributed locking
system.

Issue-ID: POLICY-699
Change-Id: I012fd37926ccbbdd87a3e4acb2788b53680115f0
Signed-off-by: Magnusen, Drew (dm741q) <dm741q@att.com>
18 files changed:
feature-distributed-locking/pom.xml [new file with mode: 0644]
feature-distributed-locking/src/assembly/assemble_zip.xml [new file with mode: 0644]
feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties [new file with mode: 0644]
feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql [new file with mode: 0644]
feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql [new file with mode: 0644]
feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java [new file with mode: 0644]
feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java [new file with mode: 0644]
feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java [new file with mode: 0644]
feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java [new file with mode: 0644]
feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java [new file with mode: 0644]
feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI [new file with mode: 0644]
feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI [new file with mode: 0644]
feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java [new file with mode: 0644]
feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java [new file with mode: 0644]
feature-distributed-locking/src/test/resources/feature-distributed-locking.properties [new file with mode: 0644]
policy-core/src/main/java/org/onap/policy/drools/core/lock/LockRequestFuture.java [new file with mode: 0644]
policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureAPI.java [new file with mode: 0644]
pom.xml

diff --git a/feature-distributed-locking/pom.xml b/feature-distributed-locking/pom.xml
new file mode 100644 (file)
index 0000000..c4beacc
--- /dev/null
@@ -0,0 +1,130 @@
+<!--
+  ============LICENSE_START=======================================================
+  ONAP Policy Engine - Drools PDP
+  ================================================================================
+  Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  ============LICENSE_END=========================================================
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+         
+  <modelVersion>4.0.0</modelVersion>
+  
+  <parent>
+    <groupId>org.onap.policy.drools-pdp</groupId>
+    <artifactId>drools-pdp</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+  </parent>
+  
+  <artifactId>feature-distributed-locking</artifactId>
+  
+  <name>feature-distributed-locking</name>
+  <description>Loadable module that provides distributed locking capability</description>
+
+  <properties>
+          <maven.compiler.source>1.8</maven.compiler.source>
+          <maven.compiler.target>1.8</maven.compiler.target>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>2.6</version>
+        <executions>
+          <execution>
+            <id>zipfile</id>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <phase>package</phase>
+            <configuration>
+              <attach>true</attach>
+              <finalName>${project.artifactId}-${project.version}</finalName>
+              <descriptors>
+                <descriptor>src/assembly/assemble_zip.xml</descriptor>
+              </descriptors>
+              <appendAssemblyId>false</appendAssemblyId>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <phase>prepare-package</phase>
+            <configuration>
+              <transitive>false</transitive>
+              <outputDirectory>${project.build.directory}/assembly/lib</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>true</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <useRepositoryLayout>false</useRepositoryLayout>
+              <addParentPoms>false</addParentPoms>
+              <copyPom>false</copyPom>
+                         <includeScope>runtime</includeScope>
+                         <excludeTransitive>true</excludeTransitive>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+       <dependency>
+              <groupId>org.onap.policy.drools-pdp</groupId>
+              <artifactId>policy-core</artifactId>
+              <version>${project.version}</version>
+              <scope>provided</scope>
+       </dependency>
+       <dependency>
+               <groupId>org.onap.policy.drools-pdp</groupId>
+               <artifactId>policy-management</artifactId>
+               <version>${project.version}</version>
+               <scope>provided</scope>
+       </dependency>
+       <dependency>
+               <groupId>junit</groupId>
+               <artifactId>junit</artifactId>
+               <scope>test</scope>
+       </dependency>
+       <dependency>
+               <groupId>com.h2database</groupId>
+               <artifactId>h2</artifactId>
+               <version>[1.4.186,)</version>
+               <scope>test</scope>
+       </dependency>
+       <dependency>
+       <groupId>org.onap.policy.common</groupId>
+               <artifactId>utils</artifactId>
+               <version>${project.version}</version>
+    </dependency>
+    <dependency>
+       <groupId>org.onap.policy.common</groupId>
+               <artifactId>utils-test</artifactId>
+               <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/feature-distributed-locking/src/assembly/assemble_zip.xml b/feature-distributed-locking/src/assembly/assemble_zip.xml
new file mode 100644 (file)
index 0000000..2112fbc
--- /dev/null
@@ -0,0 +1,75 @@
+<!--
+  ============LICENSE_START=======================================================
+  feature-distributed-locking
+  ================================================================================
+  Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  ============LICENSE_END=========================================================
+  -->
+
+<!-- Defines how we build the .zip file which is our distribution. -->
+
+<assembly
+       xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+       <id>feature-distributed-locking</id>
+       <formats>
+               <format>zip</format>
+       </formats>
+
+       <includeBaseDirectory>false</includeBaseDirectory>
+
+       <fileSets>
+               <fileSet>
+                       <directory>target</directory>
+                       <outputDirectory>lib/feature</outputDirectory>
+                       <includes>
+                               <include>feature-distributed-locking-${project.version}.jar</include>
+                       </includes>
+               </fileSet>
+               <fileSet>
+                       <directory>target/assembly/lib</directory>
+                       <outputDirectory>lib/dependencies</outputDirectory>
+                       <includes>
+                               <include>*.jar</include>
+                       </includes>
+               </fileSet>
+               <fileSet>
+                       <directory>src/main/feature/config</directory>
+                       <outputDirectory>config</outputDirectory>
+                       <fileMode>0644</fileMode>
+                       <excludes/>
+               </fileSet>
+               <fileSet>
+                       <directory>src/main/feature/bin</directory>
+                       <outputDirectory>bin</outputDirectory>
+                       <fileMode>0744</fileMode>
+                       <excludes/>
+               </fileSet>
+               <fileSet>
+                       <directory>src/main/feature/db</directory>
+                       <outputDirectory>db</outputDirectory>
+                       <fileMode>0744</fileMode>
+                       <excludes/>
+               </fileSet>
+               <fileSet>
+                       <directory>src/main/feature/install</directory>
+                       <outputDirectory>install</outputDirectory>
+                       <fileMode>0744</fileMode>
+                       <excludes/>
+               </fileSet>
+       </fileSets>
+
+</assembly>
diff --git a/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties b/feature-distributed-locking/src/main/feature/config/feature-distributed-locking.properties
new file mode 100644 (file)
index 0000000..ee4aa47
--- /dev/null
@@ -0,0 +1,34 @@
+###
+# ============LICENSE_START=======================================================
+ # feature-distributed-locking
+# ================================================================================
+# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+#Database properties
+#javax.persistence.jdbc.driver= org.mariadb.jdbc.Driver
+#javax.persistence.jdbc.url=jdbc:mariadb://${{SQL_HOST}}:3306/locks
+#javax.persistence.jdbc.user=${{SQL_USER}}
+#javax.persistence.jdbc.password=${{SQL_PASSWORD}}
+
+#This value is added to System.currentTimeMs to
+#set expirationTime when a lock is obtained. 
+#distributed.locking.lock.aging=1000
+
+#The frequency (in milliseconds) that the heartbeat
+#thread refreshes locks owned by the current host
+#distributed.locking.heartbeat.interval=5000
+
diff --git a/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.downgrade.sql
new file mode 100644 (file)
index 0000000..cd1b815
--- /dev/null
@@ -0,0 +1,20 @@
+#  ============LICENSE_START=======================================================
+#  feature-distributed-locking
+# ================================================================================
+#  Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+#  ================================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  
+#       http://www.apache.org/licenses/LICENSE-2.0
+#  
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=========================================================
+
+use pooling;
+drop table if exists locks;
\ No newline at end of file
diff --git a/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql b/feature-distributed-locking/src/main/feature/db/pooling/sql/1804-distributedlocking.upgrade.sql
new file mode 100644 (file)
index 0000000..be56d35
--- /dev/null
@@ -0,0 +1,23 @@
+#  ============LICENSE_START=======================================================
+#  feature-distributed-locking
+# ================================================================================
+#  Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+#  ================================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  
+#       http://www.apache.org/licenses/LICENSE-2.0
+#  
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=========================================================
+
+ set foreign_key_checks=0; 
+ CREATE TABLE if not exists pooling.locks (resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), expirationTime BIGINT, PRIMARY KEY (resourceId), INDEX idx_expirationTime(expirationTime), INDEX idx_host(host));
+
+ set foreign_key_checks=1;
\ No newline at end of file
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeature.java
new file mode 100644 (file)
index 0000000..cc7a7a1
--- /dev/null
@@ -0,0 +1,158 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.onap.policy.drools.core.lock.LockRequestFuture;
+import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI;
+import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
+import org.onap.policy.drools.persistence.SystemPersistence;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedLockingFeature implements PolicyEngineFeatureAPI, PolicyResourceLockFeatureAPI {
+       
+       /**
+        * Logger instance
+        */
+       private static final Logger logger = LoggerFactory.getLogger(DistributedLockingFeature.class);
+       
+       /**
+        * Properties Configuration Name
+        */
+       public static final String CONFIGURATION_PROPERTIES_NAME = "feature-distributed-locking";
+       
+       /**
+        * Properties for locking feature
+        */
+       private DistributedLockingProperties lockProps;
+       
+       /**
+        *ScheduledExecutorService for LockHeartbeat 
+        */
+       private ScheduledExecutorService scheduledExecutorService;
+       
+       /**
+        * UUID 
+        */
+       private static final UUID uuid = UUID.randomUUID();
+       
+       /**
+        * Config directory
+        */
+       @Override
+       public int getSequenceNumber() {
+        return 1000;
+       }
+       
+       @Override
+       public Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) {
+               
+               TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps);
+               
+               return new LockRequestFuture(resourceId, owner, tLock.lock());
+                               
+       }
+
+       @Override
+       public Boolean beforeUnlock(String resourceId, String owner) {
+               TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps);
+               
+               return tLock.unlock();
+       }
+       
+       @Override
+       public Boolean beforeIsLockedBy(String resourceId, String owner) {
+               TargetLock tLock = new TargetLock(resourceId, this.uuid, owner, lockProps);
+               
+               return tLock.isActive();
+       }
+       
+       @Override
+       public Boolean beforeIsLocked(String resourceId) {
+               TargetLock tLock = new TargetLock(resourceId, this.uuid, "dummyOwner", lockProps);
+               
+               return tLock.isLocked();
+       }
+       
+       @Override
+       public boolean afterStart(PolicyEngine engine) {
+
+               try {
+                       this.lockProps = new DistributedLockingProperties(SystemPersistence.manager.getProperties(DistributedLockingFeature.CONFIGURATION_PROPERTIES_NAME));
+               } catch (PropertyException e) {
+                       logger.error("DistributedLockingFeature feature properies have not been loaded", e);
+                       throw new DistributedLockingFeatureException(e);
+               }
+               
+               long heartbeatInterval = this.lockProps.getHeartBeatIntervalProperty();
+               
+               cleanLockTable();
+               Heartbeat heartbeat = new Heartbeat(this.uuid, lockProps);
+               
+               this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
+               this.scheduledExecutorService.scheduleAtFixedRate(heartbeat, heartbeatInterval, heartbeatInterval, TimeUnit.MILLISECONDS);
+               return false;
+       }
+       
+       /**
+        * This method kills the heartbeat thread and calls refreshLockTable which removes
+        * any records from the db where the current host is the owner.
+        */
+       @Override
+       public boolean beforeShutdown(PolicyEngine engine) {
+               scheduledExecutorService.shutdown();
+               cleanLockTable();
+               return false;
+       }
+
+       /**
+        * This method removes all records owned by the current host from the db.
+        */
+       private void cleanLockTable() {
+               
+           try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), 
+                       lockProps.getDbUser(),
+                       lockProps.getDbPwd());
+               PreparedStatement statement = conn.prepareStatement("DELETE FROM pooling.locks WHERE host = ? OR expirationTime < ?");
+               ){
+                       
+                               statement.setString(1, this.uuid.toString());
+                               statement.setLong(2, System.currentTimeMillis());
+                               statement.executeUpdate();
+                       
+               } catch (SQLException e) {
+                       logger.error("error in refreshLockTable()", e);
+               }
+               
+       }
+       
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingFeatureException.java
new file mode 100644 (file)
index 0000000..f28ccbc
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+public class DistributedLockingFeatureException extends RuntimeException {
+       private static final long serialVersionUID = 1L;
+
+       /**
+        * 
+        * @param e
+        *            exception to be wrapped
+        */
+       public DistributedLockingFeatureException(Exception e) {
+               super(e);
+       }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/DistributedLockingProperties.java
new file mode 100644 (file)
index 0000000..139bfb7
--- /dev/null
@@ -0,0 +1,127 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.distributed.locking;
+
+import java.util.Properties;
+
+import org.onap.policy.common.utils.properties.PropertyConfiguration;
+import org.onap.policy.common.utils.properties.exception.PropertyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DistributedLockingProperties extends PropertyConfiguration{
+       
+       private static final Logger  logger = LoggerFactory.getLogger(DistributedLockingProperties.class);
+       
+       /**
+     * Feature properties all begin with this prefix.
+     */
+    public static final String PREFIX = "distributed.locking.";
+    
+       public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
+       public static final String DB_URL = "javax.persistence.jdbc.url";
+       public static final String DB_USER = "javax.persistence.jdbc.user";
+       public static final String DB_PWD = "javax.persistence.jdbc.password";
+       public static final String AGING_PROPERTY = PREFIX + "lock.aging";
+       public static final String HEARTBEAT_INTERVAL_PROPERTY = PREFIX + "heartbeat.interval";
+       
+       /**
+     * Properties from which this was constructed.
+     */
+    private Properties source;
+
+    /**
+     * Database driver
+     */
+    @Property(name = DB_DRIVER)
+    private String dbDriver;
+    
+    /**
+     * Database url
+     */
+    @Property(name = DB_URL)
+    private String dbUrl;
+    
+    /**
+     * Database user
+     */
+    @Property(name = DB_USER)
+    private String dbUser;
+    
+    /**
+     * Database password
+     */
+    @Property(name = DB_PWD)
+    private String dbPwd;
+    
+    /**
+     * Used to set expiration time for lock.
+     */
+    @Property(name = AGING_PROPERTY, defaultValue = "300000")
+    private long agingProperty;
+    
+    /**
+     * Indicates intervals at which we refresh locks.
+     */
+    @Property(name = HEARTBEAT_INTERVAL_PROPERTY, defaultValue = "60000")
+    private long heartBeatIntervalProperty;
+
+    public DistributedLockingProperties(Properties props) throws PropertyException {
+       super(props);
+       source = props;
+    }
+
+
+       public Properties getSource() {
+               return source;
+       }
+
+
+       public String getDbDriver() {
+               return dbDriver;
+       }
+
+
+       public String getDbUrl() {
+               return dbUrl;
+       }
+
+
+       public String getDbUser() {
+               return dbUser;
+       }
+
+
+       public String getDbPwd() {
+               return dbPwd;
+       }
+
+
+       public long getAgingProperty() {
+               return agingProperty;
+       }
+
+
+       public long getHeartBeatIntervalProperty() {
+               return heartBeatIntervalProperty;
+       }
+
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/Heartbeat.java
new file mode 100644 (file)
index 0000000..c753dba
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.onap.policy.drools.utils.NetworkUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * This runnable class scans the locks table for all locks owned by this host.
+ * It refreshes the expiration time of each lock using the locking.distributed.aging
+ * property
+ *
+ */
+public class Heartbeat implements Runnable{
+
+       private static final Logger logger = LoggerFactory.getLogger(Heartbeat.class);
+
+       /**
+        * Properties object containing properties needed by class
+        */
+       private DistributedLockingProperties lockProps;
+
+       /**
+        * UUID 
+        */
+       private UUID uuid;
+       
+       public Heartbeat(UUID uuid, DistributedLockingProperties lockProps) {
+               this.lockProps = lockProps;
+               this.uuid = uuid;
+       }
+       
+       @Override
+       public void run() {
+
+               long expirationAge = lockProps.getAgingProperty();
+
+               try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+                               lockProps.getDbPwd());
+                       PreparedStatement statement = conn
+                                               .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE host = ?");) {
+
+                       statement.setLong(1, System.currentTimeMillis() + expirationAge);
+                       statement.setString(2, this.uuid.toString());
+                       statement.executeUpdate();
+               } catch (SQLException e) {
+                       logger.error("error in Heartbeat.run()", e);
+               }
+
+       }
+}
diff --git a/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java b/feature-distributed-locking/src/main/java/org/onap/policy/distributed/locking/TargetLock.java
new file mode 100644 (file)
index 0000000..ceaa849
--- /dev/null
@@ -0,0 +1,233 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.distributed.locking;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TargetLock {
+       
+       private static final Logger logger = LoggerFactory.getLogger(TargetLock.class);
+       
+       /**
+        * The Target resource we want to lock
+        */
+       private String resourceId;
+       
+       /**
+        * Properties object containing properties needed by class
+        */
+       private DistributedLockingProperties lockProps;
+
+       /**
+        * UUID 
+        */
+       private UUID uuid;
+       
+       /**
+        * Owner
+        */
+       private String owner;
+       
+       /**
+        * Constructs a TargetLock object.
+        * 
+        * @param resourceId ID of the entity we want to lock
+        * @param lockProps Properties object containing properties needed by class
+        */
+       public TargetLock (String resourceId, UUID uuid, String owner, DistributedLockingProperties lockProps) {
+               this.resourceId = resourceId;
+               this.uuid = uuid;
+               this.owner = owner;
+               this.lockProps = lockProps;
+       }
+       
+       /**
+        * obtain a lock
+        */
+       public boolean lock() {
+               
+               return grabLock();
+       }
+       
+       /**
+        * Unlock a resource by deleting it's associated record in the db
+        */
+       public boolean unlock() {
+               return deleteLock();
+       }
+       
+       /**
+        * "Grabs" lock by attempting to insert a new record in the db.
+        *  If the insert fails due to duplicate key error resource is already locked
+        *  so we call secondGrab. 
+        */
+       private boolean grabLock() {
+
+               try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+                               lockProps.getDbPwd());
+
+                               // try to insert a record into the table(thereby grabbing the lock)
+                               PreparedStatement statement = conn
+                                               .prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) {
+                       statement.setString(1, this.resourceId);
+                       statement.setString(2, this.uuid.toString());
+                       statement.setString(3, this.owner);
+                       statement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty());
+
+                       statement.executeUpdate();
+               }  catch (SQLException e) {
+                       logger.error("error in TargetLock.grabLock()", e);
+                       return secondGrab();
+               }
+
+               return true;
+       }
+
+       /**
+        * A second attempt at grabbing a lock. It first attempts to update the lock in case it is expired.
+        * If that fails, it attempts to insert a new record again
+        */
+       private boolean secondGrab() {
+
+               try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+                               lockProps.getDbPwd());
+
+                               PreparedStatement updateStatement = conn.prepareStatement("UPDATE pooling.locks SET host = ?, owner = ?, expirationTime = ? WHERE expirationTime <= ? AND resourceId = ?");
+                               
+                               PreparedStatement insertStatement = conn.prepareStatement("INSERT INTO pooling.locks (resourceId, host, owner, expirationTime) values (?, ?, ?, ?)");) {
+
+                       updateStatement.setString(1, this.uuid.toString());
+                       updateStatement.setString(2, this.owner);
+                       updateStatement.setLong(3, System.currentTimeMillis() + lockProps.getAgingProperty());
+                       updateStatement.setLong(4, System.currentTimeMillis());
+                       updateStatement.setString(5, this.resourceId);
+
+                       // The lock was expired and we grabbed it.
+                       // return true
+                       if (updateStatement.executeUpdate() == 1) {
+                               return true;
+                       }
+                       // If our update does not return 1 row, the lock either has not expired
+                       // or it was removed. Try one last grab
+                       else {
+                               insertStatement.setString(1, this.resourceId);
+                               insertStatement.setString(2, this.uuid.toString());
+                               insertStatement.setString(3, this.owner);
+                               insertStatement.setLong(4, System.currentTimeMillis() + lockProps.getAgingProperty());
+
+                                       // If our insert returns 1 we successfully grabbed the lock
+                               return (insertStatement.executeUpdate() == 1);
+                       }
+
+               } catch (SQLException e) {
+                       logger.error("error in TargetLock.secondGrab()", e);
+                       return false;
+               }
+
+       }
+       
+       /**
+        *To remove a lock we simply delete the record from the db 
+        */
+       private boolean deleteLock() {
+
+               try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+                               lockProps.getDbPwd());
+
+                               PreparedStatement deleteStatement = conn
+                                               .prepareStatement("DELETE FROM pooling.locks WHERE resourceId = ? AND owner = ? AND host = ?");) {
+
+                       deleteStatement.setString(1, this.resourceId);
+                       deleteStatement.setString(2, this.owner);
+                       deleteStatement.setString(3, this.uuid.toString());
+                       
+                       return (deleteStatement.executeUpdate() == 1);
+
+               } catch (SQLException e) {
+                       logger.error("error in TargetLock.deleteLock()", e);
+                       return false;
+               }
+
+       }
+       
+       /**
+        * Is the lock active
+        */
+       public boolean isActive() {
+
+               try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+                               lockProps.getDbPwd());
+
+                               PreparedStatement selectStatement = conn
+                                               .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND host = ? AND owner= ? AND expirationTime >= ?");) {
+                       {
+                               selectStatement.setString(1, this.resourceId);
+                               selectStatement.setString(2, this.uuid.toString());
+                               selectStatement.setString(3, this.owner);
+                               selectStatement.setLong(4, System.currentTimeMillis());
+
+                               ResultSet result = selectStatement.executeQuery();
+
+                               // This will return true if the
+                               // query returned at least one row
+                               return result.first();
+
+                       }
+               } catch (SQLException e) {
+                       logger.error("error in TargetLock.isActive()", e);
+                       return false;
+               }
+       }
+
+       /**
+        * Is the resource locked
+        */
+       public boolean isLocked() {
+
+               try (Connection conn = DriverManager.getConnection(lockProps.getDbUrl(), lockProps.getDbUser(),
+                               lockProps.getDbPwd());
+
+                               PreparedStatement selectStatement = conn
+                                               .prepareStatement("SELECT * FROM pooling.locks WHERE resourceId = ? AND expirationTime >= ?");) {
+                       {
+                               selectStatement.setString(1, this.resourceId);
+                               selectStatement.setLong(2, System.currentTimeMillis());
+                               ResultSet result = selectStatement.executeQuery();
+
+                               // This will return true if the
+                               // query returned at least one row
+                               return result.first();
+
+                       }
+               } catch (SQLException e) {
+                       logger.error("error in TargetLock.isActive()", e);
+                       return false;
+               }
+       }
+
+}
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI
new file mode 100644 (file)
index 0000000..19bdf50
--- /dev/null
@@ -0,0 +1 @@
+org.onap.policy.distributed.locking.DistributedLockingFeature
\ No newline at end of file
diff --git a/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI b/feature-distributed-locking/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI
new file mode 100644 (file)
index 0000000..19bdf50
--- /dev/null
@@ -0,0 +1 @@
+org.onap.policy.distributed.locking.DistributedLockingFeature
\ No newline at end of file
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/DistributedLockingFeatureExceptionTest.java
new file mode 100644 (file)
index 0000000..ea53e52
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+import org.onap.policy.common.utils.test.ExceptionsTester;
+import org.onap.policy.distributed.locking.DistributedLockingFeatureException;
+
+public class DistributedLockingFeatureExceptionTest extends ExceptionsTester{
+       
+       @Test
+       public void test() {
+               assertEquals(1, test(DistributedLockingFeatureException.class)); 
+       }
+
+}
diff --git a/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java b/feature-distributed-locking/src/test/java/org/onap/policy/distributed/locking/test/TargetLockTest.java
new file mode 100644 (file)
index 0000000..e624afb
--- /dev/null
@@ -0,0 +1,220 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-distributed-locking
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.distributed.locking.test;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.distributed.locking.DistributedLockingFeature;
+import org.onap.policy.drools.persistence.SystemPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+public class TargetLockTest {
+       private static final Logger logger = LoggerFactory.getLogger(TargetLockTest.class);
+       private static final String DB_CONNECTION = "jdbc:h2:mem:pooling;INIT=CREATE SCHEMA IF NOT EXISTS pooling\\;SET SCHEMA pooling";
+       private static final String DB_USER = "user";
+       private static final String DB_PASSWORD = "password";
+       private static Connection conn = null;
+       private static DistributedLockingFeature distLockFeat;
+
+       @BeforeClass
+       public static void setup() {
+               getDBConnection();
+               createTable();
+               SystemPersistence.manager.setConfigurationDir("src/test/resources");
+               distLockFeat = new DistributedLockingFeature();
+               distLockFeat.afterStart(null);
+       
+       }
+       
+       @AfterClass
+       public static void cleanUp() {
+               distLockFeat.beforeShutdown(null);
+               try {
+                       conn.close();
+               } catch (SQLException e) {
+                       logger.error("Error in TargetLockTest.cleanUp()", e);
+               }
+       }
+       
+       @Before
+       public void wipeDb() {
+               
+               try (PreparedStatement lockDelete = conn.prepareStatement("DELETE FROM pooling.locks");){
+                       lockDelete.executeUpdate();
+               } catch (SQLException e) {
+                       logger.error("Error in TargetLockTest.wipeDb()", e);
+                       throw new RuntimeException(e);
+               }
+
+       }
+       
+       @Test
+       public void testGrabLockSuccess() throws InterruptedException, ExecutionException {
+               assertTrue(distLockFeat.beforeLock("resource1", "owner1", null).get());
+       
+                       //attempt to grab expiredLock
+               try (PreparedStatement updateStatement = conn.prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE resourceId = ?");)
+               {
+                       updateStatement.setLong(1, System.currentTimeMillis() - 1000);
+                       updateStatement.setString(2, "resource1");
+                       updateStatement.executeUpdate();
+                               
+               } catch (SQLException e) {
+                       logger.error("Error in TargetLockTest.testGrabLockSuccess()", e);
+                       throw new RuntimeException(e);
+               }
+               
+               assertTrue(distLockFeat.beforeLock("resource1", "owner1", null).get());
+       }
+
+       @Test
+       public void testExpiredLocks() throws InterruptedException, ExecutionException {
+               CountDownLatch latch = new CountDownLatch(1);
+               
+               distLockFeat.beforeLock("resource1", "owner1", null);
+               
+               try {
+                       latch.await(1000, TimeUnit.MILLISECONDS);
+               } catch (InterruptedException e) {
+                       logger.error("Error in testExpiredLocks", e);
+               }
+               
+                       //Heartbeat should keep it active
+               assertFalse(distLockFeat.beforeLock("resource1", "owner1", null).get());
+       }
+       
+       @Test
+       public void testGrabLockFail() throws InterruptedException, ExecutionException {
+               CountDownLatch latch = new CountDownLatch(1);
+               
+               distLockFeat.beforeLock("resource1", "owner1", null);
+               
+               try {
+                       latch.await(10, TimeUnit.MILLISECONDS);
+               } catch (InterruptedException e) {
+                       logger.error("Error in testExpiredLocks", e);
+               }
+               assertFalse(distLockFeat.beforeLock("resource1", "owner1", null).get());
+
+       }
+       
+       
+       @Test
+       public void testUnlock() throws InterruptedException, ExecutionException {
+               distLockFeat.beforeLock("resource1", "owner1", null);
+
+               assertTrue(distLockFeat.beforeUnlock("resource1", "owner1"));
+               assertTrue(distLockFeat.beforeLock("resource1", "owner1", null).get());
+       }
+       
+       @Test
+       public void testIsActive() {
+               assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner1"));
+               distLockFeat.beforeLock("resource1", "owner1", null);
+               assertTrue(distLockFeat.beforeIsLockedBy("resource1", "owner1"));
+               assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner2"));
+
+               // isActive on expiredLock
+               try (PreparedStatement updateStatement = conn
+                               .prepareStatement("UPDATE pooling.locks SET expirationTime = ? WHERE resourceId = ?");) {
+                       updateStatement.setLong(1, System.currentTimeMillis() - 5000);
+                       updateStatement.setString(2, "resource1");
+                       updateStatement.executeUpdate();
+
+               } catch (SQLException e) {
+                       logger.error("Error in TargetLockTest.testIsActive()", e);
+                       throw new RuntimeException(e);
+               }
+
+               assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner1"));
+
+               distLockFeat.beforeLock("resource1", "owner1", null);
+                       //Unlock record, next isActive attempt should fail
+               distLockFeat.beforeUnlock("resource1", "owner1");
+               assertFalse(distLockFeat.beforeIsLockedBy("resource1", "owner1"));
+               
+       }
+       
+       @Test
+       public void testHeartbeat() {
+               CountDownLatch latch = new CountDownLatch(1);
+               
+               distLockFeat.beforeLock("resource1", "owner1", null);
+               try {
+                       latch.await(1000, TimeUnit.MILLISECONDS);
+               } catch (InterruptedException e) {
+                       logger.error("Error in testExpiredLocks", e);
+               }
+               
+                       // This test always returns true.
+               assertTrue(distLockFeat.beforeIsLocked("resource1"));
+       }
+       
+       @Test
+       public void unlockBeforeLock() {
+               assertFalse(distLockFeat.beforeUnlock("resource1", "owner1"));
+               distLockFeat.beforeLock("resource1", "owner1", null);
+               assertTrue(distLockFeat.beforeUnlock("resource1", "owner1"));
+               assertFalse(distLockFeat.beforeUnlock("resource1", "owner1"));
+       }
+       
+       @Test
+       public void testIsLocked() {
+               assertFalse(distLockFeat.beforeIsLocked("resource1"));
+               distLockFeat.beforeLock("resource1", "owner1", null);
+               assertTrue(distLockFeat.beforeIsLocked("resource1"));
+       
+       }
+       
+       private static void getDBConnection() {
+               try {
+                       conn = DriverManager.getConnection(DB_CONNECTION, DB_USER, DB_PASSWORD);
+               } catch (SQLException e) {
+                       logger.error("Error in TargetLockTest.getDBConnection()", e);
+               }
+       }
+
+       private static void createTable() {
+               String createString = "create table if not exists pooling.locks (resourceId VARCHAR(128), host VARCHAR(128), owner VARCHAR(128), expirationTime BIGINT, PRIMARY KEY (resourceId))";
+               try (PreparedStatement createStmt = conn.prepareStatement(createString);) {
+                       createStmt.executeUpdate();
+                       
+               } catch (SQLException e) {
+                       logger.error("Error in TargetLockTest.createTable()", e);
+               }
+       }       
+        
+
+}
diff --git a/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties b/feature-distributed-locking/src/test/resources/feature-distributed-locking.properties
new file mode 100644 (file)
index 0000000..d1a07e8
--- /dev/null
@@ -0,0 +1,26 @@
+###
+# ============LICENSE_START=======================================================
+ # feature-distributed-locking
+# ================================================================================
+# Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# 
+#      http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+javax.persistence.jdbc.driver=org.h2.Driver
+javax.persistence.jdbc.url=jdbc:h2:mem:pooling
+javax.persistence.jdbc.user=user
+javax.persistence.jdbc.password=password
+distributed.locking.lock.aging=150
+distributed.locking.heartbeat.interval=500
\ No newline at end of file
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockRequestFuture.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/LockRequestFuture.java
new file mode 100644 (file)
index 0000000..46d1ff2
--- /dev/null
@@ -0,0 +1,261 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.onap.policy.drools.core.lock.PolicyResourceLockFeatureAPI.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Future associated with a lock request.
+ */
+public class LockRequestFuture implements Future<Boolean> {
+
+    // messages used in exceptions
+    public static final String MSG_NULL_RESOURCE_ID = "null resourceId";
+    public static final String MSG_NULL_OWNER = "null owner";
+
+    private static Logger logger = LoggerFactory.getLogger(LockRequestFuture.class);
+
+    /**
+     * The resource on which the lock was requested.
+     */
+    private final String resourceId;
+
+    /**
+     * The owner for which the lock was requested.
+     */
+    private final String owner;
+
+    /**
+     * Possible states for this future.
+     */
+    private enum State {
+        WAITING, CANCELLED, ACQUIRED, DENIED
+    };
+
+    private AtomicReference<State> state;
+
+    /**
+     * Used to wait for the lock request to complete.
+     */
+    private CountDownLatch waiter = new CountDownLatch(1);
+
+    /**
+     * Callback to invoke once the lock is acquired (or denied). This is set to
+     * {@code null} once the callback has been invoked.
+     */
+    private final AtomicReference<Callback> callback;
+
+    /**
+     * Constructs a future that has already been completed.
+     * 
+     * @param resourceId
+     * @param owner owner for which the lock was requested
+     * @param locked {@code true} if the lock has been acquired, {@code false} if the lock
+     *        request has been denied
+     * @throws IllegalArgumentException if any of the arguments are {@code null}
+     */
+    public LockRequestFuture(String resourceId, String owner, boolean locked) {
+        if (resourceId == null) {
+            throw makeNullArgException(MSG_NULL_RESOURCE_ID);
+        }
+
+        if (owner == null) {
+            throw makeNullArgException(MSG_NULL_OWNER);
+        }
+
+        this.resourceId = resourceId;
+        this.owner = owner;
+        this.callback = new AtomicReference<>(null);
+        this.state = new AtomicReference<>(locked ? State.ACQUIRED : State.DENIED);
+
+        // indicate that it's already done
+        this.waiter.countDown();
+    }
+
+    /**
+     * Constructs a future that has not yet been completed.
+     * 
+     * @param resourceId
+     * @param owner owner for which the lock was requested
+     * @param callback item to be wrapped
+     * @throws IllegalArgumentException if the resourceId or owner is {@code null}
+     */
+    public LockRequestFuture(String resourceId, String owner, Callback callback) {
+        if (resourceId == null) {
+            throw makeNullArgException(MSG_NULL_RESOURCE_ID);
+        }
+
+        if (owner == null) {
+            throw makeNullArgException(MSG_NULL_OWNER);
+        }
+
+        this.resourceId = resourceId;
+        this.owner = owner;
+        this.callback = new AtomicReference<>(callback);
+        this.state = new AtomicReference<>(State.WAITING);
+    }
+
+    public String getResourceId() {
+        return resourceId;
+    }
+
+    public String getOwner() {
+        return owner;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        boolean cancelled = state.compareAndSet(State.WAITING, State.CANCELLED);
+
+        if (cancelled) {
+            logger.info("resource {} owner {} cancelled lock request", resourceId, owner);
+            waiter.countDown();
+        }
+
+        return cancelled;
+    }
+
+    /**
+     * Indicates that the lock has been acquired or denied.
+     * 
+     * @param locked {@code true} if the lock has been acquired, {@code false} if the lock
+     *        request has been denied
+     * 
+     * @return {@code true} if it was not already completed, {@code false} otherwise
+     */
+    protected boolean setLocked(boolean locked) {
+        State newState = (locked ? State.ACQUIRED : State.DENIED);
+        if (state.compareAndSet(State.WAITING, newState)) {
+            waiter.countDown();
+            return true;
+
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return (state.get() == State.CANCELLED);
+    }
+
+    @Override
+    public boolean isDone() {
+        return (state.get() != State.WAITING);
+    }
+
+    /**
+     * Gets the current status of the lock.
+     * 
+     * @return {@code true} if the lock has been acquired, {@code false} otherwise
+     */
+    public boolean isLocked() {
+        return (state.get() == State.ACQUIRED);
+    }
+
+    /**
+     * @return {@code true} if the lock was acquired, {@code false} if it was denied
+     */
+    @Override
+    public Boolean get() throws CancellationException, InterruptedException {
+        waiter.await();
+
+        switch (state.get()) {
+            case CANCELLED:
+                throw new CancellationException("lock request was cancelled");
+            case ACQUIRED:
+                return true;
+            default:
+                // should only be DENIED at this point
+                return false;
+        }
+    }
+
+    /**
+     * @return {@code true} if the lock was acquired, {@code false} if it was denied
+     */
+    @Override
+    public Boolean get(long timeout, TimeUnit unit)
+                    throws CancellationException, InterruptedException, TimeoutException {
+
+        if (!waiter.await(timeout, unit)) {
+            throw new TimeoutException("lock request did not complete in time");
+        }
+
+        return get();
+    }
+
+    /**
+     * Invokes the callback, indicating whether or not the lock was acquired.
+     * 
+     * @throws IllegalStateException if the request was previously cancelled, has not yet
+     *         completed, or if the callback has already been invoked
+     */
+    protected void invokeCallback() {
+        boolean locked;
+
+        switch (state.get()) {
+            case ACQUIRED:
+                locked = true;
+                break;
+            case DENIED:
+                locked = false;
+                break;
+            case CANCELLED:
+                throw new IllegalStateException("cancelled lock request callback");
+            default:
+                // only other choice is WAITING
+                throw new IllegalStateException("incomplete lock request callback");
+        }
+
+        Callback cb = callback.get();
+        if (cb == null || !callback.compareAndSet(cb, null)) {
+            throw new IllegalStateException("already invoked lock request callback");
+        }
+
+
+        // notify the callback
+        try {
+            cb.set(locked);
+
+        } catch (RuntimeException e) {
+            logger.info("lock request callback for resource {} owner {} threw an exception", resourceId, owner, e);
+        }
+    }
+
+    /**
+     * Makes an exception for when an argument is {@code null}.
+     * 
+     * @param msg exception message
+     * @return a new Exception
+     */
+    public static IllegalArgumentException makeNullArgException(String msg) {
+        return new IllegalArgumentException(msg);
+    }
+}
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureAPI.java b/policy-core/src/main/java/org/onap/policy/drools/core/lock/PolicyResourceLockFeatureAPI.java
new file mode 100644 (file)
index 0000000..718ed5e
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ * ============LICENSE_START=======================================================
+ * api-resource-locks
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.core.lock;
+
+import java.util.concurrent.Future;
+import org.onap.policy.drools.utils.OrderedService;
+import org.onap.policy.drools.utils.OrderedServiceImpl;
+
+/**
+ * Resource locks. Each lock has an "owner", which is intended to be unique across a
+ * single instance of a running PolicyEngine.
+ * <p>
+ * This interface provides a way to invoke optional features at various points in the
+ * code. At appropriate points in the application, the code iterates through this list,
+ * invoking these optional methods.
+ * <p>
+ * Implementers may choose to implement a level of locking appropriate to the application.
+ * For instance, they may choose to implement an engine-wide locking scheme, or they may
+ * choose to implement a global locking scheme (e.g., through a shared DB).
+ */
+public interface PolicyResourceLockFeatureAPI extends OrderedService {
+
+    /**
+     * 'FeatureAPI.impl.getList()' returns an ordered list of objects implementing the
+     * 'FeatureAPI' interface.
+     */
+    public static OrderedServiceImpl<PolicyResourceLockFeatureAPI> impl =
+                    new OrderedServiceImpl<>(PolicyResourceLockFeatureAPI.class);
+
+    /**
+     * Callback that an implementer invokes when a lock is acquired (or denied),
+     * asynchronously. The implementer invokes the method to indicate that the lock was
+     * acquired (or denied).
+     */
+    @FunctionalInterface
+    public static interface Callback {
+
+        /**
+         * 
+         * @param locked {@code true} if the lock was acquired, {@code false} if the lock
+         *        was denied
+         */
+        public void set(boolean locked);
+    }
+
+    /**
+     * This method is called before a lock is acquired on a resource. If a callback is
+     * provided, and the implementer is unable to acquire the lock immediately, then the
+     * implementer will invoke the callback once the lock is acquired. If the implementer
+     * handled the request, then it will return a future, which may be in one of three
+     * states:
+     * <dl>
+     * <dt>isDone()=true and get()=true</dt>
+     * <dd>the lock has been acquired; the callback may or may not have been invoked</dd>
+     * <dt>isDone()=true and get()=false</dt>
+     * <dd>the lock request has been denied; the callback may or may not have been
+     * invoked</dd>
+     * <dt>isDone()=false</dt>
+     * <dd>the lock was not immediately available and a callback was provided. The
+     * callback will be invoked once the lock is acquired (or denied). In this case, the
+     * future may be used to cancel the request</dd>
+     * </dl>
+     * 
+     * @param resourceId
+     * @param owner
+     * @param callback function to invoke, if the requester wishes to wait for the lock to
+     *        come available, {@code null} to provide immediate replies
+     * @return a future for the lock, if the implementer handled the request, {@code null}
+     *         if additional locking logic should be performed
+     * @throws IllegalStateException if the owner already holds the lock or is already in
+     *         the queue to get the lock
+     */
+    public default Future<Boolean> beforeLock(String resourceId, String owner, Callback callback) {
+        return null;
+    }
+
+    /**
+     * This method is called after a lock for a resource has been acquired or denied. This
+     * may be invoked immediately, if the status can be determined immediately, or it may
+     * be invoked asynchronously, once the status has been determined.
+     * 
+     * @param resourceId
+     * @param owner
+     * @param locked {@code true} if the lock was acquired, {@code false} if it was denied
+     * @return {@code true} if the implementer handled the request, {@code false}
+     *         otherwise
+     */
+    public default boolean afterLock(String resourceId, String owner, boolean locked) {
+        return false;
+    }
+
+    /**
+     * This method is called before a lock on a resource is released.
+     * 
+     * @param resourceId
+     * @param owner
+     *        <dt>true</dt>
+     *        <dd>the implementer handled the request and found the resource to be locked
+     *        by the given owner; the resource was unlocked and no additional locking
+     *        logic should be performed</dd>
+     *        <dt>false</dt>
+     *        <dd>the implementer handled the request and found the resource was not
+     *        locked by given the owner; no additional locking logic should be
+     *        performed</dd>
+     *        <dt>null</dt>
+     *        <dd>the implementer did not handle the request; additional locking logic
+     *        <i>should be</i> performed
+     *        </dl>
+     */
+    public default Boolean beforeUnlock(String resourceId, String owner) {
+        return null;
+    }
+
+    /**
+     * This method is called after a lock on a resource is released.
+     * 
+     * @param resourceId
+     * @param owner
+     * @param unlocked {@code true} if the lock was released, {@code false} if the owner
+     *        did not have a lock on the resource
+     * @return {@code true} if the implementer handled the request, {@code false}
+     *         otherwise
+     */
+    public default boolean afterUnlock(String resourceId, String owner, boolean unlocked) {
+        return false;
+    }
+
+    /**
+     * This method is called before a check is made to determine if a resource is locked.
+     * 
+     * @param resourceId
+     * @return
+     *         <dl>
+     *         <dt>true</dt>
+     *         <dd>the implementer handled the request and found the resource to be
+     *         locked; no additional locking logic should be performed</dd>
+     *         <dt>false</dt>
+     *         <dd>the implementer handled the request and found the resource was not
+     *         locked; no additional locking logic should be performed</dd>
+     *         <dt>null</dt>
+     *         <dd>the implementer did not handle the request; additional locking logic
+     *         <i>should be</i> performed
+     *         </dl>
+     */
+    public default Boolean beforeIsLocked(String resourceId) {
+        return null;
+    }
+
+    /**
+     * This method is called before a check is made to determine if a particular owner
+     * holds the lock on a resource.
+     * 
+     * @param resourceId
+     * @param owner
+     * @return
+     *         <dl>
+     *         <dt>true</dt>
+     *         <dd>the implementer handled the request and found the resource to be locked
+     *         by the given owner; no additional locking logic should be performed</dd>
+     *         <dt>false</dt>
+     *         <dd>the implementer handled the request and found the resource was not
+     *         locked by given the owner; no additional locking logic should be
+     *         performed</dd>
+     *         <dt>null</dt>
+     *         <dd>the implementer did not handle the request; additional locking logic
+     *         <i>should be</i> performed
+     *         </dl>
+     */
+    public default Boolean beforeIsLockedBy(String resourceId, String owner) {
+        return null;
+    }
+}
diff --git a/pom.xml b/pom.xml
index 8c7bfda..9fd43aa 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
                <module>api-active-standby-management</module>
                <module>feature-active-standby-management</module>
                <module>feature-simulators</module>
+               <module>feature-distributed-locking</module>
                <module>packages</module>
        </modules>