Spring cloud 整合Sentinel 实现redis缓存限流规则(最新二)

作者:青山常在人不老   阅读 (3577)  |  收藏 (0)  |  点赞 (0)

摘要

Spring cloud gateway 整合 sentinel 做限流和熔断,同时将规则缓存到Redis中最新教程,本文为作者结合最新的Sentinel v1.7.2整合的Redis持久化Sentinel限流规则、用redis持久化Sentinel规则的教程。


原文链接:Spring cloud 整合Sentinel 实现redis缓存限流规则(最新二)

书接上文,Srping cloud 整合Sentinel 实现redis缓存限流规则(最新一)继续ing...

/*
 * Copyright 1999-2018 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.csp.sentinel.dashboard.controller;

import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.alibaba.csp.sentinel.dashboard.client.CommandFailedException;
import com.alibaba.csp.sentinel.dashboard.rule.DynamicRuleProvider;
import com.alibaba.csp.sentinel.dashboard.rule.DynamicRulePublisher;
import com.alibaba.csp.sentinel.dashboard.util.AsyncUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.csp.sentinel.dashboard.auth.AuthAction;
import com.alibaba.csp.sentinel.dashboard.auth.AuthService.PrivilegeType;
import com.alibaba.csp.sentinel.dashboard.client.SentinelApiClient;
import com.alibaba.csp.sentinel.dashboard.datasource.entity.rule.FlowRuleEntity;
import com.alibaba.csp.sentinel.dashboard.discovery.MachineInfo;
import com.alibaba.csp.sentinel.dashboard.domain.Result;
import com.alibaba.csp.sentinel.dashboard.repository.rule.InMemoryRuleRepositoryAdapter;
import com.alibaba.csp.sentinel.util.StringUtil;

/**
 * Flow rule controller.
 *
 * @author leyou
 * @author Eric Zhao
 */
@RestController
@RequestMapping(value = "/v1/flow")
public class FlowControllerV1 {

    private final Logger logger = LoggerFactory.getLogger(FlowControllerV1.class);


    /************ add by custorm *******************/
    @Autowired
    @Qualifier("flowRuleRedisProvider")
    private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;
    @Autowired
    @Qualifier("flowRuleRedisPublisher")
    private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;
    /************ add by custorm *******************/

    @Autowired
    private InMemoryRuleRepositoryAdapter<FlowRuleEntity> repository;

    @Autowired
    private SentinelApiClient sentinelApiClient;
    
    @GetMapping("/rules")
    @AuthAction(PrivilegeType.READ_RULE)
    public Result<List<FlowRuleEntity>> apiQueryMachineRules(@RequestParam String app, @RequestParam String ip,
                                                             @RequestParam Integer port)
    {

        if (StringUtil.isEmpty(app)) {
            return Result.ofFail(-1, "app can't be null or empty");
        }
//        if (StringUtil.isEmpty(ip)) {
//            return Result.ofFail(-1, "ip can't be null or empty");
//        }
//        if (port == null) {
//            return Result.ofFail(-1, "port can't be null");
//        }
        try {
            List<FlowRuleEntity> rules = ruleProvider.getRules(app);
            if (rules != null && !rules.isEmpty()) {
                for (FlowRuleEntity entity : rules) {
                    entity.setApp(app);
                    if (entity.getClusterConfig() != null && entity.getClusterConfig().getFlowId() != null) {
                        entity.setId(entity.getClusterConfig().getFlowId());
                    }
                }
            }
            rules = repository.saveAll(rules);
            return Result.ofSuccess(rules);
        } catch (Throwable throwable) {
            logger.error("Error when querying flow rules", throwable);
            return Result.ofThrowable(-1, throwable);
        }
    }

