"""The main logic module to check filename compliance"""
import os
import re
from abc import ABC, abstractmethod
from pathlib import Path
import yaml
from mast_contributor_tools.utils.logger_config import setup_logger
logger = setup_logger(__name__)
# ==========================================
# Setup some configurations for this module
# ==========================================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# General policy settings: allowed extensions, semantic (product) types and
# the maximum length of each field
with open(os.path.join(BASE_DIR, "fc_config.yaml"), "r") as config_file:
    cfg = yaml.safe_load(config_file)
EXTENSION_TYPES = cfg["ExtensionTypes"]
SEMANTIC_TYPES = cfg["SemanticTypes"]
fieldLengthPolicy = cfg["FieldLength"]

# Fetch configurations of three name fields: observation, instrument, and filter (oif)
with open(os.path.join(BASE_DIR, "oif.yaml"), "r") as oif_file:
    oif = yaml.safe_load(oif_file)

# Top-level entries of oif.yaml are the missions
MISSIONS = list(oif)

# Every instrument listed under any mission
INSTRUMENTS = {
    instrument
    for mission in MISSIONS
    for instrument in oif[mission]["instruments"]
}

# Teasing out the list of all unique filters takes more work: they are nested
# under each instrument of each mission.
# The following does not support name "aliases" in the yaml file
filt_list = [
    filter_name
    for mission in MISSIONS
    for instrument in oif[mission]["instruments"]
    for filter_name in oif[mission]["instruments"][instrument]["filters"]
]
FILTERS = set(filt_list)

# Map the boolean outcome of a check onto its score string. The "lax" table
# downgrades a failed check to 'needs review' instead of 'fail'.
SCORE = {True: "pass", False: "fail"}
SCORE_LAX = {True: "pass", False: "needs review"}
# Define REGEX pattern rules for various fields
# Use https://regex101.com to verify these and explore more examples

# File Name Expression:
# "^[a-zA-Z0-9]": The first character must be a letter or a number
# "[\w\-\+]+": The middle characters can be word characters (\w for 'word') or a hyphen (\-) or a plus sign (\+)
#       Note: \w is equivalent to [a-zA-Z0-9_]: any letter, number, or underscore.
# "(\.[\w\-\+\.]+)?": There can optionally be a period followed by more word characters in the middle (for example "v1.0_spec")
# "(\.[\w]+": The file should end with "." followed by a word (like ".fits" or ".jpg")
# "(\.gz|\.zip)?)$": the file can optionally end in .gz or .zip too
# Note this expression is intentionally too generous; this is used to search for files to test, not to actually test the files
# For example, this regex allows the first character to be a number, when the rules require the name to start with 'hlsp'
# In that case, the file would match this pattern and therefore be added to the list to test, but it would fail the tests due to the value
FILENAME_REGEX = re.compile(r"^[a-zA-Z0-9][\w\-\+]+(\.[\w\-\+\.]+)?(\.[\w]+(\.gz|\.zip)?)$")

# HLSP Name Expression:
# (either case is accepted by the pattern itself; capitalization is checked by a separate rule)
# "^[a-zA-Z]" : The first character must be a letter
# "[a-zA-Z0-9-]*" : The middle characters can be letters, numbers, or a hyphen '-'
# "[a-zA-Z0-9]$" : The last character must be a letter or a number
HLSPNAME_REGEX = re.compile(r"^[a-zA-Z][a-zA-Z0-9-]*[a-zA-Z0-9]$")

# Target Name Expression:
# "^[a-zA-Z0-9]" : The first character must be a letter or a number
# "[a-zA-Z0-9+\-.]*" : middle characters can be letters, numbers, or some special characters are allowed: '+' and '-' and '.'
# "[a-zA-Z0-9]$" : Last character must be a letter or a number (no special characters)
TARGET_REGEX = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9+\-.]*[a-zA-Z0-9]$")

