]> rtime.felk.cvut.cz Git - hubacji1/coffee-getter.git/blobdiff - coffee_getter/db.py
Merge branch 'refactor'
[hubacji1/coffee-getter.git] / coffee_getter / db.py
diff --git a/coffee_getter/db.py b/coffee_getter/db.py
new file mode 100644 (file)
index 0000000..8959d78
--- /dev/null
@@ -0,0 +1,152 @@
+from sqlite3 import connect
+
+
def _sql_str(value):
    """Return ``value`` rendered as a single-quoted SQL string literal.

    Embedded single quotes are doubled, which is how SQL escapes them, so a
    value containing an apostrophe can neither break nor alter the query.
    """
    return "'" + str(value).replace("'", "''") + "'"


def Q(q, b="", t=("now", "-7 days")):
    """Return db query identified by `q`.

    :param q: Identifier of query, ``"get_drinks"`` or ``"get_drinkers_of"``.
    :param b: Optionally, specify the beverage(s): one flavor string or a
        non-empty list/tuple of flavor strings. Only used by
        ``"get_drinkers_of"``, where it is required.
    :param t: Optionally, specify the time range as a pair. When one of the
        items is ``"now"``, the other one is used as a modifier relative to
        the current local time (e.g. ``"-7 days"``) and the range ends now.
        Otherwise the pair is taken as (*from*, *to*) time strings.
    :return: The SQL text, or ``None`` when `q` is not a known identifier.
    :raises AssertionError: If `t` is not a pair, or if `b` is empty for
        ``"get_drinkers_of"``.
    """
    assert isinstance(t, (tuple, list))
    assert len(t) == 2
    now = "datetime('now', 'localtime')"
    if t[0] == "now":
        dtf = f"datetime('now', 'localtime', {_sql_str(t[1])})"
        dtt = now
    elif t[1] == "now":
        dtf = f"datetime('now', 'localtime', {_sql_str(t[0])})"
        dtt = now
    else:
        dtf = f"datetime({_sql_str(t[0])}, 'localtime')"
        dtt = f"datetime({_sql_str(t[1])}, 'localtime')"
    if q == "get_drinks":
        return f"""

        SELECT count(*), flavor
        FROM coffees
        WHERE time BETWEEN
            {dtf}
            AND {dtt}
        GROUP BY flavor

        """
    if q == "get_drinkers_of":
        if isinstance(b, (tuple, list)):
            assert len(b) > 0
            cond = " OR ".join(f"flavor = {_sql_str(i)}" for i in b)
            if len(b) > 1:
                # AND binds tighter than OR in SQL: without the parentheses
                # the time filter below would only apply to the last flavor.
                cond = f"({cond})"
        else:
            assert b != ""
            cond = f"flavor = {_sql_str(b)}"
        # NOTE(review): coffees.id is joined against identifiers.userid;
        # the schema is not visible here -- confirm this is the intended key.
        return f"""

        SELECT count(*), users.name FROM coffees
        LEFT JOIN identifiers on coffees.id = identifiers.userid
        LEFT JOIN users on identifiers.userid = users.id
        WHERE {cond}
        AND coffees.time BETWEEN
            {dtf}
            AND {dtt}
        GROUP BY identifiers.userid

        """
    # Unknown query identifier: keep the historical "no query" result.
    return None
