DMAAP-MR - Merge MR repos
[dmaap/messagerouter/messageservice.git] / src / main / java / org / onap / dmaap / dmf / mr / backends / kafka / KafkaLiveLockAvoider2.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.dmf.mr.backends.kafka;
23
24
25 import com.att.eelf.configuration.EELFLogger;
26 import com.att.eelf.configuration.EELFManager;
27 import org.apache.curator.framework.CuratorFramework;
28 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
29 import org.apache.zookeeper.CreateMode;
30 import org.apache.zookeeper.Watcher;
31 import org.springframework.beans.factory.annotation.Autowired;
32 import org.springframework.beans.factory.annotation.Qualifier;
33 import org.springframework.stereotype.Component;
34
35 import javax.annotation.PostConstruct;
36 import java.util.List;
37 import java.util.concurrent.TimeUnit;
38
39 //@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka")
40 @Component
41 public class KafkaLiveLockAvoider2 {
42         
43         public static final String ZNODE_ROOT = "/live-lock-avoid";
44         public static final String ZNODE_LOCKS = "/locks";
45         public static final String ZNODE_UNSTICK_TASKS ="/unstick-tasks";
46         
47         private static String locksPath = ZNODE_ROOT+ZNODE_LOCKS;
48         private static String tasksPath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS;
49         private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaLiveLockAvoider2.class.getName());
50         
51         @Autowired
52         @Qualifier("curator")   
53         private CuratorFramework curatorFramework;
54         
55    @PostConstruct
56         public void init() {
57          log.info("Welcome......................................................................................");
58         try {
59                 if (curatorFramework.checkExists().forPath(locksPath) == null) {
60                         curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath);
61                 }
62                 if (curatorFramework.checkExists().forPath(tasksPath) == null) {
63                         curatorFramework.create().creatingParentsIfNeeded().forPath(tasksPath);
64                 }
65                 
66         } catch (Exception e) {
67                 
68                 log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e);
69                 
70         }
71         
72                 
73         }
74         public void unlockConsumerGroup(String appId, String groupName) throws Exception {
75                 
76                 log.info("Signalling unlock to all conumsers of in group [{}] now, " ,  groupName);
77                 
78                 String fullLockPath = String.format("%s/%s", locksPath, groupName );
79                 String fullTasksPath = null;
80                 
81                 try {
82
83                         //Use the Curator recipe for a Mutex lock, only one process can be broadcasting unlock instructions for a group
84                         InterProcessMutex lock = new InterProcessMutex(curatorFramework, fullLockPath);
85                         if ( lock.acquire(100L, TimeUnit.MILLISECONDS) ) 
86                         {
87                                 try 
88                                 {
89                                         List<String> taskNodes = curatorFramework.getChildren().forPath(tasksPath);
90                                         for (String taskNodeName : taskNodes) {
91                                                 if(!taskNodeName.equals(appId)) {
92                                                         
93                                                         fullTasksPath = String.format("%s/%s/%s", tasksPath, taskNodeName, groupName);
94                                                         log.info("Writing groupName {} to path {}",groupName, fullTasksPath);
95                                                         
96                                                         
97                                                         if(curatorFramework.checkExists().forPath(fullTasksPath) != null) {
98                                                                 curatorFramework.delete().forPath(fullTasksPath);
99                                                         }
100                                                         curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(fullTasksPath);
101                                                 }
102                                         }
103                                         
104
105                                 }
106                                 finally
107                                 {
108                                         //Curator lock recipe requires a acquire() to be followed by a release()
109                                         lock.release();
110                                 }
111                         }else {
112                                 log.info("Could not obtain the avoider lock, another process has the avoider lock? {}", !lock.isAcquiredInThisProcess() );
113                         }
114
115
116                 } catch (Exception e) {
117                         log.error("Error setting up either lock ZNode {} or task  ZNode {}",fullLockPath, fullTasksPath,e);
118                         throw e;
119                 }
120                 
121                 
122         }
123         
124         /*
125          * Shoud be called once per MR server instance.
126          * 
127          */
128         public void startNewWatcherForServer(String appId, LiveLockAvoidance avoidanceCallback) {
129                 LockInstructionWatcher instructionWatcher = new LockInstructionWatcher(curatorFramework,avoidanceCallback,this);
130                 assignNewProcessNode(appId, instructionWatcher);
131                 
132         }
133         
134         
135         protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) {
136                 
137                 String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId;
138                 
139                 
140                 try {
141                         
142                         if(curatorFramework.checkExists().forPath(taskHolderZnodePath) != null) {
143                                 curatorFramework.delete().deletingChildrenIfNeeded().forPath(taskHolderZnodePath);
144
145                         }
146                         curatorFramework.create().forPath(taskHolderZnodePath);
147                         //setup the watcher
148                         curatorFramework.getChildren().usingWatcher(processNodeWatcher).inBackground().forPath(taskHolderZnodePath);
149                         log.info("Done creating task holder and watcher for APP name: {}",appId);
150                         
151                 } catch (Exception e) {
152                         log.error("Could not add new processing node for name {}", appId, e);
153                 }
154                                 
155         }
156
157         
158 }