Redis Pub/Sub
Redis Pub/Sub (Publish/Subscribe) là một mô hình nhắn tin cho phép các thành phần trong hệ thống giao tiếp với nhau theo cách không đồng bộ. Nó hoạt động dựa trên cơ chế nhà xuất bản (publisher) gửi tin nhắn đến một kênh (channel) và người đăng ký (subscriber) nhận tin nhắn từ kênh đó mà không cần biết ai gửi. Dưới đây là giải thích chi tiết và dễ hiểu:
1. Cách Redis Pub/Sub hoạt động
Publisher: Gửi tin nhắn đến một kênh cụ thể bằng lệnh
PUBLISH
. Ví dụ:PUBLISH mychannel "Xin chào"
.Subscriber: Đăng ký để nhận tin nhắn từ một hoặc nhiều kênh bằng lệnh
SUBSCRIBE
hoặcPSUBSCRIBE
(cho mẫu kênh).Channel: Là tên định danh (string) để gửi/nhận tin nhắn. Tin nhắn được gửi đến kênh sẽ đến tất cả các subscriber của kênh đó.
Redis không lưu trữ tin nhắn sau khi gửi. Nếu subscriber không kết nối tại thời điểm tin nhắn được gửi, nó sẽ không nhận được tin nhắn.
2. Các lệnh cơ bản
PUBLISH channel message
: Gửi tin nhắn đến kênh. Ví dụ:PUBLISH mychannel "Hello, world!"
SUBSCRIBE channel [channel ...]
: Đăng ký nhận tin nhắn từ một hoặc nhiều kênh. Ví dụ:SUBSCRIBE mychannel
Sau khi chạy lệnh này, client sẽ ở chế độ lắng nghe và nhận tin nhắn từ kênh
mychannel
.UNSUBSCRIBE [channel ...]
: Hủy đăng ký kênh.PSUBSCRIBE pattern [pattern ...]
: Đăng ký theo mẫu kênh (dùng ký tự wildcard như*
). Ví dụ:PSUBSCRIBE news.*
Nhận tin nhắn từ tất cả kênh bắt đầu bằng
news.
(nhưnews.sport
,news.tech
).PUNSUBSCRIBE [pattern ...]
: Hủy đăng ký mẫu kênh.PUBSUB
: Cung cấp thông tin về trạng thái Pub/Sub, ví dụ:PUBSUB CHANNELS
: Liệt kê các kênh đang hoạt động.PUBSUB NUMSUB channel
: Đếm số subscriber của kênh.
3. Ví dụ minh họa
Giả sử bạn có 2 terminal chạy Redis CLI:
Terminal 1 (Subscriber):
SUBSCRIBE mychannel
Terminal này sẽ chờ và hiển thị tin nhắn từ kênh mychannel
.
Terminal 2 (Publisher):
PUBLISH mychannel "Xin chào mọi người!"
Kết quả ở Terminal 1:
1) "message"
2) "mychannel"
3) "Xin chào mọi người!"
4. Ứng dụng thực tế
Chat thời gian thực: Gửi tin nhắn đến các phòng chat (mỗi phòng là một kênh).
Thông báo sự kiện: Báo cáo trạng thái hệ thống, ví dụ: cập nhật đơn hàng, thông báo lỗi.
Cập nhật dữ liệu trực tiếp: Gửi dữ liệu mới (như giá chứng khoán) đến nhiều client.
Hệ thống phân tán: Điều phối công việc giữa các dịch vụ.
5. Ưu điểm
Nhẹ và nhanh: Redis Pub/Sub rất hiệu quả, phù hợp với các hệ thống thời gian thực.
Đơn giản: API dễ sử dụng, không cần cấu hình phức tạp.
Hỗ trợ mẫu kênh:
PSUBSCRIBE
cho phép linh hoạt trong việc lọc tin nhắn.
6. Hạn chế
Không lưu trữ tin nhắn: Nếu subscriber không hoạt động, tin nhắn sẽ bị mất (khác với các hệ thống như Kafka).
Không đảm bảo phân phối: Nếu kết nối bị gián đoạn, tin nhắn có thể không được gửi đến.
Không phù hợp với khối lượng lớn: Pub/Sub tối ưu cho các hệ thống nhỏ và trung bình, không thay thế được các message queue mạnh mẽ hơn.
1. Chuẩn bị
Cài Redis server trên máy của bạn và đảm bảo nó đang chạy (mặc định trên
localhost:6379
).Thêm thư viện Jedis vào dự án. Nếu dùng Maven, thêm dependency sau vào
pom.xml
:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.1.5</version> <!-- Kiểm tra phiên bản mới nhất -->
</dependency>
2. Ví dụ code
Subscriber
Subscriber sẽ lắng nghe tin nhắn từ một kênh cụ thể (mychannel
). Chúng ta cần tạo một lớp kế thừa JedisPubSub
để xử lý các sự kiện khi nhận được tin nhắn.
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class RedisSubscriber {
public static void main(String[] args) {
// Kết nối đến Redis
try (Jedis jedis = new Jedis("localhost", 6379)) {
// Tạo một đối tượng JedisPubSub để xử lý tin nhắn
JedisPubSub pubSub = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
System.out.println("Nhận tin nhắn từ kênh " + channel + ": " + message);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("Đã đăng ký kênh: " + channel);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println("Đã hủy đăng ký kênh: " + channel);
}
};
// Đăng ký kênh "mychannel"
System.out.println("Đang chờ tin nhắn...");
jedis.subscribe(pubSub, "mychannel");
// Lưu ý: subscribe() là blocking call, chương trình sẽ dừng tại đây
// để lắng nghe tin nhắn cho đến khi hủy đăng ký
}
}
}
Giải thích:
JedisPubSub
là một lớp trừu tượng cung cấp các phương thức để xử lý sự kiện nhưonMessage
,onSubscribe
, v.v.jedis.subscribe(pubSub, "mychannel")
đăng ký lắng nghe kênhmychannel
. Phương thức này chặn luồng (blocking) cho đến khi hủy đăng ký.
Publisher
Publisher gửi tin nhắn đến kênh mychannel
.
import redis.clients.jedis.Jedis;
public class RedisPublisher {
public static void main(String[] args) {
// Kết nối đến Redis
try (Jedis jedis = new Jedis("localhost", 6379)) {
// Gửi tin nhắn đến kênh "mychannel"
String channel = "mychannel";
String message = "Xin chào từ Java!";
jedis.publish(channel, message);
System.out.println("Đã gửi tin nhắn: " + message + " đến kênh: " + channel);
}
}
}
Giải thích:
jedis.publish(channel, message)
gửi tin nhắn đến kênh được chỉ định.Publisher không cần biết có bao nhiêu subscriber đang lắng nghe.
3. Cách chạy
Khởi động Redis server:
Nếu bạn dùng máy local, chạy lệnh:
redis-server
Đảm bảo Redis chạy trên
localhost:6379
(hoặc cấu hình lại trong code nếu khác).
Chạy Subscriber:
Chạy lớp
RedisSubscriber
trước. Bạn sẽ thấy output:Đang chờ tin nhắn... Đã đăng ký kênh: mychannel
Chạy Publisher:
Chạy lớp
RedisPublisher
. Output của Publisher:Đã gửi tin nhắn: Xin chào từ Java! đến kênh: mychannel
Ở cửa sổ Subscriber, bạn sẽ thấy:
Nhận tin nhắn từ kênh mychannel: Xin chào từ Java!
4. Mở rộng: Đăng ký nhiều kênh hoặc dùng mẫu kênh
Nếu muốn lắng nghe nhiều kênh hoặc dùng mẫu kênh (pattern), bạn có thể sửa code Subscriber như sau:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class RedisPatternSubscriber {
public static void main(String[] args) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
JedisPubSub pubSub = new JedisPubSub() {
@Override
public void onPMessage(String pattern, String channel, String message) {
System.out.println("Nhận từ mẫu [" + pattern + "], kênh: " + channel + ", tin nhắn: " + message);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
System.out.println("Đã đăng ký mẫu: " + pattern);
}
};
// Đăng ký mẫu kênh "news.*"
System.out.println("Đang chờ tin nhắn từ các kênh news.*...");
jedis.psubscribe(pubSub, "news.*");
}
}
}
Publisher gửi tin nhắn đến kênh news.sport
:
import redis.clients.jedis.Jedis;
public class RedisPatternPublisher {
public static void main(String[] args) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
jedis.publish("news.sport", "Bóng đá: Việt Nam thắng!");
System.out.println("Đã gửi tin nhắn đến news.sport");
}
}
}
Kết quả ở Subscriber:
Đang chờ tin nhắn từ các kênh news.*...
Đã đăng ký mẫu: news.*
Nhận từ mẫu [news.*], kênh: news.sport, tin nhắn: Bóng đá: Việt Nam thắng!
5. Lưu ý
Blocking: Phương thức
subscribe
vàpsubscribe
chặn luồng hiện tại. Nếu bạn cần xử lý đồng thời, hãy chạy Subscriber trong một thread riêng.Xử lý lỗi: Đảm bảo xử lý ngoại lệ khi kết nối Redis bị gián đoạn.
Hiệu suất: Jedis không tự động quản lý pool kết nối trong ví dụ đơn giản này. Trong ứng dụng thực tế, dùng
JedisPool
để quản lý kết nối hiệu quả hơn.Không lưu trữ tin nhắn: Như đã đề cập, Redis Pub/Sub không lưu tin nhắn, nên nếu Subscriber không hoạt động, nó sẽ bỏ lỡ tin nhắn.
6. Ví dụ với JedisPool (thực tế hơn)
Dùng JedisPool
để quản lý kết nối trong môi trường sản xuất:
Subscriber với JedisPool:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
public class RedisSubscriberWithPool {
public static void main(String[] args) {
JedisPool jedisPool = new JedisPool("localhost", 6379);
try (Jedis jedis = jedisPool.getResource()) {
JedisPubSub pubSub = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
System.out.println("Nhận: " + message + " từ kênh: " + channel);
}
};
System.out.println("Đang chờ tin nhắn...");
jedis.subscribe(pubSub, "mychannel");
} finally {
jedisPool.close();
}
}
}
Last updated