RedisPipelining底層原理分析及實踐

架構互聯高可用 2024-05-12 08:25:26

作者:vivo 互聯網服務器團隊-Wang Fei

Redis是一種基于客戶端-服務端模型以及請求/響應的TCP服務。在遇到批處理命令執行時,Redis提供了Pipelining(管道)來提升批處理性能。本文結合實踐分析了Spring Boot框架下Redis的Lettuce客戶端和Redisson客戶端對Pipeline特性的支持原理,並針對實踐過程中遇到的問題進行了分析,可以幫助開發者了解不同客戶端對Pipeline支持原理及避免實際使用中出現問題。

一、前言

Redis 已經提供了像 mget 、mset 這種批量的命令,但是某些操作根本就不支持或沒有批量的操作,從而與 Redis 高性能背道而馳。爲此, Redis基于管道機制,提供Redis Pipeline新特性。Redis Pipeline是一種通過一次性發送多條命令並在執行完後一次性將結果返回,從而減少客戶端與redis的通信次數來實現降低往返延時時間提升操作性能的技術。目前,Redis Pipeline是被很多個版本的Redis 客戶端所支持的。

二、Pipeline 底層原理分析

2.1 Redis單個命令執行基本步驟

Redis是一種基于客戶端-服務端模型以及請求/響應的TCP服務。一次Redis客戶端發起的請求,經過服務端的響應後,大致會經曆如下的步驟:

客戶端發起一個(查詢/插入)請求,並監聽socket返回,通常情況都是阻塞模式等待Redis服務器的響應。

服務端處理命令,並且返回處理結果給客戶端。

客戶端接收到服務的返回結果,程序從阻塞代碼處返回。

2.2 RTT 時間

Redis客戶端和服務端之間通過網絡連接進行數據傳輸,數據包從客戶端到達服務器,並從服務器返回數據回複客戶端的時間被稱之爲RTT(Round Trip Time - 往返時間)。我們可以很容易就意識到,Redis在連續請求服務端時,如果RTT時間爲250ms, 即使Redis每秒能處理100k請求,但也會因爲網絡傳輸花費大量時間,導致每秒最多也只能處理4個請求,導致整體性能的下降。

2.3 Redis Pipeline

爲了提升效率,這時候Pipeline出現了。Pipelining不僅僅能夠降低RRT,實際上它極大的提升了單次執行的操作數。這是因爲如果不使用Pipelining,那麽每次執行單個命令,從訪問數據的結構和服務端産生應答的角度,它的成本是很低的。但是從執行網絡IO的角度,它的成本其實是很高的。其中涉及到read()和write()的系統調用,這意味著需要從用戶態切換到內核態,而這個上下文的切換成本是巨大的。

當使用Pipeline時,它允許多個命令的讀通過一次read()操作,多個命令的應答使用一次write()操作,它允許客戶端可以一次發送多條命令,而不等待上一條命令執行的結果。不僅減少了RTT,同時也減少了IO調用次數(IO調用涉及到用戶態到內核態之間的切換),最終提升程序的執行效率與性能。如下圖:

要支持Pipeline,其實既要服務端的支持,也要客戶端支持。對于服務端來說,所需要的是能夠處理一個客戶端通過同一個TCP連接發來的多個命令,可以理解爲,這裏將多個命令切分,和處理單個命令一樣,Redis就是這樣處理的。而客戶端,則是要將多個命令緩存起來,緩沖區滿了就發送,然後再寫緩沖,最後才處理Redis的應答。

三、Pipeline 基本使用及性能比較

下面我們以給10w個set結構分別插入一個整數值爲例,分別使用jedis單個命令插入、jedis使用Pipeline模式進行插入和redisson使用Pipeline模式進行插入以及測試其耗時。

