Last updated: April 16, 2024 (afternoon)
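Spring Boot is the mainstream framework for back-end development today and is used in most systems. It also provides excellent integration with many other frameworks.
Create a new Spring Boot project
In the Messaging options, check RabbitMQ. The generated Maven pom file then contains the spring-boot-starter-amqp dependency, which transitively pulls in amqp-client, the library that implements the AMQP protocol.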
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
```
Modify the YAML configuration file:
```yaml
spring:
  rabbitmq:
    host: 192.168.204.127
    port: 5672
    username: guest
    password: guest
    virtual-host: /
```
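The commonly used RabbitMQ helper classes are RabbitAdmin and RabbitTemplate. RabbitTemplate is the more widely used of the two, and this integration mainly relies on it. It provides a set of functions for converting, sending, and receiving messages, so in Spring Boot you can send and receive messages much as you would with the native API.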
First, following the basic modes covered earlier, test the basic API operations of the Spring Boot and RabbitMQ integration.
Hello World
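Simple mode is the most basic message queue setup: it leaves out everything else and models only three parts, a producer, a queue, and a consumer.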
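As a rough mental model of these three parts, here is a minimal in-memory sketch in plain Java. It is only an analogy: `SimpleModeSketch`, `produce`, and `consumeAll` are hypothetical names, and a `LinkedBlockingQueue` stands in for the broker's queue. No RabbitMQ API is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory analogy of simple mode: producer -> queue -> consumer.
public class SimpleModeSketch {

    /** Producer side: puts `count` copies of the message on the queue. */
    public static void produce(BlockingQueue<String> queue, String message, int count) {
        for (int i = 0; i < count; i++) {
            queue.add(message);
        }
    }

    /** Consumer side: drains everything currently queued, in FIFO order. */
    public static List<String> consumeAll(BlockingQueue<String> queue) {
        List<String> received = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            received.add(next);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        produce(queue, "Hello World!", 3);
        System.out.println(consumeAll(queue));
    }
}
```

In the real setup the queue lives in the broker, so producer and consumer can run in different processes. The sketch only shows the direction in which messages flow.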
- Create a class for each of the three parts
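```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// SimpleSender.java: the producer
public class SimpleSender {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);
    private static final String QUEUE_NAME = "simple.hello";

    @Autowired
    private RabbitTemplate template;

    public void send() {
        String message = "Hello World!";
        // Sent through the default exchange, so the routing key is the queue name
        this.template.convertAndSend(QUEUE_NAME, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }
}

// SimpleReceiver.java: the consumer
// The class-level @RabbitListener binds the @RabbitHandler method to the queue
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

    @RabbitHandler
    public void receive(String in) {
        LOGGER.info(" [x] Received '{}'", in);
    }
}

// SimpleRabbitConfig.java: declares the queue and registers both beans
@Configuration
public class SimpleRabbitConfig {
    @Bean
    public Queue hello() {
        return new Queue("simple.hello");
    }

    @Bean
    public SimpleSender simpleSender() {
        return new SimpleSender();
    }

    @Bean
    public SimpleReceiver simpleReceiver() {
        return new SimpleReceiver();
    }
}
```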
- Create a controller to test it
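```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rabbit")
public class RabbitController {

    @Autowired
    private SimpleSender simpleSender;

    // GET /rabbit/simple sends ten messages, one per second
    @GetMapping(value = "/simple")
    public void simpleTest() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            simpleSender.send();
            Thread.sleep(1000);
        }
    }
}
```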

Work Queue
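Work Queue mode binds multiple consumers to a single queue. By default, messages are distributed to them round-robin.
The steps mirror the previous example.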
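The round-robin distribution mentioned above can be illustrated with a small plain-Java sketch. It is an idealized model, not broker code: `RoundRobinSketch` and `dispatch` are hypothetical names, and the sketch ignores prefetch and acknowledgement timing, which can affect the real distribution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of round-robin dispatch to consumers numbered 1..consumerCount.
public class RoundRobinSketch {

    /** Assigns message i (0-based) to consumer (i % consumerCount) + 1. */
    public static Map<Integer, List<String>> dispatch(List<String> messages, int consumerCount) {
        Map<Integer, List<String>> assigned = new LinkedHashMap<>();
        for (int c = 1; c <= consumerCount; c++) {
            assigned.put(c, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            int consumer = i % consumerCount + 1;
            assigned.get(consumer).add(messages.get(i));
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4", "m5");
        // With two consumers, consumer 1 gets m1, m3, m5 and consumer 2 gets m2, m4
        System.out.println(dispatch(messages, 2));
    }
}
```

With two receivers on the same queue, this model predicts that the log lines alternate between instance 1 and instance 2.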
- Create the corresponding classes
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StopWatch;

// WorkSender.java: the producer
public class WorkSender {
    public static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);
    public static final String QUEUE_NAME = "work";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(int index) {
        // Message format: "Hello" + 1 to 3 dots + (index + 1); each dot means one second of work
        StringBuilder builder = new StringBuilder("Hello");
        int limitIndex = index % 3 + 1;
        for (int i = 0; i < limitIndex; i++) {
            builder.append('.');
        }
        builder.append(index + 1);
        String message = builder.toString();
        rabbitTemplate.convertAndSend(QUEUE_NAME, message);
        LOGGER.info(" [x] Sent '{}'", message);
    }
}

// WorkReceiver.java: a consumer; several instances listen on the same queue
@RabbitListener(queues = "work")
public class WorkReceiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);
    private final int instance;

    public WorkReceiver(int i) {
        this.instance = i;
    }

    @RabbitHandler
    public void receive(String in) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        LOGGER.info("instance {} [x] Received '{}'", this.instance, in);
        doWork(in);
        watch.stop();
        LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds());
    }

    // Simulates a slow task: sleeps one second for every '.' in the message
    private void doWork(String in) throws InterruptedException {
        for (char ch : in.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }
}

// WorkRabbitConfig.java: declares the queue, two receivers, and the sender
@Configuration
public class WorkRabbitConfig {
    @Bean
    public Queue workQueue() {
        return new Queue("work");
    }

    @Bean
    public WorkReceiver workReceiver1() {
        return new WorkReceiver(1);
    }

    @Bean
    public WorkReceiver workReceiver2() {
        return new WorkReceiver(2);
    }

    @Bean
    public WorkSender workSender() {
        return new WorkSender();
    }
}
```
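To make the message format used by the sender and the simulated work time in the receiver easier to follow, here is a standalone sketch of just that logic. `WorkMessageSketch`, `buildMessage`, and `workSeconds` are hypothetical names. The sketch mirrors the string-building loop and the dot counting, and it returns the number of seconds instead of sleeping.

```java
// Hypothetical standalone version of the message-building and dot-counting logic.
public class WorkMessageSketch {

    /** Builds "Hello", then (index % 3 + 1) dots, then (index + 1). */
    public static String buildMessage(int index) {
        StringBuilder builder = new StringBuilder("Hello");
        int dots = index % 3 + 1;
        for (int i = 0; i < dots; i++) {
            builder.append('.');
        }
        builder.append(index + 1);
        return builder.toString();
    }

    /** Counts the dots, i.e. the seconds a receiver would sleep for this message. */
    public static int workSeconds(String message) {
        int seconds = 0;
        for (char ch : message.toCharArray()) {
            if (ch == '.') {
                seconds++;
            }
        }
        return seconds;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            String message = buildMessage(i);
            System.out.println(message + " -> " + workSeconds(message) + "s");
        }
    }
}
```

For indexes 0 to 3 this yields `Hello.1`, `Hello..2`, `Hello...3`, and `Hello.4`, so the simulated work time cycles through one, two, and three seconds.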
- Write the controller method and call it to test
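```java
// Added to RabbitController; the sender must be injected as well
@Autowired
private WorkSender workSender;

// GET /rabbit/work sends ten messages, one per second
@GetMapping(value = "/work")
public void workTest() throws InterruptedException {
    for (int i = 0; i < 10; i++) {
        workSender.send(i);
        Thread.sleep(1000);
    }
}
```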