cadence Archives - credativ®

In diesem nächsten Teil der Blog-Reihe über die Cadence-Drohnen-Demonstrationsanwendung stellen wir ein neues und verbessertes Cadence+Kafka-Integrationsmuster vor. Wir haben die Grenzen der Skalierbarkeit von Cadence getestet, um herauszufinden, wie viele Drohnen wir gleichzeitig fliegen lassen können.

1. Neu und verbessert: ein Matching-Service für Drohnenaufträge

In New York kann es schwierig sein, ein Taxi zu bekommen.

In New York kann es schwierig sein ein Taxi zu bekommen. – (Quelle: Shutterstock)

Wenn ich sehe, dass Produkte als „neu und verbessert“ angepriesen werden, frage ich mich oft, was vorher nicht mit ihnen in Ordnung war (Warum muss etwas verbessert werden, was nicht fehlerhaft war?). Angelehnt an diese Theorie hat sich herausgestellt, dass mein bisheriger Cadence-Anwendungscode für die Demo für Drohnenlieferungen „fehlerhaft“ war.

In einem vorherigen Blogbeitrag (Abschnitt 4: Drohne erhält nächste Bestellung zur Lieferung) haben wir eines der Cadence+Kafka-Integrationsmuster erläutert, das es Drohnen ermöglichen soll, eine Bestellung abzuholen, die zur Auslieferung bereit ist. Wir haben einen einfachen Warteschlangenansatz mit einem einzelnen Kafka-Consumer verwendet, der wie folgt eng mit jedem Drohnen-Workflow gekoppelt war. Der Drohnen-Workflow umfasst eine Aktivität waitForOrder(). Diese umschließt einen Kafka-Consumer, der tatsächlich im Cadence-Aktivitäts-Thread läuft. Er ist also vorübergehend und dauert nur so lange, wie die Aktivität ausgeführt wird. Der Workflow fragt das Topic „Bestellungen bereit“ ab, bis eine einzelne Bestellung zurückgegeben wird, wodurch die Aktivität abgeschlossen wird und die Drohne mit der Auslieferung beginnt.

Dieser Ansatz hat den Vorteil, dass er einfach ist. Er birgt jedoch ein potenzielles Problem. Da der Kafka-Consumer regelmäßig erstellt und beendet wird, entsteht ein ständiger Overhead bei der Consumer-Neuverteilung (in diesem Blog finden Sie weitere Informationen zu Rebalancing-Storms). Bei bis zu 100 Drohnen gleichzeitig war dies kein praktisches Problem, aber nachdem ich für diesen Blogbeitrag einige ernsthaftere Lasttests durchgeführt hatte, musste ich feststellen, dass es genau in diesem Teil des Workflows zu erheblichen Verzögerungen kam, wobei manchmal mehrere Dutzend Sekunden zwischen der Bereitschaft einer Drohne und der Erfassung einer Bestellung durch diese Drohne verstrichen. Es war also Zeit für einen neuen und verbesserten „Matching-Service“.

(Quelle: Shutterstock)

Um die Performance zu verbessern, habe ich einen „Vermittlungsdienst“ eingeführt, ähnlich dem, was man an belebten Taxiständen an Flughäfen sieht, wo ein Vermittler die Fahrgäste und Taxis koordiniert, indem er die Fahrgäste zu bestimmten Standplätzen leitet, wo sie auf das nächste verfügbare Taxi warten. Ich gehe davon aus, dass Ride-Share-Apps heutzutage ähnlich arbeiten.

So funktioniert der neue Matching-Service. Wie bisher werden (1) lieferbereite Bestellungen durch einen Bestellungs-Workflow zum Topic „Bestellungen bereit“ hinzugefügt. Wir fügen nun ein neues Kafka-Topic hinzu – das Topic „Drohne bereit“. Wenn eine Drohne für eine Bestellung bereit ist, sendet sie nun eine Nachricht an das Topic „Drohne bereit“ unter Verwendung eines Kafka-Producers (2) – diese Nachricht enthält die Drohnen-ID – und wartet auf ein Signal. Es gibt auch (3) einen neuen permanenten Kafka-Consumer („Auftrag an Drohne zuweisen“), der aus den Topics „Bestellungen bereit“ und „Drohne bereit“ liest. Dieser Consumer läuft unabhängig und kontinuierlich außerhalb der Cadence-Workflows und verhindert so die bisherigen Neuverteilungsprobleme. Wenn der Consumer eine Nachricht von beiden Topics erhält, sendet er ein Signal mit der Bestell-ID an den richtigen Drohnen-Workflow (4), der der Drohne mitteilt, welche Bestellung sie nun ausliefern soll, und den Auslieferungsteil des Workflows in Gang setzt. Siehe das folgende aktualisierte Diagramm.

Der Cadence-Drohnen-Demonstrationscode wurde aktualisiert, und es gibt eine neue Klasse für den neuen Kafka-Consumer MatchOrderToDrone.java.

2. Experimente zur Cadence-Skalierbarkeit

2.1 Cluster-Details

Cadence ist für die skalierbare Ausführung von Workflows mit hohem Durchsatz konzipiert. Jetzt wollen wir sehen, wie viele Drohnen wir tatsächlich fliegen lassen können.  Natürlich hängt die Skalierbarkeit von der verfügbaren Hardware und anderen Infrastrukturen ab. Für diese Experimente habe ich einen betriebsbereiten, von Instaclustr verwalteten Cadence-Service (auf AWS) mit den folgenden Ressourcen bereitgestellt:

Cadence-Cluster: 3 Knoten von CAD-PRD-m5ad.large-75-Instanzen (2 VCPUs) = 6 VCPUs

Cassandra-Cluster: 9 Knoten von m5l-250-v2-Instanzen (2 VCPUs) = 18 VCPUs

Gesamt Anzahl Cluster-Kerne = 24

Ich habe auch eine EC2-Instanz gestartet, um den Code für den Cadence-Client und -Worker auszuführen (8 VCPUs). Also, insgesamt 24+8 = 32 VCPUs. Das Verhältnis der Ressourcen im gesamten End-to-End-System (Client:Cadence:Cassandra) war also 1:1:3.

Zunächst habe ich Cassandra-Cluster mit 3 und 6 Knoten ausprobiert, die aber offensichtlich einen Engpass darstellten, sodass unsere Cadence-Ops-Gurus vorschlugen, die Anzahl der Knoten auf 9 zu erhöhen (also die dreifache Anzahl der Cadence-Knoten). Das entspricht dem empfohlenen Verhältnis von Cadence- zu Cassandra-Ressourcen von 1:3.

Der Cadence-Client wurde über einen Load Balancer mit dem verwalteten Cadence-Cluster verbunden, um eine ausgewogene Auslastung der Cadence-Cluster-Knoten zu gewährleisten (auf AWS wird automatisch einer von diesen bereitgestellt).

Hier finden Sie einen Überblick über die verschiedenen Komponenten, aus denen ein komplettes Cadence-System besteht. Apache Kafka und OpenSearch sind optional, bieten aber eine „erweiterte Einsicht“ in laufende Cadence-Workflows.

2.2. Experiment 1: Mit Vollgas

Meine Cadence-Demoanwendung für Drohnenlieferungen war nicht speziell für eine Benchmark konzipiert. Daher habe ich einige Tricks angewendet, um Benchmark-Ergebnisse zu erhalten. Für meinen ersten Versuch beschloss ich, die Fluggeschwindigkeit der Drohnen auf 100 km/h anzupassen und die Drohnen mit Vollgas laufen zu lassen (d. h., schneller als in Echtzeit, ohne Wartezeiten zwischen den Ereignissen). So konnte ich in kurzer Zeit eine große Anzahl von Benchmark-Tests durchführen und die Anzahl der gleichzeitigen Drohnen-Workflows auf die maximale Clusterkapazität erhöhen. Ich habe einen „Benchmarking“-Modus zum Code hinzugefügt, der bei jeder Lieferung für identische Lieferentfernungen sorgt (die maximal zulässige Entfernung), da die Abweichungen für ein wiederholbares Benchmarking sonst möglicherweise zu groß gewesen wären. Außerdem war die Anzahl der Bestellungen zehnmal so hoch wie die Anzahl der Drohnen. Um den Durchsatz des Drohnen-Workflows zu messen, messe ich einfach die Zeit jedes Laufs (aus den Protokollen) und teile die Gesamtzahl der Bestellungen durch die Gesamtzeit, um die abgeschlossenen Bestellungen pro Sekunde zu erhalten.  Das folgende Diagramm zeigt die Anzahl der gleichzeitigen Drohnen im Vergleich zu den Bestellungen pro Sekunde. Es zeigt deutlich, dass der Durchsatz ansteigt, bei 50 gleichzeitigen Drohnen-Workflows einen Spitzenwert erreicht und dann wieder abfällt. Für das Ergebnis bei 50 Drohnen lag die CPU-Auslastung des Cadence-Clusters zwischen 53–93 % (d. h., einige Knoten waren stärker ausgelastet als andere).

Gleichzeitige Drohnen im Vergleich zu Bestellungen/s, mit einem Höchstwert von 50 Drohnen

2.3. Experiment 2: Echtzeit

Für mein zweites Experiment wollte ich herausfinden, wie viele Drohnen ich in „Echtzeit“ fliegen lassen kann, d. h., wenn sie die tatsächliche Flugzeit benötigen: maximal 30 Minuten pro Lieferung (bei 10 s Wartezeit zwischen den einzelnen Bewegungen, 60 s für Abhol- und Absetzvorgänge und zusätzlicher Zeit zum Aufladen). Für das Benchmarking dieses Experiments habe ich einen modifizierten Code verwendet, der die Anzahl der Drohnen langsam ansteigen lässt. Warum? Es gibt einen erheblichen Overhead bei der Erstellung von Cadence-Workflows. Deshalb musste ich nach der Erstellung des Workflows genügend Zeit lassen, bevor ich den Drohnen Lieferaufträge erteilte, um sicherzustellen, dass genügend Cadence-Cluster-Ressourcen für die normale Workflow-Ausführung verfügbar waren. Und das Endergebnis war …

3. 2.000 Drohnen!

Was ist schlimmer als eine Lieferdrohne, die gelegentlich über unsere Köpfe summt? Stellen Sie sich 2.000 Lieferdrohnen vor, die Sie ständig umschwirren! (NASA/New Scientist haben nachgewiesen, dass das Summen von Drohnen lästiger ist als Geräusche anderer Fahrzeuge.) Ja, wir haben es geschafft, 2.000 Lieferdrohnen gleichzeitig einzusetzen! Dazu waren eine CPU-Auslastung der Cadence-Knoten von 50–88 %, eine Cassandra-Cluster-Auslastung von 80 % sowie 6 von 8 EC2-Instanzkernen für den Client/Worker-Code nötig. Da es für jeden Drohnen-Workflow einen Bestell-Workflow gibt, lag die Gesamtzahl der gleichzeitig ausgeführten Cadence-Workflows sogar bei 4.000 – also doppelt so hoch.

Cadence ist jedoch auch in der Lage, mehr Workflows als diese auszuführen, sogar auf denselben Ressourcen. Wie? Nun, Cadence ist für lang laufende Workflows konzipiert. Meine Workflows hatten Wartezeiten zwischen 10 und 60 Sekunden (plus Zeit zum Aufladen), die Drohnen waren also nicht die ganze Zeit über beschäftigt (im Gegensatz zum ersten „Vollgas“-Experiment). In einem Durchlauf habe ich versehentlich mehr Drohnen erstellt als ich brauchte, nämlich 23.151 Stück. Nach ihrer Erstellung verbrauchten sie überhaupt keine Cadence-Ressourcen, da sie nur auf Bestellungen warteten, also untätig waren. Je nachdem, wie ausgelastet Ihre Cadence-Workflows sind, können Sie also potenziell Millionen von Daten auf einem Cluster dieser Größe ausführen.

Wie würden 2.000 Drohnen aussehen? Erinnern Sie sich an die riesige rotierende Erde bei den Olympischen Spielen 2020 in Tokio? Offenbar wurden bei der Eröffnungsfeier fast 2.000 Drohnen (1.824) dafür eingesetzt! Wir könnten diese Drohnen-Präsentation mit unserem Cadence-Cluster betreiben (und mit einem Workflow-Orchestrator eine Drohnen-Choreografie schaffen). Und wenn Sie noch mehr Drohnen einsetzen möchten, können Sie natürlich einen größeren Cadence-/Cassandra-Cluster bereitstellen (oder Knoten zu bestehenden Clustern hinzufügen).

4. Animation – zehn fliegende Drohnen

Möchten Sie unsere Drohnen fliegen sehen? Hier ist eine Animation, die eine Webseite (dronepaths.html) mit Javascript und Mapbox für die Karte verwendet. Dabei werden Daten aus einem tatsächlichen Lauf der Drohnendemonstration verwendet. Es werden nur 10 Drohnen (schwarze Markierungen) angezeigt (leider nicht 2.000), die von der Drohnenbasis (lila Markierung, in der Nähe meines Hinterhofs) ausfliegen. Die Drohnen fliegen zu den verschiedenen Auftragsorten, um die Bestellungen abzuholen (orangefarbene Markierungen), sie an den rot markierten Lieferorten abzuliefern (die nach der Lieferung grün werden) und dann zur Basis zurückzukehren und das Ganze zu wiederholen (jede Drohne hat im Durchschnitt 2 Bestellungen). Theoretisch könnte der Javascript-/Mapbox-Code so erweitert werden, dass er einen Kafka-Consumer verwendet (hier eine mögliche Lösung), um die tatsächlichen Echtzeitdaten zum Drohnenstandort zu nutzen.

