mirror of
https://git.notmuchmail.org/git/notmuch
synced 2024-12-27 11:51:42 +01:00
8b737af28b
The previous (pre-0.34.2) constructor searched for a config file but only if the database path was not specified, and only to retrieve database.path. Neither of the available options (CONFIG.SEARCH or CONFIG.NONE) matches this semantics exactly, but CONFIG.SEARCH causes less breakage for people who relied on the old behaviour to set their database.path [1]. Since it also seems like the friendlier option in the long run, this commit switches to CONFIG.SEARCH as default. This requires a certain amount of updating the pytest tests, but most users will actually have a config file, unlike the test environment. [1]: id:87fsqijx7u.fsf@metapensiero.it
342 lines
10 KiB
Python
342 lines
10 KiB
Python
import collections
|
|
import configparser
|
|
import os
|
|
import pathlib
|
|
|
|
import pytest
|
|
|
|
import notmuch2
|
|
import notmuch2._errors as errors
|
|
import notmuch2._database as dbmod
|
|
import notmuch2._message as message
|
|
|
|
|
|
@pytest.fixture
def db(maildir):
    """Yield a database freshly created at the maildir fixture's path.

    The database is created with ``CONFIG.EMPTY`` and used as a context
    manager around the ``yield``.
    """
    empty_config = notmuch2.Database.CONFIG.EMPTY
    with dbmod.Database.create(maildir.path, config=empty_config) as database:
        yield database
|
|
|
|
|
|
class TestDefaultDb:
    """Tests for locating the config file and the default database path.

    The failure modes are only loosely specified: a bad filename or a
    file that does not parse simply surfaces whichever Python error is
    relevant, so these tests do not probe them in depth.
    """

    def test_config_pathname_default(self, monkeypatch):
        """With NOTMUCH_CONFIG unset the pathname is ``~/.notmuch-config``."""
        monkeypatch.delenv('NOTMUCH_CONFIG', raising=False)
        expected = pathlib.Path('~/.notmuch-config').expanduser()
        assert dbmod._config_pathname() == expected

    def test_config_pathname_env(self, monkeypatch):
        """With NOTMUCH_CONFIG set the pathname is taken from it."""
        env_value = '/some/random/path'
        monkeypatch.setenv('NOTMUCH_CONFIG', env_value)
        assert dbmod._config_pathname() == pathlib.Path(env_value)

    def test_default_path_nocfg(self, monkeypatch, tmppath):
        """Pointing NOTMUCH_CONFIG at ``tmppath/foo`` raises FileNotFoundError."""
        config_file = tmppath / 'foo'
        monkeypatch.setenv('NOTMUCH_CONFIG', str(config_file))
        with pytest.raises(FileNotFoundError):
            dbmod.Database.default_path()

    def test_default_path_cfg_is_dir(self, monkeypatch, tmppath):
        """Pointing NOTMUCH_CONFIG at tmppath itself raises IsADirectoryError."""
        config_dir = str(tmppath)
        monkeypatch.setenv('NOTMUCH_CONFIG', config_dir)
        with pytest.raises(IsADirectoryError):
            dbmod.Database.default_path()

    def test_default_path_parseerr(self, monkeypatch, tmppath):
        """A config file containing just ``invalid`` raises configparser.Error."""
        config_file = tmppath / 'notmuch-config'
        config_file.write_text('invalid')
        monkeypatch.setenv('NOTMUCH_CONFIG', str(config_file))
        with pytest.raises(configparser.Error):
            dbmod.Database.default_path()

    def test_default_path_parse(self, monkeypatch, tmppath):
        """The ``path`` key of the ``[database]`` section is returned."""
        config_file = tmppath / 'notmuch-config'
        config_file.write_text('[database]\n' + 'path={!s}'.format(tmppath))
        monkeypatch.setenv('NOTMUCH_CONFIG', str(config_file))
        assert dbmod.Database.default_path() == tmppath

    def test_default_path_param(self, monkeypatch, tmppath):
        """A config file passed as argument is read, not NOTMUCH_CONFIG."""
        # The environment names a file that this test never creates.
        env_config = tmppath / 'dummy'
        monkeypatch.setenv('NOTMUCH_CONFIG', str(env_config))
        explicit_config = tmppath / 'notmuch_config'
        mail_path = explicit_config / 'mail'
        explicit_config.write_text(
            '[database]\n' + 'path={!s}'.format(mail_path))
        assert dbmod.Database.default_path(explicit_config) == mail_path
