티스토리 뷰

개발/DB

[Redis] Stream 사용 방법

Jaeyeon Baek 2021. 9. 14. 22:29

작은 프로젝트를 진행할 때도 메시지 브로커는 아키텍처에 따라 필요한 경우가 종종 있습니다. 이때 Apache kafka를 쓰자니 배보다 배꼽이 커지고, 클라우드의 메시지 큐(AWS SQS 등)를 사용하자니 벤더 락인(vendor lock-in)이 우려될 때 Redis stream은 아주 좋은 대안이 됩니다. 혹시 kafka를 redis pub/sub과 비교하려 한다면 정확한 비교가 안되는데 토픽 개념은 유사하지만 redis pub/sub에는 소비자 그룹의 개념이 없기 때문입니다. ( stream에서 xread의 경우에는 소비자 그룹을 사용하지 않습니다. xgroupread로 접근하는게 맞습니다. 소비자 그룹의 개념이 들어가면서 메시지가 잘 도착했는지 확인이 가능합니다. 혹시 메시지 전달의 성공/실패/처리가 중요하지 않다면 xread가 더 좋은 선택이 될겁니다. 본문에서는 xread에 대해서는 다루지 않습니다 )

요즘은 캐시 용도, 혹은 여러 가지 휘발성 데이터를 저장하기 위한 목적으로 redis가 다방면으로 사용 되는데요, 특히 개발자라면 로컬 호스트에 redis container 하나쯤은 있을 겁니다. 이번 글에서는 Redis Stream의 사용방법을 예제와 함께 알아보고 메시지 브로커로 사용하기 위한 전체적인 그림을 그려봅니다. 본문에서 다룬 예제는 공식문서를 (거의) 그대로 사용했습니다. 그럼 시작합니다.


 

# Redis Stream은,

  • Stream은 Redis 5.0에 새롭게 도입된 데이터 유형
  • 로그 데이터 구조를 모델링 함
  • producers, consumers 개념
  • Consumer groups 개념은 메시징 시스템 Kafka에서 처음 도입됐지만 Redis의 consumer group은 완전히 다름.
  • 하지만 목표는 같음. 결국 로그 메시지 처리를 위한 것

 

아래에서 설명하는 명령어의 대략적인 도식화

 

# XGROUP

  • 레디스 키와 함께 그룹을 생성
  • redis에 mystream 키가 존재한다면 아래와 같이 그룹을 생성
  • > XGROUP CREATE mystream mygroup $
    OK
  • mystream이 존재하지 않는 경우 아래와 같이  MKSTREAM을 통해 스트림과 그룹을 동시에 생성
  • > XGROUP CREATE newstream mygroup $ MKSTREAM
    OK
  • 이미 그룹이 존재하는 경우 아래와 같이 에러가 발생함
  • > XGROUP CREATE mystream mygroup $ MKSTREAM
    (error) BUSYGROUP Consumer Group name already exists

 

# XADD

  • 스트림에 데이터를 추가
  • > XADD mystream * message apple
    1526569495631-0
    > XADD mystream * message orange
    1526569498055-0
    > XADD mystream * message strawberry
    1526569506935-0
    > XADD mystream * message apricot
    1526569535168-0
    > XADD mystream * message banana
    1526569544280-0
  • 첫 부분에 스트림 키로 mystream을 지정했고 두 번째 argument로 Entry IDs를 넣는 부분에는  *   를 넣음
  •   로 사용하는 경우 서버에서 아래와 같은 형태로 아이디를 생성해줌
  • <millisecondsTime>-<sequenceNumber>

 

# XRANGE

  • 스트림에 담겨있는 전체 내역 출력
  • > XRANGE newstream - +
    (empty array)
  • -, + 부분에는 Entry ID를 넣을 수 있는데 -, + 로 지정하는 경우 가장 작은 아이디부터 가장 큰 아이디까지 범위를 나타냄

 

# XLEN

  • 스트림에 들어있는 데이터 개수 확인
  • > XLEN newstream
    (integer) 0

 

# XREADGROUP

  • 데이터 추출은 아래와 같이 진행
  • > XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
    1) 1) "mystream"
       2) 1) 1) 1526569495631-0
             2) 1) "message"
                2) "apple"
  • <group-name> <consumer-name> 순서대로 명시해서 사용함
  • 여기 예제에서 group-name은 mygroup, consumer-name은 Alice로 사용됨
  • 끝에  >  는 다른 소비자에게 전달된 적이 없는 데이터를 꺼내올 때 사용하는 특수 ID
  • 만약 스트림을 바라보고 대기하려면 아래와 같이 BLOCK <millisecondsTime> 을 옵션을 사용하면 됨
  • > XREADGROUP GROUP mygroup Alice BLOCK 2000 COUNT 1 STREAMS mystream >
    (nil)
    (2.10s)
  • 추출된 데이터 확인은 다음과 같음. 이때는 pending 되어 있는 아이템만 확인 됨
  • > XREADGROUP GROUP mygroup Alice STREAMS mystream 0
    1) 1) "mystream"
       2) 1) 1) 1526569495631-0
             2) 1) "message"
                2) "apple"

 

