Redis Stream

Redis Streams là một cấu trúc dữ liệu mạnh mẽ trong Redis, được giới thiệu từ phiên bản 5.0, dùng để quản lý luồng dữ liệu theo kiểu append-only log. Nó rất phù hợp cho các ứng dụng cần xử lý sự kiện thời gian thực, hàng đợi tin nhắn, hoặc lưu trữ lịch sử dữ liệu. So với Redis Pub/Sub, Streams cung cấp khả năng lưu trữ tin nhắn, xử lý nhóm người tiêu dùng (consumer groups), và đọc lại dữ liệu khi cần.

Dưới đây là hướng dẫn chi tiết và dễ hiểu về Redis Streams, kèm ví dụ bằng Java sử dụng thư viện Jedis.


1. Redis Streams là gì?

  • Streams là một danh sách các mục (entry) được thêm vào theo thứ tự thời gian, mỗi mục có một ID duy nhất và chứa dữ liệu dạng key-value.

  • Nó hỗ trợ:

    • Thêm dữ liệu vào luồng (stream).

    • Đọc dữ liệu từ luồng theo nhiều cách (từ đầu, từ ID cụ thể, hoặc dữ liệu mới).

    • Consumer Groups: Cho phép nhiều consumer xử lý tin nhắn theo kiểu phân phối công việc, tương tự Kafka.

    • Xác nhận tin nhắn: Consumer có thể xác nhận (ack) tin nhắn đã xử lý.

  • Không như Pub/Sub (mất tin nhắn nếu không có subscriber), Streams lưu trữ dữ liệu cho đến khi bạn xóa hoặc giới hạn kích thước.


2. Các khái niệm chính

  • Entry: Một tin nhắn trong stream, gồm:

    • ID: Định danh duy nhất, thường là <timestamp>-<sequence> (ví dụ: 1698765432123-0). Có thể tự chỉ định hoặc dùng * để Redis tự tạo.

    • Data: Dữ liệu dạng key-value (ví dụ: user: "Alice", message: "Hello").

  • Stream: Một danh sách các entry, được xác định bởi key trong Redis (ví dụ: mystream).

  • Consumer Group: Một nhóm consumer đọc và xử lý tin nhắn từ stream, đảm bảo mỗi tin nhắn chỉ được xử lý bởi một consumer trong nhóm.

  • Pending Entry List (PEL): Danh sách các tin nhắn đã được gửi đến consumer nhưng chưa được xác nhận (ack).


3. Các lệnh cơ bản

  • XADD key ID field value [field value ...]: Thêm entry vào stream.

    XADD mystream * user Alice message "Xin chào"

    (* để Redis tự tạo ID dựa trên thời gian).

  • XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]: Đọc dữ liệu từ stream.

    XREAD STREAMS mystream 0

    (Đọc tất cả entry từ đầu stream).

  • XGROUP CREATE key groupname ID: Tạo consumer group.

    XGROUP CREATE mystream mygroup $

    ($ nghĩa là bắt đầu từ tin nhắn mới nhất).

  • XREADGROUP GROUP groupname consumername [COUNT count] STREAMS key [key ...] ID: Đọc tin nhắn từ stream cho consumer trong nhóm.

    XREADGROUP GROUP mygroup consumer1 STREAMS mystream >

    (> để lấy tin nhắn mới chưa được xử lý).

  • XACK key groupname ID: Xác nhận tin nhắn đã được xử lý.

    XACK mystream mygroup 1698765432123-0
  • XDEL key ID: Xóa entry theo ID.

  • XTRIM key MAXLEN count: Giới hạn số entry trong stream.

    XTRIM mystream MAXLEN 1000
  • XLEN key: Đếm số entry trong stream.

  • XRANGE key start end [COUNT count]: Lấy các entry trong khoảng ID.

    XRANGE mystream - +

    (-+ là ID nhỏ nhất và lớn nhất).


4. So sánh với Pub/Sub

Tính năng
Pub/Sub
Streams

Lưu trữ tin nhắn

Không (fire-and-forget)

Có (persistent)

Đọc lại dữ liệu

Không

Có (dựa trên ID)

Consumer Groups

Không

Có (phân phối tin nhắn)

Xác nhận xử lý

Không

Có (ACK)

Use case

Chat, thông báo thời gian thực

Hàng đợi, xử lý sự kiện, log


5. Ví dụ với Java (Jedis)

