「重试机制java」重试机制和断熔有什么区别
今天给各位分享重试机制java的知识,其中也会对重试机制和断熔有什么区别进行解释,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!
本文目录一览:
- 1、Spring Boot 一个注解搞定重试机制,不能太优雅了
- 2、死锁避免,检测和预防之间的区别是什么
- 3、[TEXT_FULL_WRITING]异常处理
- 4、《Spring Cloud 》Eureka服务调用服务超时重试机制
- 5、javaokhttp怎么去除重试机制
- 6、Spring整合rabbitmq实践(一):基础
Spring Boot 一个注解搞定重试机制,不能太优雅了
在实际工作中,重处理是一个非常常见的场景,比如:
这些错误可能是因为网络波动造成的,等待过后重处理就能成功。通常来说,会用 try/catch , while 循环之类的语法来进行重处理,但是这样的做法缺乏统一性,并且不是很方便,要多写很多代码。
然而 spring-retry 却可以通过注解,在不入侵原有业务逻辑代码的方式下,优雅的实现重处理功能。
spring系列的 spring-retry 是另一个实用程序模块,可以帮助我们以标准方式处理任何特定操作的重试。在 spring-retry 中,所有配置都是基于简单注释的。
Spring Boot 基础就不介绍了,推荐下这个实战教程:
来简单解释一下注解中几个参数的含义:
当重试耗尽时还是失败,会出现什么情况呢?
当重试耗尽时, RetryOperations 可以将控制传递给另一个回调,即 RecoveryCallback 。 Spring-Retry 还提供了 @Recover 注解,用于@Retryable重试失败后处理方法。如果不需要回调方法,可以直接不写回调方法,那么实现的效果是,重试次数完了后,如果还是没成功没符合业务判断,就抛出异常。
可以看到传参里面写的是 Exception e ,这个是作为回调的接头暗号(重试次数用完了,还是失败,我们抛出这个 Exception e 通知触发这个回调方法)。
对于 @Recover 注解的方法,需要特别注意的是:
本篇主要简单介绍了Springboot中的 Retryable 的使用,主要的适用场景和注意事项,当需要重试的时候还是很有用的。
死锁避免,检测和预防之间的区别是什么
在有些情况下死锁是可以避免的。本文将展示三种用于避免死锁的技术:
加锁顺序
加锁时限
死锁检测
加锁顺序
当多个线程需要相同的一些锁,但是按照不同的顺序加锁,死锁就很容易发生。
如果能确保所有的线程都是按照相同的顺序获得锁,那么死锁就不会发生。看下面这个例子:
Thread 1:
lock A
lock B
Thread 2:
wait for A
lock C (when A locked)
Thread 3:
wait for A
wait for B
wait for C
如果一个线程(比如线程3)需要一些锁,那么它必须按照确定的顺序获取锁。它只有获得了从顺序上排在前面的锁之后,才能获取后面的锁。
例如,线程2和线程3只有在获取了锁A之后才能尝试获取锁C(译者注:获取锁A是获取锁C的必要条件)。因为线程1已经拥有了锁A,所以线程2和3需要一直等到锁A被释放。然后在它们尝试对B或C加锁之前,必须成功地对A加了锁。
按照顺序加锁是一种有效的死锁预防机制。但是,这种方式需要你事先知道所有可能会用到的锁(译者注:并对这些锁做适当的排序),但总有些时候是无法预知的。
加锁时限
另外一个可以避免死锁的方法是在尝试获取锁的时候加一个超时时间,这也就意味着在尝试获取锁的过程中若超过了这个时限该线程则放弃对该锁请求。若一个线程没有在给定的时限内成功获得所有需要的锁,则会进行回退并释放所有已经获得的锁,然后等待一段随机的时间再重试。这段随机的等待时间让其它线程有机会尝试获取相同的这些锁,并且让该应用在没有获得锁的时候可以继续运行(译者注:加锁超时后可以先继续运行干点其它事情,再回头来重复之前加锁的逻辑)。
以下是一个例子,展示了两个线程以不同的顺序尝试获取相同的两个锁,在发生超时后回退并重试的场景:
Thread 1 locks A
Thread 2 locks B
Thread 1 attempts to lock B but is blocked
Thread 2 attempts to lock A but is blocked
Thread 1's lock attempt on B times out
Thread 1 backs up and releases A as well
Thread 1 waits randomly (e.g. 257 millis) before retrying.
Thread 2's lock attempt on A times out
Thread 2 backs up and releases B as well
Thread 2 waits randomly (e.g. 43 millis) before retrying.
在上面的例子中,线程2比线程1早200毫秒进行重试加锁,因此它可以先成功地获取到两个锁。这时,线程1尝试获取锁A并且处于等待状态。当线程2结束时,线程1也可以顺利的获得这两个锁(除非线程2或者其它线程在线程1成功获得两个锁之前又获得其中的一些锁)。
需要注意的是,由于存在锁的超时,所以我们不能认为这种场景就一定是出现了死锁。也可能是因为获得了锁的线程(导致其它线程超时)需要很长的时间去完成它的任务。
此外,如果有非常多的线程同一时间去竞争同一批资源,就算有超时和回退机制,还是可能会导致这些线程重复地尝试但却始终得不到锁。如果只有两个线程,并且重试的超时时间设定为0到500毫秒之间,这种现象可能不会发生,但是如果是10个或20个线程情况就不同了。因为这些线程等待相等的重试时间的概率就高的多(或者非常接近以至于会出现问题)。
(译者注:超时和重试机制是为了避免在同一时间出现的竞争,但是当线程很多时,其中两个或多个线程的超时时间一样或者接近的可能性就会很大,因此就算出现竞争而导致超时后,由于超时时间一样,它们又会同时开始重试,导致新一轮的竞争,带来了新的问题。)
这种机制存在一个问题,在Java中不能对synchronized同步块设置超时时间。你需要创建一个自定义锁,或使用Java5中java.util.concurrent包下的工具。写一个自定义锁类不复杂,但超出了本文的内容。后续的Java并发系列会涵盖自定义锁的内容。
死锁检测
死锁检测是一个更好的死锁预防机制,它主要是针对那些不可能实现按序加锁并且锁超时也不可行的场景。
每当一个线程获得了锁,会在线程和锁相关的数据结构中(map、graph等等)将其记下。除此之外,每当有线程请求锁,也需要记录在这个数据结构中。
当一个线程请求锁失败时,这个线程可以遍历锁的关系图看看是否有死锁发生。例如,线程A请求锁7,但是锁7这个时候被线程B持有,这时线程A就可以检查一下线程B是否已经请求了线程A当前所持有的锁。如果线程B确实有这样的请求,那么就是发生了死锁(线程A拥有锁1,请求锁7;线程B拥有锁7,请求锁1)。
当然,死锁一般要比两个线程互相持有对方的锁这种情况要复杂的多。线程A等待线程B,线程B等待线程C,线程C等待线程D,线程D又在等待线程A。线程A为了检测死锁,它需要递进地检测所有被B请求的锁。从线程B所请求的锁开始,线程A找到了线程C,然后又找到了线程D,发现线程D请求的锁被线程A自己持有着。这是它就知道发生了死锁。
下面是一幅关于四个线程(A,B,C和D)之间锁占有和请求的关系图。像这样的数据结构就可以被用来检测死锁。
那么当检测出死锁时,这些线程该做些什么呢?
一个可行的做法是释放所有锁,回退,并且等待一段随机的时间后重试。这个和简单的加锁超时类似,不一样的是只有死锁已经发生了才回退,而不会是因为加锁的请求超时了。虽然有回退和等待,但是如果有大量的线程竞争同一批锁,它们还是会重复地死锁(编者注:原因同超时类似,不能从根本上减轻竞争)。
一个更好的方案是给这些线程设置优先级,让一个(或几个)线程回退,剩下的线程就像没发生死锁一样继续保持着它们需要的锁。如果赋予这些线程的优先级是固定不变的,同一批线程总是会拥有更高的优先级。为避免这个问题,可以在死锁发生的时候设置随机的优先级。
[TEXT_FULL_WRITING]异常处理
前段时间项目用到websocket技术,偶然过程遇到过sendMessage出现异常的情况。
The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1148)
=======================
查了一些资料没有找到,只好先使用重试机制,出现异常时回调监听将任务重新扔进线程池。
现在空闲了一点,准备着手测试一下这个问题产生的原因。
这主要是一个线程安全的问题。
网上出现这个问题的也主要有两种情况
一种是,onMessage方法有返回值,导致onMessage无法被sync block控制。
一种是,session没有被同步控制,导致多线程情况下,出现IllegalStateException
保证以上两点之后
使用session.getBasicRemote()应该是没有问题的。
session.getAsyncRemote()可能是tomcat有bug引起,依旧会出现TEXT_FULL_WRITING
参考:
《Spring Cloud 》Eureka服务调用服务超时重试机制
问题根源有二:
1):业务耗时时间比较长,超过服务调用超时时间配置,由于Spring Cloud 服务调用超时重试机制默认开启,所以会导致服务被调用了两次。
2):服务端未做幂等性,导致重复的业务处理。
增大配置超时时间
ribbon.ReadTimeout=10000
ribbon.ConnectTimeout=10000
eureka服务调用重试开关的配置属性:
默认开启。
重试机制:对于连接超时的异常,feign都会触发重试机制,对于读取超时,会根据请求类型判断,如果是GET异常,触发重试;其他异常,不会触发重试。
服务响应超时时间,默认5s,连接的超时时间为2s。
同一实例最大重试次数为1
负载均衡的其他实例的重试次数为2
对所有操作请求都进行重试,默认false,建议不要开启。
javaokhttp怎么去除重试机制
1、首先打开javaokhttp软件,输入账号点击登录。
2、其次进入后点击设置选项。
3、最后在设置中勾选重试机制关闭即可。
Spring整合rabbitmq实践(一):基础
Spring整合rabbitmq实践(二):扩展
Spring整合rabbitmq实践(三):源码
producer:消息生产者;
consumer:消息消费者;
queue:消息队列;
exchange:接收producer发送的消息按照binding规则转发给相应的queue;
binding:exchange与queue之间的关系;
virtualHost:每个virtualHost持有自己的exchange、queue、binding,用户只能在virtualHost粒度控制权限。
fanout:
群发到所有绑定的queue;
direct:
根据routing key routing到相应的queue,routing不到任何queue的消息扔掉;可以不同的key绑到同一个queue,也可以同一个key绑到不同的queue;
topic:
类似direct,区别是routing key是由一组以“.”分隔的单词组成,可以有通配符,“*”匹配一个单词,“#”匹配0个或多个单词;
headers:
根据arguments来routing。
arguments为一组key-value对,任意设置。
“x-match”是一个特殊的key,值为“all”时必须匹配所有argument,值为“any”时只需匹配任意一个argument,不设置默认为“all”。
通过以下配置,可以获得最基础的发送消息到queue,以及从queue接收消息的功能。
这个包同时包含了一些其它的包:spring-context、spring-tx、spring-web、spring-messaging、spring-retry、spring-amqp、amqp-client,如果想单纯一点,可以单独引入。
最主要的是以下几个包,
spring-amqp:
spring-rabbit:
amqp-client:
个人理解就是,spring-amqp是spring整合的amqp,spring-rabbit是spring整合的rabbitmq(rabbitmq是amqp的一个实现,所以可能spring-rabbit也是类似关系),amqp-client提供操作rabbitmq的java api。
目前最新的是2.0.5.RELEASE版本。如果编译报错,以下信息或许能有所帮助:
(1)
解决方案:spring-amqp版本改为2.0.5.RELEASE。
(2)
解决方案:spring-context版本改为5.0.7.RELEASE。
(3)
解决方案:spring-core版本改为5.0.7.RELEASE。
(4)
解决方案:spring-beans版本改为5.0.7.RELEASE。
(5)
解决方案:spring-aop版本改为5.0.7.RELEASE。
总之,需要5.0.7.RELEASE版本的spring,及相匹配版本的amqp-client。
后面所讲的这些bean配置,spring-amqp中都有默认配置,如果不需要修改默认配置,则不用人为配置这些bean。后面这些配置也没有涉及到所有的属性。
这里的ConnectionFactory指的是spring-rabbit包下面的ConnectionFactory接口,不是amqp-client包下面的ConnectionFactory类。
上面这个bean是spring-amqp的核心,不论是发送消息还是接收消息都需要这个bean,下面描述一下里面这些配置的含义。
setAddresses :设置了rabbitmq的地址、端口,集群部署的情况下可填写多个,“,”分隔。
setUsername :设置rabbitmq的用户名。
setPassword :设置rabbitmq的用户密码。
setVirtualHost :设置virtualHost。
setCacheMode :设置缓存模式,共有两种, CHANNEL 和 CONNECTION 模式。
CHANNEL 模式,程序运行期间ConnectionFactory会维护着一个Connection,所有的操作都会使用这个Connection,但一个Connection中可以有多个Channel,操作rabbitmq之前都必须先获取到一个Channel,否则就会阻塞(可以通过setChannelCheckoutTimeout()设置等待时间),这些Channel会被缓存(缓存的数量可以通过setChannelCacheSize()设置);
CONNECTION 模式,这个模式下允许创建多个Connection,会缓存一定数量的Connection,每个Connection中同样会缓存一些Channel,除了可以有多个Connection,其它都跟CHANNEL模式一样。
这里的Connection和Channel是spring-amqp中的概念,并非rabbitmq中的概念,官方文档对Connection和Channel有这样的描述:
关于 CONNECTION 模式中,可以存在多个Connection的使用场景,官方文档的描述:
setChannelCacheSize :设置每个Connection中(注意是每个Connection)可以缓存的Channel数量,注意只是缓存的Channel数量,不是Channel的数量上限,操作rabbitmq之前(send/receive message等)要先获取到一个Channel,获取Channel时会先从缓存中找闲置的Channel,如果没有则创建新的Channel,当Channel数量大于缓存数量时,多出来没法放进缓存的会被关闭。
注意,改变这个值不会影响已经存在的Connection,只影响之后创建的Connection。
setChannelCheckoutTimeout :当这个值大于0时, channelCacheSize 不仅是缓存数量,同时也会变成数量上限,从缓存获取不到可用的Channel时,不会创建新的Channel,会等待这个值设置的毫秒数,到时间仍然获取不到可用的Channel会抛出AmqpTimeoutException异常。
同时,在 CONNECTION 模式,这个值也会影响获取Connection的等待时间,超时获取不到Connection也会抛出AmqpTimeoutException异常。
setPublisherReturns、setPublisherConfirms :producer端的消息确认机制(confirm和return),设为true后开启相应的机制,后文详述。
官方文档描述publisherReturns设为true打开return机制,publisherComfirms设为true打开confirm机制,但测试结果(2.0.5.RELEASE版本)是,任意一个设为true,两个都会打开。
addConnectionListener、addChannelListener、setRecoveryListener :添加或设置相应的Listener,后文详述。
setConnectionCacheSize :仅在 CONNECTION 模式使用,设置Connection的缓存数量。
setConnectionLimit :仅在 CONNECTION 模式使用,设置Connection的数量上限。
上面的bean配置,除了需要注入的几个listener bean以外,其它设置的都是其默认值(2.0.5.RELEASE版本),后面的bean示例配置也是一样,部分属性不同版本的默认值可能有所不同。
一般不用配置这个bean,这里简单提一下。
这个ConnectionFactory是rabbit api中的ConnectionFactory类,这里面是连接rabbitmq节点的Connection配置。
如果想修改这些配置,可以按如下方式配置:
consumer端如果通过@RabbitListener注解的方式接收消息,不需要这个bean。
不建议直接通过ConnectionFactory获取Channel操作rabbitmq,建议通过amqpTemplate操作。
setConnectionFactory :设置spring-amqp的ConnectionFactory。
setRetryTemplate :设置重试机制,详情见后文。
setMessageConverter :设置MessageConverter,用于java对象与Message对象(实际发送和接收的消息对象)之间的相互转换,详情见后文。
setChannelTransacted :打开或关闭Channel的事务,关于amqp的事务后文描述。
setReturnCallback、setConfirmCallback :return和confirm机制的回调接口,后文详述。
setMandatory :设为true使ReturnCallback生效。
这个bean仅在consumer端通过@RabbitListener注解的方式接收消息时使用,每一个@RabbitListener注解的方法都会由这个RabbitListenerContainerFactory创建一个MessageListenerContainer,负责接收消息。
setConnectionFactory :设置spring-amqp的ConnectionFactory。
setMessageConverter :对于consumer端,MessageConverter也可以在这里配置。
setAcknowledgeMode :设置consumer端的应答模式,共有三种:NONE、AUTO、MANUAL。
NONE,无应答,这种模式下rabbitmq默认consumer能正确处理所有发出的消息,所以不管消息有没有被consumer收到,有没有正确处理都不会恢复;
AUTO,由Container自动应答,正确处理发出ack信息,处理失败发出nack信息,rabbitmq发出消息后将会等待consumer端的应答,只有收到ack确认信息才会把消息清除掉,收到nack信息的处理办法由setDefaultRequeueRejected()方法设置,所以在这种模式下,发生错误的消息是可以恢复的。
MANUAL,基本同AUTO模式,区别是需要人为调用方法给应答。
setConcurrentConsumers :设置每个MessageListenerContainer将会创建的Consumer的最小数量,默认是1个。
setMaxConcurrentConsumers :设置每个MessageListenerContainer将会创建的Consumer的最大数量,默认等于最小数量。
setPrefetchCount :设置每次请求发送给每个Consumer的消息数量。
setChannelTransacted :设置Channel的事务。
setTxSize :设置事务当中可以处理的消息数量。
setDefaultRequeueRejected :设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。
setErrorHandler :实现ErrorHandler接口设置进去,所有未catch的异常都会由ErrorHandler处理。
AmqpTamplate里面有下面几个方法可以向queue发送消息:
这里,exchange必须存在,否则消息发不出去,会看到错误日志,但不影响程序运行:
Message是org.springframework.amqp.core.Message类,spring-amqp发送和接收的都是这个Message。
从Message类源码可以看到消息内容放在byte[]里面,MessageProperties对象包含了非常多的一些其它信息,如Header、exchange、routing key等。
这种方式,需要将消息内容(String,或其它Object)转换为byte[],示例:
也可以直接调用下面几个方法,Object将会自动转为Message对象发送:
有两种方法接收消息:
1.polling consumer,轮询调用方法一次获取一条;
2.asynchronous consumer,listener异步接收消息。
polling consumer
直接通过AmqpTemplate的方法从queue获取消息,有如下方法:
如果queue里面没有消息,会立刻返回null;传入timeoutMillis参数后可阻塞等待一段时间。
如果想直接从queue获取想要的java对象,可调用下面这一组方法:
后面4个方法是带泛型的,示例如下:
使用这四个方法需要配置org.springframework.amqp.support.converter.SmartMessageConverter,这是一个接口,Jackson2JsonMessageConverter已经实现了这个接口,所以只要将Jackson2JsonMessageConverter设置到RabbitTemplate中即可。
asynchronous consumer
有多种方式可以实现,详情参考官方文档。
最简单的实现方式是@RabbitListener注解,示例:
这里接收消息的对象用的是Message,也可以是自定义的java对象,但调用Converter转换失败会报错。
注解上指定的queue必须是已经存在并且绑定到某个exchange的,否则会报错:
如果在@RabbitListener注解中指明binding信息,就能自动创建queue、exchange并建立binding关系。
direct和topic类型的exchange需要routingKey,示例:
fanout类型的exchange,示例:
2.0版本之后,可以指定多个routingKey,示例:
并且支持arguments属性,可用于headers类型的exchange,示例:
@Queue有两个参数exclusive和autoDelete顺便解释一下:
exclusive,排他队列,只对创建这个queue的Connection可见,Connection关闭queue删除;
autoDelete,没有consumer对这个queue消费时删除。
对于这两种队列,durable=true是不起作用的。
另外,如果注解申明的queue和exchange及binding关系都已经存在,但与已存在的设置不同,比如,已存在的exchange的是direct类型,这里尝试改为fanout类型,结果是不会有任何影响,不论是修改或者新增参数都不会生效。
如果queue存在,exchange存在,但没有binding,那么程序启动后会自动建立起binding关系。
关于重试机制java和重试机制和断熔有什么区别的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
发布于:2022-12-07,除非注明,否则均为
原创文章,转载请注明出处。