Source code for hestia_earth.validation.validators.measurement

from functools import reduce
from hestia_earth.schema import TermTermType
from hestia_earth.utils.lookup import download_lookup, get_table_value, column_name
from hestia_earth.utils.tools import list_sum, non_empty_list, safe_parse_float, flatten

from hestia_earth.validation.utils import _filter_list_errors, _value_average, _node_year
from hestia_earth.validation.models import is_enabled as models_is_enabled, value_from_model
from .shared import need_validate_coordinates, value_difference


# Term `@id`s of the soil texture fractions. `_validate_soiltTexture_sum` checks that, when all of
# them are present with a value in the same group, their values sum to roughly 100%.
SOIL_TEXTURE_IDS = ['sandContent', 'siltContent', 'clayContent']


def _precipitationAnnual(site: dict, measurement: dict):
    """
    Run the geospatial `precipitationAnnual` model for the year of a measurement.

    Parameters
    ----------
    site : dict
        The site passed to the model and to `need_validate_coordinates`.
    measurement : dict
        The measurement whose year is read with `_node_year`.

    Returns
    -------
    The result of the model `_run(site, year)`, or `None` when no year is found for the
    measurement or when `need_validate_coordinates(site)` is falsy.
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid loading the models package on import - TODO confirm).
    from hestia_earth.models.geospatialDatabase.precipitationAnnual import _run

    year = _node_year(measurement)
    if not year or not need_validate_coordinates(site):
        return None
    return _run(site, year)


# Registry of the measurements that can be cross-checked against a model.
# Keys are measurement term `@id`s; each value is a `[model identifier, callable]` pair.
# `validate_with_models` unpacks an entry as `model, model_run` and calls `model_run(site, blank_node)`.
MEASUREMENTS_MODELS = {
    'precipitationAnnual': [
        'geospatialDatabase',
        _precipitationAnnual
    ]
}


def _group_measurement_key(measurement: dict):
    """
    Build the key used to group measurements that share the same depths and dates.

    `depthUpper` and `depthLower` are converted to strings (a missing depth becomes `''`), then
    combined with `startDate` and `endDate`. The values kept by `non_empty_list` are joined
    with `-`; when nothing is kept the key is `'default'`.
    """
    depth_upper = str(measurement.get('depthUpper', ''))
    depth_lower = str(measurement.get('depthLower', ''))
    parts = non_empty_list([
        depth_upper,
        depth_lower,
        measurement.get('startDate'),
        measurement.get('endDate')
    ])
    if len(parts) > 0:
        return '-'.join(parts)
    return 'default'


def _group_measurements_depth(measurements: list):
    """
    Group `(index, measurement)` pairs by the key returned by `_group_measurement_key`.

    Parameters
    ----------
    measurements
        An iterable of `(index, measurement)` pairs (e.g. the result of `enumerate`).

    Returns
    -------
    dict
        For every key, the list of `{'index': index, 'measurement': measurement}` entries sharing
        that key, in the order they were encountered.
    """
    groups = {}
    for position, node in measurements:
        entry = {'index': position, 'measurement': node}
        groups.setdefault(_group_measurement_key(node), []).append(entry)
    return groups


def _validate_soilTexture_percent(lookup):
    soil_texture_ids = list(lookup.termid)

    def validate_single(measurements: list, texture: dict, measurement_id: str):
        term_id = texture['measurement'].get('term', {}).get('@id')
        min = safe_parse_float(get_table_value(lookup, 'termid', term_id, column_name(f"{measurement_id}min")), 0)
        max = safe_parse_float(get_table_value(lookup, 'termid', term_id, column_name(f"{measurement_id}max")), 100)
        # set default value to min so if no value then passes validation
        measurement = next(
            (v for v in measurements if v['measurement'].get('term', {}).get('@id') == measurement_id), {})
        texture_value = _value_average(measurement.get('measurement'), min)
        return min <= texture_value <= max or {
            'level': 'error',
            'dataPath': f".measurements[{measurement['index']}].value",
            'message': 'is outside the allowed range',
            'params': {
                'term': measurement['measurement'].get('term', {}),
                'range': {'min': min, 'max': max}
            }
        }

    def validate(values: list):
        texture_ids = list(filter(lambda v: v['measurement'].get('term', {}).get('@id') in soil_texture_ids, values))
        return len(texture_ids) == 0 or flatten(map(
            lambda texture: list(map(lambda id: validate_single(values, texture, id), SOIL_TEXTURE_IDS)),
            texture_ids
        ))

    return validate


def _validate_soiltTexture_sum(values: list):
    """
    Check that the texture fractions of a group sum to roughly 100%.

    Only the entries whose term is listed in `SOIL_TEXTURE_IDS` and whose measurement has a
    `value` are considered. The check is skipped (returns `True`) unless every term of
    `SOIL_TEXTURE_IDS` is represented. Otherwise the sum of the averaged values must be strictly
    between 99.5 and 100.5, or one error per considered entry is returned.
    """
    def entry_term_id(entry: dict):
        return entry['measurement'].get('term', {}).get('@id')

    fractions = [
        v for v in values
        if entry_term_id(v) in SOIL_TEXTURE_IDS and 'value' in v['measurement']
    ]
    found_ids = {entry_term_id(v) for v in fractions}
    total = sum(_value_average(v['measurement']) for v in fractions)

    if len(found_ids) != len(SOIL_TEXTURE_IDS):
        return True

    close_to_100 = 99.5 < total < 100.5
    if close_to_100:
        return close_to_100

    return [
        {
            'level': 'error',
            'dataPath': f".measurements[{m['index']}]",
            'message': f"sum not equal to 100% for {', '.join(SOIL_TEXTURE_IDS)}"
        }
        for m in fractions
    ]


def validate_soilTexture(measurements: list):
    """
    Validate the soil texture measurements, group by group.

    The measurements are enumerated and grouped with `_group_measurements_depth`. Every group goes
    through the sum check (`_validate_soiltTexture_sum`) and the range check built from the
    `soilTexture.csv` lookup (`_validate_soilTexture_percent`). The combined results are handed
    to `_filter_list_errors`, whose result is returned.
    """
    lookup = download_lookup('soilTexture.csv')
    groups = _group_measurements_depth(enumerate(measurements)).values()
    sum_results = [_validate_soiltTexture_sum(group) for group in groups]
    validate_percent = _validate_soilTexture_percent(lookup)
    percent_results = flatten(map(validate_percent, groups))
    return _filter_list_errors(sum_results + percent_results)
def validate_depths(measurements: list):
    """
    Check that `depthLower` is greater than or equal to `depthUpper` on every measurement.

    Measurements missing either depth are not checked. The per-measurement results (`True` or an
    error pointing at `.measurements[index].depthLower`) are handed to `_filter_list_errors`,
    whose result is returned.
    """
    def validate(values: tuple):
        index, measurement = values
        upper = measurement.get('depthUpper')
        lower = measurement.get('depthLower')
        if upper is None or lower is None:
            return True
        ordered = upper <= lower
        if ordered:
            return ordered
        return {
            'level': 'error',
            'dataPath': f".measurements[{index}].depthLower",
            'message': 'must be greater than or equal to depthUpper'
        }

    return _filter_list_errors(map(validate, enumerate(measurements)))
def validate_required_depths(site: dict, list_key: str):
    """
    Check that the nodes of `site[list_key]` set both `depthUpper` and `depthLower` when the
    lookup of their term recommends it.

    Parameters
    ----------
    site : dict
        The site holding the list of nodes.
    list_key : str
        The key of the list to validate; also used to build the `dataPath` of the errors.

    Returns
    -------
    The result of `_filter_list_errors` applied to the per-node results. A node fails when the
    `recommendAddingDepth` lookup value of its term is set and it does not have both depths.
    The failure is an `error` when a texture fraction (`SOIL_TEXTURE_IDS`) of the list has a
    depth and the term type of the node is `soilTexture`, otherwise a `warning`.
    """
    values = site.get(list_key, [])

    # A depth of 0 is a value like any other: test for presence with `is not None` (as done for
    # `has_depths` below) instead of truthiness, which would treat 0 as "not set".
    required_has_depths = any(
        v.get('term', {}).get('@id') in SOIL_TEXTURE_IDS and (
            v.get('depthUpper') is not None or v.get('depthLower') is not None
        )
        for v in values
    )

    def validate(values: tuple):
        index, measurement = values
        term = measurement.get('term', {})
        term_id = term.get('@id')
        term_type = term.get('termType')
        # the lookup file depends on the term type of each node, so it is resolved per node
        lookup = download_lookup(f"{term_type}.csv")
        with_depth = get_table_value(lookup, 'termid', term_id, column_name('recommendAddingDepth'))
        has_depths = measurement.get('depthUpper') is not None and measurement.get('depthLower') is not None
        level = 'error' if all([required_has_depths, term_type == TermTermType.SOILTEXTURE.value]) else 'warning'
        return not with_depth or with_depth == '' or has_depths or {
            'level': level,
            'dataPath': f".{list_key}[{index}]",
            'message': f"{'must' if level == 'error' else 'should'} set both depthUpper and depthLower"
        }

    return _filter_list_errors(map(validate, enumerate(values)))
def validate_term_unique(measurements: list):
    """
    Check that terms flagged as unique in the `measurement.csv` lookup appear only once.

    A term is unique when its `onemeasurementpersite` lookup value is neither `None` nor `'-'`
    and is truthy. Every measurement using such a term more than once produces an error on
    `.measurements[index].term.name`. The per-measurement results are handed to
    `_filter_list_errors`, whose result is returned.
    """
    lookup = download_lookup('measurement.csv')

    def count_same_term(term_id: str):
        return sum(1 for node in measurements if node.get('term', {}).get('@id') == term_id)

    def validate(values: tuple):
        index, measurement = values
        term_id = measurement.get('term', {}).get('@id')
        raw = get_table_value(lookup, 'termid', term_id, 'onemeasurementpersite')
        if raw is None or raw == '-':
            unique = False
        else:
            unique = bool(raw)
        if not unique or count_same_term(term_id) == 1:
            return True
        return {
            'level': 'error',
            'dataPath': f".measurements[{index}].term.name",
            'message': 'must be unique'
        }

    return _filter_list_errors(map(validate, enumerate(measurements)))
def validate_require_startDate_endDate(site: dict, list_key: str):
    """
    Check that the nodes of `site[list_key]` set `startDate` and `endDate` when the
    `needStartDateEndDate` lookup value of their term requires it.

    A node passes when any of the following holds: the lookup does not require the dates; both
    dates are set; its `startDate` equals the (set) `startDate` of the site; its `endDate`
    equals the (set) `endDate` of the site.

    Parameters
    ----------
    site : dict
        The site holding the list of nodes and, optionally, its own `startDate` / `endDate`.
    list_key : str
        The key of the list to validate; also used to build the `dataPath` of the errors.

    Returns
    -------
    The result of `_filter_list_errors` applied to the flattened per-node results. A failing
    node yields one error per property that is actually missing on it.
    """
    lookup = download_lookup('measurement.csv')
    site_start_date = site.get('startDate')
    site_end_date = site.get('endDate')

    def validate(values: tuple):
        index, measurement = values
        term_id = measurement.get('term', {}).get('@id')
        start_date = measurement.get('startDate')
        end_date = measurement.get('endDate')
        required = get_table_value(lookup, 'termid', term_id, column_name('needStartDateEndDate'))
        is_valid = any([
            not required,
            start_date is not None and end_date is not None,
            site_start_date is not None and start_date == site_start_date,
            site_end_date is not None and end_date == site_end_date
        ])
        if is_valid:
            return True
        # Only report the properties that are really absent: reaching this point means at least
        # one date is `None`, and flagging a date that is present would be misleading.
        dates = {'startDate': start_date, 'endDate': end_date}
        return [
            {
                'level': 'error',
                'dataPath': f".{list_key}[{index}]",
                'message': f"should have required property '{k}'",
                'params': {
                    'missingProperty': k
                }
            }
            for k, date in dates.items() if date is None
        ]

    return _filter_list_errors(flatten(map(validate, enumerate(site.get(list_key, [])))))
def validate_with_models(site: dict, list_key: str):
    """
    Compare the values of `site[list_key]` with the values computed by the registered models.

    For every node whose term has an entry in `MEASUREMENTS_MODELS`, the model is run and its
    value (read with `value_from_model`) is compared to the summed value of the node with
    `value_difference`. A `warning` is produced when the difference is not below the threshold
    (0.25). Nodes without a model, or for which the model gives nothing, are compared with
    their own value. Nothing is validated when `models_is_enabled()` is falsy.

    Returns the result of `_filter_list_errors` applied to the flattened per-node results.
    """
    threshold = 0.25

    def validate(values: tuple):
        index, blank_node = values
        term_id = blank_node.get('term', {}).get('@id')
        raw_value = blank_node.get('value', [0])
        value = list_sum(raw_value, raw_value)
        model, model_run = MEASUREMENTS_MODELS.get(term_id, [None, None])
        expected_node = model_run(site, blank_node) if model else {}
        expected_value = value_from_model(expected_node) if expected_node else value
        delta = value_difference(value, expected_value) if expected_value else 0

        within_threshold = delta < threshold
        if within_threshold:
            return within_threshold

        # point at the value only when the node actually has one
        data_path = '' if blank_node.get('value') is None else '.value'
        return {
            'level': 'warning',
            'dataPath': f".{list_key}[{index}]{data_path}",
            'message': 'the measurement provided might be in error',
            'params': {
                'term': blank_node.get('term', {}),
                'model': expected_node.get('methodModel', {}),
                'current': value,
                'expected': expected_value,
                'delta': delta * 100,
                'threshold': threshold
            }
        }

    if models_is_enabled():
        nodes = site.get(list_key, [])
    else:
        nodes = []
    return _filter_list_errors(flatten(map(validate, enumerate(nodes))))
def validate_value_length(site: dict, list_key: str):
    """
    Check that the nodes of `site[list_key]` do not hold more than one value when the
    `arrayTreatment` lookup value of their term is `arrayNotAllowed`.

    Parameters
    ----------
    site : dict
        The site holding the list of nodes.
    list_key : str
        The key of the list to validate; also used to build the `dataPath` of the errors.

    Returns
    -------
    The result of `_filter_list_errors` applied to the flattened per-node results, where a
    failing node yields an error on `.{list_key}[index].value`.
    """
    # the lookup is the same for every node: fetch it once instead of once per node
    lookup = download_lookup('measurement.csv')

    def validate(values: tuple):
        index, blank_node = values
        term_id = blank_node.get('term', {}).get('@id')
        array_type = get_table_value(lookup, 'termid', term_id, column_name('arrayTreatment'))
        value_length = len(blank_node.get('value', []))
        return array_type != 'arrayNotAllowed' or value_length <= 1 or {
            'level': 'error',
            # built from `list_key` so the path points at the list really being validated
            'dataPath': f".{list_key}[{index}].value",
            'message': 'must not contain more than 1 value'
        }

    return _filter_list_errors(flatten(map(validate, enumerate(site.get(list_key, [])))))