Chuẩn bị

  • Thêm Jedis vào dự án (như đã hướng dẫn trong câu trả lời trước):

    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.1.5</version>
    </dependency>
  • Đảm bảo Redis server đang chạy trên localhost:6379.


Producer (Thêm dữ liệu vào Stream)

import redis.clients.jedis.Jedis;
import java.util.HashMap;
import java.util.Map;

public class StreamProducer {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Tạo dữ liệu dạng key-value
            Map<String, String> message = new HashMap<>();
            message.put("user", "Alice");
            message.put("message", "Xin chào từ Java!");

            // Thêm entry vào stream "mystream"
            String streamKey = "mystream";
            String entryId = jedis.xadd(streamKey, null, message); // null để Redis tự tạo ID
            System.out.println("Đã thêm entry với ID: " + entryId);
        }
    }
}

Giải thích:

  • jedis.xadd(streamKey, null, message): Thêm entry vào stream, null để Redis tự tạo ID.

  • entryId trả về ID của entry vừa thêm (ví dụ: 1698765432123-0).


Consumer (Đọc dữ liệu từ Stream)

Cách 1: Đọc trực tiếp (không dùng Consumer Group)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;

import java.util.List;

public class StreamConsumer {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            String streamKey = "mystream";

            // Đọc tất cả entry từ đầu stream
            List<StreamEntry> entries = jedis.xrange(streamKey, null, null, Integer.MAX_VALUE);
            for (StreamEntry entry : entries) {
                System.out.println("Entry ID: " + entry.getID());
                entry.getFields().forEach((key, value) ->
                        System.out.println("  " + key + ": " + value));
            }
        }
    }
}

Output (giả sử đã chạy Producer):

Entry ID: 1698765432123-0
  user: Alice
  message: Xin chào từ Java!

Giải thích:

  • jedis.xrange(streamKey, null, null, Integer.MAX_VALUE): Lấy tất cả entry từ stream.

  • StreamEntry chứa ID và dữ liệu dạng key-value.


Cách 2: Đọc với Consumer Group

Consumer Group cho phép nhiều consumer phân chia công việc xử lý tin nhắn.

Tạo Consumer Group và Consumer:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import java.util.List;
import java.util.Map;

public class StreamConsumerGroup {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            String streamKey = "mystream";
            String groupName = "mygroup";
            String consumerName = "consumer1";

            // Tạo consumer group nếu chưa tồn tại
            try {
                jedis.xgroupCreate(streamKey, groupName, StreamEntryID.LAST_ENTRY, true);
                System.out.println("Đã tạo consumer group: " + groupName);
            } catch (Exception e) {
                System.out.println("Consumer group đã tồn tại.");
            }

            // Đọc tin nhắn mới từ group
            while (true) {
                List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(
                        groupName,
                        consumerName,
                        1, // count
                        1000, // block (ms)
                        false, // noAck
                        Map.entry(streamKey, StreamEntryID.UNDELIVERED_ENTRIES) // ">" để lấy tin nhắn mới
                );

                if (entries != null) {
                    for (Map.Entry<String, List<StreamEntry>> streamEntries : entries) {
                        for (StreamEntry entry : streamEntries.getValue()) {
                            System.out.println("Consumer " + consumerName + " nhận Entry ID: " + entry.getID());
                            entry.getFields().forEach((key, value) ->
                                    System.out.println("  " + key + ": " + value));

                            // Xác nhận (ack) tin nhắn
                            jedis.xack(streamKey, groupName, entry.getID());
                            System.out.println("Đã ACK entry: " + entry.getID());
                        }
                    }
                }
            }
        }
    }
}

Giải thích:

  • jedis.xgroupCreate: Tạo consumer group nếu chưa có.

  • jedis.xreadGroup: Đọc tin nhắn từ stream cho consumer trong nhóm, dùng > để lấy tin nhắn mới.

  • jedis.xack: Xác nhận tin nhắn đã xử lý để nó không được gửi lại.

Cách chạy:

  1. Chạy StreamProducer để thêm dữ liệu.

  2. Chạy StreamConsumerGroup. Output sẽ giống:

    Đã tạo consumer group: mygroup
    Consumer consumer1 nhận Entry ID: 1698765432123-0
      user: Alice
      message: Xin chào từ Java!
    Đã ACK entry: 1698765432123-0