Anmerkung 1: In der vorherigen Version des Drohnen-Codes habe ich einen Kafka-Consumer verwendet, der im Cadence-Drohnen-Workflow, waitForOrder()-Aktivität , läuft. Kafka-Consumer verwenden einen abfragebasierten Ansatz, um Datensätze aus Kafka-Topics zu lesen und so Skalierbarkeit zu ermöglichen. Dieser Ansatz war also eigentlich ein Beispiel für einen Abfrage-Anwendungsfall für Cadence. Wie oben erklärt, habe ich dies aus der neuesten Version des Codes entfernt. Sie können jedoch ein anderes Beispiel für einen Abfrage-Anwendungsfall in unserem Cadence Polling Cookbook finden. Die neueste Version des Drohnen-Codes ist jedoch ein gutes Beispiel für einen Cadence-Anwendungsfall für eine ereignisgesteuerte Anwendung.

Anmerkung 2: Damit bleiben uns 2 Kafka-Consumer in der aktuellen Version. Interessant ist, dass theoretisch beide Consumer im Kafka Connect-Framework als Kafka Connect Cadence sink connector implementiert und ausgeführt werden könnten, um Zuverlässigkeit und Skalierbarkeit zu gewährleisten. Der Consumer „Neuen Bestellungs-Workflow starten“ könnte in einen allgemeinen sink connector „Neuen Workflow starten“ zum Starten neuer Workflows umgewandelt werden, und der Consumer „Auftrag an Drohne zuweisen“ könnte zu einem spezielleren sink connector gemacht werden, um Signale an Workflows mit einer bestimmten Nutzlast zu senden.

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

In diesem Blog-Beitrag fahren wir mit unserer Anwendung der Cadence Drohnenlieferung fort, inklusive einer Zusammenfassung des vollständigen Workflows und der Cadence+Kafka Integrationsmuster. Außerdem werden wir uns einige neue Cadence-Funktionen ansehen, die wir verwendet haben (Retry, Continue, Abfragen, Nebeneffekte), und wir werden eine beispielhafte Verfolgung einer Drohnenlieferung enthüllen.

1. Bewegung und Lieferung

(Quelle: Shutterstock)

Die Details der Drohnenbewegung wurden im vorherigen Blog-Beitrag erläutert, doch zusammenfassend lässt sich Folgendes festhalten: Der Drohnen-Workflow ist für die „Bewegung“ vom Stützpunkt zum Bestellstandort, für die Abholung der Bestellung, für das Fliegen zum Lieferstandort, für das Ablegen der Bestellung und das Zurückkehren zum Stützpunkt verantwortlich. Dies geschieht mit der Activity nextLeg() und wenn die Bestellung abgeholt, aber noch nicht geliefert wurde, werden Signale der Standortstatusänderung an den Bestellungs-Workflow (6) gesendet. Eine interessante Erweiterung könnte die Aktualisierung einer ETA im Bestellungs-Workflow sein. Nachdem die Drohne zum Stützpunkt zurückgekehrt ist, markiert sie die Lieferung als „abgehakt“ und signalisiert dem Bestellungs-Workflow, dass er abgeschlossen ist. Das führt dazu, dass der Bestellungs-Workflow abgeschlossen wird (7) und der Drohnen-Workflow eine neue Instanz von sich selbst startet (8).

2. Zusammenfassung der Cadence+Kafka Integrationsmuster

Nun haben wir einige Beispiel für verschiedene Möglichkeiten gesehen, wie Cadence und Kafka integriert werden können, einige davon in unserem früheren Blog-Beitrag. Wir fassen sie hier für ein leichteres Verständnis in einer Tabelle zusammen.

Musternummer Mustername Richtung Zweck Merkmale Beispiele
1 Senden einer Nachricht an Kafka Cadence → Kafka Senden einer einzelnen Nachricht an Kafka-Thema, simple Notification (no response) Umschließt einen Kafka-Producer in einer Cadence-Activity Blog 2, Blog 4
2 Anfrage/    Antwort von Cadence an Kafka Cadence → Kafka → Cadence Wiederverwenden des Kafka-Microservice von Cadence, Anfragen und Antworten an Kafka Umschließt einen Kafka-Producer in einer Cadence-Activity, um eine Nachricht und Header-Metadaten als Antwort zu senden; Kafka-Consumer verwendet Metadaten, um dem Cadence-Workflow oder der Activity mit dem Ergebnis zu signalisieren, dass fortgefahren werden kann Blog 2
3 Starten eines neuen Cadence-Workflows von Kafka Kafka → Cadence Starten des Cadence-Workflows vom Kafka-Consumer als Reaktion auf das Empfangen eines neuen Datensatzes Kafka-Consumer startet das Ausführen der Cadence-Workflow-Instanz, eine Instanz pro empfangenem Datensatz Blog 4
4 Erhalten des nächsten Jobs aus einer Warteschlange Cadence → Kafka → Cadence Jede Workflow-Instanz erhält einen einzelnen Job von einem Kafka-Thema zum Verarbeiten Die Cadence-Activity umschließt einen Kafka-Consumer, der das Kafka-Thema kontinuierlich abruft, nur einen einzelnen Datensatz zurückgibt, transient für die Dauer der Activity Blog 4

3. Neue verwendete Cadence-Merkmale

Wir haben einige neue Funktionen in die Demo-Anwendung der Drohnenlieferung eingebaut, die ich in diesem Blog-Beitrag näher beleuchten möchte.

3.1 Cadence-Retry

(Quelle: Shutterstock)

Cadence-Activities und -Workflows können aus unterschiedlichen Gründen fehlschlagen, unter anderem wegen Zeitüberschreitungen. Retries können manuell verarbeitet werden, oder Sie lassen den Cadence-Server den Retry handhaben, was wesentlich einfacher ist. Sie finden die Cadence-Dokumentation zu Retries hier und hier. Für meine Drohnenlieferung-Anwendung habe ich festgelegt, dass Retries für meine Activities verwendet werden sollen. Der Grund dafür ist, dass sie das Potenzial haben, „lange“ zu laufen, und wenn sie fehlschlagen, können sie wieder aufgenommen und dort fortgesetzt werden, wo sie abgebrochen wurden. Die waitForOrder()-Activity ist eine blockierende Activity, die erst dann zurückkehrt, wenn sie einen Auftrag erhalten hat, der geliefert werden muss, sodass sie potenziell eine unbestimmte Zeit dauern kann. Außerdem ist sie unabhängig, was bedeutet, dass sie problemlos mehrmals aufgerufen werden kann (wie wir oben gesehen haben, handelt es sich tatsächlich um einen Kafka-Consumer, der ein Thema abruft). Die nextLeg()-Activity ist dafür verantwortlich, den Weg der Drohne von einem Ort zum anderen aufzuzeichnen, was viele Minuten in Anspruch nehmen kann. Wenn sie fehlschlägt, wollen wir, dass sie wiederaufgenommen wird, aber nicht vom aktuellen Standort (die neueste Version verwendet jetzt eine Abfragemethode, um den aktuellen Drohnenstandort zu Beginn der Activity abzurufen). Die einfachste Möglichkeit, die Optionen für einen Retry festzulegen, ist eine @MethodRetry-Annotation im Activity-Interface wie in diesem Beispiel. Die Einstellungen einschließlich Zeiten richtig abzurufen, kann jedoch eine Herausforderung sein, und ich habe diese nur „geraten“:

@MethodRetry(maximumAttempts = 3, initialIntervalSeconds = 1, expirationSeconds = 300, maximumIntervalSeconds = 30)
    	void nextLeg(LatLon start, LatLon end, boolean updateOrderLocation, String orderID);

3.2 Cadence „Continue“

Sie könnten versucht sein, einen Cadence-Workflow ewig laufen zu lassen – schließlich sind sie für lang laufende Prozesse konzipiert. Dies wird jedoch im Allgemeinen als „schlechte Praxis“ angesehen, da die Größe des Workflow-Status immer weiter ansteigt, möglicherweise die maximale Statusgröße überschreitet und die Neuberechnung des aktuellen Status von Workflows aus dem Statusverlauf verlangsamt. Ich habe folgende einfache Lösung verwendet: Am Ende des vorhandenen Workflows wird mit Workflow.continueAsNew() ein neuer Drohnen-Workflow gestartet. Dadurch beginnt eine neue Workflow-Instanz mit derselben Workflow-ID, aber nicht mit demselben Status wie der ursprüngliche Workflow. Das bedeutet, dass Drohnen-Workflows am Stützpunkt beginnen, voll aufgeladen sind und noch keine Bestellung zum Abholen haben.

3.3 Cadence-Abfragen

Im 2. Cadence-Blog-Beitrag haben wir gezeigt, dass Workflows eine Schnittstellenklasse definieren müssen und dass die Schnittstellenmethoden Annotationen haben können, einschließlich @WorkflowMethod (der Einstiegspunkt in einen Workflow, exakt eine Methode muss diese Annotation haben) und @SignalMethod (eine Methode, die auf externe Signale reagiert, null oder mehr Methoden können diese Annotation haben).

Es gibt jedoch eine weitere Annotation, die wir nicht verwendet haben: @QueryMethod. Diese Annotation zeigt eine Methode an, die auf synchrone Abfrageanfragen reagiert, und von denen kann es viele geben. Sie dienen dazu, anderen Workflows usw. den Status anzuzeigen, sodass es sich im Grunde um eine „Getter“-Methode handelt.  Als Beispiel ist hier eine vollständige Liste von Methoden, einschließlich einiger Abfragemethoden für den OrderWorkflow:

public interface OrderWorkflow {
@WorkflowMethod(executionStartToCloseTimeoutSeconds = (int)maxFlightTime, taskList = orderActivityName)
String startWorkflow(String name);
@SignalMethod
void updateGPSLocation(LatLon l);
@SignalMethod
void signalOrder(String msg);
@SignalMethod
void updateLocation(String loc);
@QueryMethod
String getState();
@QueryMethod
LatLon getOrderLocation();
@QueryMethod
LatLon getDeliveryLocation();
    }

Abfragen müssen schreibgeschützt und nicht blockierend sein. Ein wesentlicher Unterschied zwischen Signalen und Abfragen, der mir aufgefallen ist, besteht darin, dass Signale nur bei nicht abgeschlossenen Workflows funktionieren, während Abfragen sowohl bei nicht abgeschlossenen als auch bei abgeschlossenen Workflows funktionieren. Bei abgeschlossenen Workflows werden die Workflows automatisch neu gestartet, sodass ihr Endzustand wiederhergestellt werden kann. Das ist clever!

3.4 Cadence-Nebeneffekte

Der Cadence-Workflow-Code muss im Allgemeinen deterministisch sein (Quelle: Shutterstock)

Ein letzter Trick, den ich bei dieser Anwendung genutzt habe, ist die Verwendung von Zufallszahlen zur Berechnung der Bestell- und Lieferorte in der Funktion newDestination(). Um diese im Order Activity-Workflow korrekt zu verwenden, musste ich Cadence mit der Methode Workflow.sideEffect()
sagen, dass er Nebeneffekte hat. So können Workflows die bereitgestellte Funktion einmal ausführen und das Ergebnis im Workflow-Verlauf speichern. Das Ergebnis des aufgezeichneten Verlaufs wird zurückgegeben, ohne dass die angegebene Funktion während der Wiedergabe ausgeführt wird. Dies garantiert die deterministische Anforderung für Workflows, da bei der Wiedergabe genau das gleiche Ergebnis zurückgegeben wird. Hier ist mein Beispiel aus der startWorkflow()-Methode des Bestellungs-Workflows (oben):

startLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(baseLocation, 0.1, maxLegDistance));

deliveryLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(startLocation, 0.1, maxLegDistance));

Der Code ist in unserem Github-Repository verfügbar. Im nächsten Blog-Beitrag in dieser Reihe werden wir die Skalierbarkeit von Cadence näher betrachten und herausfinden, wie viele Drohnen gleichzeitig fliegen können.

Anhang: Beispielhafte Verfolgung

Wenn alles nach Plan verläuft, sieht eine typische (gekürzte) Drohnen- und Bestellungs-Workflow-Verfolgung so aus:

Order WF readyForDelivery activity order_0 id 939328c5-eaad-4f52-9aaf-08100fde1e84
waitForOrder got an order! topic = orderjobs2, partition = 0, offset = 17, key = , value = 939328c5-eaad-4f52-9aaf-08100fde1e84
Drone Drone_0 got an order from Kafka + 939328c5-eaad-4f52-9aaf-08100fde1e84
Drone Drone_0 has got order 939328c5-eaad-4f52-9aaf-08100fde1e84
Drone Drone_0: new state = gotOrder, location = base
order order_0 got signal = droneHasOrder
Drone Drone_0 has generated a flight plan based on Order and Delivery locations
Start lat -35.20586, lon 149.09462
Order lat -35.189085007724415, lon 149.09190627324634
Delivery lat -35.1849320141879, lon 149.08653974053584
End lat -35.20586, lon 149.09462
Drone Drone_0 flight plan total distance (km) = 4.9930887486783835
Drone Drone_0 estimated total flight time (h) = 0.24965443743391919
Drone Drone_0 distance to order (km) = 1.881431511294941
Drone Drone_0 estimated time until order pickup (h) = 0.09407157556474705
Drone Drone_0 distance to delivery (km) = 2.5530367293280447
Drone Drone_0 estimated time until delivery (h) = 0.12765183646640224
Drone Drone_0: new state = flightPlanGenerated, location = base
Drone + Drone_0 flying to pickup Order
Drone Drone_0: new state = flyingToOrder, location = betweenBaseAndOrder
order order_0 got signal = droneOnWayForPickup
Drone WF gpsLocation = lat -35.20586, lon 149.09462
start loc = lat -35.20586, lon 149.09462
Drone flew to new location = lat -35.20536523834301, lon 149.09453994515312
Distance to destination = 1.825940473393331 km
Drone Drone_0 gps location update lat -35.20536523834301, lon 149.09453994515312
Drone Drone_0 charge now = 99.44444444444444%, last used = 0.5555555555555556
Drone WF gpsLocation = lat -35.20536523834301, lon 149.09453994515312
start loc = lat -35.20586, lon 149.09462
Drone flew to new location = lat -35.20487047663333, lon 149.09445989128173
Distance to destination = 1.770449431350198 km
Drone Drone_0 gps location update lat -35.20487047663333, lon 149.09445989128173
Drone flew to new location = lat -35.204375714870956, lon 149.0943798383858
Distance to destination = 1.714958384759105 km
Drone Drone_0 charge now = 98.88888888888889%, last used = 0.5555555555555556
...
Drone flew to new location = lat -35.189085007724415, lon 149.09190627324634
Distance to destination = 0.0 km
Drone Drone_0 gps location update lat -35.189085007724415, lon 149.09190627324634
Drone arrived at destination.
Drone Drone_0 charge now = 81.11111111111106%, last used = 0.5555555555555556
Drone Drone_0 picking up Order
Drone Drone_0: new state = pickingUpOrder, location = orderLocation
Drone Drone_0 picking up Order!
Drone Drone_0 picked up Order!
order order_0 got signal = pickedUpByDrone
Drone Drone_0 charge now = 77.77777777777773%, last used = 3.3333333333333335
Drone Drone_0 delivering Order...
Drone Drone_0: new state = startedDelivery, location = betweenOrderAndDelivery
Order new location = orderLocation
order order_0 got signal = outForDelivery
Order new location = onWay
Drone WF gpsLocation = lat -35.189085007724415, lon 149.09190627324634
start loc = lat -35.189085007724415, lon 149.09190627324634
Drone flew to new location = lat -35.188741877698526, lon 149.09146284539662
Distance to destination = 0.6161141689892213 km
Drone Drone_0 gps location update lat -35.188741877698526, lon 149.09146284539662
Drone Drone_0 charge now = 77.22222222222217%, last used = 0.5555555555555556
Drone flew to new location = lat -35.18839874605641, lon 149.09101942129195
Distance to destination = 0.5606231205354245 km
Order  GPS Location lat -35.188741877698526, lon 149.09146284539662
...
Drone flew to new location = lat -35.1849320141879, lon 149.08653974053584
Distance to destination = 0.0 km
Drone Drone_0 gps location update lat -35.1849320141879, lon 149.08653974053584
Drone Drone_0 charge now = 70.55555555555549%, last used = 0.5555555555555556
Drone arrived at destination.
Order GPS Location lat -35.1849320141879, lon 149.08653974053584
Drone + Drone_0 dropping Order!
Drone Drone_0: new state = droppingOrder, location = deliveryLocation
Drone + Drone_0 dropped Order!
Order  new location = deliveryLocation
Drone Drone_0 charge now = 67.22222222222216%, last used = 3.3333333333333335
Drone Drone_0 returning to Base
Drone Drone_0: new state = returningToBase, location = betweenDeliveryAndBase
order order_0 got signal = delivered
Drone WF gpsLocation = lat -35.1849320141879, lon 149.08653974053584
start loc = lat -35.1849320141879, lon 149.08653974053584
Drone flew to new location = lat -35.1854079590788, lon 149.08672345348626
Distance to destination = 2.384560977495208 km
Drone Drone_0 gps location update lat -35.1854079590788, lon 149.08672345348626
Drone flew to new location = lat -35.18588390369231, lon 149.08690716858857
Distance to destination = 2.3290699362110967 km
Drone Drone_0 charge now = 66.6666666666666%, last used = 0.5555555555555556
...
Drone flew to new location = lat -35.20586, lon 149.09462
Distance to destination = 0.0 km
Drone Drone_0 charge now = 43.3333333333332%, last used = 0.5555555555555556
Drone Drone_0 gps location update lat -35.20586, lon 149.09462
Drone arrived at destination.
Drone Drone_0 charge now = 42.777777777777644%, last used = 0.5555555555555556
Drone + Drone_0 returned to Base!
Drone Drone_0: new state = backAtBase, location = base
Drone Drone_0: new state = checkOrder, location = base
Drone Drone_0: new state = droneDeliveryCompleted, location = base
Drone Drone_0: new state = charging, location = base
Drone Drone_0 charging! charging time = 515s
Drone Drone_0: new state = charged, location = base
order order_0 got signal = orderComplete
Order WF exiting!

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

In Teil 3 meiner Cadence-Blogserie habe ich die Drone Delivery Demo-Anwendung vorgestellt und mich dabei auf den Drohnen-Workflow konzentriert. In diesem Beitrag werden wir uns Drone Delivery aus der Perspektive der Bestellungs-Workflows ansehen, erfahren, wie die Drohnen- und Bestellungs-Workflows miteinander interagieren, einige zusätzliche Cadence+Kafka-Integrationsmuster entdecken und einige neue Cadence-Funktionen (z. B. Neuversuche, Nebeneffekte, Abfragen und Als neu fortfahren) genauer betrachten.

1. Die Architektur der Drone Delivery-Anwendung

Damit wir tiefer in die Drone Delivery-Anwendung eintauchen können, verwenden wir dieses übersichtliche Architekturdiagramm, um den Aufbau besser verstehen zu können. Die oberen 2 Reihen sind die Apache Kafka®-Komponenten, und die unteren 2 Reihen sind die Cadence-Workflows. Ich habe die Workflow-Schritte aus Gründen der Übersichtlichkeit vereinfacht. In den unteren Reihen wird jeder Drohnen-Workflow einer physischen Drohne zugeordnet, wobei die Schritte den Ereignissen im Lebenszyklus der Drohnenlieferung entsprechen.

Aus der Bestellungsperspektive spiegeln die nummerierten Schritte die Ereignisse im Lebenszyklus von Bestellungen/Lieferungen wie folgt wider:

  1. Neue Bestellung erstellen
  2. Neuen Bestellungs-Workflow erstellen
  3. Bestellung bereit für Lieferung
  4. Drohne wartet auf Bestellung
  5. Drohne hat Bestellung zugeordnet
  6. Bestellung wurde von Drohne aufgenommen und befindet sich in Auslieferung, Standort wird während des Flugs aktualisiert
  7. Bestellung geliefert und geprüft, Bestellung abgeschlossen

Sehen wir uns nun den Bestellablauf genauer an.

2. Neue Bestellung

Ein Kunde bestellt etwas über die Drone Delivery App und löst so den Bestell- und Drohnenlieferprozess aus (Quelle: Shutterstock)

Der erste Schritt im Lebenszyklus einer Bestellung besteht darin, dass ein Kunde etwas über eine Drohnenlieferungs-App bestellt. Wir gehen davon aus, dass dies ein Ereignis „Neue Bestellung erstellen“ auslöst, das in das Kafka-Topic „Neue Bestellungen“ gestellt wird. Dies bringt uns zum ersten unserer neuen Cadence+Kafka-Integrationsmuster, dem unten stehenden Muster „Neuen Cadence-Workflow von Kafka starten“ (1).

Dieses Muster ist einfach. Ein unabhängiger Kafka-Consumer läuft ständig und holt die nächste Bestellung aus dem Topic „Neue Bestellungen“ ab. Mithilfe eines Cadence-Clients erstellt und startet er eine neue Bestellungs-Workflow-Instanz. Hier ist ein beispielhafter Code dafür:

WorkflowClient workflowClient =
                WorkflowClient.newInstance(
                        new WorkflowServiceTChannel(ClientOptions.newBuilder().setHost(host).setPort(7933).build()),
                        WorkflowClientOptions.newBuilder().setDomain(domainName).build());

        Properties kafkaProps = new Properties();

        try (FileReader fileReader = new FileReader("consumer2.properties")) {
            kafkaProps.load(fileReader);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // uses a unique group
        kafkaProps.put("group.id", "newOrder");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            consumer.subscribe(Collections.singleton(newordersTopicName));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                	System.out.print("Consumer got new Order WF creation request! ");
                    System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                    
                    String orderName = record.value().toString();
                    
                    OrderWorkflow orderWorkflow = workflowClient.newWorkflowStub(OrderWorkflow.class);
                    System.out.println("Starting new Order workfow!");
                	WorkflowExecution workflowExecution = WorkflowClient.start(orderWorkflow::startWorkflow, orderName);
                    System.out.println("Started new Order workfow! Workflow ID = " + workflowExecution.getWorkflowId());
                    }
                }
            }
        }

Dies ist also ein Beispiel für die Ausführung von Cadence-Code in einem Kafka-Consumer. Etwas Ähnliches haben wir bereits in Teil 2 gesehen, wo wir ein Signal an einen laufenden Cadence-Workflow in einem Kafka-Consumer gesendet haben. Der Unterschied besteht darin, dass wir in diesem Beispiel einen Cadence-Workflow starten.

3. Der Bestellungs-Workflow

Der Bestellungs-Workflow ist ganz einfach. Nach dem Start werden zufällige Bestellungs- und Lieferorte generiert (die garantiert innerhalb der Reichweite der Drohne liegen, damit sie auch zur Basis zurückkehren kann), in einer Activity wird eine Nachricht an Kafka gesendet, um mitzuteilen, dass die Drohne bereit für die Lieferung ist (siehe unten), der Status des Ortes wird in einer Schleife aktualisiert (die über ein Signal vom Drohnen-Workflow empfangen wird) und dann wird gewartet, bis der Status „orderComplete“ erreicht ist, um den Prozess zu beenden. Andere Activities sind denkbar, z. B. die Überprüfung auf Lieferverletzungen und das Senden von Standortaktualisierungen an Kafka zur Analyse und Zuordnung.

public static class OrderWorkflowImpl implements OrderWorkflow {
@Override
        public String startWorkflow(String name) {
        	System.out.println("Started Order workflow " + name + ", ID=" + Workflow.getWorkflowInfo().getWorkflowId());
        	        	
        	// Order creates fake order and delivery locations
// randomly generated but within range of Drones
        	startLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(baseLocation, 0.1, maxLegDistance));
        	System.out.println("Order WF startLocation = " + startLocation.toString());
        	
          	deliveryLocation = Workflow.sideEffect(LatLon.class, () -> DroneMaths.newDestination(startLocation, 0.1, maxLegDistance));
        	System.out.println("Order WF deliveryLocation = " + deliveryLocation.toString());

        	// A real activity - request a drone - wraps a Kafka producer
        	activities.readyForDelivery(name);
        	boolean delivered = false;      	
          	String endState = "orderComplete";
        	
        	while (!delivered)
        	{
          		Workflow.await(() -> newState != "");
        		System.out.println("order " + name + " got signal = " + newState);
        		updates.add(newState);
        		if (newState.equals(endState))
        		{
        			delivered = true;
        			System.out.println("Order WF exiting!");
        		}
          		lastState = newState;
        		newState = "";      		
          	} 	
        	return "Order " + name + " " + endState;	
        }
}

Wir werden die Nebeneffekte unten erklären.

4. Die Drohne erhält die nächste Bestellung zur Lieferung

Ein Taxistand modelliert Menschen (Bestellungen), die in der Schlange auf das nächste verfügbare Taxi (Drohne) warten. (Quelle: Shutterstock)

Sobald die Bestellung zur Abholung bereit ist (möglicherweise nach einer Verzögerung aufgrund der Vorbereitungszeit für die Bestellung), sind wir bereit für die entscheidende Koordination zwischen den Workflows der Drohne und der Bestellung unter Verwendung des Cadence+Kafka-Musters „nächsten Auftrag aus einer Warteschlange holen“ (2, 3).

Wir erhoffen uns von dieser Interaktion, dass (a) die Drohnen bereit sind, eine Bestellung auszuliefern, (b) die Bestellungen zur Auslieferung bereit sind, (c) genau eine Bestellung genau einer Drohne zugewiesen wird. Das heißt, wir wollen nicht, dass sich Drohnen um Bestellungen „streiten“, dass Drohnen versuchen, mehr als eine Bestellung auszuliefern, oder dass Bestellungen, denen keine Drohne zugewiesen wird, nicht ausgeführt werden. (a) und (b) können in beliebiger Reihenfolge auftreten, und es können jederzeit 0 oder mehr Drohnen oder Bestellungen bereitstehen.

Wie funktioniert dieses Muster in der Praxis? Es besteht tatsächlich aus zwei Cadence+Kafka-Untermustern.

Das erste Muster (2) ist eine einfache Ein-Wege-Benachrichtigung von Cadence an Kafka. Der Bestellungs-Workflow hat eine Activity, readyForDelivery(), die einen Kafka-Producer umschließt. Dies ist ein Fernaufruf, der fehlschlagen kann. Deshalb habe ich eine Cadence Activity verwendet, obwohl sie nicht lange läuft und nicht auf eine Antwort wartet, anders als das Cadence+Kafka-Microservices-Muster, das wir in Blog 2 demonstriert haben, das eine Benachrichtigung sendet und dann auf eine Antwort von Kafka wartet. Der Producer sendet die ID der Bestellung an das Topic „Bestellungen bereit“ und dann blockiert der Bestellungs-Workflow, während er mit Workflow.await() auf ein Signal von einer Drohne wartet, das besagt, dass die Bestellung abgeholt wurde.

Aber wie nimmt die Drohne eine vorbereitete Bestellung an? Hier kommt das zweite Muster ins Spiel (3). Der Drohnen-Workflow hat eine Activity „Warten auf Bestellung“ (3). Diese umschließt einen Kafka-Consumer (3a), der tatsächlich im Cadence Activity-Thread läuft. Er ist also vorübergehend und dauert nur so lange, wie die Activity läuft. Der Workflow fragt das Topic „Bestellungen bereit“ ab, bis eine einzelne Bestellung zurückgegeben wird (3b), wodurch die Activity abgeschlossen wird (3c).