+
+
class Db:
    """Thin wrapper around an SQLite connection for drink statistics.

    All queries are built by the module-level ``Q`` helper and executed on a
    single cursor owned by the instance.
    """

    def __init__(self, db_path):
        """Open the SQLite database at ``db_path``."""
        # Pre-set so close()/__del__ stay safe even when connect() raises.
        self.con = None
        self.cur = None
        self.con = connect(db_path)
        self.cur = self.con.cursor()

    def close(self):
        """Close the connection; calling it more than once is harmless."""
        con = getattr(self, "con", None)
        self.con = None
        self.cur = None
        if con:
            con.close()

    def __del__(self):
        # May run on a partially constructed instance, hence the guarded
        # attribute access inside close().
        self.close()

    @staticmethod
    def _ranked(pairs):
        """Return ``pairs`` as a tuple sorted by (value, name), descending."""
        return tuple(sorted(pairs, key=lambda x: (x[1], x[0]), reverse=True))

    @staticmethod
    def _flavor_and_range(args, kwargs):
        """Return ``(flavor, dtf, dtt)`` from 3 positional or 3 keyword args.

        :raises AssertionError: If the arguments are not exactly three
            positional or exactly three keyword arguments.
        :raises KeyError: If a keyword is not ``flavor``/``dtf``/``dtt``.
        """
        assert ((len(args) == 3 and len(kwargs) == 0)
                or (len(args) == 0 and len(kwargs) == 3))
        if args:
            return args[0], args[1], args[2]
        return kwargs["flavor"], kwargs["dtf"], kwargs["dtt"]

    def get_top_drinks(self):
        """Return tuple of pairs of flavor, count for the default range.

        The pairs are ordered by count, then flavor, both descending.
        """
        q = Q("get_drinks")
        return self._ranked([(dn, cnt) for (cnt, dn) in self.cur.execute(q)])

    def get_top_mate_drinkers(self):
        """Return tuple of pairs of name, litres for Mate drinkers.

        Each drink is weighted by its bottle size (0.5 l or 0.33 l), so the
        second item of a pair is a volume, not a number of bottles.
        """
        users = {}
        for flavor, litres in (("Club-Mate 0,5 l", 0.5),
                               ("Club-Mate 0,33 l", 0.33)):
            que = Q("get_drinkers_of", flavor)
            for (cnt, un) in self.cur.execute(que):
                # Accumulate so rows sharing a name are added up instead of
                # the later row overwriting the earlier one.
                users[un] = users.get(un, 0) + cnt * litres
        return self._ranked(users.items())

    def get_top_tea_drinkers(self):
        """Return tuple of pairs of name, count for tea drinkers."""
        q = Q("get_drinkers_of", "tea")
        return self._ranked([(un, cnt) for (cnt, un) in self.cur.execute(q)])

    def get_drunk_sum(self, *args, **kwargs):
        """Return tuple of pairs of name, count of drunken ``flavor``.

        Takes either three positional or three keyword arguments. The
        counts of all listed flavors are summed per user.

        Keyword arguments:
        flavor -- The flavor of beverage; several flavors separated by ``;``.
        dtf -- Date and time *from*.
        dtt -- Date and time *to*.
        """
        flavor, dtf, dtt = self._flavor_and_range(args, kwargs)
        que = Q("get_drinkers_of", flavor.split(";"), (dtf, dtt))
        return tuple((un, cnt) for (cnt, un) in self.cur.execute(que))

    def get_drunk_list(self, *args, **kwargs):
        """Return dict of lists of drunken ``flavor`` from ``dtf`` to ``dtt``.

        Takes either three positional or three keyword arguments. The dict
        maps a user name to a list holding exactly one count per flavor, in
        the order the flavors were given; 0 stands for "none drunk".

        Keyword arguments:
        flavor -- The flavor of beverage; several flavors separated by ``;``.
        dtf -- Date and time *from*.
        dtt -- Date and time *to*.
        """
        flavor, dtf, dtt = self._flavor_and_range(args, kwargs)
        drunk = {}
        for i, f in enumerate(flavor.split(";")):
            que = Q("get_drinkers_of", f, (dtf, dtt))
            for (cnt, un) in self.cur.execute(que):
                # A user first seen now gets zeros for all earlier flavors.
                counts = drunk.setdefault(un, [0] * i)
                if len(counts) == i:
                    counts.append(cnt)
                else:
                    # Same name returned twice for this flavor: merge.
                    counts[i] += cnt
            # Users without a row for this flavor still need a slot,
            # otherwise later counts would shift into the wrong column.
            for counts in drunk.values():
                if len(counts) == i:
                    counts.append(0)
        return drunk