滑动时间块记录单位时间内通过次数

起因

想要完成一个个人轻量级微服务框架,负载均衡和接口安全都需要一个这样的工具来统计访问频率,那么就选择了一种比较传统的方式来实现,其他博客中有提供一些方式,但设计较为简单,不能满足我的需求,所以再起炉灶

滑动窗口

概念上非常简单,就是给定一个数据结构列表,例如20个窗口,步长固定为1,长度为10,当index为11时,该窗口范围为[1-11]
在这里插入图片描述

代码实现

首先思考一下算法,我们大概需要几个参数【窗口个数、每个时间片时长、队列长度、每个时间片内允许授权个数(可选)】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 时间窗滑块
* since 2019/12/8
*
* @author eddie
*/
public class TimeWindowSliding {

/** 队列的总长度 */
private volatile int timeSliceSize;

/** 每个时间片的时长,以毫秒为单位 */
private volatile int timeMillisPerSlice;

/** 当前所使用的时间片位置 */
private AtomicInteger cursor = new AtomicInteger(0);

/** 在一个完整窗口期内允许通过的最大阈值 */
private int threshold;

/** 窗口个数 */
private int windowSize;

/** 最小每个时间片的时长,以毫秒为单位 */
private static final int MIN_TIME_MILLIS_PER_SLICE = 50;

/** 最小窗口数量 */
private static final int DEFAULT_WINDOW_SIZE = 5;

/** 数据存储 */
private TimeWindowSlidingDataSource timeWindowSlidingDataSource;

public TimeWindowSliding(TimeWindowSlidingDataSource timeWindowSlidingDataSource, int windowSize, int timeMillisPerSlice, int threshold){
this.timeWindowSlidingDataSource = timeWindowSlidingDataSource;
this.timeMillisPerSlice = timeMillisPerSlice;
this.threshold = threshold;
/* 低于一定窗口个数会丢失精准度 */
this.windowSize = Math.max(windowSize, DEFAULT_WINDOW_SIZE);
/* 保证每个时间窗至少有2个窗口存储 不会重叠 */
this.timeSliceSize = this.windowSize * 2 + 1;
/* 可以忽略这个操作 数据存储结构中定义的生命周期函数 如果接口有实现会调用 没有实现走默认实现直接return */
timeWindowSlidingDataSource.initTimeSlices();
/* 初始化参数校验 */
this.verifier();
}

初始化函数中包含了一个TimeWindowSlidingDataSource,看下这个接口的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

/**
* 时间分配数据源
* since 2019/12/8
*
* @author eddie
*/
public interface TimeWindowSlidingDataSource {

/**
* 记录通过记录
* @param timeSlices 时间分片
* @param recordKey 记录参数
* @throws TimeWindowSlidingDataSourceException 时间分片数据源操作异常
*/
void allocAdoptRecord(int timeSlices, String recordKey) throws TimeWindowSlidingDataSourceException;

/**
* 获取<recordKey>通过次数
* @param timeSlices 时间分片
* @param recordKey 记录参数
* @return
* @throws TimeWindowSlidingDataSourceException
*/
int getAllocAdoptRecordTimes(int timeSlices, String recordKey) throws TimeWindowSlidingDataSourceException;

/**
* 将fromIndex~toIndex之间的时间片计数清零
* @param fromIndex 起始索引
* @param toIndex 结束索引
* @param totalLength 窗口数量
* @throws TimeWindowSlidingDataSourceException 时间分片数据源操作异常
*/
void clearBetween(int fromIndex, int toIndex, int totalLength) throws TimeWindowSlidingDataSourceException;

/**
* 将index时间片计数清零
* @param index 索引
* @throws TimeWindowSlidingDataSourceException
*/
void clearSingle(int index) throws TimeWindowSlidingDataSourceException;

/**
* 根据需求 可不使用该函数 该函数会在初始化阶段调用一次
* @throws TimeWindowSlidingDataSourceException 时间分片数据源操作异常
*/
default void initTimeSlices() throws TimeWindowSlidingDataSourceException {}

static TimeWindowSlidingDataSource defaultDataSource() {
return new TimeWindowSlidingDataSource() {

private Map<String, Map<String,Integer>> timeWindowSlidingMap = new ConcurrentHashMap<>(16);

@Override
public void allocAdoptRecord(int timeSlices, String recordKey) throws TimeWindowSlidingDataSourceException {
Map<String, Integer> timeSlicesMap = timeWindowSlidingMap.get(String.valueOf(timeSlices));
if (Objects.isNull(timeSlicesMap)){
timeSlicesMap = new ConcurrentHashMap<>(16);
timeWindowSlidingMap.put(String.valueOf(timeSlices), timeSlicesMap);
}

Integer adoptTimes = timeSlicesMap.get(recordKey);
int nextTimes = adoptTimes == null ? 0 : adoptTimes;
timeSlicesMap.put(recordKey, ++ nextTimes);
}

@Override
public int getAllocAdoptRecordTimes(int timeSlices, String recordKey) throws TimeWindowSlidingDataSourceException {
Map<String, Integer> timeSlicesMap = timeWindowSlidingMap.get(String.valueOf(timeSlices));
if (Objects.isNull(timeSlicesMap)){
return 0;
}
Integer recordTimes = timeSlicesMap.get(recordKey);
return Objects.isNull(recordTimes) ? 0 : recordTimes;
}

@Override
public void clearBetween(int fromIndex, int toIndex, int totalLength) throws TimeWindowSlidingDataSourceException {
if (fromIndex >= toIndex){
toIndex += totalLength;
}
while (fromIndex <= toIndex) {
Map<String, Integer> timeWindowSlidingScopeMap = timeWindowSlidingMap.get(String.valueOf(fromIndex));
if (!Objects.isNull(timeWindowSlidingScopeMap) && timeWindowSlidingScopeMap.size() > 0){
timeWindowSlidingScopeMap.clear();
}
fromIndex++;
}
}

@Override
public void clearSingle(int index) throws TimeWindowSlidingDataSourceException {
Map<String, Integer> timeWindowSlidingScopeMap = timeWindowSlidingMap.get(String.valueOf(index));
if (!Objects.isNull(timeWindowSlidingScopeMap) && timeWindowSlidingScopeMap.size() > 0){
timeWindowSlidingScopeMap.clear();
}
}
};
}
}

这个类其实就是一个约定,默认提供了一个基于HashMap的实现,我需要统计单用户访问接口频率,所以,数据结构采用HashMap,也就是说相当于有一个List<Map<String,Integer>>的结构,但因为习惯问题,我才用了Map<String, Map<String,Integer>> 的结构。

但这个无所谓,因为出于场景考虑,大概率是要支持分布式的,那就需要引入redis,可这是一个工具包,因为这个工具就引入一个redis的包不太划算,所以定义了这么一个约定接口,不管你用什么存储,memchcheredismangodb,只要实现了这几个方法,传入工具中就能完成对应的功能,算是一个变形的策略模式

这些参数都初始化好之后,看下算法的部分,提供三个接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/**
* 判断是否允许进行访问,未超过阈值的话才会对某个时间片+1
*/
public boolean allowLimitTimes(String key) {
int index = locationIndex();
int sum = 0;
// cursor不等于index,将cursor设置为index
int oldCursor = cursor.getAndSet(index);
if (oldCursor != index) {
// 清零,访问量不大时会有时间片跳跃的情况
clearBetween(oldCursor, index);
}
for (int i = 1; i < timeSliceSize; i++) {
sum += timeWindowSlidingDataSource.getAllocAdoptRecordTimes(i, key);
}

// 阈值判断
if (sum <= threshold) {
// 未超过阈值才+1
this.timeWindowSlidingDataSource.allocAdoptRecord(index, key);
return true;
}
return false;
}

/**
* 返回平均每秒访问次数
*/
public int allowNotLimitPerMin(String key) {
int index = locationIndex();
int sum = 0;
int nextIndex = index + 1;
this.timeWindowSlidingDataSource.clearSingle(nextIndex);
int from = index, to = index;
if (index < windowSize) {
from += windowSize + 1;
to += 2 * windowSize;
}else {
from = index - windowSize + 1;
}
while (from <= to){
int targetIndex = from;
if (from >= timeSliceSize) {
targetIndex = from - 2 * windowSize;
}
sum += timeWindowSlidingDataSource.getAllocAdoptRecordTimes(targetIndex, key);
from ++;
}
this.timeWindowSlidingDataSource.allocAdoptRecord(index, key);
return (sum + 1) / windowSize;
}

/**
* 返回每秒访问次数
*/
public int allowNotLimit(String key) {
int index = locationIndex();
int sum = 0;
// cursor不等于index,将cursor设置为index
int oldCursor = cursor.getAndSet(index);
if (oldCursor != index) {
// 清零,访问量不大时会有时间片跳跃的情况
clearBetween(oldCursor, index);
}
for (int i = 0; i <= timeSliceSize; i++) {
sum += timeWindowSlidingDataSource.getAllocAdoptRecordTimes(i, key);
}
this.timeWindowSlidingDataSource.allocAdoptRecord(index, key);
return sum + 1;
}

/**
* <p>将fromIndex~toIndex之间的时间片计数都清零
* <p>极端情况下,当循环队列已经走了超过1个timeSliceSize以上,这里的清零并不能如期望的进行
*/
private void clearBetween(int fromIndex, int toIndex) {
this.timeWindowSlidingDataSource.clearBetween(fromIndex, toIndex, timeSliceSize);
}

private int locationIndex() {
long time = System.currentTimeMillis();
return (int) ((time / timeMillisPerSlice) % timeSliceSize);
}

演示结果就不给大家展示了,刚把一大堆log去掉,然后滑动窗口的这个算法我有点拎不清,如果有更好的办法欢迎给我留言,或者github协作,该项目github地址,对应实现在com.el.common.time.sliding包下

另外
感谢博主: tianyaleixiaowu提供的基础思路