    private <R> Result<R> checkEntityInternal(FlowRuleEntity entity) {
        if (entity == null) {
            return Result.ofFail(-1, "invalid body");
        }
        if (StringUtil.isBlank(entity.getApp())) {
            return Result.ofFail(-1, "app can't be null or empty");
        }
//        if (StringUtil.isBlank(entity.getIp())) {
//            return Result.ofFail(-1, "ip can't be null or empty");
//        }
//        if (entity.getPort() == null) {
//            return Result.ofFail(-1, "port can't be null");
//        }
        if (StringUtil.isBlank(entity.getLimitApp())) {
            return Result.ofFail(-1, "limitApp can't be null or empty");
        }
        if (StringUtil.isBlank(entity.getResource())) {
            return Result.ofFail(-1, "resource can't be null or empty");
        }
        if (entity.getGrade() == null) {
            return Result.ofFail(-1, "grade can't be null");
        }
        if (entity.getGrade() != 0 && entity.getGrade() != 1) {
            return Result.ofFail(-1, "grade must be 0 or 1, but " + entity.getGrade() + " got");
        }
        if (entity.getCount() == null || entity.getCount() < 0) {
            return Result.ofFail(-1, "count should be at lease zero");
        }
        if (entity.getStrategy() == null) {
            return Result.ofFail(-1, "strategy can't be null");
        }
        if (entity.getStrategy() != 0 && StringUtil.isBlank(entity.getRefResource())) {
            return Result.ofFail(-1, "refResource can't be null or empty when strategy!=0");
        }
        if (entity.getControlBehavior() == null) {
            return Result.ofFail(-1, "controlBehavior can't be null");
        }
        int controlBehavior = entity.getControlBehavior();
        if (controlBehavior == 1 && entity.getWarmUpPeriodSec() == null) {
            return Result.ofFail(-1, "warmUpPeriodSec can't be null when controlBehavior==1");
        }
        if (controlBehavior == 2 && entity.getMaxQueueingTimeMs() == null) {
            return Result.ofFail(-1, "maxQueueingTimeMs can't be null when controlBehavior==2");
        }
        if (entity.isClusterMode() && entity.getClusterConfig() == null) {
            return Result.ofFail(-1, "cluster config should be valid");
        }
        return null;
    }

    @PostMapping("/rule")
    @AuthAction(PrivilegeType.WRITE_RULE)
    public Result<FlowRuleEntity> apiAddFlowRule(@RequestBody FlowRuleEntity entity) {
        Result<FlowRuleEntity> checkResult = checkEntityInternal(entity);
        if (checkResult != null) {
            return checkResult;
        }
        entity.setId(null);
        Date date = new Date();
        entity.setGmtCreate(date);
        entity.setGmtModified(date);
        entity.setLimitApp(entity.getLimitApp().trim());
        entity.setResource(entity.getResource().trim());
        try {
            entity = repository.save(entity);

            publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
            return Result.ofSuccess(entity);
        } catch (Throwable t) {
            Throwable e = t instanceof ExecutionException ? t.getCause() : t;
            logger.error("Failed to add new flow rule, app={}, ip={}", entity.getApp(), entity.getIp(), e);
            return Result.ofFail(-1, e.getMessage());
        }
    }

