f521b41a226a5215b5d984086870a56efeafb3db
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / backends / kafka / KafkaLiveLockAvoider2.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.backends.kafka;
23
24
25 import java.util.List;
26 import java.util.concurrent.TimeUnit;
27
28 import javax.annotation.PostConstruct;
29
30 import org.apache.curator.framework.CuratorFramework;
31 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
32 import org.apache.zookeeper.CreateMode;
33 import org.apache.zookeeper.Watcher;
34 import com.att.eelf.configuration.EELFLogger;
35 import com.att.eelf.configuration.EELFManager;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.beans.factory.annotation.Qualifier;
38 import org.springframework.stereotype.Component;
39
40 //@ComponentScan(basePackages="com.att.dmf.mr.backends.kafka")
41 @Component
42 public class KafkaLiveLockAvoider2 {
43         
44         public static final String ZNODE_ROOT = "/live-lock-avoid";
45         public static final String ZNODE_LOCKS = "/locks";
46         public static final String ZNODE_UNSTICK_TASKS ="/unstick-tasks";
47         
48         private static String locksPath = ZNODE_ROOT+ZNODE_LOCKS;
49         private static String tasksPath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS;
50         private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaLiveLockAvoider2.class.getName());
51         
52         @Autowired
53         @Qualifier("curator")   
54         private CuratorFramework curatorFramework;
55         
56    @PostConstruct
57         public void init() {
58          log.info("Welcome......................................................................................");
59         try {
60                 if (curatorFramework.checkExists().forPath(locksPath) == null) {
61                         curatorFramework.create().creatingParentsIfNeeded().forPath(locksPath);
62                 }
63                 if (curatorFramework.checkExists().forPath(tasksPath) == null) {
64                         curatorFramework.create().creatingParentsIfNeeded().forPath(tasksPath);
65                 }
66                 
67         } catch (Exception e) {
68                 
69                 log.error("Error during creation of permanent Znodes under /live-lock-avoid ",e);
70                 
71         }
72         
73                 
74         }
75         public void unlockConsumerGroup(String appId, String groupName) throws Exception {
76                 
77                 log.info("Signalling unlock to all conumsers of in group [{}] now, " ,  groupName);
78                 
79                 String fullLockPath = String.format("%s/%s", locksPath, groupName );
80                 String fullTasksPath = null;
81                 
82                 try {
83
84                         //Use the Curator recipe for a Mutex lock, only one process can be broadcasting unlock instructions for a group
85                         InterProcessMutex lock = new InterProcessMutex(curatorFramework, fullLockPath);
86                         if ( lock.acquire(100L, TimeUnit.MILLISECONDS) ) 
87                         {
88                                 try 
89                                 {
90                                         List<String> taskNodes = curatorFramework.getChildren().forPath(tasksPath);
91                                         for (String taskNodeName : taskNodes) {
92                                                 if(!taskNodeName.equals(appId)) {
93                                                         
94                                                         fullTasksPath = String.format("%s/%s/%s", tasksPath, taskNodeName, groupName);
95                                                         log.info("Writing groupName {} to path {}",groupName, fullTasksPath);
96                                                         
97                                                         
98                                                         if(curatorFramework.checkExists().forPath(fullTasksPath) != null) {
99                                                                 curatorFramework.delete().forPath(fullTasksPath);
100                                                         }
101                                                         curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(fullTasksPath);
102                                                 }
103                                         }
104                                         
105
106                                 }
107                                 finally
108                                 {
109                                         //Curator lock recipe requires a acquire() to be followed by a release()
110                                         lock.release();
111                                 }
112                         }else {
113                                 log.info("Could not obtain the avoider lock, another process has the avoider lock? {}", !lock.isAcquiredInThisProcess() );
114                         }
115
116
117                 } catch (Exception e) {
118                         log.error("Error setting up either lock ZNode {} or task  ZNode {}",fullLockPath, fullTasksPath,e);
119                         throw e;
120                 }
121                 
122                 
123         }
124         
125         /*
126          * Shoud be called once per MR server instance.
127          * 
128          */
129         public void startNewWatcherForServer(String appId, LiveLockAvoidance avoidanceCallback) {
130                 LockInstructionWatcher instructionWatcher = new LockInstructionWatcher(curatorFramework,avoidanceCallback,this);
131                 assignNewProcessNode(appId, instructionWatcher);
132                 
133         }
134         
135         
136         protected void assignNewProcessNode(String appId, Watcher processNodeWatcher ) {
137                 
138                 String taskHolderZnodePath = ZNODE_ROOT+ZNODE_UNSTICK_TASKS+"/"+appId;
139                 
140                 
141                 try {
142                         
143                         if(curatorFramework.checkExists().forPath(taskHolderZnodePath) != null) {
144                                 curatorFramework.delete().deletingChildrenIfNeeded().forPath(taskHolderZnodePath);
145
146                         }
147                         curatorFramework.create().forPath(taskHolderZnodePath);
148                         //setup the watcher
149                         curatorFramework.getChildren().usingWatcher(processNodeWatcher).inBackground().forPath(taskHolderZnodePath);
150                         log.info("Done creating task holder and watcher for APP name: {}",appId);
151                         
152                 } catch (Exception e) {
153                         log.error("Could not add new processing node for name {}", appId, e);
154                 }
155                                 
156         }
157
158         
159 }