--- /dev/null
+from sqlite3 import connect
+
+
def _sql_str(value):
    """Render `value` as a single-quoted SQL string literal."""
    # Doubling an embedded single quote is SQL's own escape for it; without
    # this a flavor such as "Earl's" would terminate the literal early.
    return "'" + str(value).replace("'", "''") + "'"


def Q(q, b="", t=("now", "-7 days")):
    """Return the SQL text of the db query identified by `q`.

    :param q: Identifier of query, ``"get_drinks"`` or ``"get_drinkers_of"``.
    :param b: Optionally, specify the beverage(s): one flavor, or a
        non-empty list/tuple of flavors. Only ``"get_drinkers_of"`` reads it.
    :param t: Optionally, specify the time range as a pair. If one element
        is ``"now"`` the other is applied as an SQLite modifier relative to
        the current local time (e.g. ``"-7 days"``); otherwise both elements
        are used as the start and end time strings.
    :return: The SQL statement as a string, or ``None`` for an unknown `q`.
    :raises AssertionError: If `t` is not a list/tuple of length 2, or if
        ``"get_drinkers_of"`` is requested without a beverage.
    """
    assert isinstance(t, (tuple, list))
    assert len(t) == 2
    now = "datetime('now', 'localtime')"
    if t[0] == "now":
        dtf = f"datetime('now', 'localtime', {_sql_str(t[1])})"
        dtt = now
    elif t[1] == "now":
        dtf = f"datetime('now', 'localtime', {_sql_str(t[0])})"
        dtt = now
    else:
        dtf = f"datetime({_sql_str(t[0])}, 'localtime')"
        dtt = f"datetime({_sql_str(t[1])}, 'localtime')"

    if q == "get_drinks":
        return f"""
            SELECT count(*), flavor
            FROM coffees
            WHERE time BETWEEN
                {dtf}
                AND {dtt}
            GROUP BY flavor
        """

    if q == "get_drinkers_of":
        if isinstance(b, (tuple, list)):
            assert len(b) > 0
            flavors = b
        else:
            assert b != ""
            flavors = (b,)
        # The alternatives must be parenthesised: AND binds tighter than OR,
        # so without the parentheses the time range would only restrict the
        # last flavor in the list.
        flavor_filter = " OR ".join(
            f"flavor = {_sql_str(flavor)}" for flavor in flavors
        )
        # NOTE(review): the first join matches coffees.id against
        # identifiers.userid -- kept as written, confirm against the schema.
        return f"""
            SELECT count(*), users.name FROM coffees
            LEFT JOIN identifiers on coffees.id = identifiers.userid
            LEFT JOIN users on identifiers.userid = users.id
            WHERE ({flavor_filter})
            AND coffees.time BETWEEN
                {dtf}
                AND {dtt}
            GROUP BY identifiers.userid
        """

    # Unknown identifiers yield no query, as before.
    return None
+
+
class Db:
    """Read-only statistics helper around an SQLite database of drinks.

    Every query is built by the module-level ``Q`` function and executed on
    one shared cursor. The instance can be used as a context manager so the
    connection is closed deterministically instead of relying on ``__del__``.
    """

    def __init__(self, db_path):
        """Open the database at `db_path` and create the shared cursor."""
        # Pre-set so close()/__del__ stay safe even if connect() raises.
        self.con = None
        self.cur = None
        self.con = connect(db_path)
        self.cur = self.con.cursor()

    def close(self):
        """Close the connection; calling it more than once is harmless."""
        # getattr: the attribute is missing when __init__ never ran.
        con = getattr(self, "con", None)
        if con:
            con.close()
        self.con = None
        self.cur = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def __del__(self):
        self.close()

    @staticmethod
    def _ranked(pairs):
        """Return `pairs` of (label, amount) as a tuple, largest first."""
        # A LEFT JOIN can produce NULL names; substituting "" keeps a tie on
        # the amount from comparing None with str, which raises TypeError.
        return tuple(
            sorted(pairs, key=lambda p: (p[1], p[0] or ""), reverse=True)
        )

    @staticmethod
    def _unpack_query_args(args, kwargs):
        """Return (flavor, dtf, dtt) from 3 positional or 3 keyword args."""
        assert ((len(args) == 3 and len(kwargs) == 0)
                or (len(args) == 0 and len(kwargs) == 3))
        if args:
            return args[0], args[1], args[2]
        return kwargs["flavor"], kwargs["dtf"], kwargs["dtt"]

    def get_top_drinks(self):
        """Return a tuple of (flavor, count) pairs, most consumed first.

        Uses the default time range of ``Q``.
        """
        q = Q("get_drinks")
        return self._ranked((dn, cnt) for (cnt, dn) in self.cur.execute(q))

    def get_top_mate_drinkers(self):
        """Return a tuple of (name, litres) pairs for Mate drinkers.

        Counts of both bottle sizes are converted to litres and summed per
        user name; the result is ordered by litres, largest first.
        """
        users = {}
        for flavor, litres in (("Club-Mate 0,5 l", 0.5),
                               ("Club-Mate 0,33 l", 0.33)):
            que = Q("get_drinkers_of", flavor)
            for (cnt, un) in self.cur.execute(que):
                users[un] = users.get(un, 0) + cnt * litres
        return self._ranked(users.items())

    def get_top_tea_drinkers(self):
        """Return a tuple of (name, count) pairs for tea drinkers."""
        q = Q("get_drinkers_of", "tea")
        return self._ranked((un, cnt) for (cnt, un) in self.cur.execute(q))

    def get_drunk_sum(self, *args, **kwargs):
        """Return a tuple of (name, count) for ``flavor`` from ``dtf`` to ``dtt``.

        All flavors listed in ``flavor`` are counted together. Pass either
        three positional arguments or exactly these three keyword arguments.

        Keyword arguments:
        flavor -- The flavor of beverage; several are separated by ``;``.
        dtf -- Date and time *from*.
        dtt -- Date and time *to*.
        """
        flavor, dtf, dtt = self._unpack_query_args(args, kwargs)
        que = Q("get_drinkers_of", flavor.split(";"), (dtf, dtt))
        return tuple((un, cnt) for (cnt, un) in self.cur.execute(que))

    def get_drunk_list(self, *args, **kwargs):
        """Return dict of lists of drunken ``flavor`` from ``dtf`` to ``dtt``.

        The dict maps each user name to a list with one count per requested
        flavor, in the order the flavors were given; a flavor the user did
        not drink is reported as 0. Pass either three positional arguments
        or exactly these three keyword arguments.

        Keyword arguments:
        flavor -- The flavor of beverage; several are separated by ``;``.
        dtf -- Date and time *from*.
        dtt -- Date and time *to*.
        """
        flavor, dtf, dtt = self._unpack_query_args(args, kwargs)
        flavors = flavor.split(";")
        drunk = {}
        for idx, single in enumerate(flavors):
            que = Q("get_drinkers_of", single, (dtf, dtt))
            for (cnt, un) in self.cur.execute(que):
                # Every list is allocated at full width so position `idx`
                # always means flavor `idx`, including for users who skip
                # a flavor in the middle or at the end.
                if un not in drunk:
                    drunk[un] = [0] * len(flavors)
                drunk[un][idx] += cnt
        return drunk