# Version Expression:
# "^v" : must start with lowercase "v"
# "[0-9]{0,2}" : the next zero to two characters must be numbers
# "([.p][0-9]{0,2}){0,2}" : there can be up to two groups made of a "." or "p" followed by up to two more numbers
# "[0-9]$" : the last character must be a number
VERSION_REGEX = re.compile(r"^v[0-9]{0,2}([.p][0-9]{0,2}){0,2}[0-9]$")

# Expression for all file extensions:
# "^[a-zA-Z]" : The first character must be a lowercase or uppercase letter
# "([a-zA-Z0-9.]*[a-zA-Z0-9])?$" : Optionally followed by letters, numbers, or periods '.',
#       as long as the last character is a letter or a number
# The trailing group is optional as a whole, so a single letter is still a valid value,
# but an empty value or a leading/trailing period is rejected.
EXTENSION_REGEX = re.compile(r"^[a-zA-Z]([a-zA-Z0-9.]*[a-zA-Z0-9])?$")

# Expression for all other fields: telescope, instrument, filter, etc.:
# (this is purposefully generous on capitalization - a different test checks for that and we want to avoid confusion)
# "^[a-zA-Z]" : The first character must be a lowercase or uppercase letter
# "([a-zA-Z0-9-]*[a-zA-Z0-9])?$" : Optionally followed by letters, numbers, or hyphens '-',
#       as long as the last character is a letter or a number
# The trailing group is optional as a whole, so single-letter values (e.g. a filter named "g") still match,
# but an empty value or a leading/trailing hyphen is rejected.
OTHER_REGEX = re.compile(r"^[a-zA-Z]([a-zA-Z0-9-]*[a-zA-Z0-9])?$")
# =============================
# Classes for field rules
# =============================
class FieldRule:
    """Rules for filename validation.

    This class embodies rules for validating attributes of field names. The
    approach to validating field *values* varies by field. The expressions that
    validate the version or target fields can be verified at https://regex101.com

    Every rule is a static method, so it can be called on the class
    (``FieldRule.length(...)``) or on an instance. Each rule returns a
    lowercase score string: 'pass', 'fail' or 'needs review'.
    """

    @staticmethod
    def length(value: str, max_length: int) -> str:
        """Test if the character count is non-zero and within the limit for that field.

        Returns 'pass' or 'fail' based on results."""
        return SCORE[0 < len(value) <= max_length]

    @staticmethod
    def capitalization(value: str) -> str:
        """Test the capitalization: the value must not contain uppercase characters.

        The comparison is against the lowercased value rather than
        ``str.islower()``, because ``islower()`` is False for text without any
        cased characters and would wrongly fail purely numeric values such as
        '12345'. (An empty value therefore passes here; the length rule is the
        one that rejects it.)

        Returns 'pass' or 'fail' based on results."""
        return SCORE[value == value.lower()]

    @staticmethod
    def nfields(field_index: int) -> str:
        """Tests that the field index is less than 10.

        With indices counted from 1, that allows at most 9 fields.

        Returns 'pass' or 'fail' based on results."""
        return SCORE[field_index < 10]

    @staticmethod
    def match_pattern(value: str, regex_expr: re.Pattern) -> str:
        """Test that the field matches the pattern, i.e. contains no forbidden characters.

        Returns 'pass' or 'fail' based on results."""
        return SCORE[regex_expr.match(value) is not None]

    @staticmethod
    def match_choice(value: str, choice_list: list[str], score_level="lax") -> str:
        """Checks the lowercased value against a list, typically from oif.yaml.

        ``choice_list`` may be any container that supports ``in``.

        Returns 'pass' or 'needs review' or 'fail' based on results.
        The optional 'score_level' argument determines if 'fail' or 'needs review'
        is returned for a miss: 'lax' (the default) gives 'needs review', any
        other value gives 'fail'."""
        score_table = SCORE_LAX if score_level == "lax" else SCORE
        return score_table[value.lower() in choice_list]

    @staticmethod
    def match_multi_choice(value: str, choice_list: list[str], score_level="lax") -> str:
        """Checks multiple values against a list, typically from oif.yaml.

        The value is split on hyphens and every lowercased element must be
        present in ``choice_list`` (any container that supports ``in``).

        Returns 'pass' or 'needs review' or 'fail' based on results.
        The optional 'score_level' argument determines if 'fail' or 'needs review'
        is returned for a miss: 'lax' (the default) gives 'needs review', any
        other value gives 'fail'."""
        score_table = SCORE_LAX if score_level == "lax" else SCORE
        # match all elements in a hyphenated value to the choice_list
        return score_table[all(element.lower() in choice_list for element in value.split("-"))]

    @staticmethod
    def field_verdict(scores: list[str]) -> str:
        """Determine the final verdict for this field: 'PASS', 'NEEDS REVIEW' or 'FAIL',
        determined as the worst of the input scores.

        Scores are compared case-insensitively. Anything that is not a
        recognized 'pass' or 'needs review' score (including 'fail' and
        placeholders such as ``False`` for a rule that was never evaluated)
        counts as a failure, so an unevaluated field can never pass.

        Returns the verdict in upper case."""
        normalized = [str(score).lower() for score in scores]
        if any(score not in ("pass", "needs review") for score in normalized):
            return "FAIL"
        if "needs review" in normalized:
            return "NEEDS REVIEW"
        return "PASS"