|
|
|
|
|
|
class TestCreate:
    """Tests for creating and closing a database and its basic attributes."""

    def test_create(self, tmppath, db):
        """With the db fixture active, ``.notmuch/xapian`` exists in tmppath."""
        xapian_dir = tmppath / '.notmuch' / 'xapian'
        assert xapian_dir.exists()

    def test_create_already_open(self, tmppath, db):
        """Calling ``create`` via the open db fixture raises NotmuchError."""
        expected_error = errors.NotmuchError
        with pytest.raises(expected_error):
            db.create(tmppath)

    def test_create_existing(self, tmppath, db):
        """Creating at tmppath alongside the db fixture raises DatabaseExistsError."""
        expected_error = errors.DatabaseExistsError
        with pytest.raises(expected_error):
            dbmod.Database.create(path=tmppath)

    def test_close(self, db):
        """An explicit ``close()`` call does not raise."""
        db.close()

    def test_del_noclose(self, db):
        """Dropping the local reference without closing does not raise."""
        del db

    def test_close_del(self, db):
        """Closing and then dropping the local reference does not raise."""
        db.close()
        del db

    def test_closed_attr(self, db):
        """``closed`` is false before ``close()`` and true afterwards."""
        was_closed = db.closed
        assert not was_closed
        db.close()
        assert db.closed

    def test_ctx(self, db):
        """``with db`` yields the database itself and closes it on exit."""
        with db as entered:
            assert entered is db
            assert not db.closed
        assert db.closed

    def test_path(self, db, tmppath):
        """``path`` compares equal to the tmppath fixture."""
        assert db.path == tmppath

    def test_version(self, db):
        """``version`` is greater than zero."""
        version = db.version
        assert version > 0

    def test_needs_upgrade(self, db):
        """``needs_upgrade`` is one of True or False."""
        needs_upgrade = db.needs_upgrade
        assert needs_upgrade in (True, False)
|
|
|
|
|
|
class TestAtomic:
    """Tests for the context manager returned by ``Database.atomic()``."""

    def test_exit_early(self, db):
        """``force_end()`` inside the block makes leaving it raise."""
        with pytest.raises(errors.UnbalancedAtomicError), db.atomic() as section:
            section.force_end()

    def test_exit_late(self, db):
        """``force_end()`` after the block has been left raises."""
        with db.atomic() as section:
            pass
        with pytest.raises(errors.UnbalancedAtomicError):
            section.force_end()

    def test_abort(self, db):
        """After ``abort()`` inside the block the database reports closed."""
        with db.atomic() as section:
            section.abort()
        assert db.closed
|
|
|
|
|
|
class TestRevision:
    """Tests for the revision objects returned by ``Database.revision()``."""

    def test_single_rev(self, db):
        """A revision has the expected types and compares equal to itself."""
        r = db.revision()
        assert isinstance(r, dbmod.DbRevision)
        assert isinstance(r.rev, int)
        assert isinstance(r.uuid, bytes)
        assert r is r
        # Exercise every rich comparison operator against the same object.
        assert r == r
        assert r <= r
        assert r >= r
        assert not r < r
        assert not r > r

    def test_diff_db(self, tmppath):
        """Revisions taken from two separately created databases differ.

        Both databases are opened in a ``with`` block so that they are
        closed when the test finishes, including when an assertion fails,
        instead of being left open.
        """
        dbpath0 = tmppath.joinpath('db0')
        dbpath0.mkdir()
        dbpath1 = tmppath.joinpath('db1')
        dbpath1.mkdir()
        with dbmod.Database.create(path=dbpath0) as db0, \
                dbmod.Database.create(path=dbpath1) as db1:
            r_db0 = db0.revision()
            r_db1 = db1.revision()
            assert r_db0 != r_db1
            assert r_db0.uuid != r_db1.uuid

    def test_cmp(self, db, maildir):
        """A revision taken before adding a message orders before one taken after."""
        rev0 = db.revision()
        _, pathname = maildir.deliver()
        db.add(pathname, sync_flags=False)
        rev1 = db.revision()
        assert rev0 < rev1
        assert rev0 <= rev1
        assert not rev0 > rev1
        assert not rev0 >= rev1
        assert not rev0 == rev1
        assert rev0 != rev1
