카테고리 없음

레디스 클러스터 모드 MGET 동작 방식(feat. Lettuce)

Hide­ 2024. 6. 13. 19:21
반응형

개요

토요일마다 우아한 대규모 시스템 설계 스터디2에 참여하고 있다. 스터디 때 마다 각자 책을 읽고 질문거리를 들고와서 함께 토론하는 형식이다. 2장 주변 친구 주제의 55페이지를 보면 "위치 정보 캐시에 일괄(batch) 요청을 보내어 모든 친구의 위치를 한번에 가져온다."라는 문장이 있다.
레디스 클러스터 모드인 경우 데이터 저장 시 해시 슬롯 알고리즘을 통해 데이터가 저장될 노드를 결정한다. 한마디로 여러개의 레디스 노드에 분산되어 저장된다는 것이다. 이런 경우 MGET으로 한번에 데이터를 가져올 수 없다. 따라서 55페이지에 나와있는 것 처럼 배치 요청을 통해 가져올 수 없는 것이 아닌가? 라는 의문이 들었다.
이에 관해서는 다음과 같은 방법을 사용해볼 수 있겠다.

  1. 레디스 클러스터 모드가 아닌 스탠드얼론 모드로 사용하되, 여러개를 준비한다.
  2. 어플리케이션 안에서 Map등의 자료구조를 통해 현재 저장/조회하려는 키를 해싱한다. 그리고 해당 값이 A라면 1번 노드로, B라면 2번 노드로, C라면 3번 노드를 향하도록 일종의 해시링을 구성한다.
  3. 데이터 저장 및 조회 시 2번의 해시링을 통해 저장/조회할 대상 노드를 선정 후 작업을 수행한다.

관련하여 더 찾아보던 도중 Lettuce 라이브러리에서 SlotHash라는 메소드를 찾았다. 뭔가..위 방식이 이미 내부적으로 구현되어있을 것이란 생각이 들었다. 이에 대해 조사해본 내용을 간단하게 기술한다.

MGET

redisTemplate.opsForValue().multiGet(...)

먼저 RedisTemplate에서 Operation을 가져오고 multiGet() 메소드를 호출해보자.

@Nullable
List<V> multiGet(Collection<K> keys);

ValueOperations 인터페이스의 multiGet으로 이동한다. 구현체로 이동해보자.

public List<V> multiGet(Collection<K> keys) {
    if (keys.isEmpty()) {
        return Collections.emptyList();  // 1
    } else {
        byte[][] rawKeys = new byte[keys.size()][];
        int counter = 0;

        Object hashKey;
        for(Iterator var4 = keys.iterator(); var4.hasNext(); rawKeys[counter++] = this.rawKey(hashKey)) {  // 2
            hashKey = var4.next();
        }

        List<byte[]> rawValues = (List)this.execute((connection) -> {  // 3
            return connection.mGet(rawKeys);  // HERE
        });
        return this.deserializeValues(rawValues);  // 4
    }
}
  1. 키가 비어있다면 추가적인 작업이 필요없기 때문에 빈 리스트를 리턴한다.
  2. 모든 키를 Serialize한다.
  3. 커넥션을 사용하여 MGET 명령을 수행한다.
  4. 3번의 결과로 나온 값들을 Deserialize한 후 리턴한다.

HERE 주석을 달아놓은 부분을 타고 들어가자.

/** @deprecated */
@Deprecated
default List<byte[]> mGet(byte[]... keys) {
    return this.stringCommands().mGet(keys);
}

DefaultRedisConnection 인터페이스의 mGet 메소드이다. stringCommands().mget()으로 들어가자.

@Nullable
List<byte[]> mGet(byte[]... keys);

RedisStringCommands 인터페이스의 mGet 메소드이다. 

이때 구현체로 이동하려고 하면 여러개가 존재하는 모습을 확인할 수 있는데 본문에서는 Lettuce를 사용중이므로 LettuceStringCommands로 이동한다.

// LettuceConnection.class
LettuceInvoker invoke() {
    return this.invoke(this.getAsyncConnection());
}

// LettuceStringCommands.class
public List<byte[]> mGet(byte[]... keys) {
    Assert.notNull(keys, "Keys must not be null");
    Assert.noNullElements(keys, "Keys must not contain null elements");
    return this.connection.invoke().fromMany(RedisStringAsyncCommands::mget, keys).toList((source) -> {
        return (byte[])source.getValueOrElse((Object)null);
    });
}

LettuceStringCommands 클래스의 mGet 메소드이다. 리턴쪽을 보면 invoke()를 통해 비동기 커넥션을 얻어온다. 그리고 해당 커넥션을 통해 RedisStringAsyncCommands의 mget 명령어를 실행한다. 해당 명령어로 들어가보자.

RedisFuture<List<KeyValue<K, V>>> mget(K... var1);

RedisStringAsyncCommands 인터페이스로 이동한다. 여기서 RedisAdvancedClusterAsyncCommandsImpl 구현체로 이동하자.

public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
    Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
    ...
}

가장 먼저 SlotHash를 통해 파티셔닝을 수행해준다. 어떠한 작업인지 들어가서 확인해본다.

public static <K, V> Map<Integer, List<K>> partition(RedisCodec<K, V> codec, Iterable<K> keys) {
    Map<Integer, List<K>> partitioned = new HashMap();
    Iterator var3 = keys.iterator();

    while(var3.hasNext()) {
        K key = var3.next();
        int slot = getSlot(codec.encodeKey(key));
        if (!partitioned.containsKey(slot)) {
            partitioned.put(slot, new ArrayList());
        }

        Collection<K> list = (Collection)partitioned.get(slot);
        list.add(key);
    }

    return partitioned;
}