class FilenameFieldAB(ABC):
    """Template for Filename Field classes.

    Each field of a filename in an HLSP collection will be evaluated for:
    length, capitalization, content, and often a match against valid values.
    Each evaluation results in a score, which is one of:

    - 'pass' for no detected problems
    - 'fail' for a detected problem that must be fixed
    - 'needs review' for a possible but non-fatal problem that requires review by MAST Staff

    The final verdict of the set of evaluations is determined as the worst of the input scores.

    Parameters
    ----------
    field_name : str
        Internal name for the field being created; also the key used to look
        up the maximum length of the field
    field_value : str
        Value of the field (i.e. text of the field in the filename)
    field_indx : int
        Zero-based position of the field in the filename; stored one-based
    """

    def __init__(self, field_name: str, field_value: str, field_indx: int) -> None:
        self.name = field_name
        self.value = field_value
        self.max_len = fieldLengthPolicy[field_name]
        # index from 1 instead of 0
        self.field_indx = field_indx + 1

        # Fields with a dedicated pattern; every other field name falls back
        # to the generic pattern
        dedicated_patterns = {
            "hlsp_name": HLSPNAME_REGEX,
            "target_name": TARGET_REGEX,
            "version_id": VERSION_REGEX,
            "extension": EXTENSION_REGEX,
        }
        self.regex_pattern = dedicated_patterns.get(self.name, OTHER_REGEX)

        # Placeholders until evaluate() runs, in order: capitalization,
        # character length, format (no forbidden characters), value
        # (recognized entries for telescope, filter, etc.) and field index
        self.cap_eval = self.len_eval = self.format_eval = False
        self.value_eval = self.nfield_eval = False

        # Final verdict placeholder until get_scores() runs
        self.field_verdict = "fail"

    @abstractmethod
    def evaluate(self):
        """Evaluate the field against the rules shared by all fields.

        Sets the capitalization, length, format and field-index scores.
        Concrete subclasses call this and then set the value score themselves.
        """
        field_text = self.value
        self.cap_eval = FieldRule.capitalization(field_text)
        self.len_eval = FieldRule.length(field_text, self.max_len)
        self.format_eval = FieldRule.match_pattern(field_text, self.regex_pattern)
        self.nfield_eval = FieldRule.nfields(self.field_indx)

    def get_scores(self):
        """Compute the final verdict and return all scores as a dictionary."""
        # The final verdict is the worst of the five individual scores
        self.field_verdict = FieldRule.field_verdict(
            [
                self.cap_eval,
                self.len_eval,
                self.format_eval,
                self.value_eval,
                self.nfield_eval,
            ]
        )
        return dict(
            # Name of Field: for example 'mission' or 'product_type'
            name=self.name,
            # value of the field: for example 'jwst' or 'spec'
            value=self.value,
            # Index of the field: location in file name
            nfield=self.field_indx,
            # Results from each validation check
            capitalization_score=self.cap_eval,
            length_score=self.len_eval,
            format_score=self.format_eval,
            value_score=self.value_eval,
            nfield_score=self.nfield_eval,
            # Final Score
            field_verdict=self.field_verdict,
        )