Kafka-Consumer werden für diesen Anwendungsfall etwas anders als normalerweise verwendet. Es gibt genau einen Consumer pro Drohnen-Workflow im Zustand „Warten auf Bestellung“. Der Consumer fragt das Topic so lange ab, bis ein einziger Datensatz zurückgegeben wird, und wird dann beendet. Um sicherzustellen, dass nur 1 Bestellung zurückgegeben wird, haben wir max.poll.records auf 1 gesetzt.

Alle diese Consumer teilen sich eine gemeinsame Consumer-Gruppe, sodass die Bestellungen auf alle wartenden Drohnen verteilt werden, aber nur eine Drohne die jeweilige Bestellung erhalten kann. Wir verwenden keinen Kafka-Schlüssel, sodass die Datensätze einfach nach dem Round-Robin-Prinzip an die Consumer geliefert werden. Es kann ein gewisser Overhead entstehen, weil Consumer regelmäßig der Gruppe beitreten und sie wieder verlassen (hauptsächlich Verzögerung durch Neuverteilung). Und wenn die Anzahl der Drohnen steigt, muss die Anzahl der Partitionen des Topics erhöht werden, um sicherzustellen, dass es genügend Partitionen für die Anzahl der Consumer gibt. Die Regel lautet: Partitionen >= Consumer. Sie könnten versucht sein, die Anzahl der Partitionen zu Beginn sehr hoch anzusetzen, aber frühere Experimente haben gezeigt, dass zu viele Partitionen den Durchsatz des Kafka-Clusters verringern können und dass es eine optimale Anzahl von Partitionen gibt, die von der Größe des Clusters abhängt (<= 100 Partitionen ist für den normalen Betrieb in Ordnung). Wenn Sie mehr Drohnen haben, erhöhen Sie einfach die Größe Ihres Kafka-Clusters, um damit Schritt zu halten. Hier ist die Implementierung der waitForOrder() Activity:

public static class DroneActivitiesImpl implements DroneActivities
{
public String waitForOrder(String name) {
        	        	 
        	 // Kafka consumer that polls for a new Order that's been created and is ready for pickup to trigger Drone delivery trip
        	 // Each Drone can only have 1 order at a time, and each order can only be delivered by 1 drone (or drone wars may result)
          	 Properties kafkaProps = new Properties();

             try (FileReader fileReader = new FileReader("consumer2.properties")) {
                 kafkaProps.load(fileReader);
             } catch (IOException e) {
                 e.printStackTrace();
             }
             
             // set max.poll.records to 1 so we onlyu get 1 order at time. 
             // All consumers waiting for order are in their own shared consumer group
             // NOTE that this means we need partitions >= number of Drones - assumption is this is < 100 for performance reasons
             kafkaProps.put("group.id", "waitForOrder");
             kafkaProps.put("max.poll.records", "1");
             
             try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
                 consumer.subscribe(Collections.singleton(orderjobsTopicName));
                 while (true) {
                     ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                     for (ConsumerRecord<String, String> record : records) {
                         System.out.print("waitForOrder got an order! ");
                         System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %s",
                                 record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                         // ensure that we don't get this order again
                         consumer.commitAsync();
                         return record.value().toString();
                     }
                 }
             }
             catch (Exception e)
             {
            	 e.printStackTrace();
             }
			return "";
         }
}

Der Code ist in unserem Github-Repository verfügbar.

Das soll es für diesen Blog-Beitrag gewesen sein. Im nächsten Teil werden wir mit einer Zusammenfassung der verwendeten Cadence- & Kafka-Integrationsmuster fortfahren und einige der neuen Cadence-Funktionen genauer betrachten.

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

Lassen Sie uns eine neue und komplexere Cadence-Anwendung für Drohnen-Lieferungen erstellen! Dies ist der erste Teil einer mehrteiligen Reihe über Cadence-Drohnen mit einer Einführung in das Problem der Drohnenlieferung, mit dem Cadence-Hauptworkflow für Drohnen und mit einem Anhang zu Drohnenbewegungen.

1. Der Himmel ist voller Drohnen (und Krähen)

(Quelle: Shutterstock)

Meine jüngste Silvesterlektüre war „Termination Shock“ von Neal Stephenson, ein Roman, der in der nahen Zukunft spielt, in der Drohnen allgegenwärtig sind, wobei es kurioserweise auch um dressierte Raubvögel geht, die Drohnen zerstören. Fakten sind häufig seltsamer als Fiktion, denn dies erinnerte mich an den Test eines Drohnen-Lieferdienstes in der Nähe von Canberra, wo ich lebe, der es jüngst wegen eines Krähen-Angriffs in die Nachrichten schaffte. Als ich den Lieferdienst selbst testen wollte, stellte sich leider heraus, dass er in meiner Gegend noch nicht angeboten wird, obwohl es viele perfekte Drohnen-Landeplätze in der Nähe gibt. Nun gut, vorerst musste also die Fiktion genügen.

Dafür beschloss ich aber, für mein nächstes Cadence-Experiment einen Drohnen-Lieferdienst aufzubauen (simuliert natürlich, auch wenn meine Garage mit mehreren Generationen ausrangierter Drohnen vollgestopft ist). Im Experiment soll der örtliche Versuch als Vorbild dienen, bei dem die Kunden bei ausgewählten teilnehmenden Geschäften kleine Artikel (z. B. Medikamente, Kaffee, Lebensmittel, Kleinteile) bestellen können. Es gibt einen zentralen Drohnenstützpunkt (ähnlich, aber viel langweiliger als das Versteck eines klassischen Film-Schurken oder Tracy Island aus der Science-Fiction-Serie „Thunderbirds Are Go!“ oder die Drohnensammelplätze der Bienenvölker), wo sich alle Drohnen befinden, wenn sie nicht gerade eine Bestellung ausliefern.

Drohnensammelplätze von Bienen (Quelle: Shutterstock)

Der Stützpunkt ist der Ort, an dem die Batterien der Drohnen aufgeladen werden. Wenn eine Bestellung versandfertig ist, fliegen die Drohnen vom Stützpunkt zum jeweiligen Geschäft, holen die Bestellung ab und fliegen zum Standort des Kunden, um die Bestellung auszuliefern. Danach fliegen sie zurück zum Stützpunkt, und der Prozess beginnt von vorn. Da die Drohnen autonom sind, kann es viele von ihnen geben. Es gibt allerdings Grenzen hinsichtlich Gewicht der Artikel, Einsatzzeiten, Bereiche, in denen geflogen werden darf oder nicht (z. B. Flugverbotszonen), dazu viele potenzielle Fehlerarten (z. B. Nichterreichen des Zielstandorts, Nichtzustellung der Bestellung, Absturz usw.) und die maximale Reichweite der Drohnenbatterie mit ausreichendem Sicherheitspuffer, um zum Stützpunkt zurückzukehren (insbesondere, wenn sie angreifenden Krähen ausweichen müssen oder vom Kurs abgebracht werden usw.). Die einzigen Faktoren, die ich zunächst berücksichtigen wollte, waren Entfernung und Flugzeit.

2. Anwendung Drohnenlieferung: Cadence Workflow-Design

Der allgemeine Drohnen-Workflow (Quelle: Shutterstock – Zusammenstellung)

Ich wollte bei meiner Demo für Drohnenlieferungen die Dinge einfach, aber trotzdem interessant halten. Daher beschloss ich, alles, was stateful ist und Aktionen oder Aufgaben haben kann, als Cadence Workflow zu implementieren. Es gibt zwei Workflow-Typen, die miteinander interagieren.

Jede Drohne ist als Workflow modelliert, wobei verschiedene Zustände durchlaufen werden: Bereit am Stützpunkt, Warten auf eine Bestellung, Fliegen zur Abholung der Bestellung, Abholen der Bestellung, Fliegen zum Zielstandort, Zustellen der Bestellung und dann Zurückfliegen zum Stützpunkt und Aufladen. Nach jedem Lieferzyklus startet der Drohnen-Workflow eine neue Instanz (mit derselben Workflow-ID, aber einer anderen Ausführungs- oder Run-ID) und ist bereit für die nächste Lieferung.

Außerdem habe ich entschieden, Bestellungen als Cadence Workflow zu modellieren. Der Grund dafür ist, dass auch Bestellungen einen Zustand und Zustandsübergänge von einem Zustand zum nächsten haben (z. B. Annahme der Bestellung, Bereit zur Abholung, Von Drohne abgeholt, Unterwegs, Geliefert, Bestellung abgeschlossen). Außerdem gibt es Aktionen. Der Bestellungs-Workflow ist beispielsweise dafür verantwortlich, zufällige Abhol- und Lieferorte für Bestellungen zu generieren, die sich innerhalb der Drohnenreichweite des Stützpunkts befinden, und anzugeben, wann die Bestellung zur Abholung bereit ist. Wir werden diesen Bestellungs-Workflow im nächsten Blog genauer unter die Lupe nehmen.

Im Folgenden sind die Hauptschritte des Cadence Workflow für Drohnen aufgeführt. Zuerst wird für jede Drohne genau eine Drohnen-Workflow-Instanz erstellt. Der Workflow modelliert den kompletten Liefervorgang einer Bestellung, einschließlich Aufladen der Drohne, und startet dann eine neue Instanz mit einer voll aufgeladenen Drohne, die für die nächste Lieferung bereit ist. Beachten Sie, dass dieser Code nur die @WorkflowMethod (entry point)-Methode für die Drohnen-Workflow-Implementierung darstellt und daher nicht vollständig ist. Der Code verwendet bei einigen Schritten von mir bereitgestellte Hilfsfunktionen, und er benötigt zur Ausführung die Workflow-Schnittstelle und Workflow-Activities. Der gesamte Code ist hier verfügbar.

// missing code

public static class DroneWorkflowImpl implements DroneWorkflow {

// missing code

@Override

    public String startWorkflow(String name) {

    

     droneName = name;

    

     System.out.println("Started Drone workflow " + name + ", ID=" + Workflow.getWorkflowInfo().getWorkflowId());

    

     // STEP 0 - ready

     // Drones always start ready, at the base location

     newStateAndLocation("ready", "base");

        

     // STEP 1 - wait for an Order

     // this step calls a real activity which blocks until an order arrives

     // returns an OrderWorkflow which we used to signal Order WF, also sets OrderID which is just a String

     orderWorkflow = step1_GetOrder();

     newStateAndLocation("gotOrder", "base");




    

        // STEP 2 - generate "flight plan" using the order and delivery locations from the Order

     // The Order WF is responsible for generating random order and delivery locations that are within Drone range

        step2_GenerateFlightPlan();

     newStateAndLocation("flightPlanGenerated", "base");




        // STEP 3 - another real activity - flying to get the order 

        System.out.println("Drone + " + name + " flying to pickup Order");

     newStateAndLocation("flyingToOrder", "betweenBaseAndOrder");

     // Let the Order WF know that the drone is on the way

        orderWorkflow.signalOrder("droneOnWayForPickup");

        // nextLeg is where the Drone movement is calculated, causing the drone to "fly" from planStart to planOrder locations

        // false and null arguments ensure that the Order location isn't updated yet, but charge is reduced

        activities.nextLeg(planStart, planOrder, false, null);

        

        // STEP 4 - arrived at order location, collect order - this takes time and uses charge to

        System.out.println("Drone + " + name + " picking up Order");

        newStateAndLocation("pickingUpOrder", "orderLocation");

        step4_pickUpOrder();

        

        // STEP 5 - flying to deliver the order   

        System.out.println("Drone + " + name + " delivering Order...");

        newStateAndLocation("startedDelivery", "betweenOrderAndDelivery");

        // next GPS location drone flies to

        nextGPSLocation = planDelivery;

        // let Order WF know the delivery has started

        orderWorkflow.signalOrder("outForDelivery");

        orderWorkflow.updateLocation("onWay");

        // drone flies to delivery location, updating drone and order locations and drone charge as it goes

        activities.nextLeg(planOrder, planDelivery, true, orderID);

        

        // STEP 6 - drop order

        System.out.println("Drone + " + name + " dropping Order!");

        newStateAndLocation("droppingOrder", "deliveryLocation");

        step6_dropOrder();

         

        // Step 7 - return to base

        System.out.println("Drone + " + name + " returning to Base");

        newStateAndLocation("returningToBase", "betweenDeliveryAndBase");

        nextGPSLocation = planEnd;

        // fly back to base, update drone location and charge, but not Order location as it's already been delivered.

        activities.nextLeg(planDelivery, planEnd, false, null);

        

        // STEP 8 - back at base

        System.out.println("Drone + " + name + " returned to Base!");

        newStateAndLocation("backAtBase", "base");

 

        // STEP 9 - check order - if successful then Order WF completes

        newStateAndLocation("checkOrder", "base");

        step9_checkOrder();  

        

        // Step 10 - delivery complete

        newStateAndLocation("droneDeliveryCompleted", "base");

              

        // Step 11 - charge

        newStateAndLocation("charging", "base");

        step11_recharge();

        

        // Step 12 - fully recharged

        newStateAndLocation("charged", "base");

        

        System.out.println("Starting new Drone delivery WF with coninueAsnew with same WF ID!");

        

        Workflow.continueAsNew(name);

        

     return "Drone Delivery Workflow " + name + " completed!";

    }

}

3. Anwendungsfall für Cadence: Drohnenlieferung von A nach B

Drohnenlieferung von A nach B (Quelle: Shutterstock)

Mit dem obigen Code und dieser Abbildung scheinen Drohnenlieferungen von einem Standort zum anderen eine einfache Sache zu sein. In der Praxis ist die Sache jedoch etwas komplexer, was mich motiviert hat, bei einigen Workflow-Schritten Cadence-Activity zu verwenden.

