개요
토요일마다 우아한 대규모 시스템 설계 스터디2에 참여하고 있다. 스터디 때 마다 각자 책을 읽고 질문거리를 들고와서 함께 토론하는 형식이다. 2장 주변 친구 주제의 55페이지를 보면 "위치 정보 캐시에 일괄(batch) 요청을 보내어 모든 친구의 위치를 한번에 가져온다."라는 문장이 있다.
레디스 클러스터 모드인 경우 데이터 저장 시 해시 슬롯 알고리즘을 통해 데이터가 저장될 노드를 결정한다. 한마디로 여러개의 레디스 노드에 분산되어 저장된다는 것이다. 이런 경우 MGET으로 한번에 데이터를 가져올 수 없다. 따라서 55페이지에 나와있는 것 처럼 배치 요청을 통해 가져올 수 없는 것이 아닌가? 라는 의문이 들었다.
이에 관해서는 다음과 같은 방법을 사용해볼 수 있겠다.
- 레디스 클러스터 모드가 아닌 스탠드얼론 모드로 사용하되, 여러개를 준비한다.
- 어플리케이션 안에서 Map등의 자료구조를 통해 현재 저장/조회하려는 키를 해싱한다. 그리고 해당 값이 A라면 1번 노드로, B라면 2번 노드로, C라면 3번 노드를 향하도록 일종의 해시링을 구성한다.
- 데이터 저장 및 조회 시 2번의 해시링을 통해 저장/조회할 대상 노드를 선정 후 작업을 수행한다.
관련하여 더 찾아보던 도중 Lettuce 라이브러리에서 SlotHash라는 메소드를 찾았다. 뭔가..위 방식이 이미 내부적으로 구현되어있을 것이란 생각이 들었다. 이에 대해 조사해본 내용을 간단하게 기술한다.
MGET
redisTemplate.opsForValue().multiGet(...)
먼저 RedisTemplate에서 Operation을 가져오고 multiGet() 메소드를 호출해보자.
@Nullable
List<V> multiGet(Collection<K> keys);
ValueOperations 인터페이스의 multiGet으로 이동한다. 구현체로 이동해보자.
public List<V> multiGet(Collection<K> keys) {
if (keys.isEmpty()) {
return Collections.emptyList(); // 1
} else {
byte[][] rawKeys = new byte[keys.size()][];
int counter = 0;
Object hashKey;
for(Iterator var4 = keys.iterator(); var4.hasNext(); rawKeys[counter++] = this.rawKey(hashKey)) { // 2
hashKey = var4.next();
}
List<byte[]> rawValues = (List)this.execute((connection) -> { // 3
return connection.mGet(rawKeys); // HERE
});
return this.deserializeValues(rawValues); // 4
}
}
- 키가 비어있다면 추가적인 작업이 필요없기 때문에 빈 리스트를 리턴한다.
- 모든 키를 Serialize한다.
- 커넥션을 사용하여 MGET 명령을 수행한다.
- 3번의 결과로 나온 값들을 Deserialize한 후 리턴한다.
HERE 주석을 달아놓은 부분을 타고 들어가자.
/** @deprecated */
@Deprecated
default List<byte[]> mGet(byte[]... keys) {
return this.stringCommands().mGet(keys);
}
DefaultRedisConnection 인터페이스의 mGet 메소드이다. stringCommands().mget()으로 들어가자.
@Nullable
List<byte[]> mGet(byte[]... keys);
RedisStringCommands 인터페이스의 mGet 메소드이다.
이때 구현체로 이동하려고 하면 여러개가 존재하는 모습을 확인할 수 있는데 본문에서는 Lettuce를 사용중이므로 LettuceStringCommands로 이동한다.
// LettuceConnection.class
LettuceInvoker invoke() {
return this.invoke(this.getAsyncConnection());
}
// LettuceStringCommands.class
public List<byte[]> mGet(byte[]... keys) {
Assert.notNull(keys, "Keys must not be null");
Assert.noNullElements(keys, "Keys must not contain null elements");
return this.connection.invoke().fromMany(RedisStringAsyncCommands::mget, keys).toList((source) -> {
return (byte[])source.getValueOrElse((Object)null);
});
}
LettuceStringCommands 클래스의 mGet 메소드이다. 리턴쪽을 보면 invoke()를 통해 비동기 커넥션을 얻어온다. 그리고 해당 커넥션을 통해 RedisStringAsyncCommands의 mget 명령어를 실행한다. 해당 명령어로 들어가보자.
RedisFuture<List<KeyValue<K, V>>> mget(K... var1);
RedisStringAsyncCommands 인터페이스로 이동한다. 여기서 RedisAdvancedClusterAsyncCommandsImpl 구현체로 이동하자.
public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
...
}
가장 먼저 SlotHash를 통해 파티셔닝을 수행해준다. 어떠한 작업인지 들어가서 확인해본다.
public static <K, V> Map<Integer, List<K>> partition(RedisCodec<K, V> codec, Iterable<K> keys) {
Map<Integer, List<K>> partitioned = new HashMap();
Iterator var3 = keys.iterator();
while(var3.hasNext()) {
K key = var3.next();
int slot = getSlot(codec.encodeKey(key));
if (!partitioned.containsKey(slot)) {
partitioned.put(slot, new ArrayList());
}
Collection<K> list = (Collection)partitioned.get(slot);
list.add(key);
}
return partitioned;
}
주어진 키를 대상으로 각 키의 해시 슬롯을 계산하고 그룹핑하는 작업을 수행하고 있다. 이를 통해 다음 코드에서 동일 슬롯에 해당하는 키들을 한번에 요청하게 된다. 그전에 getSlot()으로 이동해보자.
public static int getSlot(ByteBuffer key) {
int limit = key.limit();
int position = key.position();
int start = indexOf(key, (byte)123);
int end;
if (start != -1) {
end = indexOf(key, start + 1, (byte)125);
if (end != -1 && end != start + 1) {
key.position(start + 1).limit(end);
}
}
try {
if (!key.hasArray()) {
end = CRC16.crc16(key) % 16384; // HERE
return end;
}
end = CRC16.crc16(key.array(), key.position(), key.limit() - key.position()) % 16384;
} finally {
key.position(position).limit(limit);
}
return end;
}
HERE로 주석 표시한 부분을 보면 CRC16를 통해 키를 연산하는 모습을 확인할 수 있다. 이는 레디스 클러스터에서 키를 해시 슬롯에 매핑하기 위해 사용되는 기본 알고리즘이다. 위 코드를 통해 실제 레디스 클러스터에서 키를 해싱하는 알고리즘이 우리가 사용하는 클라이언트 라이브러리에도 구현되었음을 알 수 있다. 따라서 위 코드를 사용하면 레디스의 어떤 노드에 키가 존재하는지 확인할 수 있다.
public RedisFuture<List<KeyValue<K, V>>> mget(K... keys) {
return this.mget((Iterable)Arrays.asList(keys));
}
public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys); // 1
if (partitioned.size() < 2) {
return super.mget(keys); // 2
} else {
Map<Integer, Map<K, Integer>> partitionedKeysToIndexes = new HashMap(partitioned.size()); // 3
Iterator var4 = partitioned.keySet().iterator();
while(var4.hasNext()) {
Integer partition = (Integer)var4.next();
List<K> keysForPartition = (List)partitioned.get(partition);
Map<K, Integer> keysToIndexes = new HashMap(keysForPartition.size());
for(int i = 0; i < keysForPartition.size(); ++i) {
keysToIndexes.put(keysForPartition.get(i), i); // 4
}
partitionedKeysToIndexes.put(partition, keysToIndexes);
}
Map<K, Integer> slots = SlotHash.getSlots(partitioned); // 5
Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap(partitioned.size());
Iterator var11 = partitioned.entrySet().iterator();
while(var11.hasNext()) {
Map.Entry<Integer, List<K>> entry = (Map.Entry)var11.next();
RedisFuture<List<KeyValue<K, V>>> mget = super.mget((Iterable)entry.getValue()); // 6
executions.put(entry.getKey(), mget);
}
return new PipelinedRedisFuture(executions, (objectPipelinedRedisFuture) -> {
List<KeyValue<K, V>> result = new ArrayList(slots.size());
Iterator var6 = keys.iterator();
while(var6.hasNext()) {
K opKey = var6.next();
int slot = (Integer)slots.get(opKey);
int position = (Integer)((Map)partitionedKeysToIndexes.get(slot)).get(opKey);
RedisFuture<List<KeyValue<K, V>>> listRedisFuture = (RedisFuture)executions.get(slot);
result.add(MultiNodeExecution.execute(() -> {
return (KeyValue)((List)listRedisFuture.get()).get(position); // 7
}));
}
return result;
});
}
}
다시 mget 전체 코드로 돌아와서 코드를 살펴보자.
- 키들을 해시 슬롯별로 그룹핑한다. 개요에서도 설명했듯이 클러스터 모드인 경우 키들이 여러개의 해시슬롯으로 분할되어 매핑되기 때문에 각 슬롯에 해당되는 키들을 모아서 조회하기 위함이다.
- 파티셔닝한 결과의 사이즈가 2개 미만이라면 동일한 하나의 노드에 저장된 경우이다. 따라서 mget 호출 후 즉시 반환한다.
- 각 슬롯별 키와 키가 몇번째 인덱스에 있는지에 대한 정보를 보관하는 맵을 생성한다. 이 인덱스 정보를 통해 최종 반환 시 MGET의 키 순서대로 결과가 리턴된다.
- 3번에서 생성한 맵에 슬롯에 해당하는 인덱스 정보를 저장한다.
- 키가 어떠한 슬롯에 속하는지 확인하고 그에 대한 맵을 생성한다.
- 각 슬롯별 MGET 명령어를 병렬로 실행하고 결과를 저장한다.
- 4번, 6번의 결과를 통해 요청온 키 순서대로 MGET결과를 정렬하여 리턴한다.
결론적으로 개요에서 설명한 내용은 이미 Lettuce를 통해 거의 동일하게 구현되어있음을 확인할 수 있다. 전체 과정을 짧게 요약하자면,
- Lettuce 클라이언트는 실제 레디스 클러스터로 값이 저장될 때 어떤 해시 슬롯에 매핑되는지 계산할 수 있다. (SlotHash#partition) 이를 통해 동일한 슬롯에 위치한 키끼리 그룹핑한다.
- 그룹핑한 키를 각 해시 슬롯에 해당하는 노드로 병렬 MGET 요청한다.
- 2번의 결과를 MGET 키의 순서에 맞게 정렬하여 리턴한다.