Sonar issues review, fix properties constant name.
[dmaap/messagerouter/dmaapclient.git] / src / main / java / org / onap / dmaap / mr / client / impl / MRSimplerBatchPublisher.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Modifications Copyright © 2021 Orange.
8  *  ================================================================================
9  *  Licensed under the Apache License, Version 2.0 (the "License");
10  *  you may not use this file except in compliance with the License.
11  *  You may obtain a copy of the License at
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  *  ============LICENSE_END=========================================================
20  *
21  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  *
23  *******************************************************************************/
24
25 package org.onap.dmaap.mr.client.impl;
26
27 import com.att.aft.dme2.api.DME2Client;
28 import com.att.aft.dme2.api.DME2Exception;
29 import java.io.ByteArrayOutputStream;
30 import java.io.IOException;
31 import java.io.OutputStream;
32 import java.net.MalformedURLException;
33 import java.net.URI;
34 import java.net.URISyntaxException;
35 import java.util.Collection;
36 import java.util.HashMap;
37 import java.util.LinkedList;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Properties;
41
42 import java.util.concurrent.LinkedBlockingQueue;
43 import java.util.concurrent.ScheduledThreadPoolExecutor;
44 import java.util.concurrent.TimeUnit;
45 import java.util.zip.GZIPOutputStream;
46 import javax.ws.rs.core.MultivaluedMap;
47 import org.apache.http.HttpException;
48 import org.apache.http.HttpStatus;
49 import org.json.JSONArray;
50 import org.json.JSONObject;
51 import org.json.JSONTokener;
52 import org.onap.dmaap.mr.client.DmaapClientConst;
53 import org.onap.dmaap.mr.client.HostSelector;
54 import org.onap.dmaap.mr.client.MRBatchingPublisher;
55 import org.onap.dmaap.mr.client.ProtocolType;
56 import org.onap.dmaap.mr.client.response.MRPublisherResponse;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
59
60 public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
61     private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
62
    // The three HEADER_DME2_* names are not referenced in the part of the file shown here.
    // NOTE(review): presumably DME2 request-header keys used when configuring the DME2 client - confirm.
63     private static final String HEADER_DME2_EXCHANGE_REQUEST_HANDLERS = "AFT_DME2_EXCHANGE_REQUEST_HANDLERS";
64     private static final String HEADER_DME2_EXCHANGE_REPLY_HANDLERS = "AFT_DME2_EXCHANGE_REPLY_HANDLERS";
65     private static final String HEADER_DME2_REQ_TRACE_ON = "AFT_DME2_REQ_TRACE_ON";
66
    // Content-type value that makes the batch senders write one message per line.
67     private static final String CONTENT_TYPE_TEXT = "text/plain";
68
    // Key of the HTTP status code inside the JSON replies inspected by the senders.
