# Source code for mast_contributor_tools.filename_check.fc_app

import os
import shutil
import textwrap
from pathlib import Path
from typing import Union

from tqdm import tqdm

from mast_contributor_tools.filename_check.fc_db import Hlsp_SQLiteDb
from mast_contributor_tools.filename_check.hlsp_filename import HLSPNAME_REGEX, FieldRule, HlspFileName
from mast_contributor_tools.utils.logger_config import setup_logger

logger = setup_logger(__name__)


def get_file_paths(
    hlsp_path: str,
    from_file: str = "",
    search_pattern: str = "*.*",
    exclude_pattern: Union[str, None] = None,
    max_n: Union[int, None] = None,
) -> list[Path]:
    """
    Build a list of filename Paths relative to the given directory.

    Parameters
    ----------
    hlsp_path : str
        Head of directory containing HLSP collection files. An empty value
        falls back to the current working directory.
    from_file : str, optional
        Path to a text file containing a list of filenames to check (one per
        line), instead of scanning a directory. Surrounding whitespace is
        stripped from each entry and blank lines are skipped.
    search_pattern : str, optional
        Search pattern to limit files to test. For example, '*.fits' will only
        return the fits files. Default value is '*.*' for all files
    exclude_pattern : str, optional
        Search pattern to exclude files from testing. For example, '*.png'
        will skip all of the png files.
    max_n : int, optional
        Maximum number of files to check, for testing purposes. For example,
        max_n=10 will only check the first 10 files found. Empty, zero or
        negative values mean "no limit".

    Returns
    -------
    list[Path]
        A list of filename Paths contained within the given directory, or
        listed in ``from_file`` when that is given.

    Raises
    ------
    FileNotFoundError
        If ``from_file`` is given but does not exist, or if no files remain
        after the search/exclude patterns and ``max_n`` have been applied.
    """
    # Set current directory if no directory specified
    base_path = Path(hlsp_path) if hlsp_path else Path.cwd()

    if from_file:
        # If a from_file was given, create the file list from that
        if not os.path.exists(from_file):
            msg = f"File '{from_file}' does not exist."
            logger.error(msg)
            raise FileNotFoundError(msg)
        with open(from_file, "r") as f:
            # Strip all surrounding whitespace (not only the newline) so stray
            # spaces or tabs do not become part of the file name under test.
            entries = (line.strip() for line in f)
            file_list = [Path(entry) for entry in entries if entry]
    else:
        # Otherwise, scan the contents of the directory
        file_list = [p.relative_to(base_path) for p in base_path.rglob(search_pattern) if p.is_file()]

    # Match against the search pattern (this also filters from_file entries)
    file_list = [f for f in file_list if f.match(search_pattern)]

    # Exclude files from exclude_pattern
    if exclude_pattern:
        file_list = [f for f in file_list if not f.match(exclude_pattern)]

    # Limit number of files returned to first n rows for testing purposes.
    # A negative limit is ignored: slicing with it would silently drop files
    # from the END of the list instead of keeping the first n.
    if max_n and int(max_n) > 0:
        file_list = file_list[: int(max_n)]

    # Raise error if no files are found
    if not file_list:
        if from_file:
            msg = f"No files found to check against filename rules in file ({from_file})."
        else:
            msg = f"No files found to check against filename rules in directory ({base_path})."
        logger.error(msg)
        raise FileNotFoundError(msg)

    return file_list
