Monday, 29 June 2015

django-cms and haystack: update index with a django signal

Haystack provides the HAYSTACK_SIGNAL_PROCESSOR setting, and it works well, especially the RealtimeSignalProcessor for Django models.

But the problem I had was updating the search index when the user publishes a django-cms page.
In fact, if you use the HAYSTACK_SIGNAL_PROCESSOR setting, haystack updates your index with the draft copy of the page, whether you save a plugin or the page itself, and that's not a good thing for the user experience... nor for our indexes! :-)

Digging into this issue, I wrote a signal handler that wraps the RealtimeSignalProcessor and doesn't break the django-cms publish command if there's no connection to the search engine.

# models.py
# other imports
import logging

from cms.signals import post_publish
# ...
logger = logging.getLogger(__name__)
# ...

def real_time_signal_processor(instance, **kwargs):
    from elasticsearch.exceptions import ConnectionError
    from django.conf import settings
    from haystack import signals
    from haystack.utils import loading

    connections = loading.ConnectionHandler(settings.HAYSTACK_CONNECTIONS)
    connection_router = loading.ConnectionRouter()
    if hasattr(settings, 'HAYSTACK_ROUTERS'):
        connection_router = loading.ConnectionRouter(settings.HAYSTACK_ROUTERS)
    try:
        # Index the page that has just been published
        signals.RealtimeSignalProcessor(connections, connection_router).handle_save(kwargs['sender'], instance)
    except ConnectionError as e:
        # Log and carry on: publishing must not fail when the search engine is unreachable
        logger.error(e)


post_publish.connect(real_time_signal_processor)

As a side note, this models.py is part of a django app named search, where I put all the search engine code (custom backends, forms, etc.). I think it's good practice to have an isolated app dedicated to search.

That's it!

Cheers

[Update] The code above catches the exception raised by the elasticsearch client. If you catch a generic Exception instead, you can use the signal handler with any search backend. IMHO catching a generic Exception is not good practice, so use the appropriate one for your backend! :-)
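
A rough sketch of that idea: a small factory that builds the handler from the exception classes you want to swallow, so each backend passes its own. The names make_safe_handler and update_index are hypothetical (they are not part of haystack or django-cms); update_index stands in for the handle_save call used above.

```python
import logging

logger = logging.getLogger(__name__)


def make_safe_handler(update_index, swallow):
    """Return a signal handler that logs, instead of raising, the given errors.

    update_index: hypothetical callable(sender, instance) doing the real work.
    swallow: tuple of exception classes specific to your search backend.
    """
    def handler(instance, **kwargs):
        try:
            update_index(kwargs.get('sender'), instance)
        except swallow as exc:
            # Any exception not listed in `swallow` still propagates.
            logger.error("Search index update failed: %s", exc)
    return handler
```

With elasticsearch you would pass (ConnectionError,) from elasticsearch.exceptions as swallow. Keep in mind that Django signals hold receivers by weak reference by default, so keep the returned handler in a module-level variable or connect it with weak=False.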

Wednesday, 24 June 2015

Dealing with elasticsearch reindex and haystack

Prerequisites: some knowledge of haystack, elasticsearch and, obviously, django is required.

When we change analysis settings such as synonyms or stopwords in our indices, we need to reindex our data for the new settings to take effect. The Elasticsearch documentation raises this problem and suggests how to fix it; see Reindex Your Data. Well, let's do it with haystack!

Suppose we have these haystack settings:

HAYSTACK_CONNECTIONS = {
    'default': {
        'ENGINE': 'search.backend.ElasticsearchEngineCustom',
        'URL': 'http://127.0.0.1:9200/',
        'INDEX_NAME': 'haystack',
        'BATCH_SIZE': 1000
    }
}

As you can see, I created a custom backend, but that's not the focus of this article; please read the links below to get an idea:
As a sample, for this article the backend module looks like this:

from haystack.backends.elasticsearch_backend import ElasticsearchSearchBackend, ElasticsearchSearchEngine

class ElasticsearchEngineBackendCustom(ElasticsearchSearchBackend):
    def __init__(self, connection_alias, **connection_options):
        super(ElasticsearchEngineBackendCustom, self).__init__(connection_alias, **connection_options)

        self.setup_complete = True

class ElasticsearchEngineCustom(ElasticsearchSearchEngine):
    backend = ElasticsearchEngineBackendCustom

Setting self.setup_complete to True skips the 'put index' call in haystack's setup and lets us use INDEX_NAME as the index alias name.

Now we have to manage the whole haystack index setup via management commands, overriding rebuild_index and creating a reindex_index command. Let's do it.

In a module named utils, I created current_index, a utility function that returns the index currently in use and the next version number for it (we'll use the number in the management commands). The code is below:

INDEX_TEMPLATE = "{}_v{}"

def number_sequence():
    n = 0
    while True:
        yield n
        n += 1

def current_index(es_client, index_name):
    version = number_sequence()
    # next(version) works on both Python 2.6+ and Python 3
    index = INDEX_TEMPLATE.format(index_name, next(version))

    # No alias yet: rebuild_index has never been run
    if not es_client.indices.exists_alias(name=index_name):
        return INDEX_TEMPLATE.format(index_name, 0), 1

    # Probe haystack_v0, haystack_v1, ... until an existing index is found
    while not es_client.indices.exists(index=index):
        index = INDEX_TEMPLATE.format(index_name, next(version))

    return index, next(version)

The exists_alias guard tells us whether rebuild_index has been run at least once: if no alias is present, there is no index. The version number appended to the name starts from 0, so the default index created by the rebuild_index command will be haystack_v0.
One objection is the use of while not...: the function searches from zero up to the current index version number, which could be large, and it hits elasticsearch once per number; it could be more efficient to have two index names. Take this as a "quick win" solution and improve on it!
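
One possible improvement, as a sketch rather than tested production code: ask elasticsearch which index sits behind the alias instead of probing every version number. The function name current_index_from_alias is hypothetical, and the sketch assumes the alias points only at indices named like haystack_vN and that the client's indices.get_alias call returns a dict keyed by index name.

```python
import re

INDEX_TEMPLATE = "{}_v{}"


def current_index_from_alias(es_client, index_alias):
    """Return (current index name, next version) with a single alias lookup.

    Assumes the indices behind the alias are named "<alias>_v<N>".
    """
    if not es_client.indices.exists_alias(name=index_alias):
        # rebuild_index has never run: same answer as current_index above.
        return INDEX_TEMPLATE.format(index_alias, 0), 1

    # Assumed response shape: {"haystack_v3": {"aliases": {"haystack": {}}}}
    names = es_client.indices.get_alias(name=index_alias)
    pattern = re.compile(r"^{}_v(\d+)$".format(re.escape(index_alias)))
    versions = [int(m.group(1)) for m in map(pattern.match, names) if m]
    if not versions:
        raise ValueError("No versioned index behind alias %r" % index_alias)
    version = max(versions)
    return INDEX_TEMPLATE.format(index_alias, version), version + 1
```

This costs at most two requests regardless of how many times the data has been reindexed.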

Let's write the reindex_index command. The steps are:
  • Delete the new index if it already exists, ignoring errors
  • Create the new index with the current index settings and field mappings
  • Reindex data from the current index to the new index
  • Zero downtime: remove the current index from the alias and add the new index
  • Delete the current index, which is now old. The new one is in use

import copy

from elasticsearch import Elasticsearch
from elasticsearch.helpers import reindex
from django.core.management.base import BaseCommand
from django.conf import settings
from search.utils import current_index, INDEX_TEMPLATE

class Command(BaseCommand):

    def __init__(self):
        super(Command, self).__init__()

        self.es_client = Elasticsearch(hosts=settings.HAYSTACK_CONNECTIONS['default']['URL'])
        self.index_alias = settings.HAYSTACK_CONNECTIONS['default']['INDEX_NAME']
        self.current_index, version = current_index(self.es_client, self.index_alias)
        self.new_index = INDEX_TEMPLATE.format(self.index_alias, version)
        # Update settings with fields mapping (work on a copy so the django settings stay untouched)
        self.index_settings = copy.deepcopy(settings.ELASTICSEARCH_INDEX_SETTINGS)
        self.index_settings.update(self.es_client.indices.get_mapping()[self.current_index])

    def handle(self, *args, **options):
        # Drop leftovers of a previous failed run, then create the new index
        self.es_client.indices.delete(index=self.new_index, ignore=[404, 400])
        self.es_client.indices.create(index=self.new_index, body=self.index_settings)
        # Copy the documents from the current index to the new one
        reindex(self.es_client, self.current_index, self.new_index)
        # Swap the alias in a single request, so searches never see a missing index
        update_aliases = {
            "actions": [
                {"remove": {"index": self.current_index, "alias": self.index_alias}},
                {"add": {"index": self.new_index, "alias": self.index_alias}}
            ]
        }
        self.es_client.indices.update_aliases(body=update_aliases)
        self.es_client.indices.delete(index=self.current_index)

        print(u"Successfully reindexed.")


Now we need to override the rebuild_index command:
  • Replace the clear_index call
  • Create the alias using the INDEX_NAME from the haystack settings
    • delete all indices that match haystack_v*
    • create the index "v0" with the haystack settings
    • create the alias haystack pointing to haystack_v0
  • Call update_index

from elasticsearch import Elasticsearch

from django.conf import settings
from django.core.management import call_command
from haystack.backends.elasticsearch_backend import ElasticsearchSearchBackend
from haystack.management.commands import rebuild_index
from search.utils import INDEX_TEMPLATE

class Command(rebuild_index.Command):

    def __init__(self):
        super(Command, self).__init__()
        self.es_client = Elasticsearch(hosts=settings.HAYSTACK_CONNECTIONS['default']['URL'])
        self.index_name = INDEX_TEMPLATE.format(settings.HAYSTACK_CONNECTIONS['default']['INDEX_NAME'], 0)
        self.index_alias = settings.HAYSTACK_CONNECTIONS['default']['INDEX_NAME']

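    def _create_index_alias(self):
        # Build the haystack_v* pattern from the template: replacing "0" in the
        # name would also mangle an alias that itself contains a zero.
        # ignore=404: having no index to delete is fine on the first run.
        self.es_client.indices.delete(index=INDEX_TEMPLATE.format(self.index_alias, "*"), ignore=404)
        self.es_client.indices.create(index=self.index_name,
                                      body=ElasticsearchSearchBackend.DEFAULT_SETTINGS)
        print(u"Created index {}".format(self.index_name))
        self.es_client.indices.put_alias(index=self.index_name, name=self.index_alias)
        print(u"Added index {} to alias {}.".format(self.index_name, self.index_alias))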

    def handle(self, **options):
        self._create_index_alias()
        call_command('update_index', **options)

Instead of ElasticsearchSearchBackend.DEFAULT_SETTINGS you should use your own elasticsearch settings. The links above explain how to do it.
A nice improvement would be a yes/no confirmation option, since the rebuild command is destructive.
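
A minimal sketch of such a confirmation, assuming Python 3 (on Python 2 you would pass raw_input as read). The helper name confirm is hypothetical; it is not something haystack provides.

```python
def confirm(question, read=input):
    """Ask a yes/no question; anything other than 'y' or 'yes' means no."""
    answer = read("{} [y/N] ".format(question))
    return answer.strip().lower() in ("y", "yes")
```

In handle you could then return early when confirm(...) is False, before calling _create_index_alias. As far as I know, haystack's stock rebuild_index also accepts a --noinput flag, so it is worth checking your version before rolling your own.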

Any suggestions will be appreciated! That's it. Enjoy! :-)

Cheers