본문 바로가기

redis

분산락을 적용해보자 (Redisson)

풀필먼트 입고 서비스팀에서 분산락을 사용하는 방법 - Spring Redisson - 컬리 기술 블로그 (kurly.com)

 

풀필먼트 입고 서비스팀에서 분산락을 사용하는 방법 - Spring Redisson

어노테이션 기반으로 분산락을 사용하는 방법에 대해 소개합니다.

helloworld.kurly.com

 

 Redis의 Redisson 라이브러리 선정 이유

분산락은 Redis, Mysql, Zookeeper 등 여러가지 방법을 통해 구현할 수 있는데,

Mysql 같이 DB에서 직접 락을 구현한다면 별도의 커넥션 풀을 관리해야 하고, 락에 관련된 부하를 RDS에서 받는다는 점에서 Redis를  사용하는 것이 더 효율적일 수 있다.

또한 Redisson은 일반적으로 많이 쓰는 Lettuce와 비교했을 때, 락 사용 방식에 여러 차이가 있는데 아래와 같다.

Lock Interface 지원

Lettuce는 분산락을 사용하기 위해 setnx, setex 등을 이용해 분산락을 직접 구현해야하는데, 개발자가 직접 retry, timeout과 같은 기능을 직접 구현해주어야 하는 번거러움이 있다.

이에 비해 Redisson은 별도의 Lock interface를 지원해 보다 안전하게 사용할 수 있다.

하지만 직접 구현해야 한다는 번거로움의 뜻은 더욱 세밀하고 자세하게 커스텀할 수 있다는 뜻도 되니 본인의 상황에 맞게 사용하면 좋을 것 같다.

 

Lock 획득 방식

Lettuce는 분산락 구현 시, setnx, setex과 같은 명령어를 이요해 지속적으로 Redis에게 락이 해제 되었는지 요청을 보내는 스핀락 방식으로 동작하는데 이로써 요청이 많아질수록 Redis가 받는 부하가 커지게 된다.

하지만 Redisson은 Pub/Sub 방식을 이용하기에 락이 해제되면 락을 subscribe하는 클라이언트는 락이 해제되었다는 신호를 받고 락 획득을 시도하게 된다.

그럼 이제부터 구현을 시작할건데, 아래와 같은 규칙하에 적용을 해보도록 하겠다.

  1. 분산락 처리 로직은 비즈니스 로직이 오염되지 않게 분리해서 사용한다.
  2. waitTime, leaseTime을 커스텀 하게 지정 가능하다.
  3. 락의 name에 대해 사용자로부터 커스텀 하게 받아 처리한다.
  4. 추가 요구사항에 대해서 공통으로 관리한다.

build.gradle

dependencies {
    // redisson
    implementation 'org.redisson:redisson-spring-boot-starter:3.18.0'
}

 

RedissonConfig.java

@Configuration
public class RedissonConfig {
    @Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.port}")
    private int redisPort;

    private static final String REDISSON_HOST_PREFIX = "redis://";

    @Bean
    public RedissonClient redissonClient() {
        RedissonClient redisson = null;
        Config config = new Config();
        config.useSingleServer().setAddress(REDISSON_HOST_PREFIX + redisHost + ":" + redisPort);
        redisson = Redisson.create(config);
        return redisson;
    }
}

@DistributedLock  어노테이션 정의

락을 걸기위해 사용할 @DistributedLock 어노테이션을 정의

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {

    /**
     * 락의 이름
     */
    String key();

    /**
     * 락의 시간 단위
     */
    TimeUnit timeUnit() default TimeUnit.SECONDS;

    /**
     * 락을 기다리는 시간 (default - 5s)
     * 락 획득을 위해 waitTime 만큼 대기한다
     */
    long waitTime() default 10L;

    /**
     * 락 임대 시간 (default - 3s)
     * 락을 획득한 이후 leaseTime 이 지나면 락을 해제한다
     */
    long leaseTime() default 3L;
}

@Target(ElementType.METHOD) => 해당 어노테이션이 메소드에 적용될 수 있음을 정의

@Retention(RetentionPolicy.RUNTIME) => 해당 어노테이션이 런타임 시에도 유지되어야함을 정의. 즉 프로그램이 실행될 때 어노테이션 정보가 유지되어야함을 의미

즉, @DistributedLock 어노테이션은 메소드에만 적용되고, 런타임 시에도 그 정보가 유지되어야한다.

 

DistributedLockAop

@Aspect
@Component
@RequiredArgsConstructor
@Slf4j
public class DistributedLockAop {
    private static final String REDISSON_LOCK_PREFIX = "LOCK:";

    private final RedissonClient redissonClient;
    private final AopForTransaction aopForTransaction;

