diff --git a/README.rst b/README.rst
index e2b661144dbe2ff6434b97f4b730cb922a7ed7cc..0b319fb23a6a02edc96ae4e4073f2e346da29503 100644
--- a/README.rst
+++ b/README.rst
@@ -78,6 +78,9 @@ template tags, templates, etc. that can be used by your own codebase.
   Privacy-by-design support, allowing consent to be requested and tracked
   and personal information redacted
 
+* djblets.protect_ -
+  Locks, rate limiting, and other service protection mechanisms
+
 * djblets.recaptcha_ -
   Mixins and form widgets for reCAPTCHA_ integration
 
diff --git a/djblets/cache/backend.py b/djblets/cache/backend.py
index 3822399d0d1c00dda276304334876b848b05ab72..7686cc52b2db2984a467bb30aa7f0dd3ef903db3 100644
--- a/djblets/cache/backend.py
+++ b/djblets/cache/backend.py
@@ -39,6 +39,7 @@ if TYPE_CHECKING:
     from django.core.cache.backends.base import BaseCache
     from typing_extensions import Annotated, TypeAlias
 
+    from djblets.protect.locks import CacheLock
     from djblets.util.symbols import Unsettable
 
     _T = TypeVar('_T')
@@ -109,6 +110,12 @@ class _CacheContext:
     #: The full cache key used for caching operations.
     full_cache_key: str
 
+    #: An optional cache lock to guard recomputing and writing data.
+    #:
+    #: Version Added:
+    #:     5.3
+    lock: CacheLock | None
+
     #: Whether to use encryption when storing or reading data.
     use_encryption: bool
 
@@ -121,13 +128,16 @@ class _CacheContext:
         compress_large_data: bool,
         use_encryption: bool | None,
         encryption_key: bytes | None,
+        lock: CacheLock | None,
     ) -> None:
         """Initialize the context.
 
         Version Changed:
             5.3:
-            ``base_cache_key`` may now be a sequence of string components of
-            the key.
+            * ``base_cache_key`` may now be a sequence of string components of
+              the key.
+
+            * Added the ``lock`` argument.
 
         Args:
             cache (django.core.cache.backends.base.BaseCache):
@@ -168,6 +178,12 @@ class _CacheContext:
                 to the default AES encryption key for the server as provided by
                 :py:func:`djblets.secrets.crypto.
                 get_default_aes_encryption_key`.
+
+            lock (djblets.protect.locks.CacheLock):
+                An optional cache lock to guard recomputing and writing data.
+
+                Version Added:
+                    5.3
         """
         if use_encryption is None:
             use_encryption = _get_default_use_encryption()
@@ -190,6 +206,20 @@ class _CacheContext:
 
         self.full_cache_key = self.make_key(base_cache_key)
 
