diff --git a/djblets/secrets/crypto.py b/djblets/secrets/crypto.py
index 1437e9471d481bd6e1406cc16b185e556a9b8d88..7fe5a918b57e1bd2aa2bdd5002699c7431dbe809 100644
--- a/djblets/secrets/crypto.py
+++ b/djblets/secrets/crypto.py
@@ -15,7 +15,7 @@ from django.conf import settings
 AES_BLOCK_SIZE = algorithms.AES.block_size // 8
 
 
-def _create_cipher(iv, key):
+def _create_cipher(iv, *, key=None):
     """Create a cipher for use in symmetric encryption/decryption.
 
     This will use AES encryption in CFB mode (using an 8-bit shift register)
@@ -28,7 +28,7 @@ def _create_cipher(iv, key):
         iv (bytes):
             The random IV to use for the cipher.
 
-        key (bytes):
+        key (bytes, optional):
             The encryption key to use.
 
     Returns:
@@ -39,6 +39,9 @@ def _create_cipher(iv, key):
         ValueError:
             The encryption key was not in the right format.
     """
+    if key is None:
+        key = get_default_aes_encryption_key()
+
     if not isinstance(key, bytes):
         raise TypeError('The encryption key must be of type "bytes", not "%s"'
                         % type(key))
@@ -96,7 +99,7 @@ def aes_encrypt(data, *, key=None):
         data = data.encode('utf-8')
 
     iv = os.urandom(AES_BLOCK_SIZE)
-    cipher = _create_cipher(iv, key or get_default_aes_encryption_key())
+    cipher = _create_cipher(iv, key=key)
     encryptor = cipher.encryptor()
 
     return iv + encryptor.update(data) + encryptor.finalize()
@@ -132,6 +135,66 @@ def aes_encrypt_base64(data, *, key=None):
     return base64.b64encode(aes_encrypt(data, key=key)).decode('utf-8')
 
 
+def aes_encrypt_iter(data_iter, *, key=None):
+    """Encrypt and yield data iteratively.
+
+    This iterates through an iterable (a generator, list, or similar),
+    yielding AES-encrypted batches of data. This can be used when streaming
+    a source and yielding encrypted data to a file, HTTP response, across
+    multiple cache keys, etc.
+
+    The result can be decrypted either by joining together all the results
+    or by passing the results to :py:func:`aes_decrypt_iter`.
+
+    Args:
+        data_iter (iterable):
+            An iterator that yields byte strings or Unicode strings.
+
+        key (bytes, optional):
+            The optional custom encryption key to use. If not supplied, the
+            default encryption key (from
+            :py:func:`get_default_aes_encryption_key)` will be used.
+
+    Yields:
+        bytes:
+        An encrypted block of data.
+
+    Raises:
+        ValueError:
+            The encryption key was not in the right format.
+    """
+    iv = os.urandom(AES_BLOCK_SIZE)
+    cipher = _create_cipher(iv, key=key)
+    encryptor = cipher.encryptor()
+
+    # We want the very first value to contain the iv, and the very last to
+    # contain the finalizer. So we need to operate one ahead of the item from
+    # the stream. We'll iterate through, grab an item, then yield the previous
+    # one (prepending the iv if yielding the very first).
+    prev_item = None
+
+    for item in data_iter:
+        if isinstance(item, str):
+            item = item.encode('utf-8')
+
+        encrypted_item = encryptor.update(item)
+
+        if prev_item is not None:
+            yield prev_item
+
+            prev_item = encrypted_item
+        else:
+            prev_item = iv + encrypted_item
+
+    # We can now follow up with the finalizer.
+    if prev_item is not None:
+        yield prev_item + encryptor.finalize()
+    else:
+        # We had absolutely nothing to yield. Let's just yield an empty
+        # encrypted block.
+        yield iv + encryptor.finalize()
+
+
 def aes_decrypt(encrypted_data, *, key=None):
     """Decrypt AES-encrypted data.
 
@@ -169,7 +232,7 @@ def aes_decrypt(encrypted_data, *, key=None):
                         % (type(encrypted_data)))
 
     cipher = _create_cipher(encrypted_data[:AES_BLOCK_SIZE],
-                            key or get_default_aes_encryption_key())
+                            key=key)
     decryptor = cipher.decryptor()
 
     return (decryptor.update(encrypted_data[AES_BLOCK_SIZE:]) +
@@ -207,3 +270,72 @@ def aes_decrypt_base64(encrypted_data, *, key=None):
                     key=key)
         .decode('utf-8')
     )
+
+
+def aes_decrypt_iter(encrypted_iter, *, key=None):
+    """Decrypt and yield data iteratively.
+
+    This iterates through an iterable (a generator, list, or similar),
+    decrypting items and yielding the decrypted values. This can be used when
+    streaming an encrypted source and yielding the decrypted results to a file,
+    HTTP response, across multiple cache keys, etc.
+
+    Args:
+        encrypted_iter (iterable):
+            An iterator that yields AES-encrypted data as byte strings.
+
+        key (bytes, optional):
+            The optional custom encryption key to use. If not supplied, the
+            default encryption key (from
+            :py:func:`get_default_aes_encryption_key)` will be used.
+
+    Yields:
+        bytes:
+        A decrypted block of data.
+
+    Raises:
+        ValueError:
+            The encryption key was not in the right format or the encrypted
+            data was invalid.
+    """
+    # Ensure we're working with an actual iterator now, not just something
+    # iterable. We need to ensure we're not starting iteration over when we
+    # loop through a second time.
+    encrypted_iter = iter(encrypted_iter)
+
+    # We need to read enough to get the IV (the Initialization Vector at the
+    # start of the encrypted data). We'll keep yielding items until we find
+    # it, or until we have nothing left to read.
+    #
+    # If we don't receive enough for the IV, cryptography will raise a
+    # ValueError below (same as when passing an incomplete payload to
+    # aes_decrypt()).
+    iv_buf = b''
+
+    for item in encrypted_iter:
+        iv_buf += item
+
+        if len(iv_buf) >= AES_BLOCK_SIZE:
+            break
+
+    # Create the cipher as normal.
+    cipher = _create_cipher(iv_buf[:AES_BLOCK_SIZE],
+                            key=key)
+    decryptor = cipher.decryptor()
+
+    # Start with the data after the IV, and then go through the iterator.
+    # Like with aes_encrypt_iter(), we're going to fetch the next item and
+    # *then* yield the previous item. The reason is that we want to ensure
+    # the finalizer is part of the last item yielded.
+    prev_item = iv_buf[AES_BLOCK_SIZE:] or None
+
+    for item in encrypted_iter:
+        if prev_item is not None:
+            yield decryptor.update(prev_item)
+
+        prev_item = item
+
+    if prev_item is not None:
+        yield decryptor.update(prev_item) + decryptor.finalize()
+    else:
+        yield decryptor.finalize()
diff --git a/djblets/secrets/tests/test_crypto_utils.py b/djblets/secrets/tests/test_crypto_utils.py
index 65df41096cd34d1b4f17c9da393f4b657768cf45..aa85b69ac13c43b4828dc348880e5ab4b664da03 100644
--- a/djblets/secrets/tests/test_crypto_utils.py
+++ b/djblets/secrets/tests/test_crypto_utils.py
@@ -1,11 +1,15 @@
 """Unit tests for djblets.secrets.crypto."""
 
+import inspect
+
 from django.test.utils import override_settings
 
 from djblets.secrets.crypto import (aes_decrypt,
                                     aes_decrypt_base64,
+                                    aes_decrypt_iter,
                                     aes_encrypt,
                                     aes_encrypt_base64,
+                                    aes_encrypt_iter,
                                     get_default_aes_encryption_key)
 from djblets.testing.testcases import TestCase
 
@@ -107,6 +111,148 @@ class AESDecryptBase64(BaseAESTestCase):
         self.assertEqual(decrypted, self.PASSWORD_UNICODE)
 
 
+class AESDecryptIterTests(BaseAESTestCase):
+    """Unit tests for aes_decrypt_iter."""
+
+    def test_with_bytes(self):
+        """Testing aes_decrypt_iter with byte string"""
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        def _gen_data():
+            yield (b'\xfd\x1c\xccF\xe8\xa8\xe3\x94`\xfa\xcf\xc7\x11\xeabD'
+                   b'\xdft\xfa;Y\x06\xb2\xb3\xae\x10#"kR\x13')
+            yield b'\xc7\x17/\x02w\xe0\xe4\xa32L/\x9d\r\xd1v\xa2\xec\xb41\x82'
+            yield (b'\xa8\x04\xa9M\t\xac\x92\x9d|\xc0\xb3\xa4\x1f+\xab\x0c\t'
+                   b'\xc4\x80\x8c')
+            yield b'*'
+
+        decrypted = aes_decrypt_iter(_gen_data())
+
+        self.assertTrue(inspect.isgenerator(decrypted))
+        self.assertEqual(b''.join(decrypted),
+                         b'this is a test\n'
+                         b'and this is another\n'
+                         b'hey look three tests!')
+
+    def test_with_list(self):
+        """Testing aes_decrypt_iter with list"""
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        decrypted = aes_decrypt_iter([
+            (b'\xfd\x1c\xccF\xe8\xa8\xe3\x94`\xfa\xcf\xc7\x11\xeabD'
+             b'\xdft\xfa;Y\x06\xb2\xb3\xae\x10#"kR\x13'),
+
+            b'\xc7\x17/\x02w\xe0\xe4\xa32L/\x9d\r\xd1v\xa2\xec\xb41\x82',
+
+            (b'\xa8\x04\xa9M\t\xac\x92\x9d|\xc0\xb3\xa4\x1f+\xab\x0c\t'
+             b'\xc4\x80\x8c'),
+
+            b'*',
+        ])
+
+        self.assertTrue(inspect.isgenerator(decrypted))
+        self.assertEqual(b''.join(decrypted),
+                         b'this is a test\n'
+                         b'and this is another\n'
+                         b'hey look three tests!')
+
+    def test_with_empty_iter(self):
+        """Testing aes_encrypt_iter with empty iterator"""
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        decrypted = aes_decrypt_iter([])
+
+        self.assertTrue(inspect.isgenerator(decrypted))
+
+        with self.assertRaisesMessage(ValueError,
+                                      'Invalid IV size (0) for CFB8.'):
+            list(decrypted)
+
+    def test_with_empty_str(self):
+        """Testing aes_encrypt_iter with only empty string from iterator"""
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        decrypted = aes_decrypt_iter([b''])
+
+        self.assertTrue(inspect.isgenerator(decrypted))
+
+        with self.assertRaisesMessage(ValueError,
+                                      'Invalid IV size (0) for CFB8.'):
+            list(decrypted)
+
+    def test_with_iv_data_split(self):
+        """Testing aes_encrypt_iter with IV and encrypted data split across
+        items
+        """
+        def _gen_data():
+            data = (
+                b'\xfd\x1c\xccF\xe8\xa8\xe3\x94`\xfa\xcf\xc7\x11\xeabD\xdft'
+                b'\xfa;Y\x06\xb2\xb3\xae\x10#"kR\x13\xc7\x17/\x02w\xe0\xe4'
+                b'\xa32L/\x9d\r\xd1v\xa2\xec\xb41\x82\xa8\x04\xa9M\t\xac\x92'
+                b'\x9d|\xc0\xb3\xa4\x1f+\xab\x0c\t\xc4\x80\x8c*'
+            )
+
+            # Iterating over a byte string yields integers, unless we slice.
+            # Hence needing to do this.
+            yield from (
+                data[i:i + 1]
+                for i in range(len(data))
+            )
+
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        decrypted = aes_decrypt_iter(_gen_data())
+
+        self.assertTrue(inspect.isgenerator(decrypted))
+        self.assertEqual(b''.join(decrypted),
+                         b'this is a test\n'
+                         b'and this is another\n'
+                         b'hey look three tests!')
+
+    def test_with_just_iv_finalizer(self):
+        """Testing aes_encrypt_iter with only IV + finalizer in encypted data
+        """
+        def _gen_data():
+            yield b'\xa9;\xb1)b\xaeSF.S\xb4\x8f\x8d\xecT\xfb'
+
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        decrypted = aes_decrypt_iter(_gen_data())
+
+        self.assertTrue(inspect.isgenerator(decrypted))
+        self.assertEqual(b''.join(decrypted),
+                         b'')
+
+    def test_with_custom_key(self):
+        """Testing aes_decrypt_iter with custom key"""
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        def _gen_data():
+            yield (b'\xbbq\xb8w\xf8s!\x10E4+\x89^0\x00\x04A\xcc\x82"\xa7\xc6'
+                   b'\xe0\xfe\x93M\x0b\xdc\xdf\xe1\xfb')
+            yield (b'?\x06\xef\x94\xda\x88\xb7\xd0\xbfM\xc8@\x80\xff\xde\x8a'
+                   b'\xc4\xab\xb9h')
+            yield (b'B\x97\x82am\x9b\x8d\xf0\x8c\x10\xfa\xb1\xf7\xc7\x9c\x97'
+                   b'9ZbA')
+            yield b'\x01'
+
+        decrypted = aes_decrypt_iter(_gen_data(),
+                                     key=self.CUSTOM_KEY)
+
+        self.assertTrue(inspect.isgenerator(decrypted))
+        self.assertEqual(b''.join(decrypted),
+                         b'this is a test\n'
+                         b'and this is another\n'
+                         b'hey look three tests!')
+
+
 class AESEncryptTests(BaseAESTestCase):
     """Unit tests for aes_encrypt."""
 
@@ -177,6 +323,94 @@ class AESEncryptBase64(BaseAESTestCase):
         self.assertEqual(decrypted, self.PASSWORD_UNICODE)
 
 
+class AESEncryptIterTests(BaseAESTestCase):
+    """Unit tests for aes_encrypt_iter."""
+
+    def test_with_bytes(self):
+        """Testing aes_encrypt_iter with byte string"""
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        def _gen_data():
+            yield b'this is a test\n'
+            yield b'and this is another\n'
+            yield b'hey look three tests'
+            yield b'!'
+
+        encrypted = aes_encrypt_iter(_gen_data())
+
+        self.assertTrue(inspect.isgenerator(encrypted))
+        self.assertEqual(aes_decrypt(b''.join(encrypted)),
+                         b'this is a test\n'
+                         b'and this is another\n'
+                         b'hey look three tests!')
+
+    def test_with_unicode(self):
+        """Testing aes_encrypt_iter with Unicode string"""
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        def _gen_data():
+            yield 'this is a test\n'
+            yield 'and this is another\n'
+            yield 'hey look three tests'
+            yield '!'
+
+        encrypted = aes_encrypt_iter(_gen_data())
+
+        self.assertTrue(inspect.isgenerator(encrypted))
+        self.assertEqual(aes_decrypt(b''.join(encrypted)),
+                         b'this is a test\n'
+                         b'and this is another\n'
+                         b'hey look three tests!')
+
+    def test_with_list(self):
+        """Testing aes_encrypt_iter with list"""
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        encrypted = aes_encrypt_iter([
+            b'this is a test\n',
+            b'and this is another\n',
+            b'hey look three tests',
+            b'!',
+        ])
+
+        self.assertTrue(inspect.isgenerator(encrypted))
+        self.assertEqual(aes_decrypt(b''.join(encrypted)),
+                         b'this is a test\n'
+                         b'and this is another\n'
+                         b'hey look three tests!')
+
+    def test_with_empty_iter(self):
+        """Testing aes_encrypt_iter with empty iterator"""
+        encrypted = aes_encrypt_iter([])
+
+        self.assertTrue(inspect.isgenerator(encrypted))
+        self.assertEqual(aes_decrypt(b''.join(encrypted)),
+                         b'')
+
+    def test_with_custom_key(self):
+        """Testing aes_encrypt_iter with custom key"""
+        # The encrypted value will change every time, since the iv changes,
+        # so we can't compare a direct value. Instead, we need to ensure that
+        # we can decrypt what we encrypt.
+        def _gen_data():
+            yield b'this is a test\n'
+            yield b'and this is another\n'
+            yield b'hey look three tests'
+            yield b'!'
+
+        encrypted = aes_encrypt_iter(_gen_data(), key=self.CUSTOM_KEY)
+        decrypted = aes_decrypt(b''.join(encrypted), key=self.CUSTOM_KEY)
+
+        self.assertIsInstance(decrypted, bytes)
+        self.assertEqual(decrypted,
+                         b'this is a test\n'
+                         b'and this is another\n'
+                         b'hey look three tests!')
+
+
 class GetDefaultAESEncryptionKeyTests(BaseAESTestCase):
     """Unit tests for get_default_aes_encryption_key."""
 