    //@DistributedLock 어노테이션이 붙은 메소드를 대상으로 하는 Aspect를 정의
    @Around("@annotation(org.sparta.mytaek1.global.redis.DistributedLock)")
    public Object lock(final ProceedingJoinPoint joinPoint) throws Throwable {//조인 포인트는 프로그램 실행 중에 다른 코드와 결합할 수 있는 특정 지점을 의미

        // 메소드 서명과 어노테이션 정보를 획득
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        DistributedLock distributedLock = method.getAnnotation(DistributedLock.class);

        // 분산 락 키를 생성
        String key = REDISSON_LOCK_PREFIX + CustomSpringELParser.getDynamicValue(signature.getParameterNames(), joinPoint.getArgs(), distributedLock.key());
        RLock rLock = redissonClient.getLock(key);

        try {
            // 락을 획득하기 위해 시도
            boolean available = rLock.tryLock(distributedLock.waitTime(), distributedLock.leaseTime(), distributedLock.timeUnit());
            if (!available) {
                return false;
            }

            // AOP를 통해 트랜잭션 처리를 진행
            return aopForTransaction.proceed(joinPoint);
        } catch (InterruptedException e) {
            throw new InterruptedException();
        } finally {
            // 락을 해제
            rLock.unlock();
        }
    }
}

@DistributedLock 어노테이션 선언 시 수행되는 AOP 클래스로

@DistributedLock 어노테이션의 파라미터 값을 가져와 분산락 획득 시도 그리고 어노테이션이 선언된 메서드를 실행

  1. 락의 이름으로 RLock 인스턴스를 가져온다.
  2. 정의된 waitTime까지 획득을 시도한다, 정의된 leaseTime이 지나면 잠금을 해제한다.
  3. DistributedLock 어노테이션이 선언된 메서드를 별도의 트랜잭션으로 실행한다.
  4. 종료 시 무조건 락을 해제한다.

 

CustomSpringELParser

public class CustomSpringELParser {
    private CustomSpringELParser() {
    }

    public static Object getDynamicValue(String[] parameterNames, Object[] args, String key) {
        ExpressionParser parser = new SpelExpressionParser();
        StandardEvaluationContext context = new StandardEvaluationContext();

        for (int i = 0; i < parameterNames.length; i++) {
            context.setVariable(parameterNames[i], args[i]);
        }

        return parser.parseExpression(key).getValue(context, Object.class);
    }
}

CustomSpringELParser 는 전달받은 Lock의 이름을 Spring Expression Language 로 파싱하여 읽어온다.

AopForTransaction

@Component
public class AopForTransaction {

    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Object proceed(final ProceedingJoinPoint joinPoint) throws Throwable {
        return joinPoint.proceed();
    }
}

@DistributedLock 이 선언된 메서드는 Propagation.REQUIRES_NEW 옵션을 지정해 부모 트랜잭션의 유무에 관계없이 별도의 트랜잭션으로 동작하게끔 설정

 

OrderService

    @DistributedLock(key = "#lockName")
    public OrderResponseDto createOrder(String lockName, Long productId, OrderRequestDto orderRequestDto, User user) {
        System.out.println(lockName);
        Product product = findProduct(productId);
        Stock stock = findStock(productId);
        if (stock.getProductStock() < orderRequestDto.getQuantity()) {
            throw new IllegalArgumentException("잔여 수량이 부족합니다.");
        }
        Orders order = new Orders(orderRequestDto, product, user, false);
        stock.updateStock(orderRequestDto.getQuantity());
        orderRepository.save(order);
        return new OrderResponseDto(order);
    }

 

StockLockTest

@SpringBootTest
@ActiveProfiles("test")
public class StockLockTest {

    private final UserRepository userRepository;
    private final OrderService orderService;
    private final ProductRepository productRepository;
    private final OrderRepository orderRepository;
    private final StockRepository stockRepository;
    private Product product;
    private Stock stock;
    private User user;

    @Autowired
    public StockLockTest(UserRepository userRepository, OrderService orderService, ProductRepository productRepository, OrderRepository orderRepository, StockRepository stockRepository) {
        this.userRepository = userRepository;
        this.orderService = orderService;
        this.productRepository = productRepository;
        this.orderRepository = orderRepository;
        this.stockRepository = stockRepository;
    }

    @BeforeEach
    void setUp() {
        product = new Product("고구마", "호박고구망", 10000);
        productRepository.save(product);
        stock = new Stock(product, 100);
        stockRepository.save(stock);
    }

    /**
     * Feature: 쿠폰 차감 동시성 테스트
     * Background
     *     Given KURLY_001 라는 이름의 쿠폰 100장이 등록되어 있음
     * <p>
     * Scenario: 100장의 쿠폰을 100명의 사용자가 동시에 접근해 발급 요청함
     *           Lock의 이름은 쿠폰명으로 설정함
     * <p>
     * Then 사용자들의 요청만큼 정확히 쿠폰의 개수가 차감되어야 함
     */

