开发自救手册 开发自救手册
首页
  • Java指南

    • 手册的初衷以及适用人群
    • 介绍
    • JAVA涉及技术总览
  • 技术场景

    • 线上代码覆盖率监控
  • cpp自救指南
  • go指南
  • 真正的面经系列
  • 经历分享
  • 相关文章

    • 25届实习内推
  • 微服务
友情链接
  • 团队介绍
  • 如何加入
GitHub (opens new window)
首页
  • Java指南

    • 手册的初衷以及适用人群
    • 介绍
    • JAVA涉及技术总览
  • 技术场景

    • 线上代码覆盖率监控
  • cpp自救指南
  • go指南
  • 真正的面经系列
  • 经历分享
  • 相关文章

    • 25届实习内推
  • 微服务
友情链接
  • 团队介绍
  • 如何加入
GitHub (opens new window)
  • 微服务

    • 微服务介绍
    • EureKa注册中心
    • Ribbon负载均衡
    • Nacos注册中心、配置中心
    • OpenFeign远程调用
    • Gateway统一网关
    • Docker
    • DockerCompose
    • RabbitMQ
      • 同步通讯和异步通讯
        • 同步调用的问题
        • 异步调用方案
        • 我的理解
      • 什么是MQ
      • RabbitMQ概述
      • 常见消息模型
      • SpringAMQP(大大简化api)
      • WorkQueue 工作队列(模型)
      • 发布订阅模型介绍
        • 发布订阅-Fanout Exchange
        • 发布订阅-DirectExchange
        • 发布订阅-TopicExchange
      • 消息转换器
    • Sentinel微服务保护
  • 八股文
  • 微服务
ethandu
2024-04-27
目录

RabbitMQ

# 初识MQ

# 同步通讯和异步通讯

image-20220213224214526

# 同步调用的问题

image-20220213224605926

image-20220213224725874

每加一次需求代码都需要修改

再加上是同步调用 用户必须等待订单服务完成才能执行仓储服务执行完才能执行短信服务 总耗时500ms很恐怖 性能太差

在整个过程中有很多资源的浪费 要是卡住在仓储服务支付服务就渐渐被耗尽了

image-20220213224950164

image-20220213225025676

总结:

image-20220213225114871

# 异步调用方案

image-20220213225252153

优势一:服务解耦

现在加需求就不需要再修改支付服务里面的代码 例如添加了优惠卷服务 因为我们呢现在支付服务不负责调用服务了 而是发消息 那么后面添加服务就只需要订阅事件就好了

解除服务的话我们也就取消订阅就ok了

优势二:性能提升,吞吐量提高

image-20220213225623644

优势三:服务没有强依赖,不担心级联失败问题

image-20220213225654961

优势四:流量消峰

image-20220213225826285

总结:

image-20220213225941732

# 我的理解

传统的微服务调用用的是openFeign之类的通信手段,这个实际上在不对代码做处理的时候是同步的。

但是我们的消息队列是可以声明“监听器”,监听到消息队列中有消息就执行业务逻辑的。这个实际上就是一种响应式的架构。不需要我们主动的去通信等待另一个微服务执行完毕,而是发个消息给消息队列,而微服务自己监听到消息就执行逻辑。

这种响应式的架构实际上就是异步的,非阻塞的。快速的。

# 什么是MQ

image-20220213230503194

# RabbitMQ概述

image-20220213230651293

image-20220213231023725

一般来说我创建一个用户他有自己的虚拟主机 然后我再创建一个用户他还是有一个虚拟主机 虚拟主机之间是相互隔离的互不干扰

总结:

image-20220213231145170

# 常见消息模型

image-20220213231317415

image-20220213231355226

image-20220214200232288

要向mq发送消息必须要和它先建立连接 用到连接工厂

image-20220310215759080

这边有itcast的虚拟主机的访问权

image-20220310215818996

创建通道

image-20220310215932324

image-20220310215949632

有了通道可以向队列里面发送消息了

那么我们需要的是创建一个队列

image-20220310220104132

image-20220310220124657

生产者向队列里面发送消息

image-20220310220114996

image-20220310220210879

image-20220310220230202

这个时候我们的发送者已经任务结束了 连接都断开了, 这样就解除耦合了 变成异步了

image-20220310220356907

接收消息的是Consumer

同样的是先创建连接

image-20220310220828540

创建通道 创建队列(这里创建的原因是因为 我们的生产者和消费者启动的顺序是不确定的 万一消费者先启动找不到队列 )

image-20220310220910809

然后就是接收消息了 里面的匿名内部类的对象的方法是我们的处理操作(handleDelivery) 我们把这个行为给挂在了队列上 一旦有消息它就会进行操作处理 最后一个参数byte[] body就是消息体

