gongdear

gongdear的技术博客

欢迎大家参观我的博客
  menu
101 文章
89355 浏览
2 当前访客
ღゝ◡╹)ノ❤️

注解实现Redis并发锁

1. 核心redis锁策略
public class RedisLock {
    private String key;
    private boolean lock = false;

    private final StringRedisTemplate redisClient;
    private final RedisConnection redisConnection;

    /**
     * @param purpose 锁前缀
     * @param key     锁定的ID等东西
     */
    public RedisLock(String purpose, String key, StringRedisTemplate redisClient) {
        if (redisClient == null) {
            throw new IllegalArgumentException("redisClient 不能为null!");
        }
        this.key = purpose + "_" + key + "_redis_lock";
        this.redisClient = redisClient;
        this.redisConnection = redisClient.getConnectionFactory().getConnection();
    }

    /**
     * 锁的策略参考: <a href="http://blog.csdn.net/u010359884/article/details/50310387">基于redis分布式锁实现“秒杀”</a>
     * FIXME 此方式加锁策略存在一定缺陷: 在setIfAbsent()之后expire()执行之前程序异常 锁不会被释放. 虽然出现几率极低
     *
     * @param timeout timeout的时间范围内轮询锁, 单位: 秒
     * @param expire  设置锁超时时间
     * @return true, 获取锁成功; false, 获取锁失败.
     */
    public boolean lockA(long timeout, long expire, final TimeUnit unit) {
        long beginTime = System.nanoTime();  // 用nanos、mills具体看需求.
        timeout = unit.toNanos(timeout);
        try {
            // 在timeout的时间范围内不断轮询锁
            while (System.nanoTime() - beginTime < timeout) {
                // 锁不存在的话,设置锁并设置锁过期时间,即加锁
                if (this.redisClient.opsForValue().setIfAbsent(this.key, "1")) {
                    this.redisClient.expire(key, expire, unit);//设置锁失效时间, 防止永久阻塞
                    this.lock = true;
                    return true;
                }

                // 短暂休眠后轮询,避免可能的活锁
                System.out.println("get lock waiting...");
                Thread.sleep(30);
            }
        } catch (Exception e) {
            throw new RedisLockException("locking error", e);
        }
        return false;
    }

    /**
     * 特别注意: 如果多服务器之间存在时间差, 并不建议用System.nanoTime()、System.currentTimeMillis().
     * 更好的是统一用redis-server的时间, 但只能获取到milliseconds.
     * 锁的策略参考: <a href="http://www.jeffkit.info/2011/07/1000/?spm=5176.100239.blogcont60663.7.9f4d4a8h4IOxe">用Redis实现分布式锁</a>
     *
     * @param timeout 获取锁超时, 单位: 毫秒
     * @param expire 锁失效时常, 单位: 毫秒
     * @return true, 获取锁成功; false, 获取锁失败.
     */
    public boolean lockB(long timeout, long expire) {
        long bt = System.currentTimeMillis();
        long lockVal;
        String lockExpireTime;
        try {
            while (!this.lock) {
                if(System.currentTimeMillis() - bt > timeout){
                    throw new RedisLockException("get lock timeout!");
                }

                // 锁的键值: {当前时间} + {失效时常} = {锁失效时间}
                lockVal = getRedisTime() + expire;

                // 1. 尝试获取锁
                boolean ifAbsent = this.redisClient.opsForValue().setIfAbsent(this.key, lockVal + "");
                if (ifAbsent) { // 设置成功, 表示获得锁
                    this.lock = true;
                    return true;
                }

                lockExpireTime = this.redisClient.opsForValue().get(this.key);
                long curTime = getRedisTime();

                if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
                    lockExpireTime = this.redisClient.opsForValue().getAndSet(this.key, lockVal + "");

                    if (curTime > NumberUtils.toLong(lockExpireTime, 0)) {
                        this.lock = true;
                        return true;
                    }
                }

                // 锁被占用, 短暂休眠等待轮询
                System.out.println(this + ": get lock waiting...");
                Thread.sleep(40);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RedisLockException("locking error", e);
        }
        System.out.println(this + ": get lock error.");
        return false;
    }

    /**
     * @return current redis-server time in milliseconds.
     */
    private long getRedisTime() {
        return this.redisConnection.time();
    }

    private void closeConnection(){
        if(!this.redisConnection.isClosed()){
            this.redisConnection.close();
        }
    }

    /** 释放锁 */
    public void unlock() {
        if (this.lock) {
            redisClient.delete(key);
        }
    }

    public boolean isLock() {
        return lock;
    }
}
2. 注解部分
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLockedKey {
    /**
     * 复杂对象中需要加锁的成员变量
     */
    String field() default "";
}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisDistributedLock {
/** 锁key的前缀 /
String lockedPrefix() default "";
/
* 轮询锁的时间超时时常, 单位: ms /
long timeout() default 2000;
/
* redis-key失效时常, 单位: ms */
int expireTime() default 1000;
}