    @Test
//    @Transactional
    void 재고차감_분산락_적용_동시성100명_테스트() throws InterruptedException {
        int numberOfThreads = 200;
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        CountDownLatch latch = new CountDownLatch(numberOfThreads);
        user = new User("다보미","da123@email.com","asdf1234!","123477756","01012345678","경기도 파주시","12345");
        userRepository.save(user);

        for (int i = 0; i < numberOfThreads; i++) {
            executorService.submit(() -> {
                try {
                    Long productId = product.getProductId();
                    OrderRequestDto orderRequestDto = new OrderRequestDto(1,1);
                    orderService.createOrder(user.getUserName(), productId, orderRequestDto, user);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();
        List<Orders> orders = orderRepository.findAllByProductProductId(product.getProductId());
        int numberOfOrders = orders.size();
        Stock persistStock = stockRepository.findById(stock.getStockId())
                  .orElseThrow(IllegalArgumentException::new);

        assertThat(numberOfOrders).isEqualTo(100);
        assertThat(persistStock.getProductStock()).isZero();
    }
}

 

 

결과

 

200개의 요청이 동시에 들어와도

오더는 100개만 만들어지고 재고도 0개로 의도에 맞게 락이 적용된 걸 알 수 있다.

적용 전과 비교

적용 전 코드

    @Transactional
    public OrderResponseDto createOrderBeforeLock(Long productId, OrderRequestDto orderRequestDto, User user) {
        Product product = findProduct(productId);
        Stock stock = findStock(productId);
        if (stock.getProductStock() < orderRequestDto.getQuantity()) {
            throw new IllegalArgumentException("잔여 수량이 부족합니다.");
        }
        Orders order = new Orders(orderRequestDto, product, user, false);
        stock.updateStock(orderRequestDto.getQuantity());
        orderRepository.save(order);
        return new OrderResponseDto(order);
    }

 

테스트 코드

package org.sparta.mytaek1.domain.order;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.sparta.mytaek1.domain.order.dto.OrderRequestDto;
import org.sparta.mytaek1.domain.order.entity.Orders;
import org.sparta.mytaek1.domain.order.repository.OrderRepository;
import org.sparta.mytaek1.domain.order.service.OrderService;
import org.sparta.mytaek1.domain.product.entity.Product;
import org.sparta.mytaek1.domain.product.repository.ProductRepository;
import org.sparta.mytaek1.domain.stock.entity.Stock;
import org.sparta.mytaek1.domain.stock.repository.StockRepository;
import org.sparta.mytaek1.domain.user.entity.User;
import org.sparta.mytaek1.domain.user.repository.UserRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;


@SpringBootTest
@ActiveProfiles("test")
public class BeforeLockTest {


    private final UserRepository userRepository;
    private final OrderService orderService;
    private final ProductRepository productRepository;
    private final OrderRepository orderRepository;
    private final StockRepository stockRepository;
    private Product product;
    private Stock stock;
    private User user;

    @Autowired
    public BeforeLockTest(UserRepository userRepository, OrderService orderService, ProductRepository productRepository, OrderRepository orderRepository, StockRepository stockRepository) {
        this.userRepository = userRepository;
        this.orderService = orderService;
        this.productRepository = productRepository;
        this.orderRepository = orderRepository;
        this.stockRepository = stockRepository;
    }

    @BeforeEach
    void setUp() {
        product = new Product("고구마", "호박고구망", 10000);
        productRepository.save(product);
        stock = new Stock(product, 100);
        stockRepository.save(stock);
    }

    @Test
//    @Transactional
    void 재고차감_분산락_적용_동시성100명_테스트() throws InterruptedException {
        int numberOfThreads = 200;
        ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
        CountDownLatch latch = new CountDownLatch(numberOfThreads);
        user = new User("다보미","da123@email.com","asdf1234!","123477756","01012345678","경기도 파주시","12345");
        userRepository.save(user);
        String userName = "da123@email.com";

        for (int i = 0; i < numberOfThreads; i++) {
            final int index = i; // final로 선언된 변수에 할당
            executorService.submit(() -> {
                try {
                    Long productId = product.getProductId();
                    OrderRequestDto orderRequestDto = new OrderRequestDto(1,1);
                    orderService.createOrderBeforeLock(productId, orderRequestDto, user);
                } finally {
                    latch.countDown();
                }
            });
        }
        latch.await();
        List<Orders> orders = orderRepository.findAllByProductProductId(product.getProductId());
        int numberOfOrders = orders.size();
        Stock persistStock = stockRepository.findById(stock.getStockId())
                .orElseThrow(IllegalArgumentException::new);

        assertThat(numberOfOrders).isEqualTo(100);
        assertThat(persistStock.getProductStock()).isZero();
    }
}

 

결과

보는것과 같이

동시에 200개의 요청이 들어오자 수량과 상관없이 200개의 오더가 다 만들어지고

수량은 23개밖에 줄어들지 않았다.

'redis' 카테고리의 다른 글

레디스의 자료구조  (0) 2024.01.04
대기열을 이용한 선착순 쿠폰 발행  (0) 2024.01.04
캐시와 레디스  (0) 2024.01.03