mirror of
https://git.notmuchmail.org/git/notmuch
synced 2024-11-30 06:34:12 +01:00
776a54a0e4
Since it is possible to use an atomic context to abort a number of changes support this usage. Because the only way to actually abort the transaction is to close the database this must also do so. Amended by db: Note the limitation requiring close is a limitation of the underlying notmuch API, which should be fixed in a future notmuch release.
342 lines
10 KiB
Python
342 lines
10 KiB
Python
import collections
|
|
import configparser
|
|
import os
|
|
import pathlib
|
|
|
|
import pytest
|
|
|
|
import notmuch2
|
|
import notmuch2._errors as errors
|
|
import notmuch2._database as dbmod
|
|
import notmuch2._message as message
|
|
|
|
|
|
@pytest.fixture
|
|
def db(maildir):
|
|
with dbmod.Database.create(maildir.path) as db:
|
|
yield db
|
|
|
|
|
|
class TestDefaultDb:
|
|
"""Tests for reading the default database.
|
|
|
|
The error cases are fairly undefined, some relevant Python error
|
|
will come out if you give it a bad filename or if the file does
|
|
not parse correctly. So we're not testing this too deeply.
|
|
"""
|
|
|
|
def test_config_pathname_default(self, monkeypatch):
|
|
monkeypatch.delenv('NOTMUCH_CONFIG', raising=False)
|
|
user = pathlib.Path('~/.notmuch-config').expanduser()
|
|
assert dbmod._config_pathname() == user
|
|
|
|
def test_config_pathname_env(self, monkeypatch):
|
|
monkeypatch.setenv('NOTMUCH_CONFIG', '/some/random/path')
|
|
assert dbmod._config_pathname() == pathlib.Path('/some/random/path')
|
|
|
|
def test_default_path_nocfg(self, monkeypatch, tmppath):
|
|
monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath/'foo'))
|
|
with pytest.raises(FileNotFoundError):
|
|
dbmod.Database.default_path()
|
|
|
|
def test_default_path_cfg_is_dir(self, monkeypatch, tmppath):
|
|
monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath))
|
|
with pytest.raises(IsADirectoryError):
|
|
dbmod.Database.default_path()
|
|
|
|
def test_default_path_parseerr(self, monkeypatch, tmppath):
|
|
cfg = tmppath / 'notmuch-config'
|
|
with cfg.open('w') as fp:
|
|
fp.write('invalid')
|
|
monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg))
|
|
with pytest.raises(configparser.Error):
|
|
dbmod.Database.default_path()
|
|
|
|
def test_default_path_parse(self, monkeypatch, tmppath):
|
|
cfg = tmppath / 'notmuch-config'
|
|
with cfg.open('w') as fp:
|
|
fp.write('[database]\n')
|
|
fp.write('path={!s}'.format(tmppath))
|
|
monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg))
|
|
assert dbmod.Database.default_path() == tmppath
|
|
|
|
def test_default_path_param(self, monkeypatch, tmppath):
|
|
cfg_dummy = tmppath / 'dummy'
|
|
monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg_dummy))
|
|
cfg_real = tmppath / 'notmuch_config'
|
|
with cfg_real.open('w') as fp:
|
|
fp.write('[database]\n')
|
|
fp.write('path={!s}'.format(cfg_real/'mail'))
|
|
assert dbmod.Database.default_path(cfg_real) == cfg_real/'mail'
|
|
|
|
|
|
class TestCreate:
|
|
|
|
def test_create(self, tmppath, db):
|
|
assert tmppath.joinpath('.notmuch/xapian/').exists()
|
|
|
|
def test_create_already_open(self, tmppath, db):
|
|
with pytest.raises(errors.NotmuchError):
|
|
db.create(tmppath)
|
|
|
|
def test_create_existing(self, tmppath, db):
|
|
with pytest.raises(errors.FileError):
|
|
dbmod.Database.create(path=tmppath)
|
|
|
|
def test_close(self, db):
|
|
db.close()
|
|
|
|
def test_del_noclose(self, db):
|
|
del db
|
|
|
|
def test_close_del(self, db):
|
|
db.close()
|
|
del db
|
|
|
|
def test_closed_attr(self, db):
|
|
assert not db.closed
|
|
db.close()
|
|
assert db.closed
|
|
|
|
def test_ctx(self, db):
|
|
with db as ctx:
|
|
assert ctx is db
|
|
assert not db.closed
|
|
assert db.closed
|
|
|
|
def test_path(self, db, tmppath):
|
|
assert db.path == tmppath
|
|
|
|
def test_version(self, db):
|
|
assert db.version > 0
|
|
|
|
def test_needs_upgrade(self, db):
|
|
assert db.needs_upgrade in (True, False)
|
|
|
|
|
|
class TestAtomic:
|
|
|
|
def test_exit_early(self, db):
|
|
with pytest.raises(errors.UnbalancedAtomicError):
|
|
with db.atomic() as ctx:
|
|
ctx.force_end()
|
|
|
|
def test_exit_late(self, db):
|
|
with db.atomic() as ctx:
|
|
pass
|
|
with pytest.raises(errors.UnbalancedAtomicError):
|
|
ctx.force_end()
|
|
|
|
def test_abort(self, db):
|
|
with db.atomic() as txn:
|
|
txn.abort()
|
|
assert db.closed
|
|
|
|
|
|
class TestRevision:
|
|
|
|
def test_single_rev(self, db):
|
|
r = db.revision()
|
|
assert isinstance(r, dbmod.DbRevision)
|
|
assert isinstance(r.rev, int)
|
|
assert isinstance(r.uuid, bytes)
|
|
assert r is r
|
|
assert r == r
|
|
assert r <= r
|
|
assert r >= r
|
|
assert not r < r
|
|
assert not r > r
|
|
|
|
def test_diff_db(self, tmppath):
|
|
dbpath0 = tmppath.joinpath('db0')
|
|
dbpath0.mkdir()
|
|
dbpath1 = tmppath.joinpath('db1')
|
|
dbpath1.mkdir()
|
|
db0 = dbmod.Database.create(path=dbpath0)
|
|
db1 = dbmod.Database.create(path=dbpath1)
|
|
r_db0 = db0.revision()
|
|
r_db1 = db1.revision()
|
|
assert r_db0 != r_db1
|
|
assert r_db0.uuid != r_db1.uuid
|
|
|
|
def test_cmp(self, db, maildir):
|
|
rev0 = db.revision()
|
|
_, pathname = maildir.deliver()
|
|
db.add(pathname, sync_flags=False)
|
|
rev1 = db.revision()
|
|
assert rev0 < rev1
|
|
assert rev0 <= rev1
|
|
assert not rev0 > rev1
|
|
assert not rev0 >= rev1
|
|
assert not rev0 == rev1
|
|
assert rev0 != rev1
|
|
|
|
# XXX add tests for revisions comparisons
|
|
|
|
class TestMessages:
|
|
|
|
def test_add_message(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
msg, dup = db.add(pathname, sync_flags=False)
|
|
assert isinstance(msg, message.Message)
|
|
assert msg.path == pathname
|
|
assert msg.messageid == msgid
|
|
|
|
def test_add_message_str(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
msg, dup = db.add(str(pathname), sync_flags=False)
|
|
|
|
def test_add_message_bytes(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
msg, dup = db.add(os.fsencode(bytes(pathname)), sync_flags=False)
|
|
|
|
def test_remove_message(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
msg, dup = db.add(pathname, sync_flags=False)
|
|
assert db.find(msgid)
|
|
dup = db.remove(pathname)
|
|
with pytest.raises(LookupError):
|
|
db.find(msgid)
|
|
|
|
def test_remove_message_str(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
msg, dup = db.add(pathname, sync_flags=False)
|
|
assert db.find(msgid)
|
|
dup = db.remove(str(pathname))
|
|
with pytest.raises(LookupError):
|
|
db.find(msgid)
|
|
|
|
def test_remove_message_bytes(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
msg, dup = db.add(pathname, sync_flags=False)
|
|
assert db.find(msgid)
|
|
dup = db.remove(os.fsencode(bytes(pathname)))
|
|
with pytest.raises(LookupError):
|
|
db.find(msgid)
|
|
|
|
def test_find_message(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
msg0, dup = db.add(pathname, sync_flags=False)
|
|
msg1 = db.find(msgid)
|
|
assert isinstance(msg1, message.Message)
|
|
assert msg1.messageid == msgid == msg0.messageid
|
|
assert msg1.path == pathname == msg0.path
|
|
|
|
def test_find_message_notfound(self, db):
|
|
with pytest.raises(LookupError):
|
|
db.find('foo')
|
|
|
|
def test_get_message(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
msg0, _ = db.add(pathname, sync_flags=False)
|
|
msg1 = db.get(pathname)
|
|
assert isinstance(msg1, message.Message)
|
|
assert msg1.messageid == msgid == msg0.messageid
|
|
assert msg1.path == pathname == msg0.path
|
|
|
|
def test_get_message_str(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
db.add(pathname, sync_flags=False)
|
|
msg = db.get(str(pathname))
|
|
assert msg.messageid == msgid
|
|
|
|
def test_get_message_bytes(self, db, maildir):
|
|
msgid, pathname = maildir.deliver()
|
|
db.add(pathname, sync_flags=False)
|
|
msg = db.get(os.fsencode(bytes(pathname)))
|
|
assert msg.messageid == msgid
|
|
|
|
|
|
class TestTags:
|
|
# We just want to test this behaves like a set at a hight level.
|
|
# The set semantics are tested in detail in the test_tags module.
|
|
|
|
def test_type(self, db):
|
|
assert isinstance(db.tags, collections.abc.Set)
|
|
|
|
def test_none(self, db):
|
|
itags = iter(db.tags)
|
|
with pytest.raises(StopIteration):
|
|
next(itags)
|
|
assert len(db.tags) == 0
|
|
assert not db.tags
|
|
|
|
def test_some(self, db, maildir):
|
|
_, pathname = maildir.deliver()
|
|
msg, _ = db.add(pathname, sync_flags=False)
|
|
msg.tags.add('hello')
|
|
itags = iter(db.tags)
|
|
assert next(itags) == 'hello'
|
|
with pytest.raises(StopIteration):
|
|
next(itags)
|
|
assert 'hello' in msg.tags
|
|
|
|
def test_cache(self, db):
|
|
assert db.tags is db.tags
|
|
|
|
def test_iters(self, db):
|
|
i1 = iter(db.tags)
|
|
i2 = iter(db.tags)
|
|
assert i1 is not i2
|
|
|
|
|
|
class TestQuery:
|
|
|
|
@pytest.fixture
|
|
def db(self, maildir, notmuch):
|
|
"""Return a read-only notmuch2.Database.
|
|
|
|
The database will have 3 messages, 2 threads.
|
|
"""
|
|
msgid, _ = maildir.deliver(body='foo')
|
|
maildir.deliver(body='bar')
|
|
maildir.deliver(body='baz',
|
|
headers=[('In-Reply-To', '<{}>'.format(msgid))])
|
|
notmuch('new')
|
|
with dbmod.Database(maildir.path, 'rw') as db:
|
|
yield db
|
|
|
|
def test_count_messages(self, db):
|
|
assert db.count_messages('*') == 3
|
|
|
|
def test_messages_type(self, db):
|
|
msgs = db.messages('*')
|
|
assert isinstance(msgs, collections.abc.Iterator)
|
|
|
|
def test_message_no_results(self, db):
|
|
msgs = db.messages('not_a_matching_query')
|
|
with pytest.raises(StopIteration):
|
|
next(msgs)
|
|
|
|
def test_message_match(self, db):
|
|
msgs = db.messages('*')
|
|
msg = next(msgs)
|
|
assert isinstance(msg, notmuch2.Message)
|
|
|
|
def test_count_threads(self, db):
|
|
assert db.count_threads('*') == 2
|
|
|
|
def test_threads_type(self, db):
|
|
threads = db.threads('*')
|
|
assert isinstance(threads, collections.abc.Iterator)
|
|
|
|
def test_threads_no_match(self, db):
|
|
threads = db.threads('not_a_matching_query')
|
|
with pytest.raises(StopIteration):
|
|
next(threads)
|
|
|
|
def test_threads_match(self, db):
|
|
threads = db.threads('*')
|
|
thread = next(threads)
|
|
assert isinstance(thread, notmuch2.Thread)
|
|
|
|
def test_use_threaded_message_twice(self, db):
|
|
thread = next(db.threads('*'))
|
|
for msg in thread.toplevel():
|
|
assert isinstance(msg, notmuch2.Message)
|
|
assert msg.alive
|
|
del msg
|
|
for msg in thread:
|
|
assert isinstance(msg, notmuch2.Message)
|
|
assert msg.alive
|
|
del msg
|