import json
from typing import List
from functools import reduce
from datetime import datetime
from hestia_earth.schema import NodeType, TermTermType, UNIQUENESS_FIELDS
from hestia_earth.utils.api import download_hestia
from hestia_earth.utils.lookup import download_lookup, get_table_value, column_name
from hestia_earth.utils.tools import list_average, safe_parse_date, non_empty_list
from hestia_earth.utils.model import filter_list_term_type
def _next_error(values: list): return next((x for x in values if x is not True), True)
def _filter_list_errors(values: list, return_single=True):
values = list(filter(lambda x: x is not True, values))
return True if return_single and len(values) == 0 else (values[0] if return_single and len(values) == 1 else values)
def _list_except_item(values: list, item):
try:
idx = values.index(item)
return values[:idx] + values[idx+1:]
except ValueError:
return values
def update_error_path(error: dict, key: str, index=None):
    """Return a copy of `error` whose `dataPath` is prefixed with `key`.

    The new path is `.{key}[{index}]{dataPath}` when `index` is given
    (including `0`), otherwise `.{key}{dataPath}`.

    Parameters
    ----------
    error : dict
        The error to re-root. It is not mutated.
    key : str
        The name of the parent field to prepend.
    index : optional
        The position inside the parent list, if the parent is a list.

    Returns
    -------
    dict
        A new dict with every entry of `error` and the updated `dataPath`.
    """
    # A missing (or None) `dataPath` contributes nothing to the path instead of
    # being interpolated as the literal text "None".
    current_path = error.get('dataPath') or ''
    prefix = f".{key}[{index}]" if index is not None else f".{key}"
    return {**error, 'dataPath': f"{prefix}{current_path}"}
def _safe_cast(val, to_type, default=None):
try:
return to_type(val)
except (ValueError, TypeError):
return default
def hash_dict(value: dict):
    """Serialise `value` to a JSON string with its keys sorted.

    Sorting the keys makes the result independent of insertion order, so two
    dicts with the same content produce the same string.
    """
    serialised = json.dumps(value, sort_keys=True)
    return serialised
def _is_number(value): return value is not None and (isinstance(value, float) or isinstance(value, int))
def is_same_dict(a: dict, b: dict):
    """Compare two dicts by their `hash_dict` representation.

    Returns `True` when both dicts produce the same `hash_dict` string.
    """
    left = hash_dict(a)
    right = hash_dict(b)
    return left == right
def _dict_without_key(a: dict, key: str):
no_key = a.copy()
if key in no_key:
no_key.pop(key)
return no_key
def _group_nodes(nodes: List[dict]):
def group(groups: dict, node: dict):
type = node.get('type')
id = node.get('id')
if type and id:
groups[type] = groups.get(type, {})
groups[type][id] = node
return groups
return reduce(group, nodes, {})
def _hash_nodes(nodes: List[dict]):
def group(groups: dict, node: dict):
type = node.get('type')
id = node.get('id')
if type and id:
# store the hash of the node without the `id` for uniqueness check
key = hash_dict(_dict_without_key(node, 'id'))
groups[key] = groups.get(key, []) + [node]
return groups
return reduce(group, nodes, {})
def _get_by_key(x, y):
return x if x is None else (
x.get(y) if isinstance(x, dict) else list(map(lambda v: _get_dict_key(v, y), x))
)
def _get_dict_key(value: dict, key: str): return reduce(lambda x, y: _get_by_key(x, y), key.split('.'), value)
def _value_range_error(value: int, minimum: int, maximum: int):
return 'minimum' if minimum is not None and value < minimum else \
'maximum' if maximum is not None and value > maximum else False
def _list_sum(values: list, prop: str): return sum(map(lambda v: _safe_cast(v.get(prop, 0), float, 0.0), values))
def _list_sum_terms(values: list, term_ids=[]):
return sum([_value_average(node) for node in values if node.get('term', {}).get('@id') in term_ids])
def _compare_values(x, y):
return next((True for item in x if item in y), False) if isinstance(x, list) and isinstance(y, list) else x == y
def _same_properties(value: dict, props: List[str]):
def identical(test: dict):
same_values = list(filter(lambda x: _compare_values(_get_dict_key(value, x), _get_dict_key(test, x)), props))
return test if len(same_values) == len(props) else None
return identical
def _value_average(node: dict, default=0, key='value'):
try:
value = node.get(key)
return list_average(value, default) if isinstance(value, list) else (value or default)
except Exception:
return default
def term_id_prefix(term_id: str):
    """Return the part of `term_id` located before the first `Kg`.

    When `term_id` does not contain `Kg`, it is returned unchanged.
    """
    prefix, _separator, _rest = term_id.partition('Kg')
    return prefix
