「延迟消费java」延迟消费是投资的特性吗

博主:adminadmin 2022-12-25 13:42:06 76

今天给各位分享延迟消费java的知识,其中也会对延迟消费是投资的特性吗进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

一起讨论下,消息幂等(去重)通用解决方案

消息中间件是分布式系统常用的组件,无论是异步化、解耦、削峰等都有广泛的应用价值。我们通常会认为,消息中间件是一个可靠的组件——这里所谓的可靠是指,只要我把消息成功投递到了消息中间件,消息就不会丢失,即消息肯定会至少保证消息能被消费者成功消费一次,这是消息中间件最基本的特性之一,也就是我们常说的“AT LEAST ONCE”,即消息至少会被“成功消费一遍”。

举个例子,一个消息M发送到了消息中间件,消息投递到了消费程序A,A接受到了消息,然后进行消费,但在消费到一半的时候程序重启了,这时候这个消息并没有标记为消费成功,这个消息还会继续投递给这个消费者,直到其消费成功了,消息中间件才会停止投递。

然而这种可靠的特性导致,消息可能被多次地投递。举个例子,还是刚刚这个例子,程序A接受到这个消息M并完成消费逻辑之后,正想通知消息中间件“我已经消费成功了”的时候,程序就重启了,那么对于消息中间件来说,这个消息并没有成功消费过,所以他还会继续投递。这时候对于应用程序A来说,看起来就是这个消息明明消费成功了,但是消息中间件还在重复投递。

这在RockectMQ的场景来看,就是同一个messageId的消息重复投递下来了。

基于消息的投递可靠(消息不丢)是优先级更高的,所以消息不重的任务就会转移到应用程序自我实现,这也是为什么RocketMQ的文档里强调的,消费逻辑需要自我实现幂等。背后的逻辑其实就是:不丢和不重是矛盾的(在分布式场景下),但消息重复是有解决方案的,而消息丢失是很麻烦的。

例如:假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存:

要实现消息的幂等,我们可能会采取这样的方案:

这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

假设这个消费的所有代码加起来需要1秒,有重复的消息在这1秒内(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),那么很可能,上面去重代码里面会发现,数据依然是空的(因为上一条消息还没消费完,还没成功更新订单状态),

那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题(如主键冲突抛出异常、库存被重复扣减而没释放等)

要解决上面并发场景下的消息幂等问题,一个可取的方案是开启事务把select 改成 select for update语句,把记录进行锁定。

但这样消费的逻辑会因为引入了事务包裹而导致整个消息消费可能变长,并发度下降。

当然还有其他更高级的解决方案,例如更新订单状态采取乐观锁,更新失败则消息重新消费之类的。但这需要针对具体业务场景做更复杂和细致的代码开发、库表设计,不在本文讨论的范围。

但无论是select for update, 还是乐观锁这种解决方案,实际上都是基于业务表本身做去重,这无疑增加了业务开发的复杂度, 一个业务系统里面很大部分的请求处理都是依赖MQ的,如果每个消费逻辑本身都需要基于业务本身而做去重/幂等的开发的话,这是繁琐的工作量。本文希望 探索 出一个通用的消息幂等处理的方法,从而抽象出一定的工具类用以适用各个业务场景。

在消息中间件里,有一个投递语义的概念,而这个语义里有一个叫”Exactly Once”,即消息肯定会被成功消费,并且只会被消费一次。以下是阿里云里对Exactly Once的解释:

在我们业务消息幂等处理的领域内,可以认为业务消息的代码肯定会被执行,并且只被执行一次,那么我们可以认为是Exactly Once。

但这在分布式的场景下想找一个通用的方案几乎是不可能的。不过如果是针对基于数据库事务的消费逻辑,实际上是可行的。

假设我们业务的消息消费逻辑是:更新MySQL数据库的某张订单表的状态:

要实现Exaclty Once即这个消息只被消费一次(并且肯定要保证能消费一次),我们可以这样做:在这个数据库中增加一个消息消费记录表,把消息插入到这个表,并且把原来的订单更新和这个插入的动作放到同一个事务中一起提交,就能保证消息只会被消费一遍了。

