Kafka Archives - credativ®

In diesem nächsten Teil der Blog-Reihe über die Cadence-Drohnen-Demonstrationsanwendung stellen wir ein neues und verbessertes Cadence+Kafka-Integrationsmuster vor. Wir haben die Grenzen der Skalierbarkeit von Cadence getestet, um herauszufinden, wie viele Drohnen wir gleichzeitig fliegen lassen können.

1. Neu und verbessert: ein Matching-Service für Drohnenaufträge

In New York kann es schwierig sein, ein Taxi zu bekommen.

In New York kann es schwierig sein ein Taxi zu bekommen. – (Quelle: Shutterstock)

Wenn ich sehe, dass Produkte als „neu und verbessert“ angepriesen werden, frage ich mich oft, was vorher nicht mit ihnen in Ordnung war (Warum muss etwas verbessert werden, was nicht fehlerhaft war?). Angelehnt an diese Theorie hat sich herausgestellt, dass mein bisheriger Cadence-Anwendungscode für die Demo für Drohnenlieferungen „fehlerhaft“ war.

In einem vorherigen Blogbeitrag (Abschnitt 4: Drohne erhält nächste Bestellung zur Lieferung) haben wir eines der Cadence+Kafka-Integrationsmuster erläutert, das es Drohnen ermöglichen soll, eine Bestellung abzuholen, die zur Auslieferung bereit ist. Wir haben einen einfachen Warteschlangenansatz mit einem einzelnen Kafka-Consumer verwendet, der wie folgt eng mit jedem Drohnen-Workflow gekoppelt war. Der Drohnen-Workflow umfasst eine Aktivität waitForOrder(). Diese umschließt einen Kafka-Consumer, der tatsächlich im Cadence-Aktivitäts-Thread läuft. Er ist also vorübergehend und dauert nur so lange, wie die Aktivität ausgeführt wird. Der Workflow fragt das Topic „Bestellungen bereit“ ab, bis eine einzelne Bestellung zurückgegeben wird, wodurch die Aktivität abgeschlossen wird und die Drohne mit der Auslieferung beginnt.

Dieser Ansatz hat den Vorteil, dass er einfach ist. Er birgt jedoch ein potenzielles Problem. Da der Kafka-Consumer regelmäßig erstellt und beendet wird, entsteht ein ständiger Overhead bei der Consumer-Neuverteilung (in diesem Blog finden Sie weitere Informationen zu Rebalancing-Storms). Bei bis zu 100 Drohnen gleichzeitig war dies kein praktisches Problem, aber nachdem ich für diesen Blogbeitrag einige ernsthaftere Lasttests durchgeführt hatte, musste ich feststellen, dass es genau in diesem Teil des Workflows zu erheblichen Verzögerungen kam, wobei manchmal mehrere Dutzend Sekunden zwischen der Bereitschaft einer Drohne und der Erfassung einer Bestellung durch diese Drohne verstrichen. Es war also Zeit für einen neuen und verbesserten „Matching-Service“.

(Quelle: Shutterstock)

Um die Performance zu verbessern, habe ich einen „Vermittlungsdienst“ eingeführt, ähnlich dem, was man an belebten Taxiständen an Flughäfen sieht, wo ein Vermittler die Fahrgäste und Taxis koordiniert, indem er die Fahrgäste zu bestimmten Standplätzen leitet, wo sie auf das nächste verfügbare Taxi warten. Ich gehe davon aus, dass Ride-Share-Apps heutzutage ähnlich arbeiten.

So funktioniert der neue Matching-Service. Wie bisher werden (1) lieferbereite Bestellungen durch einen Bestellungs-Workflow zum Topic „Bestellungen bereit“ hinzugefügt. Wir fügen nun ein neues Kafka-Topic hinzu – das Topic „Drohne bereit“. Wenn eine Drohne für eine Bestellung bereit ist, sendet sie nun eine Nachricht an das Topic „Drohne bereit“ unter Verwendung eines Kafka-Producers (2) – diese Nachricht enthält die Drohnen-ID – und wartet auf ein Signal. Es gibt auch (3) einen neuen permanenten Kafka-Consumer („Auftrag an Drohne zuweisen“), der aus den Topics „Bestellungen bereit“ und „Drohne bereit“ liest. Dieser Consumer läuft unabhängig und kontinuierlich außerhalb der Cadence-Workflows und verhindert so die bisherigen Neuverteilungsprobleme. Wenn der Consumer eine Nachricht von beiden Topics erhält, sendet er ein Signal mit der Bestell-ID an den richtigen Drohnen-Workflow (4), der der Drohne mitteilt, welche Bestellung sie nun ausliefern soll, und den Auslieferungsteil des Workflows in Gang setzt. Siehe das folgende aktualisierte Diagramm.

Der Cadence-Drohnen-Demonstrationscode wurde aktualisiert, und es gibt eine neue Klasse für den neuen Kafka-Consumer MatchOrderToDrone.java.

2. Experimente zur Cadence-Skalierbarkeit

2.1 Cluster-Details

Cadence ist für die skalierbare Ausführung von Workflows mit hohem Durchsatz konzipiert. Jetzt wollen wir sehen, wie viele Drohnen wir tatsächlich fliegen lassen können.  Natürlich hängt die Skalierbarkeit von der verfügbaren Hardware und anderen Infrastrukturen ab. Für diese Experimente habe ich einen betriebsbereiten, von Instaclustr verwalteten Cadence-Service (auf AWS) mit den folgenden Ressourcen bereitgestellt:

Cadence-Cluster: 3 Knoten von CAD-PRD-m5ad.large-75-Instanzen (2 VCPUs) = 6 VCPUs

Cassandra-Cluster: 9 Knoten von m5l-250-v2-Instanzen (2 VCPUs) = 18 VCPUs

Gesamt Anzahl Cluster-Kerne = 24

Ich habe auch eine EC2-Instanz gestartet, um den Code für den Cadence-Client und -Worker auszuführen (8 VCPUs). Also, insgesamt 24+8 = 32 VCPUs. Das Verhältnis der Ressourcen im gesamten End-to-End-System (Client:Cadence:Cassandra) war also 1:1:3.

Zunächst habe ich Cassandra-Cluster mit 3 und 6 Knoten ausprobiert, die aber offensichtlich einen Engpass darstellten, sodass unsere Cadence-Ops-Gurus vorschlugen, die Anzahl der Knoten auf 9 zu erhöhen (also die dreifache Anzahl der Cadence-Knoten). Das entspricht dem empfohlenen Verhältnis von Cadence- zu Cassandra-Ressourcen von 1:3.

Der Cadence-Client wurde über einen Load Balancer mit dem verwalteten Cadence-Cluster verbunden, um eine ausgewogene Auslastung der Cadence-Cluster-Knoten zu gewährleisten (auf AWS wird automatisch einer von diesen bereitgestellt).

Hier finden Sie einen Überblick über die verschiedenen Komponenten, aus denen ein komplettes Cadence-System besteht. Apache Kafka und OpenSearch sind optional, bieten aber eine „erweiterte Einsicht“ in laufende Cadence-Workflows.

2.2. Experiment 1: Mit Vollgas