6. Ứng dụng thực tế

  • Hàng đợi công việc: Phân phối task cho nhiều worker (như xử lý thanh toán, gửi email).

  • Lưu trữ sự kiện: Ghi lại các sự kiện của hệ thống để phân tích sau (event sourcing).

  • Xử lý log: Lưu và phân tích log từ nhiều nguồn.

  • Thông báo thời gian thực: Kết hợp với Pub/Sub để gửi thông báo sau khi xử lý dữ liệu từ stream.


7. Ưu điểm

  • Lưu trữ dữ liệu: Không mất tin nhắn như Pub/Sub.

  • Consumer Groups: Hỗ trợ phân phối tin nhắn, lý tưởng cho hệ thống phân tán.

  • Linh hoạt: Có thể đọc lại dữ liệu, giới hạn kích thước stream.

  • Hiệu suất cao: Redis Streams rất nhanh và nhẹ.


8. Hạn chế

  • Không thay thế Kafka: Streams không được thiết kế cho khối lượng dữ liệu cực lớn hoặc hệ thống phức tạp như Apache Kafka.

  • Quản lý thủ công: Cần tự quản lý việc xóa entry hoặc giới hạn stream để tránh đầy bộ nhớ.

  • Phụ thuộc Redis: Chỉ hoạt động trong hệ sinh thái Redis.


9. Mẹo sử dụng

  • Dùng XTRIM để giới hạn kích thước stream, tránh tiêu tốn bộ nhớ.

  • Kết hợp Consumer Groups với nhiều consumer để xử lý song song.

  • Theo dõi Pending Entry List (XPENDING) để xử lý các tin nhắn bị lỗi.

  • Dùng JedisPool trong ứng dụng thực tế để quản lý kết nối hiệu quả.


1. Redis Streams có hỗ trợ retry không?

Redis Streams không có cơ chế retry tự động tích hợp sẵn, nhưng nó cung cấp các công cụ mạnh mẽ để bạn tự triển khai retry theo ý muốn. Dưới đây là cách Redis Streams hỗ trợ retry gián tiếp:

  • Pending Entry List (PEL):

    • Khi một consumer trong Consumer Group đọc một tin nhắn (qua XREADGROUP), tin nhắn đó được đánh dấu là "pending" trong PEL của consumer đó cho đến khi được xác nhận (acknowledged) bằng lệnh XACK.

    • Nếu consumer gặp lỗi và không thể xử lý tin nhắn, tin nhắn vẫn nằm trong PEL. Bạn có thể dùng lệnh XPENDING để kiểm tra các tin nhắn chưa được xử lý và XCLAIM để lấy lại (reclaim) tin nhắn đó cho consumer khác hoặc thử lại.

    • Điều này cho phép bạn triển khai logic retry thủ công, ví dụ: thử lại sau một khoảng thời gian hoặc chuyển tin nhắn cho consumer khác.

  • Cách triển khai retry:

    • Thử lại thủ công: Sau khi phát hiện lỗi, bạn có thể đọc lại tin nhắn từ PEL và xử lý lại.

    • Tự động retry: Viết logic trong code (ví dụ: dùng Java với Jedis) để kiểm tra PEL định kỳ và retry tin nhắn nếu cần.

    • Dead Letter Queue (DLQ): Nếu tin nhắn thất bại sau nhiều lần retry, bạn có thể chuyển nó sang một stream khác để xử lý sau (tương tự DLQ).

  • Ví dụ retry với Jedis:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamPendingEntry;
import java.util.List;

public class StreamRetryExample {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            String streamKey = "mystream";
            String groupName = "mygroup";
            String consumerName = "consumer1";

            // Kiểm tra các tin nhắn pending
            List<StreamPendingEntry> pending = jedis.xpending(streamKey, groupName, null, null, 10, consumerName);
            for (StreamPendingEntry entry : pending) {
                System.out.println("Tin nhắn pending ID: " + entry.getID());
                // Lấy lại tin nhắn để retry
                List<Map.Entry<String, List<StreamEntry>>> reclaimed = jedis.xclaim(
                        streamKey, groupName, consumerName, 0, entry.getID());
                // Xử lý lại tin nhắn ở đây
                // Nếu thành công, gọi XACK
                jedis.xack(streamKey, groupName, entry.getID());
            }
        }
    }
}
  • Hạn chế:

    • Redis Streams không tự động retry như RabbitMQ (có retry plugin) hoặc Kafka (có offset reset). Bạn phải tự viết logic retry.

    • PEL không tự hết hạn, nên bạn cần quản lý việc xóa tin nhắn pending nếu không muốn chúng tích lũy mãi mãi.


2. So sánh Redis Streams với Kafka và RabbitMQ

