Create a custom AddIndexConcurrently operation
Django already provides an AddIndexConcurrently operation class.
However, it comes with some limitations:

1. It does not support `IF NOT EXISTS`. The Django team believes that
   this isn't necessary, though we disagree:
   https://forum.djangoproject.com/t/missing-support-for-create-index-concurrently-if-not-exists/30283
2. The operation doesn't manage lock timeouts. The CREATE INDEX
   CONCURRENTLY operation has three phases. In the first phase, the
   operation must wait for **all** existing transactions to finish
   before moving to the second phase; if a lock_timeout is set, that
   wait can exceed it and the operation fails with a lock timeout
   error. The second phase builds the index, and the third phase makes
   sure any rows changed during the second phase are also indexed.

This new class addresses both limitations and makes AddIndexConcurrently
idempotent.
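
For illustration, a migration using this operation could look like the
sketch below (the app label, dependency, model, and index name are
placeholders; only the `AddIndexConcurrently` class itself comes from this
commit):

    from django.db import migrations, models

    from django_pg_migration_tools.operations import AddIndexConcurrently


    class Migration(migrations.Migration):
        # Required: the operation must run outside a transaction.
        atomic = False

        dependencies = [("example_app", "0001_initial")]

        operations = [
            AddIndexConcurrently(
                model_name="IntModel",
                index=models.Index(fields=["int_field"], name="int_field_idx"),
            ),
        ]

When applied, the operation records the current lock_timeout, sets it to 0,
drops any invalid leftover index with the same name, runs `CREATE INDEX
CONCURRENTLY IF NOT EXISTS`, and finally restores the original lock_timeout
(see the SQL assertions in the test below).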
marcelofern committed Jul 30, 2024
1 parent 9c8d088 commit f7c7a7a
Showing 2 changed files with 339 additions and 0 deletions.
162 changes: 162 additions & 0 deletions src/django_pg_migration_tools/operations.py
@@ -0,0 +1,162 @@
from __future__ import annotations

from textwrap import dedent
from typing import Any

from django.contrib.postgres import operations as psql_operations
from django.db import migrations, models
from django.db.backends.base import schema as base_schema
from django.db.migrations.operations import base as migrations_base


class AddIndexConcurrently(
    migrations_base.Operation, psql_operations.NotInTransactionMixin
):
    """
    This class mimics the behaviour of:
        django.contrib.postgres.operations.AddIndexConcurrently
    However, it uses `django.db.migrations.operations.base.Operation` as a base
    class due to limitations of Django's AddIndexConcurrently.
    One such limitation is that Django's AddIndexConcurrently does not provide
    easy hooks so that we can add the conditional `IF NOT EXISTS` to the
    `CREATE INDEX CONCURRENTLY` command, which is something we must have here.
    As a compromise, this class implements the same input interface as Django's
    AddIndexConcurrently, so that the developer using it doesn't "feel" any
    difference.
    """

    atomic = False

    SHOW_LOCK_TIMEOUT_QUERY = "SHOW lock_timeout;"

    SET_LOCK_TIMEOUT_QUERY = "SET lock_timeout = %(lock_timeout)s"

    CHECK_INVALID_INDEX_QUERY = dedent("""
        SELECT relname
        FROM pg_class, pg_index
        WHERE (
            pg_index.indisvalid = false
            AND pg_index.indexrelid = pg_class.oid
            AND relname = %(index_name)s
        );
    """)

    DROP_INDEX_QUERY = 'DROP INDEX CONCURRENTLY IF EXISTS "{}";'

    def __init__(self, model_name: str, index: models.Index) -> None:
        self.model_name = model_name
        self.index = index
        self.original_lock_timeout = ""

    def describe(self) -> str:
        return (
            f"Concurrently creates index {self.index.name} on field(s) "
            f"{self.index.fields} of model {self.model_name} if the index "
            f"does not exist. NOTE: Using django_pg_migration_tools "
            f"AddIndexConcurrently operation."
        )

    def database_forwards(
        self,
        app_label: str,
        schema_editor: base_schema.BaseDatabaseSchemaEditor,
        from_state: migrations.state.ProjectState,
        to_state: migrations.state.ProjectState,
    ) -> None:
        self._ensure_not_in_transaction(schema_editor)
        self._ensure_no_lock_timeout_set(schema_editor)
        self._ensure_not_an_invalid_index(schema_editor)
        model = from_state.apps.get_model(app_label, self.model_name)
        index_sql = str(self.index.create_sql(model, schema_editor, concurrently=True))
        # Inject the IF NOT EXISTS because Django doesn't provide a handy
        # if_not_exists: bool parameter for us to use.
        index_sql = index_sql.replace(
            "CREATE INDEX CONCURRENTLY", "CREATE INDEX CONCURRENTLY IF NOT EXISTS"
        )
        schema_editor.execute(index_sql)
        self._ensure_original_lock_timeout_is_reset(schema_editor)

    def database_backwards(
        self,
        app_label: str,
        schema_editor: base_schema.BaseDatabaseSchemaEditor,
        from_state: migrations.state.ProjectState,
        to_state: migrations.state.ProjectState,
    ) -> None:
        self._ensure_not_in_transaction(schema_editor)
        self._ensure_no_lock_timeout_set(schema_editor)
        model = from_state.apps.get_model(app_label, self.model_name)
        index_sql = str(self.index.remove_sql(model, schema_editor, concurrently=True))
        # Unlike the CREATE INDEX case, Django already provides IF EXISTS
        # when dropping an index, so no .replace() call is needed here.
        schema_editor.execute(index_sql)
        self._ensure_original_lock_timeout_is_reset(schema_editor)

    def _ensure_no_lock_timeout_set(
        self,
        schema_editor: base_schema.BaseDatabaseSchemaEditor,
    ) -> None:
        cursor = schema_editor.connection.cursor()
        cursor.execute(self.SHOW_LOCK_TIMEOUT_QUERY)
        self.original_lock_timeout = cursor.fetchone()[0]
        cursor.execute(self.SET_LOCK_TIMEOUT_QUERY, {"lock_timeout": 0})

    def _ensure_not_an_invalid_index(
        self,
        schema_editor: base_schema.BaseDatabaseSchemaEditor,
    ) -> None:
        """
        It is possible that a previous attempt at creating this index failed:
        1. We created the index manually, it failed, and we didn't notice.
        2. The migration was automatically retried and the first attempt
           failed, leaving an invalid index behind.
        One potential cause of failure that might trigger number 2 is a
        deadlock on the table at the time the migration runs. Another is the
        migration accidentally being run with a lock_timeout value set, and
        the operation timing out.
        In those cases we want to drop the invalid index first so that it can
        be recreated in the next step via CREATE INDEX CONCURRENTLY IF NOT
        EXISTS.
        """
        cursor = schema_editor.connection.cursor()
        cursor.execute(self.CHECK_INVALID_INDEX_QUERY, {"index_name": self.index.name})
        if cursor.fetchone():
            # In theory we don't need the `IF EXISTS` in the DROP INDEX query
            # because we have just verified the index exists, but it keeps the
            # statement a harmless no-op (rather than an error) if the index
            # has already been dropped by the time this statement runs.
            cursor.execute(self.DROP_INDEX_QUERY.format(self.index.name))

    def _ensure_original_lock_timeout_is_reset(
        self,
        schema_editor: base_schema.BaseDatabaseSchemaEditor,
    ) -> None:
        cursor = schema_editor.connection.cursor()
        cursor.execute(
            self.SET_LOCK_TIMEOUT_QUERY, {"lock_timeout": self.original_lock_timeout}
        )

    # The following methods are necessary for Django to understand state
    # changes.
    def state_forwards(
        self, app_label: str, state: migrations.state.ProjectState
    ) -> None:
        state.add_index(app_label, self.model_name.lower(), self.index)

    def deconstruct(self) -> tuple[str, list, dict[str, Any]]:
        return (
            self.__class__.__qualname__,
            [],
            {"model_name": self.model_name, "index": self.index},
        )

    @property
    def reversible(self) -> bool:  # type: ignore[override]
        return True
177 changes: 177 additions & 0 deletions tests/django_pg_migration_tools/test_operations.py
@@ -0,0 +1,177 @@
from textwrap import dedent

import pytest
from django.db import (
    NotSupportedError,
    connection,
)
from django.db.migrations.state import (
    ModelState,
    ProjectState,
)
from django.db.models import Index
from django.test import utils

from django_pg_migration_tools import operations
from tests.example_app.models import IntModel


_CHECK_INDEX_EXISTS_QUERY = """
    SELECT indexname FROM pg_indexes
    WHERE (
        tablename = 'example_app_intmodel'
        AND indexname = 'int_field_idx'
    );
"""

_CHECK_VALID_INDEX_EXISTS_QUERY = """
    SELECT relname
    FROM pg_class, pg_index
    WHERE (
        pg_index.indisvalid = true
        AND pg_index.indexrelid = pg_class.oid
        AND relname = 'int_field_idx'
    );
"""

_CREATE_INDEX_QUERY = """
    CREATE INDEX "int_field_idx"
    ON "example_app_intmodel" ("int_field");
"""

_SET_INDEX_INVALID = """
    UPDATE pg_index
    SET indisvalid = false
    WHERE indexrelid = (
        SELECT c.oid
        FROM pg_class c
        JOIN pg_namespace n ON c.relnamespace = n.oid
        WHERE c.relname = 'int_field_idx'
    )::regclass;
"""

_SET_LOCK_TIMEOUT = """
    SET SESSION lock_timeout = 1000;
"""


class TestAddIndexConcurrently:
    app_label = "example_app"

    @pytest.mark.django_db
    def test_requires_atomic_false(self):
        project_state = ProjectState()
        new_state = project_state.clone()
        operation = operations.AddIndexConcurrently(
            "IntModel", Index(fields=["int_field"], name="int_field_idx")
        )
        with pytest.raises(NotSupportedError):
            with connection.schema_editor(atomic=True) as editor:
                operation.database_forwards(
                    self.app_label, editor, project_state, new_state
                )

    # Disable the overall test transaction because CREATE INDEX CONCURRENTLY
    # cannot run inside a transaction.
    @pytest.mark.django_db(transaction=True)
    def test_add(self):
        with connection.cursor() as cursor:
            # We first create the index and set it to invalid, to make sure it
            # will be removed automatically by the operation before re-creating
            # the index.
            cursor.execute(_CREATE_INDEX_QUERY)
            cursor.execute(_SET_INDEX_INVALID)
            # Also, set the lock_timeout to check it has been returned to
            # its original value once the index creation is completed.
            cursor.execute(_SET_LOCK_TIMEOUT)

        # Prove that the invalid index exists before the operation runs:
        with connection.cursor() as cursor:
            cursor.execute(
                operations.AddIndexConcurrently.CHECK_INVALID_INDEX_QUERY,
                {"index_name": "int_field_idx"},
            )
            assert cursor.fetchone()

        project_state = ProjectState()
        project_state.add_model(ModelState.from_model(IntModel))
        new_state = project_state.clone()

        # Set the operation that will drop the invalid index and re-create it
        # (without lock timeouts).
        index = Index(fields=["int_field"], name="int_field_idx")
        operation = operations.AddIndexConcurrently("IntModel", index)

        assert operation.describe() == (
            "Concurrently creates index int_field_idx on field(s) "
            "['int_field'] of model IntModel if the index "
            "does not exist. NOTE: Using django_pg_migration_tools "
            "AddIndexConcurrently operation."
        )

        name, args, kwargs = operation.deconstruct()
        assert name == "AddIndexConcurrently"
        assert args == []
        assert kwargs == {"model_name": "IntModel", "index": index}

        operation.state_forwards(self.app_label, new_state)
        assert len(new_state.models[self.app_label, "intmodel"].options["indexes"]) == 1
        assert (
            new_state.models[self.app_label, "intmodel"].options["indexes"][0].name
            == "int_field_idx"
        )
        # Proceed to add the index:
        with connection.schema_editor(atomic=False, collect_sql=False) as editor:
            with utils.CaptureQueriesContext(connection) as queries:
                operation.database_forwards(
                    self.app_label, editor, project_state, new_state
                )

        # Assert the invalid index has been replaced by a valid index.
        with connection.cursor() as cursor:
            cursor.execute(_CHECK_VALID_INDEX_EXISTS_QUERY)
            assert cursor.fetchone()

        # Assert the lock_timeout has been set back to its original session
        # value (1s).
        with connection.cursor() as cursor:
            cursor.execute(operations.AddIndexConcurrently.SHOW_LOCK_TIMEOUT_QUERY)
            assert cursor.fetchone()[0] == "1s"

        # Assert on the sequence of expected SQL queries:
        # 1. Check the original lock_timeout value to be able to restore it
        #    later.
        assert queries[0]["sql"] == "SHOW lock_timeout;"
        # 2. Remove the timeout.
        assert queries[1]["sql"] == "SET lock_timeout = 0"
        # 3. Verify if the index is invalid.
        assert queries[2]["sql"] == dedent("""
            SELECT relname
            FROM pg_class, pg_index
            WHERE (
                pg_index.indisvalid = false
                AND pg_index.indexrelid = pg_class.oid
                AND relname = 'int_field_idx'
            );
        """)
        # 4. Drop the index because in this case it was invalid!
        assert queries[3]["sql"] == 'DROP INDEX CONCURRENTLY IF EXISTS "int_field_idx";'
        # 5. Finally create the index concurrently.
        assert (
            queries[4]["sql"]
            == 'CREATE INDEX CONCURRENTLY IF NOT EXISTS "int_field_idx" ON "example_app_intmodel" ("int_field")'
        )
        # 6. Set the timeout back to what it was originally.
        assert queries[5]["sql"] == "SET lock_timeout = '1s'"

        # Now roll back and remove the index to verify that the reverse
        # operation also works and produces the proper SQL statements.
        with connection.schema_editor(atomic=False, collect_sql=False) as editor:
            with utils.CaptureQueriesContext(connection):
                operation.database_backwards(
                    self.app_label, editor, project_state, new_state
                )

        # Verify the index has been deleted.
        with connection.cursor() as cursor:
            cursor.execute(_CHECK_INDEX_EXISTS_QUERY)
            assert not cursor.fetchone()
