Skip to Content
Product Application > Application Scenario > Build real-time messaging system

Building Real-Time Messaging Systems

Redis’s Pub/Sub mechanism can be used to build real-time messaging systems; for example, many developers use Pub/Sub to build real-time chat systems.

import redis.clients.jedis.*; import java.util.Date; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.commons.lang3.RandomStringUtils; class PrintListener extends JedisPubSub{ @Override public void onMessage(String channel, String message) { String time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"); System.out.println("message receive:" + message + ",channel:" + channel + "..." + time); //Here we can unsubscribe if(message.equalsIgnoreCase("quit")){ this.unsubscribe(channel); } } } class PubClient { private Jedis jedis; public PubClient(String host,int port){ jedis = new Jedis(host,port); } public void pub(String channel,String message){ jedis.publish(channel, message); } public void close(String channel){ jedis.publish(channel, "quit"); jedis.del(channel);//Real-time messaging system } } class SubClient { private Jedis jedis;// public SubClient(String host,int port){ jedis = new Jedis(host,port); } public void sub(JedisPubSub listener,String channel){ jedis.subscribe(listener, channel); //This place will be blocked, at the client code level for JedisPubSub in handling messages, will "monoport" link //and the way of while loop is taken, listening to the subscribed messages } } public class PubSubTest { /** * @param args */ static String host = "127.0.0.1"; static int port = 10011; public static void main(String[] args) throws Exception{ PubClient pubClient = new PubClient(host,port); final String channel = "pubsub-channel"; pubClient.pub(channel, "before1"); pubClient.pub(channel, "before2"); Thread.sleep(2000); //The message subscriber is very special and needs to monopolize the connection, so we need to create a new connection for it; //in addition, the implementation of the client also ensures the "link monoport" feature, the sub method will keep blocking, //until the listener.unsubscribe method is called Thread subThread = new Thread(new Runnable() { @Override public void run() { try{ SubClient subClient = new 
SubClient(host,port); System.out.println("----------subscribe operation begin-------"); JedisPubSub listener = new PrintListener(); //At the API level, this is a polling operation, and it will return only when unsubscribe is called subClient.sub(listener, channel); System.out.println("----------subscribe operation end-------") ; }catch(Exception e){ e.printStackTrace(); } } }); subThread.start(); int i=0; while(i < 10){ String message = RandomStringUtils.random(6, true, true);//apache-commons pubClient.pub(channel, message); i++; Thread.sleep(1000); } //Passive closing instruction, if the publisher in the channel confirms that the channel needs to be closed, then send a "quit" //then when the listener.onMessage() receives a "quit", other subscription clients will execute the "unsubscribe" operation. pubClient.close(channel); //Additionally, you can unsubscribe like this //listener.unsubscribe(channel); } }

Output:

----------subscribe operation begin-------
message receive:erRIEe,channel:pubsub-channel...2016-03-15 15:53:52
message receive:Ovcwiw,channel:pubsub-channel...2016-03-15 15:53:53
message receive:STPWfV,channel:pubsub-channel...2016-03-15 15:53:54
message receive:SR4iIk,channel:pubsub-channel...2016-03-15 15:53:55
message receive:GI3Ege,channel:pubsub-channel...2016-03-15 15:53:56
message receive:0V1JUt,channel:pubsub-channel...2016-03-15 15:53:57
message receive:3iU8BV,channel:pubsub-channel...2016-03-15 15:53:58
message receive:BqeI2x,channel:pubsub-channel...2016-03-15 15:53:59
message receive:D53cHF,channel:pubsub-channel...2016-03-15 15:54:00
message receive:quit,channel:pubsub-channel...2016-03-15 15:54:01
----------subscribe operation end-------