在默认情况下,一旦RabbitMQ重启的话,我们定义的exchange和queue都会消失,在queue里面的消息也自然消失的无影无踪。这样肯定是不合理的,毕竟谁都无法保证RabbitMQ服务器永远不用重启,也永远不会出故障。那么怎么保证在RabbitMQ服务器重启后消息不会丢失呢?这里我们就要用到RabbitMQ的持久化。

要完成消息的持久化需要三个步骤:

  1. 把exchange的durable属性设成true

  2. 把queue的durable属性设成true

  3. 把消息的deliveryMode设成2

第一和第二步是把exchange和queue设置为持久化,第三部是持久化消息。三步缺一不可,如果不做第三步则消息会在重启后消失,但exchange和queue不会。下面我们来看看具体代码:

package com.jaeger.persistence;import java.io.IOException;import java.util.concurrent.TimeoutException;import org.junit.Test;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.AMQP.BasicProperties.Builder;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer {	private static final String MY_EXCHANGE_NAME = "MyExchange";	private static final String MY_ROUTING_KEY = "MyRoutingKey";	private static final String MY_QUEUE_NAME = "MyQueue";	private static final String DIRECT = "direct";	private static final String HOST = "172.19.64.28";	private static final String USER = "jaeger";	private static final String PASSWORD = "root";	private static final int PORT = 5672;	@Test	public void createExchangeAndQueue() throws IOException, TimeoutException {		ConnectionFactory connectionFactory = new ConnectionFactory();		connectionFactory.setHost(HOST);		connectionFactory.setUsername(USER);		connectionFactory.setPassword(PASSWORD);		connectionFactory.setPort(PORT);		Connection connection = connectionFactory.newConnection();		Channel channel = connection.createChannel();		//把exchange的durable属性设成true		channel.exchangeDeclare(MY_EXCHANGE_NAME, DIRECT, true);		//把queue的durable属性设成true		channel.queueDeclare(MY_QUEUE_NAME, true, false, false, null);		channel.queueBind(MY_QUEUE_NAME, MY_EXCHANGE_NAME, MY_ROUTING_KEY);		channel.close();		connection.close();	}	@Test	public void produce() throws IOException, TimeoutException {		ConnectionFactory connectionFactory = new ConnectionFactory();		connectionFactory.setHost(HOST);		connectionFactory.setUsername(USER);		connectionFactory.setPassword(PASSWORD);		connectionFactory.setPort(PORT);		Connection connection = connectionFactory.newConnection();		Channel channel = connection.createChannel();		String message = "Hello 世界!";		//把消息的deliveryMode设成2		BasicProperties props = new BasicProperties.Builder().deliveryMode(2).build();		channel.basicPublish(MY_EXCHANGE_NAME, MY_ROUTING_KEY, props, message.getBytes("utf-8"));		System.out.println("Sent '" + message + "'");		channel.close();		connection.close();	}}