event_lattice.py 3.51 KB
import math
from functools import cache
from itertools import accumulate
import operator


def uniform_op(x):
    n = len(list(x))
    return 0.0 if n == 0 else 1.0/n


def max_op(x):
    return max(x)


def min_op(x):
    return min(x)


def sum_op(x):
    return sum(x)


def prod_op(x):
    log_x = map(math.log, x)
    return math.exp(sum(log_x))


def co(x):
    if isinstance(x, float) or isinstance(x, int):
        return 1 - x
    elif isinstance(x, str):
        return x.swapcase()
    else:
        return x


def parse(d):
    """Structures a string as an Event and a dict as a base of stable models."""
    if isinstance(d, str):
        return frozenset(d)
    elif isinstance(d, dict):
        result = dict()
        for k, v in d.items():
            key = parse(k)
            result[key] = v
        return result
    else:
        return d


def is_consistent(event):
    return all(x.swapcase() not in event for x in event)


class EventsLattice:

    @staticmethod
    def close_literals(base_literals):
        base_lits = list(accumulate(base_literals, func=operator.or_))[-1]
        lits = set([])
        for x in base_lits:
            lits.add(x)
            lits.add(x.swapcase())
        return lits

    def __init__(self, smodels):
        """Create base for Events Lattice."""
        self._smodels = smodels
        self._literals = EventsLattice.close_literals(self._smodels.keys())

    def literals(self):
        return self._literals

    def stable_models(self):
        return list(map(set, self._smodels.keys()))

    @cache
    def lower_bound(self, event):
        return set(filter(lambda sm: sm <= event, self._smodels))

    @cache
    def upper_bound(self, event):
        return set(filter(lambda sm: event <= sm, self._smodels))

    def related(self, u, v):
        u_consistent = is_consistent(u)
        v_consistent = is_consistent(v)
        if u_consistent and (u_consistent == v_consistent):
            return \
                self.lower_bound(u) == self.lower_bound(v) and \
                self.upper_bound(u) == self.upper_bound(v)
        else:
            return u_consistent == v_consistent

    def factors(self, event):
        pass

    def propagated_value(self, event, lower_op=sum_op, upper_op=prod_op):
        value = 0.0

        lb = self.lower_bound(event)
        len_lb = len(lb)
        if len_lb > 1:
            value = lower_op(map(lambda sm: self._smodels[sm], lb))
        elif len_lb == 1:
            value = self._smodels[event]
        else:
            ub = self.upper_bound(event)
            len_ub = len(ub)
            if len_ub > 1:
                value = upper_op(map(lambda sm: self._smodels[sm], ub))
            elif len_ub == 1:
                value = self._smodels[event]

        return value


def zoom_event(event_str, lattice, lower_op=sum_op, upper_op=prod_op):
    event = parse(event_str)
    lower_bound = lattice.lower_bound(event)
    upper_bound = lattice.upper_bound(event)
    propagated = lattice.propagated_value(
        event, lower_op=lower_op, upper_op=upper_op)
    print(
        f"Event: {event}\n\tLB: {lower_bound}\n\tUB: {upper_bound}\n\tProp: {propagated}")


if __name__ == "__main__":

    smodels = parse({
        "A": 0.7,
        "ab": 2 * 3,
        "ac": 5 * 7
    })

    lattice = EventsLattice(smodels)

    print(
        f"Literals: {lattice.literals()}\nStable Models: {lattice.stable_models()}")

    zoom_event("abc", lattice, upper_op=min_op, lower_op=max_op)

    print(is_consistent(parse("aBacc")))
    print(is_consistent(parse("aBabc")))