def check_filenames(hlsp_name: str, file_list: list[Path], dbFile: str, output_format: str = "db") -> None:
    """Check a list of HLSP product filenames and store the results in a SQLite database.

    Parameters
    ----------
    hlsp_name : str
        Official identifier (abbreviation/acronym/initialism) for the HLSP collection
    file_list : list[Path]
        List of files to check, typically output from get_file_paths()
    dbFile : str
        Name of SQLite database file to contain results. If a file with this
        name already exists it is removed before the new database is created.
    output_format : str, optional
        Alternate format to save results to: 'csv', 'fits', 'html', or 'excel'.
        Default: "db"

    Raises
    ------
    ValueError
        If ``hlsp_name`` does not match ``HLSPNAME_REGEX``.
    """
    # Make sure hlsp name is valid
    if not FieldRule.match_pattern(hlsp_name, HLSPNAME_REGEX):
        msg = (
            f"Invalid hlsp_name for HLSP collection: '{hlsp_name}'.\n"
            "The HLSP name must follow these rules: \n"
            "\t 1. The first character must be a lowercase letter \n"
            "\t 2. The middle characters can be lowercase letters, numbers, or a hyphen ‘-‘ \n"
            "\t 3. The last character must be a lowercase letter or a number \n"
            "\t 4. The hlsp_name must be 20 characters or less in length"
        )
        logger.error(msg)
        raise ValueError(msg)

    # Begin file name checking
    logger.critical(f"Evaluating {len(file_list)} files for HLSP collection '{hlsp_name}'")
    if Path(dbFile).is_file():
        logger.warning(f"Database file {dbFile} already exists. Overwriting File.")
        os.remove(dbFile)

    db = Hlsp_SQLiteDb(dbFile)
    # Everything that uses the database runs inside try/finally so the handle
    # is closed even if evaluating or recording a file raises unexpectedly.
    try:
        logger.debug(f"Creating results database {dbFile}")
        db.create_db()

        # Evaluate each filename
        # tqdm creates the progress bar: https://tqdm.github.io/docs/tqdm/
        for f in tqdm(file_list):
            logger.debug(f"Examining {f.name}")
            try:
                hfn = HlspFileName(f, hlsp_name)
                hfn.partition()
            except ValueError:
                # A name that cannot be partitioned is reported and not recorded
                logger.error(f"Invalid name: {f.name}, skipping...")
                continue

            hfn.create_fields()
            elements = hfn.evaluate_fields()
            # Link elements to parent filename in db
            for element in elements:
                element["file_ref"] = f.name
            # Order is important here: evaluating filename requires fields to be evaluated
            file_rec = hfn.evaluate_filename()

            # Record the results in the db; field rows are only added when the
            # parent filename row was stored successfully.
            try:
                db.add_filename(file_rec)
            except Exception as err:
                logger.error(f"Error adding {f.name}: {err}")
            else:
                db.add_fields(elements)
            logger.debug(f"Verdict for {f.name}: '{file_rec['final_verdict']}'")

        # Print summary information on how many files passed
        logger.critical(db.print_summary())
        logger.critical(f"\nResults written to {dbFile}")

        # Write output to alternate format if specified
        if output_format != "db":
            logger.debug(f"Also writing to alternate format '{output_format}'")
            output_files = db.write_to_alternate_format(output_format)
            logger.critical(f"Written to {output_files}")
    finally:
        db.close_db()

    logger.critical(f"\nFilename checking complete. Results written to {dbFile}")
def check_single_filename(file_name: str, hlsp_name: str = "") -> None:
    """Evaluate a single HLSP filename string and log the per-field and overall results.

    Parameters
    ----------
    file_name : str
        File name of an HLSP product to test: for example 'hlsp_my-hlsp_readme.txt'.
        This is a string, and does not need to be a real file.
    hlsp_name : str, optional
        Name of example HLSP collection. For example, 'my-hlsp'.
        If not supplied, the hlsp_name is inferred using the second field of the filename.

    Raises
    ------
    ValueError
        If ``hlsp_name`` is not supplied and ``file_name`` has too few
        underscore-separated parts to infer it.
    """
    # Infer hlsp_name from the file name if it wasn't provided
    if not hlsp_name:
        name_parts = file_name.split("_")
        if len(name_parts) > 2:
            hlsp_name = name_parts[1].lower()
        else:
            msg = f"Could not infer HLSP name from filename '{file_name}'. Not enough parts in filename."
            logger.error(msg)
            raise ValueError(msg)

    # Check file name fields
    hfn = HlspFileName(Path(file_name), hlsp_name)
    hfn.partition()
    hfn.create_fields()
    elements = hfn.evaluate_fields()
    file_rec = hfn.evaluate_filename()

    # TODO: Add more helpful outputs here for each failure cases?
    # Suggested solution for each rule, keyed by the score name it applies to
    suggested_solutions = {
        "capitalization_score": "File names should be all lowercase.",
        "length_score": "Character length for this field is too long.",
        "format_score": "Forbidden characters detected. Value should be alphanumeric with hyphens, although some special characters are allowed in the 'target_name' or 'version' fields.",
        "value_score": "Unrecognized value or combination. These are often necessary and good, but require review by MAST staff.",
        "nfield_score": "File name contains more than 9 fields; underscores cannot be used within a field.",
    }

    # shutil.get_terminal_size() falls back to a default size when stdout is
    # not attached to a terminal (pipes, CI, redirected output), whereas
    # os.get_terminal_size() raises OSError there. The extra 5 accounts for
    # the tab; the lower bound keeps textwrap from receiving a width <= 0.
    wrap_width = max(shutil.get_terminal_size().columns - 5, 20)

    # Display results for each field
    for element in elements:
        pieces = ["Individual Field evaluations: \n"]
        for key, value in element.items():
            pieces.append(f" {key}: '{value}' \n")
            if str(value).lower() in ["needs review", "fail"] and key in suggested_solutions:
                # Wrap text to the same indent level
                pieces.append(
                    textwrap.fill(
                        f"\tHINT: {suggested_solutions[key]}",
                        subsequent_indent="\t",
                        width=wrap_width,
                    )
                )
                pieces.append("\n")
        logger.debug("".join(pieces))

    # Display the overall evaluation of the file name
    pieces = [f"Evaluating filename: {file_name} \n"]
    pieces.extend(f" {key}: {value} \n" for key, value in file_rec.items())
    pieces.append(f"Final Verdict: '{file_rec['final_verdict'].upper()}'")
    logger.critical("".join(pieces))