Korzystanie z sygnalizującego z przełączaniem awaryjnym usługi Redis Messagebus przy użyciu usługi ConnectionUtils.Connect () BookSleeve

112

Próbuję utworzyć scenariusz pracy awaryjnej magistrali komunikatów Redis za pomocą aplikacji SignalR.

Na początku próbowaliśmy prostego przełączania awaryjnego równoważenia obciążenia sprzętu, który po prostu monitorował dwa serwery Redis. Aplikacja sygnalizująca wskazała pojedynczy punkt końcowy HLB. Następnie jeden serwer nie powiódł się, ale nie udało mi się pomyślnie pobrać żadnych komunikatów na drugim serwerze Redis bez ponownego odtwarzania puli aplikacji SignalR. Prawdopodobnie dzieje się tak dlatego, że musi wydać polecenia konfiguracji do nowej magistrali komunikatów Redis.

Począwszy od SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBusużywa Booksleeve RedisConnection()do łączenia się z pojedynczym Redis dla pub / sub.

Utworzyłem nową klasę, RedisMessageBusCluster()która używa Booksleeve ConnectionUtils.Connect()do łączenia się z jedną w klastrze serwerów Redis.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;

namespace Microsoft.AspNet.SignalR.Redis
{
    /// <summary>
    /// WIP:  Getting scaleout for Redis working
    /// </summary>
    public class RedisMessageBusCluster : ScaleoutMessageBus
    {
        private readonly int _db;
        private readonly string[] _keys;
        private RedisConnection _connection;
        private RedisSubscriberConnection _channel;
        private Task _connectTask;

        private readonly TaskQueue _publishQueue = new TaskQueue();

        public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
            : base(resolver)
        {
            _db = db;
            _keys = keys.ToArray();

            // uses a list of connections
            _connection = ConnectionUtils.Connect(serverList);

            //_connection = new RedisConnection(host: server, port: port, password: password);

            _connection.Closed += OnConnectionClosed;
            _connection.Error += OnConnectionError;


            // Start the connection - TODO:  can remove this Open as the connection is already opened, but there's the _connectTask is used later on
            _connectTask = _connection.Open().Then(() =>
            {
                // Create a subscription channel in redis
                _channel = _connection.GetOpenSubscriberChannel();

                // Subscribe to the registered connections
                _channel.Subscribe(_keys, OnMessage);

                // Dirty hack but it seems like subscribe returns before the actual
                // subscription is properly setup in some cases
                while (_channel.SubscriptionCount == 0)
                {
                    Thread.Sleep(500);
                }
            });
        }


        protected override Task Send(Message[] messages)
        {
            return _connectTask.Then(msgs =>
            {
                var taskCompletionSource = new TaskCompletionSource<object>();

                // Group messages by source (connection id)
                var messagesBySource = msgs.GroupBy(m => m.Source);

                SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);

                return taskCompletionSource.Task;
            },
            messages);
        }

        private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
        {
            if (!enumerator.MoveNext())
            {
                taskCompletionSource.TrySetResult(null);
            }
            else
            {
                IGrouping<string, Message> group = enumerator.Current;

                // Get the channel index we're going to use for this message
                int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length;

                string key = _keys[index];

                // Increment the channel number
                _connection.Strings.Increment(_db, key)
                                   .Then((id, k) =>
                                   {
                                       var message = new RedisMessage(id, group.ToArray());

                                       return _connection.Publish(k, message.GetBytes());
                                   }, key)
                                   .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
                                   .ContinueWithNotComplete(taskCompletionSource);
            }
        }

        private void OnConnectionClosed(object sender, EventArgs e)
        {
            // Should we auto reconnect?
            if (true)
            {
                ;
            }
        }

        private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
        {
            // How do we bubble errors?
            if (true)
            {
                ;
            }
        }

        private void OnMessage(string key, byte[] data)
        {
            // The key is the stream id (channel)
            var message = RedisMessage.Deserialize(data);

            _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (_channel != null)
                {
                    _channel.Unsubscribe(_keys);
                    _channel.Close(abort: true);
                }

                if (_connection != null)
                {
                    _connection.Close(abort: true);
                }                
            }

            base.Dispose(disposing);
        }
    }
}

Booksleeve ma własny mechanizm określania mastera i automatycznie przełącza się w tryb failover na inny serwer, a teraz testuję to z SignalR.Chat.

W programie web.configustawiam listę dostępnych serwerów:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>

Następnie w Application_Start():

        // Redis cluster server list
        string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];

        List<string> eventKeys = new List<string>();
        eventKeys.Add("SignalR.Redis.FailoverTest");
        GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);

Dodałem dwie dodatkowe metody do Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
{
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
{
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
    resolver.Register(typeof(IMessageBus), () => bus.Value);

    return resolver;
}

Teraz problem polega na tym, że gdy mam włączonych kilka punktów przerwania, do momentu dodania nazwy użytkownika, a następnie wyłączenia wszystkich punktów przerwania, aplikacja działa zgodnie z oczekiwaniami. Jednak gdy punkty przerwania są wyłączone od początku, wydaje się, że wystąpiły sytuacje wyścigu, które mogą zakończyć się niepowodzeniem podczas procesu łączenia.

Zatem w RedisMessageCluster():

    // Start the connection
    _connectTask = _connection.Open().Then(() =>
    {
        // Create a subscription channel in redis
        _channel = _connection.GetOpenSubscriberChannel();

        // Subscribe to the registered connections
        _channel.Subscribe(_keys, OnMessage);

        // Dirty hack but it seems like subscribe returns before the actual
        // subscription is properly setup in some cases
        while (_channel.SubscriptionCount == 0)
        {
            Thread.Sleep(500);
        }
    });

Próbowałem dodać oba Task.Wait, a nawet dodatkowe Sleep()(nie pokazane powyżej) - które czekały / etc, ale nadal otrzymywały błędy.

Powtarzający się błąd wydaje się znajdować się w Booksleeve.MessageQueue.cs~ ln 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
   --- End of inner exception stack trace ---
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
   at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
   at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
   at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
   at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
   at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
   at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
   at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---



public void Enqueue(RedisMessage item, bool highPri)
{
    lock (stdPriority)
    {
        if (closed)
        {
            throw new InvalidOperationException("The queue is closed");
        }

Gdzie jest zgłaszany wyjątek zamkniętej kolejki.

Przewiduję inny problem: ponieważ połączenie Redis jest nawiązywane, Application_Start()mogą wystąpić problemy z „ponownym połączeniem” z innym serwerem. Jednak myślę, że jest to ważne, gdy używamy liczby pojedynczej RedisConnection(), gdzie jest tylko jedno połączenie do wyboru. Jednak z wprowadzeniem ConnectionUtils.Connect()chciałbym usłyszeć od @dfowlerlub innych facetów SignalR, w jaki sposób ten scenariusz jest obsługiwany w SignalR.

ElHaix
źródło
Przyjrzę się, ale: pierwszą rzeczą, która się pojawia, jest to, że nie musisz dzwonić, Openponieważ połączenie, które masz, powinno już być otwarte. Nie będę mógł jednak patrzeć od razu, ponieważ przygotowuję się do lotu
Marc Gravell
Uważam, że są tu dwie kwestie. 1) jak Booksleeve radzi sobie z przełączaniem awaryjnym; 2) Jak SignalR używa kursorów do śledzenia klientów. Kiedy inicjalizowana jest nowa magistrala komunikatów, wszystkie kursory z mb1 nie wychodzą na mb2. Dlatego podczas resetowania puli aplikacji SignalR zacznie działać - nie wcześniej, co oczywiście nie jest realną opcją.
ElHaix,
2
Łącze opisujące, w jaki sposób SignalR używa kursorów: stackoverflow.com/questions/13054592/ ...
ElHaix
Spróbuj użyć najnowszej wersji magistrali komunikatów redis. Obsługuje przekazywanie fabryki połączeń i obsługuje ponawianie połączenia w przypadku awarii serwera.
davidfowl
Czy masz link do informacji o wersji? Dzięki.
ElHaix,

Odpowiedzi:

13

Zespół SignalR zaimplementował teraz obsługę niestandardowej fabryki połączeń z StackExchange.Redis , następcą BookSleeve, który obsługuje nadmiarowe połączenia Redis za pośrednictwem ConnectionMultiplexer.

Początkowy napotkany problem polegał na tym, że pomimo stworzenia własnych metod rozszerzających w BookSleeve, aby akceptować zbiór serwerów, przełączenie awaryjne nie było możliwe.

Teraz, wraz z ewolucją BookSleeve do StackExchange.Redis, możemy teraz konfigurować zbiór serwerów / portów bezpośrednio podczas Connectinicjalizacji.

Nowa implementacja jest znacznie prostsza niż droga, którą podążałem, w tworzeniu UseRedisClustermetody, a back-end pluming obsługuje teraz prawdziwe przełączanie awaryjne:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true");

StackExchange.Redis umożliwia również dodatkową ręczną konfigurację, zgodnie z opisem w Automatic and Manual Configurationsekcji dokumentacji:

ConfigurationOptions config = new ConfigurationOptions
{
    EndPoints =
    {
        { "redis0", 6379 },
        { "redis1", 6380 }
    },
    CommandMap = CommandMap.Create(new HashSet<string>
    { // EXCLUDE a few commands
        "INFO", "CONFIG", "CLUSTER",
        "PING", "ECHO", "CLIENT"
    }, available: false),
    KeepAlive = 180,
    DefaultVersion = new Version(2, 8, 8),
    Password = "changeme"
};

Zasadniczo możliwość zainicjowania naszego skalowalnego środowiska SignalR z kolekcją serwerów rozwiązuje teraz początkowy problem.

ElHaix
źródło
Czy mam nagrodzić twoją odpowiedź nagrodą za 500 rep? ;)
nicael
Cóż, jeśli uważasz, że to jest teraz odpowiedź :)
ElHaix
@ElHaix skoro zadałeś pytanie, prawdopodobnie masz największe kwalifikacje, aby powiedzieć, czy Twoja odpowiedź jest rozstrzygająca, czy też jest to tylko element układanki - proponuję dodać zdanie, aby wskazać, czy i możliwe, jak rozwiązało Twój problem
Lars Höppner
Więc? Nagroda bounty? Albo mogę poczekać, aż przyciągnie więcej uwagi.
nicael
Czy czegoś mi brakuje, czy jest to tylko w gałęzi funkcji, a nie w głównym (2.1) pakiecie NuGet? Ponadto wygląda na to, że w gałęzi bug- stackexchange ( github.com/SignalR/SignalR/tree/bug-stackexchange/src/… ) nie ma jeszcze sposobu w klasie RedisScaleoutConfiguration, aby zapewnić własny multiplekser.
Steve,