@Slf4jpublic RedisPipelineTestDemo { public static void main(String[] args) { //連接redis Jedis jedis = new Jedis("10.101.17.180", 6379); //jedis逐一給每個set新增一個value String zSetKey = "Pipeline-test-set"; int size = 100000; long begin = System.currentTimeMillis(); for (int i = 0; i < size; i++) { jedis.sadd(zSetKey + i, "aaa"); } log.info("Jedis逐一給每個set新增一個value耗時:{}ms", (System.currentTimeMillis() - begin)); //Jedis使用Pipeline模式 Pipeline Pipeline = jedis.Pipelined(); begin = System.currentTimeMillis(); for (int i = 0; i < size; i++) { Pipeline.sadd(zSetKey + i, "bbb"); } Pipeline.sync(); log.info("Jedis Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin)); //Redisson使用Pipeline模式 Config config = new Config(); config.useSingleServer().setAddress("redis://10.101.17.180:6379"); RedissonClient redisson = Redisson.create(config); RBatch redisBatch = redisson.createBatch(); begin = System.currentTimeMillis(); for (int i = 0; i < size; i++) { redisBatch.getSet(zSetKey + i).addAsync("ccc"); } redisBatch.execute(); log.info("Redisson Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin)); //關閉 Pipeline.close(); jedis.close(); redisson.shutdown(); }}

測試結果如下:

Jedis逐一給每個set新增一個value耗時:162655ms

Jedis Pipeline模式耗時:504ms

Redisson Pipeline模式耗時:1399ms

我們發現使用Pipeline模式對應的性能會明顯好于單個命令執行的情況。

四、項目中實際應用

在實際使用過程中有這樣一個場景,很多應用在節假日的時候需要更新應用圖標樣式,在運營進行後台配置的時候, 可以根據圈選的用戶標簽預先計算出單個用戶需要下發的圖標樣式並存儲在Redis裏面,從而提升性能,這裏就涉及Redis的批量操作問題,業務流程如下:

爲了提升Redis操作性能,我們決定使用Redis Pipelining機制進行批量執行。

4.1 Redis 客戶端對比

針對Java技術棧而言,目前Redis使用較多的客戶端爲Jedis、Lettuce和Redisson。

目前項目主要是基于SpringBoot開發,針對Redis,其默認的客戶端爲Lettuce,所以我們基于Lettuce客戶端進行分析。

4.2 Spring環境下Lettuce客戶端對Pipeline的實現

在Spring環境下,使用Redis的Pipeline也是很簡單的。spring-data-redis提供了

StringRedisTemplate簡化了對Redis的操作, 只需要調用StringRedisTemplate的executePipelined方法就可以了,但是在參數中提供了兩種回調方式:SessionCallback和RedisCallback。

兩種使用方式如下(這裏以操作set結構爲例):

RedisCallback的使用方式:

public void testRedisCallback() { List<Integer> ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); Integer contentId = 1; redisTemplate.executePipelined(new InsertPipelineExecutionA(ids, contentId)); } @AllArgsConstructor private static InsertPipelineExecutionA implements RedisCallback<Void> { private final List<Integer> ids; private final Integer contentId; @Override public Void doInRedis(RedisConnection connection) DataAccessException { RedisSetCommands redisSetCommands = connection.setCommands(); ids.forEach(id-> { String redisKey = "aaa:" + id; String value = String.valueOf(contentId); redisSetCommands.sAdd(redisKey.getBytes(), value.getBytes()); }); return ; } }

SessionCallback的使用方式:

public void testSessionCallback() { List<Integer> ids= Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); Integer contentId = 1; redisTemplate.executePipelined(new InsertPipelineExecutionB(ids, contentId)); } @AllArgsConstructor private static InsertPipelineExecutionB implements SessionCallback<Void> { private final List<Integer> ids; private final Integer contentId; @Override public <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException { SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet(); ids.forEach(id-> { String redisKey = "aaa:" + id; String value = String.valueOf(contentId); setOperations.add(redisKey, value); }); return ; } }

4.3 RedisCallBack和SessionCallback之間的比較

1、RedisCallBack和SessionCallback都可以實現回調,通過它們可以在同一條連接中一次執行多個redis命令。

2、RedisCallback使用的是原生

RedisConnection,用起來比較麻煩,比如上面執行set的add操作,key和value需要進行轉換,可讀性差,但原生api提供的功能比較齊全。

3、SessionCalback提供了良好的封裝,可以優先選擇使用這種回調方式。

最終的代碼實現如下:

public void executeB(List<Integer> userIds, Integer iconId) { redisTemplate.executePipelined(new InsertPipelineExecution(userIds, iconId));} @AllArgsConstructorprivate static InsertPipelineExecution implements SessionCallback<Void> { private final List<Integer> userIds; private final Integer iconId; @Override public <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException { SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet(); userIds.forEach(userId -> { String redisKey = "aaa:" + userId; String value = String.valueOf(iconId); setOperations.add(redisKey, value); }); return ; }}