    @PutMapping("/save.json")
    @AuthAction(PrivilegeType.WRITE_RULE)
    public Result<FlowRuleEntity> apiUpdateFlowRule(Long id, String app,
                                                  String limitApp, String resource, Integer grade,
                                                  Double count, Integer strategy, String refResource,
                                                  Integer controlBehavior, Integer warmUpPeriodSec,
                                                  Integer maxQueueingTimeMs) {
        if (id == null) {
            return Result.ofFail(-1, "id can't be null");
        }
        FlowRuleEntity entity = repository.findById(id);
        if (entity == null) {
            return Result.ofFail(-1, "id " + id + " dose not exist");
        }
        if (StringUtil.isNotBlank(app)) {
            entity.setApp(app.trim());
        }
        if (StringUtil.isNotBlank(limitApp)) {
            entity.setLimitApp(limitApp.trim());
        }
        if (StringUtil.isNotBlank(resource)) {
            entity.setResource(resource.trim());
        }
        if (grade != null) {
            if (grade != 0 && grade != 1) {
                return Result.ofFail(-1, "grade must be 0 or 1, but " + grade + " got");
            }
            entity.setGrade(grade);
        }
        if (count != null) {
            entity.setCount(count);
        }
        if (strategy != null) {
            if (strategy != 0 && strategy != 1 && strategy != 2) {
                return Result.ofFail(-1, "strategy must be in [0, 1, 2], but " + strategy + " got");
            }
            entity.setStrategy(strategy);
            if (strategy != 0) {
                if (StringUtil.isBlank(refResource)) {
                    return Result.ofFail(-1, "refResource can't be null or empty when strategy!=0");
                }
                entity.setRefResource(refResource.trim());
            }
        }
        if (controlBehavior != null) {
            if (controlBehavior != 0 && controlBehavior != 1 && controlBehavior != 2) {
                return Result.ofFail(-1, "controlBehavior must be in [0, 1, 2], but " + controlBehavior + " got");
            }
            if (controlBehavior == 1 && warmUpPeriodSec == null) {
                return Result.ofFail(-1, "warmUpPeriodSec can't be null when controlBehavior==1");
            }
            if (controlBehavior == 2 && maxQueueingTimeMs == null) {
                return Result.ofFail(-1, "maxQueueingTimeMs can't be null when controlBehavior==2");
            }
            entity.setControlBehavior(controlBehavior);
            if (warmUpPeriodSec != null) {
                entity.setWarmUpPeriodSec(warmUpPeriodSec);
            }
            if (maxQueueingTimeMs != null) {
                entity.setMaxQueueingTimeMs(maxQueueingTimeMs);
            }
        }
        Date date = new Date();
        entity.setGmtModified(date);
        try {
            entity = repository.save(entity);
            if (entity == null) {
                return Result.ofFail(-1, "save entity fail: null");
            }

            publishRules(entity.getApp(), entity.getIp(), entity.getPort()).get(5000, TimeUnit.MILLISECONDS);
            return Result.ofSuccess(entity);
        } catch (Throwable t) {
            Throwable e = t instanceof ExecutionException ? t.getCause() : t;
            logger.error("Error when updating flow rules, app={}, ip={}, ruleId={}", entity.getApp(),
                entity.getIp(), id, e);
            return Result.ofFail(-1, e.getMessage());
        }
    }

    @DeleteMapping("/delete.json")
    @AuthAction(PrivilegeType.WRITE_RULE)
    public Result<Long> apiDeleteFlowRule(Long id) {

        if (id == null) {
            return Result.ofFail(-1, "id can't be null");
        }
        FlowRuleEntity oldEntity = repository.findById(id);
        if (oldEntity == null) {
            return Result.ofSuccess(null);
        }

        try {
            repository.delete(id);
        } catch (Exception e) {
            return Result.ofFail(-1, e.getMessage());
        }
        try {
            publishRules(oldEntity.getApp(), oldEntity.getIp(), oldEntity.getPort()).get(5000, TimeUnit.MILLISECONDS);
            return Result.ofSuccess(id);
        } catch (Throwable t) {
            Throwable e = t instanceof ExecutionException ? t.getCause() : t;
            logger.error("Error when deleting flow rules, app={}, ip={}, id={}", oldEntity.getApp(),
                oldEntity.getIp(), id, e);
            return Result.ofFail(-1, e.getMessage());
        }
    }

    private CompletableFuture<Void> publishRules(String app, String ip, Integer port)
    {
        List<FlowRuleEntity> rules = repository.findAllByApp(app);
        try {
            rulePublisher.publish(app, rules);
            logger.info("添加限流规则成功{}");
        } catch (Exception e) {
            e.printStackTrace();
            logger.warn("publishRules failed", e);
            return AsyncUtils.newFailedFuture(new CommandFailedException("Sentinel 推送规则到Redis失败>>>>>>>>>>>>>>>>>>>>>>>>"));
        }
        //核心代码,sentinel-dashboard通过http的形式进行数据推送,客户端接收后将规则保存在本地内存中
        return sentinelApiClient.setFlowRuleOfMachineAsync(app, ip, port, rules);
    }
}