Dưới đây là bảng so sánh chi tiết giữa Redis Streams, Apache Kafka, và RabbitMQ dựa trên các tiêu chí quan trọng:

Tiêu chí

Redis Streams

Apache Kafka

RabbitMQ

Mô hình

Stream (append-only log) với Consumer Groups.

Log-based messaging với partition và offset.

Message queue với exchange và queue.

Lưu trữ dữ liệu

Lưu trữ tin nhắn cho đến khi bị xóa hoặc giới hạn bằng XTRIM.

Lưu trữ lâu dài (dựa trên retention policy: thời gian hoặc kích thước).

Tin nhắn được xóa sau khi được tiêu thụ (trừ khi cấu hình DLQ).

Retry

Không tự động, nhưng hỗ trợ retry thủ công qua PEL (XPENDING, XCLAIM).

Không tự động, nhưng dễ retry bằng cách reset offset hoặc replay log.

Hỗ trợ retry qua plugin hoặc DLQ (cấu hình TTL và requeue).

Consumer Groups

Có, phân phối tin nhắn giữa các consumer trong nhóm, tương tự Kafka.

Có, partition-based, hỗ trợ scale tốt hơn.

Không có chính thức, nhưng có thể giả lập qua exchange và routing.

Xác nhận (Ack)

Hỗ trợ XACK để xác nhận tin nhắn đã xử lý.

Consumer tự quản lý offset commit.

Hỗ trợ Ack/Nack để xác nhận hoặc từ chối tin nhắn.

Hiệu suất

Rất cao (in-memory), phù hợp với khối lượng trung bình.

Cao, tối ưu cho dữ liệu lớn và phân tán (disk-based).

Tốt cho khối lượng vừa và nhỏ, nhưng chậm hơn Kafka với dữ liệu lớn.

Độ bền (Durability)

In-memory (mặc định), có thể bền hơn với AOF/RDB, nhưng không mạnh bằng Kafka.

Rất bền (replication, disk-based).

Bền (hỗ trợ persistent messages và replication).

Scale

Giới hạn bởi single instance hoặc cluster Redis, không tối ưu cho dữ liệu lớn.

Scale tốt (hàng triệu tin nhắn/giây), phân tán mạnh mẽ.

Scale tốt cho hệ thống vừa, nhưng phức tạp hơn Kafka khi mở rộng.

Cấu hình và vận hành

Đơn giản, dễ triển khai (Redis là lightweight).

Phức tạp, cần quản lý cluster (ZooKeeper hoặc KRaft).

Dễ cấu hình, nhưng cần quản lý exchange/queue phức tạp hơn Redis.

Use case

Hàng đợi công việc, xử lý sự kiện thời gian thực, log đơn giản, chat.

Big data, event streaming, log aggregation, analytics.

Hàng đợi công việc, hệ thống phân tán, ứng dụng cần retry/DLQ rõ ràng.

Dead Letter Queue

Không tích hợp, nhưng có thể tự triển khai bằng stream khác.

Không chính thức, nhưng có thể gửi tin nhắn lỗi sang topic khác.

Hỗ trợ tốt qua cấu hình DLQ và exchange.

Hỗ trợ ngôn ngữ

Tốt (Jedis, Lettuce cho Java, và nhiều thư viện khác).

Tốt (Kafka clients cho nhiều ngôn ngữ).

Tốt (AMQP clients cho hầu hết ngôn ngữ).

Cộng đồng và hệ sinh thái

Cộng đồng lớn, nhưng hệ sinh thái nhỏ hơn Kafka/RabbitMQ.

Hệ sinh thái rất mạnh (Kafka Connect, Streams, KSQL).

Cộng đồng mạnh, nhiều plugin (như retry, shovel).