주어진 키를 대상으로 각 키의 해시 슬롯을 계산하고 그룹핑하는 작업을 수행하고 있다. 이를 통해 다음 코드에서 동일 슬롯에 해당하는 키들을 한번에 요청하게 된다. 그전에 getSlot()으로 이동해보자.

public static int getSlot(ByteBuffer key) {
        int limit = key.limit();
        int position = key.position();
        int start = indexOf(key, (byte)123);
        int end;
        if (start != -1) {
            end = indexOf(key, start + 1, (byte)125);
            if (end != -1 && end != start + 1) {
                key.position(start + 1).limit(end);
            }
        }

        try {
            if (!key.hasArray()) {
                end = CRC16.crc16(key) % 16384;  // HERE
                return end;
            }

            end = CRC16.crc16(key.array(), key.position(), key.limit() - key.position()) % 16384;
        } finally {
            key.position(position).limit(limit);
        }

        return end;
    }

HERE로 주석 표시한 부분을 보면 CRC16를 통해 키를 연산하는 모습을 확인할 수 있다. 이는 레디스 클러스터에서 키를 해시 슬롯에 매핑하기 위해 사용되는 기본 알고리즘이다. 위 코드를 통해 실제 레디스 클러스터에서 키를 해싱하는 알고리즘이 우리가 사용하는 클라이언트 라이브러리에도 구현되었음을 알 수 있다. 따라서 위 코드를 사용하면 레디스의 어떤 노드에 키가 존재하는지 확인할 수 있다.

public RedisFuture<List<KeyValue<K, V>>> mget(K... keys) {
        return this.mget((Iterable)Arrays.asList(keys));
    }

public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
    Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys); // 1
    if (partitioned.size() < 2) {
        return super.mget(keys); // 2
    } else {
        Map<Integer, Map<K, Integer>> partitionedKeysToIndexes = new HashMap(partitioned.size());  // 3
        Iterator var4 = partitioned.keySet().iterator();

        while(var4.hasNext()) {
            Integer partition = (Integer)var4.next();
            List<K> keysForPartition = (List)partitioned.get(partition);
            Map<K, Integer> keysToIndexes = new HashMap(keysForPartition.size());

            for(int i = 0; i < keysForPartition.size(); ++i) {
                keysToIndexes.put(keysForPartition.get(i), i);  // 4
            }

            partitionedKeysToIndexes.put(partition, keysToIndexes);
        }

        Map<K, Integer> slots = SlotHash.getSlots(partitioned);  // 5
        Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap(partitioned.size());
        Iterator var11 = partitioned.entrySet().iterator();

        while(var11.hasNext()) {
            Map.Entry<Integer, List<K>> entry = (Map.Entry)var11.next();
            RedisFuture<List<KeyValue<K, V>>> mget = super.mget((Iterable)entry.getValue());  // 6
            executions.put(entry.getKey(), mget);
        }

        return new PipelinedRedisFuture(executions, (objectPipelinedRedisFuture) -> {
            List<KeyValue<K, V>> result = new ArrayList(slots.size());
            Iterator var6 = keys.iterator();

            while(var6.hasNext()) {
                K opKey = var6.next();
                int slot = (Integer)slots.get(opKey);
                int position = (Integer)((Map)partitionedKeysToIndexes.get(slot)).get(opKey);
                RedisFuture<List<KeyValue<K, V>>> listRedisFuture = (RedisFuture)executions.get(slot);
                result.add(MultiNodeExecution.execute(() -> {
                    return (KeyValue)((List)listRedisFuture.get()).get(position);  // 7
                }));
            }

            return result;
        });
    }
}

다시 mget 전체 코드로 돌아와서 코드를 살펴보자.

  1. 키들을 해시 슬롯별로 그룹핑한다. 개요에서도 설명했듯이 클러스터 모드인 경우 키들이 여러개의 해시슬롯으로 분할되어 매핑되기 때문에 각 슬롯에 해당되는 키들을 모아서 조회하기 위함이다.
  2. 파티셔닝한 결과의 사이즈가 2개 미만이라면 동일한 하나의 노드에 저장된 경우이다. 따라서 mget 호출 후 즉시 반환한다.
  3. 각 슬롯별 키와 키가 몇번째 인덱스에 있는지에 대한 정보를 보관하는 맵을 생성한다. 이 인덱스 정보를 통해 최종 반환 시 MGET의 키 순서대로 결과가 리턴된다.
  4. 3번에서 생성한 맵에 슬롯에 해당하는 인덱스 정보를 저장한다.
  5. 키가 어떠한 슬롯에 속하는지 확인하고 그에 대한 맵을 생성한다.
  6. 각 슬롯별 MGET 명령어를 병렬로 실행하고 결과를 저장한다. 
  7. 4번, 6번의 결과를 통해 요청온 키 순서대로 MGET결과를 정렬하여 리턴한다.

결론적으로 개요에서 설명한 내용은 이미 Lettuce를 통해 거의 동일하게 구현되어있음을 확인할 수 있다. 전체 과정을 짧게 요약하자면,

  1. Lettuce 클라이언트는 실제 레디스 클러스터로 값이 저장될 때 어떤 해시 슬롯에 매핑되는지 계산할 수 있다. (SlotHash#partition) 이를 통해 동일한 슬롯에 위치한 키끼리 그룹핑한다.
  2. 그룹핑한 키를 각 해시 슬롯에 해당하는 노드로 병렬 MGET 요청한다.
  3. 2번의 결과를 MGET 키의 순서에 맞게 정렬하여 리턴한다.