# Source code for dquality.accumulator

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# #########################################################################
# Copyright (c) 2016, UChicago Argonne, LLC. All rights reserved.         #
#                                                                         #
# Copyright 2016. UChicago Argonne, LLC. This software was produced       #
# under U.S. Government contract DE-AC02-06CH11357 for Argonne National   #
# Laboratory (ANL), which is operated by UChicago Argonne, LLC for the    #
# U.S. Department of Energy. The U.S. Government has rights to use,       #
# reproduce, and distribute this software.  NEITHER THE GOVERNMENT NOR    #
# UChicago Argonne, LLC MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR        #
# ASSUMES ANY LIABILITY FOR THE USE OF THIS SOFTWARE.  If software is     #
# modified to produce derivative works, such modified software should     #
# be clearly marked, so as not to confuse it with the version available   #
# from ANL.                                                               #
#                                                                         #
# Additionally, redistribution and use in source and binary forms, with   #
# or without modification, are permitted provided that the following      #
# conditions are met:                                                     #
#                                                                         #
#     * Redistributions of source code must retain the above copyright    #
#       notice, this list of conditions and the following disclaimer.     #
#                                                                         #
#     * Redistributions in binary form must reproduce the above copyright #
#       notice, this list of conditions and the following disclaimer in   #
#       the documentation and/or other materials provided with the        #
#       distribution.                                                     #
#                                                                         #
#     * Neither the name of UChicago Argonne, LLC, Argonne National       #
#       Laboratory, ANL, the U.S. Government, nor the names of its        #
#       contributors may be used to endorse or promote products derived   #
#       from this software without specific prior written permission.     #
#                                                                         #
# THIS SOFTWARE IS PROVIDED BY UChicago Argonne, LLC AND CONTRIBUTORS     #
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT       #
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS       #
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL UChicago     #
# Argonne, LLC OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,        #
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,    #
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;        #
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER        #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT      #
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN       #
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE         #
# POSSIBILITY OF SUCH DAMAGE.                                             #
# #########################################################################

"""
Please make sure the installation :ref:`pre-requisite-reference-label` are met.

The application monitors a given directory for new/modified files matching
the given pattern. Each detected file is verified according to the schema
configuration, and for each file several new processes are started,
each process performing specific quality calculations.

The results will be sent to an EPICS PV (printed on screen for now).

"""

import os
import sys
import pyinotify
from pyinotify import WatchManager
from multiprocessing import Process, Queue
import json
import numpy as np
import dquality.common.utilities as utils
import dquality.handler as datahandler
import dquality.common.report as report
from dquality.common.containers import Data

__author__ = "Barbara Frosik"
__copyright__ = "Copyright (c) 2016, UChicago Argonne, LLC."
__docformat__ = 'restructuredtext en'
__all__ = ['init',
           'verify',
           'directory']

# Module-level queue of file paths. The pyinotify event handler created in
# directory() puts the path of every matching file here, and verify() drains
# it in its monitoring loop.
files = Queue()
# Marker string for an interrupt request.
# NOTE(review): this constant is not referenced by name in this module;
# verify() tests queued paths for the upper-case literal 'INTERRUPT' instead.
# Confirm which spelling callers rely on.
INTERRUPT = 'interrupt'


def init(config):
    """
    Read the configuration file and prepare the values needed by ``verify``.

    The function loads the configuration, creates a logger, reads the limit
    values from the configured 'limits' file, and looks up the list of file
    extensions to monitor. If the 'limits' file cannot be resolved, the
    process exits with status -1.

    Parameters
    ----------
    config : str
        configuration file name, including path

    Returns
    -------
    logger : Logger
        logger instance

    limits : dictionary
        a dictionary containing limit values read from the configured
        'limits' file (the value stored under its top-level 'limits' key)

    extensions : list
        a list containing extensions of files to be monitored, read from the
        configuration file; ``['']`` (match every file) when the
        configuration has no 'extensions' entry
    """
    conf = utils.get_config(config)
    logger = utils.get_logger(__name__, conf)

    limitsfile = utils.get_file(conf, 'limits', logger)
    if limitsfile is None:
        sys.exit(-1)

    with open(limitsfile) as limits_file:
        limits = json.load(limits_file)['limits']

    # The result is intentionally not kept: verify() derives the report file
    # name from the first data file. The lookup itself is preserved so that
    # whatever utils.get_file does for this key still happens.
    # NOTE(review): presumably it logs when the entry is missing - confirm.
    utils.get_file(conf, 'report_file', logger)

    try:
        extensions = conf['extensions']
    except KeyError:
        logger.warning('no file extension specified. Monitoring for all files.')
        # Every path ends with the empty string, so this matches all files.
        extensions = ['']

    return logger, limits, extensions