69     private static final String JSON_STATUS = "status";
70
71     public static class Builder {
72
73         public Builder againstUrls(Collection<String> baseUrls) {
74             fUrls = baseUrls;
75             return this;
76         }
77
78         public Builder againstUrlsOrServiceName(Collection<String> baseUrls, Collection<String> serviceName, String transportype) {
79             fUrls = baseUrls;
80             return this;
81         }
82
83         public Builder onTopic(String topic) {
84             fTopic = topic;
85             return this;
86         }
87
88         public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
89             fMaxBatchSize = maxBatchSize;
90             fMaxBatchAgeMs = maxBatchAgeMs;
91             return this;
92         }
93
94         public Builder compress(boolean compress) {
95             fCompress = compress;
96             return this;
97         }
98
99         public Builder httpThreadTime(int threadOccurrenceTime) {
100             this.threadOccurrenceTime = threadOccurrenceTime;
101             return this;
102         }
103
104         public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
105             fAllowSelfSignedCerts = allowSelfSignedCerts;
106             return this;
107         }
108
109         public Builder withResponse(boolean withResponse) {
110             fWithResponse = withResponse;
111             return this;
112         }
113
114         public MRSimplerBatchPublisher build() {
115             if (!fWithResponse) {
116                 try {
117                     return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
118                             fAllowSelfSignedCerts, threadOccurrenceTime);
119                 } catch (MalformedURLException e) {
120                     throw new IllegalArgumentException(e);
121                 }
122             } else {
123                 try {
124                     return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
125                             fAllowSelfSignedCerts, fMaxBatchSize);
126                 } catch (MalformedURLException e) {
127                     throw new IllegalArgumentException(e);
128                 }
129             }
130
131         }
132
133         private Collection<String> fUrls;
134         private String fTopic;
135         private int fMaxBatchSize = 100;
136
137         private long fMaxBatchAgeMs = 1000;
138         private boolean fCompress = false;
139         private int threadOccurrenceTime = 50;
140         private boolean fAllowSelfSignedCerts = false;
141         private boolean fWithResponse = false;
142
143     }
144
145     @Override
146     public int send(String partition, String msg) {
147         return send(new Message(partition, msg));
148     }
149
150     @Override
151     public int send(String msg) {
152         return send(new Message(null, msg));
153     }
154
155     @Override
156     public int send(Message msg) {
157         final LinkedList<Message> list = new LinkedList<>();
158         list.add(msg);
159         return send(list);
160     }
161
162     @Override
163     public synchronized int send(Collection<Message> msgs) {
164         if (fClosed) {
165             throw new IllegalStateException("The publisher was closed.");
166         }
167
168         for (Message userMsg : msgs) {
169             fPending.add(new TimestampedMessage(userMsg));
170         }
171         return getPendingMessageCount();
172     }
173
174     @Override
175     public synchronized int getPendingMessageCount() {
176         return fPending.size();
177     }
178
179     @Override
180     public void close() {
181         try {
182             final List<Message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
183             if (remains.isEmpty()) {
184                 getLog().warn("Closing publisher with {} messages unsent. Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.",
185                         remains.size());
186             }
187         } catch (InterruptedException e) {
188             getLog().warn("Possible message loss. " + e.getMessage(), e);
189             Thread.currentThread().interrupt();
190         } catch (IOException e) {
191             getLog().warn("Possible message loss. " + e.getMessage(), e);
192         }
193     }
194
195     @Override
196     public List<Message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
197         synchronized (this) {
198             fClosed = true;
199
200             // stop the background sender
201             fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
202             fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
203             fExec.shutdown();
204         }
205
206         final long now = Clock.now();
207         final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
208         final long timeoutAtMs = now + waitInMs;
209
210         while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
211             send(true);
212             Thread.sleep(250);
213         }
214
215         synchronized (this) {
216             final LinkedList<Message> result = new LinkedList<>();
217             fPending.drainTo(result);
218             return result;
219         }
220     }
221
222     /**
223      * Possibly send a batch to the MR server. This is called by the background
224      * thread and the close() method
225      *
226      * @param force
227      */
228     private synchronized void send(boolean force) {
229         if ((force || shouldSendNow()) && !sendBatch()) {
230             getLog().warn("Send failed, {} message to send.", fPending.size());
231             // note the time for back-off
232             fDontSendUntilMs = SF_WAIT_AFTER_ERROR + Clock.now();
233         }
234     }
235
236     private synchronized boolean shouldSendNow() {
237         boolean shouldSend = false;
238         if (!fPending.isEmpty()) {
239             final long nowMs = Clock.now();
240
241             shouldSend = (fPending.size() >= fMaxBatchSize);
242             if (!shouldSend) {
243                 final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
244                 shouldSend = sendAtMs <= nowMs;
245             }
246
247             // however, wait after an error
248             shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
249         }
250         return shouldSend;
251     }
252
253     /**
254      * Method to parse published JSON Objects and Arrays.
255      *
256      * @return JSONArray
257      */
258     private JSONArray parseJSON() {
259         JSONArray jsonArray = new JSONArray();
260         for (TimestampedMessage m : fPending) {
261             JSONTokener jsonTokener = new JSONTokener(m.fMsg);
262             JSONObject jsonObject = null;
263             JSONArray tempjsonArray = null;
264             final char firstChar = jsonTokener.next();
265             jsonTokener.back();
266             if ('[' == firstChar) {
267                 tempjsonArray = new JSONArray(jsonTokener);
268                 for (int i = 0; i < tempjsonArray.length(); i++) {
269                     jsonArray.put(tempjsonArray.getJSONObject(i));
270                 }
271             } else {
272                 jsonObject = new JSONObject(jsonTokener);
273                 jsonArray.put(jsonObject);
274             }
275
276         }
277         return jsonArray;
278     }
279
280     private void logTime(long startMs, String dmeResponse) {
281         if (getLog().isInfoEnabled()) {
282             getLog().info("MR reply ok ({} ms):{}", (Clock.now() - startMs), dmeResponse);
283         }
284     }
285
286     private void logSendMessage(int nbMessage, String dest, long time) {
287         if (getLog().isInfoEnabled()) {
288             getLog().info("sending {} msgs to {}. Oldest: {} ms", nbMessage, dest, time);
289         }
290     }
291
292     private synchronized boolean sendBatch() {
293         // it's possible for this call to be made with an empty list. in this
294         // case, just return.
295         if (fPending.isEmpty()) {
296             return true;
297         }
298
299         final long nowMs = Clock.now();
300
301         if (this.fHostSelector != null) {
302             host = this.fHostSelector.selectBaseHost();
303         }
304
305         final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty(DmaapClientConst.PROTOCOL),
306                 props.getProperty(DmaapClientConst.PARTITION));
307
308         try {
309
310             final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
311             OutputStream os = baseStream;
312             final String contentType = props.getProperty(DmaapClientConst.CONTENT_TYPE);
313             if (contentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
314                 JSONArray jsonArray = parseJSON();
315                 os.write(jsonArray.toString().getBytes());
316                 os.close();
317
318             } else if (contentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
319                 for (TimestampedMessage m : fPending) {
320                     os.write(m.fMsg.getBytes());
321                     os.write('\n');
322                 }
323                 os.close();
324             } else if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
325                     || (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
326                 if (contentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
327                     os = new GZIPOutputStream(baseStream);
328                 }
329                 for (TimestampedMessage m : fPending) {
330
331                     os.write(("" + m.fPartition.length()).getBytes());
332                     os.write('.');
333                     os.write(("" + m.fMsg.length()).getBytes());
334                     os.write('.');
335                     os.write(m.fPartition.getBytes());
336                     os.write(m.fMsg.getBytes());
337                     os.write('\n');
338                 }
339                 os.close();
340             } else {
341                 for (TimestampedMessage m : fPending) {
342                     os.write(m.fMsg.getBytes());
343
344                 }
345                 os.close();
346             }
347
348             final long startMs = Clock.now();
349             if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
350
351                 configureDME2();
352
353                 this.wait(5);
354                 if (fPending.peek() != null) {
355                     logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
356                 }
357                 sender.setPayload(os.toString());
358                 String dmeResponse = sender.sendAndWait(5000L);
359
360                 logTime(startMs, dmeResponse);
361                 fPending.clear();
362                 return true;
363             }
364
365             if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
366                 if (fPending.peek() != null) {
367                     logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
368                 }
369                 final JSONObject result =
370                         postAuth(new PostAuthDataObject().setPath(httpurl).setData(baseStream.toByteArray())
371                                 .setContentType(contentType).setAuthKey(authKey).setAuthDate(authDate)
372                                 .setUsername(username).setPassword(password).setProtocolFlag(protocolFlag));
373                 // Here we are checking for error response. If HTTP status
374                 // code is not within the http success response code
375                 // then we consider this as error and return false
376                 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
377                     return false;
378                 }
379                 logTime(startMs, result.toString());
380                 fPending.clear();
381                 return true;
382             }
383
384             if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
385                 if (fPending.peek() != null) {
386                     logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
387                 }
388                 final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
389                         protocolFlag);
390
391                 // Here we are checking for error response. If HTTP status
392                 // code is not within the http success response code
393                 // then we consider this as error and return false
394                 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
395                     return false;
396                 }
397                 logTime(startMs, result.toString());
398                 fPending.clear();
399                 return true;
400             }
401
402             if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
403                 if (fPending.peek() != null) {
404                     logSendMessage(fPending.size(), httpurl, nowMs - fPending.peek().timestamp);
405                 }
406                 final JSONObject result = postNoAuth(httpurl, baseStream.toByteArray(), contentType);
407
408                 // Here we are checking for error response. If HTTP status
409                 // code is not within the http success response code
410                 // then we consider this as error and return false
411                 if (result.getInt(JSON_STATUS) < 200 || result.getInt(JSON_STATUS) > 299) {
412                     return false;
413                 }
414                 logTime(startMs, result.toString());
415                 fPending.clear();
416                 return true;
417             }
418         } catch (InterruptedException e) {
419             getLog().warn("Interrupted!", e);
420             // Restore interrupted state...
421             Thread.currentThread().interrupt();
422         } catch (Exception x) {
423             getLog().warn(x.getMessage(), x);
424         }
425         return false;
426     }
427
428     public synchronized MRPublisherResponse sendBatchWithResponse() {
429         // it's possible for this call to be made with an empty list. in this
430         // case, just return.
431         if (fPending.isEmpty()) {
432             pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
433             pubResponse.setResponseMessage("No Messages to send");
434             return pubResponse;
435         }
436
437         final long nowMs = Clock.now();
438
439         host = this.fHostSelector.selectBaseHost();
440
441         final String httpUrl = MRConstants.makeUrl(host, fTopic, props.getProperty(DmaapClientConst.PROTOCOL),
442                 props.getProperty(DmaapClientConst.PARTITION));
443         OutputStream os = null;
444         try (ByteArrayOutputStream baseStream = new ByteArrayOutputStream()) {
445             os = baseStream;
446             final String propsContentType = props.getProperty(DmaapClientConst.CONTENT_TYPE);
447             if (propsContentType.equalsIgnoreCase(MRFormat.JSON.toString())) {
448                 JSONArray jsonArray = parseJSON();
449                 os.write(jsonArray.toString().getBytes());
450             } else if (propsContentType.equalsIgnoreCase(CONTENT_TYPE_TEXT)) {
451                 for (TimestampedMessage m : fPending) {
452                     os.write(m.fMsg.getBytes());
453                     os.write('\n');
454                 }
455             } else if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA.toString())
456                     || (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString()))) {
457                 if (propsContentType.equalsIgnoreCase(MRFormat.CAMBRIA_ZIP.toString())) {
458                     os = new GZIPOutputStream(baseStream);
459                 }
460                 for (TimestampedMessage m : fPending) {
461                     os.write(("" + m.fPartition.length()).getBytes());
462                     os.write('.');
463                     os.write(("" + m.fMsg.length()).getBytes());
464                     os.write('.');
465                     os.write(m.fPartition.getBytes());
466                     os.write(m.fMsg.getBytes());
467                     os.write('\n');
468                 }
469                 os.close();
470             } else {
471                 for (TimestampedMessage m : fPending) {
472                     os.write(m.fMsg.getBytes());
473
474                 }
475             }
476
477             final long startMs = Clock.now();
478             if (ProtocolType.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
479
480                 try {
481                     configureDME2();
482
483                     this.wait(5);
484
485                     if (fPending.peek() != null) {
486                         logSendMessage(fPending.size(), url + subContextPath, nowMs - fPending.peek().timestamp);
487                     }
488                     sender.setPayload(os.toString());
489
490                     String dmeResponse = sender.sendAndWait(5000L);
491
492                     pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
493
494                     if (Integer.parseInt(pubResponse.getResponseCode()) < 200
495                             || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
496
497                         return pubResponse;
498                     }
499                     final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
500                     getLog().info(logLine);
501                     fPending.clear();
502
503                 } catch (DME2Exception x) {
504                     getLog().warn(x.getMessage(), x);
505                     pubResponse.setResponseCode(x.getErrorCode());
506                     pubResponse.setResponseMessage(x.getErrorMessage());
507                 } catch (URISyntaxException x) {
508
509                     getLog().warn(x.getMessage(), x);
510                     pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
511                     pubResponse.setResponseMessage(x.getMessage());
512                 } catch (InterruptedException e) {
513                     throw e;
514                 } catch (Exception x) {
515
516                     pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
517                     pubResponse.setResponseMessage(x.getMessage());
518                     logger.error("exception: ", x);
519
520                 }
521
522                 return pubResponse;
523             }
524
525             if (ProtocolType.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
526                 if (fPending.peek() != null) {
527                     logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
528                 }
529                 final String result = postAuthwithResponse(httpUrl, baseStream.toByteArray(), contentType, authKey,
530                         authDate, username, password, protocolFlag);
531                 // Here we are checking for error response. If HTTP status
532                 // code is not within the http success response code
533                 // then we consider this as error and return false
534
535                 pubResponse = createMRPublisherResponse(result, pubResponse);
536
537                 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
538                         || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
539
540                     return pubResponse;
541                 }
542
543                 logTime(startMs, result);
544                 fPending.clear();
545                 return pubResponse;
546             }
547
548             if (ProtocolType.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
549                 if (fPending.peek() != null) {
550                     logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
551                 }
552                 final String result = postWithResponse(httpUrl, baseStream.toByteArray(), contentType, username,
553                         password, protocolFlag);
554
555                 // Here we are checking for error response. If HTTP status
556                 // code is not within the http success response code
557                 // then we consider this as error and return false
558                 pubResponse = createMRPublisherResponse(result, pubResponse);
559
560                 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
561                         || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
562
563                     return pubResponse;
564                 }
565
566                 final String logLine = String.valueOf((Clock.now() - startMs));
567                 getLog().info(logLine);
568                 fPending.clear();
569                 return pubResponse;
570             }
571
572             if (ProtocolType.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
573                 if (fPending.peek() != null) {
574                     logSendMessage(fPending.size(), httpUrl, nowMs - fPending.peek().timestamp);
575                 }
576                 final String result = postNoAuthWithResponse(httpUrl, baseStream.toByteArray(), contentType);
577
578                 // Here we are checking for error response. If HTTP status
579                 // code is not within the http success response code
580                 // then we consider this as error and return false
581                 pubResponse = createMRPublisherResponse(result, pubResponse);
582
583                 if (Integer.parseInt(pubResponse.getResponseCode()) < 200
584                         || Integer.parseInt(pubResponse.getResponseCode()) > 299) {
585
586                     return pubResponse;
587                 }
588
589                 final String logLine = String.valueOf((Clock.now() - startMs));
590                 getLog().info(logLine);
591                 fPending.clear();
592                 return pubResponse;
593             }
594         } catch (IllegalArgumentException | HttpException x) {
595             getLog().warn(x.getMessage(), x);
596             pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
597             pubResponse.setResponseMessage(x.getMessage());
598
599         } catch (IOException x) {
600             getLog().warn(x.getMessage(), x);
601             pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
602             pubResponse.setResponseMessage(x.getMessage());
603         } catch (InterruptedException e) {
604             getLog().warn("Interrupted!", e);
605             // Restore interrupted state...
606             Thread.currentThread().interrupt();
607         } catch (Exception x) {
608             getLog().warn(x.getMessage(), x);
609
610             pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
611             pubResponse.setResponseMessage(x.getMessage());
612
613         } finally {
614             if (!fPending.isEmpty()) {
615                 getLog().warn("Send failed, " + fPending.size() + " message to send.");
616                 pubResponse.setPendingMsgs(fPending.size());
617             }
618             if (os != null) {
619                 try {
620                     os.close();
621                 } catch (Exception x) {
622                     getLog().warn(x.getMessage(), x);
623                     pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
624                     pubResponse.setResponseMessage("Error in closing Output Stream");
625                 }
626             }
627         }
628
629         return pubResponse;
630     }
631
632     public MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
633
634         if (reply.isEmpty()) {
635
636             mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
637             mrPubResponse.setResponseMessage("Please verify the Producer properties");
638         } else if (reply.startsWith("{")) {
639             JSONObject jObject = new JSONObject(reply);
640             if (jObject.has("message") && jObject.has(JSON_STATUS)) {
641                 String message = jObject.getString("message");
642                 if (null != message) {
643                     mrPubResponse.setResponseMessage(message);
644                 }
645                 mrPubResponse.setResponseCode(Integer.toString(jObject.getInt(JSON_STATUS)));
646             } else {
647                 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
648                 mrPubResponse.setResponseMessage(reply);
649             }
650         } else if (reply.startsWith("<")) {
651             String responseCode = getHTTPErrorResponseCode(reply);
652             if (responseCode.contains("403")) {
653                 responseCode = "403";
654             }
655             mrPubResponse.setResponseCode(responseCode);
656             mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
657         }
658
659         return mrPubResponse;
660     }
661
662     private final String fTopic;
663     private final int fMaxBatchSize;
664     private final long fMaxBatchAgeMs;
665     private final boolean fCompress;
666     private int threadOccurrenceTime;
667     private boolean fClosed;
668     private String username;
669     private String password;
670     private String host;
671
672     // host selector
673     private HostSelector fHostSelector = null;
674
675     private final LinkedBlockingQueue<TimestampedMessage> fPending;
676     private long fDontSendUntilMs;
677     private final ScheduledThreadPoolExecutor fExec;
678
679     private String latitude;
680     private String longitude;
681     private String version;
682     private String serviceName;
683     private String env;
684     private String partner;
685     private String routeOffer;
686     private String subContextPath;
687     private String protocol;
688     private String methodType;
689     private String url;
690     private String dmeuser;
691     private String dmepassword;
692     private String contentType;
693     private static final long SF_WAIT_AFTER_ERROR = 10000;
694     private HashMap<String, String> DMETimeOuts;
695     private DME2Client sender;
696     public String protocolFlag = ProtocolType.DME2.getValue();
697     private String authKey;
698     private String authDate;
699     private String handlers;
700     private Properties props;
701     public static String routerFilePath;
702     protected static final Map<String, String> headers = new HashMap<String, String>();
703     public static MultivaluedMap<String, Object> headersMap;
704
705     private MRPublisherResponse pubResponse;
706
707     public MRPublisherResponse getPubResponse() {
708         return pubResponse;
709     }
710
711     public void setPubResponse(MRPublisherResponse pubResponse) {
712         this.pubResponse = pubResponse;
713     }
714
715     public static String getRouterFilePath() {
716         return routerFilePath;
717     }
718
719     public static void setRouterFilePath(String routerFilePath) {
720         MRSimplerBatchPublisher.routerFilePath = routerFilePath;
721     }
722
723     public Properties getProps() {
724         return props;
725     }
726
727     public void setProps(Properties props) {
728         this.props = props;
729         setClientConfig(DmaapClientUtil.getClientConfig(props));
730     }
731
732     public String getProtocolFlag() {
733         return protocolFlag;
734     }
735
736     public void setProtocolFlag(String protocolFlag) {
737         this.protocolFlag = protocolFlag;
738     }
739
740     private void configureDME2() throws Exception {
741         try {
742
743             latitude = props.getProperty(DmaapClientConst.LATITUDE);
744             longitude = props.getProperty(DmaapClientConst.LONGITUDE);
745             version = props.getProperty(DmaapClientConst.VERSION);
746             serviceName = props.getProperty(DmaapClientConst.SERVICE_NAME);
747             env = props.getProperty(DmaapClientConst.ENVIRONMENT);
748             partner = props.getProperty(DmaapClientConst.PARTNER);
749             routeOffer = props.getProperty(DmaapClientConst.ROUTE_OFFER);
750             subContextPath = props.getProperty(DmaapClientConst.SUB_CONTEXT_PATH) + fTopic;
751
752             protocol = props.getProperty(DmaapClientConst.PROTOCOL);
753             methodType = props.getProperty(DmaapClientConst.METHOD_TYPE);
754             dmeuser = props.getProperty(DmaapClientConst.USERNAME);
755             dmepassword = props.getProperty(DmaapClientConst.PASSWORD);
756             contentType = props.getProperty(DmaapClientConst.CONTENT_TYPE);
757             handlers = props.getProperty(DmaapClientConst.SESSION_STICKINESS_REQUIRED);
758
759             MRSimplerBatchPublisher.routerFilePath = props.getProperty(DmaapClientConst.DME2PREFERRED_ROUTER_FILE_PATH);
760
761             /*
762              * Changes to DME2Client url to use Partner for auto failover
763              * between data centers When Partner value is not provided use the
764              * routeOffer value for auto failover within a cluster
765              */
766
767             String partitionKey = props.getProperty(DmaapClientConst.PARTITION);
768
769             if (partner != null && !partner.isEmpty()) {
770                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
771                         + partner;
772                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
773                     url = url + "&partitionKey=" + partitionKey;
774                 }
775             } else if (routeOffer != null && !routeOffer.isEmpty()) {
776                 url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
777                         + routeOffer;
778                 if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
779                     url = url + "&partitionKey=" + partitionKey;
780                 }
781             }
782
783             DMETimeOuts = new HashMap<>();
784             DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty(DmaapClientConst.AFT_DME2_EP_READ_TIMEOUT_MS));
785             DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty(DmaapClientConst.AFT_DME2_ROUNDTRIP_TIMEOUT_MS));
786             DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty(DmaapClientConst.AFT_DME2_EP_CONN_TIMEOUT));
787             DMETimeOuts.put("Content-Type", contentType);
788             System.setProperty("AFT_LATITUDE", latitude);
789             System.setProperty("AFT_LONGITUDE", longitude);
790             System.setProperty("AFT_ENVIRONMENT", props.getProperty(DmaapClientConst.AFT_ENVIRONMENT));
791             // System.setProperty("DME2.DEBUG", "true");
792
793             // SSL changes
794             // System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
795
796             System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
797             System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
798             System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
799
800             // SSL changes
801
802             sender = new DME2Client(new URI(url), 5000L);
803
804             sender.setAllowAllHttpReturnCodes(true);
805             sender.setMethod(methodType);
806             sender.setSubContext(subContextPath);
807             sender.setCredentials(dmeuser, dmepassword);
808             sender.setHeaders(DMETimeOuts);
809             if ("yes".equalsIgnoreCase(handlers)) {
810                 sender.addHeader(HEADER_DME2_EXCHANGE_REQUEST_HANDLERS,
811                         props.getProperty(DmaapClientConst.AFT_DME2_EXCHANGE_REQUEST_HANDLERS));
812                 sender.addHeader(HEADER_DME2_EXCHANGE_REPLY_HANDLERS,
813                         props.getProperty(DmaapClientConst.AFT_DME2_EXCHANGE_REPLY_HANDLERS));
814                 sender.addHeader(HEADER_DME2_REQ_TRACE_ON, props.getProperty(DmaapClientConst.AFT_DME2_REQ_TRACE_ON));
815             } else {
816                 sender.addHeader(HEADER_DME2_EXCHANGE_REQUEST_HANDLERS, "com.att.nsa.mr.dme.client.HeaderReplyHandler");
817             }
818         } catch (DME2Exception x) {
819             getLog().warn(x.getMessage(), x);
820             throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
821         } catch (URISyntaxException x) {
822
823             getLog().warn(x.getMessage(), x);
824             throw new URISyntaxException(url, x.getMessage());
825         } catch (Exception x) {
826
827             getLog().warn(x.getMessage(), x);
828             throw new IllegalArgumentException(x.getMessage());
829         }
830     }
831
832     private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
833                                     boolean compress) throws MalformedURLException {
834         super(hosts);
835
836         if (topic == null || topic.length() < 1) {
837             throw new IllegalArgumentException("A topic must be provided.");
838         }
839
840         fHostSelector = new HostSelector(hosts, null);
841         fClosed = false;
842         fTopic = topic;
843         fMaxBatchSize = maxBatchSize;
844         fMaxBatchAgeMs = maxBatchAgeMs;
845         fCompress = compress;
846
847         fPending = new LinkedBlockingQueue<>();
848         fDontSendUntilMs = 0;
849         fExec = new ScheduledThreadPoolExecutor(1);
850         pubResponse = new MRPublisherResponse();
851
852     }
853
854     private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
855                                     boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurrence) throws MalformedURLException {
856         super(hosts);
857
858         if (topic == null || topic.length() < 1) {
859             throw new IllegalArgumentException("A topic must be provided.");
860         }
861
862         fHostSelector = new HostSelector(hosts, null);
863         fClosed = false;
864         fTopic = topic;
865         fMaxBatchSize = maxBatchSize;
866         fMaxBatchAgeMs = maxBatchAgeMs;
867         fCompress = compress;
868         threadOccurrenceTime = httpThreadOccurrence;
869         fPending = new LinkedBlockingQueue<>();
870         fDontSendUntilMs = 0;
871         fExec = new ScheduledThreadPoolExecutor(1);
872         fExec.scheduleAtFixedRate(new Runnable() {
873             @Override
874             public void run() {
875                 send(false);
876             }
877         }, 100, threadOccurrenceTime, TimeUnit.MILLISECONDS);
878         pubResponse = new MRPublisherResponse();
879     }
880
881     private static class TimestampedMessage extends Message {
882         public TimestampedMessage(Message message) {
883             super(message);
884             timestamp = Clock.now();
885         }
886
887         public final long timestamp;
888     }
889
890     public String getUsername() {
891         return username;
892     }
893
894     public void setUsername(String username) {
895         this.username = username;
896     }
897
898     public String getPassword() {
899         return password;
900     }
901
902     public void setPassword(String password) {
903         this.password = password;
904     }
905
906     public String getHost() {
907         return host;
908     }
909
910     public void setHost(String host) {
911         this.host = host;
912     }
913
914     public String getContentType() {
915         return contentType;
916     }
917
918     public void setContentType(String contentType) {
919         this.contentType = contentType;
920     }
921
922     public String getAuthKey() {
923         return authKey;
924     }
925
926     public void setAuthKey(String authKey) {
927         this.authKey = authKey;
928     }
929
930     public String getAuthDate() {
931         return authDate;
932     }
933
934     public void setAuthDate(String authDate) {
935         this.authDate = authDate;
936     }
937
938 }