1、开启事务

2、插入消息表(处理好主键冲突的问题)

3、更新订单表(原消费逻辑)

4、提交事务

说明:

1、这时候如果消息消费成功并且事务提交了,那么消息表就插入成功了,这时候就算RocketMQ还没有收到消费位点的更新再次投递,也会插入消息失败而视为已经消费过,后续就直接更新消费位点了。这保证我们消费代码只会执行一次。2、如果事务提交之前服务挂了(例如重启),对于本地事务并没有执行所以订单没有更新,消息表也没插入成功;而对于RocketMQ服务端来说,消费位点也没更新,所以消息还会继续投递下来,投递下来发现这个消息插入消息表也是成功的,所以可以继续消费。这保证了消息不丢失。

事实上,阿里云ONS的EXACTLY-ONCE语义的实现上,就是类似这个方案基于数据库的事务特性实现的。更多详情可参考:

基于这种方式,的确这是有能力拓展到不同的应用场景,因为他的实现方案与具体业务本身无关——而是依赖一个消息表。

但是这里有它的局限性

1、消息的消费逻辑必须是依赖于关系型数据库事务。如果消费的消费过程中还涉及其他数据的修改,例如Redis这种不支持事务特性的数据源,则这些数据是不可回滚的。

2、数据库的数据必须是在一个库,跨库无法解决

注:业务上,消息表的设计不应该以消息ID作为标识,而应该以业务的业务主键作为标识更为合理,以应对生产者的重发。阿里云上的消息去重只是RocketMQ的messageId,在生产者因为某些原因手动重发(例如上游针对一个交易重复请求了)的场景下起不到去重/幂等的效果(因消息id不同)。

如上所述,这种方式Exactly Once语义的实现,实际上有很多局限性,这种局限性使得这个方案基本不具备广泛应用的价值。并且由于基于事务,可能导致锁表时间过长等性能问题。

例如我们以一个比较常见的一个订单申请的消息来举例,可能有以下几步(以下统称为步骤X):

1、 检查库存(RPC)

2、 锁库存(RPC)

3、 开启事务,插入订单表(MySQL)

4、 调用某些其他下游服务(RPC)

5、 更新订单状态

6、 commit 事务(MySQL)

这种情况下,我们如果采取消息表+本地事务的实现方式,消息消费过程中很多子过程是不支持回滚的,也就是说就算我们加了事务,实际上这背后的操作并不是原子性的。怎么说呢,就是说有可能第一条小在经历了第二步锁库存的时候,服务重启了,这时候实际上库存是已经在另外的服务里被锁定了,这并不能被回滚。当然消息还会再次投递下来,要保证消息能至少消费一遍,换句话说,锁库存的这个RPC接口本身依旧要支持“幂等”。

再者,如果在这个比较耗时的长链条场景下加入事务的包裹,将大大的降低系统的并发。所以通常情况下,我们处理这种场景的消息去重的方法还是会使用一开始说的业务自己实现去重逻辑的方式,如前面加select for update,或者使用乐观锁。

那我们有没有方法抽取出一个公共的解决方案,能兼顾去重、通用、高性能呢?

其中一个思路是把上面的几步,拆解成几个不同的子消息,例如:

1、库存系统消费A:检查库存并做锁库存,发送消息B给订单服务

2、订单系统消费消息B:插入订单表(MySQL),发送消息C给自己(下游系统)消费

3、下游系统消费消息C:处理部分逻辑,发送消息D给订单系统

4、订单系统消费消息D:更新订单状态

注:上述步骤需要保证本地事务和消息是一个事务的(至少是最终一致性的),这其中涉及到分布式事务消息相关的话题,不在本文论述。

可以看到这样的处理方法会使得每一步的操作都比较原子,而原子则意味着是小事务,小事务则意味着使用消息表+事务的方案显得可行。

然而,这太复杂了!这把一个本来连续的代码逻辑割裂成多个系统多次消息交互!那还不如业务代码层面上加锁实现呢。