|
|
|
|
# XXX add tests for revision comparisons
|
|
|
|
class TestMessages:
    """Tests for adding, removing, finding and getting messages.

    ``add``, ``remove`` and ``get`` are each exercised with the pathname
    as returned by ``maildir.deliver()``, with its ``str`` form and with
    its ``bytes`` form.
    """

    def test_add_message(self, db, maildir):
        """``add`` returns a Message carrying the delivered path and id."""
        msgid, pathname = maildir.deliver()
        added, _dup = db.add(pathname, sync_flags=False)
        assert isinstance(added, message.Message)
        assert added.path == pathname
        assert added.messageid == msgid

    def test_add_message_str(self, db, maildir):
        """``add`` accepts the filename as ``str``."""
        _msgid, pathname = maildir.deliver()
        filename = str(pathname)
        _msg, _dup = db.add(filename, sync_flags=False)

    def test_add_message_bytes(self, db, maildir):
        """``add`` accepts the filename as ``bytes``."""
        _msgid, pathname = maildir.deliver()
        filename = os.fsencode(bytes(pathname))
        _msg, _dup = db.add(filename, sync_flags=False)

    def test_remove_message(self, db, maildir):
        """After ``remove`` the message id can no longer be found."""
        msgid, pathname = maildir.deliver()
        _msg, _dup = db.add(pathname, sync_flags=False)
        assert db.find(msgid)
        _dup = db.remove(pathname)
        with pytest.raises(LookupError):
            db.find(msgid)

    def test_remove_message_str(self, db, maildir):
        """``remove`` accepts the filename as ``str``."""
        msgid, pathname = maildir.deliver()
        _msg, _dup = db.add(pathname, sync_flags=False)
        assert db.find(msgid)
        filename = str(pathname)
        _dup = db.remove(filename)
        with pytest.raises(LookupError):
            db.find(msgid)

    def test_remove_message_bytes(self, db, maildir):
        """``remove`` accepts the filename as ``bytes``."""
        msgid, pathname = maildir.deliver()
        _msg, _dup = db.add(pathname, sync_flags=False)
        assert db.find(msgid)
        filename = os.fsencode(bytes(pathname))
        _dup = db.remove(filename)
        with pytest.raises(LookupError):
            db.find(msgid)

    def test_find_message(self, db, maildir):
        """``find`` returns a Message matching the one ``add`` returned."""
        msgid, pathname = maildir.deliver()
        added, _dup = db.add(pathname, sync_flags=False)
        found = db.find(msgid)
        assert isinstance(found, message.Message)
        assert found.messageid == msgid
        assert msgid == added.messageid
        assert found.path == pathname
        assert pathname == added.path

    def test_find_message_notfound(self, db):
        """Looking up the id ``'foo'`` in the fixture database raises LookupError."""
        with pytest.raises(LookupError):
            db.find('foo')

    def test_get_message(self, db, maildir):
        """``get`` returns a Message matching the one ``add`` returned."""
        msgid, pathname = maildir.deliver()
        added, _ = db.add(pathname, sync_flags=False)
        fetched = db.get(pathname)
        assert isinstance(fetched, message.Message)
        assert fetched.messageid == msgid
        assert msgid == added.messageid
        assert fetched.path == pathname
        assert pathname == added.path

    def test_get_message_str(self, db, maildir):
        """``get`` accepts the filename as ``str``."""
        msgid, pathname = maildir.deliver()
        db.add(pathname, sync_flags=False)
        filename = str(pathname)
        fetched = db.get(filename)
        assert fetched.messageid == msgid

    def test_get_message_bytes(self, db, maildir):
        """``get`` accepts the filename as ``bytes``."""
        msgid, pathname = maildir.deliver()
        db.add(pathname, sync_flags=False)
        filename = os.fsencode(bytes(pathname))
        fetched = db.get(filename)
        assert fetched.messageid == msgid
