import collections
import configparser
import os
import pathlib

import pytest

import notmuch2
import notmuch2._errors as errors
import notmuch2._database as dbmod
import notmuch2._message as message


@pytest.fixture
def db(maildir):
    """Yield a database newly created at the ``maildir`` fixture's path.

    The database is created with the ``CONFIG.EMPTY`` setting and is
    held open by a ``with`` block that is exited at fixture teardown.
    """
    empty_config = notmuch2.Database.CONFIG.EMPTY
    with dbmod.Database.create(maildir.path, config=empty_config) as database:
        yield database


class TestDefaultDb:
    """Exercise lookup of the default database location.

    Failure modes are only loosely specified: a bad filename or an
    unparsable file surfaces as whichever Python exception fits, so
    these checks intentionally stay shallow.
    """

    def test_config_pathname_default(self, monkeypatch):
        # With NOTMUCH_CONFIG unset, the per-user dotfile is expected.
        monkeypatch.delenv('NOTMUCH_CONFIG', raising=False)
        expected = pathlib.Path('~/.notmuch-config').expanduser()
        assert dbmod._config_pathname() == expected

    def test_config_pathname_env(self, monkeypatch):
        # NOTMUCH_CONFIG, when set, is returned as a Path.
        monkeypatch.setenv('NOTMUCH_CONFIG', '/some/random/path')
        expected = pathlib.Path('/some/random/path')
        assert dbmod._config_pathname() == expected

    def test_default_path_nocfg(self, monkeypatch, tmppath):
        # The config path points at a file that was never created.
        missing = tmppath / 'foo'
        monkeypatch.setenv('NOTMUCH_CONFIG', str(missing))
        with pytest.raises(FileNotFoundError):
            dbmod.Database.default_path()

    def test_default_path_cfg_is_dir(self, monkeypatch, tmppath):
        # The config path points at a directory rather than a file.
        monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath))
        with pytest.raises(IsADirectoryError):
            dbmod.Database.default_path()

    def test_default_path_parseerr(self, monkeypatch, tmppath):
        # The config file exists but does not contain valid ini syntax.
        config_file = tmppath / 'notmuch-config'
        config_file.write_text('invalid')
        monkeypatch.setenv('NOTMUCH_CONFIG', str(config_file))
        with pytest.raises(configparser.Error):
            dbmod.Database.default_path()

    def test_default_path_parse(self, monkeypatch, tmppath):
        # A well-formed config yields the configured database path.
        config_file = tmppath / 'notmuch-config'
        config_file.write_text('[database]\n' + 'path={!s}'.format(tmppath))
        monkeypatch.setenv('NOTMUCH_CONFIG', str(config_file))
        assert dbmod.Database.default_path() == tmppath

    def test_default_path_param(self, monkeypatch, tmppath):
        # An explicitly passed config file is read even though the
        # environment names a different (never created) one.
        cfg_dummy = tmppath / 'dummy'
        monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg_dummy))
        cfg_real = tmppath / 'notmuch_config'
        mail_path = cfg_real / 'mail'
        cfg_real.write_text('[database]\n' + 'path={!s}'.format(mail_path))
        assert dbmod.Database.default_path(cfg_real) == mail_path


class TestCreate:
    """Checks on creating, closing and inspecting a database."""

    def test_create(self, tmppath, db):
        # Requesting the db fixture must leave the xapian directory behind.
        xapian_dir = tmppath / '.notmuch/xapian/'
        assert xapian_dir.exists()

    def test_create_already_open(self, tmppath, db):
        # Calling create() through an open instance is rejected.
        with pytest.raises(errors.NotmuchError):
            db.create(tmppath)

    def test_create_existing(self, tmppath, db):
        # Creating a second database at the same path is rejected.
        with pytest.raises(errors.DatabaseExistsError):
            dbmod.Database.create(path=tmppath)

    def test_close(self, db):
        # Closing explicitly must not raise.
        db.close()

    def test_del_noclose(self, db):
        # Dropping the local reference without closing must not raise.
        del db

    def test_close_del(self, db):
        # Closing and then dropping the local reference must not raise.
        db.close()
        del db

    def test_closed_attr(self, db):
        # The closed attribute flips once close() has been called.
        assert not db.closed
        db.close()
        assert db.closed

    def test_ctx(self, db):
        # Entering returns the database itself; leaving closes it.
        with db as entered:
            assert entered is db
            assert not db.closed
        assert db.closed

    def test_path(self, db, tmppath):
        assert db.path == tmppath

    def test_version(self, db):
        assert db.version > 0

    def test_needs_upgrade(self, db):
        # Only the value's domain is checked, not which value it is.
        assert db.needs_upgrade in (True, False)