在这个类中,我做了以下修改:

  • 引入了以下依赖

    /************ add by custorm *******************/
    @Autowired
    @Qualifier("flowRuleRedisProvider")
    private DynamicRuleProvider<List<FlowRuleEntity>> ruleProvider;
    @Autowired
    @Qualifier("flowRuleRedisPublisher")
    private DynamicRulePublisher<List<FlowRuleEntity>> rulePublisher;
    /************ add by custorm *******************/

这两个就是上文中创建的Redis推送和拉取的类

  • 修改apiQueryMachineRules方法

  • 修改publishRules方法

这两出很重要,修改的时候需要重点关注,本类中其他的修改项不多,可以直接使用比对工具参考修改(如果您使用的是Sentinel 1.7.2,可以直接覆盖)。

至此,sentinel dashboard端的源码就修改完了(你可以执行mvn install 把这个源码编译成jar,然后运行方式和官方推荐方式相同,如果您不知道如何运行,请参考Spring cloud gateway 整合 sentinel 做限流和熔断(最新)解决。

博文长度受限了,请看下篇文章,继续客户端

客户端修改(你自己的项目)

这里说明下,往上很多资料写的乱七八糟,啥样的都有,更坑的是官方的文档写的也是很烂(要自己继承AbstractDataSource类,其实不用,因为最新的依赖中已经有一个实现类了,完全可以拿来用)

对于客户端,你需要做的是:

  • application.yml(增加sentinel和Redis配置)

  • pom.xml(增加redis和sentinel的依赖)

  • RedisDataSourceConfig(新增,应用启动增加Sentinel 和 redis 监听)

application.yml

请在此文件新增您的redis和sentinel配置,示例如下:

spring:
  cloud:
    sentinel:
      transport: #dashboard地址
        dashboard: localhost:8080  #对应自己的sentinel控制台端口
        port: 8719  #您的项目和sentinel交互使用的端口
    redis:
      database: 0
      host: 127.0.0.1
      port: 6379
      password:
      #jedis:
      lettuce:
        pool:
          max-active: 8
          max-idle: 8
          min-wait: 0
          timeout: 5000

配置这块不多说,redis客户端在于你自己的选择,sentinel配置请按照你自己的sentinel地址来配置

pom.xml

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.csp</groupId>
	<artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba.csp</groupId>
	<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba.csp</groupId>
	<artifactId>sentinel-transport-simple-http</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

如果您发现无法依赖alibaba.cloud的jar(俗话说的pom报错),请参考配置Spring cloud 整合 alibaba.cloud pom依赖报错的解决办法

RedisDataSourceConfig.java

这里是唯一新增的类(类的位置你想放哪就放那,我不管,只要你老板不打你)

package cn.com.xxxx.sentinel;
 
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import com.alibaba.csp.sentinel.datasource.Converter;
import com.alibaba.csp.sentinel.datasource.ReadableDataSource;
import com.alibaba.csp.sentinel.datasource.redis.RedisDataSource;
import com.alibaba.csp.sentinel.datasource.redis.config.RedisConnectionConfig;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
 
/**
 * @project: xxx
 * @description: 客户端在启动时初始化Redis中规则
 * @version 1.0.0
 * @errorcode
 *            错误码: 错误描述
 * @author
 *         <li>2020-07-17 guopengfei@bobfintech.com.cn Create 1.0
 * @copyright ©2019-2020 xxxx,版权所有。
 */
@Component
public class RedisDataSourceConfig implements ApplicationRunner {
    private static final Logger log = LoggerFactory.getLogger(RedisDataSourceConfig.class);
 
    @Value("${spring.redis.host}")
    public String redisHost;
 
    @Value("${spring.redis.port}")
    public int redisPort;
 
    @Value("${spring.redis.password}")
    public String redisPass;
    
    @Value("${spring.redis.database}")
    public Integer database;
 
    //限流规则key前缀
    public final String RULE_FLOW = "sentinel_rule_flow_";
    public final String RULE_FLOW_CHANNEL = "sentinel_rule_flow_channel";
 
    //降级规则key前缀
    // public final String RULE_DEGRADE = "sentinel_rule_degrade_";
    // public final String RULE_DEGRADE_CHANNEL = "sentinel_rule_degrade_channel";
 
    //系统规则key前缀
    // public final String RULE_SYSTEM = "sentinel_rule_system_";
    // public final String RULE_SYSTEM_CHANNEL = "sentinel_rule_system_channel";
 
    /**
     * ApplicationRunner
     * 该接口的方法会在服务启动之后被立即执行
     * 主要用来做一些初始化的工作
     * 但是该方法的运行是在SpringApplication.run(…) 执行完毕之前执行
     */
    @Override
    public void run(ApplicationArguments args) {
        log.info("执行sentinel规则初始化 start >>>>>>>>>>>>>");
        RedisConnectionConfig config = RedisConnectionConfig.builder().withHost(redisHost).withPort(redisPort)
            .withPassword(redisPass).withDatabase(database).build();
        Converter<String, List<FlowRule>> parser = source -> JSON.parseObject(source,
            new TypeReference<List<FlowRule>>()
        {});
        
        ReadableDataSource<String, List<FlowRule>> redisDataSource = new RedisDataSource<>(config, RULE_FLOW,
            RULE_FLOW_CHANNEL, parser);
        FlowRuleManager.register2Property(redisDataSource.getProperty());
        log.info("执行sentinel规则初始化 end >>>>>>>>>>>>>");
 
        // Converter<String, List<DegradeRule>> parserDegrade = source -> JSON.parseObject(source,
        // new TypeReference<List<DegradeRule>>() {
        // });
        // ReadableDataSource<String, List<DegradeRule>> redisDataSourceDegrade = new
        // RedisDataSource<>(config, RULE_DEGRADE + SentinelConfig.getAppName(),
        // RULE_DEGRADE_CHANNEL, parserDegrade);
        // DegradeRuleManager.register2Property(redisDataSourceDegrade.getProperty());
 
        // Converter<String, List<SystemRule>> parserSystem = source -> JSON.parseObject(source, new
        // TypeReference<List<SystemRule>>() {
        // });
        // ReadableDataSource<String, List<SystemRule>> redisDataSourceSystem = new
        // RedisDataSource<>(config, RULE_SYSTEM + SentinelConfig.getAppName(), RULE_SYSTEM_CHANNEL,
        // parserSystem);
        // SystemRuleManager.register2Property(redisDataSourceSystem.getProperty());
        log.info(">>>>>>>>>执行sentinel规则初始化 end。。。");
    }
    
}

这个类没有什么东西,最主要的代码就是初始化一个RedisConnectionConfig,并监听redis中的规则变化情况(注意,注释中的部分分别是熔断规则监听和数据规则监听,他们在sentinel dashboard中的修改本文没涉及,但是完全和限流规则使用方法完全一样,请自己实现),如果您有需要,可以放开此部分注释。

需要注意的是,这个类中引用的redis限流的KEY必须和sentinel dashboard中配置的一致,否则(呵呵,等自己骂娘吧)。

至此,客户端配置完成,分别启动自己的应用和编译好的sintinel dashboard项目,就能正常给你的项目添加限流配置,同时在redis中也能看到规则信息,并且重启您的项目后,sentinel dashboard也能加载出redis中缓存到规则(注意:此处会有一定的延迟,因为sentinel dashboard 需要定时任务去轮询redis中的规则,虽然不想放图毒害大家,但是还是要放上一两张证明我确实搞通了。

senti

redis中缓存到规则:

Sentinel 结合Redis持久化限流规则和熔断规则的最新教程


应用被限流成功,返回如下内容

Sentinel 结合Redis持久化限流规则和熔断规则的最新教程

最后一句,写文章不易,尤其不是想那些抄来抄去的文章,作者经历了各种坑,才能给大家分享出这个文章,如果您觉得对您有帮助,还请您把这个文章分享给更多的朋友,让他们少走弯路(记得加上本文的链接哦)。




分类   Spring boot 开发
字数   19247

博客标签    sentinel 持久化redis  

评论