0ddd2a92e0de5d1c0faf56adc3059464eefcd9cb
[policy/distribution.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2018 Intel Corp. All rights reserved.
4  *  Copyright (C) 2019 Nordix Foundation.
5  * ================================================================================
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * SPDX-License-Identifier: Apache-2.0
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.policy.distribution.reception.handling.file;
23
24 import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
25
26 import java.io.File;
27 import java.io.IOException;
28 import java.nio.file.FileSystems;
29 import java.nio.file.Path;
30 import java.nio.file.Paths;
31 import java.nio.file.WatchEvent;
32 import java.nio.file.WatchKey;
33 import java.nio.file.WatchService;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.Executors;
36 import java.util.concurrent.TimeUnit;
37 import java.util.zip.ZipFile;
38
39 import org.onap.policy.common.parameters.ParameterService;
40 import org.onap.policy.distribution.model.Csar;
41 import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
42 import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
43 import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 /**
48  * Handles reception of inputs from File System which can be used to decode policies.
49  */
50 public class FileSystemReceptionHandler extends AbstractReceptionHandler {
51
52     private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemReceptionHandler.class);
53     private boolean running = false;
54
55     /**
56      * {@inheritDoc}.
57      */
58     @Override
59     protected void initializeReception(final String parameterGroupName) {
60         LOGGER.debug("FileSystemReceptionHandler init...");
61         try {
62             final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
63                     ParameterService.get(parameterGroupName);
64             final FileClientHandler fileClientHandler = new FileClientHandler(this,
65                     handlerParameters.getWatchPath(),
66                     handlerParameters.getMaxThread());
67             final Thread fileWatcherThread = new Thread(fileClientHandler);
68             fileWatcherThread.start();
69         } catch (final Exception ex) {
70             LOGGER.error("FileSystemReceptionHandler initialization failed", ex);
71         }
72     }
73
74     /**
75      * {@inheritDoc}.
76      */
77     @Override
78     public void destroy() {
79         running = false;
80     }
81
82     /**
83      * Method to check the running status of file watcher thread.
84      *
85      * @return the running status
86      */
87     public boolean isRunning() {
88         return running;
89     }
90
91     /**
92      * Initialize the file watcher thread.
93      *
94      * @param watchPath Path to watch
95      */
96     public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
97         try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
98             final Path dir = Paths.get(watchPath);
99             dir.register(watcher, ENTRY_CREATE);
100             LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
101             startWatchService(watcher, dir, maxThread);
102         } catch (final Exception ex) {
103             LOGGER.error("FileWatcher initialization failed", ex);
104             Thread.currentThread().interrupt();
105         }
106     }
107
108     /**
109      * Method to keep watching the given path for any new file created.
110      *
111      * @param watcher the watcher
112      * @param dir the watch directory
113      * @param maxThread the max thread number
114      * @throws InterruptedException if it occurs
115      */
116     @SuppressWarnings("unchecked")
117     protected void startWatchService(final WatchService watcher,
118             final Path dir,
119             int maxThread) throws InterruptedException, NullPointerException, IllegalArgumentException {
120         WatchKey key;
121         ExecutorService pool = Executors.newFixedThreadPool(maxThread);
122
123         running = true;
124         while (running) {
125             key = watcher.take();
126
127             for (final WatchEvent<?> event : key.pollEvents()) {
128                 final WatchEvent<Path> ev = (WatchEvent<Path>) event;
129                 final Path fileName = ev.context();
130                 pool.execute(new Runnable() {
131                     public void run() {
132                         LOGGER.debug("new CSAR found: {}", fileName);
133                         DistributionStatisticsManager.updateTotalDistributionCount();
134                         final String fullFilePath = dir.toString() + File.separator + fileName.toString();
135                         try {
136                             waitForFileToBeReady(fullFilePath);
137                             createPolicyInputAndCallHandler(fullFilePath);
138                             LOGGER.debug("CSAR complete: {}", fileName);
139                         } catch (InterruptedException e) {
140                             LOGGER.error("waitForFileToBeReady interrupted", e);
141                         }
142                     }
143                 });
144             }
145             final boolean valid = key.reset();
146             if (!valid) {
147                 LOGGER.error("Watch key no longer valid!");
148                 break;
149             }
150         }
151         pool.shutdown();
152     }
153
154     /**
155      * Method to create policy input & call policy handlers.
156      *
157      * @param fileName the filename
158      */
159     protected void createPolicyInputAndCallHandler(final String fileName) {
160         try {
161             final Csar csarObject = new Csar(fileName);
162             DistributionStatisticsManager.updateTotalDownloadCount();
163             inputReceived(csarObject);
164             DistributionStatisticsManager.updateDownloadSuccessCount();
165             DistributionStatisticsManager.updateDistributionSuccessCount();
166         } catch (final PolicyDecodingException ex) {
167             DistributionStatisticsManager.updateDownloadFailureCount();
168             DistributionStatisticsManager.updateDistributionFailureCount();
169             LOGGER.error("Policy creation failed", ex);
170         }
171     }
172
173     private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
174         boolean flag = true;
175         while (flag) {
176             TimeUnit.MILLISECONDS.sleep(100);
177             try (ZipFile zipFile = new ZipFile(fullFilePath)) {
178                 flag = false;
179             } catch (final IOException exp) {
180                 LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);
181             }
182         }
183     }
184 }