# XPENDING

  • 처리되지 않은 메시지 확인
  • > XPENDING mystream mygroup
    1) (integer) 2
    2) 1526569498055-0
    3) 1526569506935-0
    4) 1) 1) "Bob"
          2) "2"
  • xack로 처리 완료되면 여기 항목에서는 제거돼서 안보임

 

# XINFO

  • 스트림의 전체적인 정보를 확인
  • 아래와 같이 컨슈머에 펜딩되어 있는 데이터를 확인하는 등
  • > XINFO CONSUMERS mystream mygroup
    1) 1) name
       2) "Alice"
       3) pending
       4) (integer) 1
       5) idle
       6) (integer) 9104628
    2) 1) name
       2) "Bob"
       3) pending
       4) (integer) 1
       5) idle
       6) (integer) 83841983

 

# XACK

  • 데이터 처리 완료
  • > XACK mystream mygroup 1526569495631-0
    (integer) 1
    > XREADGROUP GROUP mygroup Alice STREAMS mystream 0
    1) 1) "mystream"
       2) (empty list or set)

 

# XCLAIM

  • 펜딩되어 있는 데이터를 다른 소비자에게 할당
  • > XCLAIM mystream mygroup Alice 3600000 1526569498055-0
    1) 1) 1526569498055-0
       2) 1) "message"
          2) "orange"
  • 형식은 아래와 같음
  • XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
  •  consumer  는 펜딩 데이터를 할당받을 소비자를 지정
  •  min-idle-time 은 millisecond로 지정
    • 메시지가 유휴 상태로 머무른 시간, 즉 ack나 claim을 받지 못한 상태
    • 예를 들어 20분 안에 ack 처리가 되는 게 정상인데 아닌 경우 다른 컨슈머가 처리하도록 할 때 지정하면 됨
  • 위 예제에서 1526569498055-0 메시지는 이제 Alice에게 할당됨

 

# XDEL

  • 스트림 안에 데이터 삭제
  • > XDEL mystream 1631518050741-0
    (integer) 1
  • 스크림에 메시지는 머신의 메모리 크기에 따라 무한정 커질 수 있지만 자원 사용량을 고려해서 삭제
  • 이 과정이 비즈니스를 해친다면 xadd 시에 아래와 같이 처리하는 것도 방법 ( 스트림에 10개의 메시지만 보관한다는 의미 )
  • > XTRIM mystream MAXLEN 10
  • 하지만 이렇게 사용하면 메시지 개수를 10개로 유지하기 위해 내부적으로 기존 데이터를 삭제하는 등의 로직이 타이트하게 동작함
  • 그래서 스트림이 묵직하게 사용되는 대규모 서비스라면 아래와 같이 (about)의 개념으로  ~ 을 넣어서 사용하는 것이 좋음 ( 애초에 대규모 서비스였다면 메시지 브로커를 선택할 때 kafka와 조금 더 심도 있는 비교를 하고 결정해야 함 )
  • > XTRIM mystream MAXLEN ~ 10

 

# 가상 시나리오

  • 공장의 수많은 센서로부터 메시지를 수신 ( xadd )
  • worker x N 서비스는 센서로부터 수신된 메시지를 데이터베이스에 저장 ( xgroupread )
    • xgroupread는 atomic이 보장되기 때문에 여러 개의 워커가 함께 xgroupread 해도 됨
  • 데이터베이스에 저장이 성공했으면 완료 처리 ( xack )
  • 중앙 서버에서 메시지 큐에 일정 시간 동안 처리 안된 메시지가 있으면 다른 워커에게 재처리 요청 ( xclaim )
    • worker의 비정상적인 동작으로 메시지 처리가 안된 것으로 판단하고 재처리를 요청하는 경우에 해당함 

 


 

stream을 사용하는 대략적인 명령어를 살펴봤습니다. stream key를 xgroupread를 통해서 읽어 들이고, 읽어 들인 항목은 pending 상태를 유지하며 ack를 받게 되면 pending 목록에서 제거되는 일련의 과정은 아래 그림처럼 표현할 수 있습니다.

위 그림처럼 stream key를 제거하기 위해서는 xdel이 사용되고 키 자체를 제거하려면 del을 쓰면 됩니다. 이벤트 큐로부터 메시지를 꺼내와서 처리하고 처리가 끝난 아이템에 대해서 ack로 마무리하는 방식은 용어는 달라도 일반적으로 많이 사용되는 패턴입니다. 그럼 redis stream을 통해 메시지 브로커와 즐거운 여정 함께 하시길! 

댓글
댓글쓰기 폼