Meine Cadence-Demoanwendung für Drohnenlieferungen war nicht speziell für eine Benchmark konzipiert. Daher habe ich einige Tricks angewendet, um Benchmark-Ergebnisse zu erhalten. Für meinen ersten Versuch beschloss ich, die Fluggeschwindigkeit der Drohnen auf 100 km/h anzupassen und die Drohnen mit Vollgas laufen zu lassen (d. h., schneller als in Echtzeit, ohne Wartezeiten zwischen den Ereignissen). So konnte ich in kurzer Zeit eine große Anzahl von Benchmark-Tests durchführen und die Anzahl der gleichzeitigen Drohnen-Workflows auf die maximale Clusterkapazität erhöhen. Ich habe einen „Benchmarking“-Modus zum Code hinzugefügt, der bei jeder Lieferung für identische Lieferentfernungen sorgt (die maximal zulässige Entfernung), da die Abweichungen für ein wiederholbares Benchmarking sonst möglicherweise zu groß gewesen wären. Außerdem war die Anzahl der Bestellungen zehnmal so hoch wie die Anzahl der Drohnen. Um den Durchsatz des Drohnen-Workflows zu messen, messe ich einfach die Zeit jedes Laufs (aus den Protokollen) und teile die Gesamtzahl der Bestellungen durch die Gesamtzeit, um die abgeschlossenen Bestellungen pro Sekunde zu erhalten.  Das folgende Diagramm zeigt die Anzahl der gleichzeitigen Drohnen im Vergleich zu den Bestellungen pro Sekunde. Es zeigt deutlich, dass der Durchsatz ansteigt, bei 50 gleichzeitigen Drohnen-Workflows einen Spitzenwert erreicht und dann wieder abfällt. Für das Ergebnis bei 50 Drohnen lag die CPU-Auslastung des Cadence-Clusters zwischen 53–93 % (d. h., einige Knoten waren stärker ausgelastet als andere).

Gleichzeitige Drohnen im Vergleich zu Bestellungen/s, mit einem Höchstwert von 50 Drohnen

2.3. Experiment 2: Echtzeit

Für mein zweites Experiment wollte ich herausfinden, wie viele Drohnen ich in „Echtzeit“ fliegen lassen kann, d. h., wenn sie die tatsächliche Flugzeit benötigen: maximal 30 Minuten pro Lieferung (bei 10 s Wartezeit zwischen den einzelnen Bewegungen, 60 s für Abhol- und Absetzvorgänge und zusätzlicher Zeit zum Aufladen). Für das Benchmarking dieses Experiments habe ich einen modifizierten Code verwendet, der die Anzahl der Drohnen langsam ansteigen lässt. Warum? Es gibt einen erheblichen Overhead bei der Erstellung von Cadence-Workflows. Deshalb musste ich nach der Erstellung des Workflows genügend Zeit lassen, bevor ich den Drohnen Lieferaufträge erteilte, um sicherzustellen, dass genügend Cadence-Cluster-Ressourcen für die normale Workflow-Ausführung verfügbar waren. Und das Endergebnis war …

3. 2.000 Drohnen!

Was ist schlimmer als eine Lieferdrohne, die gelegentlich über unsere Köpfe summt? Stellen Sie sich 2.000 Lieferdrohnen vor, die Sie ständig umschwirren! (NASA/New Scientist haben nachgewiesen, dass das Summen von Drohnen lästiger ist als Geräusche anderer Fahrzeuge.) Ja, wir haben es geschafft, 2.000 Lieferdrohnen gleichzeitig einzusetzen! Dazu waren eine CPU-Auslastung der Cadence-Knoten von 50–88 %, eine Cassandra-Cluster-Auslastung von 80 % sowie 6 von 8 EC2-Instanzkernen für den Client/Worker-Code nötig. Da es für jeden Drohnen-Workflow einen Bestell-Workflow gibt, lag die Gesamtzahl der gleichzeitig ausgeführten Cadence-Workflows sogar bei 4.000 – also doppelt so hoch.

Cadence ist jedoch auch in der Lage, mehr Workflows als diese auszuführen, sogar auf denselben Ressourcen. Wie? Nun, Cadence ist für lang laufende Workflows konzipiert. Meine Workflows hatten Wartezeiten zwischen 10 und 60 Sekunden (plus Zeit zum Aufladen), die Drohnen waren also nicht die ganze Zeit über beschäftigt (im Gegensatz zum ersten „Vollgas“-Experiment). In einem Durchlauf habe ich versehentlich mehr Drohnen erstellt als ich brauchte, nämlich 23.151 Stück. Nach ihrer Erstellung verbrauchten sie überhaupt keine Cadence-Ressourcen, da sie nur auf Bestellungen warteten, also untätig waren. Je nachdem, wie ausgelastet Ihre Cadence-Workflows sind, können Sie also potenziell Millionen von Daten auf einem Cluster dieser Größe ausführen.

Wie würden 2.000 Drohnen aussehen? Erinnern Sie sich an die riesige rotierende Erde bei den Olympischen Spielen 2020 in Tokio? Offenbar wurden bei der Eröffnungsfeier fast 2.000 Drohnen (1.824) dafür eingesetzt! Wir könnten diese Drohnen-Präsentation mit unserem Cadence-Cluster betreiben (und mit einem Workflow-Orchestrator eine Drohnen-Choreografie schaffen). Und wenn Sie noch mehr Drohnen einsetzen möchten, können Sie natürlich einen größeren Cadence-/Cassandra-Cluster bereitstellen (oder Knoten zu bestehenden Clustern hinzufügen).

4. Animation – zehn fliegende Drohnen

Möchten Sie unsere Drohnen fliegen sehen? Hier ist eine Animation, die eine Webseite (dronepaths.html) mit Javascript und Mapbox für die Karte verwendet. Dabei werden Daten aus einem tatsächlichen Lauf der Drohnendemonstration verwendet. Es werden nur 10 Drohnen (schwarze Markierungen) angezeigt (leider nicht 2.000), die von der Drohnenbasis (lila Markierung, in der Nähe meines Hinterhofs) ausfliegen. Die Drohnen fliegen zu den verschiedenen Auftragsorten, um die Bestellungen abzuholen (orangefarbene Markierungen), sie an den rot markierten Lieferorten abzuliefern (die nach der Lieferung grün werden) und dann zur Basis zurückzukehren und das Ganze zu wiederholen (jede Drohne hat im Durchschnitt 2 Bestellungen). Theoretisch könnte der Javascript-/Mapbox-Code so erweitert werden, dass er einen Kafka-Consumer verwendet (hier eine mögliche Lösung), um die tatsächlichen Echtzeitdaten zum Drohnenstandort zu nutzen.

Anmerkung 1: In der vorherigen Version des Drohnen-Codes habe ich einen Kafka-Consumer verwendet, der im Cadence-Drohnen-Workflow, waitForOrder()-Aktivität , läuft. Kafka-Consumer verwenden einen abfragebasierten Ansatz, um Datensätze aus Kafka-Topics zu lesen und so Skalierbarkeit zu ermöglichen. Dieser Ansatz war also eigentlich ein Beispiel für einen Abfrage-Anwendungsfall für Cadence. Wie oben erklärt, habe ich dies aus der neuesten Version des Codes entfernt. Sie können jedoch ein anderes Beispiel für einen Abfrage-Anwendungsfall in unserem Cadence Polling Cookbook finden. Die neueste Version des Drohnen-Codes ist jedoch ein gutes Beispiel für einen Cadence-Anwendungsfall für eine ereignisgesteuerte Anwendung.

Anmerkung 2: Damit bleiben uns 2 Kafka-Consumer in der aktuellen Version. Interessant ist, dass theoretisch beide Consumer im Kafka Connect-Framework als Kafka Connect Cadence sink connector implementiert und ausgeführt werden könnten, um Zuverlässigkeit und Skalierbarkeit zu gewährleisten. Der Consumer „Neuen Bestellungs-Workflow starten“ könnte in einen allgemeinen sink connector „Neuen Workflow starten“ zum Starten neuer Workflows umgewandelt werden, und der Consumer „Auftrag an Drohne zuweisen“ könnte zu einem spezielleren sink connector gemacht werden, um Signale an Workflows mit einer bestimmten Nutzlast zu senden.

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

Monitoring im großen Maßstab

In dieser Blogserie berichten wir, wie wir unser Metriksystem – hauptsächlich unseren Apache Cassandra®-Cluster mit dem Namen Instametrics – bis an die Grenzen gebracht haben und wie wir anschließend die tägliche Last verringern konnten.

Der Problemraum

Instaclustr hostet Hunderte von Clustern und betreibt Tausende von Knoten, von denen jeder alle 20 Sekunden Metriken übermittelt. Von betriebssystembezogenen Metriken wie CPU-, Laufwerks- und Speichernutzung bis zu anwendungsspezifischen Metriken wie Cassandra-Leselatenz oder Kafka®-Consumer-Lag:

Instaclustr stellt seinen Kunden diese Metriken auf unserer Metrik-API zur Abfrage bereit. Mit zunehmendem Wachstum und dem Hinzukommen weiterer Kunden und Produkte muss die zugrundeliegende Infrastruktur skaliert werden, um die wachsende Zahl von Knoten und Metriken zu unterstützen.

Unsere Monitoring-Pipeline

Wir erfassen Metriken von jedem Knoten über unsere Monitoring-Anwendung, die auf dem Knoten ausgeführt wird. Diese Anwendung ist für die periodische Erfassung verschiedener Metriken und deren Umwandlung in ein übermittelbares Standardformat verantwortlich.

Anschließend übermitteln wir die Metriken an unsere zentrale Monitoring-Infrastruktur, wo sie von unseren Monitoring-Servern verarbeitet werden. Das umfasst Operationen wie z. B.:

In einem früheren Blog hat unser Plattform-Team berichtet, wie wir die Erfassungs-Pipeline für unsere Metriken verbessert haben, um unseren Cassandra-Cluster Instametrics stark zu entlasten.  Dies wurde durch eine Kafka-Streaming-Pipeline erreicht.

Entlastung der API

Mit zunehmender Vergrößerung unseres Produktangebots und mehr Kunden und Knoten fragten wir uns, wo bei unserer Monitoring-Pipeline Verbesserungsbedarf bestehen könnte.

Unser Fokus lag dabei unter anderem auf unserer Monitoring REST API, die angesichts von immer mehr Kunden, die unsere Metriken so schnell wie möglich abrufen wollen, zunehmend Latenz aufwies. Alle 20 Sekunden werden Metriken veröffentlicht, und einige Kunden schnappen sich diese, sobald sie zur Verfügung stehen.

Um die Latenzen und an die Kunden berichtete Fehler zu verringern, suchten wir nach der Ursache für die Probleme. Nachdem wir die Kapazität der API-Server selbst verstärkt hatten, kamen wir nach einiger Analyse zu dem Schluss, dass Cassandra eine relativ aufwändige Methode (in Bezug auf Kosten und CPU-Last) ist, um die hohe Arbeitslast für frische Metriken zu bedienen. Um weiterhin alle Anforderungen an unseren Instametrics-Cluster mit der gewünschten Latenz bedienen zu können, müsste dieser weiter skaliert werden, um die Last angemessen bewältigen zu können. Wir mussten Anfragen, die von mehreren Dimensionen kamen, erhebliche Schreiblast aufgrund des Speicherns der Metriken, eine große Last aufgrund der ursprünglichen Spark-basierten Roll-up-Berechnungen der Metriken und nicht zuletzt die Leselast aufgrund der API-Anfragen bewältigen.

Unser Plattform-Team hatte bereits damit begonnen, die Berechnungen beim Erfassen der Metriken zu reduzieren, aber wir wollten zusätzlich auch die Leselast auf unserem Cassandra-Cluster senken. Idealerweise wollten wir etwas modulares haben, das wir kontinuierlich durch Hinzufügen weiterer Cluster und Produkte skalieren könnten, um unseren Cassandra-Cluster zu entlasten und die Benutzererfahrung zu verbessern.

Redis kommt ins Spiel

Für das Problem der Leselast war Redis eine offensichtliche Wahl. Unser Instaclustr for Redis-Angebot wurde entwickelt, um Unternehmen bei der Lösung genau dieser Art von Problemen zu helfen. Wir erstellten einen Plan für die Nutzung unseres Terraform Provider und erstellten einen Redis-Cluster, konfiguriert und vernetzt mit unseren Kernanwendungen, der vollständig zur Bedienung unserer Metrikanfragen bereitstand.

Die Herausforderung war nun, die Metriken dorthin zu bekommen.

Das richtige Datenmodell

Wir hatten immer damit gerechnet, dass die in Redis gespeicherten Daten etwas anders als diejenigen in Instametrics sein würden.

Unser Cassandra-Cluster speichert alle Rohmetriken 2 Wochen lang. Eine solche Datenmenge in Redis zu speichern, wäre aus Kostengründen nicht machbar. Der Grund dafür ist, dass Redis diese Informationen statt auf einem Laufwerk (wie bei Cassandra) im Arbeitsspeicher ablegt. Das ermöglicht zwar erheblich schnellere Zugriffe, ist aber auch deutlich teurer.

Zwar können Kunden Metriken abfragen, die vor 2 Stunden oder 2 Tagen erfasst wurden, aber wir wissen, dass der Großteil der API-Last von Kunden verursacht wird, die konstant die neuesten verfügbaren Metriken abfragen, um sie oftmals in ihre eigenen Monitoring-Pipelines zu übertragen. Tatsächlich brauchen wir nur etwa die letzte Minute an Daten in Redis, um den Großteil an API-Anfragen zu bedienen.

Wir wissen außerdem, dass nicht jeder Kunde die Monitoring-API verwendet – viele unserer Kunden sehen sich die Metriken einfach in unserer Konsole an. Und diejenigen, die sie selbst speichern möchten, verwenden sie möglicherweise nicht rund um die Uhr, sondern speichern sie nur in bestimmten Situationen. Indem wir nur Metriken von denjenigen Clustern zwischenspeichern, bei denen über die API ausgelesen wird, können wir unsere Datenübertragungskosten und die CPU- und Speicherauslastung verringern.

Wenn man dann noch bedenkt, dass sich die meisten unserer API-Anfragen auf die neuesten Metriken beziehen, wenden wir eine TTL (Time To Live) von 15 Minuten auf alle Redis-Datensätze an und speichern nur für diejenigen Kunden Metriken zwischen, die in der letzten Stunde die Monitoring-API verwendet haben.

Der erste Versuch

Die einfachste Lösung, die minimale Änderungen an unserem vorhandenen Stack erforderte, war die Einführung dualer Schreibvorgänge in der Monitoring-Pipeline. Zusätzlich zum Schreiben von Rohmetriken in unseren Cassandra-Cluster schreiben wir sie nun auch in unseren Redis-Cluster.

Das war nicht ganz gefahrlos, da die Monitoring-Server unter konstant hoher Last stehen. Wenn die Pipeline nicht schnell genug bereinigt wurde, entstand eine Art negativer Feedback-Schleife, die schnell zu einem Ausfall führte.

Diese Pipeline ist jedoch ziemlich gut instrumentiert, und wir können einen Anfragenstau erkennen, bevor er zu einem großen Problem wird.

Deshalb schrieben wir ein Übermittlungsprogramm für Redis-Metriken und verknüpften sie mit unserer Verarbeitungs-Engine hinter einem Feature Flag. Dies wandten wir dann auf eine kleine Gruppe der Monitoring-Server an und beobachteten, was passierte.

Der Durchschnitt der Monitoring-Boxen mit aktivierter Funktion ist blau dargestellt, und die anderen sind lila. Wie das Bild oben zeigt, schossen unsere Latenzen in die Höhe, was zu entsprechenden Warteschlangen führte. Wir beschlossen, das Experiment abzubrechen und unsere Erkenntnisse auszuwerten.

Außerdem wandten wir uns an das Instaclustr Technical Operations-Team, das sich den Redis-Cluster kurz für uns ansah und zu dem Schluss kam, dass dieser weitgehend problemlos lief. Der Engpass befand sich nicht im Redis-Cluster selbst.

Diese Grafik zeigt die CPU-Auslastung – die periodischen Spitzen sind auf AOF-Rewriting zurückzuführen, das wir in einem späteren Artikel behandeln werden. Ohne diese erreichten wir eine CPU-Last von ca. 10 %, während etwa 30 % der Gesamtmetriken aufgenommen wurden.

Insgesamt ein ganz ordentlicher erster Versuch, aber wir mussten noch nachbessern!

Versuch 2:

Wir stellten eine deutlich höhere CPU-Auslastung auf unseren Monitoring-Servern fest, als wir erwartet hatten, und suchten nach Verbesserungen. Nachdem wir ein wenig herumgesucht hatten, fanden wir eine Funktion, die moderate Kosten verursachte, wenn sie nur einmal pro Metrikerfassung aufgerufen wurde, die jedoch aufgrund des neuen Redis-Ziels noch deutlich effektiver war, wenn wir sie 2- bis 3-mal aufriefen.

Wir machten uns an die Arbeit, den Aufruf so zu konsolidieren, dass er nur einmal je Zyklus stattfand, und schalteten alles wieder ein.

Wie man sehen kann, ging die Durchschnittslatenz unserer Verarbeitungs-Engine erheblich zurück – auch wenn wir das Metrik-Übermittlungsprogramm für Redis einschalteten! Unser Redis-Cluster lief erneut problemlos. Jeder Knoten verarbeitete 90.000 Arbeitsschritte pro Sekunde bei rund 70 % CPU-Auslastung mit reichlich freiem Speicher.

Ziel erreicht! Nun war es an der Zeit, diese Metriken in der API zu verwenden.

Versuch 2: Fortsetzung

Leider ohne Erfolg.

Nachdem wir das Problem mit der Verarbeitungslatenz gelöst hatten, glaubten wir uns auf der richtigen Spur. Doch nachdem wir die Server einige Tage lang sich selbst überlassen hatten, beobachteten wir sporadische CPU-Spitzen, bei denen die Server außer Kontrolle gerieten, abstürzten und neu starteten.

Außerdem bemerkten wir einen entsprechenden Anstieg bei der Latenz im Anwendungs-Streaming – was darauf hindeutete, dass diese allmählich langsamer wurden, schließlich abstürzten und dann neu starteten.

Dies wurde zunächst bei den am stärksten belasteten Servern beobachtet, aber mit der Zeit zeigten alle von ihnen ähnliche Symptome.

Wir mussten die Übermittlung von Metriken an Redis erneut abschalten und alles noch einmal durchdenken.

Das Problem war, dass uns so langsam die Optimierungsmöglichkeiten ausgingen. Unsere Monitoring-Pipeline ist in Clojure geschrieben, und die Optionen für Kundenbibliotheken und Support sind begrenzt. Oft müssen wir auf native Java-Bibliotheken zurückgreifen, um die volle Funktionalität zu erhalten, die wir brauchen, aber das kann neue Probleme mit sich bringen.

An diesem Punkt steckten wir irgendwie fest.

Die Rettung: Kafka

Glücklicherweise waren wir nicht das einzige Team, das an einer Verbesserung der Monitoring-Pipeline arbeitete. Eines unserer anderen Teams näherte sich einer endgültigen Implementierung ihres Kafka-Metrics-Übermittlungsprogramms an.

Hier sollten die Rohmetriken zunächst an einen Kafka-Cluster und anschließend an ihr endgültiges Ziel, den Instametrics Cassandra-Cluster, gehen.

Sobald sich die Metriken in Kafka befinden, eröffnen sich uns ganz neue Möglichkeiten. Wir können es uns leisten, die Metriken etwas langsamer aufzunehmen, da wir damit am Eingang keine kritische Warteschlange mehr blockieren, wir können Consumer leichter skalieren und im Fall einer Störung beliebig im zeitlichen Verlauf der Einträge springen.

Während die anderen an einer endgültigen Stabilisierung arbeiteten, schrieben wir die zweite Version unseres Metrics-Übermittlungsservice für Redis. Dieses Mal holten wir die Metriken aus Kafka und konnten eine kleine Java-Anwendung erstellen, die auf einem vertrauten Satz von Tools und Standards basiert.

Das bestätigte unsere Wahl von Kafka als Grundlage für unsere Metrik-Pipeline. Wir sehen bereits jetzt, welche Vorteile es hat, wenn mehrere Teams mehrere Anwendungen erstellen und alle auf den gleichen Nachrichtenstrom zugreifen können.

Indem wir die oben genannten Einschränkungen rund um die Verarbeitungszeit auf unseren Monitoring-Instanzen beseitigten, konnten wir diesen Mikroservice in kürzester Zeit und mit minimalem Aufwand bereitstellen, und wir haben alle Instrumente, die zur Verwendung von jmx-Metriken und Protokoll-Tools nötig sind.

Versuch Nr. 3 Redis Writer als dedizierte Kafka Consumer-Anwendungen

Die Entwicklung einer Anwendung zum Lesen von Kafka und Schreiben auf Redis war relativ einfach, und es dauerte nicht lange, bis wir etwas in der Hand hatten, um mit dem Testen zu beginnen.

Wir setzten unsere neue Redis Writer-Anwendung in unserer Testumgebung ein und ließen das Ganze zwei Wochen lang laufen, um es auf Stabilität und Korrektheit zu prüfen. Unsere Testumgebung hat im Vergleich zur Produktionsumgebung nur eine sehr geringe Monitoring-Last. Nachdem jedoch sowohl die Redis Writer als auch der Redis-Cluster nach zwei Wochen noch immer stabil waren, entschieden wir uns, die Writer an der Produktions-Workload zu testen.

Hier zeigte sich ein weiterer Vorteil der Nutzung von Kafka als Monitoring-Infrastruktur. Das Risiko dabei war extrem gering, denn selbst wenn der Redis Writer überfordert wäre, angehalten werden müsste oder einfach fehlerhaft wäre, hätte dies für unseren Kafka-Cluster nur eine geringe Zusatzlast bedeutet – ohne dass der Rest des Systems davon betroffen gewesen wäre.

Nachdem wir die Redis Writer-Anwendungen in die Produktion gebracht hatten, wurde schnell klar, dass die Writer mit dem Datenverkehr nicht mithalten würden. Die CPU-Auslastung war am Anschlag, und der Consumer-Lag für die Redis Writer Consumer-Gruppe stieg schnell an. Der Gesamtdurchsatz war nur ein Bruchteil dessen, was die ursprüngliche Riemann-basierte Lösung erreichen konnte.

Problem 3a: Übermäßige CPU-Auslastung durch die Writer

Als Nächstes mussten wir herausfinden, warum der Redis Writer unsere Leistungserwartungen nicht erfüllen konnte. Zu diesem Zweck begannen wir mit der Profilierung durch den async-profiler, der aufdeckte, dass 72 % der CPU-Zeit mit der Durchführung linearer Suchen in Listen kürzlich aktiver Objekt-IDs verbracht wurde. Im Wesentlichen war dies der Code-Pfad, der angibt, ob Kundenmetriken in Redis gespeichert werden sollten. Und tatsächlich gingen 75 % der CPU-Auslastung in das Ermitteln, ob wir bestimmte Metriken speichern sollten, und nur 25 % wurden dafür genutzt, die Metriken tatsächlich in Redis zu speichern. Verschlimmert wurde dies noch durch die Verwendung der Java Stream API auf eine Weise, die zu einer großen Zahl an invokeinterface JVM-Anweisungen führte mit 24 % einen großen Anteil an den 72 % hatte. Bei Listen mit Tausenden von IDs besteht die Lösung in der Verwendung von Hash-Tabellen.3b: Speic

Problem 3b: Speicher des Redis Caching-Clusters voll

Während wir an Problem 3a arbeiteten, ereilte uns eine potenzielle Katastrophe. Unser Monitoring-System meldete dem Support-Team einen Ausfall des internen Redis-Caching-Clusters. Wie sich schnell herausstellte, war dies auf einen vollen Speicher des Clusters zurückzuführen. Wie konnte unser Speicher voll sein, wo wir doch weniger Daten als zuvor verarbeiteten? Und wie konnte unser Speicher voll sein, wo doch unsere Redis-Cluster mit unserer Meinung nach sinnvollen Speicherbegrenzungen belegt waren – in Verbindung mit einer LRU-Verdrängungsstrategie (Least-Recently-Used)?

Eine Analyse des neuen Redis Writer-Codes förderte einen Fehler in der TTL-basierten Ablauflogik zutage, der diese bei fast jeder halbwegs bedeutenden Datenmenge beinahe nutzlos machte. Die TTLs wurden nur alle 30 Minuten während einminütiger Intervalle angewendet, sodass die meisten Daten keine TTLs erhielten und es so zu einem unkontrollierten Anstieg der Speichernutzung kam. Dies war eine unnötige Optimierung, die wir beheben konnten, indem wir die TTLs bei jedem Schreiben von Metriken aktualisieren, was eine relativ einfache Änderung darstellte. Doch dies führte uns zu einer anderen wichtigen Frage – warum hatte der Speicherbegrenzungsmechanismus nicht funktioniert?

Die von Redis gelieferten Metriken zur Speichernutzung zeigen uns, dass die Speicherbegrenzung eingehalten wurde, zumindest so lange, bis der Cluster zu kippen begann. Was aber überraschte, waren die vom System gemeldeten umgekehrten Spitzen beim verfügbaren Speicher, die teilweise bis auf null reichten!

Wir verglichen die Zeitstempel der Spitzen mit den Redis-Protokollen und stellten fest, dass sie von einer Append-Only-Datei (AOF) verursacht wurden. Weitere Nachforschung förderte das allgemeine Problem zutage, dass die Spitzenspeichernutzung von Redis weit über dem maximalen Speicherlimit liegen konnte (redis#6646). Redis spaltet den Hauptprozess auf, um konsistente Snapshots der Datenbank zu erstellen, die für das AOF-Rewrite benötigt werden. Die Aufspaltung ist typischerweise platzsparend, da sie auf dem COW-Prinzip (Copy-On-Write) basiert. Wenn die Workload jedoch schreibintensiv ist, muss ein erheblicher Anteil der ursprünglichen Speicherseiten kopiert werden. Für diese Workload müssten wir den maximalen Speicher von Redis auf weniger als die Hälfte des Gesamtsystemspeichers begrenzen, und selbst dann müssten wir erst Tests durchführen, um sicherzustellen, dass so das Speicherproblem von Redis behoben wird.

Problem 3c: Ineffizientes Format für Metriken in Redis

Unsere Anwendung speicherte Metriken in Redis als JSON-Objekte in sortieren Sätzen. Nach einigen Durchläufen der Lösung landeten wir bei einem Vorgängermodell, das den Schlüsselnamen in jedem Wert dupliziert. Für den typischen Metrikwert nahm der begonnene Schlüsselname rund die Hälfte des Speichers ein.

Als Beispiel hier ein Schlüssel für die CPU-Last-Metrik eines Knotens:

{46e4157b-e6de-42e1-9c37-5fe5e8d1e676}/metrics/cpuUtilization

Und hier ein Wert, der in diesem Schlüssel gespeichert sein könnte:

{"service":"{46e4157b-e6de-42e1-9c37-5fe5e8d1e676}/metrics/cpuUtilization","time":1623814124123,"value":0,0}

Wenn wir alle redundanten Informationen entfernen, bleibt Folgendes übrig:

{"time":1623814124123}

Zusätzlich zum Service-Namen können wir auch den Wert entfernen, wenn dieser der Standard ist. Durch diese beiden Optimierungen können wir die Speichernutzung ungefähr halbieren.

Versuch 4: Fehlerbehebung, Optimierung und Feinabstimmung

Nachdem die Probleme gelöst waren, fiel die CPU-Nutzung ab, und der Durchsatz stieg an, aber der ständig zunehmende Consumer-Lag wurde kaum weniger. Wir verarbeiteten noch immer nicht genügend Nachrichten, um mit der Zahl an einlaufenden Ereignissen Schritt zu halten.

Die niedrige CPU-Auslastung zusammen mit dem Fehlen anderer offensichtlicher Ressourcenengpässe deutete darauf hin, dass eine Art von Thread-Konflikt vorlag. Der Redis Writer verwendet mehrere Kafka-Consumer-Threads, aber alle Threads nutzen – wie von der Lettuce-Dokumentation empfohlen – dieselbe Instanz des Lettuce Redis-Clients. Wir ignorierten die Empfehlung und probierten eine Refaktorierung des Redis Writers, sodass jeder Consumer-Thread seinen eigenen Lettuce Client erhält.

Erfolg! Unmittelbar nach der Inbetriebnahme des neuen Redis Writers verdoppelte sich der Durchsatz, und der Consumer-Lag ging zum ersten Mal zurück.

Hier ist anzumerken, dass eine höhere Last bestehen bleibt, während der Writer den Rückstand aufholt. Sobald dies abgearbeitet ist, fällt die CPU-Last deutlich auf rund 15 %. An diesem Punkt müssen wir nur noch die Redis Writer-Instanzen verkleinern, um die CPU-Nutzung zwischen dem Redis Writer und dem Redis-Cluster bestmöglich abzustimmen, wobei wir reichlich Raum für zukünftiges Wachstum lassen.

Aktivierung des Lesens von der API

Jetzt haben wir also eine Pipeline, die kontinuierlich die letzten 15 Minuten an Metriken für alle Kundenknoten erfasst, die die API kürzlich genutzt haben. Doch das alles nützt nichts, solange wir nicht unsere API auf die Abfrage von Redis erweitern!

Das letzte Stück Arbeit bestand darin, unseren API-Instanzen zu erlauben, Metriken bei Redis abzufragen.

Am Ende filtert unsere API-Logik nur zeitbasiert nach Metrikabfragen und danach, ob sie in den letzten 15 Minuten liegen – und sie fragt Redis zuerst ab. Redis ist schnell, wenn es ums Lesen geht, aber extrem schnell, wenn es darum geht zu sagen, dass kein zwischengespeicherter Wert vorhanden ist. Statt also programmatisch herauszufinden, ob eine bestimmte Metrik zwischengespeichert ist, versuchen wir es zuerst bei Redis, und wenn sie dort nicht ist, fragen wir sie bei Cassandra ab. Dieser „Fail-Fast“-Ansatz an das Abrufen von Metriken führt im schlimmsten Fall zu minimal höherer Latenz.

Der erste Einsatz der API-Funktion funktionierte ziemlich gut, und wir bemerkten eine Reduzierung der Lesevorgänge von unserem Cassandra-Cluster. Allerdings gab es weiterhin einige Randfälle, die mit einer geringen Anzahl von Metriken Probleme bereiteten und uns dazu zwangen, die Funktion abzuschalten und eine Lösung zu entwickeln. Diese abschließende Lösung wurde am 27. Oktober eingeführt.

Die erste Grafik zeigt die Verringerung der Anzahl von Anfragen, die bei unserem Instametrics Cassandra-Cluster von unserer API eingingen – woraus deutlich wird, dass wir diese Lesevorgänge fast vollständig beseitigt haben.

Diese Grafik zeigt die Zahl der Lesevorgänge, die auf unseren Redis-Cluster übertragen wurden (bitte beachten, dass diese Metrik erst am 25. Oktober eingeführt wurde).

Der interessante Teil ist, dass dies tatsächlich keine großen Auswirkungen auf unsere API-Latenzen hatte. Wir melden noch immer sehr ähnliche Latenzen von P95 (blau) und P50 (lila).

Dies lässt sich durch zwei Dinge erklären:

  1. Unser Cassandra-Cluster war zu dieser Zeit mit über 90 Knoten und i3.2xlarge sehr groß, was eine extrem schnelle lokale Speicherung umfasste. Dadurch wurden alle Leseanforderungen tatsächlich noch immer in einem vernünftigen Zeitrahmen bedient.
  2. Der Redis-Cluster ist deutlich kleiner als unser Cassandra-Cluster, und wir können noch immer einige Leistungsverbesserungen vornehmen. Eine besteht darin, von AOF-Persistence auf Diskless-Persistence zu wechseln, was die Leistung für eine große, schreibintensive Workload wie unsere verbessern würde.

An diesem Punkt besteht der Hauptvorteil unseres Cachings von Metriken mit Redis darin, wie es sich auf den Zustand unseres Cassandra-Clusters ausgewirkt hat. Als wir mit dem Redis-Caching begannen, hatten wir einen i3.2xlarge Cassandra-Cluster mit 90 Knoten. Dieser wurde Anfang September in einen i3.4xlarge Cluster mit 48 Knoten umgewandelt, um eine höhere Verarbeitungskapazität zu erreichen.

Die erste Verbesserung des Clusters waren die Kafka-basierten Roll-ups, die am 28. September eingeführt wurden, und dann das Redis-Caching etwa einen Monat später, am 27. Oktober.

Aus den nachfolgenden Grafiken geht die erhebliche Verbesserung hervor, die beide Releases auf die CPU-Auslastung, die Betriebssystemlast und die Zahl der Lesevorgänge auf dem Cassandra-Cluster hatten.

Dies ermöglichte uns schließlich Mitte November eine Verkleinerung unseres Cassandra-Clusters von einem i3.4xlarge Cluster mit 48 Knoten zu einem i3en.2xlarge Cluster mit 48 Knoten. Dies bedeutet erhebliche Einsparungen bei den Infrastrukturkosten, während der neuerdings gute Zustand unseres Clusters einschließlich der Leselatenzen erhalten bleibt.

In den letzten Monaten ist mit unseren Redis Writern alles gut gelaufen, wir mussten keinerlei größere Arbeiten durchführen, um eine stabile Caching-Pipeline aufrechtzuerhalten, und die Auswirkungen bei den Kunden sind weiterhin sehr vielversprechend.

Im nächsten Blog erklären wir, wie ein Redis-Cache uns ermöglichte, unseren neuen Prometheus Autodiscovery Endpoint aufzubauen, der es Kunden erleichtert, mit Prometheus alle verfügbaren Metriken zu erfassen.

RabbitMQ® ist eine eingetragene Marke von VMware, Inc. in den U.S.A. und anderen Ländern.
Der Originalartikel wurde am 2.März von Kuangda He auf Instaclustr.com auf Englisch veröffentlicht.

Einführung:

Nach längerer Zeit veröffentlichen wir endlich wieder eine Blogserie zu unserer Monitoring-Plattform bei Instaclustr. Beim letzten Mal stellten wir eine technische Lösung zur Unterstützung einer Plattform vor, auf der mehr als 500 Nodes laufen. Mittlerweile sprechen wir von etwa 7.000 verwalteten Instanzen.

Für diesen Anwendungsfall haben wir diverse Änderungen an der Architektur unserer internen Infrastruktur vorgenommen. Dabei mussten wir zunehmend auf die Nutzung unserer eigenen internen Dienste zurückgreifen – sie also selbst ausprobieren. Im ersten Blog dieser speziellen Serie wird beschrieben, wie wir im Entwicklungsteam von Instaclustr unsere eigene Instaclustr Managed Service Platform verwenden, um in Sachen Big Data optimale Resultate zu erzielen.

Bestehende Infrastruktur

Ursprünglich baute unsere Monitoring-Infrastruktur auf der folgenden Architektur auf:

  1. Die Knoten sendeten alle Metriken zu einem RabbitMQ® Exchange.
  2. Unter Verwendung von Riemann-Anwendungen wurden die Messgrößen analysiert und bei Problemen unser technischer Support informiert.
  3. Diese Rohmetriken wurden auch in unseren Instametrics Cassandra-Cluster geschrieben.
  4. Anschließend wurden diese Rohdaten zu 5-minütigen, stündlichen und täglichen Mindest-, Durchschnitts- und Höchstwerten „aufbereitet“.

Bei kundenseitiger Datenanforderung entweder über Konsole oder API werden diese Daten von unserem Apache Cassandra®-Cluster abgerufen und an den Kunden zurückgesendet. Diese Lösung funktioniert mit minimalem technischem Aufwand insgesamt sehr gut und ist gut skalierbar.

Die zunehmende Anzahl von Instanzen bedeutete, dass wir die Größe unseres Cassandra-Clusters deutlich erhöhen mussten – zwischenzeitlich hatten wir einen i3.2xlarge Cassandra-Cluster mit 90 Knoten und 15 Riemann-Instanzen, um etwa 7.000 Instanzen zu unterstützen, die etwa eine Million metrische Schreibvorgänge pro Sekunde abwickelten.

Uns waren jedoch einige Schwachstellen dieser aktuellen Lösung bekannt. In den vergangenen drei Jahren haben wir eine Verbesserung unserer Metrikpipeline in Angriff genommen, mit dem Ziel, durch eine bessere Darstellung der Metriken höhere Kundenzufriedenheit zu erzielen und unsere Kosten für die Infrastruktur zu senken.

Das erste Problem war die hohe Belastung unseres Cassandra-Clusters durch unsere Apache Spark™-Rollup-Aufträge. Bei der Verarbeitung einer großen Anzahl von Schreibvorgängen ist Cassandra äußerst leistungsstark, sodass eine Million Schreibvorgänge pro Sekunde keine große Belastung für den Cluster darstellten. Eine unnötige Belastung stellten jedoch die Spark-Aggregationsaufträge dar. Bei diesen Aufträgen werden alle 5 Minuten alle Rohdaten, die in den Cluster geschrieben werden, gelesen und es wird eine einfache Aggregation durchgeführt. Anschließend werden stündlich alle 5-Minuten-Aggregate ausgelesen und so aggregiert, dass die Stundenwerte unverändert bleiben. Zum Schluss werden jeden Tag alle stündlichen Aggregate gelesen und ein tägliches Aggregat erstellt.

Verwirrt? Kurz gesagt, wir lesen jede Rohgröße praktisch dreimal. Auf einem schreiborientierten System wie Cassandra führte dies zu einem erheblichen Leistungsengpass. Das bedeutet auch, dass Kunden, die unser Dashboard oder unsere Metrik-API zu Zeiten nutzten, in denen wir diese Aggregationsvorgänge durchführten, eine längere Latenzzeit bei ihren Anfragen feststellten und ihnen manchmal sogar der Fehlerstatus 429 angezeigt wurde. Grund dafür waren Anfrage-Timeouts, da Cassandra mit Leseanfragen überlastet war.

Kurzfristig war die Erweiterung der Cluster-Kapazitäten die einfachste Lösung. Tatsächlich konnten wir mit steigender Anzahl der verwalteten Knoten und der geschriebenen Rohmetriken eine lineare Leistungssteigerung feststellen. Aber wir wussten, dass wir auf lange Sicht vermutlich einige größere strukturelle Änderungen vornehmen müssen.

Neben der mangelnden Leseleistung wollten wir auch einige andere Aspekte der Plattform langfristig verbessern. Uns war es wichtig, dass unsere Monitoring-Plattform flexibel und erweiterbar ist, d. h. dass wir zusätzliche Tools hinzufügen, Metriken im Bedarfsfall neu verarbeiten, Blue-/Green-Tests mit Live-Daten durchführen und einen Puffer an Metriken bei einem möglichen Anwendungsausfall haben.

Apache Kafka „Dogfooding“

Kafka bietet genau die Möglichkeiten, nach denen wir gesucht haben. Wir nutzen Kafka als Nachrichtenwarteschlange zwischen unseren Riemann-Servern und unserem Cassandra-Cluster. Unser Kafka-Cluster speichert nun alle Rohmetriken von allen Knoten in der Flotte. Auf diese Weise lassen sich alle Funktionen einer verteilten und fehlertoleranten Nachrichtenplattform nutzen, und es besteht die Möglichkeit, mehrere Consumers denselben Metriken zuzuordnen. Einer dieser Consumers ist eine Kafka-Anwendung, die als Ersatz für die Spark-Aufgaben die Rollups für die Metriken durchführt und die Leselast des Cassandra-Clusters reduziert.

Für die Konfiguration des Clusters und die Bereitstellung unserer Verwaltungsinfrastruktur verwenden wir intern Terraform. Mit der Nutzung des Terraform Provider von Instaclustr lässt sich der Cluster bereitstellen und ihn korrekt für die Nutzung durch unsere Anwendungen konfigurieren. Alle Tools und das gesamte Know-how unseres Entwicklungsteams stehen auch allen unseren Managed Service Kafka-Kunden zur Verfügung.

Im ersten Entwicklungsschritt wurden Daten durch unsere Riemann-Anwendungen in den Cassandra-Cluster und einen Kafka-Cluster geschrieben. So konnten wir etwaige Probleme im Kafka-Schreibsystem evaluieren und gleichzeitig die Folgen für die Kunden minimieren. Da die Konfiguration für jede Anwendung einzeln möglich ist, ließen sich auch einige unserer Riemann-Anwendungen so konfigurieren, dass sie Metriken an Kafka senden. So konnten wir die Arbeitslast unseres Kafka-Clusters langsam erhöhen und prüfen, ob er die Leistungsanforderungen erfüllen kann.

Das Schreiben von Rohmetriken war relativ einfach. Es musste nur die Konfiguration des Producers geringfügig angepasst werden, wobei vor allem die Batching-Einstellungen wie Verweildauer und Batch-Größe erhöht werden mussten. Nach erfolgreicher Änderung konnten wir feststellen, dass unsere einzelne Riemann-Anwendung die Produktionslast problemlos bewältigte, ohne dass sich die Latenzzeit erhöhte.

Da wir jetzt in der Lage waren, Rohdaten an unseren Kafka-Cluster zu senden, überlegten wir, welche Größe der Cluster zur Bewältigung der Produktionslast haben sollte. Als immer mehr Datenverkehr auf den Cluster geleitet wurde, wandten wir uns an unser Kafka-Supportteam, das zentrale Leistungskennzahlen wie CPU-Auslastung, I/O Wait und Festplattennutzung auswertete. Mit ihrer Hilfe konnten wir unsere Aufbewahrungsdauer verkürzen, die Komprimierung aktivieren und die Replikationsstrategie je nach Wichtigkeit unserer Daten ändern.

Bei der Knotengröße haben wir uns für i3en.xl entschieden, da diese eine hohe Speicherleistung bieten und wir von den Preisen für reservierte Instanzen profitieren können. Wir haben 12 Kafka-Knoten und 3 dedizierte Apache ZooKeeper™ -Instanzen für unsere Produktionsumgebung. Kundenseitig initiierte Funktionen zur Größenanpassung und unser engagiertes Supportteam ermöglichten ein einfaches Arbeiten mit verschiedenen Knotengrößen.

Als sichergestellt war, dass der Kafka-Cluster und die Producer entsprechend konfiguriert waren und unsere Benachrichtigungs-Pipeline nicht beeinträchtigten, aktivierten wir die Schreibfunktion auf unseren gesamten Riemann-Servern.

In einem späteren Blog können Sie lesen, dass durch die Verwendung von Kafka und seiner Topologie mit mehreren Consumer-Gruppen unser RedisTM-Team nun in der Lage war, mit dem Caching von Metriken für unsere Monitoring-API zu beginnen.

Hinzufügung eines Cassandra Writer

Als nächstes wurde eine kleine Anwendung entwickelt, die Rohdaten aus dem Kafka-Topic ausliest und diese direkt in Cassandra schreibt.

Die Auslagerung dieser Aufgabe in eine eigene Anwendung ist zwar etwas komplexer, bringt aber auch Vorteile mit sich, da das Schreiben der Daten nun asynchron zur Verarbeitung der Daten erfolgt. So können eventuelle Ausfälle separat in unser zeitkritischeres Warnsystem übertragen werden. Bei Bedarf lässt sich die Schreiblast drosseln, damit unsere Systeme nicht überlastet werden. Außerdem können wir durch den Einsatz von Kafka-Consumer-Gruppen die Skalierbarkeit und Hochverfügbarkeit unserer Schreibanwendungen gewährleisten.

Zunächst hofften wir, einen handelsüblichen Kafka-Connect-Connector zum Schreiben in Cassandra verwenden zu können, aber wir konnten nichts finden, das die Stapelverarbeitung oder Schreiboptimierungen genauso unterstützt wie unser vorhandener Writer. Einer unserer Gründer hielt 2016 einen Vortrag auf dem Cassandra Summit über genau diese Optimierungen. Wir haben stattdessen eine sehr einfache Java-basierte Anwendung entwickelt, die von Kafka liest und in Cassandra schreibt und dabei unsere vorhandenen modellspezifischen Datenbankoptimierungen nutzt.

Erst als wir mit der Leistung unserer Writer-Anwendung zufrieden waren, deaktivierten wir das direkte Schreiben von Riemann in Cassandra und verließen uns auf unseren Kafka-basierten Cassandra-Writer.

Zu diesem Zeitpunkt sah unsere Architektur so aus:

Kafka-basierte Rollups

Der nächste Schritt war die Verwendung von Kafka zur Umstrukturierung unseres Metrik-Rollup-Systems. Durch die korrekte Partitionierung der Metriken in Kafka war es möglich, kleinere Kafka-Streams-Anwendungen zu entwickeln, die alle Aggregationen durchführen, ohne in Cassandra zu lesen.

Diese Anwendungen aggregieren Kennzahlen und speichern die Durchschnitts-, Minimal- und Maximalwerte für jeweils fünf Minuten, eine Stunde und einen Tag. Ursprünglich hatten wir uns für das Kafka-Streams-Framework entschieden, da es für derartige Arbeitslasten ausgelegt zu sein schien. Eine Anwendung in unserer Testumgebung einzurichten und auszuführen war recht einfach. Die eigentliche Schwierigkeit lag in der Skalierung der Anwendung zur Bewältigung der Produktionslast.

Mit zunehmender Belastung der Streams-Anwendung wurde die Verbindung zum Broker unterbrochen und es kam zu einer Schleife aus Wiederverbinden, Wiederherstellen und Trennen der Verbindung. Es schien keinen Unterschied zu machen, egal wie sehr man die Parameter der Consumer veränderte. Nach einigen Profilerhebungen und Versuchen wurde deutlich, dass die Anwendung nicht mehr genügend Speicherplatz hatte. Wenn der Speicherbedarf höher wurde, verfehlten die Consumer den Heartbeat, schieden aus dem Broker aus, gaben Speicher frei, stellten die Verbindung wieder her und begannen den Zyklus erneut.

Wir hofften zunächst, dass diese Systeme extrem ressourcenschonend arbeiten würden, doch wir mussten mehrere R5.4xl mit jeweils 128 GB RAM einsetzen, damit die Aggregationsanwendung unter unserer Produktionslast überhaupt laufen konnte. Es gelang uns, die Parameter der Rocks DB weiter anzupassen. So ließ sich die freie Nutzung des Off-Heap-Speichers verhindern, und die Anwendung lief endlich stabil.

Wir wollten Spark-Aufträge lieber heute als morgen abschalten, um die Belastung des Cassandra-Clusters zu verringern. Deshalb haben wir diese Anwendungsversion in der Produktion eingesetzt. Der Plan war, die Anwendung später zu überarbeiten und zu optimieren, um sie auf kleineren Instanzen ausführen zu können.

Die zusätzliche Bereitstellung spezieller Aggregationsinstanzen und des Kafka-Clusters selbst erhöht zwar die Gesamtkomplexität, bedeutet aber, dass wir dadurch den Cassandra-Cluster von einer großen Rechenlast befreien konnten. Diese Änderung hatte zur Folge, dass die CPU-Auslastung von Cassandra um 30 % sank und die Latenz für synthetische Cassandra-Transaktionen auf ein ⅓ des ursprünglichen Wertes zurückging – eine Halbierung der P99-Latenz von Cassandra. Viel wichtiger ist allerdings die Reduzierung der API-Latenzzeiten für Kunden und die deutliche Verringerung der Zahl der an Kunden zurückgesandten Statusbenachrichtigungen vom Typ 429. Die Änderung erfolgte am 28. September.

Durch die Verwendung von Kafka-Consumer-Gruppen können wir weiterhin unsere Aggregationsanwendung bzw. unsere Writer-Anwendung skalieren, und zwar unabhängig von allen anderen Diensten. Das bedeutet auch, dass etwaige Verzögerungen in der Schreib-Pipeline unsere Pipeline für Problemmeldungen und -lösungen nicht ausbremsen!

Überarbeitung der Cassandra Writers

Bei der Ausführung der Rollup-Anwendung trat ein Problem bei der Implementierung unseres Cassandra-Writers auf. Er konnte die schnelle Ausführung der Rollup-Anwendung nicht verarbeiten und es kam beim Schreiben in Cassandra zu Zeitüberschreitungen. Wir haben uns entschlossen, die Writer-Anwendung durch eine Begrenzung der Ausgaberate zu ergänzen, damit die Schreiblast konstanter wird.

Bei der Implementierung dieser Änderung in der Writer-App entschieden wir uns auch, mit Batchverarbeitungen zu experimentieren. Unsere ursprüngliche Implementierung verwendete Tokenaware-Batches. Im Normalfall sind partitionierte Batches in Cassandra sehr kostspielig – wenn Sie Partitionen kombinieren, die zu jedem Knoten gehören, müssen ALLE Knoten koordiniert werden, um diesen Batch zu verarbeiten. Dieses Problem verschlimmert sich je größer der Cluster wird.

Die Lösung besteht darin, Partitionen, die einem einzigen Satz von Replikaten (vnode) angehören, zu einem Stapel zusammenzufassen, so dass der gesamte Stapel nur von dem Koordinator und seinem Replikat bearbeitet werden kann. In den meisten allgemeinen Tipps zu Cassandra wird zwar erklärt, dass dieser Schritt nicht notwendig ist. Für unseren Anwendungsfall ist er jedoch extrem wichtig, sowohl für die CPU-Auslastung des Clients als auch des Clusters.

Das Batching wirkte sich auch deutlich auf die Auslastung des Cassandra-Clusters aus.

Schließlich gelang es uns, unsere Produktionsinfrastruktur mit 3 m5.2xlarge Cassandra-Writer-Instanzen auszuführen.

Im Folgenden finden Sie ein einfaches Beispiel für Micro-Batching mit dem Cassandra Java-Treiber 4. Sie können es gerne selbst ausprobieren:

public void accept(final BoundStatement bound) {

final TokenMap tokenMap = session.getMetadata().getTokenMap().get();

final Set<Node> replicas =

       tokenMap.getReplicas(bound.getRoutingKeyspace(), bound.getRoutingKey());

final List<BatchableStatement<?>> statements =

batches.computeIfAbsent(replicas, t -> new ArrayList<>());

statements.add(bound);

if (statements.size() >= batchSize) {

final BatchStatement batch = BatchStatement.builder(BatchType.UNLOGGED)

.addStatements(statements)

.build();

session.executeAsync(batch);

batch.clear();

}

}

Überarbeitung des Rollups

Nachdem nun alles funktionierte, wurde es Zeit, die Rollup-Anwendung zu überarbeiten, um die Anforderungen an Skalierbarkeit, Instanzgröße und einige andere Aspekte zu optimieren.

Nach längerem Ausprobieren implementierten wir die Funktionalität schließlich als reinen Kafka-Consumer, also ohne Verwendung des Kafka-Streams-Frameworks. Durch die Zusammenführung von mehreren Streaming-Stufen in eine einzige ließ sich der Speicherbedarf erheblich reduzieren. Anstatt den Zwischenzustand in ein Kafka-Topic zu übertragen, wie es das Streams-Framework automatisch tut, werden die Daten im Falle eines Neustarts der Anwendung vom Quell-Topic wiedergegeben.

Diese Entscheidung war zwar ein Kompromiss, den wir unter diesen Umständen aber gerne in Kauf nahmen. Da wir davon ausgehen, dass Neustarts nur selten vorkommen, lässt sich der nötige Aufwand für die Speicherung des Zwischenzustands während des Normalbetriebs senken. Im Falle eines Anwendungsausfalls müssen wir dann allerdings mehr Ereignisse wiederholen, um wieder auf die Beine zu kommen.

So wird nicht nur die Belastung der Anwendungsinstanzen verringert,

sondern auch für den Kafka-Cluster selbst.

Nächste Schritte und Fazit

Eigentlich hatten wir uns vorgenommen, die Auslastung unseres internen Cassandra-Clusters zu reduzieren. Wir wollten vor allem die Effizienz der Arbeitsabläufe verbessern und gleichzeitig unsere Flexibilität und Erweiterbarkeit erhöhen.

Wie Sie sehen, konnten wir die Auslastung unseres internen Cassandra-Clusters erheblich reduzieren. Gleichzeitig verringern sich die Latenzzeiten und die Fehlerraten für Kunden, die ihre Daten über unsere API abfragen.

In den meisten Anwendungsfällen müssten wir dann sofort mit der Verkleinerung unseres Clusters beginnen, um deutliche Kosteneinsparungen zu erzielen. Wir werden den Cluster jedoch nicht sofort verkleinern, da wir in Kürze weitere Metrik-Anwendungen veröffentlichen und unseren deutlichen Vorsprung bei eventuellen Problemen wahren wollen.

In den kommenden Blogbeiträgen werden wir darüber hinaus aufzeigen, wie wichtig Kafka für die Flexibilität und Erweiterbarkeit unserer gesamten Metrik-Pipeline inzwischen geworden ist. Die erfolgreiche Eliminierung der Spark-Workloads aus unserer Metrik-Pipeline ist bereits zu erkennen.

Bleiben Sie am Ball, und erfahren Sie in Kürze, wie wir durch den Einsatz von Redis unsere Monitoring-API für die Latenzzeit von Kundenanfragen deutlich verbessern und die Leistung unseres Cassandra-Clusters weiter deutlich steigern konnten.

RabbitMQ® ist eine Marke von VMware, Inc. in den USA und anderen Ländern.
Der Originalartikel wurde auf Englisch auf https://www.instaclustr.com/blog/the-introduction-of-apache-kafka-infrastructure/ veröffentlicht und ist geschrieben von Greg Raevski.