+        # Set a default cache key for the lock, if an explicit key is not
+        # provided.
+        if lock and not lock.full_cache_key:
+            lock_key: str | Sequence[str]
+
+            if isinstance(base_cache_key, str):
+                lock_key = f'_lock_:{base_cache_key}'
+            else:
+                lock_key = ['_lock_', *base_cache_key]
+
+            lock.full_cache_key = self.make_key(lock_key)
+
+        self.lock = lock
+
     def make_key(
         self,
         key: str | Sequence[str],
@@ -326,7 +356,8 @@ class _CacheContext:
 
         Returns:
             object:
-            The value from the cache, or :py:data:`_NO_RESULTS` if not found.
+            The value from the cache, or :py:data:`djblets.util.symbols.UNSET`
+            if not found.
 
         Raises:
             Exception:
@@ -362,6 +393,60 @@ class _CacheContext:
 
         return value
 
+    def load_value_or_lock_for_write(
+        self,
+        key: (str | None) = None,
+    ) -> Any:
+        """Load a value from cache, and lock if not present.
+
+        This will optimistically attempt to fetch a value from the cache.
+        If it's not present, this will attempt to acquire a lock before
+        checking again and then returning.
+
+        If the lock times out, the timeout is logged and no lock is held.
+        The caller may still write to the cache.
+
+        Version Added:
+            5.3
+
+        Args:
+            key (str, optional):
+                The full cache key to load from cache.
+
+                If not provided, the full main cache key will be used.
+
+        Returns:
+            object:
+            The value from the cache, or :py:data:`djblets.util.symbols.UNSET`
+            if not found.
+
+        Raises:
+            Exception:
+                An error occurred reading from cache or processing results.
+                The exception is raised as-is.
+        """
+        value = self.load_value(key)
+
+        if value is UNSET:
+            # There was no value in the cache. We'll want to write, so first
+            # acquire a lock if there's one passed.
+            lock = self.lock
+
+            if lock is not None:
+                try:
+                    lock.acquire()
+                except TimeoutError:
+                    logger.error('Timeout waiting on distributed cache lock '
+                                 '%r',
+                                 lock)
+
+                # We've either acquired a lock or timed out waiting for one.
+                # Check if there's a new value in the cache and return that
+                # result.
+                value = self.load_value(key)
+
+        return value
+
     def store_value(
         self,
         value: Any,
@@ -793,6 +878,7 @@ def _cache_store_chunks(
     read_start: int = 0
     i: int = 0
     error: (Exception | None) = None
+    lock = cache_context.lock
 
     for data, has_item, item in items:
         if has_item:
@@ -830,6 +916,9 @@ def _cache_store_chunks(
             # Store the keys in the cache in a single request.
             try:
                 cache_context.store_many(cached_data)
+
+                if lock and lock.locked():
+                    lock.update_expiration()
             except Exception as e:
                 # Store this error and skip any further cache operations.
                 error = e
@@ -851,6 +940,10 @@ def _cache_store_chunks(
             cache_context.store_value([chunk],
                                       key=cache_context.make_subkey(i),
                                       raw=True)
+
+            if lock and lock.locked():
+                lock.update_expiration()
+
             i += 1
         except Exception as e:
             # Store this error and skip any further cache operations.
@@ -956,6 +1049,7 @@ def cache_memoize_iter(
     compress_large_data: bool = True,
     use_encryption: (bool | None) = None,
     encryption_key: (bytes | None) = None,
+    lock: (CacheLock | None) = None,
 ) -> Iterator[_T]:
     """Memoize an iterable list of items inside the configured cache.
 
@@ -985,7 +1079,8 @@ def cache_memoize_iter(
 
     Version Changed:
         5.3:
-        ``key`` may now be a sequence of string components of the key.
+        * ``key`` may now be a sequence of string components of the key.
+        * Added the ``lock`` argument.
 
     Version Changed:
         5.1:
@@ -1049,6 +1144,12 @@ def cache_memoize_iter(
             Version Added:
                 3.0
 
+        lock (djblets.protect.locks.CacheLock, optional):
+            An optional cache lock to guard recomputing and writing data.
+
+            Version Added:
+                5.3
+
     Yields:
         object:
         The list of items from the cache or from ``items_or_callable`` if
@@ -1068,85 +1169,97 @@ def cache_memoize_iter(
         expiration=expiration,
         compress_large_data=compress_large_data,
         use_encryption=use_encryption,
-        encryption_key=encryption_key)
+        encryption_key=encryption_key,
+        lock=lock,
+    )
     full_cache_key = cache_context.full_cache_key
 
     results: Unsettable[Iterable[_T]] = UNSET
     data_from_cache: bool = False
 
-    if not force_overwrite:
-        try:
-            chunk_count = cache_context.load_value()
-        except Exception as e:
-            # We've logged specifics, but add some more context.
-            logger.error('Failed to fetch large or iterable data entry count '
-                         'from cache for key "%s". Rebuilding data. '
-                         'Error = %s',
-                         cache_context.full_cache_key, e)
-            chunk_count = UNSET
-
-        if chunk_count is not UNSET:
-            data_from_cache = True
-
+    try:
+        if not force_overwrite:
             try:
-                results = _cache_iter_large_data(
-                    cache_context=cache_context,
-                    data=_cache_fetch_large_data(
-                        cache_context=cache_context,
-                        chunk_count=int(chunk_count)))
+                chunk_count = cache_context.load_value_or_lock_for_write()
             except Exception as e:
-                logger.warning('Failed to fetch large or iterable data from '
-                               'cache for key "%s": %s',
-                               full_cache_key, e)
-                results = UNSET
-        else:
-            logger.debug('Cache miss for key "%s"' % full_cache_key)
+                # We've logged specifics, but add some more context.
+                logger.error('Failed to fetch large or iterable data entry '
+                             'count from cache for key "%s". Rebuilding '
+                             'data. Error = %s',
+                             cache_context.full_cache_key, e)
+                chunk_count = UNSET
+
+            if chunk_count is not UNSET:
+                data_from_cache = True
+
+                try:
+                    results = _cache_iter_large_data(
+                        cache_context=cache_context,
+                        data=_cache_fetch_large_data(
+                            cache_context=cache_context,
+                            chunk_count=int(chunk_count)))
+                except Exception as e:
+                    logger.warning('Failed to fetch large or iterable data '
+                                   'from cache for key "%s": %s',
+                                   full_cache_key, e)
+                    results = UNSET
+            else:
+                # The value was not found in cache. It will need to be
+                # recomputed.
+                logger.debug('Cache miss for key "%s"',
+                             full_cache_key)
+
+        if results is UNSET:
+            data_from_cache = False
+
+            if callable(items_or_callable):
+                # NOTE: We don't want to catch exceptions here, since a
+                #       function may need to bubble exceptions up to a caller.
+                items = items_or_callable()
+            else:
+                items = items_or_callable
 
-    if results is UNSET:
-        data_from_cache = False
+            try:
+                results = _cache_store_items(cache_context=cache_context,
+                                             items=items)
+            except Exception as e:
+                # We've logged specifics. Log a general error.
+                #
+                # Note that this will only happen if an error occurs prior to
+                # yielding results. It shouldn't be cache backend-related.
+                logger.error('Failed to generate large or iterable cache data '
+                             'for key "%s". Newly-generated data will be '
+                             'returned but not cached. Error = %s',
+                             full_cache_key, e)
 
-        if callable(items_or_callable):
-            # NOTE: We don't want to catch exceptions here, since a function
-            #       may need to bubble exceptions up to a caller.
-            items = items_or_callable()
-        else:
-            items = items_or_callable
+                # Return the results as-is without caching.
+                results = items
 
+        # Yield the results to the caller.
         try:
-            results = _cache_store_items(cache_context=cache_context,
-                                         items=items)
+            yield from results
         except Exception as e:
-            # We've logged specifics. Log a general error.
-            #
-            # Note that this will only happen if an error occurs prior to
-            # yielding results. It shouldn't be cache backend-related.
-            logger.error('Failed to generate large or iterable cache data for '
-                         'key "%s". Newly-generated data will be returned but '
-                         'not cached. Error = %s',
-                         full_cache_key, e)
-
-            # Return the results as-is without caching.
-            results = items
-
-    # Yield the results to the caller.
-    try:
-        yield from results
-    except Exception as e:
-        if data_from_cache:
-            # This really shouldn't happen, since we've already fetched it
-            # above, but we'll log it just in case.
-            logger.error('Failed to return large or iterable cache data for '
-                         'key "%s". Something went wrong when returning '
-                         'processed results to the caller. Error = %s',
-                         full_cache_key, e)
-        else:
-            # The newly-generated results couldn't be cached. At this point,
-            # we should have yielded all data to the caller, and this should
-            # be the final error caught and raised at the end of that process.
-            logger.error('Failed to store or retrieve large or iterable '
-                         'cache data for key "%s". Newly-generated data '
-                         'will be returned but not cached. Error = %s',
-                         full_cache_key, e)
+            if data_from_cache:
+                # This really shouldn't happen, since we've already fetched it
+                # above, but we'll log it just in case.
+                logger.error('Failed to return large or iterable cache data '
+                             'for key "%s". Something went wrong when '
+                             'returning processed results to the caller. '
+                             'Error = %s',
+                             full_cache_key, e)
+            else:
+                # The newly-generated results couldn't be cached. At this
+                # point, we should have yielded all data to the caller, and
+                # this should be the final error caught and raised at the end
+                # of that process.
+                logger.error('Failed to store or retrieve large or iterable '
+                             'cache data for key "%s". Newly-generated data '
+                             'will be returned but not cached. Error = %s',
+                             full_cache_key, e)
+    finally:
+        # If there's an active write lock established, release it.
+        if lock and lock.locked():
+            lock.release()
 
 
 @deprecate_non_keyword_only_args(RemovedInDjblets70Warning)
@@ -1161,6 +1274,7 @@ def cache_memoize(
     use_generator: bool = False,
     use_encryption: (bool | None) = None,
     encryption_key: (bytes | None) = None,
+    lock: (CacheLock | None) = None,
 ) -> _T:
     """Memoize the results of a callable inside the configured cache.
 
@@ -1176,7 +1290,8 @@ def cache_memoize(
 
     Version Changed:
         5.3:
-        ``key`` may now be a sequence of string components of the key.
+        * ``key`` may now be a sequence of string components of the key.
+        * Added the ``lock`` argument.
 
     Version Changed:
         5.1:
@@ -1257,6 +1372,12 @@ def cache_memoize(
             Version Added:
                 3.0
 
+        lock (djblets.protect.locks.CacheLock, optional):
+            An optional cache lock to guard recomputing and writing data.
+
+            Version Added:
+                5.3
+
     Returns:
         object:
         The cached data, or the result of ``lookup_callable`` if uncached.
@@ -1272,6 +1393,7 @@ def cache_memoize(
             expiration=expiration,
             force_overwrite=force_overwrite,
             compress_large_data=compress_large_data,
+            lock=lock,
             use_encryption=use_encryption,
             encryption_key=encryption_key))
 
@@ -1285,50 +1407,61 @@ def cache_memoize(
             expiration=expiration,
             compress_large_data=compress_large_data,
             use_encryption=use_encryption,
-            encryption_key=encryption_key)
+            encryption_key=encryption_key,
+            lock=lock,
+        )
         full_cache_key = cache_context.full_cache_key
 
-        if not force_overwrite:
+        try:
+            if not force_overwrite:
+                try:
+                    result = cache_context.load_value_or_lock_for_write(
+                        full_cache_key)
+                except Exception:
+                    # We've already logged enough information for this. Proceed
+                    # to generate new data.
+                    result = UNSET
+
+                if result is not UNSET:
+                    return result
+
+                # The value was not found in cache. It will need to be
+                # recomputed.
+                logger.debug('Cache miss for key "%s"',
+                             full_cache_key)
+
+            data = lookup_callable()
+
+            # Most people will be using memcached, and memcached has a limit of
+            # 1MB. Data this big should be broken up somehow, so let's warn
+            # about this. Users should hopefully be using large_data=True which
+            # will handle this appropriately.
+            #
+            # If we do get here, we try to do some sanity checking.
+            # python-memcached will return a result in the case where the data
+            # exceeds the value size, which Django will then silently use to
+            # clear out the key. We won't know at all whether we had success
+            # unless we come back and try to verify the value.
+            #
+            # This check handles the common case of large string data being
+            # stored in cache. It's still possible to attempt to store large
+            # data structures (where len(data) might be something like '6' but
+            # the serialized value is huge), where this can still fail.
+            if (isinstance(data, str) and
+                len(data) >= CACHE_CHUNK_SIZE):
+                logger.warning('Cache data for key "%s" (length %s) may be '
+                               'too big for the cache.',
+                               full_cache_key, len(data))
+
             try:
-                result = cache_context.load_value(full_cache_key)
+                cache_context.store_value(data, key=full_cache_key)
             except Exception:
-                # We've already logged enough information for this. Proceed
-                # to generate new data.
-                result = UNSET
-
-            if result is not UNSET:
-                return result
-            else:
-                logger.debug('Cache miss for key "%s"' % full_cache_key)
-
-        data = lookup_callable()
-
-        # Most people will be using memcached, and memcached has a limit of
-        # 1MB. Data this big should be broken up somehow, so let's warn
-        # about this. Users should hopefully be using large_data=True which
-        # will handle this appropriately.
-        #
-        # If we do get here, we try to do some sanity checking.
-        # python-memcached will return a result in the case where the data
-        # exceeds the value size, which Django will then silently use to clear
-        # out the key. We won't know at all whether we had success unless we
-        # come back and try to verify the value.
-        #
-        # This check handles the common case of large string data being stored
-        # in cache. It's still possible to attempt to store large data
-        # structures (where len(data) might be something like '6' but the
-        # serialized value is huge), where this can still fail.
-        if (isinstance(data, str) and
-            len(data) >= CACHE_CHUNK_SIZE):
-            logger.warning('Cache data for key "%s" (length %s) may be too '
-                           'big for the cache.',
-                           full_cache_key, len(data))
-
-        try:
-            cache_context.store_value(data, key=full_cache_key)
-        except Exception:
-            # We've already caught and logged this error.
-            pass
+                # We've already caught and logged this error.
+                pass
+        finally:
+            # If there's an active write lock established, release it.
+            if lock and lock.locked():
+                lock.release()
 
         return data
 
diff --git a/djblets/cache/tests/test_backend.py b/djblets/cache/tests/test_backend.py
index fc37c249112ac6b78e981236cf8918b81a4165cb..3a8571756b5309255bf3fffb68fe18e3187653ed 100644
--- a/djblets/cache/tests/test_backend.py
+++ b/djblets/cache/tests/test_backend.py
@@ -5,6 +5,7 @@ from __future__ import annotations
 import inspect
 import pickle
 import re
+import time
 import zlib
 from typing import TYPE_CHECKING
 
@@ -20,11 +21,12 @@ from djblets.cache.backend import (CACHE_CHUNK_SIZE,
                                    cache_memoize_iter,
                                    make_cache_key,
                                    _get_default_encryption_key)
+from djblets.protect.locks import CacheLock
 from djblets.secrets.crypto import AES_BLOCK_SIZE, aes_decrypt, aes_encrypt
 from djblets.testing.testcases import TestCase
 
 if TYPE_CHECKING:
-    from collections.abc import Sequence
+    from collections.abc import Iterator, Sequence
 
 
 class BaseCacheTestCase(kgb.SpyAgency, TestCase):
@@ -37,15 +39,33 @@ class BaseCacheTestCase(kgb.SpyAgency, TestCase):
 
         cache.clear()
 
-    def build_test_chunk_data(self, num_chunks, extra_len=0,
-                              use_compression=False, use_encryption=False,
-                              encryption_key=None):
+    def build_test_chunk_data(
+        self,
+        *,
+        data_char: str = 'x',
+        num_chunks: int,
+        extra_len: int = 0,
+        use_compression: bool = False,
+        use_encryption: bool = False,
+        encryption_key: (bytes | None) = None,
+    ) -> tuple[str, bytes]:
         """Build enough test data to fill up the specified number of chunks.
 
         This takes into account the size of the pickle data, and will
         get us to exactly the specified number of chunks of data in the cache.
 
+        Version Changed:
+            5.3:
+            * Made all arguments keyword-only.
+            * Added the ``data_char`` argument.
+
         Args:
+            data_char (str, optional):
+                The character to use for the test value.
+
+                Version Added:
+                    5.3
+
             num_chunks (int):
                 The number of chunks to build.
 
@@ -65,18 +85,23 @@ class BaseCacheTestCase(kgb.SpyAgency, TestCase):
             tuple:
             A 2-tuple containing:
 
-            1. The raw generated data.
-            2. The resulting chunk data to store.
+            Tuple:
+                0 (str):
+                    The raw generated data.
+
+                1 (bytes):
+                    The resulting chunk data to store.
         """
         data_len = CACHE_CHUNK_SIZE * num_chunks - 3 * num_chunks + extra_len
 
         if use_encryption:
             data_len -= AES_BLOCK_SIZE
 
-        data = 'x' * data_len
+        data = data_char * data_len
 
         chunk_data = pickle.dumps(data, protocol=0)
-        self.assertTrue(chunk_data.startswith(b'Vxxxxxxxx'))
+        self.assertTrue(chunk_data.startswith(
+            b'V%s' % (data_char.encode('utf-8') * 8)))
 
         if use_compression:
             chunk_data = zlib.compress(chunk_data)
@@ -983,6 +1008,273 @@ class CacheMemoizeTests(BaseCacheTestCase):
         self.assertEqual(result, data)
         self.assertSpyCallCount(cache_func, 1)
 
+    def test_with_lock_and_in_cache(self) -> None:
+        """Testing cache_memoize with lock and data in cache"""
+        lock = CacheLock(timeout_secs=0.1,
+                         retry_secs=0.05)
+
+        self.spy_on(lock.acquire)
+        self.spy_on(lock.release)
+
+        cache_key = 'abc123'
+
+        def cache_func() -> str:
+            return 'new result'
+
+        self.spy_on(cache_func)
+
+        cache.set(make_cache_key(cache_key), 'existing result')
+
+        result = cache_memoize(cache_key,
+                               cache_func,
+                               lock=lock)
+        self.assertEqual(result, 'existing result')
+        self.assertSpyNotCalled(cache_func)
+
+        self.assertSpyNotCalled(lock.acquire)
+        self.assertSpyNotCalled(lock.release)
+        self.assertIsNone(cache.get(lock.full_cache_key))
+
+    def test_with_lock_and_cache_miss(self) -> None:
+        """Testing cache_memoize with lock and cache miss"""
+        lock = CacheLock(timeout_secs=0.1,
+                         retry_secs=0.05)
+
+        self.spy_on(lock.acquire)
+        self.spy_on(lock.release)
+
+        cache_key = 'abc123'
+        data = 'test123'
+
+        def cache_func() -> str:
+            self.assertEqual(cache.get(lock.full_cache_key), lock.token)
+
+            return data
+
+        self.spy_on(cache_func)
+
+        result = cache_memoize(cache_key,
+                               cache_func,
+                               lock=lock)
+        self.assertEqual(result, data)
+        self.assertSpyCallCount(cache_func, 1)
+
+        self.assertSpyCallCount(lock.acquire, 1)
+        self.assertSpyCallCount(lock.release, 1)
+        self.assertIsNone(cache.get(lock.full_cache_key))
+
+    def test_with_lock_and_in_cache_after_lock(self) -> None:
+        """Testing cache_memoize with lock and data in cache after lock"""
+        lock = CacheLock(timeout_secs=0.1,
+                         retry_secs=0.05)
+
+        def _lock_acquire(_self, *args, **kwargs) -> bool:
+            cache.set(make_cache_key(cache_key), 'existing result')
+
+            return lock.acquire.call_original(*args, **kwargs)
+
+        self.spy_on(lock.acquire, call_fake=_lock_acquire)
+        self.spy_on(lock.release)
+
+        cache_key = 'abc123'
+
+        def cache_func() -> str:
+            return 'new result'
+
+        self.spy_on(cache_func)
+
+        result = cache_memoize(cache_key,
+                               cache_func,
+                               lock=lock)
+        self.assertEqual(result, 'existing result')
+        self.assertSpyNotCalled(cache_func)
+
+        self.assertSpyCallCount(lock.acquire, 1)
+        self.assertSpyCallCount(lock.release, 1)
+        self.assertIsNone(cache.get(lock.full_cache_key))
+
+    def test_with_lock_and_wait_with_result(self) -> None:
+        """Testing cache_memoize with lock and waiting with result from cache
+        """
+        existing_lock = CacheLock(key='lock-key')
+        lock_key = existing_lock.full_cache_key
+
+        with existing_lock as locked:
+            self.assertTrue(locked)
+
+            new_lock = CacheLock(key='lock-key',
+                                 timeout_secs=0.1,
+                                 retry_secs=0.05)
+
+            def _lock_acquire(_self, *args, **kwargs) -> bool:
+                # Place a value in cache and release the lock so the new
+                # lock can be acquired.
+                cache.set(make_cache_key(cache_key), 'existing result')
+                existing_lock.release()
+
+                return new_lock.acquire.call_original(*args, **kwargs)
+
+            self.spy_on(new_lock.acquire, call_fake=_lock_acquire)
+            self.spy_on(new_lock.release)
+
+            cache_key = 'abc123'
+
+            def cache_func() -> str:
+                return 'new result'
+
+            self.spy_on(cache_func)
+
+            result = cache_memoize(cache_key,
+                                   cache_func,
+                                   lock=new_lock)
+            self.assertEqual(result, 'existing result')
+            self.assertSpyNotCalled(cache_func)
+
+            self.assertSpyCallCount(new_lock.acquire, 1)
+            self.assertSpyCallCount(new_lock.release, 1)
+
+            self.assertIsNone(cache.get(lock_key))
+
+        self.assertIsNone(cache.get(lock_key))
+
+    def test_with_lock_and_wait_with_timeout(self) -> None:
+        """Testing cache_memoize with lock and waiting with timeout"""
+        existing_lock = CacheLock(key='lock-key')
+        lock_key = existing_lock.full_cache_key
+
+        with existing_lock as locked:
+            self.assertTrue(locked)
+
+            new_lock = CacheLock(key='lock-key',
+                                 timeout_secs=0.1,
+                                 retry_secs=0.05)
+
+            self.spy_on(new_lock.acquire)
+            self.spy_on(new_lock.release)
+
+            cache_key = 'abc123'
+
+            def cache_func() -> str:
+                return 'new result'
+
+            self.spy_on(cache_func)
+
+            with self.assertLogs() as cm:
+                result = cache_memoize(cache_key,
+                                       cache_func,
+                                       lock=new_lock)
+
+            self.assertEqual(result, 'new result')
+            self.assertSpyCallCount(cache_func, 1)
+
+            self.assertSpyCallCount(new_lock.acquire, 1)
+            self.assertSpyNotCalled(new_lock.release)
+            self.assertSpyLastRaisedMessage(
+                new_lock.acquire,
+                TimeoutError,
+                'Timed out waiting for lock: example.com:lock-key')
+
+            self.assertEqual(cache.get(lock_key), existing_lock.token)
+
+            self.assertEqual(cm.output, [
+                f'WARNING:djblets.protect.locks:Timed out waiting for cache '
+                f'lock "example.com:lock-key" (token "{new_lock.token}") '
+                f'for 0.1 seconds',
+
+                f'ERROR:djblets.cache.backend:Timeout waiting on distributed '
+                f'cache lock {new_lock!r}',
+            ])
+
+        self.assertIsNone(cache.get(lock_key))
+
+    def test_with_lock_and_wait_with_timeout_and_value_in_cache(self) -> None:
+        """Testing cache_memoize with lock and waiting with timeout and new
+        value in cache
+        """
+        existing_lock = CacheLock(key='lock-key')
+        lock_key = existing_lock.full_cache_key
+
+        with existing_lock as locked:
+            self.assertTrue(locked)
+
+            new_lock = CacheLock(key='lock-key',
+                                 timeout_secs=0.1,
+                                 retry_secs=0.05)
+
+            def _lock_acquire(_self, *args, **kwargs) -> bool:
+                cache.set(make_cache_key(cache_key), 'existing result')
+
+                return new_lock.acquire.call_original(*args, **kwargs)
+
+            self.spy_on(new_lock.acquire, call_fake=_lock_acquire)
+            self.spy_on(new_lock.release)
+
+            cache_key = 'abc123'
+
+            def cache_func() -> str:
+                return 'new result'
+
+            self.spy_on(cache_func)
+
+            result = cache_memoize(cache_key,
+                                   cache_func,
+                                   lock=new_lock)
+            self.assertEqual(result, 'existing result')
+            self.assertSpyNotCalled(cache_func)
+
+            self.assertSpyCallCount(new_lock.acquire, 1)
+            self.assertSpyNotCalled(new_lock.release)
+            self.assertSpyLastRaisedMessage(
+                new_lock.acquire,
+                TimeoutError,
+                'Timed out waiting for lock: example.com:lock-key')
+
+            self.assertEqual(cache.get(lock_key), existing_lock.token)
+
+        self.assertIsNone(cache.get(lock_key))
+
+    def test_with_lock_and_wait_with_prev_lock_expired(self) -> None:
+        """Testing cache_memoize with lock and waiting with previous lock
+        expired
+        """
+        existing_lock = CacheLock(key='lock-key',
+                                  lock_expiration_secs=0.01)
+        lock_key = existing_lock.full_cache_key
+
+        time.sleep(0.015)
+
+        with existing_lock as locked:
+            self.assertTrue(locked)
+
+            new_lock = CacheLock(key='lock-key',
+                                 timeout_secs=0.1,
+                                 retry_secs=0.05)
+
+            self.spy_on(new_lock.acquire)
+            self.spy_on(new_lock.release)
+
+            cache_key = 'abc123'
+
+            def cache_func() -> str:
+                return 'new result'
+
+            self.spy_on(cache_func)
+
+            with self.assertNoLogs():
+                result = cache_memoize(cache_key,
+                                       cache_func,
+                                       lock=new_lock)
+
+            self.assertEqual(result, 'new result')
+            self.assertSpyCallCount(cache_func, 1)
+
+            self.assertSpyCallCount(new_lock.acquire, 1)
+            self.assertSpyCallCount(new_lock.release, 1)
+
+            self.assertIsNone(cache.get(lock_key))
+
+        self.assertIsNone(cache.get(lock_key))
+
 
 class CacheMemoizeIterTests(BaseCacheTestCase):
     """Unit tests for cache_memoize_iter."""
@@ -1271,6 +1563,332 @@ class CacheMemoizeIterTests(BaseCacheTestCase):
         self.assertEqual(len(results), 2)
         self.assertEqual(results, [data1, data2])
 
+    def test_with_lock_and_in_cache(self) -> None:
+        """Testing cache_memoize_iter with lock and data in cache"""
+        lock = CacheLock(timeout_secs=0.1,
+                         retry_secs=0.05)
+
+        self.spy_on(lock.acquire)
+        self.spy_on(lock.release)
+
+        cache_key = 'abc123'
+        data_yielded = []
+
+        data1 = self.build_test_chunk_data(num_chunks=2)[0]
+        data2 = self.build_test_chunk_data(num_chunks=2)[0]
+
+        def cache_func() -> Iterator[str]:
+            yield data1
+            data_yielded.append('data1')
+
+            yield data2
+            data_yielded.append('data2')
+
+        self.spy_on(cache_func)
+
+        cache.set(make_cache_key(cache_key), 'existing result')
+
+        results = list(cache_memoize_iter(cache_key,
+                                          cache_func,
+                                          compress_large_data=False,
+                                          lock=lock))
+        self.assertEqual(data_yielded, ['data1', 'data2'])
+        self.assertEqual(len(results), 2)
+        self.assertEqual(results, [data1, data2])
+
+        self.assertSpyCallCount(cache_func, 1)
+
+        self.assertSpyNotCalled(lock.acquire)
+        self.assertSpyNotCalled(lock.release)
+        self.assertIsNone(cache.get(lock.full_cache_key))
+
+    def test_with_lock_and_cache_miss(self) -> None:
+        """Testing cache_memoize_iter with lock and cache miss"""
+        lock = CacheLock(timeout_secs=0.1,
+                         retry_secs=0.05)
+
+        self.spy_on(lock.acquire)
+        self.spy_on(lock.release)
+
+        cache_key = 'abc123'
+        data_yielded = []
+
+        data1 = self.build_test_chunk_data(num_chunks=2)[0]
+        data2 = self.build_test_chunk_data(num_chunks=2)[0]
+
+        def cache_func() -> Iterator[str]:
+            self.assertEqual(cache.get(lock.full_cache_key), lock.token)
+
+            yield data1
+            data_yielded.append('data1')
+
+            yield data2
+            data_yielded.append('data2')
+
+        self.spy_on(cache_func)
+
+        results = list(cache_memoize_iter(cache_key,
+                                          cache_func,
+                                          compress_large_data=False,
+                                          lock=lock))
+        self.assertEqual(data_yielded, ['data1', 'data2'])
+        self.assertEqual(len(results), 2)
+        self.assertEqual(results, [data1, data2])
+
+        self.assertSpyCallCount(cache_func, 1)
+
+        self.assertSpyCallCount(lock.acquire, 1)
+        self.assertSpyCallCount(lock.release, 1)
+        self.assertIsNone(cache.get(lock.full_cache_key))
+
+    def test_with_lock_and_in_cache_after_lock(self) -> None:
+        """Testing cache_memoize_iter with lock and data in cache after lock
+        """
+        existing1 = self.build_test_chunk_data(data_char='z',
+                                               num_chunks=2)[0]
+        existing2 = self.build_test_chunk_data(data_char='z',
+                                               num_chunks=2)[0]
+
+        lock = CacheLock(timeout_secs=0.1,
+                         retry_secs=0.05)
+
+        def _lock_acquire(_self, *args, **kwargs) -> bool:
+            list(cache_memoize_iter(cache_key,
+                                    lambda: [existing1, existing2],
+                                    compress_large_data=False))
+
+            return lock.acquire.call_original(*args, **kwargs)
+
+        self.spy_on(lock.acquire, call_fake=_lock_acquire)
+        self.spy_on(lock.release)
+
+        cache_key = 'abc123'
+        data_yielded = []
+
+        data1 = self.build_test_chunk_data(num_chunks=2)[0]
+        data2 = self.build_test_chunk_data(num_chunks=2)[0]
+
+        def cache_func() -> Iterator[str]:
+            yield data1
+            data_yielded.append('data1')
+
+            yield data2
+            data_yielded.append('data2')
+
+        self.spy_on(cache_func)
+
+        results = list(cache_memoize_iter(cache_key,
+                                          cache_func,
+                                          compress_large_data=False,
+                                          lock=lock))
+        self.assertEqual(data_yielded, [])
+        self.assertEqual(len(results), 2)
+        self.assertEqual(results, [existing1, existing2])
+
+        self.assertSpyNotCalled(cache_func)
+
+        self.assertSpyCallCount(lock.acquire, 1)
+        self.assertSpyCallCount(lock.release, 1)
+        self.assertIsNone(cache.get(lock.full_cache_key))
+
+    def test_with_lock_and_wait_with_result(self) -> None:
+        """Testing cache_memoize_iter with lock and waiting with result
+        from cache
+        """
+        existing_lock = CacheLock(key='lock-key')
+        lock_key = existing_lock.full_cache_key
+
+        with existing_lock as locked:
+            self.assertTrue(locked)
+
+            existing1 = self.build_test_chunk_data(data_char='z',
+                                                   num_chunks=2)[0]
+            existing2 = self.build_test_chunk_data(data_char='z',
+                                                   num_chunks=2)[0]
+
+            new_lock = CacheLock(key='lock-key',
+                                 timeout_secs=0.1,
+                                 retry_secs=0.05)
+
+            def _lock_acquire(_self, *args, **kwargs) -> bool:
+                # Place a value in cache and release the lock so the new
+                # lock can be acquired.
+                list(cache_memoize_iter(cache_key,
+                                        lambda: [existing1, existing2],
+                                        compress_large_data=False))
+                existing_lock.release()
+
+                return new_lock.acquire.call_original(*args, **kwargs)
+
+            self.spy_on(new_lock.acquire, call_fake=_lock_acquire)
+            self.spy_on(new_lock.release)
+
+            cache_key = 'abc123'
+            data_yielded = []
+
+            data1 = self.build_test_chunk_data(num_chunks=2)[0]
+            data2 = self.build_test_chunk_data(num_chunks=2)[0]
+
+            def cache_func() -> Iterator[str]:
+                yield data1
+                data_yielded.append('data1')
+
+                yield data2
+                data_yielded.append('data2')
+
+            self.spy_on(cache_func)
+
+            results = list(cache_memoize_iter(cache_key,
+                                              cache_func,
+                                              compress_large_data=False,
+                                              lock=new_lock))
+            self.assertEqual(data_yielded, [])
+            self.assertEqual(len(results), 2)
+            self.assertEqual(results, [existing1, existing2])
+
+            self.assertSpyNotCalled(cache_func)
+
+            self.assertSpyCallCount(new_lock.acquire, 1)
+            self.assertSpyCallCount(new_lock.release, 1)
+
+            self.assertIsNone(cache.get(lock_key))
+
+        self.assertIsNone(cache.get(lock_key))
+
+    def test_with_lock_and_wait_with_timeout(self) -> None:
+        """Testing cache_memoize_iter with lock and waiting with timeout"""
+        existing_lock = CacheLock(key='lock-key')
+        lock_key = existing_lock.full_cache_key
+
+        with existing_lock as locked:
+            self.assertTrue(locked)
+
+            new_lock = CacheLock(key='lock-key',
+                                 timeout_secs=0.1,
+                                 retry_secs=0.05)
+
+            self.spy_on(new_lock.acquire)
+            self.spy_on(new_lock.release)
+
+            cache_key = 'abc123'
+            data_yielded = []
+
+            data1 = self.build_test_chunk_data(num_chunks=2)[0]
+            data2 = self.build_test_chunk_data(num_chunks=2)[0]
+
+            def cache_func() -> Iterator[str]:
+                yield data1
+                data_yielded.append('data1')
+
+                yield data2
+                data_yielded.append('data2')
+
+            self.spy_on(cache_func)
+
+            with self.assertLogs() as cm:
+                results = list(cache_memoize_iter(cache_key,
+                                                  cache_func,
+                                                  compress_large_data=False,
+                                                  lock=new_lock))
+
+            self.assertEqual(data_yielded, ['data1', 'data2'])
+            self.assertEqual(len(results), 2)
+            self.assertEqual(results, [data1, data2])
+
+            self.assertSpyCallCount(cache_func, 1)
+
+            self.assertSpyCallCount(new_lock.acquire, 1)
+            self.assertSpyNotCalled(new_lock.release)
+            self.assertSpyLastRaisedMessage(
+                new_lock.acquire,
+                TimeoutError,
+                'Timed out waiting for lock: example.com:lock-key')
+
+            self.assertEqual(cm.output, [
+                f'WARNING:djblets.protect.locks:Timed out waiting for cache '
+                f'lock "example.com:lock-key" (token "{new_lock.token}") '
+                f'for 0.1 seconds',
+
+                f'ERROR:djblets.cache.backend:Timeout waiting on distributed '
+                f'cache lock {new_lock!r}',
+            ])
+
+            self.assertEqual(cache.get(lock_key), existing_lock.token)
+
+        self.assertIsNone(cache.get(lock_key))
+
+    def test_with_lock_and_wait_with_timeout_and_value_in_cache(self) -> None:
+        """Testing cache_memoize_iter with lock and waiting with timeout and
+        new value in cache
+        """
+        existing_lock = CacheLock(key='lock-key')
+        lock_key = existing_lock.full_cache_key
+
+        with existing_lock as locked:
+            self.assertTrue(locked)
+
+            existing1 = self.build_test_chunk_data(data_char='z',
+                                                   num_chunks=2)[0]
+            existing2 = self.build_test_chunk_data(data_char='z',
+                                                   num_chunks=2)[0]
+
+            new_lock = CacheLock(key='lock-key',
+                                 timeout_secs=0.1,
+                                 retry_secs=0.05)
+
+            def _lock_acquire(_self, *args, **kwargs) -> bool:
+                # Place a value in cache and release the lock so the new
+                # lock can be acquired.
+                list(cache_memoize_iter(cache_key,
+                                        lambda: [existing1, existing2],
+                                        compress_large_data=False))
+
+                return new_lock.acquire.call_original(*args, **kwargs)
+
+            self.spy_on(new_lock.acquire, call_fake=_lock_acquire)
+            self.spy_on(new_lock.release)
+
+            cache_key = 'abc123'
+            data_yielded = []
+
+            def cache_func() -> Iterator[str]:
+                yield self.build_test_chunk_data(num_chunks=2)[0]
+                data_yielded.append('data1')
+
+            self.spy_on(cache_func)
+
+            with self.assertLogs() as cm:
+                results = list(cache_memoize_iter(cache_key,
+                                                  cache_func,
+                                                  compress_large_data=False,
+                                                  lock=new_lock))
+
+            self.assertEqual(data_yielded, [])
+            self.assertEqual(len(results), 2)
+            self.assertEqual(results, [existing1, existing2])
+
+            self.assertSpyNotCalled(cache_func)
+
+            self.assertSpyCallCount(new_lock.acquire, 1)
+            self.assertSpyNotCalled(new_lock.release)
+            self.assertSpyLastRaisedMessage(
+                new_lock.acquire,
+                TimeoutError,
+                'Timed out waiting for lock: example.com:lock-key')
+
+            self.assertEqual(cm.output, [
+                f'WARNING:djblets.protect.locks:Timed out waiting for cache '
+                f'lock "example.com:lock-key" (token "{new_lock.token}") '
+                f'for 0.1 seconds',
+
+                f'ERROR:djblets.cache.backend:Timeout waiting on distributed '
+                f'cache lock {new_lock!r}',
+            ])
+
+            self.assertEqual(cache.get(lock_key), existing_lock.token)
+
+        self.assertIsNone(cache.get(lock_key))
+
 
 class MakeCacheKeyTests(BaseCacheTestCase):
     """Unit tests for make_cache_key."""
diff --git a/djblets/protect/__init__.py b/djblets/protect/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..4779189dc4d4d84ae05f063e6cfa4abd765b7e1f
--- /dev/null
+++ b/djblets/protect/__init__.py
@@ -0,0 +1,5 @@
+"""Service protection.
+
+Version Added:
+    5.3
+"""
diff --git a/djblets/protect/locks.py b/djblets/protect/locks.py
new file mode 100644
index 0000000000000000000000000000000000000000..993119eb9e2eed7b2c2d50fa60e643f7a44158f0
--- /dev/null
+++ b/djblets/protect/locks.py
@@ -0,0 +1,440 @@
+"""Locking implementations.
+
+Version Added:
+    5.3
+"""
+
+from __future__ import annotations
+
+import logging
+import random
+import time
+from typing import TYPE_CHECKING
+from uuid import uuid4
+
+from django.core.cache import cache
+
+from djblets.cache.backend import make_cache_key
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+    from typing import Self
+
+
+logger = logging.getLogger(__name__)
+
+
+class CacheLock:
+    """A distributed lock backed by a cache.
+
+    This is used to acquire a lock associated with a key, either blocking or
+    immediately returning if a lock can't immediately be acquired.
+
+    Locks can be used by code that needs to run only once across multiple
+    processes or servers. They can be used directly or passed to
+    :py:func:`cache_memoize` or :py:func:`cache_memoize_iter` to wrap any
+    cache updates in a lock.
+
+    Locks have an expiration time in order to prevent deadlocks. Setting
+    this higher can result in longer wait periods if the lock fails to be
+    released. Setting it lower can result in locks automatically releasing
+    prematurely. It defaults to an expiration of 30 seconds.
+
+    The interface is kept compatible with :py:class:`threading.Lock`, but
+    with additional capabilities for updating expiration times and setting
+    default blocking and timeout behavior during construction.
+
+    Note:
+        This lock is subject to the limitations of the cache system. The
+        lock may be purged from cache without notice, and it's possible
+        for one client to overwrite another's lock depending on timing
+        issues or scaling setup.
+
+        For these reasons, this lock should be considered an imperfect,
+        lossy lock. Callers can use it to help prevent multiple operations
+        from occurring at once, but they should be tolerant of that
+        possibility and designed accordingly.
+
+        CacheLocks are also not thread-safe. Do not reuse the same lock
+        across threads.
+
+    Version Added:
+        5.3
+    """
+
+    ######################
+    # Instance variables #
+    ######################
+
+    #: Whether a lock is currently acquired by this instance.
+    acquired: bool
+
+    #: Whether this lock will block for a period of time to be acquired.
+    blocking: bool
+
+    #: The full cache key used for the lock.
+    #:
+    #: This may be set lazily after the lock is constructed, but must be
+    #: set prior to acquiring a lock.
+    full_cache_key: str
+
+    #: The max amount of time a lock can be claimed.
+    lock_expiration_secs: int
+
+    #: The time to sleep between checking for a lock to be released.
+    retry_secs: float
+
+    #: The max time to wait for a lock to be released.
+    #:
+    #: If -1, the lock will wait indefinitely.
+    timeout_secs: float
+
+    #: The cached token value associated with this lock instance.
+    token: str
+
+    #: The expected timestamp for lock expiration.
+    _lock_expires_time: float
+
+    def __init__(
+        self,
+        key: str | Sequence[str] = '',
+        *,
+        blocking: bool = True,
+        lock_expiration_secs: int = 30,
+        retry_secs: float = 0.25,
+        timeout_secs: float = -1,
+    ) -> None:
+        """Initialize the lock.
+
+        Args:
+            key (str or list of str):
+                The key to use in the cache.
+
+                This may be a sequence of strings, which will take care of
+                serializing each component to help avoid key injection
+                attacks.
+
+                This will be passed to :py:func:`make_cache_key` to construct
+                a full cache key.
+
+            blocking (bool, optional):
+                Whether this lock will block for a period of time to be
+                acquired.
+
+            lock_expiration_secs (int, optional):
+                The max amount of time a lock can be claimed.
+
+                After this period, the lock will be automatically released.
+
+            retry_secs (float, optional):
+                The time to sleep between checking for a lock to be released.
+
+                The caller should set this to be less than the timeout, but
+                note that timeouts can be extended or reduced by the lock
+                owner.
+
+                An additional jitter between 0-25% of the retry time will be
+                added to reduce stampede issues.
+
+            timeout_secs (float, optional):
+                The max time to wait for a lock to be released.
+
+                If -1, the lock will block indefinitely.
+
+        Raises:
+            ValueError:
+                A provided argument had an invalid value.
+        """
+        if timeout_secs != -1 and timeout_secs <= 0:
+            raise ValueError('timeout_secs must be -1 or a positive value.')
+
+        if retry_secs <= 0:
+            raise ValueError('retry_secs must be a positive value.')
+
+        self.blocking = blocking
+        self.retry_secs = retry_secs
+        self.timeout_secs = timeout_secs
+        self.lock_expiration_secs = lock_expiration_secs
+
+        self.acquired = False
+        self.token = ''
+        self._lock_expires_time = 0
+
+        if key:
+            self.full_cache_key = make_cache_key(key)
+        else:
+            self.full_cache_key = ''
+
+    def __del__(self) -> None:
+        """Handle destruction of the cache lock.
+
+        If this lock was garbage collected while still acquired, an error
+        will be logged indicating an implementation problem with the caller's
+        use of the lock.
+        """
+        if self.acquired:
+            logger.error('Cache lock "%s" was garbage collected without '
+                         'being released! The caller must be careful to '
+                         'keep this lock around until it is released.',
+                         self.full_cache_key)
+
+    def locked(self) -> bool:
+        """Return whether the lock is acquired.
+
+        This wraps :py:attr:`acquired`, and provides API compatibility with
+        :py:class:`threading.Lock` and other Python lock objects.
+
+        Returns:
+            bool:
+            ``True`` if the lock has been acquired. ``False`` if it has not.
+        """
+        return self.acquired
+
+    def acquire(
+        self,
+        blocking: (bool | None) = None,
+        timeout: (float | None) = None,
+    ) -> bool:
+        """Acquire a lock.
+
+        If there's already an existing lock in cache, this will either
+        return immediately or wait for the lock to be released, depending
+        on :py:attr:`blocking`.
+
+        If waiting, this will wait for a total time specified by
+        :py:attr:`timeout_secs`, checking every :py:attr:`retry_secs`.
+
+        Waiting uses the monotonic clock, so it's not affected by changes
+        to the system clock.
+
+        To ensure API compatibility with :py:class:`threading.Lock` and
+        other Python lock objects, this method can also take arguments that
+        override the values provided during construction.
+
+        Returns:
+            bool:
+            ``True`` if the lock could be acquired (even after waiting).
+            ``False`` if it could not (only if :py:attr:`blocking` is
+            ``False``).
+
+        Raises:
+            RuntimeError:
+                An attempt was made to acquire a lock that was already
+                acquired.
+
+            TimeoutError:
+                The lock could not be acquired due to the wait time expiring.
+
+            ValueError:
+                A cache key was never set for the lock.
+        """
+        full_cache_key = self.full_cache_key
+
+        if not full_cache_key:
+            raise ValueError('A cache key for the lock must be provided.')
+
+        if self.acquired:
+            raise RuntimeError(
+                'acquire() is not reentrant for %s instance %s'
+                % (type(self).__name__, id(self))
+            )
+
+        token = str(uuid4())
+        self.token = token
+
+        if blocking is None:
+            blocking = self.blocking
+
+        if timeout is None:
+            timeout = self.timeout_secs
+
+        retry_secs = self.retry_secs
+        lock_expiration_secs = self.lock_expiration_secs
+
+        start = time.monotonic()
+
+        while True:
+            if cache.add(full_cache_key, token, lock_expiration_secs):
+                lock_expires_time = time.monotonic() + lock_expiration_secs
+
+                logger.debug('Acquired cache lock "%s" with token "%s" for '
+                             '%s seconds (monotonic expiration = %s, '
+                             'estimated timestamp = %s)',
+                             full_cache_key, token, lock_expiration_secs,
+                             lock_expires_time,
+                             time.time() + lock_expiration_secs)
+
+                self.acquired = True
+                self._lock_expires_time = lock_expires_time
+
+                return True
+
+            if not blocking:
+                # The caller doesn't want to block waiting for the lock,
+                # so return immediately.
+                return False
+
+            # Check if we've timed out waiting for a lock.
+            if (timeout != -1 and
+                time.monotonic() - start >= timeout):
+                logger.warning('Timed out waiting for cache lock "%s" '
+                               '(token "%s") for %s seconds',
+                               self.full_cache_key, token, timeout)
+
+                raise TimeoutError(
+                    f'Timed out waiting for lock: {full_cache_key}'
+                )
+
+            # Wait before retrying, and add random jitter to the retry time
+            # to avoid overloading the cache server with concurrent checks
+            # during a stampede.
+            time.sleep(retry_secs + random.uniform(0, retry_secs * 0.25))
+
+    def update_expiration(
+        self,
+        lock_expiration_secs: (int | None) = None,
+    ) -> None:
+        """Update the expiration of the lock.
+
+        This can be used to keep the lock held a bit longer, in case
+        there's work in progress, or to shorten the lock's lifetime.
+
+        Args:
+            lock_expiration_secs (int, optional):
+                A specific number of seconds to set for the new expiration.
+
+                If not provided, the original expiration time in seconds
+                will be used.
+
+        Raises:
+            AssertionError:
+                The caller called this without first acquiring a lock.
+        """
+        if not self.acquired:
+            raise AssertionError(
+                'Cannot extend a lock that has not been acquired.'
+            )
+
+        if lock_expiration_secs is None:
+            lock_expiration_secs = self.lock_expiration_secs
+
+        key = self.full_cache_key
+        token = self.token
+
+        if cache.touch(key, lock_expiration_secs):
+            lock_expires_time = time.monotonic() + lock_expiration_secs
+
+            logger.debug('Extended cache lock "%s" (token "%s") to '
+                         '%s seconds (monotonic expiration = %s, '
+                         'estimated timestamp = %s)',
+                         key, token, lock_expiration_secs,
+                         lock_expires_time,
+                         time.time() + lock_expiration_secs)
+
+            self._lock_expires_time = lock_expires_time
+        else:
+            # The key fell out of cache. The lock is no longer valid.
+            logger.warning('Cache lock "%s" (token "%s") fell out of cache. '
+                           'The lock is no longer valid.',
+                           key, token)
+
+    def release(self) -> None:
+        """Release a lock.
+
+        If the lock is still valid, it will be removed from the cache,
+        allowing something else to acquire the lock.
+
+        If a lock is not acquired, this will raise an exception.
+
+        Raises:
+            RuntimeError:
+                This was called on a lock that was not acquired.
+        """
+        if not self.acquired:
+            raise RuntimeError('Cannot release a lock that was not acquired.')
+
+        self.acquired = False
+
+        key = self.full_cache_key
+        token = self.token
+        lock_expiration_secs = self.lock_expiration_secs
+
+        # First check if we're past the expiration window of the lock.
+        #
+        # If we're past the expiration window (or close enough, given clock
+        # inconsistencies on servers), then let the lock expire and don't
+        # try to delete it explicitly.
+        expired = time.monotonic() > self._lock_expires_time
+
+        if not expired:
+            # Attempt to bump the expiration for the key. If it timed out and
+            # we lost the key, then the worst that happens is the expiration
+            # for the new owner's key is bumped up. It should help avoid
+            # deleting that owner's key, in this case.
+            expired = not cache.touch(key, lock_expiration_secs)
+
+            if not expired and cache.get(key) == token:
+                # The lock is still acquired. Delete it.
+                cache.delete(key)
+
+        if expired:
+            logger.debug('Released cache lock "%s" (token "%s"), which '
+                         'already expired from cache',
+                         key, token)
+        else:
+            logger.debug('Released cache lock "%s" (token "%s")',
+                         key, token)
+
+    def __enter__(self) -> Self:
+        """Enter the context manager.
+
+        This will acquire the lock, if possible, and return this lock
+        instance as the context. Once the context manager is exited, the
+        lock will be released.
+
+        This is equivalent to calling :py:meth:`acquire`.
+
+        Context:
+            CacheLock:
+            The cache lock instance.
+
+        Raises:
+            TimeoutError:
+                The lock could not be acquired due to the wait time expiring.
+
+            ValueError:
+                A cache key was never set for the lock.
+        """
+        self.acquire()
+
+        return self
+
+    def __exit__(self, *args, **kwargs) -> None:
+        """Exit the context manager.
+
+        This will release the lock, if one was acquired.
+
+        Args:
+            *args (tuple, unused):
+                Unused positional arguments.
+
+            **kwargs (dict, unused):
+                Unused keyword arguments.
+        """
+        if self.acquired:
+            self.release()
+
+    def __repr__(self) -> str:
+        """Return a string representation of the cache lock object.
+
+        Returns:
+            str:
+            The string representation.
+        """
+        cls_name = type(self).__name__
+
+        return (
+            f'<{cls_name}(blocking={self.blocking!r},'
+            f' full_cache_key={self.full_cache_key!r},'
+            f' token={self.token!r})>'
+        )
diff --git a/djblets/protect/tests/test_cache_lock.py b/djblets/protect/tests/test_cache_lock.py
new file mode 100644
index 0000000000000000000000000000000000000000..4e50dfbb177338854ecad9a4fa0f03ff9052667b
--- /dev/null
+++ b/djblets/protect/tests/test_cache_lock.py
@@ -0,0 +1,370 @@
+"""Unit tests for djblets.protect.locks.CacheLock.
+
+Version Added:
+    5.3
+"""
+
+from __future__ import annotations
+
+import logging
+
+import kgb
+from django.core.cache import cache
+
+from djblets.protect.locks import CacheLock
+from djblets.testing.testcases import TestCase
+
+
+class CacheLockTests(kgb.SpyAgency, TestCase):
+    """Unit tests for CacheLock.
+
+    Version Added:
+        5.3
+    """
+
+    def setUp(self) -> None:
+        super().setUp()
+
+        cache.clear()
+
+    def tearDown(self) -> None:
+        super().tearDown()
+
+        cache.clear()
+
+    def test_acquire_with_new_lock(self) -> None:
+        """Testing CacheLock.acquire with new lock"""
+        lock = CacheLock(key='1D5BC5F3')
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            self.assertTrue(lock.acquire())
+
+        self.addCleanup(lock.release)
+
+        self.assertEqual(len(cm.output), 1)
+        self.assertRegex(
+            cm.output[0],
+            r'DEBUG:djblets.protect.locks:Acquired cache lock '
+            r'"example.com:1D5BC5F3" with token "[a-f0-9-]+" for 30 '
+            r'seconds \(monotonic expiration = \d+\.\d+, '
+            r'estimated timestamp = \d+\.\d+\)')
+
+        self.assertTrue(lock.acquired)
+        self.assertEqual(cache.get(lock.full_cache_key), lock.token)
+
+    def test_acquire_with_existing_lock_and_not_blocking(self) -> None:
+        """Testing CacheLock.acquire with existing lock and blocking=False"""
+        existing_lock = CacheLock(key='ACC54266')
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            self.assertTrue(existing_lock.acquire())
+
+        self.addCleanup(existing_lock.release)
+
+        self.assertEqual(len(cm.output), 1)
+        self.assertRegex(
+            cm.output[0],
+            r'DEBUG:djblets.protect.locks:Acquired cache lock '
+            r'"example.com:ACC54266" with token "[a-f0-9-]+" for 30 '
+            r'seconds \(monotonic expiration = \d+\.\d+, '
+            r'estimated timestamp = \d+\.\d+\)')
+
+        new_lock = CacheLock(key='ACC54266',
+                             blocking=False)
+        self.assertFalse(new_lock.blocking)
+
+        with self.assertNoLogs(level=logging.DEBUG):
+            self.assertFalse(new_lock.acquire())
+
+        self.assertFalse(new_lock.acquired)
+        self.assertEqual(cache.get(new_lock.full_cache_key),
+                         existing_lock.token)
+
+    def test_acquire_with_existing_lock_and_blocking_and_released(
+        self,
+    ) -> None:
+        """Testing CacheLock.acquire with existing lock and blocking=True and
+        lock released
+        """
+        existing_lock = CacheLock(key='72C7A1A3',
+                                  lock_expiration_secs=1)
+        new_lock = CacheLock(key='72C7A1A3')
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            self.assertTrue(existing_lock.acquire())
+
+        self.addCleanup(existing_lock.release)
+
+        self.assertEqual(len(cm.output), 1)
+        self.assertRegex(
+            cm.output[0],
+            r'DEBUG:djblets.protect.locks:Acquired cache lock '
+            r'"example.com:72C7A1A3" with token "[\a-z0-9-]+" for 1 '
+            r'seconds \(monotonic expiration = \d+\.\d+, '
+            r'estimated timestamp = \d+\.\d+\)')
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            self.assertTrue(new_lock.acquire())
+
+        self.addCleanup(new_lock.release)
+
+        self.assertEqual(len(cm.output), 1)
+        self.assertRegex(
+            cm.output[0],
+            r'DEBUG:djblets.protect.locks:Acquired cache lock '
+            r'"example.com:72C7A1A3" with token "[\a-z0-9-]+" for 30 '
+            r'seconds \(monotonic expiration = \d+\.\d+, '
+            r'estimated timestamp = \d+\.\d+\)')
+
+        self.assertTrue(new_lock.acquired)
+        self.assertEqual(cache.get(new_lock.full_cache_key),
+                         new_lock.token)
+
+    def test_acquire_with_existing_lock_and_blocking_and_timeout(self) -> None:
+        """Testing CacheLock.acquire with existing lock and blocking=True and
+        wait timeout reached
+        """
+        existing_lock = CacheLock(key='44A816B0')
+        existing_lock.acquire()
+        self.addCleanup(existing_lock.release)
+
+        new_lock = CacheLock(key='44A816B0',
+                             timeout_secs=0.05)
+
+        message = 'Timed out waiting for lock: example.com:44A816B0'
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            with self.assertRaisesMessage(TimeoutError, message):
+                self.assertFalse(new_lock.acquire())
+
+        self.assertEqual(
+            cm.output,
+            [
+                f'WARNING:djblets.protect.locks:Timed out waiting for cache '
+                f'lock "example.com:44A816B0" (token "{new_lock.token}") '
+                f'for 0.05 seconds',
+            ])
+
+        self.assertFalse(new_lock.acquired)
+        self.assertEqual(cache.get(new_lock.full_cache_key),
+                         existing_lock.token)
+
+    def test_acquire_with_blocking_false(self) -> None:
+        """Testing CacheLock.acquire with blocking=False overriding
+        constructor
+        """
+        existing_lock = CacheLock(key='DF71C7BC')
+        existing_lock.acquire()
+        self.addCleanup(existing_lock.release)
+
+        new_lock = CacheLock(key='DF71C7BC',
+                             blocking=True,
+                             timeout_secs=10)
+
+        with self.assertNoLogs(level=logging.DEBUG):
+            self.assertFalse(new_lock.acquire(blocking=False))
+
+        self.assertFalse(new_lock.acquired)
+        self.assertEqual(cache.get(new_lock.full_cache_key),
+                         existing_lock.token)
+
+    def test_release_with_acquired(self) -> None:
+        """Testing CacheLock.release with lock acquired"""
+        lock = CacheLock(key='F0E07CB8')
+        lock.acquire()
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            lock.release()
+
+        self.assertEqual(
+            cm.output,
+            [
+                f'DEBUG:djblets.protect.locks:Released cache lock '
+                f'"example.com:F0E07CB8" (token "{lock.token}")'
+            ])
+
+        self.assertFalse(lock.acquired)
+        self.assertIsNone(cache.get(lock.full_cache_key))
+
+    def test_release_with_not_acquired(self) -> None:
+        """Testing CacheLock.release with lock not acquired"""
+        lock = CacheLock(key='CC8E30F5')
+
+        message = 'Cannot release a lock that was not acquired.'
+
+        with self.assertNoLogs(level=logging.DEBUG):
+            with self.assertRaisesMessage(RuntimeError, message):
+                lock.release()
+
+        self.assertFalse(lock.acquired)
+        self.assertIsNone(cache.get(lock.full_cache_key))
+
+    def test_release_with_lock_token_replaced(self) -> None:
+        """Testing CacheLock.release with lock token replaced"""
+        lock = CacheLock(key='A49A3C30')
+        lock.acquire()
+
+        cache.set(lock.full_cache_key, 'XXX')
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            lock.release()
+
+        self.assertEqual(
+            cm.output,
+            [
+                f'DEBUG:djblets.protect.locks:Released cache lock '
+                f'"example.com:A49A3C30" (token "{lock.token}")'
+            ])
+
+        self.assertFalse(lock.acquired)
+        self.assertEqual(cache.get(lock.full_cache_key), 'XXX')
+
+    def test_release_with_held_beyond_expiration(self) -> None:
+        """Testing CacheLock.release with lock held beyond epiration"""
+        lock = CacheLock(key='4BB55758',
+                         lock_expiration_secs=1)
+        lock.acquire()
+        lock._lock_expires_time -= 100
+
+        self.spy_on(cache.touch)
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            lock.release()
+
+        self.assertEqual(
+            cm.output,
+            [
+                f'DEBUG:djblets.protect.locks:Released cache lock '
+                f'"example.com:4BB55758" (token "{lock.token}"), which '
+                f'already expired from cache',
+            ])
+
+        self.assertFalse(lock.acquired)
+        self.assertSpyNotCalled(cache.touch)
+
+        # We're simulating that a new key may be in cache at the end of
+        # this.
+        self.assertIsNotNone(cache.get(lock.full_cache_key))
+
+    def test_release_with_lost_key(self) -> None:
+        """Testing CacheLock.release with lock key not in cache"""
+        lock = CacheLock(key='84E054E5')
+        lock.acquire()
+
+        self.spy_on(cache.touch, op=kgb.SpyOpReturn(False))
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            lock.release()
+
+        self.assertEqual(
+            cm.output,
+            [
+                f'DEBUG:djblets.protect.locks:Released cache lock '
+                f'"example.com:84E054E5" (token '
+                f'"{lock.token}"), which already expired from cache',
+            ])
+
+        self.assertFalse(lock.acquired)
+
+        self.assertSpyCalledWith(
+            cache.touch,
+            'example.com:84E054E5',
+            timeout=30)
+
+        # We're simulating that a new key may be in cache at the end of
+        # this.
+        self.assertIsNotNone(cache.get(lock.full_cache_key))
+
+    def test_update_expiration(self) -> None:
+        """Testing CacheLock.update_expiration"""
+        self.spy_on(cache.touch)
+
+        lock = CacheLock(key='E977344F')
+        lock.acquire()
+        self.addCleanup(lock.release)
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            lock.update_expiration()
+
+        self.assertEqual(len(cm.output), 1)
+        self.assertRegex(
+            cm.output[0],
+            r'DEBUG:djblets.protect.locks:Extended cache lock '
+            r'"example.com:E977344F" \(token "[a-f0-9-]+"\) to 30 '
+            r'seconds \(monotonic expiration = \d+\.\d+, estimated '
+            r'timestamp = \d+\.\d+\)')
+
+        self.assertSpyCalledWith(
+            cache.touch,
+            'example.com:E977344F',
+            timeout=30)
+
+    def test_update_expiration_with_custom_time(self) -> None:
+        """Testing CacheLock.update_expiration with custom expiration time"""
+        self.spy_on(cache.touch)
+
+        lock = CacheLock(key='587961F7')
+        lock.acquire()
+        self.addCleanup(lock.release)
+
+        with self.assertLogs(level=logging.DEBUG) as cm:
+            lock.update_expiration(100)
+
+        self.assertEqual(len(cm.output), 1)
+        self.assertRegex(
+            cm.output[0],
+            r'DEBUG:djblets.protect.locks:Extended cache lock '
+            r'"example.com:587961F7" \(token "[a-f0-9-]+"\) to 100 '
+            r'seconds \(monotonic expiration = \d+\.\d+, estimated '
+            r'timestamp = \d+\.\d+\)')
+
+        self.assertSpyCalledWith(
+            cache.touch,
+            'example.com:587961F7',
+            timeout=100)
+
+    def test_update_expiration_with_lost_lock(self) -> None:
+        """Testing CacheLock.update_expiration with lost lock"""
+        self.spy_on(cache.touch, op=kgb.SpyOpReturn(False))
+
+        lock = CacheLock(key='47714EF6')
+        lock.acquire()
+        self.addCleanup(lock.release)
+
+        with self.assertLogs() as cm:
+            lock.update_expiration()
+
+        self.assertEqual(
+            cm.output,
+            [
+                f'WARNING:djblets.protect.locks:Cache lock '
+                f'"example.com:47714EF6" (token "{lock.token}") fell '
+                f'out of cache. The lock is no longer valid.',
+            ])
+
+        self.assertSpyCalledWith(
+            cache.touch,
+            'example.com:47714EF6',
+            timeout=30)
+
+    def test_context_manager(self) -> None:
+        """Testing CacheLock as context manager"""
+        with CacheLock(key='6E58D95B') as lock:
+            self.assertTrue(lock.acquired)
+            self.assertEqual(cache.get(lock.full_cache_key), lock.token)
+
+        self.assertIsNone(cache.get(lock.full_cache_key))
+
+    def test_del_while_acquired(self) -> None:
+        """Testing CacheLock destruction while lock acquired"""
+        lock = CacheLock(key='ADAEAFA1')
+        lock.acquire()
+
+        with self.assertLogs() as cm:
+            del lock
+
+        self.assertEqual(cm.output, [
+            'ERROR:djblets.protect.locks:Cache lock "example.com:ADAEAFA1" '
+            'was garbage collected without being released! The caller must '
+            'be careful to keep this lock around until it is released.'
+        ])
diff --git a/docs/djblets/coderef/index.rst b/docs/djblets/coderef/index.rst
index 005c4e2d496619158b2bd8030f1a80eb1730df23..0d8d49a57416c58198b0ce9a55d549cbc37ac088 100644
--- a/docs/djblets/coderef/index.rst
+++ b/docs/djblets/coderef/index.rst
@@ -360,6 +360,19 @@ Privacy Protection
    :ref:`privacy-guides`
 
 
+.. _coderef-djblets-protect:
+
+
+Service Protection
+==================
+
+.. autosummary::
+   :toctree: python
+
+   djblets.protect
+   djblets.protect.locks
+
+
 .. _coderef-djblets-recaptcha:
 
 reCAPTCHA
