前言

上一篇分析了sentinel的使用及slotChain的创建过程,对于每一个Slot都有对应的作用,本篇主要学习关于StatisticSlot如果做实时数据统计,以便后面进行各种场景的流控。

数据统计

image-1671116411414

图来自sentinel架构图的一部分,sentinel数据统计部分采用滑动时间窗口优化算法

算法分析

1、固定时间窗口限流
image-1671149430599
问题:无法控制窗口边界突发流量

2、滑动时间窗口限流
image-1671149449002

问题:两个请求可能需要重复统计相交部分,如下图所示:
image-1671149477338

3、滑动时间窗口改进
image-1671149492646
思考:与其说改进不如说是一种取舍,减少重复统计,也可能牺牲掉严格滑动窗口的精准度。当样本大小为1时就是固定时间窗口,样本大小为请求阈值时,就是滑动时间窗口。sentinel默认是2个样本窗口(500ms一个样本窗口长度),为了可以使用更少的内存及计算和更好的性能。


源码分析

统计核心类:

// com.alibaba.csp.sentinel.node.StatisticNode

/**
 * 秒级别的统计数据,注意不是每秒的数量,而是单位为秒
 */
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
    IntervalProperty.INTERVAL);

/**
 * 分钟级别的统计数据
 */
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

private LongAdder curThreadNum = new LongAdder();

通过ArrayMetric进行统计,ArrayMetric 为 Metric 的实现:

// com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric


/**
 * 包括了一个LeapArray对象,泛型为MetricBucket,用于记录样本窗口数据
 *
 * 注意类名字是Array结尾,但它不是一个数组,是该类内部有一个数组的成员变量
 */
private final LeapArray<MetricBucket> data;

/**
 * 构造函数
 * @param sampleCount:样本窗口数量,默认2
 * @param intervalInMs:总时间间隔,默认1000ms
 */
public ArrayMetric(int sampleCount, int intervalInMs) {
    this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
// com.alibaba.csp.sentinel.slots.statistic.base.LeapArray

/**
 * 样本窗口长度,时间毫秒,默认 1000/2 = 500毫秒
 */
protected int windowLengthInMs;

/**
 * 样本窗口数量,默认2,越大精准度越高
 */
protected int sampleCount;

/**
 * 毫秒级别,总时间间隔,默认1000ms,越小精准度越高
 */
protected int intervalInMs;

/**
 * 秒级别,总时间间隔:intervalInMs / 1000.0,限流时用于计算QPS
 */
private double intervalInSecond;

/**
 * 真正的数组,所有的样本窗口组成的
 */
protected final AtomicReferenceArray<WindowWrap<T>> array;


/**
 * 构造函数,初始化上面的数据
 * @param sampleCount:样本窗口数量,默认2
 * @param intervalInMs:毫秒级别时间,默认1000ms
 */
public LeapArray(int sampleCount, int intervalInMs) {
    AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
    AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
    AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

    this.windowLengthInMs = intervalInMs / sampleCount;
    this.intervalInMs = intervalInMs;
    this.intervalInSecond = intervalInMs / 1000.0;
    this.sampleCount = sampleCount;

    // 数组长度为样本窗口数量
    this.array = new AtomicReferenceArray<>(sampleCount);
}

样本窗口定义:

// com.alibaba.csp.sentinel.slots.statistic.base.WindowWrap

/**
 * 样本窗口的长度,毫秒,默认500ms
 */
private final long windowLengthInMs;

/**
 * 窗口的开始时间,毫秒,动态的
 */
private long windowStart;

/**
 * 数据,范型类:MetricBucket
 */
private T value;

真正存数值的类MetricBucket:

// private final LongAdder[] counters;

/**
 * 每个样本窗口里的统计数据,包括MetricEvent枚举中所有的类型,包括PASS、BLOCK、EXCEPTION、SUCCESS、RT、OCCUPIED_PASS
 */
private final LongAdder[] counters;

/**
 * 最小响应时间
 */
private volatile long minRt;

/**
 * 构造counters数组
 */
public MetricBucket() {
    MetricEvent[] events = MetricEvent.values();
    this.counters = new LongAdder[events.length];
    for (MetricEvent event : events) {
        counters[event.ordinal()] = new LongAdder();
    }
  	// 初始化minRt
    initMinRt();
}

熟悉了关键类,看下统计的核心流程:
image-1671149704652


接下里通过源码串一下核心流程,从各插槽代码入口:

// com.alibaba.csp.sentinel.CtSph

	ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

  if (chain == null) {
      return new CtEntry(resourceWrapper, null, context);
  }

  Entry e = new CtEntry(resourceWrapper, chain, context);
  try {
      // 进入各个插槽slot
      chain.entry(context, resourceWrapper, null, count, prioritized, args);
  } catch (BlockException e1) {
      e.exit(count, args);
      throw e1;
  } catch (Throwable e1) {
      // This should not happen, unless there are errors existing in Sentinel internal.
      RecordLog.info("Sentinel unexpected exception", e1);
  }

进入统计插槽入口:

// com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
  try {
      // Do some checking.
      // 进入后续的插槽 -> ParamFlowSlot -> SystemSlot等,通过则进入下一句,不通过则进入catch部分
      fireEntry(context, resourceWrapper, node, count, prioritized, args);

      // Request passed, add thread count and pass count.
      // 请求通过,进入此处,增加线程数(访问结束调用相关api会decrease)
      node.increaseThreadNum();
      // 请求通过,进入此处,增加通过请求数
      node.addPassRequest(count);

      // ...
  } catch (PriorityWaitException ex) {
      node.increaseThreadNum();
      // ...
  } catch (BlockException e) {
      // 进入此处,增加阻塞Qps数
      node.increaseBlockQps(count);
      // ... 
      throw e;
  } catch (Throwable e) {
      context.getCurEntry().setError(e);
      throw e;
  }
}

通过node.addPassRequest(count);接着调用DefaultNode

// com.alibaba.csp.sentinel.node.DefaultNode

@Override
public void addPassRequest(int count) {
    // 添加当前资源的通过的统计数据
    super.addPassRequest(count);
  	// 添加当前资源关联的clusterNode下的通过的统计数据,流程和上面一样
    this.clusterNode.addPassRequest(count);
}

调用super.addPassRequest(count); 进入StatisticNode

this.clusterNode同理

// com.alibaba.csp.sentinel.node.StatisticNode

@Override
public void addPassRequest(int count) {
    // 增加pass指标是通过一个叫 Metric 的接口进行操作的,并且是通过 ArrayMetric 实现类实现统计
    // 秒级统计
    rollingCounterInSecond.addPass(count);
    // 分钟级别统计
    rollingCounterInMinute.addPass(count);
}

关注rollingCounterInSecond.addPass(count);进入ArrayMetric的addPass(count);

// com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric

@Override
public void addPass(int count) {
    // 1、获取当前窗口
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    // 2、添加通过数
    wrap.value().addPass(count);
}

因为1为算法核心,先看2,通过第1步获取到当前样本窗口,wrap.value().addPass(count);添加通过数,wrap.value()进入到对应的MetricBucket,从上文分析它是最终记录数值的地方,如下:
image-1671149864621

// com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket

public void addPass(int n) {
  	// MetricEvent.PASS类型
    add(MetricEvent.PASS, n);
}

/**
 * counters成员变量,数组类型,获取对应下表后进行add n
 */
public MetricBucket add(MetricEvent event, long n) {
    counters[event.ordinal()].add(n);
    return this;
}

最后看下获取当前窗口的代码逻辑:ArrayMetric#data.currentWindow();

// com.alibaba.csp.sentinel.slots.statistic.base.LeapArray

// 调用入口data的定义是ArrayMetric类的成员变量
// private final LeapArray<MetricBucket> data;

/**
 * 获取当前窗口
 */
public WindowWrap<T> currentWindow() {
    return currentWindow(TimeUtil.currentTimeMillis());
}

// 核心方法,获取当前样本时间窗口!!!
public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
    // 计算当前时间所在的样本窗口下标,见下面方法解释
    int idx = calculateTimeIdx(timeMillis);

    // 计算当前样本窗口的开始时间,见下面方法解释
    long windowStart = calculateWindowStart(timeMillis);

    // 循环处理
    while (true) {
        // 根据下标获取一个样本窗口
        WindowWrap<T> old = array.get(idx);
        // 为null,说明当前窗口还没创建
        if (old == null) {
            /*
             *     B0       B1      B2    NULL      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            bucket is empty, so create new and update
             */
          
            // 创建新的样本窗口,windowLengthInMs窗口长度;windowStart窗口开始时间;newEmptyBucket空的样本窗口数值对象
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            // cas 将计算出的样本窗口下标,改成新WindowWrap
            if (array.compareAndSet(idx, null, window)) {
                // 成功的话,返回
                return window;
            } else {
                // 当前线程不成功的话,说明其它线程更新成功了。
                // 让当前线程从运行状态 转为 就绪状态,以允许具有相同优先级的其他线程获得运行机会
                // 无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            /*
             *     B0       B1      B2     B3      B4
             * ||_______|_______|_______|_______|_______||___
             * 200     400     600     800     1000    1200  timestamp
             *                             ^
             *                          time=888
             *            startTime of Bucket 3: 800, so it's up-to-date
             */
              
            // 如果计算出的样本窗口开始时间timeMillis和通过下标获取的窗口的开始时间相等,说明样本窗口没有发生滑动
            return old;
        } else if (windowStart > old.windowStart()) {
            // 下面这个图,可以看出,old是上一轮的一个窗口,过时了
            // 已经是第二轮了,出现NULL,因为上一轮的时间窗口内并无访问
            /*
             *   (old)
             *             B0       B1      B2    NULL      B4
             * |_______||_______|_______|_______|_______|_______||___
             * ...    1200     1400    1600    1800    2000    2200  timestamp
             *                              ^
             *                           time=1676
             *          startTime of Bucket 2: 400, deprecated, should be reset
             */
              
            // 说明,窗口向前滑动了,old已经过期失效,而且当前窗口没有被reset过
            // (这种情况比较少出现,所以大多数情况下不会影响性能)
            // 1、哪些线程会有竞争?共享Node的资源会有竞争
            // 2、会什么要锁?因为resetWindowTo不是原子操作
            if (updateLock.tryLock()) {
                try {
                    // 更新样本窗口
                    // 1、将样本窗口开始时间更新为计算得到的windowStart
                    // 2、重置里面的时间
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                // Contention failed, the thread will yield its time slice to wait for bucket available.
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // 一般不会出现,当前计算的样本窗口开始时间比下标样本窗口的开始时间小
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}


/**
 * 根据时间计算样本窗口数组的下标
 * @param timeMillis
 * @return
 */
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    // 当前时间戳取整样本窗口时间长度,默认windowLengthInMs为500ms
    // (该处也可以看出,如果是第一次创建第一个样本窗口,当前时间不是开始时间,因为可能除不尽)
    long timeId = timeMillis / windowLengthInMs;

    // 根据timeId获取样本窗口数组下标,每经过500ms,timeId就会加1
    // (默认样本时间窗口为2,如果timeId % array.length()原来是0,过500ms后变成1,再过500ms又变成0,依次循环)
    return (int)(timeId % array.length());
}

/**
 * 计算窗口起始时间
 * @param timeMillis 当前时间戳
 * @return
 */
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    // timeMillis % windowLengthInMs 从当前窗口开始点到当前时间的长度
    return timeMillis - timeMillis % windowLengthInMs;
}

至此,完成了数据的统计,包括DefaultNode和ClusterNode。

流量控制,便是使用的ClusterNode的PassQps。


总结

1、了解时间窗口算法,同时了解Sentinel对滑动时间窗口的改进思想
2、对Sentinel的实时数据统计原理大致了解,知其然知其所以然,并能以此深入其它模块


看到最后,给大家推荐个小程序吧
变有钱记账本,让我变有钱)
image-1671115699919