Article from: https://www.cnblogs.com/Eternally-dream/p/9970564.html

1. Publish/Subscribe Structure Diagram

  

 

  The figure above shows how messages are consumed. The producer does not store a message directly in a queue; it sends the message to an exchange. Each of the two consumers declares its own queue and binds that queue to the exchange. Every consumer then reads all the messages in its own queue, so a single message published by the producer is consumed by all consumers.

  Publish/Subscribe can be implemented by setting the exchange type to fanout.
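  To make the fanout behaviour concrete, here is a minimal in-memory sketch. It is not RabbitMQ client code: the class FanoutExchangeSketch, its methods, and the second queue name "queue2" are assumptions made purely for illustration. It only models the rule that every queue bound to a fanout exchange receives its own copy of each published message.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a fanout exchange; hypothetical, NOT the RabbitMQ client API. */
final class FanoutExchangeSketch {
    // Queue name -> messages delivered to that queue, in publish order
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Models queueDeclare + queueBind: registers a queue with the exchange. */
    public void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Models basicPublish on a fanout exchange: every bound queue gets a copy. */
    public void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Returns the messages held by the named queue (empty if the queue is unknown). */
    public List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("queue1");
        exchange.bind("queue2"); // hypothetical second consumer's queue
        for (int i = 0; i < 3; i++) {
            exchange.publish("Hello World!" + i);
        }
        // Both queues hold the same three messages
        System.out.println("queue1: " + exchange.messagesIn("queue1"));
        System.out.println("queue2: " + exchange.messagesIn("queue2"));
    }
}
```

  In this model a message published before any queue is bound is simply dropped, which is also why the consumers should be started before the producer in the walkthrough that follows.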

2. Producer code

package com.wangx.rabbitmq.sp;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "exchange";
    public static void main(String[] args) throws IOException, TimeoutException {

        //Create a Connection Factory
        ConnectionFactory factory = new ConnectionFactory();
        //Setting up Server Host
        factory.setHost("127.0.0.1");
        //Username
        factory.setUsername("wangx");
        //Set password
        factory.setPassword("wangx");
        //Setting up Virtual Host
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            //Declare a fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            String message = "Hello World!";
            for (int i = 0; i < 10; i++) {
                //Publish to the exchange; a fanout exchange ignores the routing key, so it is left empty
                channel.basicPublish(EXCHANGE_NAME, "", null, (message + i).getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + i + "'");
            }
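        }catch (Exception e) {
            //Report the failure instead of silently swallowing it
            e.printStackTrace();
        }finally {
            //Close only what was actually opened, to avoid a NullPointerException
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }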
    }
}

  Unlike a regular message producer, a publish/subscribe producer must declare an exchange and does not declare a queue. When it sends a message, it specifies the name of the exchange rather than the name of a queue.

3. Consumer Implementation Code

package com.wangx.rabbitmq.sp;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    /** Queue name */
    private static final String QUEUE_NAME = "queue1";
    private static final String EXCHANGE_NAME = "exchange";
    public static void main(String[] args){

        //Create a Connection Factory
        ConnectionFactory factory = new ConnectionFactory();
        //Setting up Server Host
        factory.setHost("127.0.0.1");
        //Username
        factory.setUsername("wangx");
        //Set password
        factory.setPassword("wangx");
        //Setting up Virtual Host
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        try {
            //Create connection
            connection = factory.newConnection();
            //Create a message channel
            final Channel  channel = connection.createChannel();
            //Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //Declare the queue
            channel.queueDeclare(QUEUE_NAME,false, false, false, null);
            //Bind the queue to the exchange
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //The message server sends only one message to the consumer at a time
            //channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel){
                //Override handleDelivery in DefaultConsumer to receive messages
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    try {
                        //Sleep for one second to simulate message processing
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer1 received the message '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer1 finished consuming the message...");
                    }

                }
            };
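            //Start consuming with automatic acknowledgement (autoAck = true)
            //The connection is left open so the consumer keeps receiving messages
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }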
    }
}

  Two consumers are again used here. Each one declares its own message queue under a different queue name and then binds that queue to the exchange with channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""). Start both consumers so that the two queues are registered and bound, then start the producer to send messages. You will see that both consumers receive every message sent by the producer.

2. Routing Pattern Implementation

  Structure chart:

  

  The producer specifies a routing key for each message it sends, and the exchange delivers the message only to the consumer queues bound with a matching key.
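  The key-matching rule can be modelled with a small in-memory sketch. This is not RabbitMQ client code: the class DirectExchangeSketch, its methods, and the queue name "queue2" are hypothetical and exist only to illustrate the rule. The keys "key1" and "key2" and the parity-based choice of key mirror the routing producer in this article.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a direct exchange; hypothetical, NOT the RabbitMQ client API. */
final class DirectExchangeSketch {
    // Binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    /** Models queueBind(queue, exchange, key); repeating the same binding has no extra effect. */
    public void bind(String queueName, String bindingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        List<String> bound = bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>());
        if (!bound.contains(queueName)) {
            bound.add(queueName);
        }
    }

    /** Models basicPublish(exchange, routingKey, ...): only exact key matches get the message. */
    public void publish(String routingKey, String message) {
        for (String queueName : bindings.getOrDefault(routingKey, new ArrayList<>())) {
            queues.get(queueName).add(message);
        }
    }

    /** Returns the messages held by the named queue (empty if the queue is unknown). */
    public List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("queue1", "key2"); // queue1 wants the even-numbered messages
        exchange.bind("queue2", "key1"); // hypothetical second queue for the odd-numbered ones
        for (int i = 0; i < 10; i++) {
            String key = (i % 2 == 0) ? "key2" : "key1";
            exchange.publish(key, "Hello World!" + i);
        }
        System.out.println("queue1 (key2): " + exchange.messagesIn("queue1"));
        System.out.println("queue2 (key1): " + exchange.messagesIn("queue2"));
    }
}
```

  In this model, a message whose routing key matches no binding is discarded, and each queue ends up holding only the messages published with the key it was bound with.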

  Change the exchange type to DIRECT and bind each consumer's queue with a different key. This effectively classifies the messages: when the exchange routes a message, it delivers it only to the queue whose binding key matches the message's routing key. The producer code is as follows:

package com.wangx.rabbitmq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "exchange-routing";
    public static void main(String[] args) throws IOException, TimeoutException {

        //Create a Connection Factory
        ConnectionFactory factory = new ConnectionFactory();
        //Setting up Server Host
        factory.setHost("127.0.0.1");
        //Username
        factory.setUsername("wangx");
        //Set password
        factory.setPassword("wangx");
        //Setting up Virtual Host
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            //Declare a direct exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "Hello World!";
            for (int i = 0; i < 10; i++) {
               if ( i % 2 == 0) {
                   //Even index: publish with routing key "key2"
                   channel.basicPublish(EXCHANGE_NAME, "key2", null, (message + i).getBytes("UTF-8"));
                   System.out.println(" Even message '" + message + i + "'");
               } else {
                   //Odd index: publish with routing key "key1"
                   channel.basicPublish(EXCHANGE_NAME, "key1", null, (message + i).getBytes("UTF-8"));
                   System.out.println(" Odd message '" + message + i + "'");
               }
            }
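        }catch (Exception e) {
            //Report the failure instead of silently swallowing it
            e.printStackTrace();
        }finally {
            //Close only what was actually opened, to avoid a NullPointerException
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }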
    }
}

 The consumer code is as follows:

package com.wangx.rabbitmq.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
    /** Queue name */
    private static final String QUEUE_NAME = "queue1";
    private static final String EXCHANGE_NAME = "exchange-routing";
    public static void main(String[] args){

        //Create a Connection Factory
        ConnectionFactory factory = new ConnectionFactory();
        //Setting up Server Host
        factory.setHost("127.0.0.1");
        //Username
        factory.setUsername("wangx");
        //Set password
        factory.setPassword("wangx");
        //Setting up Virtual Host
        factory.setVirtualHost("/wangx");
        Connection connection = null;
        try {
            //Create connection
            connection = factory.newConnection();
            //Create a message channel
            final Channel  channel = connection.createChannel();
            //Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //Declare the queue
            channel.queueDeclare(QUEUE_NAME,false, false, false, null);
            //Bind the queue to the exchange, specifying the key of the messages to receive
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
            //The message server sends only one message to the consumer at a time
            //channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel){
                //Override handleDelivery in DefaultConsumer to receive messages
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    try {
                        //Sleep for one second to simulate message processing
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer1 received the message '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer1 finished consuming the message...");
                    }

                }
            };
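            //Start consuming with automatic acknowledgement (autoAck = true)
            //The connection is left open so the consumer keeps receiving messages
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }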
    }
}

  Of the two consumers, one receives the odd-numbered messages and the other receives the even-numbered ones. The producer chooses the routing key according to the parity of the message index when sending, and each consumer receives only the messages whose key matches the key it bound its queue with.

  This lets each consumer specify the type of message it wants to receive, which is comparable to the MQ message filtering covered earlier.

  