上面消息表+本地事务的方案之所以有其局限性和并发的短板,究其根本是因为它依赖于关系型数据库的事务,且必须要把事务包裹于整个消息消费的环节。

如果我们能不依赖事务而实现消息的去重,那么方案就能推广到更复杂的场景例如:RPC、跨库等。

例如,我们依旧使用消息表,但是不依赖事务,而是针对消息表增加消费状态,是否可以解决问题呢?

67_1.png

以上是去事务化后的消息幂等方案的流程,可以看到,此方案是无事务的,而是针对消息表本身做了状态的区分:消费中、消费完成。只有消费完成的消息才会被幂等处理掉。而对于已有消费中的消息,后面重复的消息会触发延迟消费(在RocketMQ的场景下即发送到RETRY TOPIC),之所以触发延迟消费是为了控制并发场景下,第二条消息在第一条消息没完成的过程中,去控制消息不丢(如果直接幂等,那么会丢失消息(同一个消息id的话),因为上一条消息如果没有消费完成的时候,第二条消息你已经告诉broker成功了,那么第一条消息这时候失败broker也不会重新投递了)

上面的流程不再细说,后文有github源码的地址,读者可以参考源码的实现,这里我们回头看看我们一开始想解决的问题是否解决了:

1、 消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。

2、 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。

3、 支持上游业务生产者重发的业务重复的消息幂等问题。

关于第一个问题已经很明显已经解决了,在此就不讨论了。

关于第二个问题是如何解决的?主要是依靠插入消息表的这个动作做控制的,假设我们用MySQL作为消息表的存储媒介(设置消息的唯一ID为主键),那么插入的动作只有一条消息会成功,后面的消息插入会由于主键冲突而失败,走向延迟消费的分支,然后后面延迟消费的时候就会变成上面第一个场景的问题。

关于第三个问题,只要我们设计去重的消息键让其支持业务的主键(例如订单号、请求流水号等),而不仅仅是messageId即可。所以也不是问题。

如果细心的读者可能会发现这里实际上是有逻辑漏洞的,问题出在上面聊到的个三问题中的第2个问题(并发场景),在并发场景下我们依赖于消息状态是做并发控制使得第2条消息重复的消息会不断延迟消费(重试)。但如果这时候第1条消息也由于一些异常原因(例如机器重启了、外部异常导致消费失败)没有成功消费成功呢?也就是说这时候延迟消费实际上每次下来看到的都是 消费中 的状态,最后消费就会被视为消费失败而被投递到死信Topic中(RocketMQ默认可以重复消费16次)。

有这种顾虑是正确的!对于此,我们解决的方法是,插入的消息表必须要带一个最长消费过期时间,例如10分钟,意思是如果一个消息处于 消费中 超过10分钟,就需要从消息表中删除(需要程序自行实现)。所以最后这个消息的流程会是这样的:

67_2.png

我们这个方案实际上没有事务的,只需要一个存储的中心媒介,那么自然我们可以选择更灵活的存储媒介,例如Redis。使用Redis有两个好处:

1、性能上损耗更低

2、上面我们讲到的超时时间可以直接利用Redis本身的ttl实现

当然Redis存储的数据可靠性、一致性等方面是不如MySQL的,需要用户自己取舍。

以上方案针对RocketMQ的Java实现已经开源放到Github中,具体的使用文档可以参考 ,

以下仅贴一个Readme中利用Redis去重的使用样例,用以意业务中如果使用此工具加入消息去重幂等的是多么简单:

以上代码大部分是原始RocketMQ的必须代码,唯一需要修改的仅仅是创建一个 DedupConcurrentListener 示例,在这个示例中指明你的消费逻辑和去重的业务键(默认是messageId)。

更多使用详情请参考Github上的说明。

实现到这里,似乎方案挺完美的,所有的消息都能快速的接入去重,且与具体业务实现也完全解耦。那么这样是否就完美的完成去重的所有任务呢?

很可惜,其实不是的。原因很简单:因为要保证消息至少被成功消费一遍,那么消息就有机会消费到一半的时候失败触发消息重试的可能。还是以上面的订单流程X:

1、 检查库存(RPC)

2、 锁库存(RPC)

3、 开启事务,插入订单表(MySQL)

4、 调用某些其他下游服务(RPC)

5、 更新订单状态

6、 commit 事务(MySQL)

当消息消费到步骤3的时候,我们假设MySQL异常导致失败了,触发消息重试。因为在重试前我们会删除幂等表的记录,所以消息重试的时候就会重新进入消费代码,那么步骤1和步骤2就会重新再执行一遍。如果步骤2本身不是幂等的,那么这个业务消息消费依旧没有做好完整的幂等处理。

那么既然这个并不能完整的完成消息幂等,还有什么价值呢?价值可就大了!虽然这不是解决消息幂等的银弹(事实上,软件工程领域里基本没有银弹),但是他能以便捷的手段解决:

1、各种由于Broker、负载均衡等原因导致的消息重投递的重复问题

2、各种上游生产者导致的业务级别消息重复问题

3、重复消息并发消费的控制窗口问题,就算重复,重复也不可能同一时间进入消费逻辑

也就是说,使用这个方法能保证正常的消费逻辑场景下(无异常,无异常退出),消息的幂等工作全部都能解决,无论是业务重复,还是rocketmq特性带来的重复。

事实上,这已经能解决99%的消息重复问题了,毕竟异常的场景肯定是少数的。那么如果希望异常场景下也能处理好幂等的问题,可以做以下工作降低问题率:

1、消息消费失败做好回滚处理。如果消息消费失败本身是带回滚机制的,那么消息重试自然就没有副作用了。

2、消费者做好优雅退出处理。这是为了尽可能避免消息消费到一半程序退出导致的消息重试。

3、一些无法做到幂等的操作,至少要做到终止消费并告警。例如锁库存的操作,如果统一的业务流水锁成功了一次库存,再触发锁库存,如果做不到幂等的处理,至少要做到消息消费触发异常(例如主键冲突导致消费异常等)

4、在#3做好的前提下,做好消息的消费监控,发现消息重试不断失败的时候,手动做好#1的回滚,使得下次重试消费成功

(三)延迟队列DelayQueue实现订单自动取消

DelayQueue :,1)java自带延时获取元素, 无界 阻塞队列,2)队列 内部用PriorityQueue实现 。     创建元素时可 指定多久 才能从队列中获取当前元素。期满才从队列中 提取 ,没到延时时间, 阻塞 当前线程。

泛型队列,继承Delayed,需重写getDelay和compareTo方法。

1.public class DelayQueue E extends Delayed extends AbstractQueue E

2.public int compareTo (T o); 往DelayQueue 加入数据 执行,根据返回值判断位置。排得越 前,越先被消费

3. long getDelay (TimeUnit unit);判断消息是否到期。负数,已到期,可读。

优点: java自带,轻量级,使用简单

缺点: 存储 内存中 ,服务器 重启 会造成数据 丢失 ,配合redis使用。数量大用mq

订单类,实现Delayed接口

unit.convert(this.createdTime.toInstant(ZoneOffset.of("+8")).toEpochMilli()+expireTime-System.currentTimeMillis(),TimeUnit.MILLISECONDS);

DelayQueue 分布式 环境中就会 重复执行;所以加redis:

每次生成订单时, 同时向 redis setnx 设定该未支付订单,

每次查询待支付订单时须从 redis 中也查一遍,

redis 不存在该订单,改为已取消。

AB 两个队列,A 队列设置 消息过期时间 , 没有消费者 ,A 过期自动转发到 B , B 队列消费者 取消 。

网易传媒技术团队:消息中间件实现延迟队列的应用与实践

早期需要延迟处理的业务场景,更多的是通过定时任务扫表,然后执行满足条件的记录,具有频率高、命中低、资源消耗大的缺点。随着消息中间件的普及,延迟消息可以很好的处理这种场景,本文主要介绍延迟消息的使用场景以及基于常见的消息中间件如何实现延迟队列,最后给出了一个在网易公开课使用延迟队列的实践。

