bump the version
[dmaap/messagerouter/msgrtr.git] / src / main / java / com / att / dmf / mr / resources / streamReaders / CambriaJsonStreamReader.java
1 /*******************************************************************************
2  *  ============LICENSE_START=======================================================
3  *  org.onap.dmaap
4  *  ================================================================================
5  *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11 *  
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  *  
19  *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
20  *  
21  *******************************************************************************/
22 package com.att.dmf.mr.resources.streamReaders;
23
24 import java.io.InputStream;
25
26 import javax.servlet.http.HttpServletResponse;
27
28 import org.json.JSONException;
29 import org.json.JSONObject;
30 import org.json.JSONTokener;
31
32 import com.att.dmf.mr.CambriaApiException;
33 import com.att.dmf.mr.backends.Publisher.message;
34 import com.att.dmf.mr.beans.LogDetails;
35 import com.att.dmf.mr.resources.CambriaEventSet.reader;
36
37 /**
38  * 
39  * @author anowarul.islam
40  *
41  */
42 public class CambriaJsonStreamReader implements reader {
43         private final JSONTokener fTokens;
44         private final boolean fIsList;
45         private long fCount;
46         private final String fDefPart;
47         public static final String kKeyField = "cambria.partition";
48
49         /**
50          * 
51          * @param is
52          * @param defPart
53          * @throws CambriaApiException
54          */
55         public CambriaJsonStreamReader(InputStream is, String defPart) throws CambriaApiException {
56                 try {
57                         fTokens = new JSONTokener(is);
58                         fCount = 0;
59                         fDefPart = defPart;
60
61                         final int c = fTokens.next();
62                         if (c == '[') {
63                                 fIsList = true;
64                         } else if (c == '{') {
65                                 fTokens.back();
66                                 fIsList = false;
67                         } else {
68                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expecting an array or an object.");
69                         }
70                 } catch (JSONException e) {
71                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
72                 }
73         }
74
75         @Override
76         public message next() throws CambriaApiException {
77                 try {
78                         if (!fTokens.more()) {
79                                 return null;
80                         }
81
82                         final int c = fTokens.next();
83                         
84                         
85                         if (fIsList) {
86                                 if (c == ']' || (fCount > 0 && c == 10))
87                                         return null;
88
89
90                                 if (fCount > 0 && c != ',' && c!= 10) {
91                                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST,
92                                                         "Expected ',' or closing ']' after last object.");
93                                 }
94
95                                 if (fCount == 0 && c != '{' && c!= 10  && c!=32) {
96                                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected { to start an object.");
97                                 }
98                         } else if (fCount != 0 || c != '{') {
99                                 throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, "Expected '{' to start an object.");
100                         }
101
102                         if (c == '{') {
103                                 fTokens.back();
104                         }
105                         final JSONObject o = new JSONObject(fTokens);
106                         fCount++;
107                         return new msg(o);
108                 } catch (JSONException e) {
109                         throw new CambriaApiException(HttpServletResponse.SC_BAD_REQUEST, e.getMessage());
110
111                 }
112         }
113
114         private class msg implements message {
115                 private final String fKey;
116                 private  String fMsg;
117                 private LogDetails logDetails;
118                 private boolean transactionEnabled;
119
120                 /**
121                  * constructor
122                  * 
123                  * @param o
124                  */
125                 
126                 
127                 
128                 public msg(JSONObject o) {
129                         String key = o.optString(kKeyField, fDefPart);
130                         if (key == null) {
131                                 key = "" + System.currentTimeMillis();
132                         }
133                         fKey = key;
134                                         
135                                 fMsg = o.toString().trim();
136                         
137                 }
138
139                 @Override
140                 public String getKey() {
141                         return fKey;
142                 }
143
144                 @Override
145                 public String getMessage() {
146                         return fMsg;
147                 }
148
149                 @Override
150                 public boolean isTransactionEnabled() {
151                         return transactionEnabled;
152                 }
153
154                 @Override
155                 public void setTransactionEnabled(boolean transactionEnabled) {
156                         this.transactionEnabled = transactionEnabled;
157                 }
158
159                 @Override
160                 public void setLogDetails(LogDetails logDetails) {
161                         this.logDetails = logDetails;
162                 }
163
164                 @Override
165                 public LogDetails getLogDetails() {
166                         return logDetails;
167                 }
168         }
169 }