Comet message to WEB after delete package.
authornancylizi <li.zi30@zte.com.cn>
Thu, 27 Oct 2016 08:00:31 +0000 (16:00 +0800)
committernancylizi <li.zi30@zte.com.cn>
Thu, 27 Oct 2016 08:00:31 +0000 (16:00 +0800)
Change-Id: I03b04aece09b2943c8a80ef56899b688687ab11c
Issue-id:TOSCA-153
Signed-off-by: nancylizi <li.zi30@zte.com.cn>
catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/CatalogApp.java
catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdException.java [new file with mode: 0644]
catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdService.java [new file with mode: 0644]
catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdServlet.java [new file with mode: 0644]
catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdUtil.java [new file with mode: 0644]
catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/common/CommonConstant.java
catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/wrapper/PackageWrapper.java

index 6396b54..88e3b16 100644 (file)
@@ -28,6 +28,11 @@ import io.dropwizard.setup.Environment;
 import io.swagger.jaxrs.config.BeanConfig;
 import io.swagger.jaxrs.listing.ApiListingResource;
 
+import java.util.EnumSet;
+
+import javax.servlet.DispatcherType;
+
+import org.eclipse.jetty.servlets.CrossOriginFilter;
 import org.glassfish.jersey.media.multipart.MultiPartFeature;
 import org.openo.commontosca.catalog.common.Config;
 import org.openo.commontosca.catalog.common.HttpServerAddrConfig;
@@ -110,7 +115,7 @@ public class CatalogApp extends Application<CatalogAppConfiguration> {
     environment.jersey().register(MultiPartFeature.class);
 
     initSwaggerConfig(environment, configuration);
-    //    initCometd(environment);
+    initCometd(environment);
     Config.setConfigration(configuration);
     initService();
     LOGGER.info("Initialize catalogue finished.");
@@ -149,22 +154,22 @@ public class CatalogApp extends Application<CatalogAppConfiguration> {
     registerCatalogService.start();
   }
 
-//   /**
-//   * initialize cometd server.
-//   * 
-//   * @param environment environment information
-//   */
-//  private void initCometd(Environment environment) {
-//    // add filter
-//    environment.getApplicationContext().addFilter(CrossOriginFilter.class,
-//        "/api/nsoccataloguenotification/v1/*",
-//        EnumSet.of(DispatcherType.REQUEST, DispatcherType.ERROR));
-//    // add servlet
-//    environment.getApplicationContext()
-//        .addServlet("org.cometd.server.CometDServlet", "/api/nsoccataloguenotification/v1/*")
-//        .setInitOrder(1);
-//    // add servlet
-//    environment.getApplicationContext()
-//        .addServlet("CometdServlet", "/api/nsoccataloguenotification/v1").setInitOrder(2);
-//  }
+  /**
+   * initialize cometd server.
+   * 
+   * @param environment environment information
+   */
+  private void initCometd(Environment environment) {
+    // add filter
+    environment.getApplicationContext().addFilter(CrossOriginFilter.class,
+        "/openoapi/catalog/v1/catalognotification/*", EnumSet.of(DispatcherType.REQUEST, DispatcherType.ERROR));
+    // add servlet
+    environment.getApplicationContext()
+        .addServlet("org.cometd.server.CometDServlet", "/openoapi/catalog/v1/catalognotification/*")
+        .setInitOrder(1);
+    // add servlet
+    environment.getApplicationContext()
+        .addServlet("org.openo.commontosca.catalog.cometd.CometdServlet", "/openoapi/catalog/v1/catalognotification")
+        .setInitOrder(2);
+  }
 }
diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdException.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdException.java
new file mode 100644 (file)
index 0000000..7237a37
--- /dev/null
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2016 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openo.commontosca.catalog.cometd;
+
+public class CometdException extends Exception {
+  public static int ERROR_CODE_BAYEUX = 0;
+  public static int ERROR_CODE_PARAM_ERROR = 1;
+  public static int ERROR_CODE_SESSION_ERROR = 2;
+  public static int ERROR_CODE_SUBSCRIBE_TIMEOUT = 3;
+  private int errorCode = -1;
+
+  public CometdException(Throwable e1) {
+    super(e1);
+  }
+
+  public CometdException(int code, String message) {
+    super(message);
+    this.errorCode = code;
+  }
+}
diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdService.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdService.java
new file mode 100644 (file)
index 0000000..a54b9ab
--- /dev/null
@@ -0,0 +1,104 @@
+/**
+ * Copyright 2016 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openo.commontosca.catalog.cometd;
+
+import org.cometd.bayeux.server.BayeuxServer;
+import org.cometd.bayeux.server.ConfigurableServerChannel;
+import org.cometd.bayeux.server.LocalSession;
+import org.cometd.bayeux.server.ServerChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class CometdService {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CometdService.class);
+  public static String DATA_UPLOAD_CHANNEL = "/upload/data";
+  public static String SNMPTRAP_CHANNEL = "/upload/snmptrap";
+
+  private BayeuxServer bayeux;
+  private LocalSession session;
+
+  private static String bayeuxChannel = "/meta/";
+
+  private static String serviceChannel = "/service/";
+
+  private static CometdService service = new CometdService();
+
+  public static CometdService getInstance() {
+    return service;
+  }
+
+  public void publish(String channel, Object message) throws CometdException {
+    if (bayeux == null) {
+      this.bayeux = CometdUtil.getBayeuxServer();
+      checkBayeuxServer();
+      this.session = this.bayeux.newLocalSession("openo_catalog_local_session~");
+      this.session.handshake();
+    }
+    String jsonMsg;
+    try {
+      // jsonMsg = CometdUtil.convertBean2Json(message);
+      jsonMsg = CometdUtil.toJson(message);
+      LOGGER.debug("upload json=" + jsonMsg);
+    } catch (IOException e) {
+      throw new CometdException(e);
+    }
+
+    checkAndInit(channel);
+    ServerChannel serverChannel = this.bayeux.getChannel(channel);
+    serverChannel.publish(this.session, jsonMsg);
+  }
+
+  private void checkAndInit(String channel) throws CometdException {
+    checkBayeuxServer();
+    checkSession();
+    checkChannel(channel);
+    bayeux.createChannelIfAbsent(channel, new ConfigurableServerChannel.Initializer() {
+      public void configureChannel(ConfigurableServerChannel channel) {
+        channel.setPersistent(true);
+        channel.setLazy(true);
+      }
+    });
+  }
+
+  private void checkBayeuxServer() throws CometdException {
+    if (bayeux == null) {
+      throw new CometdException(CometdException.ERROR_CODE_BAYEUX, "bayeux is null.");
+    }
+  }
+
+  private void checkSession() throws CometdException {
+    if (session == null || !session.isConnected()) {
+      throw new CometdException(CometdException.ERROR_CODE_SESSION_ERROR, "session is invalid.");
+    }
+  }
+
+  private void checkChannel(String channel) throws CometdException {
+    if (channel == null || "".equals(channel)) {
+      throw new CometdException(CometdException.ERROR_CODE_PARAM_ERROR, "channel is null.");
+    }
+    if (channel.startsWith(bayeuxChannel)) {
+      throw new CometdException(CometdException.ERROR_CODE_PARAM_ERROR,
+          "channel [" + channel + "] is bayeuxChannel.");
+    }
+    if (channel.startsWith(serviceChannel)) {
+      throw new CometdException(CometdException.ERROR_CODE_PARAM_ERROR,
+          "channel [" + channel + "] is serviceChannel.");
+    }
+  }
+}
diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdServlet.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdServlet.java
new file mode 100644 (file)
index 0000000..ace9545
--- /dev/null
@@ -0,0 +1,95 @@
+/**
+ * Copyright 2016 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openo.commontosca.catalog.cometd;
+
+import org.cometd.annotation.Listener;
+import org.cometd.annotation.ServerAnnotationProcessor;
+import org.cometd.annotation.Service;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.server.BayeuxServer;
+import org.cometd.bayeux.server.ServerChannel;
+import org.cometd.bayeux.server.ServerMessage;
+import org.cometd.bayeux.server.ServerSession;
+import org.cometd.server.BayeuxServerImpl;
+import org.cometd.server.authorizer.GrantAuthorizer;
+import org.cometd.server.ext.AcknowledgedMessagesExtension;
+import org.cometd.server.ext.TimesyncExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import javax.servlet.GenericServlet;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.UnavailableException;
+import javax.servlet.http.HttpServletResponse;
+
+
+public class CometdServlet extends GenericServlet {
+  private static final long serialVersionUID = 8807005039926977330L;
+
+  private static final Logger logger = LoggerFactory.getLogger(CometdServlet.class);
+
+  @Override
+  public void init() throws ServletException {
+    super.init();
+
+    final BayeuxServerImpl bayeux =
+        (BayeuxServerImpl) getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);
+    if (bayeux == null) {
+      throw new UnavailableException("No BayeuxServer!");
+    }
+    // Create extensions
+    bayeux.addExtension(new TimesyncExtension());
+    bayeux.addExtension(new AcknowledgedMessagesExtension());
+
+    // Allow anybody to handshake
+    bayeux.getChannel(ServerChannel.META_HANDSHAKE).addAuthorizer(GrantAuthorizer.GRANT_PUBLISH);
+
+    ServerAnnotationProcessor processor = new ServerAnnotationProcessor(bayeux);
+    processor.process(new CatalogComet());
+
+    CometdUtil.setBayeuxServer(bayeux);
+  }
+
+  @Service("catalog")
+  public static class CatalogComet {
+    @Listener("/meta/subscribe")
+    public void catalogSubscribe(ServerSession session, ServerMessage message) {
+      logger.info("Catalog Subscribe from " + session + " for "
+          + message.get(Message.SUBSCRIPTION_FIELD));
+    }
+
+    @Listener("/meta/unsubscribe")
+    public void catalogUnsubscribe(ServerSession session, ServerMessage message) {
+      logger.info("Catalog Unsubscribe from " + session + " for "
+          + message.get(Message.SUBSCRIPTION_FIELD));
+    }
+
+    @Listener("/meta/*")
+    public void catalogMeta(ServerSession session, ServerMessage message) {
+      logger.debug(message.toString());
+    }
+  }
+
+  @Override
+  public void service(ServletRequest servletRequest, ServletResponse servletResponse)
+      throws ServletException, IOException {
+    ((HttpServletResponse) servletResponse).sendError(503);
+  }
+}
diff --git a/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdUtil.java b/catalog-core/catalog-mgr/src/main/java/org/openo/commontosca/catalog/cometd/CometdUtil.java
new file mode 100644 (file)
index 0000000..efb2d35
--- /dev/null
@@ -0,0 +1,60 @@
+/**
+ * Copyright 2016 ZTE Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openo.commontosca.catalog.cometd;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.cometd.server.BayeuxServerImpl;
+
+import com.google.gson.Gson;
+
+public class CometdUtil {
+  private static BayeuxServerImpl bayeuxServer = null;
+
+  public static BayeuxServerImpl getBayeuxServer() {
+    return bayeuxServer;
+  }
+
+  public static void setBayeuxServer(BayeuxServerImpl bayeux) {
+    bayeuxServer = bayeux;
+  }
+
+//  public static String convertBean2Json(Object object) throws IOException {
+//    ObjectMapper mapper = new ObjectMapper();
+//    return mapper.writeValueAsString(object);
+//  }
+//
+//  public static Map convertJson2Map(String jsonStr) throws IOException {
+//    ObjectMapper objectMapper = new ObjectMapper();
+//    return objectMapper.readValue(jsonStr, Map.class);
+//  }
+  public static <T> T fromJson(String jsonString, Class<T> templateClass) throws IOException {
+    Gson gson = new Gson();
+    return gson.fromJson(jsonString, templateClass);
+  }
+
+  /**
+   * gson to json.
+   * @param template class name
+   * @return String
+   */
+  public static <T> String toJson(T template) throws IOException {
+    Gson gson = new Gson();
+    return gson.toJson(template);
+  }
+}
index a2d5752..8250c94 100644 (file)
@@ -45,4 +45,6 @@ public class CommonConstant {
   public static final String HTTP_HEADER_CONTENT_RANGE = "Content-Range";
   
   public static final  String CATALOG_CSAR_DIR_NAME = "/csar";
+  
+  public static final String COMETD_CHANNEL_PACKAGE_DELETE = "/package/delete";
 }
