from hestia_earth.schema import SiteSiteType, TermTermType
from hestia_earth.utils.model import filter_list_term_type, find_term_match
from hestia_earth.utils.tools import flatten, list_sum, non_empty_list
from hestia_earth.utils.lookup import download_lookup, get_table_value, column_name
from hestia_earth.validation.utils import _filter_list_errors, get_term_lookup_value
from hestia_earth.validation.terms import get_rice_terms
def _valid_list_sum(practices: list):
values = flatten([p.get('value', []) for p in practices])
values_number = [v for v in values if isinstance(v, int) or isinstance(v, float)]
return list_sum(values_number), len(values) == len(values_number)
[docs]def validate_defaultValue(data: dict, list_key: str = 'practices'):
def validate(values: tuple):
index, practice = values
term = practice.get('term', {})
has_value = len(practice.get('value', [])) > 0
default_value = get_term_lookup_value(term, 'defaultValue')
return has_value or default_value is None or {
'level': 'warning',
'dataPath': f".{list_key}[{index}]",
'message': 'should specify a value when Hestia has a default one',
'params': {
'term': term,
'expected': default_value
}
}
return _filter_list_errors(flatten(map(validate, enumerate(data.get(list_key, [])))))
[docs]def validate_longFallowPeriod(practices: list):
longFallowPeriod = find_term_match(practices, 'longFallowPeriod', None)
longFallowPeriod_index = practices.index(longFallowPeriod) if longFallowPeriod else 0
value = list_sum(longFallowPeriod.get('value', [0])) if longFallowPeriod else 0
rotationDuration = list_sum(find_term_match(practices, 'rotationDuration').get('value', 0))
return value == 0 or ((rotationDuration - value) / value) < 5 or {
'level': 'error',
'dataPath': f".practices[{longFallowPeriod_index}].value",
'message': 'longFallowPeriod must be lower than 5 years'
}
[docs]def validate_cropResidueManagement(practices: list):
practices = filter_list_term_type(practices, TermTermType.CROPRESIDUEMANAGEMENT)
total_value, valid_sum = _valid_list_sum(practices)
return total_value <= 100.5 or {
'level': 'error',
'dataPath': '.practices',
'message': 'value should sum to 100 or less across crop residue management practices',
'params': {
'sum': total_value
}
}
def _is_waterRegime_sum(practice: dict):
term = practice.get('term')
lookup = download_lookup(f"{term.get('termType')}.csv")
value = get_table_value(lookup, 'termid', term.get('@id'), column_name('percentAreaSumIs100'))
return not (not value)
[docs]def validate_waterRegime(practices: list):
practices = [
p for p in filter_list_term_type(practices, TermTermType.WATERREGIME) if _is_waterRegime_sum(p)
]
total_value, valid_sum = _valid_list_sum(practices)
return total_value <= 100.5 or {
'level': 'error',
'dataPath': '.practices',
'message': 'value should sum to 100 or less across water regime practices',
'params': {
'sum': total_value
}
}
[docs]def validate_waterRegime_rice_products(cycle: dict, list_key: str = 'practices'):
all_rice_product_ids = get_rice_terms()
products = cycle.get('products', [])
rice_products = non_empty_list([
find_term_match(products, term_id, None) for term_id in all_rice_product_ids
])
lookup = download_lookup('waterRegime.csv')
def validate(values: tuple):
index, practice = values
term = practice.get('term', {})
term_id = term.get('@id')
allowed_product_ids = get_table_value(lookup, 'termid', term_id, column_name('allowedRiceTermId')).split(';')
not_allowed = list(filter(lambda p: p.get('term').get('@id') not in allowed_product_ids, rice_products))
return len(not_allowed) == 0 or {
'level': 'error',
'dataPath': f".{list_key}[{index}].term",
'message': 'rice products not allowed for this water regime practice',
'params': {
'term': term,
'products': non_empty_list(map(lambda p: p.get('term'), not_allowed))
}
}
return _filter_list_errors(flatten(map(validate, enumerate(cycle.get(list_key, [])))))
[docs]def validate_excretaManagement(node: dict, practices: list):
has_input = len(filter_list_term_type(node.get('inputs', []), TermTermType.EXCRETA)) > 0
has_practice = len(filter_list_term_type(practices, TermTermType.EXCRETAMANAGEMENT)) > 0
return not has_practice or has_input or {
'level': 'error',
'dataPath': '.practices',
'message': 'an excreta input is required when using an excretaManagement practice'
}
NO_TILLAGE_ID = 'noTillage'
def _practice_is_tillage(practice: dict):
term_id = practice.get('term', {}).get('@id')
term_type = practice.get('term', {}).get('termType')
return True if term_type == TermTermType.OPERATION.value and get_table_value(
download_lookup('operation.csv'), 'termid', term_id, column_name('isTillage')) else False
[docs]def validate_no_tillage(practices: list):
no_tillage = find_term_match(practices, NO_TILLAGE_ID, None)
no_value = list_sum(no_tillage.get('value', [100]), 100) if no_tillage else 0
return _filter_list_errors([{
'level': 'error',
'dataPath': f".practices[{index}]",
'message': f"is not allowed in combination with {NO_TILLAGE_ID}"
} for index, p in enumerate(practices) if _practice_is_tillage(p)] if no_value == 100 else [])
_TILLAGE_SITE_TYPES = [
SiteSiteType.CROPLAND.value
]
def _unique_practice(practice: dict):
term_id = practice.get('term', {}).get('@id')
term_type = practice.get('term', {}).get('termType')
return True if term_type == TermTermType.TILLAGE.value and get_table_value(
download_lookup('tillage.csv'), 'termid', term_id, column_name('unique')) else False
[docs]def validate_tillage_site_type(practices: list, site: dict):
has_tillage = len(filter_list_term_type(practices, TermTermType.TILLAGE)) > 0
site_type = site.get('siteType')
return site_type not in _TILLAGE_SITE_TYPES or has_tillage or {
'level': 'warning',
'dataPath': '.practices',
'message': 'should contain a tillage practice'
}
def _validate_tillage_with_values(practices: list):
total_value, valid_sum = _valid_list_sum(practices)
return 99.5 <= total_value <= 100.5 or {
'level': 'error',
'dataPath': '.practices',
'message': 'sum not equal to 100% for tillage practices',
'params': {
'sum': total_value
}
}
def _validate_tillage_no_values(practices: list):
return len(practices) <= 1 or {
'level': 'error',
'dataPath': '.practices',
'message': 'can only have 1 tillage practice without a value',
'params': {
'current': non_empty_list(map(lambda p: p.get('term', {}), practices))
}
}
[docs]def validate_tillage_values(practices: list):
practices = list(filter(_unique_practice, practices))
with_value = list(filter(lambda p: len(p.get('value', [])) > 0, practices))
no_value = list(filter(lambda p: len(p.get('value', [])) == 0, practices))
return {
'level': 'error',
'dataPath': '.practices',
'message': 'must set value for every tillage practice'
} if (len(no_value) > 0 and len(with_value) > 0) else (
_validate_tillage_with_values(practices) if len(with_value) > 0 else _validate_tillage_no_values(practices)
)
[docs]def validate_liveAnimal_system(data: dict):
has_liveAnimal = len(filter_list_term_type(data.get('products', []), TermTermType.LIVEANIMAL)) > 0
has_system = len(filter_list_term_type(data.get('practices', []), TermTermType.SYSTEM)) > 0
return not has_liveAnimal or has_system or {
'level': 'warning',
'dataPath': '.practices',
'message': 'should add an animal production system'
}
PASTURE_GRASS_TERM_ID = 'pastureGrass'
[docs]def validate_pastureGrass_key_units(data: dict, list_key: str = 'practices'):
validate_key_units = 'ha'
def validate(values: tuple):
index, practice = values
term_id = practice.get('term', {}).get('@id')
key_units = practice.get('key', {}).get('units')
return term_id != PASTURE_GRASS_TERM_ID or not key_units or key_units == validate_key_units or {
'level': 'error',
'dataPath': f".{list_key}[{index}].key",
'message': f"{PASTURE_GRASS_TERM_ID} key units must be '{validate_key_units}'",
'params': {
'value': key_units,
'expected': validate_key_units,
'term': practice.get('key', {})
}
}
return _filter_list_errors(flatten(map(validate, enumerate(data.get(list_key, [])))))
[docs]def validate_pastureGrass_key_value(data: dict, list_key: str = 'practices'):
practices = [p for p in data.get(list_key, []) if p.get('term', {}).get('@id') == PASTURE_GRASS_TERM_ID]
total_value, valid_sum = _valid_list_sum(practices)
return {
'level': 'error',
'dataPath': f".{list_key}",
'message': 'all values must be numbers'
} if not valid_sum else len(practices) == 0 or total_value == 100 or {
'level': 'error',
'dataPath': f".{list_key}",
'message': f"the sum of all {PASTURE_GRASS_TERM_ID} values must be 100",
'params': {
'expected': 100,
'current': total_value
}
}
[docs]def validate_has_pastureGrass(data: dict, site: dict, list_key: str = 'practices'):
site_type = site.get('siteType')
has_practice = find_term_match(data.get(list_key, []), PASTURE_GRASS_TERM_ID, None) is not None
return site_type not in [
SiteSiteType.PERMANENT_PASTURE.value
] or has_practice or {
'level': 'warning',
'dataPath': f".{list_key}",
'message': f"should add the term {PASTURE_GRASS_TERM_ID}"
}