class TestAtomic:
    """Checks on the atomic() context manager of a database."""

    def test_exit_early(self, db):
        # Ending the section by hand inside the block must be reported
        # as an unbalanced atomic section.
        with pytest.raises(errors.UnbalancedAtomicError):
            with db.atomic() as section:
                section.force_end()

    def test_exit_late(self, db):
        # Ending the section by hand after the block has already been
        # left must be reported as unbalanced as well.
        with db.atomic() as section:
            pass
        with pytest.raises(errors.UnbalancedAtomicError):
            section.force_end()

    def test_abort(self, db):
        # After abort() the database is expected to report closed.
        with db.atomic() as section:
            section.abort()
        assert db.closed


class TestRevision:
    """Checks on the revision objects returned by Database.revision()."""

    def test_single_rev(self, db):
        """A revision has the expected types and compares equal to itself."""
        r = db.revision()
        assert isinstance(r, dbmod.DbRevision)
        assert isinstance(r.rev, int)
        assert isinstance(r.uuid, bytes)
        assert r is r
        assert r == r
        assert r <= r
        assert r >= r
        assert not r < r
        assert not r > r

    def test_diff_db(self, tmppath):
        """Revisions taken from two separate databases must differ."""
        dbpath0 = tmppath.joinpath('db0')
        dbpath0.mkdir()
        dbpath1 = tmppath.joinpath('db1')
        dbpath1.mkdir()
        # Hold both databases in a with-block so neither handle is left
        # open when the test finishes or fails part-way through.
        with dbmod.Database.create(path=dbpath0) as db0, \
                dbmod.Database.create(path=dbpath1) as db1:
            r_db0 = db0.revision()
            r_db1 = db1.revision()
            assert r_db0 != r_db1
            assert r_db0.uuid != r_db1.uuid

    def test_cmp(self, db, maildir):
        """A revision taken after adding a message orders after the earlier one."""
        rev0 = db.revision()
        _, pathname = maildir.deliver()
        db.add(pathname, sync_flags=False)
        rev1 = db.revision()
        assert rev0 < rev1
        assert rev0 <= rev1
        assert not rev0 > rev1
        assert not rev0 >= rev1
        assert not rev0 == rev1
        assert rev0 != rev1

    # XXX add tests for revisions comparisons

class TestMessages:
    """Checks on adding, removing and looking up messages.

    Each operation that takes a filename is exercised with the path
    object returned by ``maildir.deliver()``, with its ``str`` form and
    with its ``bytes`` form.
    """

    def test_add_message(self, db, maildir):
        """Adding by path object returns a Message for the delivered file."""
        msgid, pathname = maildir.deliver()
        msg, _ = db.add(pathname, sync_flags=False)
        assert isinstance(msg, message.Message)
        assert msg.path == pathname
        assert msg.messageid == msgid

    def test_add_message_str(self, db, maildir):
        """Adding by str filename yields the delivered message."""
        msgid, pathname = maildir.deliver()
        msg, _ = db.add(str(pathname), sync_flags=False)
        # Check the result, as the str/bytes variants of the get tests do.
        assert msg.messageid == msgid

    def test_add_message_bytes(self, db, maildir):
        """Adding by bytes filename yields the delivered message."""
        msgid, pathname = maildir.deliver()
        msg, _ = db.add(os.fsencode(bytes(pathname)), sync_flags=False)
        # Check the result, as the str/bytes variants of the get tests do.
        assert msg.messageid == msgid

    def test_remove_message(self, db, maildir):
        """After removal by path object the message id no longer resolves."""
        msgid, pathname = maildir.deliver()
        db.add(pathname, sync_flags=False)
        assert db.find(msgid)
        db.remove(pathname)
        with pytest.raises(LookupError):
            db.find(msgid)

    def test_remove_message_str(self, db, maildir):
        """After removal by str filename the message id no longer resolves."""
        msgid, pathname = maildir.deliver()
        db.add(pathname, sync_flags=False)
        assert db.find(msgid)
        db.remove(str(pathname))
        with pytest.raises(LookupError):
            db.find(msgid)

    def test_remove_message_bytes(self, db, maildir):
        """After removal by bytes filename the message id no longer resolves."""
        msgid, pathname = maildir.deliver()
        db.add(pathname, sync_flags=False)
        assert db.find(msgid)
        db.remove(os.fsencode(bytes(pathname)))
        with pytest.raises(LookupError):
            db.find(msgid)

    def test_find_message(self, db, maildir):
        """Lookup by message id returns a Message matching the added one."""
        msgid, pathname = maildir.deliver()
        msg0, _ = db.add(pathname, sync_flags=False)
        msg1 = db.find(msgid)
        assert isinstance(msg1, message.Message)
        assert msg1.messageid == msgid == msg0.messageid
        assert msg1.path == pathname == msg0.path

    def test_find_message_notfound(self, db):
        """Lookup of an unknown message id raises LookupError."""
        with pytest.raises(LookupError):
            db.find('foo')

    def test_get_message(self, db, maildir):
        """Lookup by path object returns a Message matching the added one."""
        msgid, pathname = maildir.deliver()
        msg0, _ = db.add(pathname, sync_flags=False)
        msg1 = db.get(pathname)
        assert isinstance(msg1, message.Message)
        assert msg1.messageid == msgid == msg0.messageid
        assert msg1.path == pathname == msg0.path

    def test_get_message_str(self, db, maildir):
        """Lookup by str filename returns the delivered message."""
        msgid, pathname = maildir.deliver()
        db.add(pathname, sync_flags=False)
        msg = db.get(str(pathname))
        assert msg.messageid == msgid

    def test_get_message_bytes(self, db, maildir):
        """Lookup by bytes filename returns the delivered message."""
        msgid, pathname = maildir.deliver()
        db.add(pathname, sync_flags=False)
        msg = db.get(os.fsencode(bytes(pathname)))
        assert msg.messageid == msgid