index 8a88253..247446f 100644 (file)
@@ -16,6 +16,9 @@
 package org.openo.commontosca.catalog.wrapper;
 
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.openo.commontosca.catalog.cometd.CometdException;
+import org.openo.commontosca.catalog.cometd.CometdService;
+import org.openo.commontosca.catalog.cometd.CometdUtil;
 import org.openo.commontosca.catalog.common.CommonConstant;
 import org.openo.commontosca.catalog.common.HttpServerPathConfig;
 import org.openo.commontosca.catalog.common.RestUtil;
@@ -42,6 +45,8 @@ import java.io.InputStream;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
@@ -197,6 +202,18 @@ public class PackageWrapper {
     }
   }
 
+  private void publishDeletionPendingStatusCometdMessage(String csarid) {
+    try {
+      Map<String, Object> cometdMessage = new HashMap<String, Object>();
+      cometdMessage.put("csarid", csarid);
+      cometdMessage.put("status", "deletionPending");
+      CometdService.getInstance().publish(CommonConstant.COMETD_CHANNEL_PACKAGE_DELETE,
+          cometdMessage);
+    } catch (CometdException e1) {
+      LOG.error("publish delfinish cometdmsg fail.", e1);
+    }
+  }
+  
   class DelCsarThread implements Runnable {
     private String csarid;
     private boolean isInstanceTemplate = false;
@@ -216,12 +233,12 @@ public class PackageWrapper {
         LOG.error("del instance csar fail."+ e1.getMessage());
         updatePackageStatus(csarid, null, null, null, CommonConstant.PACKAGE_STATUS_DELETE_FAIL,
             null);
-        // publishDelFinishCometdMessage(csarid, "false");
+        publishDelFinishCometdMessage(csarid, "false");
       }
     }
 
     private void delCsarData(String csarId) {
-      updatePackageStatus(csarid, null, null, null, CommonConstant.PACKAGE_STATUS_DELETING, null);
+      updatePackageStatus(csarId, null, null, null, CommonConstant.PACKAGE_STATUS_DELETING, null);
       String packagePath = PackageWrapperUtil.getPackagePath(csarId);
       if (packagePath == null) {
         LOG.error("package path is null! ");
@@ -236,36 +253,30 @@ public class PackageWrapper {
       } catch (CatalogResourceException e) {
         LOG.error("delete template data from db error! csarId = " + csarId, e);
       }
-//      PackageData packageData = new PackageData();
-//      packageData.setCsarId(csarId);
-//      try {
-//        TemplateManager.getInstance().deleteServiceTemplateByCsarPackageInfo(packageData);
-//      } catch (CatalogResourceException e2) {
-//        LOG.error("delete template data from db error! csarId = " + csarId);
-//      }
       //delete package data from database
       try {
         PackageManager.getInstance().deletePackage(csarId);
       } catch (CatalogResourceException e1) {
         LOG.error("delete package  by csarId from db error ! " + e1.getMessage(), e1);
       }
+      publishDelFinishCometdMessage(csarId, "true");
     }
 
-    // private void publishDelFinishCometdMessage(String csarid, String csarDelStatus) {
-    // if (isInstanceTemplate) {
-    // LOG.info("delete instance Template finish. csarid:{}", csarid);
-    // return;
-    // }
-    // try {
-    // Map<String, Object> cometdMessage = new HashMap<String, Object>();
-    // cometdMessage.put("csarid", csarid);
-    // cometdMessage.put("status", csarDelStatus);
-    // CometdService.getInstance().publish(CommonConstant.COMETD_CHANNEL_PACKAGE_DELETE,
-    // cometdMessage);
-    // } catch (CometdException e) {
-    // LOG.error("publish delfinish cometdmsg fail.", e);
-    // }
-    // }
+    private void publishDelFinishCometdMessage(String csarId, String csarDelStatus) {
+      if (isInstanceTemplate) {
+        LOG.info("delete instance Template finish. csarid:{}", csarId);
+        return;
+      }
+      try {
+        Map<String, Object> cometdMessage = new HashMap<String, Object>();
+        cometdMessage.put("csarid", csarId);
+        cometdMessage.put("status", csarDelStatus);
+        CometdService.getInstance().publish(CommonConstant.COMETD_CHANNEL_PACKAGE_DELETE,
+            cometdMessage);
+      } catch (CometdException e1) {
+        LOG.error("publish delfinish cometdmsg fail." + e1.getMessage(), e1);
+      }
+    }
   }
 
   /**