1、有效期:限时活动、拼团。。。

2、超时处理:取消超时未支付订单、超时自动确认收货。。。

4、重试:网络异常重试、打车派单、依赖条件未满足重试。。。

5、定时任务:智能设备定时启动。。。

1、RabbitMQ

1)简介:基于AMQP协议,使用Erlang编写,实现了一个Broker框架

a、Broker:接收和分发消息的代理服务器

b、Virtual Host:虚拟主机之间相互隔离,可理解为一个虚拟主机对应一个消息服务

c、Exchange:交换机,消息发送到指定虚拟机的交换机上

d、Binding:交换机与队列绑定,并通过路由策略和routingKey将消息投递到一个或多个队列中

e、Queue:存放消息的队列,FIFO,可持久化

f、Channel:信道,消费者通过信道消费消息,一个TCP连接上可同时创建成百上千个信道,作为消息隔离

2)延迟队列实现:RabbitMQ的延迟队列基于消息的存活时间TTL(Time To Live)和死信交换机DLE(Dead Letter Exchanges)实现

a、TTL:RabbitMQ支持对队列和消息各自设置存活时间,取二者中较小的值,即队列无消费者连接或消息在队列中一直未被消费的过期时间

b、DLE:过期的消息通过绑定的死信交换机,路由到指定的死信队列,消费者实际上消费的是死信队列上的消息

3)缺点:

a、配置麻烦,额外增加一个死信交换机和一个死信队列的配置

b、脆弱性,配置错误或者生产者消费者连接的队列错误都有可能造成延迟失效

2、RocketMQ

1)简介:来源于阿里,目前为Apache顶级开源项目,使用Java编写,基于长轮询的拉取方式,支持事务消息,并解决了顺序消息和海量堆积的问题

a、Broker:存放Topic并根据读取Producer的提交日志,将逻辑上的一个Topic分多个Queue存储,每个Queue上存储消息在提交日志上的位置

b、Name Server:无状态的节点,维护Topic与Broker的对应关系以及Broker的主从关系

2)延迟队列实现:RocketMQ发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中),然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中

3)缺点:延迟时间粒度受限制(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)

3、Kafka

1)简介:来源于Linkedin,目前为Apache顶级开源项目,使用Scala和Java编写,基于zookeeper协调的分布式、流处理的日志系统,升级版为Jafka

2)延迟队列实现:Kafka支持延时生产、延时拉取、延时删除等,其基于时间轮和JDK的DelayQueue实现

a、时间轮(TimingWheel):是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表

b、定时任务列表(TimerTaskList):是一个环形的双向链表,链表中的每一项表示的都是定时任务项

c、定时任务项(TimerTaskEntry):封装了真正的定时任务TimerTask

d、层级时间轮:当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中,类似于钟表就是一个三级时间轮

e、JDK DelayQueue:存储TimerTaskList,并根据其expiration来推进时间轮的时间,每推进一次除执行相应任务列表外,层级时间轮也会进行相应调整

3)缺点:

a、延迟精度取决于时间格设置

b、延迟任务除由超时触发还可能被外部事件触发而执行

4、ActiveMQ

1)简介:基于JMS协议,Java编写的Apache顶级开源项目,支持点对点和发布订阅两种模式。

a、点对点(point-to-point):消息发送到指定的队列,每条消息只有一个消费者能够消费,基于拉模型

b、发布订阅(publish/subscribe):消息发送到主题Topic上,每条消息会被订阅该Topic的所有消费者各自消费,基于推模型

2)延迟队列实现:需要延迟的消息会先存储在JobStore中,通过异步线程任务JobScheduler将到达投递时间的消息投递到相应队列上

a、Broker Filter:Broker中定义了一系列BrokerFilter的子类构成拦截器链,按顺序对消息进行相应处理

b、ScheduleBroker:当消息中指定了延迟相关属性,并且jobId为空时,会生成调度任务存储到JobStore中,此时消息不会进入到队列

c、JobStore:基于BTree存储,key为任务执行的时间戳,value为该时间戳下需要执行的任务列表