class ExtensionField(FilenameFieldAB):
    """A container for attributes of the filename extension field."""

    def __init__(self, value: str, field_indx: int = 8) -> None:
        super().__init__(field_name="extension", field_value=value, field_indx=field_indx)

    def evaluate(self):
        """Run the shared checks, then look the value up in EXTENSION_TYPES.

        An extension that is not in the list scores 'needs review'.
        """
        super().evaluate()
        extension_score = FieldRule.match_choice(self.value, EXTENSION_TYPES, score_level="lax")
        self.value_eval = extension_score
class FilterField(FilenameFieldAB):
    """A container for attributes of the filename Filtername field."""

    def __init__(self, value: str, field_indx: int = 5) -> None:
        super().__init__(field_name="filter", field_value=value, field_indx=field_indx)

    def evaluate(self):
        """Run the shared checks, then match each hyphen-separated element against FILTERS.

        An element that is not in the set scores the field 'needs review'.
        """
        super().evaluate()
        filter_score = FieldRule.match_multi_choice(self.value, FILTERS, score_level="lax")
        self.value_eval = filter_score
class HlspField(FilenameFieldAB):
    """A container for attributes of the literal 'hlsp' prefix field."""

    def __init__(self, value: str, field_indx: int = 0) -> None:
        super().__init__(field_name="hlsp_str", field_value=value, field_indx=field_indx)

    def evaluate(self):
        """Run the shared checks, then require the (lowercased) value to be 'hlsp'.

        Any other value scores 'fail' rather than 'needs review'.
        """
        super().evaluate()
        allowed_prefixes = ["hlsp"]
        self.value_eval = FieldRule.match_choice(self.value, allowed_prefixes, score_level="fatal")
class HlspNameField(FilenameFieldAB):
    """A container for attributes of the HLSP name field.

    ``ref_name`` is the expected HLSP name; it is stored lowercased and the
    field value must match it.
    """

    def __init__(self, value: str, ref_name: str, field_indx: int = 1) -> None:
        super().__init__(field_name="hlsp_name", field_value=value, field_indx=field_indx)
        self.hlsp_ref_name = ref_name.lower()

    def evaluate(self):
        """Run the shared checks, then compare the value to the reference name.

        A mismatch scores 'fail' rather than 'needs review'.
        """
        super().evaluate()
        # Assume a valid HLSP name was passed to the constructor
        expected_names = [self.hlsp_ref_name]
        self.value_eval = FieldRule.match_choice(self.value, expected_names, score_level="fatal")
class InstrumentField(FilenameFieldAB):
    """A container for attributes of the filename Instrument field."""

    def __init__(self, value: str, field_indx: int = 3) -> None:
        super().__init__(field_name="instrument", field_value=value, field_indx=field_indx)

    def evaluate(self):
        """Run the shared checks, then match each hyphen-separated element against INSTRUMENTS.

        An element that is not in the set scores the field 'needs review'.
        """
        super().evaluate()
        instrument_score = FieldRule.match_multi_choice(self.value, INSTRUMENTS, score_level="lax")
        self.value_eval = instrument_score
class MissionField(FilenameFieldAB):
    """A container for attributes of the filename Mission (or observatory) field."""

    def __init__(self, value: str, field_indx: int = 2) -> None:
        super().__init__(field_name="mission", field_value=value, field_indx=field_indx)

    def evaluate(self):
        """Run the shared checks, then match each hyphen-separated element against MISSIONS.

        An element that is not in the list scores the field 'needs review'.
        """
        super().evaluate()
        mission_score = FieldRule.match_multi_choice(self.value, MISSIONS, score_level="lax")
        self.value_eval = mission_score