image-20220310221130096

这是一个异步的机制,这行代码(接收到消息)在等待接收消息后面打印 该处理消息处理消息 我还是继续去执行我后面的业务去了

image-20220310221608148

总结

image-20220310221730227

# SpringAMQP(大大简化api)

大大的简化我们的api

image-20220310221844454

AMQP: 是消息队列的规范

SpringAMQP:是Spring对AMQP的实现(就像是redis里面spring提供的模板)

官网讲的特征:

image-20220310222038096

SpringAMQP实现HelloWorld种的基础消息队列功能

流程如下:

image-20220310222214794

image-20220310222518608

image-20220310222543355

总结

image-20220310222928585

子啊Consumer中编写消费逻辑,监听simple.queue 接收的是String类型的消息 将来都是Spring帮助我们完成任务 十分的优雅

image-20220310223013568

总结

image-20220310223424112

# WorkQueue 工作队列(模型)

image-20220310223600783

队列后面跟了两个消费者,消息将来给谁?

rabbitmq中消息是阅后即焚

消息一旦让消费者1消费了消费者2就肯定是拿不到了。

两个消费者实际上是合作关系,共同处理。

假设只有一个消费者,它的处理速度是:40条消息/s,但是发布者却每s发布了50条。这个就暂时是搞不定了,每s有10条消息多出来没人处理,就只能堆积在队列当中,队列在内存中是有存储上限的,这样下去一定会把队列给堆满,这样再有消息就进不去了。如果进不去,消息就会被丢弃,这样就出问题了。两个消费者,每s都处理40条消息的话之前的情况就是可以轻松应对了。

所以WorkQueue实际上就还是普通的消息队列,只是挂了两个消费者。可以提高消息处理速度,避免队列消息堆积。

image-20220512104256528

image-20220512105653155

消费者1消费的快,消费者2消费的慢,就少消费一点,毕竟能者多劳,但是我们现在是平均分布。这个并不是我们想要的。

这个就是我们的消息预取机制。什么叫做消息预取?当我们大量的消息到达队列的时候队列会将消息进行投递,consumer1和consumer2的通道(channel)会把消息先拿过来,这个叫做消息预取。管他能不能处理,先拿过来再说,

image-20220512105952141

这下就两边各自处理了50条消息了,于是两个人就平均分配了所有的消息,每个人25条,但是消费者1处理的快,很快搞定了,消费者2处理的慢,就花了很长时间才搞定

消费预取限制:

image-20220512110154593

我们这里就是预取1条消息,消费完成以后再去拿,不至于一下预取一大堆结果处理不完。

image-20220512110719221

这个就起到了一个能者多劳的效果了。

最后我们做一个总结:

image-20220512110408528

这个模式可以提高整个队列的速度。

这里我们发现在代码中创建队列或者交换机之类的也是非常简单(包括这些绑定交换机到队列之类的操作,都是可以在一个config类中解决的,只要将amqp包中的对象给注册到SpringIOC容器中就好了)


@Configuration
public class FanoutConfig {
    // itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("itcast.fanout");
    }

    // fanout.queue1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    // 绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue1)
                .to(fanoutExchange);
    }

    // fanout.queue2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    // 绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder
                .bind(fanoutQueue2)
                .to(fanoutExchange);
    }

    @Bean
    public Queue objectQueue(){
        return new Queue("object.queue");
    }

    @Bean
    public Queue simpleQueue(){
        return new Queue("simple.queue");
    }

}

image-20220512111013751

# 发布订阅模型介绍

刚才我们的消息都是阅后即焚的(一旦消费完就会从队列中删除),这个就无法达到给多个消费者消费了。

这个就无法满足我们课程开始时提出的需求:当用户支付完成了,你得去通知订单服务,仓储服务,短信服务。让这些服务各自去完成自己的业务。

现在的消息要被3个服务都收到。

那么就需要用到发布订阅模型了。实现方式实际上就是加上了一个交换机(exchange)。这个模型和计算机网络实际上是一样的。

image-20220512111653755

这个模型我们不关心消费者怎么绑定。我们关心的是消息如何从发布者到达队列。

消息发布到交换机,然后交换机将消息转发到队列当中。消息发布者不需要知道投递到队列的细节,这些都是交换机决定的,交换机转发消息给多个队列,这个就实现了我们的一份消息多个消费者消费的需求了!

到底交换机是发送给一个还是多个呢?

image-20220512112410933

exchange只负责转发,不管消息是否丢失,只有队列是存消息的。

# 发布订阅-Fanout Exchange

image-20220512112559133

image-20220512112629997

SpringAMQP提供了声明交换机,队列,绑定关系的API,例如:

image-20220512112642809

image-20220512112737269