Der Standort ist bei Drohnenlieferungen der wichtigste Faktor. Ich habe entschieden, Koordinaten mit Längen- und Breitengrad (als Dezimalwerte) mit einer Auflösung von 1 m zu verwenden. Jede Drohne hat ein „GPS“, mit dem die Position der Drohne verfolgt wird, und die Workflow-Instanz der Drohne hat einen Standort-Zustand. Drohnen starten am Stützpunkt-Standort und kehren (hoffentlich) wieder dorthin zurück. Bestellungen und deren zugehörige Workflow-Instanzen haben außerdem einen Standort-Zustand. Dieser umfasst den Standort der Abholung, den Standort der Zustellung und den eigentlichen Standort (aktuelle Position) der Bestellung, der während des Transports der Bestellung durch die Drohne aktualisiert wird. Dies ist wichtig, da der Drohnen-Lieferdienst und das Geschäft jederzeit wissen möchten, wo sich die Bestellung befindet. Weiterhin können so auch die Kunden verfolgen, wann ihre Lieferung eintrifft, und diese dann von der Drohne in Empfang nehmen (und vor verärgerten Krähen retten).

Hier ist die Karte mit einem Beispiel für eine komplette Drohnenlieferung:

Bevor sich die Piraten auf Schatzsuche begeben, lassen sie sich aus dem Dorf noch eine Mahlzeit liefern! (Quelle: Shutterstock)

Aber wie kommt eine Drohne von einem Standort zum anderen? Ich habe ein einfaches Drohnenmathematik-Package DroneMaths geschrieben, das bei den Berechnungen hilft. Wenn Sie an Details interessiert sind, wie wir die Drohnenbewegungen von einem Standort zum anderen berechnen, werfen Sie einen Blick in den Anhang unten. Die Funktion DroneMaths.nextPosition(start, end, speed, time) berechnet die nächste Position der Drohne unter Verwendung des Startstandorts (start), des Zielstandorts (end) und von Geschwindigkeit (speed) und Flugzeit (time), was dann in der nextLeg()-Activity verwendet wird.

4. Cadence Workflow Activity

Die nextLeg()-Activity ist für die Simulation der Drohnenbewegung in jedem Teilabschnitt des Lieferflugs und für das Stoppen bei Ankunft am Ziel verantwortlich. Die Activity aktualisiert die Drohnenposition und den Ladestand der Drohnenbatterie und optional den Bestellstandort während des Flugs. Die Activity kehrt zurück, wenn die Drohne angekommen ist. Wir verwenden dafür eine Cadence Activity, da diese potenziell über längere Zeit ausgeführt wird, durch Verwendung von DroneMaths rechenintensiv ist und daher in einem vom Drohnen-Workflow separaten Thread ausgeführt werden sollte. Und sie kann potenziell fehlschlagen (wie bei einer echten Drohne, d. h. einer nicht simulierten, wirklich fliegenden Drohne, die mit der realen Welt interagieren muss, was sie zu einem guten Beispiel für eine nicht deterministische Activity macht). In diesem Fall soll sichergestellt sein, dass die Activity neu gestartet und die Drohnenbewegung von dem Punkt aus fortgesetzt wird, wo die Unterbrechung stattfand. Wir werden diesen Aspekt aber erst im nächsten Blog berücksichtigen, wenn wir uns eingehender mit der Behandlung von Ausnahmen befassen.

public static class DroneActivitiesImpl implements DroneActivities

{




// “Fly” from start to end location, return when drone arrives 

// Update Drone location and charge, and Order Location only if required.

public void nextLeg(LatLon start, LatLon end, boolean updateOrderLocation, String orderID) {         

         WorkflowExecution execution = Activity.getWorkflowExecution();

         String id = execution.getWorkflowId();  

DroneWorkflow droneWF = workflowClient.newWorkflowStub(DroneWorkflow.class, id);

 

OrderWorkflow orderWF = workflowClient.newWorkflowStub(OrderWorkflow.class, orderID);  

LatLon here = start;

       

         while (true)

      {

         try {

Thread.sleep((int)(moveTime * 1000 * timeScale));

} catch (InterruptedException e) { e.printStackTrace();

return;

}

         

      LatLon next = DroneMaths.nextPosition(here, end, droneSpeed, moveTime);

      here = next;

      System.out.println("Drone flew to new location = " + here.toString());

      double distance = DroneMaths.distance(here, end);

      System.out.println("Distance to destination = " + distance + " km");

      // update drone location and charge         droneWF.updateGPSLocation(here);

      droneWF.updateCharge(moveTime);

      // only update Order location if drone is transporting it

      if (updateOrderLocation)

      orderWF.updateGPSLocation(here);

     

      // check if we have arrived within 1m

      if (end.sameLocation(here))

    {

System.out.println("Drone arrived at destination.");

      return;

      }

      }

         }

}

Sie werden bemerken, dass ich im Activity-Code den allgemeinen Thread.sleep()-Aufruf verwendet habe. Das ist in Ordnung, da eine Cadence Activity (im Gegensatz zu einem Cadence Workflow) beliebigen Code verwenden kann. Allerdings habe ich bei den Activities eine Einschränkung entdeckt. Es hat sich herausgestellt, dass die Argumente und Rückgabewerte einer Activity-Methode mit dem bereitgestellten DataConverter in ein Byte-Array serialisiert werden müssen (die Standardimplementierung verwendet einen JSON-Serialisierer). Daher musste ich an die Methode einen „String OrderID“-Wert übergeben und damit eine neue OrderWorkflow-Instanz erstellen, anstatt nur einen OrderWorkflow zu übergeben (der nicht serialisierbar ist).

Abschließend gibt es noch einen DroneWorkflow-Schritt step1_GetOrder(), der entscheidend für den Fortschritt des Drohnen-Workflows ist. Dieser Schritt wartet, bis eine Bestellung zur Lieferung bereit ist, und legt dann die Bestell-ID und eine OrderWorkflow-Instanz im Hauptworkflow fest. Diese Hilfsmethode ist eigentlich ein Wrapper für eine weitere Activity, die blockiert, bis eine Bestellung zur Abholung durch eine Drohne bereit ist, und die garantiert, dass jede abholbereite Bestellung von genau einer Drohne abgeholt wird. Wie funktioniert das? Im Grunde ist dies ein weiteres Beispiel für ein Cadence+Kafka-Integrationsmuster, das wir im nächsten Blog zusammen mit einigen weiteren Kafka+Cadence-Mustern untersuchen werden (einschließlich Starten eines Cadence Workflow mit Kafka, das auch beim Bestellungs-Workflow verwendet wird). Im nächsten Blog werden wir außerdem die gesamte Lösung analysieren, einschließlich des Bestellungs-Workflows, sowie weitere Cadence-Funktionen wie Abfragen, Neuversuche, Heartbeats, Als neu fortfahren und Nebeneffekte.

P.S. Hat der Drohnen-Lieferdienst zufällig von meinem Projekt gehört? Finde gerade eine Broschüre in meinem Briefkasten (ja, ziemlich „old school“, aber vielleicht per Drohne zugestellt?), die den Start des Lieferdienstes in meiner Gegend ankündigt – immer her mit den Drohen-gelieferten Sachen! (Ich habe auch festgestellt, dass deren Drohnen viel schneller sind als meine – ich werden die Durchschnittsgeschwindigkeit auf 100 km/h erhöhen müssen, um mithalten zu können).

5. Anhang – Drohnenbewegung: Standort, Entfernung, Richtung, Geschwindigkeit und Ladung!

Wie bewegt sich eine Drohne von einem Standort zum anderen? Ich habe ein einfaches Drohnenmathematik-Package DroneMaths geschrieben, das bei den Berechnungen hilft (die meisten Formeln sind hier nachzulesen). Zuerst benötigen wir eine Funktion zur Berechnung der Entfernung (in km) zwischen zwei Standorten anhand der dezimalen Längen-/Breitenkoordinaten (dabei wird auch die Erdkrümmung berücksichtigt, wobei dies für die kurzen Entfernungen, mit denen wir es hier zu tun haben, vernachlässigt werden kann – außer natürlich, die Drohnen fliegen sehr hoch). So wird sichergestellt, dass sich der gesamte Liefervorgang (einschließlich Rückkehr) im Bereich der maximalen Reichweite der Drohne abspielt.

Aber wie erfolgt die Navigation von einem Standort zum anderen? Wie sich herausstellt, handelt es sich hier um eine ganz traditionelle Navigation, wie sie für Schiffe verwendet wird. Dafür muss die Richtung (Kurs) von einem Standort zum anderen in Grad bekannt sein (0 Grad nach Norden, 90 Grad nach Osten, 180 Grad nach Süden und 270 Grad nach Westen).

Um Schiffe rund um den Erdball zu navigieren, braucht es nur Karte, Kompass und Zirkel. (Quelle: Shutterstock)

Um schließlich den nächsten „Wegpunkt“ zu ermitteln und der Drohne zu ermöglichen, auf kürzestem Weg von einem Standort zum anderen zu gelangen (also per Luftlinie, was voraussetzt, dass es auf dem Weg keine Hindernisse oder Flugverbotszonen gibt – und auch keine Krähen), verwenden wir die Funktion nextPosition() zur Berechnung der nächsten Drohnenposition. Damit wird die Richtung vom aktuellen Standort zum vorgesehenen Zielstandort ermittelt (Bestellstandort, Lieferstandort oder Stützpunkt-Standort, je nach aktuellem Zustand der Drohne), und bei gegebener Geschwindigkeit (wir nehmen an, dass sich die Drohne über die gesamte Strecke mit einer Durchschnittsgeschwindigkeit von 20 km/h bewegt) wird die im nächsten Zeitintervall (welches konfigurierbar ist und für eine höhere Geschwindigkeit als in Echtzeit-Simulationen skaliert werden kann) zurückzulegende Entfernung berechnet. Danach wird eine weitere Funktion aufgerufen, um den Standort anhand des aktuellen Standorts, der Reichweite und der Richtung zu berechnen.

Wenn wir das Ziel bereits erreicht haben, kann der Flug gestoppt und die nächste Aktion ausgeführt werden (z. B. Bestellung aufnehmen, Bestellung ablegen, zum Stützpunkt absenken). Momentan gehen wir davon aus, dass jede Flugroute erfolgreich ist, aber aus Spaß können auch verschiedene Ausnahmen wie Wind, Krähen, Zusammenstöße usw. mit der entsprechende Ausnahmebehandlung eingeführt werden (wieder je nach Drohnenzustand – wenn z. B. die Drohne aus irgendeinem Grund eine Lieferung nicht aufnehmen kann, muss die Bestellung neu geplant werden, vielleicht unter Verwendung einer Prioritätswarteschlange, sodass die nächste verfügbare Drohne erst der nicht gelieferten Bestellung zugeteilt wird, bevor weitere Bestellungen akzeptiert werden).

Die Activity nextLeg() (siehe oben) ist für die Berechnung der verschiedenen Drohnenbewegungen in jedem Abschnitt der Lieferung verantwortlich, aber sendet vor allem die aktualisierten Positionsdaten der Drohne und der zugehörigen Bestellung unter Verwendung von Cadence-Signalen, deren Funktionsweise im vorherigen Blog erklärt wurde. Auch der Drohnen-Workflow selbst signalisiert dem Bestellungs-Workflow Zustandsänderungen, z. B. mit orderWorkflow.signalOrder("droneHasOrder").

Der nächste Drohnenstandort wird anhand von aktuellem Standort, Richtung, Geschwindigkeit und Entfernung berechnet (Quelle: Shutterstock)

Drohnen werden elektrisch betrieben, sodass beim Fliegen, Schweben usw. Strom verbraucht wird. Für jede inkrementelle Entfernung, die die Drohne fliegt, reduzieren wir die Batterieladung mithilfe der Hilfsfunktion updateCharge(time), die den Ladungsverbrauch auf Basis der bereitgestellten Flugzeit berechnet.

Und natürlich muss die Batterieladung ausreichen, bis die Drohne zum Stützpunkt zurückgekehrt ist. (Quelle: Shutterstock)

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

In „Workflows mit Cadence optimieren“ haben wir Cadence, eine neue skalierbare, auf Entwickler fokussierte Open-Source-Workflow-Engine, vorgestellt. Cadence ist eine großartige Lösung für ein äußerst fehlertolerantes, zustandsabhängiges Workflow-Management, auch bekannt als „Orchestrierung“ (in Anlehnung an die Idee, dass ein Dirigent ein Orchester leitet). Eine andere beliebte Architektur für große nachrichtenbasierte dezentrale Systeme verwendet jedoch einen losgelösten Ansatz, der als „Choreografie“ bekannt ist (in Anlehnung an die Idee, dass Tänzer mit ihren unmittelbaren Nachbarn interagieren). Apache Kafka ist ein gängiges Beispiel und erlaubt zahlreichen Microservices, skalierbar und mit geringer Latenz miteinander zu kommunizieren, ohne dass eine zentrale Zustandsverwaltung erforderlich ist.

In vielen Unternehmen ist es allerdings üblich, schon jetzt zahlreiche Kafka-Microservices zu nutzen. Darüber hinaus verwenden typische moderne Unternehmenssysteme mehrere Technologien, sodass man im Endeffekt beide Architekturen kombinieren und Kafka-Microservices mit Cadence-Workflows integrieren muss. Anstatt nur ein Orchester oder nur Tänzer zu engagieren, müssen Sie ein Ballett aufführen, das beides kombiniert.

(Quelle: Shutterstock)

Es gibt eine ganze Reihe von Anwendungsfälle für Kafka in Kombination mit Cadence, in diesem Blog konzentrieren wir uns jedoch auf die Vorteile bei der Wiederverwendung von Kafka-Consumer-basierten Microservices aus Cadence-Workflows. Wie kann man also eine Nachricht von Cadence an Kafka senden, um eine Activity in Kafka zu starten, und wie wartet man asynchron auf eine Antwort und empfängt diese schließlich? In meiner Blog-Serie Anomalia Machina habe ich beispielsweise darüber berichtet, wie ich ein System zur Erkennung von Anomalien als Kafka-Consumer implementiert habe.

Was aber, wenn wir eine Prüfung auf Abweichungen im Rahmen eines Cadence-Workflows durchführen wollen?

Um eine Nachricht an Kafka zu senden, muss lediglich ein Kafka-Producer mit den richtigen Metadaten im Rahmen einer Cadence Activity Method verwendet werden, um sicherzustellen, dass eine Antwort gesendet/empfangen werden kann. Es gibt dabei verschiedene Wege, auf eine Antwort zu warten und sie zu erhalten. Das schauen wir uns jetzt mal genauer an.

1. Eine Nachricht von Cadence an Kafka senden

(Quelle: Shutterstock)

Glücklicherweise ist es ein wenig einfacher, eine Nachricht an Kafka zu senden, als Flaschenpost zu versenden, in der Hoffnung, dass die enthaltene Nachricht auch empfangen wird.

Bereiten Sie also zunächst Ihre „Flasche“ (den Kafka-Producer) vor, ich habe beispielsweise diese Eigenschaftsdatei verwendet:

Properties kafkaProps = new Properties();




        try (FileReader fileReader = new FileReader("producer.properties")) {

            kafkaProps.load(fileReader);

        } catch (IOException e) {

            e.printStackTrace();

        }

Die Datei producer.properties für einen Instaclustr Managed Kafka-Dienst wird folgendermaßen konfiguriert (die clusterspezifischen Details müssen über die Instaclustr-Konsole oder die Verwaltungs-API eingegeben werden):

bootstrap.servers=IP1:9092,IP2:9092,IP3:9092

key.serializer=org.apache.kafka.common.serialization.StringSerializer

value.serializer=org.apache.kafka.common.serialization.StringSerializer

security.protocol=SASL_PLAINTEXT

sasl.mechanism=SCRAM-SHA-256

sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \

    username="kafakUser" \

    password="kafkaPassword";

Anschließend wird eine Cadence Activity Method festgelegt, die eine Nachricht an Kafka sendet:

public interface ExampleActivities {

     @ActivityMethod(scheduleToCloseTimeoutSeconds = 60)

     String sendMessageToKafka(String msg);

}




public static class ExampleActivitiesImpl implements ExampleActivities {

public String sendMessageToKafka(String msg) {

// The id is the workflow id

         String id = Activity.getWorkflowExecution().getWorkflowId();

        

         // Record contains: topic, key (null), value

         ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "", msg);

                  

// Add the workflow id to the Record Header

         producerRecord.headers().add(new RecordHeader("Cadence_WFID", id.getBytes()));




             try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {

                 producer.send(producerRecord);

                 producer.flush();

             } catch (Exception e) {

                 e.printStackTrace();

             }




return "done";

  }

}

Im Kafka-Datensatz wird das Zielthema, ein Nullschlüssel und der Nachrichtenwert angegeben. Mit dem Kafka-Header wird die Workflow-ID an Kafka übergeben, damit das System weiß, woher die Nachricht stammt. Ein Header enthält ein Key-Value-Paar, und jeder Kafka-Datensatz kann mehrere Header haben. Die Verwendung von Kafka-Headern zur Unterstützung von OpenTracing ist bereits bekannt.

Warum werden Header für die Workflow-ID verwendet und nicht etwa der Datensatzschlüssel?

Nun, wenn die vorhandenen Kafka-Microservices wiederverwendet werden sollen, dürfen sich die Änderungen auf der Kafka-Seite nur minimal auswirken. Bestehende Kafka-Consumers gehen von dem Schlüssel und dem Wert des Datensatzes aus, aber die Idee hinter einem Kafka-Header ist, dass man optionale Metadaten hinzufügt, wodurch bestehende Implementierungen nicht unterbrochen, sondern leicht erweitert werden können, sodass die Metadaten verstanden werden und eine entsprechende Reaktion erfolgt.

Wenn es Ihnen nur darum geht, eine Nachricht an Kafka zu senden und dann mit dem übrigen Workflow fortzufahren („Fire-and-Forget“), dann ist Ihre Arbeit hiermit getan. Eigentlich brauchen Sie keine Workflow-ID zu senden, wenn Sie keine Antwort erwarten (es sei denn, es gibt einen anderen Grund, warum der Kafka-Consumer wissen muss, dass die Nachricht von Cadence stammt). Der Workflow (in BPMN-ähnlicher Darstellung) sähe dann so aus (mit einer optionalen zweiten Aufgabe usw.).

Wir gehen allerdings davon aus, dass es in unserem Anwendungsfall eine Verarbeitung auf der Kafka-Seite als Reaktion auf den Empfang der Nachricht gibt. Das Ergebnis muss wieder vom Workflow empfangen werden, bevor weitere Activities durchgeführt werden können. Wie lässt sich die Schleife beenden?

2. Einführung von Cadence-Signalen

Signallampen werden zur Kommunikation zwischen Schiffen auf See verwendet. https://commons.wikimedia.org/wiki/File:Seaman_send_Morse_code_signals.jpg

Für eine Antwort von Kafka brauchen wir etwas Besseres als Flaschenpost. Cadence-Signale basieren auf UNIX-ähnlichen IPC-Signalen (Interprozesskommunikation), die es einem Prozess oder einem bestimmten Thread erlauben, Nachrichten zu senden, und ihn über ein Ereignis zu informieren (z.B. „kill -9 <pid>“!). In einem kürzlich erschienenen Blog über Apache ZooKeeperTM und Curator haben wir auch über die Verwendung von Signalen (Semaphoren, Abschnitt 3.3) berichtet.

Aus der Dokumentation geht hervor, dass Cadence-Signale:

„… einen vollständig asynchronen und dauerhaften Mechanismus zur Bereitstellung von Daten für einen laufenden Workflow bieten. Wenn ein Signal für einen laufenden Workflow empfangen wird, hält Cadence das Ereignis und die Nutzdaten in der Workflow-Historie fest. Im Workflow kann das Signal später jederzeit verarbeitet werden, ohne dass die Informationen verloren gehen. Außerdem kann die Ausführung des Workflows durch Blockieren eines Signalkanals gestoppt werden.”

In meinem ersten Cadence-Blog habe ich darüber berichtet, dass es bei jedem Workflow eine Schnittstelle und eine Implementierung mit mehreren möglichen Methoden gibt, aber nur eine Methode mit der Kennzeichnung @WorkflowMethod, die anzeigt, bei welcher Methode es sich um den Einstiegspunkt für den Workflow handelt. Es gibt auch andere Methoden, die mit @SignalMethod gekennzeichnet sind. Zum Beispiel:

public interface ExampleWorkflow {

        @WorkflowMethod(executionStartToCloseTimeoutSeconds = 120, taskList = activityName)

        String startWorkflow(String name);

        @SignalMethod

        void signalKafkaReply(String name);

    }

Und die Umsetzung dieser beiden Methoden:

public static class ExampleWorkflowImpl implements ExampleWorkflow { String message = "";

     private ExampleActivities activities = null;

    

     public ExampleWorkflowImpl() {

            this.activities = Workflow.newActivityStub(ExampleActivities.class);

     }

    

     @Override

     public String startWorkflow(String msg) {

         String r1 = activities.sendMessageToKafka(msg);

         Workflow.await(() -> message != "");

         System.out.println("workflow got signal = " + message);

           return message;

         }

     }

        

     @Override

     public void signalKafkaReply(String msg) {

         message = msg;

     } 

}

 

Es können ganz unterschiedliche Signalmethoden verwendet werden, aber für dieses Beispiel verwende ich aus praktischen Gründen nur eine Signalmethode. Jedes Mal, wenn ein ExampleWorkflow.signalKafkaReply-Signal an eine Workflow-Instanz gesendet wird, kommt es zum Aufruf der entsprechenden Signalmethode. Signalmethoden geben nie einen Wert zurück, sondern bewirken etwas im Hintergrund: In diesem Fall wird die Nachrichtenvariable mit dem Wert des Signalarguments belegt.

Abschließend ist noch anzumerken, dass die startWorkflow-Methode implementiert wurde, die nun die sendMessageToKafka-Activity Methode abruft. Danach wird sie sofort durch die Funktion Workflow.await() blockiert, und zwar so lange, bis die Funktion, die sie als Parameter erhält, auf True gesetzt wird, d. h. bis dieser Workflow ein Signal mit einem Nachrichtenwert erhält, der ungleich null ist. Das Design von Cadence lässt eine Auswertung der Bedingung nur bei Statusänderungen des Workflows zu (d. h. es findet kein Abruf statt).

Wie lassen sich also Cadence-Signale für die Integration mit Kafka nutzen?

2. Cadence-Signale mit Kafka integrieren: Erster Ansatz

Im Folgenden wird der Workflow unter Verwendung von Signalen beschrieben:

Wir erfahren, wie das gesamte Muster funktioniert, also das Senden einer Nachricht an Kafka, das Warten auf eine Antwort und die Meldung der richtigen Workflow-Instanz von Kafka. Die ersten zwei Teile haben wir bereits behandelt. Der letzte Teil der Aufgabe ist die Frage, wie ein Kafka-Consumer eine bestimmte Workflow-Instanz melden kann. Im zuvor genannten Kafka-Producer haben wir die Workflow-ID im Datensatz-Header gesendet. So kann der Kafka-Consumer diese Information abrufen und

  1. erkennen, dass der Datensatz von Cadence stammt, und
  2. die Antwort an den richtigen Workflow zurückschicken.

Der Kafka-Consumer muss so konfiguriert werden, dass er sich zum einen mit dem Kafka-Cluster verbindet, an den der Datensatz gesendet wurde, und zum anderen mit dem Cadence-Cluster, der diesen Workflow verwaltet (was zu KafkaConsumer– und WorkflowClient-Objekten führt). Die Hauptabrufschleife des Kafka-Consumer sieht so aus:

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {

            consumer.subscribe(Collections.singleton("topic1"));




            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {

                    String id = "";

                    // If the header has Cadence_WFID key then we send the response to Cadence

                    for (Header header : record.headers()) {  

                        if (header.key().matches("Cadence_WFID"))

                         id = new String(header.value());

                    }

                    if (id != "")

                    {

                       ExampleWorkflow workflowById = workflowClient.newWorkflowStub(ExampleWorkflow.class, id);

                        workflowById.signalKafkaReply("Kafka consumer received value = " + record.value());                        

                    }

                    // else don't send signal! But do whatever is normal

                }

            }

Es handelt sich hierbei nur um ein Anschauungsbeispiel, d. h. der Kafka-Consumer verarbeitet die Anfrage nicht wirklich, und die Antwort ist nur eine Zeichenkette, die den Anfragewert enthält. Wird im Header „Cadence_WFID“ gefunden, wird zunächst mit WorkflowClient.newWorkflowStub() ein neuer Stub für den Workflow mit ExampleWorkflow.class und der Workflow-ID erstellt. Anschließend wird das Signal durch Aufruf der Signalmethode an diesen Stub gesendet. Andernfalls erfolgt die normale Verarbeitung und Lieferung der Ergebnisse. Wenn der Workflow das Signal erhalten hat, wird die Blockierung aufgehoben, und er setzt seine Arbeit fort, indem er die Antwort verwendet oder gegebenenfalls weitere Activities durchführt.

In diesem Beispiel wird davon ausgegangen, dass für jede an Kafka gesendete Nachricht nur ein Signal als Antwort gesendet wird. In Kafka können auch Consumer-Gruppen angelegt werden, sodass theoretisch mehrere Signale gesendet werden könnten. In diesem Fall spielt das jetzt keine Rolle, da es nur auf das erste Signal ankommt. Bei anderen Beispielen müssen eventuell mehrere Signale richtig verarbeitet werden.

Mehr Infos zu Cadence-Signalen gibt es hier und hier.

3. Abschluss von Activities in Cadence: Zweiter Ansatz

Abschlussarbeiten auf der Sydney Harbour Bridge („den Bogen spannen“). https://commons.wikimedia.org/wiki/File:SLNSW_44281_Sydney_ferry_Kubu_passes_near_the_Harbour_Bridge_as_the_lower_chord_is_ready_for_joining.jpg circa 1930

Der Abschluss von asynchronen Activities in Cadence (Cadence Asynchronous Activity Completion) ist ein alternativer Ansatz zur Lösung dieses Problems. Mehr Infos zum Abschluss von Activities in Cadence gibt es hier, hier und hier. Die Dokumentation zeigt, dass es sich um eine perfekte Ergänzung handelt:

Manchmal geht der Lebenszyklus einer Activity über einen synchronen Prozessaufruf hinaus. So kann zum Beispiel eine Anfrage in eine Warteschlange gestellt werden und zu einem späteren Zeitpunkt kommt eine Antwort und wird von einem anderen Worker-Prozess abgeholt. Die gesamte Anfrage-Antwort-Interaktion lässt sich als eine einzige Cadence-Activity modellieren.

Wie unterscheidet sich dieser Ansatz von dem Signalansatz? Der Hauptunterschied liegt in der Blockierung, die nun (konzeptionell) in der Activity-Method und nicht mehr im Workflow selbst stattfindet. Außerdem ist der Aufruf zum Abschluss mit der Activity und nicht mit dem Workflow verbunden. Es handelt sich jedoch immer noch um eine „Punkt-zu-Punkt“-Kommunikation, die über Prozessgrenzen hinweg funktioniert Dieser Workflow sieht dann folgendermaßen aus:

Die Implementierung des Haupt-Workflows ist jetzt ganz einfach, da weder Wartezeiten noch eine Signalmethode erforderlich sind:

public static class ExampleWorkflowImpl implements ExampleWorkflow {      




private ExampleActivities activities = null;

     public ExampleWorkflowImpl() {

            this.activities = Workflow.newActivityStub(ExampleActivities.class);

        } 

 

@Override

public String startWorkflow2(String msg) {

String r1 = activities.sendMessageToKafka2(msg);

      return r1;

} 

}