class ProductField(FilenameFieldAB):
    """A container for attributes of the filename ProductType field."""

    def __init__(self, value: str, field_indx: int = 7) -> None:
        super().__init__(field_name="product_type", field_value=value, field_indx=field_indx)

    def evaluate(self):
        """Run the shared checks, then match each hyphen-separated element against SEMANTIC_TYPES.

        An element that is not in the list scores the field 'needs review'.
        """
        super().evaluate()
        product_score = FieldRule.match_multi_choice(self.value, SEMANTIC_TYPES, score_level="lax")
        self.value_eval = product_score
class TargetField(FilenameFieldAB):
    """A container for attributes of the filename TargetName field."""

    def __init__(self, value: str, field_indx: int = 4) -> None:
        super().__init__(field_name="target_name", field_value=value, field_indx=field_indx)

    def evaluate(self):
        """Run the shared checks, then score the value against the field's pattern.

        There is no list of valid targets, so the value score comes from the
        same pattern match that produces the format score.
        """
        super().evaluate()
        # Besides alphanumerics a valid target name may contain: + - .
        # but it must begin and end with a purely alphanumeric character.
        target_score = FieldRule.match_pattern(value=self.value, regex_expr=self.regex_pattern)
        self.value_eval = target_score
class VersionField(FilenameFieldAB):
    """A container for attributes of the filename Version field."""

    def __init__(self, value: str, field_indx: int = 6) -> None:
        super().__init__(field_name="version_id", field_value=value, field_indx=field_indx)

    def evaluate(self):
        """Run the shared checks, then score the value against the field's pattern.

        The value score comes from the same pattern match that produces the
        format score.
        """
        super().evaluate()
        version_score = FieldRule.match_pattern(value=self.value, regex_expr=self.regex_pattern)
        self.value_eval = version_score
class GenericField(FilenameFieldAB):
    """Generic field concrete class.

    Since some filename fields are optional, this class handles the case of fields
    that are not identifiable with the standard set. In this case the field will be
    validated by the shared rules (capitalization, length, format and field
    index), but not for value.

    The internal field name is 'generic' followed by ``id``.
    """

    def __init__(self, value: str, id: int, field_indx: int) -> None:
        generic_name = f"generic{id}"
        super().__init__(field_name=generic_name, field_value=value, field_indx=field_indx)

    def evaluate(self) -> None:
        """Run the shared checks; the value score is always 'pass'."""
        super().evaluate()
        # No restriction on generic field values
        self.value_eval = "pass"
