notmuch/bindings/python-cffi/notdb/_query.py

84 lines
2.7 KiB
Python
Raw Normal View History

from notdb import _base as base
from notdb import _capi as capi
from notdb import _errors as errors
from notdb import _message as message
from notdb import _thread as thread
__all__ = []
class Query(base.NotmuchObject):
"""Private, minimal query object.
This is not meant for users and is not a full implementation of
the query API. It is only an intermediate used internally to
match libnotmuch's memory management.
"""
_query_p = base.MemoryPointer()
def __init__(self, db, query_p):
self._db = db
self._query_p = query_p
@property
def alive(self):
if not self._db.alive:
return False
try:
self._query_p
except errors.ObjectDestroyedError:
return False
else:
return True
def __del__(self):
self._destroy()
def _destroy(self):
if self.alive:
capi.lib.notmuch_query_destroy(self._query_p)
self._query_p = None
@property
def query(self):
"""The query string as seen by libnotmuch."""
q = capi.lib.notmuch_query_get_query_string(self._query_p)
return base.BinString.from_cffi(q)
def messages(self):
"""Return an iterator over all the messages found by the query.
This executes the query and returns an iterator over the
:class:`Message` objects found.
"""
msgs_pp = capi.ffi.new('notmuch_messages_t**')
ret = capi.lib.notmuch_query_search_messages(self._query_p, msgs_pp)
if ret != capi.lib.NOTMUCH_STATUS_SUCCESS:
raise errors.NotmuchError(ret)
return message.MessageIter(self, msgs_pp[0], db=self._db)
def count_messages(self):
"""Return the number of messages matching this query."""
count_p = capi.ffi.new('unsigned int *')
ret = capi.lib.notmuch_query_count_messages(self._query_p, count_p)
if ret != capi.lib.NOTMUCH_STATUS_SUCCESS:
raise errors.NotmuchError(ret)
return count_p[0]
def threads(self):
"""Return an iterator over all the threads found by the query."""
threads_pp = capi.ffi.new('notmuch_threads_t **')
ret = capi.lib.notmuch_query_search_threads(self._query_p, threads_pp)
if ret != capi.lib.NOTMUCH_STATUS_SUCCESS:
raise errors.NotmuchError(ret)
return thread.ThreadIter(self, threads_pp[0], db=self._db)
def count_threads(self):
"""Return the number of threads matching this query."""
count_p = capi.ffi.new('unsigned int *')
ret = capi.lib.notmuch_query_count_threads(self._query_p, count_p)
if ret != capi.lib.NOTMUCH_STATUS_SUCCESS:
raise errors.NotmuchError(ret)
return count_p[0]