连绑定都是需要声明的,利用BindingBuilder这个提供给我们的工厂。(绑定它(队列)到它(交换机))

image-20220512112936808

这里我们是通过声明Bean的方式去写的。

将来它读取到这些Bean以后就会帮我们,向rabbitmq去声明队列,交换机,绑定关系。全部都由Spring帮我们去做。

image-20220512113155611

image-20220512113202359

交换机点到这个里面来,看到这个bindings

image-20220512113227937

image-20220512113320885

队列就已经绑定上了。

image-20220512113602086

这里以前是发送到队列,现在是发送到交换机。

image-20220512113607932

然后发送消息,两个队列都收到了,这个就是fanout exchange

image-20220512113506269

总结:

image-20220512113649167

# 发布订阅-DirectExchange

image-20220512113724535

会将消息根据路由规则路由到指定的Queue

这个也就被称为路由模式(routes)

image-20220512113836463

我们现在两个队列都有自己的bindingkey了,将来发布者发送消息的时候也要指定一个RoutingKey

这个时候Exchange将消息路由到队列的时候要比对bindingkey了。

image-20220512114059766

这里要是我们发送的消息是bindingkey是red的话就和我们之前的fanoutexchange一样了,就都是广播了。

所以我们可以用DirectExchange来模拟FanoutExchange,它比FanoutExchange更加灵活。这种灵活性也是有代价的,不要忘了在消息中加上bindingkey。

这里我们不再使用Bean的方式来声明,毕竟需要声明这么多东西。

image-20220512114502566

接下来就是基于@RabbitListener注解来声明这些组件。

之前我们写的所有消费者当中都有这个注解。但是这个注解上可以同时完成队列和交换机的声明。不用再创建Bean了。

(消费者实际上才是在代码中指定组件逻辑的,发布者只需要知道往哪个交换机or队列中发送消息就ok了)

image-20220512114949567

这里我们原来在主键中写的是queues=...

而这里我们是写bindings=@QueueBinding(...)

image-20220512115154823

这样我们就完成了绑定的声明了,并且还声明了bindingkeys。

image-20220512115330245

同样的代码copy一份来声明queue2到交换机的绑定。key为red,yellow

这里多了一个exchange叫做itcast.direct,然后进去看到queue1绑定了什么,queue2绑定了什么key。

image-20220512115449568

接着是消息发送的方法。

image-20220512115730126

这下就blue就收到了。

image-20220512115742285

# 发布订阅-TopicExchange

image-20220512115937546

这个模式和DirectExchange很像,但是routingkey必须是多个单词的列表,并且用.分割

image-20220512120028961

左下角我们看到这个routingkey的逻辑实际上和话题的非常像

那么我们去订阅的话,比如我想关系下新闻,中国的,日本的,那么我们得绑定两个key(在Direct中)。

而在TopicExchange中我们支持通配符

image-20220512120316353

image-20220512120332408

于是我们只要写china.#就可以收到所有的中国相关的话题。而我们写#.news就可以收到所有和新闻相关的话题。

这个实际上就是简化了bindingkey的写法,原来用多个key的,现在只需要用一个key就可以了。

image-20220512120538898

这下就写完成了

image-20220512120637890

itcast.exchange

image-20220512120742956

这里我们就可以测试这里的服务了。

image-20220512120859547

这个变化也没有很大

总结:

这个bindingkey可以支持通配符。然后routingkey需要用.来分割单词列表。

# 消息转换器

我们在使用rabbitTemplate的时候消息这边接收的类一直是Object,这个说明我们可以发送任意的对象。

先在config里面声明一个队列

image-20220512121305170

然后我们测试发送一个对象

@Test
    public void testSendObjectQueue(){
        Map<String,Object> msg=new HashMap<>();
        msg.put("name","pixel-revolve");
        msg.put("age",20);

        rabbitTemplate.convertAndSend("object.queue",msg);

    }

去到rabbitmq里面,我们看到消息,还有消息类型是java序列化类型。

image-20220512121758594

它只支持字节。这种原生的序列化方式并不是那么好,性能差,安全性有问题,数据长度太长了,消息体越大,传输的效率也就越慢,而且还占用额外的内存空间。所以我们非常不推荐这种方式。

Spring的消息对象处理使用MessageConverter来处理的。

image-20220512122120971

我们使用jackson的方式来实现是比较喜欢的。

同样的我们在consumer中引入jackson的依赖,并且定义一样的MessageConverter

image-20220512122621498

这里我们就接收到了,这个方式就是比较方便的

image-20220512122543082

总结:

image-20220512122722144

上次更新: 2024/05/02, 13:47:32
DockerCompose
Sentinel微服务保护

← DockerCompose Sentinel微服务保护→

Theme by Vdoing | Copyright © 2019-2024 Backend Development | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式