@Component
@Aspect
public class RedisDistributedLockAop {
    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 定义缓存逻辑
     */
    @Around("@annotation(com.vergilyn.demo.springboot.distributed.lock.annotation.RedisDistributedLock)")
    public void cache(ProceedingJoinPoint pjp) {
        Method method = getMethod(pjp);

        RedisDistributedLock cacheLock = method.getAnnotation(RedisDistributedLock.class);
        String key = getRedisKey(method.getParameterAnnotations(), pjp.getArgs());

        RedisLock redisLock = new RedisLock(cacheLock.lockedPrefix(), key, redisTemplate);

        //       boolean isLock = redisLock.lockB(cacheLock.timeout(), cacheLock.expireTime());
        boolean isLock = redisLock.lockA(cacheLock.timeout(), cacheLock.expireTime(), TimeUnit.MILLISECONDS);
        if (isLock) {
            try {
                pjp.proceed();
                return;
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                redisLock.unlock();
            }
        }
        System.out.println("执行方法失败");
    }

    /**
     * 获取被拦截的方法对象
     */
    private Method getMethod(ProceedingJoinPoint pjp) {
        Object[] args = pjp.getArgs();
        Class[] argTypes = new Class[pjp.getArgs().length];
        for (int i = 0; i < args.length; i++) {
            argTypes[i] = args[i].getClass();
        }
        Method method = null;
        try {
            method = pjp.getTarget().getClass().getMethod(pjp.getSignature().getName(), argTypes);
        } catch (NoSuchMethodException | SecurityException e) {
            e.printStackTrace();
        }
        return method;

    }

    private String getRedisKey(Annotation[][] annotations, Object[] args){
        if (null == args || args.length == 0) {
            throw new RedisLockException("方法参数为空,没有被锁定的对象");
        }
        if (null == annotations || annotations.length == 0) {
            throw new RedisLockException("没有被注解的参数");
        }
        // 只支持第一个注解为RedisLockedKey的参数
        for (int i = 0; i < annotations.length; i++) {
            for (int j = 0; j < annotations[i].length; j++) {
                if (annotations[i][j] instanceof RedisLockedKey) { //注解为LockedComplexObject
                    RedisLockedKey redisLockedKey = (RedisLockedKey) annotations[i][j];
                    String field = redisLockedKey.field();
                    try {
                        // field存在, 表示取参数对象的相应field;
                        if(StringUtils.isBlank(field)){
                            return args[i].toString();
                        }else {
                            return args[i].getClass().getDeclaredField(redisLockedKey.field()).toString();
                        }
                    } catch (NoSuchFieldException | SecurityException e) {
                        e.printStackTrace();
                        throw new RedisLockException("注解对象中不存在属性: " + redisLockedKey.field());
                    }
                }
            }
        }

        throw new RedisLockException("未找到注解对象!");
    }
} 

public class RedisLockException extends RuntimeException{
public RedisLockException(String msg, Throwable throwable) {
super(msg, throwable);
}
public RedisLockException(String msg) {
super(msg);
}
}

调用代码:

@RedisDistributedLock(lockedPrefix="TEST_PREFIX") 
public void lockMethod(String arg1, @RedisLockedKey Long arg2) { 
    //最简单的秒杀,这里仅作为demo示例 
    System.out.println("lockMethod, goods: " + reduceInventory(arg2)); 
}

以上改进代码依然可能存在的问题:
    1) 连接很可能没有正常关闭.
    2) 连接依然过多, 假设并发有1000个, 那一样会产生1000个连接, 且这些连接只会在竞争获取锁完后才会释放.(且产生了1000个RedisLock对象)
    3) 是否可以缓存注解对象?

  针对问题2), 主要想达到怎么尽可能减少redis连接?
  比如: 有1000个并发, 其中200个是兑换商品A, 其中300个是兑换商品B, 其中500个是兑换商品C.

  1、是否可以用单例模式来实现RedisLock?
    对单例多线程还是很混乱, 不好说. 但如果可行, 会否太影响获取锁的性能?
  比如兑换商品A的200个并发共用一个redisConnection, 感觉还是合理的, 毕竟互相之间是竞争关系.
  但商品A、商品B、商品C如果也共用一个redisConnection, 是不是完全不合理?
  他们之间根本是"并行"的, 相互之间没有一点联系.
  2、所以, 是否更进一步的实现是: 同一个锁竞争用相同的RedisLock对象和RedisConnection连接.
  即竞争商品A的200个并发用同一个"redisConnection_A"、"redisLock_A", 商品B的300个并发用同一个"redisConnection_B"、"redisLock_B"?

  针对问题3), 在代码RedisDistributedLockAop中, 每次都会:
    1)getMethod(pjp): 获取拦截方法.
    2) 通过拦截方法解析出getRedisKey.

宝剑锋从磨砺出,梅花香自苦寒来.