11

資料庫查詢合併外部呼叫結果之效能與容錯設計 - 使用 .NET Polly

 3 years ago
source link: https://blog.darkthread.net/blog/polly-policy-wrap/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
資料庫查詢合併外部呼叫結果之效能與容錯設計 - 使用 .NET Polly-黑暗執行緒

情境如下:原本 API 由資料庫查詢資料,現今要整合由第三方 API 取得的資料,將二者合併成一個資料陣列傳回呼叫端。最直白的做法是查資料庫得到 List<T> list,呼叫 WebAPI 也取得 List<T> extraData,list.AddRange(extraData)。

不過,系統希望降低第三方服務品質造成的衝擊,例如:避免第三方 API 執行過慢讓原本很快的查詢等到地老天荒,或是因為第三方 API 故障導致連資料庫查詢結果都看不到。簡單來說,在第三方 API 查詢過慢或異常時,就放棄顯示來自第三方資料,盡快顯示資料庫查詢,至少維持改版前的服務水準。

我想到的策略是呼叫第三方 API 時設定一個較短時限,一旦逾時就放棄查詢只回傳資料庫查詢結果;若第三方 API 出錯,也一樣採取棄卒保帥的策略。而這兩種情況發生時,我也不希望使用者渾然無知,打算在結果插入一筆假資料提供警示(好處是前端完全不用修改),告知資料可能短少。(註:資料類似待辦工作性質,清單項目各自獨立,缺資料不會導致資訊錯亂)

原本要徒手寫 Code,但很快想起去年學到處理 Deadlock、網路瞬斷、伺服器忙線等暫時性故障的利器,去吧! Polly,就決定是你了。

這是之前沒試過的花式應用,以下是我學到的新技巧:

  1. 用 Timeout 限定 5 秒逾時。TimeoutStrategy 有分 Optimistic(樂觀) 跟 Pessimistic(悲觀) 兩種,前者會補捉 OperationCanceledException 跟 CancellationTokenSource.IsCancellationRequested 判定超時,後者則是時間到就直接報錯。範例程式用 TimeoutStrategy.Pessimistic 讓程式碼簡單一點,樂觀逾時的實作細節可參考開發人員不可缺少的重試處理利器 Polly by 余小章
  2. 我希望 Timeout 或出錯時也要傳回一筆假資料提供警示訊息,這可以透過 Polly<ReturnType>.Handle<ExceptionType>().Fallback(() => Some_ResultType) 實現。
  3. 規劃的處理流程是,設定 Timeout Policy (timeoutPolicy),用 Fallback Policy 攔截 TimeoutRejectedException (逾時會拋出的例外物件) 回傳 Timeout 警示 (timeoutFallbackPolicy),若有其他錯誤,再由最外層的 Policy 攔截所有 Exception 回傳 API 失敗警示 (fallbackPolicy)。要組合這些 Policy 的寫法是 fallbackPolicy.Wrap(timeoutFallbackPolicy).Wrap(timeoutPolicy) (由外包內) 得到 PolicyWrap 物件,再用它 .Execute() 執行作業。
  4. 由於第三方 API 可能有多個,我用 Parallel.ForEach() 多執行緒同步執行,提高效能。

我第一個完成的版本如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Polly;
using System.Text.Json;

namespace PollyDemo
{
    class Program
    {
        static void Main(string[] args)
        {
            var data = QueryData();
            var options = new JsonSerializerOptions {
                WriteIndented = true
            };
            Console.WriteLine(JsonSerializer.Serialize(data, options));
        }
        public class Entry
        {
            public Guid Id { get; set; }
            public string Subject { get; set; }
        }
        static IEnumerable<Entry> Call3rdApi(string srcName, int delayTime)
        {
            Task.Delay(delayTime * 1000).Wait();
            return Enumerable.Range(1, 2).Select(o =>
                 new Entry
                 {
                     Id = Guid.NewGuid(),
                     Subject = $"Data from ExtraData[{srcName}] {DateTime.Now.Ticks % 100000:00000}"
                 });
        }
        static Dictionary<string, Func<IEnumerable<Entry>>> extDataJobs = 
            new Dictionary<string, Func<IEnumerable<Entry>>>
            {
                ["SrcA"] = () => Call3rdApi("A", 3),
                ["SrcB"] = () => Call3rdApi("B", 8),
                ["SrcC"] = () => { throw new ApplicationException("Error"); }
            };
        static IEnumerable<Entry> QueryData()
        {
            var list = new List<Entry>();
            //database query simulation
            list.Add(new Entry
            {
                Id = Guid.NewGuid(),
                Subject = "Data from local service"
            });

            Parallel.ForEach(extDataJobs.Keys, (src) =>
            {
                var timeoutPolicy = 
                    Policy.Timeout(TimeSpan.FromSeconds(5), Polly.Timeout.TimeoutStrategy.Pessimistic);
                var timeoutFallbackPolicy = Policy<IEnumerable<Entry>>
                    .Handle<Polly.Timeout.TimeoutRejectedException>()
                    .Fallback(() =>
                        new List<Entry>() {
                            new Entry
                            {
                                Id = Guid.NewGuid(),
                                Subject = $"Warning: [{src}] API timeout"
                            }
                    });
                var fallbackPolicy = Policy<IEnumerable<Entry>>.Handle<Exception>().Fallback(() =>
                {
                    return new List<Entry>() {
                        new Entry
                        {
                            Id = Guid.NewGuid(),
                            Subject = $"Warning: [{src}] API failed"
                        }
                    };
                });
                
                var policyWrap = fallbackPolicy.Wrap(timeoutFallbackPolicy).Wrap(timeoutPolicy);

                var extData = policyWrap.Execute(() =>
                {
                    return extDataJobs[src]();
                });
                lock (list)
                {
                    list.AddRange(extData);
                }
            });

            return list;
        }

    }
}

測試成功。

Parllel.ForEach() 迴圈每次重新建立 Policy 有點浪費資源,另一方面,若將來要使用 CircuitBreaker (累計出錯幾次先熔斷一段時間),必須共用或重複使用 Policy 物件,上面的寫法借用了 Closure 特性,在 Fallback 中直接取用 src 變數,如果要共用,src 就必須以參數方式傳入。這裡又學到一個技巧,透過 contextData 在 Policy 間傳遞參數。做法是 Execute() 時加入 context 參數並將 src 存入 Dictionary<string, object> 建立 contextData 物件,policy.Execute((context) => , , contextData: new Dictionary<string, object> { ["Src"] = src });兩個 Fallback 則接收 context,由 context["Src"] 取出 src,另外宣告一個空的 onFallback: (ex, ctx) => ) 參數是為了吻合 Fallback 多載 (Overloading) 要求的參數數量與型別。

修改版如下:

static void Main(string[] args)
{
    PreparePolicy();
    var data = QueryData();
    var options = new JsonSerializerOptions
    {
        WriteIndented = true
    };
    Console.WriteLine(JsonSerializer.Serialize(data, options));
}

static Polly.Wrap.PolicyWrap<IEnumerable<Entry>> policy = null;
static void PreparePolicy()
{
    var timeoutPolicy =
        Policy.Timeout(TimeSpan.FromSeconds(5), Polly.Timeout.TimeoutStrategy.Pessimistic);
    var timeoutFallbackPolicy = Policy<IEnumerable<Entry>>
        .Handle<Polly.Timeout.TimeoutRejectedException>()
        .Fallback((context) =>
            new List<Entry>() {
                        new Entry
                        {
                            Id = Guid.NewGuid(),
                            Subject = $"Warning: [{context["Src"]}] API timeout"
                        }
            }, onFallback: (ex, ctx) => { });
    var fallbackPolicy = Policy<IEnumerable<Entry>>.Handle<Exception>()
        .Fallback((context) =>
            new List<Entry>() {
                        new Entry
                        {
                            Id = Guid.NewGuid(),
                            Subject = $"Warning: [{context["Src"]}] API failed"
                        }
            }, onFallback: (ex, ctx) => { });

    policy = fallbackPolicy.Wrap(timeoutFallbackPolicy).Wrap(timeoutPolicy);
}

static IEnumerable<Entry> QueryData()
{
    var list = new List<Entry>();
    //database query simulation
    list.Add(new Entry
    {
        Id = Guid.NewGuid(),
        Subject = "Data from local service"
    });

    Parallel.ForEach(extDataJobs.Keys, (src) =>
    {
        var extData = policy.Execute((context) =>
        {
            return extDataJobs[src]();
        }, contextData: new Dictionary<string, object>
        {
            ["Src"] = src
        });
        lock (list)
        {
            list.AddRange(extData);
        }
    });

    return list;
}

範例程式已放在 Github,有需要的同學請自取參考。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK