Sonar critical issues
[dmaap/messagerouter/dmaapclient.git] / src / main / java / com / att / nsa / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.nsa.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
40
41 import javax.ws.rs.core.MultivaluedMap;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import com.att.nsa.mr.client.HostSelector;
55 import com.att.nsa.mr.client.MRBatchingPublisher;
56 import com.att.nsa.mr.client.response.MRPublisherResponse;
57 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
58
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60         private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
61
62         public static class Builder {
63                 public Builder() {
64                 }
65
66                 public Builder againstUrls(Collection<String> baseUrls) {
67                         fUrls = baseUrls;
68                         return this;
69                 }
70
71                 public Builder onTopic(String topic) {
72                         fTopic = topic;
73                         return this;
74                 }
75
76                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
77                         fMaxBatchSize = maxBatchSize;
78                         fMaxBatchAgeMs = maxBatchAgeMs;
79                         return this;
80                 }
81
82                 public Builder compress(boolean compress) {
83                         fCompress = compress;
84                         return this;
85                 }
86
87                 public Builder httpThreadTime(int threadOccuranceTime) {
88                         this.threadOccuranceTime = threadOccuranceTime;
89                         return this;
90                 }
91
92                 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
93                         fAllowSelfSignedCerts = allowSelfSignedCerts;
94                         return this;
95                 }
96
97                 public Builder withResponse(boolean withResponse) {
98                         fWithResponse = withResponse;
99                         return this;
100                 }
101
102                 public MRSimplerBatchPublisher build() {
103                         if (!fWithResponse) {
104                                 try {
105                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
106                                                         fAllowSelfSignedCerts, threadOccuranceTime);
107                                 } catch (MalformedURLException e) {
108                                         throw new IllegalArgumentException(e);
109                                 }
110                         } else {
111                                 try {
112                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
113                                                         fAllowSelfSignedCerts, fMaxBatchSize);
114                                 } catch (MalformedURLException e) {
115                                         throw new IllegalArgumentException(e);
116                                 }
117                         }
118
119                 }
120
121                 private Collection<String> fUrls;
122                 private String fTopic;
123                 private int fMaxBatchSize = 100;
124                 private long fMaxBatchAgeMs = 1000;
125                 private boolean fCompress = false;
126                 private int threadOccuranceTime = 50;
127                 private boolean fAllowSelfSignedCerts = false;
128                 private boolean fWithResponse = false;
129
130         };
131
132         @Override
133         public int send(String partition, String msg) {
134                 return send(new message(partition, msg));
135         }
136
137         @Override
138         public int send(String msg) {
139                 return send(new message(null, msg));
140         }
141
142         @Override
143         public int send(message msg) {
144                 final LinkedList<message> list = new LinkedList<message>();
145                 list.add(msg);
146                 return send(list);
147         }
148
149         @Override
150         public synchronized int send(Collection<message> msgs) {
151                 if (fClosed) {
152                         throw new IllegalStateException("The publisher was closed.");
153                 }
154
155                 for (message userMsg : msgs) {
156                         fPending.add(new TimestampedMessage(userMsg));
157                 }
158                 return getPendingMessageCount();
159         }
160
161         @Override
162         public synchronized int getPendingMessageCount() {
163                 return fPending.size();
164         }
165
166         @Override
167         public void close() {
168                 try {
169                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
170                         if (remains.size() > 0) {
171                                 getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
172                                                 + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
173                         }
174                 } catch (InterruptedException e) {
175                         getLog().warn("Possible message loss. " + e.getMessage(), e);
176                         Thread.currentThread().interrupt();
177                 } catch (IOException e) {
178                         getLog().warn("Possible message loss. " + e.getMessage(), e);
179                 }
180         }
181
182         @Override
183         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
184                 synchronized (this) {
185                         fClosed = true;
186
187                         // stop the background sender
188                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
189                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
190                         fExec.shutdown();
191                 }
192
193                 final long now = Clock.now();
194                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
195                 final long timeoutAtMs = now + waitInMs;
196
197                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
198                         send(true);
199                         Thread.sleep(250);
200                 }
201
202                 synchronized (this) {
203                         final LinkedList<message> result = new LinkedList<message>();
204                         fPending.drainTo(result);
205                         return result;
206                 }
207         }
208
209         /**
210          * Possibly send a batch to the MR server. This is called by the background
211          * thread and the close() method
212          * 
213          * @param force
214          */
215         private synchronized void send(boolean force) {
216                 if (force || shouldSendNow()) {
217                         if (!sendBatch()) {
218                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
219
220                                 // note the time for back-off
221                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
222                         }
223                 }
224         }
225
226         private synchronized boolean shouldSendNow() {
227                 boolean shouldSend = false;
228                 if (fPending.size() > 0) {
229                         final long nowMs = Clock.now();
230
231                         shouldSend = (fPending.size() >= fMaxBatchSize);
232                         if (!shouldSend) {
233                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
234                                 shouldSend = sendAtMs <= nowMs;
235                         }
236
237                         // however, wait after an error
238                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
239                 }
240                 return shouldSend;
241         }
242
243         /**
244          * Method to parse published JSON Objects and Arrays
245          * 
246          * @return JSONArray
247          */
248         private JSONArray parseJSON() {
249                 JSONArray jsonArray = new JSONArray();
250                 for (TimestampedMessage m : fPending) {
251                         JSONTokener jsonTokener = new JSONTokener(m.fMsg);
252                         JSONObject jsonObject = null;
253                         JSONArray tempjsonArray = null;
254                         final char firstChar = jsonTokener.next();
255                         jsonTokener.back();
256                         if ('[' == firstChar) {
257                                 tempjsonArray = new JSONArray(jsonTokener);
258                                 if (null != tempjsonArray) {
259                                         for (int i = 0; i < tempjsonArray.length(); i++) {
260                                                 jsonArray.put(tempjsonArray.getJSONObject(i));
261                                         }
262                                 }
263                         } else {
264                                 jsonObject = new JSONObject(jsonTokener);
265                                 jsonArray.put(jsonObject);
266                         }
267
268                 }
269                 return jsonArray;
270         }
271
272         private synchronized boolean sendBatch() {
273                 // it's possible for this call to be made with an empty list. in this
274                 // case, just return.
275                 if (fPending.size() < 1) {
276                         return true;
277                 }
278
279                 final long nowMs = Clock.now();
280
281                 host = this.fHostSelector.selectBaseHost();
282
283                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
284                                 props.getProperty("partition"));
285
286                 try {
287                         /*
288                          * final String contentType = fCompress ?
289                          * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
290                          */
291
292                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
293                         OutputStream os = baseStream;
294                         final String contentType = props.getProperty("contenttype");
295                         if (contentType.equalsIgnoreCase("application/json")) {
296                                 JSONArray jsonArray = parseJSON();
297                                 os.write(jsonArray.toString().getBytes());
298                                 os.close();
299
300                         } else if (contentType.equalsIgnoreCase("text/plain")) {
301                                 for (TimestampedMessage m : fPending) {
302                                         os.write(m.fMsg.getBytes());
303                                         os.write('\n');
304                                 }
305                                 os.close();
306                         } else if (contentType.equalsIgnoreCase("application/cambria")
307                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
308                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
309                                         os = new GZIPOutputStream(baseStream);
310                                 }
311                                 for (TimestampedMessage m : fPending) {
312
313                                         os.write(("" + m.fPartition.length()).getBytes());
314                                         os.write('.');
315                                         os.write(("" + m.fMsg.length()).getBytes());
316                                         os.write('.');
317                                         os.write(m.fPartition.getBytes());
318                                         os.write(m.fMsg.getBytes());
319                                         os.write('\n');
320                                 }
321                                 os.close();
322                         } else {
323                                 for (TimestampedMessage m : fPending) {
324                                         os.write(m.fMsg.getBytes());
325
326                                 }
327                                 os.close();
328                         }
329
330                         final long startMs = Clock.now();
331                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
332
333                                 DME2Configue();
334
335                                 Thread.sleep(5);
336                                 getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
337                                                 + (nowMs - fPending.peek().timestamp) + " ms");
338                                 sender.setPayload(os.toString());
339                                 String dmeResponse = sender.sendAndWait(5000L);
340
341                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
342                                 getLog().info(logLine);
343                                 fPending.clear();
344                                 return true;
345                         }
346
347                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
348                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
349                                                 + (nowMs - fPending.peek().timestamp) + " ms");
350                                 final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
351                                                 username, password, protocolFlag);
352                                 // Here we are checking for error response. If HTTP status
353                                 // code is not within the http success response code
354                                 // then we consider this as error and return false
355                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
356                                         return false;
357                                 }
358                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
359                                 getLog().info(logLine);
360                                 fPending.clear();
361                                 return true;
362                         }
363
364                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
365                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
366                                                 + (nowMs - fPending.peek().timestamp) + " ms");
367                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
368                                                 protocolFlag);
369
370                                 // Here we are checking for error response. If HTTP status
371                                 // code is not within the http success response code
372                                 // then we consider this as error and return false
373                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
374                                         return false;
375                                 }
376                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
377                                 getLog().info(logLine);
378                                 fPending.clear();
379                                 return true;
380                         }
381                 } catch (IllegalArgumentException x) {
382                         getLog().warn(x.getMessage(), x);
383                 } catch (IOException x) {
384                         getLog().warn(x.getMessage(), x);
385                 } catch (HttpException x) {
386                         getLog().warn(x.getMessage(), x);
387                 } catch (Exception x) {
388                         getLog().warn(x.getMessage(), x);
389                 }
390                 return false;
391         }
392
393         public synchronized MRPublisherResponse sendBatchWithResponse() {
394                 // it's possible for this call to be made with an empty list. in this
395                 // case, just return.
396                 if (fPending.size() < 1) {
397                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
398                         pubResponse.setResponseMessage("No Messages to send");
399                         return pubResponse;
400                 }
401
402                 final long nowMs = Clock.now();
403
404                 host = this.fHostSelector.selectBaseHost();
405
406                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
407                                 props.getProperty("partition"));
408                 OutputStream os = null;
409                 try {
410
411                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
412                         os = baseStream;
413                         final String contentType = props.getProperty("contenttype");
414                         if (contentType.equalsIgnoreCase("application/json")) {
415                                 JSONArray jsonArray = parseJSON();
416                                 os.write(jsonArray.toString().getBytes());
417                         } else if (contentType.equalsIgnoreCase("text/plain")) {
418                                 for (TimestampedMessage m : fPending) {
419                                         os.write(m.fMsg.getBytes());
420                                         os.write('\n');
421                                 }
422                         } else if (contentType.equalsIgnoreCase("application/cambria")
423                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
424                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
425                                         os = new GZIPOutputStream(baseStream);
426                                 }
427                                 for (TimestampedMessage m : fPending) {
428
429                                         os.write(("" + m.fPartition.length()).getBytes());
430                                         os.write('.');
431                                         os.write(("" + m.fMsg.length()).getBytes());
432                                         os.write('.');
433                                         os.write(m.fPartition.getBytes());
434                                         os.write(m.fMsg.getBytes());
435                                         os.write('\n');
436                                 }
437                                 os.close();
438                         } else {
439                                 for (TimestampedMessage m : fPending) {
440                                         os.write(m.fMsg.getBytes());
441
442                                 }
443                         }
444
445                         final long startMs = Clock.now();
446                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
447
448                                 try {
449                                         DME2Configue();
450
451                                         Thread.sleep(5);
452                                         getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
453                                                         + (nowMs - fPending.peek().timestamp) + " ms");
454                                         sender.setPayload(os.toString());
455
456                                         String dmeResponse = sender.sendAndWait(5000L);
457
458                                         pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
459
460                                         if (Integer.valueOf(pubResponse.getResponseCode()) < 200
461                                                         || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
462
463                                                 return pubResponse;
464                                         }
465                                         final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
466                                         getLog().info(logLine);
467                                         fPending.clear();
468
469                                 } catch (DME2Exception x) {
470                                         getLog().warn(x.getMessage(), x);
471                                         pubResponse.setResponseCode(x.getErrorCode());
472                                         pubResponse.setResponseMessage(x.getErrorMessage());
473                                 } catch (URISyntaxException x) {
474
475                                         getLog().warn(x.getMessage(), x);
476                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
477                                         pubResponse.setResponseMessage(x.getMessage());
478                                 } catch (Exception x) {
479
480                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
481                                         pubResponse.setResponseMessage(x.getMessage());
482                                         logger.error("exception: ", x);
483
484                                 }
485
486                                 return pubResponse;
487                         }
488
489                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
490                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
491                                                 + (nowMs - fPending.peek().timestamp) + " ms");
492                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
493                                                 authDate, username, password, protocolFlag);
494                                 // Here we are checking for error response. If HTTP status
495                                 // code is not within the http success response code
496                                 // then we consider this as error and return false
497
498                                 pubResponse = createMRPublisherResponse(result, pubResponse);
499
500                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
501                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
502
503                                         return pubResponse;
504                                 }
505
506                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
507                                 getLog().info(logLine);
508                                 fPending.clear();
509                                 return pubResponse;
510                         }
511
512                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
513                                 getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
514                                                 + (nowMs - fPending.peek().timestamp) + " ms");
515                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
516                                                 password, protocolFlag);
517
518                                 // Here we are checking for error response. If HTTP status
519                                 // code is not within the http success response code
520                                 // then we consider this as error and return false
521                                 pubResponse = createMRPublisherResponse(result, pubResponse);
522
523                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
524                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
525
526                                         return pubResponse;
527                                 }
528
529                                 final String logLine = String.valueOf((Clock.now() - startMs));
530                                 getLog().info(logLine);
531                                 fPending.clear();
532                                 return pubResponse;
533                         }
534                 } catch (IllegalArgumentException x) {
535                         getLog().warn(x.getMessage(), x);
536                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
537                         pubResponse.setResponseMessage(x.getMessage());
538
539                 } catch (IOException x) {
540                         getLog().warn(x.getMessage(), x);
541                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
542                         pubResponse.setResponseMessage(x.getMessage());
543
544                 } catch (HttpException x) {
545                         getLog().warn(x.getMessage(), x);
546                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
547                         pubResponse.setResponseMessage(x.getMessage());
548
549                 } catch (Exception x) {
550                         getLog().warn(x.getMessage(), x);
551
552                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
553                         pubResponse.setResponseMessage(x.getMessage());
554
555                 }
556
557                 finally {
558                         if (fPending.size() > 0) {
559                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
560                                 pubResponse.setPendingMsgs(fPending.size());
561                         }
562                         if (os != null) {
563                                 try {
564                                         os.close();
565                                 } catch (Exception x) {
566                                         getLog().warn(x.getMessage(), x);
567                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
568                                         pubResponse.setResponseMessage("Error in closing Output Stream");
569                                 }
570                         }
571                 }
572
573                 return pubResponse;
574         }
575
576         private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
577
578                 if (reply.isEmpty()) {
579
580                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
581                         mrPubResponse.setResponseMessage("Please verify the Producer properties");
582                 } else if (reply.startsWith("{")) {
583                         JSONObject jObject = new JSONObject(reply);
584                         if (jObject.has("message") && jObject.has("status")) {
585                                 String message = jObject.getString("message");
586                                 if (null != message) {
587                                         mrPubResponse.setResponseMessage(message);
588                                 }
589                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
590                         } else {
591                                 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
592                                 mrPubResponse.setResponseMessage(reply);
593                         }
594                 } else if (reply.startsWith("<")) {
595                         String responseCode = getHTTPErrorResponseCode(reply);
596                         if (responseCode.contains("403")) {
597                                 responseCode = "403";
598                         }
599                         mrPubResponse.setResponseCode(responseCode);
600                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
601                 }
602
603                 return mrPubResponse;
604         }
605
606         private final String fTopic;
607         private final int fMaxBatchSize;
608         private final long fMaxBatchAgeMs;
609         private final boolean fCompress;
610         private int threadOccuranceTime;
611         private boolean fClosed;
612         private String username;
613         private String password;
614         private String host;
615
616         // host selector
617         private HostSelector fHostSelector = null;
618
619         private final LinkedBlockingQueue<TimestampedMessage> fPending;
620         private long fDontSendUntilMs;
621         private final ScheduledThreadPoolExecutor fExec;
622
623         private String latitude;
624         private String longitude;
625         private String version;
626         private String serviceName;
627         private String env;
628         private String partner;
629         private String routeOffer;
630         private String subContextPath;
631         private String protocol;
632         private String methodType;
633         private String url;
634         private String dmeuser;
635         private String dmepassword;
636         private String contentType;
637         private static final long sfWaitAfterError = 10000;
638         private HashMap<String, String> DMETimeOuts;
639         private DME2Client sender;
640         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
641         private String authKey;
642         private String authDate;
643         private String handlers;
644         private Properties props;
645         public static String routerFilePath;
646         protected static final Map<String, String> headers = new HashMap<String, String>();
647         public static MultivaluedMap<String, Object> headersMap;
648
649         private MRPublisherResponse pubResponse;
650
651         public MRPublisherResponse getPubResponse() {
652                 return pubResponse;
653         }
654
655         public void setPubResponse(MRPublisherResponse pubResponse) {
656                 this.pubResponse = pubResponse;
657         }
658
659         public static String getRouterFilePath() {
660                 return routerFilePath;
661         }
662
663         public static void setRouterFilePath(String routerFilePath) {
664                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
665         }
666
667         public Properties getProps() {
668                 return props;
669         }
670
671         public void setProps(Properties props) {
672                 this.props = props;
673         }
674
675         public String getProtocolFlag() {
676                 return protocolFlag;
677         }
678
679         public void setProtocolFlag(String protocolFlag) {
680                 this.protocolFlag = protocolFlag;
681         }
682
683         private void DME2Configue() throws Exception {
684                 try {
685
686                         /*
687                          * FileReader reader = new FileReader(new File (producerFilePath));
688                          * Properties props = new Properties(); props.load(reader);
689                          */
690                         latitude = props.getProperty("Latitude");
691                         longitude = props.getProperty("Longitude");
692                         version = props.getProperty("Version");
693                         serviceName = props.getProperty("ServiceName");
694                         env = props.getProperty("Environment");
695                         partner = props.getProperty("Partner");
696                         routeOffer = props.getProperty("routeOffer");
697                         subContextPath = props.getProperty("SubContextPath") + fTopic;
698                         /*
699                          * if(props.getProperty("partition")!=null &&
700                          * !props.getProperty("partition").equalsIgnoreCase("")){
701                          * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
702                          * "partition"); }
703                          */
704                         protocol = props.getProperty("Protocol");
705                         methodType = props.getProperty("MethodType");
706                         dmeuser = props.getProperty("username");
707                         dmepassword = props.getProperty("password");
708                         contentType = props.getProperty("contenttype");
709                         handlers = props.getProperty("sessionstickinessrequired");
710                         routerFilePath = props.getProperty("DME2preferredRouterFilePath");
711
712                         /**
713                          * Changes to DME2Client url to use Partner for auto failover
714                          * between data centers When Partner value is not provided use the
715                          * routeOffer value for auto failover within a cluster
716                          */
717
718                         String partitionKey = props.getProperty("partition");
719
720                         if (partner != null && !partner.isEmpty()) {
721                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
722                                                 + partner;
723                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
724                                         url = url + "&partitionKey=" + partitionKey;
725                                 }
726                         } else if (routeOffer != null && !routeOffer.isEmpty()) {
727                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
728                                                 + routeOffer;
729                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
730                                         url = url + "&partitionKey=" + partitionKey;
731                                 }
732                         }
733
734                         DMETimeOuts = new HashMap<String, String>();
735                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
736                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
737                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
738                         DMETimeOuts.put("Content-Type", contentType);
739                         System.setProperty("AFT_LATITUDE", latitude);
740                         System.setProperty("AFT_LONGITUDE", longitude);
741                         System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
742                         // System.setProperty("DME2.DEBUG", "true");
743
744                         // SSL changes
745                         // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
746                         // "SSLv3,TLSv1,TLSv1.1");
747                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
748                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
749                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
750
751                         // SSL changes
752
753                         sender = new DME2Client(new URI(url), 5000L);
754
755                         sender.setAllowAllHttpReturnCodes(true);
756                         sender.setMethod(methodType);
757                         sender.setSubContext(subContextPath);
758                         sender.setCredentials(dmeuser, dmepassword);
759                         sender.setHeaders(DMETimeOuts);
760                         if (handlers.equalsIgnoreCase("yes")) {
761                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
762                                                 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
763                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
764                                                 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
765                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
766                         } else {
767                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
768                         }
769                 } catch (DME2Exception x) {
770                         getLog().warn(x.getMessage(), x);
771                         throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
772                 } catch (URISyntaxException x) {
773
774                         getLog().warn(x.getMessage(), x);
775                         throw new URISyntaxException(url, x.getMessage());
776                 } catch (Exception x) {
777
778                         getLog().warn(x.getMessage(), x);
779                         throw new IllegalArgumentException(x.getMessage());
780                 }
781         }
782
783         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
784                         boolean compress) throws MalformedURLException {
785                 super(hosts);
786
787                 if (topic == null || topic.length() < 1) {
788                         throw new IllegalArgumentException("A topic must be provided.");
789                 }
790
791                 fHostSelector = new HostSelector(hosts, null);
792                 fClosed = false;
793                 fTopic = topic;
794                 fMaxBatchSize = maxBatchSize;
795                 fMaxBatchAgeMs = maxBatchAgeMs;
796                 fCompress = compress;
797
798                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
799                 fDontSendUntilMs = 0;
800                 fExec = new ScheduledThreadPoolExecutor(1);
801                 pubResponse = new MRPublisherResponse();
802
803         }
804
805         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
806                         boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
807                 super(hosts);
808
809                 if (topic == null || topic.length() < 1) {
810                         throw new IllegalArgumentException("A topic must be provided.");
811                 }
812
813                 fHostSelector = new HostSelector(hosts, null);
814                 fClosed = false;
815                 fTopic = topic;
816                 fMaxBatchSize = maxBatchSize;
817                 fMaxBatchAgeMs = maxBatchAgeMs;
818                 fCompress = compress;
819                 threadOccuranceTime = httpThreadOccurnace;
820                 fPending = new LinkedBlockingQueue<TimestampedMessage>();
821                 fDontSendUntilMs = 0;
822                 fExec = new ScheduledThreadPoolExecutor(1);
823                 fExec.scheduleAtFixedRate(new Runnable() {
824                         @Override
825                         public void run() {
826                                 send(false);
827                         }
828                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
829         }
830
831         private static class TimestampedMessage extends message {
832                 public TimestampedMessage(message m) {
833                         super(m);
834                         timestamp = Clock.now();
835                 }
836
837                 public final long timestamp;
838         }
839
840         public String getUsername() {
841                 return username;
842         }
843
844         public void setUsername(String username) {
845                 this.username = username;
846         }
847
848         public String getPassword() {
849                 return password;
850         }
851
852         public void setPassword(String password) {
853                 this.password = password;
854         }
855
856         public String getHost() {
857                 return host;
858         }
859
860         public void setHost(String host) {
861                 this.host = host;
862         }
863
864         public String getContentType() {
865                 return contentType;
866         }
867
868         public void setContentType(String contentType) {
869                 this.contentType = contentType;
870         }
871
872         public String getAuthKey() {
873                 return authKey;
874         }
875
876         public void setAuthKey(String authKey) {
877                 this.authKey = authKey;
878         }
879
880         public String getAuthDate() {
881                 return authDate;
882         }
883
884         public void setAuthDate(String authDate) {
885                 this.authDate = authDate;
886         }
887
888 }