Die Activity-Method ist jedoch etwas komplizierter und sieht wie folgt aus:

public String sendMessageToKafka2(String msg) {

ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic1", "", name);  

// The task token is now used to correlate reply. 

      byte[] taskToken = Activity.getTaskToken();          

      producerRecord.headers().add(new RecordHeader("Cadence_TASKID", taskToken));




      try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {

                 producer.send(producerRecord);

                 producer.flush();

             } catch (Exception e) {

                 e.printStackTrace();

      }

// Don’t complete - wait for completion call

      Activity.doNotCompleteOnReturn();

// the return value is replaced by the completion value

      return "ignored";

}

Alternativ zur Workflow-ID (wie zuvor bei den Signalen beschrieben) wird nun der Task-Token ermittelt und als Wert des Schlüssels „Cadence_TASKID“ in den Kafka-Header eingefügt. Die Nachricht wird wie zuvor mit einem Kafka-Producer an das Kafka-Topic gesendet. Die Activity zeigt dann jedoch mit der Methode Activity.doNotCompleteOnReturn() an, dass sie bei Rückgabe nicht „abgeschlossen“ ist. Der Rückgabewert wird „ignoriert“ (es kann sich dabei um einen beliebigen Wert handeln). Doch was bedeutet das? Grundsätzlich wird jeder Wert, der von der Activity-Method zurückgegeben wird, verworfen und später durch den Wert ersetzt, der durch den Aufruf zum Abschluss bereitgestellt wird.

Auch beim Kafka-Consumer gibt es einige Änderungen, und zwar:

ActivityCompletionClient completionClient = workflowClient.newActivityCompletionClient();




try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {

            consumer.subscribe(Collections.singleton("topic1"));




            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {

                    byte[] task = null;

                    // If the header has key = Cadence_TASKID then send response to Cadence task using completion

                    for (Header header : record.headers()) {

                       if (header.key().matches("Cadence_TASKID"))

                        task = Arrays.copyOf(header.value(), header.value().length);

                    }

                    if (task != null)

                     completionClient.complete(task, "Kafka consumer received value = " + record.value());

                    // else process as normal

            }

        }

Zunächst wird ein ActivityCompletionClient unter Verwendung der WorkflowClient-Instanz erstellt (die mit den richtigen Cadence-Cluster-Einstellungen angelegt wurde). Wird in Kafka der Schlüssel „Cadence_TASKID“ im Datensatz-Header für ein Ereignis gefunden, wird die Abschlussmethode mit dem Task-ID-Wert und einem Response-Wert aufgerufen. Dieser Response-Wert wird von der Activity-Method zurückgegeben, die blockiert ist und auf den Abschluss wartet.

Im Kafka-Consumer kann ein „Handler“ für den Signalansatz und den Abschlussansatz vorhanden sein. Wenn keiner der beiden Ansätze im Header vorhanden ist, findet eine normale Verarbeitung statt.

Das Thema „Abschluss“ war mir immer noch ein Rätsel, also fragte ich unsere Entwickler nach weiteren Details (danke Kuangda, Matthew und Tanvir). Hier das Wichtigste in Kürze:

Der Abschluss (Completion) ist ein praktisches paralleles Programmiermuster, um auf einen externen Callback zu warten, der das Ergebnis liefert (Push), anstatt das Ergebnis abzufragen (Pull). Der ursprüngliche Rückgabewert wird ignoriert, da er eigentlich nur einen speziellen Fehler zurückgibt (mithilfe eines Golang-Fehlerbehandlungsmusters), der signalisiert, dass das Resultat, das vom Completion-Callback geliefert wird, noch aussteht.

In der Cadence-go-Client-Dokumentation wird dies näher ausgeführt. Bei Java gibt es einen ähnlichen Mechanismus, der Futures und CompletableFutures verwendet, und einen weiteren Blog.

4. Kafka und Cadence: Fazit

(Quelle: Shutterstock)

Wir sind jetzt am Ende dieses Blogs angelangt und haben die Ansätze „Signale“ und „Abschluss von Activities“ kennengelernt – zwei Lösungen für die Integration von Kafka-Microservices mit Cadence-Workflows, mit denen sich die zwei Welten der Choreografie und der Orchestrierung perfekt kombinieren lassen. Beide Lösungen funktionieren einwandfrei. Ich hoffe, die „Aufführung“ hat Ihnen gefallen.

Der Unterschied besteht im Wesentlichen darin, dass „Signale“ auf der Ebene von Workflows arbeiten, und „Abschlüsse von Activities“ dafür detaillierter angelegt sind und auf der Ebene von Aufgaben/Activities agieren. Zur Korrelation komplexerer Workflows mit mehreren gleichzeitigen Aufgaben/Activities, die mit Kafka kommunizieren, kann dies durchaus ein Vorteil sein.

Beim Ansatz „Abschluss“ regelt das Activity Timeout die Zeitüberschreitung für den gesamten Nachrichtenumlauf zu und von Kafka. Beim Ansatz „Signal“ hingegen bieten die separate Zeitüberschreitung für die Activity (Senden der Nachricht an Kafka) und das anschließende Warten auf die Antwort von Kafka mehr Flexibilität.

Abschließend sei noch darauf hingewiesen, dass „Signale“ und „Abschluss von Aktivtäten“ einen Remote-Aufruf darstellen und eine Verzögerung beim Kafka-Consumer verursachen. Der Ansatz „Signale“ erfordert 2 Remote-Aufrufe, während der Ansatz „Abschluss“ nur 1 Remote-Aufruf benötigt und daher eventuell etwas schneller ist. Es kann auch sein, dass die Anzahl der Kafka-Topic-Partitionen und -Consumer erhöht werden muss, um einen ausreichenden Durchsatz zu erzielen.

Der Code ist hier verfügbar.

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence

1. Was ist Cadence?

Cadence Logo

Beim Radfahren ist Kadenz (engl. Cadence) ein anderes Wort für Trittfrequenz, also die Drehzahl, mit der die Tretkurbel betätigt wird. Das Fahren mit hoher Kadenz wird Spinning genannt. Beim Fahren mit niedriger Kadenz, dem sogenannten Grinding oder Mashing, ist man langsamer. Und es ist schlecht für die Kniegelenke.

Die rasante Zunahme von Big-Data-Anwendungsfällen in den letzten zehn Jahren wurde durch beliebte, hochgradig skalierbare Open-Source-Technologien wie Apache Cassandra® für Speicherung, Apache Kafka® für Streaming und OpenSearch® für Suchen noch beschleunigt. Nun begrüßen wir ein neues Mannschaftsmitglied: Cadence zur Workflow-Orchestrierung.

Mein Mitbewohner ist mit seinem Cello immer auf dem Fahrrad zur Musikschule gefahren. Das sah ungefähr so wie auf dem Foto aus. Aber Musik und Fahrradfahren haben noch etwas anderes gemeinsam: Bei beidem arbeitet man mit Kadenzen!

(Quelle: Adobe Stock)

In der Musik ist eine Kadenz eine musikalische Phrase, die den Abschluss eines Stücks oder eines Abschnitts markiert. Beim Radfahren zeigt die Kadenz an, wie hoch die Trittfrequenz des Radfahrers ist, was sich wiederum auf die Effizienz und das Tempo der Fahrt auswirkt.

Als Workflow-Orchestrierung wird der Prozess bezeichnet, mit dem Reihen von Aufgaben festgelegt und ausgeführt werden, sowie die Reihenfolge, in der sie ausgeführt werden. Einige Aufgaben müssen vielleicht nacheinander ausgeführt werden, andere gleichzeitig. Nach diesem Prinzip funktionieren auch Orchesterpartituren, aus denen Dirigent, Musiker und Chor ablesen können, welche Noten wann zu spielen bzw. zu singen sind.

