RabbitMQ学习小结(二)----工作队列

wylc123 1年前 ⋅ 771 阅读

RabbitMQ学习小结(二)----工作队列

配置文件设置:

spring:
# rabbitmq config
  rabbitmq: 
    host: 192.168.25.202 #192.168.105.88
    port: 5672
    username: guest #lmz8957@cnki.net
    password: guest #limingzhi
    virtua_host: /
    queue_name: getDataanalyseidsQueue

 

读取Spring Boot配置信息

/**

 * FileName:     MqConfig.java

 * @Description: TODO(用一句话描述该文件做什么)

 * All rights Reserved, Designed By CNKI

 * Copyright:    Copyright(C) 2018-2100

 * Company       CNKI.

 * @author:    SongBin

 * @version    V1.0 

 * Createdate:2018年7月20日 上午10:23:00

 *

 * Modification  History:

 * Date         Author        Version        Discription

 * -----------------------------------------------------------------------------------

 * 2018年7月20日       SongBin          1.0             1.0

 * Why & What is modified: <修改原因描述>

 */
package cnki.bdms.web.ui.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @ClassName: MqConfig
 * @Description: (消息队列配置信息)
 * @author: SongBin
 * @date: 2018年7月20日 上午10:23:00
 */
@Component
@ConfigurationProperties(prefix="spring.rabbitmq")
public class MqConfig {
	private static String host;
	private static String port;
	private static String username;
	private static String password;
	private static String virtua_host;
	private static String queue_name;
	/**
	 * @return the host
	 */
	public static String getHost() {
		return host;
	}
	/**
	 * @param host the host to set
	 */
	public static void setHost(String host) {
		MqConfig.host = host;
	}
	/**
	 * @return the port
	 */
	public static String getPort() {
		return port;
	}
	/**
	 * @param port the port to set
	 */
	public static void setPort(String port) {
		MqConfig.port = port;
	}
	/**
	 * @return the username
	 */
	public static String getUsername() {
		return username;
	}
	/**
	 * @param username the username to set
	 */
	public static void setUsername(String username) {
		MqConfig.username = username;
	}
	/**
	 * @return the password
	 */
	public static String getPassword() {
		return password;
	}
	/**
	 * @param password the password to set
	 */
	public static void setPassword(String password) {
		MqConfig.password = password;
	}
	/**
	 * @return the virtua_host
	 */
	public static String getVirtua_host() {
		return virtua_host;
	}
	/**
	 * @param virtua_host the virtua_host to set
	 */
	public static void setVirtua_host(String virtua_host) {
		MqConfig.virtua_host = virtua_host;
	}
	/**
	 * @return the queue_name
	 */
	public static String getQueue_name() {
		return queue_name;
	}
	/**
	 * @param queue_name the queue_name to set
	 */
	public static void setQueue_name(String queue_name) {
		MqConfig.queue_name = queue_name;
	}
	
}

消息持久化,MQ服务器重启后保证队列不消失。

发送消息:

/**

 * FileName:     MQMain.java

 * @Description: TODO(用一句话描述该文件做什么)

 * All rights Reserved, Designed By CNKI

 * Copyright:    Copyright(C) 2018-2100

 * Company       CNKI.

 * @author:    SongBin

 * @version    V1.0 

 * Createdate:2018年7月20日 上午9:51:10

 *

 * Modification  History:

 * Date         Author        Version        Discription

 * -----------------------------------------------------------------------------------

 * 2018年7月20日       SongBin          1.0             1.0

 * Why & What is modified: <修改原因描述>

 */
package cnki.bdms.base;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import cnki.bdms.web.ui.config.MqConfig;

/**
 * @ClassName: MqSender
 * @Description: (这里用一句话描述这个类的作用)
 * @author: SongBin
 * @date: 2018年7月20日 上午9:51:10
 */

public class MqSender {  
    private final static String QUEUE_NAME = MqConfig.getQueue_name();  
      
    public static void main(String[] args) {  
        send("my first message .....");  
    }  
      
    public static void send(String message)  
    {  
        ConnectionFactory factory = null;  
        Connection connection = null;  
        Channel channel = null;  
        try {  
            factory = new ConnectionFactory();  
            factory.setHost(MqConfig.getHost());  
            factory.setPort(Integer.parseInt(MqConfig.getPort())); //默认端口号
            factory.setUsername(MqConfig.getUsername());//默认用户名
            factory.setPassword(MqConfig.getPassword());//默认密码
            connection = factory.newConnection();  
            channel = connection.createChannel();  
            //第二个参数durable=True,表示将队列设置持久化,这样可以确保在RabbitMq重启之后queue_declare队列不会丢失。
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            /*消息持久化:将delivery_mode的属性设为2*/
            BasicProperties props = null;//new BasicProperties(null, null, null, 2, null, null, null, null, null, null, null, null, null, null);
            channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));  
            System.out.println("已经发送消息....."+message);
        } catch (IOException e) {  
            e.printStackTrace();
        } catch (TimeoutException e) {  
            e.printStackTrace();  
        }finally{  
            try {  
                //关闭资源  
                channel.close();  
                connection.close();  
            } catch (IOException e) {  
                e.printStackTrace();  
            } catch (TimeoutException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
}  

消费(接收)消息:

/**

 * FileName:     MqReceiver.java

 * @Description: TODO(用一句话描述该文件做什么)

 * All rights Reserved, Designed By CNKI

 * Copyright:    Copyright(C) 2018-2100

 * Company       CNKI.

 * @author:    SongBin

 * @version    V1.0 

 * Createdate:2018年7月20日 下午2:32:44

 *

 * Modification  History:

 * Date         Author        Version        Discription

 * -----------------------------------------------------------------------------------

 * 2018年7月20日       SongBin          1.0             1.0

 * Why & What is modified: <修改原因描述>

 */
package cnki.bdms.base;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import cnki.bdms.web.ui.config.MqConfig;

/**
 * @ClassName: MqReceiver
 * @Description: (这里用一句话描述这个类的作用)
 * @author: SongBin
 * @date: 2018年7月20日 下午2:32:44
 */
public class MqReceiver {
	private final static String QUEUE_NAME = MqConfig.getQueue_name();
	
	public static void main(String[] args) {
		receive();
	}
	
	public static void receive()
	{
		ConnectionFactory factory = null;  
        Connection connection = null;  
        Channel channel = null; 
		
		try {
			factory = new ConnectionFactory();  
            factory.setHost(MqConfig.getHost());  
            factory.setPort(Integer.parseInt(MqConfig.getPort())); //默认端口号
            factory.setUsername(MqConfig.getUsername());//默认用户名
            factory.setPassword(MqConfig.getPassword());//默认密码
            connection = factory.newConnection();  
            channel = connection.createChannel();
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			Consumer consumer = new DefaultConsumer(channel){
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
						byte[] body) throws IOException {
					System.out.println("11111111111");
					String message = new String(body, "UTF-8");
					System.out.println("收到消息....."+message);
				}};
			channel.basicConsume(QUEUE_NAME, true,consumer);
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		}finally{
			try {
				//关闭资源
				channel.close();
				connection.close();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			}
		}
	}
}

 

 

更多内容请访问:IT源点

相关文章推荐

全部评论: 0

    我有话说: