Source code for hestia_earth.validation.validators

import os
from hestia_earth.schema import SchemaType
from hestia_earth.utils.tools import flatten

from ..log import logger
from ..utils import update_error_path
from .shared import validate_empty_fields, validate_nodes_duplicates
from .cycle import validate_cycle, validate_cycle_aggregated
from .impact_assessment import validate_impact_assessment, validate_impact_assessment_aggregated
from .organisation import validate_organisation
from .site import validate_site
from .source import validate_source

# Validation based on `@type` is disabled by default: it is only enabled when
# the `VALIDATE_EXISTING_NODES` environment variable is exactly 'true'.
VALIDATE_EXISTING_NODES = os.environ.get('VALIDATE_EXISTING_NODES', 'false') == 'true'

# Type-specific validators, keyed by schema type name.
# Each entry is called as `validator(node, node_by_type)`; the lambdas look up
# the validator function at call time rather than at import time.
VALIDATE_TYPE = {
    SchemaType.CYCLE.value:
        lambda n, nodes: validate_cycle(n, nodes),
    SchemaType.IMPACTASSESSMENT.value:
        lambda n, nodes: validate_impact_assessment(n, nodes),
    SchemaType.ORGANISATION.value:
        lambda n, nodes: validate_organisation(n, nodes),
    SchemaType.SITE.value:
        lambda n, nodes: validate_site(n, nodes),
    SchemaType.SOURCE.value:
        lambda n, nodes: validate_source(n, nodes)
}

# Validators used instead of `VALIDATE_TYPE` when a node has `aggregated` set to `True`.
VALIDATE_TYPE_AGGREGATED = {
    SchemaType.CYCLE.value:
        lambda n, nodes: validate_cycle_aggregated(n, nodes),
    SchemaType.IMPACTASSESSMENT.value:
        lambda n, nodes: validate_impact_assessment_aggregated(n, nodes)
}

# Node types for which the duplicates validation is not run.
SKIP_VALIDATE_DUPLICATES = [
    SchemaType.ACTOR.value,
    SchemaType.IMPACTASSESSMENT.value,
    SchemaType.TERM.value
]


def _validate_node_type(node_by_type: dict, node_by_hash: dict, ntype: str, node: dict):
    """
    Run the validations that apply to a node of a given type.

    Parameters
    ----------
    node_by_type : dict
        Passed as second argument to the type-specific validator.
    node_by_hash : dict
        Passed to `validate_nodes_duplicates`.
    ntype : str
        The type of the node, used to select the validator.
    node : dict
        The node to validate.

    Returns
    -------
    List
        The type-specific validations (if any were run), followed by the
        empty-fields validations and, unless the type is listed in
        `SKIP_VALIDATE_DUPLICATES`, the duplicates validations.
    """
    # aggregated nodes use their own set of validators
    use_aggregated = node.get('aggregated', False) is True
    registry = VALIDATE_TYPE_AGGREGATED if use_aggregated else VALIDATE_TYPE

    # the type-specific validator only runs on nodes with more than 2 keys
    # NOTE(review): presumably skips nodes that only contain a type and an id - confirm
    if ntype in registry and len(node) > 2:
        logger.debug('Run validation on: type=%s, id=%s', ntype, node.get('id', node.get('@id')))
        results = registry.get(ntype)(node, node_by_type)
    else:
        results = []

    results = results + validate_empty_fields(node)
    if ntype not in SKIP_VALIDATE_DUPLICATES:
        results = results + validate_nodes_duplicates(node, node_by_hash)
    return results


def _validate_node_children(node_by_type: dict, node_by_hash: dict, node: dict):
    """
    Validate every nested value of a node.

    Each `dict` value is validated as a child; each element of a `list` value
    is validated as a child together with its position in the list. Values of
    any other type are ignored.

    Returns
    -------
    List
        The flattened results of all child validations.
    """
    collected = []
    for key, child in node.items():
        if isinstance(child, list):
            for position, item in enumerate(child):
                collected.append(
                    _validate_node_child(node_by_type, node_by_hash, key, item, position)
                )
        elif isinstance(child, dict):
            collected.append(_validate_node_child(node_by_type, node_by_hash, key, child))
    return flatten(collected)


def _validate_node_child(node_by_type: dict, node_by_hash: dict, key: str, value: dict, index=None):
    """
    Validate a nested value and update the path of the errors it produced.

    Parameters
    ----------
    key : str
        The key under which `value` is stored in the parent node.
    value : dict
        The nested value to validate.
    index : int, optional
        The position of `value` when it is stored in a list under `key`.

    Returns
    -------
    List
        The validation results; every `dict` result is passed through
        `update_error_path(error, key, index)`, other results are kept as-is.
    """
    child_results = validate_node(node_by_type, node_by_hash)(value)
    return [
        update_error_path(result, key, index) if isinstance(result, dict) else result
        for result in child_results
    ]


def validate_node(node_by_type: dict = None, node_by_hash: dict = None):
    """
    Build a function that validates a single Node.

    Parameters
    ----------
    node_by_type : dict, optional
        Passed to the type-specific validators. Defaults to a new empty dict.
    node_by_hash : dict, optional
        Passed to the duplicates validation. Defaults to a new empty dict.

    Returns
    -------
    Callable
        A `validate(node)` function returning the list of errors/warnings.
    """
    # use fresh dicts instead of mutable default arguments shared between calls
    node_by_type = {} if node_by_type is None else node_by_type
    node_by_hash = {} if node_by_hash is None else node_by_hash

    def validate(node: dict):
        """
        Validates a single Node.

        Parameters
        ----------
        node : dict
            The JSON-Node to validate.

        Returns
        -------
        List
            The list of errors/warnings for the node, which can be empty if no errors/warnings detected.

        Raises
        ------
        Exception
            Any exception raised by a validation is logged, then re-raised.
        """
        # anything that is not a dict has no type and is not validated
        if not isinstance(node, dict):
            return []

        # `type` takes precedence; `@type` is only used when validating existing nodes is enabled
        if 'type' in node:
            ntype = node['type']
        elif VALIDATE_EXISTING_NODES:
            ntype = node.get('@type')
        else:
            ntype = None

        if ntype is None:
            return []

        try:
            results = flatten(
                _validate_node_type(node_by_type, node_by_hash, ntype, node) +
                _validate_node_children(node_by_type, node_by_hash, node)
            )
            # a result of `True` means the validation passed: keep only errors/warnings
            return [result for result in results if result is not True]
        except Exception as e:
            logger.error(f"Error validating {ntype} with id '{node.get('id', node.get('@id'))}': {str(e)}")
            raise

    return validate