4.4 源碼分析

那麽爲什麽使用Pipeline方式會對性能有較大提升呢,我們現在從源碼入手著重分析一下:

4.4.1 Pipeline方式下獲取連接相關原理分析:

@Override public List<Object> executePipelined(SessionCallback<?> session, @able RedisSerializer<?> resultSerializer) { Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.not(session, "Callback object must not be "); //1. 獲取對應的Redis連接工廠 RedisConnectionFactory factory = getRequiredConnectionFactory(); //2. 綁定連接過程 RedisConnectionUtils.bindConnection(factory, enableTransactionSupport); try { //3. 執行命令流程, 這裏請求參數爲RedisCallback, 裏面有對應的回調操作 return execute((RedisCallback<List<Object>>) connection -> { //具體的回調邏輯 connection.openPipeline(); boolean PipelinedClosed = false; try { //執行命令 Object result = executeSession(session); if (result != ) { throw new InvalidDataAccessApiUsageException( "Callback cannot return a non- value as it gets overwritten by the Pipeline"); } List<Object> closePipeline = connection.closePipeline(); PipelinedClosed = true; return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer); } finally { if (!PipelinedClosed) { connection.closePipeline(); } } }); } finally { RedisConnectionUtils.unbindConnection(factory); } }

① 獲取對應的Redis連接工廠,這裏要使用Pipeline特性需要使用

LettuceConnectionFactory方式,這裏獲取的連接工廠就是LettuceConnectionFactory。

② 綁定連接過程,具體指的是將當前連接綁定到當前線程上面, 核心方法爲:doGetConnection。

public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport) { Assert.not(factory, "No RedisConnectionFactory specified"); //核心類,有緩存作用,下次可以從這裏獲取已經存在的連接 RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); //如果connHolder不爲, 則獲取已經存在的連接, 提升性能 if (connHolder != ) { if (enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } ...... //第一次獲取連接,需要從Redis連接工廠獲取連接 RedisConnection conn = factory.getConnection(); //bind = true 執行綁定 if (bind) { RedisConnection connectionToBind = conn; ...... connHolder = new RedisConnectionHolder(connectionToBind); //綁定核心代碼: 將獲取的連接和當前線程綁定起來 TransactionSynchronizationManager.bindResource(factory, connHolder); ...... return connHolder.getConnection(); } return conn; }

裏面有個核心類RedisConnectionHolder,我們看一下

RedisConnectionHolder connHolder =

(RedisConnectionHolder)

TransactionSynchronizationManager.getResource(factory);

@able public static Object getResource(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Object value = doGetResource(actualKey); if (value != && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; }

裏面有一個核心方法doGetResource

(actualKey),大家很容易猜測這裏涉及到一個map結構,如果我們看源碼,也確實是這樣一個結構。

@able private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == ) { return ; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = ; } return value; }

resources是一個ThreadLocal類型,這裏會涉及到根據RedisConnectionFactory獲取到連接connection的邏輯,如果下一次是同一個actualKey,那麽就直接使用已經存在的連接,而不需要新建一個連接。第一次這裏map爲,就直接返回了,然後回到doGetConnection方法,由于這裏bind爲true,我們會執行TransactionSynchronizationManager.bindResource(factory, connHolder);,也就是將連接和當前線程綁定了起來。

public static void bindResource(Object key, Object value) throws IllegalStateException { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); Assert.not(value, "Value must not be "); Map<Object, Object> map = resources.get(); // set ThreadLocal Map if none found if (map == ) { map = new HashMap<>(); resources.set(map); } Object oldValue = map.put(actualKey, value); ...... }

③ 我們回到executePipelined,在獲取到連接工廠,將連接和當前線程綁定起來以後,就開始需要正式去執行命令了, 這裏會調用execute方法

@Override@ablepublic <T> T execute(RedisCallback<T> action) { return execute(action, isExposeConnection());}

這裏我們注意到execute方法的入參爲RedisCallback<T>action,RedisCallback對應的doInRedis操作如下,這裏在後面的調用過程中會涉及到回調。

connection.openPipeline();boolean PipelinedClosed = false;try { Object result = executeSession(session); if (result != ) { throw new InvalidDataAccessApiUsageException( "Callback cannot return a non- value as it gets overwritten by the Pipeline"); } List<Object> closePipeline = connection.closePipeline(); PipelinedClosed = true; return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);} finally { if (!PipelinedClosed) { connection.closePipeline(); }}

我們再來看execute(action,

isExposeConnection())方法,這裏最終會調用

<T>execute(RedisCallback<T>action, boolean exposeConnection, boolean Pipeline)方法。

@ablepublic <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean Pipeline) { Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.not(action, "Callback object must not be "); //獲取對應的連接工廠 RedisConnectionFactory factory = getRequiredConnectionFactory(); RedisConnection conn = ; try { if (enableTransactionSupport) { // only bind resources in case of potential transaction synchronization conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport); } else { //獲取對應的連接(enableTransactionSupport=false) conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = preProcessConnection(conn, existingConnection); boolean PipelineStatus = connToUse.isPipelined(); if (Pipeline && !PipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); //核心方法,這裏就開始執行回調操作 T result = action.doInRedis(connToExpose); // close Pipeline if (Pipeline && !PipelineStatus) { connToUse.closePipeline(); } // TODO: any other connection processing? return postProcessResult(result, connToUse, existingConnection); } finally { RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport); }}

我們看到這裏最開始也是獲取對應的連接工廠,然後獲取對應的連接

(enableTransactionSupport=false),具體調用是

RedisConnectionUtils.getConnection(factory)方法,最終會調用

RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport),此時bind爲false

public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport) { Assert.not(factory, "No RedisConnectionFactory specified"); //直接獲取與當前線程綁定的Redis連接 RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory); if (connHolder != ) { if (enableTransactionSupport) { potentiallyRegisterTransactionSynchronisation(connHolder, factory); } return connHolder.getConnection(); } ...... return conn;}

前面我們分析過一次,這裏調用

RedisConnectionHolder connHolder =

(RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);會獲取到之前和當前線程綁定的Redis,而不會新創建一個連接。

然後會去執行T result = action.

doInRedis(connToExpose),這裏的action爲RedisCallback,執行doInRedis爲:

//開啓Pipeline功能connection.openPipeline();boolean PipelinedClosed = false;try { //執行Redis命令 Object result = executeSession(session); if (result != ) { throw new InvalidDataAccessApiUsageException( "Callback cannot return a non- value as it gets overwritten by the Pipeline"); } List<Object> closePipeline = connection.closePipeline(); PipelinedClosed = true; return deserializeMixedResults(closePipeline, resultSerializer, hashKeySerializer, hashValueSerializer);} finally { if (!PipelinedClosed) { connection.closePipeline(); }}

這裏最開始會開啓Pipeline功能,然後執行

Object result = executeSession(session);

private Object executeSession(SessionCallback<?> session) { return session.execute(this);}

這裏會調用我們自定義的execute方法

@AllArgsConstructorprivate static InsertPipelineExecution implements SessionCallback<Void> { private final List<Integer> userIds; private final Integer iconId; @Override public <K, V> Void execute(RedisOperations<K, V> operations) throws DataAccessException { SetOperations<String, String> setOperations = (SetOperations<String, String>) operations.opsForSet(); userIds.forEach(userId -> { String redisKey = "aaa:" + userId; String value = String.valueOf(iconId); setOperations.add(redisKey, value); }); return ; }}

進入到foreach循環,執行DefaultSetOperations的add方法。

@Overridepublic Long add(K key, V... values) { byte[] rawKey = rawKey(key); byte[][] rawValues = rawValues((Object[]) values); //這裏的connection.sAdd是後續回調要執行的方法 return execute(connection -> connection.sAdd(rawKey, rawValues), true);}

這裏會繼續執行redisTemplate的execute方法,裏面最終會調用我們之前分析過的<T>T execute(RedisCallback<T>action, boolean exposeConnection, boolean Pipeline)方法。

@ablepublic <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean Pipeline) { Assert.isTrue(initialized, "template not initialized; call afterPropertiesSet() before using it"); Assert.not(action, "Callback object must not be "); RedisConnectionFactory factory = getRequiredConnectionFactory(); RedisConnection conn = ; try { ...... //再次執行回調方法,這裏執行的Redis基本數據結構對應的操作命令 T result = action.doInRedis(connToExpose); ...... // TODO: any other connection processing? return postProcessResult(result, connToUse, existingConnection); } finally { RedisConnectionUtils.releaseConnection(conn, factory, enableTransactionSupport); }}

這裏會繼續執行T result =

action.doInRedis(connToExpose);,這裏其實執行的doInRedis方法爲:

connection -> connection.sAdd(rawKey, rawValues)

4.4.2 Pipeline方式下執行命令的流程分析:

① 接著上面的流程分析,這裏的sAdd方法實際調用的是DefaultStringRedisConnection的sAdd方法

@Overridepublic Long sAdd(byte[] key, byte[]... values) { return convertAndReturn(delegate.sAdd(key, values), identityConverter);}

② 這裏會進一步調用

DefaultedRedisConnection的sAdd方法

@Override@Deprecateddefault Long sAdd(byte[] key, byte[]... values) { return setCommands().sAdd(key, values);}

③ 接著調用LettuceSetCommands的sAdd方法

@Overridepublic Long sAdd(byte[] key, byte[]... values) { Assert.not(key, "Key must not be !"); Assert.not(values, "Values must not be !"); Assert.noElements(values, "Values must not contain elements!"); try { // 如果開啓了 Pipelined 模式,獲取的是 異步連接,進行異步操作 if (isPipelined()) { Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values))); return ; } if (isQueueing()) { transaction(connection.newLettuceResult(getAsyncConnection().sadd(key, values))); return ; } //常規模式下,使用的是同步操作 return getConnection().sadd(key, values); } catch (Exception ex) { throw convertLettuceAccessException(ex); }}

這裏我們開啓了Pipeline, 實際會調用

Pipeline(connection.newLettuceResult(getAsyncConnection().sadd(key, values))); 也就是獲取異步連接getAsyncConnection,然後進行異步操作sadd,而常規模式下,使用的是同步操作,所以在Pipeline模式下,執行效率更高。

從上面的獲取連接和具體命令執行相關源碼分析可以得出使用Lettuce客戶端Pipeline模式高效的根本原因:

普通模式下,每執行一個命令都需要先打開一個連接,命令執行完畢以後又需要關閉這個連接,執行下一個命令時,又需要經過連接打開和關閉的流程;而Pipeline的所有命令的執行只需要經過一次連接打開和關閉。

普通模式下命令的執行是同步阻塞模式,而Pipeline模式下命令的執行是異步非阻塞模式。

五、項目中遇到的坑

前面介紹了涉及到批量操作,可以使用Redis Pipelining機制,那是不是任何批量操作相關的場景都可以使用呢,比如list類型數據的批量移除操作,我們的代碼最開始是這麽寫的:

public void deleteSet(String updateKey, Set<Integer> userIds) { if (CollectionUtils.isEmpty(userIds)) { return; } redisTemplate.executePipelined(new DeleteListCallBack(userIds, updateKey)); } @AllArgsConstructorprivate static DeleteListCallBack implements SessionCallback<Object> { private Set<Integer> userIds; private String updateKey; @Override public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException { ListOperations<String, String> listOperations = (ListOperations<String, String>) operations.opsForList(); userIds.forEach(userId -> listOperations.remove(updateKey, 1, userId.toString())); return ; }}

在數據量比較小的時候沒有出現問題,直到有一條收到了Redis的內存和cpu利用率的告警消息,我們發現這麽使用是有問題的,核心原因在于list的lrem操作的時間複雜度是O(N+M),其中N是list的長度, M是要移除的元素的個數,而我們這裏還是一個一個移除的,當然會導致Redis數據積壓和cpu每秒ops升高導致cpu利用率飚高。也就是說,即使使用Pipeline進行批量操作,但是由于單次操作很耗時,是會導致整個Redis出現問題的。

後面我們進行了優化,選用了list的ltrim命令,一次命令執行批量remove操作:

public void deleteSet(String updateKey, Set<Integer> deviceIds) { if (CollectionUtils.isEmpty(deviceIds)) { return; } int maxSize = 10000; redisTemplate.opsForList().trim(updateKey, maxSize + 1, -1); }

由于ltrim本身的時間複雜度爲O(M), 其中M要移除的元素的個數,相比于原始方案的lrem,效率提升很多,可以不需要使用Redis Pipeline,優化結果使得Redis內存利用率和cpu利用率都極大程度得到緩解。

六、Redisson 對 Redis Pipeline 特性支持

在redisson官方文檔中額外特性介紹中有說到批量命令執行這個特性, 也就是多個命令在一次網絡調用中集中發送,該特性是RBatch這個類支持的,從這個類的描述來看,主要是爲Redis Pipeline這個特性服務的,並且主要是通過隊列和異步實現的。

/** * Interface for using Redis Pipeline feature. * <p> * All method invocations on objects got through this interface * are batched to separate queue and could be executed later * with <code>execute()</code> or <code>executeAsync()</code> methods. * * * @author Nikita Koksharov * */public interface RBatch { /** * Returns stream instance by <code>name</code> * * @param <K> type of key * @param <V> type of value * @param name of stream * @return RStream object */ <K, V> RStreamAsync<K, V> getStream(String name); /** * Returns stream instance by <code>name</code> * using provided <code>codec</code> for entries. * * @param <K> type of key * @param <V> type of value * @param name - name of stream * @param codec - codec for entry * @return RStream object */ <K, V> RStreamAsync<K, V> getStream(String name, Codec codec); ...... /** * Returns list instance by name. * * @param <V> type of object * @param name - name of object * @return List object */ <V> RListAsync<V> getList(String name); <V> RListAsync<V> getList(String name, Codec codec); ...... /** * Executes all operations accumulated during async methods invocations. * <p> * If cluster configuration used then operations are grouped by slot ids * and may be executed on different servers. Thus command execution order could be changed * * @return List with result object for each command * @throws RedisException in case of any error * */ BatchResult<?> execute() throws RedisException; /** * Executes all operations accumulated during async methods invocations asynchronously. * <p> * In cluster configurations operations grouped by slot ids * so may be executed on different servers. Thus command execution order could be changed * * @return List with result object for each command */ RFuture<BatchResult<?>> executeAsync(); /** * Discard batched commands and release allocated buffers used for parameters encoding. */ void discard(); /** * Discard batched commands and release allocated buffers used for parameters encoding. * * @return void */ RFuture<Void> discardAsync(); }

簡單的測試代碼如下:

@Slf4jpublic RedisPipelineTest { public static void main(String[] args) { //Redisson使用Pipeline模式 Config config = new Config(); config.useSingleServer().setAddress("redis://xx.xx.xx.xx:6379"); RedissonClient redisson = Redisson.create(config); RBatch redisBatch = redisson.createBatch(); int size = 100000; String zSetKey = "Pipeline-test-set"; long begin = System.currentTimeMillis(); //將命令放入隊列中 for (int i = 0; i < size; i++) { redisBatch.getSet(zSetKey + i).addAsync("ccc"); } //批量執行命令 redisBatch.execute(); log.info("Redisson Pipeline模式耗時:{}ms", (System.currentTimeMillis() - begin)); //關閉 redisson.shutdown(); }}

核心方法分析:

1.建Redisson客戶端RedissonClient redisson = redisson.create(config), 該方法最終會調用Reddison的構造方法Redisson(Config config)。

protected Redisson(Config config) { this.config = config; Config configCopy = new Config(config); connectionManager = ConfigSupport.createConnectionManager(configCopy); RedissonObjectBuilder objectBuilder = ; if (config.isReferenceEnabled()) { objectBuilder = new RedissonObjectBuilder(this); } //新建異步命令執行器 commandExecutor = new CommandSyncService(connectionManager, objectBuilder); //執行刪除超時任務的定時器 evictionScheduler = new EvictionScheduler(commandExecutor); writeBehindService = new WriteBehindService(commandExecutor);}

該構造方法中會新建異步命名執行器CommandAsyncExecutor commandExecutor和用戶刪除超時任務的EvictionScheduler evictionScheduler。

2.創建RBatch實例RBatch redisBatch = redisson.createBatch(), 該方法會使用到步驟1中的commandExecutor和evictionScheduler實例對象。

@Overridepublic RBatch createBatch(BatchOptions options) { return new RedissonBatch(evictionScheduler, commandExecutor, options);} public RedissonBatch(EvictionScheduler evictionScheduler, CommandAsyncExecutor executor, BatchOptions options) { this.executorService = new CommandBatchService(executor, options); this.evictionScheduler = evictionScheduler;}

其中的options對象會影響後面批量執行命令的流程。

3. 異步給set集合添加元素的操作addAsync,這裏會具體調用RedissonSet的addAsync方法

@Overridepublic RFuture<Boolean> addAsync(V e) { String name = getRawName(e); return commandExecutor.writeAsync(name, codec, RedisCommands.SADD_SINGLE, name, encode(e));}

(1)接著調用CommandAsyncExecutor的異步寫入方法writeAsync。

@Overridepublic <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) { RPromise<R> mainPromise = createPromise(); NodeSource source = getNodeSource(key); async(false, source, codec, command, params, mainPromise, false); return mainPromise;}

(2) 接著調用批量命令執行器

CommandBatchService的異步發送命令。

@Overridepublic <V, R> void async(boolean readOnlyMode, NodeSource nodeSource, Codec codec, RedisCommand<V> command, Object[] params, RPromise<R> mainPromise, boolean ignoreRedirect) { if (isRedisBasedQueue()) { boolean isReadOnly = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC; RedisExecutor<V, R> executor = new RedisQueuedBatchExecutor<>(isReadOnly, nodeSource, codec, command, params, mainPromise, false, connectionManager, objectBuilder, commands, connections, options, index, executed, latch, referenceType); executor.execute(); } else { //執行分支 RedisExecutor<V, R> executor = new RedisBatchExecutor<>(readOnlyMode, nodeSource, codec, command, params, mainPromise, false, connectionManager, objectBuilder, commands, options, index, executed, referenceType); executor.execute(); } }

(3) 接著調用了RedisBatchExecutor.

execute方法和BaseRedisBatchExecutor.

addBatchCommandData方法。

@Overridepublic void execute() { addBatchCommandData(params);} protected final void addBatchCommandData(Object[] batchParams) { MasterSlaveEntry msEntry = getEntry(source); Entry entry = commands.get(msEntry); if (entry == ) { entry = new Entry(); Entry oldEntry = commands.putIfAbsent(msEntry, entry); if (oldEntry != ) { entry = oldEntry; } } if (!readOnlyMode) { entry.setReadOnlyMode(false); } Codec codecToUse = getCodec(codec); BatchCommandData<V, R> commandData = new BatchCommandData<V, R>(mainPromise, codecToUse, command, batchParams, index.incrementAndGet()); entry.getCommands().add(commandData);}

這裏的commands以主節點爲KEY,以待發送命令隊列列表爲VALUE(Entry),保存一個MAP.然後會把命令都添加到entry的commands命令隊列中, Entry結構如下面代碼所示。

public static Entry { Deque<BatchCommandData<?, ?>> commands = new LinkedBlockingDeque<>(); volatile boolean readOnlyMode = true; public Deque<BatchCommandData<?, ?>> getCommands() { return commands; } public void setReadOnlyMode(boolean readOnlyMode) { this.readOnlyMode = readOnlyMode; } public boolean isReadOnlyMode() { return readOnlyMode; } public void clearErrors() { for (BatchCommandData<?, ?> commandEntry : commands) { commandEntry.clearError(); } } }

4. 批量執行命令redisBatch.execute(),這裏會最終調用CommandBatchService的executeAsync方法,該方法完整代碼如下,我們下面來逐一進行拆解。

public RFuture<BatchResult<?>> executeAsync() { ...... RPromise<BatchResult<?>> promise = new RedissonPromise<>(); RPromise<Void> voidPromise = new RedissonPromise<Void>(); if (this.options.isSkipResult() && this.options.getSyncSlaves() == 0) { ...... } else { //這裏是對異步執行結果進行處理,可以先忽略, 後面會詳細講,先關注批量執行命令的邏輯 voidPromise.onComplete((res, ex) -> { ...... }); } AtomicInteger slots = new AtomicInteger(commands.size()); ...... //真正執行的代碼入口,批量執行命令 for (Map.Entry<MasterSlaveEntry, Entry> e : commands.entrySet()) { RedisCommonBatchExecutor executor = new RedisCommonBatchExecutor(new NodeSource(e.getKey()), voidPromise, connectionManager, this.options, e.getValue(), slots, referenceType); executor.execute(); } return promise; }

裏面會用到我們在3.3步驟所生成的commands實例。

(1)接著調用了基類RedisExecutor的execute方法

public void execute() { ...... connectionFuture.onComplete((connection, e) -> { if (connectionFuture.isCancelled()) { connectionManager.getShutdownLatch().release(); return; } if (!connectionFuture.isSuccess()) { connectionManager.getShutdownLatch().release(); exception = convertException(connectionFuture); return; } //調用RedisCommonBatchExecutor的sendCommand方法, 裏面會將多個命令放到一個List<CommandData<?, ?>> list列表裏面 sendCommand(attemptPromise, connection); writeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { checkWriteFuture(writeFuture, attemptPromise, connection); } }); }); ...... }

(2)接著調用

RedisCommonBatchExecutor的sendCommand方法,裏面會將多個命令放到一個List<commanddata> list列表裏面。

@Override protected void sendCommand(RPromise<Void> attemptPromise, RedisConnection connection) { boolean isAtomic = options.getExecutionMode() != ExecutionMode.IN_MEMORY; boolean isQueued = options.getExecutionMode() == ExecutionMode.REDIS_READ_ATOMIC || options.getExecutionMode() == ExecutionMode.REDIS_WRITE_ATOMIC; //將多個命令放到一個List<CommandData<?, ?>> list列表裏面 List<CommandData<?, ?>> list = new ArrayList<>(entry.getCommands().size()); if (source.getRedirect() == Redirect.ASK) { RPromise<Void> promise = new RedissonPromise<Void>(); list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {})); } for (CommandData<?, ?> c : entry.getCommands()) { if ((c.getPromise().isCancelled() || c.getPromise().isSuccess()) && !isWaitCommand(c) && !isAtomic) { // skip command continue; } list.add(c); } ...... //調用RedisConnection的send方法,將命令一次性發到Redis服務器端 writeFuture = connection.send(new CommandsData(attemptPromise, list, options.isSkipResult(), isAtomic, isQueued, options.getSyncSlaves() > 0)); }

(3)接著調用RedisConnection的send方法,通過Netty通信發送命令到Redis服務器端執行,這裏也驗證了Redisson客戶端底層是采用Netty進行通信的。

public ChannelFuture send(CommandsData data) { return channel.writeAndFlush(data);}

5. 接收返回結果,這裏主要是監聽事件是否完成,然後組裝返回結果, 核心方法是步驟4提到的CommandBatchService的executeAsync方法,裏面會對返回結果進行監聽和處理, 核心代碼如下:

public RFuture<BatchResult<?>> executeAsync() { ...... RPromise<BatchResult<?>> promise = new RedissonPromise<>(); RPromise<Void> voidPromise = new RedissonPromise<Void>(); if (this.options.isSkipResult() && this.options.getSyncSlaves() == 0) { ...... } else { voidPromise.onComplete((res, ex) -> { //對返回結果的處理 executed.set(true); ...... List<Object> responses = new ArrayList<Object>(entries.size()); int syncedSlaves = 0; for (BatchCommandData<?, ?> commandEntry : entries) { if (isWaitCommand(commandEntry)) { syncedSlaves = (Integer) commandEntry.getPromise().getNow(); } else if (!commandEntry.getCommand().getName().equals(RedisCommands.MULTI.getName()) && !commandEntry.getCommand().getName().equals(RedisCommands.EXEC.getName()) && !this.options.isSkipResult()) { ...... //獲取單個命令的執行結果 Object entryResult = commandEntry.getPromise().getNow(); ...... //將單個命令執行結果放到List中 responses.add(entryResult); } } BatchResult<Object> result = new BatchResult<Object>(responses, syncedSlaves); promise.trySuccess(result); ...... }); } ...... return promise;}

這裏會把單個命令的執行結果放到responses裏面,最終返回RPromise<batchresult>promise。

從上面的分析來看,Redisson客戶端對Redis Pipeline的支持也是從多個命令在一次網絡通信中執行和異步處理來實現的。

七、總結

Redis提供了Pipelining進行批量操作的高級特性,極大地提高了部分數據類型沒有批量執行命令導致的執行耗時而引起的性能問題,但是我們在使用的過程中需要考慮Pipeline操作中單個命令執行的耗時問題,否則帶來的效果可能適得其反。最後擴展分析了Redisson客戶端對Redis Pipeline特性的支持原理,可以與Lettuce客戶端對Redis Pipeline支持原理進行比較,加深Pipeline在不同Redis客戶端實現方式的理解。

參考資料:

Redis Pipelining

RedisTemplate使用Pipeline管道命令

如何使用好Redis Pipeline

Redisson 管道批量發送命令流程分析

活動介紹:GIAC全球互聯網架構大會

本文由高可用架構轉載。技術原創及架構實踐文章,歡迎通過公衆號菜單「聯系我們」進行投稿

0 阅读:0

架構互聯高可用

簡介:感謝大家的關注