[DMAAP-BC] Consolidate bus controller repos
[dmaap/buscontroller.git] / dmaap-bc / src / main / java / org / onap / dmaap / dbcapi / service / MirrorMakerService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dmaap
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dmaap.dbcapi.service;
22
23 import java.util.ArrayList;
24 import java.util.List;
25 import java.util.Map;
26
27
28
29
30
31
32
33
34
35 //import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
36 import org.onap.dmaap.dbcapi.aaf.AafDecrypt;
37 import org.onap.dmaap.dbcapi.client.MrTopicConnection;
38 import org.onap.dmaap.dbcapi.database.DatabaseClass;
39 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
40 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
41 import org.onap.dmaap.dbcapi.model.ApiError;
42 import org.onap.dmaap.dbcapi.model.MR_Cluster;
43 import org.onap.dmaap.dbcapi.model.MirrorMaker;
44 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
45 import org.onap.dmaap.dbcapi.util.DmaapConfig;
46 import org.onap.dmaap.dbcapi.util.RandomInteger;
47
48 public class MirrorMakerService extends BaseLoggingClass {
49         
50         private Map<String, MirrorMaker> mirrors = DatabaseClass.getMirrorMakers();
51         private static MrTopicConnection prov;
52         private static AafDecrypt decryptor;
53         
54         static final String PROV_USER_PROPERTY = "MM.ProvUserMechId";
55         static final String PROV_PWD_PROPERTY = "MM.ProvUserPwd";
56         static final String PROV_PWD_DEFAULT = "pwdNotSet";
57         static final String SOURCE_REPLICATION_PORT_PROPERTY = "MR.SourceReplicationPort";
58         static final String SOURCE_REPLICATION_PORT_DEFAULT = "9092";
59         static final String TARGET_REPLICATION_PORT_PROPERTY = "MR.TargetReplicationPort";
60         static final String TARGET_REPLICATION_PORT_DEFAULT = "2181";
61         
62         private static String provUser;
63         private static String provUserPwd;
64         private static String defaultProducerPort;
65         private static String defaultConsumerPort;
66         private static String centralFqdn;
67         private int maxTopicsPerMM;
68         private boolean mmPerMR;
69         
70         public MirrorMakerService() {
71                 super();
72                 decryptor = new AafDecrypt();
73                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
74                 provUser = p.getProperty(PROV_USER_PROPERTY);
75                 provUserPwd = decryptor.decrypt(p.getProperty( PROV_PWD_PROPERTY, PROV_PWD_DEFAULT ));
76                 defaultProducerPort = p.getProperty( SOURCE_REPLICATION_PORT_PROPERTY, SOURCE_REPLICATION_PORT_DEFAULT );
77                 defaultConsumerPort = p.getProperty( TARGET_REPLICATION_PORT_PROPERTY, TARGET_REPLICATION_PORT_DEFAULT );       
78                 centralFqdn = p.getProperty("MR.CentralCname", "notSet");
79                 maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
80                 mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
81         }
82
83         // will create a MM on MMagent if needed
84         // will update the MMagent whitelist with all topics for this MM
85         public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
86                 logger.info( "updateMirrorMaker");
87         
88                 prov = new MrTopicConnection( provUser, provUserPwd );
89         
90                 DmaapService dmaap = new DmaapService();
91                 MR_ClusterService clusters = new MR_ClusterService();
92                 MR_Cluster target_cluster = null;
93                 String override = null;
94                 
95                 if ( ! mmPerMR ) {
96                         // in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility
97                         //  we use a cname for the central MR cluster that is active, and provision on agent topic on that target
98                         // but only send 1 message so MM Agents can read it relying on kafka delivery
99                         for( MR_Cluster cluster: clusters.getCentralClusters() ) {
100
101                                 target_cluster = cluster;
102                                 override = centralFqdn;
103                                 // we only want to send one message even if there are multiple central clusters
104                                 break;
105                         
106                         } 
107                 } else {
108                         // In ONAP deployment architecture, the MM Agent is deployed with each target MR
109                         target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster());
110                         override = null;
111                 }
112                 
113                 prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override  );
114                 ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
115                 if ( ! resp.is2xx() ) {
116
117                         errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
118                         mm.setStatus(DmaapObject_Status.INVALID);
119                 } else {
120                         prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
121                         resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
122                         if ( ! resp.is2xx()) {
123                                 errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
124                                 mm.setStatus(DmaapObject_Status.INVALID);
125                         } else {
126                                 mm.setStatus(DmaapObject_Status.VALID);
127                         }
128                 }
129
130                 mm.setLastMod();
131                 return mirrors.put( mm.getMmName(), mm);
132         }
133         public MirrorMaker getMirrorMaker( String part1, String part2, int index ) {
134                 String targetPart;
135
136                 // original mm names did not have any index, so leave off index 0 for
137                 // backwards compatibility
138                 if ( index == 0 ) {
139                         targetPart = part2;
140                 } else {
141                         targetPart = part2 + "_" + index;
142                 }
143                 logger.info( "getMirrorMaker using " + part1 + " and " + targetPart );
144                 return mirrors.get(MirrorMaker.genKey(part1, targetPart));
145         }
146         public MirrorMaker getMirrorMaker( String part1, String part2 ) {
147                 logger.info( "getMirrorMaker using " + part1 + " and " + part2 );
148                 return mirrors.get(MirrorMaker.genKey(part1, part2));
149         }       
150         public MirrorMaker getMirrorMaker( String key ) {
151                 logger.info( "getMirrorMaker using " + key);
152                 return mirrors.get(key);
153         }
154         
155         
156         public void delMirrorMaker( MirrorMaker mm ) {
157                 logger.info( "delMirrorMaker");
158                 mirrors.remove(mm.getMmName());
159         }
160         
161         // TODO: this should probably return sequential values or get replaced by the MM client API
162         // but it should be sufficient for initial 1610 development
163         public static String genTransactionId() {
164                 RandomInteger ri = new RandomInteger(100000);
165             int randomInt = ri.next();
166             return Integer.toString(randomInt);
167         }
168         public List<String> getAllMirrorMakers() {
169                 List<String> ret = new ArrayList<String>();
170                 for( String key: mirrors.keySet()) {
171                         ret.add( key );
172                 }
173                 
174                 return ret;
175         }
176         
177         public MirrorMaker findNextMM( String source, String target, String fqtn ) {
178                 int i = 0;
179                 MirrorMaker mm = null;
180                 while( mm == null ) {
181                         
182                         mm = this.getMirrorMaker( source, target, i);
183                         if ( mm == null ) {
184                                 mm = new MirrorMaker(source, target, i);
185                         }
186                         if ( mm.getTopics().contains(fqtn) ) {
187                                 break;
188                         }
189                         if ( mm.getTopicCount() >= maxTopicsPerMM ) {
190                                 logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics.  Moving to next MM");
191                                 i++;
192                                 mm = null;
193                         }
194                 }
195          
196                 
197                 return mm;
198         }
199
200         public MirrorMaker splitMM( MirrorMaker orig ) {
201                 
202                 String source = orig.getSourceCluster();
203                 String target = orig.getTargetCluster();
204                 
205                 
206                 ArrayList<String> whitelist = orig.getTopics();
207                 while( whitelist.size() > maxTopicsPerMM ) {
208                         
209                         int last = whitelist.size() - 1;
210                         String topic = whitelist.get(last);
211                         whitelist.remove(last);
212                         MirrorMaker mm = this.findNextMM( source, target, "aValueThatShouldNotMatchAnything" );
213                         mm.addTopic(topic);     
214                         this.updateMirrorMaker(mm);
215                 }
216                 
217                 orig.setTopics(whitelist);
218
219                 return orig;
220                 
221         }
222         
223         public static String getProvUser() {
224                 return provUser;
225         }
226
227         public static void setProvUser(String provUser) {
228                 MirrorMakerService.provUser = provUser;
229         }
230
231         public static String getProvUserPwd() {
232                 return provUserPwd;
233         }
234
235         public static void setProvUserPwd(String provUserPwd) {
236                 MirrorMakerService.provUserPwd = provUserPwd;
237         }
238
239         public static String getDefaultProducerPort() {
240                 return defaultProducerPort;
241         }
242
243         public static void setDefaultProducerPort(String defaultProducerPort) {
244                 MirrorMakerService.defaultProducerPort = defaultProducerPort;
245         }
246
247         public static String getDefaultConsumerPort() {
248                 return defaultConsumerPort;
249         }
250
251         public static void setDefaultConsumerPort(String defaultConsumerPort) {
252                 MirrorMakerService.defaultConsumerPort = defaultConsumerPort;
253         }
254
255 }