d9076a49bb2cbbd28843d74bb30d5ad1e56afbe6
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package org.onap.dmaap.mr.client.impl;
23
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.OutputStream;
27 import java.net.MalformedURLException;
28 import java.net.URI;
29 import java.net.URISyntaxException;
30 import java.util.Collection;
31 import java.util.HashMap;
32 import java.util.LinkedList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.Properties;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.TimeUnit;
39 import java.util.zip.GZIPOutputStream;
40
41 import javax.ws.rs.core.MultivaluedMap;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import org.apache.http.HttpException;
47 import org.apache.http.HttpStatus;
48 import org.json.JSONArray;
49 import org.json.JSONObject;
50 import org.json.JSONTokener;
51
52 import com.att.aft.dme2.api.DME2Client;
53 import com.att.aft.dme2.api.DME2Exception;
54 import org.onap.dmaap.mr.client.HostSelector;
55 import org.onap.dmaap.mr.client.MRBatchingPublisher;
56 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
57 import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
58
59 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
60         private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
61
62         public static class Builder {
63                 public Builder() {
64                 }
65
66                 public Builder againstUrls(Collection<String> baseUrls) {
67                         fUrls = baseUrls;
68                         return this;
69                 }
70                 
71                 public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype )            
72                 {               
73                         fUrls = baseUrls;               
74                         fServiceName = serviceName;             
75                         fTransportype = transportype;           
76                         return this;            
77                 }
78
79                 public Builder onTopic(String topic) {
80                         fTopic = topic;
81                         return this;
82                 }
83
84                 public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
85                         fMaxBatchSize = maxBatchSize;
86                         fMaxBatchAgeMs = maxBatchAgeMs;
87                         return this;
88                 }
89
90                 public Builder compress(boolean compress) {
91                         fCompress = compress;
92                         return this;
93                 }
94
95                 public Builder httpThreadTime(int threadOccuranceTime) {
96                         this.threadOccuranceTime = threadOccuranceTime;
97                         return this;
98                 }
99
100                 public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
101                         fAllowSelfSignedCerts = allowSelfSignedCerts;
102                         return this;
103                 }
104
105                 public Builder withResponse(boolean withResponse) {
106                         fWithResponse = withResponse;
107                         return this;
108                 }
109
110                 public MRSimplerBatchPublisher build() {
111                         if (!fWithResponse) {
112                                 try {
113                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
114                                                         fAllowSelfSignedCerts, threadOccuranceTime);
115                                 } catch (MalformedURLException e) {
116                                         throw new IllegalArgumentException(e);
117                                 }
118                         } else {
119                                 try {
120                                         return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
121                                                         fAllowSelfSignedCerts, fMaxBatchSize);
122                                 } catch (MalformedURLException e) {
123                                         throw new IllegalArgumentException(e);
124                                 }
125                         }
126
127                 }
128
129                 private Collection<String> fUrls;
130                 private Collection<String> fServiceName;                
131                 private String fTransportype;   
132                 private String fTopic;
133                 private int fMaxBatchSize = 100;
134                 private long fMaxBatchAgeMs = 1000;
135                 private boolean fCompress = false;
136                 private int threadOccuranceTime = 50;
137                 private boolean fAllowSelfSignedCerts = false;
138                 private boolean fWithResponse = false;
139
140         };
141
142         @Override
143         public int send(String partition, String msg) {
144                 return send(new message(partition, msg));
145         }
146
147         @Override
148         public int send(String msg) {
149                 return send(new message(null, msg));
150         }
151
152         @Override
153         public int send(message msg) {
154                 final LinkedList<message> list = new LinkedList<>();
155                 list.add(msg);
156                 return send(list);
157         }
158
159         @Override
160         public synchronized int send(Collection<message> msgs) {
161                 if (fClosed) {
162                         throw new IllegalStateException("The publisher was closed.");
163                 }
164
165                 for (message userMsg : msgs) {
166                         fPending.add(new TimestampedMessage(userMsg));
167                 }
168                 return getPendingMessageCount();
169         }
170
171         @Override
172         public synchronized int getPendingMessageCount() {
173                 return fPending.size();
174         }
175
176         @Override
177         public void close() {
178                 try {
179                         final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
180                         if (remains.isEmpty()) {
181                                 getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
182                                         remains.size());
183                         }
184                 } catch (InterruptedException e) {
185                         getLog().warn("Possible message loss. " + e.getMessage(), e);
186                         Thread.currentThread().interrupt();
187                 } catch (IOException e) {
188                         getLog().warn("Possible message loss. " + e.getMessage(), e);
189                 }
190         }
191
192         @Override
193         public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
194                 synchronized (this) {
195                         fClosed = true;
196
197                         // stop the background sender
198                         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
199                         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
200                         fExec.shutdown();
201                 }
202
203                 final long now = Clock.now();
204                 final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
205                 final long timeoutAtMs = now + waitInMs;
206
207                 while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
208                         send(true);
209                         Thread.sleep(250);
210                 }
211
212                 synchronized (this) {
213                         final LinkedList<message> result = new LinkedList<>();
214                         fPending.drainTo(result);
215                         return result;
216                 }
217         }
218
219         /**
220          * Possibly send a batch to the MR server. This is called by the background
221          * thread and the close() method
222          * 
223          * @param force
224          */
225         private synchronized void send(boolean force) {
226                 if ((force || shouldSendNow()) && !sendBatch()) {
227                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
228
229                                 // note the time for back-off
230                                 fDontSendUntilMs = sfWaitAfterError + Clock.now();
231                 }
232         }
233
234         private synchronized boolean shouldSendNow() {
235                 boolean shouldSend = false;
236                 if (fPending.isEmpty()) {
237                         final long nowMs = Clock.now();
238
239                         shouldSend = (fPending.size() >= fMaxBatchSize);
240                         if (!shouldSend) {
241                                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
242                                 shouldSend = sendAtMs <= nowMs;
243                         }
244
245                         // however, wait after an error
246                         shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
247                 }
248                 return shouldSend;
249         }
250
251         /**
252          * Method to parse published JSON Objects and Arrays
253          * 
254          * @return JSONArray
255          */
256         private JSONArray parseJSON() {
257                 JSONArray jsonArray = new JSONArray();
258                 for (TimestampedMessage m : fPending) {
259                         JSONTokener jsonTokener = new JSONTokener(m.fMsg);
260                         JSONObject jsonObject = null;
261                         JSONArray tempjsonArray = null;
262                         final char firstChar = jsonTokener.next();
263                         jsonTokener.back();
264                         if ('[' == firstChar) {
265                                 tempjsonArray = new JSONArray(jsonTokener);
266                                 if (null != tempjsonArray) {
267                                         for (int i = 0; i < tempjsonArray.length(); i++) {
268                                                 jsonArray.put(tempjsonArray.getJSONObject(i));
269                                         }
270                                 }
271                         } else {
272                                 jsonObject = new JSONObject(jsonTokener);
273                                 jsonArray.put(jsonObject);
274                         }
275
276                 }
277                 return jsonArray;
278         }
279
280         private synchronized boolean sendBatch() {
281                 // it's possible for this call to be made with an empty list. in this
282                 // case, just return.
283                 if (fPending.isEmpty()) {
284                         return true;
285                 }
286
287                 final long nowMs = Clock.now();
288
289                 if (this.fHostSelector != null) {
290                         host = this.fHostSelector.selectBaseHost();
291                 }
292
293                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
294                                 props.getProperty("partition"));
295
296                 try {
297                         
298                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
299                         OutputStream os = baseStream;
300                         final String contentType = props.getProperty("contenttype");
301                         if (contentType.equalsIgnoreCase("application/json")) {
302                                 JSONArray jsonArray = parseJSON();
303                                 os.write(jsonArray.toString().getBytes());
304                                 os.close();
305
306                         } else if (contentType.equalsIgnoreCase("text/plain")) {
307                                 for (TimestampedMessage m : fPending) {
308                                         os.write(m.fMsg.getBytes());
309                                         os.write('\n');
310                                 }
311                                 os.close();
312                         } else if (contentType.equalsIgnoreCase("application/cambria")
313                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
314                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
315                                         os = new GZIPOutputStream(baseStream);
316                                 }
317                                 for (TimestampedMessage m : fPending) {
318
319                                         os.write(("" + m.fPartition.length()).getBytes());
320                                         os.write('.');
321                                         os.write(("" + m.fMsg.length()).getBytes());
322                                         os.write('.');
323                                         os.write(m.fPartition.getBytes());
324                                         os.write(m.fMsg.getBytes());
325                                         os.write('\n');
326                                 }
327                                 os.close();
328                         } else {
329                                 for (TimestampedMessage m : fPending) {
330                                         os.write(m.fMsg.getBytes());
331
332                                 }
333                                 os.close();
334                         }
335
336                         final long startMs = Clock.now();
337                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
338
339                                 DME2Configue();
340
341                                 Thread.sleep(5);
342                                 getLog().info(String
343                                         .format("sending %d msgs to %s%s. Oldest: %d ms", fPending.size(), url, subContextPath,
344                                                 nowMs - fPending.peek().timestamp));
345                                 sender.setPayload(os.toString());
346                                 String dmeResponse = sender.sendAndWait(5000L);
347
348                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse;
349                                 getLog().info(logLine);
350                                 fPending.clear();
351                                 return true;
352                         }
353
354             if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
355                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
356                                         nowMs - fPending.peek().timestamp);
357                 final JSONObject result =
358                         postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
359                                 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
360                                 .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
361                 // Here we are checking for error response. If HTTP status
362                 // code is not within the http success response code
363                 // then we consider this as error and return false
364                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
365                     return false;
366                 }
367                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
368                 getLog().info(logLine);
369                 fPending.clear();
370                 return true;
371             }
372
373                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
374                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
375                                         nowMs - fPending.peek().timestamp);
376                                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
377                                                 protocolFlag);
378
379                                 // Here we are checking for error response. If HTTP status
380                                 // code is not within the http success response code
381                                 // then we consider this as error and return false
382                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
383                                         return false;
384                                 }
385                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
386                                 getLog().info(logLine);
387                                 fPending.clear();
388                                 return true;
389                         }
390                         
391                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
392                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
393                                         nowMs - fPending.peek().timestamp);
394                                 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
395
396                                 // Here we are checking for error response. If HTTP status
397                                 // code is not within the http success response code
398                                 // then we consider this as error and return false
399                                 if (result.getInt("status") < 200 || result.getInt("status") > 299) {
400                                         return false;
401                                 }
402                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
403                                 getLog().info(logLine);
404                                 fPending.clear();
405                                 return true;
406                         }
407                 } catch (Exception x) {
408                         getLog().warn(x.getMessage(), x);
409                 }
410                 return false;
411         }
412
413         public synchronized MRPublisherResponse sendBatchWithResponse() {
414                 // it's possible for this call to be made with an empty list. in this
415                 // case, just return.
416                 if (fPending.isEmpty()) {
417                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
418                         pubResponse.setResponseMessage("No Messages to send");
419                         return pubResponse;
420                 }
421
422                 final long nowMs = Clock.now();
423
424                 host = this.fHostSelector.selectBaseHost();
425
426                 final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
427                                 props.getProperty("partition"));
428                 OutputStream os = null;
429                 try {
430
431                         final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
432                         os = baseStream;
433                         final String contentType = props.getProperty("contenttype");
434                         if (contentType.equalsIgnoreCase("application/json")) {
435                                 JSONArray jsonArray = parseJSON();
436                                 os.write(jsonArray.toString().getBytes());
437                         } else if (contentType.equalsIgnoreCase("text/plain")) {
438                                 for (TimestampedMessage m : fPending) {
439                                         os.write(m.fMsg.getBytes());
440                                         os.write('\n');
441                                 }
442                         } else if (contentType.equalsIgnoreCase("application/cambria")
443                                         || (contentType.equalsIgnoreCase("application/cambria-zip"))) {
444                                 if (contentType.equalsIgnoreCase("application/cambria-zip")) {
445                                         os = new GZIPOutputStream(baseStream);
446                                 }
447                                 for (TimestampedMessage m : fPending) {
448
449                                         os.write(("" + m.fPartition.length()).getBytes());
450                                         os.write('.');
451                                         os.write(("" + m.fMsg.length()).getBytes());
452                                         os.write('.');
453                                         os.write(m.fPartition.getBytes());
454                                         os.write(m.fMsg.getBytes());
455                                         os.write('\n');
456                                 }
457                                 os.close();
458                         } else {
459                                 for (TimestampedMessage m : fPending) {
460                                         os.write(m.fMsg.getBytes());
461
462                                 }
463                         }
464
465                         final long startMs = Clock.now();
466                         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
467
468                                 try {
469                                         DME2Configue();
470
471                                         Thread.sleep(5);
472                                         getLog().info("sending {} msgs to {}{}. Oldest: {} ms", fPending.size(), url, subContextPath,
473                                                         nowMs - fPending.peek().timestamp);
474                                         sender.setPayload(os.toString());
475
476                                         String dmeResponse = sender.sendAndWait(5000L);
477
478                                         pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
479
480                                         if (Integer.valueOf(pubResponse.getResponseCode()) < 200
481                                                         || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
482
483                                                 return pubResponse;
484                                         }
485                                         final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
486                                         getLog().info(logLine);
487                                         fPending.clear();
488
489                                 } catch (DME2Exception x) {
490                                         getLog().warn(x.getMessage(), x);
491                                         pubResponse.setResponseCode(x.getErrorCode());
492                                         pubResponse.setResponseMessage(x.getErrorMessage());
493                                 } catch (URISyntaxException x) {
494
495                                         getLog().warn(x.getMessage(), x);
496                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
497                                         pubResponse.setResponseMessage(x.getMessage());
498                                 } catch (Exception x) {
499
500                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
501                                         pubResponse.setResponseMessage(x.getMessage());
502                                         logger.error("exception: ", x);
503
504                                 }
505
506                                 return pubResponse;
507                         }
508
509                         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
510                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
511                                         nowMs - fPending.peek().timestamp);
512                                 final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
513                                                 authDate, username, password, protocolFlag);
514                                 // Here we are checking for error response. If HTTP status
515                                 // code is not within the http success response code
516                                 // then we consider this as error and return false
517
518                                 pubResponse = createMRPublisherResponse(result, pubResponse);
519
520                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
521                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
522
523                                         return pubResponse;
524                                 }
525
526                                 final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
527                                 getLog().info(logLine);
528                                 fPending.clear();
529                                 return pubResponse;
530                         }
531
532                         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
533                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
534                                         nowMs - fPending.peek().timestamp);
535                                 final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
536                                                 password, protocolFlag);
537
538                                 // Here we are checking for error response. If HTTP status
539                                 // code is not within the http success response code
540                                 // then we consider this as error and return false
541                                 pubResponse = createMRPublisherResponse(result, pubResponse);
542
543                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
544                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
545
546                                         return pubResponse;
547                                 }
548
549                                 final String logLine = String.valueOf((Clock.now() - startMs));
550                                 getLog().info(logLine);
551                                 fPending.clear();
552                                 return pubResponse;
553                         }
554                         
555                         if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
556                                 getLog().info("sending {} msgs to {}. Oldest: {} ms", fPending.size(), httpurl,
557                                         nowMs - fPending.peek().timestamp);
558                                 final String result = postNoAuthWithResponse(httpurl, baseStream.toByteArray(), contentType);
559
560                                 // Here we are checking for error response. If HTTP status
561                                 // code is not within the http success response code
562                                 // then we consider this as error and return false
563                                 pubResponse = createMRPublisherResponse(result, pubResponse);
564
565                                 if (Integer.valueOf(pubResponse.getResponseCode()) < 200
566                                                 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
567
568                                         return pubResponse;
569                                 }
570
571                                 final String logLine = String.valueOf((Clock.now() - startMs));
572                                 getLog().info(logLine);
573                                 fPending.clear();
574                                 return pubResponse;
575                         }
576                 } catch (IllegalArgumentException | HttpException x) {
577                         getLog().warn(x.getMessage(), x);
578                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
579                         pubResponse.setResponseMessage(x.getMessage());
580
581                 } catch (IOException x) {
582                         getLog().warn(x.getMessage(), x);
583                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
584                         pubResponse.setResponseMessage(x.getMessage());
585                 } catch (Exception x) {
586                         getLog().warn(x.getMessage(), x);
587
588                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
589                         pubResponse.setResponseMessage(x.getMessage());
590
591                 }
592
593                 finally {
594                         if (!fPending.isEmpty()) {
595                                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
596                                 pubResponse.setPendingMsgs(fPending.size());
597                         }
598                         if (os != null) {
599                                 try {
600                                         os.close();
601                                 } catch (Exception x) {
602                                         getLog().warn(x.getMessage(), x);
603                                         pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
604                                         pubResponse.setResponseMessage("Error in closing Output Stream");
605                                 }
606                         }
607                 }
608
609                 return pubResponse;
610         }
611
612         public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
613
614                 if (reply.isEmpty()) {
615
616                         mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
617                         mrPubResponse.setResponseMessage("Please verify the Producer properties");
618                 } else if (reply.startsWith("{")) {
619                         JSONObject jObject = new JSONObject(reply);
620                         if (jObject.has("message") && jObject.has("status")) {
621                                 String message = jObject.getString("message");
622                                 if (null != message) {
623                                         mrPubResponse.setResponseMessage(message);
624                                 }
625                                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
626                         } else {
627                                 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
628                                 mrPubResponse.setResponseMessage(reply);
629                         }
630                 } else if (reply.startsWith("<")) {
631                         String responseCode = getHTTPErrorResponseCode(reply);
632                         if (responseCode.contains("403")) {
633                                 responseCode = "403";
634                         }
635                         mrPubResponse.setResponseCode(responseCode);
636                         mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
637                 }
638
639                 return mrPubResponse;
640         }
641
642         private final String fTopic;
643         private final int fMaxBatchSize;
644         private final long fMaxBatchAgeMs;
645         private final boolean fCompress;
646         private int threadOccuranceTime;
647         private boolean fClosed;
648         private String username;
649         private String password;
650         private String host;
651
652         // host selector
653         private HostSelector fHostSelector = null;
654
655         private final LinkedBlockingQueue<TimestampedMessage> fPending;
656         private long fDontSendUntilMs;
657         private final ScheduledThreadPoolExecutor fExec;
658
659         private String latitude;
660         private String longitude;
661         private String version;
662         private String serviceName;
663         private String env;
664         private String partner;
665         private String routeOffer;
666         private String subContextPath;
667         private String protocol;
668         private String methodType;
669         private String url;
670         private String dmeuser;
671         private String dmepassword;
672         private String contentType;
673         private static final long sfWaitAfterError = 10000;
674         private HashMap<String, String> DMETimeOuts;
675         private DME2Client sender;
676         public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
677         private String authKey;
678         private String authDate;
679         private String handlers;
680         private Properties props;
681         public static String routerFilePath;
682         protected static final Map<String, String> headers = new HashMap<String, String>();
683         public static MultivaluedMap<String, Object> headersMap;
684
685         private MRPublisherResponse pubResponse;
686
687         public MRPublisherResponse getPubResponse() {
688                 return pubResponse;
689         }
690
691         public void setPubResponse(MRPublisherResponse pubResponse) {
692                 this.pubResponse = pubResponse;
693         }
694
695         public static String getRouterFilePath() {
696                 return routerFilePath;
697         }
698
699         public static void setRouterFilePath(String routerFilePath) {
700                 MRSimplerBatchPublisher.routerFilePath = routerFilePath;
701         }
702
703         public Properties getProps() {
704                 return props;
705         }
706
707         public void setProps(Properties props) {
708                 this.props = props;
709         }
710
711         public String getProtocolFlag() {
712                 return protocolFlag;
713         }
714
715         public void setProtocolFlag(String protocolFlag) {
716                 this.protocolFlag = protocolFlag;
717         }
718
719         private void DME2Configue() throws Exception {
720                 try {
721
722                         latitude = props.getProperty("Latitude");
723                         longitude = props.getProperty("Longitude");
724                         version = props.getProperty("Version");
725                         serviceName = props.getProperty("ServiceName");
726                         env = props.getProperty("Environment");
727                         partner = props.getProperty("Partner");
728                         routeOffer = props.getProperty("routeOffer");
729                         subContextPath = props.getProperty("SubContextPath") + fTopic;
730                         
731                         protocol = props.getProperty("Protocol");
732                         methodType = props.getProperty("MethodType");
733                         dmeuser = props.getProperty("username");
734                         dmepassword = props.getProperty("password");
735                         contentType = props.getProperty("contenttype");
736                         handlers = props.getProperty("sessionstickinessrequired");
737                         routerFilePath = props.getProperty("DME2preferredRouterFilePath");
738
739                         /**
740                          * Changes to DME2Client url to use Partner for auto failover
741                          * between data centers When Partner value is not provided use the
742                          * routeOffer value for auto failover within a cluster
743                          */
744
745                         String partitionKey = props.getProperty("partition");
746
747                         if (partner != null && !partner.isEmpty()) {
748                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
749                                                 + partner;
750                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
751                                         url = url + "&partitionKey=" + partitionKey;
752                                 }
753                         } else if (routeOffer != null && !routeOffer.isEmpty()) {
754                                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
755                                                 + routeOffer;
756                                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
757                                         url = url + "&partitionKey=" + partitionKey;
758                                 }
759                         }
760
761                         DMETimeOuts = new HashMap<String, String>();
762                         DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
763                         DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
764                         DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
765                         DMETimeOuts.put("Content-Type", contentType);
766                         System.setProperty("AFT_LATITUDE", latitude);
767                         System.setProperty("AFT_LONGITUDE", longitude);
768                         System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
769                         // System.setProperty("DME2.DEBUG", "true");
770
771                         // SSL changes
772                         // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
773                         
774                         System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
775                         System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
776                         System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
777
778                         // SSL changes
779
780                         sender = new DME2Client(new URI(url), 5000L);
781
782                         sender.setAllowAllHttpReturnCodes(true);
783                         sender.setMethod(methodType);
784                         sender.setSubContext(subContextPath);
785                         sender.setCredentials(dmeuser, dmepassword);
786                         sender.setHeaders(DMETimeOuts);
787                         if (handlers != null &&handlers.equalsIgnoreCase("yes")) {
788                                 sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
789                                                 props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
790                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
791                                                 props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
792                                 sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
793                         } else {
794                                 sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
795                         }
796                 } catch (DME2Exception x) {
797                         getLog().warn(x.getMessage(), x);
798                         throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
799                 } catch (URISyntaxException x) {
800
801                         getLog().warn(x.getMessage(), x);
802                         throw new URISyntaxException(url, x.getMessage());
803                 } catch (Exception x) {
804
805                         getLog().warn(x.getMessage(), x);
806                         throw new IllegalArgumentException(x.getMessage());
807                 }
808         }
809
810         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
811                         boolean compress) throws MalformedURLException {
812                 super(hosts);
813
814                 if (topic == null || topic.length() < 1) {
815                         throw new IllegalArgumentException("A topic must be provided.");
816                 }
817
818                 fHostSelector = new HostSelector(hosts, null);
819                 fClosed = false;
820                 fTopic = topic;
821                 fMaxBatchSize = maxBatchSize;
822                 fMaxBatchAgeMs = maxBatchAgeMs;
823                 fCompress = compress;
824
825                 fPending = new LinkedBlockingQueue<>();
826                 fDontSendUntilMs = 0;
827                 fExec = new ScheduledThreadPoolExecutor(1);
828                 pubResponse = new MRPublisherResponse();
829
830         }
831
832         private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
833                         boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
834                 super(hosts);
835
836                 if (topic == null || topic.length() < 1) {
837                         throw new IllegalArgumentException("A topic must be provided.");
838                 }
839
840                 fHostSelector = new HostSelector(hosts, null);
841                 fClosed = false;
842                 fTopic = topic;
843                 fMaxBatchSize = maxBatchSize;
844                 fMaxBatchAgeMs = maxBatchAgeMs;
845                 fCompress = compress;
846                 threadOccuranceTime = httpThreadOccurnace;
847                 fPending = new LinkedBlockingQueue<>();
848                 fDontSendUntilMs = 0;
849                 fExec = new ScheduledThreadPoolExecutor(1);
850                 fExec.scheduleAtFixedRate(new Runnable() {
851                         @Override
852                         public void run() {
853                                 send(false);
854                         }
855                 }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
856                 pubResponse = new MRPublisherResponse();
857         }
858
859         private static class TimestampedMessage extends message {
860                 public TimestampedMessage(message m) {
861                         super(m);
862                         timestamp = Clock.now();
863                 }
864
865                 public final long timestamp;
866         }
867
868         public String getUsername() {
869                 return username;
870         }
871
872         public void setUsername(String username) {
873                 this.username = username;
874         }
875
876         public String getPassword() {
877                 return password;
878         }
879
880         public void setPassword(String password) {
881                 this.password = password;
882         }
883
884         public String getHost() {
885                 return host;
886         }
887
888         public void setHost(String host) {
889                 this.host = host;
890         }
891
892         public String getContentType() {
893                 return contentType;
894         }
895
896         public void setContentType(String contentType) {
897                 this.contentType = contentType;
898         }
899
900         public String getAuthKey() {
901                 return authKey;
902         }
903
904         public void setAuthKey(String authKey) {
905                 this.authKey = authKey;
906         }
907
908         public String getAuthDate() {
909                 return authDate;
910         }
911
912         public void setAuthDate(String authDate) {
913                 this.authDate = authDate;
914         }
915
916 }