3. Phân tích chi tiết về Retry

  • Redis Streams:

    • Retry thủ công: Bạn phải tự viết logic retry dựa trên PEL. Điều này linh hoạt nhưng đòi hỏi thêm code để quản lý retry count, delay, hoặc lỗi.

    • Ưu điểm: PEL giúp theo dõi trạng thái tin nhắn dễ dàng, không cần cấu hình phức tạp.

    • Nhược điểm: Không có cơ chế retry tự động hoặc chính sách retry (như exponential backoff) tích hợp sẵn.

  • Kafka:

    • Retry thủ công: Kafka không tự retry, nhưng bạn có thể reset offset của consumer để đọc lại tin nhắn hoặc gửi tin nhắn lỗi sang topic khác.

    • Retry qua code: Thường dùng thư viện (như Spring Kafka) để triển khai retry với exponential backoff hoặc DLQ.

    • Ưu điểm: Offset cho phép quay lại bất kỳ điểm nào trong log, rất linh hoạt.

    • Nhược điểm: Retry phức tạp hơn Redis Streams vì cần quản lý offset và topic.

  • RabbitMQ:

    • Retry tích hợp: RabbitMQ hỗ trợ retry qua plugin hoặc cấu hình (ví dụ: requeue tin nhắn khi Nack hoặc dùng TTL để gửi lại sau).

    • Dead Letter Queue: Dễ dàng cấu hình DLQ để lưu tin nhắn thất bại sau số lần retry nhất định.

    • Ưu điểm: Retry và DLQ được hỗ trợ tốt, ít phải viết code phức tạp.

    • Nhược điểm: Retry có thể làm tăng độ trễ nếu cấu hình không tối ưu (ví dụ: requeue quá nhiều).


4. Khi nào chọn cái nào?

  • Chọn Redis Streams:

    • Khi bạn cần một hệ thống nhẹ, đơn giản để xử lý sự kiện thời gian thực hoặc hàng đợi công việc với khối lượng trung bình.

    • Phù hợp với các ứng dụng đã dùng Redis cho các mục đích khác (cache, session store).

    • Lý tưởng cho: Chat, thông báo, log đơn giản, hoặc hệ thống không cần scale quá lớn.

    • Retry thủ công không phải vấn đề lớn vì bạn có thể viết logic phù hợp.

  • Chọn Kafka:

    • Khi bạn xử lý dữ liệu lớn (hàng triệu tin nhắn/giây) hoặc cần event streaming trong hệ thống phân tán.

    • Phù hợp với: Big data pipelines, log aggregation, real-time analytics.

    • Retry phức tạp hơn, nhưng Kafka mạnh về độ bền và khả năng replay dữ liệu.

  • Chọn RabbitMQ:

    • Khi bạn cần một message queue truyền thống với retry và DLQ dễ cấu hình.

    • Phù hợp với: Hàng đợi công việc, ứng dụng doanh nghiệp cần đảm bảo tin nhắn được xử lý chính xác.

    • Tốt hơn Redis Streams nếu bạn ưu tiên retry tự động và DLQ tích hợp.


5. Ví dụ Retry trong thực tế

Giả sử bạn cần retry tối đa 3 lần cho một tin nhắn:

  • Redis Streams:

    • Lưu số lần retry trong code hoặc thêm field retry_count vào entry.

    • Dùng XPENDING để kiểm tra tin nhắn, nếu retry_count < 3, dùng XCLAIM để retry. Nếu thất bại quá 3 lần, chuyển sang stream DLQ.

    • Code phức tạp hơn nhưng kiểm soát tốt.

  • Kafka:

    • Dùng Spring Kafka với @Retryable hoặc cấu hình retry trong ConsumerConfig.

    • Sau 3 lần thất bại, gửi tin nhắn sang topic DLQ.

    • Cần quản lý offset cẩn thận.

  • RabbitMQ:

    • Cấu hình queue với x-message-ttlx-dead-letter-exchange để tự động retry hoặc chuyển sang DLQ.

    • Ít code hơn, nhưng cần hiểu rõ cách cấu hình exchange/queue.


6. Kết luận

  • Retry trong Redis Streams: Không tự động, nhưng dễ triển khai thủ công nhờ PEL và các lệnh như XPENDING, XCLAIM. Phù hợp nếu bạn muốn kiểm soát chi tiết.

  • So sánh với Kafka/RabbitMQ:

    • Redis Streams: Nhẹ, đơn giản, tốt cho khối lượng trung bình, nhưng thiếu retry tự động và không scale tốt bằng Kafka.

    • Kafka: Mạnh mẽ, bền bỉ, lý tưởng cho dữ liệu lớn và streaming, nhưng phức tạp hơn về vận hành.

    • RabbitMQ: Dễ dùng, retry/DLQ tích hợp tốt, phù hợp cho hàng đợi công việc truyền thống.

Nếu bạn đang xây dựng một hệ thống đơn giản và đã dùng Redis, Streams là lựa chọn tuyệt vời. Nếu cần retry tự động hoặc dữ liệu lớn, cân nhắc RabbitMQ hoặc Kafka tùy vào yêu cầu scale và độ phức tạp.

Last updated