class HlspFileName:
    """HLSP filename validation

    Filenames are composed of fields separated by underscores, except
    that the last field is really composed of two fields separated by a period.
    The last part of the last field may also contain a period. Certain fields
    are further composed of elements, separated by hyphens.

    Filenames must have at least 4 and as many as 9 fields to be valid.
    For valid filenames:

    - The first two and the last two fields are required
    - the third from last (N-2) is always required except when the value of
      N-1 is 'readme'

    Unless all 9 fields are present, or only 4 are present, it is not possible
    to determine robustly what the other fields (if present) contain.

    Typical use is to call, in order: ``partition()``, ``create_fields()``,
    ``evaluate_fields()`` and ``evaluate_filename()``.

    Parameters
    ----------
    filepath : Path
        Path of a collection product; its final component is the filename
        that is validated
    hlsp_name : str
        Official abbreviation/acronym/initialism of this HLSP collection

    Raises
    ------
    ValueError
        If the filename does not have the general form of a testable file
        name, or if the HLSP name is not valid.
    """

    def __init__(self, filepath: Path, hlsp_name: str) -> None:
        self.filepath = filepath

        # Check that filename is of the right form
        if FILENAME_REGEX.match(self.filepath.name) is None:
            raise ValueError(f"Invalid file name for testing: {self.filepath.name}")

        # Check that the HLSP name is valid. The pattern is tested directly:
        # FieldRule.match_pattern returns the strings 'pass'/'fail', which are
        # both truthy, so its result cannot be used as a condition.
        if HLSPNAME_REGEX.match(hlsp_name) is None:
            raise ValueError(f"Invalid HLSP name: {hlsp_name}")
        self.hlspName = hlsp_name

        self.fields: list[FilenameFieldAB] = []

    def partition(self) -> None:
        """Partition the filepath into path+filename, and filename into fields

        Raises
        ------
        ValueError
            If the filename has fewer than 4 fields.
        """
        self.name = self.filepath.name
        self.path = str(self.filepath.parent)

        # Fields are separated by underscores; the last part is then split at
        # its first period into the product type and the file extension
        *leading, last = self.name.split("_")
        self.fieldvals = leading + last.split(".", 1)
        self.nFields = len(self.fieldvals)

        if self.nFields < 4:
            raise ValueError(f"Filename {self.name} has less than 4 fields")
        if self.nFields > 9:
            # Don't raise a ValueError here: the individual fields can still be checked
            # but filename will be added to the results as a FAIL
            logger.error(
                (
                    f"Filename '{self.name}' contains more than 9 fields (total {self.nFields}). "
                    "Individual fields will still be evaluated, "
                    "but the final verdict will be 'FAIL'"
                )
            )

    def create_fields(self) -> None:
        """Create Field objects for each field in the filename.

        Requires ``partition()`` to have been called first.
        """
        nf = self.nFields
        vals = self.fieldvals

        # The first two fields are: 'hlsp' and the acronym of the collection
        self.fields.append(HlspField(vals[0], 0))
        self.fields.append(HlspNameField(vals[1], self.hlspName, 1))

        if nf >= 9:
            # With 9 or more fields, assume the standard fields are present in order
            self.fields.append(MissionField(vals[2], 2))
            self.fields.append(InstrumentField(vals[3], 3))
            self.fields.append(TargetField(vals[4], 4))
            self.fields.append(FilterField(vals[5], 5))
            # With more than 9 fields, treat the extra fields as generic.
            # The check will fail at the filename level, but the fields can still be tested.
            # (The range is empty when there are exactly 9 fields.)
            for i in range(6, nf - 3):
                self.fields.append(GenericField(vals[i], i - 5, i))
        elif 5 < nf < 9:
            # If there are 5 < nFields < 9, the other fields are treated as generic
            for i in range(2, nf - 3):
                self.fields.append(GenericField(vals[i], i - 1, i))

        # Files should have a version field unless the product_type is readme
        if vals[nf - 2].lower() != "readme":
            self.fields.append(VersionField(vals[nf - 3], nf - 3))

        # The last two fields are: the file semantic type and the extension
        self.fields.append(ProductField(vals[nf - 2], nf - 2))
        self.fields.append(ExtensionField(vals[nf - 1], nf - 1))

    def evaluate_fields(self):
        """Evaluate attributes of each field

        Returns:
        --------
        List of result dictionaries for each field
        """
        for field in self.fields:
            field.evaluate()

        # If the field evaluations succeeded, set a positive status
        self.field_status = "pass"

        return [field.get_scores() for field in self.fields]

    def evaluate_filename(self):
        """Evaluate attributes of the filename.

        Note that the filename 'status' depends upon having evaluated the fields.
        A field that was never evaluated still carries its initial 'fail'
        verdict; verdicts are compared case-insensitively so that such a field
        fails the filename instead of being ignored.

        Returns:
        --------
        dict[str, Any]
            Dictionary of file name attributes
        """
        # The final verdict is determined as the worst of the individual field verdicts
        field_verdicts = {str(field.field_verdict).upper() for field in self.fields}
        if "FAIL" in field_verdicts:
            final_verdict = "FAIL"
        elif "NEEDS REVIEW" in field_verdicts:
            final_verdict = "NEEDS REVIEW"
        else:
            final_verdict = "PASS"

        # Additional last-minute checks based on the number of fields
        if self.nFields > 9 or self.nFields < 4:
            final_verdict = "FAIL"

        # Final result for this filename
        return {
            "path": self.path,
            "filename": self.name,
            "n_elements": self.nFields,
            "final_verdict": final_verdict,
        }