35 changed files with 510 additions and 593 deletions
@ -0,0 +1,48 @@ |
|||||
|
package org.dromara.common.sdk.cloudapi.interconnection; |
||||
|
|
||||
|
/** |
||||
|
* @author sean |
||||
|
* @version 1.7 |
||||
|
* @date 2023/10/16 |
||||
|
*/ |
||||
|
public class PsdkFloatingWindowTextPsdk { |
||||
|
|
||||
|
/** |
||||
|
* Data content |
||||
|
* length: Less than 256 |
||||
|
*/ |
||||
|
private Integer psdkIndex; |
||||
|
|
||||
|
private String value; |
||||
|
|
||||
|
|
||||
|
public PsdkFloatingWindowTextPsdk() { |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public String toString() { |
||||
|
return "PsdkFloatingWindowTextPsdk{" + |
||||
|
"psdkIndex=" + psdkIndex + |
||||
|
",value='" + value + '\'' + |
||||
|
'}'; |
||||
|
} |
||||
|
|
||||
|
public String getValue() { |
||||
|
return value; |
||||
|
} |
||||
|
|
||||
|
public PsdkFloatingWindowTextPsdk setValue(String value) { |
||||
|
this.value = value; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
public Integer getPsdkIndex() { |
||||
|
return psdkIndex; |
||||
|
} |
||||
|
|
||||
|
public PsdkFloatingWindowTextPsdk setPsdkIndex(Integer psdkIndex) { |
||||
|
this.psdkIndex = psdkIndex; |
||||
|
return this; |
||||
|
} |
||||
|
} |
@ -0,0 +1,104 @@ |
|||||
|
package org.dromara.common.sdk.cloudapi.psdk; |
||||
|
|
||||
|
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
import org.dromara.common.sdk.common.BaseModel; |
||||
|
|
||||
|
/** |
||||
|
* @auther wuyuan |
||||
|
* @data 2025/3/29 |
||||
|
*/ |
||||
|
public class PsdkSpeaker { |
||||
|
|
||||
|
@JsonProperty("play_file_md5") |
||||
|
private Integer playFileMd5; |
||||
|
|
||||
|
@JsonProperty("play_file_name") |
||||
|
private String playFileName; |
||||
|
|
||||
|
@JsonProperty("play_mode") |
||||
|
private String playMode; |
||||
|
|
||||
|
|
||||
|
|
||||
|
@JsonProperty("play_volume") |
||||
|
private Integer playVolume; |
||||
|
|
||||
|
@JsonProperty("system_state") |
||||
|
private Integer systemState; |
||||
|
|
||||
|
|
||||
|
@JsonProperty("work_mode") |
||||
|
private Integer workMode; |
||||
|
|
||||
|
|
||||
|
public PsdkSpeaker() { |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public String toString() { |
||||
|
return "PsdkSpeaker{" + |
||||
|
"playFileMd5=" + playFileMd5 + |
||||
|
",playFileName=" + playFileName + |
||||
|
",playMode=" + playMode + |
||||
|
",playVolume=" + playVolume + |
||||
|
",systemState=" + systemState + |
||||
|
",workMode=" + workMode + |
||||
|
'}'; |
||||
|
} |
||||
|
|
||||
|
public Integer getPlayFileMd5() { |
||||
|
return playFileMd5; |
||||
|
} |
||||
|
|
||||
|
public PsdkSpeaker setPlayFileMd5(Integer playFileMd5) { |
||||
|
this.playFileMd5 = playFileMd5; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public String getPlayFileName() { |
||||
|
return playFileName; |
||||
|
} |
||||
|
|
||||
|
public PsdkSpeaker setPlayFileName(String playFileName) { |
||||
|
this.playFileName = playFileName; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public String getPlayMode() { |
||||
|
return playMode; |
||||
|
} |
||||
|
|
||||
|
public PsdkSpeaker setPlayMode(String playMode) { |
||||
|
this.playMode = playMode; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public Integer getPlayVolume() { |
||||
|
return playVolume; |
||||
|
} |
||||
|
|
||||
|
public PsdkSpeaker setPlayVolume(Integer playVolume) { |
||||
|
this.playVolume = playVolume; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public Integer getSystemState() { |
||||
|
return systemState; |
||||
|
} |
||||
|
|
||||
|
public PsdkSpeaker setSystemState(Integer systemState) { |
||||
|
this.systemState = systemState; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public Integer getWorkMode() { |
||||
|
return workMode; |
||||
|
} |
||||
|
|
||||
|
public PsdkSpeaker setWorkMode(Integer workMode) { |
||||
|
this.workMode = workMode; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
} |
@ -0,0 +1,56 @@ |
|||||
|
package org.dromara.common.sdk.cloudapi.psdk; |
||||
|
|
||||
|
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
import org.dromara.common.sdk.common.BaseModel; |
||||
|
|
||||
|
/** |
||||
|
* @auther wuyuan |
||||
|
* @data 2025/3/29 |
||||
|
*/ |
||||
|
public class PsdkUiResource { |
||||
|
|
||||
|
@JsonProperty("object_key") |
||||
|
private Integer objectKey; |
||||
|
|
||||
|
@JsonProperty("psdk_index") |
||||
|
private Integer psdkIndex; |
||||
|
|
||||
|
@JsonProperty("psdk_ready") |
||||
|
private Integer psdkReady; |
||||
|
|
||||
|
@Override |
||||
|
public String toString() { |
||||
|
return "PsdkWidgetValue{" + |
||||
|
"objectKey=" + objectKey + |
||||
|
",psdkIndex=" + psdkIndex + |
||||
|
",psdkReady=" + psdkReady + |
||||
|
'}'; |
||||
|
} |
||||
|
|
||||
|
public Integer getObjectKey() { |
||||
|
return objectKey; |
||||
|
} |
||||
|
|
||||
|
public PsdkUiResource setObjectKey(Integer objectKey) { |
||||
|
this.objectKey = objectKey; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public Integer getPsdkIndex() { |
||||
|
return psdkIndex; |
||||
|
} |
||||
|
|
||||
|
public PsdkUiResource setPsdkIndex(Integer psdkIndex) { |
||||
|
this.psdkIndex = psdkIndex; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public Integer getPsdkReady() { |
||||
|
return psdkReady; |
||||
|
} |
||||
|
|
||||
|
public PsdkUiResource setPsdkReady(Integer psdkReady) { |
||||
|
this.psdkReady = psdkReady; |
||||
|
return this; |
||||
|
} |
||||
|
} |
@ -0,0 +1,119 @@ |
|||||
|
package org.dromara.common.sdk.cloudapi.psdk; |
||||
|
|
||||
|
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
import jakarta.validation.constraints.NotNull; |
||||
|
import org.dromara.common.sdk.cloudapi.device.DockSilentMode; |
||||
|
import org.dromara.common.sdk.cloudapi.property.SilentModeEnum; |
||||
|
import org.dromara.common.sdk.common.BaseModel; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* @auther wuyuan |
||||
|
* @data 2025/3/29 |
||||
|
*/ |
||||
|
public class PsdkWidgetValue { |
||||
|
|
||||
|
@JsonProperty("psdk_index") |
||||
|
private Integer psdkIndex; |
||||
|
|
||||
|
@JsonProperty("psdk_lib_version") |
||||
|
private String psdkLibVersion; |
||||
|
|
||||
|
@JsonProperty("psdk_name") |
||||
|
private String psdkName; |
||||
|
|
||||
|
|
||||
|
|
||||
|
@JsonProperty("psdk_type") |
||||
|
private String psdkType; |
||||
|
|
||||
|
@JsonProperty("psdk_version") |
||||
|
private String psdkVersion; |
||||
|
|
||||
|
@JsonProperty("psdk_speaker") |
||||
|
private PsdkSpeaker psdkSpeaker; |
||||
|
|
||||
|
@JsonProperty("values") |
||||
|
private List<String> values; |
||||
|
|
||||
|
public PsdkWidgetValue() { |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public String toString() { |
||||
|
return "PsdkWidgetValue{" + |
||||
|
"psdkIndex=" + psdkIndex + |
||||
|
",psdkLibVersion=" + psdkLibVersion + |
||||
|
",psdkName=" + psdkName + |
||||
|
",psdkType=" + psdkType + |
||||
|
",psdkVersion=" + psdkVersion + |
||||
|
",psdkSpeaker=" + psdkSpeaker + |
||||
|
",values=" + values + |
||||
|
'}'; |
||||
|
} |
||||
|
|
||||
|
|
||||
|
public Integer getPsdkIndex() { |
||||
|
return psdkIndex; |
||||
|
} |
||||
|
|
||||
|
public PsdkWidgetValue setPsdkIndex(Integer psdkIndex) { |
||||
|
this.psdkIndex = psdkIndex; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public String getPsdkLibVersion() { |
||||
|
return psdkLibVersion; |
||||
|
} |
||||
|
|
||||
|
public PsdkWidgetValue setPsdkLibVersion(String psdkLibVersion) { |
||||
|
this.psdkLibVersion = psdkLibVersion; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public String getPsdkName() { |
||||
|
return psdkName; |
||||
|
} |
||||
|
|
||||
|
public PsdkWidgetValue setPsdkName(String psdkName) { |
||||
|
this.psdkName = psdkName; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public String getPsdkType() { |
||||
|
return psdkType; |
||||
|
} |
||||
|
|
||||
|
public PsdkWidgetValue setPsdkType(String psdkType) { |
||||
|
this.psdkType = psdkType; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public String getPsdkVersion() { |
||||
|
return psdkVersion; |
||||
|
} |
||||
|
|
||||
|
public PsdkWidgetValue setPsdkVersion(String psdkVersion) { |
||||
|
this.psdkVersion = psdkVersion; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public List<String> getValues() { |
||||
|
return values; |
||||
|
} |
||||
|
|
||||
|
public PsdkWidgetValue setValues(List<String> values) { |
||||
|
this.values = values; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
public PsdkSpeaker getPsdkSpeaker() { |
||||
|
return psdkSpeaker; |
||||
|
} |
||||
|
|
||||
|
public PsdkWidgetValue setPsdkSpeaker(PsdkSpeaker psdkSpeaker) { |
||||
|
this.psdkSpeaker = psdkSpeaker; |
||||
|
return this; |
||||
|
} |
||||
|
} |
@ -0,0 +1,39 @@ |
|||||
|
package org.dromara.common.sdk.cloudapi.psdk; |
||||
|
|
||||
|
import com.fasterxml.jackson.annotation.JsonProperty; |
||||
|
import jakarta.validation.constraints.NotNull; |
||||
|
import org.dromara.common.sdk.cloudapi.device.DockPayloadControlSource; |
||||
|
import org.dromara.common.sdk.cloudapi.device.DockSilentMode; |
||||
|
import org.dromara.common.sdk.cloudapi.property.SilentModeEnum; |
||||
|
import org.dromara.common.sdk.common.BaseModel; |
||||
|
|
||||
|
import java.util.List; |
||||
|
|
||||
|
/** |
||||
|
* @auther wuyuan |
||||
|
* @data 2025/3/29 |
||||
|
*/ |
||||
|
public class PsdkWidgetValues { |
||||
|
@JsonProperty("psdk_widget_values") |
||||
|
private List<PsdkWidgetValue> psdkWidgetValues; |
||||
|
|
||||
|
public PsdkWidgetValues() { |
||||
|
} |
||||
|
|
||||
|
@Override |
||||
|
public String toString() { |
||||
|
return "PsdkWidgetValues{" + |
||||
|
"psdkWidgetValues=" + psdkWidgetValues + |
||||
|
'}'; |
||||
|
} |
||||
|
|
||||
|
public List<PsdkWidgetValue> getPsdkWidgetValues() { |
||||
|
return psdkWidgetValues; |
||||
|
} |
||||
|
|
||||
|
public PsdkWidgetValues setPsdkWidgetValues(List<PsdkWidgetValue> psdkWidgetValues) { |
||||
|
this.psdkWidgetValues = psdkWidgetValues; |
||||
|
return this; |
||||
|
} |
||||
|
|
||||
|
} |
@ -0,0 +1,26 @@ |
|||||
|
package org.dromara.sample.manage.service.impl; |
||||
|
|
||||
|
import lombok.extern.slf4j.Slf4j; |
||||
|
import org.dromara.common.sdk.cloudapi.interconnection.CustomDataTransmissionFromEsdk; |
||||
|
import org.dromara.common.sdk.cloudapi.interconnection.PsdkFloatingWindowTextPsdk; |
||||
|
import org.dromara.common.sdk.cloudapi.interconnection.api.AbstractInterconnectionService; |
||||
|
import org.dromara.common.sdk.mqtt.events.TopicEventsRequest; |
||||
|
import org.springframework.messaging.MessageHeaders; |
||||
|
import org.springframework.stereotype.Service; |
||||
|
|
||||
|
/** |
||||
|
* @auther wuyuan |
||||
|
* @data 2025/3/29 |
||||
|
*/ |
||||
|
@Service |
||||
|
@Slf4j |
||||
|
public class InterconnectionService extends AbstractInterconnectionService { |
||||
|
|
||||
|
public void customDataTransmissionFromPsdk(TopicEventsRequest<CustomDataTransmissionFromEsdk> request, MessageHeaders headers) { |
||||
|
log.info(request.toString()); |
||||
|
} |
||||
|
|
||||
|
public void psdkFloatingWindowText(TopicEventsRequest<PsdkFloatingWindowTextPsdk> request, MessageHeaders headers) { |
||||
|
log.info(request.toString()); |
||||
|
} |
||||
|
} |
@ -1,25 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq; |
|
||||
|
|
||||
import com.ruoyi.common.security.annotation.EnableCustomConfig; |
|
||||
import com.ruoyi.common.security.annotation.EnableRyFeignClients; |
|
||||
import org.springframework.boot.SpringApplication; |
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication; |
|
||||
import org.springframework.scheduling.annotation.EnableAsync; |
|
||||
|
|
||||
/** |
|
||||
* 平台管理模块 |
|
||||
* |
|
||||
* @author ruoyi |
|
||||
*/ |
|
||||
@EnableCustomConfig |
|
||||
@EnableRyFeignClients |
|
||||
@SpringBootApplication |
|
||||
@EnableAsync |
|
||||
public class RocketMQApplication |
|
||||
{ |
|
||||
public static void main(String[] args) |
|
||||
{ |
|
||||
SpringApplication.run(RocketMQApplication.class, args); |
|
||||
System.out.println("(♥◠‿◠)ノ゙ RocketMQ模块启动成功 ლ(´ڡ`ლ)゙"); |
|
||||
} |
|
||||
} |
|
@ -1,64 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.config; |
|
||||
|
|
||||
import com.ruoyi.testrocketmq.consumer.RocketMsgListener; |
|
||||
import com.ruoyi.testrocketmq.enums.MessageCodeEnum; |
|
||||
import com.ruoyi.testrocketmq.model.ConsumerMode; |
|
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; |
|
||||
import org.apache.rocketmq.client.exception.MQClientException; |
|
||||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; |
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
|
||||
import org.springframework.context.annotation.Bean; |
|
||||
import org.springframework.context.annotation.Configuration; |
|
||||
|
|
||||
/** |
|
||||
* 消费者配置 |
|
||||
*/ |
|
||||
@RefreshScope |
|
||||
@Configuration |
|
||||
@Slf4j |
|
||||
public class ConsumerConfig { |
|
||||
@Autowired |
|
||||
private ConsumerMode consumerMode; |
|
||||
|
|
||||
@Bean |
|
||||
public DefaultMQPushConsumer getRocketMQConsumer() throws MQClientException { |
|
||||
// ConsumerMode consumerMode = new ConsumerMode();
|
|
||||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerMode.getGroupName()); |
|
||||
consumer.setNamesrvAddr(consumerMode.getNamesrvAddr()); |
|
||||
consumer.setConsumeThreadMin(consumerMode.getConsumeThreadMin()); |
|
||||
consumer.setConsumeThreadMax(consumerMode.getConsumeThreadMax()); |
|
||||
consumer.registerMessageListener(new RocketMsgListener()); |
|
||||
/** |
|
||||
* 1. CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费 |
|
||||
* 2. CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费 |
|
||||
* 3. CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费 |
|
||||
* 以上所说的第一次启动是指从来没有消费过的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始 |
|
||||
*/ |
|
||||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); |
|
||||
/** |
|
||||
* CLUSTERING (集群模式) :默认模式,同一个ConsumerGroup(groupName相同)每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所 |
|
||||
* 订阅topic整体,从而达到负载均衡的目的 |
|
||||
* BROADCASTING (广播模式) :同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。 |
|
||||
* |
|
||||
*/ |
|
||||
// consumer.setMessageModel(MessageModel.BROADCASTING);
|
|
||||
|
|
||||
consumer.setVipChannelEnabled(false); |
|
||||
consumer.setConsumeMessageBatchMaxSize(consumerMode.getConsumeMessageBatchMaxSize()); |
|
||||
try { |
|
||||
/** |
|
||||
* 订阅topic,可以对指定消息进行过滤,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 |
|
||||
*/ |
|
||||
consumer.subscribe(MessageCodeEnum.ORDER_MESSAGE.getCode(),"*"); |
|
||||
consumer.subscribe(MessageCodeEnum.USER_MESSAGE.getCode(),"*"); |
|
||||
consumer.start(); |
|
||||
log.info("消费者初始化成功:{}", consumer.toString()); |
|
||||
} catch (MQClientException e) { |
|
||||
e.printStackTrace(); |
|
||||
log.error("消费者初始化失败:{}",e.getMessage()); |
|
||||
} |
|
||||
return consumer; |
|
||||
} |
|
||||
} |
|
@ -1,27 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.config; |
|
||||
|
|
||||
/** |
|
||||
* @author yz |
|
||||
*/ |
|
||||
public class MessageConfig { |
|
||||
private Class<?> messageClass; |
|
||||
private boolean orderlyMessage; |
|
||||
|
|
||||
public Class<?> getMessageClass() { |
|
||||
return messageClass; |
|
||||
} |
|
||||
|
|
||||
public void setMessageClass(Class<?> messageClass) { |
|
||||
this.messageClass = messageClass; |
|
||||
} |
|
||||
|
|
||||
public boolean isOrderlyMessage() { |
|
||||
return orderlyMessage; |
|
||||
} |
|
||||
|
|
||||
public void setOrderlyMessage(boolean orderlyMessage) { |
|
||||
this.orderlyMessage = orderlyMessage; |
|
||||
} |
|
||||
|
|
||||
|
|
||||
} |
|
@ -1,48 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.config; |
|
||||
|
|
||||
import com.ruoyi.testrocketmq.model.ProducerMode; |
|
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.apache.rocketmq.client.exception.MQClientException; |
|
||||
import org.apache.rocketmq.client.producer.DefaultMQProducer; |
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||
import org.springframework.context.annotation.Bean; |
|
||||
import org.springframework.context.annotation.Configuration; |
|
||||
|
|
||||
|
|
||||
@Configuration |
|
||||
@Slf4j |
|
||||
public class ProducerConfig { |
|
||||
|
|
||||
public static DefaultMQProducer producer; |
|
||||
|
|
||||
@Autowired |
|
||||
private ProducerMode producerMode; |
|
||||
|
|
||||
|
|
||||
|
|
||||
@Bean |
|
||||
public DefaultMQProducer getRocketMQProducer() { |
|
||||
producer = new DefaultMQProducer(producerMode.getGroupName()); |
|
||||
producer.setNamesrvAddr(producerMode.getNamesrvAddr()); |
|
||||
//如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
|
|
||||
if(producerMode.getMaxMessageSize()!=null){ |
|
||||
producer.setMaxMessageSize(producerMode.getMaxMessageSize()); |
|
||||
} |
|
||||
if(producerMode.getSendMsgTimeout()!=null){ |
|
||||
producer.setSendMsgTimeout(producerMode.getSendMsgTimeout()); |
|
||||
} |
|
||||
//如果发送消息失败,设置重试次数,默认为2次
|
|
||||
if(producerMode.getRetryTimesWhenSendFailed()!=null){ |
|
||||
producer.setRetryTimesWhenSendFailed(producerMode.getRetryTimesWhenSendFailed()); |
|
||||
} |
|
||||
producer.setVipChannelEnabled(false); |
|
||||
try { |
|
||||
producer.start(); |
|
||||
log.info("生产者初始化成功:{}",producer.toString()); |
|
||||
} catch (MQClientException e) { |
|
||||
log.error("生产者初始化失败:{}",e.getMessage()); |
|
||||
} |
|
||||
return producer; |
|
||||
} |
|
||||
|
|
||||
} |
|
@ -1,100 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.consumer; |
|
||||
|
|
||||
import com.ruoyi.testrocketmq.enums.MessageCodeEnum; |
|
||||
import com.ruoyi.testrocketmq.producer.ConsumeException; |
|
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; |
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; |
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; |
|
||||
import org.apache.rocketmq.common.message.MessageExt; |
|
||||
import org.springframework.stereotype.Component; |
|
||||
import org.springframework.util.CollectionUtils; |
|
||||
|
|
||||
import java.io.UnsupportedEncodingException; |
|
||||
import java.text.DateFormat; |
|
||||
import java.text.SimpleDateFormat; |
|
||||
import java.util.Date; |
|
||||
import java.util.List; |
|
||||
|
|
||||
/** |
|
||||
* 消息监听 |
|
||||
*/ |
|
||||
@Slf4j |
|
||||
@Component |
|
||||
public class RocketMsgListener implements MessageListenerConcurrently { |
|
||||
|
|
||||
/** |
|
||||
* 消费消息 |
|
||||
* |
|
||||
* @param list msgs.size() >= 1 |
|
||||
* DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here |
|
||||
* 这里只设置为1,当设置为多个时,list中只要有一条消息消费失败,就会整体重试 |
|
||||
* @param consumeConcurrentlyContext 上下文信息 |
|
||||
* @return 消费状态 成功(CONSUME_SUCCESS)或者 重试 (RECONSUME_LATER) |
|
||||
*/ |
|
||||
@Override |
|
||||
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { |
|
||||
|
|
||||
if (!CollectionUtils.isEmpty(list)) { |
|
||||
for (MessageExt messageExt : list) { |
|
||||
// 消息内容
|
|
||||
String body = new String(messageExt.getBody()); |
|
||||
log.info("接受到的消息为:{}", body); |
|
||||
String tags = messageExt.getTags(); |
|
||||
String topic = messageExt.getTopic(); |
|
||||
String msgId = messageExt.getMsgId(); |
|
||||
String keys = messageExt.getKeys(); |
|
||||
int reConsume = messageExt.getReconsumeTimes(); |
|
||||
// 消息已经重试了3次,如果不需要再次消费,则返回成功
|
|
||||
if (reConsume == 3) { |
|
||||
// TODO 补偿信息
|
|
||||
//smsLogService.insertLog(topic, tags, msgId, keys, body, "【" + EnumUtil.getStrMsgByCode(tags, TagsCodeEnum.class) + "】消费失败");
|
|
||||
log.error("消息重试超过3次,消费失败!"); |
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|
||||
} |
|
||||
// 订单超时处理
|
|
||||
if (MessageCodeEnum.ORDER_MESSAGE.getCode().equals(topic)) { |
|
||||
if (MessageCodeEnum.ORDER_TIMEOUT_TAG.getCode().equals(tags)) { |
|
||||
// //获取订单
|
|
||||
// DealUserOrder dealUserOrder = pcRemoteDealUserOrderService.selectDealUserOrderByOrderNumber(keys);
|
|
||||
// if (dealUserOrder != null) {
|
|
||||
// //订单状态超时未支付关闭订单 处理
|
|
||||
// if (dealUserOrder.getStatus().equals("1")) {
|
|
||||
// DealUserOrder dealUserOrders = new DealUserOrder();
|
|
||||
// dealUserOrders.setOrderId(dealUserOrder.getOrderId());
|
|
||||
// dealUserOrders.setStatus("4");
|
|
||||
// pcRemoteDealUserOrderService.updateDealUserOrder(dealUserOrders);
|
|
||||
// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
||||
// }
|
|
||||
// log.info("Order does not exist.");
|
|
||||
// }
|
|
||||
log.info("Consumption success:" + body); |
|
||||
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
||||
log.info("Consumption time:{}", format.format(new Date())); |
|
||||
} else { |
|
||||
log.info("未匹配到Tag【{}】" + tags); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
// 消息消费成功
|
|
||||
//ConsumeConcurrentlyStatus.RECONSUME_LATER broker会根据设置的messageDelayLevel发起重试,默认16次
|
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* 异常处理 |
|
||||
* |
|
||||
* @param e 捕获的异常 |
|
||||
* @return 消息消费结果 |
|
||||
*/ |
|
||||
private static ConsumeConcurrentlyStatus handleException(final Exception e) { |
|
||||
Class exceptionClass = e.getClass(); |
|
||||
if (exceptionClass.equals(UnsupportedEncodingException.class)) { |
|
||||
log.error(e.getMessage()); |
|
||||
} else if (exceptionClass.equals(ConsumeException.class)) { |
|
||||
log.error(e.getMessage()); |
|
||||
} |
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER; |
|
||||
} |
|
||||
} |
|
@ -1,60 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.enums; |
|
||||
|
|
||||
|
|
||||
import lombok.Getter; |
|
||||
|
|
||||
@Getter |
|
||||
public enum MessageCodeEnum { |
|
||||
/** |
|
||||
* 消息模块主题 |
|
||||
*/ |
|
||||
MESSAGE_TOPIC("elink-message","消息服务模块topic名称"), |
|
||||
/** |
|
||||
* 系统消息 |
|
||||
*/ |
|
||||
NOTE_MESSAGE("system-message","系统消息服务模块topic名称"), |
|
||||
/** |
|
||||
* 用户消息 |
|
||||
*/ |
|
||||
USER_MESSAGE("user-message","用户消息服务模块topic名称"), |
|
||||
|
|
||||
/** |
|
||||
* 订单消息 |
|
||||
*/ |
|
||||
ORDER_MESSAGE("order-message","订单消息服务模块topic名称"), |
|
||||
|
|
||||
/** |
|
||||
* 平台编号 |
|
||||
*/ |
|
||||
USER_MESSAGE_TAG("user_message_tag","用户消息推送"), |
|
||||
NOTE_MESSAGE_TAG("system_message_tag","系统消息推送"), |
|
||||
ORDER_MESSAGE_TAG("order_message_tag","订单消息推送"), |
|
||||
|
|
||||
/** |
|
||||
* 订单处理编号 |
|
||||
*/ |
|
||||
//订单超时处理
|
|
||||
ORDER_TIMEOUT_TAG("order_timeout_tag","订单超时处理"); |
|
||||
|
|
||||
|
|
||||
private final String code; |
|
||||
private final String msg; |
|
||||
|
|
||||
MessageCodeEnum(String code, String msg){ |
|
||||
this.code = code; |
|
||||
this.msg = msg; |
|
||||
} |
|
||||
|
|
||||
public static String valuesOfType(String code) { |
|
||||
String value = ""; |
|
||||
for (MessageCodeEnum e : MessageCodeEnum.values()) { |
|
||||
if (code.equals(e.code)) { |
|
||||
value = e.msg; |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
return value; |
|
||||
} |
|
||||
|
|
||||
|
|
||||
} |
|
@ -1,22 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.model; |
|
||||
|
|
||||
import lombok.Data; |
|
||||
import org.springframework.beans.factory.annotation.Value; |
|
||||
import org.springframework.context.annotation.Configuration; |
|
||||
import org.springframework.stereotype.Component; |
|
||||
|
|
||||
@Data |
|
||||
@Configuration |
|
||||
@Component |
|
||||
public class ConsumerMode { |
|
||||
@Value("${suning.rocketmq.namesrvAddr}") |
|
||||
private String namesrvAddr; |
|
||||
@Value("${suning.rocketmq.conumer.groupName}") |
|
||||
private String groupName ; |
|
||||
@Value("${suning.rocketmq.conumer.consumeThreadMin}") |
|
||||
private int consumeThreadMin; |
|
||||
@Value("${suning.rocketmq.conumer.consumeThreadMax}") |
|
||||
private int consumeThreadMax; |
|
||||
@Value("${suning.rocketmq.conumer.consumeMessageBatchMaxSize}") |
|
||||
private int consumeMessageBatchMaxSize; |
|
||||
} |
|
@ -1,25 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.model; |
|
||||
|
|
||||
import lombok.Data; |
|
||||
import org.springframework.beans.factory.annotation.Value; |
|
||||
import org.springframework.cloud.context.config.annotation.RefreshScope; |
|
||||
import org.springframework.context.annotation.Configuration; |
|
||||
|
|
||||
/** |
|
||||
* 生产者初始化 |
|
||||
*/ |
|
||||
@RefreshScope |
|
||||
@Data |
|
||||
@Configuration |
|
||||
public class ProducerMode { |
|
||||
@Value("${suning.rocketmq.producer.groupName}") |
|
||||
private String groupName; |
|
||||
@Value("${suning.rocketmq.namesrvAddr}") |
|
||||
private String namesrvAddr; |
|
||||
@Value("${suning.rocketmq.producer.maxMessageSize}") |
|
||||
private Integer maxMessageSize; |
|
||||
@Value("${suning.rocketmq.producer.sendMsgTimeout}") |
|
||||
private Integer sendMsgTimeout; |
|
||||
@Value("${suning.rocketmq.producer.retryTimesWhenSendFailed}") |
|
||||
private Integer retryTimesWhenSendFailed; |
|
||||
} |
|
@ -1,47 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.producer; |
|
||||
|
|
||||
import com.ruoyi.testrocketmq.config.ProducerConfig; |
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
|
||||
|
|
||||
public class AsyncProducer { |
|
||||
|
|
||||
@Autowired |
|
||||
private ProducerConfig producerConfig; |
|
||||
|
|
||||
/** |
|
||||
* 发送异步的消息 |
|
||||
* @param topic 主题 |
|
||||
* @param tag 标签 |
|
||||
* @param key 自定义的key,根据业务来定 |
|
||||
* @param value 消息的内容 |
|
||||
* @return org.apache.rocketmq.client.producer.SendResult |
|
||||
*/ |
|
||||
// public SendResult sendAsyncProducerMessage(String topic, String tag, String key, String value) throws UnsupportedEncodingException {
|
|
||||
//
|
|
||||
// try {
|
|
||||
// DefaultMQProducer defaultMQProducer = producerConfig.producer;
|
|
||||
// //Create a message instance, specifying topic, tag and message body.
|
|
||||
// Message msg = new Message(topic, tag, key,value.getBytes(RemotingHelper.DEFAULT_CHARSET));
|
|
||||
// defaultMQProducer.send(msg, new SendCallback() {
|
|
||||
// // 异步回调的处理
|
|
||||
// @Override
|
|
||||
// public void onSuccess(SendResult sendResult) {
|
|
||||
// System.out.printf("%-10d 异步发送消息成功 %s %n", msg, sendResult.getMsgId());
|
|
||||
// }
|
|
||||
//
|
|
||||
// @Override
|
|
||||
// public void onException(Throwable e) {
|
|
||||
// System.out.printf("%-10d 异步发送消息失败 %s %n", msg, e);
|
|
||||
// e.printStackTrace();
|
|
||||
// }
|
|
||||
// });
|
|
||||
// } catch (MQClientException e) {
|
|
||||
// e.printStackTrace();
|
|
||||
// } catch (RemotingException e) {
|
|
||||
// e.printStackTrace();
|
|
||||
// } catch (InterruptedException e) {
|
|
||||
// e.printStackTrace();
|
|
||||
// }
|
|
||||
// return null;
|
|
||||
// }
|
|
||||
} |
|
@ -1,20 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.producer; |
|
||||
|
|
||||
/** |
|
||||
* @author 影子 |
|
||||
*/ |
|
||||
public class ConsumeException extends RuntimeException{ |
|
||||
private static final long serialVersionUID = 4093867789628938836L; |
|
||||
|
|
||||
public ConsumeException(String message) { |
|
||||
super(message); |
|
||||
} |
|
||||
|
|
||||
public ConsumeException(Throwable cause) { |
|
||||
super(cause); |
|
||||
} |
|
||||
|
|
||||
public ConsumeException(String message, Throwable cause) { |
|
||||
super(message, cause); |
|
||||
} |
|
||||
} |
|
@ -1,34 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.producer; |
|
||||
|
|
||||
|
|
||||
|
|
||||
import lombok.Data; |
|
||||
import lombok.ToString; |
|
||||
import org.apache.rocketmq.common.message.MessageExt; |
|
||||
import org.apache.rocketmq.common.message.MessageQueue; |
|
||||
|
|
||||
/** |
|
||||
* 消费时,当前所消费的消息的上下文信息 |
|
||||
* |
|
||||
* @author jolly |
|
||||
*/ |
|
||||
@ToString |
|
||||
@Data |
|
||||
public final class MessageContext { |
|
||||
|
|
||||
/** |
|
||||
* 所消费消息所在的消息队列 |
|
||||
* |
|
||||
* @see MessageQueue |
|
||||
*/ |
|
||||
private MessageQueue messageQueue; |
|
||||
|
|
||||
/** |
|
||||
* 所消费的消息的扩展属性 |
|
||||
* |
|
||||
* @see MessageExt |
|
||||
*/ |
|
||||
private MessageExt messageExt; |
|
||||
|
|
||||
|
|
||||
} |
|
@ -1,110 +0,0 @@ |
|||||
package com.ruoyi.testrocketmq.producer; |
|
||||
|
|
||||
import com.alibaba.fastjson.JSON; |
|
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.apache.rocketmq.client.exception.MQBrokerException; |
|
||||
import org.apache.rocketmq.client.exception.MQClientException; |
|
||||
import org.apache.rocketmq.client.producer.SendResult; |
|
||||
import org.apache.rocketmq.common.message.Message; |
|
||||
import org.apache.rocketmq.remoting.common.RemotingHelper; |
|
||||
import org.apache.rocketmq.remoting.exception.RemotingException; |
|
||||
|
|
||||
import java.io.UnsupportedEncodingException; |
|
||||
import java.text.DateFormat; |
|
||||
import java.text.SimpleDateFormat; |
|
||||
import java.util.Date; |
|
||||
import java.util.List; |
|
||||
|
|
||||
import static com.ruoyi.rocketmq.config.ProducerConfig.producer; |
|
||||
|
|
||||
/** |
|
||||
* 消息发送 |
|
||||
*/ |
|
||||
@Slf4j |
|
||||
public class MessageProducer { |
|
||||
|
|
||||
|
|
||||
/** |
|
||||
* 同步发送消息 |
|
||||
* @param topic 主题 |
|
||||
* @param tag 标签 |
|
||||
* @param key 自定义的key,根据业务来定 |
|
||||
* @param value 消息的内容 |
|
||||
* @return org.apache.rocketmq.client.producer.SendResult |
|
||||
*/ |
|
||||
public SendResult sendSynchronizeMessage(String topic, String tag, String key, String value){ |
|
||||
String body = "topic:【"+topic+"】, tag:【"+tag+"】, key:【"+key+"】, value:【"+value+"】"; |
|
||||
try { |
|
||||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|
||||
System.out.println("生产者发送消息:"+ JSON.toJSONString(value)); |
|
||||
SendResult result = producer.send(msg); |
|
||||
return result; |
|
||||
} catch (UnsupportedEncodingException e) { |
|
||||
log.error("消息初始化失败!body:{}",body); |
|
||||
|
|
||||
} catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) { |
|
||||
log.error("消息发送失败! body:{}",body); |
|
||||
} |
|
||||
return null; |
|
||||
} |
|
||||
|
|
||||
|
|
||||
|
|
||||
/** |
|
||||
* 发送有序的消息 |
|
||||
* @param messagesList Message集合 |
|
||||
* @param messageQueueNumber 消息队列编号 |
|
||||
* @return org.apache.rocketmq.client.producer.SendResult |
|
||||
*/ |
|
||||
public SendResult sendOrderlyMessage(List<Message> messagesList, int messageQueueNumber) { |
|
||||
SendResult result = null; |
|
||||
for (Message message : messagesList) { |
|
||||
try { |
|
||||
// DefaultMQProducer defaultMQProducer = ProducerConfig.producer.send(message);
|
|
||||
// System.out.println(defaultMQProducer);
|
|
||||
result = producer.send(message, (list, msg, arg) -> { |
|
||||
Integer queueNumber = (Integer) arg; |
|
||||
return list.get(queueNumber); |
|
||||
}, messageQueueNumber); |
|
||||
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { |
|
||||
log.error("发送有序消息失败"); |
|
||||
return result; |
|
||||
} |
|
||||
} |
|
||||
return result; |
|
||||
} |
|
||||
|
|
||||
/** |
|
||||
* 推送延迟消息 |
|
||||
* @param topic |
|
||||
* @param tag |
|
||||
* @param key |
|
||||
* @return boolean |
|
||||
*/ |
|
||||
public SendResult sendDelayMessage(String topic, String tag, String key, String value) |
|
||||
{ |
|
||||
SendResult result = null; |
|
||||
try |
|
||||
{ |
|
||||
Message msg = new Message(topic,tag,key, value.getBytes(RemotingHelper.DEFAULT_CHARSET)); |
|
||||
//设置消息延迟级别,我这里设置5,对应就是延时一分钟
|
|
||||
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
|
|
||||
msg.setDelayTimeLevel(4); |
|
||||
// 发送消息到一个Broker
|
|
||||
result = producer.send(msg); |
|
||||
// 通过sendResult返回消息是否成功送达
|
|
||||
log.info("发送延迟消息结果:======sendResult:{}", result); |
|
||||
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
||||
log.info("发送时间:{}", format.format(new Date())); |
|
||||
return result; |
|
||||
} |
|
||||
catch (Exception e) |
|
||||
{ |
|
||||
e.printStackTrace(); |
|
||||
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), result); |
|
||||
} |
|
||||
return result; |
|
||||
} |
|
||||
|
|
||||
|
|
||||
} |
|
Loading…
Reference in new issue