-# -*- coding: utf-8 -*-
-"""Test database access."""
-from os import path, remove
-from sqlite3 import connect
+from datetime import datetime, timedelta
from unittest import TestCase
-from cbconf import Conf
-from cbdb import Db
+from sqlite3 import connect
+from os import remove
+from coffee_getter.db import Db, Q
+import coffee_getter.conf as conf
+
+
+class Queries(TestCase):
+ def test_get_drinks(self):
+ assert Q("get_drinks") == """
+
+ SELECT count(*), flavor
+ FROM coffees
+ WHERE (coffees.time BETWEEN
+ datetime('now', 'localtime', '-7 days')
+ AND datetime('now', 'localtime'))
+ GROUP BY flavor
+
+ """
+
+ def test_get_drinkers(self):
+ assert Q("get_drinkers_of", "FOO") == """
+
+ SELECT count(*), users.name FROM coffees
+ INNER JOIN identifiers on coffees.id = identifiers.id
+ INNER JOIN users on identifiers.userid = users.id
+ WHERE flavor = 'FOO'
+ AND (coffees.time BETWEEN
+ datetime('now', 'localtime', '-7 days')
+ AND datetime('now', 'localtime'))
+ GROUP BY users.id
+
+ """
+ assert Q("get_drinkers_of", "BAR", ["FROM", "TO"]) == """
+
+ SELECT count(*), users.name FROM coffees
+ INNER JOIN identifiers on coffees.id = identifiers.id
+ INNER JOIN users on identifiers.userid = users.id
+ WHERE flavor = 'BAR'
+ AND (coffees.time BETWEEN
+ datetime('FROM', 'localtime')
+ AND datetime('TO', 'localtime'))
+ GROUP BY users.id
+
+ """
+ assert Q("get_drinkers_of", ["FOO", "BAR", "B"], ["FROM", "TO"]) == """
+
+ SELECT count(*), users.name FROM coffees
+ INNER JOIN identifiers on coffees.id = identifiers.id
+ INNER JOIN users on identifiers.userid = users.id
+ WHERE (flavor = 'FOO' OR flavor = 'BAR' OR flavor = 'B')
+ AND (coffees.time BETWEEN
+ datetime('FROM', 'localtime')
+ AND datetime('TO', 'localtime'))
+ GROUP BY users.id
+
+ """
+
+ def test_assert_error_raised(self):
+ self.assertRaises(AssertionError, Q, "get_drinkers_of")
+ self.assertRaises(AssertionError, Q, "get_drinkers_of", "")
+ self.assertRaises(AssertionError, Q, "get_drinkers_of", [])
+ self.assertRaises(AssertionError, Q, "get_drinkers_of", "foo", "")
+ self.assertRaises(AssertionError, Q, "get_drinkers_of", "foo", "bar")
+ self.assertRaises(AssertionError, Q, "get_drinkers_of", "foo", [])
+
+
+TESTDB_PATH = ".tmptest.db"
-TESTDB_FILENAME = ".tmptest.db"
def create_test_db(fn):
con = connect(fn)
pragma foreign_keys = ON;
create table if not exists users (
-id varchar(24) primary key not null,
-name varchar(255) default "human"
+ id varchar(24) primary key not null,
+ name varchar(255) default "human"
);
create table if not exists flavors (
-name varchar(255) primary key not null,
-ord integer not null default 999
+ name varchar(255) primary key not null,
+ ord integer not null default 999,
+ type varchar(24) not null default ""
);
insert or ignore into flavors values
-("espresso", 2),
-("espresso lungo", 3),
-("cappuccino", 1),
-("latte macchiato", 4),
-("Club-Mate 0,5 l", 5),
-("Club-Mate 0,33 l", 6)
+ ("espresso", 2, "coffee"),
+ ("espresso lungo", 3, "coffee"),
+ ("cappuccino", 1, "coffee"),
+ ("latte macchiato", 4, "coffee"),
+ ("Club-Mate 0,5 l", 5, "Club-Mate"),
+ ("Club-Mate 0,33 l", 6, "Club-Mate"),
+ ("tea", 7, "tea")
;
create table if not exists coffees (
-num integer primary key,
-id varchar(24) references users(id), -- id may be unknown
-flavor varchar(255) not null references flavors(name),
-time datetime default current_timestamp,
-UNIQUE (id, flavor, time)
+ num integer primary key,
+ id varchar(24) references users(id), -- id may be unknown
+ flavor varchar(255) not null references flavors(name),
+ time datetime default current_timestamp,
+ UNIQUE (id, flavor, time)
);
create table if not exists days (
-num integer primary key not null
+ num integer primary key not null
);
insert or ignore into days values
-(0),(1),(2),(3),(4),(5),(6)
+ (0),(1),(2),(3),(4),(5),(6)
;
+
+CREATE TABLE if not exists identifiers (
+ `userid` varchar ( 24 ) NOT NULL,
+ `id` varchar ( 24 ) PRIMARY KEY NOT NULL,
+ `name` varchar ( 24 ),
+ `active` INTEGER NOT NULL DEFAULT 1,
+ FOREIGN KEY(`userid`) REFERENCES `users`(`id`)
+);
""")
cur.executescript("""
-INSERT INTO users VALUES('1111','tester');
-INSERT INTO users VALUES('2222','newer');
-INSERT INTO coffees (id, flavor, time) VALUES
+INSERT OR IGNORE INTO users VALUES('1111','tester');
+insert into identifiers (userid, id, name) values ('1111', '1111', 'chip 10');
+INSERT OR IGNORE INTO users VALUES('2222','newer');
+insert into identifiers (userid, id, name) values ('2222', '2222', 'chip 20');
+INSERT OR IGNORE INTO users VALUES('3333','clone');
+insert into identifiers (userid, id, name) values ('3333', '3333', 'chip 30');
+INSERT or ignore INTO coffees (id, flavor, time) VALUES
('1111', 'espresso lungo', datetime('now', 'localtime', '-99 seconds')),
('1111', 'Club-Mate 0,5 l', datetime('now', 'localtime', '-99 seconds')),
('1111', 'Club-Mate 0,33 l', datetime('now', 'localtime', '-99 seconds')),
('1111', 'espresso lungo', datetime('now', 'localtime', '-95 second')),
('1111', 'Club-Mate 0,5 l', datetime('now', 'localtime', '-95 second')),
('1111', 'Club-Mate 0,33 l', datetime('now', 'localtime', '-95 second')),
+('1111', 'tea', datetime('now', 'localtime', '-95 second')),
('1111', 'espresso lungo', datetime('now', 'localtime', '-90 second')),
('1111', 'Club-Mate 0,5 l', datetime('now', 'localtime', '-90 second')),
('1111', 'Club-Mate 0,33 l', datetime('now', 'localtime', '-90 second')),
('1111', 'Club-Mate 0,5 l', datetime('now', 'localtime', '-85 second')),
('1111', 'Club-Mate 0,33 l', datetime('now', 'localtime', '-85 second')),
('2222', 'espresso lungo', datetime('now', 'localtime', '-99 seconds')),
-('2222', 'Club-Mate 0,5 l', datetime('now', 'localtime', '-99 seconds')),
('2222', 'Club-Mate 0,33 l', datetime('now', 'localtime', '-99 seconds')),
('2222', 'espresso lungo', datetime('now', 'localtime', '-95 second')),
-('2222', 'Club-Mate 0,5 l', datetime('now', 'localtime', '-95 second')),
-('2222', 'Club-Mate 0,33 l', datetime('now', 'localtime', '-95 second'));
+('2222', 'Club-Mate 0,33 l', datetime('now', 'localtime', '-95 second')),
+('3333', 'espresso lungo', datetime('now', 'localtime', '-99 seconds')),
+('3333', 'Club-Mate 0,33 l', datetime('now', 'localtime', '-99 seconds')),
+('3333', 'tea', datetime('now', 'localtime', '-99 second')),
+('3333', 'tea', datetime('now', 'localtime', '-95 second')),
+('3333', 'espresso lungo', datetime('now', 'localtime', '-95 second')),
+('3333', 'Club-Mate 0,5 l', datetime('now', 'localtime', '-95 second')),
+('3333', 'Club-Mate 0,33 l', datetime('now', 'localtime', '-95 second'));
""")
con.close()
+
def delete_test_db(fn):
remove(fn)
-class CoffeeDb(TestCase):
- def test_db_loads(self):
- cfg = Conf()
- self.assertTrue(path.isfile(cfg.getCoffeeDbPath()))
- db = Db(cfg.getCoffeeDbPath())
- self.assertIsNotNone(db.con)
- self.assertIsNotNone(db.cur)
- def test_test_db_loads(self):
- create_test_db(TESTDB_FILENAME)
+class Database(TestCase):
+ def test_connection_to_database(self):
+ db = Db(conf.DB_PATH)
+ assert db.con
+ assert db.cur
+
+ def test_connection_to_test_database(self):
+ create_test_db(TESTDB_PATH)
try:
- db = Db(TESTDB_FILENAME)
- self.assertIsNotNone(db.con)
- self.assertIsNotNone(db.cur)
+ db = Db(TESTDB_PATH)
+ assert db.con
+ assert db.cur
finally:
- delete_test_db(TESTDB_FILENAME)
+ delete_test_db(TESTDB_PATH)
+
+
+class DrunkSummary(TestCase):
+ tdy = datetime.now()
+ tdy += timedelta(days=1)
+ tdystr = tdy.strftime("%Y-%m-%d")
+ lw = tdy - timedelta(days=8)
+ lwstr = lw.strftime("%Y-%m-%d")
+
+ def setUp(self):
+ create_test_db(TESTDB_PATH)
+ self.db = Db(TESTDB_PATH)
+
+ def tearDown(self):
+ delete_test_db(TESTDB_PATH)
+
+ def test_top_drinks(self):
+ top = self.db.get_top_drinks()
+ assert top == (
+ ("espresso lungo", 8),
+ ("Club-Mate 0,33 l", 8),
+ ("Club-Mate 0,5 l", 5),
+ ("tea", 3))
+
+ def test_top_mate_drinkers(self):
+ assert self.db.get_top_mate_drinkers() == (
+ ("tester", 4*0.5 + 4*0.33),
+ ("clone", 0.5 + 2*0.33),
+ ("newer", 2*0.33))
+ top_sorted = list(self.db.get_top_mate_drinkers())
+ top_sorted.sort(key=lambda x: (x[1], x[0]), reverse=True)
+ assert self.db.get_top_mate_drinkers() == tuple(top_sorted)
+
+ def test_top_tea_drinkers(self):
+ assert self.db.get_top_tea_drinkers() == (("clone", 2), ("tester", 1))
+
+ def test_coffee_sums(self):
+ s = self.db.get_drunk_sum("espresso", self.lwstr, self.tdystr)
+ assert s == ()
+ s = self.db.get_drunk_sum("espresso lungo", self.lwstr, self.tdystr)
+ assert s == (("tester", 4), ("newer", 2), ("clone", 2))
+
+ def test_mate_sums(self):
+ s = self.db.get_drunk_sum("Club-Mate 0,33 l", self.lwstr, self.tdystr)
+ assert s == (("tester", 4), ("newer", 2), ("clone", 2))
+ s = self.db.get_drunk_sum("Club-Mate 0,5 l", self.lwstr, self.tdystr)
+ assert s == (("tester", 4), ("clone", 1))
+ s = self.db.get_drunk_sum(
+ "Club-Mate 0,5 l;Club-Mate 0,33 l",
+ self.lwstr,
+ self.tdystr)
+ assert s == (("tester", 8), ("newer", 2), ("clone", 3))
+ s = self.db.get_drunk_list(
+ "Club-Mate 0,5 l;Club-Mate 0,33 l",
+ self.lwstr,
+ self.tdystr)
+ assert s == {"tester": [4, 4], "newer": [0, 2], "clone": [1, 2]}
+
+ def test_assert_error_raised_for_bad_number_of_arguments(self):
+ self.assertRaises(TypeError, self.db.get_drunk_sum)
+ self.assertRaises(TypeError, self.db.get_drunk_sum, "")
+ self.assertRaises(TypeError, self.db.get_drunk_sum, [])
+ self.assertRaises(TypeError, self.db.get_drunk_sum, "espresso")
+ self.assertRaises(
+ TypeError,
+ self.db.get_drunk_sum,
+ "espresso",
+ "2010-10-20")
+ self.assertRaises(
+ TypeError,
+ self.db.get_drunk_sum,
+ "espresso",
+ "lungo",
+ "2010-10-20",
+ "2010-10-20")