+ private static final String TYPE_FIELD = "type";
+
+ /**
+ * Used to encode & decode JSON messages sent & received, respectively, on the
+ * internal DMaaP topic.
+ */
+ private final Gson gson = new Gson();
+
+ /**
+ * Maps a message subclass to its type.
+ */
+ private static final Map<Class<? extends Message>, String> class2type = new HashMap<>();
+
+ /**
+ * Maps a message type to the appropriate subclass.
+ */
+ private static final Map<String, Class<? extends Message>> type2class = new HashMap<>();
+
+ static {
+ class2type.put(Forward.class, "forward");
+ class2type.put(Heartbeat.class, "heartbeat");
+ class2type.put(Identification.class, "identification");
+ class2type.put(Leader.class, "leader");
+ class2type.put(Offline.class, "offline");
+ class2type.put(Query.class, "query");
+
+ class2type.forEach((clazz, type) -> type2class.put(type, clazz));
+ }