class TestTags:
    # Only check that db.tags behaves like a set at a high level.
    # Detailed set semantics are covered by the test_tags module.

    def test_type(self, db):
        tags = db.tags
        assert isinstance(tags, collections.abc.Set)

    def test_none(self, db):
        # With no tags present: iteration ends at once, len is 0, falsy.
        tag_iter = iter(db.tags)
        with pytest.raises(StopIteration):
            next(tag_iter)
        assert len(db.tags) == 0
        assert not db.tags

    def test_some(self, db, maildir):
        # A tag added to a message shows up as the only database tag.
        _, delivered = maildir.deliver()
        msg, _ = db.add(delivered, sync_flags=False)
        msg.tags.add('hello')
        tag_iter = iter(db.tags)
        assert next(tag_iter) == 'hello'
        with pytest.raises(StopIteration):
            next(tag_iter)
        assert 'hello' in msg.tags

    def test_cache(self, db):
        # Repeated attribute access must hand back the same object.
        assert db.tags is db.tags

    def test_iters(self, db):
        # Each iter() call must produce a distinct iterator.
        first = iter(db.tags)
        second = iter(db.tags)
        assert first is not second


class TestQuery:
    """Checks on message and thread queries against a populated database."""

    @pytest.fixture
    def db(self, maildir, notmuch):
        """Return a notmuch2.Database opened in 'rw' mode.

        The database will have 3 messages, 2 threads: the third
        message is delivered as a reply to the first one.
        """
        first_msgid, _ = maildir.deliver(body='foo')
        maildir.deliver(body='bar')
        reply_headers = [('In-Reply-To', '<{}>'.format(first_msgid))]
        maildir.deliver(body='baz', headers=reply_headers)
        notmuch('new')
        empty_config = notmuch2.Database.CONFIG.EMPTY
        with dbmod.Database(maildir.path, 'rw', config=empty_config) as database:
            yield database

    def test_count_messages(self, db):
        assert db.count_messages('*') == 3

    def test_messages_type(self, db):
        found = db.messages('*')
        assert isinstance(found, collections.abc.Iterator)

    def test_message_no_results(self, db):
        # A query matching nothing yields an immediately exhausted iterator.
        found = db.messages('not_a_matching_query')
        with pytest.raises(StopIteration):
            next(found)

    def test_message_match(self, db):
        found = db.messages('*')
        first = next(found)
        assert isinstance(first, notmuch2.Message)

    def test_count_threads(self, db):
        assert db.count_threads('*') == 2

    def test_threads_type(self, db):
        found = db.threads('*')
        assert isinstance(found, collections.abc.Iterator)

    def test_threads_no_match(self, db):
        # A query matching nothing yields an immediately exhausted iterator.
        found = db.threads('not_a_matching_query')
        with pytest.raises(StopIteration):
            next(found)

    def test_threads_match(self, db):
        found = db.threads('*')
        first = next(found)
        assert isinstance(first, notmuch2.Thread)

    def test_use_threaded_message_twice(self, db):
        # Walk the thread's messages twice, dropping each reference in
        # between; messages must still be alive on the second pass.
        thread = next(db.threads('*'))
        for toplevel_msg in thread.toplevel():
            assert isinstance(toplevel_msg, notmuch2.Message)
            assert toplevel_msg.alive
            del toplevel_msg
        for thread_msg in thread:
            assert isinstance(thread_msg, notmuch2.Message)
            assert thread_msg.alive
            del thread_msg