We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
双击运行otp_win64_24.3.4.6.exe,点击下一步下一步即可,可以更改安装位置,需要注意路径不要有中文或空格
他的作用类似JDK,因为rabbitMQ是erlang语言开发的,所以需要环境支持
配置环境变量,在path中配置: 自己安装erlang的路径\bin
双击运行rabbitmq-server-3.10.1.exe,下一步即可,安装好后进入以下目录 在该目录下打开dos窗口,输入以下运行命令
rabbitmq-plugins enable rabbitmq_management
启动结束后,访问:http://localhost:15672 用户名和密码都是:guest
接受生产者发送的消息,并根据binding规则将消息路由给服务器中的队列。
ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的类型有三种:direct、fanout和topic。
消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到对应的queue则数据会丢失),等待消费者来取。
它表示的是Exchange和Message Queue是通过binding key进行联系的,这个关系是固定。
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与ExchangeType及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里
创建一个RabbitMQ的项目,用来学习rabbitmq,需要添加依赖
<!--添加RabbitMQ依赖--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.12.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.4</version> </dependency>
只有一个消费者
package cn.tedu.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; //模拟简单模式的生产者 public class Producer { public static void main(String[] args)throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 /* 第一个参数:队列名称 第二个参数:是否是一个持久队列 第三个参数:是否是一个独占队列 第四个参数:是否自动删除 第五个参数:其他参数属性的设置 */ cc.queueDeclare("helloworld",false,false,false,null); //4.准备数据 String msg = "hello world" + System.currentTimeMillis(); //5.发送数据 /* 第一个参数:先忽略 第二个参数:先写成队列名称 第三个参数:其他属性设置 第四个参数:消息数据,需要转成byte[]类型 */ cc.basicPublish("","helloworld",null,msg.getBytes()); nc.close(); } }
package cn.tedu.rabbitmq.simple; import com.rabbitmq.client.*; import java.io.IOException; //简单模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 //如果该队列已经在服务器中存在,那么会忽略创建该队列的命令 cc.queueDeclare("helloworld",false,false,false,null); //4.消费消息 //处理数据的回调函数 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到了消息:"+msg); System.out.println("==========================="); } }; //取消接收数据的回调函数 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume("helloworld",true,deliverCallback,cancelCallback); } }
多个消费者,从同一个队列中接受消息
负载均衡,消息会轮询发送给所有消费者
合理的分发消息
手动ack
消息回执
向服务器发送一个通知,告诉服务器一条消息已经处理完毕
服务器可以通过ack,知道消费者是空闲还是繁忙
qos=1
每次抓取的消息数量
消息处理完毕之前,不会抓取新消息
手动ack模式下才有效
消息持久化:在服务器端,持久保存消息,避免因为服务器宕机而造成消息丢失
package cn.tedu.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Scanner; //模拟工作队列模式的生产者 public class Producer { public static void main(String[] args)throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 /* 第一个参数:队列名称 第二个参数:是否是一个持久队列 第三个参数:是否是一个独占队列 第四个参数:是否自动删除 第五个参数:其他参数属性的设置 */ cc.queueDeclare("work_queue",false,false,false,null); //4.准备数据,发送数据 while(true){ System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); if (msg.equals("quit")) break; cc.basicPublish("","work_queue",null,msg.getBytes()); } nc.close(); } }
package cn.tedu.rabbitmq.work; import com.rabbitmq.client.*; import java.io.IOException; //工作队列模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 //如果该队列已经在服务器中存在,那么会忽略创建该队列的命令 cc.queueDeclare("work_queue",false,false,false,null); //4.消费消息 //处理数据的回调函数 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到了消息:"+msg); //处理消息的时候,遇到一个‘.’,暂停1s,模拟实际处理消息的阻塞场景 for (int i = 0; i < msg.length(); i++) { //charAt(i) if(msg.charAt(i)=='.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("消息处理完毕"); } }; //取消接收数据的回调函数 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume("work_queue",true,deliverCallback,cancelCallback); } }
package cn.tedu.rabbitmq.work; import com.rabbitmq.client.*; import java.io.IOException; //工作队列模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.创建队列 //如果该队列已经在服务器中存在,那么会忽略创建该队列的命令 cc.queueDeclare("work_queue",false,false,false,null); //4.消费消息 //处理数据的回调函数 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到了消息:"+msg); //处理消息的时候,遇到一个‘.’,暂停1s,模拟实际处理消息的阻塞场景 for (int i = 0; i < msg.length(); i++) { //charAt(i) if(msg.charAt(i)=='.'){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("消息处理完毕"); //发送消息确认(消息回执) //第一个参数:消息的标签,需要从消息中获取 //第二个参数:是否确认多条信息,false,只确认一条消息 cc.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } }; //取消接收数据的回调函数 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; //第二个参数:ack的设置,默认是autoAck,false是手动ack //设置消费者每次只获取一条消息 cc.basicQos(1); cc.basicConsume("work_queue",false,deliverCallback,cancelCallback); } }
如果消息保存在服务器端的内存中,是不安全的,如果服务器宕机,信息会丢失
停止rabbitmq服务:rabbitmq-service stop或者rabbitmqctl stop 启动rabbitmq服务:rabbitmq-service start
消息数据持久化、消息队列持久化
但是需要注意持久化队列时,不能直接修改队列属性,因为队列一定创建属性就固定了,不可以进行修改
可以选择新建一个别名队列或者删除该队列重新创建
队列持久化:cc.queueDeclare("task_queue",true,false,false,null);
数据持久化:cc.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
把消息群发给所有消费者,同一条消息所有消费者都可以收到
fanout类型的交换机
生产者:定义交换机,向交换机发送数据
消费者:定义交换机,定义自已独占的非持久的自动删除随机队列,绑定,正常从随机队列中接受数据
package cn.tedu.rabbitmq.publishsubscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Scanner; //模拟发布订阅模式的生产者 public class Producer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义交换机 //如果服务器中没有,就创建,有就直接使用 //第一个参数:交换机名称 //第二个参数:交换机类型 cc.exchangeDeclare("ps_exchange","fanout"); //4.向交换机发送数据,交换机只是接受发送数据,并不保存数据 //如果没有消费者接受,数据会丢失 while (true){ System.out.print("输入消息:"); String msg = new Scanner(System.in).nextLine(); if (msg.equals("quit")) break; cc.basicPublish("ps_exchange","",null,msg.getBytes()); } nc.close(); } }
package cn.tedu.rabbitmq.publishsubscribe; import com.rabbitmq.client.*; import java.io.IOException; //发布订阅模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义交换机 //如果服务器中没有,就创建,有就直接使用 //第一个参数:交换机名称 //第二个参数:交换机类型 cc.exchangeDeclare("ps_exchange","fanout"); //4.定义队列--随机队列名,否持久,独占、自动删除的 String queue = cc.queueDeclare().getQueue();//无参构造就可以满足我们的需求 //5.绑定交换机和队列 /* 第一个参数:队列名称 第二个参数:交换机名称 第三个参数:队列和交换机绑定的关系 */ cc.queueBind(queue,"ps_exchange",""); //6.处理接收到的消息 //处理数据 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { byte[] body = delivery.getBody(); String msg = new String(body); System.out.println("收到:"+msg); System.out.println("消息处理完毕"); } }; //取消数据时的回调函数 CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume(queue,true,deliverCallback,cancelCallback); } }
通过关键字匹配,来决定把消息发送到哪个队列
生产者:定义direct类型的交换机,向交换机发送数据并携带路由键
消费者:定义交换机,定义队列,用绑定键来绑定交换机和队列,正常接收消息
package cn.tedu.rabbitmq.route; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Scanner; //模拟路由模式的生产者 public class Producer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义direct类型的交换机 cc.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT); //4.发送消息,携带路由键 while(true){ Scanner scanner = new Scanner(System.in); System.out.print("输入消息:"); String msg = scanner.nextLine(); System.out.print("路由键:"); String routingKey = scanner.nextLine(); if(msg.equals("quit")) break; cc.basicPublish("route_exchange",routingKey,null,msg.getBytes()); System.out.println("消息发送成功"); } nc.close(); } }
package cn.tedu.rabbitmq.route; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Scanner; //模拟路由模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义direct类型的交换机 cc.exchangeDeclare("route_exchange", BuiltinExchangeType.DIRECT); //4.定义队列 String queue = cc.queueDeclare().getQueue(); //5.绑定交换机和队列(重复绑定多次) System.out.print("输入绑定键,用空格隔开:"); String bindingKeys = new Scanner(System.in).nextLine(); String[] keys = bindingKeys.split("\\s+"); for (String bindingKey : keys) { cc.queueBind(queue,"route_exchange",bindingKey); } //处理数据 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { String msg = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(msg+"-"+routingKey); System.out.println("===================="); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume(queue,true,deliverCallback,cancelCallback); } }
和路由模式相同,具有特殊的关键字规则
topic类型的交换机实现这种特殊路由规则
aaa.bbb.ccc.ddd
*.ccc.ddd.eee
#.ddd
"*"可以通配单个单词
"#"可以通配零个或多个单词
package cn.tedu.rabbitmq.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Scanner; //模拟主题模式的生产者 public class Producer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义direct类型的交换机 cc.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC); //4.发送消息,携带路由键 while(true){ Scanner scanner = new Scanner(System.in); System.out.print("输入消息:"); String msg = scanner.nextLine(); System.out.print("路由键:"); String routingKey = scanner.nextLine(); if(msg.equals("quit")) break; cc.basicPublish("topic_exchange",routingKey,null,msg.getBytes()); System.out.println("消息发送成功"); } nc.close(); } }
package cn.tedu.rabbitmq.topic; import com.rabbitmq.client.*; import java.io.IOException; import java.util.Scanner; //模拟主题模式的消费者 public class Consumer { public static void main(String[] args) throws Exception{ //1.配置RabbitMQ的连接信息 ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); //2.创建连接以及channel Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //3.定义direct类型的交换机 cc.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC); //4.定义队列 String queue = cc.queueDeclare().getQueue(); //5.绑定交换机和队列(重复绑定多次) System.out.print("输入绑定键,用空格隔开:"); String bindingKeys = new Scanner(System.in).nextLine(); String[] keys = bindingKeys.split("\\s+"); for (String bindingKey : keys) { cc.queueBind(queue,"topic_exchange",bindingKey); } //处理数据 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { String msg = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(msg+"-"+routingKey); System.out.println("===================="); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume(queue,true,deliverCallback,cancelCallback); } }
实现原理
两个队列
调用队列
返回队列:每个客户端,都需要有自己的返回队列
返回队列的队列名
在调用消息数据中,携带返回队列名
根据返回队列名,向正确的返回队列来发送计算结果
关联ID
用来匹配计算结果和调用
如果连续发送多次调用,获得计算结果时,需要直到这个节骨欧式按一次调用的结果
客户端发送调用时,携带一个关联ID
服务器端返回结果时,也携带这个关联ID
客户端:多线程异步处理结果
主线程
发送调用信息
需要计算结果时,要取结果
次线程:等待接受结果
线程之间传递数据,可以使用BlockingQueue
集合工具
这个集合中,添加了线程的等待和通知
如果没有数据,取数据时会暂停等待
有多个子类:比如ArrayBlockingQueue
package cn.tedu.rabbitmq.rpc; import com.rabbitmq.client.*; import com.rabbitmq.client.AMQP.BasicProperties; import java.io.IOException; public class RPCServer { public static void main(String[] args) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //1.接受客户端发送的调用信息(正整数n) //2.执行算法求第n个斐波那契数 //3.向客户端发送计算结果 //定义调用队列 cc.queueDeclare("rpc_queue",false,false,false,null); //从调用队列中取调用信息 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { //从delivery取出正整数n String msg = new String(delivery.getBody()); String replyTo = delivery.getProperties().getReplyTo();//返回队列名 String correlationId = delivery.getProperties().getCorrelationId();//关联id long fbnqs = fbnqs(Integer.parseInt(msg)); BasicProperties basicProperties = new BasicProperties.Builder().correlationId(correlationId).build(); cc.basicPublish("",replyTo,basicProperties,(""+fbnqs).getBytes()); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume("rpc_queue",true,deliverCallback,cancelCallback); } //服务:求第n个斐波那契数 //1 1 2 3 5 8 13 21 34 55 ...... //递归效率低,可以用来模拟服务器端的耗时运算 static long fbnqs(int n){ if(n==1 || n==2) return 1; return fbnqs(n-1)+fbnqs(n-2); } }
package cn.tedu.rabbitmq.rpc; import com.rabbitmq.client.*; import com.rabbitmq.client.AMQP.BasicProperties; import java.io.IOException; import java.util.Scanner; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class RPCClient { static BlockingQueue<Long> q = new ArrayBlockingQueue<Long>(10); public static void main(String[] args) throws Exception{ System.out.print("输入求第几个斐波那契数:"); int n = new Scanner(System.in).nextInt(); long fbnqs = fbnqs(n); System.out.println("第"+n+"个的斐波那契数是:"+fbnqs); } //异步调用服务器,从服务器中获取结果 private static long fbnqs(int n) throws Exception{ ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672);//15672是UI端口,5672是客户端端口(可以省略) cf.setUsername("guest"); cf.setPassword("guest"); Connection nc = cf.newConnection(); Channel cc = nc.createChannel(); //定义调用队列 cc.queueDeclare("rpc_queue",false,false,false,null); //返回队列 String replyTo = cc.queueDeclare().getQueue(); //关联id String cid = UUID.randomUUID().toString(); BasicProperties basicProperties = new BasicProperties.Builder() .replyTo(replyTo) .correlationId(cid) .build(); cc.basicPublish("","rpc_queue",basicProperties,(""+n).getBytes()); //模拟执行其他运算,不等待计算结果 System.out.println("调用消息已经发送"); System.out.println("模拟执行其他运算,不立即等待计算结果"); //获取计算结果 DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { //处理数据之前,先对比关联id if(cid.equals(delivery.getProperties().getCorrelationId())){ String msg = new String(delivery.getBody()); long fbnqs = Integer.parseInt(msg); q.offer(fbnqs); cc.getConnection().close(); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String s) throws IOException { } }; cc.basicConsume(replyTo,true,deliverCallback,cancelCallback); return q.take(); } }
The text was updated successfully, but these errors were encountered:
ZhuangRenyang
No branches or pull requests
消息队列-RabbitMQ
安装rabbitMQ
双击运行otp_win64_24.3.4.6.exe,点击下一步下一步即可,可以更改安装位置,需要注意路径不要有中文或空格
他的作用类似JDK,因为rabbitMQ是erlang语言开发的,所以需要环境支持
配置环境变量,在path中配置: 自己安装erlang的路径\bin
![image-20230224202431336](https://user-images.githubusercontent.com/116170751/224718467-e3a829a7-7ffa-4394-9354-681a22a95ee2.png)
![image-20230224202301804](https://user-images.githubusercontent.com/116170751/224718880-ad0d62e7-a5df-4367-baea-9ba2134240a5.png)
双击运行rabbitmq-server-3.10.1.exe,下一步即可,安装好后进入以下目录
![image-20230224202733157](https://user-images.githubusercontent.com/116170751/224718895-857ebc8a-75e5-4b26-a5f2-3372cd563e55.png)
在该目录下打开dos窗口,输入以下运行命令
启动结束后,访问:http://localhost:15672
![image-20230224203737805](https://user-images.githubusercontent.com/116170751/224719020-a93cac79-bdfd-4403-8f48-195b088dd2d9.png)
![image-20230224203828259](https://user-images.githubusercontent.com/116170751/224719135-db5ec6d4-c967-43b3-aee8-db90dabd60a3.png)
用户名和密码都是:guest
rabbitMQ的基本概念
Exchange
接受生产者发送的消息,并根据binding规则将消息路由给服务器中的队列。
ExchangeType决定了Exchange路由消息的行为。在RabbitMQ中,ExchangeType常用的类型有三种:direct、fanout和topic。
Message Queue
消息队列。我们发送给RabbitMQ的消息最后都会到达各种queue,并且存储在其中(如果路由找不到对应的queue则数据会丢失),等待消费者来取。
Binding Key
它表示的是Exchange和Message Queue是通过binding key进行联系的,这个关系是固定。
Routing Key
生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则。这个routing key需要与ExchangeType及binding key联合使用才能生效,我们的生产者只需要通过指定routing key来决定消息流向哪里
RabbitMQ六种工作模式
创建一个RabbitMQ的项目,用来学习rabbitmq,需要添加依赖
简单模式
只有一个消费者
![image-20230224212545820](https://user-images.githubusercontent.com/116170751/224719650-ef438e1d-bb28-41d9-abaf-477401e7f415.png)
生产者
消费者
工作队列模式
多个消费者,从同一个队列中接受消息
负载均衡,消息会轮询发送给所有消费者
合理的分发消息
手动ack
消息回执
向服务器发送一个通知,告诉服务器一条消息已经处理完毕
服务器可以通过ack,知道消费者是空闲还是繁忙
qos=1
每次抓取的消息数量
消息处理完毕之前,不会抓取新消息
手动ack模式下才有效
消息持久化:在服务器端,持久保存消息,避免因为服务器宕机而造成消息丢失
生产者
消费者
合理分发
持久化
如果消息保存在服务器端的内存中,是不安全的,如果服务器宕机,信息会丢失
停止rabbitmq服务:rabbitmq-service stop或者rabbitmqctl stop 启动rabbitmq服务:rabbitmq-service start
消息数据持久化、消息队列持久化
但是需要注意持久化队列时,不能直接修改队列属性,因为队列一定创建属性就固定了,不可以进行修改
可以选择新建一个别名队列或者删除该队列重新创建
队列持久化:cc.queueDeclare("task_queue",true,false,false,null);
数据持久化:cc.basicPublish("","task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
发布订阅模式
把消息群发给所有消费者,同一条消息所有消费者都可以收到
fanout类型的交换机
生产者:定义交换机,向交换机发送数据
消费者:定义交换机,定义自已独占的非持久的自动删除随机队列,绑定,正常从随机队列中接受数据
生产者
消费者
路由模式
通过关键字匹配,来决定把消息发送到哪个队列
生产者:定义direct类型的交换机,向交换机发送数据并携带路由键
消费者:定义交换机,定义队列,用绑定键来绑定交换机和队列,正常接收消息
生产者
消费者
主题模式
和路由模式相同,具有特殊的关键字规则
topic类型的交换机实现这种特殊路由规则
aaa.bbb.ccc.ddd
*.ccc.ddd.eee
#.ddd
"*"可以通配单个单词
"#"可以通配零个或多个单词
生产者
消费者
RPC模式
实现原理
两个队列
调用队列
返回队列:每个客户端,都需要有自己的返回队列
返回队列的队列名
在调用消息数据中,携带返回队列名
根据返回队列名,向正确的返回队列来发送计算结果
关联ID
用来匹配计算结果和调用
如果连续发送多次调用,获得计算结果时,需要直到这个节骨欧式按一次调用的结果
客户端发送调用时,携带一个关联ID
服务器端返回结果时,也携带这个关联ID
客户端:多线程异步处理结果
主线程
发送调用信息
需要计算结果时,要取结果
次线程:等待接受结果
线程之间传递数据,可以使用BlockingQueue
集合工具
这个集合中,添加了线程的等待和通知
如果没有数据,取数据时会暂停等待
有多个子类:比如ArrayBlockingQueue
服务器
客户端
The text was updated successfully, but these errors were encountered: