100 lines
2.5 KiB
Python
100 lines
2.5 KiB
Python
import asyncio
|
|
from datetime import datetime
|
|
|
|
from src.neo_neo_todo.models.todo import (
|
|
delete_todo,
|
|
insert_todo,
|
|
select_todo,
|
|
select_todos,
|
|
update_todo,
|
|
)
|
|
|
|
|
|
async def test_model_todo_select_todos(db_pool):
    """Check the number of todos ``select_todos`` returns for several queries.

    Every case pairs the positional arguments passed after ``db_pool`` with
    the row count the test expects back.
    """
    cases = [
        ((1,), 3),  # member 1, no third argument
        ((1, False), 3),  # member 1, third argument False
        ((1, True), 0),  # member 1, third argument True
        ((12341234,), 0),  # member id expected to own no todos
    ]

    for query_args, expected_count in cases:
        todos = await select_todos(db_pool, *query_args)
        assert len(todos) == expected_count
|
|
|
|
|
|
async def test_model_todo_select_todo(db_pool):
    """Fetch todos 1-3 of member 1 concurrently and check each result.

    Also checks that looking up todo 1 under a different member id yields
    ``None``.
    """
    todo_queries = [
        select_todo(db_pool, member_id=1, id=1),
        select_todo(db_pool, member_id=1, id=2),
        select_todo(db_pool, member_id=1, id=3),
    ]

    # gather() returns results in the order the awaitables were passed, so the
    # position in ``todos`` identifies which id was requested.
    todos = await asyncio.gather(*todo_queries)

    # Iterating over the items (instead of indexing) lets the ``is not None``
    # assertion narrow ``todo``, so no type-checker suppressions are needed.
    for expected_id, todo in enumerate(todos, start=1):
        assert todo is not None
        assert todo.id == expected_id
        assert not todo.complete

    non_existent_member_id_todo = await select_todo(db_pool, member_id=1234123, id=1)

    assert non_existent_member_id_todo is None
|
|
|
|
|
|
async def test_model_todo_insert_todo(db_pool):
    """Insert one todo for member 1 and check the record that comes back."""
    new_todo_fields = {
        "member_id": 1,
        "task": "Test Task 4",
        "complete": False,
        "due": None,
    }

    created = await insert_todo(db_pool, **new_todo_fields)

    # Truthiness first: the attribute checks below need a real record.
    assert created
    assert not created.complete
    assert created.task == "Test Task 4"
|
|
|
|
|
|
async def test_model_todo_update_todo(db_pool):
    """Update todo 3 of member 1, then try a todo/member pair expected to be absent."""
    # Both calls send the same new field values; only the target differs.
    new_values = {
        "task": "Updated Task Test",
        "complete": True,
        "due": datetime.now(),
    }

    updated = await update_todo(db_pool, id=3, member_id=1, **new_values)

    assert updated
    assert updated.id == 3
    assert updated.task == "Updated Task Test"
    assert updated.complete
    assert updated.due

    # Non-existent id combined with another member id.
    missing = await update_todo(db_pool, id=99999, member_id=2, **new_values)

    assert not missing
|
|
|
|
|
|
async def test_model_todo_delete_todo(db_pool):
    """Check that ``delete_todo`` only succeeds for the matching member id.

    The wrong-member attempt is made while todo 1 still exists. Otherwise a
    falsy result could simply mean "already deleted" and would not show that
    the member id is taken into account.
    """
    # Todo 1 is deleted below as member 1; a different member id is expected
    # to be rejected while the row is still present.
    delete_todo_wrong_member = await delete_todo(db_pool, id=1, member_id=2)

    assert not delete_todo_wrong_member

    delete_todo_success = await delete_todo(db_pool, id=1, member_id=1)

    assert delete_todo_success

    # After the successful delete, the same wrong-member request still fails.
    delete_todo_fail = await delete_todo(db_pool, id=1, member_id=2)

    assert not delete_todo_fail
|