Von Max Reger—IMSLP, lizenzfrei, (https://commons.wikimedia.org/w/index.php?curid=51020706)

In der üblichen Workflow-Terminologie (z. B. der von BPMN) besteht ein Workflow aus Start- und Endereignissen, Activities (Atomic Task oder Composite Task), Gateways (Verzweigungen) und Sequenzen bzw. Flüssen – die Reihenfolge von Activities in einem Workflow.

In der Informatik habe ich einige der früheren Versuche erlebt, Workflows zu implementieren und zu modellieren, u. a. Enterprise Java Beans (Stateful Session Beans wurden für Workflows verwendet), ebXML, BPEL und die Modellierung und Evaluierung von Workflows mithilfe von ereignisorientierter Simulation. In der traditionellen Workflow-Modellierung wird die Ausführungssemantik mit Hilfe von semi- formalen Spezifikation, Endlichen Zustandsautomaten (EA), Markov-Modellen, ereignisorientierter Simulation und Petri-Netzen, genutzt, um Workflows festzulegen, zu visualisieren und zu analysieren.

Die Open-Source-Lösung Cadence wurde von Uber auf der Grundlage von AWS SWF (Simple Workflow Service) entwickelt. Sie nutzt Apache Cassandra (und andere Datenbanken) für Persistenz und ist auf eine hohe Fehlertoleranz und Skalierbarkeit ausgelegt. Das vielleicht überraschendste Merkmal von Cadence im Vergleich zu anderen Workflow-Engine-Ansätzen, die ich bereits kennengelernt habe, ist der Fokus auf die Unterstützung der Entwickler, Workflows hauptsächlich in Java oder Go zu schreiben (weitere Sprachen sind verfügbar). Eine spezielle Visualisierungsnotation oder ein weiteres Tool zum Festlegen des Workflow wird daher nicht benötigt – die Semantik besteht einfach nur aus Code. Mit dem Cadence-Web-Client können Workflows jedoch bei der Ausführung visualisiert werden.

(Quelle: Shutterstock)

Zum Fahrradfahren braucht man eine Menge Ausrüstung: beispielsweise ein Fahrrad (logisch…), einen Helm, Schuhe, Wasserflasche, Brille, Handschuhe, Lampe, Pumpe, Shirt und Hose aus Lycra (optional) usw. Für den erfolgreichen Einsatz von Cadence werden ebenfalls viele Elemente benötigt, unter anderem Workflows, Activities, Domains, Workflow-Clients und Worker. Sehen wir uns diese Elemente nacheinander mal genauer an.

2. Cadence – Workflows

Bei Workflows in Fabrikanlagen muss die Reihenfolge der Aufgaben eindeutig festgelegt werden. (Quelle: Shutterstock)

Beginnen wir mit dem Grundkonzept von Cadence: Workflows. Ich werde den Cadence-Java-Client benutzen, und würde Ihnen empfehlen diesen herunterzuladen und entsprechend zu konfigurieren, damit Sie ihn in der von Ihnen gewählten IDE kompilieren können. (Ich nutze Eclipse und Maven. Zur Vereinfachung habe ich die Importe weggelassen). Es gibt einige Beispiele für Java-Clients, die mich inspiriert haben. Zunächst benötigen wir eine Workflow-Interface und -Implementierung.

Workflow-Interface:

static final String activityName = "ExampleActivity";

    public interface ExampleWorkflow {

        @WorkflowMethod(executionStartToCloseTimeoutSeconds = 120, taskList = activityName)

        void startWorkflow(String name);

    }

Workflow-Implementierung:

public static class ExampleWorkflowImpl implements ExampleWorkflow {

   

     private ExampleActivities activities = null;

    

     public ExampleWorkflowImpl() {

            this.activities = Workflow.newActivityStub(ExampleActivities.class);

        }

    

        @Override

        public void startWorkflow(String name) {

         System.out.println("Started workflow! ID=" + Workflow.getWorkflowInfo().getWorkflowId());

        

         String r1 = activities.task1(name);

           String r2 = activities.task2(r1);

            

         System.out.println("Workflow result = " + r2 + " ID=" + Workflow.getWorkflowInfo().getWorkflowId());

        }

       

    }

In Cadence gibt es immer genau eine Methode mit der @WorkflowMethod-Annotation, die als Startereignis für den Workflow fungiert und den Sequenzfluss und Activities enthält. Ihr Aufruf startet eine stateful Workflow-Instanz, die schließlich endet.

In der Implementierung ruft die Methode startWorkflow() zwei weitere Methoden nacheinander auf (task1, task2), wodurch wir eine Workflow-Logik (einfach sequenziell) und 2 Activities erhalten.

Jede ausgeführte Workflow-Instanz verfügt über eine eindeutige ID, die wir im Beispiel oben verwendet haben. Sie brauchen eine Workflow-ID, da Workflows über mehrere Instanzen verfügen können.

Hier ist es aber noch komplexer. Tatsächlich funktioniert Cadence nicht nur nach dem POJI/POJO-Modell. Es ist eine Plattform, mit der sich stateful Workflows skalierbar und zuverlässig ausführen lassen.

Sie bemerken sicher die Elemente timeout, activities und taskList im vorstehenden Beispiel. Timeouts sind notwendig, weil jeder Workflow, ganz wie ein Musikstück oder eine Fahrradtour, irgendwann enden muss.

3. Cadence – Activity

Einige Leute veranstalten die sonderbarsten Sachen mit ihrem Fahrrad. Das hier wird wahrscheinlich schiefgehen. (Quelle: Shutterstock)

Activities sind ein zentrales Element von Cadence. Workflows sind stateful und fehlertolerant. In einer verteilten Microservices-Architektur möchten sie aber schließlich Remote-APIs aufrufen, um Aufgaben auszuführen. Remote-API-Aufrufe können jedoch fehlerhaft sein und nicht fehlertolerant gestaltet werden, da sie außerhalb des Cadence-Workflows liegen. Cadence nutzt sie, damit jeder Code, einschließlich Remote-Aufrufen, in eine Cadence-Activity eingebunden werden kann, unter dem Vorbehalt, dass Cadence bei Fehlern den Zustand der Activity nicht wiederherstellt und Activities garantiert höchstens einmal ausgeführt werden. (Idempotente Activities werden bei Fehlern automatisch erneut ausgeführt, nicht-idempotente Activities müssen von einer unternehmensspezifischen Logik behandelt werden, einschließlich Kompensations-Activities. Weitere Informationen zu RetryOptions finden Sie hier.)

Activities sind ebenfalls nur ein POJO/POJI-Paar und enthalten die auszuführenden Aufgaben/Methoden. Jede Methode verfügt über eine @ActivityMethod-Annotation mit Optionen.

public interface ExampleActivities

    {

     @ActivityMethod(scheduleToCloseTimeoutSeconds = 60)

     String task1(String name);

     @ActivityMethod(scheduleToCloseTimeoutSeconds = 60)

         String task2(String name);

    }




public static class ExampleActivitiesImpl implements ExampleActivities

    {

         public String task1(String arg) {

         System.out.println("task1 " + arg + " in " + Activity.getWorkflowExecution().getWorkflowId());

         return arg+"task1";

         }




    public String task2(String arg) {

         System.out.println("task2 " + arg + " in " + Activity.getWorkflowExecution().getWorkflowId());

         return arg+"task2";

         }    

    }

Wir haben die Activities bereits im Workflow-Konstrukteur oben registriert. Beachten Sie, dass man etwas anders vorgehen muss, um die Workflow-ID mit einer Activity zu verknüpfen. Brauchen wir also nur einen Workflow und Activities um loszulegen?

4. Cadence – Domains

Königreich: Staatsgebiet, dessen Beherrschung in die Domäne eines Monarchen fällt (falls Sie mal einen Tipp für das Kreuzworträtsel brauchen) (Quelle: Shutterstock)

Cadence nutzt das Konzept der Domains, die im Grunde nur ein Namensraum für die darin enthaltenen Workflows sind. Sie müssen eine Domain erstellen oder eine bestehende Domain wiederverwenden, bevor Sie einen Workflow oder Worker starten können. Hier sehen Sie ein Beispiel für die Registrierung einer neuen Domain (oder für deren Wiederverwendung, falls sie bereits besteht):

public static void registerDomain(String host, String domain)

    {  

     String nameDescription = "a new domain";

    

      IWorkflowService cadenceService = new WorkflowServiceTChannel(ClientOptions.newBuilder().setHost(host).setPort(7933).build());

        RegisterDomainRequest request = new RegisterDomainRequest();

        request.setDescription(nameDescription);

        request.setEmitMetric(false);

        request.setName(domain);

        int retentionPeriodInDays = 1;

        request.setWorkflowExecutionRetentionPeriodInDays(retentionPeriodInDays);

        try {

          cadenceService.RegisterDomain(request);

          System.out.println(

              "Successfully registered domain "

                  + domain

                  + " with retentionDays="

                  + retentionPeriodInDays);

        } catch (DomainAlreadyExistsError e) {

          System.out.println("Domain " + domain + " is already registered")

    }

Und nur für den Fall, dass Sie sich fragen, was IWorkflowService und WorkflowServiceTChannel sind (Ich habe mich das auch schon gefragt, und wir werden sie später auch nochmal benutzen)! Diese Typen sind im ServiceClient-Paket enthalten, das nicht gut dokumentiert zu sein scheint, aber es ist anscheinend die Hauptmethode, um eine Verbindung zum Cadence-Server herzustellen.

„Der ServiceClient ist ein RPC-Service-Client, der eine Verbindung zum Cadence-Service herstellt. Er dient auch als Baustein für die anderen Clients.“

Im Java-Client-Code habe ich folgende Client-Typen gefunden (im Client-Paket und im ServiceClient-Paket): client/ActivityCompletionClient, client/WorkflowClient, serviceclient/IWorkflowService. (Vielleicht gibt es noch mehr.)

5. Cadence – Workflow-Client

Wie kommunizieren Sie mit dem Cadence-Server? (Quelle: Shutterstock)

Nun sind wir so weit, dass wir den Beispielcode ausprobieren können. Hier ist etwas Beispielcode, um eine Domain und eine neue WorkflowClient-Instanz zu erstellen:

String host  = "Cadence Server IP";

    

String domainName = "example-domain";

    

registerDomain(domainName);

    

WorkflowClient workflowClient =

                WorkflowClient.newInstance(

                        new WorkflowServiceTChannel(ClientOptions.newBuilder().setHost(host).setPort(7933).build()),

                        WorkflowClientOptions.newBuilder().setDomain(domainName).build());

6. Cadence – Worker

Ich hätte nie gedacht, dass es „zu viele Fahrräder“ geben kann, bis ich in die Niederlande reiste und ganze Horden von Fahrradfahrern erlebte. Das wichtigste Zubehör ist hier nicht der Helm, sondern die Klingel! (Quelle: Adobe Stock)

Wenn Sie den Beispielcode oben versuchsweise ausführen, werden Sie feststellen, dass keine der Aufgaben ausgeführt wird und für den Workflow schließlich ein Timeout eintritt. Was ist schiefgegangen? Activities werden durch Worker ausgeführt. Bevor Arbeiten ausgeführt werden können, müssen für die Activities Worker erstellt werden. Ähnlich ist es bei den Musikern eines Orchesters. Vielleicht sind Dirigent und Partitur schon da. Aber die Partitur wird erst gespielt, wenn die Musiker mit ihren Instrumenten auf die Bühne gekommen und bereit für die Vorführung sind. Dann erst kann der Dirigent das Konzert beginnen.

Wir haben also jetzt einen WorkflowClient und können einen Worker erstellen. Sie müssen den String activityName festlegen und den Workflow und die ActivitiesImplementations im Workflow registrieren, bevor Sie diesen starten. Wenn Sie es versäumen, die Activities zu registrieren, wird kein Fehler gemeldet. Es geschieht aber auch nichts, weil Cadence darauf wartet, dass ein Worker-Prozess startet, zumindest bis zum Workflow-Timeout:

        WorkerFactory factory = WorkerFactory.newInstance(workflowClient);

        Worker worker = factory.newWorker(activityName);

        worker.registerWorkflowImplementationTypes(ExampleWorkflowImpl.class);




        worker.registerActivitiesImplementations(new ExampleActivitiesImpl());

        

        factory.start();

Beachten Sie, dass die Worker in der Regel in einem vom Workflow getrennten Prozess und aus Gründen der Skalierbarkeit auf (mehreren) Servern mit ausreichenden Ressourcen ausgeführt werden, da hier die eigentliche Workflow-Aufgabe „work“, also die Arbeit, erledigt wird.

Aber wie viele Worker haben wir tatsächlich? Es bestehen standardmäßige Beschränkungen für Gleichzeitigkeit und Rate. Die standardmäßigen Einstellungen von WorkerOptions sind

Wenn Sie clever sind und Workflows und Activities (jeweils Anzahl und Zeiten) ausreichend überwachen, könnten Sie vermutlich Little’s Law verwenden, das Gleichzeitigkeit, Durchsatz und Zeit verknüpft, um die magischen Zahlen genauer zu schätzen. Hier sehen Sie ein Beispiel für das Ändern der WorkerOptions-Standardeinstellungen, inklusive einer Erklärung, wie sinnvoll vorgegangen werden kann.

7. Cadence – Workflow-Ausführung: synchron oder asynchron

(Quelle: Shutterstock)

Jetzt ist es an der Zeit, wie folgt die Workflow-Instanz auzuführen und zu starten:

ExampleWorkflow exampleWorkflow = workflowClient.newWorkflowStub(ExampleWorkflow.class);

exampleWorkflow.startWorkflow("workflow 1");

Workflows können synchron oder asynchron gestartet werden. Zwei Instanzen starten synchron:

ExampleWorkflow exampleWorkflow = workflowClient.newWorkflowStub(ExampleWorkflow.class);

exampleWorkflow.startWorkflow("workflow 1 sync");




ExampleWorkflow exampleWorkflow2 = workflowClient.newWorkflowStub(ExampleWorkflow.class);

exampleWorkflow2.startWorkflow("workflow 2 sync");

Asynchroner Start:

ExampleWorkflow exampleWorkflow3 = workflowClient.newWorkflowStub(ExampleWorkflow.class);




CompletableFuture r3 = WorkflowClient.execute(exampleWorkflow3::startWorkflow, "workflow async");

CompletableFuture r4 = WorkflowClient.execute(exampleWorkflow3::startWorkflow, "workflow 2 async");




          try {

r3.get();

r4.get();

} catch (InterruptedException e1) {

e1.printStackTrace();

} catch (ExecutionException e1) {

e1.printStackTrace();

}

Denken Sie daran, den Abschluss der asynchronen Workflows abzuwarten (wie in diesem Beispiel mit der Verwendung von get()). Andernfalls wird nicht viel geschehen, da die Worker in diesem Beispiel in demselben Thread sind.

8. Cadence – asynchrone Activities und Blockieren

(Quelle: Shutterstock)

Nicht nur Workflows lassen sich asynchron aufrufen, auch Activities können gleichzeitig ausgeführt werden. Dies ist hilfreich, wenn Sie eine große Anzahl an gleichzeitigen Aufgaben ausführen und dann abwarten möchten, bis alle abgeschlossen sind. Lassen Sie uns beispielsweise die ursprüngliche Methode startWorkflow(String name) mit der neuen Methode startConcurrentWorkflow(int max) ersetzen. Zudem verwenden wir die ursprüngliche Activity task1() erneut, führen sie jedoch gleichzeitig aus:

        public void startConcurentWorkflow(int concurrency) {

            List<Promise> promises = new ArrayList<>();

            List processed = null;

            try {

                       for (int i=0; i<concurrency; i++)

             {

                        Promise x = Async.function(activities::task1, “subpart “ + i);

             promises.add(x);

             }

                

                // allOf converts a list of promises to a single promise that contains a list

                // of each promise value.

                Promise<List> promiseList = Promise.allOf(promises);




                // get() blocks on all tasks completing

                List results = promiseList.get();

                

                int count = 0;

                for (String s: results)

                {

                 System.out.println("Processed " + s);

                 count++;

                }

                System.out.println("Processed “ + count + " concurrent tasks");

            } finally {

                }

            }

        }

Beachten Sie, dass wir die Methode Async.function(activities::activityName, arg) und die Promise-Klasse mit den Methoden add(), allOf() und get() verwenden müssen, um korrekt zu blockieren, bis alle Activities abgeschlossen sind.

9. Cadence – Einschränkungen

Dieses Schild weist vermutlich darauf hin, dass hier keine Traktoren, Pferdekutschen und Fahrräder fahren dürfen. (Quelle: Shutterstock)

Wirklich clever, wie Cadence den Status von Workflows verwaltet! Beim üblichen Ansatz für Persistenz und zuverlässige Workflow-Engines wird der Status jeder stateful Workflow-Instanz in einer Datenbank beibehalten. Cadence gibt jedoch Statusänderungen als Historienereignisse an die Datenbank aus. Bei Fehlern (selbst, wenn der Workflow im Ruhezustand war) startet Cadence den Workflow einfach erneut und wiederholt die Historie, um zu bestimmen, welche Aufgabe als nächste ausgeführt werden muss. Hierbei kommen sog. event sourcing pattern zum Einsatz, die, ähnlich wie Kafka Streams, den Zustand aus den Ereignisströmen oder Ereignisströme aus dem Zustand berechnen können.

Bei dieser Vorgehensweise wird eine große Anzahl an gleichzeitig ausgeführten Workflow-Instanzen unterstützt, wie sie bei Workflows mit langer Laufzeit und hohem Durchsatz wahrscheinlich vorkommen werden. Dies hat allerdings negative Folgen für lange Workflows: Es sind immer größere Historien erforderlich, und viele Ereignisse müssen wiederholt werden, um den korrekten Zustand zu erhalten.

Dieser Ansatz zieht jedoch einige wichtige Beschränkungen für die Workflow-Implementierung nach sich, denen Sie sich bewusst sein müssen – sie müssen im Prinzip deterministisch sein. Die Cadence-Workflow-Klasse verfügt über viele hilfreiche Methoden, die anstelle von unsicheren / nicht-deterministischen / von Nebeneffekten betroffenen standardmäßigen Java-Methoden verwendet werden können, insbesondere für Zeit, Ruhezustand und Zufallszahlen.

Das ist so ziemlich alles, was ich bis jetzt über Cadence weiß. Es gibt jedoch noch viel mehr zu entdecken, u. a. über die Behandlung von Ausnahmen, Neuversuche, lang laufende Transaktionen, verschiedene Arten von Timeouts, Kompensations-Activities, Signale, Abfragen, abgeleitete Workflows, den Aufruf von Remote-APIs und die Integration mit Kafka (z. B. das Senden und Blockieren/Empfangen einer Nachricht als Activity).

Wir unterstützen Sie gerne

Ob Cadence, Debian oder PostgreSQL: mit über 22+ Jahren an Entwicklungs- und Dienstleistungserfahrung im Open Source Bereich, können credativ und Instaclustr Sie mit einem beispiellosen und individuell konfigurierbaren Support professionell begleiten und Sie in allen Fragen bei Ihrer Open Source Infrastruktur voll und ganz unterstützen.

Sie möchten mehr über Cadence lernen und über die Vorteile die es Ihrer Organisation bietet. Dann laden Sie sich unser englischsprachiges Whitepaper runter.

Sollten Sie Fragen zu unserem Artikel haben oder würden sich wünschen, dass unsere Spezialisten sich Ihr System angucken und Ihre Infrastruktur optimieren, dann schauen Sie doch vorbei und melden sich über unser Kontaktformular oder schreiben uns eine E-mail an info@credativ.de.

Über unsere Mutterfirma Instaclustr bieten wir auch eine komplett verwaltete Plattform für Cadence an.

Original englischsprachige Artikel auf Instaclustr.com

Folgen Sie der Reihe auf credativ.de: Drohnen fliegen lassen mit Cadence