配置文件设置:
spring:
# rabbitmq config
rabbitmq:
host: 192.168.25.202 #192.168.105.88
port: 5672
username: guest #lmz8957@cnki.net
password: guest #limingzhi
virtua_host: /
queue_name: getDataanalyseidsQueue
读取Spring Boot配置信息
/**
* FileName: MqConfig.java
* @Description: TODO(用一句话描述该文件做什么)
* All rights Reserved, Designed By CNKI
* Copyright: Copyright(C) 2018-2100
* Company CNKI.
* @author: SongBin
* @version V1.0
* Createdate:2018年7月20日 上午10:23:00
*
* Modification History:
* Date Author Version Discription
* -----------------------------------------------------------------------------------
* 2018年7月20日 SongBin 1.0 1.0
* Why & What is modified: <修改原因描述>
*/
package cnki.bdms.web.ui.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @ClassName: MqConfig
* @Description: (消息队列配置信息)
* @author: SongBin
* @date: 2018年7月20日 上午10:23:00
*/
@Component
@ConfigurationProperties(prefix="spring.rabbitmq")
public class MqConfig {
private static String host;
private static String port;
private static String username;
private static String password;
private static String virtua_host;
private static String queue_name;
/**
* @return the host
*/
public static String getHost() {
return host;
}
/**
* @param host the host to set
*/
public static void setHost(String host) {
MqConfig.host = host;
}
/**
* @return the port
*/
public static String getPort() {
return port;
}
/**
* @param port the port to set
*/
public static void setPort(String port) {
MqConfig.port = port;
}
/**
* @return the username
*/
public static String getUsername() {
return username;
}
/**
* @param username the username to set
*/
public static void setUsername(String username) {
MqConfig.username = username;
}
/**
* @return the password
*/
public static String getPassword() {
return password;
}
/**
* @param password the password to set
*/
public static void setPassword(String password) {
MqConfig.password = password;
}
/**
* @return the virtua_host
*/
public static String getVirtua_host() {
return virtua_host;
}
/**
* @param virtua_host the virtua_host to set
*/
public static void setVirtua_host(String virtua_host) {
MqConfig.virtua_host = virtua_host;
}
/**
* @return the queue_name
*/
public static String getQueue_name() {
return queue_name;
}
/**
* @param queue_name the queue_name to set
*/
public static void setQueue_name(String queue_name) {
MqConfig.queue_name = queue_name;
}
}
消息持久化,MQ服务器重启后保证队列不消失。
发送消息:
/**
* FileName: MQMain.java
* @Description: TODO(用一句话描述该文件做什么)
* All rights Reserved, Designed By CNKI
* Copyright: Copyright(C) 2018-2100
* Company CNKI.
* @author: SongBin
* @version V1.0
* Createdate:2018年7月20日 上午9:51:10
*
* Modification History:
* Date Author Version Discription
* -----------------------------------------------------------------------------------
* 2018年7月20日 SongBin 1.0 1.0
* Why & What is modified: <修改原因描述>
*/
package cnki.bdms.base;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import cnki.bdms.web.ui.config.MqConfig;
/**
* @ClassName: MqSender
* @Description: (这里用一句话描述这个类的作用)
* @author: SongBin
* @date: 2018年7月20日 上午9:51:10
*/
public class MqSender {
private final static String QUEUE_NAME = MqConfig.getQueue_name();
public static void main(String[] args) {
send("my first message .....");
}
public static void send(String message)
{
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost(MqConfig.getHost());
factory.setPort(Integer.parseInt(MqConfig.getPort())); //默认端口号
factory.setUsername(MqConfig.getUsername());//默认用户名
factory.setPassword(MqConfig.getPassword());//默认密码
connection = factory.newConnection();
channel = connection.createChannel();
//第二个参数durable=True,表示将队列设置持久化,这样可以确保在RabbitMq重启之后queue_declare队列不会丢失。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/*消息持久化:将delivery_mode的属性设为2*/
BasicProperties props = null;//new BasicProperties(null, null, null, 2, null, null, null, null, null, null, null, null, null, null);
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
System.out.println("已经发送消息....."+message);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
try {
//关闭资源
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
消费(接收)消息:
/**
* FileName: MqReceiver.java
* @Description: TODO(用一句话描述该文件做什么)
* All rights Reserved, Designed By CNKI
* Copyright: Copyright(C) 2018-2100
* Company CNKI.
* @author: SongBin
* @version V1.0
* Createdate:2018年7月20日 下午2:32:44
*
* Modification History:
* Date Author Version Discription
* -----------------------------------------------------------------------------------
* 2018年7月20日 SongBin 1.0 1.0
* Why & What is modified: <修改原因描述>
*/
package cnki.bdms.base;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cnki.bdms.web.ui.config.MqConfig;
/**
* @ClassName: MqReceiver
* @Description: (这里用一句话描述这个类的作用)
* @author: SongBin
* @date: 2018年7月20日 下午2:32:44
*/
public class MqReceiver {
private final static String QUEUE_NAME = MqConfig.getQueue_name();
public static void main(String[] args) {
receive();
}
public static void receive()
{
ConnectionFactory factory = null;
Connection connection = null;
Channel channel = null;
try {
factory = new ConnectionFactory();
factory.setHost(MqConfig.getHost());
factory.setPort(Integer.parseInt(MqConfig.getPort())); //默认端口号
factory.setUsername(MqConfig.getUsername());//默认用户名
factory.setPassword(MqConfig.getPassword());//默认密码
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
System.out.println("11111111111");
String message = new String(body, "UTF-8");
System.out.println("收到消息....."+message);
}};
channel.basicConsume(QUEUE_NAME, true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally{
try {
//关闭资源
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
更多内容请访问:IT源点
注意:本文归作者所有,未经作者允许,不得转载