47312e62786e659b35a57a35191fdd4d456b944f
[policy/distribution.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  *  Copyright (C) 2018 Intel Corp. All rights reserved.
4  *  Copyright (C) 2019 Nordix Foundation.
5  *  Modifications Copyright (C) 2019 AT&T Intellectual Property.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  * SPDX-License-Identifier: Apache-2.0
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.policy.distribution.reception.handling.file;
24
25 import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
26
27 import java.io.File;
28 import java.io.IOException;
29 import java.nio.file.FileSystems;
30 import java.nio.file.Path;
31 import java.nio.file.Paths;
32 import java.nio.file.WatchEvent;
33 import java.nio.file.WatchKey;
34 import java.nio.file.WatchService;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.TimeUnit;
38 import java.util.zip.ZipFile;
39
40 import org.onap.policy.common.parameters.ParameterService;
41 import org.onap.policy.distribution.model.Csar;
42 import org.onap.policy.distribution.reception.decoding.PolicyDecodingException;
43 import org.onap.policy.distribution.reception.handling.AbstractReceptionHandler;
44 import org.onap.policy.distribution.reception.statistics.DistributionStatisticsManager;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48 /**
49  * Handles reception of inputs from File System which can be used to decode policies.
50  */
51 public class FileSystemReceptionHandler extends AbstractReceptionHandler {
52
53     private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemReceptionHandler.class);
54     private boolean running = false;
55
56     /**
57      * {@inheritDoc}.
58      */
59     @Override
60     protected void initializeReception(final String parameterGroupName) {
61         LOGGER.debug("FileSystemReceptionHandler init...");
62         try {
63             final FileSystemReceptionHandlerConfigurationParameterGroup handlerParameters =
64                     ParameterService.get(parameterGroupName);
65             final FileClientHandler fileClientHandler =
66                     new FileClientHandler(this, handlerParameters.getWatchPath(), handlerParameters.getMaxThread());
67             final Thread fileWatcherThread = new Thread(fileClientHandler);
68             fileWatcherThread.start();
69         } catch (final Exception ex) {
70             LOGGER.error("FileSystemReceptionHandler initialization failed", ex);
71         }
72     }
73
74     /**
75      * {@inheritDoc}.
76      */
77     @Override
78     public void destroy() {
79         running = false;
80     }
81
82     /**
83      * Method to check the running status of file watcher thread.
84      *
85      * @return the running status
86      */
87     public boolean isRunning() {
88         return running;
89     }
90
91     /**
92      * Initialize the file watcher thread.
93      *
94      * @param watchPath Path to watch
95      */
96     public void initFileWatcher(final String watchPath, final int maxThread) throws IOException {
97         try (final WatchService watcher = FileSystems.getDefault().newWatchService()) {
98             final Path dir = Paths.get(watchPath);
99             dir.register(watcher, ENTRY_CREATE);
100             LOGGER.debug("Watch Service registered for dir: {}", dir.getFileName());
101             startWatchService(watcher, dir, maxThread);
102         } catch (final Exception ex) {
103             LOGGER.error("FileWatcher initialization failed", ex);
104             Thread.currentThread().interrupt();
105         }
106     }
107
108     /**
109      * Method to keep watching the given path for any new file created.
110      *
111      * @param watcher the watcher
112      * @param dir the watch directory
113      * @param maxThread the max thread number
114      * @throws InterruptedException if it occurs
115      */
116     protected void startWatchService(final WatchService watcher, final Path dir, final int maxThread)
117             throws InterruptedException {
118         WatchKey key;
119         final ExecutorService pool = Executors.newFixedThreadPool(maxThread);
120
121         try {
122             running = true;
123             while (running) {
124                 key = watcher.take();
125                 processFileEvents(dir, key, pool);
126                 final boolean valid = key.reset();
127                 if (!valid) {
128                     LOGGER.error("Watch key no longer valid!");
129                     break;
130                 }
131             }
132         } finally {
133             pool.shutdown();
134         }
135     }
136
137     private void processFileEvents(final Path dir, final WatchKey key, final ExecutorService pool) {
138         for (final WatchEvent<?> event : key.pollEvents()) {
139             @SuppressWarnings("unchecked")
140             final WatchEvent<Path> ev = (WatchEvent<Path>) event;
141             final Path fileName = ev.context();
142             pool.execute(() -> {
143                 LOGGER.debug("new CSAR found: {}", fileName);
144                 DistributionStatisticsManager.updateTotalDistributionCount();
145                 final String fullFilePath = dir.toString() + File.separator + fileName.toString();
146                 try {
147                     waitForFileToBeReady(fullFilePath);
148                     createPolicyInputAndCallHandler(fullFilePath);
149                     LOGGER.debug("CSAR complete: {}", fileName);
150                 } catch (final InterruptedException e) {
151                     LOGGER.error("waitForFileToBeReady interrupted", e);
152                     Thread.currentThread().interrupt();
153                 }
154             });
155         }
156     }
157
158     /**
159      * Method to create policy input & call policy handlers.
160      *
161      * @param fileName the filename
162      */
163     protected void createPolicyInputAndCallHandler(final String fileName) {
164         try {
165             final Csar csarObject = new Csar(fileName);
166             DistributionStatisticsManager.updateTotalDownloadCount();
167             inputReceived(csarObject);
168             DistributionStatisticsManager.updateDownloadSuccessCount();
169             DistributionStatisticsManager.updateDistributionSuccessCount();
170         } catch (final PolicyDecodingException ex) {
171             DistributionStatisticsManager.updateDownloadFailureCount();
172             DistributionStatisticsManager.updateDistributionFailureCount();
173             LOGGER.error("Policy creation failed", ex);
174         }
175     }
176
177     private void waitForFileToBeReady(final String fullFilePath) throws InterruptedException {
178         boolean flag = true;
179         while (flag) {
180             TimeUnit.MILLISECONDS.sleep(100);
181             try (ZipFile zipFile = new ZipFile(fullFilePath)) {
182                 flag = false;
183             } catch (final IOException exp) {
184                 LOGGER.error("file is not ready for reading, wait for sometime and try again", exp);
185             }
186         }
187     }
188 }