d、JobScheduler:取JobStore中最小的key执行(调度时间最早的),执行时间=当前时间,将该任务列表依次投递到所属的队列,对于需要重复投递和投递失败的会再次存入JobStore中。

注: 此处JobScheduler的执行时间间隔可动态变化,默认0.5s,有新任务时会立即执行(Object-notifyAll())并设置时间间隔为0.1s,没有新任务后,下次执行时间为最近任务的调度执行时间。

3)缺点:投递到队列失败,将消息重新存入JobStore,消息调度执行时间=系统当前时间+延迟时间,会导致消息被真实投递的时间可能为设置的延迟时间的整数倍

5、Redis

1)简介:基于Key-Value的NoSQL数据库,由于其极高的性能常被当作缓存来使用,其数据结构支持:字符串、哈希、列表、集合、有序集合

2)延迟队列实现:Redis的延迟队列基于有序集合,score为执行时间戳,value为任务实体或任务实体引用

3)缺点:

a、实现复杂,本身不支持

b、完全基于内存,延迟时间长浪费内存资源

6、消息队列对比

1、公开课延迟队列技术选型

1)业务场景:关闭超时未支付订单、限时优惠活动、拼团

2)性能要求:订单、活动、拼团 数据量可控,上述MQ均能满足要求

3)可靠性:使用ActiveMQ、RabbitMQ、RocketMQ作为延迟队列更普遍

4)可用性:ActiveMQ、RocketMQ自身支持延迟队列功能,且目前公开课业务中使用的中间件为ActiveMQ和Kafka

5)延迟时间灵活:活动的开始和结束时间比较灵活,而RocketMQ时间粒度较粗,Kafka会依赖时间格有精度缺失

结论: 最终选择ActiveMQ来作为延迟队列

2、业务场景:关闭未支付订单

1)关闭微信未支付订单

2)关闭IOS未支付订单

3、ActiveMQ使用方式

1)activemq.xml中支持调度任务

2)发送消息时,设置message的延迟属性

其中:

a、延迟处理

AMQ_SCHEDULED_DELAY:设置多长时间后,投递给消费者(毫秒)

b、重复投递

AMQ_SCHEDULED_PERIOD:重复投递时间间隔(毫秒)

AMQ_SCHEDULED_REPEAT:重复投递次数

c、指定调度计划

AMQ_SCHEDULED_CRON:corn正则表达式

4、公开课使用中进行的优化

1)可靠性:针对实际投递时间可能翻倍的问题,结合ActiveMQ的重复投递,在消费者逻辑中做幂等处理来保证延迟时间的准确性

2)可追溯性:延迟消息及消费情况做数据库冗余存储

3)易用性:业务上定义好延迟枚举类型,直接使用JmsDelayTemplate发送,无需关心数据备份和参数等细节

1、无论是基于死信队列还是基于数据先存储后投递,本质上都是将延迟待发送的消息数据与正常订阅的队列分开存储,从而降低耦合度

2、无论是检查队头消息TTL还是调度存储的延迟数据,本质上都是通过定时任务来完成的,但是定时任务的触发策略以及延迟数据的存储方式决定了不同中间件之间的性能优劣

张浩,2018年加入网易传媒,高级Java开发工程师,目前在网易公开课主要做支付财务体系、版本迭代相关的工作。

java 判断2个时间差15分钟

可以使用消息队列,大致流程:

操作端:第一次拿到文件-发送一条记录到消息队列(此队列有延迟15设置)

队列监听端:监听延迟的消息队列,有消息后消费(处理你的业务逻辑比如那文件,再发送到此消息队列)

javaweb 如何将请求缓存,或者说延时请求?

把用户参数请求入队列,然后直接返回用户调用,后续消费者慢慢处理请求就好了,不过这样子只能返回调用是否成功,并不能立即返回业务处理结果

延迟消费java的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于延迟消费是投资的特性吗、延迟消费java的信息别忘了在本站进行查找喔。

The End

发布于:2022-12-25,除非注明,否则均为首码项目网原创文章,转载请注明出处。