|
|
|
|
|
|
class TestTags:
    """Tests for the ``tags`` attribute of a database."""

    # We just want to test this behaves like a set at a high level.
    # The set semantics are tested in detail in the test_tags module.

    def test_type(self, db):
        """``db.tags`` is an instance of ``collections.abc.Set``."""
        tagset = db.tags
        assert isinstance(tagset, collections.abc.Set)

    def test_none(self, db):
        """The fixture database has no tags: empty iterator, zero length, falsy."""
        tag_iter = iter(db.tags)
        with pytest.raises(StopIteration):
            next(tag_iter)
        assert len(db.tags) == 0
        assert not db.tags

    def test_some(self, db, maildir):
        """A tag added to a message is the only tag the database yields."""
        _, pathname = maildir.deliver()
        added, _ = db.add(pathname, sync_flags=False)
        added.tags.add('hello')
        tag_iter = iter(db.tags)
        assert next(tag_iter) == 'hello'
        with pytest.raises(StopIteration):
            next(tag_iter)
        # NOTE(review): this checks the message's tags, not db.tags --
        # confirm that is what was intended.
        assert 'hello' in added.tags

    def test_cache(self, db):
        """Two accesses of ``db.tags`` return the same object."""
        assert db.tags is db.tags

    def test_iters(self, db):
        """Each ``iter(db.tags)`` call returns a distinct iterator object."""
        first = iter(db.tags)
        second = iter(db.tags)
        assert first is not second
|
|
|
|
|
|
class TestQuery:
    """Tests for counting and iterating messages and threads."""

    @pytest.fixture
    def db(self, maildir, notmuch):
        """Yield a notmuch2.Database opened with mode ``'rw'``.

        Three messages are delivered, the third one as a reply to the
        first, and ``notmuch('new')`` is run before the database is
        opened.  The tests below expect 3 messages and 2 threads.
        """
        parent_id, _ = maildir.deliver(body='foo')
        maildir.deliver(body='bar')
        reply_headers = [('In-Reply-To', '<{}>'.format(parent_id))]
        maildir.deliver(body='baz', headers=reply_headers)
        notmuch('new')
        empty_config = notmuch2.Database.CONFIG.EMPTY
        with dbmod.Database(maildir.path, 'rw', config=empty_config) as database:
            yield database

    def test_count_messages(self, db):
        """The ``'*'`` query counts 3 messages."""
        total = db.count_messages('*')
        assert total == 3

    def test_messages_type(self, db):
        """``messages()`` returns an iterator."""
        results = db.messages('*')
        assert isinstance(results, collections.abc.Iterator)

    def test_message_no_results(self, db):
        """A query without matches gives an exhausted message iterator."""
        results = db.messages('not_a_matching_query')
        with pytest.raises(StopIteration):
            next(results)

    def test_message_match(self, db):
        """The first result of the ``'*'`` query is a notmuch2.Message."""
        results = db.messages('*')
        first = next(results)
        assert isinstance(first, notmuch2.Message)

    def test_count_threads(self, db):
        """The ``'*'`` query counts 2 threads."""
        total = db.count_threads('*')
        assert total == 2

    def test_threads_type(self, db):
        """``threads()`` returns an iterator."""
        results = db.threads('*')
        assert isinstance(results, collections.abc.Iterator)

    def test_threads_no_match(self, db):
        """A query without matches gives an exhausted thread iterator."""
        results = db.threads('not_a_matching_query')
        with pytest.raises(StopIteration):
            next(results)

    def test_threads_match(self, db):
        """The first result of the ``'*'`` query is a notmuch2.Thread."""
        results = db.threads('*')
        first = next(results)
        assert isinstance(first, notmuch2.Thread)

    def test_use_threaded_message_twice(self, db):
        """Messages of one thread can be iterated twice and are alive each time.

        The thread is walked first through ``toplevel()`` and then directly;
        every message reference is deleted as soon as it has been checked.
        """
        thread = next(db.threads('*'))
        for top_msg in thread.toplevel():
            assert isinstance(top_msg, notmuch2.Message)
            assert top_msg.alive
            del top_msg
        for thread_msg in thread:
            assert isinstance(thread_msg, notmuch2.Message)
            assert thread_msg.alive
            del thread_msg
|