def _download_linked_node(node: dict):
data = download_hestia(node.get('@id'), node.get('@type')) if node.get('@id') and node.get('@type') else None
return data if (data or {}).get('@id') == node.get('@id') else None
def _find_linked_node(node_map: dict, node: dict):
return node_map.get(node.get('type'), {}).get(node.get('id')) or _download_linked_node(node)
def _is_before_today(date: str):
    """Tell whether `date` falls on or before the current local day.

    The string is parsed with `safe_parse_date` and only the calendar day is
    compared, so today's date itself returns `True`.
    """
    # NOTE(review): if `safe_parse_date` can return None for an unparseable
    # string, `.date()` raises AttributeError here — confirm against callers.
    parsed_day = safe_parse_date(date).date()
    today = datetime.now().date()
    return parsed_day <= today
def _node_year(node: dict):
date = node.get('endDate', node.get('startDate'))
date = safe_parse_date(date) if date else None
return date.year if date else None
def is_live_animal_cycle(cycle: dict):
    """Tell whether `cycle` involves live animals or live aquatic species.

    Returns `True` when `filter_list_term_type` keeps at least one entry of
    the cycle's `animals` or `products` for the term types `LIVEANIMAL` and
    `LIVEAQUATICSPECIES`. Missing lists are treated as empty.
    """
    live_term_types = [
        TermTermType.LIVEANIMAL,
        TermTermType.LIVEAQUATICSPECIES
    ]
    # both lists are always inspected before the results are combined
    has_live_entries = [
        len(filter_list_term_type(cycle.get(list_key, []), live_term_types)) > 0
        for list_key in ('animals', 'products')
    ]
    return any(has_live_entries)
def _match_list_el(source: list, dest: list, key: str):
    """Tell whether two lists of dicts hold the same `key` values.

    The `key` of every element is collected, passed through `non_empty_list`
    and sorted; the two sorted lists must then be equal, so element order in
    `source` and `dest` does not matter.
    """
    def sorted_values(items: list):
        return sorted(non_empty_list([item.get(key) for item in items]))

    return sorted_values(source) == sorted_values(dest)
def _match_el(source: dict, dest: dict, fields: list):
def match(key: str):
keys = key.split('.')
is_list = len(keys) == 2 and (
isinstance(source.get(keys[0]), list) or
isinstance(dest.get(keys[0]), list)
)
return _match_list_el(
_get_dict_key(source, keys[0]) or [],
_get_dict_key(dest, keys[0]) or [],
keys[1]
) if is_list else (_get_dict_key(dest, key) is None or _get_dict_key(source, key) == _get_dict_key(dest, key))
return all(map(match, fields))
def is_same_product(p1: dict, p2: dict):
    """Tell whether two products match on the Cycle `products` uniqueness fields.

    The fields come from `UNIQUENESS_FIELDS` for the Cycle node type, with
    `['term.@id']` as fallback when no `products` entry is defined; the
    comparison itself is done by `_match_el`.
    """
    cycle_fields = UNIQUENESS_FIELDS.get(NodeType.CYCLE.value)
    product_keys = cycle_fields.get('products', ['term.@id'])
    return _match_el(p1, p2, product_keys)
def find_by_product(node: dict, product: dict, list_key: str = 'products'):
    """Find the first entry of `node[list_key]` that matches `product`.

    The fields to compare are read from `UNIQUENESS_FIELDS` using the node's
    `type` (or `@type` when `type` is absent) and `list_key`, falling back to
    `['term.@id']`. Entries are compared with `_match_el`. Returns `None` when
    nothing matches or the list is missing.
    """
    node_type = node.get('type', node.get('@type'))
    keys = UNIQUENESS_FIELDS.get(node_type, {}).get(list_key, ['term.@id'])
    for candidate in node.get(list_key, []):
        if _match_el(candidate, product, keys):
            return candidate
    return None
def get_term_lookup_value(term: dict, column: str):
    """Read `column` for `term` from the lookup table of its `termType`.

    The table is named `{termType}.csv` and is fetched with `download_lookup`;
    the row is selected on `termid` equal to the term's `@id`. Returns `None`
    without any lookup when `term` is empty or `None`.
    """
    if not term:
        return None
    table_name = f"{term.get('termType')}.csv"
    lookup = download_lookup(table_name)
    return get_table_value(lookup, 'termid', term.get('@id'), column_name(column))