def directory(directory, patterns):
    """
    Set up monitoring of a directory and return the notifier.

    A pyinotify notifier is created and registered for the "*IN_CLOSE_WRITE*"
    event on the directory given by the "*directory*" parameter (the watch is
    not recursive). When a file whose path ends with one of the "*patterns*"
    is closed after writing, its path is put on the module-level "*files*"
    queue. The notifier is created with ``timeout=1``; pyinotify documents
    this value in milliseconds.

    Parameters
    ----------
    directory : str
        directory to monitor

    patterns : list
        A list of strings representing file extensions

    Returns
    -------
    notifier : pyinotify.Notifier
        the notifier watching the directory; the caller drives it by calling
        its event-processing methods
    """
    class EventHandler(pyinotify.ProcessEvent):
        """Queue the path of each written file that matches a pattern."""

        def process_IN_CLOSE_WRITE(self, event):
            path = event.pathname
            # A file is queued at most once even if several patterns match.
            if any(path.endswith(pattern) for pattern in patterns):
                files.put(path)

    wm = WatchManager()
    notifier = pyinotify.Notifier(wm, EventHandler(), timeout=1)
    wm.add_watch(directory, pyinotify.IN_CLOSE_WRITE, rec=False)
    return notifier
def verify(conf, folder, data_type, num_files, report_by_files=True):
    """
    Monitor a directory, feed the data of each new file to the data handler
    process, and report the aggregated results.

    The function reads the configuration through ``init``, checks that the
    monitored directory exists (exiting with status -1 otherwise), and calls
    ``directory`` to set up the monitoring. It then starts one process running
    ``datahandler.handle_data`` and enters a loop that lets the notifier
    process events and drains the module-level "*files*" queue.

    For each queued file the data set '/exchange/<data_type>' is read and
    every slice along its first axis is put, wrapped in ``Data``, on the queue
    consumed by the handler process. The loop ends when "*num_files*" files
    have been processed, or when a queued path contains the text 'INTERRUPT';
    in both cases the string 'all_data' is put on the data queue and the
    notifier is stopped.

    Afterwards the aggregate produced by the handler process is read from its
    result queue and reported. The report is written to a file named after
    the first processed data file with the extension replaced by '.report';
    if no file was processed or the report file cannot be opened, a warning
    is logged and ``None`` is passed to the report functions instead.

    Parameters
    ----------
    conf : str
        configuration file name, including path

    folder : str
        monitored directory

    data_type : str
        defines which data type is being evaluated

    num_files : int
        number of files that will be processed

    report_by_files : boolean
        this variable directs how to present the bad indexes in a report. If
        True, the indexes are related to the files, and a filename is
        included in the report. Otherwise, the report contains a list of bad
        indexes. The string 'True' (any letter case) is accepted as well.

    Returns
    -------
    bad_indexes : dict
        a dictionary containing bad indexes, filled by the report module
    """
    logger, limits, extensions = init(conf)
    if not os.path.isdir(folder):
        logger.error(
            'parameter error: directory ' + folder + ' does not exist')
        sys.exit(-1)

    # Accept a real boolean (the documented type) as well as the string form.
    if isinstance(report_by_files, bool):
        by_files = report_by_files
    else:
        by_files = str(report_by_files).strip().lower() == 'true'

    notifier = directory(folder, extensions)

    interrupted = False
    file_list = []
    offset_list = []
    # Stays None when monitoring is interrupted before any data file arrives.
    report_path = None
    dataq = Queue()
    aggregateq = Queue()

    p = Process(target=datahandler.handle_data,
                args=(dataq, limits[data_type], aggregateq, ))
    p.start()

    file_index = 0
    slice_index = 0
    while not interrupted:
        # The notifier's event handler puts each newly detected file on the
        # module-level files queue.
        notifier.process_events()
        if notifier.check_events():
            notifier.read_events()

        # Drain the files queue and pass the data of each new file on to the
        # handler process.
        while not files.empty():
            path = files.get()
            # NOTE(review): this tests for the upper-case literal, not the
            # module constant INTERRUPT ('interrupt') - confirm intent.
            if 'INTERRUPT' in path:
                # The calling function may use an interrupt command to stop
                # the monitoring and processing.
                dataq.put('all_data')
                notifier.stop()
                interrupted = True
                break

            if file_index == 0:
                # Strip only the final extension; dots elsewhere in the path
                # must not truncate the report file name.
                report_path = os.path.splitext(path)[0] + '.report'

            # NOTE(review): fp is never closed here; if it is an open file
            # handle it should be released once the data is copied - confirm
            # what utils.get_data_hd5 returns.
            fp, tags = utils.get_data_hd5(path)
            data_tag = tags['/exchange/' + data_type]
            data = np.asarray(fp[data_tag])

            # offset_list records the cumulative slice count after each file.
            slice_index += data.shape[0]
            file_list.append(path)
            offset_list.append(slice_index)
            for frame in data:
                dataq.put(Data(frame))

            file_index += 1
            if file_index == num_files:
                dataq.put('all_data')
                notifier.stop()
                interrupted = True
                break

    # Blocks until the handler process delivers the aggregated results.
    aggregate = aggregateq.get()

    try:
        report_file = open(report_path, 'w') if report_path else None
    except (IOError, OSError):
        report_file = None
    if report_file is None:
        logger.warning('Cannot open report file, writing report on console')

    bad_indexes = {}
    try:
        report.report_results(aggregate, data_type, None, report_file)
        if by_files:
            report.add_bad_indexes_per_file(
                aggregate, data_type, bad_indexes, file_list, offset_list)
        else:
            report.add_bad_indexes(aggregate, data_type, bad_indexes)
        report.report_bad_indexes(bad_indexes, report_file)
    finally:
        # Release the report file even if reporting fails.
        if report_file is not None:
            report_file.close()

    return bad_indexes